import os import json import time import random import string import requests import logging from datetime import datetime from typing import Optional from urllib3.exceptions import InsecureRequestWarning # 禁用 SSL 警告 requests.packages.urllib3.disable_warnings(InsecureRequestWarning) class AccountSync: def __init__(self, api_choice: int = 1, max_retries: int = 3, retry_delay: int = 5): self.api_base_url = "https://cursorapi.nosqli.com/admin" # API地址写死 self.api_choice = api_choice # 1=听泉助手池, 2=高质量号池 self.max_retries = max_retries self.retry_delay = retry_delay def get_machine_id(self) -> str: """生成随机machine_id""" return "".join(random.choices(string.ascii_letters + string.digits, k=32)) def sync_account(self, account_info: dict) -> bool: """同步账号信息到服务器""" retry_count = 0 while retry_count < self.max_retries: try: # 根据选择确定API端点 endpoint = f"{self.api_base_url}/api.account/{'addpool' if self.api_choice == 2 else 'add'}" logging.info(f"使用API端点: {endpoint}") headers = { "Content-Type": "application/json", "Accept": "application/json" } # 获取machine_id machine_id = self.get_machine_id() logging.info(f"使用随机生成的 machine_id: {machine_id}") proxy_host = account_info.get("proxy_host", "") proxy_port = account_info.get("proxy_port", "") proxy_username = account_info.get("proxy_username", "") proxy_password = account_info.get("proxy_password", "") current_proxies = { 'http': f'http://{proxy_username}:{proxy_password}@{proxy_host}:{proxy_port}', 'https': f'http://{proxy_username}:{proxy_password}@{proxy_host}:{proxy_port}' } # 准备请求数据 data = { "email": account_info["email"], "password": account_info["password"], "access_token": account_info.get("token", ""), # 使用token作为access_token "refresh_token": account_info.get("token", ""), # 使用token作为refresh_token "machine_id": machine_id, "user_agent": os.getenv("BROWSER_USER_AGENT", account_info.get("user_agent", ""),), "registration_time": account_info.get("register_time", datetime.now().strftime("%Y-%m-%d %H:%M:%S")), "first_name": account_info.get("first_name", ""), "last_name": account_info.get("last_name", ""), "ip_address": proxy_host, "proxy_info": current_proxies, "proxy_data": account_info.get("proxy_data", ""), "country": account_info.get("country", ""), "batch_index": account_info.get("batch_index", "") } logging.info(f"请求数据: {json.dumps(data, ensure_ascii=False)}") # 打印请求信息 logging.info(f"请求 URL: {endpoint}") logging.info(f"请求数据: {json.dumps(data, ensure_ascii=False)}") # 发送POST请求 response = requests.post( endpoint, json=data, headers=headers, timeout=10, verify=False, proxies=current_proxies ) # 打印响应状态和内容 logging.info(f"响应状态码: {response.status_code}") logging.info(f"响应头: {dict(response.headers)}") logging.info(f"响应内容: {response.text[:200]}") if response.status_code == 502: logging.error("服务器网关错误(502),请检查 API 地址是否正确") time.sleep(self.retry_delay) retry_count += 1 continue try: response_data = response.json() if response_data["code"] == 200 and response_data.get("msg") == "添加成功": logging.info(f"账号 {account_info['email']} 同步成功") return True elif response_data["code"] == 400: logging.warning(f"账号 {account_info['email']} 已存在") return False elif response_data["code"] == 500: logging.error(f"服务器内部错误(500),第 {retry_count + 1} 次重试") time.sleep(self.retry_delay) retry_count += 1 continue else: logging.error(f"账号 {account_info['email']} 同步失败: {response_data.get('msg', '未知错误')}") time.sleep(self.retry_delay) retry_count += 1 continue except json.JSONDecodeError: logging.error(f"服务器返回的不是有效的JSON数据: {response.text[:200]}") time.sleep(self.retry_delay) retry_count += 1 continue except requests.exceptions.RequestException as e: logging.error(f"请求失败: {str(e)}") time.sleep(self.retry_delay) retry_count += 1 continue except Exception as e: logging.error(f"同步账号 {account_info['email']} 时出错: {str(e)}") time.sleep(self.retry_delay) retry_count += 1 continue logging.error(f"账号 {account_info['email']} 同步失败,已重试 {self.max_retries} 次") return False