Files
tingquanzhushou/machine_resetter.py
huangzhenpc 2d603c33aa xxx
2025-05-17 18:16:24 +08:00

250 lines
9.4 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import sys
import sqlite3
import requests
import urllib3
import uuid
import winreg
import ctypes
import shutil
import json
import secrets
from datetime import datetime
from typing import Dict, Tuple, Any, Optional
from logger import logger
from exit_cursor import ExitCursor
class MachineResetter:
"""机器码重置核心类"""
def __init__(self, storage_file: str = None, backup_dir: str = None):
"""
初始化机器码重置器
:param storage_file: 可选storage.json文件路径
:param backup_dir: 可选,备份目录路径
"""
# 设置基础路径
appdata = os.getenv('APPDATA')
if not appdata:
raise EnvironmentError("APPDATA 环境变量未设置")
self.storage_file = storage_file or os.path.join(appdata, 'Cursor', 'User', 'globalStorage', 'storage.json')
self.backup_dir = backup_dir or os.path.join(os.path.dirname(self.storage_file), 'backups')
# 确保备份目录存在
os.makedirs(self.backup_dir, exist_ok=True)
# 获取日志记录器
self.logger = logger.get_logger("MachineReset")
# 进度回调
self._callback = None
def set_progress_callback(self, callback) -> None:
"""设置进度回调函数"""
self._callback = callback
def _update_progress(self, status: str, message: str) -> None:
"""更新进度信息"""
if self._callback:
self._callback({"status": status, "message": message})
@staticmethod
def is_admin() -> bool:
"""检查是否具有管理员权限"""
try:
return ctypes.windll.shell32.IsUserAnAdmin()
except:
return False
def _get_storage_content(self) -> Dict[str, Any]:
"""读取storage.json的内容"""
try:
with open(self.storage_file, 'r', encoding='utf-8') as f:
return json.load(f)
except Exception as e:
raise Exception(f"读取配置文件失败: {str(e)}")
def _save_storage_content(self, content: Dict[str, Any]) -> None:
"""保存内容到storage.json"""
try:
with open(self.storage_file, 'w', encoding='utf-8') as f:
json.dump(content, f, indent=2, ensure_ascii=False)
except Exception as e:
raise Exception(f"保存配置文件失败: {str(e)}")
def backup_file(self) -> str:
"""
备份配置文件
:return: 备份文件路径
"""
if not os.path.exists(self.storage_file):
raise FileNotFoundError(f"配置文件不存在:{self.storage_file}")
backup_name = f"storage.json.backup_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
backup_path = os.path.join(self.backup_dir, backup_name)
shutil.copy2(self.storage_file, backup_path)
self.logger.info(f"配置已备份到: {backup_path}")
return backup_path
def check_cursor_version(self) -> Tuple[bool, str]:
"""
检查 Cursor 版本是否支持
:return: (是否支持, 版本号)
"""
try:
# 主要检测路径
package_paths = [
os.path.join(os.getenv('LOCALAPPDATA'), 'Programs', 'cursor', 'resources', 'app', 'package.json'),
os.path.join(os.getenv('LOCALAPPDATA'), 'cursor', 'resources', 'app', 'package.json')
]
for path in package_paths:
if os.path.exists(path):
with open(path, 'r', encoding='utf-8') as f:
data = json.load(f)
version = data.get('version', '')
# 检查版本号是否低于 0.44.0
version_parts = list(map(int, version.split('.')))
if version_parts[0] == 0 and version_parts[1] < 44:
return False, version
return True, version
return True, "未知版本"
except Exception as e:
self.logger.warning(f"版本检测失败: {str(e)}")
return True, "版本检测失败"
def generate_secure_random(self, length: int) -> str:
"""
生成安全的随机十六进制字符串
:param length: 字节长度
:return: 十六进制字符串
"""
return secrets.token_hex(length)
def generate_new_ids(self) -> Dict[str, str]:
"""
生成新的机器码系列
:return: 包含新ID的字典
"""
# 生成 auth0|user_ 前缀的十六进制
prefix = "auth0|user_".encode('utf-8').hex()
# 使用安全的随机数生成方法
random_part = self.generate_secure_random(32) # 生成64个十六进制字符
return {
"machineId": f"{prefix}{random_part}",
"macMachineId": str(uuid.uuid4()),
"devDeviceId": str(uuid.uuid4()),
"sqmId": "{" + str(uuid.uuid4()).upper() + "}"
}
def update_config(self, new_ids: Dict[str, str]) -> None:
"""
更新配置文件
:param new_ids: 新的ID字典
"""
config_content = self._get_storage_content()
# 更新配置
config_content['telemetry.machineId'] = new_ids['machineId']
config_content['telemetry.macMachineId'] = new_ids['macMachineId']
config_content['telemetry.devDeviceId'] = new_ids['devDeviceId']
config_content['telemetry.sqmId'] = new_ids['sqmId']
# 保存更新后的配置
self._save_storage_content(config_content)
self.logger.info("配置文件更新成功")
def update_machine_guid(self) -> str:
"""
更新注册表中的MachineGuid
:return: 新的GUID
"""
reg_path = r"SOFTWARE\Microsoft\Cryptography"
try:
# 打开注册表键
reg_key = winreg.OpenKey(winreg.HKEY_LOCAL_MACHINE, reg_path, 0,
winreg.KEY_WRITE | winreg.KEY_READ)
# 获取当前GUID用于备份
current_guid, _ = winreg.QueryValueEx(reg_key, "MachineGuid")
self.logger.info(f"当前MachineGuid: {current_guid}")
# 生成新GUID
new_guid = str(uuid.uuid4())
# 更新注册表
winreg.SetValueEx(reg_key, "MachineGuid", 0, winreg.REG_SZ, new_guid)
# 验证更改
updated_guid, _ = winreg.QueryValueEx(reg_key, "MachineGuid")
if updated_guid != new_guid:
raise Exception("注册表验证失败,值未成功更新")
winreg.CloseKey(reg_key)
self.logger.info(f"新MachineGuid: {new_guid}")
return new_guid
except Exception as e:
raise Exception(f"更新MachineGuid失败: {str(e)}")
def run(self) -> Tuple[Dict[str, str], str]:
"""
执行重置操作
:return: (新ID字典, 新GUID)
"""
try:
# 检查 Cursor 版本
version_supported, version = self.check_cursor_version()
if not version_supported:
raise Exception(f"当前 Cursor 版本 ({version}) 不支持重置,请使用 v0.44.0 及以上版本")
# 检查管理员权限
self._update_progress("checking", "检查管理员权限...")
if not self.is_admin():
raise PermissionError("必须以管理员权限运行该程序")
# 先退出 Cursor 进程
self._update_progress("exiting", "正在关闭 Cursor 进程...")
if not ExitCursor():
raise Exception("无法完全关闭 Cursor 进程,请手动关闭后重试")
# 备份配置文件
self._update_progress("backup", "备份配置文件...")
backup_path = self.backup_file()
self.logger.info(f"配置已备份到: {backup_path}")
try:
# 生成新机器码
self._update_progress("generating", "生成新的机器码...")
new_ids = self.generate_new_ids()
# 更新配置文件
self._update_progress("updating", "更新配置文件...")
self.update_config(new_ids)
# 更新注册表
self._update_progress("registry", "更新注册表...")
new_guid = self.update_machine_guid()
self._update_progress("complete", "重置完成")
self.logger.info("机器码重置成功")
return new_ids, new_guid
except Exception as e:
# 如果出错,尝试恢复备份
try:
if os.path.exists(backup_path):
shutil.copy2(backup_path, self.storage_file)
self.logger.info("已恢复配置文件备份")
except Exception as restore_error:
self.logger.error(f"恢复备份失败: {str(restore_error)}")
raise
except Exception as e:
self.logger.error(f"重置失败: {str(e)}")
self._update_progress("error", f"操作失败: {str(e)}")
raise