Files
auto_cursor/upload_account.py
2025-04-01 16:25:08 +08:00

456 lines
21 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import asyncio
import sys
import json
from typing import List, Dict, Any
import aiohttp
from loguru import logger
from core.config import Config
from core.database import DatabaseManager
from core.logger import setup_logger
class AccountUploader:
def __init__(self):
self.config = Config.from_yaml()
self.logger = setup_logger(self.config)
self.db_manager = DatabaseManager(self.config)
self.api_url = "https://cursorapi.nosqli.com/admin/api.AutoCursor/commonadd"
self.batch_size = 100
# 获取hostname
self.hostname = getattr(self.config, "hostname", None)
if not self.hostname:
# 尝试从config.yaml中的server_config获取
server_config = getattr(self.config, "server_config", None)
if server_config:
self.hostname = getattr(server_config, "hostname", "unknown")
else:
self.hostname = "unknown"
logger.warning(f"未在配置中找到hostname使用默认值: {self.hostname}")
# 添加代理设置
self.proxy = None
if hasattr(self.config, "proxy_config") and hasattr(self.config.proxy_config, "api_proxy"):
self.proxy = self.config.proxy_config.api_proxy
logger.info(f"使用API代理: {self.proxy}")
self.use_proxy = self.proxy is not None
async def initialize(self):
"""初始化数据库"""
await self.db_manager.initialize()
async def cleanup(self):
"""清理资源"""
await self.db_manager.cleanup()
async def get_success_accounts(self, limit: int = 100, last_id: int = 0) -> List[Dict[str, Any]]:
"""获取状态为success且未提取的账号"""
query = """
SELECT
id, email, password, cursor_password, cursor_cookie, cursor_token
FROM email_accounts
WHERE status = 'success' AND sold = 1 AND extracted = 0
AND id > %s
ORDER BY id ASC
LIMIT %s
"""
rows = await self.db_manager.fetch_all(query, (last_id, limit))
accounts = []
for row in rows:
account = {
"id": row["id"],
"email": row["email"],
"password": row["password"],
"cursor_password": row["cursor_password"],
"cursor_cookie": row["cursor_cookie"],
"cursor_token": row["cursor_token"]
}
accounts.append(account)
return accounts
async def count_success_accounts(self) -> int:
"""统计未提取的成功账号数量"""
query = "SELECT COUNT(*) FROM email_accounts WHERE status = 'success' AND sold = 1 AND extracted = 0"
result = await self.db_manager.fetch_one(query)
if result:
count = result.get("COUNT(*)", 0)
return count
return 0
async def mark_as_extracted(self, account_ids: List[int]) -> bool:
"""将账号标记为已提取"""
if not account_ids:
return True
try:
placeholders = ', '.join(['%s' for _ in account_ids])
query = f"""
UPDATE email_accounts
SET
extracted = 1,
updated_at = CURRENT_TIMESTAMP
WHERE id IN ({placeholders})
"""
await self.db_manager.execute(query, tuple(account_ids))
return True
except Exception as e:
self.logger.error(f"标记账号为已提取时出错: {e}")
return False
async def upload_accounts(self, accounts: List[Dict[str, Any]]) -> bool:
"""上传账号到API"""
if not accounts:
return True
try:
# 准备上传数据
upload_data = []
for account in accounts:
# 根据API需要的格式构建账号项
upload_item = {
"email": account["email"],
"password": account["password"], # 同时提供password字段
"email_password": account["password"], # 同时提供email_password字段
"cursor_email": account["email"],
"cursor_password": account["cursor_password"],
"cookie": account["cursor_cookie"] or "", # 确保不为None
"token": account.get("cursor_token", ""),
"hostname": self.hostname # 添加hostname参数
}
# 确保所有必须字段都有值
for key, value in upload_item.items():
if value is None:
upload_item[key] = "" # 将None替换为空字符串
upload_data.append(upload_item)
# 打印上传数据的结构(去掉长字符串的详细内容)
debug_data = []
for item in upload_data[:2]: # 只打印前2个账号作为示例
debug_item = item.copy()
if "cookie" in debug_item and debug_item["cookie"]:
debug_item["cookie"] = debug_item["cookie"][:20] + "..." if len(debug_item["cookie"]) > 20 else debug_item["cookie"]
if "token" in debug_item and debug_item["token"]:
debug_item["token"] = debug_item["token"][:20] + "..." if len(debug_item["token"]) > 20 else debug_item["token"]
debug_data.append(debug_item)
self.logger.debug(f"准备上传数据示例: {json.dumps(debug_data, ensure_ascii=False)}")
self.logger.debug(f"API URL: {self.api_url}")
self.logger.info(f"上传账号,服务器标识: {self.hostname}")
# 发送请求
# 使用代理创建ClientSession
connector = aiohttp.TCPConnector(ssl=False) # 禁用SSL验证以防代理问题
async with aiohttp.ClientSession(connector=connector) as session:
# 添加超时设置
timeout = aiohttp.ClientTimeout(total=60) # 增加超时时间
# 准备代理配置
proxy = self.proxy if self.use_proxy else None
if self.use_proxy:
self.logger.debug(f"通过代理发送请求: {self.proxy.split('@')[1] if '@' in self.proxy else self.proxy}")
else:
self.logger.debug("不使用代理直接连接API")
# 根据API错误信息确保发送的是账号数组
self.logger.debug(f"发送账号数组,共 {len(upload_data)} 条记录")
try:
# 直接发送数组格式,使用代理
async with session.post(self.api_url, json=upload_data, timeout=timeout, proxy=proxy) as response:
response_text = await response.text()
self.logger.debug(f"API响应状态码: {response.status}")
self.logger.debug(f"API响应内容: {response_text}")
if response.status != 200:
self.logger.error(f"API响应错误 - 状态码: {response.status}")
return True # 即使HTTP错误也继续处理下一批
# 解析响应
try:
result = json.loads(response_text)
except json.JSONDecodeError:
self.logger.error(f"响应不是有效的JSON: {response_text}")
return True # JSON解析错误也继续处理下一批
# 判断上传是否成功 - 修改判断逻辑
# API返回code为0表示成功
if result.get("code") == 0:
# 检查data中的成功计数
success_count = result.get("data", {}).get("success", 0)
failed_count = result.get("data", {}).get("failed", 0)
self.logger.info(f"API返回结果: 成功: {success_count}, 失败: {failed_count}")
if success_count > 0:
self.logger.success(f"成功上传 {success_count} 个账号")
return True
else:
# 检查详细错误信息
details = result.get("data", {}).get("details", [])
if details:
for detail in details[:3]: # 只显示前三个错误
self.logger.error(f"账号 {detail.get('email')} 上传失败: {detail.get('message')}")
# 输出更详细的错误信息,帮助诊断
self.logger.debug(f"错误账号详情: {json.dumps(detail, ensure_ascii=False)}")
self.logger.error(f"账号上传全部失败: {result.get('msg', '未知错误')}")
return True # 即使全部失败也继续处理下一批
else:
self.logger.error(f"上传失败: {result.get('msg', '未知错误')}")
# 检查是否有详细错误信息
if 'data' in result:
self.logger.error(f"错误详情: {json.dumps(result['data'], ensure_ascii=False)}")
return True # API返回错误也继续处理下一批
except aiohttp.ClientError as e:
self.logger.error(f"HTTP请求错误: {str(e)}")
return True # 网络错误也继续处理下一批
except Exception as e:
self.logger.error(f"上传账号时出错: {str(e)}")
import traceback
self.logger.error(f"错误堆栈: {traceback.format_exc()}")
return True # 其他错误也继续处理下一批
async def process_accounts(self, max_batches: int = 0) -> int:
"""处理账号上传max_batches=0表示处理所有批次"""
total_count = await self.count_success_accounts()
if total_count == 0:
self.logger.info("没有找到待上传的账号")
return 0
self.logger.info(f"找到 {total_count} 个待上传账号")
processed_count = 0
batch_count = 0
failed_batches = 0 # 记录失败的批次数
last_id = 0 # 记录最后处理的ID
while True:
# 检查是否达到最大批次
if max_batches > 0 and batch_count >= max_batches:
self.logger.info(f"已达到指定的最大批次数 {max_batches}")
break
# 获取一批账号
accounts = await self.get_success_accounts(self.batch_size, last_id)
if not accounts:
self.logger.info("没有更多账号可上传")
break
batch_count += 1
current_batch_size = len(accounts)
self.logger.info(f"处理第 {batch_count} 批 - {current_batch_size} 个账号")
# 上传账号
account_ids = [account["id"] for account in accounts]
if last_id < max(account_ids, default=0):
last_id = max(account_ids) # 更新最后处理的ID
upload_success = await self.upload_accounts(accounts)
if upload_success:
# 标记为已提取
mark_success = await self.mark_as_extracted(account_ids)
if mark_success:
processed_count += current_batch_size
self.logger.info(f"成功处理 {current_batch_size} 个账号,已标记为已提取")
else:
failed_batches += 1
self.logger.error(f"标记账号为已提取失败,批次 {batch_count}")
else:
failed_batches += 1
self.logger.error(f"上传账号失败,批次 {batch_count}")
# 避免API请求过于频繁
await asyncio.sleep(1)
self.logger.info(f"账号处理完成: 总共 {processed_count} 个, 失败批次 {failed_batches}")
return processed_count
async def test_api_connection(self) -> bool:
"""测试API连接是否正常"""
self.logger.info(f"正在测试API连接: {self.api_url}")
self.logger.info(f"使用代理: {'使用代理' if self.use_proxy else '不使用代理'}")
try:
# 准备一个简单的测试数据
test_data = [
{
"email": "test@example.com",
"password": "test_password",
"email_password": "test_password", # 同时提供两个字段
"cursor_email": "test@example.com",
"cursor_password": "test_cursor_password",
"cookie": "test_cookie",
"token": "test_token"
}
]
# 使用代理创建ClientSession
connector = aiohttp.TCPConnector(ssl=False) # 禁用SSL验证以防代理问题
async with aiohttp.ClientSession(connector=connector) as session:
# 准备代理配置
proxy = self.proxy if self.use_proxy else None
# 尝试HEAD请求检查服务器是否可达
try:
self.logger.debug("尝试HEAD请求...")
async with session.head(self.api_url, timeout=aiohttp.ClientTimeout(total=10), proxy=proxy) as response:
self.logger.debug(f"HEAD响应状态码: {response.status}")
if response.status >= 400:
self.logger.error(f"API服务器响应错误: {response.status}")
return False
except Exception as e:
self.logger.warning(f"HEAD请求失败: {str(e)}尝试GET请求...")
# 尝试GET请求
try:
base_url = self.api_url.split('/admin')[0]
ping_url = f"{base_url}/ping"
self.logger.debug(f"尝试GET请求检查服务健康: {ping_url}")
async with session.get(ping_url, timeout=aiohttp.ClientTimeout(total=10), proxy=proxy) as response:
self.logger.debug(f"GET响应状态码: {response.status}")
if response.status == 200:
response_text = await response.text()
self.logger.debug(f"GET响应内容: {response_text}")
return True
except Exception as e:
self.logger.warning(f"GET请求失败: {str(e)}")
# 尝试OPTIONS请求获取允许的HTTP方法
try:
self.logger.debug("尝试OPTIONS请求...")
async with session.options(self.api_url, timeout=aiohttp.ClientTimeout(total=10), proxy=proxy) as response:
self.logger.debug(f"OPTIONS响应状态码: {response.status}")
if response.status == 200:
allowed_methods = response.headers.get('Allow', '')
self.logger.debug(f"允许的HTTP方法: {allowed_methods}")
return 'POST' in allowed_methods
except Exception as e:
self.logger.warning(f"OPTIONS请求失败: {str(e)}")
# 最后尝试一个小请求
self.logger.debug("尝试轻量级POST请求...")
try:
# 不同格式的请求体
formats = [
{"test": "connection"},
[{"test": "connection"}],
"test"
]
for i, payload in enumerate(formats):
try:
async with session.post(self.api_url, json=payload, timeout=aiohttp.ClientTimeout(total=10), proxy=proxy) as response:
self.logger.debug(f"POST格式{i+1}响应状态码: {response.status}")
response_text = await response.text()
self.logger.debug(f"POST格式{i+1}响应内容: {response_text[:200]}...")
# 即使返回错误也是说明服务器可达
if response.status != 0:
return True
except Exception as e:
self.logger.debug(f"POST格式{i+1}失败: {str(e)}")
return False
except Exception as e:
self.logger.error(f"测试POST请求失败: {str(e)}")
return False
except Exception as e:
self.logger.error(f"API连接测试失败: {str(e)}")
return False
def set_proxy_usage(self, use_proxy: bool):
"""设置是否使用代理"""
self.use_proxy = use_proxy
if use_proxy:
self.logger.info(f"已启用HTTP代理: {self.proxy.split('@')[1] if '@' in self.proxy else self.proxy}")
else:
self.logger.info("已禁用HTTP代理将直接连接")
async def main():
# 解析命令行参数
import argparse
parser = argparse.ArgumentParser(description="上传成功账号到API并标记为已提取")
parser.add_argument("--batches", type=int, default=0, help="处理的最大批次数0表示处理所有批次")
parser.add_argument("--batch-size", type=int, default=100, help="每批处理的账号数量")
parser.add_argument("--dry-run", action="store_true", help="预览模式,不实际上传和更新")
parser.add_argument("--test-api", action="store_true", help="测试API连接")
parser.add_argument("--no-proxy", action="store_true", help="不使用代理直接连接")
args = parser.parse_args()
# 初始化
uploader = AccountUploader()
await uploader.initialize()
if args.batch_size > 0:
uploader.batch_size = args.batch_size
# 设置是否使用代理
uploader.set_proxy_usage(not args.no_proxy)
try:
# 测试API连接
if args.test_api:
logger.info("开始测试API连接...")
is_connected = await uploader.test_api_connection()
if is_connected:
logger.success("API连接测试成功")
else:
logger.error("API连接测试失败")
return
# 预览模式
if args.dry_run:
total_count = await uploader.count_success_accounts()
if total_count == 0:
logger.info("没有找到待上传的账号")
return
logger.info(f"预览模式:找到 {total_count} 个待上传账号")
# 获取示例账号
accounts = await uploader.get_success_accounts(min(5, total_count))
logger.info(f"示例账号 ({len(accounts)}/{total_count}):")
for account in accounts:
logger.info(f"ID: {account['id']}, 邮箱: {account['email']}, Cursor密码: {account['cursor_password']}")
batch_count = (total_count + uploader.batch_size - 1) // uploader.batch_size
logger.info(f"预计分 {batch_count} 批上传,每批 {uploader.batch_size} 个账号")
return
# 实际处理
processed = await uploader.process_accounts(args.batches)
logger.success(f"处理完成,共上传 {processed} 个账号")
except Exception as e:
logger.error(f"程序执行出错: {str(e)}")
finally:
await uploader.cleanup()
if __name__ == "__main__":
# Windows平台特殊处理强制使用SelectorEventLoop
if sys.platform.startswith('win'):
asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
asyncio.run(main())