fix: 修复bug
This commit is contained in:
2
.gitignore
vendored
2
.gitignore
vendored
@@ -25,3 +25,5 @@ venv/
|
||||
node_modules/
|
||||
|
||||
.env
|
||||
|
||||
screenshots/
|
||||
@@ -11,13 +11,13 @@ class BrowserManager:
|
||||
def __init__(self):
|
||||
self.browser = None
|
||||
|
||||
def init_browser(self):
|
||||
def init_browser(self, user_agent=None):
|
||||
"""初始化浏览器"""
|
||||
co = self._get_browser_options()
|
||||
co = self._get_browser_options(user_agent)
|
||||
self.browser = Chromium(co)
|
||||
return self.browser
|
||||
|
||||
def _get_browser_options(self):
|
||||
def _get_browser_options(self, user_agent=None):
|
||||
"""获取浏览器配置"""
|
||||
co = ChromiumOptions()
|
||||
try:
|
||||
@@ -26,15 +26,19 @@ class BrowserManager:
|
||||
except FileNotFoundError as e:
|
||||
logging.warning(f"警告: {e}")
|
||||
|
||||
|
||||
co.set_pref("credentials_enable_service", False)
|
||||
co.set_argument("--hide-crash-restore-bubble")
|
||||
proxy = os.getenv('BROWSER_PROXY')
|
||||
proxy = os.getenv("BROWSER_PROXY")
|
||||
if proxy:
|
||||
co.set_proxy(proxy)
|
||||
|
||||
|
||||
co.auto_port()
|
||||
co.headless(os.getenv('BROWSER_HEADLESS', 'True').lower() == 'true') # 生产环境使用无头模式
|
||||
if user_agent:
|
||||
co.set_user_agent(user_agent)
|
||||
|
||||
co.headless(
|
||||
os.getenv("BROWSER_HEADLESS", "True").lower() == "true"
|
||||
) # 生产环境使用无头模式
|
||||
|
||||
# Mac 系统特殊处理
|
||||
if sys.platform == "darwin":
|
||||
|
||||
@@ -15,45 +15,129 @@ from browser_utils import BrowserManager
|
||||
from get_email_code import EmailVerificationHandler
|
||||
from logo import print_logo
|
||||
from config import Config
|
||||
from datetime import datetime
|
||||
|
||||
|
||||
def handle_turnstile(tab):
|
||||
logging.info("正在检测 Turnstile 验证...")
|
||||
def save_screenshot(tab, prefix="turnstile"):
|
||||
"""保存截图
|
||||
Args:
|
||||
tab: 浏览器标签页对象
|
||||
prefix: 文件名前缀
|
||||
Returns:
|
||||
str: 截图文件路径
|
||||
"""
|
||||
try:
|
||||
while True:
|
||||
try:
|
||||
challengeCheck = (
|
||||
tab.ele("@id=cf-turnstile", timeout=2)
|
||||
.child()
|
||||
.shadow_root.ele("tag:iframe")
|
||||
.ele("tag:body")
|
||||
.sr("tag:input")
|
||||
)
|
||||
# 创建 screenshots 目录
|
||||
screenshot_dir = "screenshots"
|
||||
if not os.path.exists(screenshot_dir):
|
||||
os.makedirs(screenshot_dir)
|
||||
|
||||
if challengeCheck:
|
||||
logging.info("检测到 Turnstile 验证,正在处理...")
|
||||
time.sleep(random.uniform(1, 3))
|
||||
challengeCheck.click()
|
||||
time.sleep(2)
|
||||
logging.info("Turnstile 验证通过")
|
||||
return True
|
||||
except:
|
||||
pass
|
||||
# 生成带时间戳的文件名
|
||||
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
|
||||
filename = f"{prefix}_{timestamp}.png"
|
||||
filepath = os.path.join(screenshot_dir, filename)
|
||||
|
||||
if tab.ele("@name=password"):
|
||||
logging.info("验证成功 - 已到达密码输入页面")
|
||||
break
|
||||
if tab.ele("@data-index=0"):
|
||||
logging.info("验证成功 - 已到达验证码输入页面")
|
||||
break
|
||||
if tab.ele("Account Settings"):
|
||||
logging.info("验证成功 - 已到达账户设置页面")
|
||||
break
|
||||
|
||||
time.sleep(random.uniform(1, 2))
|
||||
# 使用 get_screenshot 方法保存截图
|
||||
tab.get_screenshot(filepath)
|
||||
logging.info(f"已保存截图: {filepath}")
|
||||
return filepath
|
||||
except Exception as e:
|
||||
logging.error(f"Turnstile 验证失败: {str(e)}")
|
||||
return False
|
||||
logging.error(f"截图保存失败: {str(e)}")
|
||||
return None
|
||||
|
||||
|
||||
def handle_turnstile(tab, max_wait_time=60, retry_attempts=3):
|
||||
"""
|
||||
处理 Turnstile 人机验证
|
||||
|
||||
Args:
|
||||
tab: 浏览器标签页对象
|
||||
max_wait_time: 最大等待时间(秒)
|
||||
retry_attempts: 验证失败后的重试次数
|
||||
|
||||
Returns:
|
||||
bool: 验证是否成功
|
||||
"""
|
||||
logging.info("正在检测 Turnstile 验证...")
|
||||
start_time = time.time()
|
||||
|
||||
success_selectors = {
|
||||
"password": "@name=password",
|
||||
"verification": "@data-index=0",
|
||||
"settings": "Account Settings",
|
||||
}
|
||||
|
||||
while time.time() - start_time < max_wait_time:
|
||||
try:
|
||||
# 检查是否已经通过验证
|
||||
for name, selector in success_selectors.items():
|
||||
if tab.ele(selector, timeout=1):
|
||||
logging.info(f"验证成功 - 已到达{name}页面")
|
||||
break
|
||||
|
||||
# 检查并处理 Turnstile 验证
|
||||
turnstile = tab.ele("@id=cf-turnstile", timeout=1)
|
||||
if turnstile:
|
||||
for attempt in range(retry_attempts):
|
||||
try:
|
||||
challengeCheck = (
|
||||
turnstile.child()
|
||||
.shadow_root.ele("tag:iframe")
|
||||
.ele("tag:body")
|
||||
.sr("tag:input")
|
||||
)
|
||||
|
||||
if challengeCheck:
|
||||
logging.info(
|
||||
f"检测到 Turnstile 验证,正在处理... (尝试 {attempt + 1}/{retry_attempts})"
|
||||
)
|
||||
time.sleep(random.uniform(1, 2))
|
||||
challengeCheck.click()
|
||||
time.sleep(2)
|
||||
|
||||
# 保存验证过程的截图
|
||||
save_screenshot(tab, f"turnstile_attempt_{attempt + 1}")
|
||||
|
||||
# 检查验证失败提示
|
||||
error_text = (
|
||||
"Can't verify the user is human. Please try again."
|
||||
)
|
||||
|
||||
# 检查验证失败的标志,使用更精确的选择器
|
||||
error_selectors = [
|
||||
"@data-accent-color=red", # 红色提示div
|
||||
f"//div[contains(@class, 'rt-Text') and contains(text(), '{error_text}')]", # 包含特定类和文本的div
|
||||
f"//div[@data-accent-color='red' and contains(text(), '{error_text}')]", # 最精确的选择器
|
||||
]
|
||||
|
||||
is_failed = any(
|
||||
tab.ele(selector, timeout=2)
|
||||
for selector in error_selectors
|
||||
)
|
||||
|
||||
if not is_failed:
|
||||
logging.info("人机验证成功")
|
||||
save_screenshot(tab, "turnstile_success")
|
||||
return True
|
||||
|
||||
logging.warning(
|
||||
f"验证失败,尝试重试 ({attempt + 1}/{retry_attempts})"
|
||||
)
|
||||
# 保存失败的截图
|
||||
save_screenshot(tab, f"turnstile_fail_{attempt + 1}")
|
||||
|
||||
except Exception as e:
|
||||
logging.debug(f"处理验证时发生异常: {str(e)}")
|
||||
continue
|
||||
|
||||
time.sleep(1)
|
||||
|
||||
except Exception as e:
|
||||
logging.debug(f"验证过程发生异常: {str(e)}")
|
||||
time.sleep(1)
|
||||
|
||||
logging.error(f"Turnstile 验证超时,已等待 {max_wait_time} 秒")
|
||||
return False
|
||||
|
||||
|
||||
def get_cursor_session_token(tab, max_attempts=3, retry_interval=2):
|
||||
@@ -246,15 +330,42 @@ class EmailGenerator:
|
||||
}
|
||||
|
||||
|
||||
def get_user_agent():
|
||||
"""获取user_agent"""
|
||||
try:
|
||||
# 使用JavaScript获取user agent
|
||||
browser_manager = BrowserManager()
|
||||
browser = browser_manager.init_browser()
|
||||
user_agent = browser.latest_tab.run_js("return navigator.userAgent")
|
||||
browser_manager.quit()
|
||||
return user_agent
|
||||
except Exception as e:
|
||||
logging.error(f"获取user agent失败: {str(e)}")
|
||||
return None
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
print_logo()
|
||||
browser_manager = None
|
||||
try:
|
||||
logging.info("\n=== 初始化程序 ===")
|
||||
ExitCursor()
|
||||
# ExitCursor()
|
||||
logging.info("正在初始化浏览器...")
|
||||
|
||||
# 获取user_agent
|
||||
user_agent = get_user_agent()
|
||||
if not user_agent:
|
||||
logging.error("获取user agent失败,使用默认值")
|
||||
user_agent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
|
||||
|
||||
# 剔除user_agent中的"HeadlessChrome"
|
||||
user_agent = user_agent.replace("HeadlessChrome", "Chrome")
|
||||
|
||||
browser_manager = BrowserManager()
|
||||
browser = browser_manager.init_browser()
|
||||
browser = browser_manager.init_browser(user_agent)
|
||||
|
||||
# 获取并打印浏览器的user-agent
|
||||
user_agent = browser.latest_tab.run_js("return navigator.userAgent")
|
||||
|
||||
logging.info("正在初始化邮箱验证模块...")
|
||||
email_handler = EmailVerificationHandler()
|
||||
@@ -276,6 +387,7 @@ if __name__ == "__main__":
|
||||
auto_update_cursor_auth = True
|
||||
|
||||
tab = browser.latest_tab
|
||||
|
||||
tab.run_js("try { turnstile.reset() } catch(e) { }")
|
||||
|
||||
logging.info("\n=== 开始注册流程 ===")
|
||||
|
||||
Reference in New Issue
Block a user