Files
steamreg/RegistrationGUIWithPynoV2.py

695 lines
29 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import json
import os
import tkinter as tk
from tkinter import ttk
import threading
from tkinter import messagebox
from tkinter import filedialog
from ThreadManagerWithPyno import GUIThreadManagerWithPyno
from PynoCaptchaSolver import PynoCaptchaConfig, PynoCaptchaSolver
from SteamCaptchaHelper import SteamCaptchaHelper
from EmailAPIClient import EmailAPIClient
import time
# 配置日志
from loguru import logger
logger.add("steam_registration.log", rotation="10 MB")
class SteamRegistrationGUI:
def __init__(self, root):
self.root = root
self.root.title("Steam账号注册助手")
self.root.geometry("800x800")
self.root.minsize(800, 800) # 设置最小窗口大小
# 创建主框架
main_frame = ttk.Frame(root)
main_frame.pack(fill="both", expand=True, padx=10, pady=5)
# 顶部框架 - 包含所有输入配置
top_frame = ttk.Frame(main_frame)
top_frame.pack(fill="x", side="top", padx=5, pady=5)
# 加载默认配置
self.script_dir = os.path.dirname(os.path.abspath(__file__))
self.config_path = os.path.join(self.script_dir, 'config.json')
self.config = self._load_config()
# 创建验证码配置框架
captcha_frame = ttk.LabelFrame(top_frame, text="验证码配置")
captcha_frame.pack(fill="x", padx=5, pady=5)
# 创建验证码配置输入框
self.captcha_vars = {}
self._create_captcha_widgets(captcha_frame)
# 创建Steam状态框架
steam_frame = ttk.LabelFrame(top_frame, text="Steam验证状态")
steam_frame.pack(fill="x", padx=5, pady=5)
# 添加Steam状态信息
self.steam_status_var = tk.StringVar(value="未获取验证信息")
self.steam_sitekey_var = tk.StringVar(value="未获取sitekey")
self.steam_gid_var = tk.StringVar(value="未获取gid")
ttk.Label(steam_frame, text="状态:").grid(row=0, column=0, padx=5, pady=2, sticky="e")
ttk.Label(steam_frame, textvariable=self.steam_status_var).grid(row=0, column=1, padx=5, pady=2, sticky="w")
ttk.Label(steam_frame, text="SiteKey:").grid(row=1, column=0, padx=5, pady=2, sticky="e")
ttk.Label(steam_frame, textvariable=self.steam_sitekey_var).grid(row=1, column=1, padx=5, pady=2, sticky="w")
ttk.Label(steam_frame, text="GID:").grid(row=2, column=0, padx=5, pady=2, sticky="e")
ttk.Label(steam_frame, textvariable=self.steam_gid_var).grid(row=2, column=1, padx=5, pady=2, sticky="w")
ttk.Button(steam_frame, text="刷新验证信息",
command=self._refresh_steam_info).grid(row=3, column=0, columnspan=2, padx=5, pady=5)
# 创建邮箱选项框架
email_option_frame = ttk.LabelFrame(top_frame, text="邮箱设置")
email_option_frame.pack(fill="x", padx=5, pady=5)
# 添加邮箱来源选择
self.email_source_var = tk.StringVar(value="txt_file")
email_source_options = [
("自建邮箱API", "api"),
("本地文件", "txt_file"),
("微软Graph", "graph")
]
# 注册数量和线程数量设置
count_frame = ttk.Frame(email_option_frame)
count_frame.pack(fill="x", padx=5, pady=5)
# 注册数量设置
ttk.Label(count_frame, text="注册数量:").grid(row=0, column=0, padx=5, pady=2, sticky="e")
self.reg_count_var = tk.StringVar(value="10")
ttk.Entry(count_frame, textvariable=self.reg_count_var, width=10).grid(row=0, column=1, padx=5, pady=2, sticky="w")
ttk.Label(count_frame, text="(0表示不限制)").grid(row=0, column=2, padx=5, pady=2, sticky="w")
# 线程数量设置
ttk.Label(count_frame, text="线程数量:").grid(row=1, column=0, padx=5, pady=2, sticky="e")
self.thread_count_var = tk.StringVar(value=str(self.config.get("executornum", "1")))
ttk.Entry(count_frame, textvariable=self.thread_count_var, width=10).grid(row=1, column=1, padx=5, pady=2, sticky="w")
# 邮箱协议相关配置(隐藏但保留功能)
self.protocol_var = tk.StringVar(value=str(self.config.get("protocol", "IMAP")))
self.ssl_var = tk.StringVar(value=str(self.config.get("ssl", "True")))
self.email_url_var = tk.StringVar(value=str(self.config.get("email_url", "mail.evnmail.com")))
# 邮箱来源选择
email_source_frame = ttk.Frame(email_option_frame)
email_source_frame.pack(fill="x", padx=5, pady=5)
for i, (text, value) in enumerate(email_source_options):
ttk.Radiobutton(
email_source_frame,
text=text,
variable=self.email_source_var,
value=value,
command=self._toggle_email_source
).grid(row=0, column=i, padx=10, pady=2)
# 创建文件选择框架
self.files_frame = ttk.LabelFrame(top_frame, text="文件选择")
self.files_frame.pack(fill="x", padx=5, pady=5)
# 邮箱文件选择
self.email_path = tk.StringVar(value=os.path.join(self.script_dir, 'email_password.txt'))
self._create_file_selector(self.files_frame, "邮箱文件:", self.email_path, 0)
# 代理文件选择
self.proxy_path = tk.StringVar(value=os.path.join(self.script_dir, 'proxy_ips.txt'))
self._create_file_selector(self.files_frame, "代理文件:", self.proxy_path, 1)
# 按钮框架 - 直接放在主界面上,任务列表上方
button_frame = ttk.Frame(main_frame)
button_frame.pack(fill="x", padx=5, pady=10)
# 添加开始按钮
self.start_button = ttk.Button(
button_frame,
text="开始注册",
command=self.start_registration,
width=15
)
self.start_button.pack(side="left", padx=10, pady=5, expand=True)
# 添加停止按钮
self.stop_button = ttk.Button(
button_frame,
text="停止注册",
command=self.stop_registration,
state="disabled",
width=15
)
self.stop_button.pack(side="left", padx=10, pady=5, expand=True)
# 添加测试验证码按钮
self.test_captcha_button = ttk.Button(
button_frame,
text="测试验证码",
command=self.test_captcha,
width=15
)
self.test_captcha_button.pack(side="left", padx=10, pady=5, expand=True)
# 添加获取邮箱测试按钮
self.test_email_button = ttk.Button(
button_frame,
text="测试邮箱API",
command=self.test_email_api,
width=15
)
self.test_email_button.pack(side="left", padx=10, pady=5, expand=True)
# 中间框架 - 包含任务状态表格
middle_frame = ttk.Frame(main_frame)
middle_frame.pack(fill="both", expand=True, padx=5, pady=5)
# 创建任务列表框架
task_frame = ttk.LabelFrame(middle_frame, text="任务状态")
task_frame.pack(fill="both", expand=True, padx=5, pady=5)
# 创建表格框架
table_frame = ttk.Frame(task_frame)
table_frame.pack(fill="both", expand=True, padx=5, pady=5)
self.tree = ttk.Treeview(table_frame, columns=("邮箱", "状态", "账户名", "密码", "结果"), show="headings")
# 设置列头
self.tree.heading("邮箱", text="邮箱")
self.tree.heading("状态", text="状态")
self.tree.heading("账户名", text="账户名")
self.tree.heading("密码", text="密码")
self.tree.heading("结果", text="结果")
# 设置列宽
self.tree.column("邮箱", width=200)
self.tree.column("状态", width=200)
self.tree.column("账户名", width=100)
self.tree.column("密码", width=100)
self.tree.column("结果", width=100)
# 添加滚动条
scrollbar = ttk.Scrollbar(table_frame, orient="vertical", command=self.tree.yview)
self.tree.configure(yscrollcommand=scrollbar.set)
# 布局
self.tree.pack(side="left", fill="both", expand=True)
scrollbar.pack(side="right", fill="y")
# 存储线程状态
self.thread_status = {}
# 存储Steam验证信息
self.steam_data = None
self.steam_helper = SteamCaptchaHelper()
# 添加进度控制变量
self._stop_progress_update = False
# 添加版本标签
version_label = ttk.Label(main_frame, text="Steam账号注册助手 V3.0", foreground="blue")
version_label.pack(side="bottom", padx=5, pady=5)
# 初始化邮箱API客户端
self.email_api_client = EmailAPIClient()
# 初始设置邮箱来源
self._toggle_email_source()
def _toggle_email_source(self):
"""根据邮箱来源选择切换界面显示"""
source = self.email_source_var.get()
# 首先隐藏所有与邮箱相关的文件选择框
for widget in self.files_frame.winfo_children():
if "邮箱文件" in str(widget):
widget.grid_remove()
# 根据所选邮箱来源显示相应控件
if source == "txt_file":
# 显示邮箱文件选择框
for widget in self.files_frame.winfo_children():
if "邮箱文件" in str(widget):
widget.grid()
def _refresh_steam_info(self):
"""刷新Steam验证信息"""
threading.Thread(target=self._refresh_steam_info_thread, daemon=True).start()
def _refresh_steam_info_thread(self):
"""在新线程中刷新Steam验证信息"""
try:
self.steam_status_var.set("正在获取Steam验证信息...")
self.steam_data = self.steam_helper.get_sitekey()
if self.steam_data:
self.steam_status_var.set("获取验证信息成功")
self.steam_sitekey_var.set(self.steam_data["sitekey"])
self.steam_gid_var.set(self.steam_data["gid"])
# 更新验证码配置中的sitekey
self.captcha_vars["pyno_sitekey"].set(self.steam_data["sitekey"])
# 更新User-Agent
self.captcha_vars["pyno_user_agent"].set(self.steam_helper.get_user_agent())
else:
self.steam_status_var.set("获取验证信息失败")
except Exception as e:
logger.error(f"刷新Steam信息出错: {str(e)}")
self.steam_status_var.set(f"错误: {str(e)}")
def _load_config(self):
"""加载配置文件"""
try:
with open(self.config_path, 'r') as f:
return json.load(f)
except Exception as e:
messagebox.showerror("错误", f"加载配置文件失败: {str(e)}")
return {}
def _create_captcha_widgets(self, parent):
"""创建验证码配置输入控件"""
captcha_items = [
("pyno_user_token", "API密钥:", None),
("pyno_sitekey", "网站Key:", None),
("pyno_referer", "Referer:", None),
("pyno_user_agent", "User-Agent:", None),
("pyno_timeout", "超时时间(秒):", None),
("pyno_debug", "调试模式:", ["True", "False"])
]
for i, (key, label, options) in enumerate(captcha_items):
ttk.Label(parent, text=label, width=10).grid(row=i, column=0, padx=5, pady=2, sticky="e")
if options:
var = tk.StringVar(value=str(self.config.get(key, "True" if key == "pyno_debug" else "False")))
widget = ttk.Combobox(parent, textvariable=var, values=options, width=50)
else:
default_value = ""
if key == "pyno_sitekey":
default_value = "8cf23430-f9c8-4aaa-9ba2-da32f65adf2e"
elif key == "pyno_referer":
default_value = "https://store.steampowered.com/join/"
elif key == "pyno_timeout":
default_value = "60"
elif key == "pyno_user_agent":
default_value = "Mozilla/5.0 (Windows NT 10.0; Win64; x64; Valve Steam Client/default/1741737356) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/126.0.6478.183 Safari/537.36"
var = tk.StringVar(value=str(self.config.get(key, default_value)))
widget = ttk.Entry(parent, textvariable=var, width=50)
widget.grid(row=i, column=1, padx=5, pady=2, sticky="ew")
self.captcha_vars[key] = var
def _create_file_selector(self, parent, label, var, row):
"""创建文件选择器"""
parent.grid_columnconfigure(1, weight=1)
ttk.Label(parent, text=label).grid(row=row, column=0, padx=5, pady=2, sticky="e")
ttk.Entry(parent, textvariable=var).grid(row=row, column=1, padx=5, pady=2, sticky="ew")
ttk.Button(parent, text="选择文件",
command=lambda: var.set(filedialog.askopenfilename())
).grid(row=row, column=2, padx=5, pady=2)
def update_status(self, email, status=None, account_name=None, password=None, result=None):
"""更新任务状态
Args:
email: 邮箱地址
status: 当前状态
account_name: 账户名
password: 密码
result: 结果
"""
try:
# 防止值中包含taskId等敏感信息
if status and isinstance(status, str) and 'taskId' in status:
status = "正在处理验证码..."
if email not in self.thread_status:
# 新建条目将None替换为空字符串
values = (
email,
status if status is not None else "",
account_name if account_name is not None else "",
password if password is not None else "",
result if result is not None else ""
)
self.thread_status[email] = self.tree.insert("", "end", values=values)
else:
# 获取当前值
current_values = list(self.tree.item(self.thread_status[email])['values'])
# 只更新非None的值
if status is not None:
current_values[1] = status
if account_name is not None:
current_values[2] = account_name
if password is not None:
current_values[3] = password
if result is not None:
current_values[4] = result
# 更新现有条目
self.tree.item(self.thread_status[email], values=tuple(current_values))
# 更新界面
self.root.update()
except Exception as e:
# 如果更新UI出错至少在控制台输出信息
print(f"更新界面时出错: {e}")
def get_completed_tasks(self):
"""获取已经完成的任务列表"""
completed_tasks = set()
for email in self.thread_status:
values = self.tree.item(self.thread_status[email])['values']
if values[4]: # 检查result列是否有值
completed_tasks.add(email)
return completed_tasks
def _save_config(self):
"""保存配置"""
try:
config = {}
# 保存邮箱协议配置(保留这些字段)
config['protocol'] = self.protocol_var.get()
config['ssl'] = self.ssl_var.get().lower() == "true"
config['email_url'] = self.email_url_var.get()
# 保存线程数量
try:
config['executornum'] = int(self.thread_count_var.get())
except ValueError:
config['executornum'] = 1
# 保存验证码配置
for key, var in self.captcha_vars.items():
value = var.get()
if key == "pyno_timeout":
value = int(value)
elif key == "pyno_debug":
value = value.lower() == "true"
config[key] = value
# 保存邮箱来源配置
config['email_source'] = self.email_source_var.get()
config['reg_count'] = int(self.reg_count_var.get())
with open(self.config_path, 'w') as f:
json.dump(config, f, indent=2)
return True
except Exception as e:
messagebox.showerror("错误", f"保存配置失败: {str(e)}")
return False
def _validate_inputs(self):
"""验证输入"""
# 检查邮箱来源
email_source = self.email_source_var.get()
if email_source == "txt_file" and not os.path.exists(self.email_path.get()):
messagebox.showerror("错误", "邮箱文件不存在")
return False
if not os.path.exists(self.proxy_path.get()):
messagebox.showerror("错误", "代理文件不存在")
return False
# 验证线程数量
try:
thread_count = int(self.thread_count_var.get())
if thread_count <= 0:
messagebox.showerror("错误", "线程数量必须大于0")
return False
except ValueError:
messagebox.showerror("错误", "线程数量必须是整数")
return False
# 验证验证码配置
required_captcha_keys = ["pyno_user_token"]
for key in required_captcha_keys:
if not self.captcha_vars[key].get().strip():
messagebox.showerror("错误", f"请填写 {key} 配置项")
return False
# 验证注册数量
try:
count = int(self.reg_count_var.get())
if count < 0:
messagebox.showerror("错误", "注册数量不能为负数")
return False
except ValueError:
messagebox.showerror("错误", "注册数量必须是整数")
return False
return True
def get_captcha_config(self):
"""获取验证码配置"""
return PynoCaptchaConfig(
user_token=self.captcha_vars["pyno_user_token"].get(),
sitekey=self.captcha_vars["pyno_sitekey"].get(),
referer=self.captcha_vars["pyno_referer"].get(),
user_agent=self.captcha_vars["pyno_user_agent"].get(),
timeout=int(self.captcha_vars["pyno_timeout"].get()),
debug=self.captcha_vars["pyno_debug"].get().lower() == "true"
)
def test_email_api(self):
"""测试邮箱API功能"""
# 禁用测试按钮
self.test_email_button.config(state="disabled")
# 在新线程中测试
threading.Thread(target=self._test_email_api_thread, daemon=True).start()
def _test_email_api_thread(self):
"""在新线程中测试邮箱API"""
try:
test_id = "测试邮箱API"
self.update_status(test_id, status="正在从API获取邮箱...")
# 获取邮箱账号
email_data = self.email_api_client.get_email_credentials()
if not email_data:
self.update_status(test_id, status="获取邮箱失败", result="失败")
messagebox.showerror("错误", "无法从API获取邮箱账号")
return
# 显示结果
self.update_status(
test_id,
status=f"获取邮箱成功: {email_data['email']}",
account_name="N/A",
password=email_data['password'],
result="成功"
)
messagebox.showinfo("成功", f"成功获取邮箱: {email_data['email']}")
except Exception as e:
logger.error(f"测试邮箱API出错: {str(e)}")
messagebox.showerror("错误", f"测试过程中出错: {str(e)}")
self.update_status(test_id, status=f"出错: {str(e)}", result="错误")
finally:
# 恢复按钮状态
self.test_email_button.config(state="normal")
def test_captcha(self):
"""测试验证码"""
# 验证配置
if not self.captcha_vars["pyno_user_token"].get().strip():
messagebox.showerror("错误", "请先填写验证码API密钥")
return
# 保存配置
if not self._save_config():
return
# 禁用测试按钮
self.test_captcha_button.config(state="disabled")
# 在新线程中测试
threading.Thread(target=self._test_captcha_thread, daemon=True).start()
def _test_captcha_thread(self):
"""在新线程中测试验证码"""
try:
test_id = "测试验证码"
self.update_status(test_id, status="正在准备验证码测试...")
# 首先获取动态sitekey
self.update_status(test_id, status="正在获取Steam验证信息...")
steam_data = self.steam_helper.get_sitekey()
if not steam_data:
self.update_status(test_id, status="获取验证信息失败", result="失败")
messagebox.showerror("错误", "获取Steam验证信息失败")
return
# 更新界面上的sitekey
self.steam_sitekey_var.set(steam_data["sitekey"])
self.steam_gid_var.set(steam_data["gid"])
self.steam_status_var.set("获取验证信息成功")
# 更新验证码配置
self.captcha_vars["pyno_sitekey"].set(steam_data["sitekey"])
self.captcha_vars["pyno_user_agent"].set(self.steam_helper.get_user_agent())
self._save_config()
# 获取配置
config = self.get_captcha_config()
# 创建求解器
solver = PynoCaptchaSolver(config)
# 更新状态
self.update_status(test_id, status="正在尝试解决验证码...")
# 解决验证码 - 每秒更新进度状态
progress_thread = threading.Thread(
target=self._update_captcha_progress,
args=(test_id,),
daemon=True
)
progress_thread.start()
# 解决验证码
result = solver.solve_hcaptcha()
# 停止进度更新
self._stop_progress_update = True
if not result:
messagebox.showerror("失败", "验证码解决失败")
self.update_status(test_id, status="验证码解决失败", result="失败")
return
self.update_status(test_id, status="验证码解决成功")
token_preview = result.get("captcha_text", "")[:30] + "..." if result.get("captcha_text") else "无令牌"
self.update_status(test_id, status=f"验证码解决成功: {token_preview}")
# 尝试验证邮箱
self.update_status(test_id, status="正在验证邮箱...")
captcha_text = result.get("captcha_text")
verify_result = self.steam_helper.verify_email(
"749army@gmail.com",
captcha_text,
steam_data["init_id"],
steam_data["gid"]
)
if verify_result.get("success") == 1:
messagebox.showinfo("成功", "验证邮箱成功!")
self.update_status(test_id, status="验证成功", result="通过")
else:
error_msg = verify_result.get('details', verify_result.get('error', '未知错误'))
messagebox.showerror("失败", f"验证邮箱失败: {error_msg}")
self.update_status(test_id, status=f"验证失败: {error_msg}", result="失败")
except Exception as e:
logger.error(f"测试过程中出错: {str(e)}")
messagebox.showerror("错误", f"测试过程中出错: {str(e)}")
self.update_status("测试验证码", status=f"出错: {str(e)}", result="错误")
finally:
# 恢复按钮状态
self.test_captcha_button.config(state="normal")
# 确保停止进度更新
self._stop_progress_update = True
def _update_captcha_progress(self, test_id):
"""更新验证码进度显示"""
self._stop_progress_update = False
progress_chars = ["|", "/", "-", "\\"]
i = 0
while not self._stop_progress_update:
self.update_status(test_id, status=f"正在尝试解决验证码... {progress_chars[i]}")
time.sleep(0.5)
i = (i + 1) % len(progress_chars)
def start_registration(self):
"""启动注册流程"""
try:
if not self._validate_inputs(): # 添加输入验证
return
if not self._save_config():
return
if hasattr(self, 'manager') and self.manager:
messagebox.showwarning("警告", "任务已在运行中")
return
self.start_button.config(state="disabled")
self.stop_button.config(state="normal")
threading.Thread(target=self._start_registration_thread,
daemon=True,
name="RegistrationThread").start()
except Exception as e:
messagebox.showerror("错误", f"启动失败: {str(e)}")
self.start_button.config(state="normal")
self.stop_button.config(state="disabled")
def _start_registration_thread(self):
"""在新线程中启动注册"""
try:
completed_tasks = self.get_completed_tasks()
# 获取验证码配置并保存到config.json
try:
# 先获取最新的Steam验证信息
steam_data = self.steam_helper.get_sitekey()
if steam_data:
self.steam_sitekey_var.set(steam_data["sitekey"])
self.steam_gid_var.set(steam_data["gid"])
self.steam_status_var.set("获取验证信息成功")
# 更新验证码配置
self.captcha_vars["pyno_sitekey"].set(steam_data["sitekey"])
self.captcha_vars["pyno_user_agent"].set(self.steam_helper.get_user_agent())
# 保存配置
if not self._save_config():
return
except Exception as e:
messagebox.showerror("错误", f"获取Steam验证信息失败: {str(e)}")
return
# 创建参数字典
manager_params = {
'config_path': self.config_path,
'email_path': self.email_path.get(),
'proxy_path': self.proxy_path.get(),
'gui': self,
'completed_tasks': completed_tasks,
'email_source': self.email_source_var.get(),
'reg_count': int(self.reg_count_var.get()),
'email_api_client': self.email_api_client if self.email_source_var.get() == "api" else None
}
# 使用线程管理器
self.manager = GUIThreadManagerWithPyno(**manager_params)
self.manager.start()
except Exception as e:
logger.error(f"启动注册线程出错: {str(e)}")
messagebox.showerror("错误", str(e))
finally:
self.start_button.config(state="normal")
self.stop_button.config(state="disabled")
def stop_registration(self):
"""停止注册流程"""
if hasattr(self, 'manager'):
self.manager.stop()
self.start_button.config(state="normal")
self.stop_button.config(state="disabled")
if __name__ == "__main__":
root = tk.Tk()
gui = SteamRegistrationGUI(root)
# 启动时自动刷新Steam信息
threading.Thread(target=gui._refresh_steam_info_thread, daemon=True).start()
root.mainloop()