from concurrent.futures import ThreadPoolExecutor import json import os import threading import time from ProxyPool import ProxyPool from SteamRegistrationWithPyno import SteamRegistrationWithPyno from EmailAPIClient import EmailAPIClient class ThreadManagerWithPyno: """线程管理器 - 使用验证码解决方案""" def __init__(self, config_path, email_file_path, proxy_file_path, email_source="txt_file", reg_count=0, email_api_client=None): self.config = self._load_config(config_path) self._validate_config() self.proxy_pool = ProxyPool(proxy_file_path) self.email_file = email_file_path self.executor = ThreadPoolExecutor(max_workers=self.config['executornum']) self._registration_local = threading.local() self._running = True self._registrations = set() self._registrations_lock = threading.Lock() self.email_source = email_source self.reg_count = reg_count self.processed_count = 0 self.count_lock = threading.Lock() self.email_api_client = email_api_client or EmailAPIClient() def _validate_config(self): """验证配置有效性""" required_fields = ['protocol', 'ssl', 'email_url', 'executornum', 'pyno_user_token'] for field in required_fields: if field not in self.config: raise ValueError(f"配置缺少必要字段: {field}") def _load_config(self, config_path): """加载配置""" with open(config_path, 'r') as f: return json.load(f) def parse_email_credentials(self, line): """解析邮件凭据""" parts = line.strip().split("----") if len(parts) == 2: return { 'email': parts[0], 'password': parts[1] } elif len(parts) == 4: return { 'email': parts[0], 'password': parts[1], 'client_id': parts[2], 'refresh_token': parts[3] } raise ValueError("Invalid email credentials format") def _get_registration(self): """获取线程本地的SteamRegistrationWithPyno实例""" if not hasattr(self._registration_local, 'registration'): registration = SteamRegistrationWithPyno( self.config, self.proxy_pool, ) self._registration_local.registration = registration # 将新创建的实例添加到跟踪集合 with self._registrations_lock: self._registrations.add(registration) return self._registration_local.registration def process_email(self, email_data): """处理一个邮箱账号的注册""" # 检查是否达到注册数量限制 if self.reg_count > 0: with self.count_lock: if self.processed_count >= self.reg_count: print(f"已达到注册数量限制: {self.reg_count}") return self.processed_count += 1 registration = self._get_registration() registration.main(email_data) def get_email_from_api(self): """从API获取邮箱账号 - 每次只获取一个""" try: print("从API获取单个邮箱账号...") email_data = self.email_api_client.get_email_credentials() if email_data: print(f"获取到邮箱: {email_data['email']}") return email_data except Exception as e: print(f"从API获取邮箱失败: {e}") return None def stop(self): """停止所有任务""" self._running = False # 首先关闭线程池,不再接受新任务 self.executor.shutdown(wait=False) # 遍历并停止所有注册实例 with self._registrations_lock: for registration in self._registrations: try: registration.running = False if (hasattr(registration, 'session') and registration.session and not getattr(registration.session, 'closed', True)): # 只有当session存在且未关闭时才进行关闭 registration.session.close() except Exception as e: pass # 清空实例集合 self._registrations.clear() def start(self): """启动处理""" self._running = True try: if self.email_source == "api": # 使用API获取邮箱 - 只获取一个 email_data = self.get_email_from_api() if not email_data: print("无法从API获取邮箱,任务停止") return # 处理单个邮箱 self.process_email(email_data) else: # 使用本地文件获取邮箱 with open(self.email_file, "r", encoding="utf-8") as file: for line in file: if not self._running or (self.reg_count > 0 and self.processed_count >= self.reg_count): break try: email_data = self.parse_email_credentials(line.strip()) self.executor.submit(self.process_email, email_data) except ValueError as e: print(f"错误的邮箱格式: {e}") continue finally: self.executor.shutdown(wait=True) class GUIThreadManagerWithPyno(ThreadManagerWithPyno): def __init__(self, config_path, email_path, proxy_path, gui, completed_tasks=None, email_source="txt_file", reg_count=0, email_api_client=None): super().__init__(config_path, email_path, proxy_path, email_source, reg_count, email_api_client) self.gui = gui self.completed_tasks = completed_tasks or set() def process_email(self, email_data): """处理单个邮件账号""" try: # 检查是否达到注册数量限制 if self.reg_count > 0: with self.count_lock: if self.processed_count >= self.reg_count: print(f"已达到注册数量限制: {self.reg_count}") return self.processed_count += 1 print(f"当前处理数量: {self.processed_count}/{self.reg_count}") if not self._running: # 检查是否应该继续 self.gui.update_status(email_data['email'], "任务已停止") return # 先更新状态为"准备中" self.gui.update_status(email_data['email'], "准备验证邮箱...") # 获取注册实例 registration = self._get_registration() registration.set_gui(self.gui) # 设置GUI引用 registration.running = self._running # 开始处理 self.gui.update_status(email_data['email'], "开始验证邮箱") registration.main(email_data) except Exception as e: # 捕获所有异常并更新状态 error_msg = str(e) # 确保错误信息不包含敏感数据 if len(error_msg) > 100: error_msg = error_msg[:100] + "..." self.gui.update_status(email_data['email'], f"处理失败: {error_msg}", result="失败") def start(self): """启动处理""" self._running = True self.processed_count = 0 # 重置计数器 try: if self.email_source == "api": # 使用API获取单个邮箱 self.gui.update_status("系统消息", "正在从API获取邮箱...") email_data = self.get_email_from_api() if not email_data: self.gui.update_status("API错误", "无法从API获取邮箱", result="失败") return # 处理单个邮箱,不使用线程池 if email_data['email'] not in self.completed_tasks: self.process_email(email_data) else: self.gui.update_status(email_data['email'], "邮箱已处理过,跳过", result="跳过") else: # 使用本地文件获取邮箱 with open(self.email_file, 'r', encoding='utf-8') as file: with ThreadPoolExecutor(max_workers=self.config['executornum']) as self.executor: for line in file: if not self._running or (self.reg_count > 0 and self.processed_count >= self.reg_count): break try: email_data = self.parse_email_credentials(line) # 跳过已完成的任务 if email_data['email'] not in self.completed_tasks: self.executor.submit(self.process_email, email_data) except ValueError as e: self.gui.update_status("解析错误", f"解析邮箱文件失败: {e}") except Exception as e: self.gui.update_status("系统错误", f"启动任务失败: {str(e)}", result="失败")