197 lines
7.1 KiB
Python
197 lines
7.1 KiB
Python
import os
|
||
import sys
|
||
import sqlite3
|
||
import requests
|
||
import urllib3
|
||
import uuid
|
||
import winreg
|
||
import ctypes
|
||
import shutil
|
||
import json
|
||
from datetime import datetime
|
||
from typing import Dict, Tuple, Any, Optional
|
||
from logger import logger
|
||
from exit_cursor import ExitCursor
|
||
|
||
class MachineResetter:
|
||
"""机器码重置核心类"""
|
||
|
||
def __init__(self, storage_file: str = None, backup_dir: str = None):
|
||
"""
|
||
初始化机器码重置器
|
||
:param storage_file: 可选,storage.json文件路径
|
||
:param backup_dir: 可选,备份目录路径
|
||
"""
|
||
# 设置基础路径
|
||
appdata = os.getenv('APPDATA')
|
||
if not appdata:
|
||
raise EnvironmentError("APPDATA 环境变量未设置")
|
||
|
||
self.storage_file = storage_file or os.path.join(appdata, 'Cursor', 'User', 'globalStorage', 'storage.json')
|
||
self.backup_dir = backup_dir or os.path.join(os.path.dirname(self.storage_file), 'backups')
|
||
|
||
# 确保备份目录存在
|
||
os.makedirs(self.backup_dir, exist_ok=True)
|
||
|
||
# 获取日志记录器
|
||
self.logger = logger.get_logger("MachineReset")
|
||
|
||
# 进度回调
|
||
self._callback = None
|
||
|
||
def set_progress_callback(self, callback) -> None:
|
||
"""设置进度回调函数"""
|
||
self._callback = callback
|
||
|
||
def _update_progress(self, status: str, message: str) -> None:
|
||
"""更新进度信息"""
|
||
if self._callback:
|
||
self._callback({"status": status, "message": message})
|
||
|
||
@staticmethod
|
||
def is_admin() -> bool:
|
||
"""检查是否具有管理员权限"""
|
||
try:
|
||
return ctypes.windll.shell32.IsUserAnAdmin()
|
||
except:
|
||
return False
|
||
|
||
def _get_storage_content(self) -> Dict[str, Any]:
|
||
"""读取storage.json的内容"""
|
||
try:
|
||
with open(self.storage_file, 'r', encoding='utf-8') as f:
|
||
return json.load(f)
|
||
except Exception as e:
|
||
raise Exception(f"读取配置文件失败: {str(e)}")
|
||
|
||
def _save_storage_content(self, content: Dict[str, Any]) -> None:
|
||
"""保存内容到storage.json"""
|
||
try:
|
||
with open(self.storage_file, 'w', encoding='utf-8') as f:
|
||
json.dump(content, f, indent=2, ensure_ascii=False)
|
||
except Exception as e:
|
||
raise Exception(f"保存配置文件失败: {str(e)}")
|
||
|
||
def backup_file(self) -> str:
|
||
"""
|
||
备份配置文件
|
||
:return: 备份文件路径
|
||
"""
|
||
if not os.path.exists(self.storage_file):
|
||
raise FileNotFoundError(f"配置文件不存在:{self.storage_file}")
|
||
|
||
backup_name = f"storage.json.backup_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
|
||
backup_path = os.path.join(self.backup_dir, backup_name)
|
||
shutil.copy2(self.storage_file, backup_path)
|
||
self.logger.info(f"配置已备份到: {backup_path}")
|
||
return backup_path
|
||
|
||
def generate_new_ids(self) -> Dict[str, str]:
|
||
"""
|
||
生成新的机器码系列
|
||
:return: 包含新ID的字典
|
||
"""
|
||
# 生成 auth0|user_ 前缀的十六进制
|
||
prefix = "auth0|user_".encode('utf-8').hex()
|
||
# 生成32字节(64个十六进制字符)的随机数
|
||
random_part = uuid.uuid4().hex + uuid.uuid4().hex
|
||
|
||
return {
|
||
"machineId": f"{prefix}{random_part}",
|
||
"macMachineId": str(uuid.uuid4()),
|
||
"devDeviceId": str(uuid.uuid4()),
|
||
"sqmId": "{" + str(uuid.uuid4()).upper() + "}"
|
||
}
|
||
|
||
def update_config(self, new_ids: Dict[str, str]) -> None:
|
||
"""
|
||
更新配置文件
|
||
:param new_ids: 新的ID字典
|
||
"""
|
||
config_content = self._get_storage_content()
|
||
|
||
# 更新配置
|
||
config_content['telemetry.machineId'] = new_ids['machineId']
|
||
config_content['telemetry.macMachineId'] = new_ids['macMachineId']
|
||
config_content['telemetry.devDeviceId'] = new_ids['devDeviceId']
|
||
config_content['telemetry.sqmId'] = new_ids['sqmId']
|
||
|
||
# 保存更新后的配置
|
||
self._save_storage_content(config_content)
|
||
self.logger.info("配置文件更新成功")
|
||
|
||
def update_machine_guid(self) -> str:
|
||
"""
|
||
更新注册表中的MachineGuid
|
||
:return: 新的GUID
|
||
"""
|
||
reg_path = r"SOFTWARE\Microsoft\Cryptography"
|
||
try:
|
||
# 打开注册表键
|
||
reg_key = winreg.OpenKey(winreg.HKEY_LOCAL_MACHINE, reg_path, 0,
|
||
winreg.KEY_WRITE | winreg.KEY_READ)
|
||
|
||
# 获取当前GUID用于备份
|
||
current_guid, _ = winreg.QueryValueEx(reg_key, "MachineGuid")
|
||
self.logger.info(f"当前MachineGuid: {current_guid}")
|
||
|
||
# 生成新GUID
|
||
new_guid = str(uuid.uuid4())
|
||
|
||
# 更新注册表
|
||
winreg.SetValueEx(reg_key, "MachineGuid", 0, winreg.REG_SZ, new_guid)
|
||
|
||
# 验证更改
|
||
updated_guid, _ = winreg.QueryValueEx(reg_key, "MachineGuid")
|
||
if updated_guid != new_guid:
|
||
raise Exception("注册表验证失败,值未成功更新")
|
||
|
||
winreg.CloseKey(reg_key)
|
||
self.logger.info(f"新MachineGuid: {new_guid}")
|
||
return new_guid
|
||
|
||
except Exception as e:
|
||
raise Exception(f"更新MachineGuid失败: {str(e)}")
|
||
|
||
def run(self) -> Tuple[Dict[str, str], str]:
|
||
"""
|
||
执行重置操作
|
||
:return: (新ID字典, 新GUID)
|
||
"""
|
||
try:
|
||
# 检查管理员权限
|
||
self._update_progress("checking", "检查管理员权限...")
|
||
if not self.is_admin():
|
||
raise PermissionError("必须以管理员权限运行该程序")
|
||
|
||
# 先退出 Cursor 进程
|
||
self._update_progress("exiting", "正在关闭 Cursor 进程...")
|
||
if not ExitCursor():
|
||
raise Exception("无法完全关闭 Cursor 进程,请手动关闭后重试")
|
||
|
||
# 备份配置文件
|
||
self._update_progress("backup", "备份配置文件...")
|
||
backup_path = self.backup_file()
|
||
self.logger.info(f"配置已备份到: {backup_path}")
|
||
|
||
# 生成新机器码
|
||
self._update_progress("generating", "生成新的机器码...")
|
||
new_ids = self.generate_new_ids()
|
||
|
||
# 更新配置文件
|
||
self._update_progress("updating", "更新配置文件...")
|
||
self.update_config(new_ids)
|
||
|
||
# 更新注册表
|
||
self._update_progress("registry", "更新注册表...")
|
||
new_guid = self.update_machine_guid()
|
||
|
||
self._update_progress("complete", "重置完成")
|
||
self.logger.info("机器码重置成功")
|
||
|
||
return new_ids, new_guid
|
||
|
||
except Exception as e:
|
||
self.logger.error(f"重置失败: {str(e)}")
|
||
self._update_progress("error", f"操作失败: {str(e)}")
|
||
raise |