From 2219e9c1d1f3f30a8e58844b8f46afb1f4f2e677 Mon Sep 17 00:00:00 2001 From: cheng zhen Date: Sun, 2 Feb 2025 11:58:37 +0800 Subject: [PATCH] =?UTF-8?q?fix:=20=E4=BF=AE=E5=A4=8Dbug?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .gitignore | 2 + browser_utils.py | 18 ++-- cursor_pro_keep_alive.py | 182 +++++++++++++++++++++++++++++++-------- 3 files changed, 160 insertions(+), 42 deletions(-) diff --git a/.gitignore b/.gitignore index c70b215..2b0590d 100644 --- a/.gitignore +++ b/.gitignore @@ -25,3 +25,5 @@ venv/ node_modules/ .env + +screenshots/ \ No newline at end of file diff --git a/browser_utils.py b/browser_utils.py index c001d28..77e333a 100644 --- a/browser_utils.py +++ b/browser_utils.py @@ -11,13 +11,13 @@ class BrowserManager: def __init__(self): self.browser = None - def init_browser(self): + def init_browser(self, user_agent=None): """初始化浏览器""" - co = self._get_browser_options() + co = self._get_browser_options(user_agent) self.browser = Chromium(co) return self.browser - def _get_browser_options(self): + def _get_browser_options(self, user_agent=None): """获取浏览器配置""" co = ChromiumOptions() try: @@ -26,15 +26,19 @@ class BrowserManager: except FileNotFoundError as e: logging.warning(f"警告: {e}") - co.set_pref("credentials_enable_service", False) co.set_argument("--hide-crash-restore-bubble") - proxy = os.getenv('BROWSER_PROXY') + proxy = os.getenv("BROWSER_PROXY") if proxy: co.set_proxy(proxy) - + co.auto_port() - co.headless(os.getenv('BROWSER_HEADLESS', 'True').lower() == 'true') # 生产环境使用无头模式 + if user_agent: + co.set_user_agent(user_agent) + + co.headless( + os.getenv("BROWSER_HEADLESS", "True").lower() == "true" + ) # 生产环境使用无头模式 # Mac 系统特殊处理 if sys.platform == "darwin": diff --git a/cursor_pro_keep_alive.py b/cursor_pro_keep_alive.py index 23b51bd..854296b 100644 --- a/cursor_pro_keep_alive.py +++ b/cursor_pro_keep_alive.py @@ -15,45 +15,129 @@ from browser_utils import BrowserManager from get_email_code import EmailVerificationHandler from logo import print_logo from config import Config +from datetime import datetime -def handle_turnstile(tab): - logging.info("正在检测 Turnstile 验证...") +def save_screenshot(tab, prefix="turnstile"): + """保存截图 + Args: + tab: 浏览器标签页对象 + prefix: 文件名前缀 + Returns: + str: 截图文件路径 + """ try: - while True: - try: - challengeCheck = ( - tab.ele("@id=cf-turnstile", timeout=2) - .child() - .shadow_root.ele("tag:iframe") - .ele("tag:body") - .sr("tag:input") - ) + # 创建 screenshots 目录 + screenshot_dir = "screenshots" + if not os.path.exists(screenshot_dir): + os.makedirs(screenshot_dir) - if challengeCheck: - logging.info("检测到 Turnstile 验证,正在处理...") - time.sleep(random.uniform(1, 3)) - challengeCheck.click() - time.sleep(2) - logging.info("Turnstile 验证通过") - return True - except: - pass + # 生成带时间戳的文件名 + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + filename = f"{prefix}_{timestamp}.png" + filepath = os.path.join(screenshot_dir, filename) - if tab.ele("@name=password"): - logging.info("验证成功 - 已到达密码输入页面") - break - if tab.ele("@data-index=0"): - logging.info("验证成功 - 已到达验证码输入页面") - break - if tab.ele("Account Settings"): - logging.info("验证成功 - 已到达账户设置页面") - break - - time.sleep(random.uniform(1, 2)) + # 使用 get_screenshot 方法保存截图 + tab.get_screenshot(filepath) + logging.info(f"已保存截图: {filepath}") + return filepath except Exception as e: - logging.error(f"Turnstile 验证失败: {str(e)}") - return False + logging.error(f"截图保存失败: {str(e)}") + return None + + +def handle_turnstile(tab, max_wait_time=60, retry_attempts=3): + """ + 处理 Turnstile 人机验证 + + Args: + tab: 浏览器标签页对象 + max_wait_time: 最大等待时间(秒) + retry_attempts: 验证失败后的重试次数 + + Returns: + bool: 验证是否成功 + """ + logging.info("正在检测 Turnstile 验证...") + start_time = time.time() + + success_selectors = { + "password": "@name=password", + "verification": "@data-index=0", + "settings": "Account Settings", + } + + while time.time() - start_time < max_wait_time: + try: + # 检查是否已经通过验证 + for name, selector in success_selectors.items(): + if tab.ele(selector, timeout=1): + logging.info(f"验证成功 - 已到达{name}页面") + break + + # 检查并处理 Turnstile 验证 + turnstile = tab.ele("@id=cf-turnstile", timeout=1) + if turnstile: + for attempt in range(retry_attempts): + try: + challengeCheck = ( + turnstile.child() + .shadow_root.ele("tag:iframe") + .ele("tag:body") + .sr("tag:input") + ) + + if challengeCheck: + logging.info( + f"检测到 Turnstile 验证,正在处理... (尝试 {attempt + 1}/{retry_attempts})" + ) + time.sleep(random.uniform(1, 2)) + challengeCheck.click() + time.sleep(2) + + # 保存验证过程的截图 + save_screenshot(tab, f"turnstile_attempt_{attempt + 1}") + + # 检查验证失败提示 + error_text = ( + "Can't verify the user is human. Please try again." + ) + + # 检查验证失败的标志,使用更精确的选择器 + error_selectors = [ + "@data-accent-color=red", # 红色提示div + f"//div[contains(@class, 'rt-Text') and contains(text(), '{error_text}')]", # 包含特定类和文本的div + f"//div[@data-accent-color='red' and contains(text(), '{error_text}')]", # 最精确的选择器 + ] + + is_failed = any( + tab.ele(selector, timeout=2) + for selector in error_selectors + ) + + if not is_failed: + logging.info("人机验证成功") + save_screenshot(tab, "turnstile_success") + return True + + logging.warning( + f"验证失败,尝试重试 ({attempt + 1}/{retry_attempts})" + ) + # 保存失败的截图 + save_screenshot(tab, f"turnstile_fail_{attempt + 1}") + + except Exception as e: + logging.debug(f"处理验证时发生异常: {str(e)}") + continue + + time.sleep(1) + + except Exception as e: + logging.debug(f"验证过程发生异常: {str(e)}") + time.sleep(1) + + logging.error(f"Turnstile 验证超时,已等待 {max_wait_time} 秒") + return False def get_cursor_session_token(tab, max_attempts=3, retry_interval=2): @@ -246,15 +330,42 @@ class EmailGenerator: } +def get_user_agent(): + """获取user_agent""" + try: + # 使用JavaScript获取user agent + browser_manager = BrowserManager() + browser = browser_manager.init_browser() + user_agent = browser.latest_tab.run_js("return navigator.userAgent") + browser_manager.quit() + return user_agent + except Exception as e: + logging.error(f"获取user agent失败: {str(e)}") + return None + + if __name__ == "__main__": print_logo() browser_manager = None try: logging.info("\n=== 初始化程序 ===") - ExitCursor() + # ExitCursor() logging.info("正在初始化浏览器...") + + # 获取user_agent + user_agent = get_user_agent() + if not user_agent: + logging.error("获取user agent失败,使用默认值") + user_agent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36" + + # 剔除user_agent中的"HeadlessChrome" + user_agent = user_agent.replace("HeadlessChrome", "Chrome") + browser_manager = BrowserManager() - browser = browser_manager.init_browser() + browser = browser_manager.init_browser(user_agent) + + # 获取并打印浏览器的user-agent + user_agent = browser.latest_tab.run_js("return navigator.userAgent") logging.info("正在初始化邮箱验证模块...") email_handler = EmailVerificationHandler() @@ -276,6 +387,7 @@ if __name__ == "__main__": auto_update_cursor_auth = True tab = browser.latest_tab + tab.run_js("try { turnstile.reset() } catch(e) { }") logging.info("\n=== 开始注册流程 ===")