自建邮箱版本

This commit is contained in:
huangzhenpc
2025-04-07 13:14:29 +08:00
parent 9d00c0b58e
commit 901c8c95e1
8 changed files with 699 additions and 19 deletions

View File

@@ -65,6 +65,7 @@ sudo systemctl enable redis
mkdir -p /opt/cursor-service
cd /opt/cursor-service
sudo apt install python3-venv python3-full
# 创建并激活虚拟环境
python3 -m venv venv
source venv/bin/activate

View File

@@ -43,13 +43,18 @@ proxy:
# 注册配置
register:
delay_range: [1, 2]
batch_size: 15
batch_size: 1
#这里是注册的并发数量
# 邮件配置
email:
file_path: "email.txt"
# 自建邮箱服务配置
self_hosted_email:
api_base_url: "https://api.cursorpro.com.cn"
api_key: "" # 如果API需要认证请填写
# 自动服务配置
auto_service:
check_interval: 60 # 检查API状态的间隔

75
config_example.yaml Normal file
View File

@@ -0,0 +1,75 @@
# 全局配置
global:
max_concurrency: 20
timeout: 30
retry_times: 3
# 服务器配置
server_config:
hostname: "sg424" # 服务器标识用于API调用
# 数据库配置
database:
# SQLite配置兼容旧版本
path: "cursor.db"
pool_size: 10
# MySQL配置
host: "localhost"
port: 3306
username: "auto_cursor_reg"
password: "this_password_jiaqiao"
database: "auto_cursor_reg"
# 是否使用Redis缓存
# 如果使用Python 3.12请确保安装redis>=4.2.0而不是aioredis
use_redis: true
# Redis配置可选当use_redis为true时生效
redis:
host: "127.0.0.1"
port: 6379
password: ""
db: 0
# 代理配置
proxy:
api_url: "https://share.proxy.qg.net/get?key=969331C5&num=1&area=&isp=0&format=txt&seq=\r\n&distinct=false"
batch_size: 100
check_interval: 300
# API专用代理可选
api_proxy: "http://1ddbeae0f7a67106fd58:f72e512b10893a1d@gw.dataimpulse.com:823"
# 注册配置
register:
delay_range: [1, 2]
batch_size: 15
#这里是注册的并发数量
# 邮件配置
email:
file_path: "email.txt"
# 自建邮箱服务配置
# 如果添加了这部分配置,系统将优先使用自建邮箱进行注册
self_hosted_email:
api_base_url: "https://api.cursorpro.com.cn"
api_key: "your_api_key_here" # 可选如果API需要认证
# 自动服务配置
auto_service:
check_interval: 60 # 检查API状态的间隔
upload_interval: 300 # 上传账号间隔(秒)
email_check_threshold: 30 # 当可用邮箱少于这个数时获取新邮箱
captcha:
provider: "capsolver" # 可选值: "capsolver" 或 "yescaptcha"
capsolver:
api_key: "CAP-36D01B0995C7C8705DF68ACCFE4E2004FE182DDA72AC5A80F25F1E3B601C31F0"
website_url: "https://authenticator.cursor.sh"
website_key: "0x4AAAAAAAMNIvC45A4Wjjln"
yescaptcha:
client_key: "a5ef0062c1d2674900e78722c5670e3a3484bc8c64273"
website_url: "https://authenticator.cursor.sh"
website_key: "a5ef0062c1d2674900e78722c5670e3a3484bc8c64273"
use_cn_server: false

View File

@@ -53,6 +53,13 @@ class EmailConfig:
file_path: str
@dataclass
class SelfHostedEmailConfig:
"""自建邮箱配置"""
api_base_url: str
api_key: Optional[str] = None
@dataclass
class ServerConfig:
hostname: str
@@ -100,6 +107,7 @@ class Config:
captcha_config: CaptchaConfig = None
server_config: Optional[ServerConfig] = None
auto_service_config: Optional[AutoServiceConfig] = None
self_hosted_email_config: Optional[SelfHostedEmailConfig] = None
hostname: Optional[str] = None # 向后兼容
@classmethod
@@ -137,6 +145,9 @@ class Config:
# 创建自动服务配置对象
auto_service_config = AutoServiceConfig(**data.get('auto_service', {})) if 'auto_service' in data else None
# 创建自建邮箱配置对象
self_hosted_email_config = SelfHostedEmailConfig(**data.get('self_hosted_email', {})) if 'self_hosted_email' in data else None
# 设置hostname (优先使用server_config中的hostname)
hostname = None
if server_config and hasattr(server_config, 'hostname'):
@@ -152,5 +163,6 @@ class Config:
captcha_config=captcha_config,
server_config=server_config,
auto_service_config=auto_service_config,
self_hosted_email_config=self_hosted_email_config,
hostname=hostname
)

