Compare commits

...

4 Commits

7 changed files with 1035 additions and 280 deletions

73
build_mac_new.py Normal file
View File

@@ -0,0 +1,73 @@
import PyInstaller.__main__
import os
import sys
import shutil
# 获取当前脚本所在目录
current_dir = os.path.dirname(os.path.abspath(__file__))
# 清理之前的构建
dist_dir = os.path.join(current_dir, 'dist')
build_dir = os.path.join(current_dir, 'build')
if os.path.exists(dist_dir):
shutil.rmtree(dist_dir)
if os.path.exists(build_dir):
shutil.rmtree(build_dir)
# 创建必要的目录
os.makedirs('dist', exist_ok=True)
os.makedirs('build', exist_ok=True)
# 创建logs目录
logs_dir = os.path.join(current_dir, 'logs')
os.makedirs(logs_dir, exist_ok=True)
# 打包参数
params = [
'gui/main_mac.py', # 主程序入口
'--name=听泉助手', # 应用名称
'-w', # 不显示控制台窗口
'--clean', # 清理临时文件
'--noconfirm', # 不确认覆盖
'--debug=imports', # 只显示导入相关的调试信息
'--osx-bundle-identifier=com.cursor.pro', # macOS包标识符
f'--distpath={os.path.join(current_dir, "dist")}', # 输出目录
f'--workpath={os.path.join(current_dir, "build")}', # 工作目录
f'--specpath={current_dir}', # spec文件目录
'--add-data=logger.py:.', # 添加额外文件 (Mac系统使用:分隔符)
'--add-data=update_cursor_token.py:.',
'--add-data=cursor_auth_manager.py:.',
'--add-data=reset_machine.py:.',
'--add-data=patch_cursor_get_machine_id.py:.',
'--add-data=exit_cursor.py:.',
'--add-data=go_cursor_help.py:.',
'--add-data=logo.py:.',
'--add-data=config.py:.',
'--add-data=browser_utils.py:.',
'--add-data=get_email_code.py:.',
'--hidden-import=PyQt5',
'--hidden-import=PyQt5.QtCore',
'--hidden-import=PyQt5.QtGui',
'--hidden-import=PyQt5.QtWidgets',
'--hidden-import=requests',
'--hidden-import=urllib3',
'--hidden-import=psutil',
'--hidden-import=colorama',
]
# 执行打包
PyInstaller.__main__.run(params)
print("打包完成!应用程序包(.app)已生成在dist目录下。")
# 创建必要的目录和文件
app_path = os.path.join(current_dir, 'dist', 'CursorPro.app', 'Contents', 'MacOS')
if os.path.exists(app_path):
# 创建logs目录
os.makedirs(os.path.join(app_path, 'logs'), exist_ok=True)
# 创建空的error.log文件
with open(os.path.join(app_path, 'error.log'), 'w') as f:
f.write('')
print("已创建必要的目录和文件")

219
config.py
View File

