import asyncio import sys import json from typing import List, Dict, Any import aiohttp from loguru import logger from core.config import Config from core.database import DatabaseManager from core.logger import setup_logger class AccountUploader: def __init__(self): self.config = Config.from_yaml() self.logger = setup_logger(self.config) self.db_manager = DatabaseManager(self.config) self.api_url = "https://cursorapi.nosqli.com/admin/api.AutoCursor/commonadd" self.batch_size = 100 # 添加代理设置 self.proxy = "http://1ddbeae0f7a67106fd58:f72e512b10893a1d@gw.dataimpulse.com:823" self.use_proxy = True # 默认使用代理 async def initialize(self): """初始化数据库""" await self.db_manager.initialize() async def cleanup(self): """清理资源""" await self.db_manager.cleanup() async def get_success_accounts(self, limit: int = 100, last_id: int = 0) -> List[Dict[str, Any]]: """获取状态为success且未提取的账号""" async with self.db_manager.get_connection() as conn: cursor = await conn.execute( """ SELECT id, email, password, cursor_password, cursor_cookie, cursor_token FROM email_accounts WHERE status = 'success' AND sold = 1 AND extracted = 0 AND id > ? ORDER BY id ASC LIMIT ? """, (last_id, limit) ) rows = await cursor.fetchall() accounts = [] for row in rows: account = { "id": row[0], "email": row[1], "password": row[2], "cursor_password": row[3], "cursor_cookie": row[4], "cursor_token": row[5] } accounts.append(account) return accounts async def count_success_accounts(self) -> int: """统计未提取的成功账号数量""" async with self.db_manager.get_connection() as conn: cursor = await conn.execute( "SELECT COUNT(*) FROM email_accounts WHERE status = 'success' AND sold = 1 AND extracted = 0" ) count = (await cursor.fetchone())[0] return count async def mark_as_extracted(self, account_ids: List[int]) -> bool: """将账号标记为已提取""" if not account_ids: return True try: placeholders = ", ".join("?" for _ in account_ids) async with self.db_manager.get_connection() as conn: await conn.execute( f""" UPDATE email_accounts SET extracted = 1, updated_at = CURRENT_TIMESTAMP WHERE id IN ({placeholders}) """, tuple(account_ids) ) await conn.commit() return True except Exception as e: self.logger.error(f"标记账号为已提取时出错: {e}") return False async def upload_accounts(self, accounts: List[Dict[str, Any]]) -> bool: """上传账号到API""" if not accounts: return True try: # 准备上传数据 upload_data = [] for account in accounts: # 根据API需要的格式构建账号项 upload_item = { "email": account["email"], "password": account["password"], # 同时提供password字段 "email_password": account["password"], # 同时提供email_password字段 "cursor_email": account["email"], "cursor_password": account["cursor_password"], "cookie": account["cursor_cookie"] or "", # 确保不为None "token": account.get("cursor_token", "") } # 确保所有必须字段都有值 for key, value in upload_item.items(): if value is None: upload_item[key] = "" # 将None替换为空字符串 upload_data.append(upload_item) # 打印上传数据的结构(去掉长字符串的详细内容) debug_data = [] for item in upload_data[:2]: # 只打印前2个账号作为示例 debug_item = item.copy() if "cookie" in debug_item and debug_item["cookie"]: debug_item["cookie"] = debug_item["cookie"][:20] + "..." if len(debug_item["cookie"]) > 20 else debug_item["cookie"] if "token" in debug_item and debug_item["token"]: debug_item["token"] = debug_item["token"][:20] + "..." if len(debug_item["token"]) > 20 else debug_item["token"] debug_data.append(debug_item) self.logger.debug(f"准备上传数据示例: {json.dumps(debug_data, ensure_ascii=False)}") self.logger.debug(f"API URL: {self.api_url}") # 发送请求 # 使用代理创建ClientSession connector = aiohttp.TCPConnector(ssl=False) # 禁用SSL验证以防代理问题 async with aiohttp.ClientSession(connector=connector) as session: # 添加超时设置 timeout = aiohttp.ClientTimeout(total=60) # 增加超时时间 # 准备代理配置 proxy = self.proxy if self.use_proxy else None if self.use_proxy: self.logger.debug(f"通过代理发送请求: {self.proxy.split('@')[1]}") else: self.logger.debug("不使用代理,直接连接API") # 根据API错误信息,确保发送的是账号数组 self.logger.debug(f"发送账号数组,共 {len(upload_data)} 条记录") try: # 直接发送数组格式,使用代理 async with session.post(self.api_url, json=upload_data, timeout=timeout, proxy=proxy) as response: response_text = await response.text() self.logger.debug(f"API响应状态码: {response.status}") self.logger.debug(f"API响应内容: {response_text}") if response.status != 200: self.logger.error(f"API响应错误 - 状态码: {response.status}") return True # 即使HTTP错误也继续处理下一批 # 解析响应 try: result = json.loads(response_text) except json.JSONDecodeError: self.logger.error(f"响应不是有效的JSON: {response_text}") return True # JSON解析错误也继续处理下一批 # 判断上传是否成功 - 修改判断逻辑 # API返回code为0表示成功 if result.get("code") == 0: # 检查data中的成功计数 success_count = result.get("data", {}).get("success", 0) failed_count = result.get("data", {}).get("failed", 0) self.logger.info(f"API返回结果: 成功: {success_count}, 失败: {failed_count}") if success_count > 0: self.logger.success(f"成功上传 {success_count} 个账号") return True else: # 检查详细错误信息 details = result.get("data", {}).get("details", []) if details: for detail in details[:3]: # 只显示前三个错误 self.logger.error(f"账号 {detail.get('email')} 上传失败: {detail.get('message')}") # 输出更详细的错误信息,帮助诊断 self.logger.debug(f"错误账号详情: {json.dumps(detail, ensure_ascii=False)}") self.logger.error(f"账号上传全部失败: {result.get('msg', '未知错误')}") return True # 即使全部失败也继续处理下一批 else: self.logger.error(f"上传失败: {result.get('msg', '未知错误')}") # 检查是否有详细错误信息 if 'data' in result: self.logger.error(f"错误详情: {json.dumps(result['data'], ensure_ascii=False)}") return True # API返回错误也继续处理下一批 except aiohttp.ClientError as e: self.logger.error(f"HTTP请求错误: {str(e)}") return True # 网络错误也继续处理下一批 except Exception as e: self.logger.error(f"上传账号时出错: {str(e)}") import traceback self.logger.error(f"错误堆栈: {traceback.format_exc()}") return True # 其他错误也继续处理下一批 async def process_accounts(self, max_batches: int = 0) -> int: """处理账号上传,max_batches=0表示处理所有批次""" total_count = await self.count_success_accounts() if total_count == 0: self.logger.info("没有找到待上传的账号") return 0 self.logger.info(f"找到 {total_count} 个待上传账号") processed_count = 0 batch_count = 0 failed_batches = 0 # 记录失败的批次数 last_id = 0 # 记录最后处理的ID while True: # 检查是否达到最大批次 if max_batches > 0 and batch_count >= max_batches: self.logger.info(f"已达到指定的最大批次数 {max_batches}") break # 获取一批账号 accounts = await self.get_success_accounts(self.batch_size, last_id) if not accounts: self.logger.info("没有更多账号可上传") break batch_count += 1 current_batch_size = len(accounts) self.logger.info(f"处理第 {batch_count} 批 - {current_batch_size} 个账号") # 上传账号 account_ids = [account["id"] for account in accounts] upload_success = await self.upload_accounts(accounts) if upload_success: # 标记为已提取 mark_success = await self.mark_as_extracted(account_ids) if mark_success: processed_count += current_batch_size self.logger.success(f"第 {batch_count} 批处理完成,累计处理 {processed_count}/{total_count}") failed_batches = 0 # 重置失败计数 # 更新最后处理的ID last_id = max(account_ids) else: self.logger.error(f"第 {batch_count} 批标记已提取失败") failed_batches += 1 else: self.logger.error(f"第 {batch_count} 批上传失败") failed_batches += 1 # 如果连续失败次数过多,才中断处理 if failed_batches >= 3: self.logger.error(f"连续 {failed_batches} 批处理失败,停止处理") break # 简单的延迟,避免请求过快 await asyncio.sleep(1) return processed_count async def test_api_connection(self) -> bool: """测试API连接是否正常""" self.logger.info(f"正在测试API连接: {self.api_url}") self.logger.info(f"使用代理: {'使用代理' if self.use_proxy else '不使用代理'}") try: # 准备一个简单的测试数据 test_data = [ { "email": "test@example.com", "password": "test_password", "email_password": "test_password", # 同时提供两个字段 "cursor_email": "test@example.com", "cursor_password": "test_cursor_password", "cookie": "test_cookie", "token": "test_token" } ] # 使用代理创建ClientSession connector = aiohttp.TCPConnector(ssl=False) # 禁用SSL验证以防代理问题 async with aiohttp.ClientSession(connector=connector) as session: # 准备代理配置 proxy = self.proxy if self.use_proxy else None # 尝试HEAD请求检查服务器是否可达 try: self.logger.debug("尝试HEAD请求...") async with session.head(self.api_url, timeout=aiohttp.ClientTimeout(total=10), proxy=proxy) as response: self.logger.debug(f"HEAD响应状态码: {response.status}") if response.status >= 400: self.logger.error(f"API服务器响应错误: {response.status}") return False except Exception as e: self.logger.warning(f"HEAD请求失败: {str(e)},尝试GET请求...") # 尝试GET请求 try: base_url = self.api_url.split('/admin')[0] ping_url = f"{base_url}/ping" self.logger.debug(f"尝试GET请求检查服务健康: {ping_url}") async with session.get(ping_url, timeout=aiohttp.ClientTimeout(total=10), proxy=proxy) as response: self.logger.debug(f"GET响应状态码: {response.status}") if response.status == 200: response_text = await response.text() self.logger.debug(f"GET响应内容: {response_text}") return True except Exception as e: self.logger.warning(f"GET请求失败: {str(e)}") # 尝试OPTIONS请求获取允许的HTTP方法 try: self.logger.debug("尝试OPTIONS请求...") async with session.options(self.api_url, timeout=aiohttp.ClientTimeout(total=10), proxy=proxy) as response: self.logger.debug(f"OPTIONS响应状态码: {response.status}") if response.status == 200: allowed_methods = response.headers.get('Allow', '') self.logger.debug(f"允许的HTTP方法: {allowed_methods}") return 'POST' in allowed_methods except Exception as e: self.logger.warning(f"OPTIONS请求失败: {str(e)}") # 最后尝试一个小请求 self.logger.debug("尝试轻量级POST请求...") try: # 不同格式的请求体 formats = [ {"test": "connection"}, [{"test": "connection"}], "test" ] for i, payload in enumerate(formats): try: async with session.post(self.api_url, json=payload, timeout=aiohttp.ClientTimeout(total=10), proxy=proxy) as response: self.logger.debug(f"POST格式{i+1}响应状态码: {response.status}") response_text = await response.text() self.logger.debug(f"POST格式{i+1}响应内容: {response_text[:200]}...") # 即使返回错误也是说明服务器可达 if response.status != 0: return True except Exception as e: self.logger.debug(f"POST格式{i+1}失败: {str(e)}") return False except Exception as e: self.logger.error(f"测试POST请求失败: {str(e)}") return False except Exception as e: self.logger.error(f"API连接测试失败: {str(e)}") return False def set_proxy_usage(self, use_proxy: bool): """设置是否使用代理""" self.use_proxy = use_proxy if use_proxy: self.logger.info(f"已启用HTTP代理: {self.proxy.split('@')[1]}") else: self.logger.info("已禁用HTTP代理,将直接连接") async def main(): # 解析命令行参数 import argparse parser = argparse.ArgumentParser(description="上传成功账号到API并标记为已提取") parser.add_argument("--batches", type=int, default=0, help="处理的最大批次数,0表示处理所有批次") parser.add_argument("--batch-size", type=int, default=100, help="每批处理的账号数量") parser.add_argument("--dry-run", action="store_true", help="预览模式,不实际上传和更新") parser.add_argument("--test-api", action="store_true", help="测试API连接") parser.add_argument("--no-proxy", action="store_true", help="不使用代理直接连接") args = parser.parse_args() # 初始化 uploader = AccountUploader() await uploader.initialize() if args.batch_size > 0: uploader.batch_size = args.batch_size # 设置是否使用代理 uploader.set_proxy_usage(not args.no_proxy) try: # 测试API连接 if args.test_api: logger.info("开始测试API连接...") is_connected = await uploader.test_api_connection() if is_connected: logger.success("API连接测试成功!") else: logger.error("API连接测试失败!") return # 预览模式 if args.dry_run: total_count = await uploader.count_success_accounts() if total_count == 0: logger.info("没有找到待上传的账号") return logger.info(f"预览模式:找到 {total_count} 个待上传账号") # 获取示例账号 accounts = await uploader.get_success_accounts(min(5, total_count)) logger.info(f"示例账号 ({len(accounts)}/{total_count}):") for account in accounts: logger.info(f"ID: {account['id']}, 邮箱: {account['email']}, Cursor密码: {account['cursor_password']}") batch_count = (total_count + uploader.batch_size - 1) // uploader.batch_size logger.info(f"预计分 {batch_count} 批上传,每批 {uploader.batch_size} 个账号") return # 实际处理 processed = await uploader.process_accounts(args.batches) logger.success(f"处理完成,共上传 {processed} 个账号") except Exception as e: logger.error(f"程序执行出错: {str(e)}") finally: await uploader.cleanup() if __name__ == "__main__": asyncio.run(main())