Files
auto_cursor_online/retry_failed.py
2025-03-31 09:55:54 +08:00

200 lines
7.4 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import asyncio
import sys
from typing import List
from loguru import logger
from core.config import Config
from core.database import DatabaseManager
from core.logger import setup_logger
from register.register_worker import RegisterWorker
from services.email_manager import EmailAccount, EmailManager
from services.fetch_manager import FetchManager
from services.proxy_pool import ProxyPool
from services.token_pool import TokenPool
class FailedAccountsRetry:
    """Re-run registration for email accounts whose status is ``'failed'``.

    The class wires together the configuration, database manager, proxy
    pool, token pool, email manager and register worker, then retries the
    failed accounts concurrently in a single batch.
    """

    def __init__(self):
        """Load configuration via ``Config.from_yaml()`` and build the services."""
        self.config = Config.from_yaml()
        self.logger = setup_logger(self.config)
        self.db_manager = DatabaseManager(self.config)
        self.fetch_manager = FetchManager(self.config)
        self.proxy_pool = ProxyPool(self.config, self.fetch_manager)
        self.token_pool = TokenPool(self.config)
        self.email_manager = EmailManager(self.config, self.db_manager)
        self.register_worker = RegisterWorker(
            self.config,
            self.fetch_manager,
            self.email_manager
        )

    async def initialize(self):
        """Initialize the database."""
        await self.db_manager.initialize()

    async def cleanup(self):
        """Clean up resources held by the database manager."""
        await self.db_manager.cleanup()

    async def get_failed_accounts(self, limit: int = 0) -> List[EmailAccount]:
        """Fetch accounts whose registration failed, newest ``id`` first.

        Args:
            limit: Maximum number of accounts to return. Any value that is
                not greater than zero means "no limit".

        Returns:
            One ``EmailAccount`` per row of ``email_accounts`` whose status
            is ``'failed'``.
        """
        query = """
            SELECT id, email, password, client_id, refresh_token, status
            FROM email_accounts
            WHERE status = 'failed'
            ORDER BY id DESC
        """
        params = ()
        if limit > 0:
            # Bind the limit as a parameter instead of formatting it into
            # the SQL text, like the UPDATE in update_account_status does.
            query += " LIMIT ?"
            params = (limit,)

        async with self.db_manager.get_connection() as conn:
            cursor = await conn.execute(query, params)
            rows = await cursor.fetchall()

        # Column order follows the SELECT list above.
        return [
            EmailAccount(
                id=row[0],
                email=row[1],
                password=row[2],
                client_id=row[3],
                refresh_token=row[4],
                status=row[5]
            )
            for row in rows
        ]

    async def update_account_status(self, account_id: int, status: str):
        """Set the status of one account and refresh its ``updated_at``.

        Failures are logged and not re-raised, so callers cannot tell from
        the return value whether the update was actually stored.

        Args:
            account_id: Primary key of the row in ``email_accounts``.
            status: New status value to store.
        """
        try:
            async with self.db_manager.get_connection() as conn:
                await conn.execute(
                    """
                    UPDATE email_accounts
                    SET status = ?, updated_at = CURRENT_TIMESTAMP
                    WHERE id = ?
                    """,
                    (status, account_id)
                )
                await conn.commit()
                self.logger.debug(f"账号 ID {account_id} 状态已更新为: {status}")
        except Exception as e:
            self.logger.error(f"更新账号状态失败: {e}")

    async def retry_account(self, account: EmailAccount) -> bool:
        """Retry registration for a single failed account.

        The account is first marked ``'retrying'``. It is then marked
        ``'unavailable'`` when the worker returns ``None``, or ``'failed'``
        when the result is incomplete, storing it fails, or any exception
        is raised. This method writes no status on success.
        NOTE(review): presumably ``email_manager.update_account`` records the
        success status - confirm.

        Args:
            account: The account to register again.

        Returns:
            ``True`` when the registration result was stored, else ``False``.
        """
        self.logger.info(f"开始重试账号: {account.email}")
        try:
            # Mark the account as being processed first.
            await self.update_account_status(account.id, 'retrying')
            # Take one proxy and one token pair for this attempt.
            proxy = (await self.proxy_pool.batch_get(1))[0]
            token_pair = (await self.token_pool.batch_generate(1))[0]
            # Run the registration.
            result = await self.register_worker.register(proxy, token_pair, account)

            if result is None:
                self.logger.warning(f"账号 {account.email} 被跳过")
                await self.update_account_status(account.id, 'unavailable')
                return False

            # Log the full result to help debugging.
            self.logger.debug(f"注册结果: {result}")

            # Every required field must be present...
            required_fields = ['account_id', 'cursor_password', 'cursor_cookie']
            missing_fields = [field for field in required_fields if field not in result]
            if missing_fields:
                self.logger.error(f"注册结果缺少必要字段: {missing_fields}")
                await self.update_account_status(account.id, 'failed')
                return False

            # ...and the credentials must not be empty.
            if not result['cursor_password'] or not result['cursor_cookie']:
                self.logger.error(f"注册结果包含空值: password={bool(result['cursor_password'])}, cookie={bool(result['cursor_cookie'])}")
                await self.update_account_status(account.id, 'failed')
                return False

            # Store the new credentials.
            try:
                await self.email_manager.update_account(
                    result['account_id'],
                    result['cursor_password'],
                    result['cursor_cookie'],
                    result.get('cursor_jwt', '')  # empty string when no JWT was returned
                )
                self.logger.success(f"账号数据更新成功: {account.email}")
            except Exception as update_error:
                self.logger.error(f"更新账号数据失败: {update_error}")
                await self.update_account_status(account.id, 'failed')
                return False

            self.logger.success(f"账号 {account.email} 重试成功")
            return True
        except Exception as e:
            self.logger.error(f"账号 {account.email} 重试失败: {str(e)}")
            await self.update_account_status(account.id, 'failed')
            return False

    async def retry_batch(self, batch_size: int):
        """Retry up to ``batch_size`` failed accounts concurrently.

        All selected accounts are retried at once with ``asyncio.gather``.
        A summary of successes, failures and errors is logged at the end.

        Args:
            batch_size: Maximum number of failed accounts to load; passed to
                ``get_failed_accounts`` as its ``limit``.
        """
        failed_accounts = await self.get_failed_accounts(batch_size)
        if not failed_accounts:
            self.logger.info("没有找到失败的账号")
            return

        self.logger.info(f"找到 {len(failed_accounts)} 个失败的账号,开始重试")

        # Retry concurrently; exceptions come back as results, not raised.
        tasks = [self.retry_account(account) for account in failed_accounts]
        results = await asyncio.gather(*tasks, return_exceptions=True)

        # gather() keeps input order, so results line up with the accounts.
        success_count = 0
        error_count = 0
        for account, outcome in zip(failed_accounts, results):
            if outcome is True:
                success_count += 1
            elif isinstance(outcome, Exception):
                error_count += 1
                # Log the exception so the cause of an "error" is not lost.
                self.logger.error(f"账号 {account.email} 重试时发生未捕获的异常: {outcome!r}")
        failed_count = len(failed_accounts) - success_count - error_count

        self.logger.info(f"重试完成: 成功 {success_count}, 失败 {failed_count}, 错误 {error_count}")
async def main():
    """Command-line entry point: retry one batch of failed accounts.

    The optional first command-line argument is the batch size. It defaults
    to 10, and a non-integer value is reported on stdout and ignored.
    """
    batch_size = 10  # default batch size
    cli_args = sys.argv[1:]
    if cli_args:
        try:
            batch_size = int(cli_args[0])
        except ValueError:
            print(f"参数错误: 批次大小必须是整数,将使用默认值 {batch_size}")

    # Build the services and open the database before retrying.
    retry = FailedAccountsRetry()
    await retry.initialize()
    try:
        await retry.retry_batch(batch_size)
    except Exception as e:
        logger.error(f"程序执行出错: {str(e)}")
    finally:
        # Runs whether or not the batch raised.
        await retry.cleanup()
if __name__ == "__main__":
    # Run the async entry point only when executed as a script.
    asyncio.run(main())