Files
tingquanzhushou/machine_resetter.py
2025-02-20 20:20:19 +08:00

197 lines
7.1 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import sys
import sqlite3
import requests
import urllib3
import uuid
import winreg
import ctypes
import shutil
import json
from datetime import datetime
from typing import Dict, Tuple, Any, Optional
from logger import logger
from exit_cursor import ExitCursor
class MachineResetter:
"""机器码重置核心类"""
def __init__(self, storage_file: str = None, backup_dir: str = None):
"""
初始化机器码重置器
:param storage_file: 可选storage.json文件路径
:param backup_dir: 可选,备份目录路径
"""
# 设置基础路径
appdata = os.getenv('APPDATA')
if not appdata:
raise EnvironmentError("APPDATA 环境变量未设置")
self.storage_file = storage_file or os.path.join(appdata, 'Cursor', 'User', 'globalStorage', 'storage.json')
self.backup_dir = backup_dir or os.path.join(os.path.dirname(self.storage_file), 'backups')
# 确保备份目录存在
os.makedirs(self.backup_dir, exist_ok=True)
# 获取日志记录器
self.logger = logger.get_logger("MachineReset")
# 进度回调
self._callback = None
def set_progress_callback(self, callback) -> None:
"""设置进度回调函数"""
self._callback = callback
def _update_progress(self, status: str, message: str) -> None:
"""更新进度信息"""
if self._callback:
self._callback({"status": status, "message": message})
@staticmethod
def is_admin() -> bool:
"""检查是否具有管理员权限"""
try:
return ctypes.windll.shell32.IsUserAnAdmin()
except:
return False
def _get_storage_content(self) -> Dict[str, Any]:
"""读取storage.json的内容"""
try:
with open(self.storage_file, 'r', encoding='utf-8') as f:
return json.load(f)
except Exception as e:
raise Exception(f"读取配置文件失败: {str(e)}")
def _save_storage_content(self, content: Dict[str, Any]) -> None:
"""保存内容到storage.json"""
try:
with open(self.storage_file, 'w', encoding='utf-8') as f:
json.dump(content, f, indent=2, ensure_ascii=False)
except Exception as e:
raise Exception(f"保存配置文件失败: {str(e)}")
def backup_file(self) -> str:
"""
备份配置文件
:return: 备份文件路径
"""
if not os.path.exists(self.storage_file):
raise FileNotFoundError(f"配置文件不存在:{self.storage_file}")
backup_name = f"storage.json.backup_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
backup_path = os.path.join(self.backup_dir, backup_name)
shutil.copy2(self.storage_file, backup_path)
self.logger.info(f"配置已备份到: {backup_path}")
return backup_path
def generate_new_ids(self) -> Dict[str, str]:
"""
生成新的机器码系列
:return: 包含新ID的字典
"""
# 生成 auth0|user_ 前缀的十六进制
prefix = "auth0|user_".encode('utf-8').hex()
# 生成32字节(64个十六进制字符)的随机数
random_part = uuid.uuid4().hex + uuid.uuid4().hex
return {
"machineId": f"{prefix}{random_part}",
"macMachineId": str(uuid.uuid4()),
"devDeviceId": str(uuid.uuid4()),
"sqmId": "{" + str(uuid.uuid4()).upper() + "}"
}
def update_config(self, new_ids: Dict[str, str]) -> None:
"""
更新配置文件
:param new_ids: 新的ID字典
"""
config_content = self._get_storage_content()
# 更新配置
config_content['telemetry.machineId'] = new_ids['machineId']
config_content['telemetry.macMachineId'] = new_ids['macMachineId']
config_content['telemetry.devDeviceId'] = new_ids['devDeviceId']
config_content['telemetry.sqmId'] = new_ids['sqmId']
# 保存更新后的配置
self._save_storage_content(config_content)
self.logger.info("配置文件更新成功")
def update_machine_guid(self) -> str:
"""
更新注册表中的MachineGuid
:return: 新的GUID
"""
reg_path = r"SOFTWARE\Microsoft\Cryptography"
try:
# 打开注册表键
reg_key = winreg.OpenKey(winreg.HKEY_LOCAL_MACHINE, reg_path, 0,
winreg.KEY_WRITE | winreg.KEY_READ)
# 获取当前GUID用于备份
current_guid, _ = winreg.QueryValueEx(reg_key, "MachineGuid")
self.logger.info(f"当前MachineGuid: {current_guid}")
# 生成新GUID
new_guid = str(uuid.uuid4())
# 更新注册表
winreg.SetValueEx(reg_key, "MachineGuid", 0, winreg.REG_SZ, new_guid)
# 验证更改
updated_guid, _ = winreg.QueryValueEx(reg_key, "MachineGuid")
if updated_guid != new_guid:
raise Exception("注册表验证失败,值未成功更新")
winreg.CloseKey(reg_key)
self.logger.info(f"新MachineGuid: {new_guid}")
return new_guid
except Exception as e:
raise Exception(f"更新MachineGuid失败: {str(e)}")
def run(self) -> Tuple[Dict[str, str], str]:
"""
执行重置操作
:return: (新ID字典, 新GUID)
"""
try:
# 检查管理员权限
self._update_progress("checking", "检查管理员权限...")
if not self.is_admin():
raise PermissionError("必须以管理员权限运行该程序")
# 先退出 Cursor 进程
self._update_progress("exiting", "正在关闭 Cursor 进程...")
if not ExitCursor():
raise Exception("无法完全关闭 Cursor 进程,请手动关闭后重试")
# 备份配置文件
self._update_progress("backup", "备份配置文件...")
backup_path = self.backup_file()
self.logger.info(f"配置已备份到: {backup_path}")
# 生成新机器码
self._update_progress("generating", "生成新的机器码...")
new_ids = self.generate_new_ids()
# 更新配置文件
self._update_progress("updating", "更新配置文件...")
self.update_config(new_ids)
# 更新注册表
self._update_progress("registry", "更新注册表...")
new_guid = self.update_machine_guid()
self._update_progress("complete", "重置完成")
self.logger.info("机器码重置成功")
return new_ids, new_guid
except Exception as e:
self.logger.error(f"重置失败: {str(e)}")
self._update_progress("error", f"操作失败: {str(e)}")
raise