Files
auto_cursor/register/register_worker.py
2025-04-07 14:55:33 +08:00

405 lines
15 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import asyncio
import json
import random
import string
from typing import Optional, Tuple
from urllib.parse import parse_qs, urlparse
from loguru import logger
from core.config import Config
from core.exceptions import RegisterError
from services.email_manager import EmailAccount, EmailManager
from services.fetch_manager import FetchManager
from services.uuid import ULID
def extract_jwt(cookie_string: str) -> str:
    """Extract the JWT from a cookie string.

    Only the first ``name=value`` pair (the text before the first ``;``) is
    inspected. Its value is expected to look like
    ``<prefix>%3A%3A<jwt>``; the part after the URL-encoded ``::`` separator
    is returned.

    Args:
        cookie_string: Cookie text, e.g. the value of a ``set-cookie`` header.

    Returns:
        The extracted token, or ``""`` when the input does not have the
        expected shape (the failure is logged).
    """
    try:
        first_pair = cookie_string.split(';', 1)[0]
        # The cookie value is everything after the FIRST '='; splitting on
        # every '=' would truncate values that themselves contain '='.
        _, separator, value = first_pair.partition('=')
        if not separator:
            raise ValueError("cookie has no '=' separator")
        pieces = value.split('%3A%3A')
        if len(pieces) < 2:
            raise ValueError("cookie value has no '%3A%3A' separator")
        return pieces[1]
    except (AttributeError, TypeError, ValueError) as e:
        logger.error(f"[错误] 提取JWT失败: {str(e)}")
        return ""
class FormBuilder:
    """Builds the multipart/form-data bodies sent during sign-up."""

    # Characters that count as the "special" class in generated passwords.
    _SPECIAL_CHARS = "!@#$%^&*"
    # JSON state value submitted with both forms.
    _STATE = "{\"returnTo\":\"/settings\"}"
    _REDIRECT_URI = "https://cursor.com/api/auth/callback"

    @staticmethod
    def _generate_password() -> str:
        """Generate a random password.

        Rule: 12-16 characters containing at least one lowercase letter, one
        uppercase letter, one digit and one special character.

        Returns:
            The generated password.
        """
        # SystemRandom draws from the OS entropy source; the default
        # Mersenne Twister generator is not suitable for credentials.
        rng = random.SystemRandom()
        length = rng.randint(12, 16)
        char_classes = (
            string.ascii_lowercase,
            string.ascii_uppercase,
            string.digits,
            FormBuilder._SPECIAL_CHARS,
        )
        # Guarantee at least one character from every class.
        chars = [rng.choice(char_class) for char_class in char_classes]
        # Fill the remaining length from the combined alphabet.
        alphabet = ''.join(char_classes)
        chars.extend(rng.choice(alphabet) for _ in range(length - len(chars)))
        # Shuffle so the guaranteed characters are not always at the front.
        rng.shuffle(chars)
        return ''.join(chars)

    @staticmethod
    def _generate_name() -> tuple[str, str]:
        """Pick a random first name and last name.

        Returns:
            tuple: (first_name, last_name)
        """
        first_names = ["Alex", "Sam", "Chris", "Jordan", "Taylor", "Morgan", "Casey", "Drew", "Pat", "Quinn"]
        last_names = ["Smith", "Johnson", "Brown", "Davis", "Wilson", "Moore", "Taylor", "Anderson", "Thomas", "Jackson"]
        return (
            random.choice(first_names),
            random.choice(last_names)
        )

    @staticmethod
    def _encode_multipart(boundary: str, fields: dict[str, str]) -> str:
        """Serialize ``fields`` as a multipart/form-data body.

        Each field becomes a part introduced by ``--<boundary>``; the body is
        terminated by ``--<boundary>--`` and lines are joined with CRLF.
        Fields are emitted in the dict's insertion order.
        """
        lines = []
        for name, value in fields.items():
            lines.extend((
                f'--{boundary}',
                f'Content-Disposition: form-data; name="{name}"',
                '',
                value,
            ))
        lines.append(f'--{boundary}--')
        return '\r\n'.join(lines)

    @staticmethod
    def build_register_form(
        boundary: str,
        email: str,
        token: str,
        first_name: str = "wa",
        last_name: str = "niu",
    ) -> tuple[str, str]:
        """Build the sign-up form body.

        Args:
            boundary: Multipart boundary (must match the content-type header).
            email: Email address to register.
            token: Bot-detection token submitted with the form.
            first_name: First name to submit; defaults to the historical
                fixed value.
            last_name: Last name to submit; defaults to the historical
                fixed value.

        Returns:
            tuple: (form_data, password) where ``password`` is the freshly
            generated password embedded in ``form_data``.
        """
        password = FormBuilder._generate_password()
        fields = {
            "1_state": FormBuilder._STATE,
            "1_redirect_uri": FormBuilder._REDIRECT_URI,
            "1_bot_detection_token": token,
            "1_first_name": first_name,
            "1_last_name": last_name,
            "1_email": email,
            "1_password": password,
            "1_intent": "sign-up",
            "0": "[\"$K1\"]",
        }
        return FormBuilder._encode_multipart(boundary, fields), password

    @staticmethod
    def build_verify_form(boundary: str, email: str, token: str, code: str, pending_token: str) -> str:
        """Build the email-verification form body.

        Args:
            boundary: Multipart boundary (must match the content-type header).
            email: Email address being verified.
            token: Bot-detection token submitted with the form.
            code: Verification code received by email.
            pending_token: The pending authentication token from sign-up.

        Returns:
            The encoded multipart body.
        """
        fields = {
            "1_pending_authentication_token": pending_token,
            "1_email": email,
            "1_state": FormBuilder._STATE,
            "1_redirect_uri": FormBuilder._REDIRECT_URI,
            "1_bot_detection_token": token,
            "1_code": code,
            "0": "[\"$K1\"]",
        }
        return FormBuilder._encode_multipart(boundary, fields)
