"""Retry registration for email accounts whose status is ``'failed'``.

Run as a script with an optional integer batch size as the first
command-line argument (default 10). A batch size of zero or less retries
every failed account.
"""
import asyncio
import sys
from typing import List

from loguru import logger

from core.config import Config
from core.database import DatabaseManager
from core.logger import setup_logger
from register.register_worker import RegisterWorker
from services.email_manager import EmailAccount, EmailManager
from services.fetch_manager import FetchManager
from services.proxy_pool import ProxyPool
from services.token_pool import TokenPool


class FailedAccountsRetry:
    """Re-run registration for accounts stored with status ``'failed'``."""

    def __init__(self):
        """Load the YAML configuration and build the collaborating services."""
        self.config = Config.from_yaml()
        self.logger = setup_logger(self.config)
        self.db_manager = DatabaseManager(self.config)
        self.fetch_manager = FetchManager(self.config)
        self.proxy_pool = ProxyPool(self.config, self.fetch_manager)
        self.token_pool = TokenPool(self.config)
        self.email_manager = EmailManager(self.config, self.db_manager)
        self.register_worker = RegisterWorker(
            self.config,
            self.fetch_manager,
            self.email_manager
        )

    async def initialize(self) -> None:
        """Initialize the database manager."""
        await self.db_manager.initialize()

    async def cleanup(self) -> None:
        """Release the database manager's resources."""
        await self.db_manager.cleanup()

    async def get_failed_accounts(self, limit: int = 0) -> List[EmailAccount]:
        """Return accounts whose status is ``'failed'``, highest id first.

        Args:
            limit: Maximum number of accounts to return. Zero or a negative
                value means "no limit".

        Returns:
            A list of ``EmailAccount`` objects built from the matching rows.
        """
        query = (
            "SELECT id, email, password, client_id, refresh_token, status "
            "FROM email_accounts "
            "WHERE status = 'failed' "
            "ORDER BY id DESC"
        )
        params: tuple = ()
        if limit > 0:
            # Bind the limit as a parameter rather than formatting it into
            # the SQL text, consistent with update_account_status().
            query += " LIMIT ?"
            params = (limit,)

        async with self.db_manager.get_connection() as conn:
            cursor = await conn.execute(query, params)
            rows = await cursor.fetchall()

            return [
                EmailAccount(
                    id=row[0],
                    email=row[1],
                    password=row[2],
                    client_id=row[3],
                    refresh_token=row[4],
                    status=row[5]
                )
                for row in rows
            ]

    async def update_account_status(self, account_id: int, status: str) -> None:
        """Set the status of one account and refresh its ``updated_at``.

        This is best-effort: any exception is logged and swallowed so that a
        failed status write never aborts the caller.

        Args:
            account_id: Primary key of the row in ``email_accounts``.
            status: New status value to store.
        """
        try:
            async with self.db_manager.get_connection() as conn:
                await conn.execute(
                    """
                    UPDATE email_accounts
                    SET status = ?, updated_at = CURRENT_TIMESTAMP
                    WHERE id = ?
                    """,
                    (status, account_id)
                )
                await conn.commit()
                self.logger.debug(f"账号 ID {account_id} 状态已更新为: {status}")
        except Exception as e:
            self.logger.error(f"更新账号状态失败: {e}")

    async def retry_account(self, account: EmailAccount) -> bool:
        """Retry registration for a single failed account.

        The account is marked ``'retrying'`` while in flight. On any failure
        it is set back to ``'failed'``; when the worker returns ``None`` it is
        marked ``'unavailable'``.

        Args:
            account: The account to re-register.

        Returns:
            ``True`` if registration succeeded and the account data was
            stored, ``False`` otherwise.

        Raises:
            asyncio.CancelledError: Re-raised after the status is reset when
                the task is cancelled.
        """
        self.logger.info(f"开始重试账号: {account.email}")

        try:
            # Mark the account as in progress first.
            await self.update_account_status(account.id, 'retrying')

            # Acquire one proxy; fail with a clear message instead of an
            # opaque IndexError when the pool returns nothing.
            proxies = await self.proxy_pool.batch_get(1)
            if not proxies:
                raise RuntimeError("代理池未返回可用代理")
            proxy = proxies[0]

            # Acquire one token pair, with the same emptiness guard.
            token_pairs = await self.token_pool.batch_generate(1)
            if not token_pairs:
                raise RuntimeError("Token 池未返回可用的 token 对")
            token_pair = token_pairs[0]

            # Run the registration.
            result = await self.register_worker.register(proxy, token_pair, account)

            # A None result means the worker skipped this account.
            if result is None:
                self.logger.warning(f"账号 {account.email} 被跳过")
                await self.update_account_status(account.id, 'unavailable')
                return False

            # Log the full result for debugging.
            self.logger.debug(f"注册结果: {result}")

            # Every required key must be present.
            required_fields = ['account_id', 'cursor_password', 'cursor_cookie']
            missing_fields = [field for field in required_fields if field not in result]

            if missing_fields:
                self.logger.error(f"注册结果缺少必要字段: {missing_fields}")
                await self.update_account_status(account.id, 'failed')
                return False

            # The password and cookie must also be non-empty.
            if not result['cursor_password'] or not result['cursor_cookie']:
                self.logger.error(f"注册结果包含空值: password={bool(result['cursor_password'])}, cookie={bool(result['cursor_cookie'])}")
                await self.update_account_status(account.id, 'failed')
                return False

            # Persist the registration data.
            try:
                await self.email_manager.update_account(
                    result['account_id'],
                    result['cursor_password'],
                    result['cursor_cookie'],
                    result.get('cursor_jwt', '')  # empty string when no JWT was returned
                )
                self.logger.success(f"账号数据更新成功: {account.email}")
            except Exception as update_error:
                self.logger.error(f"更新账号数据失败: {update_error}")
                await self.update_account_status(account.id, 'failed')
                return False

            self.logger.success(f"账号 {account.email} 重试成功")
            return True
        except asyncio.CancelledError:
            # Without this the row would stay 'retrying', and
            # get_failed_accounts() only selects 'failed' rows, so the
            # account would never be picked up again.
            await self.update_account_status(account.id, 'failed')
            raise
        except Exception as e:
            self.logger.error(f"账号 {account.email} 重试失败: {e}")
            await self.update_account_status(account.id, 'failed')
            return False

    async def retry_batch(self, batch_size: int) -> None:
        """Retry up to ``batch_size`` failed accounts concurrently.

        Args:
            batch_size: Maximum number of accounts to retry; zero or a
                negative value retries every failed account.
        """
        # Fetch the failed accounts to work on.
        failed_accounts = await self.get_failed_accounts(batch_size)

        if not failed_accounts:
            self.logger.info("没有找到失败的账号")
            return

        self.logger.info(f"找到 {len(failed_accounts)} 个失败的账号,开始重试")

        # Retry all accounts concurrently.
        tasks = [self.retry_account(account) for account in failed_accounts]
        outcomes = await asyncio.gather(*tasks, return_exceptions=True)

        # Tally the outcomes. BaseException is checked (not just Exception)
        # because a cancelled task yields asyncio.CancelledError, which is a
        # BaseException on Python 3.8+.
        success_count = 0
        error_count = 0
        for outcome in outcomes:
            if outcome is True:
                success_count += 1
            elif isinstance(outcome, BaseException):
                error_count += 1
                self.logger.error(f"重试任务异常: {outcome!r}")
        failed_count = len(failed_accounts) - success_count - error_count

        self.logger.info(f"重试完成: 成功 {success_count}, 失败 {failed_count}, 错误 {error_count}")


async def main() -> None:
    """Parse the optional batch-size argument and run one retry batch."""
    # Parse command-line arguments.
    batch_size = 10  # default batch size
    if len(sys.argv) > 1:
        try:
            batch_size = int(sys.argv[1])
        except ValueError:
            print(f"参数错误: 批次大小必须是整数,将使用默认值 {batch_size}")

    # Set up.
    retry = FailedAccountsRetry()
    await retry.initialize()

    try:
        # Run the retry batch.
        await retry.retry_batch(batch_size)
    except Exception as e:
        logger.error(f"程序执行出错: {e}")
    finally:
        await retry.cleanup()


if __name__ == "__main__":
    asyncio.run(main())