diff --git a/auto_cursor_service.py b/auto_cursor_service.py index 89e6753..b3e17f3 100644 --- a/auto_cursor_service.py +++ b/auto_cursor_service.py @@ -31,6 +31,9 @@ class AutoCursorService: self.config = Config.from_yaml() self.db_manager = DatabaseManager(self.config) + # 添加EmailManager实例,用于操作黑名单 + self.email_manager = None + # API相关 self.api_base_url = "https://cursorapi.nosqli.com/admin/api.AutoCursor" self.proxy = None @@ -94,6 +97,11 @@ class AutoCursorService: logger.info("初始化自动化服务") await self.db_manager.initialize() + # 初始化EmailManager + from services.email_manager import EmailManager + self.email_manager = EmailManager(self.config, self.db_manager) + await self.email_manager.initialize() + # 检查并设置代理 if hasattr(self.config, "proxy_config") and self.config.proxy_config: if hasattr(self.config.proxy_config, "api_proxy") and self.config.proxy_config.api_proxy: @@ -188,6 +196,12 @@ class AutoCursorService: return 0 count = 0 + blacklist_added = 0 + + # 如果有EmailManager,确保黑名单初始化 + if self.email_manager and self.email_manager.use_redis: + await self.email_manager._ensure_blacklist_initialized() + for account in accounts: try: email = account.get("email", "") @@ -199,6 +213,12 @@ class AutoCursorService: logger.warning(f"账号数据不完整: {account}") continue + # 检查邮箱是否已在黑名单中 + if self.email_manager and await self.email_manager.is_email_blacklisted(email): + logger.warning(f"跳过黑名单中的邮箱: {email}") + blacklist_added += 1 + continue + # 插入数据库 insert_query = ''' INSERT INTO email_accounts @@ -221,11 +241,19 @@ class AutoCursorService: except Exception as e: logger.error(f"导入邮箱账号时出错: {e}") + if blacklist_added > 0: + logger.warning(f"已跳过 {blacklist_added} 个黑名单中的邮箱") + logger.success(f"成功导入 {count} 个邮箱账号") return count async def count_pending_accounts(self) -> int: """统计可用的pending状态账号数量""" + if self.email_manager: + # 使用EmailManager的方法,确保黑名单一致性 + return await self.email_manager.count_pending_accounts() + + # 兼容旧代码,直接查询 query = """ SELECT COUNT(*) FROM email_accounts diff --git a/main.py b/main.py index e628e78..c2a316f 100644 --- a/main.py +++ b/main.py @@ -35,6 +35,8 @@ class CursorRegister: async def initialize(self): """初始化数据库""" await self.db_manager.initialize() + # 确保EmailManager完成初始化 + await self.email_manager.initialize() async def cleanup(self): """清理资源""" @@ -160,6 +162,10 @@ async def main(): register.logger.info(f"当前总进度: 已注册 {total_registered} 个账号") + # 批次结束后等待3秒,确保所有数据库更新都已完成 + register.logger.info("本批次注册完成,等待数据库状态完全更新...") + await asyncio.sleep(3) + # 如果本批次注册失败率过高,暂停一段时间 if successful < batch_size * 0.5 and successful > 0: # 成功率低于50%但不为零 register.logger.warning("本批次成功率过低,暂停60秒后继续") diff --git a/services/email_manager.py b/services/email_manager.py index a48cd45..14364a1 100644 --- a/services/email_manager.py +++ b/services/email_manager.py @@ -6,6 +6,7 @@ from typing import Dict, List, Optional import aiohttp from loguru import logger +import aiomysql from core.config import Config from core.database import DatabaseManager @@ -34,106 +35,360 @@ class EmailManager: "Verify your email address", "Complete code challenge", ] + # Redis相关配置 + self.use_redis = False + if hasattr(self.db, 'redis') and self.db.redis: + self.use_redis = True + logger.info("Redis可用,将使用Redis进行邮箱状态管理") + else: + logger.warning("Redis不可用,将使用MySQL进行邮箱状态管理") + + # Redis键前缀 + self.redis_prefix = "emailmanager:" + # 锁和黑名单的过期时间(秒) + self.lock_timeout = 600 # 10分钟 + self.blacklist_timeout = 86400 * 30 # 30天 + # 黑名单初始化标记 + self.blacklist_initialized = False + + async def initialize(self): + """初始化EmailManager,确保在使用前完成必要的设置""" + if self.use_redis: + await self._ensure_blacklist_initialized() + self.blacklist_initialized = True + + async def _ensure_blacklist_initialized(self): + """确保黑名单已经初始化""" + if not self.use_redis: + return False + + blacklist_key = f"{self.redis_prefix}blacklist:initialized" + initialized = await self.db.redis.exists(blacklist_key) + + if not initialized: + logger.info("初始化Redis邮箱黑名单...") + # 查询所有已成功或不可用的邮箱 + query = """ + SELECT DISTINCT email + FROM email_accounts + WHERE status = 'success' OR status = 'unavailable' + """ + results = await self.db.fetch_all(query) + + if results: + # 批量添加到黑名单 + blacklist_key = f"{self.redis_prefix}blacklist:emails" + emails = [row['email'] for row in results] + if emails: + pipeline = self.db.redis.pipeline() + for email in emails: + pipeline.sadd(blacklist_key, email) + pipeline.expire(blacklist_key, self.blacklist_timeout) + await pipeline.execute() + + logger.info(f"已将 {len(emails)} 个邮箱添加到黑名单") + + # 标记为已初始化 + await self.db.redis.setex(f"{self.redis_prefix}blacklist:initialized", self.blacklist_timeout, "1") + return True + + return True + + async def is_email_blacklisted(self, email: str) -> bool: + """检查邮箱是否在黑名单中""" + if not self.use_redis: + # 回退到数据库查询 + query = """ + SELECT 1 FROM email_accounts + WHERE email = %s AND (status = 'success' OR status = 'unavailable') + LIMIT 1 + """ + result = await self.db.fetch_one(query, (email,)) + return result is not None + + # 使用Redis SET存储黑名单 + blacklist_key = f"{self.redis_prefix}blacklist:emails" + return await self.db.redis.sismember(blacklist_key, email) + + async def add_email_to_blacklist(self, email: str): + """将邮箱添加到黑名单""" + if not self.use_redis: + return False + + blacklist_key = f"{self.redis_prefix}blacklist:emails" + await self.db.redis.sadd(blacklist_key, email) + await self.db.redis.expire(blacklist_key, self.blacklist_timeout) + logger.debug(f"邮箱 {email} 已添加到黑名单") + return True + + async def lock_account(self, account_id: int) -> bool: + """锁定账号""" + if not self.use_redis: + # 如果不使用Redis,通过数据库更新来锁定 + try: + query = """ + UPDATE email_accounts + SET in_use = 1, updated_at = CURRENT_TIMESTAMP + WHERE id = %s AND in_use = 0 + """ + affected = await self.db.execute(query, (account_id,)) + return affected > 0 + except Exception as e: + logger.error(f"通过数据库锁定账号 {account_id} 失败: {e}") + return False + + # 使用Redis实现分布式锁 + lock_key = f"{self.redis_prefix}lock:account:{account_id}" + locked = await self.db.redis.setnx(lock_key, "1") + if locked: + # 锁定成功,设置过期时间 + await self.db.redis.expire(lock_key, self.lock_timeout) + + # 同时更新数据库状态 + try: + update_query = """ + UPDATE email_accounts + SET in_use = 1, updated_at = CURRENT_TIMESTAMP + WHERE id = %s + """ + await self.db.execute(update_query, (account_id,)) + except Exception as e: + # 如果数据库更新失败,释放Redis锁 + logger.error(f"锁定账号 {account_id} 后更新数据库失败: {e}") + await self.db.redis.delete(lock_key) + return False + + logger.debug(f"账号 {account_id} 已锁定") + return locked + + async def unlock_account(self, account_id: int) -> bool: + """解锁账号""" + # 无论是否使用Redis,都更新数据库 + try: + update_query = """ + UPDATE email_accounts + SET in_use = 0, updated_at = CURRENT_TIMESTAMP + WHERE id = %s + """ + await self.db.execute(update_query, (account_id,)) + except Exception as e: + logger.error(f"解锁账号 {account_id} 更新数据库失败: {e}") + + if not self.use_redis: + return True + + # 删除Redis锁 + lock_key = f"{self.redis_prefix}lock:account:{account_id}" + deleted = await self.db.redis.delete(lock_key) + logger.debug(f"账号 {account_id} 锁已释放") + return deleted > 0 async def batch_get_accounts(self, num: int) -> List[EmailAccount]: """批量获取未使用的邮箱账号""" logger.info(f"尝试获取 {num} 个未使用的邮箱账号") - - # 使用简单条件获取未使用的账号 + + # 如果使用Redis,确保黑名单已初始化 + if self.use_redis: + await self._ensure_blacklist_initialized() + + # 1. 先从数据库中获取候选账号 select_query = """ SELECT id, email, password, client_id, refresh_token FROM email_accounts WHERE in_use = 0 AND sold = 0 AND status = 'pending' - AND email NOT IN ( - SELECT email FROM email_accounts - WHERE status = 'success' OR status = 'unavailable' - ) LIMIT %s """ - accounts = await self.db.fetch_all(select_query, (num,)) + # 多获取一些候选账号,防止有些被排除 + candidate_accounts = await self.db.fetch_all(select_query, (num * 2,)) - if not accounts: - logger.debug("没有找到符合条件的账号") + if not candidate_accounts: + logger.debug("没有找到符合条件的候选账号") return [] - - # 2. 提取账号ID列表 - account_ids = [account['id'] for account in accounts] - - # 3. 更新这些账号的状态,设置注册开始时间戳,避免其他进程获取 - if account_ids: - placeholders = ', '.join(['%s' for _ in account_ids]) - update_query = f""" - UPDATE email_accounts - SET in_use = 1, updated_at = CURRENT_TIMESTAMP - WHERE id IN ({placeholders}) - """ - await self.db.execute(update_query, tuple(account_ids)) - # 记录被锁定的账号信息 - logger.info(f"已锁定 {len(account_ids)} 个账号用于注册: {account_ids}") + logger.debug(f"找到 {len(candidate_accounts)} 个候选账号") - # 4. 返回账号数据 - logger.debug(f"实际获取到 {len(accounts)} 个账号") - return [ - EmailAccount( - id=row['id'], - email=row['email'], - password=row['password'], - client_id=row['client_id'], - refresh_token=row['refresh_token'], - in_use=True - ) - for row in accounts - ] + # 2. 筛选并锁定账号 + result_accounts = [] + for account in candidate_accounts: + # 检查邮箱是否在黑名单中 + if await self.is_email_blacklisted(account['email']): + logger.debug(f"邮箱 {account['email']} 在黑名单中,跳过") + continue + + # 尝试锁定账号 + if await self.lock_account(account['id']): + # 添加到结果列表 + result_accounts.append(EmailAccount( + id=account['id'], + email=account['email'], + password=account['password'], + client_id=account['client_id'], + refresh_token=account['refresh_token'], + in_use=True + )) + + # 如果已经获取足够的账号,退出循环 + if len(result_accounts) >= num: + break + else: + logger.debug(f"账号 {account['id']} 锁定失败,可能被其他进程使用") + + logger.info(f"实际获取到 {len(result_accounts)} 个可用账号") + + # 如果账号数量不足,尝试清理长时间锁定但未更新的账号 + if len(result_accounts) < num and len(result_accounts) < len(candidate_accounts): + logger.warning("可用账号不足,尝试清理长时间锁定的账号") + await self._cleanup_stuck_accounts() + + return result_accounts + + async def _cleanup_stuck_accounts(self): + """清理长时间锁定但未更新的账号""" + try: + # 清理超过30分钟未更新且仍标记为in_use=1的账号 + cleanup_query = """ + UPDATE email_accounts + SET in_use = 0 + WHERE in_use = 1 + AND updated_at < DATE_SUB(NOW(), INTERVAL 30 MINUTE) + """ + affected = await self.db.execute(cleanup_query) + if affected > 0: + logger.info(f"已清理 {affected} 个长时间锁定的账号") + + # 如果使用Redis,同时清理对应的锁 + if self.use_redis: + # 清理可能存在的Redis锁,但这需要知道具体的account_id + # 这里简化处理,依赖锁的自动过期机制 + pass + except Exception as e: + logger.error(f"清理账号时出错: {e}") async def update_account_status(self, account_id: int, status: str): """更新账号状态""" - query = ''' - UPDATE email_accounts - SET - status = %s, - in_use = 0, - updated_at = CURRENT_TIMESTAMP - WHERE id = %s - ''' - await self.db.execute(query, (status, account_id)) + try: + # 获取账号邮箱信息(用于黑名单) + account_query = "SELECT email, status as current_status FROM email_accounts WHERE id = %s" + account_info = await self.db.fetch_one(account_query, (account_id,)) + + if not account_info: + logger.error(f"账号 {account_id} 不存在") + return + + # 检查状态变更是否合理 + current_status = account_info.get('current_status') + if current_status == 'success' and status != 'success': + logger.warning(f"警告: 尝试将成功账号 {account_id} 状态改为 {status},这可能是不正确的操作") + # 如果已经是success状态,不允许降级为其他状态 + # return + + # 更新数据库状态 + query = ''' + UPDATE email_accounts + SET + status = %s, + in_use = 0, + updated_at = CURRENT_TIMESTAMP + WHERE id = %s + ''' + await self.db.execute(query, (status, account_id)) + logger.info(f"账号 {account_id} 状态已更新为 {status}") + + # 如果是success或unavailable状态,添加到黑名单 + if account_info and (status == 'success' or status == 'unavailable'): + email = account_info['email'] + logger.debug(f"将邮箱 {email} 添加到黑名单 (状态: {status})") + await self.add_email_to_blacklist(email) + + # 解锁账号 + await self.unlock_account(account_id) + + # 清除数据库缓存 + if self.db.redis: + await self.db.clear_cache("db:*email_accounts*") + + except Exception as e: + logger.error(f"更新账号 {account_id} 状态为 {status} 时出错: {e}") + # 确保无论如何都解锁账号 + try: + await self.unlock_account(account_id) + except Exception as unlock_error: + logger.error(f"尝试解锁账号 {account_id} 时出错: {unlock_error}") + raise async def update_account(self, account_id: int, cursor_password: str, cursor_cookie: str, cursor_token: str): - """更新账号信息""" - query = ''' - UPDATE email_accounts - SET - cursor_password = %s, - cursor_cookie = %s, - cursor_token = %s, - in_use = 0, - sold = 1, - status = 'success', - updated_at = CURRENT_TIMESTAMP - WHERE id = %s - ''' - await self.db.execute(query, (cursor_password, cursor_cookie, cursor_token, account_id)) + """更新账号信息(注册成功)""" + try: + # 获取账号邮箱信息(用于黑名单) + account_query = "SELECT email FROM email_accounts WHERE id = %s" + account_info = await self.db.fetch_one(account_query, (account_id,)) + + # 更新数据库 + query = ''' + UPDATE email_accounts + SET + cursor_password = %s, + cursor_cookie = %s, + cursor_token = %s, + in_use = 0, + sold = 1, + status = 'success', + updated_at = CURRENT_TIMESTAMP + WHERE id = %s + ''' + await self.db.execute(query, (cursor_password, cursor_cookie, cursor_token, account_id)) + logger.info(f"账号 {account_id} 更新为注册成功") + + # 添加到黑名单 + if account_info: + email = account_info['email'] + logger.debug(f"将邮箱 {email} 添加到黑名单 (注册成功)") + await self.add_email_to_blacklist(email) + + # 解锁账号 + await self.unlock_account(account_id) + + # 清除数据库缓存 + if self.db.redis: + await self.db.clear_cache("db:*email_accounts*") + + except Exception as e: + logger.error(f"更新账号 {account_id} 信息时出错: {e}") + # 确保无论如何都解锁账号 + try: + await self.unlock_account(account_id) + except: + pass + raise async def release_account(self, account_id: int): """释放账号""" - query = ''' - UPDATE email_accounts - SET in_use = 0, updated_at = CURRENT_TIMESTAMP - WHERE id = %s - ''' - await self.db.execute(query, (account_id,)) + try: + await self.unlock_account(account_id) + logger.info(f"账号 {account_id} 已释放") + + # 清除数据库缓存 + if self.db.redis: + await self.db.clear_cache("db:*email_accounts*") + + except Exception as e: + logger.error(f"释放账号 {account_id} 时出错: {e}") + raise async def count_pending_accounts(self) -> int: """统计可用的pending状态账号数量""" + if self.use_redis: + await self._ensure_blacklist_initialized() + query = """ SELECT COUNT(*) FROM email_accounts WHERE status = 'pending' AND in_use = 0 AND sold = 0 - AND email NOT IN ( - SELECT email FROM email_accounts - WHERE status = 'success' OR status = 'unavailable' - ) """ + + # 注:这里不使用黑名单过滤,因为数据量可能很大, + # 但实际获取账号时会应用黑名单过滤 + result = await self.db.fetch_one(query) if result: return result.get("COUNT(*)", 0)