116
main.py
View File

@@ -11,10 +11,12 @@ from core.config import Config
from core.database import DatabaseManager
from core.logger import setup_logger
from register.register_worker import RegisterWorker
from register.host_register_worker import HostRegisterWorker
from services.email_manager import EmailManager
from services.fetch_manager import FetchManager
from services.proxy_pool import ProxyPool
from services.token_pool import TokenPool
from services.self_hosted_email import SelfHostedEmail
class CursorRegister:
@@ -25,6 +27,8 @@ class CursorRegister:
self.fetch_manager = FetchManager(self.config)
self.proxy_pool = ProxyPool(self.config, self.fetch_manager)
self.token_pool = TokenPool(self.config)
# 初始化常规邮箱服务
self.email_manager = EmailManager(self.config, self.db_manager)
self.register_worker = RegisterWorker(
self.config,
@@ -32,6 +36,25 @@ class CursorRegister:
self.email_manager
)
# 初始化自建邮箱服务(如果配置了)
self.self_hosted_email = None
self.host_register_worker = None
if hasattr(self.config, 'self_hosted_email_config') and self.config.self_hosted_email_config:
self.self_hosted_email = SelfHostedEmail(
self.fetch_manager,
self.config.self_hosted_email_config.api_base_url,
self.config.self_hosted_email_config.api_key
)
self.logger.info("自建邮箱服务已初始化")
# 初始化自建邮箱注册工作器
self.host_register_worker = HostRegisterWorker(
self.config,
self.fetch_manager,
self.self_hosted_email
)
self.logger.info("自建邮箱注册工作器已初始化")
async def initialize(self):
"""初始化数据库"""
await self.db_manager.initialize()
@@ -136,6 +159,61 @@ class CursorRegister:
return successful
async def batch_register_with_host(self, num: int):
"""使用自建邮箱批量注册"""
if not self.host_register_worker:
self.logger.error("未配置自建邮箱注册工作器,无法继续")
return []
try:
self.logger.info(f"开始使用自建邮箱批量注册 {num} 个账号")
# 1. 获取token对
token_pairs = await self.token_pool.batch_generate(num)
if not token_pairs:
self.logger.error("获取token失败终止注册")
return []
actual_num = len(token_pairs) # 根据实际获取到的token对数量调整注册数量
if actual_num < num:
self.logger.warning(f"只获取到 {actual_num} 对token将减少注册数量")
num = actual_num
# 2. 获取代理
proxies = await self.proxy_pool.batch_get(num)
# 在关键位置添加详细日志
self.logger.debug(f"代理列表: {proxies}")
self.logger.debug(f"尝试使用的token对数量: {len(token_pairs)}")
# 3. 创建注册任务
tasks = []
for proxy, token_pair in zip(proxies, token_pairs):
task = self.host_register_worker.register(proxy, token_pair)
tasks.append(task)
# 4. 并发执行
results = await asyncio.gather(*tasks, return_exceptions=True)
# 5. 处理结果
successful = []
failed = []
for i, result in enumerate(results):
if isinstance(result, Exception):
self.logger.error(f"自建邮箱注册任务 {i+1} 失败: {str(result)}")
failed.append(str(result))
else:
self.logger.info(f"自建邮箱注册任务 {i+1} 成功: {result.get('email')}")
successful.append(result)
self.logger.info(f"自建邮箱注册完成: 成功 {len(successful)}, 失败 {len(failed)}")
return successful
except Exception as e:
self.logger.error(f"自建邮箱批量注册失败: {str(e)}")
return []
async def main():
register = CursorRegister()
@@ -145,16 +223,18 @@ async def main():
batch_size = register.config.register_config.batch_size
total_registered = 0
while True:
# 直接检查数据库中是否有可用的邮箱账号
pending_count = await register.email_manager.count_pending_accounts()
if pending_count <= 0:
register.logger.info("没有可用的邮箱账号,注册完成")
break
# 直接使用自建邮箱模式,不再检查配置
register.logger.info("使用自建邮箱模式")
# 确保已初始化自建邮箱服务
if not register.host_register_worker:
register.logger.error("自建邮箱注册工作器未初始化,请检查配置")
return
# 执行批量注册
register.logger.info(f"发现 {pending_count} 个可用邮箱,开始新一轮批量注册,批次大小: {batch_size}")
results = await register.batch_register(batch_size)
# 自建邮箱模式,直接执行自建邮箱批量注册
while True:
register.logger.info(f"开始新一轮自建邮箱批量注册,批次大小: {batch_size}")
results = await register.batch_register_with_host(batch_size)
# 统计结果
successful = len(results)
@@ -162,9 +242,8 @@ async def main():
register.logger.info(f"当前总进度: 已注册 {total_registered} 个账号")
# 批次结束后等待3秒确保所有数据库更新都已完成
register.logger.info("本批次注册完成,等待数据库状态完全更新...")
await asyncio.sleep(3)
# 批次之间的间隔
await asyncio.sleep(5)
# 如果本批次注册失败率过高,暂停一段时间
if successful < batch_size * 0.5 and successful > 0: # 成功率低于50%但不为零
@@ -173,9 +252,16 @@ async def main():
elif successful == 0 and batch_size > 0: # 完全失败
register.logger.error("本批次完全失败可能存在系统问题暂停120秒后继续")
await asyncio.sleep(120)
else:
# 正常等待一个较短的时间再继续下一批
await asyncio.sleep(5)
# 让用户决定是否继续
try:
answer = input("是否继续下一批注册? (y/n): ").strip().lower()
if answer != 'y':
register.logger.info("用户选择停止注册")
break
except Exception:
# 如果在非交互环境中运行,默认继续
pass
except Exception as e:
register.logger.error(f"程序执行出错: {str(e)}")

