Files
nezhacursormac/update_cursor_token.py

576 lines
23 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import json
import platform
import requests
import urllib3
import ssl
import sys
import subprocess
import hashlib
from cursor_auth_manager import CursorAuthManager
from logger import logging
from reset_machine import MachineIDResetter
import patch_cursor_get_machine_id
from exit_cursor import ExitCursor
import go_cursor_help
from logo import print_logo
from typing import Tuple, Dict, Optional
import time
import requests.adapters
class CursorTokenUpdater:
def __init__(self):
self.auth_manager = CursorAuthManager()
self._hardware_id = None # 延迟初始化硬件ID
@property
def hardware_id(self) -> str:
"""获取硬件ID延迟初始化"""
if self._hardware_id is None:
self._hardware_id = self._get_hardware_id()
return self._hardware_id
def _get_hardware_id(self) -> str:
"""获取硬件唯一标识
macOS: 使用系统序列号和硬件UUID
Windows: CPU ID + 主板序列号 + BIOS序列号
其他: 计算机名(最后的备选方案)
"""
try:
system = platform.system()
if system == "Darwin": # macOS
try:
# 获取系统序列号
serial_number = subprocess.check_output(['system_profiler', 'SPHardwareDataType']).decode()
serial = ""
for line in serial_number.split('\n'):
if 'Serial Number' in line:
serial = line.split(':')[1].strip()
break
# 获取硬件UUID
ioreg_output = subprocess.check_output(['ioreg', '-d2', '-c', 'IOPlatformExpertDevice']).decode()
uuid = ""
for line in ioreg_output.split('\n'):
if 'IOPlatformUUID' in line:
uuid = line.split('=')[1].strip().replace('"', '').replace(' ', '')
break
if serial and uuid:
combined = f"{serial}:{uuid}"
hardware_id = hashlib.md5(combined.encode()).hexdigest()
logging.info("使用macOS硬件信息生成ID成功")
return hardware_id
except Exception as e:
logging.warning(f"获取macOS硬件信息失败: {str(e)}")
elif system == "Windows":
# 创建startupinfo对象来隐藏命令行窗口
startupinfo = subprocess.STARTUPINFO()
startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
startupinfo.wShowWindow = subprocess.SW_HIDE
try:
# 获取CPU ID
cpu_info = subprocess.check_output('wmic cpu get ProcessorId', startupinfo=startupinfo).decode()
cpu_id = cpu_info.split('\n')[1].strip()
# 获取主板序列号
board_info = subprocess.check_output('wmic baseboard get SerialNumber', startupinfo=startupinfo).decode()
board_id = board_info.split('\n')[1].strip()
# 获取BIOS序列号
bios_info = subprocess.check_output('wmic bios get SerialNumber', startupinfo=startupinfo).decode()
bios_id = bios_info.split('\n')[1].strip()
# 如果所有信息都获取成功且有效
if all([cpu_id, board_id, bios_id]) and not all(x in ['', '0', 'None', 'To be filled by O.E.M.'] for x in [cpu_id, board_id, bios_id]):
combined = f"{cpu_id}:{board_id}:{bios_id}"
hardware_id = hashlib.md5(combined.encode()).hexdigest()
logging.info("使用Windows硬件信息生成ID成功")
return hardware_id
except Exception as e:
logging.warning(f"获取Windows硬件信息失败: {str(e)}")
# 最后的备选方案:使用计算机名
computer_name = platform.node()
if computer_name:
hardware_id = hashlib.md5(computer_name.encode()).hexdigest()
logging.info("使用计算机名生成ID成功")
return hardware_id
raise ValueError("无法获取任何可用信息来生成硬件ID")
except Exception as e:
error_msg = f"生成硬件ID失败: {str(e)}"
logging.error(error_msg)
raise RuntimeError(error_msg)
def get_unused_account(self) -> Tuple[bool, str, Optional[Dict]]:
"""
从API获取未使用的账号
Returns:
Tuple[bool, str, Optional[Dict]]:
- 是否成功
- 错误信息
- 账号数据(如果成功)
"""
endpoint = "https://cursorapi.nosqli.com/admin/api.account/getUnused"
data = {
"machine_id": self.hardware_id
}
headers = {
"Content-Type": "application/json"
}
# 禁用SSL警告
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
request_kwargs = {
"json": data,
"headers": headers,
"timeout": 30,
"verify": False
}
try:
try:
response = requests.post(endpoint, **request_kwargs)
except requests.exceptions.SSLError:
# SSL错误时使用自定义SSL上下文
ssl_context = ssl.create_default_context()
ssl_context.check_hostname = False
ssl_context.verify_mode = ssl.CERT_NONE
session = requests.Session()
session.verify = False
response = session.post(endpoint, **request_kwargs)
response_data = response.json()
if response_data.get("code") == 200:
account_data = response_data.get("data", {})
# 获取账号信息
email = account_data.get("email", "")
access_token = account_data.get("access_token", "")
refresh_token = account_data.get("refresh_token", "")
expire_time = account_data.get("expire_time", "")
days_left = account_data.get("days_left", 0)
if not all([email, access_token, refresh_token]):
return False, "获取账号信息不完整", None
account_info = {
"email": email,
"access_token": access_token,
"refresh_token": refresh_token,
"expire_time": expire_time,
"days_left": days_left
}
logging.info(f"成功获取账号信息 - 邮箱: {email}, 剩余天数: {days_left}")
return True, "", account_info
else:
error_msg = response_data.get("msg", "未知错误")
return False, f"API返回错误: {error_msg}", None
except Exception as e:
error_msg = f"获取账号时发生错误: {str(e)}"
logging.error(error_msg)
return False, error_msg, None
def update_auth_info(self, email: str, access_token: str, refresh_token: str = None) -> bool:
"""
更新Cursor的认证信息
Args:
email: 用户邮箱
access_token: 访问令牌
refresh_token: 刷新令牌如果没有提供将使用access_token
Returns:
bool: 更新是否成功
"""
try:
# 如果没有提供refresh_token使用access_token
if refresh_token is None:
refresh_token = access_token
# 更新认证信息
result = self.auth_manager.update_auth(
email=email,
access_token=access_token,
refresh_token=refresh_token
)
if result:
logging.info(f"认证信息更新成功 - 邮箱: {email}")
return True
else:
logging.error("认证信息更新失败")
return False
except Exception as e:
logging.error(f"更新认证信息时发生错误: {str(e)}")
return False
def reset_machine_id(self, greater_than_0_45: bool = True) -> bool:
"""
重置机器码
Args:
greater_than_0_45: 是否大于0.45版本
Returns:
bool: 重置是否成功
"""
try:
logging.info("开始重置机器码...")
if greater_than_0_45:
# 对于0.45以上版本使用go_cursor_help
go_cursor_help.go_cursor_help()
logging.info("已调用go_cursor_help重置机器码")
return True
else:
# 对于0.45及以下版本,使用传统方式
resetter = MachineIDResetter()
result = resetter.reset_machine_ids()
if result:
logging.info("机器码重置成功")
# 重置后更新硬件ID缓存
self._hardware_id = None
return True
else:
logging.error("机器码重置失败")
return False
except Exception as e:
logging.error(f"重置机器码时发生错误: {str(e)}")
return False
def patch_machine_id(self) -> bool:
"""
修补机器码获取方法
Returns:
bool: 修补是否成功
"""
try:
logging.info("开始修补机器码获取方法...")
patch_cursor_get_machine_id.patch()
logging.info("机器码获取方法修补完成")
return True
except Exception as e:
logging.error(f"修补机器码获取方法时发生错误: {str(e)}")
return False
def exit_cursor(self) -> bool:
"""
退出Cursor进程
Returns:
bool: 退出是否成功
"""
try:
logging.info("正在退出Cursor进程...")
result = ExitCursor()
return result
except Exception as e:
logging.error(f"退出Cursor进程时发生错误: {str(e)}")
return False
def full_update_process(self, email: str = None, access_token: str = None, refresh_token: str = None) -> bool:
"""
执行完整的更新流程
Args:
email: 用户邮箱可选如果不提供则从API获取
access_token: 访问令牌可选如果不提供则从API获取
refresh_token: 刷新令牌可选如果不提供则从API获取
Returns:
bool: 更新流程是否全部成功
"""
print_logo()
logging.info("=== 开始完整更新流程 ===")
try:
# 1. 退出Cursor进程
if not self.exit_cursor():
logging.error("退出Cursor进程失败")
return False
# 2. 修补机器码获取方法
if not self.patch_machine_id():
logging.error("修补机器码获取方法失败")
return False
# 3. 重置机器码
# 对于0.45以上版本使用go_cursor_help
try:
go_cursor_help.go_cursor_help()
logging.info("已调用go_cursor_help重置机器码")
except Exception as e:
logging.error(f"使用go_cursor_help重置机器码失败: {str(e)}")
# 如果go_cursor_help失败尝试使用传统方式
if not self.reset_machine_id(greater_than_0_45=False):
logging.error("重置机器码失败")
return False
# 4. 如果没有提供认证信息从API获取
if not all([email, access_token]):
success, error_msg, account_info = self.get_unused_account()
if not success:
logging.error(f"无法获取账号信息: {error_msg}")
return False
email = account_info["email"]
access_token = account_info["access_token"]
refresh_token = account_info["refresh_token"]
# 5. 更新认证信息
if not self.update_auth_info(email, access_token, refresh_token):
logging.error("更新认证信息失败")
return False
logging.info("=== 所有操作已完成 ===")
return True
except Exception as e:
logging.error(f"更新流程发生错误: {str(e)}")
return False
def _get_network_error_message(self, error: Exception) -> str:
"""获取网络错误的友好提示信息"""
if isinstance(error, requests.exceptions.ConnectTimeout):
return "连接服务器超时,请检查网络连接"
elif isinstance(error, requests.exceptions.ConnectionError):
return "无法连接到服务器,请检查网络连接"
elif isinstance(error, requests.exceptions.ReadTimeout):
return "读取服务器响应超时,请重试"
else:
return str(error)
def get_device_info(self) -> dict:
"""获取设备信息"""
return {
"os": platform.system(),
"os_version": platform.version(),
"machine": platform.machine(),
"hostname": platform.node(),
"hardware_id": self.hardware_id
}
def check_activation_code(self, code: str) -> tuple:
"""检查激活码
Args:
code: 激活码
Returns:
tuple: (成功标志, 消息, 账号信息)
"""
max_retries = 3 # 最大重试次数
retry_delay = 1 # 重试间隔(秒)
for attempt in range(max_retries):
try:
data = {
"machine_id": self.hardware_id,
"code": code
}
# 禁用SSL警告
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
# 设置请求参数
request_kwargs = {
"json": data,
"headers": {
"Content-Type": "application/json",
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36",
"Accept": "*/*",
"Connection": "keep-alive"
},
"timeout": 10, # 增加超时时间
"verify": False # 禁用SSL验证
}
# 创建session
session = requests.Session()
session.verify = False
# 设置重试策略
retry_strategy = urllib3.Retry(
total=3, # 总重试次数
backoff_factor=0.5, # 重试间隔
status_forcelist=[500, 502, 503, 504] # 需要重试的HTTP状态码
)
adapter = requests.adapters.HTTPAdapter(max_retries=retry_strategy)
session.mount("http://", adapter)
session.mount("https://", adapter)
try:
# 尝试发送请求
response = session.post(
"https://cursorapi.nosqli.com/admin/api.member/activate",
**request_kwargs
)
response.raise_for_status() # 检查HTTP状态码
result = response.json()
# 激活成功
if result["code"] == 200:
api_data = result["data"]
# 构造标准的返回数据结构
account_info = {
"status": "active",
"expire_time": api_data.get("expire_time", ""),
"total_days": api_data.get("total_days", 0),
"days_left": api_data.get("days_left", 0),
"device_info": self.get_device_info()
}
return True, result["msg"], account_info
# 激活码无效或已被使用
elif result["code"] == 400:
logging.warning(f"激活码无效或已被使用: {result.get('msg', '未知错误')}")
return False, result.get("msg", "激活码无效或已被使用"), None
# 其他错误情况
else:
error_msg = result.get("msg", "未知错误")
if attempt < max_retries - 1: # 如果还有重试机会
logging.warning(f"{attempt + 1}次尝试失败: {error_msg}, 准备重试...")
time.sleep(retry_delay)
continue
logging.error(f"激活失败: {error_msg}")
return False, error_msg, None
except requests.exceptions.RequestException as e:
if attempt < max_retries - 1: # 如果还有重试机会
logging.warning(f"{attempt + 1}次网络请求失败: {str(e)}, 准备重试...")
time.sleep(retry_delay)
continue
error_msg = self._get_network_error_message(e)
logging.error(f"网络请求失败: {error_msg}")
return False, f"网络连接失败: {error_msg}", None
except Exception as e:
if attempt < max_retries - 1: # 如果还有重试机会
logging.warning(f"{attempt + 1}次请求发生错误: {str(e)}, 准备重试...")
time.sleep(retry_delay)
continue
logging.error(f"激活失败: {str(e)}")
return False, f"激活失败: {str(e)}", None
# 如果所有重试都失败了
return False, "多次尝试后激活失败,请检查网络连接或稍后重试", None
def check_member_status(self) -> Tuple[bool, str, Optional[Dict]]:
"""检查会员状态
Returns:
Tuple[bool, str, Optional[Dict]]:
- 是否成功
- 错误信息
- 账号数据(如果成功)
"""
max_retries = 3
retry_delay = 1
for attempt in range(max_retries):
try:
data = {
"machine_id": self.hardware_id
}
# 禁用SSL警告
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
# 设置请求参数
request_kwargs = {
"json": data,
"headers": {
"Content-Type": "application/json",
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36",
"Accept": "*/*",
"Connection": "keep-alive"
},
"timeout": 10,
"verify": False
}
# 创建session
session = requests.Session()
session.verify = False
# 设置重试策略
retry_strategy = urllib3.Retry(
total=3,
backoff_factor=0.5,
status_forcelist=[500, 502, 503, 504]
)
adapter = requests.adapters.HTTPAdapter(max_retries=retry_strategy)
session.mount("http://", adapter)
session.mount("https://", adapter)
try:
# 发送请求
response = session.post(
"https://cursorapi.nosqli.com/admin/api.member/status",
**request_kwargs
)
response.raise_for_status()
result = response.json()
if result["code"] == 200:
api_data = result["data"]
account_info = {
"status": "active",
"expire_time": api_data.get("expire_time", ""),
"days_left": api_data.get("days_left", 0),
"device_info": self.get_device_info()
}
return True, "success", account_info
else:
error_msg = result.get("msg", "未知错误")
if attempt < max_retries - 1:
logging.warning(f"{attempt + 1}次检查失败: {error_msg}, 准备重试...")
time.sleep(retry_delay)
continue
return False, error_msg, None
except requests.exceptions.RequestException as e:
if attempt < max_retries - 1:
logging.warning(f"{attempt + 1}次网络请求失败: {str(e)}, 准备重试...")
time.sleep(retry_delay)
continue
error_msg = self._get_network_error_message(e)
return False, f"网络连接失败: {error_msg}", None
except Exception as e:
if attempt < max_retries - 1:
logging.warning(f"{attempt + 1}次请求发生错误: {str(e)}, 准备重试...")
time.sleep(retry_delay)
continue
return False, f"检查失败: {str(e)}", None
return False, "多次尝试后检查失败,请稍后重试", None
def main():
updater = CursorTokenUpdater()
# 从环境变量获取认证信息(可选)
# 如果环境变量中有认证信息,使用环境变量中的信息
# 否则将从API获取新的账号信息
success = updater.full_update_process(
)
print("更新状态:", "成功" if success else "失败")
if __name__ == "__main__":
main()