#!/usr/bin/env python3 """ Cursor自动化服务 - 监控API,控制注册进程 - 自动获取邮箱账号 - 自动上传注册成功的账号 """ import asyncio import sys import json import signal import subprocess import time from datetime import datetime from typing import Dict, List, Optional, Tuple, Any import aiohttp from loguru import logger # Windows平台特殊处理,强制使用SelectorEventLoop if sys.platform.startswith("win"): asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy()) from core.config import Config from core.database import DatabaseManager from import_emails import import_emails class AutoCursorService: def __init__(self): self.config = Config.from_yaml() self.db_manager = DatabaseManager(self.config) # 添加EmailManager实例,用于操作黑名单 self.email_manager = None # API相关 self.api_base_url = "https://cursorapi.nosqli.com/admin/api.AutoCursor" self.proxy = None # 获取hostname,用于API请求参数 self.hostname = getattr(self.config, "hostname", None) if not self.hostname: # 尝试从config.yaml中的server_config获取 server_config = getattr(self.config, "server_config", None) if server_config: self.hostname = getattr(server_config, "hostname", "unknown") else: self.hostname = "unknown" logger.warning(f"未在配置中找到hostname,使用默认值: {self.hostname}") # 子进程控制 self.registration_process = None self.reg_enabled = False # 从配置中获取自动服务参数 auto_service_config = getattr(self.config, "auto_service_config", None) if auto_service_config: self.check_interval = getattr(auto_service_config, "check_interval", 60) self.upload_interval = getattr(auto_service_config, "upload_interval", 60) self.email_check_threshold = getattr(auto_service_config, "email_check_threshold", 30) self.email_fetch_count = getattr(auto_service_config, "email_fetch_count", 2) else: self.check_interval = 60 # 检查API状态的间隔(秒) self.upload_interval = 60 # 上传账号间隔(秒) self.email_check_threshold = 30 # 当可用邮箱少于这个数时获取新邮箱 self.email_fetch_count = 2 # 获取邮箱的次数(每次15个) # 处理邮箱的最小数量阈值,只要有1个邮箱就会立即处理 self.min_email_to_process = 1 # 运行状态控制 self.running = True # 设置信号处理 signal.signal(signal.SIGINT, self._handle_signal) signal.signal(signal.SIGTERM, self._handle_signal) def _handle_signal(self, signum, frame): """处理信号,优雅关闭服务""" logger.info(f"收到信号 {signum},准备关闭服务") self.running = False # 确保这不是在异步上下文中调用的 if self.registration_process: logger.info("正在终止注册进程...") try: if sys.platform.startswith("win"): subprocess.run(["taskkill", "/F", "/T", "/PID", str(self.registration_process.pid)]) else: self.registration_process.terminate() except Exception as e: logger.error(f"终止注册进程时出错: {e}") async def initialize(self): """初始化服务""" logger.info("初始化自动化服务") await self.db_manager.initialize() # 初始化EmailManager from services.email_manager import EmailManager self.email_manager = EmailManager(self.config, self.db_manager) await self.email_manager.initialize() # 检查并设置代理 if hasattr(self.config, "proxy_config") and self.config.proxy_config: if hasattr(self.config.proxy_config, "api_proxy") and self.config.proxy_config.api_proxy: self.proxy = self.config.proxy_config.api_proxy logger.info(f"使用API代理: {self.proxy}") async def cleanup(self): """清理资源""" logger.info("清理服务资源") if self.registration_process and self.registration_process.poll() is None: logger.info("终止注册进程") try: if sys.platform.startswith("win"): subprocess.run(["taskkill", "/F", "/T", "/PID", str(self.registration_process.pid)]) else: self.registration_process.terminate() self.registration_process.wait(timeout=5) except Exception as e: logger.error(f"终止注册进程时出错: {e}") await self.db_manager.cleanup() async def check_registration_status(self) -> bool: """检查注册API是否开启 Returns: bool: True表示开启注册,False表示关闭注册 """ url = f"{self.api_base_url}/regstates" params = {"hostname": self.hostname} try: async with aiohttp.ClientSession() as session: async with session.get(url, params=params, proxy=self.proxy, ssl=False) as response: if response.status != 200: logger.error(f"API请求失败,状态码: {response.status}") return self.reg_enabled # 保持当前状态 data = await response.json() if data.get("code") != 0: logger.error(f"API返回错误: {data.get('msg', 'Unknown error')}") return self.reg_enabled # 保持当前状态 reg_state = data.get("data", {}).get("reg_state", False) logger.info(f"注册状态: {'开启' if reg_state else '关闭'}") return reg_state except Exception as e: logger.error(f"检查注册状态时出错: {e}") return self.reg_enabled # 出错时保持当前状态 async def fetch_email_accounts(self) -> List[Dict[str, str]]: """从API获取邮箱账号 Returns: List[Dict[str, str]]: 邮箱账号列表 """ url = f"{self.api_base_url}/getemailaccount" params = {"hostname": self.hostname} try: async with aiohttp.ClientSession() as session: async with session.get(url, params=params, proxy=self.proxy, ssl=False) as response: if response.status != 200: logger.error(f"获取邮箱API请求失败,状态码: {response.status}") return [] data = await response.json() if data.get("code") != 0: logger.error(f"获取邮箱API返回错误: {data.get('msg', 'Unknown error')}") return [] accounts = data.get("data", {}).get("accounts", []) logger.info(f"成功获取 {len(accounts)} 个邮箱账号") return accounts except Exception as e: logger.error(f"获取邮箱账号时出错: {e}") return [] async def import_email_accounts(self, accounts: List[Dict[str, str]]) -> int: """导入邮箱账号到数据库 Args: accounts: 邮箱账号列表 Returns: int: 成功导入的账号数量 """ if not accounts: logger.warning("没有邮箱账号可导入") return 0 count = 0 blacklist_added = 0 # 如果有EmailManager,确保黑名单初始化 if self.email_manager and self.email_manager.use_redis: await self.email_manager._ensure_blacklist_initialized() for account in accounts: try: email = account.get("email", "") password = account.get("password", "") client_id = account.get("client_id", "") refresh_token = account.get("refresh_token", "") if not (email and password and client_id and refresh_token): logger.warning(f"账号数据不完整: {account}") continue # 检查邮箱是否已在黑名单中 if self.email_manager and await self.email_manager.is_email_blacklisted(email): logger.warning(f"跳过黑名单中的邮箱: {email}") blacklist_added += 1 continue # 插入数据库 insert_query = ''' INSERT INTO email_accounts (email, password, client_id, refresh_token, status) VALUES (%s, %s, %s, %s, 'pending') ON DUPLICATE KEY UPDATE password = VALUES(password), client_id = VALUES(client_id), refresh_token = VALUES(refresh_token), status = 'pending', updated_at = CURRENT_TIMESTAMP ''' await self.db_manager.execute( insert_query, (email, password, client_id, refresh_token) ) count += 1 except Exception as e: logger.error(f"导入邮箱账号时出错: {e}") if blacklist_added > 0: logger.warning(f"已跳过 {blacklist_added} 个黑名单中的邮箱") logger.success(f"成功导入 {count} 个邮箱账号") return count async def count_pending_accounts(self) -> int: """统计可用的pending状态账号数量""" if self.email_manager: # 使用EmailManager的方法,确保黑名单一致性 return await self.email_manager.count_pending_accounts() # 兼容旧代码,直接查询 query = """ SELECT COUNT(*) FROM email_accounts WHERE status = 'pending' AND in_use = 0 AND sold = 0 """ result = await self.db_manager.fetch_one(query) if result: return result.get("COUNT(*)", 0) return 0 async def check_and_fetch_emails(self) -> int: """检查并获取邮箱账号(如果需要) Returns: int: 获取的邮箱账号数量 """ # 检查当前可用邮箱数量 pending_count = await self.count_pending_accounts() logger.info(f"当前可用邮箱数量: {pending_count}") # 如果有可用邮箱并且注册已开启,立即确保注册进程在运行 if pending_count > 0 and self.reg_enabled: if not self.registration_process or self.registration_process.poll() is not None: logger.info(f"有 {pending_count} 个可用邮箱但注册进程未运行,立即启动注册进程") await self.start_registration_process() # 如果成功启动了注册进程且邮箱数量足够,可以不再获取新邮箱 if pending_count >= self.email_check_threshold: return 0 # 只有当邮箱数量不足时才获取新邮箱 if pending_count < self.email_check_threshold: logger.info(f"可用邮箱不足 ({pending_count} < {self.email_check_threshold}),准备获取新邮箱") total_imported = 0 for i in range(self.email_fetch_count): accounts = await self.fetch_email_accounts() if not accounts: logger.warning(f"第 {i+1} 次获取邮箱失败或无可用邮箱") break imported = await self.import_email_accounts(accounts) total_imported += imported # 导入后等待一秒,确保数据库状态完全更新 logger.debug("等待数据库状态更新...") await asyncio.sleep(2) # 每次获取到新邮箱并成功导入后,立即启动注册进程 if imported > 0 and self.reg_enabled: logger.info(f"已获取到 {imported} 个新邮箱,立即启动注册进程") if not self.registration_process or self.registration_process.poll() is not None: await self.start_registration_process() if imported < 15: # 每次API应返回15个账号 logger.warning(f"获取到的邮箱少于预期 ({imported} < 15),可能没有更多邮箱可用") break if i < self.email_fetch_count - 1: # 在多次请求之间添加延迟 await asyncio.sleep(2) return total_imported return 0 async def start_registration_process(self): """启动注册进程""" # 如果注册进程已在运行,不做任何事 if self.registration_process and self.registration_process.poll() is None: logger.info("注册进程已在运行中") return # 启动前等待1秒,确保数据库状态已更新 await asyncio.sleep(1) # 再次检查可用邮箱数量,确保使用最新状态 pending_count = await self.count_pending_accounts() if pending_count < 1: logger.warning(f"没有可用邮箱,暂不启动注册进程") return logger.info(f"有 {pending_count} 个可用邮箱,启动注册进程") try: # 获取配置中的batch_size,确保并发注册 batch_size = 1 # 默认值 if hasattr(self.config, "register_config") and self.config.register_config: batch_size = getattr(self.config.register_config, "batch_size", 1) logger.info(f"注册批次大小设置为: {batch_size}") # 使用subprocess启动main.py if sys.platform.startswith("win"): self.registration_process = subprocess.Popen( ["python", "main.py"], creationflags=subprocess.CREATE_NEW_PROCESS_GROUP ) else: self.registration_process = subprocess.Popen( ["python3", "main.py"] ) logger.info(f"注册进程已启动,PID: {self.registration_process.pid}") except Exception as e: logger.error(f"启动注册进程时出错: {e}") self.registration_process = None def stop_registration_process(self): """停止注册进程""" if not self.registration_process or self.registration_process.poll() is not None: logger.info("注册进程未在运行") self.registration_process = None return logger.info("停止注册进程") try: if sys.platform.startswith("win"): # Windows下使用taskkill subprocess.run(["taskkill", "/F", "/T", "/PID", str(self.registration_process.pid)]) else: # Linux/Mac下使用terminate self.registration_process.terminate() self.registration_process.wait(timeout=5) logger.info("注册进程已停止") except Exception as e: logger.error(f"停止注册进程时出错: {e}") self.registration_process = None async def upload_accounts(self): """上传已注册成功的账号""" logger.info("开始上传注册成功的账号") try: # 使用subprocess运行upload_account.py if sys.platform.startswith("win"): process = subprocess.Popen( ["python", "upload_account.py"], stdout=subprocess.PIPE, stderr=subprocess.PIPE, creationflags=subprocess.CREATE_NO_WINDOW ) else: process = subprocess.Popen( ["python3", "upload_account.py"], stdout=subprocess.PIPE, stderr=subprocess.PIPE ) # 等待进程完成 stdout, stderr = process.communicate() if process.returncode != 0: logger.error(f"上传账号进程异常终止,退出码: {process.returncode}") if stderr: logger.error(f"错误输出: {stderr.decode('utf-8', errors='ignore')}") else: logger.info("账号上传完成") if stdout: # 只记录最后几行输出 output_lines = stdout.decode('utf-8', errors='ignore').strip().split('\n') for line in output_lines[-5:]: logger.info(f"Upload: {line}") except Exception as e: logger.error(f"执行账号上传脚本时出错: {e}") async def run(self): """运行服务主循环""" logger.info("启动Cursor自动化服务") last_upload_time = 0 while self.running: try: # 1. 检查注册API状态 current_status = await self.check_registration_status() # 状态变化处理 if current_status != self.reg_enabled: self.reg_enabled = current_status logger.info(f"注册状态变化为: {'开启' if self.reg_enabled else '关闭'}") # 状态变化后等待2秒,确保数据库状态稳定 await asyncio.sleep(2) if self.reg_enabled: # 开启注册时,先检查邮箱数量 pending_count = await self.count_pending_accounts() if pending_count > 0: # 有可用邮箱,立即启动注册进程 logger.info(f"注册开启,有 {pending_count} 个可用邮箱,立即启动注册进程") await self.start_registration_process() else: # 没有邮箱,立即获取 logger.info("注册开启,但没有可用邮箱,立即获取新邮箱") await self.check_and_fetch_emails() else: # 关闭注册时,停止注册进程 self.stop_registration_process() # 等待一会,确保上一步操作完全完成 await asyncio.sleep(1) # 2. 如果注册已开启,检查邮箱情况并确保注册进程运行 if self.reg_enabled: # 检查注册进程是否在运行 if not self.registration_process or self.registration_process.poll() is not None: # 如果没有运行,检查是否有可用邮箱 pending_count = await self.count_pending_accounts() if pending_count > 0: # 有邮箱,立即启动注册进程 logger.info(f"有 {pending_count} 个未处理的邮箱,立即启动注册进程") await self.start_registration_process() else: # 没有可用邮箱,获取新邮箱 logger.info("没有可用邮箱,获取新邮箱") await self.check_and_fetch_emails() else: # 注册进程已在运行,仍然检查邮箱数量,确保有足够邮箱 pending_count = await self.count_pending_accounts() if pending_count < self.email_check_threshold: await self.check_and_fetch_emails() # 3. 定期上传账号 current_time = time.time() if current_time - last_upload_time >= self.upload_interval: await self.upload_accounts() last_upload_time = current_time # 上传后等待,确保数据库状态更新 await asyncio.sleep(2) # 4. 等待下一次检查 logger.debug(f"等待 {self.check_interval} 秒后进行下一次检查") for _ in range(self.check_interval): if not self.running: break await asyncio.sleep(1) except Exception as e: logger.error(f"服务运行时出错: {e}") await asyncio.sleep(30) # 出错后等待30秒再继续 logger.info("服务已停止") @classmethod async def start_service(cls): """启动服务的静态方法""" service = cls() await service.initialize() try: await service.run() finally: await service.cleanup() async def main(): """主函数""" # 设置日志 logger.remove() logger.add(sys.stderr, level="INFO") logger.add( "auto_cursor_service.log", rotation="50 MB", retention="10 days", level="DEBUG", compression="zip" ) logger.info(f"Cursor自动化服务启动于: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") try: await AutoCursorService.start_service() except Exception as e: logger.error(f"服务异常终止: {e}") import traceback logger.error(traceback.format_exc()) if __name__ == "__main__": if sys.platform.startswith("win"): asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy()) asyncio.run(main())