#!/usr/bin/env python3 """ Cursor自动化服务 - 监控API,控制注册进程 - 自动获取邮箱账号 - 自动上传注册成功的账号 """ import asyncio import sys import json import signal import subprocess import time import random import string from datetime import datetime from typing import Dict, List, Optional, Tuple, Any import aiohttp from loguru import logger # Windows平台特殊处理,强制使用SelectorEventLoop if sys.platform.startswith("win"): asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy()) from core.config import Config from core.database import DatabaseManager from services.fetch_manager import FetchManager from services.self_hosted_email import SelfHostedEmail from services.proxy_pool import ProxyPool class AutoCursorService: def __init__(self): self.config = Config.from_yaml() self.db_manager = DatabaseManager(self.config) self.fetch_manager = FetchManager(self.config) # API相关 self.api_base_url = "https://cursorapi.nosqli.com/admin/api.AutoCursor" self.proxy = None # 初始化自建邮箱服务 self.self_hosted_email = None if hasattr(self.config, 'self_hosted_email_config') and self.config.self_hosted_email_config: self.self_hosted_email = SelfHostedEmail( self.fetch_manager, self.config.self_hosted_email_config.api_base_url, self.config.self_hosted_email_config.api_key ) logger.info("自建邮箱服务已初始化") else: logger.warning("未配置自建邮箱服务,部分功能可能不可用") # 初始化代理池 self.proxy_pool = ProxyPool(self.config, self.fetch_manager) # 获取hostname,用于API请求参数 self.hostname = getattr(self.config, "hostname", None) if not self.hostname: # 尝试从config.yaml中的server_config获取 server_config = getattr(self.config, "server_config", None) if server_config: self.hostname = getattr(server_config, "hostname", "unknown") else: self.hostname = "unknown" logger.warning(f"未在配置中找到hostname,使用默认值: {self.hostname}") # 子进程控制 self.registration_process = None self.reg_enabled = False # 从配置中获取自动服务参数 auto_service_config = getattr(self.config, "auto_service_config", None) if auto_service_config: self.check_interval = getattr(auto_service_config, "check_interval", 60) self.upload_interval = getattr(auto_service_config, "upload_interval", 300) self.email_check_threshold = getattr(auto_service_config, "email_check_threshold", 30) self.email_batch_size = getattr(auto_service_config, "email_batch_size", 10) else: self.check_interval = 60 # 检查API状态的间隔(秒) self.upload_interval = 300 # 上传账号间隔(秒) self.email_check_threshold = 30 # 当可用邮箱少于这个数时获取新邮箱 self.email_batch_size = 10 # 每次获取邮箱的数量 # 处理邮箱的最小数量阈值,只要有这么多邮箱就会立即处理 self.min_email_to_process = 1 # 运行状态控制 self.running = True # 设置信号处理 signal.signal(signal.SIGINT, self._handle_signal) signal.signal(signal.SIGTERM, self._handle_signal) def _handle_signal(self, signum, frame): """处理信号,优雅关闭服务""" logger.info(f"收到信号 {signum},准备关闭服务") self.running = False # 确保这不是在异步上下文中调用的 if self.registration_process: logger.info("正在终止注册进程...") try: if sys.platform.startswith("win"): subprocess.run(["taskkill", "/F", "/T", "/PID", str(self.registration_process.pid)]) else: self.registration_process.terminate() except Exception as e: logger.error(f"终止注册进程时出错: {e}") async def initialize(self): """初始化服务""" logger.info("初始化自动化服务") await self.db_manager.initialize() # 检查并设置代理 if hasattr(self.config, "proxy_config") and self.config.proxy_config: if hasattr(self.config.proxy_config, "api_proxy") and self.config.proxy_config.api_proxy: self.proxy = self.config.proxy_config.api_proxy logger.info(f"使用API代理: {self.proxy}") async def cleanup(self): """清理资源""" logger.info("清理服务资源") if self.registration_process and self.registration_process.poll() is None: logger.info("终止注册进程") try: if sys.platform.startswith("win"): subprocess.run(["taskkill", "/F", "/T", "/PID", str(self.registration_process.pid)]) else: self.registration_process.terminate() self.registration_process.wait(timeout=5) except Exception as e: logger.error(f"终止注册进程时出错: {e}") await self.db_manager.cleanup() async def check_registration_status(self) -> bool: """检查注册API是否开启 Returns: bool: True表示开启注册,False表示关闭注册 """ url = f"{self.api_base_url}/regstates" params = {"hostname": self.hostname} try: async with aiohttp.ClientSession() as session: async with session.get(url, params=params, proxy=self.proxy, ssl=False) as response: if response.status != 200: logger.error(f"API请求失败,状态码: {response.status}") return self.reg_enabled # 保持当前状态 data = await response.json() if data.get("code") != 0: logger.error(f"API返回错误: {data.get('msg', 'Unknown error')}") return self.reg_enabled # 保持当前状态 reg_state = data.get("data", {}).get("reg_state", False) logger.info(f"注册状态: {'开启' if reg_state else '关闭'}") return reg_state except Exception as e: logger.error(f"检查注册状态时出错: {e}") return self.reg_enabled # 出错时保持当前状态 async def fetch_email_accounts(self, count: int = 10) -> List[Dict[str, str]]: """使用自建邮箱服务获取邮箱账号 Args: count: 需要获取的邮箱数量 Returns: List[Dict[str, str]]: 邮箱账号列表,每个账号包含email和password字段 """ if not self.self_hosted_email: logger.error("自建邮箱服务未初始化,无法获取邮箱") return [] result = [] for _ in range(count): try: # 获取一个邮箱地址 email = await self.self_hosted_email.get_email() if not email: logger.warning("获取邮箱失败,跳过") continue # 生成随机密码 password = ''.join(random.choices(string.ascii_letters + string.digits, k=12)) # 添加到结果列表 account = { "email": email, "password": password, "client_id": "", # 如果有需要可以生成 "refresh_token": "" # 如果有需要可以生成 } result.append(account) logger.info(f"已获取邮箱: {email}") # 每次获取后等待一小段时间,避免请求过快 await asyncio.sleep(0.5) except Exception as e: logger.error(f"获取邮箱时出错: {e}") await asyncio.sleep(1) # 出错后等待较长时间 logger.info(f"本次共获取 {len(result)} 个邮箱账号") return result async def import_self_hosted_emails(self, count: int = 10) -> List[Dict[str, str]]: """获取并导入自建邮箱 Args: count: 要获取的邮箱数量 Returns: List[Dict[str, str]]: 获取的邮箱列表 """ # 直接获取邮箱列表,不再需要导入到数据库 emails = await self.fetch_email_accounts(count) return emails async def start_registration_process(self): """启动注册进程""" # 如果注册进程已在运行,不做任何事 if self.registration_process and self.registration_process.poll() is None: logger.info("注册进程已在运行中") return logger.info("启动注册进程") try: # 获取配置中的batch_size,确保并发注册 batch_size = 1 # 默认值 if hasattr(self.config, "register_config") and self.config.register_config: batch_size = getattr(self.config.register_config, "batch_size", 1) logger.info(f"注册批次大小设置为: {batch_size}") # 使用subprocess启动main.py if sys.platform.startswith("win"): self.registration_process = subprocess.Popen( ["python", "main.py"], creationflags=subprocess.CREATE_NEW_PROCESS_GROUP ) else: self.registration_process = subprocess.Popen( ["python3", "main.py"] ) logger.info(f"注册进程已启动,PID: {self.registration_process.pid}") except Exception as e: logger.error(f"启动注册进程时出错: {e}") self.registration_process = None def stop_registration_process(self): """停止注册进程""" if not self.registration_process or self.registration_process.poll() is not None: logger.info("注册进程未在运行") self.registration_process = None return logger.info("停止注册进程") try: if sys.platform.startswith("win"): # Windows下使用taskkill subprocess.run(["taskkill", "/F", "/T", "/PID", str(self.registration_process.pid)]) else: # Linux/Mac下使用terminate self.registration_process.terminate() self.registration_process.wait(timeout=5) logger.info("注册进程已停止") except Exception as e: logger.error(f"停止注册进程时出错: {e}") self.registration_process = None async def upload_accounts(self, accounts: List[Dict[str, Any]]) -> bool: """上传账号到API Args: accounts: 要上传的账号列表,每个账号包含email、password、cursor_password等信息 Returns: bool: 是否上传成功 """ if not accounts: return True url = f"{self.api_base_url}/commonadd" try: # 准备上传数据 upload_data = [] for account in accounts: upload_item = { "email": account["email"], "email_password": account.get("password", ""), # 使用account的password字段作为email_password "cursor_email": account["email"], "cursor_password": account["cursor_password"], "cookie": account.get("cursor_cookie", "") or "", "token": account.get("cursor_jwt", ""), "hostname": self.hostname } upload_data.append(upload_item) # 打印上传数据的部分细节(去除敏感信息) debug_data = [] for item in upload_data[:2]: # 只打印前2个账号作为示例 debug_item = item.copy() if "cookie" in debug_item and debug_item["cookie"]: debug_item["cookie"] = debug_item["cookie"][:20] + "..." if len(debug_item["cookie"]) > 20 else debug_item["cookie"] if "token" in debug_item and debug_item["token"]: debug_item["token"] = debug_item["token"][:20] + "..." if len(debug_item["token"]) > 20 else debug_item["token"] debug_data.append(debug_item) logger.debug(f"准备上传 {len(upload_data)} 个账号") logger.debug(f"上传数据示例: {json.dumps(debug_data, ensure_ascii=False)}") logger.debug(f"API URL: {url}") # 发送请求 async with aiohttp.ClientSession() as session: async with session.post( url, json=upload_data, proxy=self.proxy, ssl=False ) as response: response_text = await response.text() logger.debug(f"API响应状态码: {response.status}") logger.debug(f"API响应内容: {response_text}") if response.status != 200: logger.error(f"上传账号API请求失败,状态码: {response.status}") return False try: data = json.loads(response_text) except json.JSONDecodeError: logger.error(f"解析响应失败,非JSON格式: {response_text[:100]}...") return False if data.get("code") != 0: error_msg = data.get("msg", "Unknown error") logger.error(f"上传账号API返回错误: {error_msg}") return False success_count = data.get("data", {}).get("success", 0) failed_count = data.get("data", {}).get("failed", 0) # 检查是否有详细的错误信息 if "details" in data.get("data", {}): details = data.get("data", {}).get("details", []) if details: logger.error("错误详情:") for i, detail in enumerate(details[:5]): # 只显示前5个错误 logger.error(f" 错误 {i+1}: {detail.get('email', '未知邮箱')} - {detail.get('message', '未知错误')}") logger.info(f"账号上传结果: 成功 {success_count}, 失败 {failed_count}") return success_count > 0 except Exception as e: logger.error(f"上传账号时出错: {e}") return False async def run(self): """运行服务主循环""" logger.info("启动Cursor自动化服务") # 设置永久开启注册 self.reg_enabled = True logger.info("注册功能已永久开启") last_upload_time = 0 while self.running: try: # 1. 获取邮箱 logger.info("准备获取自建邮箱") emails = await self.import_self_hosted_emails(self.email_batch_size) if emails: logger.info(f"成功获取 {len(emails)} 个自建邮箱") # 2. 确保注册进程正在运行 if not self.registration_process or self.registration_process.poll() is not None: if emails: logger.info(f"有 {len(emails)} 个可用邮箱,启动注册进程") await self.start_registration_process() else: logger.warning("没有可用邮箱,暂不启动注册进程") # 3. 等待下一次检查 logger.debug(f"等待 {self.check_interval} 秒后进行下一次检查") for _ in range(self.check_interval): if not self.running: break await asyncio.sleep(1) except Exception as e: logger.error(f"服务运行时出错: {e}") await asyncio.sleep(30) # 出错后等待30秒再继续 logger.info("服务已停止") @classmethod async def start_service(cls): """启动服务的静态方法""" service = cls() await service.initialize() try: await service.run() finally: await service.cleanup() async def main(): """主函数""" # 设置日志 logger.remove() logger.add(sys.stderr, level="INFO") logger.add( "logs/auto_cursor_service.log", rotation="50 MB", retention="10 days", level="DEBUG", compression="zip" ) logger.info(f"Cursor自动化服务启动于: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") try: await AutoCursorService.start_service() except Exception as e: logger.error(f"服务异常终止: {e}") import traceback logger.error(traceback.format_exc()) if __name__ == "__main__": if sys.platform.startswith("win"): asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy()) asyncio.run(main())