first commit: DuoPlus云手机协议注册工具 - 完整实现

This commit is contained in:
huangzhenpc
2025-08-12 14:33:24 +08:00
commit f9dcea2d06
15 changed files with 2314 additions and 0 deletions

48
.gitignore vendored Normal file
View File

@@ -0,0 +1,48 @@
# Python
__pycache__/
*.py[cod]
*$py.class
*.so
.Python
env/
venv/
ENV/
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
*.egg-info/
.installed.cfg
*.egg
# Environment
.env
.env.local
.env.*.local
# IDE
.vscode/
.idea/
*.swp
*.swo
*~
# Logs
*.log
logs/
# OS
.DS_Store
Thumbs.db
# Project specific
captured_requests.json
test_accounts.txt

124
QUICKSTART.md Normal file
View File

@@ -0,0 +1,124 @@
# DuoPlus 协议注册 - 快速开始指南
## 🚀 快速上手
### 1. 安装依赖
```bash
pip install -r requirements.txt
```
### 2. 配置 API Key
```bash
cp .env.example .env
# 编辑 .env 文件,填入你的 2captcha API Key
```
### 3. 运行注册
```bash
python main.py
# 或
python duoplus_register.py
```
## 📝 重要说明
### API 端点说明
当前脚本中的 API 端点是基于常见模式推测的,实际使用时可能需要:
1. **使用网络抓包工具分析实际请求**
```bash
# 安装 mitmproxy
pip install mitmproxy
# 运行分析脚本
mitmdump -s analyze_requests.py -p 8080
```
2. **需要更新的关键参数**
- 腾讯验证码 app_id在 `duoplus_register.py` 第 53 行)
- API 端点 URL
- 请求头参数
### 腾讯验证码处理流程
1. **初始化验证码**
- 获取验证码配置app_id
- 调用 2captcha API 提交任务
2. **等待识别结果**
- 2captcha 会返回 ticket 和 randstr
- 这两个参数用于验证
3. **提交验证结果**
- 将 ticket 和 randstr 随注册请求一起提交
## 🛠️ 调试技巧
### 1. 测试 2captcha 连接
```bash
python test_register.py
# 选择选项 1
```
### 2. 查看详细日志
脚本使用彩色输出显示不同类型的信息:
- 🟦 蓝色:信息
- 🟨 黄色:警告/输入提示
- 🟩 绿色:成功
- 🟥 红色:错误
### 3. 常见问题
**Q: 验证码识别失败**
- 检查 2captcha 余额
- 确认 API Key 正确
- 检查网络连接
**Q: 注册请求失败**
- 使用抓包工具获取最新的 API 端点
- 检查请求头是否完整
- 确认邮箱未被注册
## 📌 注意事项
1. **费用提醒**:每次验证码识别约需 $0.003
2. **频率限制**:避免短时间内大量注册
3. **邮箱要求**:使用真实可接收邮件的邮箱
4. **合规使用**:遵守服务条款,仅用于合法用途
## 💡 扩展使用
### 批量注册(示例)
```python
from duoplus_register import DuoPlusRegister
import os
from dotenv import load_dotenv
load_dotenv()
registrar = DuoPlusRegister(os.getenv('CAPTCHA_API_KEY'))
# 批量注册
emails = ["test1@example.com", "test2@example.com"]
for email in emails:
success = registrar.auto_register(email)
if success:
print(f"✅ {email} 注册成功")
else:
print(f"❌ {email} 注册失败")
```
## 🔧 项目文件说明
- `main.py` - 主入口文件
- `duoplus_register.py` - 核心注册逻辑
- `captcha_solver.py` - 2captcha 集成
- `test_register.py` - 测试工具
- `analyze_requests.py` - 网络请求分析工具
- `.env` - 配置文件(需自行创建)
---
如有问题,请查看 README.md 获取更详细的文档。

109
README.md Normal file
View File