View File

@@ -0,0 +1,380 @@
import asyncio
import json
import random
import string
from typing import Optional, Tuple, Dict
from urllib.parse import parse_qs, urlparse
from loguru import logger
from core.config import Config
from core.exceptions import RegisterError
from services.fetch_manager import FetchManager
from services.self_hosted_email import SelfHostedEmail
from services.uuid import ULID
def extract_jwt(cookie_string: str) -> str:
"""从cookie字符串中提取JWT token"""
try:
return cookie_string.split(';')[0].split('=')[1].split('%3A%3A')[1]
except Exception as e:
logger.error(f"[错误] 提取JWT失败: {str(e)}")
return ""
class FormBuilder:
@staticmethod
def _generate_password() -> str:
"""生成随机密码
规则: 12-16位包含大小写字母、数字和特殊字符
"""
length = random.randint(12, 16)
lowercase = string.ascii_lowercase
uppercase = string.ascii_uppercase
digits = string.digits
special = "!@#$%^&*"
# 确保每种字符至少有一个
password = [
random.choice(lowercase),
random.choice(uppercase),
random.choice(digits),
random.choice(special)
]
# 填充剩余长度
all_chars = lowercase + uppercase + digits + special
password.extend(random.choice(all_chars) for _ in range(length - 4))
# 打乱顺序
random.shuffle(password)
return ''.join(password)
@staticmethod
def _generate_name() -> tuple[str, str]:
"""生成随机的名字和姓氏
Returns:
tuple: (first_name, last_name)
"""
first_names = ["Alex", "Sam", "Chris", "Jordan", "Taylor", "Morgan", "Casey", "Drew", "Pat", "Quinn"]
last_names = ["Smith", "Johnson", "Brown", "Davis", "Wilson", "Moore", "Taylor", "Anderson", "Thomas", "Jackson"]
return (
random.choice(first_names),
random.choice(last_names)
)
@staticmethod
def build_register_form(boundary: str, email: str, token: str) -> tuple[str, str]:
"""构建注册表单数据,返回(form_data, password)"""
password = FormBuilder._generate_password()
first_name, last_name = FormBuilder._generate_name()
fields = {
"1_state": "{\"returnTo\":\"/settings\"}",
"1_redirect_uri": "https://cursor.com/api/auth/callback",
"1_bot_detection_token": token,
"1_first_name": first_name,
"1_last_name": last_name,
"1_email": email,
"1_password": password,
"1_intent": "sign-up",
"0": "[\"$K1\"]"
}
form_data = []
for key, value in fields.items():
form_data.append(f'--{boundary}')
form_data.append(f'Content-Disposition: form-data; name="{key}"')
form_data.append('')
form_data.append(value)
form_data.append(f'--{boundary}--')
return '\r\n'.join(form_data), password
@staticmethod
def build_verify_form(boundary: str, email: str, token: str, code: str, pending_token: str) -> str:
"""构建验证表单数据"""
fields = {
"1_pending_authentication_token": pending_token,
"1_email": email,
"1_state": "{\"returnTo\":\"/settings\"}",
"1_redirect_uri": "https://cursor.com/api/auth/callback",
"1_bot_detection_token": token,
"1_code": code,
"0": "[\"$K1\"]"
}
form_data = []
for key, value in fields.items():
form_data.append(f'--{boundary}')
form_data.append(f'Content-Disposition: form-data; name="{key}"')
form_data.append('')
form_data.append(value)
form_data.append(f'--{boundary}--')
return '\r\n'.join(form_data)
class HostRegisterWorker:
"""自建邮箱注册工作器,使用自建邮箱完成注册流程"""
def __init__(self, config: Config, fetch_manager: FetchManager, self_hosted_email: SelfHostedEmail):
self.config = config
self.fetch_manager = fetch_manager
self.self_hosted_email = self_hosted_email
self.form_builder = FormBuilder()
self.uuid = ULID()
async def random_delay(self):
delay = random.uniform(*self.config.register_config.delay_range)
await asyncio.sleep(delay)
async def register(self, proxy: str, token_pair: Tuple[str, str]) -> Optional[Dict]:
"""使用自建邮箱完成注册流程"""
if not self.self_hosted_email:
raise RegisterError("自建邮箱服务未配置")
token1, token2 = token_pair
session_id = self.uuid.generate()
try:
# 从自建邮箱API获取邮箱
email = await self.self_hosted_email.get_email()
if not email:
raise RegisterError("获取自建邮箱失败")
logger.info(f"开始使用自建邮箱注册: {email}")
# 第一次注册请求
email, pending_token, cursor_password = await self._first_register(
proxy,
token1,
email,
session_id
)
await self.random_delay()
# 从自建邮箱API获取验证码
verification_code = await self.self_hosted_email.get_verification_code(email)
if not verification_code:
logger.error(f"自建邮箱 {email} 获取验证码失败")
raise RegisterError("获取验证码失败")
logger.debug(f"自建邮箱 {email} 获取到验证码: {verification_code}")
await self.random_delay()
# 验证码验证
redirect_url = await self._verify_code(
proxy=proxy,
token=token2,
code=verification_code,
pending_token=pending_token,
email=email,
session_id=session_id
)
if not redirect_url:
raise RegisterError("未找到重定向URL")
await self.random_delay()
# callback请求
cookies = await self._callback(proxy, redirect_url)
if not cookies:
raise RegisterError("获取cookies失败")
# 提取JWT
jwt_token = extract_jwt(cookies)
logger.success(f"自建邮箱账号 {email} 注册成功")
return {
'email': email,
'cursor_password': cursor_password,
'cursor_cookie': cookies,
'cursor_jwt': jwt_token
}
except Exception as e:
logger.error(f"自建邮箱账号注册失败: {str(e)}")
raise RegisterError(f"注册失败: {str(e)}")
async def _first_register(
self,
proxy: str,
token: str,
email: str,
session_id: str
) -> tuple[str, str, str]:
"""自建邮箱的第一次注册请求"""
logger.debug(f"开始第一次注册请求 - 自建邮箱: {email}, 代理: {proxy}")
first_name, last_name = self.form_builder._generate_name()
# 在headers中定义boundary
boundary = "----WebKitFormBoundary2rKlvTagBEhneWi3"
headers = {
"accept": "text/x-component",
"next-action": "a67eb6646e43eddcbd0d038cbee664aac59f5a53",
"content-type": f"multipart/form-data; boundary={boundary}",
"origin": "https://authenticator.cursor.sh",
"sec-fetch-dest": "empty",
"sec-fetch-mode": "cors",
"sec-fetch-site": "same-origin"
}
params = {
"first_name": first_name,
"last_name": last_name,
"email": email,
"state": "%7B%22returnTo%22%3A%22%2Fsettings%22%7D",
"redirect_uri": "https://cursor.com/api/auth/callback",
}
# 构建form数据
form_data, cursor_password = self.form_builder.build_register_form(boundary, email, token)
response = await self.fetch_manager.request(
"POST",
"https://authenticator.cursor.sh/sign-up/password",
headers=headers,
params=params,
data=form_data,
proxy=proxy
)
if 'error' in response:
raise RegisterError(f"First register request failed: {response['error']}")
text = response['body'].decode()
# 检查邮箱是否可用
if '"code":"email_not_available"' in text:
logger.error(f"邮箱 {email} 不受支持")
raise RegisterError("Email is not available")
# 提取pending_token
pending_token = None
res = text.split('\n')
try:
for i, r in enumerate(res):
if r.startswith('0:'):
logger.debug(f"在第 {i+1} 行找到匹配")
data = json.loads(r.split('0:')[1])
auth_data = data[1][0][0][1]["children"][1]["children"][1]["children"][1]["children"][0]
params_str = auth_data.split('?')[1]
params_dict = json.loads(params_str)
pending_token = params_dict['pending_authentication_token']
logger.debug(f"提取成功: {pending_token[:10]}...")
break
except Exception as e:
logger.error(f"提取token失败: {str(e)}")
raise RegisterError("Failed to extract auth token")
if not pending_token:
raise RegisterError("Failed to extract auth token")
logger.debug(f"第一次请求完成 - pending_token: {pending_token[:10]}...")
return email, pending_token, cursor_password
async def _verify_code(
self,
proxy: str,
token: str,
code: str,
pending_token: str,
email: str,
session_id: str
) -> str:
"""验证码验证请求"""
logger.debug(f"开始验证码验证 - 邮箱: {email}, 验证码: {code}")
boundary = "----WebKitFormBoundaryqEBf0rEYwwb9aUoF"
headers = {
"accept": "text/x-component",
"content-type": f"multipart/form-data; boundary={boundary}",
"next-action": "e75011da58d295bef5aa55740d0758a006468655",
"origin": "https://authenticator.cursor.sh",
"sec-fetch-dest": "empty",
"sec-fetch-mode": "cors",
"sec-fetch-site": "same-origin",
"user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/133.0.0.0 Safari/537.36"
}
params = {
"email": email,
"pending_authentication_token": pending_token,
"state": "%7B%22returnTo%22%3A%22%2Fsettings%22%7D",
"redirect_uri": "https://cursor.com/api/auth/callback",
"authorization_session_id": session_id
}
form_data = self.form_builder.build_verify_form(
boundary=boundary,
email=email,
token=token,
code=code,
pending_token=pending_token,
)
response = await self.fetch_manager.request(
"POST",
"https://authenticator.cursor.sh/email-verification",
headers=headers,
params=params,
data=form_data,
proxy=proxy
)
redirect_url = response.get('headers', {}).get('x-action-redirect')
if not redirect_url:
raise RegisterError("未找到重定向URL响应头: %s" % json.dumps(response.get('headers')))
return redirect_url
async def _callback(self, proxy: str, redirect_url: str) -> str:
"""Callback请求"""
logger.debug(f"开始callback请求 - URL: {redirect_url[:50]}...")
parsed = urlparse(redirect_url)
code = parse_qs(parsed.query)['code'][0]
logger.debug(f"从URL提取的code: {code[:10]}...")
headers = {
"accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7",
"accept-language": "zh-CN,zh;q=0.9",
"sec-fetch-dest": "document",
"sec-fetch-mode": "navigate",
"sec-fetch-site": "cross-site",
"upgrade-insecure-requests": "1",
"user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/133.0.0.0 Safari/537.36"
}
callback_url = "https://www.cursor.com/api/auth/callback"
params = {
"code": code,
"state": "%7B%22returnTo%22%3A%22%2Fsettings%22%7D"
}
response = await self.fetch_manager.request(
"GET",
callback_url,
headers=headers,
params=params,
proxy=proxy,
allow_redirects=False
)
if 'error' in response:
raise RegisterError(f"Callback request failed: {response['error']}")
cookies = response['headers'].get('set-cookie')
if cookies:
logger.debug("成功获取到cookies")
else:
logger.error("未获取到cookies")
return cookies

