From b20094adfcb3e1d0127675c9fcce0a6072167599 Mon Sep 17 00:00:00 2001 From: huangzhenpc Date: Tue, 1 Apr 2025 16:25:08 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E8=87=AA=E5=8A=A8=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- AUTO_SERVICE_README.md | 166 ++++++++++++++ UPGRADE_DATABASE_README.md | 55 +++++ auto_cursor_service.py | 435 +++++++++++++++++++++++++++++++++++ config.yaml | 12 + core/config.py | 36 ++- core/database.py | 4 +- init_database.py | 4 +- migrate_db.py | 4 +- upgrade_database.py | 118 ++++++++++ upload_account.py | 455 +++++++++++++++++++++++++++++++++++++ 10 files changed, 1284 insertions(+), 5 deletions(-) create mode 100644 AUTO_SERVICE_README.md create mode 100644 UPGRADE_DATABASE_README.md create mode 100644 auto_cursor_service.py create mode 100644 upgrade_database.py create mode 100644 upload_account.py diff --git a/AUTO_SERVICE_README.md b/AUTO_SERVICE_README.md new file mode 100644 index 0000000..8cec0b2 --- /dev/null +++ b/AUTO_SERVICE_README.md @@ -0,0 +1,166 @@ +# Cursor自动化服务使用说明 + +本自动化服务实现了Cursor注册流程的完全自动化,包括API状态监控、邮箱自动获取和账号上传功能。 + +## 功能特点 + +1. **API状态监控** + - 实时监控注册API状态,自动开启/关闭注册进程 + - 支持配置检查间隔和自定义服务器标识(hostname) + +2. **邮箱账号自动管理** + - 监控本地邮箱池,当可用邮箱不足时自动从API获取 + - 支持配置获取阈值和批次数量 + +3. **账号自动上传** + - 定期上传注册成功的账号到API + - 支持配置上传间隔和代理设置 + - 通过extracted字段跟踪已上传账号 + +4. **进程管理** + - 智能管理注册子进程的启动和停止 + - 在API关闭注册时自动停止进程 + +5. **完善的日志记录** + - 详细的运行日志,支持日志轮转和压缩 + - 关键操作和错误的可追溯性 + +## 前置条件 + +1. 已配置好MySQL数据库 +2. Redis服务(可选,但推荐) +3. 已在config.yaml中配置好hostname参数 +4. 已安装所需依赖:`pip install -r requirements.txt` + +## 配置说明 + +在`config.yaml`中添加以下配置: + +```yaml +# 服务器配置 +server_config: + hostname: "sg424" # 服务器标识,用于API调用 + +# 代理配置 +proxy: + # ... 其他代理配置 ... + api_proxy: "http://your-proxy-server:port" # API专用代理(可选) + +# 自动服务配置 +auto_service: + check_interval: 60 # 检查API状态的间隔(秒) + upload_interval: 300 # 上传账号间隔(秒) + email_check_threshold: 30 # 当可用邮箱少于这个数时获取新邮箱 +``` + +## 使用方法 + +1. **初始化数据库**(首次使用) + ```bash + python init_database.py + ``` + +2. **升级数据库**(现有数据库升级) + ```bash + python upgrade_database.py + ``` + + > 注意:如果您从之前版本升级,需要运行此脚本添加新的`extracted`字段 + +3. **启动自动化服务** + ```bash + python auto_cursor_service.py + ``` + +4. **查看运行状态** + - 服务会在控制台输出基本运行信息 + - 详细日志保存在`auto_cursor_service.log`文件中 + +5. **停止服务** + - 按Ctrl+C安全停止服务 + - 服务会自动停止注册进程并清理资源 + +## 工作流程 + +1. 服务启动,初始化数据库连接 +2. 定期检查API注册状态(`/regstates`) +3. 如果注册开启: + - 检查本地邮箱池,如果数量不足,从API获取新邮箱 + - 启动注册进程(main.py) +4. 如果注册关闭: + - 停止注册进程 +5. 定期上传注册成功的账号,并标记为已上传(extracted=1) +6. 循环以上步骤,直到服务被手动停止 + +## 常见问题 + +1. **API连接问题** + - 检查网络连接和代理设置 + - 确认API URL和参数正确 + +2. **邮箱获取失败** + - 检查API返回的错误信息 + - 可能是服务端暂无可用邮箱 + +3. **子进程管理问题** + - Windows平台可能需要管理员权限来强制终止进程 + - 检查main.py是否能正常独立运行 + +4. **内存占用过高** + - 调整日志设置,减少日志详细程度 + - 检查是否有内存泄漏 + +5. **数据库结构问题** + - 如遇到"Unknown column 'extracted' in 'where clause'"错误 + - 运行`python upgrade_database.py`更新数据库结构 + +## 进阶使用 + +### 设置为系统服务(Linux) + +1. 创建服务文件`/etc/systemd/system/cursor-service.service`: + ``` + [Unit] + Description=Cursor Auto Registration Service + After=network.target mysql.service redis.service + + [Service] + Type=simple + User=your_user + WorkingDirectory=/path/to/cursor/app + ExecStart=/usr/bin/python3 /path/to/cursor/app/auto_cursor_service.py + Restart=on-failure + RestartSec=10 + + [Install] + WantedBy=multi-user.target + ``` + +2. 重新加载systemd: + ```bash + sudo systemctl daemon-reload + ``` + +3. 启动服务: + ```bash + sudo systemctl start cursor-service + ``` + +4. 设置开机自启: + ```bash + sudo systemctl enable cursor-service + ``` + +### 使用screen或tmux(简易方法) + +```bash +# 使用screen +screen -S cursor +python auto_cursor_service.py +# 按Ctrl+A然后按D分离screen + +# 使用tmux +tmux new -s cursor +python auto_cursor_service.py +# 按Ctrl+B然后按D分离tmux +``` \ No newline at end of file diff --git a/UPGRADE_DATABASE_README.md b/UPGRADE_DATABASE_README.md new file mode 100644 index 0000000..21cd0ef --- /dev/null +++ b/UPGRADE_DATABASE_README.md @@ -0,0 +1,55 @@ +# 数据库升级指南 + +本文档说明如何使用 `upgrade_database.py` 脚本升级现有数据库结构,添加新的 `extracted` 字段和相关索引。 + +## 升级目的 + +新版本的 Cursor 自动化服务增加了账号上传功能,需要在数据库中添加 `extracted` 字段来标记已上传的账号。此脚本会自动完成以下操作: + +1. 检查并添加 `extracted` 布尔字段(默认为 0) +2. 创建 `idx_extracted` 索引以优化查询性能 +3. 将现有的成功注册账号的 `extracted` 状态设为 0(未提取) + +## 使用方法 + +1. 确保 MySQL 数据库正在运行,且 `config.yaml` 中的数据库配置正确 + +2. 运行升级脚本: + ```bash + python upgrade_database.py + ``` + +3. 脚本将自动检测是否需要添加字段和索引,并仅在需要时执行相应的 SQL 语句 + +4. 查看输出日志确认升级结果,详细日志保存在 `upgrade_database.log` 文件中 + +## 安全考虑 + +- 此脚本会修改数据库结构,但不会删除任何数据 +- 建议在执行前备份数据库 +- 脚本执行前会检查字段和索引是否已存在,不会重复创建 + +## 常见问题 + +1. **权限错误** + - 确保数据库用户有 ALTER TABLE 权限 + - 错误信息通常包含 "Access denied" 或 "Insufficient privileges" + +2. **字段已存在** + - 如果收到 "Duplicate column name" 错误,说明字段已存在 + - 脚本会检测字段是否存在,通常不会出现此问题 + +3. **执行成功但无更改** + - 如果日志显示 "extracted字段已存在,无需添加",说明数据库结构已是最新 + - 这是正常现象,无需担心 + +## 验证升级 + +升级完成后,可以通过以下SQL语句验证新字段是否添加成功: + +```sql +DESCRIBE email_accounts; +SHOW INDEX FROM email_accounts; +``` + +应该能看到 `extracted` 字段和 `idx_extracted` 索引。 \ No newline at end of file diff --git a/auto_cursor_service.py b/auto_cursor_service.py new file mode 100644 index 0000000..c6d9bd2 --- /dev/null +++ b/auto_cursor_service.py @@ -0,0 +1,435 @@ +#!/usr/bin/env python3 +""" +Cursor自动化服务 +- 监控API,控制注册进程 +- 自动获取邮箱账号 +- 自动上传注册成功的账号 +""" +import asyncio +import sys +import json +import signal +import subprocess +import time +from datetime import datetime +from typing import Dict, List, Optional, Tuple, Any + +import aiohttp +from loguru import logger + +# Windows平台特殊处理,强制使用SelectorEventLoop +if sys.platform.startswith("win"): + asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy()) + +from core.config import Config +from core.database import DatabaseManager +from import_emails import import_emails + + +class AutoCursorService: + def __init__(self): + self.config = Config.from_yaml() + self.db_manager = DatabaseManager(self.config) + + # API相关 + self.api_base_url = "https://cursorapi.nosqli.com/admin/api.AutoCursor" + self.proxy = None + + # 获取hostname,用于API请求参数 + self.hostname = getattr(self.config, "hostname", None) + if not self.hostname: + # 尝试从config.yaml中的server_config获取 + server_config = getattr(self.config, "server_config", None) + if server_config: + self.hostname = getattr(server_config, "hostname", "unknown") + else: + self.hostname = "unknown" + logger.warning(f"未在配置中找到hostname,使用默认值: {self.hostname}") + + # 子进程控制 + self.registration_process = None + self.reg_enabled = False + self.check_interval = 60 # 检查API状态的间隔(秒) + self.email_check_threshold = 30 # 当可用邮箱少于这个数时获取新邮箱 + self.email_fetch_count = 2 # 获取邮箱的次数(每次15个) + + # 运行状态控制 + self.running = True + self.upload_interval = 300 # 上传账号间隔(秒) + + # 设置信号处理 + signal.signal(signal.SIGINT, self._handle_signal) + signal.signal(signal.SIGTERM, self._handle_signal) + + def _handle_signal(self, signum, frame): + """处理信号,优雅关闭服务""" + logger.info(f"收到信号 {signum},准备关闭服务") + self.running = False + + # 确保这不是在异步上下文中调用的 + if self.registration_process: + logger.info("正在终止注册进程...") + try: + if sys.platform.startswith("win"): + subprocess.run(["taskkill", "/F", "/T", "/PID", str(self.registration_process.pid)]) + else: + self.registration_process.terminate() + except Exception as e: + logger.error(f"终止注册进程时出错: {e}") + + async def initialize(self): + """初始化服务""" + logger.info("初始化自动化服务") + await self.db_manager.initialize() + + # 检查并设置代理 + if hasattr(self.config, "proxy_config") and self.config.proxy_config: + if hasattr(self.config.proxy_config, "api_proxy") and self.config.proxy_config.api_proxy: + self.proxy = self.config.proxy_config.api_proxy + logger.info(f"使用API代理: {self.proxy}") + + async def cleanup(self): + """清理资源""" + logger.info("清理服务资源") + if self.registration_process and self.registration_process.poll() is None: + logger.info("终止注册进程") + try: + if sys.platform.startswith("win"): + subprocess.run(["taskkill", "/F", "/T", "/PID", str(self.registration_process.pid)]) + else: + self.registration_process.terminate() + self.registration_process.wait(timeout=5) + except Exception as e: + logger.error(f"终止注册进程时出错: {e}") + + await self.db_manager.cleanup() + + async def check_registration_status(self) -> bool: + """检查注册API是否开启 + + Returns: + bool: True表示开启注册,False表示关闭注册 + """ + url = f"{self.api_base_url}/regstates" + params = {"hostname": self.hostname} + + try: + async with aiohttp.ClientSession() as session: + async with session.get(url, params=params, proxy=self.proxy, ssl=False) as response: + if response.status != 200: + logger.error(f"API请求失败,状态码: {response.status}") + return self.reg_enabled # 保持当前状态 + + data = await response.json() + if data.get("code") != 0: + logger.error(f"API返回错误: {data.get('msg', 'Unknown error')}") + return self.reg_enabled # 保持当前状态 + + reg_state = data.get("data", {}).get("reg_state", False) + logger.info(f"注册状态: {'开启' if reg_state else '关闭'}") + return reg_state + + except Exception as e: + logger.error(f"检查注册状态时出错: {e}") + return self.reg_enabled # 出错时保持当前状态 + + async def fetch_email_accounts(self) -> List[Dict[str, str]]: + """从API获取邮箱账号 + + Returns: + List[Dict[str, str]]: 邮箱账号列表 + """ + url = f"{self.api_base_url}/getemailaccount" + params = {"hostname": self.hostname} + + try: + async with aiohttp.ClientSession() as session: + async with session.get(url, params=params, proxy=self.proxy, ssl=False) as response: + if response.status != 200: + logger.error(f"获取邮箱API请求失败,状态码: {response.status}") + return [] + + data = await response.json() + if data.get("code") != 0: + logger.error(f"获取邮箱API返回错误: {data.get('msg', 'Unknown error')}") + return [] + + accounts = data.get("data", {}).get("accounts", []) + logger.info(f"成功获取 {len(accounts)} 个邮箱账号") + return accounts + + except Exception as e: + logger.error(f"获取邮箱账号时出错: {e}") + return [] + + async def import_email_accounts(self, accounts: List[Dict[str, str]]) -> int: + """导入邮箱账号到数据库 + + Args: + accounts: 邮箱账号列表 + + Returns: + int: 成功导入的账号数量 + """ + if not accounts: + logger.warning("没有邮箱账号可导入") + return 0 + + count = 0 + for account in accounts: + try: + email = account.get("email", "") + password = account.get("password", "") + client_id = account.get("client_id", "") + refresh_token = account.get("refresh_token", "") + + if not (email and password and client_id and refresh_token): + logger.warning(f"账号数据不完整: {account}") + continue + + # 插入数据库 + insert_query = ''' + INSERT INTO email_accounts + (email, password, client_id, refresh_token, status) + VALUES (%s, %s, %s, %s, 'pending') + ON DUPLICATE KEY UPDATE + password = VALUES(password), + client_id = VALUES(client_id), + refresh_token = VALUES(refresh_token), + status = 'pending', + updated_at = CURRENT_TIMESTAMP + ''' + + await self.db_manager.execute( + insert_query, + (email, password, client_id, refresh_token) + ) + count += 1 + + except Exception as e: + logger.error(f"导入邮箱账号时出错: {e}") + + logger.success(f"成功导入 {count} 个邮箱账号") + return count + + async def count_pending_accounts(self) -> int: + """统计可用的pending状态账号数量""" + query = """ + SELECT COUNT(*) + FROM email_accounts + WHERE status = 'pending' AND in_use = 0 AND sold = 0 + """ + result = await self.db_manager.fetch_one(query) + if result: + return result.get("COUNT(*)", 0) + return 0 + + async def check_and_fetch_emails(self) -> int: + """检查并获取邮箱账号(如果需要) + + Returns: + int: 获取的邮箱账号数量 + """ + # 检查当前可用邮箱数量 + pending_count = await self.count_pending_accounts() + logger.info(f"当前可用邮箱数量: {pending_count}") + + if pending_count >= self.email_check_threshold: + logger.info(f"可用邮箱数量充足 ({pending_count} >= {self.email_check_threshold})") + return 0 + + # 需要获取新邮箱 + logger.info(f"可用邮箱不足 ({pending_count} < {self.email_check_threshold}),准备获取新邮箱") + total_imported = 0 + + for i in range(self.email_fetch_count): + accounts = await self.fetch_email_accounts() + if not accounts: + logger.warning(f"第 {i+1} 次获取邮箱失败或无可用邮箱") + break + + imported = await self.import_email_accounts(accounts) + total_imported += imported + + if imported < 15: # 每次API应返回15个账号 + logger.warning(f"获取到的邮箱少于预期 ({imported} < 15),可能没有更多邮箱可用") + break + + if i < self.email_fetch_count - 1: + # 在多次请求之间添加延迟 + await asyncio.sleep(2) + + return total_imported + + def start_registration_process(self): + """启动注册进程""" + if self.registration_process and self.registration_process.poll() is None: + logger.info("注册进程已在运行中") + return + + logger.info("启动注册进程") + try: + # 使用subprocess启动main.py + if sys.platform.startswith("win"): + self.registration_process = subprocess.Popen( + ["python", "main.py"], + creationflags=subprocess.CREATE_NEW_PROCESS_GROUP + ) + else: + self.registration_process = subprocess.Popen( + ["python3", "main.py"] + ) + + logger.info(f"注册进程已启动,PID: {self.registration_process.pid}") + + except Exception as e: + logger.error(f"启动注册进程时出错: {e}") + self.registration_process = None + + def stop_registration_process(self): + """停止注册进程""" + if not self.registration_process or self.registration_process.poll() is not None: + logger.info("注册进程未在运行") + self.registration_process = None + return + + logger.info("停止注册进程") + try: + if sys.platform.startswith("win"): + # Windows下使用taskkill + subprocess.run(["taskkill", "/F", "/T", "/PID", str(self.registration_process.pid)]) + else: + # Linux/Mac下使用terminate + self.registration_process.terminate() + self.registration_process.wait(timeout=5) + + logger.info("注册进程已停止") + + except Exception as e: + logger.error(f"停止注册进程时出错: {e}") + + self.registration_process = None + + async def upload_accounts(self): + """上传已注册成功的账号""" + logger.info("开始上传注册成功的账号") + try: + # 使用subprocess运行upload_account.py + if sys.platform.startswith("win"): + process = subprocess.Popen( + ["python", "upload_account.py"], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + creationflags=subprocess.CREATE_NO_WINDOW + ) + else: + process = subprocess.Popen( + ["python3", "upload_account.py"], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE + ) + + # 等待进程完成 + stdout, stderr = process.communicate() + + if process.returncode != 0: + logger.error(f"上传账号进程异常终止,退出码: {process.returncode}") + if stderr: + logger.error(f"错误输出: {stderr.decode('utf-8', errors='ignore')}") + else: + logger.info("账号上传完成") + if stdout: + # 只记录最后几行输出 + output_lines = stdout.decode('utf-8', errors='ignore').strip().split('\n') + for line in output_lines[-5:]: + logger.info(f"Upload: {line}") + + except Exception as e: + logger.error(f"执行账号上传脚本时出错: {e}") + + async def run(self): + """运行服务主循环""" + logger.info("启动Cursor自动化服务") + + last_upload_time = 0 + + while self.running: + try: + # 1. 检查注册API状态 + current_status = await self.check_registration_status() + + # 状态变化处理 + if current_status != self.reg_enabled: + self.reg_enabled = current_status + logger.info(f"注册状态变化为: {'开启' if self.reg_enabled else '关闭'}") + + if self.reg_enabled: + # 开启注册时,先检查并获取邮箱 + await self.check_and_fetch_emails() + # 然后启动注册进程 + self.start_registration_process() + else: + # 关闭注册时,停止注册进程 + self.stop_registration_process() + + # 2. 如果注册已开启,检查并获取邮箱 + if self.reg_enabled: + await self.check_and_fetch_emails() + + # 3. 定期上传账号 + current_time = time.time() + if current_time - last_upload_time >= self.upload_interval: + await self.upload_accounts() + last_upload_time = current_time + + # 4. 等待下一次检查 + for _ in range(self.check_interval): + if not self.running: + break + await asyncio.sleep(1) + + except Exception as e: + logger.error(f"服务运行时出错: {e}") + await asyncio.sleep(30) # 出错后等待30秒再继续 + + logger.info("服务已停止") + + @classmethod + async def start_service(cls): + """启动服务的静态方法""" + service = cls() + await service.initialize() + + try: + await service.run() + finally: + await service.cleanup() + + +async def main(): + """主函数""" + # 设置日志 + logger.remove() + logger.add(sys.stderr, level="INFO") + logger.add( + "auto_cursor_service.log", + rotation="50 MB", + retention="10 days", + level="DEBUG", + compression="zip" + ) + + logger.info(f"Cursor自动化服务启动于: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") + try: + await AutoCursorService.start_service() + except Exception as e: + logger.error(f"服务异常终止: {e}") + import traceback + logger.error(traceback.format_exc()) + + +if __name__ == "__main__": + if sys.platform.startswith("win"): + asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy()) + + asyncio.run(main()) \ No newline at end of file diff --git a/config.yaml b/config.yaml index 72da4c5..10d7051 100644 --- a/config.yaml +++ b/config.yaml @@ -4,6 +4,10 @@ global: timeout: 30 retry_times: 3 +# 服务器配置 +server_config: + hostname: "sg424" # 服务器标识,用于API调用 + # 数据库配置 database: # SQLite配置(兼容旧版本) @@ -33,6 +37,8 @@ proxy: api_url: "https://share.proxy.qg.net/get?key=969331C5&num=1&area=&isp=0&format=txt&seq=\r\n&distinct=false" batch_size: 100 check_interval: 300 + # API专用代理(可选) + api_proxy: "http://1ddbeae0f7a67106fd58:f72e512b10893a1d@gw.dataimpulse.com:823" # 注册配置 register: @@ -44,6 +50,12 @@ register: email: file_path: "email.txt" +# 自动服务配置 +auto_service: + check_interval: 60 # 检查API状态的间隔(秒) + upload_interval: 300 # 上传账号间隔(秒) + email_check_threshold: 30 # 当可用邮箱少于这个数时获取新邮箱 + captcha: provider: "capsolver" # 可选值: "capsolver" 或 "yescaptcha" capsolver: diff --git a/core/config.py b/core/config.py index 89f8ae8..87c6671 100644 --- a/core/config.py +++ b/core/config.py @@ -39,6 +39,7 @@ class ProxyConfig: api_url: str batch_size: int check_interval: int + api_proxy: Optional[str] = None @dataclass @@ -52,6 +53,20 @@ class EmailConfig: file_path: str +@dataclass +class ServerConfig: + hostname: str + description: Optional[str] = None + + +@dataclass +class AutoServiceConfig: + check_interval: int = 60 + upload_interval: int = 300 + email_check_threshold: int = 30 + email_fetch_count: int = 2 + + @dataclass class CapsolverConfig: api_key: str @@ -83,6 +98,9 @@ class Config: register_config: RegisterConfig = None email_config: EmailConfig = None captcha_config: CaptchaConfig = None + server_config: Optional[ServerConfig] = None + auto_service_config: Optional[AutoServiceConfig] = None + hostname: Optional[str] = None # 向后兼容 @classmethod def from_yaml(cls, path: str = "config.yaml"): @@ -112,7 +130,18 @@ class Config: proxy_config = ProxyConfig(**data.get('proxy', {})) if 'proxy' in data else None register_config = RegisterConfig(**data.get('register', {})) if 'register' in data else None email_config = EmailConfig(**data.get('email', {})) if 'email' in data else None - + + # 创建服务器配置对象 + server_config = ServerConfig(**data.get('server_config', {})) if 'server_config' in data else None + + # 创建自动服务配置对象 + auto_service_config = AutoServiceConfig(**data.get('auto_service', {})) if 'auto_service' in data else None + + # 设置hostname (优先使用server_config中的hostname) + hostname = None + if server_config and hasattr(server_config, 'hostname'): + hostname = server_config.hostname + return cls( global_config=global_config, database_config=db_config, @@ -120,5 +149,8 @@ class Config: proxy_config=proxy_config, register_config=register_config, email_config=email_config, - captcha_config=captcha_config + captcha_config=captcha_config, + server_config=server_config, + auto_service_config=auto_service_config, + hostname=hostname ) diff --git a/core/database.py b/core/database.py index 290944e..3c26c07 100644 --- a/core/database.py +++ b/core/database.py @@ -83,9 +83,11 @@ class DatabaseManager: cursor_token TEXT, sold BOOLEAN DEFAULT 0, status VARCHAR(20) DEFAULT 'pending', + extracted BOOLEAN DEFAULT 0, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, - INDEX idx_status_inuse_sold (status, in_use, sold) + INDEX idx_status_inuse_sold (status, in_use, sold), + INDEX idx_extracted (extracted, status, sold) ) ''') diff --git a/init_database.py b/init_database.py index c70bdd1..4209907 100644 --- a/init_database.py +++ b/init_database.py @@ -47,9 +47,11 @@ CREATE TABLE IF NOT EXISTS email_accounts ( cursor_token TEXT, sold BOOLEAN DEFAULT 0, status VARCHAR(20) DEFAULT 'pending', + extracted BOOLEAN DEFAULT 0, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, - INDEX idx_status_inuse_sold (status, in_use, sold) + INDEX idx_status_inuse_sold (status, in_use, sold), + INDEX idx_extracted (extracted, status, sold) ) ''' diff --git a/migrate_db.py b/migrate_db.py index e6b9037..aa43486 100644 --- a/migrate_db.py +++ b/migrate_db.py @@ -81,9 +81,11 @@ async def migrate_data(): cursor_token TEXT, sold BOOLEAN DEFAULT 0, status VARCHAR(20) DEFAULT 'pending', + extracted BOOLEAN DEFAULT 0, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, - INDEX idx_status_inuse_sold (status, in_use, sold) + INDEX idx_status_inuse_sold (status, in_use, sold), + INDEX idx_extracted (extracted, status, sold) ) ''') await mysql_conn.commit() diff --git a/upgrade_database.py b/upgrade_database.py new file mode 100644 index 0000000..82dbfb5 --- /dev/null +++ b/upgrade_database.py @@ -0,0 +1,118 @@ +#!/usr/bin/env python3 +""" +数据库升级脚本 +在已有的数据库中添加新字段和索引 +""" +import asyncio +import sys +import yaml +import traceback +from loguru import logger + +# Windows平台特殊处理,强制使用SelectorEventLoop +if sys.platform.startswith('win'): + asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy()) + +from core.config import Config +from core.database import DatabaseManager + + +async def add_extracted_field(db_manager): + """向email_accounts表添加extracted字段和索引""" + logger.info("正在检查extracted字段...") + + # 检查extracted字段是否存在 + check_query = """ + SELECT COUNT(*) as field_exists + FROM information_schema.COLUMNS + WHERE TABLE_SCHEMA = DATABASE() + AND TABLE_NAME = 'email_accounts' + AND COLUMN_NAME = 'extracted' + """ + + result = await db_manager.fetch_one(check_query) + field_exists = result['field_exists'] > 0 if result else False + + if field_exists: + logger.info("extracted字段已存在,无需添加") + else: + logger.info("添加extracted字段...") + add_field_query = """ + ALTER TABLE email_accounts + ADD COLUMN extracted BOOLEAN DEFAULT 0 + """ + await db_manager.execute(add_field_query) + logger.success("成功添加extracted字段") + + # 检查索引是否存在 + check_index_query = """ + SELECT COUNT(*) as index_exists + FROM information_schema.STATISTICS + WHERE TABLE_SCHEMA = DATABASE() + AND TABLE_NAME = 'email_accounts' + AND INDEX_NAME = 'idx_extracted' + """ + + result = await db_manager.fetch_one(check_index_query) + index_exists = result['index_exists'] > 0 if result else False + + if index_exists: + logger.info("idx_extracted索引已存在,无需添加") + else: + logger.info("添加idx_extracted索引...") + add_index_query = """ + ALTER TABLE email_accounts + ADD INDEX idx_extracted (extracted, status, sold) + """ + await db_manager.execute(add_index_query) + logger.success("成功添加idx_extracted索引") + + logger.info("数据库升级完成") + + # 设置所有注册成功且已售出的账号的extracted状态 + update_query = """ + UPDATE email_accounts + SET extracted = 0 + WHERE status = 'success' AND sold = 1 AND extracted IS NULL + """ + affected = await db_manager.execute(update_query) + logger.info(f"已更新 {affected} 条记录的extracted状态") + + +async def upgrade_database(): + """升级数据库结构""" + logger.info("开始数据库升级...") + + try: + # 加载配置 + config = Config.from_yaml() + + # 创建数据库管理器并初始化 + db_manager = DatabaseManager(config) + await db_manager.initialize() + + try: + # 添加extracted字段和索引 + await add_extracted_field(db_manager) + + logger.success("数据库升级成功完成") + + finally: + # 清理资源 + await db_manager.cleanup() + + except Exception as e: + logger.error(f"数据库升级失败: {str(e)}") + logger.error(traceback.format_exc()) + + +if __name__ == "__main__": + # 设置日志 + logger.remove() + logger.add(sys.stderr, level="INFO") + logger.add("upgrade_database.log", rotation="1 MB", level="DEBUG") + + # 执行升级 + logger.info("启动数据库升级") + asyncio.run(upgrade_database()) + logger.info("数据库升级脚本执行完毕") \ No newline at end of file diff --git a/upload_account.py b/upload_account.py new file mode 100644 index 0000000..6d720ba --- /dev/null +++ b/upload_account.py @@ -0,0 +1,455 @@ +import asyncio +import sys +import json +from typing import List, Dict, Any +import aiohttp + +from loguru import logger + +from core.config import Config +from core.database import DatabaseManager +from core.logger import setup_logger + + +class AccountUploader: + def __init__(self): + self.config = Config.from_yaml() + self.logger = setup_logger(self.config) + self.db_manager = DatabaseManager(self.config) + self.api_url = "https://cursorapi.nosqli.com/admin/api.AutoCursor/commonadd" + self.batch_size = 100 + + # 获取hostname + self.hostname = getattr(self.config, "hostname", None) + if not self.hostname: + # 尝试从config.yaml中的server_config获取 + server_config = getattr(self.config, "server_config", None) + if server_config: + self.hostname = getattr(server_config, "hostname", "unknown") + else: + self.hostname = "unknown" + logger.warning(f"未在配置中找到hostname,使用默认值: {self.hostname}") + + # 添加代理设置 + self.proxy = None + if hasattr(self.config, "proxy_config") and hasattr(self.config.proxy_config, "api_proxy"): + self.proxy = self.config.proxy_config.api_proxy + logger.info(f"使用API代理: {self.proxy}") + self.use_proxy = self.proxy is not None + + async def initialize(self): + """初始化数据库""" + await self.db_manager.initialize() + + async def cleanup(self): + """清理资源""" + await self.db_manager.cleanup() + + async def get_success_accounts(self, limit: int = 100, last_id: int = 0) -> List[Dict[str, Any]]: + """获取状态为success且未提取的账号""" + query = """ + SELECT + id, email, password, cursor_password, cursor_cookie, cursor_token + FROM email_accounts + WHERE status = 'success' AND sold = 1 AND extracted = 0 + AND id > %s + ORDER BY id ASC + LIMIT %s + """ + rows = await self.db_manager.fetch_all(query, (last_id, limit)) + + accounts = [] + for row in rows: + account = { + "id": row["id"], + "email": row["email"], + "password": row["password"], + "cursor_password": row["cursor_password"], + "cursor_cookie": row["cursor_cookie"], + "cursor_token": row["cursor_token"] + } + accounts.append(account) + + return accounts + + async def count_success_accounts(self) -> int: + """统计未提取的成功账号数量""" + query = "SELECT COUNT(*) FROM email_accounts WHERE status = 'success' AND sold = 1 AND extracted = 0" + result = await self.db_manager.fetch_one(query) + if result: + count = result.get("COUNT(*)", 0) + return count + return 0 + + async def mark_as_extracted(self, account_ids: List[int]) -> bool: + """将账号标记为已提取""" + if not account_ids: + return True + + try: + placeholders = ', '.join(['%s' for _ in account_ids]) + + query = f""" + UPDATE email_accounts + SET + extracted = 1, + updated_at = CURRENT_TIMESTAMP + WHERE id IN ({placeholders}) + """ + + await self.db_manager.execute(query, tuple(account_ids)) + return True + + except Exception as e: + self.logger.error(f"标记账号为已提取时出错: {e}") + return False + + async def upload_accounts(self, accounts: List[Dict[str, Any]]) -> bool: + """上传账号到API""" + if not accounts: + return True + + try: + # 准备上传数据 + upload_data = [] + for account in accounts: + # 根据API需要的格式构建账号项 + upload_item = { + "email": account["email"], + "password": account["password"], # 同时提供password字段 + "email_password": account["password"], # 同时提供email_password字段 + "cursor_email": account["email"], + "cursor_password": account["cursor_password"], + "cookie": account["cursor_cookie"] or "", # 确保不为None + "token": account.get("cursor_token", ""), + "hostname": self.hostname # 添加hostname参数 + } + # 确保所有必须字段都有值 + for key, value in upload_item.items(): + if value is None: + upload_item[key] = "" # 将None替换为空字符串 + + upload_data.append(upload_item) + + # 打印上传数据的结构(去掉长字符串的详细内容) + debug_data = [] + for item in upload_data[:2]: # 只打印前2个账号作为示例 + debug_item = item.copy() + if "cookie" in debug_item and debug_item["cookie"]: + debug_item["cookie"] = debug_item["cookie"][:20] + "..." if len(debug_item["cookie"]) > 20 else debug_item["cookie"] + if "token" in debug_item and debug_item["token"]: + debug_item["token"] = debug_item["token"][:20] + "..." if len(debug_item["token"]) > 20 else debug_item["token"] + debug_data.append(debug_item) + + self.logger.debug(f"准备上传数据示例: {json.dumps(debug_data, ensure_ascii=False)}") + self.logger.debug(f"API URL: {self.api_url}") + self.logger.info(f"上传账号,服务器标识: {self.hostname}") + + # 发送请求 + # 使用代理创建ClientSession + connector = aiohttp.TCPConnector(ssl=False) # 禁用SSL验证以防代理问题 + async with aiohttp.ClientSession(connector=connector) as session: + # 添加超时设置 + timeout = aiohttp.ClientTimeout(total=60) # 增加超时时间 + + # 准备代理配置 + proxy = self.proxy if self.use_proxy else None + if self.use_proxy: + self.logger.debug(f"通过代理发送请求: {self.proxy.split('@')[1] if '@' in self.proxy else self.proxy}") + else: + self.logger.debug("不使用代理,直接连接API") + + # 根据API错误信息,确保发送的是账号数组 + self.logger.debug(f"发送账号数组,共 {len(upload_data)} 条记录") + + try: + # 直接发送数组格式,使用代理 + async with session.post(self.api_url, json=upload_data, timeout=timeout, proxy=proxy) as response: + response_text = await response.text() + self.logger.debug(f"API响应状态码: {response.status}") + self.logger.debug(f"API响应内容: {response_text}") + + if response.status != 200: + self.logger.error(f"API响应错误 - 状态码: {response.status}") + return True # 即使HTTP错误也继续处理下一批 + + # 解析响应 + try: + result = json.loads(response_text) + except json.JSONDecodeError: + self.logger.error(f"响应不是有效的JSON: {response_text}") + return True # JSON解析错误也继续处理下一批 + + # 判断上传是否成功 - 修改判断逻辑 + # API返回code为0表示成功 + if result.get("code") == 0: + # 检查data中的成功计数 + success_count = result.get("data", {}).get("success", 0) + failed_count = result.get("data", {}).get("failed", 0) + + self.logger.info(f"API返回结果: 成功: {success_count}, 失败: {failed_count}") + + if success_count > 0: + self.logger.success(f"成功上传 {success_count} 个账号") + return True + else: + # 检查详细错误信息 + details = result.get("data", {}).get("details", []) + if details: + for detail in details[:3]: # 只显示前三个错误 + self.logger.error(f"账号 {detail.get('email')} 上传失败: {detail.get('message')}") + # 输出更详细的错误信息,帮助诊断 + self.logger.debug(f"错误账号详情: {json.dumps(detail, ensure_ascii=False)}") + + self.logger.error(f"账号上传全部失败: {result.get('msg', '未知错误')}") + return True # 即使全部失败也继续处理下一批 + else: + self.logger.error(f"上传失败: {result.get('msg', '未知错误')}") + + # 检查是否有详细错误信息 + if 'data' in result: + self.logger.error(f"错误详情: {json.dumps(result['data'], ensure_ascii=False)}") + + return True # API返回错误也继续处理下一批 + + except aiohttp.ClientError as e: + self.logger.error(f"HTTP请求错误: {str(e)}") + return True # 网络错误也继续处理下一批 + + except Exception as e: + self.logger.error(f"上传账号时出错: {str(e)}") + import traceback + self.logger.error(f"错误堆栈: {traceback.format_exc()}") + return True # 其他错误也继续处理下一批 + + async def process_accounts(self, max_batches: int = 0) -> int: + """处理账号上传,max_batches=0表示处理所有批次""" + total_count = await self.count_success_accounts() + + if total_count == 0: + self.logger.info("没有找到待上传的账号") + return 0 + + self.logger.info(f"找到 {total_count} 个待上传账号") + + processed_count = 0 + batch_count = 0 + failed_batches = 0 # 记录失败的批次数 + last_id = 0 # 记录最后处理的ID + + while True: + # 检查是否达到最大批次 + if max_batches > 0 and batch_count >= max_batches: + self.logger.info(f"已达到指定的最大批次数 {max_batches}") + break + + # 获取一批账号 + accounts = await self.get_success_accounts(self.batch_size, last_id) + + if not accounts: + self.logger.info("没有更多账号可上传") + break + + batch_count += 1 + current_batch_size = len(accounts) + self.logger.info(f"处理第 {batch_count} 批 - {current_batch_size} 个账号") + + # 上传账号 + account_ids = [account["id"] for account in accounts] + if last_id < max(account_ids, default=0): + last_id = max(account_ids) # 更新最后处理的ID + + upload_success = await self.upload_accounts(accounts) + + if upload_success: + # 标记为已提取 + mark_success = await self.mark_as_extracted(account_ids) + if mark_success: + processed_count += current_batch_size + self.logger.info(f"成功处理 {current_batch_size} 个账号,已标记为已提取") + else: + failed_batches += 1 + self.logger.error(f"标记账号为已提取失败,批次 {batch_count}") + else: + failed_batches += 1 + self.logger.error(f"上传账号失败,批次 {batch_count}") + + # 避免API请求过于频繁 + await asyncio.sleep(1) + + self.logger.info(f"账号处理完成: 总共 {processed_count} 个, 失败批次 {failed_batches}") + return processed_count + + async def test_api_connection(self) -> bool: + """测试API连接是否正常""" + self.logger.info(f"正在测试API连接: {self.api_url}") + self.logger.info(f"使用代理: {'使用代理' if self.use_proxy else '不使用代理'}") + + try: + # 准备一个简单的测试数据 + test_data = [ + { + "email": "test@example.com", + "password": "test_password", + "email_password": "test_password", # 同时提供两个字段 + "cursor_email": "test@example.com", + "cursor_password": "test_cursor_password", + "cookie": "test_cookie", + "token": "test_token" + } + ] + + # 使用代理创建ClientSession + connector = aiohttp.TCPConnector(ssl=False) # 禁用SSL验证以防代理问题 + async with aiohttp.ClientSession(connector=connector) as session: + # 准备代理配置 + proxy = self.proxy if self.use_proxy else None + + # 尝试HEAD请求检查服务器是否可达 + try: + self.logger.debug("尝试HEAD请求...") + async with session.head(self.api_url, timeout=aiohttp.ClientTimeout(total=10), proxy=proxy) as response: + self.logger.debug(f"HEAD响应状态码: {response.status}") + if response.status >= 400: + self.logger.error(f"API服务器响应错误: {response.status}") + return False + except Exception as e: + self.logger.warning(f"HEAD请求失败: {str(e)},尝试GET请求...") + + # 尝试GET请求 + try: + base_url = self.api_url.split('/admin')[0] + ping_url = f"{base_url}/ping" + self.logger.debug(f"尝试GET请求检查服务健康: {ping_url}") + async with session.get(ping_url, timeout=aiohttp.ClientTimeout(total=10), proxy=proxy) as response: + self.logger.debug(f"GET响应状态码: {response.status}") + if response.status == 200: + response_text = await response.text() + self.logger.debug(f"GET响应内容: {response_text}") + return True + except Exception as e: + self.logger.warning(f"GET请求失败: {str(e)}") + + # 尝试OPTIONS请求获取允许的HTTP方法 + try: + self.logger.debug("尝试OPTIONS请求...") + async with session.options(self.api_url, timeout=aiohttp.ClientTimeout(total=10), proxy=proxy) as response: + self.logger.debug(f"OPTIONS响应状态码: {response.status}") + if response.status == 200: + allowed_methods = response.headers.get('Allow', '') + self.logger.debug(f"允许的HTTP方法: {allowed_methods}") + return 'POST' in allowed_methods + except Exception as e: + self.logger.warning(f"OPTIONS请求失败: {str(e)}") + + # 最后尝试一个小请求 + self.logger.debug("尝试轻量级POST请求...") + try: + # 不同格式的请求体 + formats = [ + {"test": "connection"}, + [{"test": "connection"}], + "test" + ] + + for i, payload in enumerate(formats): + try: + async with session.post(self.api_url, json=payload, timeout=aiohttp.ClientTimeout(total=10), proxy=proxy) as response: + self.logger.debug(f"POST格式{i+1}响应状态码: {response.status}") + response_text = await response.text() + self.logger.debug(f"POST格式{i+1}响应内容: {response_text[:200]}...") + + # 即使返回错误也是说明服务器可达 + if response.status != 0: + return True + except Exception as e: + self.logger.debug(f"POST格式{i+1}失败: {str(e)}") + + return False + + except Exception as e: + self.logger.error(f"测试POST请求失败: {str(e)}") + return False + + except Exception as e: + self.logger.error(f"API连接测试失败: {str(e)}") + return False + + def set_proxy_usage(self, use_proxy: bool): + """设置是否使用代理""" + self.use_proxy = use_proxy + if use_proxy: + self.logger.info(f"已启用HTTP代理: {self.proxy.split('@')[1] if '@' in self.proxy else self.proxy}") + else: + self.logger.info("已禁用HTTP代理,将直接连接") + + +async def main(): + # 解析命令行参数 + import argparse + parser = argparse.ArgumentParser(description="上传成功账号到API并标记为已提取") + parser.add_argument("--batches", type=int, default=0, help="处理的最大批次数,0表示处理所有批次") + parser.add_argument("--batch-size", type=int, default=100, help="每批处理的账号数量") + parser.add_argument("--dry-run", action="store_true", help="预览模式,不实际上传和更新") + parser.add_argument("--test-api", action="store_true", help="测试API连接") + parser.add_argument("--no-proxy", action="store_true", help="不使用代理直接连接") + + args = parser.parse_args() + + # 初始化 + uploader = AccountUploader() + await uploader.initialize() + + if args.batch_size > 0: + uploader.batch_size = args.batch_size + + # 设置是否使用代理 + uploader.set_proxy_usage(not args.no_proxy) + + try: + # 测试API连接 + if args.test_api: + logger.info("开始测试API连接...") + is_connected = await uploader.test_api_connection() + if is_connected: + logger.success("API连接测试成功!") + else: + logger.error("API连接测试失败!") + return + + # 预览模式 + if args.dry_run: + total_count = await uploader.count_success_accounts() + if total_count == 0: + logger.info("没有找到待上传的账号") + return + + logger.info(f"预览模式:找到 {total_count} 个待上传账号") + + # 获取示例账号 + accounts = await uploader.get_success_accounts(min(5, total_count)) + logger.info(f"示例账号 ({len(accounts)}/{total_count}):") + + for account in accounts: + logger.info(f"ID: {account['id']}, 邮箱: {account['email']}, Cursor密码: {account['cursor_password']}") + + batch_count = (total_count + uploader.batch_size - 1) // uploader.batch_size + logger.info(f"预计分 {batch_count} 批上传,每批 {uploader.batch_size} 个账号") + return + + # 实际处理 + processed = await uploader.process_accounts(args.batches) + logger.success(f"处理完成,共上传 {processed} 个账号") + + except Exception as e: + logger.error(f"程序执行出错: {str(e)}") + finally: + await uploader.cleanup() + + +if __name__ == "__main__": + # Windows平台特殊处理,强制使用SelectorEventLoop + if sys.platform.startswith('win'): + asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy()) + + asyncio.run(main())