Files
steamreg/SteamRegistrationWithPyno.py

911 lines
39 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import base64
import email
import email.header
import imaplib
import json
import os
import poplib
import random
import re
import secrets
import string
import threading
import time
import tkinter as tk
from email.parser import BytesParser
from email.policy import default
from urllib.parse import urlparse, parse_qs

import requests
from bs4 import BeautifulSoup
from loguru import logger

from PynoCaptchaSolver import PynoCaptchaConfig, PynoCaptchaSolver
from SteamCaptchaHelper import SteamCaptchaHelper
class SteamRegistrationWithPyno:
    """Steam registration handler that solves the captcha with PyNoCaptcha."""

    def __init__(self, config, proxy_pool):
        """Store the collaborators and reset all per-registration state.

        Args:
            config: Settings mapping; methods of this class read keys such as
                ``protocol``, ``email_url`` and ``ssl`` from it.
            proxy_pool: Object used later through ``get_proxy``,
                ``mark_success`` and ``mark_fail``.
        """
        self.config = config
        self.proxy_pool = proxy_pool
        self.script_dir = os.path.dirname(os.path.abspath(__file__))

        # One lock guarding file writes, one thread-local slot for the session.
        self._file_lock = threading.Lock()
        self._session_local = threading.local()

        # Everything below is filled in while a registration is running.
        for attribute in ('email_data', 'session', 'proxy_info', 'cookie_str',
                          'token', 'access_token', 'gid', 'init_id',
                          'sessionid', 'gui'):
            setattr(self, attribute, None)
        self.running = True
        for attribute in ('_steam_gid', '_steam_init_id', '_steam_sitekey'):
            setattr(self, attribute, None)

        # Default headers sent with the Steam store requests.
        self.BASE_HEADERS = {
            "Accept": "*/*",
            "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6",
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/125.0.0.0 Safari/537.36 Edg/125.0.0.0",
            "Referer": "https://store.steampowered.com/join/",
            "Origin": "https://store.steampowered.com",
            "Connection": "keep-alive",
            "Content-Type": "application/x-www-form-urlencoded; charset=UTF-8",
            "Sec-Fetch-Dest": "empty",
            "Sec-Fetch-Mode": "cors",
            "Sec-Fetch-Site": "same-origin",
            "sec-ch-ua": '"Microsoft Edge";v="125"',
            "sec-ch-ua-mobile": "?0",
            "sec-ch-ua-platform": '"Windows"',
        }

        # Create the session for the constructing thread if it has none yet.
        local_slot = self._session_local
        if not hasattr(local_slot, 'session'):
            fresh_session = requests.Session()
            fresh_session.headers.update(self.BASE_HEADERS)
            local_slot.session = fresh_session
def set_gui(self, gui):
    """Attach the GUI object that ``update_status`` forwards progress to."""
    self.gui = gui
def update_status(self, status, account_name=None, password=None, result=None):
    """Forward a progress update for the current mailbox to the GUI, if one is set.

    Args:
        status: Status text to show.
        account_name: Optional account name passed through to the GUI.
        password: Optional account password passed through to the GUI.
        result: Optional result marker passed through to the GUI.
    """
    if not self.gui:
        return
    current_email = self.email_data['email']
    self.gui.update_status(current_email, status, account_name, password, result)
