7 Commits

Author SHA1 Message Date
huangzhenpc
74770fb3dd 保存现有功能 增加域名和添加时间关联 2025-04-07 15:32:42 +08:00
huangzhenpc
8fc5a04903 保存现有功能 增加域名和添加时间关联 2025-04-07 15:26:55 +08:00
huangzhenpc
92f7c0f3e0 保存现有功能 增加域名和添加时间关联 2025-04-07 15:23:56 +08:00
huangzhenpc
85c3095e98 保存现有功能 增加域名和添加时间关联 2025-04-07 15:13:07 +08:00
huangzhenpc
69bf430525 保存现有功能 增加域名和添加时间关联 2025-04-07 15:08:49 +08:00
huangzhenpc
de057ae183 保存现有功能 增加域名和添加时间关联 2025-04-07 14:55:33 +08:00
huangzhenpc
901c8c95e1 自建邮箱版本 2025-04-07 13:14:29 +08:00
9 changed files with 1035 additions and 262 deletions

View File

@@ -65,6 +65,7 @@ sudo systemctl enable redis
mkdir -p /opt/cursor-service mkdir -p /opt/cursor-service
cd /opt/cursor-service cd /opt/cursor-service
sudo apt install python3-venv python3-full
# 创建并激活虚拟环境 # 创建并激活虚拟环境
python3 -m venv venv python3 -m venv venv
source venv/bin/activate source venv/bin/activate

View File

