6 Commits
main ... 1.1

7 changed files with 1148 additions and 280 deletions

73
build_mac_new.py Normal file
View File

@@ -0,0 +1,73 @@
import PyInstaller.__main__
import os
import sys
import shutil
# 获取当前脚本所在目录
current_dir = os.path.dirname(os.path.abspath(__file__))
# 清理之前的构建
dist_dir = os.path.join(current_dir, 'dist')
build_dir = os.path.join(current_dir, 'build')
if os.path.exists(dist_dir):
shutil.rmtree(dist_dir)
if os.path.exists(build_dir):
shutil.rmtree(build_dir)
# 创建必要的目录
os.makedirs('dist', exist_ok=True)
os.makedirs('build', exist_ok=True)
# 创建logs目录
logs_dir = os.path.join(current_dir, 'logs')
os.makedirs(logs_dir, exist_ok=True)
# 打包参数
params = [
'gui/main_mac.py', # 主程序入口
'--name=听泉助手', # 应用名称
'-w', # 不显示控制台窗口
'--clean', # 清理临时文件
'--noconfirm', # 不确认覆盖
'--debug=imports', # 只显示导入相关的调试信息
'--osx-bundle-identifier=com.cursor.pro', # macOS包标识符
f'--distpath={os.path.join(current_dir, "dist")}', # 输出目录
f'--workpath={os.path.join(current_dir, "build")}', # 工作目录
f'--specpath={current_dir}', # spec文件目录
'--add-data=logger.py:.', # 添加额外文件 (Mac系统使用:分隔符)
'--add-data=update_cursor_token.py:.',
'--add-data=cursor_auth_manager.py:.',
'--add-data=reset_machine.py:.',
'--add-data=patch_cursor_get_machine_id.py:.',
'--add-data=exit_cursor.py:.',
'--add-data=go_cursor_help.py:.',
'--add-data=logo.py:.',
'--add-data=config.py:.',
'--add-data=browser_utils.py:.',
'--add-data=get_email_code.py:.',
'--hidden-import=PyQt5',
'--hidden-import=PyQt5.QtCore',
'--hidden-import=PyQt5.QtGui',
'--hidden-import=PyQt5.QtWidgets',
'--hidden-import=requests',
'--hidden-import=urllib3',
'--hidden-import=psutil',
'--hidden-import=colorama',
]
# 执行打包
PyInstaller.__main__.run(params)
print("打包完成!应用程序包(.app)已生成在dist目录下。")
# 创建必要的目录和文件
app_path = os.path.join(current_dir, 'dist', 'CursorPro.app', 'Contents', 'MacOS')
if os.path.exists(app_path):
# 创建logs目录
os.makedirs(os.path.join(app_path, 'logs'), exist_ok=True)
# 创建空的error.log文件
with open(os.path.join(app_path, 'error.log'), 'w') as f:
f.write('')
print("已创建必要的目录和文件")

195
config.py
View File