def is_email_valid(self):
    """Check that the mailbox in ``self.email_data`` can actually be accessed.

    Depending on ``self.config['protocol']`` this logs in over IMAP or POP3,
    or refreshes the OAuth token and authenticates over Graph / IMAP / POP3.

    Returns:
        bool: True when the check succeeded, False on any failure (the
        failure reason is printed).
    """
    # Bound before the try block so the failure message can always use it;
    # previously a failure ahead of the assignment raised UnboundLocalError
    # from inside the except handler.
    address = None
    try:
        address = self.email_data['email']
        protocol = self.config['protocol']
        email_url = self.config['email_url']
        use_ssl = self.config['ssl']
        password = self.email_data['password']

        if protocol == "IMAP":
            server = imaplib.IMAP4_SSL(email_url) if use_ssl else imaplib.IMAP4(email_url)
            try:
                server.login(address, password)
            finally:
                # Always end the connection, also when the login is rejected.
                server.logout()
        elif protocol == "POP3":
            server = poplib.POP3_SSL(email_url) if use_ssl else poplib.POP3(email_url)
            try:
                server.user(address)
                server.pass_(password)
            finally:
                server.quit()
        elif protocol in ("GRAPH", "IMAP_OAUTH", "POP3_OAUTH"):
            # All OAuth flavours need a fresh access token first.
            self._get_access_token()
            if not self.access_token:
                raise ValueError("获取访问令牌失败")
            if protocol == "GRAPH":
                if self._graph_get_email() is None:
                    raise ValueError("获取邮件失败")
            elif protocol == "IMAP_OAUTH":
                self._authenticate_oauth2().logout()
            else:
                self._authenticate_oauth2().quit()
        else:
            raise ValueError("不支持的协议类型")
        return True
    except Exception as e:
        print(f"{address},邮箱验证失败: {e}")
        return False
def _get_access_token(self):
    """Refresh the OAuth access token and store it in ``self.access_token``.

    Posts the mailbox's ``client_id`` and ``refresh_token`` to the Microsoft
    token endpoint.  On HTTP 200 the returned access token is stored, and a
    rotated refresh token is written back to ``self.email_data`` and persisted
    through ``_update_refresh_token``.  On any other status
    ``self.access_token`` is left as None.
    """
    # Forget the previous token first: callers test ``self.access_token``
    # right after this call, and a stale value would hide a failed refresh.
    self.access_token = None

    payload = {
        'client_id': self.email_data['client_id'],
        'refresh_token': self.email_data['refresh_token'],
        'grant_type': 'refresh_token',
    }
    if self.config['protocol'] == 'GRAPH':
        payload['scope'] = 'https://graph.microsoft.com/.default'

    response = requests.post(
        'https://login.microsoftonline.com/consumers/oauth2/v2.0/token',
        data=payload,
        # Without a timeout a stalled connection would block this worker forever.
        timeout=30,
    )
    if response.status_code != 200:
        return

    result = response.json()
    rotated_token = result.get('refresh_token')
    if rotated_token and rotated_token != self.email_data['refresh_token']:
        self.email_data['refresh_token'] = rotated_token
        self._update_refresh_token(rotated_token)
    self.access_token = result.get('access_token')