@@ -11,6 +11,8 @@ import json
import signal import signal
import subprocess import subprocess
import time import time
import random
import string
from datetime import datetime from datetime import datetime
from typing import Dict, List, Optional, Tuple, Any from typing import Dict, List, Optional, Tuple, Any
@@ -23,21 +25,36 @@ if sys.platform.startswith("win"):
from core.config import Config from core.config import Config
from core.database import DatabaseManager from core.database import DatabaseManager
from import_emails import import_emails from services.fetch_manager import FetchManager
from services.self_hosted_email import SelfHostedEmail
from services.proxy_pool import ProxyPool
class AutoCursorService: class AutoCursorService:
def __init__(self): def __init__(self):
self.config = Config.from_yaml() self.config = Config.from_yaml()
self.db_manager = DatabaseManager(self.config) self.db_manager = DatabaseManager(self.config)
self.fetch_manager = FetchManager(self.config)
# 添加EmailManager实例用于操作黑名单
self.email_manager = None
# API相关 # API相关
self.api_base_url = "https://cursorapi.nosqli.com/admin/api.AutoCursor" self.api_base_url = "https://cursorapi.nosqli.com/admin/api.AutoCursor"
self.proxy = None self.proxy = None
# 初始化自建邮箱服务
self.self_hosted_email = None
if hasattr(self.config, 'self_hosted_email_config') and self.config.self_hosted_email_config:
self.self_hosted_email = SelfHostedEmail(
self.fetch_manager,
self.config.self_hosted_email_config.api_base_url,
self.config.self_hosted_email_config.api_key
)
logger.info("自建邮箱服务已初始化")
else:
logger.warning("未配置自建邮箱服务,部分功能可能不可用")
# 初始化代理池
self.proxy_pool = ProxyPool(self.config, self.fetch_manager)
# 获取hostname用于API请求参数 # 获取hostname用于API请求参数
self.hostname = getattr(self.config, "hostname", None) self.hostname = getattr(self.config, "hostname", None)
if not self.hostname: if not self.hostname:
@@ -59,12 +76,12 @@ class AutoCursorService:
self.check_interval = getattr(auto_service_config, "check_interval", 60) self.check_interval = getattr(auto_service_config, "check_interval", 60)
self.upload_interval = getattr(auto_service_config, "upload_interval", 300) self.upload_interval = getattr(auto_service_config, "upload_interval", 300)
self.email_check_threshold = getattr(auto_service_config, "email_check_threshold", 30) self.email_check_threshold = getattr(auto_service_config, "email_check_threshold", 30)
self.email_fetch_count = getattr(auto_service_config, "email_fetch_count", 2) self.email_batch_size = getattr(auto_service_config, "email_batch_size", 10)
else: else:
self.check_interval = 60 # 检查API状态的间隔 self.check_interval = 60 # 检查API状态的间隔
self.upload_interval = 300 # 上传账号间隔(秒) self.upload_interval = 300 # 上传账号间隔(秒)
self.email_check_threshold = 30 # 当可用邮箱少于这个数时获取新邮箱 self.email_check_threshold = 30 # 当可用邮箱少于这个数时获取新邮箱
self.email_fetch_count = 2 # 获取邮箱的次数每次15个 self.email_batch_size = 10 # 每次获取邮箱的数量
# 处理邮箱的最小数量阈值,只要有这么多邮箱就会立即处理 # 处理邮箱的最小数量阈值,只要有这么多邮箱就会立即处理
self.min_email_to_process = 1 self.min_email_to_process = 1
@@ -97,11 +114,6 @@ class AutoCursorService:
logger.info("初始化自动化服务") logger.info("初始化自动化服务")
await self.db_manager.initialize() await self.db_manager.initialize()
# 初始化EmailManager
from services.email_manager import EmailManager
self.email_manager = EmailManager(self.config, self.db_manager)
await self.email_manager.initialize()
# 检查并设置代理 # 检查并设置代理
if hasattr(self.config, "proxy_config") and self.config.proxy_config: if hasattr(self.config, "proxy_config") and self.config.proxy_config:
if hasattr(self.config.proxy_config, "api_proxy") and self.config.proxy_config.api_proxy: if hasattr(self.config.proxy_config, "api_proxy") and self.config.proxy_config.api_proxy:
@@ -153,171 +165,63 @@ class AutoCursorService:
logger.error(f"检查注册状态时出错: {e}") logger.error(f"检查注册状态时出错: {e}")
return self.reg_enabled # 出错时保持当前状态 return self.reg_enabled # 出错时保持当前状态
async def fetch_email_accounts(self) -> List[Dict[str, str]]: async def fetch_email_accounts(self, count: int = 10) -> List[Dict[str, str]]:
"""从API获取邮箱账号 """使用自建邮箱服务获取邮箱账号
Returns:
List[Dict[str, str]]: 邮箱账号列表
"""
url = f"{self.api_base_url}/getemailaccount"
params = {"hostname": self.hostname}
try:
async with aiohttp.ClientSession() as session:
async with session.get(url, params=params, proxy=self.proxy, ssl=False) as response:
if response.status != 200:
logger.error(f"获取邮箱API请求失败状态码: {response.status}")
return []
data = await response.json()
if data.get("code") != 0:
logger.error(f"获取邮箱API返回错误: {data.get('msg', 'Unknown error')}")
return []
accounts = data.get("data", {}).get("accounts", [])
logger.info(f"成功获取 {len(accounts)} 个邮箱账号")
return accounts
except Exception as e:
logger.error(f"获取邮箱账号时出错: {e}")
return []
async def import_email_accounts(self, accounts: List[Dict[str, str]]) -> int:
"""导入邮箱账号到数据库
Args: Args:
accounts: 邮箱账号列表 count: 需要获取的邮箱数量
Returns: Returns:
int: 成功导入的账号数量 List[Dict[str, str]]: 邮箱账号列表每个账号包含email和password字段
""" """
if not accounts: if not self.self_hosted_email:
logger.warning("没有邮箱账号可导入") logger.error("自建邮箱服务未初始化,无法获取邮箱")
return 0 return []
count = 0 result = []
blacklist_added = 0 for _ in range(count):
# 如果有EmailManager确保黑名单初始化
if self.email_manager and self.email_manager.use_redis:
await self.email_manager._ensure_blacklist_initialized()
for account in accounts:
try: try:
email = account.get("email", "") # 获取一个邮箱地址
password = account.get("password", "") email = await self.self_hosted_email.get_email()
client_id = account.get("client_id", "") if not email:
refresh_token = account.get("refresh_token", "") logger.warning("获取邮箱失败,跳过")
if not (email and password and client_id and refresh_token):
logger.warning(f"账号数据不完整: {account}")
continue continue
# 检查邮箱是否已在黑名单中 # 生成随机密码
if self.email_manager and await self.email_manager.is_email_blacklisted(email): password = ''.join(random.choices(string.ascii_letters + string.digits, k=12))
logger.warning(f"跳过黑名单中的邮箱: {email}")
blacklist_added += 1
continue
# 插入数据库 # 添加到结果列表
insert_query = ''' account = {
INSERT INTO email_accounts "email": email,
(email, password, client_id, refresh_token, status) "password": password,
VALUES (%s, %s, %s, %s, 'pending') "client_id": "", # 如果有需要可以生成
ON DUPLICATE KEY UPDATE "refresh_token": "" # 如果有需要可以生成
password = VALUES(password), }
client_id = VALUES(client_id), result.append(account)
refresh_token = VALUES(refresh_token), logger.info(f"已获取邮箱: {email}")
status = 'pending',
updated_at = CURRENT_TIMESTAMP
'''
await self.db_manager.execute( # 每次获取后等待一小段时间,避免请求过快
insert_query, await asyncio.sleep(0.5)
(email, password, client_id, refresh_token)
)
count += 1
except Exception as e: except Exception as e:
logger.error(f"导入邮箱账号时出错: {e}") logger.error(f"获取邮箱时出错: {e}")
await asyncio.sleep(1) # 出错后等待较长时间
if blacklist_added > 0: logger.info(f"本次共获取 {len(result)} 个邮箱账号")
logger.warning(f"已跳过 {blacklist_added} 个黑名单中的邮箱") return result
async def import_self_hosted_emails(self, count: int = 10) -> List[Dict[str, str]]:
"""获取并导入自建邮箱
Args:
count: 要获取的邮箱数量
logger.success(f"成功导入 {count} 个邮箱账号")
return count
async def count_pending_accounts(self) -> int:
"""统计可用的pending状态账号数量"""
if self.email_manager:
# 使用EmailManager的方法确保黑名单一致性
return await self.email_manager.count_pending_accounts()
# 兼容旧代码,直接查询
query = """
SELECT COUNT(*)
FROM email_accounts
WHERE status = 'pending' AND in_use = 0 AND sold = 0
"""
result = await self.db_manager.fetch_one(query)
if result:
return result.get("COUNT(*)", 0)
return 0
async def check_and_fetch_emails(self) -> int:
"""检查并获取邮箱账号(如果需要)
Returns: Returns:
int: 获取的邮箱账号数量 List[Dict[str, str]]: 获取的邮箱列表
""" """
# 检查当前可用邮箱数量 # 直接获取邮箱列表,不再需要导入到数据库
pending_count = await self.count_pending_accounts() emails = await self.fetch_email_accounts(count)
logger.info(f"当前可用邮箱数量: {pending_count}") return emails
if pending_count >= self.email_check_threshold:
logger.info(f"可用邮箱数量充足 ({pending_count} >= {self.email_check_threshold})")
# 确保注册进程已启动
if pending_count >= self.min_email_to_process and self.reg_enabled:
if not self.registration_process or self.registration_process.poll() is not None:
logger.info("有足够邮箱但注册进程未运行,启动注册进程")
await self.start_registration_process()
return 0
# 需要获取新邮箱
logger.info(f"可用邮箱不足 ({pending_count} < {self.email_check_threshold}),准备获取新邮箱")
total_imported = 0
for i in range(self.email_fetch_count):
accounts = await self.fetch_email_accounts()
if not accounts:
logger.warning(f"{i+1} 次获取邮箱失败或无可用邮箱")
break
imported = await self.import_email_accounts(accounts)
total_imported += imported
# 导入后等待一秒,确保数据库状态完全更新
logger.debug("等待数据库状态更新...")
await asyncio.sleep(2)
# 每次获取到新邮箱并成功导入后,立即确保注册进程在运行
if imported > 0 and self.reg_enabled:
# 重新查询一次确保数据是最新的
pending_count = await self.count_pending_accounts()
if pending_count >= self.min_email_to_process:
logger.info(f"已获取到 {imported} 个新邮箱,总可用邮箱数 {pending_count},确保注册进程运行")
if not self.registration_process or self.registration_process.poll() is not None:
await self.start_registration_process()
if imported < 15: # 每次API应返回15个账号
logger.warning(f"获取到的邮箱少于预期 ({imported} < 15),可能没有更多邮箱可用")
break
if i < self.email_fetch_count - 1:
# 在多次请求之间添加延迟
await asyncio.sleep(2)
return total_imported
async def start_registration_process(self): async def start_registration_process(self):
"""启动注册进程""" """启动注册进程"""
@@ -326,16 +230,7 @@ class AutoCursorService:
logger.info("注册进程已在运行中") logger.info("注册进程已在运行中")
return return
# 启动前等待1秒确保数据库状态已更新 logger.info("启动注册进程")
await asyncio.sleep(1)
# 再次检查可用邮箱数量,确保使用最新状态
pending_count = await self.count_pending_accounts()
if pending_count < self.min_email_to_process:
logger.warning(f"可用邮箱数量不足 ({pending_count} < {self.min_email_to_process}),暂不启动注册进程")
return
logger.info(f"{pending_count} 个可用邮箱,启动注册进程")
try: try:
# 获取配置中的batch_size确保并发注册 # 获取配置中的batch_size确保并发注册
batch_size = 1 # 默认值 batch_size = 1 # 默认值
@@ -385,96 +280,122 @@ class AutoCursorService:
self.registration_process = None self.registration_process = None
async def upload_accounts(self): async def upload_accounts(self, accounts: List[Dict[str, Any]]) -> bool:
"""上传已注册成功的账号""" """上传账号到API
logger.info("开始上传注册成功的账号")
try:
# 使用subprocess运行upload_account.py
if sys.platform.startswith("win"):
process = subprocess.Popen(
["python", "upload_account.py"],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
creationflags=subprocess.CREATE_NO_WINDOW
)
else:
process = subprocess.Popen(
["python3", "upload_account.py"],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE
)
# 等待进程完成
stdout, stderr = process.communicate()
if process.returncode != 0:
logger.error(f"上传账号进程异常终止,退出码: {process.returncode}")
if stderr:
logger.error(f"错误输出: {stderr.decode('utf-8', errors='ignore')}")
else:
logger.info("账号上传完成")
if stdout:
# 只记录最后几行输出
output_lines = stdout.decode('utf-8', errors='ignore').strip().split('\n')
for line in output_lines[-5:]:
logger.info(f"Upload: {line}")
Args:
accounts: 要上传的账号列表每个账号包含email、password、cursor_password等信息
Returns:
bool: 是否上传成功
"""
if not accounts:
return True
url = f"{self.api_base_url}/commonadd"
try:
# 准备上传数据
upload_data = []
for account in accounts:
upload_item = {
"email": account["email"],
"email_password": account.get("password", ""), # 使用account的password字段作为email_password
"cursor_email": account["email"],
"cursor_password": account["cursor_password"],
"cookie": account.get("cursor_cookie", "") or "",
"token": account.get("cursor_jwt", ""),
"hostname": self.hostname
}
upload_data.append(upload_item)
# 打印上传数据的部分细节(去除敏感信息)
debug_data = []
for item in upload_data[:2]: # 只打印前2个账号作为示例
debug_item = item.copy()
if "cookie" in debug_item and debug_item["cookie"]:
debug_item["cookie"] = debug_item["cookie"][:20] + "..." if len(debug_item["cookie"]) > 20 else debug_item["cookie"]
if "token" in debug_item and debug_item["token"]:
debug_item["token"] = debug_item["token"][:20] + "..." if len(debug_item["token"]) > 20 else debug_item["token"]
debug_data.append(debug_item)
logger.debug(f"准备上传 {len(upload_data)} 个账号")
logger.debug(f"上传数据示例: {json.dumps(debug_data, ensure_ascii=False)}")
logger.debug(f"API URL: {url}")
# 发送请求
async with aiohttp.ClientSession() as session:
async with session.post(
url,
json=upload_data,
proxy=self.proxy,
ssl=False
) as response:
response_text = await response.text()
logger.debug(f"API响应状态码: {response.status}")
logger.debug(f"API响应内容: {response_text}")
if response.status != 200:
logger.error(f"上传账号API请求失败状态码: {response.status}")
return False
try:
data = json.loads(response_text)
except json.JSONDecodeError:
logger.error(f"解析响应失败非JSON格式: {response_text[:100]}...")
return False
if data.get("code") != 0:
error_msg = data.get("msg", "Unknown error")
logger.error(f"上传账号API返回错误: {error_msg}")
return False
success_count = data.get("data", {}).get("success", 0)
failed_count = data.get("data", {}).get("failed", 0)
# 检查是否有详细的错误信息
if "details" in data.get("data", {}):
details = data.get("data", {}).get("details", [])
if details:
logger.error("错误详情:")
for i, detail in enumerate(details[:5]): # 只显示前5个错误
logger.error(f" 错误 {i+1}: {detail.get('email', '未知邮箱')} - {detail.get('message', '未知错误')}")
logger.info(f"账号上传结果: 成功 {success_count}, 失败 {failed_count}")
return success_count > 0
except Exception as e: except Exception as e:
logger.error(f"执行账号上传脚本时出错: {e}") logger.error(f"上传账号时出错: {e}")
return False
async def run(self): async def run(self):
"""运行服务主循环""" """运行服务主循环"""
logger.info("启动Cursor自动化服务") logger.info("启动Cursor自动化服务")
# 设置永久开启注册
self.reg_enabled = True
logger.info("注册功能已永久开启")
last_upload_time = 0 last_upload_time = 0
while self.running: while self.running:
try: try:
# 1. 检查注册API状态 # 1. 获取邮箱
current_status = await self.check_registration_status() logger.info("准备获取自建邮箱")
emails = await self.import_self_hosted_emails(self.email_batch_size)
if emails:
logger.info(f"成功获取 {len(emails)} 个自建邮箱")
# 状态变化处理 # 2. 确保注册进程正在运行
if current_status != self.reg_enabled: if not self.registration_process or self.registration_process.poll() is not None:
self.reg_enabled = current_status if emails:
logger.info(f"注册状态变化为: {'开启' if self.reg_enabled else '关闭'}") logger.info(f"{len(emails)} 个可用邮箱,启动注册进程")
await self.start_registration_process()
# 状态变化后等待2秒确保数据库状态稳定
await asyncio.sleep(2)
if self.reg_enabled:
# 开启注册时,先检查并获取邮箱
await self.check_and_fetch_emails()
# 检查是否有可用邮箱,有则启动注册进程
pending_count = await self.count_pending_accounts()
if pending_count >= self.min_email_to_process:
logger.info(f"{pending_count} 个可用邮箱,启动注册进程")
await self.start_registration_process()
else: else:
# 关闭注册时,停止注册进程 logger.warning("没有可用邮箱,暂不启动注册进程")
self.stop_registration_process()
# 等待一会,确保上一步操作完全完成 # 3. 等待下一次检查
await asyncio.sleep(1)
# 2. 如果注册已开启,检查并获取邮箱
if self.reg_enabled:
await self.check_and_fetch_emails()
# 确保注册进程正在运行
if not self.registration_process or self.registration_process.poll() is not None:
pending_count = await self.count_pending_accounts()
if pending_count >= self.min_email_to_process:
logger.info(f"发现 {pending_count} 个未处理的邮箱,重新启动注册进程")
await self.start_registration_process()
# 3. 定期上传账号
current_time = time.time()
if current_time - last_upload_time >= self.upload_interval:
await self.upload_accounts()
last_upload_time = current_time
# 上传后等待,确保数据库状态更新
await asyncio.sleep(2)
# 4. 等待下一次检查
logger.debug(f"等待 {self.check_interval} 秒后进行下一次检查") logger.debug(f"等待 {self.check_interval} 秒后进行下一次检查")
for _ in range(self.check_interval): for _ in range(self.check_interval):
if not self.running: if not self.running:
@@ -505,7 +426,7 @@ async def main():
logger.remove() logger.remove()
logger.add(sys.stderr, level="INFO") logger.add(sys.stderr, level="INFO")
logger.add( logger.add(
"auto_cursor_service.log", "logs/auto_cursor_service.log",
rotation="50 MB", rotation="50 MB",
retention="10 days", retention="10 days",
level="DEBUG", level="DEBUG",

View File

@@ -43,13 +43,18 @@ proxy:
# 注册配置 # 注册配置
register: register:
delay_range: [1, 2] delay_range: [1, 2]
batch_size: 15 batch_size: 1
#这里是注册的并发数量 #这里是注册的并发数量
# 邮件配置 # 邮件配置
email: email:
file_path: "email.txt" file_path: "email.txt"
# 自建邮箱服务配置
self_hosted_email:
api_base_url: "https://api.cursorpro.com.cn"
api_key: "" # 如果API需要认证请填写
# 自动服务配置 # 自动服务配置
auto_service: auto_service:
check_interval: 60 # 检查API状态的间隔 check_interval: 60 # 检查API状态的间隔

75
config_example.yaml Normal file
View File

@@ -0,0 +1,75 @@
# 全局配置
global:
max_concurrency: 20
timeout: 30
retry_times: 3
# 服务器配置
server_config:
hostname: "sg424" # 服务器标识用于API调用
# 数据库配置
database:
# SQLite配置兼容旧版本
path: "cursor.db"
pool_size: 10
# MySQL配置
host: "localhost"
port: 3306
username: "auto_cursor_reg"
password: "this_password_jiaqiao"
database: "auto_cursor_reg"
# 是否使用Redis缓存
# 如果使用Python 3.12请确保安装redis>=4.2.0而不是aioredis
use_redis: true
# Redis配置可选当use_redis为true时生效
redis:
host: "127.0.0.1"
port: 6379
password: ""
db: 0
# 代理配置
proxy:
api_url: "https://share.proxy.qg.net/get?key=969331C5&num=1&area=&isp=0&format=txt&seq=\r\n&distinct=false"
batch_size: 100
check_interval: 300
# API专用代理可选
api_proxy: "http://1ddbeae0f7a67106fd58:f72e512b10893a1d@gw.dataimpulse.com:823"
# 注册配置
register:
delay_range: [1, 2]
batch_size: 15
#这里是注册的并发数量
# 邮件配置
email:
file_path: "email.txt"
# 自建邮箱服务配置
# 如果添加了这部分配置,系统将优先使用自建邮箱进行注册
self_hosted_email:
api_base_url: "https://api.cursorpro.com.cn"
api_key: "your_api_key_here" # 可选如果API需要认证
# 自动服务配置
auto_service:
check_interval: 60 # 检查API状态的间隔
upload_interval: 300 # 上传账号间隔(秒)
email_check_threshold: 30 # 当可用邮箱少于这个数时获取新邮箱
captcha:
provider: "capsolver" # 可选值: "capsolver" 或 "yescaptcha"
capsolver:
api_key: "CAP-36D01B0995C7C8705DF68ACCFE4E2004FE182DDA72AC5A80F25F1E3B601C31F0"
website_url: "https://authenticator.cursor.sh"
website_key: "0x4AAAAAAAMNIvC45A4Wjjln"
yescaptcha:
client_key: "a5ef0062c1d2674900e78722c5670e3a3484bc8c64273"
website_url: "https://authenticator.cursor.sh"
website_key: "a5ef0062c1d2674900e78722c5670e3a3484bc8c64273"
use_cn_server: false

View File

@@ -53,6 +53,13 @@ class EmailConfig:
file_path: str file_path: str
@dataclass
class SelfHostedEmailConfig:
"""自建邮箱配置"""
api_base_url: str
api_key: Optional[str] = None
@dataclass @dataclass
class ServerConfig: class ServerConfig:
hostname: str hostname: str
@@ -100,6 +107,7 @@ class Config:
captcha_config: CaptchaConfig = None captcha_config: CaptchaConfig = None
server_config: Optional[ServerConfig] = None server_config: Optional[ServerConfig] = None
auto_service_config: Optional[AutoServiceConfig] = None auto_service_config: Optional[AutoServiceConfig] = None
self_hosted_email_config: Optional[SelfHostedEmailConfig] = None
hostname: Optional[str] = None # 向后兼容 hostname: Optional[str] = None # 向后兼容
@classmethod @classmethod
@@ -137,6 +145,9 @@ class Config:
# 创建自动服务配置对象 # 创建自动服务配置对象
auto_service_config = AutoServiceConfig(**data.get('auto_service', {})) if 'auto_service' in data else None auto_service_config = AutoServiceConfig(**data.get('auto_service', {})) if 'auto_service' in data else None
# 创建自建邮箱配置对象
self_hosted_email_config = SelfHostedEmailConfig(**data.get('self_hosted_email', {})) if 'self_hosted_email' in data else None
# 设置hostname (优先使用server_config中的hostname) # 设置hostname (优先使用server_config中的hostname)
hostname = None hostname = None
if server_config and hasattr(server_config, 'hostname'): if server_config and hasattr(server_config, 'hostname'):
@@ -152,5 +163,6 @@ class Config:
captcha_config=captcha_config, captcha_config=captcha_config,
server_config=server_config, server_config=server_config,
auto_service_config=auto_service_config, auto_service_config=auto_service_config,
self_hosted_email_config=self_hosted_email_config,
hostname=hostname hostname=hostname
) )

