449 lines
20 KiB
Python
449 lines
20 KiB
Python
import asyncio
|
||
import sys
|
||
import json
|
||
from typing import List, Dict, Any
|
||
import aiohttp
|
||
|
||
from loguru import logger
|
||
|
||
from core.config import Config
|
||
from core.database import DatabaseManager
|
||
from core.logger import setup_logger
|
||
|
||
|
||
class AccountUploader:
|
||
def __init__(self):
|
||
self.config = Config.from_yaml()
|
||
self.logger = setup_logger(self.config)
|
||
self.db_manager = DatabaseManager(self.config)
|
||
self.api_url = "https://cursorapi.nosqli.com/admin/api.AutoCursor/commonadd"
|
||
self.batch_size = 100
|
||
# 添加代理设置
|
||
self.proxy = "http://1ddbeae0f7a67106fd58:f72e512b10893a1d@gw.dataimpulse.com:823"
|
||
self.use_proxy = True # 默认使用代理
|
||
|
||
async def initialize(self):
|
||
"""初始化数据库"""
|
||
await self.db_manager.initialize()
|
||
|
||
async def cleanup(self):
|
||
"""清理资源"""
|
||
await self.db_manager.cleanup()
|
||
|
||
async def get_success_accounts(self, limit: int = 100, last_id: int = 0) -> List[Dict[str, Any]]:
|
||
"""获取状态为success且未提取的账号"""
|
||
async with self.db_manager.get_connection() as conn:
|
||
cursor = await conn.execute(
|
||
"""
|
||
SELECT
|
||
id, email, password, cursor_password, cursor_cookie, cursor_token
|
||
FROM email_accounts
|
||
WHERE status = 'success' AND sold = 1 AND extracted = 0
|
||
AND id > ?
|
||
ORDER BY id ASC
|
||
LIMIT ?
|
||
""",
|
||
(last_id, limit)
|
||
)
|
||
|
||
rows = await cursor.fetchall()
|
||
|
||
accounts = []
|
||
for row in rows:
|
||
account = {
|
||
"id": row[0],
|
||
"email": row[1],
|
||
"password": row[2],
|
||
"cursor_password": row[3],
|
||
"cursor_cookie": row[4],
|
||
"cursor_token": row[5]
|
||
}
|
||
accounts.append(account)
|
||
|
||
return accounts
|
||
|
||
async def count_success_accounts(self) -> int:
|
||
"""统计未提取的成功账号数量"""
|
||
async with self.db_manager.get_connection() as conn:
|
||
cursor = await conn.execute(
|
||
"SELECT COUNT(*) FROM email_accounts WHERE status = 'success' AND sold = 1 AND extracted = 0"
|
||
)
|
||
count = (await cursor.fetchone())[0]
|
||
return count
|
||
|
||
async def mark_as_extracted(self, account_ids: List[int]) -> bool:
|
||
"""将账号标记为已提取"""
|
||
if not account_ids:
|
||
return True
|
||
|
||
try:
|
||
placeholders = ", ".join("?" for _ in account_ids)
|
||
|
||
async with self.db_manager.get_connection() as conn:
|
||
await conn.execute(
|
||
f"""
|
||
UPDATE email_accounts
|
||
SET
|
||
extracted = 1,
|
||
updated_at = CURRENT_TIMESTAMP
|
||
WHERE id IN ({placeholders})
|
||
""",
|
||
tuple(account_ids)
|
||
)
|
||
await conn.commit()
|
||
|
||
return True
|
||
|
||
except Exception as e:
|
||
self.logger.error(f"标记账号为已提取时出错: {e}")
|
||
return False
|
||
|
||
async def upload_accounts(self, accounts: List[Dict[str, Any]]) -> bool:
|
||
"""上传账号到API"""
|
||
if not accounts:
|
||
return True
|
||
|
||
try:
|
||
# 准备上传数据
|
||
upload_data = []
|
||
for account in accounts:
|
||
# 根据API需要的格式构建账号项
|
||
upload_item = {
|
||
"email": account["email"],
|
||
"password": account["password"], # 同时提供password字段
|
||
"email_password": account["password"], # 同时提供email_password字段
|
||
"cursor_email": account["email"],
|
||
"cursor_password": account["cursor_password"],
|
||
"cookie": account["cursor_cookie"] or "", # 确保不为None
|
||
"token": account.get("cursor_token", "")
|
||
}
|
||
# 确保所有必须字段都有值
|
||
for key, value in upload_item.items():
|
||
if value is None:
|
||
upload_item[key] = "" # 将None替换为空字符串
|
||
|
||
upload_data.append(upload_item)
|
||
|
||
# 打印上传数据的结构(去掉长字符串的详细内容)
|
||
debug_data = []
|
||
for item in upload_data[:2]: # 只打印前2个账号作为示例
|
||
debug_item = item.copy()
|
||
if "cookie" in debug_item and debug_item["cookie"]:
|
||
debug_item["cookie"] = debug_item["cookie"][:20] + "..." if len(debug_item["cookie"]) > 20 else debug_item["cookie"]
|
||
if "token" in debug_item and debug_item["token"]:
|
||
debug_item["token"] = debug_item["token"][:20] + "..." if len(debug_item["token"]) > 20 else debug_item["token"]
|
||
debug_data.append(debug_item)
|
||
|
||
self.logger.debug(f"准备上传数据示例: {json.dumps(debug_data, ensure_ascii=False)}")
|
||
self.logger.debug(f"API URL: {self.api_url}")
|
||
|
||
# 发送请求
|
||
# 使用代理创建ClientSession
|
||
connector = aiohttp.TCPConnector(ssl=False) # 禁用SSL验证以防代理问题
|
||
async with aiohttp.ClientSession(connector=connector) as session:
|
||
# 添加超时设置
|
||
timeout = aiohttp.ClientTimeout(total=60) # 增加超时时间
|
||
|
||
# 准备代理配置
|
||
proxy = self.proxy if self.use_proxy else None
|
||
if self.use_proxy:
|
||
self.logger.debug(f"通过代理发送请求: {self.proxy.split('@')[1]}")
|
||
else:
|
||
self.logger.debug("不使用代理,直接连接API")
|
||
|
||
# 根据API错误信息,确保发送的是账号数组
|
||
self.logger.debug(f"发送账号数组,共 {len(upload_data)} 条记录")
|
||
|
||
try:
|
||
# 直接发送数组格式,使用代理
|
||
async with session.post(self.api_url, json=upload_data, timeout=timeout, proxy=proxy) as response:
|
||
response_text = await response.text()
|
||
self.logger.debug(f"API响应状态码: {response.status}")
|
||
self.logger.debug(f"API响应内容: {response_text}")
|
||
|
||
if response.status != 200:
|
||
self.logger.error(f"API响应错误 - 状态码: {response.status}")
|
||
return True # 即使HTTP错误也继续处理下一批
|
||
|
||
# 解析响应
|
||
try:
|
||
result = json.loads(response_text)
|
||
except json.JSONDecodeError:
|
||
self.logger.error(f"响应不是有效的JSON: {response_text}")
|
||
return True # JSON解析错误也继续处理下一批
|
||
|
||
# 判断上传是否成功 - 修改判断逻辑
|
||
# API返回code为0表示成功
|
||
if result.get("code") == 0:
|
||
# 检查data中的成功计数
|
||
success_count = result.get("data", {}).get("success", 0)
|
||
failed_count = result.get("data", {}).get("failed", 0)
|
||
|
||
self.logger.info(f"API返回结果: 成功: {success_count}, 失败: {failed_count}")
|
||
|
||
if success_count > 0:
|
||
self.logger.success(f"成功上传 {success_count} 个账号")
|
||
return True
|
||
else:
|
||
# 检查详细错误信息
|
||
details = result.get("data", {}).get("details", [])
|
||
if details:
|
||
for detail in details[:3]: # 只显示前三个错误
|
||
self.logger.error(f"账号 {detail.get('email')} 上传失败: {detail.get('message')}")
|
||
# 输出更详细的错误信息,帮助诊断
|
||
self.logger.debug(f"错误账号详情: {json.dumps(detail, ensure_ascii=False)}")
|
||
|
||
self.logger.error(f"账号上传全部失败: {result.get('msg', '未知错误')}")
|
||
return True # 即使全部失败也继续处理下一批
|
||
else:
|
||
self.logger.error(f"上传失败: {result.get('msg', '未知错误')}")
|
||
|
||
# 检查是否有详细错误信息
|
||
if 'data' in result:
|
||
self.logger.error(f"错误详情: {json.dumps(result['data'], ensure_ascii=False)}")
|
||
|
||
return True # API返回错误也继续处理下一批
|
||
|
||
except aiohttp.ClientError as e:
|
||
self.logger.error(f"HTTP请求错误: {str(e)}")
|
||
return True # 网络错误也继续处理下一批
|
||
|
||
except Exception as e:
|
||
self.logger.error(f"上传账号时出错: {str(e)}")
|
||
import traceback
|
||
self.logger.error(f"错误堆栈: {traceback.format_exc()}")
|
||
return True # 其他错误也继续处理下一批
|
||
|
||
async def process_accounts(self, max_batches: int = 0) -> int:
|
||
"""处理账号上传,max_batches=0表示处理所有批次"""
|
||
total_count = await self.count_success_accounts()
|
||
|
||
if total_count == 0:
|
||
self.logger.info("没有找到待上传的账号")
|
||
return 0
|
||
|
||
self.logger.info(f"找到 {total_count} 个待上传账号")
|
||
|
||
processed_count = 0
|
||
batch_count = 0
|
||
failed_batches = 0 # 记录失败的批次数
|
||
last_id = 0 # 记录最后处理的ID
|
||
|
||
while True:
|
||
# 检查是否达到最大批次
|
||
if max_batches > 0 and batch_count >= max_batches:
|
||
self.logger.info(f"已达到指定的最大批次数 {max_batches}")
|
||
break
|
||
|
||
# 获取一批账号
|
||
accounts = await self.get_success_accounts(self.batch_size, last_id)
|
||
|
||
if not accounts:
|
||
self.logger.info("没有更多账号可上传")
|
||
break
|
||
|
||
batch_count += 1
|
||
current_batch_size = len(accounts)
|
||
self.logger.info(f"处理第 {batch_count} 批 - {current_batch_size} 个账号")
|
||
|
||
# 上传账号
|
||
account_ids = [account["id"] for account in accounts]
|
||
upload_success = await self.upload_accounts(accounts)
|
||
|
||
if upload_success:
|
||
# 标记为已提取
|
||
mark_success = await self.mark_as_extracted(account_ids)
|
||
|
||
if mark_success:
|
||
processed_count += current_batch_size
|
||
self.logger.success(f"第 {batch_count} 批处理完成,累计处理 {processed_count}/{total_count}")
|
||
failed_batches = 0 # 重置失败计数
|
||
# 更新最后处理的ID
|
||
last_id = max(account_ids)
|
||
else:
|
||
self.logger.error(f"第 {batch_count} 批标记已提取失败")
|
||
failed_batches += 1
|
||
else:
|
||
self.logger.error(f"第 {batch_count} 批上传失败")
|
||
failed_batches += 1
|
||
|
||
# 如果连续失败次数过多,才中断处理
|
||
if failed_batches >= 3:
|
||
self.logger.error(f"连续 {failed_batches} 批处理失败,停止处理")
|
||
break
|
||
|
||
# 简单的延迟,避免请求过快
|
||
await asyncio.sleep(1)
|
||
|
||
return processed_count
|
||
|
||
async def test_api_connection(self) -> bool:
|
||
"""测试API连接是否正常"""
|
||
self.logger.info(f"正在测试API连接: {self.api_url}")
|
||
self.logger.info(f"使用代理: {'使用代理' if self.use_proxy else '不使用代理'}")
|
||
|
||
try:
|
||
# 准备一个简单的测试数据
|
||
test_data = [
|
||
{
|
||
"email": "test@example.com",
|
||
"password": "test_password",
|
||
"email_password": "test_password", # 同时提供两个字段
|
||
"cursor_email": "test@example.com",
|
||
"cursor_password": "test_cursor_password",
|
||
"cookie": "test_cookie",
|
||
"token": "test_token"
|
||
}
|
||
]
|
||
|
||
# 使用代理创建ClientSession
|
||
connector = aiohttp.TCPConnector(ssl=False) # 禁用SSL验证以防代理问题
|
||
async with aiohttp.ClientSession(connector=connector) as session:
|
||
# 准备代理配置
|
||
proxy = self.proxy if self.use_proxy else None
|
||
|
||
# 尝试HEAD请求检查服务器是否可达
|
||
try:
|
||
self.logger.debug("尝试HEAD请求...")
|
||
async with session.head(self.api_url, timeout=aiohttp.ClientTimeout(total=10), proxy=proxy) as response:
|
||
self.logger.debug(f"HEAD响应状态码: {response.status}")
|
||
if response.status >= 400:
|
||
self.logger.error(f"API服务器响应错误: {response.status}")
|
||
return False
|
||
except Exception as e:
|
||
self.logger.warning(f"HEAD请求失败: {str(e)},尝试GET请求...")
|
||
|
||
# 尝试GET请求
|
||
try:
|
||
base_url = self.api_url.split('/admin')[0]
|
||
ping_url = f"{base_url}/ping"
|
||
self.logger.debug(f"尝试GET请求检查服务健康: {ping_url}")
|
||
async with session.get(ping_url, timeout=aiohttp.ClientTimeout(total=10), proxy=proxy) as response:
|
||
self.logger.debug(f"GET响应状态码: {response.status}")
|
||
if response.status == 200:
|
||
response_text = await response.text()
|
||
self.logger.debug(f"GET响应内容: {response_text}")
|
||
return True
|
||
except Exception as e:
|
||
self.logger.warning(f"GET请求失败: {str(e)}")
|
||
|
||
# 尝试OPTIONS请求获取允许的HTTP方法
|
||
try:
|
||
self.logger.debug("尝试OPTIONS请求...")
|
||
async with session.options(self.api_url, timeout=aiohttp.ClientTimeout(total=10), proxy=proxy) as response:
|
||
self.logger.debug(f"OPTIONS响应状态码: {response.status}")
|
||
if response.status == 200:
|
||
allowed_methods = response.headers.get('Allow', '')
|
||
self.logger.debug(f"允许的HTTP方法: {allowed_methods}")
|
||
return 'POST' in allowed_methods
|
||
except Exception as e:
|
||
self.logger.warning(f"OPTIONS请求失败: {str(e)}")
|
||
|
||
# 最后尝试一个小请求
|
||
self.logger.debug("尝试轻量级POST请求...")
|
||
try:
|
||
# 不同格式的请求体
|
||
formats = [
|
||
{"test": "connection"},
|
||
[{"test": "connection"}],
|
||
"test"
|
||
]
|
||
|
||
for i, payload in enumerate(formats):
|
||
try:
|
||
async with session.post(self.api_url, json=payload, timeout=aiohttp.ClientTimeout(total=10), proxy=proxy) as response:
|
||
self.logger.debug(f"POST格式{i+1}响应状态码: {response.status}")
|
||
response_text = await response.text()
|
||
self.logger.debug(f"POST格式{i+1}响应内容: {response_text[:200]}...")
|
||
|
||
# 即使返回错误也是说明服务器可达
|
||
if response.status != 0:
|
||
return True
|
||
except Exception as e:
|
||
self.logger.debug(f"POST格式{i+1}失败: {str(e)}")
|
||
|
||
return False
|
||
|
||
except Exception as e:
|
||
self.logger.error(f"测试POST请求失败: {str(e)}")
|
||
return False
|
||
|
||
except Exception as e:
|
||
self.logger.error(f"API连接测试失败: {str(e)}")
|
||
return False
|
||
|
||
def set_proxy_usage(self, use_proxy: bool):
|
||
"""设置是否使用代理"""
|
||
self.use_proxy = use_proxy
|
||
if use_proxy:
|
||
self.logger.info(f"已启用HTTP代理: {self.proxy.split('@')[1]}")
|
||
else:
|
||
self.logger.info("已禁用HTTP代理,将直接连接")
|
||
|
||
|
||
async def main():
|
||
# 解析命令行参数
|
||
import argparse
|
||
parser = argparse.ArgumentParser(description="上传成功账号到API并标记为已提取")
|
||
parser.add_argument("--batches", type=int, default=0, help="处理的最大批次数,0表示处理所有批次")
|
||
parser.add_argument("--batch-size", type=int, default=100, help="每批处理的账号数量")
|
||
parser.add_argument("--dry-run", action="store_true", help="预览模式,不实际上传和更新")
|
||
parser.add_argument("--test-api", action="store_true", help="测试API连接")
|
||
parser.add_argument("--no-proxy", action="store_true", help="不使用代理直接连接")
|
||
|
||
args = parser.parse_args()
|
||
|
||
# 初始化
|
||
uploader = AccountUploader()
|
||
await uploader.initialize()
|
||
|
||
if args.batch_size > 0:
|
||
uploader.batch_size = args.batch_size
|
||
|
||
# 设置是否使用代理
|
||
uploader.set_proxy_usage(not args.no_proxy)
|
||
|
||
try:
|
||
# 测试API连接
|
||
if args.test_api:
|
||
logger.info("开始测试API连接...")
|
||
is_connected = await uploader.test_api_connection()
|
||
if is_connected:
|
||
logger.success("API连接测试成功!")
|
||
else:
|
||
logger.error("API连接测试失败!")
|
||
return
|
||
|
||
# 预览模式
|
||
if args.dry_run:
|
||
total_count = await uploader.count_success_accounts()
|
||
if total_count == 0:
|
||
logger.info("没有找到待上传的账号")
|
||
return
|
||
|
||
logger.info(f"预览模式:找到 {total_count} 个待上传账号")
|
||
|
||
# 获取示例账号
|
||
accounts = await uploader.get_success_accounts(min(5, total_count))
|
||
logger.info(f"示例账号 ({len(accounts)}/{total_count}):")
|
||
|
||
for account in accounts:
|
||
logger.info(f"ID: {account['id']}, 邮箱: {account['email']}, Cursor密码: {account['cursor_password']}")
|
||
|
||
batch_count = (total_count + uploader.batch_size - 1) // uploader.batch_size
|
||
logger.info(f"预计分 {batch_count} 批上传,每批 {uploader.batch_size} 个账号")
|
||
return
|
||
|
||
# 实际处理
|
||
processed = await uploader.process_accounts(args.batches)
|
||
logger.success(f"处理完成,共上传 {processed} 个账号")
|
||
|
||
except Exception as e:
|
||
logger.error(f"程序执行出错: {str(e)}")
|
||
finally:
|
||
await uploader.cleanup()
|
||
|
||
|
||
if __name__ == "__main__":
|
||
asyncio.run(main())
|