feat: 1.1版本更新 - 优化UI界面和激活流程

This commit is contained in:
ruisu
2025-02-17 20:41:33 +08:00
parent 139c73d9c9
commit 3d9835bd7f
6 changed files with 466 additions and 277 deletions

View File

@@ -25,7 +25,7 @@ os.makedirs(logs_dir, exist_ok=True)
# 打包参数
params = [
'gui/main_mac.py', # 主程序入口
'--name=CursorPro', # 应用名称
'--name=听泉助手', # 应用名称
'-w', # 不显示控制台窗口
'--clean', # 清理临时文件
'--noconfirm', # 不确认覆盖

219
config.py
View File

@@ -1,146 +1,95 @@
from dotenv import load_dotenv
import os
import sys
from logger import logging
import json
import logging
from pathlib import Path
class Config:
"""配置类"""
def __init__(self):
# 获取应用程序的根目录路径
if getattr(sys, "frozen", False):
# 如果是打包后的可执行文件
application_path = os.path.dirname(sys.executable)
else:
# 如果是开发环境
application_path = os.path.dirname(os.path.abspath(__file__))
# 指定 .env 文件的路径
dotenv_path = os.path.join(application_path, ".env")
if not os.path.exists(dotenv_path):
raise FileNotFoundError(f"文件 {dotenv_path} 不存在")
# 加载 .env 文件
load_dotenv(dotenv_path)
self.imap = False
self.temp_mail = os.getenv("TEMP_MAIL", "").strip().split("@")[0]
self.temp_mail_epin = os.getenv("TEMP_MAIL_EPIN", "").strip()
self.temp_mail_ext = os.getenv("TEMP_MAIL_EXT", "").strip()
self.domain = os.getenv("DOMAIN", "").strip()
# 如果临时邮箱为null则加载IMAP
if self.temp_mail == "null":
self.imap = True
self.imap_server = os.getenv("IMAP_SERVER", "").strip()
self.imap_port = os.getenv("IMAP_PORT", "").strip()
self.imap_user = os.getenv("IMAP_USER", "").strip()
self.imap_pass = os.getenv("IMAP_PASS", "").strip()
self.imap_dir = os.getenv("IMAP_DIR", "inbox").strip()
self.check_config()
def get_temp_mail(self):
return self.temp_mail
def get_temp_mail_epin(self):
return self.temp_mail_epin
def get_temp_mail_ext(self):
return self.temp_mail_ext
def get_imap(self):
if not self.imap:
return False
self.base_url = "https://cursorapi.nosqli.com"
self.api_endpoints = {
"activate": f"{self.base_url}/admin/api.member/activate",
"status": f"{self.base_url}/admin/api.member/status",
"get_unused": f"{self.base_url}/admin/api.account/getUnused",
"heartbeat": f"{self.base_url}/admin/api.account/heartbeat"
}
# macOS配置目录
self.config_dir = Path(os.path.expanduser("~")) / ".cursor_pro"
self.config_file = self.config_dir / "config.json"
self.member_file = self.config_dir / "member.json"
self.load_config()
def load_config(self):
"""加载配置"""
try:
self.config_dir.mkdir(parents=True, exist_ok=True)
if not self.config_file.exists():
self.save_default_config()
with open(self.config_file, "r", encoding="utf-8") as f:
config = json.load(f)
self.api_token = config.get("api_token", "")
except Exception as e:
logging.error(f"加载配置失败: {str(e)}")
self.api_token = ""
def save_member_info(self, info: dict):
"""保存会员信息"""
try:
with open(self.member_file, "w", encoding="utf-8") as f:
json.dump(info, f, indent=2, ensure_ascii=False)
logging.info("会员信息已保存")
except Exception as e:
logging.error(f"保存会员信息失败: {str(e)}")
def load_member_info(self) -> dict:
"""读取会员信息"""
try:
if self.member_file.exists():
with open(self.member_file, "r", encoding="utf-8") as f:
info = json.load(f)
logging.info(f"已读取会员信息: 到期时间 {info.get('expire_time', '')}")
return info
except Exception as e:
logging.error(f"读取会员信息失败: {str(e)}")
return {
"imap_server": self.imap_server,
"imap_port": self.imap_port,
"imap_user": self.imap_user,
"imap_pass": self.imap_pass,
"imap_dir": self.imap_dir,
"expire_time": "",
"days": 0,
"new_days": 0
}
def get_domain(self):
return self.domain
def check_config(self):
"""检查配置项是否有效
检查规则:
1. 如果使用 tempmail.plus需要配置 TEMP_MAIL 和 DOMAIN
2. 如果使用 IMAP需要配置 IMAP_SERVER、IMAP_PORT、IMAP_USER、IMAP_PASS
3. IMAP_DIR 是可选的
"""
# 基础配置检查
required_configs = {
"domain": "域名",
def save_default_config(self):
"""保存默认配置"""
config = {
"api_token": ""
}
with open(self.config_file, "w", encoding="utf-8") as f:
json.dump(config, f, indent=2, ensure_ascii=False)
def save_config(self, api_token: str):
"""保存新的配置"""
config = {
"api_token": api_token
}
with open(self.config_file, "w", encoding="utf-8") as f:
json.dump(config, f, indent=2, ensure_ascii=False)
self.api_token = api_token
logging.info("配置已更新")
# 检查基础配置
for key, name in required_configs.items():
if not self.check_is_valid(getattr(self, key)):
raise ValueError(f"{name}未配置,请在 .env 文件中设置 {key.upper()}")
# 检查邮箱配置
if self.temp_mail != "null":
# tempmail.plus 模式
if not self.check_is_valid(self.temp_mail):
raise ValueError("临时邮箱未配置,请在 .env 文件中设置 TEMP_MAIL")
else:
# IMAP 模式
imap_configs = {
"imap_server": "IMAP服务器",
"imap_port": "IMAP端口",
"imap_user": "IMAP用户名",
"imap_pass": "IMAP密码",
}
for key, name in imap_configs.items():
value = getattr(self, key)
if value == "null" or not self.check_is_valid(value):
raise ValueError(
f"{name}未配置,请在 .env 文件中设置 {key.upper()}"
)
# IMAP_DIR 是可选的,如果设置了就检查其有效性
if self.imap_dir != "null" and not self.check_is_valid(self.imap_dir):
raise ValueError(
"IMAP收件箱目录配置无效请在 .env 文件中正确设置 IMAP_DIR"
)
def check_is_valid(self, value):
"""检查配置项是否有效
def get_api_url(self, endpoint_name: str) -> str:
"""获取API端点URL
Args:
value: 配置项的值
endpoint_name: 端点名称
Returns:
bool: 配置项是否有效
str: 完整的API URL
"""
return isinstance(value, str) and len(str(value).strip()) > 0
def print_config(self):
if self.imap:
logging.info(f"\033[32mIMAP服务器: {self.imap_server}\033[0m")
logging.info(f"\033[32mIMAP端口: {self.imap_port}\033[0m")
logging.info(f"\033[32mIMAP用户名: {self.imap_user}\033[0m")
logging.info(f"\033[32mIMAP密码: {'*' * len(self.imap_pass)}\033[0m")
logging.info(f"\033[32mIMAP收件箱目录: {self.imap_dir}\033[0m")
if self.temp_mail != "null":
logging.info(
f"\033[32m临时邮箱: {self.temp_mail}{self.temp_mail_ext}\033[0m"
)
logging.info(f"\033[32m域名: {self.domain}\033[0m")
# 使用示例
if __name__ == "__main__":
try:
config = Config()
print("环境变量加载成功!")
config.print_config()
except ValueError as e:
print(f"错误: {e}")
url = self.api_endpoints.get(endpoint_name, "")
if not url:
logging.error(f"未找到API端点: {endpoint_name}")
return url

View File

@@ -114,7 +114,7 @@ class MainWindow(QMainWindow):
def init_ui(self):
# 设置窗口基本属性
self.setWindowTitle('Cursor Pro')
self.setWindowTitle('听泉助手')
self.setMinimumSize(600, 500)
# 设置macOS风格的样式
@@ -183,7 +183,7 @@ class MainWindow(QMainWindow):
layout.setContentsMargins(20, 20, 20, 20)
# 标题
title_label = QLabel("Cursor Pro")
title_label = QLabel("听泉助手")
title_label.setProperty("title", "true")
title_label.setAlignment(Qt.AlignCenter)
layout.addWidget(title_label)
@@ -306,16 +306,51 @@ class MainWindow(QMainWindow):
QMessageBox.critical(self, "错误", f"重置失败: {str(e)}")
def activate_license(self):
license_key = self.activate_input.text().strip()
if not license_key:
QMessageBox.warning(self, "提示", "请输入激活码")
return
"""激活许可证"""
try:
# 这里添加激活码验证逻辑
QMessageBox.information(self, "成功", "激活成功")
license_key = self.activate_input.text().strip()
if not license_key:
QMessageBox.warning(self, "提示", "请输入激活码")
return
# 禁用激活按钮,防止重复点击
self.activate_input.setEnabled(False)
self.update_button.setEnabled(False)
# 显示处理中的提示
QApplication.processEvents()
# 调用激活接口
success, message, account_info = self.updater.check_activation_code(license_key)
if success:
# 更新界面显示
self.member_info.clear()
self.member_info.append(f"会员状态: 已激活")
self.member_info.append(f"到期时间: {account_info['expire_time']}")
self.member_info.append(f"剩余天数: {account_info['days_left']}")
# 显示成功消息
QMessageBox.information(self, "激活成功",
f"设备已成功激活!\n"
f"到期时间: {account_info['expire_time']}\n"
f"剩余天数: {account_info['days_left']}")
# 清空激活码输入框
self.activate_input.clear()
logging.info(f"设备激活成功,到期时间: {account_info['expire_time']}")
else:
QMessageBox.warning(self, "激活失败", message)
logging.error(f"激活失败: {message}")
except Exception as e:
QMessageBox.critical(self, "错误", f"激活失败: {str(e)}")
logging.error(f"激活过程中发生错误: {str(e)}")
QMessageBox.critical(self, "错误", f"激活过程发生错误: {str(e)}")
finally:
# 恢复按钮状态
self.activate_input.setEnabled(True)
self.update_button.setEnabled(True)
def disable_cursor_update(self):
try:
@@ -331,8 +366,8 @@ def main():
logging.info("应用程序启动")
app = QApplication(sys.argv)
app.setApplicationName("CursorPro")
app.setOrganizationName("Cursor")
app.setApplicationName("听泉助手")
app.setOrganizationName("听泉")
app.setOrganizationDomain("cursor.pro")
try:

View File

@@ -365,13 +365,44 @@ class MainWindow(QMainWindow):
QMessageBox.warning(self, "提示", "请输入激活码")
return
# TODO: 实现激活逻辑
logging.info(f"正在处理激活请求,激活码: {activation_code}")
QMessageBox.information(self, "提示", "激活功能即将实现")
# 禁用激活按钮,防止重复点击
self.activate_input.setEnabled(False)
self.update_button.setEnabled(False)
# 显示处理中的提示
QApplication.processEvents()
# 调用激活接口
success, message, account_info = self.updater.check_activation_code(activation_code)
if success:
# 更新界面显示
self.member_info.clear()
self.member_info.append(f"会员状态: 已激活")
self.member_info.append(f"到期时间: {account_info['expire_time']}")
self.member_info.append(f"剩余天数: {account_info['days_left']}")
# 显示成功消息
QMessageBox.information(self, "激活成功",
f"设备已成功激活!\n"
f"到期时间: {account_info['expire_time']}\n"
f"剩余天数: {account_info['days_left']}")
# 清空激活码输入框
self.activate_input.clear()
logging.info(f"设备激活成功,到期时间: {account_info['expire_time']}")
else:
QMessageBox.warning(self, "激活失败", message)
logging.error(f"激活失败: {message}")
except Exception as e:
logging.error(f"激活过程中发生错误: {str(e)}")
QMessageBox.warning(self, "错误", f"激活失败: {str(e)}")
QMessageBox.critical(self, "错误", f"激活过程发生错误: {str(e)}")
finally:
# 恢复按钮状态
self.activate_input.setEnabled(True)
self.update_button.setEnabled(True)
def disable_cursor_update(self):
"""禁用Cursor版本更新"""

View File

@@ -40,42 +40,50 @@ def get_cursor_paths() -> Tuple[str, str]:
OSError: 当找不到有效路径或系统不支持时抛出
"""
system = platform.system()
logger.info(f"当前操作系统: {system}")
paths_map = {
"Darwin": {
"base": "/Applications/Cursor.app/Contents/Resources/app",
"package": "package.json",
"main": "out/main.js",
},
"Windows": {
"base": os.path.join(
os.getenv("LOCALAPPDATA", ""), "Programs", "Cursor", "resources", "app"
),
"package": "package.json",
"main": "out/main.js",
},
"Linux": {
"bases": ["/opt/Cursor/resources/app", "/usr/share/cursor/resources/app"],
"package": "package.json",
"main": "out/main.js",
},
}
if system not in paths_map:
raise OSError(f"不支持的操作系统: {system}")
if system == "Linux":
for base in paths_map["Linux"]["bases"]:
pkg_path = os.path.join(base, paths_map["Linux"]["package"])
if os.path.exists(pkg_path):
return (pkg_path, os.path.join(base, paths_map["Linux"]["main"]))
if system == "Darwin": # macOS
base_path = "/Applications/Cursor.app/Contents/Resources/app"
pkg_path = os.path.join(base_path, "package.json")
main_path = os.path.join(base_path, "out", "main.js")
if not os.path.exists(pkg_path) or not os.path.exists(main_path):
raise OSError("在 macOS 系统上未找到 Cursor 安装路径")
return pkg_path, main_path
elif system == "Windows":
base_path = os.path.join(
os.getenv("LOCALAPPDATA", ""),
"Programs",
"Cursor",
"resources",
"app"
)
pkg_path = os.path.join(base_path, "package.json")
main_path = os.path.join(base_path, "out", "main.js")
if not os.path.exists(pkg_path) or not os.path.exists(main_path):
raise OSError("在 Windows 系统上未找到 Cursor 安装路径")
return pkg_path, main_path
elif system == "Linux":
linux_paths = [
"/opt/Cursor/resources/app",
"/usr/share/cursor/resources/app"
]
for base_path in linux_paths:
pkg_path = os.path.join(base_path, "package.json")
main_path = os.path.join(base_path, "out", "main.js")
if os.path.exists(pkg_path) and os.path.exists(main_path):
return pkg_path, main_path
raise OSError("在 Linux 系统上未找到 Cursor 安装路径")
base_path = paths_map[system]["base"]
return (
os.path.join(base_path, paths_map[system]["package"]),
os.path.join(base_path, paths_map[system]["main"]),
)
else:
raise OSError(f"不支持的操作系统: {system}")
def check_system_requirements(pkg_path: str, main_path: str) -> bool:
@@ -298,5 +306,24 @@ def patch_cursor_get_machine_id(restore_mode=False) -> None:
sys.exit(1)
# 添加patch函数作为主函数的别名
def patch(restore_mode=False) -> bool:
"""
patch函数作为patch_cursor_get_machine_id的别名
Args:
restore_mode: 是否为恢复模式
Returns:
bool: 修补是否成功
"""
try:
patch_cursor_get_machine_id(restore_mode)
return True
except Exception as e:
logger.error(f"修补失败: {str(e)}")
return False
if __name__ == "__main__":
patch_cursor_get_machine_id()

View File

@@ -15,12 +15,14 @@ from exit_cursor import ExitCursor
import go_cursor_help
from logo import print_logo
from typing import Tuple, Dict, Optional
import time
import requests.adapters
class CursorTokenUpdater:
def __init__(self):
self.auth_manager = CursorAuthManager()
self._hardware_id = None # 延迟初始化硬件ID
@property
def hardware_id(self) -> str:
"""获取硬件ID延迟初始化"""
@@ -30,68 +32,68 @@ class CursorTokenUpdater:
def _get_hardware_id(self) -> str:
"""获取硬件唯一标识
方案1: CPU ID + 主板序列号 + BIOS序列号
方案2: 系统盘序列号 + Windows安装时间
方案3: 计算机名(最后的备选方案)
macOS: 使用系统序列号和硬件UUID
Windows: CPU ID + 主板序列号 + BIOS序列号
其他: 计算机名(最后的备选方案)
"""
try:
# 创建startupinfo对象来隐藏命令行窗口
startupinfo = None
if sys.platform == "win32":
system = platform.system()
if system == "Darwin": # macOS
try:
# 获取系统序列号
serial_number = subprocess.check_output(['system_profiler', 'SPHardwareDataType']).decode()
serial = ""
for line in serial_number.split('\n'):
if 'Serial Number' in line:
serial = line.split(':')[1].strip()
break
# 获取硬件UUID
ioreg_output = subprocess.check_output(['ioreg', '-d2', '-c', 'IOPlatformExpertDevice']).decode()
uuid = ""
for line in ioreg_output.split('\n'):
if 'IOPlatformUUID' in line:
uuid = line.split('=')[1].strip().replace('"', '').replace(' ', '')
break
if serial and uuid:
combined = f"{serial}:{uuid}"
hardware_id = hashlib.md5(combined.encode()).hexdigest()
logging.info("使用macOS硬件信息生成ID成功")
return hardware_id
except Exception as e:
logging.warning(f"获取macOS硬件信息失败: {str(e)}")
elif system == "Windows":
# 创建startupinfo对象来隐藏命令行窗口
startupinfo = subprocess.STARTUPINFO()
startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
startupinfo.wShowWindow = subprocess.SW_HIDE
# 方案1: 尝试获取硬件信息
try:
# 获取CPU ID
cpu_info = subprocess.check_output('wmic cpu get ProcessorId', startupinfo=startupinfo).decode()
cpu_id = cpu_info.split('\n')[1].strip()
# 获取主板序列号
board_info = subprocess.check_output('wmic baseboard get SerialNumber', startupinfo=startupinfo).decode()
board_id = board_info.split('\n')[1].strip()
# 获取BIOS序列号
bios_info = subprocess.check_output('wmic bios get SerialNumber', startupinfo=startupinfo).decode()
bios_id = bios_info.split('\n')[1].strip()
# 如果所有信息都获取成功且有效
if all([cpu_id, board_id, bios_id]) and not all(x in ['', '0', 'None', 'To be filled by O.E.M.'] for x in [cpu_id, board_id, bios_id]):
combined = f"{cpu_id}:{board_id}:{bios_id}"
hardware_id = hashlib.md5(combined.encode()).hexdigest()
logging.info("使用硬件信息生成ID成功")
return hardware_id
try:
# 获取CPU ID
cpu_info = subprocess.check_output('wmic cpu get ProcessorId', startupinfo=startupinfo).decode()
cpu_id = cpu_info.split('\n')[1].strip()
except Exception as e:
logging.warning(f"方案1失败: {str(e)}")
# 方案2: 系统盘序列号 + Windows安装时间
try:
backup_info = []
# 获取系统盘序列号
volume_info = subprocess.check_output('wmic logicaldisk where "DeviceID=\'C:\'" get VolumeSerialNumber', startupinfo=startupinfo).decode()
volume_serial = volume_info.split('\n')[1].strip()
if volume_serial and volume_serial not in ['', '0']:
backup_info.append(("volume", volume_serial))
# 获取Windows安装时间
os_info = subprocess.check_output('wmic os get InstallDate', startupinfo=startupinfo).decode()
install_date = os_info.split('\n')[1].strip()
if install_date:
backup_info.append(("install", install_date))
if backup_info:
combined = "|".join(f"{k}:{v}" for k, v in sorted(backup_info))
hardware_id = hashlib.md5(combined.encode()).hexdigest()
logging.info("使用系统信息生成ID成功")
return hardware_id
# 获取主板序列号
board_info = subprocess.check_output('wmic baseboard get SerialNumber', startupinfo=startupinfo).decode()
board_id = board_info.split('\n')[1].strip()
except Exception as e:
logging.warning(f"方案2失败: {str(e)}")
# 获取BIOS序列号
bios_info = subprocess.check_output('wmic bios get SerialNumber', startupinfo=startupinfo).decode()
bios_id = bios_info.split('\n')[1].strip()
# 如果所有信息都获取成功且有效
if all([cpu_id, board_id, bios_id]) and not all(x in ['', '0', 'None', 'To be filled by O.E.M.'] for x in [cpu_id, board_id, bios_id]):
combined = f"{cpu_id}:{board_id}:{bios_id}"
hardware_id = hashlib.md5(combined.encode()).hexdigest()
logging.info("使用Windows硬件信息生成ID成功")
return hardware_id
except Exception as e:
logging.warning(f"获取Windows硬件信息失败: {str(e)}")
# 方案3: 使用计算机名(最后的备选方案)
# 最后的备选方案:使用计算机名
computer_name = platform.node()
if computer_name:
hardware_id = hashlib.md5(computer_name.encode()).hexdigest()
@@ -227,17 +229,23 @@ class CursorTokenUpdater:
"""
try:
logging.info("开始重置机器码...")
resetter = MachineIDResetter()
result = resetter.reset(greater_than_0_45)
if result:
logging.info("机器码重置成功")
# 重置后更新硬件ID缓存
self._hardware_id = None
if greater_than_0_45:
# 对于0.45以上版本使用go_cursor_help
go_cursor_help.go_cursor_help()
logging.info("已调用go_cursor_help重置机器码")
return True
else:
logging.error("机器码重置失败")
return False
# 对于0.45及以下版本,使用传统方式
resetter = MachineIDResetter()
result = resetter.reset_machine_ids()
if result:
logging.info("机器码重置成功")
# 重置后更新硬件ID缓存
self._hardware_id = None
return True
else:
logging.error("机器码重置失败")
return False
except Exception as e:
logging.error(f"重置机器码时发生错误: {str(e)}")
@@ -268,10 +276,8 @@ class CursorTokenUpdater:
"""
try:
logging.info("正在退出Cursor进程...")
exit_handler = ExitCursor()
exit_handler.exit()
logging.info("Cursor进程已退出")
return True
result = ExitCursor()
return result
except Exception as e:
logging.error(f"退出Cursor进程时发生错误: {str(e)}")
return False
@@ -291,34 +297,175 @@ class CursorTokenUpdater:
print_logo()
logging.info("=== 开始完整更新流程 ===")
# 1. 退出Cursor进程
if not self.exit_cursor():
return False
# 2. 修补机器码获取方法
if not self.patch_machine_id():
return False
# 3. 重置机器码
if not self.reset_machine_id(greater_than_0_45=True):
return False
# 4. 如果没有提供认证信息从API获取
if not all([email, access_token]):
success, error_msg, account_info = self.get_unused_account()
if not success:
logging.error(f"无法获取账号信息: {error_msg}")
try:
# 1. 退出Cursor进程
if not self.exit_cursor():
logging.error("退出Cursor进程失败")
return False
email = account_info["email"]
access_token = account_info["access_token"]
refresh_token = account_info["refresh_token"]
# 2. 修补机器码获取方法
if not self.patch_machine_id():
logging.error("修补机器码获取方法失败")
return False
# 3. 重置机器码
# 对于0.45以上版本使用go_cursor_help
try:
go_cursor_help.go_cursor_help()
logging.info("已调用go_cursor_help重置机器码")
except Exception as e:
logging.error(f"使用go_cursor_help重置机器码失败: {str(e)}")
# 如果go_cursor_help失败尝试使用传统方式
if not self.reset_machine_id(greater_than_0_45=False):
logging.error("重置机器码失败")
return False
# 4. 如果没有提供认证信息从API获取
if not all([email, access_token]):
success, error_msg, account_info = self.get_unused_account()
if not success:
logging.error(f"无法获取账号信息: {error_msg}")
return False
email = account_info["email"]
access_token = account_info["access_token"]
refresh_token = account_info["refresh_token"]
# 5. 更新认证信息
if not self.update_auth_info(email, access_token, refresh_token):
logging.error("更新认证信息失败")
return False
logging.info("=== 所有操作已完成 ===")
return True
# 5. 更新认证信息
if not self.update_auth_info(email, access_token, refresh_token):
except Exception as e:
logging.error(f"更新流程发生错误: {str(e)}")
return False
def _get_network_error_message(self, error: Exception) -> str:
"""获取网络错误的友好提示信息"""
if isinstance(error, requests.exceptions.ConnectTimeout):
return "连接服务器超时,请检查网络连接"
elif isinstance(error, requests.exceptions.ConnectionError):
return "无法连接到服务器,请检查网络连接"
elif isinstance(error, requests.exceptions.ReadTimeout):
return "读取服务器响应超时,请重试"
else:
return str(error)
logging.info("=== 所有操作已完成 ===")
return True
def get_device_info(self) -> dict:
"""获取设备信息"""
return {
"os": platform.system(),
"os_version": platform.version(),
"machine": platform.machine(),
"hostname": platform.node(),
"hardware_id": self.hardware_id
}
def check_activation_code(self, code: str) -> tuple:
"""检查激活码
Args:
code: 激活码
Returns:
tuple: (成功标志, 消息, 账号信息)
"""
max_retries = 3 # 最大重试次数
retry_delay = 1 # 重试间隔(秒)
for attempt in range(max_retries):
try:
data = {
"machine_id": self.hardware_id,
"code": code
}
# 禁用SSL警告
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
# 设置请求参数
request_kwargs = {
"json": data,
"headers": {
"Content-Type": "application/json",
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36",
"Accept": "*/*",
"Connection": "keep-alive"
},
"timeout": 10, # 增加超时时间
"verify": False # 禁用SSL验证
}
# 创建session
session = requests.Session()
session.verify = False
# 设置重试策略
retry_strategy = urllib3.Retry(
total=3, # 总重试次数
backoff_factor=0.5, # 重试间隔
status_forcelist=[500, 502, 503, 504] # 需要重试的HTTP状态码
)
adapter = requests.adapters.HTTPAdapter(max_retries=retry_strategy)
session.mount("http://", adapter)
session.mount("https://", adapter)
try:
# 尝试发送请求
response = session.post(
"https://cursorapi.nosqli.com/admin/api.member/activate",
**request_kwargs
)
response.raise_for_status() # 检查HTTP状态码
result = response.json()
# 激活成功
if result["code"] == 200:
api_data = result["data"]
# 构造标准的返回数据结构
account_info = {
"status": "active",
"expire_time": api_data.get("expire_time", ""),
"total_days": api_data.get("total_days", 0),
"days_left": api_data.get("days_left", 0),
"device_info": self.get_device_info()
}
return True, result["msg"], account_info
# 激活码无效或已被使用
elif result["code"] == 400:
logging.warning(f"激活码无效或已被使用: {result.get('msg', '未知错误')}")
return False, result.get("msg", "激活码无效或已被使用"), None
# 其他错误情况
else:
error_msg = result.get("msg", "未知错误")
if attempt < max_retries - 1: # 如果还有重试机会
logging.warning(f"{attempt + 1}次尝试失败: {error_msg}, 准备重试...")
time.sleep(retry_delay)
continue
logging.error(f"激活失败: {error_msg}")
return False, error_msg, None
except requests.exceptions.RequestException as e:
if attempt < max_retries - 1: # 如果还有重试机会
logging.warning(f"{attempt + 1}次网络请求失败: {str(e)}, 准备重试...")
time.sleep(retry_delay)
continue
error_msg = self._get_network_error_message(e)
logging.error(f"网络请求失败: {error_msg}")
return False, f"网络连接失败: {error_msg}", None
except Exception as e:
if attempt < max_retries - 1: # 如果还有重试机会
logging.warning(f"{attempt + 1}次请求发生错误: {str(e)}, 准备重试...")
time.sleep(retry_delay)
continue
logging.error(f"激活失败: {str(e)}")
return False, f"激活失败: {str(e)}", None
# 如果所有重试都失败了
return False, "多次尝试后激活失败,请检查网络连接或稍后重试", None
def main():
updater = CursorTokenUpdater()