commit f9dcea2d0625e983e3a801b875bf45dae60849d7 Author: huangzhenpc Date: Tue Aug 12 14:33:24 2025 +0800 first commit: DuoPlus云手机协议注册工具 - 完整实现 diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..71b176b --- /dev/null +++ b/.gitignore @@ -0,0 +1,48 @@ +# Python +__pycache__/ +*.py[cod] +*$py.class +*.so +.Python +env/ +venv/ +ENV/ +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +*.egg-info/ +.installed.cfg +*.egg + +# Environment +.env +.env.local +.env.*.local + +# IDE +.vscode/ +.idea/ +*.swp +*.swo +*~ + +# Logs +*.log +logs/ + +# OS +.DS_Store +Thumbs.db + +# Project specific +captured_requests.json +test_accounts.txt \ No newline at end of file diff --git a/QUICKSTART.md b/QUICKSTART.md new file mode 100644 index 0000000..505ac93 --- /dev/null +++ b/QUICKSTART.md @@ -0,0 +1,124 @@ +# DuoPlus 协议注册 - 快速开始指南 + +## 🚀 快速上手 + +### 1. 安装依赖 +```bash +pip install -r requirements.txt +``` + +### 2. 配置 API Key +```bash +cp .env.example .env +# 编辑 .env 文件,填入你的 2captcha API Key +``` + +### 3. 运行注册 +```bash +python main.py +# 或 +python duoplus_register.py +``` + +## 📝 重要说明 + +### API 端点说明 +当前脚本中的 API 端点是基于常见模式推测的,实际使用时可能需要: + +1. **使用网络抓包工具分析实际请求** + ```bash + # 安装 mitmproxy + pip install mitmproxy + + # 运行分析脚本 + mitmdump -s analyze_requests.py -p 8080 + ``` + +2. **需要更新的关键参数** + - 腾讯验证码 app_id(在 `duoplus_register.py` 第 53 行) + - API 端点 URL + - 请求头参数 + +### 腾讯验证码处理流程 + +1. **初始化验证码** + - 获取验证码配置(app_id) + - 调用 2captcha API 提交任务 + +2. **等待识别结果** + - 2captcha 会返回 ticket 和 randstr + - 这两个参数用于验证 + +3. **提交验证结果** + - 将 ticket 和 randstr 随注册请求一起提交 + +## 🛠️ 调试技巧 + +### 1. 测试 2captcha 连接 +```bash +python test_register.py +# 选择选项 1 +``` + +### 2. 查看详细日志 +脚本使用彩色输出显示不同类型的信息: +- 🟦 蓝色:信息 +- 🟨 黄色:警告/输入提示 +- 🟩 绿色:成功 +- 🟥 红色:错误 + +### 3. 常见问题 + +**Q: 验证码识别失败** +- 检查 2captcha 余额 +- 确认 API Key 正确 +- 检查网络连接 + +**Q: 注册请求失败** +- 使用抓包工具获取最新的 API 端点 +- 检查请求头是否完整 +- 确认邮箱未被注册 + +## 📌 注意事项 + +1. **费用提醒**:每次验证码识别约需 $0.003 + +2. **频率限制**:避免短时间内大量注册 + +3. **邮箱要求**:使用真实可接收邮件的邮箱 + +4. **合规使用**:遵守服务条款,仅用于合法用途 + +## 💡 扩展使用 + +### 批量注册(示例) +```python +from duoplus_register import DuoPlusRegister +import os +from dotenv import load_dotenv + +load_dotenv() +registrar = DuoPlusRegister(os.getenv('CAPTCHA_API_KEY')) + +# 批量注册 +emails = ["test1@example.com", "test2@example.com"] +for email in emails: + success = registrar.auto_register(email) + if success: + print(f"✅ {email} 注册成功") + else: + print(f"❌ {email} 注册失败") +``` + +## 🔧 项目文件说明 + +- `main.py` - 主入口文件 +- `duoplus_register.py` - 核心注册逻辑 +- `captcha_solver.py` - 2captcha 集成 +- `test_register.py` - 测试工具 +- `analyze_requests.py` - 网络请求分析工具 +- `.env` - 配置文件(需自行创建) + +--- + +如有问题,请查看 README.md 获取更详细的文档。 \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 0000000..d43bf5b --- /dev/null +++ b/README.md @@ -0,0 +1,109 @@ +# DuoPlus 协议注册工具 + +使用 Python 实现的 DuoPlus 自动注册工具,集成 2captcha 自动处理腾讯滑块验证码。 + +## 功能特点 + +- ✅ 协议注册(直接调用 API) +- ✅ 自动处理腾讯滑块验证码 +- ✅ 支持自定义密码或随机生成 +- ✅ 彩色控制台输出 +- ✅ 完整的错误处理 + +## 安装步骤 + +1. 克隆或下载项目 +2. 安装依赖: +```bash +pip install -r requirements.txt +``` + +3. 配置 2captcha API Key: +```bash +cp .env.example .env +``` +然后编辑 `.env` 文件,填入你的 2captcha API Key: +``` +CAPTCHA_API_KEY=你的2captcha_api_key +``` + +## 使用方法 + +### 基础使用 + +运行注册脚本: +```bash +python duoplus_register.py +``` + +按照提示输入: +- 邮箱地址 +- 是否使用随机密码 +- 密码(如果选择自定义) +- 邮箱验证码 + +### 高级使用 + +你也可以直接在代码中调用: + +```python +from duoplus_register import DuoPlusRegister +import os +from dotenv import load_dotenv + +# 加载环境变量 +load_dotenv() + +# 创建注册器 +registrar = DuoPlusRegister(os.getenv('CAPTCHA_API_KEY')) + +# 执行注册 +success = registrar.auto_register("your_email@example.com", "your_password") +``` + +## 注意事项 + +1. **验证码识别费用**:每次使用 2captcha 识别腾讯验证码都会产生费用(约 $0.003/次) + +2. **邮箱要求**: + - 必须是有效的邮箱地址 + - 需要能够接收验证码 + +3. **API 端点**: + - 当前脚本中的 API 端点是基于分析得出的 + - 如果注册失败,可能需要使用网络抓包工具重新分析 + +## 项目结构 + +``` +├── captcha_solver.py # 2captcha 验证码处理模块 +├── duoplus_register.py # 主注册脚本 +├── requirements.txt # Python 依赖 +├── .env.example # 环境变量示例 +├── .env # 实际配置文件(需自行创建) +└── README.md # 本文档 +``` + +## 故障排查 + +1. **验证码识别失败** + - 检查 2captcha API Key 是否正确 + - 检查 2captcha 账户余额 + - 确认网络连接正常 + +2. **注册失败** + - 检查邮箱是否已被注册 + - 确认验证码输入正确 + - 查看控制台错误信息 + +3. **API 请求失败** + - 可能需要更新请求头或 API 端点 + - 使用浏览器开发者工具分析实际请求 + +## 免责声明 + +本工具仅供学习和研究使用,请遵守相关服务条款。使用本工具产生的任何后果由使用者自行承担。 + +## 更新日志 + +- 2024-01-xx: 初始版本发布 \ No newline at end of file diff --git a/analyze_requests.py b/analyze_requests.py new file mode 100644 index 0000000..b01148d --- /dev/null +++ b/analyze_requests.py @@ -0,0 +1,80 @@ +import json +from mitmproxy import http +from mitmproxy.tools.main import mitmdump +import sys +from colorama import init, Fore, Style + +# 初始化 colorama +init(autoreset=True) + +class RequestAnalyzer: + """分析 DuoPlus 注册相关的网络请求""" + + def __init__(self): + self.target_domain = "my.duoplus.cn" + self.captured_requests = [] + + def request(self, flow: http.HTTPFlow) -> None: + """处理请求""" + if self.target_domain in flow.request.pretty_host: + request_info = { + "method": flow.request.method, + "url": flow.request.pretty_url, + "headers": dict(flow.request.headers), + "body": None + } + + # 尝试解析请求体 + if flow.request.content: + try: + request_info["body"] = json.loads(flow.request.content.decode('utf-8')) + except: + request_info["body"] = flow.request.content.decode('utf-8', errors='ignore') + + # 打印关键请求 + if any(keyword in flow.request.pretty_url for keyword in ['auth', 'register', 'captcha', 'verify']): + print(f"\n{Fore.CYAN}[REQUEST] {flow.request.method} {flow.request.pretty_url}") + print(f"{Fore.YELLOW}Headers: {json.dumps(dict(flow.request.headers), indent=2, ensure_ascii=False)}") + if request_info["body"]: + print(f"{Fore.GREEN}Body: {json.dumps(request_info['body'], indent=2, ensure_ascii=False)}") + + def response(self, flow: http.HTTPFlow) -> None: + """处理响应""" + if self.target_domain in flow.request.pretty_host: + # 打印关键响应 + if any(keyword in flow.request.pretty_url for keyword in ['auth', 'register', 'captcha', 'verify']): + print(f"\n{Fore.MAGENTA}[RESPONSE] {flow.response.status_code} for {flow.request.pretty_url}") + + # 尝试解析响应体 + if flow.response.content: + try: + response_body = json.loads(flow.response.content.decode('utf-8')) + print(f"{Fore.GREEN}Response: {json.dumps(response_body, indent=2, ensure_ascii=False)}") + except: + print(f"{Fore.RED}Response (raw): {flow.response.content.decode('utf-8', errors='ignore')[:200]}...") + +# 创建分析器实例 +analyzer = RequestAnalyzer() + +def request(flow: http.HTTPFlow) -> None: + analyzer.request(flow) + +def response(flow: http.HTTPFlow) -> None: + analyzer.response(flow) + +if __name__ == "__main__": + print(f""" +{Fore.CYAN}=== DuoPlus 网络请求分析工具 === + +{Fore.YELLOW}使用方法: +1. 安装 mitmproxy: pip install mitmproxy +2. 运行代理: mitmdump -s analyze_requests.py -p 8080 +3. 配置浏览器使用 127.0.0.1:8080 作为HTTP代理 +4. 访问 https://my.duoplus.cn/sign-up 并执行注册操作 +5. 查看控制台输出的请求和响应信息 + +{Fore.GREEN}提示: +- 关注包含 'auth', 'register', 'captcha', 'verify' 的请求 +- 记录验证码相关的 app_id 和其他参数 +- 保存完整的请求头和请求体格式 +""") \ No newline at end of file diff --git a/captcha_solver.py b/captcha_solver.py new file mode 100644 index 0000000..c7b470a --- /dev/null +++ b/captcha_solver.py @@ -0,0 +1,127 @@ +import requests +import time +import json +from typing import Optional, Dict, Any + +class TwoCaptchaSolver: + """2captcha 腾讯滑块验证码处理器""" + + def __init__(self, api_key: str): + self.api_key = api_key + self.base_url = "http://2captcha.com" + + def solve_tencent_captcha(self, app_id: str, page_url: str, extra_params: Optional[Dict[str, Any]] = None) -> Optional[Dict[str, Any]]: + """ + 解决腾讯滑块验证码 + + Args: + app_id: 腾讯验证码的 app_id + page_url: 验证码所在页面的 URL + extra_params: 额外的参数(如果需要) + + Returns: + 包含验证结果的字典,或 None(如果失败) + """ + # 提交验证码任务 + submit_url = f"{self.base_url}/in.php" + submit_params = { + "key": self.api_key, + "method": "tencent", + "app_id": app_id, + "pageurl": page_url, + "json": 1 + } + + if extra_params: + submit_params.update(extra_params) + + try: + print(f"[2captcha] 提交腾讯验证码任务...") + response = requests.post(submit_url, data=submit_params) + result = response.json() + + if result.get("status") != 1: + print(f"[2captcha] 提交失败: {result.get('error_text', 'Unknown error')}") + return None + + task_id = result.get("request") + print(f"[2captcha] 任务ID: {task_id}") + + # 等待并获取结果 + return self._get_result(task_id) + + except Exception as e: + print(f"[2captcha] 错误: {str(e)}") + return None + + def _get_result(self, task_id: str, max_attempts: int = 60, wait_time: int = 5) -> Optional[Dict[str, Any]]: + """ + 获取验证码识别结果 + + Args: + task_id: 2captcha 任务ID + max_attempts: 最大尝试次数 + wait_time: 每次尝试之间的等待时间(秒) + + Returns: + 验证结果字典 + """ + result_url = f"{self.base_url}/res.php" + + for attempt in range(max_attempts): + params = { + "key": self.api_key, + "action": "get", + "id": task_id, + "json": 1 + } + + try: + response = requests.get(result_url, params=params) + result = response.json() + + if result.get("status") == 1: + print("[2captcha] 验证码识别成功") + # 返回腾讯验证码需要的参数 + return { + "ticket": result.get("request", {}).get("ticket"), + "randstr": result.get("request", {}).get("randstr"), + "raw_response": result + } + elif result.get("request") == "CAPCHA_NOT_READY": + print(f"[2captcha] 等待识别中... ({attempt + 1}/{max_attempts})") + time.sleep(wait_time) + else: + print(f"[2captcha] 获取结果失败: {result.get('error_text', 'Unknown error')}") + return None + + except Exception as e: + print(f"[2captcha] 获取结果时出错: {str(e)}") + time.sleep(wait_time) + + print("[2captcha] 超时,未能获取验证码结果") + return None + + def report_good(self, task_id: str): + """报告验证码正确""" + try: + requests.get(f"{self.base_url}/res.php", params={ + "key": self.api_key, + "action": "reportgood", + "id": task_id + }) + print("[2captcha] 已报告验证码正确") + except: + pass + + def report_bad(self, task_id: str): + """报告验证码错误""" + try: + requests.get(f"{self.base_url}/res.php", params={ + "key": self.api_key, + "action": "reportbad", + "id": task_id + }) + print("[2captcha] 已报告验证码错误") + except: + pass \ No newline at end of file diff --git a/duoplus_register.py b/duoplus_register.py new file mode 100644 index 0000000..edef56d --- /dev/null +++ b/duoplus_register.py @@ -0,0 +1,455 @@ +import requests +import json +import time +import random +import string +import hashlib +from typing import Optional, Dict, Any +from dotenv import load_dotenv +import os +from captcha_solver import TwoCaptchaSolver +from tracker import DataTracker +from email_manager import EmailManager, AutoEmailVerification +from colorama import init, Fore, Style + +# 初始化 colorama +init(autoreset=True) + +class DuoPlusRegister: + """DuoPlus 协议注册类""" + + def __init__(self, captcha_api_key: str): + self.session = requests.Session() + self.captcha_solver = TwoCaptchaSolver(captcha_api_key) + self.tracker = DataTracker() # 初始化数据追踪器 + self.base_url = "https://my.duoplus.cn" + self.api_url = "https://api.duoplus.cn" + self.captcha_app_id = None + + # 生成设备指纹 + self.device_fp = self._generate_device_fp() + + # 设置请求头 + self.session.headers.update({ + 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/138.0.0.0 Safari/537.36', + 'Accept': 'application/json, text/plain, */*', + 'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8', + 'Accept-Encoding': 'gzip, deflate, br', + 'Origin': 'https://my.duoplus.cn', + 'Referer': 'https://my.duoplus.cn/', + 'Content-Type': 'application/json', + 'sec-ch-ua': '"Not)A;Brand";v="8", "Chromium";v="138", "Google Chrome";v="138"', + 'sec-ch-ua-mobile': '?0', + 'sec-ch-ua-platform': '"Windows"', + 'sec-fetch-site': 'same-site', + 'sec-fetch-mode': 'cors', + 'sec-fetch-dest': 'empty', + 'pragma': 'no-cache', + 'cache-control': 'no-cache', + 'lang': 'zh-CN', + 'duoplus-fp': self.device_fp, + 'priority': 'u=1, i' + }) + + # 获取初始 cookies 和配置 + self._init_session() + + def _generate_device_fp(self) -> str: + """生成设备指纹""" + # 简单的设备指纹生成,实际可能需要更复杂的算法 + random_str = ''.join(random.choices(string.ascii_lowercase + string.digits, k=16)) + return hashlib.md5(random_str.encode()).hexdigest() + + def _init_session(self): + """初始化会话,获取必要的 cookies 和配置""" + try: + # 先访问注册页面获取 cookies + response = self.session.get(f"{self.base_url}/sign-up") + print(f"{Fore.GREEN}[INFO] 初始化会话成功") + + # 发送数据追踪请求(模拟真实浏览器行为) + self.tracker.track_registration_page() + + # 获取腾讯验证码配置 + self._get_captcha_config() + + except Exception as e: + print(f"{Fore.RED}[ERROR] 初始化会话失败: {str(e)}") + + def _get_captcha_config(self) -> Optional[Dict[str, Any]]: + """ + 获取腾讯验证码配置信息 + + Returns: + 验证码配置信息 + """ + try: + # 调用 API 获取验证码配置 + config_url = f"{self.api_url}/common/tencentConfig" + + # 更新请求头中的 referer + headers = self.session.headers.copy() + headers['referer'] = 'https://my.duoplus.cn/' + headers['authorization'] = '' # 注册时可能不需要 authorization + + response = self.session.get(config_url, headers=headers) + + if response.status_code == 200: + data = response.json() + if data.get('code') == 200: + self.captcha_app_id = data['data']['captcha_app_id'] + print(f"{Fore.GREEN}[INFO] 获取验证码配置成功,app_id: {self.captcha_app_id}") + return { + "app_id": self.captcha_app_id, + "page_url": f"{self.base_url}/sign-up" + } + else: + print(f"{Fore.RED}[ERROR] 获取验证码配置失败: {data.get('message')}") + else: + print(f"{Fore.RED}[ERROR] 请求验证码配置失败: HTTP {response.status_code}") + + except Exception as e: + print(f"{Fore.RED}[ERROR] 获取验证码配置异常: {str(e)}") + + # 如果获取失败,返回默认值 + return { + "app_id": "192789496", # 使用实际的 app_id 作为备份 + "page_url": f"{self.base_url}/sign-up" + } + + def send_verification_code(self, email: str) -> bool: + """ + 发送验证码 + + Args: + email: 邮箱地址 + + Returns: + 是否成功发送 + """ + print(f"{Fore.CYAN}[INFO] 准备发送验证码到: {email}") + + # 获取验证码配置 + captcha_config = self._get_captcha_config() + if not captcha_config: + print(f"{Fore.RED}[ERROR] 无法获取验证码配置") + return False + + # 使用 2captcha 解决验证码 + print(f"{Fore.YELLOW}[INFO] 正在处理腾讯滑块验证码...") + captcha_result = self.captcha_solver.solve_tencent_captcha( + app_id=captcha_config["app_id"], + page_url=captcha_config["page_url"] + ) + + if not captcha_result: + print(f"{Fore.RED}[ERROR] 验证码处理失败") + return False + + # 发送验证码请求 - 使用正确的 API 端点 + send_code_url = f"{self.api_url}/email/verificationCode" + + # 正确的请求体格式 + payload = { + "type": 1, # 1 表示注册 + "email": email, + "rand_str": captcha_result["randstr"], + "ticket": captcha_result["ticket"] + } + + # 更新请求头 + headers = self.session.headers.copy() + headers.update({ + 'referer': 'https://my.duoplus.cn/', + 'authorization': '', + 'accept': 'application/json' + }) + + try: + print(f"{Fore.YELLOW}[INFO] 发送请求到: {send_code_url}") + print(f"{Fore.YELLOW}[DEBUG] 请求体: {json.dumps(payload, ensure_ascii=False)}") + + response = self.session.post(send_code_url, json=payload, headers=headers) + result = response.json() + + if response.status_code == 200 and result.get("code") == 200: + print(f"{Fore.GREEN}[SUCCESS] 验证码发送成功: {result.get('message', 'Success')}") + return True + else: + print(f"{Fore.RED}[ERROR] 发送验证码失败: {result.get('message', 'Unknown error')}") + return False + + except Exception as e: + print(f"{Fore.RED}[ERROR] 请求失败: {str(e)}") + return False + + def register(self, email: str, password: str, verification_code: str) -> Dict[str, Any]: + """ + 执行注册 + + Args: + email: 邮箱地址 + password: 密码 + verification_code: 邮箱验证码 + + Returns: + 注册结果字典,包含 token 等信息 + """ + print(f"{Fore.CYAN}[INFO] 开始注册账号...") + + # 使用正确的注册端点 + register_url = f"{self.api_url}/account/signup" + + # 正确的请求体格式 + payload = { + "username": email, # 使用邮箱作为用户名 + "password": password, + "verification_code": verification_code, + "isAgress": True, # 同意条款 + "from_landing": 0 + } + + # 更新请求头 + headers = self.session.headers.copy() + headers.update({ + 'referer': 'https://my.duoplus.cn/', + 'authorization': '', + 'accept': 'application/json' + }) + + try: + print(f"{Fore.YELLOW}[INFO] 发送注册请求到: {register_url}") + print(f"{Fore.YELLOW}[DEBUG] 请求体: {json.dumps(payload, ensure_ascii=False)}") + + response = self.session.post(register_url, json=payload, headers=headers) + result = response.json() + + if response.status_code == 200 and result.get("code") == 200: + print(f"{Fore.GREEN}[SUCCESS] 注册成功!") + + # 获取返回的数据 + data = result.get("data", {}) + if data: + print(f"{Fore.GREEN}[INFO] Access Token: {data.get('access_token')[:20]}...") + print(f"{Fore.GREEN}[INFO] Token 有效期: {data.get('expired_in')} 秒") + print(f"{Fore.GREEN}[INFO] 赠送余额: {data.get('gift_balance', '$0.00')}") + + # 获取用户详细信息 + self._get_user_profile(data.get('access_token')) + + return {"success": True, "data": data} + else: + print(f"{Fore.RED}[ERROR] 注册失败: {result.get('message', 'Unknown error')}") + print(f"{Fore.RED}[DEBUG] 完整响应: {json.dumps(result, ensure_ascii=False)}") + return {"success": False, "message": result.get('message')} + + except Exception as e: + print(f"{Fore.RED}[ERROR] 请求失败: {str(e)}") + return {"success": False, "message": str(e)} + + def _get_user_profile(self, access_token: str) -> Optional[Dict[str, Any]]: + """ + 获取用户详细信息 + + Args: + access_token: 访问令牌 + + Returns: + 用户信息字典 + """ + profile_url = f"{self.api_url}/account/profile" + + headers = self.session.headers.copy() + headers.update({ + 'authorization': access_token, + 'accept': 'application/json' + }) + + try: + response = self.session.post(profile_url, json={}, headers=headers) + result = response.json() + + if response.status_code == 200 and result.get("code") == 200: + profile = result.get("data", {}) + print(f"{Fore.GREEN}[INFO] 用户ID: {profile.get('user_id')}") + print(f"{Fore.GREEN}[INFO] 邮箱: {profile.get('email')}") + print(f"{Fore.GREEN}[INFO] 团队ID: {profile.get('team_id')}") + print(f"{Fore.GREEN}[INFO] 团队名称: {profile.get('team_name')}") + return profile + + except Exception as e: + print(f"{Fore.YELLOW}[WARNING] 获取用户信息失败: {str(e)}") + + return None + + def generate_random_password(self, length: int = 12) -> str: + """生成随机密码""" + characters = string.ascii_letters + string.digits + "!@#$%^&*" + return ''.join(random.choice(characters) for _ in range(length)) + + def auto_register(self, email: str, password: Optional[str] = None) -> Dict[str, Any]: + """ + 自动注册流程 + + Args: + email: 邮箱地址 + password: 密码(如果不提供则自动生成) + + Returns: + 注册结果字典 + """ + if not password: + password = self.generate_random_password() + print(f"{Fore.YELLOW}[INFO] 生成随机密码: {password}") + + # 保存注册信息 + print(f"{Fore.CYAN}{'='*60}") + print(f"{Fore.CYAN}注册信息:") + print(f"{Fore.CYAN}{'='*60}") + print(f" 📧 邮箱: {email}") + print(f" 🔑 密码: {password}") + print(f"{Fore.CYAN}{'='*60}\n") + + # 发送验证码 + if not self.send_verification_code(email): + return {"success": False, "message": "发送验证码失败"} + + # 等待用户输入验证码 + print(f"{Fore.CYAN}[INPUT] 请查看邮箱并输入验证码: ", end="") + verification_code = input().strip() + + # 执行注册 + result = self.register(email, password, verification_code) + + if result.get("success"): + # 保存账号信息到文件 + self._save_account_info(email, password, result.get("data", {})) + + return result + + def auto_register_with_temp_email(self) -> Dict[str, Any]: + """ + 使用临时邮箱全自动注册 + + Returns: + 注册结果字典 + """ + print(f"{Fore.CYAN}[INFO] 使用自建邮箱进行全自动注册...") + + # 创建邮箱管理器 + email_manager = EmailManager() + auto_verifier = AutoEmailVerification(email_manager) + + # 创建临时邮箱 + print(f"{Fore.CYAN}[INFO] 创建临时邮箱...") + email_info = auto_verifier.create_temp_email(domain="cursor.edu.kg") + + if not email_info.get("success"): + return {"success": False, "message": "创建临时邮箱失败"} + + email_address = email_info["email"] + email_password = email_info["password"] + + print(f"{Fore.GREEN}[INFO] 临时邮箱创建成功") + print(f"{Fore.GREEN}[INFO] 邮箱: {email_address}") + print(f"{Fore.GREEN}[INFO] 邮箱密码: {email_password}") + + # 生成DuoPlus账号密码 + duoplus_password = self.generate_random_password() + + # 保存注册信息 + print(f"\n{Fore.CYAN}{'='*60}") + print(f"{Fore.CYAN}DuoPlus 注册信息:") + print(f"{Fore.CYAN}{'='*60}") + print(f" 📧 邮箱: {email_address}") + print(f" 🔑 密码: {duoplus_password}") + print(f"{Fore.CYAN}{'='*60}\n") + + # 发送验证码 + if not self.send_verification_code(email_address): + return {"success": False, "message": "发送验证码失败"} + + # 自动获取验证码 + print(f"{Fore.CYAN}[INFO] 等待接收验证码...") + verification_code = auto_verifier.wait_for_code( + email_info, + sender_filter="duoplus", + timeout=60 + ) + + if not verification_code: + print(f"{Fore.YELLOW}[WARNING] 自动获取验证码失败,请手动输入") + print(f"{Fore.CYAN}[INPUT] 请输入验证码: ", end="") + verification_code = input().strip() + + # 执行注册 + result = self.register(email_address, duoplus_password, verification_code) + + if result.get("success"): + # 保存账号信息到文件,包括邮箱密码 + data = result.get("data", {}) + data["email_password"] = email_password # 保存邮箱密码 + self._save_account_info(email_address, duoplus_password, data) + + return result + + def _save_account_info(self, email: str, password: str, data: Dict[str, Any]): + """ + 保存账号信息到文件 + + Args: + email: 邮箱 + password: 密码 + data: 注册返回的数据 + """ + try: + import datetime + filename = "registered_accounts.txt" + + with open(filename, "a", encoding="utf-8") as f: + f.write(f"\n{'='*60}\n") + f.write(f"注册时间: {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n") + f.write(f"邮箱: {email}\n") + f.write(f"密码: {password}\n") + f.write(f"Access Token: {data.get('access_token', 'N/A')}\n") + f.write(f"Refresh Token: {data.get('refresh_token', 'N/A')}\n") + f.write(f"Token 有效期: {data.get('expired_in', 0)} 秒\n") + f.write(f"赠送余额: {data.get('gift_balance', '$0.00')}\n") + + print(f"{Fore.GREEN}[INFO] 账号信息已保存到 {filename}") + + except Exception as e: + print(f"{Fore.YELLOW}[WARNING] 保存账号信息失败: {str(e)}") + +def main(): + # 加载环境变量 + load_dotenv() + + # 获取 API Key + captcha_api_key = os.getenv('CAPTCHA_API_KEY') + if not captcha_api_key: + print(f"{Fore.RED}[ERROR] 请在 .env 文件中设置 CAPTCHA_API_KEY") + return + + # 创建注册器 + registrar = DuoPlusRegister(captcha_api_key) + + # 获取用户输入 + print(f"{Fore.CYAN}=== DuoPlus 自动注册工具 ===") + email = input(f"{Fore.YELLOW}请输入邮箱地址: ").strip() + + use_random_password = input(f"{Fore.YELLOW}是否使用随机密码? (y/n): ").strip().lower() == 'y' + + if use_random_password: + password = None + else: + password = input(f"{Fore.YELLOW}请输入密码: ").strip() + + # 执行注册 + if registrar.auto_register(email, password): + print(f"{Fore.GREEN}[SUCCESS] 注册流程完成!") + else: + print(f"{Fore.RED}[FAILED] 注册失败!") + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/email_manager.py b/email_manager.py new file mode 100644 index 0000000..fd128ed --- /dev/null +++ b/email_manager.py @@ -0,0 +1,463 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +""" +自建邮箱管理模块 +支持创建邮箱用户和IMAP收件 +使用 Stalwart API +""" + +import imaplib +import email +import time +import re +import random +import string +import secrets +from typing import Optional, List, Dict, Tuple +from datetime import datetime, timedelta +from colorama import init, Fore +from stalwart_client import StalwartClient +from exceptions import StalwartError, ApiError + +# 初始化 colorama +init(autoreset=True) + +class EmailManager: + """邮箱管理器""" + + def __init__(self, api_base_url: str = "https://mail.evnmail.com/api", + api_key: str = "admin:Hunter1520.", + imap_server: str = "mail.evnmail.com", + imap_port: int = 993): + """ + 初始化邮箱管理器 + + Args: + api_base_url: Stalwart API基础URL + api_key: API认证密钥 (格式: username:password) + imap_server: IMAP服务器地址 + imap_port: IMAP端口 + """ + self.api_base_url = api_base_url + self.api_key = api_key + self.imap_server = imap_server + self.imap_port = imap_port + + # 初始化 Stalwart 客户端 + self.client = StalwartClient(api_base_url, api_key=api_key) + + def generate_random_password(self, length: int = 12) -> str: + """ + 生成随机安全密码(只包含大小写字母和数字) + + Args: + length: 密码长度 + + Returns: + 随机密码 + """ + characters = string.ascii_letters + string.digits + password = [ + secrets.choice(string.ascii_uppercase), + secrets.choice(string.ascii_lowercase), + secrets.choice(string.digits), + ] + password.extend(secrets.choice(characters) for _ in range(length - 3)) + random.shuffle(password) + return ''.join(password) + + def generate_random_username(self, prefix: str = "user", length: int = 8) -> str: + """ + 生成随机用户名 + + Args: + prefix: 用户名前缀 + length: 随机部分长度 + + Returns: + 随机用户名 + """ + random_part = ''.join(random.choices(string.ascii_lowercase + string.digits, k=length)) + return f"{prefix}{random_part}" + + def create_email_account(self, username: Optional[str] = None, + domain: str = "cursor.edu.kg", + password: Optional[str] = None, + quota_mb: int = 10) -> Dict[str, str]: + """ + 创建邮箱账户 + + Args: + username: 用户名,如果为None则随机生成 + domain: 邮箱域名 + password: 密码,如果为None则随机生成 + quota_mb: 邮箱配额(MB) + + Returns: + 包含邮箱信息的字典 + """ + # 生成用户名和密码 + if not username: + username = self.generate_random_username() + if not password: + password = self.generate_random_password() + + email_address = f"{username}@{domain}" + quota_bytes = quota_mb * 1024 * 1024 + + print(f"{Fore.CYAN}[Email] 创建邮箱账户: {email_address}") + + # 准备用户数据 + user_data = { + "type": "individual", + "name": email_address, # 使用完整邮箱地址作为name + "description": f"{email_address} account", + "quota": quota_bytes, + "emails": [email_address], + "roles": ["user"], + "memberOf": [], + "secrets": [password] # 直接在创建时设置密码 + } + + try: + # 使用 Stalwart 客户端创建用户 + print(f"{Fore.YELLOW}[Email] 提交用户创建请求...") + response = self.client.create_principal(user_data) + + # 获取用户ID + user_id = response.get('data') if isinstance(response, dict) else response + + if user_id: + print(f"{Fore.GREEN}[Email] ✅ 邮箱创建成功!ID: {user_id}") + + # 验证用户信息 + try: + user_info = self.client.fetch_principal(user_id) + print(f"{Fore.GREEN}[Email] 验证完成") + except Exception as e: + print(f"{Fore.YELLOW}[Email] 验证失败,但用户已创建: {str(e)}") + + return { + "success": True, + "user_id": user_id, + "email": email_address, + "username": username, + "password": password, + "domain": domain, + "imap_server": self.imap_server, + "imap_port": self.imap_port + } + else: + print(f"{Fore.RED}[Email] 创建失败: 未返回用户ID") + return {"success": False, "message": "No user ID returned"} + + except ApiError as e: + print(f"{Fore.RED}[Email] API错误: {e}") + return {"success": False, "message": f"API Error: {str(e)}"} + except StalwartError as e: + print(f"{Fore.RED}[Email] Stalwart错误: {e}") + return {"success": False, "message": f"Stalwart Error: {str(e)}"} + except Exception as e: + print(f"{Fore.RED}[Email] 创建异常: {str(e)}") + return {"success": False, "message": str(e)} + + def connect_imap(self, email_address: str, password: str) -> Optional[imaplib.IMAP4_SSL]: + """ + 连接到IMAP服务器 + + Args: + email_address: 邮箱地址 + password: 密码 + + Returns: + IMAP连接对象 + """ + try: + print(f"{Fore.YELLOW}[IMAP] 连接到 {self.imap_server}:{self.imap_port}") + + # 连接到IMAP服务器 + imap = imaplib.IMAP4_SSL(self.imap_server, self.imap_port) + + # 登录 + imap.login(email_address, password) + print(f"{Fore.GREEN}[IMAP] 登录成功") + + return imap + + except imaplib.IMAP4.error as e: + print(f"{Fore.RED}[IMAP] 登录失败: {str(e)}") + return None + except Exception as e: + print(f"{Fore.RED}[IMAP] 连接失败: {str(e)}") + return None + + def fetch_verification_code(self, email_address: str, password: str, + sender_filter: str = "@duoplus", + timeout: int = 60, + check_interval: int = 5) -> Optional[str]: + """ + 从邮箱获取验证码 + + Args: + email_address: 邮箱地址 + password: 密码 + sender_filter: 发件人过滤条件 + timeout: 超时时间(秒) + check_interval: 检查间隔(秒) + + Returns: + 验证码字符串 + """ + print(f"{Fore.CYAN}[IMAP] 等待接收验证码邮件...") + + start_time = time.time() + + while time.time() - start_time < timeout: + imap = self.connect_imap(email_address, password) + if not imap: + time.sleep(check_interval) + continue + + try: + # 选择收件箱 + imap.select('INBOX') + + # 搜索未读邮件 + search_criteria = f'(UNSEEN FROM "{sender_filter}")' + result, data = imap.search(None, search_criteria) + + if result == 'OK' and data[0]: + email_ids = data[0].split() + + # 获取最新的邮件 + for email_id in reversed(email_ids): + result, data = imap.fetch(email_id, '(RFC822)') + + if result == 'OK': + raw_email = data[0][1] + msg = email.message_from_bytes(raw_email) + + # 获取邮件内容 + body = self._get_email_body(msg) + + # 提取验证码(通常是6位数字) + code_patterns = [ + r'验证码[::]\s*(\d{6})', + r'验证码是[::]\s*(\d{6})', + r'您的验证码是[::]\s*(\d{6})', + r'(\d{6})', # 直接匹配6位数字 + ] + + for pattern in code_patterns: + match = re.search(pattern, body) + if match: + code = match.group(1) + print(f"{Fore.GREEN}[IMAP] ✅ 获取到验证码: {code}") + + # 标记邮件为已读 + imap.store(email_id, '+FLAGS', '\\Seen') + + imap.close() + imap.logout() + return code + + imap.close() + imap.logout() + + except Exception as e: + print(f"{Fore.YELLOW}[IMAP] 检查邮件时出错: {str(e)}") + if imap: + try: + imap.logout() + except: + pass + + # 等待下次检查 + elapsed = int(time.time() - start_time) + remaining = timeout - elapsed + print(f"{Fore.YELLOW}[IMAP] 暂未收到验证码,{remaining}秒后超时...") + time.sleep(check_interval) + + print(f"{Fore.RED}[IMAP] 获取验证码超时") + return None + + def _get_email_body(self, msg) -> str: + """ + 提取邮件正文 + + Args: + msg: email.message对象 + + Returns: + 邮件正文文本 + """ + body = "" + + if msg.is_multipart(): + for part in msg.walk(): + content_type = part.get_content_type() + content_disposition = str(part.get("Content-Disposition")) + + if content_type == "text/plain" and "attachment" not in content_disposition: + try: + body = part.get_payload(decode=True).decode('utf-8', errors='ignore') + break + except: + pass + elif content_type == "text/html" and not body: + try: + html_body = part.get_payload(decode=True).decode('utf-8', errors='ignore') + # 简单移除HTML标签 + body = re.sub('<[^<]+?>', '', html_body) + except: + pass + else: + try: + body = msg.get_payload(decode=True).decode('utf-8', errors='ignore') + except: + body = str(msg.get_payload()) + + return body + + def get_recent_emails(self, email_address: str, password: str, + count: int = 5) -> List[Dict[str, str]]: + """ + 获取最近的邮件列表 + + Args: + email_address: 邮箱地址 + password: 密码 + count: 获取邮件数量 + + Returns: + 邮件列表 + """ + emails = [] + + imap = self.connect_imap(email_address, password) + if not imap: + return emails + + try: + # 选择收件箱 + imap.select('INBOX') + + # 搜索所有邮件 + result, data = imap.search(None, 'ALL') + + if result == 'OK' and data[0]: + email_ids = data[0].split() + + # 获取最近的几封邮件 + for email_id in reversed(email_ids[-count:]): + result, data = imap.fetch(email_id, '(RFC822)') + + if result == 'OK': + raw_email = data[0][1] + msg = email.message_from_bytes(raw_email) + + emails.append({ + "id": email_id.decode(), + "from": msg.get('From', ''), + "to": msg.get('To', ''), + "subject": msg.get('Subject', ''), + "date": msg.get('Date', ''), + "body": self._get_email_body(msg)[:200] + "..." + }) + + imap.close() + imap.logout() + + except Exception as e: + print(f"{Fore.RED}[IMAP] 获取邮件列表失败: {str(e)}") + if imap: + try: + imap.logout() + except: + pass + + return emails + + +class AutoEmailVerification: + """自动邮箱验证码获取""" + + def __init__(self, email_manager: EmailManager): + self.email_manager = email_manager + + def create_temp_email(self, domain: str = "cursor.edu.kg") -> Dict[str, str]: + """ + 创建临时邮箱 + + Args: + domain: 邮箱域名 + + Returns: + 邮箱信息 + """ + return self.email_manager.create_email_account(domain=domain) + + def wait_for_code(self, email_info: Dict[str, str], + sender_filter: str = "@duoplus", + timeout: int = 60) -> Optional[str]: + """ + 等待并获取验证码 + + Args: + email_info: 邮箱信息字典 + sender_filter: 发件人过滤 + timeout: 超时时间 + + Returns: + 验证码 + """ + if not email_info.get("success"): + print(f"{Fore.RED}[Auto] 邮箱信息无效") + return None + + return self.email_manager.fetch_verification_code( + email_info["email"], + email_info["password"], + sender_filter=sender_filter, + timeout=timeout + ) + + +# 测试函数 +def test_email_system(): + """测试邮箱系统""" + print(f"{Fore.CYAN}{'='*60}") + print(f"{Fore.CYAN}测试自建邮箱系统") + print(f"{Fore.CYAN}{'='*60}\n") + + # 创建邮箱管理器 + manager = EmailManager() + + # 创建测试邮箱 + email_info = manager.create_email_account( + username=None, # 自动生成 + domain="cursor.edu.kg", + password=None, # 自动生成 + quota_mb=10 + ) + + if email_info.get("success"): + print(f"\n{Fore.GREEN}邮箱创建成功!") + print(f"{Fore.GREEN}邮箱地址: {email_info['email']}") + print(f"{Fore.GREEN}密码: {email_info['password']}") + + # 测试IMAP连接 + print(f"\n{Fore.CYAN}测试IMAP连接...") + imap = manager.connect_imap(email_info['email'], email_info['password']) + if imap: + print(f"{Fore.GREEN}IMAP连接成功!") + imap.logout() + else: + print(f"{Fore.RED}IMAP连接失败") + else: + print(f"{Fore.RED}邮箱创建失败: {email_info.get('message')}") + + +if __name__ == "__main__": + test_email_system() \ No newline at end of file diff --git a/exceptions.py b/exceptions.py new file mode 100644 index 0000000..53fba37 --- /dev/null +++ b/exceptions.py @@ -0,0 +1,17 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +""" +自定义异常类 +""" + +class StalwartError(Exception): + """Stalwart 相关错误的基类""" + pass + +class ApiError(StalwartError): + """API 调用错误""" + def __init__(self, message, status_code=None, response=None): + super().__init__(message) + self.status_code = status_code + self.response = response \ No newline at end of file diff --git a/main.py b/main.py new file mode 100644 index 0000000..268a5ec --- /dev/null +++ b/main.py @@ -0,0 +1,19 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +""" +DuoPlus 协议注册工具 - 主入口 +""" + +import os +import sys +from duoplus_register import main + +if __name__ == "__main__": + # 检查 Python 版本 + if sys.version_info < (3, 6): + print("错误:需要 Python 3.6 或更高版本") + sys.exit(1) + + # 运行主程序 + main() \ No newline at end of file diff --git a/network_inspector.py b/network_inspector.py new file mode 100644 index 0000000..137f49b --- /dev/null +++ b/network_inspector.py @@ -0,0 +1,216 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +""" +DuoPlus 注册页面网络请求检查器 +用于分析和记录注册过程中的所有网络请求 +""" + +import requests +import json +from colorama import init, Fore, Style +import time + +# 初始化 colorama +init(autoreset=True) + +class NetworkInspector: + """网络请求检查器""" + + def __init__(self): + self.session = requests.Session() + self.base_url = "https://my.duoplus.cn" + self.api_url = "https://api.duoplus.cn" + + def inspect_registration_apis(self): + """检查注册相关的 API 端点""" + + print(f"{Fore.CYAN}{'='*60}") + print(f"{Fore.CYAN}DuoPlus 注册 API 端点检查") + print(f"{Fore.CYAN}{'='*60}\n") + + # 1. 检查腾讯验证码配置 API + self._check_captcha_config() + + # 2. 检查注册页面 + self._check_signup_page() + + # 3. 尝试获取其他可能的 API 端点 + self._check_common_apis() + + def _check_captcha_config(self): + """检查验证码配置 API""" + print(f"{Fore.YELLOW}[1] 检查验证码配置 API...") + + url = f"{self.api_url}/common/tencentConfig" + + headers = { + 'pragma': 'no-cache', + 'cache-control': 'no-cache', + 'sec-ch-ua-platform': '"Windows"', + 'authorization': '', + 'lang': 'zh-CN', + 'sec-ch-ua': '"Not)A;Brand";v="8", "Chromium";v="138", "Google Chrome";v="138"', + 'sec-ch-ua-mobile': '?0', + 'content-type': 'application/json', + 'duoplus-fp': 'e8014cf598dd4c021f2d08abef905801', + 'origin': 'https://my.duoplus.cn', + 'sec-fetch-site': 'same-site', + 'sec-fetch-mode': 'cors', + 'sec-fetch-dest': 'empty', + 'referer': 'https://my.duoplus.cn/', + 'accept-language': 'zh-CN,zh;q=0.9,en;q=0.8', + 'priority': 'u=1, i', + 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/138.0.0.0 Safari/537.36' + } + + try: + response = self.session.get(url, headers=headers) + print(f"{Fore.GREEN}URL: {url}") + print(f"{Fore.GREEN}Status: {response.status_code}") + + if response.status_code == 200: + data = response.json() + print(f"{Fore.GREEN}Response: {json.dumps(data, indent=2, ensure_ascii=False)}") + + if data.get('code') == 200: + captcha_app_id = data['data']['captcha_app_id'] + print(f"{Fore.CYAN}✓ Captcha App ID: {captcha_app_id}") + else: + print(f"{Fore.RED}Failed: HTTP {response.status_code}") + + except Exception as e: + print(f"{Fore.RED}Error: {str(e)}") + + print() + + def _check_signup_page(self): + """检查注册页面""" + print(f"{Fore.YELLOW}[2] 检查注册页面...") + + url = f"{self.base_url}/sign-up" + + try: + response = self.session.get(url) + print(f"{Fore.GREEN}URL: {url}") + print(f"{Fore.GREEN}Status: {response.status_code}") + + # 获取 cookies + cookies = self.session.cookies.get_dict() + if cookies: + print(f"{Fore.GREEN}Cookies received:") + for key, value in cookies.items(): + print(f" - {key}: {value[:20]}..." if len(value) > 20 else f" - {key}: {value}") + + except Exception as e: + print(f"{Fore.RED}Error: {str(e)}") + + print() + + def _check_common_apis(self): + """检查常见的 API 端点""" + print(f"{Fore.YELLOW}[3] 检查可能的 API 端点...") + + # 常见的注册相关 API 端点 + possible_endpoints = [ + f"{self.api_url}/auth/register", + f"{self.api_url}/auth/send-code", + f"{self.api_url}/auth/verify-code", + f"{self.api_url}/user/register", + f"{self.api_url}/account/register", + f"{self.api_url}/v1/auth/register", + f"{self.api_url}/api/auth/register", + ] + + headers = { + 'origin': 'https://my.duoplus.cn', + 'referer': 'https://my.duoplus.cn/', + 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/138.0.0.0 Safari/537.36' + } + + for endpoint in possible_endpoints: + try: + # 使用 OPTIONS 请求检查端点是否存在 + response = self.session.options(endpoint, headers=headers, timeout=3) + + if response.status_code in [200, 204, 405]: # 405 表示方法不允许但端点存在 + print(f"{Fore.GREEN}✓ Found: {endpoint} (Status: {response.status_code})") + + # 检查允许的方法 + allow_header = response.headers.get('Allow', '') + if allow_header: + print(f" Allowed methods: {allow_header}") + + # 检查 CORS 头 + cors_headers = { + 'Access-Control-Allow-Origin': response.headers.get('Access-Control-Allow-Origin'), + 'Access-Control-Allow-Methods': response.headers.get('Access-Control-Allow-Methods'), + 'Access-Control-Allow-Headers': response.headers.get('Access-Control-Allow-Headers') + } + + cors_info = [f"{k}: {v}" for k, v in cors_headers.items() if v] + if cors_info: + print(f" CORS: {', '.join(cors_info)}") + else: + print(f"{Fore.YELLOW}? Unknown: {endpoint} (Status: {response.status_code})") + + except requests.exceptions.Timeout: + print(f"{Fore.RED}✗ Timeout: {endpoint}") + except requests.exceptions.ConnectionError: + print(f"{Fore.RED}✗ Connection Error: {endpoint}") + except Exception as e: + print(f"{Fore.RED}✗ Error: {endpoint} - {str(e)}") + + time.sleep(0.5) # 避免请求过快 + + print() + + def analyze_register_request(self): + """分析注册请求的结构""" + print(f"{Fore.CYAN}{'='*60}") + print(f"{Fore.CYAN}预期的注册请求结构") + print(f"{Fore.CYAN}{'='*60}\n") + + sample_request = { + "email": "user@example.com", + "password": "password123", + "confirmPassword": "password123", + "verificationCode": "123456", + "inviteCode": "", + "agreeToTerms": True, + "ticket": "从腾讯验证码获取", + "randstr": "从腾讯验证码获取" + } + + print(f"{Fore.YELLOW}预期的注册请求体结构:") + print(json.dumps(sample_request, indent=2, ensure_ascii=False)) + + print(f"\n{Fore.YELLOW}关键请求头:") + key_headers = { + "Content-Type": "application/json", + "Origin": "https://my.duoplus.cn", + "Referer": "https://my.duoplus.cn/sign-up", + "duoplus-fp": "设备指纹(可能需要)", + "Authorization": "Bearer token(如果需要)" + } + + for header, value in key_headers.items(): + print(f" {header}: {value}") + +def main(): + """主函数""" + inspector = NetworkInspector() + + print(f"{Fore.CYAN}开始检查 DuoPlus 注册相关 API...\n") + + # 检查 API 端点 + inspector.inspect_registration_apis() + + # 显示预期的请求结构 + inspector.analyze_register_request() + + print(f"{Fore.GREEN}\n检查完成!") + print(f"{Fore.YELLOW}提示:使用浏览器开发者工具或代理工具(如 mitmproxy)可以获取更准确的请求信息。") + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/register_main.py b/register_main.py new file mode 100644 index 0000000..9bced74 --- /dev/null +++ b/register_main.py @@ -0,0 +1,262 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +""" +DuoPlus 云手机协议注册 - 主程序 +完整的注册流程实现 +""" + +import os +import sys +import json +from dotenv import load_dotenv +from colorama import init, Fore, Style +from duoplus_register import DuoPlusRegister + +# 初始化 colorama +init(autoreset=True) + +def print_banner(): + """打印程序横幅""" + banner = """ + ╔══════════════════════════════════════════════════════════╗ + ║ DuoPlus 云手机协议注册工具 v1.0 ║ + ║ ║ + ║ 功能特点: ║ + ║ • 自动处理腾讯滑块验证码 ║ + ║ • 支持批量注册 ║ + ║ • 自动保存账号信息 ║ + ║ • 完整的错误处理 ║ + ╚══════════════════════════════════════════════════════════╝ + """ + print(f"{Fore.CYAN}{banner}") + +def check_environment(): + """检查环境配置""" + if not os.path.exists('.env'): + print(f"{Fore.YELLOW}[WARNING] 未找到 .env 文件") + + if os.path.exists('.env.example'): + print(f"{Fore.YELLOW}[INFO] 正在创建 .env 文件...") + import shutil + shutil.copy('.env.example', '.env') + print(f"{Fore.GREEN}[SUCCESS] 已创建 .env 文件") + print(f"{Fore.YELLOW}[ACTION] 请编辑 .env 文件,填入您的 2captcha API Key") + return False + else: + print(f"{Fore.RED}[ERROR] 未找到 .env.example 文件") + return False + + # 加载环境变量 + load_dotenv() + api_key = os.getenv('CAPTCHA_API_KEY') + + if not api_key or api_key == 'your_2captcha_api_key_here': + print(f"{Fore.RED}[ERROR] 请在 .env 文件中设置有效的 CAPTCHA_API_KEY") + return False + + return True + +def single_registration(): + """单个账号注册""" + print(f"\n{Fore.CYAN}=== 单个账号注册 ===") + + # 获取用户输入 + email_input = input(f"{Fore.YELLOW}请输入邮箱地址 (直接回车使用自建邮箱): ").strip() + + # 创建注册器 + api_key = os.getenv('CAPTCHA_API_KEY') + registrar = DuoPlusRegister(api_key) + + if not email_input: + # 空回车,使用自建邮箱全自动注册 + print(f"{Fore.CYAN}[INFO] 使用自建邮箱进行全自动注册...") + result = registrar.auto_register_with_temp_email() + + if result.get("success"): + print(f"\n{Fore.GREEN}{'='*60}") + print(f"{Fore.GREEN}✅ 全自动注册成功!") + print(f"{Fore.GREEN}{'='*60}") + + data = result.get("data", {}) + print(f"{Fore.GREEN}Access Token: {data.get('access_token', 'N/A')[:30]}...") + print(f"{Fore.GREEN}Token 有效期: {data.get('expired_in', 0) // 3600} 小时") + print(f"{Fore.GREEN}赠送余额: {data.get('gift_balance', '$0.00')}") + + # 显示邮箱密码(如果有) + if data.get('email_password'): + print(f"{Fore.GREEN}邮箱密码: {data.get('email_password')}") + print(f"{Fore.GREEN}{'='*60}") + else: + print(f"\n{Fore.RED}❌ 注册失败: {result.get('message', 'Unknown error')}") + else: + # 手动输入邮箱 + if '@' not in email_input: + print(f"{Fore.RED}[ERROR] 请输入有效的邮箱地址") + return + + email = email_input + + # 询问是否使用自定义密码 + use_custom_password = input(f"{Fore.YELLOW}是否使用自定义密码? (y/n, 默认n): ").strip().lower() == 'y' + + password = None + if use_custom_password: + password = input(f"{Fore.YELLOW}请输入密码: ").strip() + if len(password) < 8: + print(f"{Fore.RED}[ERROR] 密码长度至少8位") + return + + # 执行注册 + result = registrar.auto_register(email, password) + + if result.get("success"): + print(f"\n{Fore.GREEN}{'='*60}") + print(f"{Fore.GREEN}✅ 注册成功!") + print(f"{Fore.GREEN}{'='*60}") + + data = result.get("data", {}) + print(f"{Fore.GREEN}Access Token: {data.get('access_token', 'N/A')[:30]}...") + print(f"{Fore.GREEN}Token 有效期: {data.get('expired_in', 0) // 3600} 小时") + print(f"{Fore.GREEN}赠送余额: {data.get('gift_balance', '$0.00')}") + print(f"{Fore.GREEN}{'='*60}") + else: + print(f"\n{Fore.RED}❌ 注册失败: {result.get('message', 'Unknown error')}") + +def batch_registration(): + """批量注册""" + print(f"\n{Fore.CYAN}=== 批量注册 ===") + + # 获取邮箱前缀和域名 + email_prefix = input(f"{Fore.YELLOW}请输入邮箱前缀 (例如: test): ").strip() + email_domain = input(f"{Fore.YELLOW}请输入邮箱域名 (例如: example.com): ").strip() + + if not email_prefix or not email_domain: + print(f"{Fore.RED}[ERROR] 邮箱前缀和域名不能为空") + return + + count = input(f"{Fore.YELLOW}请输入注册数量 (默认3): ").strip() + count = int(count) if count.isdigit() else 3 + + if count > 10: + print(f"{Fore.YELLOW}[WARNING] 一次注册超过10个账号可能会被限制") + confirm = input(f"{Fore.YELLOW}是否继续? (y/n): ").strip().lower() + if confirm != 'y': + return + + # 创建注册器 + api_key = os.getenv('CAPTCHA_API_KEY') + registrar = DuoPlusRegister(api_key) + + success_count = 0 + failed_count = 0 + + print(f"\n{Fore.CYAN}开始批量注册 {count} 个账号...") + + for i in range(1, count + 1): + email = f"{email_prefix}{i}@{email_domain}" + print(f"\n{Fore.CYAN}[{i}/{count}] 正在注册: {email}") + + try: + result = registrar.auto_register(email) + + if result.get("success"): + success_count += 1 + print(f"{Fore.GREEN}✅ {email} 注册成功") + else: + failed_count += 1 + print(f"{Fore.RED}❌ {email} 注册失败: {result.get('message')}") + + except Exception as e: + failed_count += 1 + print(f"{Fore.RED}❌ {email} 注册异常: {str(e)}") + + # 避免请求过快 + if i < count: + import time + print(f"{Fore.YELLOW}等待3秒后继续...") + time.sleep(3) + + # 显示统计结果 + print(f"\n{Fore.CYAN}{'='*60}") + print(f"{Fore.CYAN}批量注册完成!") + print(f"{Fore.GREEN}成功: {success_count} 个") + print(f"{Fore.RED}失败: {failed_count} 个") + print(f"{Fore.CYAN}{'='*60}") + +def test_captcha(): + """测试验证码识别""" + print(f"\n{Fore.CYAN}=== 测试 2captcha 连接 ===") + + api_key = os.getenv('CAPTCHA_API_KEY') + + try: + import requests + response = requests.get(f"http://2captcha.com/res.php?key={api_key}&action=getbalance") + + if response.status_code == 200: + balance = response.text + try: + balance_float = float(balance) + print(f"{Fore.GREEN}✅ 2captcha 连接成功") + print(f"{Fore.GREEN}账户余额: ${balance}") + + if balance_float < 1: + print(f"{Fore.YELLOW}[WARNING] 余额较低,请及时充值") + + except ValueError: + print(f"{Fore.RED}❌ API Key 无效: {response.text}") + else: + print(f"{Fore.RED}❌ 连接失败: HTTP {response.status_code}") + + except Exception as e: + print(f"{Fore.RED}❌ 测试失败: {str(e)}") + +def main(): + """主函数""" + # 打印横幅 + print_banner() + + # 检查环境 + if not check_environment(): + print(f"\n{Fore.RED}请配置好环境后重新运行程序") + sys.exit(1) + + while True: + print(f"\n{Fore.CYAN}请选择操作:") + print("1. 单个账号注册") + print("2. 批量注册") + print("3. 测试 2captcha 连接") + print("4. 查看注册记录") + print("0. 退出") + + choice = input(f"\n{Fore.YELLOW}请输入选项 (0-4): ").strip() + + if choice == '1': + single_registration() + elif choice == '2': + batch_registration() + elif choice == '3': + test_captcha() + elif choice == '4': + if os.path.exists('registered_accounts.txt'): + print(f"\n{Fore.CYAN}=== 注册记录 ===") + with open('registered_accounts.txt', 'r', encoding='utf-8') as f: + print(f.read()) + else: + print(f"{Fore.YELLOW}暂无注册记录") + elif choice == '0': + print(f"{Fore.GREEN}感谢使用,再见!") + break + else: + print(f"{Fore.RED}无效的选项,请重新选择") + +if __name__ == "__main__": + try: + main() + except KeyboardInterrupt: + print(f"\n{Fore.YELLOW}程序已中断") + sys.exit(0) + except Exception as e: + print(f"\n{Fore.RED}程序异常: {str(e)}") + sys.exit(1) \ No newline at end of file diff --git a/registered_accounts.txt b/registered_accounts.txt new file mode 100644 index 0000000..254ebe8 --- /dev/null +++ b/registered_accounts.txt @@ -0,0 +1,9 @@ + +============================================================ +注册时间: 2025-08-12 14:16:56 +邮箱: userdu0qb6qd@cursor.edu.kg +密码: 8o4wN4z$Nodh +Access Token: dk9KdXQuWU66s40X_pdcjla6Om8UD_iKdRY +Refresh Token: dk9KdXQulkSQKXHoQD9MeB5hn_28TgTVCag +Token 有效期: 431999 秒 +赠送余额: $0.00 diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..5a4d1be --- /dev/null +++ b/requirements.txt @@ -0,0 +1,3 @@ +requests>=2.31.0 +python-dotenv>=1.0.0 +colorama>=0.4.6 \ No newline at end of file diff --git a/stalwart_client.py b/stalwart_client.py new file mode 100644 index 0000000..80377d5 --- /dev/null +++ b/stalwart_client.py @@ -0,0 +1,168 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +import requests +import json +import os +from typing import Dict, List, Any, Optional, Union + +# 禁用代理设置 +os.environ['HTTP_PROXY'] = '' +os.environ['HTTPS_PROXY'] = '' +os.environ['http_proxy'] = '' +os.environ['https_proxy'] = '' + +class StalwartClient: + """Stalwart API客户端""" + + def __init__(self, base_url: str, api_key: Optional[str] = None, oauth_token: Optional[str] = None): + """ + 初始化Stalwart API客户端 + + Args: + base_url: API基础URL,例如 https://mail.example.org/api + api_key: API密钥(可选) + oauth_token: OAuth令牌(可选) + """ + self.base_url = base_url.rstrip('/') + self.api_key = api_key + self.oauth_token = oauth_token + self.session = requests.Session() + + # 禁用环境代理设置 + self.session.trust_env = False + + # 如果提供了API密钥,设置基本认证 + if api_key: + if ':' in api_key: + username, password = api_key.split(':', 1) + self.session.auth = (username, password) + else: + # 如果API密钥格式不包含冒号,则使用Bearer认证 + self.session.headers.update({"Authorization": f"Bearer {api_key}"}) + + # 如果提供了OAuth令牌,设置Bearer认证 + if oauth_token: + self.session.headers.update({"Authorization": f"Bearer {oauth_token}"}) + + def _make_request(self, method: str, endpoint: str, **kwargs) -> Any: + """ + 发送API请求 + + Args: + method: HTTP方法(GET, POST, PATCH, DELETE等) + endpoint: API端点路径 + **kwargs: 传递给requests的其他参数 + + Returns: + 解析后的JSON响应或原始响应 + + Raises: + requests.HTTPError: 请求失败时引发 + """ + url = f"{self.base_url}/{endpoint.lstrip('/')}" + + # 确保默认接受JSON响应 + headers = kwargs.pop('headers', {}) + headers.setdefault('Accept', 'application/json') + + # 调试信息 + print(f"发送请求: {method} {url}") + + try: + response = self.session.request(method, url, headers=headers, **kwargs) + + # 调试信息 + print(f"响应状态码: {response.status_code}") + + # 检查请求是否成功 + response.raise_for_status() + + # 尝试解析JSON响应 + if response.headers.get('content-type', '').startswith('application/json'): + return response.json() + + return response + except requests.exceptions.HTTPError as e: + print(f"HTTP错误: {e}") + if hasattr(e.response, 'text'): + print(f"响应内容: {e.response.text}") + raise + except requests.exceptions.ConnectionError as e: + print(f"连接错误: {e}") + raise + except requests.exceptions.Timeout as e: + print(f"超时错误: {e}") + raise + except requests.exceptions.RequestException as e: + print(f"请求异常: {e}") + raise + + # -------------------- OAuth相关方法 -------------------- + + def obtain_oauth_token(self, client_id: str, redirect_uri: str, nonce: str) -> Dict: + """获取OAuth令牌""" + data = { + "type": "code", + "client_id": client_id, + "redirect_uri": redirect_uri, + "nonce": nonce + } + + return self._make_request('POST', '/oauth', json=data) + + # -------------------- 主体(Principal)相关方法 -------------------- + + def list_principals(self, page: Optional[int] = None, limit: Optional[int] = None, + types: Optional[str] = None) -> Dict: + """列出主体""" + params = {} + if page: + params['page'] = page + if limit: + params['limit'] = limit + if types: + params['types'] = types + + return self._make_request('GET', '/principal', params=params) + + def create_principal(self, principal_data: Dict) -> Dict: + """创建主体""" + return self._make_request('POST', '/principal', json=principal_data) + + def fetch_principal(self, principal_id: str) -> Dict: + """获取主体详情""" + return self._make_request('GET', f'/principal/{principal_id}') + + def update_principal(self, principal_id: str, updates: List[Dict]) -> Dict: + """更新主体""" + return self._make_request('PATCH', f'/principal/{principal_id}', json=updates) + + def delete_principal(self, principal_id: str) -> Dict: + """删除主体""" + return self._make_request('DELETE', f'/principal/{principal_id}') + + # -------------------- 队列相关方法 -------------------- + + def list_queued_messages(self, page: Optional[int] = None, max_total: Optional[int] = None, + limit: Optional[int] = None, values: Optional[int] = None) -> Dict: + """列出排队消息""" + params = {} + if page: + params['page'] = page + if max_total: + params['max-total'] = max_total + if limit: + params['limit'] = limit + if values: + params['values'] = values + + return self._make_request('GET', '/queue/messages', params=params) + +def disable_proxy(): + """禁用代理设置""" + import os + os.environ['HTTP_PROXY'] = '' + os.environ['HTTPS_PROXY'] = '' + os.environ['http_proxy'] = '' + os.environ['https_proxy'] = '' \ No newline at end of file diff --git a/tracker.py b/tracker.py new file mode 100644 index 0000000..38543ed --- /dev/null +++ b/tracker.py @@ -0,0 +1,214 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +""" +DuoPlus 数据追踪模块 +处理 data4.net 的统计追踪请求 +""" + +import requests +import json +import time +import random +import string +from typing import Dict, Any, Optional +from colorama import init, Fore + +# 初始化 colorama +init(autoreset=True) + +class DataTracker: + """Data4.net 数据追踪器""" + + def __init__(self): + self.api_url = "https://api.data4.net/api" + self.website_id = "f487e5f1981a4c459c973fa4355b7fab" + self.session_id = self._generate_session_id() + self.uuid = self._generate_uuid() + self.visit_id = None + self.tracking_id = None + + # 设置请求头 + self.headers = { + 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/138.0.0.0 Safari/537.36', + 'Accept': '*/*', + 'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8', + 'Accept-Encoding': 'gzip, deflate, br, zstd', + 'Content-Type': 'application/json', + 'Origin': 'https://my.duoplus.cn', + 'Referer': 'https://my.duoplus.cn/', + 'sec-ch-ua': '"Not)A;Brand";v="8", "Chromium";v="138", "Google Chrome";v="138"', + 'sec-ch-ua-mobile': '?0', + 'sec-ch-ua-platform': '"Windows"', + 'sec-fetch-site': 'cross-site', + 'sec-fetch-mode': 'cors', + 'sec-fetch-dest': 'empty', + 'pragma': 'no-cache', + 'cache-control': 'no-cache', + 'priority': 'u=1, i' + } + + def _generate_session_id(self) -> str: + """生成会话ID""" + return ''.join(random.choices(string.ascii_lowercase + string.digits, k=32)) + + def _generate_uuid(self) -> str: + """生成UUID""" + return ''.join(random.choices(string.ascii_lowercase + string.digits, k=32)) + + def send_initial_event(self, page_url: str = "/sign-up") -> bool: + """ + 发送初始事件 + + Args: + page_url: 页面URL + + Returns: + 是否成功 + """ + url = f"{self.api_url}/report/send" + + payload = { + "type": "event", + "payload": { + "website": self.website_id, + "hostname": "my.duoplus.cn", + "screen": "2048x1152", + "language": "zh-CN", + "title": "DuoPlus 云手机", + "url": page_url, + "referrer": "my.duoplus.cn", + "session_id": self.session_id, + "uuid": self.uuid + } + } + + try: + response = requests.post(url, json=payload, headers=self.headers) + + if response.status_code == 200: + data = response.json() + if data.get("code") == 200: + self.session_id = data["data"]["session_id"] + self.visit_id = data["data"]["visit_id"] + self.tracking_id = data["data"]["id"] + print(f"{Fore.GREEN}[Tracker] 初始事件发送成功") + return True + + print(f"{Fore.YELLOW}[Tracker] 初始事件发送失败") + return False + + except Exception as e: + print(f"{Fore.YELLOW}[Tracker] 初始事件发送异常: {str(e)}") + return False + + def send_pageview(self, page_url: str = "/sign-up") -> bool: + """ + 发送页面浏览事件 + + Args: + page_url: 页面URL + + Returns: + 是否成功 + """ + url = f"{self.api_url}/report/event" + + payload = { + "payload": { + "website": self.website_id, + "hostname": "my.duoplus.cn", + "screen": "2048x1152", + "language": "zh-CN", + "title": "DuoPlus 云手机", + "url": page_url, + "referrer": "my.duoplus.cn", + "client_time": int(time.time()), + "scroll_depth_ratio": 0, + "session_id": self.session_id, + "uuid": self.uuid + }, + "type": "pageview" + } + + try: + response = requests.post(url, json=payload, headers=self.headers) + + if response.status_code == 200: + data = response.json() + if data.get("code") == 200: + print(f"{Fore.GREEN}[Tracker] 页面浏览事件发送成功") + return True + + print(f"{Fore.YELLOW}[Tracker] 页面浏览事件发送失败") + return False + + except Exception as e: + print(f"{Fore.YELLOW}[Tracker] 页面浏览事件发送异常: {str(e)}") + return False + + def send_view_time(self, view_time: int = 10) -> bool: + """ + 发送页面停留时间 + + Args: + view_time: 停留时间(秒) + + Returns: + 是否成功 + """ + if not self.visit_id or not self.tracking_id: + print(f"{Fore.YELLOW}[Tracker] 缺少必要的ID,跳过发送停留时间") + return False + + url = f"{self.api_url}/report/view_time" + + payload = { + "website_id": self.website_id, + "view_time": view_time, + "id": self.tracking_id, + "session_id": self.session_id, + "visit_id": self.visit_id + } + + try: + response = requests.post(url, json=payload, headers=self.headers) + + if response.status_code == 200: + data = response.json() + if data.get("code") == 200: + print(f"{Fore.GREEN}[Tracker] 停留时间发送成功: {view_time}秒") + return True + + print(f"{Fore.YELLOW}[Tracker] 停留时间发送失败") + return False + + except Exception as e: + print(f"{Fore.YELLOW}[Tracker] 停留时间发送异常: {str(e)}") + return False + + def track_registration_page(self) -> bool: + """ + 追踪注册页面访问 + + Returns: + 是否成功 + """ + print(f"{Fore.CYAN}[Tracker] 开始追踪注册页面...") + + # 发送初始事件 + if not self.send_initial_event("/sign-up"): + print(f"{Fore.YELLOW}[Tracker] 初始事件发送失败,继续执行") + + # 发送页面浏览事件 + time.sleep(0.5) + if not self.send_pageview("/sign-up"): + print(f"{Fore.YELLOW}[Tracker] 页面浏览事件发送失败,继续执行") + + # 模拟页面停留时间 + time.sleep(1) + if not self.send_view_time(5): + print(f"{Fore.YELLOW}[Tracker] 停留时间发送失败,继续执行") + + print(f"{Fore.GREEN}[Tracker] 页面追踪完成") + return True \ No newline at end of file