class RegisterWorker:
    """Runs the sign-up flow for one email account.

    The flow is: submit the sign-up form, fetch the emailed verification
    code, submit the code, then follow the auth callback to collect cookies.
    HTTP calls go through ``fetch_manager.request``, which this class treats
    as returning a dict that may contain ``'error'``, ``'body'`` and
    ``'headers'`` keys.
    """

    def __init__(self, config: Config, fetch_manager: FetchManager, email_manager: EmailManager):
        self.config = config
        self.fetch_manager = fetch_manager
        self.email_manager = email_manager
        self.form_builder = FormBuilder()
        self.uuid = ULID()

    async def random_delay(self):
        """Sleep for a random time within ``config.register_config.delay_range``."""
        delay = random.uniform(*self.config.register_config.delay_range)
        await asyncio.sleep(delay)

    @staticmethod
    async def _extract_auth_token(response_text: str, email_account: EmailAccount, email_manager: EmailManager) -> str | None:
        """Extract ``pending_authentication_token`` from the sign-up response.

        Args:
            response_text: Decoded response body, one record per line.
            email_account: Account being registered (used to flag it when
                the email is rejected).
            email_manager: Used to update the account status.

        Returns:
            The token, or ``None`` if no line starting with ``0:`` is found
            or that line cannot be parsed.

        Raises:
            RegisterError: If the response reports ``email_not_available``;
                the account is marked ``'unavailable'`` first.
        """
        lines = response_text.split('\n')
        logger.debug(f"开始提取 auth_token响应行数: {len(lines)}")
        # Check whether the email address was rejected.
        for line in lines:
            if '"code":"email_not_available"' in line:
                logger.error("不受支持的邮箱")
                await email_manager.update_account_status(email_account.id, 'unavailable')
                raise RegisterError("Email is not available")
        try:
            for i, line in enumerate(lines):
                if line.startswith('0:'):
                    logger.debug(f"在第 {i+1} 行找到匹配")
                    # Strip only the leading "0:" marker; the payload itself
                    # may legitimately contain "0:" further along.
                    data = json.loads(line.removeprefix('0:'))
                    auth_data = data[1][0][0][1]["children"][1]["children"][1]["children"][1]["children"][0]
                    # Everything after the first '?' is parsed as JSON.
                    params_str = auth_data.split('?', 1)[1]
                    params_dict = json.loads(params_str)
                    token = params_dict['pending_authentication_token']
                    logger.debug(f"方法2提取成功: {token[:10]}...")
                    return token
        except (ValueError, LookupError, TypeError, AttributeError) as e:
            logger.error(f"提取token失败: {str(e)}")
            # loguru formats with str.format, so the preview needs a
            # placeholder to actually appear in the log.
            logger.debug("响应内容预览: {}", response_text[:200])
        return None

    async def register(self, proxy: str, token_pair: Tuple[str, str], email_account: EmailAccount):
        """Run the complete registration flow.

        Args:
            proxy: Proxy passed through to every HTTP request.
            token_pair: Two bot-detection tokens; the first is used for the
                sign-up request and the second for code verification.
            email_account: The email account to register.

        Returns:
            A dict with ``account_id``, ``cursor_password``,
            ``cursor_cookie`` and ``cursor_jwt`` on success, or ``None``
            when the email address is not supported.

        Raises:
            RegisterError: On any other failure. The account is marked
                ``'failed'`` before the error is raised.
        """
        token1, token2 = token_pair
        session_id = self.uuid.generate()
        try:
            logger.info(f"开始注册账号: {email_account.email}")
            # First sign-up request.
            try:
                email, pending_token, cursor_password = await self._first_register(
                    proxy,
                    token1,
                    email_account.email,
                    email_account,
                    session_id=session_id
                )
            except RegisterError as e:
                if "Email is not available" in str(e):
                    logger.warning(f"邮箱 {email_account.email} 不受支持,跳过处理")
                    return None
                raise

            # Wait for the verification code.
            verification_code = await self._get_verification_code_with_retry(
                email_account.email,
                email_account.refresh_token,
                email_account.client_id,
                max_retries=3
            )
            if not verification_code:
                logger.error(f"账号 {email_account.email} 获取验证码失败")
                # The outer handler marks the account as 'failed'.
                raise RegisterError("Failed to get verification code")
            logger.debug(f"邮箱 {email_account.email} 获取到验证码: {verification_code}")
            await self.random_delay()

            # Submit the verification code.
            redirect_url = await self._verify_code(
                proxy=proxy,
                token=token2,
                code=verification_code,
                pending_token=pending_token,
                email=email,
                session_id=session_id
            )
            if not redirect_url:
                raise RegisterError("No redirect URL found")
            await self.random_delay()

            # Callback request.
            cookies = await self._callback(proxy, redirect_url)
            if not cookies:
                raise RegisterError("Failed to get cookies")
            logger.success(f"账号 {email_account.email} 注册成功")
            return {
                'account_id': email_account.id,
                'cursor_password': cursor_password,
                'cursor_cookie': cookies,
                'cursor_jwt': extract_jwt(cookies)
            }
        except Exception as e:
            logger.error(f"账号 {email_account.email} 注册失败: {str(e)}")
            if not str(e).startswith("Email is not available"):
                await self.email_manager.update_account_status(email_account.id, 'failed')
            # Chain the cause so the original traceback is preserved.
            raise RegisterError(f"Registration failed: {str(e)}") from e

    async def _first_register(
        self,
        proxy: str,
        token: str,
        email: str,
        email_account: EmailAccount,
        session_id: str
    ) -> tuple[str, str, str]:
        """Send the first sign-up request.

        Args:
            proxy: Proxy for the request.
            token: Bot-detection token placed in the form.
            email: Email address to register.
            email_account: Account record, forwarded to token extraction.
            session_id: Accepted for signature parity with ``_verify_code``;
                not used by this request.

        Returns:
            tuple: (email, pending_token, cursor_password)

        Raises:
            RegisterError: If the request reports an error, the email is not
                available, or no token can be extracted.
        """
        logger.debug(f"开始第一次注册请求 - 邮箱: {email}, 代理: {proxy}")
        # NOTE(review): these random names only go into the query string;
        # build_register_form is called without names, so the form body
        # carries its own values — confirm the mismatch is intended.
        first_name, last_name = self.form_builder._generate_name()
        # The boundary must match the one declared in the headers.
        boundary = "----WebKitFormBoundary2rKlvTagBEhneWi3"
        headers = {
            "accept": "text/x-component",
            "next-action": "770926d8148e29539286d20e1c1548d2aff6c0b9",
            "content-type": f"multipart/form-data; boundary={boundary}",
            "origin": "https://authenticator.cursor.sh",
            "sec-fetch-dest": "empty",
            "sec-fetch-mode": "cors",
            "sec-fetch-site": "same-origin"
        }
        params = {
            "first_name": first_name,
            "last_name": last_name,
            "email": email,
            "state": "%7B%22returnTo%22%3A%22%2Fsettings%22%7D",
            "redirect_uri": "https://cursor.com/api/auth/callback",
        }
        # Build the form body.
        form_data, cursor_password = self.form_builder.build_register_form(boundary, email, token)
        response = await self.fetch_manager.request(
            "POST",
            "https://authenticator.cursor.sh/sign-up/password",
            headers=headers,
            params=params,
            data=form_data,
            proxy=proxy
        )
        if 'error' in response:
            raise RegisterError(f"First register request failed: {response['error']}")
        text = response['body'].decode()
        pending_token = await self._extract_auth_token(text, email_account, self.email_manager)
        if not pending_token:
            raise RegisterError("Failed to extract auth token")
        logger.debug(f"第一次请求完成 - pending_token: {pending_token[:10]}...")
        return email, pending_token, cursor_password

    async def _verify_code(
        self,
        proxy: str,
        token: str,
        code: str,
        pending_token: str,
        email: str,
        session_id: str
    ) -> str:
        """Submit the email verification code.

        Args:
            proxy: Proxy for the request.
            token: Bot-detection token placed in the form.
            code: Verification code received by email.
            pending_token: Token returned by the sign-up request.
            email: Email address being verified.
            session_id: Sent as ``authorization_session_id``.

        Returns:
            The value of the ``x-action-redirect`` response header.

        Raises:
            RegisterError: If the request reports an error or the redirect
                header is missing.
        """
        logger.debug(f"开始验证码验证 - 邮箱: {email}, 验证码: {code}")
        boundary = "----WebKitFormBoundaryqEBf0rEYwwb9aUoF"
        headers = {
            "accept": "text/x-component",
            "content-type": f"multipart/form-data; boundary={boundary}",
            "next-action": "e75011da58d295bef5aa55740d0758a006468655",
            "origin": "https://authenticator.cursor.sh",
            "sec-fetch-dest": "empty",
            "sec-fetch-mode": "cors",
            "sec-fetch-site": "same-origin",
            "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/133.0.0.0 Safari/537.36"
        }
        params = {
            "email": email,
            "pending_authentication_token": pending_token,
            "state": "%7B%22returnTo%22%3A%22%2Fsettings%22%7D",
            "redirect_uri": "https://cursor.com/api/auth/callback",
            "authorization_session_id": session_id
        }
        form_data = self.form_builder.build_verify_form(
            boundary=boundary,
            email=email,
            token=token,
            code=code,
            pending_token=pending_token,
        )
        response = await self.fetch_manager.request(
            "POST",
            "https://authenticator.cursor.sh/email-verification",
            headers=headers,
            params=params,
            data=form_data,
            proxy=proxy
        )
        # Surface transport errors the same way the other requests do,
        # instead of reporting them as a missing redirect header.
        if 'error' in response:
            raise RegisterError(f"Verify code request failed: {response['error']}")
        redirect_url = (response.get('headers') or {}).get('x-action-redirect')
        if not redirect_url:
            raise RegisterError("未找到重定向URL响应头: %s" % json.dumps(response.get('headers')))
        return redirect_url

    async def _callback(self, proxy: str, redirect_url: str) -> Optional[str]:
        """Call the auth callback and return the cookies it sets.

        Args:
            proxy: Proxy for the request.
            redirect_url: Redirect URL whose query string carries ``code``.

        Returns:
            The ``set-cookie`` response header value, or ``None`` if the
            response did not include one.

        Raises:
            RegisterError: If ``redirect_url`` has no ``code`` parameter or
                the request reports an error.
        """
        logger.debug(f"开始callback请求 - URL: {redirect_url[:50]}...")
        code_values = parse_qs(urlparse(redirect_url).query).get('code')
        if not code_values:
            raise RegisterError("Redirect URL does not contain an authorization code")
        code = code_values[0]
        logger.debug(f"从URL提取的code: {code[:10]}...")
        headers = {
            "accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7",
            "accept-language": "zh-CN,zh;q=0.9",
            "sec-fetch-dest": "document",
            "sec-fetch-mode": "navigate",
            "sec-fetch-site": "cross-site",
            "upgrade-insecure-requests": "1",
            "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/133.0.0.0 Safari/537.36"
        }
        callback_url = "https://www.cursor.com/api/auth/callback"
        params = {
            "code": code,
            "state": "%7B%22returnTo%22%3A%22%2Fsettings%22%7D"
        }
        response = await self.fetch_manager.request(
            "GET",
            callback_url,
            headers=headers,
            params=params,
            proxy=proxy,
            allow_redirects=False
        )
        if 'error' in response:
            raise RegisterError(f"Callback request failed: {response['error']}")
        cookies = response['headers'].get('set-cookie')
        if cookies:
            logger.debug("成功获取到cookies")
        else:
            logger.error("未获取到cookies")
        return cookies

    async def _get_verification_code_with_retry(self, email: str, refresh_token: str, client_id: str, max_retries: int = 3) -> Optional[str]:
        """Fetch the verification code, retrying on failure.

        Args:
            email: Mailbox address to read the code from.
            refresh_token: Forwarded to ``email_manager.get_verification_code``.
            client_id: Forwarded to ``email_manager.get_verification_code``.
            max_retries: Maximum number of attempts.

        Returns:
            The code, or ``None`` if every attempt failed or returned nothing.
        """
        for attempt in range(max_retries):
            try:
                code = await self.email_manager.get_verification_code(
                    email, refresh_token, client_id
                )
            except Exception as e:
                # Best effort: a failed attempt is logged and retried.
                logger.warning(f"{attempt + 1} 次获取验证码失败: {str(e)}")
                code = None
            if code:
                return code
            # Pause between attempts, but not after the final one.
            if attempt < max_retries - 1:
                await asyncio.sleep(2)
        return None