2
debug_response.txt Normal file
View File

@@ -0,0 +1,2 @@
0:["$@1",["ARI0To80kOF93OzKZnFMN",null]]
1:{"payload":"Fe26.2*1*fb496fc3e44b03fba263d4904365d08ced6500c32da425de639cf4fd54b97e67*q0VzEVML3A50n2VWgjWCbw*jnmrTYQYUCSf8e1ly67O4w*1749190572271*0180c7834ee9ad01e6c2cb234416f8824ec91f4d96526d5aadbc1569e89f6245*GUdEqde3p6MD0n_JGEPb7-HF5-WEff4CtzpwyNl95k4~2","ttl":5184000000}

162
main.py
View File

@@ -11,10 +11,14 @@ from core.config import Config
from core.database import DatabaseManager from core.database import DatabaseManager
from core.logger import setup_logger from core.logger import setup_logger
from register.register_worker import RegisterWorker from register.register_worker import RegisterWorker
from register.host_register_worker import HostRegisterWorker
from services.email_manager import EmailManager from services.email_manager import EmailManager
from services.fetch_manager import FetchManager from services.fetch_manager import FetchManager
from services.proxy_pool import ProxyPool from services.proxy_pool import ProxyPool
from services.token_pool import TokenPool from services.token_pool import TokenPool
from services.self_hosted_email import SelfHostedEmail
from upload_account import AccountUploader
from auto_cursor_service import AutoCursorService
class CursorRegister: class CursorRegister:
@@ -25,6 +29,8 @@ class CursorRegister:
self.fetch_manager = FetchManager(self.config) self.fetch_manager = FetchManager(self.config)
self.proxy_pool = ProxyPool(self.config, self.fetch_manager) self.proxy_pool = ProxyPool(self.config, self.fetch_manager)
self.token_pool = TokenPool(self.config) self.token_pool = TokenPool(self.config)
# 初始化常规邮箱服务
self.email_manager = EmailManager(self.config, self.db_manager) self.email_manager = EmailManager(self.config, self.db_manager)
self.register_worker = RegisterWorker( self.register_worker = RegisterWorker(
self.config, self.config,
@@ -32,6 +38,25 @@ class CursorRegister:
self.email_manager self.email_manager
) )
# 初始化自建邮箱服务(如果配置了)
self.self_hosted_email = None
self.host_register_worker = None
if hasattr(self.config, 'self_hosted_email_config') and self.config.self_hosted_email_config:
self.self_hosted_email = SelfHostedEmail(
self.fetch_manager,
self.config.self_hosted_email_config.api_base_url,
self.config.self_hosted_email_config.api_key
)
self.logger.info("自建邮箱服务已初始化")
# 初始化自建邮箱注册工作器
self.host_register_worker = HostRegisterWorker(
self.config,
self.fetch_manager,
self.self_hosted_email
)
self.logger.info("自建邮箱注册工作器已初始化")
async def initialize(self): async def initialize(self):
"""初始化数据库""" """初始化数据库"""
await self.db_manager.initialize() await self.db_manager.initialize()
@@ -136,6 +161,105 @@ class CursorRegister:
return successful return successful
async def batch_register_with_host(self, num: int):
"""使用自建邮箱批量注册"""
if not self.host_register_worker:
self.logger.error("未配置自建邮箱注册工作器,无法继续")
return []
try:
self.logger.info(f"开始使用自建邮箱批量注册 {num} 个账号")
# 1. 获取token对
token_pairs = await self.token_pool.batch_generate(num)
if not token_pairs:
self.logger.error("获取token失败终止注册")
return []
actual_num = len(token_pairs)
if actual_num < num:
self.logger.warning(f"只获取到 {actual_num} 对token将减少注册数量")
num = actual_num
# 2. 获取代理
proxies = await self.proxy_pool.batch_get(num)
self.logger.debug(f"代理列表: {proxies}")
self.logger.debug(f"尝试使用的token对数量: {len(token_pairs)}")
# 3. 创建注册任务
tasks = []
for proxy, token_pair in zip(proxies, token_pairs):
task = self.host_register_worker.register(proxy, token_pair)
tasks.append(task)
# 4. 并发执行
results = await asyncio.gather(*tasks, return_exceptions=True)
# 5. 处理结果
successful = []
failed = []
skipped = 0
for i, result in enumerate(results):
if isinstance(result, Exception):
self.logger.error(f"注册任务 {i+1} 失败: {str(result)}")
failed.append(str(result))
elif result is None:
skipped += 1
else:
successful.append(result)
# 添加详细的账号信息日志
self.logger.info("=" * 50)
self.logger.info(f"账号 {i+1} 注册成功:")
self.logger.info(f"邮箱: {result['email']}")
if 'email_password' in result and result['email_password']:
self.logger.info(f"邮箱密码: {result['email_password']}")
else:
self.logger.info("邮箱密码: (无)")
self.logger.info(f"Cursor密码: {result['cursor_password']}")
# 只显示token的前30个字符避免日志过长
token = result.get('cursor_jwt', '')
if token:
self.logger.info(f"Token: {token[:30]}...")
self.logger.info("=" * 50)
# 6. 直接上传成功注册的账号
if successful:
try:
service = AutoCursorService()
await service.initialize()
# 准备上传数据
upload_data = []
for account in successful:
upload_item = {
"email": account["email"],
"password": account.get("email_password", ""), # 使用get并提供默认值
"cursor_password": account["cursor_password"],
"cursor_cookie": account["cursor_cookie"],
"cursor_jwt": account.get("cursor_jwt", "")
}
upload_data.append(upload_item)
# 上传账号
upload_result = await service.upload_accounts(upload_data)
await service.cleanup()
if upload_result:
self.logger.info(f"成功上传 {len(upload_data)} 个账号到服务器")
else:
self.logger.error(f"账号上传失败,请检查日志了解详细信息")
except Exception as e:
self.logger.error(f"上传账号时发生错误: {str(e)}")
self.logger.info(f"注册完成: 成功 {len(successful)}, 失败 {len(failed)}, 跳过 {skipped}")
return successful
except Exception as e:
self.logger.error(f"批量注册失败: {str(e)}")
return []
async def main(): async def main():
register = CursorRegister() register = CursorRegister()
@@ -145,16 +269,18 @@ async def main():
batch_size = register.config.register_config.batch_size batch_size = register.config.register_config.batch_size
total_registered = 0 total_registered = 0
while True: # 直接使用自建邮箱模式,不再检查配置
# 直接检查数据库中是否有可用的邮箱账号 register.logger.info("使用自建邮箱模式")
pending_count = await register.email_manager.count_pending_accounts()
if pending_count <= 0: # 确保已初始化自建邮箱服务
register.logger.info("没有可用的邮箱账号,注册完成") if not register.host_register_worker:
break register.logger.error("自建邮箱注册工作器未初始化,请检查配置")
return
# 执行批量注册 # 自建邮箱模式,直接执行自建邮箱批量注册
register.logger.info(f"发现 {pending_count} 个可用邮箱,开始新一轮批量注册,批次大小: {batch_size}") while True:
results = await register.batch_register(batch_size) register.logger.info(f"开始新一轮自建邮箱批量注册,批次大小: {batch_size}")
results = await register.batch_register_with_host(batch_size)
# 统计结果 # 统计结果
successful = len(results) successful = len(results)
@@ -162,9 +288,8 @@ async def main():
register.logger.info(f"当前总进度: 已注册 {total_registered} 个账号") register.logger.info(f"当前总进度: 已注册 {total_registered} 个账号")
# 批次结束后等待3秒确保所有数据库更新都已完成 # 批次之间的间隔
register.logger.info("本批次注册完成,等待数据库状态完全更新...") await asyncio.sleep(5)
await asyncio.sleep(3)
# 如果本批次注册失败率过高,暂停一段时间 # 如果本批次注册失败率过高,暂停一段时间
if successful < batch_size * 0.5 and successful > 0: # 成功率低于50%但不为零 if successful < batch_size * 0.5 and successful > 0: # 成功率低于50%但不为零
@@ -173,9 +298,16 @@ async def main():
elif successful == 0 and batch_size > 0: # 完全失败 elif successful == 0 and batch_size > 0: # 完全失败
register.logger.error("本批次完全失败可能存在系统问题暂停120秒后继续") register.logger.error("本批次完全失败可能存在系统问题暂停120秒后继续")
await asyncio.sleep(120) await asyncio.sleep(120)
else:
# 正常等待一个较短的时间再继续下一批 # 让用户决定是否继续
await asyncio.sleep(5) try:
answer = input("是否继续下一批注册? (y/n): ").strip().lower()
if answer != 'y':
register.logger.info("用户选择停止注册")
break
except Exception:
# 如果在非交互环境中运行,默认继续
pass
except Exception as e: except Exception as e:
register.logger.error(f"程序执行出错: {str(e)}") register.logger.error(f"程序执行出错: {str(e)}")