def _authenticate_oauth2(self):
"""OAuth2认证"""
email = self.email_data['email']
auth_string = f"user={email}\x01auth=Bearer {self.access_token}\x01\x01"
auth_bytes = auth_string.encode('ascii')
auth_b64 = base64.b64encode(auth_bytes).decode('ascii')
if self.config['protocol'] == "IMAP_OAUTH":
server = imaplib.IMAP4_SSL('outlook.office365.com', 993)
server.authenticate('XOAUTH2', lambda x: auth_string)
return server
elif self.config['protocol'] == "POP3_OAUTH":
server = poplib.POP3_SSL('outlook.office365.com', 995)
server._shortcmd('AUTH XOAUTH2')
server._shortcmd(auth_b64)
return server
def _graph_get_email(self):
try:
if not self.access_token:
return None
headers = {
'Authorization': f'Bearer {self.access_token}',
'Content-Type': 'application/json'
}
params = {
'$top': 5,
'$select': 'subject,body,receivedDateTime,from,hasAttachments',
'$orderby': 'receivedDateTime desc'
}
response = requests.get('https://graph.microsoft.com/v1.0/me/messages', headers=headers,params=params)
if response.status_code == 200:
emails = response.json().get('value', [])
return emails
else:
return None
except Exception as e:
return None
def _update_refresh_token(self, refresh_token):
"""更新refresh_token"""
file_path = os.path.join(self.script_dir, 'email_password.txt')
temp_path = os.path.join(self.script_dir, 'email_password.tmp')
with self._file_lock:
try:
with open(file_path, 'r') as f, open(temp_path, 'w') as temp:
for line in f:
parts = line.strip().split('----')
if len(parts) == 4 and parts[0] == self.email_data['email']:
parts[3] = refresh_token
line = '----'.join(parts) + '\n'
temp.write(line)
os.replace(temp_path, file_path)
except Exception as e:
print(f"更新refresh_token失败: {e}")
if os.path.exists(temp_path):
os.remove(temp_path)
def _setup_session(self):
"""设置代理会话"""
if not self.proxy_info:
raise ValueError("代理信息为空")
try:
parts = self.proxy_info.split(":")
if len(parts) != 4:
raise ValueError("代理格式错误")
proxy_ip, proxy_port, username, password = parts
# 验证端口是否为数字
if not proxy_port.isdigit():
raise ValueError("代理端口格式错误")
proxy_url = f"http://{username}:{password}@{proxy_ip}:{proxy_port}"
self.session = self._session_local.session
self.session.proxies = {
"http": proxy_url,
"https": proxy_url
}
# 设置超时
self.session.timeout = (10, 30) # 连接超时10秒读取超时30秒
except Exception as e:
print(f"设置代理会话失败: {e}")
raise
def _refresh_steam_info(self):
    """Fetch a fresh init_id, gid and sitekey through ``SteamCaptchaHelper``.

    The values are stored on the instance, ``self.session`` is replaced by the
    helper's session, and the ``pyno_sitekey`` / ``pyno_user_agent`` config
    entries are refreshed when they already exist.

    Returns:
        dict with ``gid``, ``sitekey``, ``init_id`` and ``session``, or None
        when the helper returned nothing or an exception occurred.
    """
    try:
        helper = SteamCaptchaHelper()
        steam_data = helper.get_sitekey()
        if not steam_data:
            print("获取Steam验证信息失败")
            return None

        # Copy the challenge values and adopt the helper's session.
        self._steam_init_id = steam_data["init_id"]
        self._steam_gid = steam_data["gid"]
        self._steam_sitekey = steam_data["sitekey"]
        self.session = steam_data["session"]

        for label, value in (("init_id", self._steam_init_id),
                             ("gid", self._steam_gid),
                             ("sitekey", self._steam_sitekey)):
            print(f"获取{label}成功: {value}")

        # Only overwrite config entries that are already present.
        if 'pyno_sitekey' in self.config:
            self.config['pyno_sitekey'] = self._steam_sitekey
        if 'pyno_user_agent' in self.config:
            self.config['pyno_user_agent'] = helper.get_user_agent()

        return {
            "gid": self._steam_gid,
            "sitekey": self._steam_sitekey,
            "init_id": self._steam_init_id,
            "session": self.session,
        }
    except Exception as e:
        print(f"刷新Steam信息出错: {str(e)}")
        return None
def _get_captcha_with_pyno(self):
    """Solve the hCaptcha challenge through PyNoCaptcha.

    When no sitekey or gid is cached yet, they are first fetched through
    ``SteamCaptchaHelper`` (which also replaces ``self.session``).

    Returns:
        The ``captcha_text`` entry of the solver result, or None when the
        challenge data could not be obtained, the solver returned nothing, or
        an exception occurred.
    """
    try:
        print("正在使用PyNoCaptcha解决验证码...")
        # The solver is configured with the helper's User-Agent.
        helper = SteamCaptchaHelper()
        standard_user_agent = helper.get_user_agent()

        if not (self._steam_sitekey and self._steam_gid):
            print("需要先获取有效的sitekey和gid")
            steam_data = helper.get_sitekey()
            if not steam_data:
                print("无法获取有效的Steam验证信息")
                return None
            self._steam_sitekey = steam_data["sitekey"]
            self._steam_gid = steam_data["gid"]
            self._steam_init_id = steam_data["init_id"]
            self.session = steam_data["session"]

        solver_config = PynoCaptchaConfig(
            user_token=self.config.get('pyno_user_token'),
            sitekey=self._steam_sitekey,
            referer="https://store.steampowered.com/join/",
            user_agent=standard_user_agent,
            timeout=self.config.get('pyno_timeout', 60),
            debug=self.config.get('pyno_debug', True),
        )
        solver = PynoCaptchaSolver(solver_config)

        print("开始解决验证码...")
        solved = solver.solve_hcaptcha()
        if not solved:
            print("PyNoCaptcha验证码解决失败")
            return None
        print("PyNoCaptcha验证码解决成功")
        return solved.get("captcha_text")
    except Exception as e:
        print(f"PyNoCaptcha验证码解决出错: {str(e)}")
        return None
