diff --git a/auto_cursor_service.py b/auto_cursor_service.py index 6e51c9a..4c0dddf 100644 --- a/auto_cursor_service.py +++ b/auto_cursor_service.py @@ -11,6 +11,8 @@ import json import signal import subprocess import time +import random +import string from datetime import datetime from typing import Dict, List, Optional, Tuple, Any @@ -23,21 +25,36 @@ if sys.platform.startswith("win"): from core.config import Config from core.database import DatabaseManager -from import_emails import import_emails +from services.fetch_manager import FetchManager +from services.self_hosted_email import SelfHostedEmail +from services.proxy_pool import ProxyPool class AutoCursorService: def __init__(self): self.config = Config.from_yaml() self.db_manager = DatabaseManager(self.config) - - # 添加EmailManager实例,用于操作黑名单 - self.email_manager = None + self.fetch_manager = FetchManager(self.config) # API相关 self.api_base_url = "https://cursorapi.nosqli.com/admin/api.AutoCursor" self.proxy = None + # 初始化自建邮箱服务 + self.self_hosted_email = None + if hasattr(self.config, 'self_hosted_email_config') and self.config.self_hosted_email_config: + self.self_hosted_email = SelfHostedEmail( + self.fetch_manager, + self.config.self_hosted_email_config.api_base_url, + self.config.self_hosted_email_config.api_key + ) + logger.info("自建邮箱服务已初始化") + else: + logger.warning("未配置自建邮箱服务,部分功能可能不可用") + + # 初始化代理池 + self.proxy_pool = ProxyPool(self.config, self.fetch_manager) + # 获取hostname,用于API请求参数 self.hostname = getattr(self.config, "hostname", None) if not self.hostname: @@ -59,12 +76,12 @@ class AutoCursorService: self.check_interval = getattr(auto_service_config, "check_interval", 60) self.upload_interval = getattr(auto_service_config, "upload_interval", 300) self.email_check_threshold = getattr(auto_service_config, "email_check_threshold", 30) - self.email_fetch_count = getattr(auto_service_config, "email_fetch_count", 2) + self.email_batch_size = getattr(auto_service_config, "email_batch_size", 10) else: self.check_interval = 60 # 检查API状态的间隔(秒) self.upload_interval = 300 # 上传账号间隔(秒) self.email_check_threshold = 30 # 当可用邮箱少于这个数时获取新邮箱 - self.email_fetch_count = 2 # 获取邮箱的次数(每次15个) + self.email_batch_size = 10 # 每次获取邮箱的数量 # 处理邮箱的最小数量阈值,只要有这么多邮箱就会立即处理 self.min_email_to_process = 1 @@ -97,11 +114,6 @@ class AutoCursorService: logger.info("初始化自动化服务") await self.db_manager.initialize() - # 初始化EmailManager - from services.email_manager import EmailManager - self.email_manager = EmailManager(self.config, self.db_manager) - await self.email_manager.initialize() - # 检查并设置代理 if hasattr(self.config, "proxy_config") and self.config.proxy_config: if hasattr(self.config.proxy_config, "api_proxy") and self.config.proxy_config.api_proxy: @@ -153,171 +165,63 @@ class AutoCursorService: logger.error(f"检查注册状态时出错: {e}") return self.reg_enabled # 出错时保持当前状态 - async def fetch_email_accounts(self) -> List[Dict[str, str]]: - """从API获取邮箱账号 - - Returns: - List[Dict[str, str]]: 邮箱账号列表 - """ - url = f"{self.api_base_url}/getemailaccount" - params = {"hostname": self.hostname} - - try: - async with aiohttp.ClientSession() as session: - async with session.get(url, params=params, proxy=self.proxy, ssl=False) as response: - if response.status != 200: - logger.error(f"获取邮箱API请求失败,状态码: {response.status}") - return [] - - data = await response.json() - if data.get("code") != 0: - logger.error(f"获取邮箱API返回错误: {data.get('msg', 'Unknown error')}") - return [] - - accounts = data.get("data", {}).get("accounts", []) - logger.info(f"成功获取 {len(accounts)} 个邮箱账号") - return accounts - - except Exception as e: - logger.error(f"获取邮箱账号时出错: {e}") - return [] - - async def import_email_accounts(self, accounts: List[Dict[str, str]]) -> int: - """导入邮箱账号到数据库 + async def fetch_email_accounts(self, count: int = 10) -> List[Dict[str, str]]: + """使用自建邮箱服务获取邮箱账号 Args: - accounts: 邮箱账号列表 + count: 需要获取的邮箱数量 Returns: - int: 成功导入的账号数量 + List[Dict[str, str]]: 邮箱账号列表,每个账号包含email和password字段 """ - if not accounts: - logger.warning("没有邮箱账号可导入") - return 0 + if not self.self_hosted_email: + logger.error("自建邮箱服务未初始化,无法获取邮箱") + return [] - count = 0 - blacklist_added = 0 - - # 如果有EmailManager,确保黑名单初始化 - if self.email_manager and self.email_manager.use_redis: - await self.email_manager._ensure_blacklist_initialized() - - for account in accounts: + result = [] + for _ in range(count): try: - email = account.get("email", "") - password = account.get("password", "") - client_id = account.get("client_id", "") - refresh_token = account.get("refresh_token", "") - - if not (email and password and client_id and refresh_token): - logger.warning(f"账号数据不完整: {account}") + # 获取一个邮箱地址 + email = await self.self_hosted_email.get_email() + if not email: + logger.warning("获取邮箱失败,跳过") continue - # 检查邮箱是否已在黑名单中 - if self.email_manager and await self.email_manager.is_email_blacklisted(email): - logger.warning(f"跳过黑名单中的邮箱: {email}") - blacklist_added += 1 - continue + # 生成随机密码 + password = ''.join(random.choices(string.ascii_letters + string.digits, k=12)) - # 插入数据库 - insert_query = ''' - INSERT INTO email_accounts - (email, password, client_id, refresh_token, status) - VALUES (%s, %s, %s, %s, 'pending') - ON DUPLICATE KEY UPDATE - password = VALUES(password), - client_id = VALUES(client_id), - refresh_token = VALUES(refresh_token), - status = 'pending', - updated_at = CURRENT_TIMESTAMP - ''' + # 添加到结果列表 + account = { + "email": email, + "password": password, + "client_id": "", # 如果有需要可以生成 + "refresh_token": "" # 如果有需要可以生成 + } + result.append(account) + logger.info(f"已获取邮箱: {email}") - await self.db_manager.execute( - insert_query, - (email, password, client_id, refresh_token) - ) - count += 1 + # 每次获取后等待一小段时间,避免请求过快 + await asyncio.sleep(0.5) except Exception as e: - logger.error(f"导入邮箱账号时出错: {e}") + logger.error(f"获取邮箱时出错: {e}") + await asyncio.sleep(1) # 出错后等待较长时间 - if blacklist_added > 0: - logger.warning(f"已跳过 {blacklist_added} 个黑名单中的邮箱") + logger.info(f"本次共获取 {len(result)} 个邮箱账号") + return result + + async def import_self_hosted_emails(self, count: int = 10) -> List[Dict[str, str]]: + """获取并导入自建邮箱 + + Args: + count: 要获取的邮箱数量 - logger.success(f"成功导入 {count} 个邮箱账号") - return count - - async def count_pending_accounts(self) -> int: - """统计可用的pending状态账号数量""" - if self.email_manager: - # 使用EmailManager的方法,确保黑名单一致性 - return await self.email_manager.count_pending_accounts() - - # 兼容旧代码,直接查询 - query = """ - SELECT COUNT(*) - FROM email_accounts - WHERE status = 'pending' AND in_use = 0 AND sold = 0 - """ - result = await self.db_manager.fetch_one(query) - if result: - return result.get("COUNT(*)", 0) - return 0 - - async def check_and_fetch_emails(self) -> int: - """检查并获取邮箱账号(如果需要) - Returns: - int: 获取的邮箱账号数量 + List[Dict[str, str]]: 获取的邮箱列表 """ - # 检查当前可用邮箱数量 - pending_count = await self.count_pending_accounts() - logger.info(f"当前可用邮箱数量: {pending_count}") - - if pending_count >= self.email_check_threshold: - logger.info(f"可用邮箱数量充足 ({pending_count} >= {self.email_check_threshold})") - # 确保注册进程已启动 - if pending_count >= self.min_email_to_process and self.reg_enabled: - if not self.registration_process or self.registration_process.poll() is not None: - logger.info("有足够邮箱但注册进程未运行,启动注册进程") - await self.start_registration_process() - return 0 - - # 需要获取新邮箱 - logger.info(f"可用邮箱不足 ({pending_count} < {self.email_check_threshold}),准备获取新邮箱") - total_imported = 0 - - for i in range(self.email_fetch_count): - accounts = await self.fetch_email_accounts() - if not accounts: - logger.warning(f"第 {i+1} 次获取邮箱失败或无可用邮箱") - break - - imported = await self.import_email_accounts(accounts) - total_imported += imported - - # 导入后等待一秒,确保数据库状态完全更新 - logger.debug("等待数据库状态更新...") - await asyncio.sleep(2) - - # 每次获取到新邮箱并成功导入后,立即确保注册进程在运行 - if imported > 0 and self.reg_enabled: - # 重新查询一次确保数据是最新的 - pending_count = await self.count_pending_accounts() - if pending_count >= self.min_email_to_process: - logger.info(f"已获取到 {imported} 个新邮箱,总可用邮箱数 {pending_count},确保注册进程运行") - if not self.registration_process or self.registration_process.poll() is not None: - await self.start_registration_process() - - if imported < 15: # 每次API应返回15个账号 - logger.warning(f"获取到的邮箱少于预期 ({imported} < 15),可能没有更多邮箱可用") - break - - if i < self.email_fetch_count - 1: - # 在多次请求之间添加延迟 - await asyncio.sleep(2) - - return total_imported + # 直接获取邮箱列表,不再需要导入到数据库 + emails = await self.fetch_email_accounts(count) + return emails async def start_registration_process(self): """启动注册进程""" @@ -326,16 +230,7 @@ class AutoCursorService: logger.info("注册进程已在运行中") return - # 启动前等待1秒,确保数据库状态已更新 - await asyncio.sleep(1) - - # 再次检查可用邮箱数量,确保使用最新状态 - pending_count = await self.count_pending_accounts() - if pending_count < self.min_email_to_process: - logger.warning(f"可用邮箱数量不足 ({pending_count} < {self.min_email_to_process}),暂不启动注册进程") - return - - logger.info(f"有 {pending_count} 个可用邮箱,启动注册进程") + logger.info("启动注册进程") try: # 获取配置中的batch_size,确保并发注册 batch_size = 1 # 默认值 @@ -447,55 +342,29 @@ class AutoCursorService: """运行服务主循环""" logger.info("启动Cursor自动化服务") + # 设置永久开启注册 + self.reg_enabled = True + logger.info("注册功能已永久开启") + last_upload_time = 0 while self.running: try: - # 1. 检查注册API状态 - current_status = await self.check_registration_status() + # 1. 获取邮箱 + logger.info("准备获取自建邮箱") + emails = await self.import_self_hosted_emails(self.email_batch_size) + if emails: + logger.info(f"成功获取 {len(emails)} 个自建邮箱") - # 状态变化处理 - if current_status != self.reg_enabled: - self.reg_enabled = current_status - logger.info(f"注册状态变化为: {'开启' if self.reg_enabled else '关闭'}") - - # 状态变化后等待2秒,确保数据库状态稳定 - await asyncio.sleep(2) - - if self.reg_enabled: - # 开启注册时,先检查并获取邮箱 - await self.check_and_fetch_emails() - # 检查是否有可用邮箱,有则启动注册进程 - pending_count = await self.count_pending_accounts() - if pending_count >= self.min_email_to_process: - logger.info(f"有 {pending_count} 个可用邮箱,启动注册进程") - await self.start_registration_process() + # 2. 确保注册进程正在运行 + if not self.registration_process or self.registration_process.poll() is not None: + if emails: + logger.info(f"有 {len(emails)} 个可用邮箱,启动注册进程") + await self.start_registration_process() else: - # 关闭注册时,停止注册进程 - self.stop_registration_process() + logger.warning("没有可用邮箱,暂不启动注册进程") - # 等待一会,确保上一步操作完全完成 - await asyncio.sleep(1) - - # 2. 如果注册已开启,检查并获取邮箱 - if self.reg_enabled: - await self.check_and_fetch_emails() - # 确保注册进程正在运行 - if not self.registration_process or self.registration_process.poll() is not None: - pending_count = await self.count_pending_accounts() - if pending_count >= self.min_email_to_process: - logger.info(f"发现 {pending_count} 个未处理的邮箱,重新启动注册进程") - await self.start_registration_process() - - # 3. 定期上传账号 - current_time = time.time() - if current_time - last_upload_time >= self.upload_interval: - await self.upload_accounts() - last_upload_time = current_time - # 上传后等待,确保数据库状态更新 - await asyncio.sleep(2) - - # 4. 等待下一次检查 + # 3. 等待下一次检查 logger.debug(f"等待 {self.check_interval} 秒后进行下一次检查") for _ in range(self.check_interval): if not self.running: @@ -526,7 +395,7 @@ async def main(): logger.remove() logger.add(sys.stderr, level="INFO") logger.add( - "auto_cursor_service.log", + "logs/auto_cursor_service.log", rotation="50 MB", retention="10 days", level="DEBUG",