View File

@@ -0,0 +1,421 @@
import asyncio
import json
import random
import string
from typing import Optional, Tuple, Dict
from urllib.parse import parse_qs, urlparse
from loguru import logger
from core.config import Config
from core.exceptions import RegisterError
from services.fetch_manager import FetchManager
from services.self_hosted_email import SelfHostedEmail
from services.uuid import ULID
def extract_jwt(cookie_string: str) -> str:
"""从cookie字符串中提取JWT token"""
try:
return cookie_string.split(';')[0].split('=')[1].split('%3A%3A')[1]
except Exception as e:
logger.error(f"[错误] 提取JWT失败: {str(e)}")
return ""
class FormBuilder:
@staticmethod
def _generate_password() -> str:
"""生成随机密码
规则: 12-16位包含大小写字母、数字和特殊字符
"""
length = random.randint(12, 16)
lowercase = string.ascii_lowercase
uppercase = string.ascii_uppercase
digits = string.digits
special = "!@#$%^&*"
# 确保每种字符至少有一个
password = [
random.choice(lowercase),
random.choice(uppercase),
random.choice(digits),
random.choice(special)
]
# 填充剩余长度
all_chars = lowercase + uppercase + digits + special
password.extend(random.choice(all_chars) for _ in range(length - 4))
# 打乱顺序
random.shuffle(password)
return ''.join(password)
@staticmethod
def _generate_name() -> tuple[str, str]:
"""生成随机的名字和姓氏
Returns:
tuple: (first_name, last_name)
"""
first_names = ["Alex", "Sam", "Chris", "Jordan", "Taylor", "Morgan", "Casey", "Drew", "Pat", "Quinn"]
last_names = ["Smith", "Johnson", "Brown", "Davis", "Wilson", "Moore", "Taylor", "Anderson", "Thomas", "Jackson"]
return (
random.choice(first_names),
random.choice(last_names)
)
@staticmethod
def build_register_form(boundary: str, email: str, token: str) -> tuple[str, str]:
"""构建注册表单数据,返回(form_data, password)"""
password = FormBuilder._generate_password()
first_name, last_name = FormBuilder._generate_name()
fields = {
"1_state": "{\"returnTo\":\"/settings\"}",
"1_redirect_uri": "https://cursor.com/api/auth/callback",
"1_bot_detection_token": token,
"1_first_name": first_name,
"1_last_name": last_name,
"1_email": email,
"1_password": password,
"1_intent": "sign-up",
"0": "[\"$K1\"]"
}
form_data = []
for key, value in fields.items():
form_data.append(f'--{boundary}')
form_data.append(f'Content-Disposition: form-data; name="{key}"')
form_data.append('')
form_data.append(value)
form_data.append(f'--{boundary}--')
return '\r\n'.join(form_data), password
@staticmethod
def build_verify_form(boundary: str, email: str, token: str, code: str, pending_token: str) -> str:
"""构建验证表单数据"""
fields = {
"1_pending_authentication_token": pending_token,
"1_email": email,
"1_state": "{\"returnTo\":\"/settings\"}",
"1_redirect_uri": "https://cursor.com/api/auth/callback",
"1_bot_detection_token": token,
"1_code": code,
"0": "[\"$K1\"]"
}
form_data = []
for key, value in fields.items():
form_data.append(f'--{boundary}')
form_data.append(f'Content-Disposition: form-data; name="{key}"')
form_data.append('')
form_data.append(value)
form_data.append(f'--{boundary}--')
return '\r\n'.join(form_data)
class HostRegisterWorker:
"""自建邮箱注册工作器,使用自建邮箱完成注册流程"""
def __init__(self, config: Config, fetch_manager: FetchManager, self_hosted_email: SelfHostedEmail):
self.config = config
self.fetch_manager = fetch_manager
self.self_hosted_email = self_hosted_email
self.form_builder = FormBuilder()
self.uuid = ULID()
async def random_delay(self):
delay = random.uniform(*self.config.register_config.delay_range)
await asyncio.sleep(delay)
@staticmethod
async def _extract_auth_token(response_text: str) -> str | None:
"""从响应文本中提取pending_authentication_token"""
res = response_text.split('\n')
logger.debug(f"开始提取 auth_token响应行数: {len(res)}")
# 检查邮箱是否可用
for line in res:
if '"code":"email_not_available"' in line:
logger.error("不受支持的邮箱")
raise RegisterError("Email is not available")
# 像register_worker.py中一样使用简单的路径
try:
for i, r in enumerate(res):
if r.startswith('0:'):
logger.debug(f"在第 {i+1} 行找到匹配")
data = json.loads(r.split('0:')[1])
# 使用完全相同的路径
auth_data = data[1][0][0][1]["children"][1]["children"][1]["children"][1]["children"][0]
params_str = auth_data.split('?')[1]
params_dict = json.loads(params_str)
token = params_dict['pending_authentication_token']
logger.debug(f"提取成功: {token[:10]}...")
return token
except Exception as e:
logger.error(f"提取token失败: {str(e)}")
logger.debug(f"响应内容预览: {response_text[:200]}...")
# 保存完整响应到文件以便调试
try:
with open('debug_response.txt', 'w', encoding='utf-8') as f:
f.write(response_text)
logger.debug("完整响应已保存到debug_response.txt")
except Exception:
pass
# 尝试备选方法 - 在整个响应文本中查找token
try:
import re
match = re.search(r'pending_authentication_token["\']?\s*[:=]\s*["\']?([^"\'&,\s]+)["\']?', response_text)
if match:
token = match.group(1)
logger.debug(f"使用正则表达式提取成功: {token[:10]}...")
return token
except Exception as e:
logger.error(f"正则表达式提取失败: {str(e)}")
return None
async def register(self, proxy: str, token_pair: Tuple[str, str]) -> Optional[Dict]:
"""使用自建邮箱完成注册流程"""
if not self.self_hosted_email:
raise RegisterError("自建邮箱服务未配置")
token1, token2 = token_pair
session_id = self.uuid.generate()
try:
# 从自建邮箱API获取邮箱
email = await self.self_hosted_email.get_email()
if not email:
raise RegisterError("获取自建邮箱失败")
logger.info(f"开始使用自建邮箱注册: {email}")
# 第一次注册请求
email, pending_token, cursor_password = await self._first_register(
proxy,
token1,
email,
session_id
)
await self.random_delay()
# 从自建邮箱API获取验证码
verification_code = await self.self_hosted_email.get_verification_code(email)
if not verification_code:
logger.error(f"自建邮箱 {email} 获取验证码失败")
raise RegisterError("获取验证码失败")
logger.debug(f"自建邮箱 {email} 获取到验证码: {verification_code}")
await self.random_delay()
# 验证码验证
redirect_url = await self._verify_code(
proxy=proxy,
token=token2,
code=verification_code,
pending_token=pending_token,
email=email,
session_id=session_id
)
if not redirect_url:
raise RegisterError("未找到重定向URL")
await self.random_delay()
# callback请求
cookies = await self._callback(proxy, redirect_url)
if not cookies:
raise RegisterError("获取cookies失败")
# 提取JWT
jwt_token = extract_jwt(cookies)
logger.success(f"自建邮箱账号 {email} 注册成功")
return {
'email': email,
'email_password': '', # 添加email_password字段可以为空字符串
'cursor_password': cursor_password,
'cursor_cookie': cookies,
'cursor_jwt': jwt_token
}
except Exception as e:
logger.error(f"自建邮箱账号注册失败: {str(e)}")
raise RegisterError(f"注册失败: {str(e)}")
async def _first_register(
self,
proxy: str,
token: str,
email: str,
session_id: str
) -> tuple[str, str, str]:
"""自建邮箱的第一次注册请求"""
logger.debug(f"开始第一次注册请求 - 自建邮箱: {email}, 代理: {proxy}")
first_name, last_name = self.form_builder._generate_name()
# 在headers中定义boundary
boundary = "----WebKitFormBoundary2rKlvTagBEhneWi3"
headers = {
"accept": "text/x-component",
"next-action": "770926d8148e29539286d20e1c1548d2aff6c0b9",
"content-type": f"multipart/form-data; boundary={boundary}",
"origin": "https://authenticator.cursor.sh",
"sec-fetch-dest": "empty",
"sec-fetch-mode": "cors",
"sec-fetch-site": "same-origin",
"user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/133.0.0.0 Safari/537.36"
}
params = {
"first_name": first_name,
"last_name": last_name,
"email": email,
"state": "%7B%22returnTo%22%3A%22%2Fsettings%22%7D",
"redirect_uri": "https://cursor.com/api/auth/callback",
}
# 构建form数据
form_data, cursor_password = self.form_builder.build_register_form(boundary, email, token)
response = await self.fetch_manager.request(
"POST",
"https://authenticator.cursor.sh/sign-up/password",
headers=headers,
params=params,
data=form_data,
proxy=proxy
)
if 'error' in response:
raise RegisterError(f"First register request failed: {response['error']}")
text = response['body'].decode()
# 使用更简单的方法提取token与register_worker.py相同的方式
pending_token = await self._extract_auth_token(text)
if not pending_token:
raise RegisterError("Failed to extract auth token")
logger.debug(f"第一次请求完成 - pending_token: {pending_token[:10]}...")
return email, pending_token, cursor_password
async def _verify_code(
self,
proxy: str,
token: str,
code: str,
pending_token: str,
email: str,
session_id: str
) -> str:
"""验证码验证请求"""
logger.debug(f"开始验证码验证 - 邮箱: {email}, 验证码: {code}")
boundary = "----WebKitFormBoundaryqEBf0rEYwwb9aUoF"
headers = {
"accept": "text/x-component",
"content-type": f"multipart/form-data; boundary={boundary}",
"next-action": "e75011da58d295bef5aa55740d0758a006468655",
"origin": "https://authenticator.cursor.sh",
"sec-fetch-dest": "empty",
"sec-fetch-mode": "cors",
"sec-fetch-site": "same-origin",
"user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/133.0.0.0 Safari/537.36"
}
params = {
"email": email,
"pending_authentication_token": pending_token,
"state": "%7B%22returnTo%22%3A%22%2Fsettings%22%7D",
"redirect_uri": "https://cursor.com/api/auth/callback",
"authorization_session_id": session_id
}
form_data = self.form_builder.build_verify_form(
boundary=boundary,
email=email,
token=token,
code=code,
pending_token=pending_token,
)
response = await self.fetch_manager.request(
"POST",
"https://authenticator.cursor.sh/email-verification",
headers=headers,
params=params,
data=form_data,
proxy=proxy
)
redirect_url = response.get('headers', {}).get('x-action-redirect')
if not redirect_url:
logger.error(f"未找到重定向URL响应头: {json.dumps(response.get('headers', {}))}")
# 尝试从响应体中提取
body = response.get('body', b'').decode()
if 'redirect' in body.lower():
logger.debug("尝试从响应体中提取重定向URL")
import re
match = re.search(r'redirect[^"\']*["\']([^"\']+)["\']', body)
if match:
redirect_url = match.group(1)
logger.debug(f"从响应体提取到重定向URL: {redirect_url}")
if not redirect_url:
raise RegisterError("未找到重定向URL响应头: %s" % json.dumps(response.get('headers')))
return redirect_url
async def _callback(self, proxy: str, redirect_url: str) -> str:
"""Callback请求"""
logger.debug(f"开始callback请求 - URL: {redirect_url[:50]}...")
parsed = urlparse(redirect_url)
code = parse_qs(parsed.query)['code'][0]
logger.debug(f"从URL提取的code: {code[:10]}...")
headers = {
"accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7",
"accept-language": "zh-CN,zh;q=0.9",
"sec-fetch-dest": "document",
"sec-fetch-mode": "navigate",
"sec-fetch-site": "cross-site",
"upgrade-insecure-requests": "1",
"user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/133.0.0.0 Safari/537.36"
}
callback_url = "https://www.cursor.com/api/auth/callback"
params = {
"code": code,
"state": "%7B%22returnTo%22%3A%22%2Fsettings%22%7D"
}
response = await self.fetch_manager.request(
"GET",
callback_url,
headers=headers,
params=params,
proxy=proxy,
allow_redirects=False
)
if 'error' in response:
raise RegisterError(f"Callback request failed: {response['error']}")
cookies = response['headers'].get('set-cookie')
if cookies:
logger.debug("成功获取到cookies")
else:
logger.error("未获取到cookies")
return cookies