def _ajax_verify_email(self):
"""发送邮箱验证请求"""
# 使用从SteamCaptchaHelper获取的headers
headers = {
"Host": "store.steampowered.com",
"X-Prototype-Version": "1.7",
"sec-ch-ua-platform": "\"Windows\"",
"sec-ch-ua": "\"Chromium\";v=\"130\", \"Google Chrome\";v=\"130\", \"Not?A_Brand\";v=\"99\"",
"sec-ch-ua-mobile": "?0",
"X-Requested-With": "XMLHttpRequest",
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; Valve Steam Client/default/1741737356) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/126.0.6478.183 Safari/537.36",
"Accept": "text/javascript, text/html, application/xml, text/xml, */*",
"Content-type": "application/x-www-form-urlencoded; charset=UTF-8",
"Origin": "https://store.steampowered.com",
"Sec-Fetch-Site": "same-origin",
"Sec-Fetch-Mode": "cors",
"Sec-Fetch-Dest": "empty",
"Referer": "https://store.steampowered.com/join/",
"Accept-Language": "zh-CN,zh;q=0.9"
}
# 更新Cookie
if self.cookie_str:
headers["Cookie"] = self.cookie_str
print(f"使用gid: {self._steam_gid}")
print(f"使用init_id: {self._steam_init_id}")
data = {
"email": self.email_data['email'],
"captchagid": self._steam_gid,
"captcha_text": self.token,
"elang": '6', # 使用中文
"init_id": self._steam_init_id,
"guest": "false"
}
url = "https://store.steampowered.com/join/ajaxverifyemail"
return self.session.post(
url,
headers=headers,
data=data
)
def _generate_random_account_name(self, length):
"""生成随机账户名"""
characters = string.ascii_lowercase + string.digits
return ''.join(random.choice(characters) for _ in range(length))
def _generate_random_password(self, length):
"""生成随机密码"""
characters = string.ascii_letters + string.digits
return ''.join(random.choice(characters) for _ in range(length))
def _check_account_name_availability(self, account_name):
"""检查账户名可用性"""
payload = {
'accountname': account_name,
'count': 1,
'creationid': self.sessionid
}
try:
response = self.session.post(
'https://store.steampowered.com/join/checkavail/',
data=payload,
headers=self.BASE_HEADERS
)
return response.json() if response.status_code == 200 else None
except Exception as e:
print(f"检查账户名失败: {e}")
return None
def _check_password_availability(self, account_name, password):
"""检查密码可用性"""
payload = {
'accountname': account_name,
'password': password,
'count': 1
}
try:
response = self.session.post(
'https://store.steampowered.com/join/checkpasswordavail/',
data=payload,
headers=self.BASE_HEADERS
)
return response.json() if response.status_code == 200 else None
except Exception as e:
print(f"检查密码失败: {e}")
return None
def _save_account(self, account_name, password, success):
"""保存账户信息"""
file_name = "accounts_succ.txt" if success else "accounts_fail.txt"
file_path = os.path.join(self.script_dir, file_name)
with self._file_lock:
try:
with open(file_path, "a", encoding='utf-8') as file:
# 新的固定格式游戏帐xxx 游戏密码xxx 邮箱xxx 邮箱密码xxx webmail.evnmail.com
if self.config['protocol'] in ["GRAPH", "IMAP_OAUTH", "POP3_OAUTH"]:
# OAuth版本包含额外的OAuth信息但不显示在保存的文件中
save_data = (f"游戏帐:{account_name} 游戏密码:{password} 邮箱:{self.email_data['email']} "
f"邮箱密码:{self.email_data['password']} webmail.evnmail.com\n")
# 额外保存OAuth信息到专门的文件中
oauth_path = os.path.join(self.script_dir, "accounts_oauth.txt")
with open(oauth_path, "a", encoding='utf-8') as oauth_file:
oauth_file.write(f"{account_name}----{password}----{self.email_data['email']}----"
f"{self.email_data['password']}----{self.email_data['client_id']}----"
f"{self.email_data['refresh_token']}\n")
else:
save_data = (f"游戏帐:{account_name} 游戏密码:{password} 邮箱:{self.email_data['email']} "
f"邮箱密码:{self.email_data['password']} webmail.evnmail.com\n")
file.write(save_data)
except Exception as e:
print(f"保存账户信息失败: {e}")
def _log_error(self,reason):
self.update_status(f'{reason}',result='线程结束')
"""记录错误"""
error_path = os.path.join(self.script_dir, 'rgerror.txt')
with self._file_lock:
try:
with open(error_path, 'a', encoding='utf-8') as file:
# 根据协议类型保存不同格式的错误信息
if self.config['protocol'] in ["GRAPH", "IMAP_OAUTH", "POP3_OAUTH"]:
# 保存详细的OAuth错误信息到单独文件
oauth_error_path = os.path.join(self.script_dir, 'oauth_error.txt')
with open(oauth_error_path, 'a', encoding='utf-8') as oauth_file:
oauth_file.write(f"{self.email_data['email']}----{self.email_data['password']}----"
f"{self.email_data['client_id']}----{self.email_data['refresh_token']}\n")
# 主错误文件使用标准格式
error_data = f"邮箱:{self.email_data['email']} 邮箱密码:{self.email_data['password']} 错误原因:{reason} webmail.evnmail.com\n"
else:
error_data = f"邮箱:{self.email_data['email']} 邮箱密码:{self.email_data['password']} 错误原因:{reason} webmail.evnmail.com\n"
file.write(error_data)
except Exception as e:
print(f"记录错误失败: {e}")
def _ajax_check_email_verified(self):
"""检查邮箱验证状态"""
if not self.sessionid:
return False
data = {'creationid': self.sessionid}
url = 'https://store.steampowered.com/join/ajaxcheckemailverified'
headers = self.BASE_HEADERS.copy()
headers.update({
"cookie": self.cookie_str,
})
start_time = time.time()
verfy = False
while True:
if time.time() - start_time > 180: # 超过3分钟退出循环
self.update_status("邮箱验证超时")
raise Exception("邮箱验证超时")
response = self.session.post(url, data=data, headers=headers)
if response.ok:
success = response.json()
if success['success'] == 1 or verfy:
# 处理账户创建
self.update_status("邮箱验证完成,提交注册")
return self._create_account()
else:
print('等待邮箱验证')
self.update_status("等待邮箱验证")
if self._fetch_email_verification_url():
time.sleep(5)
verfy = True
time.sleep(2)
else:
time.sleep(5)
def _fetch_email_verification_url(self):
"""获取邮箱验证链接"""
max_attempts = 6
attempts = 0
href = ''
urls = []
def extract_urls_from_body(body, pattern):
found_urls = re.findall(pattern, body)
return [url.replace('&', '&').replace("=3D", "=").replace("=\r\n", "")
.replace("\r\n", "").replace("=\n", "").replace("\n", "")
for url in found_urls]
def decode_mime_words(text):
"""解码MIME编码的文本"""
if not text:
return ""
try:
decoded = email.header.decode_header(text)
return " ".join([
(str(t[0], t[1] or 'utf-8') if isinstance(t[0], bytes) else str(t[0]))
for t in decoded
])
except:
return text
def get_body_text(msg):
"""获取邮件正文文本内容"""
if msg.is_multipart():
# 如果邮件包含多个部分,递归处理
text_parts = []
for part in msg.get_payload():
if part.get_content_type() == 'text/plain':
charset = part.get_content_charset() or 'utf-8'
try:
text_parts.append(part.get_payload(decode=True).decode(charset))
except:
text_parts.append("[无法解码的内容]")
elif part.is_multipart():
# 递归处理多部分
text_parts.append(get_body_text(part))
return "\n".join(text_parts)
else:
# 单部分邮件
content_type = msg.get_content_type()
if content_type == 'text/plain':
charset = msg.get_content_charset() or 'utf-8'
try:
return msg.get_payload(decode=True).decode(charset)
except:
return "[无法解码的内容]"
else:
return f"[内容类型: {content_type}]"
def process_emails(emails, pattern):
for email in emails:
body = email.get('body', {}).get('content', '')
urls.extend(extract_urls_from_body(body, pattern))
def fetch_imap_emails():
"""获取IMAP邮件中的验证链接"""
local_urls = []
try:
self.update_status("正在连接IMAP服务器...")
# 连接到IMAP服务器
if self.config['ssl']:
mail = imaplib.IMAP4_SSL(self.config['email_url'])
else:
mail = imaplib.IMAP4(self.config['email_url'])
# 登录
self.update_status(f"正在登录邮箱...")
mail.login(self.email_data['email'], self.email_data['password'])
self.update_status("登录成功,开始获取邮件...")
# 只检查INBOX文件夹
steam_verification_pattern = r'https://store\.steampowered\.com/account/newaccountverification\?[^\s"\'<>]+'
try:
self.update_status("检查收件箱...")
mail.select("INBOX")
status, messages = mail.search(None, "ALL")
if status != "OK" or not messages[0]:
self.update_status("收件箱为空或无法访问")
else:
# 获取邮件ID列表从最新的开始
message_ids = messages[0].split()
message_count = len(message_ids)
self.update_status(f"找到 {message_count} 封邮件")
# 最多检查5封最新邮件
check_count = min(5, message_count)
message_ids = message_ids[-check_count:]
message_ids.reverse() # 最新的邮件放在前面
for i, msg_id in enumerate(message_ids):
self.update_status(f"正在检查邮件 {i+1}/{check_count}...")
status, msg_data = mail.fetch(msg_id, "(RFC822)")
if status != "OK":
continue
# 解析邮件内容
raw_email = msg_data[0][1]
msg = email.message_from_bytes(raw_email, policy=default)
# 提取信息
subject = decode_mime_words(msg["Subject"])
# 如果是Steam邮件优先处理
if "Steam" in subject:
self.update_status(f"找到Steam邮件: {subject}")
# 获取正文
body = get_body_text(msg)
# 查找Steam验证链接
found_urls = extract_urls_from_body(body, steam_verification_pattern)
if found_urls:
self.update_status(f"发现 {len(found_urls)} 个Steam验证链接")
local_urls.extend(found_urls)
else:
self.update_status("不是Steam邮件跳过")
except Exception as e:
self.update_status(f"处理邮件出错: {str(e)}")
# 尝试关闭连接
try:
mail.close()
except:
pass
mail.logout()
except Exception as e:
self.update_status(f"IMAP连接失败: {str(e)}")
return local_urls
def process_pop3(mail, pattern):
num_messages = len(mail.list()[1])
for i in range(num_messages):
raw_email = b'\n'.join(mail.retr(i + 1)[1])
text_body = raw_email.decode("utf-8")
urls.extend(extract_urls_from_body(text_body, pattern))
while attempts < max_attempts:
try:
if self.config['protocol'] == "GRAPH":
emails = self._graph_get_email()
if emails:
process_emails(emails, r'href="(https://store\.steampowered\.com/account/newaccountverification\?[^"]+)"')
else:
if self.config['protocol'] in ["IMAP", "IMAP_OAUTH"]:
if self.config['protocol'] == "IMAP_OAUTH":
self._get_access_token()
if not self.access_token:
attempts += 1
time.sleep(5)
continue
mail = self._authenticate_oauth2()
else:
# 使用改进的IMAP处理
self.update_status("开始获取邮件...")
imap_urls = fetch_imap_emails()
urls.extend(imap_urls)
elif self.config['protocol'] in ["POP3", "POP3_OAUTH"]:
if self.config['protocol'] == "POP3_OAUTH":
self._get_access_token()
if not self.access_token:
attempts += 1
time.sleep(5)
continue
mail = self._authenticate_oauth2()
else:
mail = poplib.POP3_SSL(self.config['email_url']) if self.config['ssl'] else poplib.POP3(self.config['email_url'])
mail.user(self.email_data['email'])
mail.pass_(self.email_data['password'])
process_pop3(mail, r'https://store\.steampowered\.com/account/newaccountverification\?stoken=3D[^\n]*\n[^\n]*\n[^\n]*\n\n\n')
mail.quit()
# 检查验证链接
for url in urls:
self.update_status(f"找到可能的验证链接,检查匹配...")
parsed_url = urlparse(url)
query_string = parsed_url.query
params = parse_qs(query_string)
creationid = params.get('creationid')
if creationid and creationid[0] == self.sessionid:
href = url
break
if href:
self.update_status("找到匹配的验证链接,正在验证...")
return self._verify_email_link(href)
else:
self.update_status("未找到匹配的验证链接等待5秒后重试...")
attempts += 1
time.sleep(5)
except Exception as e:
self.update_status(f"邮件处理错误: {str(e)}")
attempts += 1
time.sleep(5)
if attempts >= max_attempts:
self.update_status("达到最大尝试次数,未能成功获取邮件或链接")
return False
def _verify_email_link(self, href):
"""验证邮箱链接"""
headers = self.BASE_HEADERS.copy()
headers.update({
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
"Upgrade-Insecure-Requests": "1"
})
try:
response = self.session.get(href, headers=headers)
soup = BeautifulSoup(response.content, "html.parser")
error_div = soup.find("div", class_="newaccount_email_verified_text error")
if error_div:
print('验证失败')
return False
print("验证完成")
return True
except Exception as e:
print(f"验证链接访问失败: {e}")
return False
def _create_account(self):
"""创建Steam账户"""
# 生成随机账户名和密码
account_name = self._generate_random_account_name(random.randint(8, 12))
# 检查账户名可用性
result = self._check_account_name_availability(account_name)
if result:
if result['bAvailable']:
print(f"账户名可用: {account_name}")
else:
print("账户名不可用")
if 'rgSuggestions' in result and result['rgSuggestions']:
print("使用建议的账户名:", result['rgSuggestions'][0])
account_name = result['rgSuggestions'][0]
else:
raise Exception("无法获取可用的账户名")
else:
raise Exception("账户名检查失败")
# 生成并检查密码
while True:
password = self._generate_random_password(random.randint(8, 12))
password_result = self._check_password_availability(account_name, password)
if password_result and password_result['bAvailable']:
print(f"密码可用: {password}")
break
print("密码不可用,重新生成")
# 创建账户
self.update_status("创建账户",account_name=account_name,password=password)
self._create_steam_account(account_name, password)
return True
def _create_steam_account(self, account_name, password):
"""提交Steam账户创建请求"""
data = {
'accountname': account_name,
'password': password,
'count': 0,
'lt': 1,
'creation_sessionid': self.sessionid,
'embedded_appid': 0,
'guest': False
}
headers = self.BASE_HEADERS.copy()
headers.update({
"Cookie": self.cookie_str,
"X-Requested-With": "XMLHttpRequest"
})
try:
response = self.session.post(
'https://store.steampowered.com/join/createaccount/',
data=data,
headers=headers
)
if response.ok:
result = response.json()
print(f'{account_name} 提交注册完成')
self.update_status('提交注册完成',account_name=account_name,password=password,result=result['bSuccess'])
self._save_account(account_name, password, result['bSuccess'])
else:
raise Exception(f'创建账户请求失败: {response.status_code}')
except Exception as e:
print(f'创建账户时出错: {str(e)}')
raise
def main(self, email_data, retries=30):
    """Run the whole registration flow for one mailbox, retrying on failure.

    Each attempt takes a proxy from the pool, fetches the Steam challenge
    data, solves the captcha, submits the e-mail address, waits for the
    e-mail verification and creates the account.  A failed attempt marks the
    proxy as failed, waits two seconds and counts against *retries*.

    Args:
        email_data: Mailbox record; stored in ``self.email_data``.
        retries: Maximum number of failed attempts before giving up.
    """
    self.email_data = email_data
    failed_attempts = 0

    def release_resources():
        """Close the active session and report a still-held proxy as failed."""
        if self.session:
            try:
                self.session.close()
            except BaseException:  # same reach as a bare ``except``
                pass
        if self.proxy_info:
            self.proxy_pool.mark_fail(self.proxy_info)
            self.proxy_info = None

    if not self.is_email_valid():
        self._log_error("邮箱验证失败")
        return

    while failed_attempts < retries:
        try:
            if not self.running:
                self.update_status("任务已停止")
                break

            # The proxy session is still needed to carry the proxy settings.
            self.proxy_info = self.proxy_pool.get_proxy()
            self._setup_session()

            # Fetch the challenge data through the helper, routed through the
            # same proxy when one is configured on the session.
            self.update_status("获取Steam验证信息")
            helper = SteamCaptchaHelper()
            session_proxies = getattr(self.session, 'proxies', None)
            if session_proxies:
                helper.session.proxies = session_proxies
            steam_data = helper.get_sitekey()
            if not steam_data:
                self.update_status("获取Steam验证信息失败重试")
                raise Exception("获取Steam验证信息失败")

            self._steam_init_id = steam_data["init_id"]
            self._steam_gid = steam_data["gid"]
            self._steam_sitekey = steam_data["sitekey"]
            # From here on the helper's session is the active one.
            self.session = steam_data["session"]

            # Build the cookie header from the fixed prefix plus session cookies.
            self.cookie_str = "timezoneOffset=28800,0; Steam_Language=english; "
            jar = self.session.cookies.get_dict()
            if jar:
                self.cookie_str += '; '.join([f"{name}={value}" for name, value in jar.items()])

            self.update_status("使用PyNoCaptcha进行人机验证")
            self.token = self._get_captcha_with_pyno()
            if not self.token:
                self.update_status("验证码解决失败,重试")
                raise Exception("验证码解决失败")
            self.update_status("验证码解决成功")

            self.update_status("提交注册验证")
            response = self._ajax_verify_email()

            try:
                resp_json = response.json()
                success = resp_json.get('success', 0)
                details = resp_json.get('details', '')
                self.update_status(f'提交结果:状态={success}, 详情={details}')
                if success != 1:
                    self.update_status(f"验证未通过: {details}", result="失败")
                    raise Exception(f"验证未通过: {details}")

                self.sessionid = resp_json.get('sessionid')
                if not self.sessionid:
                    self.update_status("未能获取sessionid", result="失败")
                    raise Exception("未能获取sessionid")

                self.update_status("等待邮箱验证")
                self._ajax_check_email_verified()

                # Finished: release the session and credit the proxy.
                self.session.close()
                self.proxy_pool.mark_success(self.proxy_info)
                self.proxy_info = None
                break
            except Exception as e:
                self.update_status(f"处理响应出错: {str(e)}")
                raise
        except Exception as e:
            if not self.running:
                self.update_status("任务已停止")
                break

            # Keep the displayed error short.
            error_msg = str(e)
            if len(error_msg) > 50:
                error_msg = error_msg[:50] + "..."
            self.update_status(f"出错: {error_msg}")

            release_resources()

            failed_attempts += 1
            time.sleep(2)

            if failed_attempts >= retries:
                self._log_error("达到最大重试次数")
        finally:
            # Runs after every attempt, including the ones ended by ``break``.
            release_resources()