Files
auto_cursor/register/host_register_worker.py
2025-04-07 15:26:55 +08:00

421 lines
16 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import asyncio
import json
import random
import re
import secrets
import string
from typing import Optional, Tuple, Dict
from urllib.parse import parse_qs, urlparse

from loguru import logger

from core.config import Config
from core.exceptions import RegisterError
from services.fetch_manager import FetchManager
from services.self_hosted_email import SelfHostedEmail
from services.uuid import ULID
def extract_jwt(cookie_string: str) -> str:
    """Extract the JWT from a cookie string.

    Only the first ``;``-separated segment of *cookie_string* is examined.
    Its value (everything after the first ``=``) is split on the URL-encoded
    ``::`` separator (``%3A%3A``) and the piece following the first separator
    is returned.

    Args:
        cookie_string: Cookie header text, e.g. ``name=prefix%3A%3Atoken; Path=/``.

    Returns:
        The extracted token, or an empty string when the input does not have
        the expected shape (the failure is logged).
    """
    try:
        first_pair = cookie_string.split(';', 1)[0]
        # maxsplit=1: keep any '=' that is part of the value itself instead
        # of truncating the value at it.
        cookie_value = first_pair.split('=', 1)[1]
        return cookie_value.split('%3A%3A')[1]
    except (AttributeError, IndexError, TypeError) as e:
        logger.error(f"[错误] 提取JWT失败: {str(e)}")
        return ""
class FormBuilder:
    """Builds the multipart/form-data request bodies for the sign-up flow.

    Every method is static; the class only serves as a namespace.
    """

    @staticmethod
    def _generate_password() -> str:
        """Generate a random password.

        The password is 12-16 characters long and always contains at least
        one lowercase letter, one uppercase letter, one digit and one
        character from ``!@#$%^&*``.

        Randomness is drawn from the ``secrets`` module (OS CSPRNG) because
        the ``random`` module is not intended for generating credentials.
        """
        lowercase = string.ascii_lowercase
        uppercase = string.ascii_uppercase
        digits = string.digits
        special = "!@#$%^&*"

        # randbelow(5) yields 0..4, so the length is uniform over 12..16.
        length = 12 + secrets.randbelow(5)

        # One character from every class first, so each class is represented.
        password = [
            secrets.choice(pool)
            for pool in (lowercase, uppercase, digits, special)
        ]

        # Fill the remaining positions from the combined alphabet.
        all_chars = lowercase + uppercase + digits + special
        password.extend(
            secrets.choice(all_chars) for _ in range(length - len(password))
        )

        # Shuffle so the guaranteed characters are not always in front.
        secrets.SystemRandom().shuffle(password)
        return ''.join(password)

    @staticmethod
    def _generate_name() -> tuple[str, str]:
        """Pick a random first name and last name from fixed lists.

        Returns:
            tuple: (first_name, last_name)
        """
        first_names = ["Alex", "Sam", "Chris", "Jordan", "Taylor", "Morgan", "Casey", "Drew", "Pat", "Quinn"]
        last_names = ["Smith", "Johnson", "Brown", "Davis", "Wilson", "Moore", "Taylor", "Anderson", "Thomas", "Jackson"]
        return (
            random.choice(first_names),
            random.choice(last_names),
        )

    @staticmethod
    def _encode_multipart(boundary: str, fields: dict[str, str]) -> str:
        """Serialize *fields* as a multipart/form-data body.

        Fields are emitted in dict order, lines are joined with CRLF, and the
        body ends with the closing ``--boundary--`` delimiter (no trailing
        CRLF).
        """
        lines = []
        for name, value in fields.items():
            lines.extend((
                f'--{boundary}',
                f'Content-Disposition: form-data; name="{name}"',
                '',
                value,
            ))
        lines.append(f'--{boundary}--')
        return '\r\n'.join(lines)

    @staticmethod
    def build_register_form(boundary: str, email: str, token: str) -> tuple[str, str]:
        """Build the registration form body.

        A fresh random password and a random name are generated for the form.

        Args:
            boundary: Multipart boundary (without the leading ``--``).
            email: Address to register.
            token: Value sent as ``1_bot_detection_token``.

        Returns:
            tuple: (form_data, password)
        """
        password = FormBuilder._generate_password()
        first_name, last_name = FormBuilder._generate_name()
        fields = {
            "1_state": '{"returnTo":"/settings"}',
            "1_redirect_uri": "https://cursor.com/api/auth/callback",
            "1_bot_detection_token": token,
            "1_first_name": first_name,
            "1_last_name": last_name,
            "1_email": email,
            "1_password": password,
            "1_intent": "sign-up",
            "0": '["$K1"]',
        }
        return FormBuilder._encode_multipart(boundary, fields), password

    @staticmethod
    def build_verify_form(boundary: str, email: str, token: str, code: str, pending_token: str) -> str:
        """Build the e-mail verification form body.

        Args:
            boundary: Multipart boundary (without the leading ``--``).
            email: Address being verified.
            token: Value sent as ``1_bot_detection_token``.
            code: Verification code sent as ``1_code``.
            pending_token: Value sent as ``1_pending_authentication_token``.

        Returns:
            The encoded multipart body.
        """
        fields = {
            "1_pending_authentication_token": pending_token,
            "1_email": email,
            "1_state": '{"returnTo":"/settings"}',
            "1_redirect_uri": "https://cursor.com/api/auth/callback",
            "1_bot_detection_token": token,
            "1_code": code,
            "0": '["$K1"]',
        }
        return FormBuilder._encode_multipart(boundary, fields)