@@ -1,146 +1,95 @@
from dotenv import load_dotenv
import os
import sys
from logger import logging
import json
import logging
from pathlib import Path
class Config:
"""配置类"""
def __init__(self):
# 获取应用程序的根目录路径
if getattr(sys, "frozen", False):
# 如果是打包后的可执行文件
application_path = os.path.dirname(sys.executable)
else:
# 如果是开发环境
application_path = os.path.dirname(os.path.abspath(__file__))
# 指定 .env 文件的路径
dotenv_path = os.path.join(application_path, ".env")
if not os.path.exists(dotenv_path):
raise FileNotFoundError(f"文件 {dotenv_path} 不存在")
# 加载 .env 文件
load_dotenv(dotenv_path)
self.imap = False
self.temp_mail = os.getenv("TEMP_MAIL", "").strip().split("@")[0]
self.temp_mail_epin = os.getenv("TEMP_MAIL_EPIN", "").strip()
self.temp_mail_ext = os.getenv("TEMP_MAIL_EXT", "").strip()
self.domain = os.getenv("DOMAIN", "").strip()
# 如果临时邮箱为null则加载IMAP
if self.temp_mail == "null":
self.imap = True
self.imap_server = os.getenv("IMAP_SERVER", "").strip()
self.imap_port = os.getenv("IMAP_PORT", "").strip()
self.imap_user = os.getenv("IMAP_USER", "").strip()
self.imap_pass = os.getenv("IMAP_PASS", "").strip()
self.imap_dir = os.getenv("IMAP_DIR", "inbox").strip()
self.check_config()
def get_temp_mail(self):
return self.temp_mail
def get_temp_mail_epin(self):
return self.temp_mail_epin
def get_temp_mail_ext(self):
return self.temp_mail_ext
def get_imap(self):
if not self.imap:
return False
self.base_url = "https://cursorapi.nosqli.com"
self.api_endpoints = {
"activate": f"{self.base_url}/admin/api.member/activate",
"status": f"{self.base_url}/admin/api.member/status",
"get_unused": f"{self.base_url}/admin/api.account/getUnused",
"heartbeat": f"{self.base_url}/admin/api.account/heartbeat"
}
# macOS配置目录
self.config_dir = Path(os.path.expanduser("~")) / ".cursor_pro"
self.config_file = self.config_dir / "config.json"
self.member_file = self.config_dir / "member.json"
self.load_config()
def load_config(self):
"""加载配置"""
try:
self.config_dir.mkdir(parents=True, exist_ok=True)
if not self.config_file.exists():
self.save_default_config()
with open(self.config_file, "r", encoding="utf-8") as f:
config = json.load(f)
self.api_token = config.get("api_token", "")
except Exception as e:
logging.error(f"加载配置失败: {str(e)}")
self.api_token = ""
def save_member_info(self, info: dict):
"""保存会员信息"""
try:
with open(self.member_file, "w", encoding="utf-8") as f:
json.dump(info, f, indent=2, ensure_ascii=False)
logging.info("会员信息已保存")
except Exception as e:
logging.error(f"保存会员信息失败: {str(e)}")
def load_member_info(self) -> dict:
"""读取会员信息"""
try:
if self.member_file.exists():
with open(self.member_file, "r", encoding="utf-8") as f:
info = json.load(f)
logging.info(f"已读取会员信息: 到期时间 {info.get('expire_time', '')}")
return info
except Exception as e:
logging.error(f"读取会员信息失败: {str(e)}")
return {
"imap_server": self.imap_server,
"imap_port": self.imap_port,
"imap_user": self.imap_user,
"imap_pass": self.imap_pass,
"imap_dir": self.imap_dir,
"expire_time": "",
"days": 0,
"new_days": 0
}
def get_domain(self):
return self.domain
def check_config(self):
"""检查配置项是否有效
检查规则:
1. 如果使用 tempmail.plus需要配置 TEMP_MAIL 和 DOMAIN
2. 如果使用 IMAP需要配置 IMAP_SERVER、IMAP_PORT、IMAP_USER、IMAP_PASS
3. IMAP_DIR 是可选的
"""
# 基础配置检查
required_configs = {
"domain": "域名",
def save_default_config(self):
"""保存默认配置"""
config = {
"api_token": ""
}
with open(self.config_file, "w", encoding="utf-8") as f:
json.dump(config, f, indent=2, ensure_ascii=False)
def save_config(self, api_token: str):
"""保存新的配置"""
config = {
"api_token": api_token
}
with open(self.config_file, "w", encoding="utf-8") as f:
json.dump(config, f, indent=2, ensure_ascii=False)
self.api_token = api_token
logging.info("配置已更新")
# 检查基础配置
for key, name in required_configs.items():
if not self.check_is_valid(getattr(self, key)):
raise ValueError(f"{name}未配置,请在 .env 文件中设置 {key.upper()}")
# 检查邮箱配置
if self.temp_mail != "null":
# tempmail.plus 模式
if not self.check_is_valid(self.temp_mail):
raise ValueError("临时邮箱未配置,请在 .env 文件中设置 TEMP_MAIL")
else:
# IMAP 模式
imap_configs = {
"imap_server": "IMAP服务器",
"imap_port": "IMAP端口",
"imap_user": "IMAP用户名",
"imap_pass": "IMAP密码",
}
for key, name in imap_configs.items():
value = getattr(self, key)
if value == "null" or not self.check_is_valid(value):
raise ValueError(
f"{name}未配置,请在 .env 文件中设置 {key.upper()}"
)
# IMAP_DIR 是可选的,如果设置了就检查其有效性
if self.imap_dir != "null" and not self.check_is_valid(self.imap_dir):
raise ValueError(
"IMAP收件箱目录配置无效请在 .env 文件中正确设置 IMAP_DIR"
)
def check_is_valid(self, value):
"""检查配置项是否有效
def get_api_url(self, endpoint_name: str) -> str:
"""获取API端点URL
Args:
value: 配置项的值
endpoint_name: 端点名称
Returns:
bool: 配置项是否有效
str: 完整的API URL
"""
return isinstance(value, str) and len(str(value).strip()) > 0
def print_config(self):
if self.imap:
logging.info(f"\033[32mIMAP服务器: {self.imap_server}\033[0m")
logging.info(f"\033[32mIMAP端口: {self.imap_port}\033[0m")
logging.info(f"\033[32mIMAP用户名: {self.imap_user}\033[0m")
logging.info(f"\033[32mIMAP密码: {'*' * len(self.imap_pass)}\033[0m")
logging.info(f"\033[32mIMAP收件箱目录: {self.imap_dir}\033[0m")
if self.temp_mail != "null":
logging.info(
f"\033[32m临时邮箱: {self.temp_mail}{self.temp_mail_ext}\033[0m"
)
logging.info(f"\033[32m域名: {self.domain}\033[0m")
# 使用示例
if __name__ == "__main__":
try:
config = Config()
print("环境变量加载成功!")
config.print_config()
except ValueError as e:
print(f"错误: {e}")
url = self.api_endpoints.get(endpoint_name, "")
if not url:
logging.error(f"未找到API端点: {endpoint_name}")
return url

425
gui/main_mac.py Normal file
View File

@@ -0,0 +1,425 @@
# -*- coding: utf-8 -*-
import sys
import os
import traceback
from datetime import datetime
def get_app_path():
"""获取应用程序路径"""
if getattr(sys, 'frozen', False):
# 如果是打包后的应用
return os.path.dirname(sys.executable)
else:
# 如果是开发环境
return os.path.dirname(os.path.abspath(__file__))
def setup_logging():
"""设置日志"""
app_path = get_app_path()
log_dir = os.path.join(app_path, 'logs')
os.makedirs(log_dir, exist_ok=True)
log_file = os.path.join(log_dir, 'app.log')
error_log = os.path.join(app_path, 'error.log')
# 配置日志
logging.basicConfig(
filename=log_file,
level=logging.DEBUG,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
# 设置错误日志处理
def handle_exception(exc_type, exc_value, exc_traceback):
if issubclass(exc_type, KeyboardInterrupt):
sys.__excepthook__(exc_type, exc_value, exc_traceback)
return
error_msg = ''.join(traceback.format_exception(exc_type, exc_value, exc_traceback))
try:
with open(error_log, 'a') as f:
f.write(f"\n{'-'*60}\n")
f.write(f"Time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
f.write(error_msg)
logging.error(f"Uncaught exception:\n{error_msg}")
except Exception as e:
print(f"Error writing to log file: {str(e)}")
print(error_msg)
sys.excepthook = handle_exception
# 添加父目录到系统路径
current_dir = os.path.dirname(os.path.abspath(__file__))
parent_dir = os.path.dirname(current_dir)
sys.path.append(parent_dir)
try:
from PyQt5.QtWidgets import (QApplication, QMainWindow, QWidget, QVBoxLayout,
QPushButton, QLabel, QLineEdit, QTextEdit, QMessageBox,
QHBoxLayout, QFrame, QStackedWidget)
from PyQt5.QtCore import Qt, QThread, pyqtSignal
from PyQt5.QtGui import QFont, QIcon, QPalette, QColor
from update_cursor_token import CursorTokenUpdater
from logger import logging
except Exception as e:
error_path = os.path.join(get_app_path(), 'error.log')
with open(error_path, 'w') as f:
f.write(f"Import Error: {str(e)}\n")
f.write(traceback.format_exc())
sys.exit(1)
# macOS 风格的颜色
MACOS_COLORS = {
'background': '#F5F5F7', # 浅灰色背景
'button': '#0066CC', # 蓝色按钮
'button_hover': '#0052A3', # 深蓝色悬停
'button_pressed': '#003D7A', # 更深的蓝色按下
'text': '#1D1D1F', # 深色文字
'frame': '#FFFFFF', # 白色框架
'input': '#FFFFFF', # 白色输入框
'input_text': '#1D1D1F', # 深色输入文字
'border': '#E5E5E5' # 边框颜色
}
class UpdateWorker(QThread):
"""后台更新线程"""
finished = pyqtSignal(bool, str)
progress = pyqtSignal(str)
def __init__(self, updater):
super().__init__()
self.updater = updater
def run(self):
try:
success = self.updater.full_update_process()
if success:
self.finished.emit(True, "更新成功!")
else:
self.finished.emit(False, "更新失败,请查看日志")
except Exception as e:
self.finished.emit(False, f"发生错误: {str(e)}")
class MainWindow(QMainWindow):
def __init__(self):
super().__init__()
try:
logging.info("正在初始化主窗口...")
self.updater = CursorTokenUpdater()
self.init_ui()
# 检查会员状态
self.check_member_status()
logging.info("主窗口初始化完成")
except Exception as e:
logging.error(f"初始化主窗口时发生错误: {str(e)}")
QMessageBox.critical(self, "错误", f"初始化失败: {str(e)}")
def init_ui(self):
# 设置窗口基本属性
self.setWindowTitle('听泉助手')
self.setMinimumSize(600, 500)
# 设置macOS风格的样式
self.setStyleSheet(f"""
QMainWindow {{
background-color: {MACOS_COLORS['background']};
}}
QPushButton {{
background-color: {MACOS_COLORS['button']};
color: white;
border: none;
border-radius: 6px;
padding: 8px 16px;
font-size: 13px;
min-width: 100px;
}}
QPushButton:hover {{
background-color: {MACOS_COLORS['button_hover']};
}}
QPushButton:pressed {{
background-color: {MACOS_COLORS['button_pressed']};
}}
QLabel {{
color: {MACOS_COLORS['text']};
font-size: 13px;
}}
QLabel[title="true"] {{
font-size: 24px;
font-weight: bold;
}}
QTextEdit {{
background-color: {MACOS_COLORS['input']};
color: {MACOS_COLORS['input_text']};
border: 1px solid {MACOS_COLORS['border']};
border-radius: 6px;
padding: 8px;
font-size: 13px;
}}
QLineEdit {{
background-color: {MACOS_COLORS['input']};
color: {MACOS_COLORS['input_text']};
border: 1px solid {MACOS_COLORS['border']};
border-radius: 6px;
padding: 8px;
font-size: 13px;
}}
QFrame {{
background-color: {MACOS_COLORS['frame']};
border: 1px solid {MACOS_COLORS['border']};
border-radius: 8px;
}}
QMessageBox {{
background-color: {MACOS_COLORS['background']};
}}
QMessageBox QPushButton {{
min-width: 80px;
padding: 6px 12px;
}}
""")
# 创建主窗口部件
central_widget = QWidget()
self.setCentralWidget(central_widget)
layout = QVBoxLayout(central_widget)
layout.setSpacing(16)
layout.setContentsMargins(20, 20, 20, 20)
# 标题
title_label = QLabel("听泉助手")
title_label.setProperty("title", "true")
title_label.setAlignment(Qt.AlignCenter)
layout.addWidget(title_label)
# 设备ID显示区域
device_frame = QFrame()
device_layout = QHBoxLayout(device_frame)
device_layout.setContentsMargins(16, 16, 16, 16)
device_id_label = QLabel("设备标识:")
self.device_id_text = QLineEdit()
self.device_id_text.setReadOnly(True)
try:
hardware_id = self.updater.hardware_id
logging.info(f"获取到硬件ID: {hardware_id}")
self.device_id_text.setText(hardware_id)
except Exception as e:
logging.error(f"获取硬件ID失败: {str(e)}")
self.device_id_text.setText("获取失败")
copy_button = QPushButton("复制ID")
copy_button.setMaximumWidth(100)
copy_button.clicked.connect(self.copy_device_id)
device_layout.addWidget(device_id_label)
device_layout.addWidget(self.device_id_text)
device_layout.addWidget(copy_button)
layout.addWidget(device_frame)
# 会员状态区域
member_frame = QFrame()
member_layout = QVBoxLayout(member_frame)
member_layout.setContentsMargins(16, 16, 16, 16)
member_title = QLabel("会员状态")
member_title.setAlignment(Qt.AlignCenter)
member_layout.addWidget(member_title)
self.member_info = QTextEdit()
self.member_info.setReadOnly(True)
self.member_info.setFixedHeight(80)
self.member_info.setText("会员状态: 正常\n到期时间: 2025-02-22 20:44:23")
member_layout.addWidget(self.member_info)
layout.addWidget(member_frame)
# 激活区域
activate_frame = QFrame()
activate_layout = QVBoxLayout(activate_frame)
activate_layout.setContentsMargins(16, 16, 16, 16)
activate_title = QLabel("激活会员")
activate_title.setAlignment(Qt.AlignCenter)
input_layout = QHBoxLayout()
self.activate_input = QLineEdit()
self.activate_input.setPlaceholderText("请输入激活码")
activate_button = QPushButton("激活")
activate_button.setMaximumWidth(100)
activate_button.clicked.connect(self.activate_license)
input_layout.addWidget(self.activate_input)
input_layout.addWidget(activate_button)
activate_layout.addWidget(activate_title)
activate_layout.addLayout(input_layout)
layout.addWidget(activate_frame)
# 功能按钮区域
button_frame = QFrame()
button_layout = QVBoxLayout(button_frame)
button_layout.setSpacing(12)
button_layout.setContentsMargins(16, 16, 16, 16)
self.update_button = QPushButton("刷新授权")
self.update_button.clicked.connect(self.start_update)
self.reset_button = QPushButton("重置机器码")
self.reset_button.clicked.connect(self.reset_machine)
self.disable_update_button = QPushButton("禁用更新")
self.disable_update_button.clicked.connect(self.disable_cursor_update)
button_layout.addWidget(self.update_button)
button_layout.addWidget(self.reset_button)
button_layout.addWidget(self.disable_update_button)
layout.addWidget(button_frame)
def copy_device_id(self):
clipboard = QApplication.clipboard()
clipboard.setText(self.device_id_text.text())
QMessageBox.information(self, "成功", "设备ID已复制到剪贴板")
def append_log(self, text):
self.member_info.append(text)
def start_update(self):
self.update_button.setEnabled(False)
self.worker = UpdateWorker(self.updater)
self.worker.finished.connect(self.update_finished)
self.worker.start()
def update_finished(self, success, message):
self.update_button.setEnabled(True)
if success:
QMessageBox.information(self, "成功", message)
else:
QMessageBox.warning(self, "失败", message)
def reset_machine(self):
try:
self.updater.reset_machine_id()
QMessageBox.information(self, "成功", "机器ID已重置")
except Exception as e:
QMessageBox.critical(self, "错误", f"重置失败: {str(e)}")
def activate_license(self):
"""激活许可证"""
try:
license_key = self.activate_input.text().strip()
if not license_key:
QMessageBox.warning(self, "提示", "请输入激活码")
return
# 禁用激活按钮,防止重复点击
self.activate_input.setEnabled(False)
self.update_button.setEnabled(False)
# 显示处理中的提示
QApplication.processEvents()
# 调用激活接口
success, message, account_info = self.updater.check_activation_code(license_key)
if success:
# 更新界面显示
self.member_info.clear()
self.member_info.append(f"会员状态: 已激活")
self.member_info.append(f"到期时间: {account_info['expire_time']}")
self.member_info.append(f"剩余天数: {account_info['days_left']}")
# 显示成功消息
QMessageBox.information(self, "激活成功",
f"设备已成功激活!\n"
f"到期时间: {account_info['expire_time']}\n"
f"剩余天数: {account_info['days_left']}")
# 清空激活码输入框
self.activate_input.clear()
logging.info(f"设备激活成功,到期时间: {account_info['expire_time']}")
else:
QMessageBox.warning(self, "激活失败", message)
logging.error(f"激活失败: {message}")
except Exception as e:
logging.error(f"激活过程中发生错误: {str(e)}")
QMessageBox.critical(self, "错误", f"激活过程发生错误: {str(e)}")
finally:
# 恢复按钮状态
self.activate_input.setEnabled(True)
self.update_button.setEnabled(True)
def disable_cursor_update(self):
try:
# 这里添加禁用更新逻辑
QMessageBox.information(self, "成功", "已禁用Cursor更新")
except Exception as e:
QMessageBox.critical(self, "错误", f"操作失败: {str(e)}")
def check_member_status(self):
"""检查会员状态"""
try:
logging.info("正在检查会员状态...")
self.member_info.clear()
self.member_info.append("正在检查会员状态...")
QApplication.processEvents()
# 调用API检查状态
success, message, account_info = self.updater.check_member_status()
if success and account_info:
# 更新会员信息显示
self.member_info.clear()
self.member_info.append(f"会员状态: 已激活")
self.member_info.append(f"到期时间: {account_info['expire_time']}")
self.member_info.append(f"剩余天数: {account_info['days_left']}")
logging.info(f"会员状态检查完成 - 到期时间: {account_info['expire_time']}")
else:
# 显示未激活状态
self.member_info.clear()
self.member_info.append("会员状态: 未激活")
self.member_info.append("请输入激活码进行激活")
logging.warning("会员状态检查结果:未激活")
except Exception as e:
self.member_info.clear()
self.member_info.append("会员状态: 检查失败")
self.member_info.append("请稍后重试")
logging.error(f"检查会员状态时发生错误: {str(e)}")
def main():
try:
# 设置日志
setup_logging()
logging.info("应用程序启动")
app = QApplication(sys.argv)
app.setApplicationName("听泉助手")
app.setOrganizationName("听泉")
app.setOrganizationDomain("cursor.pro")
try:
window = MainWindow()
window.show()
logging.info("主窗口已显示")
return app.exec_()
except Exception as e:
logging.error(f"主窗口创建失败: {str(e)}")
logging.error(traceback.format_exc())
QMessageBox.critical(None, "错误", f"程序启动失败: {str(e)}")
return 1
except Exception as e:
error_path = os.path.join(get_app_path(), 'error.log')
with open(error_path, 'a') as f:
f.write(f"\nApplication Error: {str(e)}\n")
f.write(traceback.format_exc())
return 1
if __name__ == '__main__':
sys.exit(main())

View File

@@ -317,7 +317,15 @@ class MainWindow(QMainWindow):
if success:
QMessageBox.information(self, "成功", message)
else:
QMessageBox.warning(self, "失败", message)
# 检查是否是未激活错误
if "设备未激活" in message:
QMessageBox.warning(self, "设备未激活",
"请先激活设备后再进行更新。\n"
"您可以:\n"
"1. 输入激活码进行激活\n"
"2. 联系客服获取激活码")
else:
QMessageBox.warning(self, "失败", message)
self.append_log(message)
logging.info(f"更新完成: {message}")
@@ -357,13 +365,44 @@ class MainWindow(QMainWindow):
QMessageBox.warning(self, "提示", "请输入激活码")
return
# TODO: 实现激活逻辑
logging.info(f"正在处理激活请求,激活码: {activation_code}")
QMessageBox.information(self, "提示", "激活功能即将实现")
# 禁用激活按钮,防止重复点击
self.activate_input.setEnabled(False)
self.update_button.setEnabled(False)
# 显示处理中的提示
QApplication.processEvents()
# 调用激活接口
success, message, account_info = self.updater.check_activation_code(activation_code)
if success:
# 更新界面显示
self.member_info.clear()
self.member_info.append(f"会员状态: 已激活")
self.member_info.append(f"到期时间: {account_info['expire_time']}")
self.member_info.append(f"剩余天数: {account_info['days_left']}")
# 显示成功消息
QMessageBox.information(self, "激活成功",
f"设备已成功激活!\n"
f"到期时间: {account_info['expire_time']}\n"
f"剩余天数: {account_info['days_left']}")
# 清空激活码输入框
self.activate_input.clear()
logging.info(f"设备激活成功,到期时间: {account_info['expire_time']}")
else:
QMessageBox.warning(self, "激活失败", message)
logging.error(f"激活失败: {message}")
except Exception as e:
logging.error(f"激活过程中发生错误: {str(e)}")
QMessageBox.warning(self, "错误", f"激活失败: {str(e)}")
QMessageBox.critical(self, "错误", f"激活过程发生错误: {str(e)}")
finally:
# 恢复按钮状态
self.activate_input.setEnabled(True)
self.update_button.setEnabled(True)
def disable_cursor_update(self):
"""禁用Cursor版本更新"""

View File

@@ -1,11 +1,26 @@
# -*- coding: utf-8 -*-
import logging
import os
from datetime import datetime
# Configure logging
log_dir = "logs"
if not os.path.exists(log_dir):
os.makedirs(log_dir)
# 在用户主目录下创建日志目录
home_dir = os.path.expanduser('~')
app_dir = os.path.join(home_dir, '.cursor_pro')
log_dir = os.path.join(app_dir, 'logs')
os.makedirs(log_dir, exist_ok=True)
# 设置日志文件名
log_file = os.path.join(log_dir, f"{datetime.now().strftime('%Y-%m-%d')}.log")
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler(log_file, encoding='utf-8'),
logging.StreamHandler()
]
)
class PrefixFormatter(logging.Formatter):
@@ -17,17 +32,6 @@ class PrefixFormatter(logging.Formatter):
return super().format(record)
logging.basicConfig(
level=logging.DEBUG,
format="%(asctime)s - %(levelname)s - %(message)s",
handlers=[
logging.FileHandler(
os.path.join(log_dir, f"{datetime.now().strftime('%Y-%m-%d')}.log"),
encoding="utf-8",
),
],
)
# 为文件处理器设置自定义格式化器
for handler in logging.getLogger().handlers:
if isinstance(handler, logging.FileHandler):

View File

@@ -40,42 +40,50 @@ def get_cursor_paths() -> Tuple[str, str]:
OSError: 当找不到有效路径或系统不支持时抛出
"""
system = platform.system()
logger.info(f"当前操作系统: {system}")
paths_map = {
"Darwin": {
"base": "/Applications/Cursor.app/Contents/Resources/app",
"package": "package.json",
"main": "out/main.js",
},
"Windows": {
"base": os.path.join(
os.getenv("LOCALAPPDATA", ""), "Programs", "Cursor", "resources", "app"
),
"package": "package.json",
"main": "out/main.js",
},
"Linux": {
"bases": ["/opt/Cursor/resources/app", "/usr/share/cursor/resources/app"],
"package": "package.json",
"main": "out/main.js",
},
}
if system not in paths_map:
raise OSError(f"不支持的操作系统: {system}")
if system == "Linux":
for base in paths_map["Linux"]["bases"]:
pkg_path = os.path.join(base, paths_map["Linux"]["package"])
if os.path.exists(pkg_path):
return (pkg_path, os.path.join(base, paths_map["Linux"]["main"]))
if system == "Darwin": # macOS
base_path = "/Applications/Cursor.app/Contents/Resources/app"
pkg_path = os.path.join(base_path, "package.json")
main_path = os.path.join(base_path, "out", "main.js")
if not os.path.exists(pkg_path) or not os.path.exists(main_path):
raise OSError("在 macOS 系统上未找到 Cursor 安装路径")
return pkg_path, main_path
elif system == "Windows":
base_path = os.path.join(
os.getenv("LOCALAPPDATA", ""),
"Programs",
"Cursor",
"resources",
"app"
)
pkg_path = os.path.join(base_path, "package.json")
main_path = os.path.join(base_path, "out", "main.js")
if not os.path.exists(pkg_path) or not os.path.exists(main_path):
raise OSError("在 Windows 系统上未找到 Cursor 安装路径")
return pkg_path, main_path
elif system == "Linux":
linux_paths = [
"/opt/Cursor/resources/app",
"/usr/share/cursor/resources/app"
]
for base_path in linux_paths:
pkg_path = os.path.join(base_path, "package.json")
main_path = os.path.join(base_path, "out", "main.js")
if os.path.exists(pkg_path) and os.path.exists(main_path):
return pkg_path, main_path
raise OSError("在 Linux 系统上未找到 Cursor 安装路径")
base_path = paths_map[system]["base"]
return (
os.path.join(base_path, paths_map[system]["package"]),
os.path.join(base_path, paths_map[system]["main"]),
)
else:
raise OSError(f"不支持的操作系统: {system}")
def check_system_requirements(pkg_path: str, main_path: str) -> bool:
@@ -298,5 +306,24 @@ def patch_cursor_get_machine_id(restore_mode=False) -> None:
sys.exit(1)
# 添加patch函数作为主函数的别名
def patch(restore_mode=False) -> bool:
"""
patch函数作为patch_cursor_get_machine_id的别名
Args:
restore_mode: 是否为恢复模式
Returns:
bool: 修补是否成功
"""
try:
patch_cursor_get_machine_id(restore_mode)
return True
except Exception as e:
logger.error(f"修补失败: {str(e)}")
return False
if __name__ == "__main__":
patch_cursor_get_machine_id()

View File

@@ -15,12 +15,14 @@ from exit_cursor import ExitCursor
import go_cursor_help
from logo import print_logo
from typing import Tuple, Dict, Optional
import time
import requests.adapters
class CursorTokenUpdater:
def __init__(self):
self.auth_manager = CursorAuthManager()
self._hardware_id = None # 延迟初始化硬件ID
@property
def hardware_id(self) -> str:
"""获取硬件ID延迟初始化"""
@@ -30,68 +32,68 @@ class CursorTokenUpdater:
def _get_hardware_id(self) -> str:
"""获取硬件唯一标识
方案1: CPU ID + 主板序列号 + BIOS序列号
方案2: 系统盘序列号 + Windows安装时间
方案3: 计算机名(最后的备选方案)
macOS: 使用系统序列号和硬件UUID
Windows: CPU ID + 主板序列号 + BIOS序列号
其他: 计算机名(最后的备选方案)
"""
try:
# 创建startupinfo对象来隐藏命令行窗口
startupinfo = None
if sys.platform == "win32":
system = platform.system()
if system == "Darwin": # macOS
try:
# 获取系统序列号
serial_number = subprocess.check_output(['system_profiler', 'SPHardwareDataType']).decode()
serial = ""
for line in serial_number.split('\n'):
if 'Serial Number' in line:
serial = line.split(':')[1].strip()
break
# 获取硬件UUID
ioreg_output = subprocess.check_output(['ioreg', '-d2', '-c', 'IOPlatformExpertDevice']).decode()
uuid = ""
for line in ioreg_output.split('\n'):
if 'IOPlatformUUID' in line:
uuid = line.split('=')[1].strip().replace('"', '').replace(' ', '')
break
if serial and uuid:
combined = f"{serial}:{uuid}"
hardware_id = hashlib.md5(combined.encode()).hexdigest()
logging.info("使用macOS硬件信息生成ID成功")
return hardware_id
except Exception as e:
logging.warning(f"获取macOS硬件信息失败: {str(e)}")
elif system == "Windows":
# 创建startupinfo对象来隐藏命令行窗口
startupinfo = subprocess.STARTUPINFO()
startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
startupinfo.wShowWindow = subprocess.SW_HIDE
# 方案1: 尝试获取硬件信息
try:
# 获取CPU ID
cpu_info = subprocess.check_output('wmic cpu get ProcessorId', startupinfo=startupinfo).decode()
cpu_id = cpu_info.split('\n')[1].strip()
# 获取主板序列号
board_info = subprocess.check_output('wmic baseboard get SerialNumber', startupinfo=startupinfo).decode()
board_id = board_info.split('\n')[1].strip()
# 获取BIOS序列号
bios_info = subprocess.check_output('wmic bios get SerialNumber', startupinfo=startupinfo).decode()
bios_id = bios_info.split('\n')[1].strip()
# 如果所有信息都获取成功且有效
if all([cpu_id, board_id, bios_id]) and not all(x in ['', '0', 'None', 'To be filled by O.E.M.'] for x in [cpu_id, board_id, bios_id]):
combined = f"{cpu_id}:{board_id}:{bios_id}"
hardware_id = hashlib.md5(combined.encode()).hexdigest()
logging.info("使用硬件信息生成ID成功")
return hardware_id
try:
# 获取CPU ID
cpu_info = subprocess.check_output('wmic cpu get ProcessorId', startupinfo=startupinfo).decode()
cpu_id = cpu_info.split('\n')[1].strip()
except Exception as e:
logging.warning(f"方案1失败: {str(e)}")
# 方案2: 系统盘序列号 + Windows安装时间
try:
backup_info = []
# 获取系统盘序列号
volume_info = subprocess.check_output('wmic logicaldisk where "DeviceID=\'C:\'" get VolumeSerialNumber', startupinfo=startupinfo).decode()
volume_serial = volume_info.split('\n')[1].strip()
if volume_serial and volume_serial not in ['', '0']:
backup_info.append(("volume", volume_serial))
# 获取Windows安装时间
os_info = subprocess.check_output('wmic os get InstallDate', startupinfo=startupinfo).decode()
install_date = os_info.split('\n')[1].strip()
if install_date:
backup_info.append(("install", install_date))
if backup_info:
combined = "|".join(f"{k}:{v}" for k, v in sorted(backup_info))
hardware_id = hashlib.md5(combined.encode()).hexdigest()
logging.info("使用系统信息生成ID成功")
return hardware_id
# 获取主板序列号
board_info = subprocess.check_output('wmic baseboard get SerialNumber', startupinfo=startupinfo).decode()
board_id = board_info.split('\n')[1].strip()
except Exception as e:
logging.warning(f"方案2失败: {str(e)}")
# 获取BIOS序列号
bios_info = subprocess.check_output('wmic bios get SerialNumber', startupinfo=startupinfo).decode()
bios_id = bios_info.split('\n')[1].strip()
# 如果所有信息都获取成功且有效
if all([cpu_id, board_id, bios_id]) and not all(x in ['', '0', 'None', 'To be filled by O.E.M.'] for x in [cpu_id, board_id, bios_id]):
combined = f"{cpu_id}:{board_id}:{bios_id}"
hardware_id = hashlib.md5(combined.encode()).hexdigest()
logging.info("使用Windows硬件信息生成ID成功")
return hardware_id
except Exception as e:
logging.warning(f"获取Windows硬件信息失败: {str(e)}")
# 方案3: 使用计算机名(最后的备选方案)
# 最后的备选方案:使用计算机名
computer_name = platform.node()
if computer_name:
hardware_id = hashlib.md5(computer_name.encode()).hexdigest()
@@ -227,17 +229,23 @@ class CursorTokenUpdater:
"""
try:
logging.info("开始重置机器码...")
resetter = MachineIDResetter()
result = resetter.reset(greater_than_0_45)
if result:
logging.info("机器码重置成功")
# 重置后更新硬件ID缓存
self._hardware_id = None
if greater_than_0_45:
# 对于0.45以上版本使用go_cursor_help
go_cursor_help.go_cursor_help()
logging.info("已调用go_cursor_help重置机器码")
return True
else:
logging.error("机器码重置失败")
return False
# 对于0.45及以下版本,使用传统方式
resetter = MachineIDResetter()
result = resetter.reset_machine_ids()
if result:
logging.info("机器码重置成功")
# 重置后更新硬件ID缓存
self._hardware_id = None
return True
else:
logging.error("机器码重置失败")
return False
except Exception as e:
logging.error(f"重置机器码时发生错误: {str(e)}")
@@ -268,10 +276,8 @@ class CursorTokenUpdater:
"""
try:
logging.info("正在退出Cursor进程...")
exit_handler = ExitCursor()
exit_handler.exit()
logging.info("Cursor进程已退出")
return True
result = ExitCursor()
return result
except Exception as e:
logging.error(f"退出Cursor进程时发生错误: {str(e)}")
return False
@@ -291,34 +297,266 @@ class CursorTokenUpdater:
print_logo()
logging.info("=== 开始完整更新流程 ===")
# 1. 退出Cursor进程
if not self.exit_cursor():
return False
# 2. 修补机器码获取方法
if not self.patch_machine_id():
return False
# 3. 重置机器码
if not self.reset_machine_id(greater_than_0_45=True):
return False
# 4. 如果没有提供认证信息从API获取
if not all([email, access_token]):
success, error_msg, account_info = self.get_unused_account()
if not success:
logging.error(f"无法获取账号信息: {error_msg}")
try:
# 1. 退出Cursor进程
if not self.exit_cursor():
logging.error("退出Cursor进程失败")
return False
email = account_info["email"]
access_token = account_info["access_token"]
refresh_token = account_info["refresh_token"]
# 2. 修补机器码获取方法
if not self.patch_machine_id():
logging.error("修补机器码获取方法失败")
return False
# 3. 重置机器码
# 对于0.45以上版本使用go_cursor_help
try:
go_cursor_help.go_cursor_help()
logging.info("已调用go_cursor_help重置机器码")
except Exception as e:
logging.error(f"使用go_cursor_help重置机器码失败: {str(e)}")
# 如果go_cursor_help失败尝试使用传统方式
if not self.reset_machine_id(greater_than_0_45=False):
logging.error("重置机器码失败")
return False
# 4. 如果没有提供认证信息从API获取
if not all([email, access_token]):
success, error_msg, account_info = self.get_unused_account()
if not success:
logging.error(f"无法获取账号信息: {error_msg}")
return False
email = account_info["email"]
access_token = account_info["access_token"]
refresh_token = account_info["refresh_token"]
# 5. 更新认证信息
if not self.update_auth_info(email, access_token, refresh_token):
logging.error("更新认证信息失败")
return False
logging.info("=== 所有操作已完成 ===")
return True
# 5. 更新认证信息
if not self.update_auth_info(email, access_token, refresh_token):
except Exception as e:
logging.error(f"更新流程发生错误: {str(e)}")
return False
def _get_network_error_message(self, error: Exception) -> str:
"""获取网络错误的友好提示信息"""
if isinstance(error, requests.exceptions.ConnectTimeout):
return "连接服务器超时,请检查网络连接"
elif isinstance(error, requests.exceptions.ConnectionError):
return "无法连接到服务器,请检查网络连接"
elif isinstance(error, requests.exceptions.ReadTimeout):
return "读取服务器响应超时,请重试"
else:
return str(error)
logging.info("=== 所有操作已完成 ===")
return True
def get_device_info(self) -> dict:
"""获取设备信息"""
return {
"os": platform.system(),
"os_version": platform.version(),
"machine": platform.machine(),
"hostname": platform.node(),
"hardware_id": self.hardware_id
}
def check_activation_code(self, code: str) -> tuple:
"""检查激活码
Args:
code: 激活码
Returns:
tuple: (成功标志, 消息, 账号信息)
"""
max_retries = 3 # 最大重试次数
retry_delay = 1 # 重试间隔(秒)
for attempt in range(max_retries):
try:
data = {
"machine_id": self.hardware_id,
"code": code
}
# 禁用SSL警告
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
# 设置请求参数
request_kwargs = {
"json": data,
"headers": {
"Content-Type": "application/json",
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36",
"Accept": "*/*",
"Connection": "keep-alive"
},
"timeout": 10, # 增加超时时间
"verify": False # 禁用SSL验证
}
# 创建session
session = requests.Session()
session.verify = False
# 设置重试策略
retry_strategy = urllib3.Retry(
total=3, # 总重试次数
backoff_factor=0.5, # 重试间隔
status_forcelist=[500, 502, 503, 504] # 需要重试的HTTP状态码
)
adapter = requests.adapters.HTTPAdapter(max_retries=retry_strategy)
session.mount("http://", adapter)
session.mount("https://", adapter)
try:
# 尝试发送请求
response = session.post(
"https://cursorapi.nosqli.com/admin/api.member/activate",
**request_kwargs
)
response.raise_for_status() # 检查HTTP状态码
result = response.json()
# 激活成功
if result["code"] == 200:
api_data = result["data"]
# 构造标准的返回数据结构
account_info = {
"status": "active",
"expire_time": api_data.get("expire_time", ""),
"total_days": api_data.get("total_days", 0),
"days_left": api_data.get("days_left", 0),
"device_info": self.get_device_info()
}
return True, result["msg"], account_info
# 激活码无效或已被使用
elif result["code"] == 400:
logging.warning(f"激活码无效或已被使用: {result.get('msg', '未知错误')}")
return False, result.get("msg", "激活码无效或已被使用"), None
# 其他错误情况
else:
error_msg = result.get("msg", "未知错误")
if attempt < max_retries - 1: # 如果还有重试机会
logging.warning(f"{attempt + 1}次尝试失败: {error_msg}, 准备重试...")
time.sleep(retry_delay)
continue
logging.error(f"激活失败: {error_msg}")
return False, error_msg, None
except requests.exceptions.RequestException as e:
if attempt < max_retries - 1: # 如果还有重试机会
logging.warning(f"{attempt + 1}次网络请求失败: {str(e)}, 准备重试...")
time.sleep(retry_delay)
continue
error_msg = self._get_network_error_message(e)
logging.error(f"网络请求失败: {error_msg}")
return False, f"网络连接失败: {error_msg}", None
except Exception as e:
if attempt < max_retries - 1: # 如果还有重试机会
logging.warning(f"{attempt + 1}次请求发生错误: {str(e)}, 准备重试...")
time.sleep(retry_delay)
continue
logging.error(f"激活失败: {str(e)}")
return False, f"激活失败: {str(e)}", None
# 如果所有重试都失败了
return False, "多次尝试后激活失败,请检查网络连接或稍后重试", None
def check_member_status(self) -> Tuple[bool, str, Optional[Dict]]:
"""检查会员状态
Returns:
Tuple[bool, str, Optional[Dict]]:
- 是否成功
- 错误信息
- 账号数据(如果成功)
"""
max_retries = 3
retry_delay = 1
for attempt in range(max_retries):
try:
data = {
"machine_id": self.hardware_id
}
# 禁用SSL警告
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
# 设置请求参数
request_kwargs = {
"json": data,
"headers": {
"Content-Type": "application/json",
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36",
"Accept": "*/*",
"Connection": "keep-alive"
},
"timeout": 10,
"verify": False
}
# 创建session
session = requests.Session()
session.verify = False
# 设置重试策略
retry_strategy = urllib3.Retry(
total=3,
backoff_factor=0.5,
status_forcelist=[500, 502, 503, 504]
)
adapter = requests.adapters.HTTPAdapter(max_retries=retry_strategy)
session.mount("http://", adapter)
session.mount("https://", adapter)
try:
# 发送请求
response = session.post(
"https://cursorapi.nosqli.com/admin/api.member/status",
**request_kwargs
)
response.raise_for_status()
result = response.json()
if result["code"] == 200:
api_data = result["data"]
account_info = {
"status": "active",
"expire_time": api_data.get("expire_time", ""),
"days_left": api_data.get("days_left", 0),
"device_info": self.get_device_info()
}
return True, "success", account_info
else:
error_msg = result.get("msg", "未知错误")
if attempt < max_retries - 1:
logging.warning(f"{attempt + 1}次检查失败: {error_msg}, 准备重试...")
time.sleep(retry_delay)
continue
return False, error_msg, None
except requests.exceptions.RequestException as e:
if attempt < max_retries - 1:
logging.warning(f"{attempt + 1}次网络请求失败: {str(e)}, 准备重试...")
time.sleep(retry_delay)
continue
error_msg = self._get_network_error_message(e)
return False, f"网络连接失败: {error_msg}", None
except Exception as e:
if attempt < max_retries - 1:
logging.warning(f"{attempt + 1}次请求发生错误: {str(e)}, 准备重试...")
time.sleep(retry_delay)
continue
return False, f"检查失败: {str(e)}", None
return False, "多次尝试后检查失败,请稍后重试", None
def main():
updater = CursorTokenUpdater()