@@ -0,0 +1,109 @@
# DuoPlus 协议注册工具
使用 Python 实现的 DuoPlus 自动注册工具,集成 2captcha 自动处理腾讯滑块验证码。
## 功能特点
- ✅ 协议注册(直接调用 API
- ✅ 自动处理腾讯滑块验证码
- ✅ 支持自定义密码或随机生成
- ✅ 彩色控制台输出
- ✅ 完整的错误处理
## 安装步骤
1. 克隆或下载项目
2. 安装依赖:
```bash
pip install -r requirements.txt
```
3. 配置 2captcha API Key
```bash
cp .env.example .env
```
然后编辑 `.env` 文件,填入你的 2captcha API Key
```
CAPTCHA_API_KEY=你的2captcha_api_key
```
## 使用方法
### 基础使用
运行注册脚本:
```bash
python duoplus_register.py
```
按照提示输入:
- 邮箱地址
- 是否使用随机密码
- 密码(如果选择自定义)
- 邮箱验证码
### 高级使用
你也可以直接在代码中调用:
```python
from duoplus_register import DuoPlusRegister
import os
from dotenv import load_dotenv
# 加载环境变量
load_dotenv()
# 创建注册器
registrar = DuoPlusRegister(os.getenv('CAPTCHA_API_KEY'))
# 执行注册
success = registrar.auto_register("your_email@example.com", "your_password")
```
## 注意事项
1. **验证码识别费用**:每次使用 2captcha 识别腾讯验证码都会产生费用(约 $0.003/次)
2. **邮箱要求**
- 必须是有效的邮箱地址
- 需要能够接收验证码
3. **API 端点**
- 当前脚本中的 API 端点是基于分析得出的
- 如果注册失败,可能需要使用网络抓包工具重新分析
## 项目结构
```
├── captcha_solver.py # 2captcha 验证码处理模块
├── duoplus_register.py # 主注册脚本
├── requirements.txt # Python 依赖
├── .env.example # 环境变量示例
├── .env # 实际配置文件(需自行创建)
└── README.md # 本文档
```
## 故障排查
1. **验证码识别失败**
- 检查 2captcha API Key 是否正确
- 检查 2captcha 账户余额
- 确认网络连接正常
2. **注册失败**
- 检查邮箱是否已被注册
- 确认验证码输入正确
- 查看控制台错误信息
3. **API 请求失败**
- 可能需要更新请求头或 API 端点
- 使用浏览器开发者工具分析实际请求
## 免责声明
本工具仅供学习和研究使用,请遵守相关服务条款。使用本工具产生的任何后果由使用者自行承担。
## 更新日志
- 2024-01-xx: 初始版本发布

80
analyze_requests.py Normal file
View File

@@ -0,0 +1,80 @@
import json
from mitmproxy import http
from mitmproxy.tools.main import mitmdump
import sys
from colorama import init, Fore, Style
# 初始化 colorama
init(autoreset=True)
class RequestAnalyzer:
"""分析 DuoPlus 注册相关的网络请求"""
def __init__(self):
self.target_domain = "my.duoplus.cn"
self.captured_requests = []
def request(self, flow: http.HTTPFlow) -> None:
"""处理请求"""
if self.target_domain in flow.request.pretty_host:
request_info = {
"method": flow.request.method,
"url": flow.request.pretty_url,
"headers": dict(flow.request.headers),
"body": None
}
# 尝试解析请求体
if flow.request.content:
try:
request_info["body"] = json.loads(flow.request.content.decode('utf-8'))
except:
request_info["body"] = flow.request.content.decode('utf-8', errors='ignore')
# 打印关键请求
if any(keyword in flow.request.pretty_url for keyword in ['auth', 'register', 'captcha', 'verify']):
print(f"\n{Fore.CYAN}[REQUEST] {flow.request.method} {flow.request.pretty_url}")
print(f"{Fore.YELLOW}Headers: {json.dumps(dict(flow.request.headers), indent=2, ensure_ascii=False)}")
if request_info["body"]:
print(f"{Fore.GREEN}Body: {json.dumps(request_info['body'], indent=2, ensure_ascii=False)}")
def response(self, flow: http.HTTPFlow) -> None:
"""处理响应"""
if self.target_domain in flow.request.pretty_host:
# 打印关键响应
if any(keyword in flow.request.pretty_url for keyword in ['auth', 'register', 'captcha', 'verify']):
print(f"\n{Fore.MAGENTA}[RESPONSE] {flow.response.status_code} for {flow.request.pretty_url}")
# 尝试解析响应体
if flow.response.content:
try:
response_body = json.loads(flow.response.content.decode('utf-8'))
print(f"{Fore.GREEN}Response: {json.dumps(response_body, indent=2, ensure_ascii=False)}")
except:
print(f"{Fore.RED}Response (raw): {flow.response.content.decode('utf-8', errors='ignore')[:200]}...")
# 创建分析器实例
analyzer = RequestAnalyzer()
def request(flow: http.HTTPFlow) -> None:
analyzer.request(flow)
def response(flow: http.HTTPFlow) -> None:
analyzer.response(flow)
if __name__ == "__main__":
print(f"""
{Fore.CYAN}=== DuoPlus 网络请求分析工具 ===
{Fore.YELLOW}使用方法:
1. 安装 mitmproxy: pip install mitmproxy
2. 运行代理: mitmdump -s analyze_requests.py -p 8080
3. 配置浏览器使用 127.0.0.1:8080 作为HTTP代理
4. 访问 https://my.duoplus.cn/sign-up 并执行注册操作
5. 查看控制台输出的请求和响应信息
{Fore.GREEN}提示:
- 关注包含 'auth', 'register', 'captcha', 'verify' 的请求
- 记录验证码相关的 app_id 和其他参数
- 保存完整的请求头和请求体格式
""")

127
captcha_solver.py Normal file
View File

@@ -0,0 +1,127 @@
import requests
import time
import json
from typing import Optional, Dict, Any
class TwoCaptchaSolver:
"""2captcha 腾讯滑块验证码处理器"""
def __init__(self, api_key: str):
self.api_key = api_key
self.base_url = "http://2captcha.com"
def solve_tencent_captcha(self, app_id: str, page_url: str, extra_params: Optional[Dict[str, Any]] = None) -> Optional[Dict[str, Any]]:
"""
解决腾讯滑块验证码
Args:
app_id: 腾讯验证码的 app_id
page_url: 验证码所在页面的 URL
extra_params: 额外的参数(如果需要)
Returns:
包含验证结果的字典,或 None如果失败
"""
# 提交验证码任务
submit_url = f"{self.base_url}/in.php"
submit_params = {
"key": self.api_key,
"method": "tencent",
"app_id": app_id,
"pageurl": page_url,
"json": 1
}
if extra_params:
submit_params.update(extra_params)
try:
print(f"[2captcha] 提交腾讯验证码任务...")
response = requests.post(submit_url, data=submit_params)
result = response.json()
if result.get("status") != 1:
print(f"[2captcha] 提交失败: {result.get('error_text', 'Unknown error')}")
return None
task_id = result.get("request")
print(f"[2captcha] 任务ID: {task_id}")
# 等待并获取结果
return self._get_result(task_id)
except Exception as e:
print(f"[2captcha] 错误: {str(e)}")
return None
def _get_result(self, task_id: str, max_attempts: int = 60, wait_time: int = 5) -> Optional[Dict[str, Any]]:
"""
获取验证码识别结果
Args:
task_id: 2captcha 任务ID
max_attempts: 最大尝试次数
wait_time: 每次尝试之间的等待时间(秒)
Returns:
验证结果字典
"""
result_url = f"{self.base_url}/res.php"
for attempt in range(max_attempts):
params = {
"key": self.api_key,
"action": "get",
"id": task_id,
"json": 1
}
try:
response = requests.get(result_url, params=params)
result = response.json()
if result.get("status") == 1:
print("[2captcha] 验证码识别成功")
# 返回腾讯验证码需要的参数
return {
"ticket": result.get("request", {}).get("ticket"),
"randstr": result.get("request", {}).get("randstr"),
"raw_response": result
}
elif result.get("request") == "CAPCHA_NOT_READY":
print(f"[2captcha] 等待识别中... ({attempt + 1}/{max_attempts})")
time.sleep(wait_time)
else:
print(f"[2captcha] 获取结果失败: {result.get('error_text', 'Unknown error')}")
return None
except Exception as e:
print(f"[2captcha] 获取结果时出错: {str(e)}")
time.sleep(wait_time)
print("[2captcha] 超时,未能获取验证码结果")
return None
def report_good(self, task_id: str):
"""报告验证码正确"""
try:
requests.get(f"{self.base_url}/res.php", params={
"key": self.api_key,
"action": "reportgood",
"id": task_id
})
print("[2captcha] 已报告验证码正确")
except:
pass
def report_bad(self, task_id: str):
"""报告验证码错误"""
try:
requests.get(f"{self.base_url}/res.php", params={
"key": self.api_key,
"action": "reportbad",
"id": task_id
})
print("[2captcha] 已报告验证码错误")
except:
pass

455
duoplus_register.py Normal file
View File

@@ -0,0 +1,455 @@
import requests
import json
import time
import random
import string
import hashlib
from typing import Optional, Dict, Any
from dotenv import load_dotenv
import os
from captcha_solver import TwoCaptchaSolver
from tracker import DataTracker
from email_manager import EmailManager, AutoEmailVerification
from colorama import init, Fore, Style
# 初始化 colorama
init(autoreset=True)
class DuoPlusRegister:
"""DuoPlus 协议注册类"""
def __init__(self, captcha_api_key: str):
self.session = requests.Session()
self.captcha_solver = TwoCaptchaSolver(captcha_api_key)
self.tracker = DataTracker() # 初始化数据追踪器
self.base_url = "https://my.duoplus.cn"
self.api_url = "https://api.duoplus.cn"
self.captcha_app_id = None
# 生成设备指纹
self.device_fp = self._generate_device_fp()
# 设置请求头
self.session.headers.update({
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/138.0.0.0 Safari/537.36',
'Accept': 'application/json, text/plain, */*',
'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
'Accept-Encoding': 'gzip, deflate, br',
'Origin': 'https://my.duoplus.cn',
'Referer': 'https://my.duoplus.cn/',
'Content-Type': 'application/json',
'sec-ch-ua': '"Not)A;Brand";v="8", "Chromium";v="138", "Google Chrome";v="138"',
'sec-ch-ua-mobile': '?0',
'sec-ch-ua-platform': '"Windows"',
'sec-fetch-site': 'same-site',
'sec-fetch-mode': 'cors',
'sec-fetch-dest': 'empty',
'pragma': 'no-cache',
'cache-control': 'no-cache',
'lang': 'zh-CN',
'duoplus-fp': self.device_fp,
'priority': 'u=1, i'
})
# 获取初始 cookies 和配置
self._init_session()
def _generate_device_fp(self) -> str:
"""生成设备指纹"""
# 简单的设备指纹生成,实际可能需要更复杂的算法
random_str = ''.join(random.choices(string.ascii_lowercase + string.digits, k=16))
return hashlib.md5(random_str.encode()).hexdigest()
def _init_session(self):
"""初始化会话,获取必要的 cookies 和配置"""
try:
# 先访问注册页面获取 cookies
response = self.session.get(f"{self.base_url}/sign-up")
print(f"{Fore.GREEN}[INFO] 初始化会话成功")
# 发送数据追踪请求(模拟真实浏览器行为)
self.tracker.track_registration_page()
# 获取腾讯验证码配置
self._get_captcha_config()
except Exception as e:
print(f"{Fore.RED}[ERROR] 初始化会话失败: {str(e)}")
def _get_captcha_config(self) -> Optional[Dict[str, Any]]:
"""
获取腾讯验证码配置信息
Returns:
验证码配置信息
"""
try:
# 调用 API 获取验证码配置
config_url = f"{self.api_url}/common/tencentConfig"
# 更新请求头中的 referer
headers = self.session.headers.copy()
headers['referer'] = 'https://my.duoplus.cn/'
headers['authorization'] = '' # 注册时可能不需要 authorization
response = self.session.get(config_url, headers=headers)
if response.status_code == 200:
data = response.json()
if data.get('code') == 200:
self.captcha_app_id = data['data']['captcha_app_id']
print(f"{Fore.GREEN}[INFO] 获取验证码配置成功app_id: {self.captcha_app_id}")
return {
"app_id": self.captcha_app_id,
"page_url": f"{self.base_url}/sign-up"
}
else:
print(f"{Fore.RED}[ERROR] 获取验证码配置失败: {data.get('message')}")
else:
print(f"{Fore.RED}[ERROR] 请求验证码配置失败: HTTP {response.status_code}")
except Exception as e:
print(f"{Fore.RED}[ERROR] 获取验证码配置异常: {str(e)}")
# 如果获取失败,返回默认值
return {
"app_id": "192789496", # 使用实际的 app_id 作为备份
"page_url": f"{self.base_url}/sign-up"
}
def send_verification_code(self, email: str) -> bool:
"""
发送验证码
Args:
email: 邮箱地址
Returns:
是否成功发送
"""
print(f"{Fore.CYAN}[INFO] 准备发送验证码到: {email}")
# 获取验证码配置
captcha_config = self._get_captcha_config()
if not captcha_config:
print(f"{Fore.RED}[ERROR] 无法获取验证码配置")
return False
# 使用 2captcha 解决验证码
print(f"{Fore.YELLOW}[INFO] 正在处理腾讯滑块验证码...")
captcha_result = self.captcha_solver.solve_tencent_captcha(
app_id=captcha_config["app_id"],
page_url=captcha_config["page_url"]
)
if not captcha_result:
print(f"{Fore.RED}[ERROR] 验证码处理失败")
return False
# 发送验证码请求 - 使用正确的 API 端点
send_code_url = f"{self.api_url}/email/verificationCode"
# 正确的请求体格式
payload = {
"type": 1, # 1 表示注册
"email": email,
"rand_str": captcha_result["randstr"],
"ticket": captcha_result["ticket"]
}
# 更新请求头
headers = self.session.headers.copy()
headers.update({
'referer': 'https://my.duoplus.cn/',
'authorization': '',
'accept': 'application/json'
})
try:
print(f"{Fore.YELLOW}[INFO] 发送请求到: {send_code_url}")
print(f"{Fore.YELLOW}[DEBUG] 请求体: {json.dumps(payload, ensure_ascii=False)}")
response = self.session.post(send_code_url, json=payload, headers=headers)
result = response.json()
if response.status_code == 200 and result.get("code") == 200:
print(f"{Fore.GREEN}[SUCCESS] 验证码发送成功: {result.get('message', 'Success')}")
return True
else:
print(f"{Fore.RED}[ERROR] 发送验证码失败: {result.get('message', 'Unknown error')}")
return False
except Exception as e:
print(f"{Fore.RED}[ERROR] 请求失败: {str(e)}")
return False
def register(self, email: str, password: str, verification_code: str) -> Dict[str, Any]:
"""
执行注册
Args:
email: 邮箱地址
password: 密码
verification_code: 邮箱验证码
Returns:
注册结果字典,包含 token 等信息
"""
print(f"{Fore.CYAN}[INFO] 开始注册账号...")
# 使用正确的注册端点
register_url = f"{self.api_url}/account/signup"
# 正确的请求体格式
payload = {
"username": email, # 使用邮箱作为用户名
"password": password,
"verification_code": verification_code,
"isAgress": True, # 同意条款
"from_landing": 0
}
# 更新请求头
headers = self.session.headers.copy()
headers.update({
'referer': 'https://my.duoplus.cn/',
'authorization': '',
'accept': 'application/json'
})
try:
print(f"{Fore.YELLOW}[INFO] 发送注册请求到: {register_url}")
print(f"{Fore.YELLOW}[DEBUG] 请求体: {json.dumps(payload, ensure_ascii=False)}")
response = self.session.post(register_url, json=payload, headers=headers)
result = response.json()
if response.status_code == 200 and result.get("code") == 200:
print(f"{Fore.GREEN}[SUCCESS] 注册成功!")
# 获取返回的数据
data = result.get("data", {})
if data:
print(f"{Fore.GREEN}[INFO] Access Token: {data.get('access_token')[:20]}...")
print(f"{Fore.GREEN}[INFO] Token 有效期: {data.get('expired_in')}")
print(f"{Fore.GREEN}[INFO] 赠送余额: {data.get('gift_balance', '$0.00')}")
# 获取用户详细信息
self._get_user_profile(data.get('access_token'))
return {"success": True, "data": data}
else:
print(f"{Fore.RED}[ERROR] 注册失败: {result.get('message', 'Unknown error')}")
print(f"{Fore.RED}[DEBUG] 完整响应: {json.dumps(result, ensure_ascii=False)}")
return {"success": False, "message": result.get('message')}
except Exception as e:
print(f"{Fore.RED}[ERROR] 请求失败: {str(e)}")
return {"success": False, "message": str(e)}
def _get_user_profile(self, access_token: str) -> Optional[Dict[str, Any]]:
"""
获取用户详细信息
Args:
access_token: 访问令牌
Returns:
用户信息字典
"""
profile_url = f"{self.api_url}/account/profile"
headers = self.session.headers.copy()
headers.update({
'authorization': access_token,
'accept': 'application/json'
})
try:
response = self.session.post(profile_url, json={}, headers=headers)
result = response.json()
if response.status_code == 200 and result.get("code") == 200:
profile = result.get("data", {})
print(f"{Fore.GREEN}[INFO] 用户ID: {profile.get('user_id')}")
print(f"{Fore.GREEN}[INFO] 邮箱: {profile.get('email')}")
print(f"{Fore.GREEN}[INFO] 团队ID: {profile.get('team_id')}")
print(f"{Fore.GREEN}[INFO] 团队名称: {profile.get('team_name')}")
return profile
except Exception as e:
print(f"{Fore.YELLOW}[WARNING] 获取用户信息失败: {str(e)}")
return None
def generate_random_password(self, length: int = 12) -> str:
"""生成随机密码"""
characters = string.ascii_letters + string.digits + "!@#$%^&*"
return ''.join(random.choice(characters) for _ in range(length))
def auto_register(self, email: str, password: Optional[str] = None) -> Dict[str, Any]:
"""
自动注册流程
Args:
email: 邮箱地址
password: 密码(如果不提供则自动生成)
Returns:
注册结果字典
"""
if not password:
password = self.generate_random_password()
print(f"{Fore.YELLOW}[INFO] 生成随机密码: {password}")
# 保存注册信息
print(f"{Fore.CYAN}{'='*60}")
print(f"{Fore.CYAN}注册信息:")
print(f"{Fore.CYAN}{'='*60}")
print(f" 📧 邮箱: {email}")
print(f" 🔑 密码: {password}")
print(f"{Fore.CYAN}{'='*60}\n")
# 发送验证码
if not self.send_verification_code(email):
return {"success": False, "message": "发送验证码失败"}
# 等待用户输入验证码
print(f"{Fore.CYAN}[INPUT] 请查看邮箱并输入验证码: ", end="")
verification_code = input().strip()
# 执行注册
result = self.register(email, password, verification_code)
if result.get("success"):
# 保存账号信息到文件
self._save_account_info(email, password, result.get("data", {}))
return result
def auto_register_with_temp_email(self) -> Dict[str, Any]:
"""
使用临时邮箱全自动注册
Returns:
注册结果字典
"""
print(f"{Fore.CYAN}[INFO] 使用自建邮箱进行全自动注册...")
# 创建邮箱管理器
email_manager = EmailManager()
auto_verifier = AutoEmailVerification(email_manager)
# 创建临时邮箱
print(f"{Fore.CYAN}[INFO] 创建临时邮箱...")
email_info = auto_verifier.create_temp_email(domain="cursor.edu.kg")
if not email_info.get("success"):
return {"success": False, "message": "创建临时邮箱失败"}
email_address = email_info["email"]
email_password = email_info["password"]
print(f"{Fore.GREEN}[INFO] 临时邮箱创建成功")
print(f"{Fore.GREEN}[INFO] 邮箱: {email_address}")
print(f"{Fore.GREEN}[INFO] 邮箱密码: {email_password}")
# 生成DuoPlus账号密码
duoplus_password = self.generate_random_password()
# 保存注册信息
print(f"\n{Fore.CYAN}{'='*60}")
print(f"{Fore.CYAN}DuoPlus 注册信息:")
print(f"{Fore.CYAN}{'='*60}")
print(f" 📧 邮箱: {email_address}")
print(f" 🔑 密码: {duoplus_password}")
print(f"{Fore.CYAN}{'='*60}\n")
# 发送验证码
if not self.send_verification_code(email_address):
return {"success": False, "message": "发送验证码失败"}
# 自动获取验证码
print(f"{Fore.CYAN}[INFO] 等待接收验证码...")
verification_code = auto_verifier.wait_for_code(
email_info,
sender_filter="duoplus",
timeout=60
)
if not verification_code:
print(f"{Fore.YELLOW}[WARNING] 自动获取验证码失败,请手动输入")
print(f"{Fore.CYAN}[INPUT] 请输入验证码: ", end="")
verification_code = input().strip()
# 执行注册
result = self.register(email_address, duoplus_password, verification_code)
if result.get("success"):
# 保存账号信息到文件,包括邮箱密码
data = result.get("data", {})
data["email_password"] = email_password # 保存邮箱密码
self._save_account_info(email_address, duoplus_password, data)
return result
def _save_account_info(self, email: str, password: str, data: Dict[str, Any]):
"""
保存账号信息到文件
Args:
email: 邮箱
password: 密码
data: 注册返回的数据
"""
try:
import datetime
filename = "registered_accounts.txt"
with open(filename, "a", encoding="utf-8") as f:
f.write(f"\n{'='*60}\n")
f.write(f"注册时间: {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
f.write(f"邮箱: {email}\n")
f.write(f"密码: {password}\n")
f.write(f"Access Token: {data.get('access_token', 'N/A')}\n")
f.write(f"Refresh Token: {data.get('refresh_token', 'N/A')}\n")
f.write(f"Token 有效期: {data.get('expired_in', 0)}\n")
f.write(f"赠送余额: {data.get('gift_balance', '$0.00')}\n")
print(f"{Fore.GREEN}[INFO] 账号信息已保存到 {filename}")
except Exception as e:
print(f"{Fore.YELLOW}[WARNING] 保存账号信息失败: {str(e)}")
def main():
# 加载环境变量
load_dotenv()
# 获取 API Key
captcha_api_key = os.getenv('CAPTCHA_API_KEY')
if not captcha_api_key:
print(f"{Fore.RED}[ERROR] 请在 .env 文件中设置 CAPTCHA_API_KEY")
return
# 创建注册器
registrar = DuoPlusRegister(captcha_api_key)
# 获取用户输入
print(f"{Fore.CYAN}=== DuoPlus 自动注册工具 ===")
email = input(f"{Fore.YELLOW}请输入邮箱地址: ").strip()
use_random_password = input(f"{Fore.YELLOW}是否使用随机密码? (y/n): ").strip().lower() == 'y'
if use_random_password:
password = None
else:
password = input(f"{Fore.YELLOW}请输入密码: ").strip()
# 执行注册
if registrar.auto_register(email, password):
print(f"{Fore.GREEN}[SUCCESS] 注册流程完成!")
else:
print(f"{Fore.RED}[FAILED] 注册失败!")
if __name__ == "__main__":
main()

463
email_manager.py Normal file
View File

@@ -0,0 +1,463 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
自建邮箱管理模块
支持创建邮箱用户和IMAP收件
使用 Stalwart API
"""
import imaplib
import email
import time
import re
import random
import string
import secrets
from typing import Optional, List, Dict, Tuple
from datetime import datetime, timedelta
from colorama import init, Fore
from stalwart_client import StalwartClient
from exceptions import StalwartError, ApiError
# 初始化 colorama
init(autoreset=True)
class EmailManager:
"""邮箱管理器"""
def __init__(self, api_base_url: str = "https://mail.evnmail.com/api",
api_key: str = "admin:Hunter1520.",
imap_server: str = "mail.evnmail.com",
imap_port: int = 993):
"""
初始化邮箱管理器
Args:
api_base_url: Stalwart API基础URL
api_key: API认证密钥 (格式: username:password)
imap_server: IMAP服务器地址
imap_port: IMAP端口
"""
self.api_base_url = api_base_url
self.api_key = api_key
self.imap_server = imap_server
self.imap_port = imap_port
# 初始化 Stalwart 客户端
self.client = StalwartClient(api_base_url, api_key=api_key)
def generate_random_password(self, length: int = 12) -> str:
"""
生成随机安全密码(只包含大小写字母和数字)
Args:
length: 密码长度
Returns:
随机密码
"""
characters = string.ascii_letters + string.digits
password = [
secrets.choice(string.ascii_uppercase),
secrets.choice(string.ascii_lowercase),
secrets.choice(string.digits),
]
password.extend(secrets.choice(characters) for _ in range(length - 3))
random.shuffle(password)
return ''.join(password)
def generate_random_username(self, prefix: str = "user", length: int = 8) -> str:
"""
生成随机用户名
Args:
prefix: 用户名前缀
length: 随机部分长度
Returns:
随机用户名
"""
random_part = ''.join(random.choices(string.ascii_lowercase + string.digits, k=length))
return f"{prefix}{random_part}"
def create_email_account(self, username: Optional[str] = None,
domain: str = "cursor.edu.kg",
password: Optional[str] = None,
quota_mb: int = 10) -> Dict[str, str]:
"""
创建邮箱账户
Args:
username: 用户名如果为None则随机生成
domain: 邮箱域名
password: 密码如果为None则随机生成
quota_mb: 邮箱配额MB
Returns:
包含邮箱信息的字典
"""
# 生成用户名和密码
if not username:
username = self.generate_random_username()
if not password:
password = self.generate_random_password()
email_address = f"{username}@{domain}"
quota_bytes = quota_mb * 1024 * 1024
print(f"{Fore.CYAN}[Email] 创建邮箱账户: {email_address}")
# 准备用户数据
user_data = {
"type": "individual",
"name": email_address, # 使用完整邮箱地址作为name
"description": f"{email_address} account",
"quota": quota_bytes,
"emails": [email_address],
"roles": ["user"],
"memberOf": [],
"secrets": [password] # 直接在创建时设置密码
}
try:
# 使用 Stalwart 客户端创建用户
print(f"{Fore.YELLOW}[Email] 提交用户创建请求...")
response = self.client.create_principal(user_data)
# 获取用户ID
user_id = response.get('data') if isinstance(response, dict) else response
if user_id:
print(f"{Fore.GREEN}[Email] ✅ 邮箱创建成功ID: {user_id}")
# 验证用户信息
try:
user_info = self.client.fetch_principal(user_id)
print(f"{Fore.GREEN}[Email] 验证完成")
except Exception as e:
print(f"{Fore.YELLOW}[Email] 验证失败,但用户已创建: {str(e)}")
return {
"success": True,
"user_id": user_id,
"email": email_address,
"username": username,
"password": password,
"domain": domain,
"imap_server": self.imap_server,
"imap_port": self.imap_port
}
else:
print(f"{Fore.RED}[Email] 创建失败: 未返回用户ID")
return {"success": False, "message": "No user ID returned"}
except ApiError as e:
print(f"{Fore.RED}[Email] API错误: {e}")
return {"success": False, "message": f"API Error: {str(e)}"}
except StalwartError as e:
print(f"{Fore.RED}[Email] Stalwart错误: {e}")
return {"success": False, "message": f"Stalwart Error: {str(e)}"}
except Exception as e:
print(f"{Fore.RED}[Email] 创建异常: {str(e)}")
return {"success": False, "message": str(e)}
def connect_imap(self, email_address: str, password: str) -> Optional[imaplib.IMAP4_SSL]:
"""
连接到IMAP服务器
Args:
email_address: 邮箱地址
password: 密码
Returns:
IMAP连接对象
"""
try:
print(f"{Fore.YELLOW}[IMAP] 连接到 {self.imap_server}:{self.imap_port}")
# 连接到IMAP服务器
imap = imaplib.IMAP4_SSL(self.imap_server, self.imap_port)
# 登录
imap.login(email_address, password)
print(f"{Fore.GREEN}[IMAP] 登录成功")
return imap
except imaplib.IMAP4.error as e:
print(f"{Fore.RED}[IMAP] 登录失败: {str(e)}")
return None
except Exception as e:
print(f"{Fore.RED}[IMAP] 连接失败: {str(e)}")
return None
def fetch_verification_code(self, email_address: str, password: str,
sender_filter: str = "@duoplus",
timeout: int = 60,
check_interval: int = 5) -> Optional[str]:
"""
从邮箱获取验证码
Args:
email_address: 邮箱地址
password: 密码
sender_filter: 发件人过滤条件
timeout: 超时时间(秒)
check_interval: 检查间隔(秒)
Returns:
验证码字符串
"""
print(f"{Fore.CYAN}[IMAP] 等待接收验证码邮件...")
start_time = time.time()
while time.time() - start_time < timeout:
imap = self.connect_imap(email_address, password)
if not imap:
time.sleep(check_interval)
continue
try:
# 选择收件箱
imap.select('INBOX')
# 搜索未读邮件
search_criteria = f'(UNSEEN FROM "{sender_filter}")'
result, data = imap.search(None, search_criteria)
if result == 'OK' and data[0]:
email_ids = data[0].split()
# 获取最新的邮件
for email_id in reversed(email_ids):
result, data = imap.fetch(email_id, '(RFC822)')
if result == 'OK':
raw_email = data[0][1]
msg = email.message_from_bytes(raw_email)
# 获取邮件内容
body = self._get_email_body(msg)
# 提取验证码通常是6位数字
code_patterns = [
r'验证码[:]\s*(\d{6})',
r'验证码是[:]\s*(\d{6})',
r'您的验证码是[:]\s*(\d{6})',
r'(\d{6})', # 直接匹配6位数字
]
for pattern in code_patterns:
match = re.search(pattern, body)
if match:
code = match.group(1)
print(f"{Fore.GREEN}[IMAP] ✅ 获取到验证码: {code}")
# 标记邮件为已读
imap.store(email_id, '+FLAGS', '\\Seen')
imap.close()
imap.logout()
return code
imap.close()
imap.logout()
except Exception as e:
print(f"{Fore.YELLOW}[IMAP] 检查邮件时出错: {str(e)}")
if imap:
try:
imap.logout()
except:
pass
# 等待下次检查
elapsed = int(time.time() - start_time)
remaining = timeout - elapsed
print(f"{Fore.YELLOW}[IMAP] 暂未收到验证码,{remaining}秒后超时...")
time.sleep(check_interval)
print(f"{Fore.RED}[IMAP] 获取验证码超时")
return None
def _get_email_body(self, msg) -> str:
"""
提取邮件正文
Args:
msg: email.message对象
Returns:
邮件正文文本
"""
body = ""
if msg.is_multipart():
for part in msg.walk():
content_type = part.get_content_type()
content_disposition = str(part.get("Content-Disposition"))
if content_type == "text/plain" and "attachment" not in content_disposition:
try:
body = part.get_payload(decode=True).decode('utf-8', errors='ignore')
break
except:
pass
elif content_type == "text/html" and not body:
try:
html_body = part.get_payload(decode=True).decode('utf-8', errors='ignore')
# 简单移除HTML标签
body = re.sub('<[^<]+?>', '', html_body)
except:
pass
else:
try:
body = msg.get_payload(decode=True).decode('utf-8', errors='ignore')
except:
body = str(msg.get_payload())
return body
def get_recent_emails(self, email_address: str, password: str,
count: int = 5) -> List[Dict[str, str]]:
"""
获取最近的邮件列表
Args:
email_address: 邮箱地址
password: 密码
count: 获取邮件数量
Returns:
邮件列表
"""
emails = []
imap = self.connect_imap(email_address, password)
if not imap:
return emails
try:
# 选择收件箱
imap.select('INBOX')
# 搜索所有邮件
result, data = imap.search(None, 'ALL')
if result == 'OK' and data[0]:
email_ids = data[0].split()
# 获取最近的几封邮件
for email_id in reversed(email_ids[-count:]):
result, data = imap.fetch(email_id, '(RFC822)')
if result == 'OK':
raw_email = data[0][1]
msg = email.message_from_bytes(raw_email)
emails.append({
"id": email_id.decode(),
"from": msg.get('From', ''),
"to": msg.get('To', ''),
"subject": msg.get('Subject', ''),
"date": msg.get('Date', ''),
"body": self._get_email_body(msg)[:200] + "..."
})
imap.close()
imap.logout()
except Exception as e:
print(f"{Fore.RED}[IMAP] 获取邮件列表失败: {str(e)}")
if imap:
try:
imap.logout()
except:
pass
return emails
class AutoEmailVerification:
"""自动邮箱验证码获取"""
def __init__(self, email_manager: EmailManager):
self.email_manager = email_manager
def create_temp_email(self, domain: str = "cursor.edu.kg") -> Dict[str, str]:
"""
创建临时邮箱
Args:
domain: 邮箱域名
Returns:
邮箱信息
"""
return self.email_manager.create_email_account(domain=domain)
def wait_for_code(self, email_info: Dict[str, str],
sender_filter: str = "@duoplus",
timeout: int = 60) -> Optional[str]:
"""
等待并获取验证码
Args:
email_info: 邮箱信息字典
sender_filter: 发件人过滤
timeout: 超时时间
Returns:
验证码
"""
if not email_info.get("success"):
print(f"{Fore.RED}[Auto] 邮箱信息无效")
return None
return self.email_manager.fetch_verification_code(
email_info["email"],
email_info["password"],
sender_filter=sender_filter,
timeout=timeout
)
# 测试函数
def test_email_system():
"""测试邮箱系统"""
print(f"{Fore.CYAN}{'='*60}")
print(f"{Fore.CYAN}测试自建邮箱系统")
print(f"{Fore.CYAN}{'='*60}\n")
# 创建邮箱管理器
manager = EmailManager()
# 创建测试邮箱
email_info = manager.create_email_account(
username=None, # 自动生成
domain="cursor.edu.kg",
password=None, # 自动生成
quota_mb=10
)
if email_info.get("success"):
print(f"\n{Fore.GREEN}邮箱创建成功!")
print(f"{Fore.GREEN}邮箱地址: {email_info['email']}")
print(f"{Fore.GREEN}密码: {email_info['password']}")
# 测试IMAP连接
print(f"\n{Fore.CYAN}测试IMAP连接...")
imap = manager.connect_imap(email_info['email'], email_info['password'])
if imap:
print(f"{Fore.GREEN}IMAP连接成功")
imap.logout()
else:
print(f"{Fore.RED}IMAP连接失败")
else:
print(f"{Fore.RED}邮箱创建失败: {email_info.get('message')}")
if __name__ == "__main__":
test_email_system()

17
exceptions.py Normal file
View File

@@ -0,0 +1,17 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
自定义异常类
"""
class StalwartError(Exception):
"""Stalwart 相关错误的基类"""
pass
class ApiError(StalwartError):
"""API 调用错误"""
def __init__(self, message, status_code=None, response=None):
super().__init__(message)
self.status_code = status_code
self.response = response

19
main.py Normal file
View File

@@ -0,0 +1,19 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
DuoPlus 协议注册工具 - 主入口
"""
import os
import sys
from duoplus_register import main
if __name__ == "__main__":
# 检查 Python 版本
if sys.version_info < (3, 6):
print("错误:需要 Python 3.6 或更高版本")
sys.exit(1)
# 运行主程序
main()

216
network_inspector.py Normal file
View File

@@ -0,0 +1,216 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
DuoPlus 注册页面网络请求检查器
用于分析和记录注册过程中的所有网络请求
"""
import requests
import json
from colorama import init, Fore, Style
import time
# 初始化 colorama
init(autoreset=True)
class NetworkInspector:
"""网络请求检查器"""
def __init__(self):
self.session = requests.Session()
self.base_url = "https://my.duoplus.cn"
self.api_url = "https://api.duoplus.cn"
def inspect_registration_apis(self):
"""检查注册相关的 API 端点"""
print(f"{Fore.CYAN}{'='*60}")
print(f"{Fore.CYAN}DuoPlus 注册 API 端点检查")
print(f"{Fore.CYAN}{'='*60}\n")
# 1. 检查腾讯验证码配置 API
self._check_captcha_config()
# 2. 检查注册页面
self._check_signup_page()
# 3. 尝试获取其他可能的 API 端点
self._check_common_apis()
def _check_captcha_config(self):
"""检查验证码配置 API"""
print(f"{Fore.YELLOW}[1] 检查验证码配置 API...")
url = f"{self.api_url}/common/tencentConfig"
headers = {
'pragma': 'no-cache',
'cache-control': 'no-cache',
'sec-ch-ua-platform': '"Windows"',
'authorization': '',
'lang': 'zh-CN',
'sec-ch-ua': '"Not)A;Brand";v="8", "Chromium";v="138", "Google Chrome";v="138"',
'sec-ch-ua-mobile': '?0',
'content-type': 'application/json',
'duoplus-fp': 'e8014cf598dd4c021f2d08abef905801',
'origin': 'https://my.duoplus.cn',
'sec-fetch-site': 'same-site',
'sec-fetch-mode': 'cors',
'sec-fetch-dest': 'empty',
'referer': 'https://my.duoplus.cn/',
'accept-language': 'zh-CN,zh;q=0.9,en;q=0.8',
'priority': 'u=1, i',
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/138.0.0.0 Safari/537.36'
}
try:
response = self.session.get(url, headers=headers)
print(f"{Fore.GREEN}URL: {url}")
print(f"{Fore.GREEN}Status: {response.status_code}")
if response.status_code == 200:
data = response.json()
print(f"{Fore.GREEN}Response: {json.dumps(data, indent=2, ensure_ascii=False)}")
if data.get('code') == 200:
captcha_app_id = data['data']['captcha_app_id']
print(f"{Fore.CYAN}✓ Captcha App ID: {captcha_app_id}")
else:
print(f"{Fore.RED}Failed: HTTP {response.status_code}")
except Exception as e:
print(f"{Fore.RED}Error: {str(e)}")
print()
def _check_signup_page(self):
"""检查注册页面"""
print(f"{Fore.YELLOW}[2] 检查注册页面...")
url = f"{self.base_url}/sign-up"
try:
response = self.session.get(url)
print(f"{Fore.GREEN}URL: {url}")
print(f"{Fore.GREEN}Status: {response.status_code}")
# 获取 cookies
cookies = self.session.cookies.get_dict()
if cookies:
print(f"{Fore.GREEN}Cookies received:")
for key, value in cookies.items():
print(f" - {key}: {value[:20]}..." if len(value) > 20 else f" - {key}: {value}")
except Exception as e:
print(f"{Fore.RED}Error: {str(e)}")
print()
def _check_common_apis(self):
"""检查常见的 API 端点"""
print(f"{Fore.YELLOW}[3] 检查可能的 API 端点...")
# 常见的注册相关 API 端点
possible_endpoints = [
f"{self.api_url}/auth/register",
f"{self.api_url}/auth/send-code",
f"{self.api_url}/auth/verify-code",
f"{self.api_url}/user/register",
f"{self.api_url}/account/register",
f"{self.api_url}/v1/auth/register",
f"{self.api_url}/api/auth/register",
]
headers = {
'origin': 'https://my.duoplus.cn',
'referer': 'https://my.duoplus.cn/',
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/138.0.0.0 Safari/537.36'
}
for endpoint in possible_endpoints:
try:
# 使用 OPTIONS 请求检查端点是否存在
response = self.session.options(endpoint, headers=headers, timeout=3)
if response.status_code in [200, 204, 405]: # 405 表示方法不允许但端点存在
print(f"{Fore.GREEN}✓ Found: {endpoint} (Status: {response.status_code})")
# 检查允许的方法
allow_header = response.headers.get('Allow', '')
if allow_header:
print(f" Allowed methods: {allow_header}")
# 检查 CORS 头
cors_headers = {
'Access-Control-Allow-Origin': response.headers.get('Access-Control-Allow-Origin'),
'Access-Control-Allow-Methods': response.headers.get('Access-Control-Allow-Methods'),
'Access-Control-Allow-Headers': response.headers.get('Access-Control-Allow-Headers')
}
cors_info = [f"{k}: {v}" for k, v in cors_headers.items() if v]
if cors_info:
print(f" CORS: {', '.join(cors_info)}")
else:
print(f"{Fore.YELLOW}? Unknown: {endpoint} (Status: {response.status_code})")
except requests.exceptions.Timeout:
print(f"{Fore.RED}✗ Timeout: {endpoint}")
except requests.exceptions.ConnectionError:
print(f"{Fore.RED}✗ Connection Error: {endpoint}")
except Exception as e:
print(f"{Fore.RED}✗ Error: {endpoint} - {str(e)}")
time.sleep(0.5) # 避免请求过快
print()
def analyze_register_request(self):
"""分析注册请求的结构"""
print(f"{Fore.CYAN}{'='*60}")
print(f"{Fore.CYAN}预期的注册请求结构")
print(f"{Fore.CYAN}{'='*60}\n")
sample_request = {
"email": "user@example.com",
"password": "password123",
"confirmPassword": "password123",
"verificationCode": "123456",
"inviteCode": "",
"agreeToTerms": True,
"ticket": "从腾讯验证码获取",
"randstr": "从腾讯验证码获取"
}
print(f"{Fore.YELLOW}预期的注册请求体结构:")
print(json.dumps(sample_request, indent=2, ensure_ascii=False))
print(f"\n{Fore.YELLOW}关键请求头:")
key_headers = {
"Content-Type": "application/json",
"Origin": "https://my.duoplus.cn",
"Referer": "https://my.duoplus.cn/sign-up",
"duoplus-fp": "设备指纹(可能需要)",
"Authorization": "Bearer token如果需要"
}
for header, value in key_headers.items():
print(f" {header}: {value}")
def main():
"""主函数"""
inspector = NetworkInspector()
print(f"{Fore.CYAN}开始检查 DuoPlus 注册相关 API...\n")
# 检查 API 端点
inspector.inspect_registration_apis()
# 显示预期的请求结构
inspector.analyze_register_request()
print(f"{Fore.GREEN}\n检查完成!")
print(f"{Fore.YELLOW}提示:使用浏览器开发者工具或代理工具(如 mitmproxy可以获取更准确的请求信息。")
if __name__ == "__main__":
main()

262
register_main.py Normal file
View File

@@ -0,0 +1,262 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
DuoPlus 云手机协议注册 - 主程序
完整的注册流程实现
"""
import os
import sys
import json
from dotenv import load_dotenv
from colorama import init, Fore, Style
from duoplus_register import DuoPlusRegister
# 初始化 colorama
init(autoreset=True)
def print_banner():
"""打印程序横幅"""
banner = """
╔══════════════════════════════════════════════════════════╗
║ DuoPlus 云手机协议注册工具 v1.0 ║
║ ║
║ 功能特点: ║
║ • 自动处理腾讯滑块验证码 ║
║ • 支持批量注册 ║
║ • 自动保存账号信息 ║
║ • 完整的错误处理 ║
╚══════════════════════════════════════════════════════════╝
"""
print(f"{Fore.CYAN}{banner}")
def check_environment():
"""检查环境配置"""
if not os.path.exists('.env'):
print(f"{Fore.YELLOW}[WARNING] 未找到 .env 文件")
if os.path.exists('.env.example'):
print(f"{Fore.YELLOW}[INFO] 正在创建 .env 文件...")
import shutil
shutil.copy('.env.example', '.env')
print(f"{Fore.GREEN}[SUCCESS] 已创建 .env 文件")
print(f"{Fore.YELLOW}[ACTION] 请编辑 .env 文件,填入您的 2captcha API Key")
return False
else:
print(f"{Fore.RED}[ERROR] 未找到 .env.example 文件")
return False
# 加载环境变量
load_dotenv()
api_key = os.getenv('CAPTCHA_API_KEY')
if not api_key or api_key == 'your_2captcha_api_key_here':
print(f"{Fore.RED}[ERROR] 请在 .env 文件中设置有效的 CAPTCHA_API_KEY")
return False
return True
def single_registration():
"""单个账号注册"""
print(f"\n{Fore.CYAN}=== 单个账号注册 ===")
# 获取用户输入
email_input = input(f"{Fore.YELLOW}请输入邮箱地址 (直接回车使用自建邮箱): ").strip()
# 创建注册器
api_key = os.getenv('CAPTCHA_API_KEY')
registrar = DuoPlusRegister(api_key)
if not email_input:
# 空回车,使用自建邮箱全自动注册
print(f"{Fore.CYAN}[INFO] 使用自建邮箱进行全自动注册...")
result = registrar.auto_register_with_temp_email()
if result.get("success"):
print(f"\n{Fore.GREEN}{'='*60}")
print(f"{Fore.GREEN}✅ 全自动注册成功!")
print(f"{Fore.GREEN}{'='*60}")
data = result.get("data", {})
print(f"{Fore.GREEN}Access Token: {data.get('access_token', 'N/A')[:30]}...")
print(f"{Fore.GREEN}Token 有效期: {data.get('expired_in', 0) // 3600} 小时")
print(f"{Fore.GREEN}赠送余额: {data.get('gift_balance', '$0.00')}")
# 显示邮箱密码(如果有)
if data.get('email_password'):
print(f"{Fore.GREEN}邮箱密码: {data.get('email_password')}")
print(f"{Fore.GREEN}{'='*60}")
else:
print(f"\n{Fore.RED}❌ 注册失败: {result.get('message', 'Unknown error')}")
else:
# 手动输入邮箱
if '@' not in email_input:
print(f"{Fore.RED}[ERROR] 请输入有效的邮箱地址")
return
email = email_input
# 询问是否使用自定义密码
use_custom_password = input(f"{Fore.YELLOW}是否使用自定义密码? (y/n, 默认n): ").strip().lower() == 'y'
password = None
if use_custom_password:
password = input(f"{Fore.YELLOW}请输入密码: ").strip()
if len(password) < 8:
print(f"{Fore.RED}[ERROR] 密码长度至少8位")
return
# 执行注册
result = registrar.auto_register(email, password)
if result.get("success"):
print(f"\n{Fore.GREEN}{'='*60}")
print(f"{Fore.GREEN}✅ 注册成功!")
print(f"{Fore.GREEN}{'='*60}")
data = result.get("data", {})
print(f"{Fore.GREEN}Access Token: {data.get('access_token', 'N/A')[:30]}...")
print(f"{Fore.GREEN}Token 有效期: {data.get('expired_in', 0) // 3600} 小时")
print(f"{Fore.GREEN}赠送余额: {data.get('gift_balance', '$0.00')}")
print(f"{Fore.GREEN}{'='*60}")
else:
print(f"\n{Fore.RED}❌ 注册失败: {result.get('message', 'Unknown error')}")
def batch_registration():
"""批量注册"""
print(f"\n{Fore.CYAN}=== 批量注册 ===")
# 获取邮箱前缀和域名
email_prefix = input(f"{Fore.YELLOW}请输入邮箱前缀 (例如: test): ").strip()
email_domain = input(f"{Fore.YELLOW}请输入邮箱域名 (例如: example.com): ").strip()
if not email_prefix or not email_domain:
print(f"{Fore.RED}[ERROR] 邮箱前缀和域名不能为空")
return
count = input(f"{Fore.YELLOW}请输入注册数量 (默认3): ").strip()
count = int(count) if count.isdigit() else 3
if count > 10:
print(f"{Fore.YELLOW}[WARNING] 一次注册超过10个账号可能会被限制")
confirm = input(f"{Fore.YELLOW}是否继续? (y/n): ").strip().lower()
if confirm != 'y':
return
# 创建注册器
api_key = os.getenv('CAPTCHA_API_KEY')
registrar = DuoPlusRegister(api_key)
success_count = 0
failed_count = 0
print(f"\n{Fore.CYAN}开始批量注册 {count} 个账号...")
for i in range(1, count + 1):
email = f"{email_prefix}{i}@{email_domain}"
print(f"\n{Fore.CYAN}[{i}/{count}] 正在注册: {email}")
try:
result = registrar.auto_register(email)
if result.get("success"):
success_count += 1
print(f"{Fore.GREEN}{email} 注册成功")
else:
failed_count += 1
print(f"{Fore.RED}{email} 注册失败: {result.get('message')}")
except Exception as e:
failed_count += 1
print(f"{Fore.RED}{email} 注册异常: {str(e)}")
# 避免请求过快
if i < count:
import time
print(f"{Fore.YELLOW}等待3秒后继续...")
time.sleep(3)
# 显示统计结果
print(f"\n{Fore.CYAN}{'='*60}")
print(f"{Fore.CYAN}批量注册完成!")
print(f"{Fore.GREEN}成功: {success_count}")
print(f"{Fore.RED}失败: {failed_count}")
print(f"{Fore.CYAN}{'='*60}")
def test_captcha():
"""测试验证码识别"""
print(f"\n{Fore.CYAN}=== 测试 2captcha 连接 ===")
api_key = os.getenv('CAPTCHA_API_KEY')
try:
import requests
response = requests.get(f"http://2captcha.com/res.php?key={api_key}&action=getbalance")
if response.status_code == 200:
balance = response.text
try:
balance_float = float(balance)
print(f"{Fore.GREEN}✅ 2captcha 连接成功")
print(f"{Fore.GREEN}账户余额: ${balance}")
if balance_float < 1:
print(f"{Fore.YELLOW}[WARNING] 余额较低,请及时充值")
except ValueError:
print(f"{Fore.RED}❌ API Key 无效: {response.text}")
else:
print(f"{Fore.RED}❌ 连接失败: HTTP {response.status_code}")
except Exception as e:
print(f"{Fore.RED}❌ 测试失败: {str(e)}")
def main():
"""主函数"""
# 打印横幅
print_banner()
# 检查环境
if not check_environment():
print(f"\n{Fore.RED}请配置好环境后重新运行程序")
sys.exit(1)
while True:
print(f"\n{Fore.CYAN}请选择操作:")
print("1. 单个账号注册")
print("2. 批量注册")
print("3. 测试 2captcha 连接")
print("4. 查看注册记录")
print("0. 退出")
choice = input(f"\n{Fore.YELLOW}请输入选项 (0-4): ").strip()
if choice == '1':
single_registration()
elif choice == '2':
batch_registration()
elif choice == '3':
test_captcha()
elif choice == '4':
if os.path.exists('registered_accounts.txt'):
print(f"\n{Fore.CYAN}=== 注册记录 ===")
with open('registered_accounts.txt', 'r', encoding='utf-8') as f:
print(f.read())
else:
print(f"{Fore.YELLOW}暂无注册记录")
elif choice == '0':
print(f"{Fore.GREEN}感谢使用,再见!")
break
else:
print(f"{Fore.RED}无效的选项,请重新选择")
if __name__ == "__main__":
try:
main()
except KeyboardInterrupt:
print(f"\n{Fore.YELLOW}程序已中断")
sys.exit(0)
except Exception as e:
print(f"\n{Fore.RED}程序异常: {str(e)}")
sys.exit(1)

9
registered_accounts.txt Normal file
View File

@@ -0,0 +1,9 @@
============================================================
注册时间: 2025-08-12 14:16:56
邮箱: userdu0qb6qd@cursor.edu.kg
密码: 8o4wN4z$Nodh
Access Token: dk9KdXQuWU66s40X_pdcjla6Om8UD_iKdRY
Refresh Token: dk9KdXQulkSQKXHoQD9MeB5hn_28TgTVCag
Token 有效期: 431999 秒
赠送余额: $0.00

3
requirements.txt Normal file
View File

@@ -0,0 +1,3 @@
requests>=2.31.0
python-dotenv>=1.0.0
colorama>=0.4.6

168
stalwart_client.py Normal file
View File

@@ -0,0 +1,168 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import requests
import json
import os
from typing import Dict, List, Any, Optional, Union
# 禁用代理设置
os.environ['HTTP_PROXY'] = ''
os.environ['HTTPS_PROXY'] = ''
os.environ['http_proxy'] = ''
os.environ['https_proxy'] = ''
class StalwartClient:
"""Stalwart API客户端"""
def __init__(self, base_url: str, api_key: Optional[str] = None, oauth_token: Optional[str] = None):
"""
初始化Stalwart API客户端
Args:
base_url: API基础URL例如 https://mail.example.org/api
api_key: API密钥可选
oauth_token: OAuth令牌可选
"""
self.base_url = base_url.rstrip('/')
self.api_key = api_key
self.oauth_token = oauth_token
self.session = requests.Session()
# 禁用环境代理设置
self.session.trust_env = False
# 如果提供了API密钥设置基本认证
if api_key:
if ':' in api_key:
username, password = api_key.split(':', 1)
self.session.auth = (username, password)
else:
# 如果API密钥格式不包含冒号则使用Bearer认证
self.session.headers.update({"Authorization": f"Bearer {api_key}"})
# 如果提供了OAuth令牌设置Bearer认证
if oauth_token:
self.session.headers.update({"Authorization": f"Bearer {oauth_token}"})
def _make_request(self, method: str, endpoint: str, **kwargs) -> Any:
"""
发送API请求
Args:
method: HTTP方法GET, POST, PATCH, DELETE等
endpoint: API端点路径
**kwargs: 传递给requests的其他参数
Returns:
解析后的JSON响应或原始响应
Raises:
requests.HTTPError: 请求失败时引发
"""
url = f"{self.base_url}/{endpoint.lstrip('/')}"
# 确保默认接受JSON响应
headers = kwargs.pop('headers', {})
headers.setdefault('Accept', 'application/json')
# 调试信息
print(f"发送请求: {method} {url}")
try:
response = self.session.request(method, url, headers=headers, **kwargs)
# 调试信息
print(f"响应状态码: {response.status_code}")
# 检查请求是否成功
response.raise_for_status()
# 尝试解析JSON响应
if response.headers.get('content-type', '').startswith('application/json'):
return response.json()
return response
except requests.exceptions.HTTPError as e:
print(f"HTTP错误: {e}")
if hasattr(e.response, 'text'):
print(f"响应内容: {e.response.text}")
raise
except requests.exceptions.ConnectionError as e:
print(f"连接错误: {e}")
raise
except requests.exceptions.Timeout as e:
print(f"超时错误: {e}")
raise
except requests.exceptions.RequestException as e:
print(f"请求异常: {e}")
raise
# -------------------- OAuth相关方法 --------------------
def obtain_oauth_token(self, client_id: str, redirect_uri: str, nonce: str) -> Dict:
"""获取OAuth令牌"""
data = {
"type": "code",
"client_id": client_id,
"redirect_uri": redirect_uri,
"nonce": nonce
}
return self._make_request('POST', '/oauth', json=data)
# -------------------- 主体(Principal)相关方法 --------------------
def list_principals(self, page: Optional[int] = None, limit: Optional[int] = None,
types: Optional[str] = None) -> Dict:
"""列出主体"""
params = {}
if page:
params['page'] = page
if limit:
params['limit'] = limit
if types:
params['types'] = types
return self._make_request('GET', '/principal', params=params)
def create_principal(self, principal_data: Dict) -> Dict:
"""创建主体"""
return self._make_request('POST', '/principal', json=principal_data)
def fetch_principal(self, principal_id: str) -> Dict:
"""获取主体详情"""
return self._make_request('GET', f'/principal/{principal_id}')
def update_principal(self, principal_id: str, updates: List[Dict]) -> Dict:
"""更新主体"""
return self._make_request('PATCH', f'/principal/{principal_id}', json=updates)
def delete_principal(self, principal_id: str) -> Dict:
"""删除主体"""
return self._make_request('DELETE', f'/principal/{principal_id}')
# -------------------- 队列相关方法 --------------------
def list_queued_messages(self, page: Optional[int] = None, max_total: Optional[int] = None,
limit: Optional[int] = None, values: Optional[int] = None) -> Dict:
"""列出排队消息"""
params = {}
if page:
params['page'] = page
if max_total:
params['max-total'] = max_total
if limit:
params['limit'] = limit
if values:
params['values'] = values
return self._make_request('GET', '/queue/messages', params=params)
def disable_proxy():
"""禁用代理设置"""
import os
os.environ['HTTP_PROXY'] = ''
os.environ['HTTPS_PROXY'] = ''
os.environ['http_proxy'] = ''
os.environ['https_proxy'] = ''

214
tracker.py Normal file
View File

@@ -0,0 +1,214 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
DuoPlus 数据追踪模块
处理 data4.net 的统计追踪请求
"""
import requests
import json
import time
import random
import string
from typing import Dict, Any, Optional
from colorama import init, Fore
# 初始化 colorama
init(autoreset=True)
class DataTracker:
"""Data4.net 数据追踪器"""
def __init__(self):
self.api_url = "https://api.data4.net/api"
self.website_id = "f487e5f1981a4c459c973fa4355b7fab"
self.session_id = self._generate_session_id()
self.uuid = self._generate_uuid()
self.visit_id = None
self.tracking_id = None
# 设置请求头
self.headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/138.0.0.0 Safari/537.36',
'Accept': '*/*',
'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
'Accept-Encoding': 'gzip, deflate, br, zstd',
'Content-Type': 'application/json',
'Origin': 'https://my.duoplus.cn',
'Referer': 'https://my.duoplus.cn/',
'sec-ch-ua': '"Not)A;Brand";v="8", "Chromium";v="138", "Google Chrome";v="138"',
'sec-ch-ua-mobile': '?0',
'sec-ch-ua-platform': '"Windows"',
'sec-fetch-site': 'cross-site',
'sec-fetch-mode': 'cors',
'sec-fetch-dest': 'empty',
'pragma': 'no-cache',
'cache-control': 'no-cache',
'priority': 'u=1, i'
}
def _generate_session_id(self) -> str:
"""生成会话ID"""
return ''.join(random.choices(string.ascii_lowercase + string.digits, k=32))
def _generate_uuid(self) -> str:
"""生成UUID"""
return ''.join(random.choices(string.ascii_lowercase + string.digits, k=32))
def send_initial_event(self, page_url: str = "/sign-up") -> bool:
"""
发送初始事件
Args:
page_url: 页面URL
Returns:
是否成功
"""
url = f"{self.api_url}/report/send"
payload = {
"type": "event",
"payload": {
"website": self.website_id,
"hostname": "my.duoplus.cn",
"screen": "2048x1152",
"language": "zh-CN",
"title": "DuoPlus 云手机",
"url": page_url,
"referrer": "my.duoplus.cn",
"session_id": self.session_id,
"uuid": self.uuid
}
}
try:
response = requests.post(url, json=payload, headers=self.headers)
if response.status_code == 200:
data = response.json()
if data.get("code") == 200:
self.session_id = data["data"]["session_id"]
self.visit_id = data["data"]["visit_id"]
self.tracking_id = data["data"]["id"]
print(f"{Fore.GREEN}[Tracker] 初始事件发送成功")
return True
print(f"{Fore.YELLOW}[Tracker] 初始事件发送失败")
return False
except Exception as e:
print(f"{Fore.YELLOW}[Tracker] 初始事件发送异常: {str(e)}")
return False
def send_pageview(self, page_url: str = "/sign-up") -> bool:
"""
发送页面浏览事件
Args:
page_url: 页面URL
Returns:
是否成功
"""
url = f"{self.api_url}/report/event"
payload = {
"payload": {
"website": self.website_id,
"hostname": "my.duoplus.cn",
"screen": "2048x1152",
"language": "zh-CN",
"title": "DuoPlus 云手机",
"url": page_url,
"referrer": "my.duoplus.cn",
"client_time": int(time.time()),
"scroll_depth_ratio": 0,
"session_id": self.session_id,
"uuid": self.uuid
},
"type": "pageview"
}
try:
response = requests.post(url, json=payload, headers=self.headers)
if response.status_code == 200:
data = response.json()
if data.get("code") == 200:
print(f"{Fore.GREEN}[Tracker] 页面浏览事件发送成功")
return True
print(f"{Fore.YELLOW}[Tracker] 页面浏览事件发送失败")
return False
except Exception as e:
print(f"{Fore.YELLOW}[Tracker] 页面浏览事件发送异常: {str(e)}")
return False
def send_view_time(self, view_time: int = 10) -> bool:
"""
发送页面停留时间
Args:
view_time: 停留时间(秒)
Returns:
是否成功
"""
if not self.visit_id or not self.tracking_id:
print(f"{Fore.YELLOW}[Tracker] 缺少必要的ID跳过发送停留时间")
return False
url = f"{self.api_url}/report/view_time"
payload = {
"website_id": self.website_id,
"view_time": view_time,
"id": self.tracking_id,
"session_id": self.session_id,
"visit_id": self.visit_id
}
try:
response = requests.post(url, json=payload, headers=self.headers)
if response.status_code == 200:
data = response.json()
if data.get("code") == 200:
print(f"{Fore.GREEN}[Tracker] 停留时间发送成功: {view_time}")
return True
print(f"{Fore.YELLOW}[Tracker] 停留时间发送失败")
return False
except Exception as e:
print(f"{Fore.YELLOW}[Tracker] 停留时间发送异常: {str(e)}")
return False
def track_registration_page(self) -> bool:
"""
追踪注册页面访问
Returns:
是否成功
"""
print(f"{Fore.CYAN}[Tracker] 开始追踪注册页面...")
# 发送初始事件
if not self.send_initial_event("/sign-up"):
print(f"{Fore.YELLOW}[Tracker] 初始事件发送失败,继续执行")
# 发送页面浏览事件
time.sleep(0.5)
if not self.send_pageview("/sign-up"):
print(f"{Fore.YELLOW}[Tracker] 页面浏览事件发送失败,继续执行")
# 模拟页面停留时间
time.sleep(1)
if not self.send_view_time(5):
print(f"{Fore.YELLOW}[Tracker] 停留时间发送失败,继续执行")
print(f"{Fore.GREEN}[Tracker] 页面追踪完成")
return True