实现邮箱API集成和注册数量限制功能

This commit is contained in:
hkyc
2025-05-22 22:43:44 +08:00
parent 4ee6a2e195
commit 3fb99d17e0
5 changed files with 348 additions and 70 deletions

View File

@@ -4,11 +4,12 @@ import os
import threading
from ProxyPool import ProxyPool
from SteamRegistrationWithPyno import SteamRegistrationWithPyno
from EmailAPIClient import EmailAPIClient
class ThreadManagerWithPyno:
"""线程管理器 - 使用PyNoCaptcha版本"""
def __init__(self, config_path, email_file_path, proxy_file_path):
"""线程管理器 - 使用验证码解决方案"""
def __init__(self, config_path, email_file_path, proxy_file_path, email_source="txt_file", reg_count=0, email_api_client=None):
self.config = self._load_config(config_path)
self._validate_config()
self.proxy_pool = ProxyPool(proxy_file_path)
@@ -18,6 +19,11 @@ class ThreadManagerWithPyno:
self._running = True
self._registrations = set()
self._registrations_lock = threading.Lock()
self.email_source = email_source
self.reg_count = reg_count
self.processed_count = 0
self.count_lock = threading.Lock()
self.email_api_client = email_api_client or EmailAPIClient()
def _validate_config(self):
"""验证配置有效性"""
@@ -62,10 +68,28 @@ class ThreadManagerWithPyno:
return self._registration_local.registration
def process_email(self, email_data):
"""处理一个邮箱账号的注册"""
# 检查是否达到注册数量限制
if self.reg_count > 0:
with self.count_lock:
if self.processed_count >= self.reg_count:
print(f"已达到注册数量限制: {self.reg_count}")
return
self.processed_count += 1
registration = self._get_registration()
registration.main(email_data)
def get_email_from_api(self):
"""从API获取邮箱账号"""
try:
email_data = self.email_api_client.get_email_credentials()
if email_data:
return email_data
except Exception as e:
print(f"从API获取邮箱失败: {e}")
return None
def stop(self):
"""停止所有任务"""
self._running = False
@@ -92,28 +116,47 @@ class ThreadManagerWithPyno:
"""启动处理"""
self._running = True
try:
with open(self.email_file, "r") as file:
for line in file:
if not self._running:
if self.email_source == "api":
# 使用API获取邮箱
while self._running and (self.reg_count == 0 or self.processed_count < self.reg_count):
email_data = self.get_email_from_api()
if not email_data:
print("无法从API获取邮箱任务停止")
break
try:
email_data = self.parse_email_credentials(line.strip())
self.executor.submit(self.process_email, email_data)
except ValueError as e:
print(f"错误的邮箱格式: {e}")
continue
self.executor.submit(self.process_email, email_data)
else:
# 使用本地文件获取邮箱
with open(self.email_file, "r", encoding="utf-8") as file:
for line in file:
if not self._running or (self.reg_count > 0 and self.processed_count >= self.reg_count):
break
try:
email_data = self.parse_email_credentials(line.strip())
self.executor.submit(self.process_email, email_data)
except ValueError as e:
print(f"错误的邮箱格式: {e}")
continue
finally:
self.executor.shutdown(wait=True)
class GUIThreadManagerWithPyno(ThreadManagerWithPyno):
def __init__(self, config_path, email_file_path, proxy_file_path, gui, completed_tasks=None):
super().__init__(config_path, email_file_path, proxy_file_path)
def __init__(self, config_path, email_path, proxy_path, gui, completed_tasks=None, email_source="txt_file", reg_count=0, email_api_client=None):
super().__init__(config_path, email_path, proxy_path, email_source, reg_count, email_api_client)
self.gui = gui
self.completed_tasks = completed_tasks or set()
def process_email(self, email_data):
"""处理单个邮件账号"""
try:
# 检查是否达到注册数量限制
if self.reg_count > 0:
with self.count_lock:
if self.processed_count >= self.reg_count:
print(f"已达到注册数量限制: {self.reg_count}")
return
self.processed_count += 1
print(f"当前处理数量: {self.processed_count}/{self.reg_count}")
if not self._running: # 检查是否应该继续
self.gui.update_status(email_data['email'], "任务已停止")
return
@@ -141,15 +184,36 @@ class GUIThreadManagerWithPyno(ThreadManagerWithPyno):
def start(self):
"""启动处理"""
self._running = True
with open(self.email_file, 'r', encoding='utf-8') as file:
with ThreadPoolExecutor(max_workers=self.config['executornum']) as self.executor:
for line in file:
if not self._running:
break
try:
email_data = self.parse_email_credentials(line)
self.processed_count = 0 # 重置计数器
try:
if self.email_source == "api":
# 使用API获取邮箱
with ThreadPoolExecutor(max_workers=self.config['executornum']) as self.executor:
while self._running and (self.reg_count == 0 or self.processed_count < self.reg_count):
email_data = self.get_email_from_api()
if not email_data:
self.gui.update_status("API错误", "无法从API获取邮箱", result="失败")
break
# 跳过已完成的任务
if email_data['email'] not in self.completed_tasks:
self.executor.submit(self.process_email, email_data)
except ValueError as e:
self.gui.update_status(email_data['email'],f"解析邮箱文件失败: {e}")
else:
print(f"跳过已完成的邮箱: {email_data['email']}")
else:
# 使用本地文件获取邮箱
with open(self.email_file, 'r', encoding='utf-8') as file:
with ThreadPoolExecutor(max_workers=self.config['executornum']) as self.executor:
for line in file:
if not self._running or (self.reg_count > 0 and self.processed_count >= self.reg_count):
break
try:
email_data = self.parse_email_credentials(line)
# 跳过已完成的任务
if email_data['email'] not in self.completed_tasks:
self.executor.submit(self.process_email, email_data)
except ValueError as e:
self.gui.update_status("解析错误", f"解析邮箱文件失败: {e}")
except Exception as e:
self.gui.update_status("系统错误", f"启动任务失败: {str(e)}", result="失败")