View File

@@ -68,13 +68,14 @@ class FormBuilder:
def build_register_form(boundary: str, email: str, token: str) -> tuple[str, str]:
"""构建注册表单数据,返回(form_data, password)"""
password = FormBuilder._generate_password()
first_name, last_name = FormBuilder._generate_name()
fields = {
"1_state": "{\"returnTo\":\"/settings\"}",
"1_redirect_uri": "https://cursor.com/api/auth/callback",
"1_bot_detection_token": token,
"1_first_name": "wa",
"1_last_name": "niu",
"1_first_name": first_name,
"1_last_name": last_name,
"1_email": email,
"1_password": password,
"1_intent": "sign-up",
@@ -249,7 +250,8 @@ class RegisterWorker:
boundary = "----WebKitFormBoundary2rKlvTagBEhneWi3"
headers = {
"accept": "text/x-component",
"next-action": "770926d8148e29539286d20e1c1548d2aff6c0b9",
# "next-action": "770926d8148e29539286d20e1c1548d2aff6c0b9",
"next-action": "a67eb6646e43eddcbd0d038cbee664aac59f5a53",
"content-type": f"multipart/form-data; boundary={boundary}",
"origin": "https://authenticator.cursor.sh",
"sec-fetch-dest": "empty",

View File

@@ -0,0 +1,119 @@
import json
from typing import Optional, Dict, Any
from loguru import logger
from core.exceptions import EmailError
from services.fetch_manager import FetchManager
class SelfHostedEmail:
"""自建邮箱服务类负责从API获取邮箱和验证码"""
def __init__(self, fetch_manager: FetchManager, api_base_url: str, api_key: Optional[str] = None):
"""初始化自建邮箱服务
Args:
fetch_manager: HTTP请求管理器
api_base_url: API基础URL例如 "https://api.cursorpro.com.cn"
api_key: API密钥可选
"""
self.fetch_manager = fetch_manager
self.api_base_url = "https://api.cursorpro.com.cn"
self.api_key = "1234567890"
# API端点
self.get_email_endpoint = "/admin/api.email/getEmail"
self.get_code_endpoint = "/admin/api.email/getVerificationCode"
async def _make_api_request(self, endpoint: str, params: Dict[str, Any] = None) -> Dict[str, Any]:
"""向API发送请求
Args:
endpoint: API端点例如 "/admin/api.email/getEmail"
params: 请求参数
Returns:
解析后的JSON响应
Raises:
EmailError: 当API请求失败或响应无法解析时
"""
url = f"{self.api_base_url}{endpoint}"
headers = {}
if self.api_key:
headers["Authorization"] = f"Bearer {self.api_key}"
logger.debug(f"正在请求: {url}")
response = await self.fetch_manager.request(
"GET",
url,
headers=headers,
params=params
)
if 'error' in response:
raise EmailError(f"API请求失败: {response['error']}")
try:
result = json.loads(response['body'].decode())
return result
except Exception as e:
raise EmailError(f"解析API响应失败: {str(e)}")
async def get_email(self) -> Optional[str]:
"""从API获取一个可用邮箱
Returns:
邮箱地址失败时返回None
"""
try:
result = await self._make_api_request(self.get_email_endpoint)
if result.get('code') != 0:
logger.error(f"获取邮箱失败: {result.get('msg')}")
return None
email = result.get('data', {}).get('email')
if not email:
logger.error("API未返回有效邮箱")
return None
logger.info(f"获取到自建邮箱: {email}")
return email
except Exception as e:
logger.error(f"获取邮箱失败: {str(e)}")
return None
async def get_verification_code(self, email: str) -> Optional[str]:
"""从API获取验证码
Args:
email: 邮箱地址
Returns:
验证码失败时返回None
"""
try:
result = await self._make_api_request(
self.get_code_endpoint,
params={"email": email}
)
if result.get('code') != 0:
logger.error(f"获取验证码失败: {result.get('msg')}")
return None
code = result.get('data', {}).get('code')
if not code:
logger.error("API未返回有效验证码")
return None
logger.info(f"获取到验证码: {code} (邮箱: {email})")
return code
except Exception as e:
logger.error(f"获取验证码失败: {str(e)}")
return None