231 lines
10 KiB
Python
231 lines
10 KiB
Python
from concurrent.futures import ThreadPoolExecutor
|
||
import json
|
||
import os
|
||
import threading
|
||
import time
|
||
from ProxyPool import ProxyPool
|
||
from SteamRegistrationWithPyno import SteamRegistrationWithPyno
|
||
from EmailAPIClient import EmailAPIClient
|
||
|
||
|
||
class ThreadManagerWithPyno:
|
||
"""线程管理器 - 使用验证码解决方案"""
|
||
def __init__(self, config_path, email_file_path, proxy_file_path, email_source="txt_file", reg_count=0, email_api_client=None):
|
||
self.config = self._load_config(config_path)
|
||
self._validate_config()
|
||
self.proxy_pool = ProxyPool(proxy_file_path)
|
||
self.email_file = email_file_path
|
||
self.executor = ThreadPoolExecutor(max_workers=self.config['executornum'])
|
||
self._registration_local = threading.local()
|
||
self._running = True
|
||
self._registrations = set()
|
||
self._registrations_lock = threading.Lock()
|
||
self.email_source = email_source
|
||
self.reg_count = reg_count
|
||
self.processed_count = 0
|
||
self.count_lock = threading.Lock()
|
||
self.email_api_client = email_api_client or EmailAPIClient()
|
||
|
||
def _validate_config(self):
|
||
"""验证配置有效性"""
|
||
required_fields = ['protocol', 'ssl', 'email_url', 'executornum', 'pyno_user_token']
|
||
for field in required_fields:
|
||
if field not in self.config:
|
||
raise ValueError(f"配置缺少必要字段: {field}")
|
||
|
||
def _load_config(self, config_path):
|
||
"""加载配置"""
|
||
with open(config_path, 'r') as f:
|
||
return json.load(f)
|
||
|
||
def parse_email_credentials(self, line):
|
||
"""解析邮件凭据"""
|
||
parts = line.strip().split("----")
|
||
if len(parts) == 2:
|
||
return {
|
||
'email': parts[0],
|
||
'password': parts[1]
|
||
}
|
||
elif len(parts) == 4:
|
||
return {
|
||
'email': parts[0],
|
||
'password': parts[1],
|
||
'client_id': parts[2],
|
||
'refresh_token': parts[3]
|
||
}
|
||
raise ValueError("Invalid email credentials format")
|
||
|
||
def _get_registration(self):
|
||
"""获取线程本地的SteamRegistrationWithPyno实例"""
|
||
if not hasattr(self._registration_local, 'registration'):
|
||
registration = SteamRegistrationWithPyno(
|
||
self.config,
|
||
self.proxy_pool,
|
||
)
|
||
self._registration_local.registration = registration
|
||
# 将新创建的实例添加到跟踪集合
|
||
with self._registrations_lock:
|
||
self._registrations.add(registration)
|
||
return self._registration_local.registration
|
||
|
||
def process_email(self, email_data):
|
||
"""处理一个邮箱账号的注册"""
|
||
# 检查是否达到注册数量限制
|
||
if self.reg_count > 0:
|
||
with self.count_lock:
|
||
if self.processed_count >= self.reg_count:
|
||
print(f"已达到注册数量限制: {self.reg_count}")
|
||
return
|
||
self.processed_count += 1
|
||
|
||
registration = self._get_registration()
|
||
registration.main(email_data)
|
||
|
||
def get_email_from_api(self):
|
||
"""从API获取邮箱账号 - 每次只获取一个"""
|
||
try:
|
||
print("从API获取单个邮箱账号...")
|
||
email_data = self.email_api_client.get_email_credentials()
|
||
if email_data:
|
||
print(f"获取到邮箱: {email_data['email']}")
|
||
return email_data
|
||
except Exception as e:
|
||
print(f"从API获取邮箱失败: {e}")
|
||
return None
|
||
|
||
def stop(self):
|
||
"""停止所有任务"""
|
||
self._running = False
|
||
# 首先关闭线程池,不再接受新任务
|
||
self.executor.shutdown(wait=False)
|
||
|
||
# 遍历并停止所有注册实例
|
||
with self._registrations_lock:
|
||
for registration in self._registrations:
|
||
try:
|
||
registration.running = False
|
||
if (hasattr(registration, 'session') and
|
||
registration.session and
|
||
not getattr(registration.session, 'closed', True)):
|
||
# 只有当session存在且未关闭时才进行关闭
|
||
registration.session.close()
|
||
except Exception as e:
|
||
pass
|
||
|
||
# 清空实例集合
|
||
self._registrations.clear()
|
||
|
||
def start(self):
|
||
"""启动处理"""
|
||
self._running = True
|
||
try:
|
||
if self.email_source == "api":
|
||
# 使用API获取邮箱 - 只获取一个
|
||
email_data = self.get_email_from_api()
|
||
if not email_data:
|
||
print("无法从API获取邮箱,任务停止")
|
||
return
|
||
# 处理单个邮箱
|
||
self.process_email(email_data)
|
||
else:
|
||
# 使用本地文件获取邮箱
|
||
with open(self.email_file, "r", encoding="utf-8") as file:
|
||
for line in file:
|
||
if not self._running or (self.reg_count > 0 and self.processed_count >= self.reg_count):
|
||
break
|
||
try:
|
||
email_data = self.parse_email_credentials(line.strip())
|
||
self.executor.submit(self.process_email, email_data)
|
||
except ValueError as e:
|
||
print(f"错误的邮箱格式: {e}")
|
||
continue
|
||
finally:
|
||
self.executor.shutdown(wait=True)
|
||
|
||
class GUIThreadManagerWithPyno(ThreadManagerWithPyno):
|
||
def __init__(self, config_path, email_path, proxy_path, gui, completed_tasks=None, email_source="txt_file", reg_count=0, email_api_client=None):
|
||
super().__init__(config_path, email_path, proxy_path, email_source, reg_count, email_api_client)
|
||
self.gui = gui
|
||
self.completed_tasks = completed_tasks or set()
|
||
|
||
def process_email(self, email_data):
|
||
"""处理单个邮件账号"""
|
||
try:
|
||
# 检查是否达到注册数量限制
|
||
if self.reg_count > 0:
|
||
with self.count_lock:
|
||
if self.processed_count >= self.reg_count:
|
||
print(f"已达到注册数量限制: {self.reg_count}")
|
||
return
|
||
self.processed_count += 1
|
||
print(f"当前处理数量: {self.processed_count}/{self.reg_count}")
|
||
|
||
if not self._running: # 检查是否应该继续
|
||
self.gui.update_status(email_data['email'], "任务已停止")
|
||
return
|
||
|
||
# 先更新状态为"准备中"
|
||
self.gui.update_status(email_data['email'], "准备验证邮箱...")
|
||
|
||
# 获取注册实例
|
||
registration = self._get_registration()
|
||
registration.set_gui(self.gui) # 设置GUI引用
|
||
registration.running = self._running
|
||
|
||
# 开始处理
|
||
self.gui.update_status(email_data['email'], "开始验证邮箱")
|
||
registration.main(email_data)
|
||
|
||
except Exception as e:
|
||
# 捕获所有异常并更新状态
|
||
error_msg = str(e)
|
||
# 确保错误信息不包含敏感数据
|
||
if len(error_msg) > 100:
|
||
error_msg = error_msg[:100] + "..."
|
||
self.gui.update_status(email_data['email'], f"处理失败: {error_msg}", result="失败")
|
||
|
||
def start(self):
|
||
"""启动处理"""
|
||
self._running = True
|
||
self.processed_count = 0 # 重置计数器
|
||
|
||
try:
|
||
if self.email_source == "api":
|
||
# 使用API模式,循环获取邮箱直到达到设定数量
|
||
with ThreadPoolExecutor(max_workers=1) as self.executor: # 使用单线程处理
|
||
# 循环注册,直到达到设定数量
|
||
while self._running and (self.reg_count == 0 or self.processed_count < self.reg_count):
|
||
self.gui.update_status("系统消息", f"正在从API获取邮箱({self.processed_count+1}/{self.reg_count})...")
|
||
|
||
# 每次只获取一个邮箱
|
||
email_data = self.get_email_from_api()
|
||
if not email_data:
|
||
self.gui.update_status("API错误", "无法从API获取邮箱", result="失败")
|
||
break
|
||
|
||
# 处理单个邮箱,依然使用线程池但限制为1个线程
|
||
if email_data['email'] not in self.completed_tasks:
|
||
# 提交任务并等待完成
|
||
future = self.executor.submit(self.process_email, email_data)
|
||
future.result() # 等待当前注册完成
|
||
else:
|
||
self.gui.update_status(email_data['email'], "邮箱已处理过,跳过", result="跳过")
|
||
|
||
# 每次处理完一个邮箱后,暂停一下避免请求过快
|
||
time.sleep(2)
|
||
else:
|
||
# 使用本地文件获取邮箱
|
||
with open(self.email_file, 'r', encoding='utf-8') as file:
|
||
with ThreadPoolExecutor(max_workers=self.config['executornum']) as self.executor:
|
||
for line in file:
|
||
if not self._running or (self.reg_count > 0 and self.processed_count >= self.reg_count):
|
||
break
|
||
try:
|
||
email_data = self.parse_email_credentials(line)
|
||
# 跳过已完成的任务
|
||
if email_data['email'] not in self.completed_tasks:
|
||
self.executor.submit(self.process_email, email_data)
|
||
except ValueError as e:
|
||
self.gui.update_status("解析错误", f"解析邮箱文件失败: {e}")
|
||
except Exception as e:
|
||
self.gui.update_status("系统错误", f"启动任务失败: {str(e)}", result="失败") |