Files
steamreg/ThreadManagerWithPyno.py

231 lines
10 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from concurrent.futures import ThreadPoolExecutor
import json
import os
import threading
import time
from ProxyPool import ProxyPool
from SteamRegistrationWithPyno import SteamRegistrationWithPyno
from EmailAPIClient import EmailAPIClient
class ThreadManagerWithPyno:
"""线程管理器 - 使用验证码解决方案"""
def __init__(self, config_path, email_file_path, proxy_file_path, email_source="txt_file", reg_count=0, email_api_client=None):
self.config = self._load_config(config_path)
self._validate_config()
self.proxy_pool = ProxyPool(proxy_file_path)
self.email_file = email_file_path
self.executor = ThreadPoolExecutor(max_workers=self.config['executornum'])
self._registration_local = threading.local()
self._running = True
self._registrations = set()
self._registrations_lock = threading.Lock()
self.email_source = email_source
self.reg_count = reg_count
self.processed_count = 0
self.count_lock = threading.Lock()
self.email_api_client = email_api_client or EmailAPIClient()
def _validate_config(self):
"""验证配置有效性"""
required_fields = ['protocol', 'ssl', 'email_url', 'executornum', 'pyno_user_token']
for field in required_fields:
if field not in self.config:
raise ValueError(f"配置缺少必要字段: {field}")
def _load_config(self, config_path):
"""加载配置"""
with open(config_path, 'r') as f:
return json.load(f)
def parse_email_credentials(self, line):
"""解析邮件凭据"""
parts = line.strip().split("----")
if len(parts) == 2:
return {
'email': parts[0],
'password': parts[1]
}
elif len(parts) == 4:
return {
'email': parts[0],
'password': parts[1],
'client_id': parts[2],
'refresh_token': parts[3]
}
raise ValueError("Invalid email credentials format")
def _get_registration(self):
"""获取线程本地的SteamRegistrationWithPyno实例"""
if not hasattr(self._registration_local, 'registration'):
registration = SteamRegistrationWithPyno(
self.config,
self.proxy_pool,
)
self._registration_local.registration = registration
# 将新创建的实例添加到跟踪集合
with self._registrations_lock:
self._registrations.add(registration)
return self._registration_local.registration
def process_email(self, email_data):
"""处理一个邮箱账号的注册"""
# 检查是否达到注册数量限制
if self.reg_count > 0:
with self.count_lock:
if self.processed_count >= self.reg_count:
print(f"已达到注册数量限制: {self.reg_count}")
return
self.processed_count += 1
registration = self._get_registration()
registration.main(email_data)
def get_email_from_api(self):
"""从API获取邮箱账号 - 每次只获取一个"""
try:
print("从API获取单个邮箱账号...")
email_data = self.email_api_client.get_email_credentials()
if email_data:
print(f"获取到邮箱: {email_data['email']}")
return email_data
except Exception as e:
print(f"从API获取邮箱失败: {e}")
return None
def stop(self):
"""停止所有任务"""
self._running = False
# 首先关闭线程池,不再接受新任务
self.executor.shutdown(wait=False)
# 遍历并停止所有注册实例
with self._registrations_lock:
for registration in self._registrations:
try:
registration.running = False
if (hasattr(registration, 'session') and
registration.session and
not getattr(registration.session, 'closed', True)):
# 只有当session存在且未关闭时才进行关闭
registration.session.close()
except Exception as e:
pass
# 清空实例集合
self._registrations.clear()
def start(self):
"""启动处理"""
self._running = True
try:
if self.email_source == "api":
# 使用API获取邮箱 - 只获取一个
email_data = self.get_email_from_api()
if not email_data:
print("无法从API获取邮箱任务停止")
return
# 处理单个邮箱
self.process_email(email_data)
else:
# 使用本地文件获取邮箱
with open(self.email_file, "r", encoding="utf-8") as file:
for line in file:
if not self._running or (self.reg_count > 0 and self.processed_count >= self.reg_count):
break
try:
email_data = self.parse_email_credentials(line.strip())
self.executor.submit(self.process_email, email_data)
except ValueError as e:
print(f"错误的邮箱格式: {e}")
continue
finally:
self.executor.shutdown(wait=True)
class GUIThreadManagerWithPyno(ThreadManagerWithPyno):
def __init__(self, config_path, email_path, proxy_path, gui, completed_tasks=None, email_source="txt_file", reg_count=0, email_api_client=None):
super().__init__(config_path, email_path, proxy_path, email_source, reg_count, email_api_client)
self.gui = gui
self.completed_tasks = completed_tasks or set()
def process_email(self, email_data):
"""处理单个邮件账号"""
try:
# 检查是否达到注册数量限制
if self.reg_count > 0:
with self.count_lock:
if self.processed_count >= self.reg_count:
print(f"已达到注册数量限制: {self.reg_count}")
return
self.processed_count += 1
print(f"当前处理数量: {self.processed_count}/{self.reg_count}")
if not self._running: # 检查是否应该继续
self.gui.update_status(email_data['email'], "任务已停止")
return
# 先更新状态为"准备中"
self.gui.update_status(email_data['email'], "准备验证邮箱...")
# 获取注册实例
registration = self._get_registration()
registration.set_gui(self.gui) # 设置GUI引用
registration.running = self._running
# 开始处理
self.gui.update_status(email_data['email'], "开始验证邮箱")
registration.main(email_data)
except Exception as e:
# 捕获所有异常并更新状态
error_msg = str(e)
# 确保错误信息不包含敏感数据
if len(error_msg) > 100:
error_msg = error_msg[:100] + "..."
self.gui.update_status(email_data['email'], f"处理失败: {error_msg}", result="失败")
def start(self):
"""启动处理"""
self._running = True
self.processed_count = 0 # 重置计数器
try:
if self.email_source == "api":
# 使用API模式循环获取邮箱直到达到设定数量
with ThreadPoolExecutor(max_workers=1) as self.executor: # 使用单线程处理
# 循环注册,直到达到设定数量
while self._running and (self.reg_count == 0 or self.processed_count < self.reg_count):
self.gui.update_status("系统消息", f"正在从API获取邮箱({self.processed_count+1}/{self.reg_count})...")
# 每次只获取一个邮箱
email_data = self.get_email_from_api()
if not email_data:
self.gui.update_status("API错误", "无法从API获取邮箱", result="失败")
break
# 处理单个邮箱依然使用线程池但限制为1个线程
if email_data['email'] not in self.completed_tasks:
# 提交任务并等待完成
future = self.executor.submit(self.process_email, email_data)
future.result() # 等待当前注册完成
else:
self.gui.update_status(email_data['email'], "邮箱已处理过,跳过", result="跳过")
# 每次处理完一个邮箱后,暂停一下避免请求过快
time.sleep(2)
else:
# 使用本地文件获取邮箱
with open(self.email_file, 'r', encoding='utf-8') as file:
with ThreadPoolExecutor(max_workers=self.config['executornum']) as self.executor:
for line in file:
if not self._running or (self.reg_count > 0 and self.processed_count >= self.reg_count):
break
try:
email_data = self.parse_email_credentials(line)
# 跳过已完成的任务
if email_data['email'] not in self.completed_tasks:
self.executor.submit(self.process_email, email_data)
except ValueError as e:
self.gui.update_status("解析错误", f"解析邮箱文件失败: {e}")
except Exception as e:
self.gui.update_status("系统错误", f"启动任务失败: {str(e)}", result="失败")