472 lines
17 KiB
Python
472 lines
17 KiB
Python
import os
|
||
import platform
|
||
import json
|
||
import sys
|
||
import hashlib
|
||
import subprocess
|
||
import uuid
|
||
import requests
|
||
from exit_cursor import ExitCursor
|
||
from logger import logging
|
||
from cursor_auth_manager import CursorAuthManager
|
||
import go_cursor_help
|
||
import patch_cursor_get_machine_id
|
||
from reset_machine import MachineIDResetter
|
||
from logo import print_logo
|
||
from typing import Optional, Tuple, Dict, Any
|
||
from urllib3.exceptions import InsecureRequestWarning
|
||
import urllib3
|
||
import time
|
||
|
||
# 禁用不安全请求警告
|
||
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
|
||
|
||
# 定义 EMOJI 字典
|
||
EMOJI = {"ERROR": "❌", "WARNING": "⚠️", "INFO": "ℹ️"}
|
||
|
||
|
||
def check_cursor_version():
|
||
"""检查cursor版本"""
|
||
pkg_path, main_path = patch_cursor_get_machine_id.get_cursor_paths()
|
||
with open(pkg_path, "r", encoding="utf-8") as f:
|
||
version = json.load(f)["version"]
|
||
return patch_cursor_get_machine_id.version_check(version, min_version="0.45.0")
|
||
|
||
|
||
def reset_machine_id(greater_than_0_45):
|
||
if greater_than_0_45:
|
||
# 提示请手动执行脚本 https://github.com/chengazhen/cursor-auto-free/blob/main/patch_cursor_get_machine_id.py
|
||
go_cursor_help.go_cursor_help()
|
||
else:
|
||
MachineIDResetter().reset_machine_ids()
|
||
|
||
|
||
def print_end_message():
|
||
logging.info("\n\n\n\n\n")
|
||
logging.info("=" * 30)
|
||
logging.info("所有操作已完成")
|
||
|
||
|
||
|
||
def get_account_from_api() -> tuple[bool, dict]:
|
||
"""从API获取账号信息
|
||
|
||
Returns:
|
||
tuple[bool, dict]: (是否成功, 账号信息)
|
||
"""
|
||
try:
|
||
# 获取设备唯一ID
|
||
hardware_id = get_mac_unique_id()
|
||
logging.info(f"设备唯一ID: {hardware_id}")
|
||
|
||
endpoint = "https://cursorapi.nosqli.com/admin/api.account/getUnused"
|
||
data = {
|
||
"machine_id": hardware_id
|
||
}
|
||
headers = {
|
||
"Content-Type": "application/json"
|
||
}
|
||
|
||
import urllib3
|
||
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
|
||
|
||
request_kwargs = {
|
||
"json": data,
|
||
"headers": headers,
|
||
"timeout": 30,
|
||
"verify": False
|
||
}
|
||
|
||
try:
|
||
try:
|
||
response = requests.post(endpoint, **request_kwargs)
|
||
except requests.exceptions.SSLError:
|
||
import ssl
|
||
ssl_context = ssl.create_default_context()
|
||
ssl_context.check_hostname = False
|
||
ssl_context.verify_mode = ssl.CERT_NONE
|
||
|
||
session = requests.Session()
|
||
session.verify = False
|
||
response = session.post(endpoint, **request_kwargs)
|
||
|
||
response_data = response.json()
|
||
|
||
if response_data.get("code") == 200:
|
||
account_data = response_data.get("data", {})
|
||
return True, account_data
|
||
else:
|
||
error_msg = response_data.get("msg", "未知错误")
|
||
logging.error(f"获取未使用账号失败: {error_msg}")
|
||
return False, {}
|
||
|
||
except Exception as e:
|
||
logging.error(f"API请求失败: {str(e)}")
|
||
return False, {}
|
||
|
||
except Exception as e:
|
||
logging.error(f"获取账号过程出错: {str(e)}")
|
||
return False, {}
|
||
|
||
|
||
def get_mac_unique_id() -> str:
|
||
"""
|
||
获取Mac设备的唯一ID(32位MD5)
|
||
组合以下信息生成唯一标识:
|
||
1. 硬件UUID
|
||
2. 系统序列号
|
||
3. 主板序列号
|
||
4. CPU信息
|
||
"""
|
||
def run_cmd(cmd: str) -> str:
|
||
try:
|
||
result = subprocess.check_output(cmd, shell=True).decode('utf-8').strip()
|
||
return result
|
||
except:
|
||
return ""
|
||
|
||
# 收集系统信息
|
||
identifiers = []
|
||
|
||
# 1. 获取硬件UUID
|
||
hw_uuid = run_cmd("ioreg -d2 -c IOPlatformExpertDevice | awk -F\\\" '/IOPlatformUUID/{print $(NF-1)}'")
|
||
identifiers.append(hw_uuid)
|
||
|
||
# 2. 获取系统序列号
|
||
serial = run_cmd("ioreg -d2 -c IOPlatformExpertDevice | awk -F\\\" '/IOPlatformSerialNumber/{print $(NF-1)}'")
|
||
identifiers.append(serial)
|
||
|
||
# 3. 获取主板信息
|
||
board_id = run_cmd("ioreg -d2 -c IOPlatformExpertDevice | awk -F\\\" '/board-id/{print $(NF-1)}'")
|
||
identifiers.append(board_id)
|
||
|
||
# 4. 获取CPU信息
|
||
cpu_info = run_cmd("sysctl -n machdep.cpu.brand_string")
|
||
identifiers.append(cpu_info)
|
||
|
||
# 如果以上方法都失败,使用备用方法
|
||
if not any(identifiers):
|
||
# 使用 UUID 模块获取 UUID(不太稳定,仅作为备用)
|
||
identifiers = [
|
||
str(uuid.getnode()), # MAC 地址的整数表示
|
||
platform.machine(), # CPU 架构
|
||
platform.system(), # 操作系统名称
|
||
platform.version() # 操作系统版本
|
||
]
|
||
|
||
# 组合所有标识符并生成MD5
|
||
unique_string = "".join(filter(None, identifiers))
|
||
return hashlib.md5(unique_string.encode()).hexdigest()
|
||
|
||
|
||
class CursorAccountManager:
|
||
def __init__(self):
|
||
self.base_url = "https://cursorapi.nosqli.com"
|
||
self.api_endpoints = {
|
||
"activate": f"{self.base_url}/admin/api.member/activate",
|
||
"status": f"{self.base_url}/admin/api.member/status",
|
||
"get_unused": f"{self.base_url}/admin/api.account/getUnused",
|
||
"heartbeat": f"{self.base_url}/admin/api.member/heartbeat"
|
||
}
|
||
self.hardware_id = get_mac_unique_id()
|
||
|
||
def get_device_info(self) -> dict:
|
||
"""获取设备信息"""
|
||
return {
|
||
"system": platform.system(),
|
||
"device_name": platform.node(),
|
||
"ip": self._get_ip_address(),
|
||
"location": self._get_location()
|
||
}
|
||
|
||
def _get_ip_address(self) -> str:
|
||
"""获取IP地址"""
|
||
try:
|
||
response = requests.get('https://api.ipify.org?format=json', timeout=5)
|
||
return response.json()['ip']
|
||
except:
|
||
return "未知"
|
||
|
||
def _get_location(self) -> str:
|
||
"""获取地理位置"""
|
||
try:
|
||
ip = self._get_ip_address()
|
||
if ip != "未知":
|
||
response = requests.get(f'http://ip-api.com/json/{ip}', timeout=5)
|
||
data = response.json()
|
||
if data.get('status') == 'success':
|
||
return f"{data.get('country', '')} {data.get('city', '')}"
|
||
except:
|
||
pass
|
||
return "未知"
|
||
|
||
def check_member_status(self) -> tuple[bool, dict]:
|
||
"""检查会员状态
|
||
|
||
Returns:
|
||
tuple[bool, dict]: (是否成功, 状态信息)
|
||
"""
|
||
try:
|
||
data = {
|
||
"machine_id": self.hardware_id
|
||
}
|
||
|
||
api_url = self.api_endpoints["status"]
|
||
logging.info(f"正在检查会员状态...")
|
||
|
||
request_kwargs = {
|
||
"json": data,
|
||
"headers": {"Content-Type": "application/json"},
|
||
"timeout": 2,
|
||
"verify": False
|
||
}
|
||
|
||
session = requests.Session()
|
||
session.verify = False
|
||
|
||
try:
|
||
response = session.post(api_url, **request_kwargs)
|
||
except requests.exceptions.Timeout:
|
||
logging.warning("首次请求超时,正在重试...")
|
||
response = session.post(api_url, **request_kwargs)
|
||
|
||
result = response.json()
|
||
logging.info(f"状态检查响应: {result}")
|
||
|
||
if result.get("code") in [1, 200]:
|
||
api_data = result.get("data", {})
|
||
status_data = {
|
||
"is_active": api_data.get("status") == "active",
|
||
"expire_time": api_data.get("expire_time", ""),
|
||
"total_days": api_data.get("total_days", 0),
|
||
"days_left": api_data.get("days_left", 0),
|
||
"device_info": self.get_device_info()
|
||
}
|
||
return True, status_data
|
||
else:
|
||
error_msg = result.get("msg", "未知错误")
|
||
logging.error(f"获取状态失败: {error_msg}")
|
||
return False, {
|
||
"is_active": False,
|
||
"expire_time": "",
|
||
"total_days": 0,
|
||
"days_left": 0,
|
||
"device_info": self.get_device_info()
|
||
}
|
||
|
||
except Exception as e:
|
||
logging.error(f"获取会员状态失败: {str(e)}")
|
||
return False, {
|
||
"is_active": False,
|
||
"expire_time": "",
|
||
"total_days": 0,
|
||
"days_left": 0,
|
||
"device_info": self.get_device_info()
|
||
}
|
||
|
||
def check_activation_code(self, code: str) -> tuple[bool, str, dict | None]:
|
||
"""检查激活码
|
||
|
||
Args:
|
||
code: 激活码
|
||
|
||
Returns:
|
||
tuple: (成功标志, 消息, 账号信息)
|
||
"""
|
||
max_retries = 3 # 最大重试次数
|
||
retry_delay = 1 # 重试间隔(秒)
|
||
|
||
for attempt in range(max_retries):
|
||
try:
|
||
data = {
|
||
"machine_id": self.hardware_id,
|
||
"code": code
|
||
}
|
||
|
||
# 设置请求参数
|
||
request_kwargs = {
|
||
"json": data,
|
||
"headers": {
|
||
"Content-Type": "application/json",
|
||
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36",
|
||
"Accept": "*/*",
|
||
"Connection": "keep-alive"
|
||
},
|
||
"timeout": 10,
|
||
"verify": False
|
||
}
|
||
|
||
# 创建session
|
||
session = requests.Session()
|
||
session.verify = False
|
||
|
||
# 设置重试策略
|
||
retry_strategy = urllib3.Retry(
|
||
total=3,
|
||
backoff_factor=0.5,
|
||
status_forcelist=[500, 502, 503, 504]
|
||
)
|
||
adapter = requests.adapters.HTTPAdapter(max_retries=retry_strategy)
|
||
session.mount("http://", adapter)
|
||
session.mount("https://", adapter)
|
||
|
||
try:
|
||
# 尝试发送请求
|
||
response = session.post(
|
||
self.api_endpoints["activate"],
|
||
**request_kwargs
|
||
)
|
||
response.raise_for_status()
|
||
|
||
result = response.json()
|
||
logging.info(f"激活响应: {result}")
|
||
|
||
# 激活成功
|
||
if result["code"] == 200:
|
||
api_data = result["data"]
|
||
account_info = {
|
||
"status": "active",
|
||
"expire_time": api_data.get("expire_time", ""),
|
||
"total_days": api_data.get("total_days", 0),
|
||
"days_left": api_data.get("days_left", 0),
|
||
"device_info": self.get_device_info()
|
||
}
|
||
return True, result["msg"], account_info
|
||
# 激活码无效或已被使用
|
||
elif result["code"] == 400:
|
||
logging.warning(f"激活码无效或已被使用: {result.get('msg', '未知错误')}")
|
||
return False, result.get("msg", "激活码无效或已被使用"), None
|
||
# 其他错误情况
|
||
else:
|
||
error_msg = result.get("msg", "未知错误")
|
||
if attempt < max_retries - 1:
|
||
logging.warning(f"第{attempt + 1}次尝试失败: {error_msg}, 准备重试...")
|
||
time.sleep(retry_delay)
|
||
continue
|
||
logging.error(f"激活失败: {error_msg}")
|
||
return False, error_msg, None
|
||
|
||
except requests.exceptions.RequestException as e:
|
||
if attempt < max_retries - 1:
|
||
logging.warning(f"第{attempt + 1}次网络请求失败: {str(e)}, 准备重试...")
|
||
time.sleep(retry_delay)
|
||
continue
|
||
logging.error(f"网络请求失败: {str(e)}")
|
||
return False, f"网络连接失败: {str(e)}", None
|
||
|
||
except Exception as e:
|
||
if attempt < max_retries - 1:
|
||
logging.warning(f"第{attempt + 1}次请求发生错误: {str(e)}, 准备重试...")
|
||
time.sleep(retry_delay)
|
||
continue
|
||
logging.error(f"激活失败: {str(e)}")
|
||
return False, f"激活失败: {str(e)}", None
|
||
|
||
return False, "多次尝试后激活失败,请检查网络连接或稍后重试", None
|
||
|
||
|
||
def reset_auth_with_password(password: str = None) -> tuple[bool, str]:
|
||
"""
|
||
封装重置授权的完整流程
|
||
|
||
Args:
|
||
password: 系统密码(可选)
|
||
|
||
Returns:
|
||
tuple[bool, str]: (是否成功, 消息)
|
||
"""
|
||
try:
|
||
logging.info("\n=== 初始化重置流程 ===")
|
||
greater_than_0_45 = check_cursor_version()
|
||
|
||
logging.info("正在从API获取账号信息...")
|
||
success, account_data = get_account_from_api()
|
||
|
||
if not success:
|
||
return False, "获取账号信息失败"
|
||
|
||
email = account_data.get("email", "")
|
||
access_token = account_data.get("access_token", "")
|
||
refresh_token = account_data.get("refresh_token", "")
|
||
|
||
if not all([email, access_token, refresh_token]):
|
||
return False, "账号信息不完整"
|
||
|
||
logging.info(f"获取到账号信息:\n邮箱: {email}")
|
||
|
||
# 更新认证信息
|
||
logging.info("正在更新认证信息...")
|
||
auth_manager = CursorAuthManager()
|
||
if auth_manager.update_auth(email, access_token, refresh_token):
|
||
logging.info("认证信息更新成功")
|
||
|
||
# 重置机器码
|
||
logging.info("正在重置机器码...")
|
||
|
||
# 如果提供了密码,设置环境变量
|
||
if password:
|
||
import os
|
||
os.environ['SUDO_PASSWORD'] = password
|
||
|
||
reset_machine_id(greater_than_0_45)
|
||
logging.info("重置完成")
|
||
return True, "重置成功"
|
||
else:
|
||
return False, "更新认证信息失败"
|
||
|
||
except Exception as e:
|
||
logging.error(f"重置过程出错: {str(e)}")
|
||
import traceback
|
||
logging.error(traceback.format_exc())
|
||
return False, f"重置失败: {str(e)}"
|
||
|
||
|
||
if __name__ == "__main__":
|
||
print_logo()
|
||
greater_than_0_45 = check_cursor_version()
|
||
browser_manager = None
|
||
try:
|
||
logging.info("\n=== 初始化程序 ===")
|
||
# ExitCursor()
|
||
|
||
logging.info("正在从API获取账号信息...")
|
||
success, account_data = get_account_from_api()
|
||
|
||
if not success:
|
||
logging.error("获取账号信息失败")
|
||
sys.exit(1)
|
||
|
||
email = account_data.get("email", "")
|
||
access_token = account_data.get("access_token", "")
|
||
refresh_token = account_data.get("refresh_token", "")
|
||
expire_time = account_data.get("expire_time", "")
|
||
days_left = account_data.get("days_left", 0)
|
||
|
||
if not all([email, access_token, refresh_token]):
|
||
logging.error("账号信息不完整")
|
||
sys.exit(1)
|
||
|
||
logging.info(f"获取到账号信息:\n邮箱: {email}\n到期时间: {expire_time}\n剩余天数: {days_left}")
|
||
|
||
# 更新认证信息
|
||
logging.info("正在更新认证信息...")
|
||
auth_manager = CursorAuthManager()
|
||
if auth_manager.update_auth(email, access_token, refresh_token):
|
||
logging.info("认证信息更新成功")
|
||
# 重置机器码
|
||
logging.info("正在重置机器码...")
|
||
reset_machine_id(greater_than_0_45)
|
||
logging.info("所有操作已完成")
|
||
print_end_message()
|
||
else:
|
||
logging.error("更新认证信息失败")
|
||
|
||
except Exception as e:
|
||
logging.error(f"程序执行出现错误: {str(e)}")
|
||
import traceback
|
||
logging.error(traceback.format_exc())
|
||
finally:
|
||
if browser_manager:
|
||
browser_manager.quit()
|
||
input("\n程序执行完毕,按回车键退出...")
|