@@ -1,146 +1,95 @@
from dotenv import load_dotenv
import os
import sys
from logger import logging
import json
import logging
from pathlib import Path
class Config:
"""配置类"""
def __init__(self):
# 获取应用程序的根目录路径
if getattr(sys, "frozen", False):
# 如果是打包后的可执行文件
application_path = os.path.dirname(sys.executable)
else:
# 如果是开发环境
application_path = os.path.dirname(os.path.abspath(__file__))
self.base_url = "https://cursorapi.nosqli.com"
self.api_endpoints = {
"activate": f"{self.base_url}/admin/api.member/activate",
"status": f"{self.base_url}/admin/api.member/status",
"get_unused": f"{self.base_url}/admin/api.account/getUnused",
"heartbeat": f"{self.base_url}/admin/api.account/heartbeat"
}
# 指定 .env 文件的路径
dotenv_path = os.path.join(application_path, ".env")
# macOS配置目录
self.config_dir = Path(os.path.expanduser("~")) / ".cursor_pro"
self.config_file = self.config_dir / "config.json"
self.member_file = self.config_dir / "member.json"
self.load_config()
if not os.path.exists(dotenv_path):
raise FileNotFoundError(f"文件 {dotenv_path} 不存在")
def load_config(self):
"""加载配置"""
try:
self.config_dir.mkdir(parents=True, exist_ok=True)
# 加载 .env 文件
load_dotenv(dotenv_path)
if not self.config_file.exists():
self.save_default_config()
self.imap = False
self.temp_mail = os.getenv("TEMP_MAIL", "").strip().split("@")[0]
self.temp_mail_epin = os.getenv("TEMP_MAIL_EPIN", "").strip()
self.temp_mail_ext = os.getenv("TEMP_MAIL_EXT", "").strip()
self.domain = os.getenv("DOMAIN", "").strip()
with open(self.config_file, "r", encoding="utf-8") as f:
config = json.load(f)
self.api_token = config.get("api_token", "")
# 如果临时邮箱为null则加载IMAP
if self.temp_mail == "null":
self.imap = True
self.imap_server = os.getenv("IMAP_SERVER", "").strip()
self.imap_port = os.getenv("IMAP_PORT", "").strip()
self.imap_user = os.getenv("IMAP_USER", "").strip()
self.imap_pass = os.getenv("IMAP_PASS", "").strip()
self.imap_dir = os.getenv("IMAP_DIR", "inbox").strip()
except Exception as e:
logging.error(f"加载配置失败: {str(e)}")
self.api_token = ""
self.check_config()
def save_member_info(self, info: dict):
"""保存会员信息"""
try:
with open(self.member_file, "w", encoding="utf-8") as f:
json.dump(info, f, indent=2, ensure_ascii=False)
logging.info("会员信息已保存")
except Exception as e:
logging.error(f"保存会员信息失败: {str(e)}")
def get_temp_mail(self):
return self.temp_mail
def get_temp_mail_epin(self):
return self.temp_mail_epin
def get_temp_mail_ext(self):
return self.temp_mail_ext
def get_imap(self):
if not self.imap:
return False
def load_member_info(self) -> dict:
"""读取会员信息"""
try:
if self.member_file.exists():
with open(self.member_file, "r", encoding="utf-8") as f:
info = json.load(f)
logging.info(f"已读取会员信息: 到期时间 {info.get('expire_time', '')}")
return info
except Exception as e:
logging.error(f"读取会员信息失败: {str(e)}")
return {
"imap_server": self.imap_server,
"imap_port": self.imap_port,
"imap_user": self.imap_user,
"imap_pass": self.imap_pass,
"imap_dir": self.imap_dir,
"expire_time": "",
"days": 0,
"new_days": 0
}
def get_domain(self):
return self.domain
def check_config(self):
"""检查配置项是否有效
检查规则:
1. 如果使用 tempmail.plus需要配置 TEMP_MAIL 和 DOMAIN
2. 如果使用 IMAP需要配置 IMAP_SERVER、IMAP_PORT、IMAP_USER、IMAP_PASS
3. IMAP_DIR 是可选的
"""
# 基础配置检查
required_configs = {
"domain": "域名",
def save_default_config(self):
"""保存默认配置"""
config = {
"api_token": ""
}
with open(self.config_file, "w", encoding="utf-8") as f:
json.dump(config, f, indent=2, ensure_ascii=False)
# 检查基础配置
for key, name in required_configs.items():
if not self.check_is_valid(getattr(self, key)):
raise ValueError(f"{name}未配置,请在 .env 文件中设置 {key.upper()}")
# 检查邮箱配置
if self.temp_mail != "null":
# tempmail.plus 模式
if not self.check_is_valid(self.temp_mail):
raise ValueError("临时邮箱未配置,请在 .env 文件中设置 TEMP_MAIL")
else:
# IMAP 模式
imap_configs = {
"imap_server": "IMAP服务器",
"imap_port": "IMAP端口",
"imap_user": "IMAP用户名",
"imap_pass": "IMAP密码",
def save_config(self, api_token: str):
"""保存新的配置"""
config = {
"api_token": api_token
}
with open(self.config_file, "w", encoding="utf-8") as f:
json.dump(config, f, indent=2, ensure_ascii=False)
self.api_token = api_token
logging.info("配置已更新")
for key, name in imap_configs.items():
value = getattr(self, key)
if value == "null" or not self.check_is_valid(value):
raise ValueError(
f"{name}未配置,请在 .env 文件中设置 {key.upper()}"
)
# IMAP_DIR 是可选的,如果设置了就检查其有效性
if self.imap_dir != "null" and not self.check_is_valid(self.imap_dir):
raise ValueError(
"IMAP收件箱目录配置无效请在 .env 文件中正确设置 IMAP_DIR"
)
def check_is_valid(self, value):
"""检查配置项是否有效
def get_api_url(self, endpoint_name: str) -> str:
"""获取API端点URL
Args:
value: 配置项的值
endpoint_name: 端点名称
Returns:
bool: 配置项是否有效
str: 完整的API URL
"""
return isinstance(value, str) and len(str(value).strip()) > 0
def print_config(self):
if self.imap:
logging.info(f"\033[32mIMAP服务器: {self.imap_server}\033[0m")
logging.info(f"\033[32mIMAP端口: {self.imap_port}\033[0m")
logging.info(f"\033[32mIMAP用户名: {self.imap_user}\033[0m")
logging.info(f"\033[32mIMAP密码: {'*' * len(self.imap_pass)}\033[0m")
logging.info(f"\033[32mIMAP收件箱目录: {self.imap_dir}\033[0m")
if self.temp_mail != "null":
logging.info(
f"\033[32m临时邮箱: {self.temp_mail}{self.temp_mail_ext}\033[0m"
)
logging.info(f"\033[32m域名: {self.domain}\033[0m")
# 使用示例
if __name__ == "__main__":
try:
config = Config()
print("环境变量加载成功!")
config.print_config()
except ValueError as e:
print(f"错误: {e}")
url = self.api_endpoints.get(endpoint_name, "")
if not url:
logging.error(f"未找到API端点: {endpoint_name}")
return url

538
gui/main_mac.py Normal file
View File

@@ -0,0 +1,538 @@
# -*- coding: utf-8 -*-
import sys
import os
import traceback
from datetime import datetime
def get_app_path():
"""获取应用程序路径"""
if getattr(sys, 'frozen', False):
# 如果是打包后的应用
return os.path.dirname(sys.executable)
else:
# 如果是开发环境
return os.path.dirname(os.path.abspath(__file__))
def setup_logging():
"""设置日志"""
app_path = get_app_path()
log_dir = os.path.join(app_path, 'logs')
os.makedirs(log_dir, exist_ok=True)
log_file = os.path.join(log_dir, 'app.log')
error_log = os.path.join(app_path, 'error.log')
# 配置日志
logging.basicConfig(
filename=log_file,
level=logging.DEBUG,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
# 设置错误日志处理
def handle_exception(exc_type, exc_value, exc_traceback):
if issubclass(exc_type, KeyboardInterrupt):
sys.__excepthook__(exc_type, exc_value, exc_traceback)
return
error_msg = ''.join(traceback.format_exception(exc_type, exc_value, exc_traceback))
try:
with open(error_log, 'a') as f:
f.write(f"\n{'-'*60}\n")
f.write(f"Time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
f.write(error_msg)
logging.error(f"Uncaught exception:\n{error_msg}")
except Exception as e:
print(f"Error writing to log file: {str(e)}")
print(error_msg)
sys.excepthook = handle_exception
# 添加父目录到系统路径
current_dir = os.path.dirname(os.path.abspath(__file__))
parent_dir = os.path.dirname(current_dir)
sys.path.append(parent_dir)
try:
from PyQt5.QtWidgets import (QApplication, QMainWindow, QWidget, QVBoxLayout,
QPushButton, QLabel, QLineEdit, QTextEdit, QMessageBox,
QHBoxLayout, QFrame, QStackedWidget)
from PyQt5.QtCore import Qt, QThread, pyqtSignal, QTimer
from PyQt5.QtGui import QFont, QIcon, QPalette, QColor
from update_cursor_token import CursorTokenUpdater
from logger import logging
except Exception as e:
error_path = os.path.join(get_app_path(), 'error.log')
with open(error_path, 'w') as f:
f.write(f"Import Error: {str(e)}\n")
f.write(traceback.format_exc())
sys.exit(1)
# macOS 风格的颜色
MACOS_COLORS = {
'background': '#F5F5F7', # 浅灰色背景
'button': '#0066CC', # 蓝色按钮
'button_hover': '#0052A3', # 深蓝色悬停
'button_pressed': '#003D7A', # 更深的蓝色按下
'text': '#1D1D1F', # 深色文字
'frame': '#FFFFFF', # 白色框架
'input': '#FFFFFF', # 白色输入框
'input_text': '#1D1D1F', # 深色输入文字
'border': '#E5E5E5' # 边框颜色
}
class UpdateWorker(QThread):
"""后台更新线程"""
finished = pyqtSignal(bool, str)
progress = pyqtSignal(str)
def __init__(self, updater):
super().__init__()
self.updater = updater
def run(self):
try:
success = self.updater.full_update_process()
if success:
self.finished.emit(True, "更新成功!")
else:
self.finished.emit(False, "更新失败,请查看日志")
except Exception as e:
self.finished.emit(False, f"发生错误: {str(e)}")
class MainWindow(QMainWindow):
def __init__(self):
super().__init__()
try:
logging.info("正在初始化主窗口...")
self.updater = CursorTokenUpdater()
self.init_ui()
# 检查会员状态
self.check_member_status()
# 启动定时检测
self.setup_status_timer()
logging.info("主窗口初始化完成")
except Exception as e:
logging.error(f"初始化主窗口时发生错误: {str(e)}")
QMessageBox.critical(self, "错误", f"初始化失败: {str(e)}")
def init_ui(self):
# 设置窗口基本属性
self.setWindowTitle('听泉助手')
self.setMinimumSize(600, 500)
# 设置macOS风格的样式
self.setStyleSheet(f"""
QMainWindow {{
background-color: {MACOS_COLORS['background']};
}}
QPushButton {{
background-color: {MACOS_COLORS['button']};
color: white;
border: none;
border-radius: 6px;
padding: 8px 16px;
font-size: 13px;
min-width: 100px;
}}
QPushButton:hover {{
background-color: {MACOS_COLORS['button_hover']};
}}
QPushButton:pressed {{
background-color: {MACOS_COLORS['button_pressed']};
}}
QLabel {{
color: {MACOS_COLORS['text']};
font-size: 13px;
}}
QLabel[title="true"] {{
font-size: 24px;
font-weight: bold;
}}
QTextEdit {{
background-color: {MACOS_COLORS['input']};
color: {MACOS_COLORS['input_text']};
border: 1px solid {MACOS_COLORS['border']};
border-radius: 6px;
padding: 8px;
font-size: 13px;
}}
QLineEdit {{
background-color: {MACOS_COLORS['input']};
color: {MACOS_COLORS['input_text']};
border: 1px solid {MACOS_COLORS['border']};
border-radius: 6px;
padding: 8px;
font-size: 13px;
}}
QFrame {{
background-color: {MACOS_COLORS['frame']};
border: 1px solid {MACOS_COLORS['border']};
border-radius: 8px;
}}
QMessageBox {{
background-color: {MACOS_COLORS['background']};
}}
QMessageBox QPushButton {{
min-width: 80px;
padding: 6px 12px;
}}
""")
# 创建主窗口部件
central_widget = QWidget()
self.setCentralWidget(central_widget)
layout = QVBoxLayout(central_widget)
layout.setSpacing(16)
layout.setContentsMargins(20, 20, 20, 20)
# 标题
title_label = QLabel("听泉助手")
title_label.setProperty("title", "true")
title_label.setAlignment(Qt.AlignCenter)
layout.addWidget(title_label)
# 设备ID显示区域
device_frame = QFrame()
device_layout = QHBoxLayout(device_frame)
device_layout.setContentsMargins(16, 16, 16, 16)
device_id_label = QLabel("设备标识:")
self.device_id_text = QLineEdit()
self.device_id_text.setReadOnly(True)
try:
hardware_id = self.updater.hardware_id
logging.info(f"获取到硬件ID: {hardware_id}")
self.device_id_text.setText(hardware_id)
except Exception as e:
logging.error(f"获取硬件ID失败: {str(e)}")
self.device_id_text.setText("获取失败")
copy_button = QPushButton("复制ID")
copy_button.setMaximumWidth(100)
copy_button.clicked.connect(self.copy_device_id)
device_layout.addWidget(device_id_label)
device_layout.addWidget(self.device_id_text)
device_layout.addWidget(copy_button)
layout.addWidget(device_frame)
# 会员状态区域
member_frame = QFrame()
member_layout = QVBoxLayout(member_frame)
member_layout.setContentsMargins(16, 16, 16, 16)
member_title = QLabel("会员状态")
member_title.setAlignment(Qt.AlignCenter)
member_layout.addWidget(member_title)
self.member_info = QTextEdit()
self.member_info.setReadOnly(True)
self.member_info.setFixedHeight(80)
self.member_info.setText("会员状态: 正常\n到期时间: 2025-02-22 20:44:23")
member_layout.addWidget(self.member_info)
layout.addWidget(member_frame)
# 激活区域
activate_frame = QFrame()
activate_layout = QVBoxLayout(activate_frame)
activate_layout.setContentsMargins(16, 16, 16, 16)
activate_title = QLabel("激活会员")
activate_title.setAlignment(Qt.AlignCenter)
input_layout = QHBoxLayout()
self.activate_input = QLineEdit()
self.activate_input.setPlaceholderText("请输入激活码")
activate_button = QPushButton("激活")
activate_button.setMaximumWidth(100)
activate_button.clicked.connect(self.activate_license)
input_layout.addWidget(self.activate_input)
input_layout.addWidget(activate_button)
activate_layout.addWidget(activate_title)
activate_layout.addLayout(input_layout)
layout.addWidget(activate_frame)
# 功能按钮区域
button_frame = QFrame()
button_layout = QVBoxLayout(button_frame)
button_layout.setSpacing(12)
button_layout.setContentsMargins(16, 16, 16, 16)
self.update_button = QPushButton("刷新授权")
self.update_button.clicked.connect(self.start_update)
self.reset_button = QPushButton("重置机器码")
self.reset_button.clicked.connect(self.reset_machine)
self.disable_update_button = QPushButton("禁用更新")
self.disable_update_button.clicked.connect(self.disable_cursor_update)
button_layout.addWidget(self.update_button)
button_layout.addWidget(self.reset_button)
button_layout.addWidget(self.disable_update_button)
layout.addWidget(button_frame)
def copy_device_id(self):
clipboard = QApplication.clipboard()
clipboard.setText(self.device_id_text.text())
QMessageBox.information(self, "成功", "设备ID已复制到剪贴板")
def append_log(self, text):
self.member_info.append(text)
def start_update(self):
self.update_button.setEnabled(False)
self.worker = UpdateWorker(self.updater)
self.worker.finished.connect(self.update_finished)
self.worker.start()
def update_finished(self, success, message):
self.update_button.setEnabled(True)
if success:
QMessageBox.information(self, "成功", message)
else:
QMessageBox.warning(self, "失败", message)
def reset_machine(self):
try:
self.updater.reset_machine_id()
QMessageBox.information(self, "成功", "机器ID已重置")
except Exception as e:
QMessageBox.critical(self, "错误", f"重置失败: {str(e)}")
def activate_license(self):
"""激活许可证"""
try:
license_key = self.activate_input.text().strip()
if not license_key:
QMessageBox.warning(self, "提示", "请输入激活码")
return
# 禁用激活按钮,防止重复点击
self.activate_input.setEnabled(False)
self.update_button.setEnabled(False)
# 显示处理中的提示
self.member_info.clear()
self.member_info.append("正在激活,请稍候...")
QApplication.processEvents()
# 调用激活接口
success, message, account_info = self.updater.check_activation_code(license_key)
if success:
# 更新界面显示
self.member_info.clear()
self.member_info.append(f"会员状态: 已激活")
self.member_info.append(f"到期时间: {account_info['expire_time']}")
self.member_info.append(f"剩余天数: {account_info['days_left']}")
# 清空激活码输入框并更新提示(不禁用输入框)
self.activate_input.clear()
self.activate_input.setPlaceholderText("输入激活码可叠加时长")
# 启用更新按钮
self.update_button.setEnabled(True)
# 显示成功消息
QMessageBox.information(self, "激活成功",
f"激活成功!\n"
f"到期时间: {account_info['expire_time']}\n"
f"剩余天数: {account_info['days_left']}\n"
f"您可以继续输入其他激活码叠加时长")
logging.info(f"设备激活成功,到期时间: {account_info['expire_time']}")
else:
# 恢复激活输入框
self.activate_input.setPlaceholderText("请输入激活码")
# 更新状态显示
self.member_info.clear()
self.member_info.append("会员状态: 未激活")
self.member_info.append("请输入激活码进行激活")
QMessageBox.warning(self, "激活失败", message)
logging.error(f"激活失败: {message}")
except Exception as e:
# 恢复激活输入框
self.activate_input.setPlaceholderText("请输入激活码")
# 更新状态显示
self.member_info.clear()
self.member_info.append("会员状态: 激活失败")
self.member_info.append("请重新尝试")
logging.error(f"激活过程中发生错误: {str(e)}")
QMessageBox.critical(self, "错误", f"激活过程发生错误: {str(e)}")
finally:
# 根据激活状态设置更新按钮状态
self.update_button.setEnabled(success if 'success' in locals() else False)
def disable_cursor_update(self):
try:
# 这里添加禁用更新逻辑
QMessageBox.information(self, "成功", "已禁用Cursor更新")
except Exception as e:
QMessageBox.critical(self, "错误", f"操作失败: {str(e)}")
def check_member_status(self):
"""检查会员状态"""
try:
logging.info("正在检查会员状态...")
self.member_info.clear()
self.member_info.append("正在检查会员状态...")
QApplication.processEvents()
# 调用API检查状态
success, message, account_info = self.updater.check_member_status()
if success and account_info:
# 更新会员信息显示
self.member_info.clear()
self.member_info.append(f"会员状态: 已激活")
self.member_info.append(f"到期时间: {account_info['expire_time']}")
self.member_info.append(f"剩余天数: {account_info['days_left']}")
# 更新激活输入框提示(不禁用输入框)
self.activate_input.setPlaceholderText("输入激活码可叠加时长")
# 启用更新按钮
self.update_button.setEnabled(True)
logging.info(f"会员状态检查完成 - 到期时间: {account_info['expire_time']}")
else:
# 显示未激活状态
self.member_info.clear()
self.member_info.append("会员状态: 未激活")
self.member_info.append("请输入激活码进行激活")
# 更新激活输入框提示
self.activate_input.setPlaceholderText("请输入激活码")
# 禁用更新按钮
self.update_button.setEnabled(False)
logging.warning("会员状态检查结果:未激活")
if message and "设备未激活" not in message:
QMessageBox.warning(self, "提示", message)
except Exception as e:
self.member_info.clear()
self.member_info.append("会员状态: 检查失败")
self.member_info.append("请稍后重试")
# 启用激活输入框和按钮
self.activate_input.setEnabled(True)
self.activate_input.setPlaceholderText("请输入激活码")
# 禁用更新按钮
self.update_button.setEnabled(False)
logging.error(f"检查会员状态时发生错误: {str(e)}")
QMessageBox.warning(self, "错误", f"检查会员状态失败: {str(e)}")
def setup_status_timer(self):
"""设置定时检测会员状态"""
try:
self.status_timer = QTimer(self)
self.status_timer.timeout.connect(self.silent_check_member_status)
# 设置3分钟检测一次 (180000毫秒)
self.status_timer.start(180000)
logging.info("会员状态定时检测已启动")
except Exception as e:
logging.error(f"设置定时器时发生错误: {str(e)}")
def silent_check_member_status(self):
"""静默检查会员状态"""
try:
logging.info("开始静默检查会员状态...")
success, message, account_info = self.updater.check_member_status()
if success and account_info:
# 更新会员信息显示
self.member_info.clear()
self.member_info.append(f"会员状态: 已激活")
self.member_info.append(f"到期时间: {account_info['expire_time']}")
self.member_info.append(f"剩余天数: {account_info['days_left']}")
# 禁用激活输入框和按钮
self.activate_input.setEnabled(False)
self.activate_input.setPlaceholderText("已激活")
# 启用更新按钮
self.update_button.setEnabled(True)
logging.info(f"静默检查完成 - 会员状态正常,到期时间: {account_info['expire_time']}")
else:
# 显示未激活状态
self.member_info.clear()
self.member_info.append("会员状态: 未激活")
self.member_info.append("请输入激活码进行激活")
# 启用激活输入框和按钮
self.activate_input.setEnabled(True)
self.activate_input.setPlaceholderText("请输入激活码")
# 禁用更新按钮
self.update_button.setEnabled(False)
logging.warning("静默检查结果:会员未激活")
except Exception as e:
logging.error(f"静默检查会员状态时发生错误: {str(e)}")
# 静默检查出错时不显示错误提示,只记录日志
def closeEvent(self, event):
"""窗口关闭事件"""
try:
# 停止定时器
if hasattr(self, 'status_timer'):
self.status_timer.stop()
logging.info("应用程序正常关闭")
event.accept()
except Exception as e:
logging.error(f"关闭窗口时发生错误: {str(e)}")
event.accept()
def main():
try:
# 设置日志
setup_logging()
logging.info("应用程序启动")
app = QApplication(sys.argv)
app.setApplicationName("听泉助手")
app.setOrganizationName("听泉")
app.setOrganizationDomain("cursor.pro")
try:
window = MainWindow()
window.show()
logging.info("主窗口已显示")
return app.exec_()
except Exception as e:
logging.error(f"主窗口创建失败: {str(e)}")
logging.error(traceback.format_exc())
QMessageBox.critical(None, "错误", f"程序启动失败: {str(e)}")
return 1
except Exception as e:
error_path = os.path.join(get_app_path(), 'error.log')
with open(error_path, 'a') as f:
f.write(f"\nApplication Error: {str(e)}\n")
f.write(traceback.format_exc())
return 1
if __name__ == '__main__':
sys.exit(main())

View File

@@ -316,6 +316,14 @@ class MainWindow(QMainWindow):
if success:
QMessageBox.information(self, "成功", message)
else:
# 检查是否是未激活错误
if "设备未激活" in message:
QMessageBox.warning(self, "设备未激活",
"请先激活设备后再进行更新。\n"
"您可以:\n"
"1. 输入激活码进行激活\n"
"2. 联系客服获取激活码")
else:
QMessageBox.warning(self, "失败", message)
@@ -357,13 +365,44 @@ class MainWindow(QMainWindow):
QMessageBox.warning(self, "提示", "请输入激活码")
return
# TODO: 实现激活逻辑
logging.info(f"正在处理激活请求,激活码: {activation_code}")
QMessageBox.information(self, "提示", "激活功能即将实现")
# 禁用激活按钮,防止重复点击
self.activate_input.setEnabled(False)
self.update_button.setEnabled(False)
# 显示处理中的提示
QApplication.processEvents()
# 调用激活接口
success, message, account_info = self.updater.check_activation_code(activation_code)
if success:
# 更新界面显示
self.member_info.clear()
self.member_info.append(f"会员状态: 已激活")
self.member_info.append(f"到期时间: {account_info['expire_time']}")
self.member_info.append(f"剩余天数: {account_info['days_left']}")
# 显示成功消息
QMessageBox.information(self, "激活成功",
f"设备已成功激活!\n"
f"到期时间: {account_info['expire_time']}\n"
f"剩余天数: {account_info['days_left']}")
# 清空激活码输入框
self.activate_input.clear()
logging.info(f"设备激活成功,到期时间: {account_info['expire_time']}")
else:
QMessageBox.warning(self, "激活失败", message)
logging.error(f"激活失败: {message}")
except Exception as e:
logging.error(f"激活过程中发生错误: {str(e)}")
QMessageBox.warning(self, "错误", f"激活失败: {str(e)}")
QMessageBox.critical(self, "错误", f"激活过程发生错误: {str(e)}")
finally:
# 恢复按钮状态
self.activate_input.setEnabled(True)
self.update_button.setEnabled(True)
def disable_cursor_update(self):
"""禁用Cursor版本更新"""

View File

@@ -1,11 +1,26 @@
# -*- coding: utf-8 -*-
import logging
import os
from datetime import datetime
# Configure logging
log_dir = "logs"
if not os.path.exists(log_dir):
os.makedirs(log_dir)
# 在用户主目录下创建日志目录
home_dir = os.path.expanduser('~')
app_dir = os.path.join(home_dir, '.cursor_pro')
log_dir = os.path.join(app_dir, 'logs')
os.makedirs(log_dir, exist_ok=True)
# 设置日志文件名
log_file = os.path.join(log_dir, f"{datetime.now().strftime('%Y-%m-%d')}.log")
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler(log_file, encoding='utf-8'),
logging.StreamHandler()
]
)
class PrefixFormatter(logging.Formatter):
@@ -17,17 +32,6 @@ class PrefixFormatter(logging.Formatter):
return super().format(record)
logging.basicConfig(
level=logging.DEBUG,
format="%(asctime)s - %(levelname)s - %(message)s",
handlers=[
logging.FileHandler(
os.path.join(log_dir, f"{datetime.now().strftime('%Y-%m-%d')}.log"),
encoding="utf-8",
),
],
)
# 为文件处理器设置自定义格式化器
for handler in logging.getLogger().handlers:
if isinstance(handler, logging.FileHandler):

View File

@@ -40,42 +40,50 @@ def get_cursor_paths() -> Tuple[str, str]:
OSError: 当找不到有效路径或系统不支持时抛出
"""
system = platform.system()
logger.info(f"当前操作系统: {system}")
paths_map = {
"Darwin": {
"base": "/Applications/Cursor.app/Contents/Resources/app",
"package": "package.json",
"main": "out/main.js",
},
"Windows": {
"base": os.path.join(
os.getenv("LOCALAPPDATA", ""), "Programs", "Cursor", "resources", "app"
),
"package": "package.json",
"main": "out/main.js",
},
"Linux": {
"bases": ["/opt/Cursor/resources/app", "/usr/share/cursor/resources/app"],
"package": "package.json",
"main": "out/main.js",
},
}
if system == "Darwin": # macOS
base_path = "/Applications/Cursor.app/Contents/Resources/app"
pkg_path = os.path.join(base_path, "package.json")
main_path = os.path.join(base_path, "out", "main.js")
if system not in paths_map:
raise OSError(f"不支持的操作系统: {system}")
if not os.path.exists(pkg_path) or not os.path.exists(main_path):
raise OSError("在 macOS 系统上未找到 Cursor 安装路径")
return pkg_path, main_path
elif system == "Windows":
base_path = os.path.join(
os.getenv("LOCALAPPDATA", ""),
"Programs",
"Cursor",
"resources",
"app"
)
pkg_path = os.path.join(base_path, "package.json")
main_path = os.path.join(base_path, "out", "main.js")
if not os.path.exists(pkg_path) or not os.path.exists(main_path):
raise OSError("在 Windows 系统上未找到 Cursor 安装路径")
return pkg_path, main_path
elif system == "Linux":
linux_paths = [
"/opt/Cursor/resources/app",
"/usr/share/cursor/resources/app"
]
for base_path in linux_paths:
pkg_path = os.path.join(base_path, "package.json")
main_path = os.path.join(base_path, "out", "main.js")
if os.path.exists(pkg_path) and os.path.exists(main_path):
return pkg_path, main_path
if system == "Linux":
for base in paths_map["Linux"]["bases"]:
pkg_path = os.path.join(base, paths_map["Linux"]["package"])
if os.path.exists(pkg_path):
return (pkg_path, os.path.join(base, paths_map["Linux"]["main"]))
raise OSError("在 Linux 系统上未找到 Cursor 安装路径")
base_path = paths_map[system]["base"]
return (
os.path.join(base_path, paths_map[system]["package"]),
os.path.join(base_path, paths_map[system]["main"]),
)
else:
raise OSError(f"不支持的操作系统: {system}")
def check_system_requirements(pkg_path: str, main_path: str) -> bool:
@@ -298,5 +306,24 @@ def patch_cursor_get_machine_id(restore_mode=False) -> None:
sys.exit(1)
# 添加patch函数作为主函数的别名
def patch(restore_mode=False) -> bool:
"""
patch函数作为patch_cursor_get_machine_id的别名
Args:
restore_mode: 是否为恢复模式
Returns:
bool: 修补是否成功
"""
try:
patch_cursor_get_machine_id(restore_mode)
return True
except Exception as e:
logger.error(f"修补失败: {str(e)}")
return False
if __name__ == "__main__":
patch_cursor_get_machine_id()

View File

@@ -15,6 +15,8 @@ from exit_cursor import ExitCursor
import go_cursor_help
from logo import print_logo
from typing import Tuple, Dict, Optional
import time
import requests.adapters
class CursorTokenUpdater:
def __init__(self):
@@ -30,19 +32,45 @@ class CursorTokenUpdater:
def _get_hardware_id(self) -> str:
"""获取硬件唯一标识
方案1: CPU ID + 主板序列号 + BIOS序列号
方案2: 系统盘序列号 + Windows安装时间
方案3: 计算机名(最后的备选方案)
macOS: 使用系统序列号和硬件UUID
Windows: CPU ID + 主板序列号 + BIOS序列号
其他: 计算机名(最后的备选方案)
"""
try:
system = platform.system()
if system == "Darwin": # macOS
try:
# 获取系统序列号
serial_number = subprocess.check_output(['system_profiler', 'SPHardwareDataType']).decode()
serial = ""
for line in serial_number.split('\n'):
if 'Serial Number' in line:
serial = line.split(':')[1].strip()
break
# 获取硬件UUID
ioreg_output = subprocess.check_output(['ioreg', '-d2', '-c', 'IOPlatformExpertDevice']).decode()
uuid = ""
for line in ioreg_output.split('\n'):
if 'IOPlatformUUID' in line:
uuid = line.split('=')[1].strip().replace('"', '').replace(' ', '')
break
if serial and uuid:
combined = f"{serial}:{uuid}"
hardware_id = hashlib.md5(combined.encode()).hexdigest()
logging.info("使用macOS硬件信息生成ID成功")
return hardware_id
except Exception as e:
logging.warning(f"获取macOS硬件信息失败: {str(e)}")
elif system == "Windows":
# 创建startupinfo对象来隐藏命令行窗口
startupinfo = None
if sys.platform == "win32":
startupinfo = subprocess.STARTUPINFO()
startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
startupinfo.wShowWindow = subprocess.SW_HIDE
# 方案1: 尝试获取硬件信息
try:
# 获取CPU ID
cpu_info = subprocess.check_output('wmic cpu get ProcessorId', startupinfo=startupinfo).decode()
@@ -60,38 +88,12 @@ class CursorTokenUpdater:
if all([cpu_id, board_id, bios_id]) and not all(x in ['', '0', 'None', 'To be filled by O.E.M.'] for x in [cpu_id, board_id, bios_id]):
combined = f"{cpu_id}:{board_id}:{bios_id}"
hardware_id = hashlib.md5(combined.encode()).hexdigest()
logging.info("使用硬件信息生成ID成功")
logging.info("使用Windows硬件信息生成ID成功")
return hardware_id
except Exception as e:
logging.warning(f"方案1失败: {str(e)}")
logging.warning(f"获取Windows硬件信息失败: {str(e)}")
# 方案2: 系统盘序列号 + Windows安装时间
try:
backup_info = []
# 获取系统盘序列号
volume_info = subprocess.check_output('wmic logicaldisk where "DeviceID=\'C:\'" get VolumeSerialNumber', startupinfo=startupinfo).decode()
volume_serial = volume_info.split('\n')[1].strip()
if volume_serial and volume_serial not in ['', '0']:
backup_info.append(("volume", volume_serial))
# 获取Windows安装时间
os_info = subprocess.check_output('wmic os get InstallDate', startupinfo=startupinfo).decode()
install_date = os_info.split('\n')[1].strip()
if install_date:
backup_info.append(("install", install_date))
if backup_info:
combined = "|".join(f"{k}:{v}" for k, v in sorted(backup_info))
hardware_id = hashlib.md5(combined.encode()).hexdigest()
logging.info("使用系统信息生成ID成功")
return hardware_id
except Exception as e:
logging.warning(f"方案2失败: {str(e)}")
# 方案3: 使用计算机名(最后的备选方案)
# 最后的备选方案:使用计算机名
computer_name = platform.node()
if computer_name:
hardware_id = hashlib.md5(computer_name.encode()).hexdigest()
@@ -227,9 +229,15 @@ class CursorTokenUpdater:
"""
try:
logging.info("开始重置机器码...")
if greater_than_0_45:
# 对于0.45以上版本使用go_cursor_help
go_cursor_help.go_cursor_help()
logging.info("已调用go_cursor_help重置机器码")
return True
else:
# 对于0.45及以下版本,使用传统方式
resetter = MachineIDResetter()
result = resetter.reset(greater_than_0_45)
result = resetter.reset_machine_ids()
if result:
logging.info("机器码重置成功")
# 重置后更新硬件ID缓存
@@ -268,10 +276,8 @@ class CursorTokenUpdater:
"""
try:
logging.info("正在退出Cursor进程...")
exit_handler = ExitCursor()
exit_handler.exit()
logging.info("Cursor进程已退出")
return True
result = ExitCursor()
return result
except Exception as e:
logging.error(f"退出Cursor进程时发生错误: {str(e)}")
return False
@@ -291,16 +297,27 @@ class CursorTokenUpdater:
print_logo()
logging.info("=== 开始完整更新流程 ===")
try:
# 1. 退出Cursor进程
if not self.exit_cursor():
logging.error("退出Cursor进程失败")
return False
# 2. 修补机器码获取方法
if not self.patch_machine_id():
logging.error("修补机器码获取方法失败")
return False
# 3. 重置机器码
if not self.reset_machine_id(greater_than_0_45=True):
# 对于0.45以上版本使用go_cursor_help
try:
go_cursor_help.go_cursor_help()
logging.info("已调用go_cursor_help重置机器码")
except Exception as e:
logging.error(f"使用go_cursor_help重置机器码失败: {str(e)}")
# 如果go_cursor_help失败尝试使用传统方式
if not self.reset_machine_id(greater_than_0_45=False):
logging.error("重置机器码失败")
return False
# 4. 如果没有提供认证信息从API获取
@@ -315,11 +332,232 @@ class CursorTokenUpdater:
# 5. 更新认证信息
if not self.update_auth_info(email, access_token, refresh_token):
logging.error("更新认证信息失败")
return False
logging.info("=== 所有操作已完成 ===")
return True
except Exception as e:
logging.error(f"更新流程发生错误: {str(e)}")
return False
def _get_network_error_message(self, error: Exception) -> str:
"""获取网络错误的友好提示信息"""
if isinstance(error, requests.exceptions.ConnectTimeout):
return "连接服务器超时,请检查网络连接"
elif isinstance(error, requests.exceptions.ConnectionError):
return "无法连接到服务器,请检查网络连接"
elif isinstance(error, requests.exceptions.ReadTimeout):
return "读取服务器响应超时,请重试"
else:
return str(error)
def get_device_info(self) -> dict:
"""获取设备信息"""
return {
"os": platform.system(),
"os_version": platform.version(),
"machine": platform.machine(),
"hostname": platform.node(),
"hardware_id": self.hardware_id
}
def check_activation_code(self, code: str) -> tuple:
"""检查激活码
Args:
code: 激活码
Returns:
tuple: (成功标志, 消息, 账号信息)
"""
max_retries = 3 # 最大重试次数
retry_delay = 1 # 重试间隔(秒)
for attempt in range(max_retries):
try:
data = {
"machine_id": self.hardware_id,
"code": code
}
# 禁用SSL警告
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
# 设置请求参数
request_kwargs = {
"json": data,
"headers": {
"Content-Type": "application/json",
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36",
"Accept": "*/*",
"Connection": "keep-alive"
},
"timeout": 10, # 增加超时时间
"verify": False # 禁用SSL验证
}
# 创建session
session = requests.Session()
session.verify = False
# 设置重试策略
retry_strategy = urllib3.Retry(
total=3, # 总重试次数
backoff_factor=0.5, # 重试间隔
status_forcelist=[500, 502, 503, 504] # 需要重试的HTTP状态码
)
adapter = requests.adapters.HTTPAdapter(max_retries=retry_strategy)
session.mount("http://", adapter)
session.mount("https://", adapter)
try:
# 尝试发送请求
response = session.post(
"https://cursorapi.nosqli.com/admin/api.member/activate",
**request_kwargs
)
response.raise_for_status() # 检查HTTP状态码
result = response.json()
# 激活成功
if result["code"] == 200:
api_data = result["data"]
# 构造标准的返回数据结构
account_info = {
"status": "active",
"expire_time": api_data.get("expire_time", ""),
"total_days": api_data.get("total_days", 0),
"days_left": api_data.get("days_left", 0),
"device_info": self.get_device_info()
}
return True, result["msg"], account_info
# 激活码无效或已被使用
elif result["code"] == 400:
logging.warning(f"激活码无效或已被使用: {result.get('msg', '未知错误')}")
return False, result.get("msg", "激活码无效或已被使用"), None
# 其他错误情况
else:
error_msg = result.get("msg", "未知错误")
if attempt < max_retries - 1: # 如果还有重试机会
logging.warning(f"{attempt + 1}次尝试失败: {error_msg}, 准备重试...")
time.sleep(retry_delay)
continue
logging.error(f"激活失败: {error_msg}")
return False, error_msg, None
except requests.exceptions.RequestException as e:
if attempt < max_retries - 1: # 如果还有重试机会
logging.warning(f"{attempt + 1}次网络请求失败: {str(e)}, 准备重试...")
time.sleep(retry_delay)
continue
error_msg = self._get_network_error_message(e)
logging.error(f"网络请求失败: {error_msg}")
return False, f"网络连接失败: {error_msg}", None
except Exception as e:
if attempt < max_retries - 1: # 如果还有重试机会
logging.warning(f"{attempt + 1}次请求发生错误: {str(e)}, 准备重试...")
time.sleep(retry_delay)
continue
logging.error(f"激活失败: {str(e)}")
return False, f"激活失败: {str(e)}", None
# 如果所有重试都失败了
return False, "多次尝试后激活失败,请检查网络连接或稍后重试", None
def check_member_status(self) -> Tuple[bool, str, Optional[Dict]]:
"""检查会员状态
Returns:
Tuple[bool, str, Optional[Dict]]:
- 是否成功
- 错误信息
- 账号数据(如果成功)
"""
max_retries = 3
retry_delay = 1
for attempt in range(max_retries):
try:
data = {
"machine_id": self.hardware_id
}
# 禁用SSL警告
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
# 设置请求参数
request_kwargs = {
"json": data,
"headers": {
"Content-Type": "application/json",
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36",
"Accept": "*/*",
"Connection": "keep-alive"
},
"timeout": 10,
"verify": False
}
# 创建session
session = requests.Session()
session.verify = False
# 设置重试策略
retry_strategy = urllib3.Retry(
total=3,
backoff_factor=0.5,
status_forcelist=[500, 502, 503, 504]
)
adapter = requests.adapters.HTTPAdapter(max_retries=retry_strategy)
session.mount("http://", adapter)
session.mount("https://", adapter)
try:
# 发送请求
response = session.post(
"https://cursorapi.nosqli.com/admin/api.member/status",
**request_kwargs
)
response.raise_for_status()
result = response.json()
if result["code"] == 200:
api_data = result["data"]
account_info = {
"status": "active",
"expire_time": api_data.get("expire_time", ""),
"days_left": api_data.get("days_left", 0),
"device_info": self.get_device_info()
}
return True, "success", account_info
else:
error_msg = result.get("msg", "未知错误")
if attempt < max_retries - 1:
logging.warning(f"{attempt + 1}次检查失败: {error_msg}, 准备重试...")
time.sleep(retry_delay)
continue
return False, error_msg, None
except requests.exceptions.RequestException as e:
if attempt < max_retries - 1:
logging.warning(f"{attempt + 1}次网络请求失败: {str(e)}, 准备重试...")
time.sleep(retry_delay)
continue
error_msg = self._get_network_error_message(e)
return False, f"网络连接失败: {error_msg}", None
except Exception as e:
if attempt < max_retries - 1:
logging.warning(f"{attempt + 1}次请求发生错误: {str(e)}, 准备重试...")
time.sleep(retry_delay)
continue
return False, f"检查失败: {str(e)}", None
return False, "多次尝试后检查失败,请稍后重试", None
def main():
updater = CursorTokenUpdater()