View File

@@ -0,0 +1,204 @@
import json
import re
import time
import asyncio
from typing import Optional, Dict, Any
from loguru import logger
from core.exceptions import EmailError
from services.fetch_manager import FetchManager
class SelfHostedEmail:
"""自建邮箱服务类负责从API获取邮箱和验证码"""
def __init__(self, fetch_manager: FetchManager, api_base_url: str, api_key: Optional[str] = None):
"""初始化自建邮箱服务
Args:
fetch_manager: HTTP请求管理器
api_base_url: API基础URL例如 "https://cursorapi3.nosqli.com/"
api_key: API密钥可选
"""
self.fetch_manager = fetch_manager
self.api_base_url = "https://cursorapi3.nosqli.com/"
self.api_key = "1234567890"
# API端点
self.get_email_endpoint = "/admin/api.email/getEmail"
self.get_code_endpoint = "/admin/api.email/getVerificationCode"
# 新的邮件获取接口
self.new_email_api = "https://rnemail.nosqli.com/latest_email"
async def _make_api_request(self, endpoint: str, params: Dict[str, Any] = None) -> Dict[str, Any]:
"""向API发送请求
Args:
endpoint: API端点例如 "/admin/api.email/getEmail"
params: 请求参数
Returns:
解析后的JSON响应
Raises:
EmailError: 当API请求失败或响应无法解析时
"""
url = f"{self.api_base_url}{endpoint}"
headers = {}
if self.api_key:
headers["Authorization"] = f"Bearer {self.api_key}"
logger.debug(f"正在请求: {url}")
response = await self.fetch_manager.request(
"GET",
url,
headers=headers,
params=params
)
if 'error' in response:
raise EmailError(f"API请求失败: {response['error']}")
try:
result = json.loads(response['body'].decode())
return result
except Exception as e:
raise EmailError(f"解析API响应失败: {str(e)}")
async def get_email(self) -> Optional[str]:
"""从API获取一个可用邮箱
Returns:
邮箱地址失败时返回None
"""
try:
result = await self._make_api_request(self.get_email_endpoint)
if result.get('code') != 0:
logger.error(f"获取邮箱失败: {result.get('msg')}")
return None
email = result.get('data', {}).get('email')
if not email:
logger.error("API未返回有效邮箱")
return None
logger.info(f"获取到自建邮箱: {email}")
return email
except Exception as e:
logger.error(f"获取邮箱失败: {str(e)}")
return None
async def get_verification_code(self, email: str) -> Optional[str]:
"""从API获取验证码
Args:
email: 邮箱地址
Returns:
验证码失败时返回None
"""
# 首先尝试使用新接口获取
code = await self._get_code_from_email_api(email)
if code:
return code
# 如果新接口失败,尝试旧接口
logger.debug("新接口获取验证码失败,尝试使用旧接口")
try:
result = await self._make_api_request(
self.get_code_endpoint,
params={"email": email}
)
if result.get('code') != 0:
logger.error(f"获取验证码失败: {result.get('msg')}")
return None
code = result.get('data', {}).get('code')
if not code:
logger.error("API未返回有效验证码")
return None
logger.info(f"获取到验证码: {code} (邮箱: {email})")
return code
except Exception as e:
logger.error(f"获取验证码失败: {str(e)}")
return None
async def _get_code_from_email_api(self, email: str, max_retries: int = 5, retry_delay: int = 3) -> Optional[str]:
"""从新的邮件API获取验证码
Args:
email: 邮箱地址
max_retries: 最大重试次数
retry_delay: 重试间隔(秒)
Returns:
验证码失败时返回None
"""
url = f"{self.new_email_api}?recipient={email}"
logger.debug(f"使用新接口获取验证码: {url}")
for attempt in range(max_retries):
try:
response = await self.fetch_manager.request("GET", url)
if 'error' in response:
logger.warning(f"获取邮件失败 (尝试 {attempt + 1}/{max_retries}): {response['error']}")
else:
email_data = json.loads(response['body'].decode())
logger.debug(f"获取到邮件数据: {str(email_data)[:200]}...")
# 解析邮件内容,提取验证码
if email_data and isinstance(email_data, dict):
body = email_data.get('body', '')
if 'code' in email_data:
# 如果API直接返回code字段
code = email_data.get('code')
if code:
logger.info(f"从邮件API直接获取到验证码: {code}")
return code
# 从邮件主体中提取验证码
if body:
# 正则表达式匹配6位数字验证码
code_match = re.search(r'code["\s:]*["\s]*(\d{6})["\s]*', body, re.IGNORECASE)
if code_match:
code = code_match.group(1)
logger.info(f"从邮件内容中提取到验证码: {code}")
return code
# 匹配"验证码"后的6位数字
code_match = re.search(r'[\u4e00-\u9fa5]*验证码[\u4e00-\u9fa5]*[:]*\s*(\d{6})\b', body)
if code_match:
code = code_match.group(1)
logger.info(f"从邮件内容中提取到验证码: {code}")
return code
# 如果是Cursor邮件尝试直接提取验证码格式
if "Cursor" in body and "verify" in body:
code_match = re.search(r'\b(\d{6})\b', body)
if code_match:
code = code_match.group(1)
logger.info(f"从Cursor邮件中提取到验证码: {code}")
return code
logger.warning(f"未能从邮件中提取到验证码 (尝试 {attempt + 1}/{max_retries})")
# 如果没有找到验证码,等待一段时间后重试
if attempt < max_retries - 1:
logger.debug(f"等待 {retry_delay} 秒后重试获取验证码...")
await asyncio.sleep(retry_delay)
except Exception as e:
logger.error(f"获取验证码出错 (尝试 {attempt + 1}/{max_retries}): {str(e)}")
if attempt < max_retries - 1:
await asyncio.sleep(retry_delay)
logger.error(f"{max_retries} 次尝试后未能获取到验证码")
return None