refactor: Enhance Turnstile verification with robust error handling and status tracking
This commit is contained in:
@@ -3,6 +3,8 @@ import platform
|
|||||||
import json
|
import json
|
||||||
import sys
|
import sys
|
||||||
from colorama import Fore, Style
|
from colorama import Fore, Style
|
||||||
|
from enum import Enum
|
||||||
|
from typing import Optional
|
||||||
|
|
||||||
from exit_cursor import ExitCursor
|
from exit_cursor import ExitCursor
|
||||||
import patch_cursor_get_machine_id
|
import patch_cursor_get_machine_id
|
||||||
@@ -26,13 +28,28 @@ from datetime import datetime
|
|||||||
EMOJI = {"ERROR": "❌", "WARNING": "⚠️", "INFO": "ℹ️"}
|
EMOJI = {"ERROR": "❌", "WARNING": "⚠️", "INFO": "ℹ️"}
|
||||||
|
|
||||||
|
|
||||||
def save_screenshot(tab, prefix="turnstile"):
|
class VerificationStatus(Enum):
|
||||||
"""保存截图
|
"""验证状态枚举"""
|
||||||
|
|
||||||
|
PASSWORD_PAGE = "@name=password"
|
||||||
|
CAPTCHA_PAGE = "@data-index=0"
|
||||||
|
ACCOUNT_SETTINGS = "Account Settings"
|
||||||
|
|
||||||
|
|
||||||
|
class TurnstileError(Exception):
|
||||||
|
"""Turnstile 验证相关异常"""
|
||||||
|
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
def save_screenshot(tab, stage: str, timestamp: bool = True) -> None:
|
||||||
|
"""
|
||||||
|
保存页面截图
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
tab: 浏览器标签页对象
|
tab: 浏览器标签页对象
|
||||||
prefix: 文件名前缀
|
stage: 截图阶段标识
|
||||||
Returns:
|
timestamp: 是否添加时间戳
|
||||||
str: 截图文件路径
|
|
||||||
"""
|
"""
|
||||||
try:
|
try:
|
||||||
# 创建 screenshots 目录
|
# 创建 screenshots 目录
|
||||||
@@ -40,27 +57,63 @@ def save_screenshot(tab, prefix="turnstile"):
|
|||||||
if not os.path.exists(screenshot_dir):
|
if not os.path.exists(screenshot_dir):
|
||||||
os.makedirs(screenshot_dir)
|
os.makedirs(screenshot_dir)
|
||||||
|
|
||||||
# 生成带时间戳的文件名
|
# 生成文件名
|
||||||
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
|
if timestamp:
|
||||||
filename = f"{prefix}_{timestamp}.png"
|
filename = f"turnstile_{stage}_{int(time.time())}.png"
|
||||||
|
else:
|
||||||
|
filename = f"turnstile_{stage}.png"
|
||||||
|
|
||||||
filepath = os.path.join(screenshot_dir, filename)
|
filepath = os.path.join(screenshot_dir, filename)
|
||||||
|
|
||||||
# 使用 get_screenshot 方法保存截图
|
# 保存截图
|
||||||
tab.get_screenshot(filepath)
|
tab.get_screenshot(filepath)
|
||||||
logging.info(f"已保存截图: {filepath}")
|
logging.debug(f"截图已保存: {filepath}")
|
||||||
return filepath
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logging.error(f"截图保存失败: {str(e)}")
|
logging.warning(f"截图保存失败: {str(e)}")
|
||||||
|
|
||||||
|
|
||||||
|
def check_verification_success(tab) -> Optional[VerificationStatus]:
|
||||||
|
"""
|
||||||
|
检查验证是否成功
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
VerificationStatus: 验证成功时返回对应状态,失败返回 None
|
||||||
|
"""
|
||||||
|
for status in VerificationStatus:
|
||||||
|
if tab.ele(status.value):
|
||||||
|
logging.info(f"验证成功 - 已到达{status.name}页面")
|
||||||
|
return status
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
|
||||||
def handle_turnstile(tab):
|
def handle_turnstile(tab, max_retries: int = 2, retry_interval: tuple = (1, 2)) -> bool:
|
||||||
|
"""
|
||||||
|
处理 Turnstile 验证
|
||||||
|
|
||||||
|
Args:
|
||||||
|
tab: 浏览器标签页对象
|
||||||
|
max_retries: 最大重试次数
|
||||||
|
retry_interval: 重试间隔时间范围(最小值, 最大值)
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
bool: 验证是否成功
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
TurnstileError: 验证过程中出现异常
|
||||||
|
"""
|
||||||
logging.info("正在检测 Turnstile 验证...")
|
logging.info("正在检测 Turnstile 验证...")
|
||||||
save_screenshot(tab, "turnstile")
|
save_screenshot(tab, "start")
|
||||||
|
|
||||||
|
retry_count = 0
|
||||||
|
|
||||||
try:
|
try:
|
||||||
while True:
|
while retry_count < max_retries:
|
||||||
|
retry_count += 1
|
||||||
|
logging.debug(f"第 {retry_count} 次尝试验证")
|
||||||
|
|
||||||
try:
|
try:
|
||||||
challengeCheck = (
|
# 定位验证框元素
|
||||||
|
challenge_check = (
|
||||||
tab.ele("@id=cf-turnstile", timeout=2)
|
tab.ele("@id=cf-turnstile", timeout=2)
|
||||||
.child()
|
.child()
|
||||||
.shadow_root.ele("tag:iframe")
|
.shadow_root.ele("tag:iframe")
|
||||||
@@ -68,31 +121,43 @@ def handle_turnstile(tab):
|
|||||||
.sr("tag:input")
|
.sr("tag:input")
|
||||||
)
|
)
|
||||||
|
|
||||||
if challengeCheck:
|
if challenge_check:
|
||||||
logging.info("检测到 Turnstile 验证,正在处理...")
|
logging.info("检测到 Turnstile 验证框,开始处理...")
|
||||||
|
# 随机延时后点击验证
|
||||||
time.sleep(random.uniform(1, 3))
|
time.sleep(random.uniform(1, 3))
|
||||||
challengeCheck.click()
|
challenge_check.click()
|
||||||
time.sleep(2)
|
time.sleep(2)
|
||||||
logging.info("Turnstile 验证通过")
|
|
||||||
save_screenshot(tab, "turnstile_pass")
|
|
||||||
return True
|
|
||||||
except:
|
|
||||||
pass
|
|
||||||
|
|
||||||
if tab.ele("@name=password"):
|
# 保存验证后的截图
|
||||||
logging.info("验证成功 - 已到达密码输入页面")
|
save_screenshot(tab, "clicked")
|
||||||
break
|
|
||||||
if tab.ele("@data-index=0"):
|
# 检查验证结果
|
||||||
logging.info("验证成功 - 已到达验证码输入页面")
|
if check_verification_success(tab):
|
||||||
break
|
logging.info("Turnstile 验证通过")
|
||||||
if tab.ele("Account Settings"):
|
save_screenshot(tab, "success")
|
||||||
logging.info("验证成功 - 已到达账户设置页面")
|
return True
|
||||||
break
|
|
||||||
time.sleep(random.uniform(1, 2))
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logging.error(f"Turnstile 验证失败: {str(e)}")
|
logging.debug(f"当前尝试未成功: {str(e)}")
|
||||||
|
|
||||||
|
# 检查是否已经验证成功
|
||||||
|
if check_verification_success(tab):
|
||||||
|
return True
|
||||||
|
|
||||||
|
# 随机延时后继续下一次尝试
|
||||||
|
time.sleep(random.uniform(*retry_interval))
|
||||||
|
|
||||||
|
# 超出最大重试次数
|
||||||
|
logging.error(f"验证失败 - 已达到最大重试次数 {max_retries}")
|
||||||
|
save_screenshot(tab, "failed")
|
||||||
return False
|
return False
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
error_msg = f"Turnstile 验证过程发生异常: {str(e)}"
|
||||||
|
logging.error(error_msg)
|
||||||
|
save_screenshot(tab, "error")
|
||||||
|
raise TurnstileError(error_msg)
|
||||||
|
|
||||||
|
|
||||||
def get_cursor_session_token(tab, max_attempts=3, retry_interval=2):
|
def get_cursor_session_token(tab, max_attempts=3, retry_interval=2):
|
||||||
"""
|
"""
|
||||||
@@ -320,6 +385,8 @@ if __name__ == "__main__":
|
|||||||
browser_manager = None
|
browser_manager = None
|
||||||
try:
|
try:
|
||||||
logging.info("\n=== 初始化程序 ===")
|
logging.info("\n=== 初始化程序 ===")
|
||||||
|
# ExitCursor()
|
||||||
|
|
||||||
# 提示用户选择操作模式
|
# 提示用户选择操作模式
|
||||||
print("\n请选择操作模式:")
|
print("\n请选择操作模式:")
|
||||||
print("1. 仅重置机器码")
|
print("1. 仅重置机器码")
|
||||||
@@ -341,9 +408,6 @@ if __name__ == "__main__":
|
|||||||
logging.info("机器码重置完成")
|
logging.info("机器码重置完成")
|
||||||
sys.exit(0)
|
sys.exit(0)
|
||||||
|
|
||||||
# 小于0.45的版本需要打补丁
|
|
||||||
if not greater_than_0_45:
|
|
||||||
ExitCursor()
|
|
||||||
logging.info("正在初始化浏览器...")
|
logging.info("正在初始化浏览器...")
|
||||||
|
|
||||||
# 获取user_agent
|
# 获取user_agent
|
||||||
|
|||||||
Reference in New Issue
Block a user