From 8b2fbef54a1461ee59ab40e19eeac8e6f00eaa2a Mon Sep 17 00:00:00 2001 From: huangzhenpc Date: Fri, 14 Feb 2025 15:06:05 +0800 Subject: [PATCH] =?UTF-8?q?feat(v3.4.7):=20=E9=87=8D=E6=9E=84=E9=87=8D?= =?UTF-8?q?=E7=BD=AE=E5=8A=9F=E8=83=BD=E5=92=8C=E4=BC=98=E5=8C=96=E9=87=8D?= =?UTF-8?q?=E5=90=AF=E9=80=BB=E8=BE=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 1. 新增 CursorResetter 类,完整封装 cursor_win_id_modifier.ps1 的核心功能 2. 优化 AccountSwitcher 的重启逻辑,避免重复重启 3. 改进进程管理,移除 wmi 依赖,使用 tasklist 替代 4. 提升代码可维护性,后续只需更新 CursorResetter 即可适配脚本变更 --- account_switcher.py | 694 ++++++++++++++++++++++++++++----------- cursor_auth_manager.py | 373 +++++++++++++++++---- cursor_win_id.ps1 | 0 gui/main_window.py | 42 ++- main.py | 114 ++++--- utils/config.py | 3 +- utils/cursor_registry.py | 304 ++++++++++------- utils/cursor_resetter.py | 227 +++++++++++++ version.txt | 2 +- 9 files changed, 1322 insertions(+), 437 deletions(-) create mode 100644 cursor_win_id.ps1 create mode 100644 utils/cursor_resetter.py diff --git a/account_switcher.py b/account_switcher.py index 39af179..aceec33 100644 --- a/account_switcher.py +++ b/account_switcher.py @@ -7,11 +7,53 @@ import uuid import hashlib import sys import time -from typing import Optional, Dict, Tuple +import ctypes +from typing import Optional, Dict, Tuple, List from pathlib import Path from utils.config import Config from utils.cursor_registry import CursorRegistry from cursor_auth_manager import CursorAuthManager +from utils.cursor_resetter import CursorResetter # 添加导入 + +def is_admin() -> bool: + """检查是否具有管理员权限 + + Returns: + bool: 是否具有管理员权限 + """ + try: + return ctypes.windll.shell32.IsUserAnAdmin() != 0 + except: + return False + +def run_as_admin(): + """以管理员权限重新运行程序""" + try: + if not is_admin(): + # 获取当前脚本的路径 + script = sys.argv[0] + params = ' '.join(sys.argv[1:]) + + # 创建 startupinfo 对象来隐藏命令行窗口 + startupinfo = None + if sys.platform == "win32": + startupinfo = subprocess.STARTUPINFO() + startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW + startupinfo.wShowWindow = subprocess.SW_HIDE + + # 以管理员权限重新运行 + ctypes.windll.shell32.ShellExecuteW( + None, + "runas", + sys.executable, + f'"{script}" {params}', + None, + 1 + ) + return True + except Exception as e: + logging.error(f"以管理员权限运行失败: {str(e)}") + return False def get_hardware_id() -> str: """获取硬件唯一标识""" @@ -45,6 +87,15 @@ def get_hardware_id() -> str: class AccountSwitcher: def __init__(self): + # 检查管理员权限 + if not is_admin(): + logging.warning("当前不是管理员权限运行") + if run_as_admin(): + sys.exit(0) + else: + logging.error("无法获取管理员权限") + raise PermissionError("需要管理员权限才能运行此程序") + self.cursor_path = Path(os.path.expanduser("~")) / "AppData" / "Local" / "Programs" / "Cursor" self.app_path = self.cursor_path / "resources" / "app" self.package_json = self.app_path / "package.json" @@ -52,6 +103,9 @@ class AccountSwitcher: self.config = Config() self.hardware_id = self.get_hardware_id() # 先获取硬件ID self.registry = CursorRegistry() # 添加注册表操作工具类 + self.resetter = CursorResetter() # 添加重置工具类 + self.max_retries = 5 + self.wait_time = 1 logging.info(f"初始化硬件ID: {self.hardware_id}") @@ -159,166 +213,333 @@ class AccountSwitcher: Returns: tuple: (成功标志, 消息, 账号信息) """ - try: - data = { - "machine_id": self.hardware_id, - "code": code - } - - # 禁用SSL警告 - import urllib3 - urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) - - # 设置请求参数 - request_kwargs = { - "json": data, - "headers": {"Content-Type": "application/json"}, - "timeout": 5, # 减少超时时间从10秒到5秒 - "verify": False # 禁用SSL验证 - } - - # 尝试发送请求 + max_retries = 3 # 最大重试次数 + retry_delay = 1 # 重试间隔(秒) + + for attempt in range(max_retries): try: - response = requests.post( - self.config.get_api_url("activate"), - **request_kwargs - ) - except requests.exceptions.SSLError: - # 如果发生SSL错误,创建自定义SSL上下文 - import ssl - ssl_context = ssl.create_default_context() - ssl_context.check_hostname = False - ssl_context.verify_mode = ssl.CERT_NONE + data = { + "machine_id": self.hardware_id, + "code": code + } - # 使用自定义SSL上下文重试请求 + # 禁用SSL警告 + import urllib3 + urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) + + # 设置请求参数 + request_kwargs = { + "json": data, + "headers": { + "Content-Type": "application/json", + "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36", + "Accept": "*/*", + "Connection": "keep-alive" + }, + "timeout": 10, # 增加超时时间 + "verify": False # 禁用SSL验证 + } + + # 创建session session = requests.Session() session.verify = False - response = session.post( - self.config.get_api_url("activate"), - **request_kwargs + + # 设置重试策略 + retry_strategy = urllib3.Retry( + total=3, # 总重试次数 + backoff_factor=0.5, # 重试间隔 + status_forcelist=[500, 502, 503, 504] # 需要重试的HTTP状态码 ) + adapter = requests.adapters.HTTPAdapter(max_retries=retry_strategy) + session.mount("http://", adapter) + session.mount("https://", adapter) + + try: + # 尝试发送请求 + response = session.post( + self.config.get_api_url("activate"), + **request_kwargs + ) + response.raise_for_status() # 检查HTTP状态码 + + result = response.json() + # 激活成功 + if result["code"] == 200: + api_data = result["data"] + # 构造标准的返回数据结构 + account_info = { + "status": "active", + "expire_time": api_data.get("expire_time", ""), + "total_days": api_data.get("total_days", 0), + "days_left": api_data.get("days_left", 0), + "device_info": self.get_device_info() + } + return True, result["msg"], account_info + # 激活码无效或已被使用 + elif result["code"] == 400: + logging.warning(f"激活码无效或已被使用: {result.get('msg', '未知错误')}") + return False, result.get("msg", "激活码无效或已被使用"), None + # 其他错误情况 + else: + error_msg = result.get("msg", "未知错误") + if attempt < max_retries - 1: # 如果还有重试机会 + logging.warning(f"第{attempt + 1}次尝试失败: {error_msg}, 准备重试...") + time.sleep(retry_delay) + continue + logging.error(f"激活失败: {error_msg}") + return False, error_msg, None + + except requests.exceptions.RequestException as e: + if attempt < max_retries - 1: # 如果还有重试机会 + logging.warning(f"第{attempt + 1}次网络请求失败: {str(e)}, 准备重试...") + time.sleep(retry_delay) + continue + error_msg = self._get_network_error_message(e) + logging.error(f"网络请求失败: {error_msg}") + return False, f"网络连接失败: {error_msg}", None + + except Exception as e: + if attempt < max_retries - 1: # 如果还有重试机会 + logging.warning(f"第{attempt + 1}次请求发生错误: {str(e)}, 准备重试...") + time.sleep(retry_delay) + continue + logging.error(f"激活失败: {str(e)}") + return False, f"激活失败: {str(e)}", None + + # 如果所有重试都失败了 + return False, "多次尝试后激活失败,请检查网络连接或稍后重试", None + + def _get_network_error_message(self, error: Exception) -> str: + """获取网络错误的友好提示信息""" + if isinstance(error, requests.exceptions.SSLError): + return "SSL证书验证失败,请检查系统时间是否正确" + elif isinstance(error, requests.exceptions.ConnectionError): + if "10054" in str(error): + return "连接被重置,可能是防火墙拦截,请检查防火墙设置" + elif "10061" in str(error): + return "无法连接到服务器,请检查网络连接" + return "网络连接错误,请检查网络设置" + elif isinstance(error, requests.exceptions.Timeout): + return "请求超时,请检查网络连接" + elif isinstance(error, requests.exceptions.RequestException): + return "网络请求失败,请稍后重试" + return str(error) + + def get_process_details(self, process_name: str) -> List[Dict]: + """获取进程详细信息 + + Args: + process_name: 进程名称 - result = response.json() - # 激活成功 - if result["code"] == 200: - api_data = result["data"] - # 构造标准的返回数据结构 - account_info = { - "status": "active", - "expire_time": api_data.get("expire_time", ""), - "total_days": api_data.get("total_days", 0), - "days_left": api_data.get("days_left", 0), - "device_info": self.get_device_info() - } - return True, result["msg"], account_info - # 激活码无效或已被使用 - elif result["code"] == 400: - logging.warning(f"激活码无效或已被使用: {result.get('msg', '未知错误')}") - return False, result.get("msg", "激活码无效或已被使用"), None - # 其他错误情况 - else: - logging.error(f"激活失败: {result.get('msg', '未知错误')}") - return False, result["msg"], None # 返回 None 而不是空的账号信息 - - except Exception as e: - logging.error(f"激活失败: {str(e)}") - return False, f"激活失败: {str(e)}", None # 返回 None 而不是空的账号信息 - - def reset_machine_id(self) -> bool: - """重置机器码""" + Returns: + List[Dict]: 进程详细信息列表 + """ + try: + # 使用 tasklist 命令替代 wmi + startupinfo = subprocess.STARTUPINFO() + startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW + + output = subprocess.check_output( + f'tasklist /FI "IMAGENAME eq {process_name}" /FO CSV /NH', + startupinfo=startupinfo, + shell=True + ).decode('gbk') + + processes = [] + if output.strip(): + for line in output.strip().split('\n'): + if line.strip(): + parts = line.strip('"').split('","') + if len(parts) >= 2: + processes.append({ + 'name': parts[0], + 'pid': parts[1] + }) + return processes + except Exception as e: + logging.error(f"获取进程信息失败: {str(e)}") + return [] + + def close_cursor_process(self) -> bool: + """关闭所有Cursor进程 + + Returns: + bool: 是否成功关闭所有进程 + """ try: - # 1. 先关闭所有Cursor进程 if sys.platform == "win32": # 创建startupinfo对象来隐藏命令行窗口 startupinfo = subprocess.STARTUPINFO() startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW startupinfo.wShowWindow = subprocess.SW_HIDE - # 使用subprocess.run来执行命令,并隐藏窗口 + # 获取进程详情 + processes = self.get_process_details("Cursor.exe") + if processes: + logging.info(f"发现 {len(processes)} 个Cursor进程") + for p in processes: + logging.info(f"进程信息: PID={p['pid']}, 路径={p['name']}") + + # 尝试关闭进程 subprocess.run( "taskkill /f /im Cursor.exe >nul 2>&1", startupinfo=startupinfo, shell=True ) + + # 等待进程关闭 + retry_count = 0 + while retry_count < self.max_retries: + if not self.get_process_details("Cursor.exe"): + logging.info("所有Cursor进程已关闭") + return True + + retry_count += 1 + if retry_count >= self.max_retries: + processes = self.get_process_details("Cursor.exe") + if processes: + logging.error(f"无法关闭以下进程:") + for p in processes: + logging.error(f"PID={p['pid']}, 路径={p['name']}") + return False + + logging.warning(f"等待进程关闭, 尝试 {retry_count}/{self.max_retries}...") + time.sleep(self.wait_time) + + return True + else: + # 其他系统的处理 + if sys.platform == "darwin": + subprocess.run("killall Cursor 2>/dev/null", shell=True) + else: + subprocess.run("pkill -f cursor", shell=True) time.sleep(2) - - # 2. 清理注册表(包括更新系统 MachineGuid) - if not self.registry.clean_registry(): - logging.warning("注册表清理失败") - - # 3. 清理文件(包括备份 storage.json) - if not self.registry.clean_cursor_files(): - logging.warning("文件清理失败") - - # 4. 修改 package.json 中的 machineId - if self.package_json.exists(): - with open(self.package_json, "r", encoding="utf-8") as f: - data = json.load(f) + return True - if "machineId" in data: - del data["machineId"] - - with open(self.package_json, "w", encoding="utf-8") as f: - json.dump(data, f, indent=2) + except Exception as e: + logging.error(f"关闭进程失败: {str(e)}") + return False - # 5. 修改 storage.json 中的遥测 ID - storage_path = Path(os.getenv('APPDATA')) / "Cursor" / "User" / "globalStorage" / "storage.json" - if storage_path.exists(): - with open(storage_path, "r", encoding="utf-8") as f: - storage_data = json.load(f) + def reset_machine_id(self) -> bool: + """重置机器码""" + try: + # 1. 关闭所有Cursor进程 + if not self.close_cursor_process(): + logging.error("无法关闭所有Cursor进程") + return False - # 只修改 machineId,保持其他遥测 ID 不变 - if "telemetry.machineId" in storage_data: - # 生成新的 machineId(使用与 GitHub 脚本类似的格式) - new_machine_id = hashlib.sha256(str(uuid.uuid4()).encode()).hexdigest() - storage_data["telemetry.machineId"] = new_machine_id + # 2. 使用新的重置工具类执行重置 + success, message = self.resetter.reset_cursor(disable_update=True) + if not success: + logging.error(f"重置失败: {message}") + return False - with open(storage_path, "w", encoding="utf-8") as f: - json.dump(storage_data, f, indent=2) - - # 6. 重启Cursor - cursor_exe = self.cursor_path / "Cursor.exe" - if cursor_exe.exists(): - os.startfile(str(cursor_exe)) - logging.info("Cursor重启成功") - + # 不在这里重启Cursor,让调用者决定何时重启 logging.info("机器码重置完成") return True except Exception as e: logging.error(f"重置机器码失败: {str(e)}") return False - - def bypass_version_limit(self) -> Tuple[bool, str]: - """突破Cursor版本限制""" + + def restart_cursor(self) -> bool: + """重启Cursor编辑器 + + Returns: + bool: 是否成功重启 + """ try: - # 1. 先关闭所有Cursor进程 - if sys.platform == "win32": - # 创建startupinfo对象来隐藏命令行窗口 - startupinfo = subprocess.STARTUPINFO() - startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW - startupinfo.wShowWindow = subprocess.SW_HIDE + logging.info("正在重启Cursor...") + + # 确保进程已关闭 + if not self.close_cursor_process(): + logging.error("无法关闭Cursor进程") + return False - # 关闭Cursor - subprocess.run( - "taskkill /f /im Cursor.exe >nul 2>&1", - startupinfo=startupinfo, - shell=True - ) - time.sleep(2) + # 等待进程完全关闭 + time.sleep(2) - # 2. 重置机器码 - if not self.reset_machine_id(): - return False, "重置机器码失败" - - # 3. 等待Cursor启动 - time.sleep(3) - - return True, "突破版本限制成功,Cursor已重启" + # 启动Cursor + if sys.platform == "win32": + cursor_exe = self.cursor_path / "Cursor.exe" + if cursor_exe.exists(): + try: + # 使用subprocess启动 + startupinfo = subprocess.STARTUPINFO() + startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW + + subprocess.Popen( + str(cursor_exe), + startupinfo=startupinfo, + creationflags=subprocess.CREATE_NEW_CONSOLE + ) + + # 等待进程启动 + time.sleep(3) + + # 验证进程是否启动 + processes = self.get_process_details("Cursor.exe") + if processes: + logging.info("Cursor启动成功") + return True + else: + logging.error("Cursor进程未找到") + # 尝试使用 os.startfile 作为备选方案 + try: + os.startfile(str(cursor_exe)) + time.sleep(3) + logging.info("使用备选方案启动Cursor") + return True + except Exception as e: + logging.error(f"备选启动方案失败: {str(e)}") + return False + + except Exception as e: + logging.error(f"启动Cursor失败: {str(e)}") + # 尝试使用 os.startfile 作为备选方案 + try: + os.startfile(str(cursor_exe)) + time.sleep(3) + logging.info("使用备选方案启动Cursor") + return True + except Exception as e: + logging.error(f"备选启动方案失败: {str(e)}") + return False + else: + logging.error(f"未找到Cursor程序: {cursor_exe}") + return False + elif sys.platform == "darwin": + try: + subprocess.run("open -a Cursor", shell=True, check=True) + logging.info("Cursor启动成功") + return True + except subprocess.CalledProcessError as e: + logging.error(f"启动Cursor失败: {str(e)}") + return False + elif sys.platform == "linux": + try: + subprocess.run("cursor &", shell=True, check=True) + logging.info("Cursor启动成功") + return True + except subprocess.CalledProcessError as e: + logging.error(f"启动Cursor失败: {str(e)}") + return False + + return False except Exception as e: - logging.error(f"突破版本限制失败: {str(e)}") - return False, f"突破失败: {str(e)}" + logging.error(f"重启Cursor失败: {str(e)}") + # 尝试使用 os.startfile 作为最后的备选方案 + try: + cursor_exe = self.cursor_path / "Cursor.exe" + if cursor_exe.exists(): + os.startfile(str(cursor_exe)) + time.sleep(3) + logging.info("使用最终备选方案启动Cursor") + return True + except: + pass + return False def activate_and_switch(self, activation_code: str) -> Tuple[bool, str]: """激活并切换账号 @@ -416,60 +637,6 @@ class AccountSwitcher: "device_info": self.get_device_info() } - def restart_cursor(self) -> bool: - """重启Cursor编辑器 - - Returns: - bool: 是否成功重启 - """ - try: - logging.info("正在重启Cursor...") - if sys.platform == "win32": - # Windows系统 - # 创建startupinfo对象来隐藏命令行窗口 - startupinfo = subprocess.STARTUPINFO() - startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW - startupinfo.wShowWindow = subprocess.SW_HIDE - - # 关闭Cursor - subprocess.run( - "taskkill /f /im Cursor.exe 2>nul", - startupinfo=startupinfo, - shell=True - ) - time.sleep(2) - - # 获取Cursor安装路径 - cursor_exe = self.cursor_path / "Cursor.exe" - if cursor_exe.exists(): - # 启动Cursor - os.startfile(str(cursor_exe)) - logging.info("Cursor重启成功") - return True - else: - logging.error(f"未找到Cursor程序: {cursor_exe}") - return False - elif sys.platform == "darwin": - # macOS系统 - subprocess.run("killall Cursor 2>/dev/null", shell=True) - time.sleep(2) - subprocess.run("open -a Cursor", shell=True) - logging.info("Cursor重启成功") - return True - elif sys.platform == "linux": - # Linux系统 - subprocess.run("pkill -f cursor", shell=True) - time.sleep(2) - subprocess.run("cursor &", shell=True) - logging.info("Cursor重启成功") - return True - else: - logging.error(f"不支持的操作系统: {sys.platform}") - return False - except Exception as e: - logging.error(f"重启Cursor时发生错误: {str(e)}") - return False - def refresh_cursor_auth(self) -> Tuple[bool, str]: """刷新Cursor授权 @@ -494,8 +661,8 @@ class AccountSwitcher: request_kwargs = { "json": data, "headers": headers, - "timeout": 30, # 增加超时时间 - "verify": False # 禁用SSL验证 + "timeout": 30, + "verify": False } try: @@ -530,29 +697,54 @@ class AccountSwitcher: return False, "获取账号信息不完整" # 2. 先关闭Cursor进程 - if sys.platform == "win32": - # 创建startupinfo对象来隐藏命令行窗口 - startupinfo = subprocess.STARTUPINFO() - startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW - startupinfo.wShowWindow = subprocess.SW_HIDE - - # 使用subprocess.run来执行命令,并隐藏窗口 - subprocess.run( - "taskkill /f /im Cursor.exe >nul 2>&1", - startupinfo=startupinfo, - shell=True - ) - time.sleep(2) + logging.info("正在关闭Cursor进程...") + if not self.close_cursor_process(): + logging.error("无法关闭Cursor进程") + return False, "无法关闭Cursor进程,请手动关闭后重试" # 3. 更新Cursor认证信息 + logging.info("正在更新认证信息...") if not self.auth_manager.update_auth(email, access_token, refresh_token): return False, "更新Cursor认证信息失败" - # 4. 重置机器码(使用现有的reset_machine_id方法) + # 4. 验证认证信息是否正确写入 + logging.info("正在验证认证信息...") + if not self.auth_manager.verify_auth(email, access_token, refresh_token): + return False, "认证信息验证失败" + + # 5. 保存email到package.json + try: + if self.package_json.exists(): + with open(self.package_json, "r", encoding="utf-8") as f: + package_data = json.load(f) + package_data["email"] = email + with open(self.package_json, "w", encoding="utf-8", newline='\n') as f: + json.dump(package_data, f, indent=2) + logging.info(f"已保存email到package.json: {email}") + except Exception as e: + logging.warning(f"保存email到package.json失败: {str(e)}") + + # 6. 重置机器码 + logging.info("正在重置机器码...") if not self.reset_machine_id(): return False, "重置机器码失败" - return True, f"授权刷新成功,Cursor编辑器已重启\n邮箱: {email}\n" + # 7. 重启Cursor(只在这里执行一次重启) + logging.info("正在重启Cursor...") + retry_count = 0 + max_retries = 3 + while retry_count < max_retries: + if self.restart_cursor(): + break + retry_count += 1 + if retry_count < max_retries: + logging.warning(f"重启失败,正在重试 ({retry_count}/{max_retries})...") + time.sleep(2) + + if retry_count >= max_retries: + return True, f"授权刷新成功,但Cursor重启失败,请手动启动Cursor\n邮箱: {email}\n到期时间: {expire_time}\n剩余天数: {days_left}" + + return True, f"授权刷新成功,Cursor已重启\n邮箱: {email}\n到期时间: {expire_time}\n剩余天数: {days_left}" elif response_data.get("code") == 404: return False, "没有可用的未使用账号" @@ -634,9 +826,112 @@ class AccountSwitcher: logging.error(f"禁用Cursor更新失败: {str(e)}") return False, f"禁用更新失败: {str(e)}" + def send_heartbeat(self) -> Tuple[bool, str]: + """ + 发送心跳请求 + Returns: + Tuple[bool, str]: (是否成功, 消息) + """ + max_retries = 3 # 最大重试次数 + retry_delay = 1 # 重试间隔(秒) + + # 获取硬件ID + hardware_id = self.get_hardware_id() + + # 禁用SSL警告 + import urllib3 + urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) + + # 设置请求参数 + params = { + "machine_id": hardware_id + } + + request_kwargs = { + "params": params, # 使用URL参数而不是JSON body + "headers": { + "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36", + "Accept": "*/*", + "Connection": "keep-alive" + }, + "timeout": 10, + "verify": False + } + + for attempt in range(max_retries): + try: + # 创建session + session = requests.Session() + session.verify = False + + # 设置重试策略 + retry_strategy = urllib3.Retry( + total=3, + backoff_factor=0.5, + status_forcelist=[500, 502, 503, 504] + ) + adapter = requests.adapters.HTTPAdapter(max_retries=retry_strategy) + session.mount("http://", adapter) + session.mount("https://", adapter) + + # 发送请求 + response = session.get( # 改用GET请求 + self.config.get_api_url("heartbeat"), + **request_kwargs + ) + + # 检查响应 + if response.status_code == 200: + result = response.json() + if result.get("code") == 200: # 修改成功码为200 + data = result.get("data", {}) + expire_time = data.get("expire_time", "") + days_left = data.get("days_left", 0) + status = data.get("status", "") + return True, f"心跳发送成功 [到期时间: {expire_time}, 剩余天数: {days_left}, 状态: {status}]" + else: + error_msg = result.get("msg", "未知错误") + if attempt < max_retries - 1: + logging.warning(f"第{attempt + 1}次心跳失败: {error_msg}, 准备重试...") + time.sleep(retry_delay) + continue + return False, f"心跳发送失败: {error_msg}" + else: + if attempt < max_retries - 1: + logging.warning(f"第{attempt + 1}次心跳HTTP错误: {response.status_code}, 准备重试...") + time.sleep(retry_delay) + continue + return False, f"心跳请求失败: HTTP {response.status_code}" + + except requests.exceptions.RequestException as e: + if attempt < max_retries - 1: + logging.warning(f"第{attempt + 1}次网络请求失败: {str(e)}, 准备重试...") + time.sleep(retry_delay) + continue + error_message = self._get_network_error_message(e) + return False, f"心跳发送失败: {error_message}" + except Exception as e: + if attempt < max_retries - 1: + logging.warning(f"第{attempt + 1}次发生异常: {str(e)}, 准备重试...") + time.sleep(retry_delay) + continue + logging.error(f"心跳发送异常: {str(e)}") + return False, f"心跳发送异常: {str(e)}" + + return False, "多次尝试后心跳发送失败" + def main(): """主函数""" try: + # 检查管理员权限 + if not is_admin(): + print("\n[错误] 请以管理员身份运行此程序") + print("请右键点击程序,选择'以管理员身份运行'") + if run_as_admin(): + return + input("\n按回车键退出...") + return + switcher = AccountSwitcher() print("\n=== Cursor账号切换工具 ===") @@ -665,6 +960,9 @@ def main(): else: print("\n机器码重置失败,请查看日志了解详细信息") + except PermissionError as e: + print(f"\n[错误] {str(e)}") + print("请右键点击程序,选择'以管理员身份运行'") except Exception as e: logging.error(f"程序执行出错: {str(e)}") print("\n程序执行出错,请查看日志了解详细信息") diff --git a/cursor_auth_manager.py b/cursor_auth_manager.py index c9499e0..f88dc52 100644 --- a/cursor_auth_manager.py +++ b/cursor_auth_manager.py @@ -6,6 +6,8 @@ import logging import sqlite3 from pathlib import Path import subprocess +from typing import Optional, Dict, Tuple +from datetime import datetime class CursorAuthManager: """Cursor认证信息管理器""" @@ -31,115 +33,344 @@ class CursorAuthManager: raise NotImplementedError(f"不支持的操作系统: {sys.platform}") self.cursor_path = Path(os.path.expanduser("~")) / "AppData" / "Local" / "Programs" / "Cursor" + self.backup_dir = Path(os.getenv('APPDATA')) / "Cursor" / "User" / "globalStorage" / "backups" + self.max_retries = 5 + self.wait_time = 1 - def update_auth(self, email=None, access_token=None, refresh_token=None): + def backup_database(self) -> Optional[Path]: + """备份数据库文件 + + Returns: + Optional[Path]: 备份文件路径,失败返回None """ - 更新Cursor的认证信息 - :param email: 新的邮箱地址 - :param access_token: 新的访问令牌 - :param refresh_token: 新的刷新令牌 - :return: bool 是否成功更新 - """ - updates = [] - # 登录状态 - updates.append(("cursorAuth/cachedSignUpType", "Auth_0")) - - if email is not None: - updates.append(("cursorAuth/cachedEmail", email)) - if access_token is not None: - updates.append(("cursorAuth/accessToken", access_token)) - if refresh_token is not None: - updates.append(("cursorAuth/refreshToken", refresh_token)) - - if not updates: - logging.warning("没有提供任何要更新的值") - return False - - conn = None try: + if not Path(self.db_path).exists(): + logging.warning(f"数据库文件不存在: {self.db_path}") + return None + + self.backup_dir.mkdir(parents=True, exist_ok=True) + backup_name = f"state.vscdb.backup_{datetime.now().strftime('%Y%m%d_%H%M%S')}" + backup_path = self.backup_dir / backup_name + + # 如果数据库正在使用,先关闭连接 + try: + conn = sqlite3.connect(self.db_path) + conn.close() + except: + pass + + import shutil + shutil.copy2(self.db_path, backup_path) + logging.info(f"已备份数据库: {backup_path}") + return backup_path + + except Exception as e: + logging.error(f"备份数据库失败: {str(e)}") + return None + + def get_auth_info(self) -> Optional[Dict]: + """获取当前的认证信息 + + Returns: + Optional[Dict]: 认证信息字典,失败返回None + """ + try: + if not Path(self.db_path).exists(): + return None + conn = sqlite3.connect(self.db_path) cursor = conn.cursor() - - for key, value in updates: - # 检查key是否存在 - check_query = f"SELECT COUNT(*) FROM itemTable WHERE key = ?" - cursor.execute(check_query, (key,)) - if cursor.fetchone()[0] == 0: - insert_query = "INSERT INTO itemTable (key, value) VALUES (?, ?)" - cursor.execute(insert_query, (key, value)) - else: - update_query = "UPDATE itemTable SET value = ? WHERE key = ?" - cursor.execute(update_query, (value, key)) - - if cursor.rowcount > 0: - logging.info(f"成功更新 {key.split('/')[-1]}") - else: - logging.warning(f"未找到 {key.split('/')[-1]} 或值未变化") - - conn.commit() - logging.info(f"认证信息更新成功: {email}") - return True - - except sqlite3.Error as e: - logging.error(f"数据库错误: {str(e)}") - return False + + # 查询认证相关的键值 + auth_keys = [ + 'authentication.currentToken', + 'authentication.refreshToken', + 'authentication.accessToken', + 'authentication.email' + ] + + result = {} + for key in auth_keys: + cursor.execute('SELECT value FROM ItemTable WHERE key = ?', (key,)) + row = cursor.fetchone() + if row: + try: + value = json.loads(row[0]) + result[key] = value + except: + result[key] = row[0] + + conn.close() + return result if result else None + + except Exception as e: + logging.error(f"获取认证信息失败: {str(e)}") + return None + + def update_auth(self, email: str = None, access_token: str = None, refresh_token: str = None) -> bool: + """更新Cursor的认证信息 + + Args: + email: 新的邮箱地址 + access_token: 新的访问令牌 + refresh_token: 新的刷新令牌 + + Returns: + bool: 是否成功更新 + """ + try: + # 备份数据库 + if not self.backup_database(): + logging.warning("数据库备份失败") + + # 准备更新数据 + updates = [] + # 登录状态 + updates.append(("cursorAuth/cachedSignUpType", "Auth_0")) + + if email is not None: + updates.append(("cursorAuth/cachedEmail", email)) + if access_token is not None: + updates.append(("cursorAuth/accessToken", access_token)) + if refresh_token is not None: + updates.append(("cursorAuth/refreshToken", refresh_token)) + + if not updates: + logging.warning("没有提供任何要更新的值") + return False + + # 确保数据库目录存在 + db_dir = Path(self.db_path).parent + db_dir.mkdir(parents=True, exist_ok=True) + + # 确保数据库表存在 + conn = sqlite3.connect(self.db_path) + cursor = conn.cursor() + + # 创建表(如果不存在) + cursor.execute(''' + CREATE TABLE IF NOT EXISTS ItemTable ( + key TEXT PRIMARY KEY, + value TEXT + ) + ''') + + try: + # 开始事务 + cursor.execute('BEGIN TRANSACTION') + + # 执行更新 + for key, value in updates: + # 检查key是否存在 + cursor.execute('SELECT COUNT(*) FROM ItemTable WHERE key = ?', (key,)) + if cursor.fetchone()[0] == 0: + # 插入新值 + cursor.execute( + 'INSERT INTO ItemTable (key, value) VALUES (?, ?)', + (key, value) + ) + else: + # 更新现有值 + cursor.execute( + 'UPDATE ItemTable SET value = ? WHERE key = ?', + (value, key) + ) + + if cursor.rowcount > 0: + logging.info(f"成功更新 {key.split('/')[-1]}") + else: + logging.warning(f"未找到 {key.split('/')[-1]} 或值未变化") + + # 提交事务 + cursor.execute('COMMIT') + logging.info(f"认证信息更新成功: {email}") + return True + + except Exception as e: + # 如果出错,回滚事务 + cursor.execute('ROLLBACK') + raise e + except Exception as e: logging.error(f"更新认证信息失败: {str(e)}") return False + finally: - if conn: + if 'conn' in locals(): conn.close() - def restart_cursor(self) -> bool: - """重启Cursor编辑器 + def verify_auth(self, email: str, access_token: str, refresh_token: str) -> bool: + """验证认证信息是否正确写入 + Args: + email: 邮箱 + access_token: 访问令牌 + refresh_token: 新的刷新令牌 + Returns: - bool: 是否成功重启 + bool: 是否正确写入 + """ + try: + # 连接数据库 + conn = sqlite3.connect(self.db_path) + cursor = conn.cursor() + + # 验证每个字段 + expected = { + 'cursorAuth/cachedEmail': email, + 'cursorAuth/accessToken': access_token, + 'cursorAuth/refreshToken': refresh_token, + 'cursorAuth/cachedSignUpType': 'Auth_0' + } + + for key, expected_value in expected.items(): + cursor.execute('SELECT value FROM ItemTable WHERE key = ?', (key,)) + row = cursor.fetchone() + if not row: + logging.error(f"缺少认证信息: {key}") + return False + + actual_value = row[0] + if actual_value != expected_value: + logging.error(f"认证信息不匹配: {key}") + logging.error(f"预期: {expected_value}") + logging.error(f"实际: {actual_value}") + return False + + conn.close() + return True + + except Exception as e: + logging.error(f"验证认证信息失败: {str(e)}") + return False + finally: + if 'conn' in locals(): + conn.close() + + def clear_auth(self) -> bool: + """清除认证信息 + + Returns: + bool: 是否成功 + """ + try: + # 备份数据库 + if not self.backup_database(): + logging.warning("数据库备份失败") + + # 清除数据库中的认证信息 + conn = sqlite3.connect(self.db_path) + cursor = conn.cursor() + + # 要清除的键 + auth_keys = [ + 'authentication.currentToken', + 'authentication.refreshToken', + 'authentication.accessToken', + 'authentication.email' + ] + + # 执行删除 + for key in auth_keys: + cursor.execute('DELETE FROM ItemTable WHERE key = ?', (key,)) + + conn.commit() + conn.close() + + logging.info("已清除认证信息") + return True + + except Exception as e: + logging.error(f"清除认证信息失败: {str(e)}") + return False + + def close_cursor_process(self) -> bool: + """关闭所有Cursor进程 + + Returns: + bool: 是否成功 """ try: - logging.info("正在重启Cursor...") if sys.platform == "win32": - # Windows系统 # 创建startupinfo对象来隐藏命令行窗口 startupinfo = subprocess.STARTUPINFO() startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW startupinfo.wShowWindow = subprocess.SW_HIDE - # 关闭Cursor + # 关闭进程 subprocess.run( - "taskkill /f /im Cursor.exe 2>nul", + "taskkill /f /im Cursor.exe >nul 2>&1", startupinfo=startupinfo, shell=True ) - time.sleep(2) - # 获取Cursor安装路径 + # 等待进程关闭 + retry_count = 0 + while retry_count < self.max_retries: + try: + subprocess.check_output( + "tasklist | findstr Cursor.exe", + startupinfo=startupinfo, + shell=True + ) + retry_count += 1 + if retry_count >= self.max_retries: + logging.error("无法关闭所有Cursor进程") + return False + time.sleep(self.wait_time) + except subprocess.CalledProcessError: + # 进程已关闭 + break + + return True + + else: + # 其他系统的处理 + if sys.platform == "darwin": + subprocess.run("killall Cursor 2>/dev/null", shell=True) + else: + subprocess.run("pkill -f cursor", shell=True) + time.sleep(2) + return True + + except Exception as e: + logging.error(f"关闭进程失败: {str(e)}") + return False + + def restart_cursor(self) -> bool: + """重启Cursor编辑器 + + Returns: + bool: 是否成功 + """ + try: + logging.info("正在重启Cursor...") + + # 确保进程已关闭 + if not self.close_cursor_process(): + return False + + # 启动Cursor + if sys.platform == "win32": cursor_exe = self.cursor_path / "Cursor.exe" if cursor_exe.exists(): - # 启动Cursor os.startfile(str(cursor_exe)) - logging.info("Cursor重启成功") + logging.info("Cursor启动成功") return True else: logging.error(f"未找到Cursor程序: {cursor_exe}") return False elif sys.platform == "darwin": - # macOS系统 - subprocess.run("killall Cursor 2>/dev/null", shell=True) - time.sleep(2) subprocess.run("open -a Cursor", shell=True) - logging.info("Cursor重启成功") + logging.info("Cursor启动成功") return True elif sys.platform == "linux": - # Linux系统 - subprocess.run("pkill -f cursor", shell=True) - time.sleep(2) subprocess.run("cursor &", shell=True) - logging.info("Cursor重启成功") + logging.info("Cursor启动成功") return True - else: - logging.error(f"不支持的操作系统: {sys.platform}") - return False + + return False + except Exception as e: - logging.error(f"重启Cursor时发生错误: {str(e)}") + logging.error(f"重启Cursor失败: {str(e)}") return False \ No newline at end of file diff --git a/cursor_win_id.ps1 b/cursor_win_id.ps1 new file mode 100644 index 0000000..e69de29 diff --git a/gui/main_window.py b/gui/main_window.py index 9f1b3c1..1e66860 100644 --- a/gui/main_window.py +++ b/gui/main_window.py @@ -310,6 +310,11 @@ class MainWindow(QMainWindow): self._activation_status = None # 缓存的激活状态 self._status_timer = None # 状态更新定时器 + # 添加心跳定时器 + self._heartbeat_timer = QTimer() + self._heartbeat_timer.timeout.connect(self.send_heartbeat) + self._heartbeat_timer.start(5 * 60 * 1000) # 每5分钟发送一次心跳 + # 添加请求锁,防止重复提交 self._is_requesting = False self._last_request_time = 0 @@ -712,7 +717,7 @@ class MainWindow(QMainWindow): self.show_and_activate() def closeEvent(self, event): - """重写关闭事件,最小化到托盘而不是退出""" + """窗口关闭事件""" try: if hasattr(self, 'tray_icon') and self.tray_icon.isVisible(): event.ignore() @@ -729,6 +734,8 @@ class MainWindow(QMainWindow): # 如果托盘图标不可用,则正常退出 if self._status_timer: self._status_timer.stop() + if hasattr(self, '_heartbeat_timer'): + self._heartbeat_timer.stop() event.accept() except Exception as e: @@ -736,12 +743,10 @@ class MainWindow(QMainWindow): # 发生错误时,接受关闭事件 if self._status_timer: self._status_timer.stop() + if hasattr(self, '_heartbeat_timer'): + self._heartbeat_timer.stop() event.accept() - if self._status_timer: - self._status_timer.stop() - super().closeEvent(event) - def copy_device_id(self): """复制设备ID到剪贴板""" QApplication.clipboard().setText(self.hardware_id_edit.text()) @@ -1998,4 +2003,29 @@ class MainWindow(QMainWindow): layout.addLayout(btn_layout) msg.setLayout(layout) - msg.exec_() \ No newline at end of file + msg.exec_() + + def send_heartbeat(self): + """发送心跳请求""" + if not self._check_request_throttle(): + return + + def heartbeat_func(): + return self.switcher.send_heartbeat() + + # 创建工作线程 + self.heartbeat_worker = ApiWorker(heartbeat_func) + self.heartbeat_worker.finished.connect(self.on_heartbeat_complete) + self.heartbeat_worker.start() + + def on_heartbeat_complete(self, result): + """心跳完成回调""" + success, message = result + self._request_complete() + + if success: + logging.info(f"心跳发送成功: {message}") + # 更新状态显示 + self.check_status() + else: + logging.error(f"心跳发送失败: {message}") \ No newline at end of file diff --git a/main.py b/main.py index c6b3bfc..ec90c86 100644 --- a/main.py +++ b/main.py @@ -6,11 +6,13 @@ import atexit import shutil import tempfile import urllib3 +import ctypes from pathlib import Path from PyQt5.QtWidgets import QApplication, QMessageBox, QSystemTrayIcon, QMenu from PyQt5.QtGui import QIcon from PyQt5.QtCore import Qt from gui.main_window import MainWindow +from account_switcher import AccountSwitcher # 禁用所有 SSL 相关警告 urllib3.disable_warnings() @@ -39,88 +41,114 @@ def setup_logging(): log_file = log_dir / "switcher.log" - # 只输出到文件,不输出到控制台 + # 同时输出到文件和控制台 logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s", handlers=[ logging.FileHandler(log_file, encoding="utf-8"), + logging.StreamHandler() ] ) except Exception as e: - # 不打印错误信息,只记录到日志 - pass + print(f"设置日志失败: {str(e)}") + +def is_admin(): + """检查是否具有管理员权限""" + try: + return ctypes.windll.shell32.IsUserAnAdmin() != 0 + except: + return False + +def run_as_admin(): + """以管理员权限重新运行程序""" + try: + if not is_admin(): + # 获取当前脚本的路径 + script = sys.argv[0] + params = ' '.join(sys.argv[1:]) + + # 以管理员权限重新运行 + ctypes.windll.shell32.ShellExecuteW( + None, + "runas", + sys.executable, + f'"{script}" {params}', + None, + 1 + ) + return True + except Exception as e: + print(f"提升权限失败: {str(e)}") + return False + +def print_banner(): + """打印程序横幅""" + print(""" +==================================== + Cursor 账号管理工具 +==================================== + """) def main(): """主函数""" try: - # 注册退出时的清理函数 + # 1. 首先检查管理员权限 + if not is_admin(): + if run_as_admin(): + return 0 + else: + QMessageBox.critical(None, "错误", "需要管理员权限运行此程序。\n请右键点击程序,选择'以管理员身份运行'。") + return 1 + + # 2. 注册退出时的清理函数 atexit.register(cleanup_temp) - # 创建QApplication实例 + # 3. 设置日志 + setup_logging() + + # 4. 创建QApplication实例 app = QApplication(sys.argv) - # 检查系统托盘是否可用 + # 5. 检查系统托盘 if not QSystemTrayIcon.isSystemTrayAvailable(): logging.error("系统托盘不可用") QMessageBox.critical(None, "错误", "系统托盘不可用,程序无法正常运行。") return 1 - - # 设置应用程序不会在最后一个窗口关闭时退出 + + # 6. 设置应用程序不会在最后一个窗口关闭时退出 app.setQuitOnLastWindowClosed(False) - setup_logging() - - # 检查Python版本 + # 7. 记录系统信息 logging.info(f"Python版本: {sys.version}") - - # 检查工作目录 logging.info(f"当前工作目录: {Path.cwd()}") - # 检查模块路径 - logging.info("Python路径:") - for p in sys.path: - logging.info(f" - {p}") - - logging.info("正在初始化主窗口...") - - # 设置应用程序ID (在设置图标之前) + # 8. 设置应用程序ID if sys.platform == "win32": - import ctypes myappid = u'nezha.cursor.helper.v3' ctypes.windll.shell32.SetCurrentProcessExplicitAppUserModelID(myappid) logging.info(f"设置应用程序ID: {myappid}") - # 设置应用程序图标 - try: - icon_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "icon", "two.ico") - if os.path.exists(icon_path): - app_icon = QIcon(icon_path) - if not app_icon.isNull(): - app.setWindowIcon(app_icon) - logging.info(f"成功设置应用程序图标: {icon_path}") - else: - logging.error("图标文件加载失败") - else: - logging.error(f"图标文件不存在: {icon_path}") - - except Exception as e: - logging.error(f"设置应用程序图标失败: {str(e)}") + # 9. 设置应用程序图标 + icon_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "icon", "two.ico") + if os.path.exists(icon_path): + app_icon = QIcon(icon_path) + if not app_icon.isNull(): + app.setWindowIcon(app_icon) + logging.info(f"成功设置应用程序图标: {icon_path}") + # 10. 创建并显示主窗口 + logging.info("正在初始化主窗口...") window = MainWindow() - window.setWindowIcon(app.windowIcon()) # 确保窗口使用相同的图标 - - logging.info("正在启动主窗口...") + window.setWindowIcon(app.windowIcon()) window.show() + # 11. 运行应用程序 return app.exec_() except Exception as e: error_msg = f"程序运行出错: {str(e)}\n{traceback.format_exc()}" logging.error(error_msg) - # 使用 QMessageBox 显示错误 - if QApplication.instance() is None: - app = QApplication(sys.argv) QMessageBox.critical(None, "错误", error_msg) return 1 diff --git a/utils/config.py b/utils/config.py index 5e90f94..90af85b 100644 --- a/utils/config.py +++ b/utils/config.py @@ -11,7 +11,8 @@ class Config: self.api_endpoints = { "activate": f"{self.base_url}/admin/api.member/activate", "status": f"{self.base_url}/admin/api.member/status", - "get_unused": f"{self.base_url}/admin/api.account/getUnused" + "get_unused": f"{self.base_url}/admin/api.account/getUnused", + "heartbeat": f"{self.base_url}/admin/api.account/heartbeat" } self.config_dir = Path(os.path.expanduser("~")) / ".cursor_switcher" self.config_file = self.config_dir / "config.json" diff --git a/utils/cursor_registry.py b/utils/cursor_registry.py index 6071c6a..71da95f 100644 --- a/utils/cursor_registry.py +++ b/utils/cursor_registry.py @@ -7,6 +7,8 @@ import uuid from datetime import datetime import json import hashlib +import ctypes +from typing import Optional class CursorRegistry: """Cursor注册表操作工具类""" @@ -14,61 +16,115 @@ class CursorRegistry: def __init__(self): self.cursor_path = Path(os.path.expanduser("~")) / "AppData" / "Local" / "Programs" / "Cursor" self.app_path = self.cursor_path / "resources" / "app" + self.backup_dir = Path(os.getenv('APPDATA')) / "Cursor" / "User" / "globalStorage" / "backups" + def get_random_hex(self, length: int) -> str: + """生成安全的随机十六进制字符串 + + Args: + length: 需要生成的字节长度 + + Returns: + str: 十六进制字符串 + """ + import secrets + return secrets.token_hex(length) + + def new_standard_machine_id(self) -> str: + """生成标准格式的机器ID + + Returns: + str: 标准格式的机器ID + """ + template = "xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx" + + def replace_char(match): + import random + r = random.randint(0, 15) + v = r if match == 'x' else (r & 0x3 | 0x8) + return hex(v)[2:] + + return ''.join(replace_char(c) for c in template) + + def is_admin(self) -> bool: + """检查是否具有管理员权限 + + Returns: + bool: 是否具有管理员权限 + """ + try: + return ctypes.windll.shell32.IsUserAnAdmin() != 0 + except: + return False + + def backup_file(self, source_path: Path, backup_name: Optional[str] = None) -> Optional[Path]: + """备份文件 + + Args: + source_path: 源文件路径 + backup_name: 备份文件名(可选) + + Returns: + Optional[Path]: 备份文件路径,失败返回None + """ + try: + if not source_path.exists(): + return None + + self.backup_dir.mkdir(parents=True, exist_ok=True) + + if backup_name is None: + backup_name = f"{source_path.name}.backup_{datetime.now().strftime('%Y%m%d_%H%M%S')}" + + backup_path = self.backup_dir / backup_name + shutil.copy2(source_path, backup_path) + logging.info(f"已备份文件: {source_path} -> {backup_path}") + return backup_path + except Exception as e: + logging.error(f"备份文件失败 {source_path}: {str(e)}") + return None + def update_machine_guid(self) -> bool: """更新系统的 MachineGuid Returns: bool: 是否成功 """ + if not self.is_admin(): + logging.error("需要管理员权限来修改 MachineGuid") + return False + try: - # 生成新的 GUID new_guid = str(uuid.uuid4()) registry_path = r"SOFTWARE\Microsoft\Cryptography" try: - # 使用管理员权限打开注册表项 - key = None - try: - # 先尝试直接打开读取权限 - key = winreg.OpenKey(winreg.HKEY_LOCAL_MACHINE, registry_path, 0, - winreg.KEY_READ | winreg.KEY_WOW64_64KEY) - # 读取原始值并备份 + # 备份原始值 + with winreg.OpenKey(winreg.HKEY_LOCAL_MACHINE, registry_path, 0, + winreg.KEY_READ | winreg.KEY_WOW64_64KEY) as key: original_guid = winreg.QueryValueEx(key, "MachineGuid")[0] - winreg.CloseKey(key) - # 备份原始 MachineGuid - backup_dir = Path(os.getenv('APPDATA')) / "Cursor" / "User" / "globalStorage" / "backups" - backup_dir.mkdir(parents=True, exist_ok=True) - backup_name = f"MachineGuid.backup_{datetime.now().strftime('%Y%m%d_%H%M%S')}" - with open(backup_dir / backup_name, 'w', encoding='utf-8') as f: - f.write(original_guid) - logging.info(f"备份 MachineGuid 到: {backup_name}") - - # 重新打开写入权限 - key = winreg.OpenKey(winreg.HKEY_LOCAL_MACHINE, registry_path, 0, - winreg.KEY_WRITE | winreg.KEY_WOW64_64KEY) - except WindowsError: - # 如果失败,尝试以管理员权限运行 - import ctypes - if ctypes.windll.shell32.IsUserAnAdmin() == 0: - logging.warning("需要管理员权限来修改 MachineGuid") - return False - key = winreg.OpenKey(winreg.HKEY_LOCAL_MACHINE, registry_path, 0, - winreg.KEY_ALL_ACCESS | winreg.KEY_WOW64_64KEY) + # 备份原始 GUID + backup_path = self.backup_dir / f"MachineGuid.backup_{datetime.now().strftime('%Y%m%d_%H%M%S')}" + self.backup_dir.mkdir(parents=True, exist_ok=True) + with open(backup_path, 'w', encoding='utf-8') as f: + f.write(original_guid) + logging.info(f"已备份 MachineGuid: {backup_path}") - # 设置新的 GUID - winreg.SetValueEx(key, "MachineGuid", 0, winreg.REG_SZ, new_guid) - winreg.CloseKey(key) - logging.info(f"更新系统 MachineGuid 成功: {new_guid}") + # 更新 GUID + with winreg.OpenKey(winreg.HKEY_LOCAL_MACHINE, registry_path, 0, + winreg.KEY_WRITE | winreg.KEY_WOW64_64KEY) as key: + winreg.SetValueEx(key, "MachineGuid", 0, winreg.REG_SZ, new_guid) + + logging.info(f"已更新系统 MachineGuid: {new_guid}") return True except WindowsError as e: - logging.error(f"更新系统 MachineGuid 失败: {str(e)}") + logging.error(f"注册表操作失败: {str(e)}") return False except Exception as e: - logging.error(f"更新 MachineGuid 过程出错: {str(e)}") + logging.error(f"更新 MachineGuid 失败: {str(e)}") return False def clean_registry(self) -> bool: @@ -107,97 +163,111 @@ class CursorRegistry: return False def clean_cursor_files(self) -> bool: - """清理Cursor相关的文件和目录,但保留重要的配置和历史记录""" + """清理Cursor相关的文件和目录,但保留重要的配置和历史记录 + + Returns: + bool: 是否成功 + """ try: - local_app_data = Path(os.getenv('LOCALAPPDATA')) - app_data = Path(os.getenv('APPDATA')) + storage_path = Path(os.getenv('APPDATA')) / "Cursor" / "User" / "globalStorage" / "storage.json" + global_storage_dir = storage_path.parent - # 需要备份的文件 - storage_path = app_data / "Cursor" / "User" / "globalStorage" / "storage.json" - backup_dir = app_data / "Cursor" / "User" / "globalStorage" / "backups" - global_storage_dir = app_data / "Cursor" / "User" / "globalStorage" - - # 如果存在 storage.json,先备份 + # 备份 storage.json + if storage_path.exists(): + if not self.backup_file(storage_path): + return False + + # 备份其他重要文件 + if global_storage_dir.exists(): + for item in global_storage_dir.iterdir(): + if item.name not in ["storage.json", "backups"]: + try: + backup_item_dir = self.backup_dir / f"other_files_{datetime.now().strftime('%Y%m%d_%H%M%S')}" + backup_item_dir.mkdir(exist_ok=True) + + if item.is_file(): + shutil.copy2(item, backup_item_dir / item.name) + elif item.is_dir(): + shutil.copytree(item, backup_item_dir / item.name) + + logging.info(f"已备份: {item}") + except Exception as e: + logging.error(f"备份失败 {item}: {str(e)}") + + # 更新 storage.json if storage_path.exists(): - # 确保备份目录存在 - backup_dir.mkdir(parents=True, exist_ok=True) - - # 备份 storage.json - backup_name = f"storage.json.backup_{datetime.now().strftime('%Y%m%d_%H%M%S')}" - shutil.copy2(storage_path, backup_dir / backup_name) - logging.info(f"备份 storage.json 到: {backup_name}") - - # 备份 global_storage 目录中的其他重要文件 - if global_storage_dir.exists(): - for item in global_storage_dir.iterdir(): - if item.name != "storage.json" and item.name != "backups": - try: - backup_item_dir = backup_dir / f"other_files_{datetime.now().strftime('%Y%m%d_%H%M%S')}" - backup_item_dir.mkdir(exist_ok=True) - if item.is_file(): - shutil.copy2(item, backup_item_dir / item.name) - logging.info(f"备份文件: {item.name}") - elif item.is_dir(): - shutil.copytree(item, backup_item_dir / item.name) - logging.info(f"备份目录: {item.name}") - except Exception as e: - logging.error(f"备份 {item} 失败: {str(e)}") - - # 读取当前内容 - with open(storage_path, "r", encoding="utf-8") as f: - storage_data = json.load(f) - - # 只修改 machineId,保持其他配置不变 - if "telemetry.machineId" in storage_data: - # 生成新的 machineId - new_machine_id = hashlib.sha256(str(uuid.uuid4()).encode()).hexdigest() - storage_data["telemetry.machineId"] = new_machine_id - logging.info(f"更新 machineId: {new_machine_id}") - - # 保存修改后的内容 - with open(storage_path, "w", encoding="utf-8") as f: - json.dump(storage_data, f, indent=2) - - # 处理 updater 目录 - updater_path = local_app_data / "cursor-updater" - try: - # 如果是目录,则删除 - if updater_path.is_dir(): - shutil.rmtree(str(updater_path)) - logging.info("删除 updater 目录成功") - # 如果是文件,则删除 - if updater_path.is_file(): - updater_path.unlink() - logging.info("删除 updater 文件成功") - # 创建同名空文件来阻止更新 - updater_path.touch() - logging.info("创建 updater 空文件成功") - except Exception as e: - logging.error(f"处理 updater 文件失败: {str(e)}") - - # 只清理缓存相关的路径 - paths_to_clean = [ - local_app_data / "Cursor" / "Cache" - ] - - for path in paths_to_clean: try: - if path.is_dir(): - shutil.rmtree(str(path), ignore_errors=True) - logging.info(f"删除目录成功: {path}") - elif path.exists(): - path.unlink() - logging.info(f"删除文件成功: {path}") + with open(storage_path, "r", encoding="utf-8") as f: + storage_data = json.load(f) + + if "telemetry.machineId" in storage_data: + new_machine_id = self.get_random_hex(32) + storage_data["telemetry.machineId"] = new_machine_id + logging.info(f"已更新 machineId: {new_machine_id}") + + # 使用 UTF-8 无 BOM 编码保存 + with open(storage_path, "w", encoding="utf-8", newline='\n') as f: + json.dump(storage_data, f, indent=2) + except Exception as e: - logging.error(f"清理文件/目录失败: {path}, 错误: {str(e)}") - - # 修复 Cursor 启动配置 - self.fix_cursor_startup() - + logging.error(f"更新 storage.json 失败: {str(e)}") + return False + + # 处理更新程序 + updater_path = Path(os.getenv('LOCALAPPDATA')) / "cursor-updater" + if updater_path.exists(): + try: + if updater_path.is_dir(): + shutil.rmtree(updater_path) + else: + updater_path.unlink() + except Exception as e: + logging.error(f"删除更新程序失败: {str(e)}") + return False + return True except Exception as e: - logging.error(f"清理文件过程出错: {str(e)}") + logging.error(f"清理文件失败: {str(e)}") + return False + + def disable_auto_update(self) -> bool: + """禁用自动更新功能 + + Returns: + bool: 是否成功 + """ + try: + updater_path = Path(os.getenv('LOCALAPPDATA')) / "cursor-updater" + + # 删除现有目录/文件 + if updater_path.exists(): + if updater_path.is_dir(): + shutil.rmtree(updater_path) + else: + updater_path.unlink() + + # 创建空文件 + updater_path.touch() + + # 设置只读属性 + import stat + updater_path.chmod(stat.S_IREAD) + + # 设置文件权限(仅Windows) + if os.name == 'nt': + import subprocess + subprocess.run( + f'icacls "{updater_path}" /inheritance:r /grant:r "{os.getenv("USERNAME")}:(R)"', + shell=True, + check=True + ) + + logging.info("已禁用自动更新") + return True + + except Exception as e: + logging.error(f"禁用自动更新失败: {str(e)}") return False def fix_cursor_startup(self) -> bool: diff --git a/utils/cursor_resetter.py b/utils/cursor_resetter.py new file mode 100644 index 0000000..48b1d22 --- /dev/null +++ b/utils/cursor_resetter.py @@ -0,0 +1,227 @@ +import os +import sys +import json +import logging +import subprocess +import uuid +from pathlib import Path +from datetime import datetime +from typing import Optional, Tuple, Dict + +class CursorResetter: + """Cursor重置工具类,封装PowerShell脚本的核心功能""" + + def __init__(self): + self.appdata = os.getenv('APPDATA') + self.localappdata = os.getenv('LOCALAPPDATA') + self.storage_file = Path(self.appdata) / "Cursor" / "User" / "globalStorage" / "storage.json" + self.backup_dir = Path(self.appdata) / "Cursor" / "User" / "globalStorage" / "backups" + self.cursor_path = Path(self.localappdata) / "Programs" / "cursor" + self.app_path = self.cursor_path / "resources" / "app" + self.package_json = self.app_path / "package.json" + + def get_random_hex(self, length: int) -> str: + """生成安全的随机十六进制字符串""" + import secrets + return secrets.token_hex(length) + + def new_standard_machine_id(self) -> str: + """生成标准格式的机器ID""" + template = "xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx" + import random + + def replace_char(match): + r = random.randint(0, 15) + v = r if match == 'x' else (r & 0x3 | 0x8) + return hex(v)[2:] + + return ''.join(replace_char(c) for c in template) + + def generate_ids(self) -> Dict[str, str]: + """生成所有需要的ID""" + # 生成标准格式的ID + mac_machine_id = self.new_standard_machine_id() + uuid_str = str(uuid.uuid4()) + + # 生成带前缀的machineId + prefix = "auth0|user_" + prefix_hex = ''.join(hex(b)[2:].zfill(2) for b in prefix.encode()) + random_part = self.get_random_hex(32) + machine_id = f"{prefix_hex}{random_part}" + + # 生成大写的SQM ID + sqm_id = "{" + str(uuid.uuid4()).upper() + "}" + + return { + "mac_machine_id": mac_machine_id, + "uuid": uuid_str, + "machine_id": machine_id, + "sqm_id": sqm_id + } + + def backup_file(self, file_path: Path) -> Optional[Path]: + """备份文件""" + try: + if not file_path.exists(): + return None + + self.backup_dir.mkdir(parents=True, exist_ok=True) + backup_name = f"{file_path.name}.backup_{datetime.now().strftime('%Y%m%d_%H%M%S')}" + backup_path = self.backup_dir / backup_name + + import shutil + shutil.copy2(file_path, backup_path) + logging.info(f"已备份文件: {backup_path}") + return backup_path + + except Exception as e: + logging.error(f"备份文件失败: {str(e)}") + return None + + def update_machine_guid(self) -> bool: + """更新系统MachineGuid""" + try: + import winreg + new_guid = str(uuid.uuid4()) + registry_path = r"SOFTWARE\Microsoft\Cryptography" + + # 备份原始值 + with winreg.OpenKey(winreg.HKEY_LOCAL_MACHINE, registry_path, 0, + winreg.KEY_READ | winreg.KEY_WOW64_64KEY) as key: + original_guid = winreg.QueryValueEx(key, "MachineGuid")[0] + + # 备份到文件 + self.backup_dir.mkdir(parents=True, exist_ok=True) + backup_path = self.backup_dir / f"MachineGuid.backup_{datetime.now().strftime('%Y%m%d_%H%M%S')}" + with open(backup_path, 'w', encoding='utf-8') as f: + f.write(original_guid) + + # 更新GUID + with winreg.OpenKey(winreg.HKEY_LOCAL_MACHINE, registry_path, 0, + winreg.KEY_WRITE | winreg.KEY_WOW64_64KEY) as key: + winreg.SetValueEx(key, "MachineGuid", 0, winreg.REG_SZ, new_guid) + + logging.info(f"已更新系统MachineGuid: {new_guid}") + return True + + except Exception as e: + logging.error(f"更新MachineGuid失败: {str(e)}") + return False + + def update_storage_json(self) -> bool: + """更新storage.json文件""" + try: + if not self.storage_file.exists(): + logging.error(f"未找到配置文件: {self.storage_file}") + return False + + # 备份文件 + if not self.backup_file(self.storage_file): + logging.warning("配置文件备份失败") + + # 生成新ID + ids = self.generate_ids() + + # 读取并更新配置 + with open(self.storage_file, "r", encoding="utf-8") as f: + config = json.load(f) + + # 更新ID + config['telemetry.machineId'] = ids['machine_id'] + config['telemetry.macMachineId'] = ids['mac_machine_id'] + config['telemetry.devDeviceId'] = ids['uuid'] + config['telemetry.sqmId'] = ids['sqm_id'] + + # 保存更新 + with open(self.storage_file, "w", encoding="utf-8", newline='\n') as f: + json.dump(config, f, indent=2) + + logging.info("已更新配置文件") + return True + + except Exception as e: + logging.error(f"更新配置文件失败: {str(e)}") + return False + + def disable_auto_update(self) -> bool: + """禁用自动更新""" + try: + updater_path = Path(self.localappdata) / "cursor-updater" + + # 删除现有文件/目录 + if updater_path.exists(): + if updater_path.is_dir(): + import shutil + shutil.rmtree(updater_path) + else: + updater_path.unlink() + + # 创建空文件并设置只读 + updater_path.touch() + import stat + updater_path.chmod(stat.S_IREAD) + + # 设置文件权限 + if os.name == 'nt': + subprocess.run( + f'icacls "{updater_path}" /inheritance:r /grant:r "{os.getenv("USERNAME")}:(R)"', + shell=True, + check=True + ) + + logging.info("已禁用自动更新") + return True + + except Exception as e: + logging.error(f"禁用自动更新失败: {str(e)}") + return False + + def reset_cursor(self, disable_update: bool = True) -> Tuple[bool, str]: + """重置Cursor + + Args: + disable_update: 是否禁用自动更新 + + Returns: + Tuple[bool, str]: (是否成功, 消息) + """ + try: + # 1. 检查管理员权限 + if os.name == 'nt': + import ctypes + if not ctypes.windll.shell32.IsUserAnAdmin(): + return False, "需要管理员权限来执行重置操作" + + # 2. 更新配置文件 + if not self.update_storage_json(): + return False, "更新配置文件失败" + + # 3. 更新系统MachineGuid + if not self.update_machine_guid(): + return False, "更新系统MachineGuid失败" + + # 4. 禁用自动更新(如果需要) + if disable_update and not self.disable_auto_update(): + logging.warning("禁用自动更新失败") + + # 5. 修改package.json + if self.package_json.exists(): + try: + with open(self.package_json, "r", encoding="utf-8") as f: + data = json.load(f) + + if "machineId" in data: + del data["machineId"] + data["updateUrl"] = "" + data["disableUpdate"] = True + + with open(self.package_json, "w", encoding="utf-8", newline='\n') as f: + json.dump(data, f, indent=2) + except Exception as e: + logging.warning(f"修改package.json失败: {str(e)}") + + return True, "Cursor重置成功" + + except Exception as e: + logging.error(f"重置过程出错: {str(e)}") + return False, f"重置失败: {str(e)}" \ No newline at end of file diff --git a/version.txt b/version.txt index 3ec370e..81f1b89 100644 --- a/version.txt +++ b/version.txt @@ -1 +1 @@ -3.4.5 \ No newline at end of file +3.4.7 \ No newline at end of file