576 lines
23 KiB
Python
576 lines
23 KiB
Python
import os
|
||
import json
|
||
import platform
|
||
import requests
|
||
import urllib3
|
||
import ssl
|
||
import sys
|
||
import subprocess
|
||
import hashlib
|
||
from cursor_auth_manager import CursorAuthManager
|
||
from logger import logging
|
||
from reset_machine import MachineIDResetter
|
||
import patch_cursor_get_machine_id
|
||
from exit_cursor import ExitCursor
|
||
import go_cursor_help
|
||
from logo import print_logo
|
||
from typing import Tuple, Dict, Optional
|
||
import time
|
||
import requests.adapters
|
||
|
||
class CursorTokenUpdater:
|
||
def __init__(self):
|
||
self.auth_manager = CursorAuthManager()
|
||
self._hardware_id = None # 延迟初始化硬件ID
|
||
|
||
@property
|
||
def hardware_id(self) -> str:
|
||
"""获取硬件ID(延迟初始化)"""
|
||
if self._hardware_id is None:
|
||
self._hardware_id = self._get_hardware_id()
|
||
return self._hardware_id
|
||
|
||
def _get_hardware_id(self) -> str:
|
||
"""获取硬件唯一标识
|
||
macOS: 使用系统序列号和硬件UUID
|
||
Windows: CPU ID + 主板序列号 + BIOS序列号
|
||
其他: 计算机名(最后的备选方案)
|
||
"""
|
||
try:
|
||
system = platform.system()
|
||
|
||
if system == "Darwin": # macOS
|
||
try:
|
||
# 获取系统序列号
|
||
serial_number = subprocess.check_output(['system_profiler', 'SPHardwareDataType']).decode()
|
||
serial = ""
|
||
for line in serial_number.split('\n'):
|
||
if 'Serial Number' in line:
|
||
serial = line.split(':')[1].strip()
|
||
break
|
||
|
||
# 获取硬件UUID
|
||
ioreg_output = subprocess.check_output(['ioreg', '-d2', '-c', 'IOPlatformExpertDevice']).decode()
|
||
uuid = ""
|
||
for line in ioreg_output.split('\n'):
|
||
if 'IOPlatformUUID' in line:
|
||
uuid = line.split('=')[1].strip().replace('"', '').replace(' ', '')
|
||
break
|
||
|
||
if serial and uuid:
|
||
combined = f"{serial}:{uuid}"
|
||
hardware_id = hashlib.md5(combined.encode()).hexdigest()
|
||
logging.info("使用macOS硬件信息生成ID成功")
|
||
return hardware_id
|
||
except Exception as e:
|
||
logging.warning(f"获取macOS硬件信息失败: {str(e)}")
|
||
|
||
elif system == "Windows":
|
||
# 创建startupinfo对象来隐藏命令行窗口
|
||
startupinfo = subprocess.STARTUPINFO()
|
||
startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
|
||
startupinfo.wShowWindow = subprocess.SW_HIDE
|
||
|
||
try:
|
||
# 获取CPU ID
|
||
cpu_info = subprocess.check_output('wmic cpu get ProcessorId', startupinfo=startupinfo).decode()
|
||
cpu_id = cpu_info.split('\n')[1].strip()
|
||
|
||
# 获取主板序列号
|
||
board_info = subprocess.check_output('wmic baseboard get SerialNumber', startupinfo=startupinfo).decode()
|
||
board_id = board_info.split('\n')[1].strip()
|
||
|
||
# 获取BIOS序列号
|
||
bios_info = subprocess.check_output('wmic bios get SerialNumber', startupinfo=startupinfo).decode()
|
||
bios_id = bios_info.split('\n')[1].strip()
|
||
|
||
# 如果所有信息都获取成功且有效
|
||
if all([cpu_id, board_id, bios_id]) and not all(x in ['', '0', 'None', 'To be filled by O.E.M.'] for x in [cpu_id, board_id, bios_id]):
|
||
combined = f"{cpu_id}:{board_id}:{bios_id}"
|
||
hardware_id = hashlib.md5(combined.encode()).hexdigest()
|
||
logging.info("使用Windows硬件信息生成ID成功")
|
||
return hardware_id
|
||
except Exception as e:
|
||
logging.warning(f"获取Windows硬件信息失败: {str(e)}")
|
||
|
||
# 最后的备选方案:使用计算机名
|
||
computer_name = platform.node()
|
||
if computer_name:
|
||
hardware_id = hashlib.md5(computer_name.encode()).hexdigest()
|
||
logging.info("使用计算机名生成ID成功")
|
||
return hardware_id
|
||
|
||
raise ValueError("无法获取任何可用信息来生成硬件ID")
|
||
|
||
except Exception as e:
|
||
error_msg = f"生成硬件ID失败: {str(e)}"
|
||
logging.error(error_msg)
|
||
raise RuntimeError(error_msg)
|
||
|
||
def get_unused_account(self) -> Tuple[bool, str, Optional[Dict]]:
|
||
"""
|
||
从API获取未使用的账号
|
||
|
||
Returns:
|
||
Tuple[bool, str, Optional[Dict]]:
|
||
- 是否成功
|
||
- 错误信息
|
||
- 账号数据(如果成功)
|
||
"""
|
||
endpoint = "https://cursorapi.nosqli.com/admin/api.account/getUnused"
|
||
data = {
|
||
"machine_id": self.hardware_id
|
||
}
|
||
headers = {
|
||
"Content-Type": "application/json"
|
||
}
|
||
|
||
# 禁用SSL警告
|
||
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
|
||
|
||
request_kwargs = {
|
||
"json": data,
|
||
"headers": headers,
|
||
"timeout": 30,
|
||
"verify": False
|
||
}
|
||
|
||
try:
|
||
try:
|
||
response = requests.post(endpoint, **request_kwargs)
|
||
except requests.exceptions.SSLError:
|
||
# SSL错误时使用自定义SSL上下文
|
||
ssl_context = ssl.create_default_context()
|
||
ssl_context.check_hostname = False
|
||
ssl_context.verify_mode = ssl.CERT_NONE
|
||
|
||
session = requests.Session()
|
||
session.verify = False
|
||
response = session.post(endpoint, **request_kwargs)
|
||
|
||
response_data = response.json()
|
||
|
||
if response_data.get("code") == 200:
|
||
account_data = response_data.get("data", {})
|
||
|
||
# 获取账号信息
|
||
email = account_data.get("email", "")
|
||
access_token = account_data.get("access_token", "")
|
||
refresh_token = account_data.get("refresh_token", "")
|
||
expire_time = account_data.get("expire_time", "")
|
||
days_left = account_data.get("days_left", 0)
|
||
|
||
if not all([email, access_token, refresh_token]):
|
||
return False, "获取账号信息不完整", None
|
||
|
||
account_info = {
|
||
"email": email,
|
||
"access_token": access_token,
|
||
"refresh_token": refresh_token,
|
||
"expire_time": expire_time,
|
||
"days_left": days_left
|
||
}
|
||
|
||
logging.info(f"成功获取账号信息 - 邮箱: {email}, 剩余天数: {days_left}")
|
||
return True, "", account_info
|
||
else:
|
||
error_msg = response_data.get("msg", "未知错误")
|
||
return False, f"API返回错误: {error_msg}", None
|
||
|
||
except Exception as e:
|
||
error_msg = f"获取账号时发生错误: {str(e)}"
|
||
logging.error(error_msg)
|
||
return False, error_msg, None
|
||
|
||
def update_auth_info(self, email: str, access_token: str, refresh_token: str = None) -> bool:
|
||
"""
|
||
更新Cursor的认证信息
|
||
|
||
Args:
|
||
email: 用户邮箱
|
||
access_token: 访问令牌
|
||
refresh_token: 刷新令牌(如果没有提供,将使用access_token)
|
||
|
||
Returns:
|
||
bool: 更新是否成功
|
||
"""
|
||
try:
|
||
# 如果没有提供refresh_token,使用access_token
|
||
if refresh_token is None:
|
||
refresh_token = access_token
|
||
|
||
# 更新认证信息
|
||
result = self.auth_manager.update_auth(
|
||
email=email,
|
||
access_token=access_token,
|
||
refresh_token=refresh_token
|
||
)
|
||
|
||
if result:
|
||
logging.info(f"认证信息更新成功 - 邮箱: {email}")
|
||
return True
|
||
else:
|
||
logging.error("认证信息更新失败")
|
||
return False
|
||
|
||
except Exception as e:
|
||
logging.error(f"更新认证信息时发生错误: {str(e)}")
|
||
return False
|
||
|
||
def reset_machine_id(self, greater_than_0_45: bool = True) -> bool:
|
||
"""
|
||
重置机器码
|
||
|
||
Args:
|
||
greater_than_0_45: 是否大于0.45版本
|
||
|
||
Returns:
|
||
bool: 重置是否成功
|
||
"""
|
||
try:
|
||
logging.info("开始重置机器码...")
|
||
if greater_than_0_45:
|
||
# 对于0.45以上版本,使用go_cursor_help
|
||
go_cursor_help.go_cursor_help()
|
||
logging.info("已调用go_cursor_help重置机器码")
|
||
return True
|
||
else:
|
||
# 对于0.45及以下版本,使用传统方式
|
||
resetter = MachineIDResetter()
|
||
result = resetter.reset_machine_ids()
|
||
if result:
|
||
logging.info("机器码重置成功")
|
||
# 重置后更新硬件ID缓存
|
||
self._hardware_id = None
|
||
return True
|
||
else:
|
||
logging.error("机器码重置失败")
|
||
return False
|
||
|
||
except Exception as e:
|
||
logging.error(f"重置机器码时发生错误: {str(e)}")
|
||
return False
|
||
|
||
def patch_machine_id(self) -> bool:
|
||
"""
|
||
修补机器码获取方法
|
||
|
||
Returns:
|
||
bool: 修补是否成功
|
||
"""
|
||
try:
|
||
logging.info("开始修补机器码获取方法...")
|
||
patch_cursor_get_machine_id.patch()
|
||
logging.info("机器码获取方法修补完成")
|
||
return True
|
||
except Exception as e:
|
||
logging.error(f"修补机器码获取方法时发生错误: {str(e)}")
|
||
return False
|
||
|
||
def exit_cursor(self) -> bool:
|
||
"""
|
||
退出Cursor进程
|
||
|
||
Returns:
|
||
bool: 退出是否成功
|
||
"""
|
||
try:
|
||
logging.info("正在退出Cursor进程...")
|
||
result = ExitCursor()
|
||
return result
|
||
except Exception as e:
|
||
logging.error(f"退出Cursor进程时发生错误: {str(e)}")
|
||
return False
|
||
|
||
def full_update_process(self, email: str = None, access_token: str = None, refresh_token: str = None) -> bool:
|
||
"""
|
||
执行完整的更新流程
|
||
|
||
Args:
|
||
email: 用户邮箱(可选,如果不提供则从API获取)
|
||
access_token: 访问令牌(可选,如果不提供则从API获取)
|
||
refresh_token: 刷新令牌(可选,如果不提供则从API获取)
|
||
|
||
Returns:
|
||
bool: 更新流程是否全部成功
|
||
"""
|
||
print_logo()
|
||
logging.info("=== 开始完整更新流程 ===")
|
||
|
||
try:
|
||
# 1. 退出Cursor进程
|
||
if not self.exit_cursor():
|
||
logging.error("退出Cursor进程失败")
|
||
return False
|
||
|
||
# 2. 修补机器码获取方法
|
||
if not self.patch_machine_id():
|
||
logging.error("修补机器码获取方法失败")
|
||
return False
|
||
|
||
# 3. 重置机器码
|
||
# 对于0.45以上版本,使用go_cursor_help
|
||
try:
|
||
go_cursor_help.go_cursor_help()
|
||
logging.info("已调用go_cursor_help重置机器码")
|
||
except Exception as e:
|
||
logging.error(f"使用go_cursor_help重置机器码失败: {str(e)}")
|
||
# 如果go_cursor_help失败,尝试使用传统方式
|
||
if not self.reset_machine_id(greater_than_0_45=False):
|
||
logging.error("重置机器码失败")
|
||
return False
|
||
|
||
# 4. 如果没有提供认证信息,从API获取
|
||
if not all([email, access_token]):
|
||
success, error_msg, account_info = self.get_unused_account()
|
||
if not success:
|
||
logging.error(f"无法获取账号信息: {error_msg}")
|
||
return False
|
||
email = account_info["email"]
|
||
access_token = account_info["access_token"]
|
||
refresh_token = account_info["refresh_token"]
|
||
|
||
# 5. 更新认证信息
|
||
if not self.update_auth_info(email, access_token, refresh_token):
|
||
logging.error("更新认证信息失败")
|
||
return False
|
||
|
||
logging.info("=== 所有操作已完成 ===")
|
||
return True
|
||
|
||
except Exception as e:
|
||
logging.error(f"更新流程发生错误: {str(e)}")
|
||
return False
|
||
|
||
def _get_network_error_message(self, error: Exception) -> str:
|
||
"""获取网络错误的友好提示信息"""
|
||
if isinstance(error, requests.exceptions.ConnectTimeout):
|
||
return "连接服务器超时,请检查网络连接"
|
||
elif isinstance(error, requests.exceptions.ConnectionError):
|
||
return "无法连接到服务器,请检查网络连接"
|
||
elif isinstance(error, requests.exceptions.ReadTimeout):
|
||
return "读取服务器响应超时,请重试"
|
||
else:
|
||
return str(error)
|
||
|
||
def get_device_info(self) -> dict:
|
||
"""获取设备信息"""
|
||
return {
|
||
"os": platform.system(),
|
||
"os_version": platform.version(),
|
||
"machine": platform.machine(),
|
||
"hostname": platform.node(),
|
||
"hardware_id": self.hardware_id
|
||
}
|
||
|
||
def check_activation_code(self, code: str) -> tuple:
|
||
"""检查激活码
|
||
|
||
Args:
|
||
code: 激活码
|
||
|
||
Returns:
|
||
tuple: (成功标志, 消息, 账号信息)
|
||
"""
|
||
max_retries = 3 # 最大重试次数
|
||
retry_delay = 1 # 重试间隔(秒)
|
||
|
||
for attempt in range(max_retries):
|
||
try:
|
||
data = {
|
||
"machine_id": self.hardware_id,
|
||
"code": code
|
||
}
|
||
|
||
# 禁用SSL警告
|
||
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
|
||
|
||
# 设置请求参数
|
||
request_kwargs = {
|
||
"json": data,
|
||
"headers": {
|
||
"Content-Type": "application/json",
|
||
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36",
|
||
"Accept": "*/*",
|
||
"Connection": "keep-alive"
|
||
},
|
||
"timeout": 10, # 增加超时时间
|
||
"verify": False # 禁用SSL验证
|
||
}
|
||
|
||
# 创建session
|
||
session = requests.Session()
|
||
session.verify = False
|
||
|
||
# 设置重试策略
|
||
retry_strategy = urllib3.Retry(
|
||
total=3, # 总重试次数
|
||
backoff_factor=0.5, # 重试间隔
|
||
status_forcelist=[500, 502, 503, 504] # 需要重试的HTTP状态码
|
||
)
|
||
adapter = requests.adapters.HTTPAdapter(max_retries=retry_strategy)
|
||
session.mount("http://", adapter)
|
||
session.mount("https://", adapter)
|
||
|
||
try:
|
||
# 尝试发送请求
|
||
response = session.post(
|
||
"https://cursorapi.nosqli.com/admin/api.member/activate",
|
||
**request_kwargs
|
||
)
|
||
response.raise_for_status() # 检查HTTP状态码
|
||
|
||
result = response.json()
|
||
# 激活成功
|
||
if result["code"] == 200:
|
||
api_data = result["data"]
|
||
# 构造标准的返回数据结构
|
||
account_info = {
|
||
"status": "active",
|
||
"expire_time": api_data.get("expire_time", ""),
|
||
"total_days": api_data.get("total_days", 0),
|
||
"days_left": api_data.get("days_left", 0),
|
||
"device_info": self.get_device_info()
|
||
}
|
||
return True, result["msg"], account_info
|
||
# 激活码无效或已被使用
|
||
elif result["code"] == 400:
|
||
logging.warning(f"激活码无效或已被使用: {result.get('msg', '未知错误')}")
|
||
return False, result.get("msg", "激活码无效或已被使用"), None
|
||
# 其他错误情况
|
||
else:
|
||
error_msg = result.get("msg", "未知错误")
|
||
if attempt < max_retries - 1: # 如果还有重试机会
|
||
logging.warning(f"第{attempt + 1}次尝试失败: {error_msg}, 准备重试...")
|
||
time.sleep(retry_delay)
|
||
continue
|
||
logging.error(f"激活失败: {error_msg}")
|
||
return False, error_msg, None
|
||
|
||
except requests.exceptions.RequestException as e:
|
||
if attempt < max_retries - 1: # 如果还有重试机会
|
||
logging.warning(f"第{attempt + 1}次网络请求失败: {str(e)}, 准备重试...")
|
||
time.sleep(retry_delay)
|
||
continue
|
||
error_msg = self._get_network_error_message(e)
|
||
logging.error(f"网络请求失败: {error_msg}")
|
||
return False, f"网络连接失败: {error_msg}", None
|
||
|
||
except Exception as e:
|
||
if attempt < max_retries - 1: # 如果还有重试机会
|
||
logging.warning(f"第{attempt + 1}次请求发生错误: {str(e)}, 准备重试...")
|
||
time.sleep(retry_delay)
|
||
continue
|
||
logging.error(f"激活失败: {str(e)}")
|
||
return False, f"激活失败: {str(e)}", None
|
||
|
||
# 如果所有重试都失败了
|
||
return False, "多次尝试后激活失败,请检查网络连接或稍后重试", None
|
||
|
||
def check_member_status(self) -> Tuple[bool, str, Optional[Dict]]:
|
||
"""检查会员状态
|
||
|
||
Returns:
|
||
Tuple[bool, str, Optional[Dict]]:
|
||
- 是否成功
|
||
- 错误信息
|
||
- 账号数据(如果成功)
|
||
"""
|
||
max_retries = 3
|
||
retry_delay = 1
|
||
|
||
for attempt in range(max_retries):
|
||
try:
|
||
data = {
|
||
"machine_id": self.hardware_id
|
||
}
|
||
|
||
# 禁用SSL警告
|
||
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
|
||
|
||
# 设置请求参数
|
||
request_kwargs = {
|
||
"json": data,
|
||
"headers": {
|
||
"Content-Type": "application/json",
|
||
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36",
|
||
"Accept": "*/*",
|
||
"Connection": "keep-alive"
|
||
},
|
||
"timeout": 10,
|
||
"verify": False
|
||
}
|
||
|
||
# 创建session
|
||
session = requests.Session()
|
||
session.verify = False
|
||
|
||
# 设置重试策略
|
||
retry_strategy = urllib3.Retry(
|
||
total=3,
|
||
backoff_factor=0.5,
|
||
status_forcelist=[500, 502, 503, 504]
|
||
)
|
||
adapter = requests.adapters.HTTPAdapter(max_retries=retry_strategy)
|
||
session.mount("http://", adapter)
|
||
session.mount("https://", adapter)
|
||
|
||
try:
|
||
# 发送请求
|
||
response = session.post(
|
||
"https://cursorapi.nosqli.com/admin/api.member/status",
|
||
**request_kwargs
|
||
)
|
||
response.raise_for_status()
|
||
|
||
result = response.json()
|
||
if result["code"] == 200:
|
||
api_data = result["data"]
|
||
account_info = {
|
||
"status": "active",
|
||
"expire_time": api_data.get("expire_time", ""),
|
||
"days_left": api_data.get("days_left", 0),
|
||
"device_info": self.get_device_info()
|
||
}
|
||
return True, "success", account_info
|
||
else:
|
||
error_msg = result.get("msg", "未知错误")
|
||
if attempt < max_retries - 1:
|
||
logging.warning(f"第{attempt + 1}次检查失败: {error_msg}, 准备重试...")
|
||
time.sleep(retry_delay)
|
||
continue
|
||
return False, error_msg, None
|
||
|
||
except requests.exceptions.RequestException as e:
|
||
if attempt < max_retries - 1:
|
||
logging.warning(f"第{attempt + 1}次网络请求失败: {str(e)}, 准备重试...")
|
||
time.sleep(retry_delay)
|
||
continue
|
||
error_msg = self._get_network_error_message(e)
|
||
return False, f"网络连接失败: {error_msg}", None
|
||
|
||
except Exception as e:
|
||
if attempt < max_retries - 1:
|
||
logging.warning(f"第{attempt + 1}次请求发生错误: {str(e)}, 准备重试...")
|
||
time.sleep(retry_delay)
|
||
continue
|
||
return False, f"检查失败: {str(e)}", None
|
||
|
||
return False, "多次尝试后检查失败,请稍后重试", None
|
||
|
||
def main():
|
||
updater = CursorTokenUpdater()
|
||
|
||
# 从环境变量获取认证信息(可选)
|
||
|
||
|
||
# 如果环境变量中有认证信息,使用环境变量中的信息
|
||
# 否则,将从API获取新的账号信息
|
||
success = updater.full_update_process(
|
||
|
||
)
|
||
|
||
print("更新状态:", "成功" if success else "失败")
|
||
|
||
if __name__ == "__main__":
|
||
main() |