diff --git a/build_mac_new.py b/build_mac_new.py index 04060c8..ba97bb8 100644 --- a/build_mac_new.py +++ b/build_mac_new.py @@ -25,7 +25,7 @@ os.makedirs(logs_dir, exist_ok=True) # 打包参数 params = [ 'gui/main_mac.py', # 主程序入口 - '--name=CursorPro', # 应用名称 + '--name=听泉助手', # 应用名称 '-w', # 不显示控制台窗口 '--clean', # 清理临时文件 '--noconfirm', # 不确认覆盖 diff --git a/config.py b/config.py index 69944ac..384b498 100644 --- a/config.py +++ b/config.py @@ -1,146 +1,95 @@ -from dotenv import load_dotenv import os -import sys -from logger import logging - +import json +import logging +from pathlib import Path class Config: + """配置类""" + def __init__(self): - # 获取应用程序的根目录路径 - if getattr(sys, "frozen", False): - # 如果是打包后的可执行文件 - application_path = os.path.dirname(sys.executable) - else: - # 如果是开发环境 - application_path = os.path.dirname(os.path.abspath(__file__)) - - # 指定 .env 文件的路径 - dotenv_path = os.path.join(application_path, ".env") - - if not os.path.exists(dotenv_path): - raise FileNotFoundError(f"文件 {dotenv_path} 不存在") - - # 加载 .env 文件 - load_dotenv(dotenv_path) - - self.imap = False - self.temp_mail = os.getenv("TEMP_MAIL", "").strip().split("@")[0] - self.temp_mail_epin = os.getenv("TEMP_MAIL_EPIN", "").strip() - self.temp_mail_ext = os.getenv("TEMP_MAIL_EXT", "").strip() - self.domain = os.getenv("DOMAIN", "").strip() - - # 如果临时邮箱为null则加载IMAP - if self.temp_mail == "null": - self.imap = True - self.imap_server = os.getenv("IMAP_SERVER", "").strip() - self.imap_port = os.getenv("IMAP_PORT", "").strip() - self.imap_user = os.getenv("IMAP_USER", "").strip() - self.imap_pass = os.getenv("IMAP_PASS", "").strip() - self.imap_dir = os.getenv("IMAP_DIR", "inbox").strip() - - self.check_config() - - def get_temp_mail(self): - - return self.temp_mail - - def get_temp_mail_epin(self): - - return self.temp_mail_epin - - def get_temp_mail_ext(self): - - return self.temp_mail_ext - - def get_imap(self): - if not self.imap: - return False + self.base_url = "https://cursorapi.nosqli.com" + self.api_endpoints = { + "activate": f"{self.base_url}/admin/api.member/activate", + "status": f"{self.base_url}/admin/api.member/status", + "get_unused": f"{self.base_url}/admin/api.account/getUnused", + "heartbeat": f"{self.base_url}/admin/api.account/heartbeat" + } + + # macOS配置目录 + self.config_dir = Path(os.path.expanduser("~")) / ".cursor_pro" + self.config_file = self.config_dir / "config.json" + self.member_file = self.config_dir / "member.json" + self.load_config() + + def load_config(self): + """加载配置""" + try: + self.config_dir.mkdir(parents=True, exist_ok=True) + + if not self.config_file.exists(): + self.save_default_config() + + with open(self.config_file, "r", encoding="utf-8") as f: + config = json.load(f) + self.api_token = config.get("api_token", "") + + except Exception as e: + logging.error(f"加载配置失败: {str(e)}") + self.api_token = "" + + def save_member_info(self, info: dict): + """保存会员信息""" + try: + with open(self.member_file, "w", encoding="utf-8") as f: + json.dump(info, f, indent=2, ensure_ascii=False) + logging.info("会员信息已保存") + except Exception as e: + logging.error(f"保存会员信息失败: {str(e)}") + + def load_member_info(self) -> dict: + """读取会员信息""" + try: + if self.member_file.exists(): + with open(self.member_file, "r", encoding="utf-8") as f: + info = json.load(f) + logging.info(f"已读取会员信息: 到期时间 {info.get('expire_time', '')}") + return info + except Exception as e: + logging.error(f"读取会员信息失败: {str(e)}") return { - "imap_server": self.imap_server, - "imap_port": self.imap_port, - "imap_user": self.imap_user, - "imap_pass": self.imap_pass, - "imap_dir": self.imap_dir, + "expire_time": "", + "days": 0, + "new_days": 0 } - - def get_domain(self): - return self.domain - - def check_config(self): - """检查配置项是否有效 - - 检查规则: - 1. 如果使用 tempmail.plus,需要配置 TEMP_MAIL 和 DOMAIN - 2. 如果使用 IMAP,需要配置 IMAP_SERVER、IMAP_PORT、IMAP_USER、IMAP_PASS - 3. IMAP_DIR 是可选的 - """ - # 基础配置检查 - required_configs = { - "domain": "域名", + + def save_default_config(self): + """保存默认配置""" + config = { + "api_token": "" } + with open(self.config_file, "w", encoding="utf-8") as f: + json.dump(config, f, indent=2, ensure_ascii=False) + + def save_config(self, api_token: str): + """保存新的配置""" + config = { + "api_token": api_token + } + with open(self.config_file, "w", encoding="utf-8") as f: + json.dump(config, f, indent=2, ensure_ascii=False) + self.api_token = api_token + logging.info("配置已更新") - # 检查基础配置 - for key, name in required_configs.items(): - if not self.check_is_valid(getattr(self, key)): - raise ValueError(f"{name}未配置,请在 .env 文件中设置 {key.upper()}") - - # 检查邮箱配置 - if self.temp_mail != "null": - # tempmail.plus 模式 - if not self.check_is_valid(self.temp_mail): - raise ValueError("临时邮箱未配置,请在 .env 文件中设置 TEMP_MAIL") - else: - # IMAP 模式 - imap_configs = { - "imap_server": "IMAP服务器", - "imap_port": "IMAP端口", - "imap_user": "IMAP用户名", - "imap_pass": "IMAP密码", - } - - for key, name in imap_configs.items(): - value = getattr(self, key) - if value == "null" or not self.check_is_valid(value): - raise ValueError( - f"{name}未配置,请在 .env 文件中设置 {key.upper()}" - ) - - # IMAP_DIR 是可选的,如果设置了就检查其有效性 - if self.imap_dir != "null" and not self.check_is_valid(self.imap_dir): - raise ValueError( - "IMAP收件箱目录配置无效,请在 .env 文件中正确设置 IMAP_DIR" - ) - - def check_is_valid(self, value): - """检查配置项是否有效 - + def get_api_url(self, endpoint_name: str) -> str: + """获取API端点URL + Args: - value: 配置项的值 - + endpoint_name: 端点名称 + Returns: - bool: 配置项是否有效 + str: 完整的API URL """ - return isinstance(value, str) and len(str(value).strip()) > 0 - - def print_config(self): - if self.imap: - logging.info(f"\033[32mIMAP服务器: {self.imap_server}\033[0m") - logging.info(f"\033[32mIMAP端口: {self.imap_port}\033[0m") - logging.info(f"\033[32mIMAP用户名: {self.imap_user}\033[0m") - logging.info(f"\033[32mIMAP密码: {'*' * len(self.imap_pass)}\033[0m") - logging.info(f"\033[32mIMAP收件箱目录: {self.imap_dir}\033[0m") - if self.temp_mail != "null": - logging.info( - f"\033[32m临时邮箱: {self.temp_mail}{self.temp_mail_ext}\033[0m" - ) - logging.info(f"\033[32m域名: {self.domain}\033[0m") - - -# 使用示例 -if __name__ == "__main__": - try: - config = Config() - print("环境变量加载成功!") - config.print_config() - except ValueError as e: - print(f"错误: {e}") + url = self.api_endpoints.get(endpoint_name, "") + if not url: + logging.error(f"未找到API端点: {endpoint_name}") + return url \ No newline at end of file diff --git a/gui/main_mac.py b/gui/main_mac.py index 8f05f8b..1e0f7ff 100644 --- a/gui/main_mac.py +++ b/gui/main_mac.py @@ -114,7 +114,7 @@ class MainWindow(QMainWindow): def init_ui(self): # 设置窗口基本属性 - self.setWindowTitle('Cursor Pro') + self.setWindowTitle('听泉助手') self.setMinimumSize(600, 500) # 设置macOS风格的样式 @@ -183,7 +183,7 @@ class MainWindow(QMainWindow): layout.setContentsMargins(20, 20, 20, 20) # 标题 - title_label = QLabel("Cursor Pro") + title_label = QLabel("听泉助手") title_label.setProperty("title", "true") title_label.setAlignment(Qt.AlignCenter) layout.addWidget(title_label) @@ -306,16 +306,51 @@ class MainWindow(QMainWindow): QMessageBox.critical(self, "错误", f"重置失败: {str(e)}") def activate_license(self): - license_key = self.activate_input.text().strip() - if not license_key: - QMessageBox.warning(self, "提示", "请输入激活码") - return - + """激活许可证""" try: - # 这里添加激活码验证逻辑 - QMessageBox.information(self, "成功", "激活成功") + license_key = self.activate_input.text().strip() + if not license_key: + QMessageBox.warning(self, "提示", "请输入激活码") + return + + # 禁用激活按钮,防止重复点击 + self.activate_input.setEnabled(False) + self.update_button.setEnabled(False) + + # 显示处理中的提示 + QApplication.processEvents() + + # 调用激活接口 + success, message, account_info = self.updater.check_activation_code(license_key) + + if success: + # 更新界面显示 + self.member_info.clear() + self.member_info.append(f"会员状态: 已激活") + self.member_info.append(f"到期时间: {account_info['expire_time']}") + self.member_info.append(f"剩余天数: {account_info['days_left']}天") + + # 显示成功消息 + QMessageBox.information(self, "激活成功", + f"设备已成功激活!\n" + f"到期时间: {account_info['expire_time']}\n" + f"剩余天数: {account_info['days_left']}天") + + # 清空激活码输入框 + self.activate_input.clear() + + logging.info(f"设备激活成功,到期时间: {account_info['expire_time']}") + else: + QMessageBox.warning(self, "激活失败", message) + logging.error(f"激活失败: {message}") + except Exception as e: - QMessageBox.critical(self, "错误", f"激活失败: {str(e)}") + logging.error(f"激活过程中发生错误: {str(e)}") + QMessageBox.critical(self, "错误", f"激活过程发生错误: {str(e)}") + finally: + # 恢复按钮状态 + self.activate_input.setEnabled(True) + self.update_button.setEnabled(True) def disable_cursor_update(self): try: @@ -331,8 +366,8 @@ def main(): logging.info("应用程序启动") app = QApplication(sys.argv) - app.setApplicationName("CursorPro") - app.setOrganizationName("Cursor") + app.setApplicationName("听泉助手") + app.setOrganizationName("听泉") app.setOrganizationDomain("cursor.pro") try: diff --git a/gui/main_window.py b/gui/main_window.py index 44d51a6..ccac7d6 100644 --- a/gui/main_window.py +++ b/gui/main_window.py @@ -365,13 +365,44 @@ class MainWindow(QMainWindow): QMessageBox.warning(self, "提示", "请输入激活码") return - # TODO: 实现激活逻辑 - logging.info(f"正在处理激活请求,激活码: {activation_code}") - QMessageBox.information(self, "提示", "激活功能即将实现") + # 禁用激活按钮,防止重复点击 + self.activate_input.setEnabled(False) + self.update_button.setEnabled(False) + # 显示处理中的提示 + QApplication.processEvents() + + # 调用激活接口 + success, message, account_info = self.updater.check_activation_code(activation_code) + + if success: + # 更新界面显示 + self.member_info.clear() + self.member_info.append(f"会员状态: 已激活") + self.member_info.append(f"到期时间: {account_info['expire_time']}") + self.member_info.append(f"剩余天数: {account_info['days_left']}天") + + # 显示成功消息 + QMessageBox.information(self, "激活成功", + f"设备已成功激活!\n" + f"到期时间: {account_info['expire_time']}\n" + f"剩余天数: {account_info['days_left']}天") + + # 清空激活码输入框 + self.activate_input.clear() + + logging.info(f"设备激活成功,到期时间: {account_info['expire_time']}") + else: + QMessageBox.warning(self, "激活失败", message) + logging.error(f"激活失败: {message}") + except Exception as e: logging.error(f"激活过程中发生错误: {str(e)}") - QMessageBox.warning(self, "错误", f"激活失败: {str(e)}") + QMessageBox.critical(self, "错误", f"激活过程发生错误: {str(e)}") + finally: + # 恢复按钮状态 + self.activate_input.setEnabled(True) + self.update_button.setEnabled(True) def disable_cursor_update(self): """禁用Cursor版本更新""" diff --git a/patch_cursor_get_machine_id.py b/patch_cursor_get_machine_id.py index 7dd7d1c..97ae081 100644 --- a/patch_cursor_get_machine_id.py +++ b/patch_cursor_get_machine_id.py @@ -40,42 +40,50 @@ def get_cursor_paths() -> Tuple[str, str]: OSError: 当找不到有效路径或系统不支持时抛出 """ system = platform.system() + logger.info(f"当前操作系统: {system}") - paths_map = { - "Darwin": { - "base": "/Applications/Cursor.app/Contents/Resources/app", - "package": "package.json", - "main": "out/main.js", - }, - "Windows": { - "base": os.path.join( - os.getenv("LOCALAPPDATA", ""), "Programs", "Cursor", "resources", "app" - ), - "package": "package.json", - "main": "out/main.js", - }, - "Linux": { - "bases": ["/opt/Cursor/resources/app", "/usr/share/cursor/resources/app"], - "package": "package.json", - "main": "out/main.js", - }, - } - - if system not in paths_map: - raise OSError(f"不支持的操作系统: {system}") - - if system == "Linux": - for base in paths_map["Linux"]["bases"]: - pkg_path = os.path.join(base, paths_map["Linux"]["package"]) - if os.path.exists(pkg_path): - return (pkg_path, os.path.join(base, paths_map["Linux"]["main"])) + if system == "Darwin": # macOS + base_path = "/Applications/Cursor.app/Contents/Resources/app" + pkg_path = os.path.join(base_path, "package.json") + main_path = os.path.join(base_path, "out", "main.js") + + if not os.path.exists(pkg_path) or not os.path.exists(main_path): + raise OSError("在 macOS 系统上未找到 Cursor 安装路径") + + return pkg_path, main_path + + elif system == "Windows": + base_path = os.path.join( + os.getenv("LOCALAPPDATA", ""), + "Programs", + "Cursor", + "resources", + "app" + ) + pkg_path = os.path.join(base_path, "package.json") + main_path = os.path.join(base_path, "out", "main.js") + + if not os.path.exists(pkg_path) or not os.path.exists(main_path): + raise OSError("在 Windows 系统上未找到 Cursor 安装路径") + + return pkg_path, main_path + + elif system == "Linux": + linux_paths = [ + "/opt/Cursor/resources/app", + "/usr/share/cursor/resources/app" + ] + + for base_path in linux_paths: + pkg_path = os.path.join(base_path, "package.json") + main_path = os.path.join(base_path, "out", "main.js") + if os.path.exists(pkg_path) and os.path.exists(main_path): + return pkg_path, main_path + raise OSError("在 Linux 系统上未找到 Cursor 安装路径") - - base_path = paths_map[system]["base"] - return ( - os.path.join(base_path, paths_map[system]["package"]), - os.path.join(base_path, paths_map[system]["main"]), - ) + + else: + raise OSError(f"不支持的操作系统: {system}") def check_system_requirements(pkg_path: str, main_path: str) -> bool: @@ -298,5 +306,24 @@ def patch_cursor_get_machine_id(restore_mode=False) -> None: sys.exit(1) +# 添加patch函数作为主函数的别名 +def patch(restore_mode=False) -> bool: + """ + patch函数,作为patch_cursor_get_machine_id的别名 + + Args: + restore_mode: 是否为恢复模式 + + Returns: + bool: 修补是否成功 + """ + try: + patch_cursor_get_machine_id(restore_mode) + return True + except Exception as e: + logger.error(f"修补失败: {str(e)}") + return False + + if __name__ == "__main__": patch_cursor_get_machine_id() diff --git a/update_cursor_token.py b/update_cursor_token.py index b272f8f..2de43d1 100644 --- a/update_cursor_token.py +++ b/update_cursor_token.py @@ -15,12 +15,14 @@ from exit_cursor import ExitCursor import go_cursor_help from logo import print_logo from typing import Tuple, Dict, Optional +import time +import requests.adapters class CursorTokenUpdater: def __init__(self): self.auth_manager = CursorAuthManager() self._hardware_id = None # 延迟初始化硬件ID - + @property def hardware_id(self) -> str: """获取硬件ID(延迟初始化)""" @@ -30,68 +32,68 @@ class CursorTokenUpdater: def _get_hardware_id(self) -> str: """获取硬件唯一标识 - 方案1: CPU ID + 主板序列号 + BIOS序列号 - 方案2: 系统盘序列号 + Windows安装时间 - 方案3: 计算机名(最后的备选方案) + macOS: 使用系统序列号和硬件UUID + Windows: CPU ID + 主板序列号 + BIOS序列号 + 其他: 计算机名(最后的备选方案) """ try: - # 创建startupinfo对象来隐藏命令行窗口 - startupinfo = None - if sys.platform == "win32": + system = platform.system() + + if system == "Darwin": # macOS + try: + # 获取系统序列号 + serial_number = subprocess.check_output(['system_profiler', 'SPHardwareDataType']).decode() + serial = "" + for line in serial_number.split('\n'): + if 'Serial Number' in line: + serial = line.split(':')[1].strip() + break + + # 获取硬件UUID + ioreg_output = subprocess.check_output(['ioreg', '-d2', '-c', 'IOPlatformExpertDevice']).decode() + uuid = "" + for line in ioreg_output.split('\n'): + if 'IOPlatformUUID' in line: + uuid = line.split('=')[1].strip().replace('"', '').replace(' ', '') + break + + if serial and uuid: + combined = f"{serial}:{uuid}" + hardware_id = hashlib.md5(combined.encode()).hexdigest() + logging.info("使用macOS硬件信息生成ID成功") + return hardware_id + except Exception as e: + logging.warning(f"获取macOS硬件信息失败: {str(e)}") + + elif system == "Windows": + # 创建startupinfo对象来隐藏命令行窗口 startupinfo = subprocess.STARTUPINFO() startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW startupinfo.wShowWindow = subprocess.SW_HIDE - # 方案1: 尝试获取硬件信息 - try: - # 获取CPU ID - cpu_info = subprocess.check_output('wmic cpu get ProcessorId', startupinfo=startupinfo).decode() - cpu_id = cpu_info.split('\n')[1].strip() - - # 获取主板序列号 - board_info = subprocess.check_output('wmic baseboard get SerialNumber', startupinfo=startupinfo).decode() - board_id = board_info.split('\n')[1].strip() - - # 获取BIOS序列号 - bios_info = subprocess.check_output('wmic bios get SerialNumber', startupinfo=startupinfo).decode() - bios_id = bios_info.split('\n')[1].strip() - - # 如果所有信息都获取成功且有效 - if all([cpu_id, board_id, bios_id]) and not all(x in ['', '0', 'None', 'To be filled by O.E.M.'] for x in [cpu_id, board_id, bios_id]): - combined = f"{cpu_id}:{board_id}:{bios_id}" - hardware_id = hashlib.md5(combined.encode()).hexdigest() - logging.info("使用硬件信息生成ID成功") - return hardware_id + try: + # 获取CPU ID + cpu_info = subprocess.check_output('wmic cpu get ProcessorId', startupinfo=startupinfo).decode() + cpu_id = cpu_info.split('\n')[1].strip() - except Exception as e: - logging.warning(f"方案1失败: {str(e)}") - - # 方案2: 系统盘序列号 + Windows安装时间 - try: - backup_info = [] - - # 获取系统盘序列号 - volume_info = subprocess.check_output('wmic logicaldisk where "DeviceID=\'C:\'" get VolumeSerialNumber', startupinfo=startupinfo).decode() - volume_serial = volume_info.split('\n')[1].strip() - if volume_serial and volume_serial not in ['', '0']: - backup_info.append(("volume", volume_serial)) - - # 获取Windows安装时间 - os_info = subprocess.check_output('wmic os get InstallDate', startupinfo=startupinfo).decode() - install_date = os_info.split('\n')[1].strip() - if install_date: - backup_info.append(("install", install_date)) - - if backup_info: - combined = "|".join(f"{k}:{v}" for k, v in sorted(backup_info)) - hardware_id = hashlib.md5(combined.encode()).hexdigest() - logging.info("使用系统信息生成ID成功") - return hardware_id + # 获取主板序列号 + board_info = subprocess.check_output('wmic baseboard get SerialNumber', startupinfo=startupinfo).decode() + board_id = board_info.split('\n')[1].strip() - except Exception as e: - logging.warning(f"方案2失败: {str(e)}") + # 获取BIOS序列号 + bios_info = subprocess.check_output('wmic bios get SerialNumber', startupinfo=startupinfo).decode() + bios_id = bios_info.split('\n')[1].strip() + + # 如果所有信息都获取成功且有效 + if all([cpu_id, board_id, bios_id]) and not all(x in ['', '0', 'None', 'To be filled by O.E.M.'] for x in [cpu_id, board_id, bios_id]): + combined = f"{cpu_id}:{board_id}:{bios_id}" + hardware_id = hashlib.md5(combined.encode()).hexdigest() + logging.info("使用Windows硬件信息生成ID成功") + return hardware_id + except Exception as e: + logging.warning(f"获取Windows硬件信息失败: {str(e)}") - # 方案3: 使用计算机名(最后的备选方案) + # 最后的备选方案:使用计算机名 computer_name = platform.node() if computer_name: hardware_id = hashlib.md5(computer_name.encode()).hexdigest() @@ -227,17 +229,23 @@ class CursorTokenUpdater: """ try: logging.info("开始重置机器码...") - resetter = MachineIDResetter() - result = resetter.reset(greater_than_0_45) - - if result: - logging.info("机器码重置成功") - # 重置后更新硬件ID缓存 - self._hardware_id = None + if greater_than_0_45: + # 对于0.45以上版本,使用go_cursor_help + go_cursor_help.go_cursor_help() + logging.info("已调用go_cursor_help重置机器码") return True else: - logging.error("机器码重置失败") - return False + # 对于0.45及以下版本,使用传统方式 + resetter = MachineIDResetter() + result = resetter.reset_machine_ids() + if result: + logging.info("机器码重置成功") + # 重置后更新硬件ID缓存 + self._hardware_id = None + return True + else: + logging.error("机器码重置失败") + return False except Exception as e: logging.error(f"重置机器码时发生错误: {str(e)}") @@ -268,10 +276,8 @@ class CursorTokenUpdater: """ try: logging.info("正在退出Cursor进程...") - exit_handler = ExitCursor() - exit_handler.exit() - logging.info("Cursor进程已退出") - return True + result = ExitCursor() + return result except Exception as e: logging.error(f"退出Cursor进程时发生错误: {str(e)}") return False @@ -291,34 +297,175 @@ class CursorTokenUpdater: print_logo() logging.info("=== 开始完整更新流程 ===") - # 1. 退出Cursor进程 - if not self.exit_cursor(): - return False - - # 2. 修补机器码获取方法 - if not self.patch_machine_id(): - return False - - # 3. 重置机器码 - if not self.reset_machine_id(greater_than_0_45=True): - return False - - # 4. 如果没有提供认证信息,从API获取 - if not all([email, access_token]): - success, error_msg, account_info = self.get_unused_account() - if not success: - logging.error(f"无法获取账号信息: {error_msg}") + try: + # 1. 退出Cursor进程 + if not self.exit_cursor(): + logging.error("退出Cursor进程失败") return False - email = account_info["email"] - access_token = account_info["access_token"] - refresh_token = account_info["refresh_token"] + + # 2. 修补机器码获取方法 + if not self.patch_machine_id(): + logging.error("修补机器码获取方法失败") + return False + + # 3. 重置机器码 + # 对于0.45以上版本,使用go_cursor_help + try: + go_cursor_help.go_cursor_help() + logging.info("已调用go_cursor_help重置机器码") + except Exception as e: + logging.error(f"使用go_cursor_help重置机器码失败: {str(e)}") + # 如果go_cursor_help失败,尝试使用传统方式 + if not self.reset_machine_id(greater_than_0_45=False): + logging.error("重置机器码失败") + return False + + # 4. 如果没有提供认证信息,从API获取 + if not all([email, access_token]): + success, error_msg, account_info = self.get_unused_account() + if not success: + logging.error(f"无法获取账号信息: {error_msg}") + return False + email = account_info["email"] + access_token = account_info["access_token"] + refresh_token = account_info["refresh_token"] + + # 5. 更新认证信息 + if not self.update_auth_info(email, access_token, refresh_token): + logging.error("更新认证信息失败") + return False + + logging.info("=== 所有操作已完成 ===") + return True - # 5. 更新认证信息 - if not self.update_auth_info(email, access_token, refresh_token): + except Exception as e: + logging.error(f"更新流程发生错误: {str(e)}") return False + + def _get_network_error_message(self, error: Exception) -> str: + """获取网络错误的友好提示信息""" + if isinstance(error, requests.exceptions.ConnectTimeout): + return "连接服务器超时,请检查网络连接" + elif isinstance(error, requests.exceptions.ConnectionError): + return "无法连接到服务器,请检查网络连接" + elif isinstance(error, requests.exceptions.ReadTimeout): + return "读取服务器响应超时,请重试" + else: + return str(error) - logging.info("=== 所有操作已完成 ===") - return True + def get_device_info(self) -> dict: + """获取设备信息""" + return { + "os": platform.system(), + "os_version": platform.version(), + "machine": platform.machine(), + "hostname": platform.node(), + "hardware_id": self.hardware_id + } + + def check_activation_code(self, code: str) -> tuple: + """检查激活码 + + Args: + code: 激活码 + + Returns: + tuple: (成功标志, 消息, 账号信息) + """ + max_retries = 3 # 最大重试次数 + retry_delay = 1 # 重试间隔(秒) + + for attempt in range(max_retries): + try: + data = { + "machine_id": self.hardware_id, + "code": code + } + + # 禁用SSL警告 + urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) + + # 设置请求参数 + request_kwargs = { + "json": data, + "headers": { + "Content-Type": "application/json", + "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36", + "Accept": "*/*", + "Connection": "keep-alive" + }, + "timeout": 10, # 增加超时时间 + "verify": False # 禁用SSL验证 + } + + # 创建session + session = requests.Session() + session.verify = False + + # 设置重试策略 + retry_strategy = urllib3.Retry( + total=3, # 总重试次数 + backoff_factor=0.5, # 重试间隔 + status_forcelist=[500, 502, 503, 504] # 需要重试的HTTP状态码 + ) + adapter = requests.adapters.HTTPAdapter(max_retries=retry_strategy) + session.mount("http://", adapter) + session.mount("https://", adapter) + + try: + # 尝试发送请求 + response = session.post( + "https://cursorapi.nosqli.com/admin/api.member/activate", + **request_kwargs + ) + response.raise_for_status() # 检查HTTP状态码 + + result = response.json() + # 激活成功 + if result["code"] == 200: + api_data = result["data"] + # 构造标准的返回数据结构 + account_info = { + "status": "active", + "expire_time": api_data.get("expire_time", ""), + "total_days": api_data.get("total_days", 0), + "days_left": api_data.get("days_left", 0), + "device_info": self.get_device_info() + } + return True, result["msg"], account_info + # 激活码无效或已被使用 + elif result["code"] == 400: + logging.warning(f"激活码无效或已被使用: {result.get('msg', '未知错误')}") + return False, result.get("msg", "激活码无效或已被使用"), None + # 其他错误情况 + else: + error_msg = result.get("msg", "未知错误") + if attempt < max_retries - 1: # 如果还有重试机会 + logging.warning(f"第{attempt + 1}次尝试失败: {error_msg}, 准备重试...") + time.sleep(retry_delay) + continue + logging.error(f"激活失败: {error_msg}") + return False, error_msg, None + + except requests.exceptions.RequestException as e: + if attempt < max_retries - 1: # 如果还有重试机会 + logging.warning(f"第{attempt + 1}次网络请求失败: {str(e)}, 准备重试...") + time.sleep(retry_delay) + continue + error_msg = self._get_network_error_message(e) + logging.error(f"网络请求失败: {error_msg}") + return False, f"网络连接失败: {error_msg}", None + + except Exception as e: + if attempt < max_retries - 1: # 如果还有重试机会 + logging.warning(f"第{attempt + 1}次请求发生错误: {str(e)}, 准备重试...") + time.sleep(retry_delay) + continue + logging.error(f"激活失败: {str(e)}") + return False, f"激活失败: {str(e)}", None + + # 如果所有重试都失败了 + return False, "多次尝试后激活失败,请检查网络连接或稍后重试", None def main(): updater = CursorTokenUpdater()