class HostRegisterWorker:
    """Registration worker that completes the sign-up flow with a self-hosted mailbox.

    The flow is: obtain an address from the mailbox service, submit the
    sign-up form, fetch the verification code from the mailbox, submit it,
    then hit the auth callback and collect the resulting cookies.
    """

    # Values shared by several requests; hoisted so they are defined once.
    _USER_AGENT = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/133.0.0.0 Safari/537.36"
    _AUTH_ORIGIN = "https://authenticator.cursor.sh"
    _REDIRECT_URI = "https://cursor.com/api/auth/callback"
    _STATE = "%7B%22returnTo%22%3A%22%2Fsettings%22%7D"

    # Fallback patterns used when the structured response cannot be parsed.
    _PENDING_TOKEN_RE = re.compile(
        r'pending_authentication_token["\']?\s*[:=]\s*["\']?([^"\'&,\s]+)["\']?'
    )
    _REDIRECT_RE = re.compile(r'redirect[^"\']*["\']([^"\']+)["\']')

    def __init__(self, config: Config, fetch_manager: FetchManager, self_hosted_email: SelfHostedEmail):
        """Store the collaborators and create the form builder and id generator."""
        self.config = config
        self.fetch_manager = fetch_manager
        self.self_hosted_email = self_hosted_email
        self.form_builder = FormBuilder()
        self.uuid = ULID()

    async def random_delay(self):
        """Sleep for a random duration drawn from ``config.register_config.delay_range``."""
        delay = random.uniform(*self.config.register_config.delay_range)
        await asyncio.sleep(delay)

    @staticmethod
    async def _extract_auth_token(response_text: str) -> str | None:
        """Extract ``pending_authentication_token`` from the sign-up response.

        The first line starting with ``0:`` is parsed as JSON and the token is
        read from a fixed path inside it. If that fails, the full response is
        dumped to ``debug_response.txt`` (best effort) and a regex search over
        the whole text is tried instead.

        Args:
            response_text: Decoded response body.

        Returns:
            The token, or ``None`` when neither strategy finds one.

        Raises:
            RegisterError: If the response reports ``email_not_available``.
        """
        lines = response_text.split('\n')
        logger.debug(f"开始提取 auth_token响应行数: {len(lines)}")

        # Check whether the e-mail address was rejected.
        if any('"code":"email_not_available"' in line for line in lines):
            logger.error("不受支持的邮箱")
            raise RegisterError("Email is not available")

        try:
            for line_no, line in enumerate(lines, start=1):
                if not line.startswith('0:'):
                    continue
                logger.debug(f"在第 {line_no} 行找到匹配")
                # Strip only the leading "0:" prefix; splitting on every "0:"
                # would cut the JSON payload at any later occurrence.
                data = json.loads(line[2:])
                auth_data = data[1][0][0][1]["children"][1]["children"][1]["children"][1]["children"][0]
                # maxsplit=1 for the same reason: keep any later '?' intact.
                params_str = auth_data.split('?', 1)[1]
                params_dict = json.loads(params_str)
                token = params_dict['pending_authentication_token']
                logger.debug(f"提取成功: {token[:10]}...")
                return token
        except (ValueError, KeyError, IndexError, TypeError, AttributeError) as e:
            logger.error(f"提取token失败: {str(e)}")
            logger.debug(f"响应内容预览: {response_text[:200]}...")
            # Save the full response for debugging; failure here must not
            # abort the extraction, but it is no longer swallowed silently.
            try:
                with open('debug_response.txt', 'w', encoding='utf-8') as f:
                    f.write(response_text)
                logger.debug("完整响应已保存到debug_response.txt")
            except OSError as dump_error:
                logger.debug(f"保存调试响应失败: {dump_error}")

        # Fallback: look for the token anywhere in the response text.
        match = HostRegisterWorker._PENDING_TOKEN_RE.search(response_text)
        if match:
            token = match.group(1)
            logger.debug(f"使用正则表达式提取成功: {token[:10]}...")
            return token
        return None

    async def register(self, proxy: str, token_pair: Tuple[str, str]) -> Optional[Dict]:
        """Run the full registration flow with a self-hosted mailbox.

        Args:
            proxy: Proxy passed through to every HTTP request.
            token_pair: Two bot-detection tokens; the first is used for the
                sign-up request, the second for the verification request.

        Returns:
            A dict with ``email``, ``email_password``, ``cursor_password``,
            ``cursor_cookie`` and ``cursor_jwt``.

        Raises:
            RegisterError: If the mailbox service is not configured or any
                step of the flow fails (the original error is chained).
        """
        if not self.self_hosted_email:
            raise RegisterError("自建邮箱服务未配置")
        token1, token2 = token_pair
        session_id = self.uuid.generate()
        try:
            # Obtain an address from the self-hosted mailbox API.
            email = await self.self_hosted_email.get_email()
            if not email:
                raise RegisterError("获取自建邮箱失败")
            logger.info(f"开始使用自建邮箱注册: {email}")

            # Initial sign-up request.
            email, pending_token, cursor_password = await self._first_register(
                proxy,
                token1,
                email,
                session_id,
            )
            await self.random_delay()

            # Fetch the verification code from the mailbox API.
            verification_code = await self.self_hosted_email.get_verification_code(email)
            if not verification_code:
                logger.error(f"自建邮箱 {email} 获取验证码失败")
                raise RegisterError("获取验证码失败")
            logger.debug(f"自建邮箱 {email} 获取到验证码: {verification_code}")
            await self.random_delay()

            # Submit the verification code.
            redirect_url = await self._verify_code(
                proxy=proxy,
                token=token2,
                code=verification_code,
                pending_token=pending_token,
                email=email,
                session_id=session_id,
            )
            if not redirect_url:
                raise RegisterError("未找到重定向URL")
            await self.random_delay()

            # Callback request.
            cookies = await self._callback(proxy, redirect_url)
            if not cookies:
                raise RegisterError("获取cookies失败")

            # Extract the JWT from the cookies.
            jwt_token = extract_jwt(cookies)
            logger.success(f"自建邮箱账号 {email} 注册成功")
            return {
                'email': email,
                'email_password': '',  # field is always present; empty for self-hosted mailboxes
                'cursor_password': cursor_password,
                'cursor_cookie': cookies,
                'cursor_jwt': jwt_token,
            }
        except Exception as e:
            logger.error(f"自建邮箱账号注册失败: {str(e)}")
            # Chain the cause so the original traceback is not lost.
            raise RegisterError(f"注册失败: {str(e)}") from e

    async def _first_register(
        self,
        proxy: str,
        token: str,
        email: str,
        session_id: str
    ) -> tuple[str, str, str]:
        """Send the initial sign-up request.

        Args:
            proxy: Proxy for the request.
            token: Bot-detection token placed in the form.
            email: Address to register.
            session_id: Accepted for signature compatibility; not used here.

        Returns:
            tuple: (email, pending_token, cursor_password)

        Raises:
            RegisterError: If the request reports an error or no pending
                authentication token can be extracted.
        """
        logger.debug(f"开始第一次注册请求 - 自建邮箱: {email}, 代理: {proxy}")
        # NOTE(review): build_register_form() picks its own random name for
        # the form body, so the name in these URL params can differ from the
        # one in the body. Confirm whether they are expected to match.
        first_name, last_name = self.form_builder._generate_name()

        # The boundary must match the one announced in the content-type header.
        boundary = "----WebKitFormBoundary2rKlvTagBEhneWi3"
        headers = {
            "accept": "text/x-component",
            "next-action": "770926d8148e29539286d20e1c1548d2aff6c0b9",
            "content-type": f"multipart/form-data; boundary={boundary}",
            "origin": self._AUTH_ORIGIN,
            "sec-fetch-dest": "empty",
            "sec-fetch-mode": "cors",
            "sec-fetch-site": "same-origin",
            "user-agent": self._USER_AGENT,
        }
        params = {
            "first_name": first_name,
            "last_name": last_name,
            "email": email,
            "state": self._STATE,
            "redirect_uri": self._REDIRECT_URI,
        }

        # Build the form body.
        form_data, cursor_password = self.form_builder.build_register_form(boundary, email, token)
        response = await self.fetch_manager.request(
            "POST",
            "https://authenticator.cursor.sh/sign-up/password",
            headers=headers,
            params=params,
            data=form_data,
            proxy=proxy,
        )
        if 'error' in response:
            raise RegisterError(f"First register request failed: {response['error']}")

        text = response['body'].decode()
        pending_token = await self._extract_auth_token(text)
        if not pending_token:
            raise RegisterError("Failed to extract auth token")
        logger.debug(f"第一次请求完成 - pending_token: {pending_token[:10]}...")
        return email, pending_token, cursor_password

    async def _verify_code(
        self,
        proxy: str,
        token: str,
        code: str,
        pending_token: str,
        email: str,
        session_id: str
    ) -> str:
        """Submit the e-mail verification code.

        The redirect URL is read from the ``x-action-redirect`` response
        header; if absent, a regex search over the response body is tried.

        Args:
            proxy: Proxy for the request.
            token: Bot-detection token placed in the form.
            code: Verification code received by e-mail.
            pending_token: Token returned by the sign-up request.
            email: Address being verified.
            session_id: Sent as ``authorization_session_id``.

        Returns:
            The redirect URL.

        Raises:
            RegisterError: If the request reports an error or no redirect
                URL can be found.
        """
        logger.debug(f"开始验证码验证 - 邮箱: {email}, 验证码: {code}")
        boundary = "----WebKitFormBoundaryqEBf0rEYwwb9aUoF"
        headers = {
            "accept": "text/x-component",
            "content-type": f"multipart/form-data; boundary={boundary}",
            "next-action": "e75011da58d295bef5aa55740d0758a006468655",
            "origin": self._AUTH_ORIGIN,
            "sec-fetch-dest": "empty",
            "sec-fetch-mode": "cors",
            "sec-fetch-site": "same-origin",
            "user-agent": self._USER_AGENT,
        }
        params = {
            "email": email,
            "pending_authentication_token": pending_token,
            "state": self._STATE,
            "redirect_uri": self._REDIRECT_URI,
            "authorization_session_id": session_id,
        }
        form_data = self.form_builder.build_verify_form(
            boundary=boundary,
            email=email,
            token=token,
            code=code,
            pending_token=pending_token,
        )
        response = await self.fetch_manager.request(
            "POST",
            "https://authenticator.cursor.sh/email-verification",
            headers=headers,
            params=params,
            data=form_data,
            proxy=proxy,
        )
        # Same error check as the sign-up and callback requests, so a
        # transport failure is reported as such rather than as a missing
        # redirect.
        if 'error' in response:
            raise RegisterError(f"Verify code request failed: {response['error']}")

        redirect_url = response.get('headers', {}).get('x-action-redirect')
        if not redirect_url:
            logger.error(f"未找到重定向URL响应头: {json.dumps(response.get('headers', {}))}")
            # Try to recover the URL from the response body instead.
            body = response.get('body', b'').decode()
            if 'redirect' in body.lower():
                logger.debug("尝试从响应体中提取重定向URL")
                match = self._REDIRECT_RE.search(body)
                if match:
                    redirect_url = match.group(1)
                    logger.debug(f"从响应体提取到重定向URL: {redirect_url}")
        if not redirect_url:
            raise RegisterError(f"未找到重定向URL响应头: {json.dumps(response.get('headers'))}")
        return redirect_url

    async def _callback(self, proxy: str, redirect_url: str) -> Optional[str]:
        """Perform the auth callback request and return the ``set-cookie`` header.

        Args:
            proxy: Proxy for the request.
            redirect_url: URL returned by the verification step; its ``code``
                query parameter is forwarded to the callback endpoint.

        Returns:
            The ``set-cookie`` header value, or ``None`` if the response has none.

        Raises:
            RegisterError: If the redirect URL has no ``code`` parameter or
                the request reports an error.
        """
        logger.debug(f"开始callback请求 - URL: {redirect_url[:50]}...")
        code_values = parse_qs(urlparse(redirect_url).query).get('code')
        if not code_values:
            # Raise a domain error instead of leaking a bare KeyError.
            raise RegisterError("Redirect URL has no 'code' query parameter")
        code = code_values[0]
        logger.debug(f"从URL提取的code: {code[:10]}...")

        headers = {
            "accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7",
            "accept-language": "zh-CN,zh;q=0.9",
            "sec-fetch-dest": "document",
            "sec-fetch-mode": "navigate",
            "sec-fetch-site": "cross-site",
            "upgrade-insecure-requests": "1",
            "user-agent": self._USER_AGENT,
        }
        callback_url = "https://www.cursor.com/api/auth/callback"
        params = {
            "code": code,
            "state": self._STATE,
        }
        response = await self.fetch_manager.request(
            "GET",
            callback_url,
            headers=headers,
            params=params,
            proxy=proxy,
            allow_redirects=False,
        )
        if 'error' in response:
            raise RegisterError(f"Callback request failed: {response['error']}")

        cookies = response['headers'].get('set-cookie')
        if cookies:
            logger.debug("成功获取到cookies")
        else:
            logger.error("未获取到cookies")
        return cookies