import hashlib
import json
import os
import platform
import subprocess
import sys
import time
import traceback
import uuid
from typing import Optional, Tuple, Dict, Any

import requests
import urllib3
from urllib3.exceptions import InsecureRequestWarning

from exit_cursor import ExitCursor
from logger import logging
from cursor_auth_manager import CursorAuthManager
import go_cursor_help
import patch_cursor_get_machine_id
from reset_machine import MachineIDResetter
from logo import print_logo

# Silence the InsecureRequestWarning that would otherwise be emitted for every
# request sent with verify=False below.
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)

# Emoji prefixes keyed by message severity.
EMOJI = {"ERROR": "❌", "WARNING": "⚠️", "INFO": "ℹ️"}

# Base URL shared by every backend endpoint used in this module.
API_BASE_URL = "https://cursorapi.nosqli.com"

# Placeholder returned when the IP address or location cannot be determined.
_UNKNOWN = "未知"


def check_cursor_version():
    """Check the installed Cursor version against the 0.45.0 threshold.

    Reads the ``version`` field of the JSON file at the package path returned
    by ``patch_cursor_get_machine_id.get_cursor_paths()`` and returns whatever
    ``patch_cursor_get_machine_id.version_check`` returns for
    ``min_version="0.45.0"`` (presumably truthy when the version is at least
    0.45.0 -- confirm against that helper).
    """
    pkg_path, _ = patch_cursor_get_machine_id.get_cursor_paths()
    with open(pkg_path, "r", encoding="utf-8") as f:
        version = json.load(f)["version"]
    return patch_cursor_get_machine_id.version_check(version, min_version="0.45.0")


def reset_machine_id(greater_than_0_45):
    """Reset the machine ID using the strategy that matches the Cursor version.

    Args:
        greater_than_0_45: Result of ``check_cursor_version()``. When truthy,
            ``go_cursor_help.go_cursor_help()`` is used; otherwise
            ``MachineIDResetter().reset_machine_ids()`` is used.
    """
    if greater_than_0_45:
        # Prompt the user to run the script manually:
        # https://github.com/chengazhen/cursor-auto-free/blob/main/patch_cursor_get_machine_id.py
        go_cursor_help.go_cursor_help()
    else:
        MachineIDResetter().reset_machine_ids()


def print_end_message():
    """Log the closing banner shown after all operations have finished."""
    logging.info("\n\n\n\n\n")
    logging.info("=" * 30)
    logging.info("所有操作已完成")


def get_account_from_api() -> tuple[bool, dict]:
    """Fetch an unused account from the backend API.

    Posts this device's unique ID to the ``getUnused`` endpoint.

    Returns:
        tuple[bool, dict]: ``(success, account_data)``. ``account_data`` is
        always a dict; it is empty on failure or when the API returns no
        ``data`` payload.
    """
    try:
        # Unique ID identifying this device to the backend.
        hardware_id = get_mac_unique_id()
        logging.info(f"设备唯一ID: {hardware_id}")

        endpoint = f"{API_BASE_URL}/admin/api.account/getUnused"
        # NOTE(review): verify=False disables TLS certificate validation while
        # account tokens are in transit -- confirm this is really required.
        request_kwargs = {
            "json": {"machine_id": hardware_id},
            "headers": {"Content-Type": "application/json"},
            "timeout": 30,
            "verify": False,
        }

        try:
            try:
                response = requests.post(endpoint, **request_kwargs)
            except requests.exceptions.SSLError:
                # Retry once through a fresh session; the session is closed as
                # soon as the (fully read) response has been received.
                with requests.Session() as session:
                    session.verify = False
                    response = session.post(endpoint, **request_kwargs)

            response_data = response.json()
            if response_data.get("code") == 200:
                # `or {}` guards against an explicit `"data": null`, which
                # would otherwise hand callers a None they call .get() on.
                return True, response_data.get("data") or {}

            error_msg = response_data.get("msg", "未知错误")
            logging.error(f"获取未使用账号失败: {error_msg}")
            return False, {}
        except Exception as e:
            logging.error(f"API请求失败: {str(e)}")
            return False, {}
    except Exception as e:
        logging.error(f"获取账号过程出错: {str(e)}")
        return False, {}


def get_mac_unique_id() -> str:
    """Return a unique ID for this Mac as a 32-character MD5 hex digest.

    The digest is computed over the concatenation of:
        1. hardware UUID
        2. system serial number
        3. board ID
        4. CPU brand string

    If none of those can be read, a fallback based on the MAC address and
    ``platform`` information is hashed instead.
    """

    def run_cmd(cmd: str) -> str:
        """Run a shell command and return its stripped stdout, or "" on failure."""
        try:
            # stderr is discarded so missing tools (e.g. on non-macOS hosts)
            # do not print shell errors to the console.
            output = subprocess.check_output(
                cmd, shell=True, stderr=subprocess.DEVNULL
            )
            return output.decode('utf-8').strip()
        except (subprocess.SubprocessError, OSError, UnicodeDecodeError):
            # Best-effort probe: a failing command simply contributes nothing.
            return ""

    # Collect system identifiers.
    identifiers = [
        # 1. Hardware UUID
        run_cmd("ioreg -d2 -c IOPlatformExpertDevice | awk -F\\\" '/IOPlatformUUID/{print $(NF-1)}'"),
        # 2. System serial number
        run_cmd("ioreg -d2 -c IOPlatformExpertDevice | awk -F\\\" '/IOPlatformSerialNumber/{print $(NF-1)}'"),
        # 3. Board ID
        run_cmd("ioreg -d2 -c IOPlatformExpertDevice | awk -F\\\" '/board-id/{print $(NF-1)}'"),
        # 4. CPU brand string
        run_cmd("sysctl -n machdep.cpu.brand_string"),
    ]

    # If every probe above failed, fall back to generic platform information.
    if not any(identifiers):
        # uuid.getnode() is less stable, so it is used only as a fallback.
        identifiers = [
            str(uuid.getnode()),  # MAC address as an integer
            platform.machine(),   # CPU architecture
            platform.system(),    # operating system name
            platform.version(),   # operating system version
        ]

    # Join the non-empty identifiers and hash them. MD5 is used only to derive
    # a stable identifier here, not for any security purpose.
    unique_string = "".join(filter(None, identifiers))
    return hashlib.md5(unique_string.encode()).hexdigest()


class CursorAccountManager:
    """Client for the membership/activation endpoints of the backend API."""

    def __init__(self):
        self.base_url = API_BASE_URL
        self.api_endpoints = {
            "activate": f"{self.base_url}/admin/api.member/activate",
            "status": f"{self.base_url}/admin/api.member/status",
            "get_unused": f"{self.base_url}/admin/api.account/getUnused",
            "heartbeat": f"{self.base_url}/admin/api.member/heartbeat",
        }
        self.hardware_id = get_mac_unique_id()

    def get_device_info(self) -> dict:
        """Return system name, host name, public IP and location of this device."""
        # Resolve the IP once and reuse it for the location lookup instead of
        # querying the IP service twice.
        ip = self._get_ip_address()
        return {
            "system": platform.system(),
            "device_name": platform.node(),
            "ip": ip,
            "location": self._get_location(ip),
        }

    def _get_ip_address(self) -> str:
        """Return the public IP reported by api.ipify.org, or "未知" on failure."""
        try:
            response = requests.get('https://api.ipify.org?format=json', timeout=5)
            return response.json()['ip']
        except Exception:
            # Best-effort lookup: any network/parse failure yields the placeholder.
            return _UNKNOWN

    def _get_location(self, ip: str | None = None) -> str:
        """Return "<country> <city>" for an IP via ip-api.com, or "未知".

        Args:
            ip: IP address to look up. When omitted, the public IP is resolved
                with ``_get_ip_address()`` first.
        """
        try:
            if ip is None:
                ip = self._get_ip_address()
            if ip != _UNKNOWN:
                response = requests.get(f'http://ip-api.com/json/{ip}', timeout=5)
                data = response.json()
                if data.get('status') == 'success':
                    return f"{data.get('country', '')} {data.get('city', '')}"
        except Exception:
            # Best-effort lookup: fall through to the placeholder below.
            pass
        return _UNKNOWN

    def _inactive_status(self) -> dict:
        """Build the status payload reported when the member status is unavailable."""
        return {
            "is_active": False,
            "expire_time": "",
            "total_days": 0,
            "days_left": 0,
            "device_info": self.get_device_info(),
        }

    def check_member_status(self) -> tuple[bool, dict]:
        """Query the membership status of this device.

        Returns:
            tuple[bool, dict]: ``(success, status)``. ``status`` always has the
            keys ``is_active``, ``expire_time``, ``total_days``, ``days_left``
            and ``device_info``; on failure it describes an inactive member.
        """
        try:
            api_url = self.api_endpoints["status"]
            logging.info("正在检查会员状态...")

            request_kwargs = {
                "json": {"machine_id": self.hardware_id},
                "headers": {"Content-Type": "application/json"},
                "timeout": 2,
                "verify": False,
            }

            # The context manager closes the session's connection pool.
            with requests.Session() as session:
                session.verify = False
                try:
                    response = session.post(api_url, **request_kwargs)
                except requests.exceptions.Timeout:
                    # The short 2s timeout gets exactly one retry.
                    logging.warning("首次请求超时,正在重试...")
                    response = session.post(api_url, **request_kwargs)

            result = response.json()
            logging.info(f"状态检查响应: {result}")

            if result.get("code") in [1, 200]:
                api_data = result.get("data", {})
                status_data = {
                    "is_active": api_data.get("status") == "active",
                    "expire_time": api_data.get("expire_time", ""),
                    "total_days": api_data.get("total_days", 0),
                    "days_left": api_data.get("days_left", 0),
                    "device_info": self.get_device_info(),
                }
                return True, status_data

            error_msg = result.get("msg", "未知错误")
            logging.error(f"获取状态失败: {error_msg}")
            return False, self._inactive_status()

        except Exception as e:
            logging.error(f"获取会员状态失败: {str(e)}")
            return False, self._inactive_status()

    @staticmethod
    def _new_retrying_session() -> requests.Session:
        """Create a non-verifying session with a transport-level retry policy."""
        session = requests.Session()
        session.verify = False
        # NOTE(review): by default urllib3 applies status_forcelist retries
        # only to idempotent methods, so the POST below may not be retried on
        # 5xx at this layer -- the explicit attempt loop covers that case.
        retry_strategy = urllib3.Retry(
            total=3,
            backoff_factor=0.5,
            status_forcelist=[500, 502, 503, 504],
        )
        adapter = requests.adapters.HTTPAdapter(max_retries=retry_strategy)
        session.mount("http://", adapter)
        session.mount("https://", adapter)
        return session

    def check_activation_code(self, code: str) -> tuple[bool, str, dict | None]:
        """Submit an activation code for this device.

        Args:
            code: The activation code to submit.

        Returns:
            tuple: ``(success, message, account_info)``; ``account_info`` is
            ``None`` on failure.
        """
        max_retries = 3  # maximum number of attempts
        retry_delay = 1  # seconds to wait between attempts

        # Request parameters are identical for every attempt.
        request_kwargs = {
            "json": {
                "machine_id": self.hardware_id,
                "code": code,
            },
            "headers": {
                "Content-Type": "application/json",
                "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36",
                "Accept": "*/*",
                "Connection": "keep-alive",
            },
            "timeout": 10,
            "verify": False,
        }

        # One session is shared by all attempts and closed when done.
        with self._new_retrying_session() as session:
            for attempt in range(max_retries):
                is_last_attempt = attempt == max_retries - 1
                try:
                    response = session.post(
                        self.api_endpoints["activate"],
                        **request_kwargs
                    )
                    response.raise_for_status()

                    result = response.json()
                    logging.info(f"激活响应: {result}")

                    status_code = result.get("code")

                    # Activation succeeded. Missing "msg"/"data" keys must not
                    # raise here: that would re-submit a code the server has
                    # already accepted.
                    if status_code == 200:
                        api_data = result.get("data") or {}
                        account_info = {
                            "status": "active",
                            "expire_time": api_data.get("expire_time", ""),
                            "total_days": api_data.get("total_days", 0),
                            "days_left": api_data.get("days_left", 0),
                            "device_info": self.get_device_info(),
                        }
                        return True, result.get("msg", ""), account_info

                    # Code is invalid or already used: retrying cannot help.
                    if status_code == 400:
                        logging.warning(f"激活码无效或已被使用: {result.get('msg', '未知错误')}")
                        return False, result.get("msg", "激活码无效或已被使用"), None

                    # Any other API-level error: retry unless out of attempts.
                    error_msg = result.get("msg", "未知错误")
                    if not is_last_attempt:
                        logging.warning(f"第{attempt + 1}次尝试失败: {error_msg}, 准备重试...")
                        time.sleep(retry_delay)
                        continue
                    logging.error(f"激活失败: {error_msg}")
                    return False, error_msg, None

                except requests.exceptions.RequestException as e:
                    if not is_last_attempt:
                        logging.warning(f"第{attempt + 1}次网络请求失败: {str(e)}, 准备重试...")
                        time.sleep(retry_delay)
                        continue
                    logging.error(f"网络请求失败: {str(e)}")
                    return False, f"网络连接失败: {str(e)}", None

                except Exception as e:
                    if not is_last_attempt:
                        logging.warning(f"第{attempt + 1}次请求发生错误: {str(e)}, 准备重试...")
                        time.sleep(retry_delay)
                        continue
                    logging.error(f"激活失败: {str(e)}")
                    return False, f"激活失败: {str(e)}", None

        # Safety net; the final attempt above always returns.
        return False, "多次尝试后激活失败,请检查网络连接或稍后重试", None


def reset_auth_with_password(password: str | None = None) -> tuple[bool, str]:
    """Run the complete authorization reset flow.

    Fetches an account from the API, writes its credentials through
    ``CursorAuthManager`` and resets the machine ID.

    Args:
        password: Optional system password. When given, it is exported as the
            ``SUDO_PASSWORD`` environment variable before the machine ID is
            reset (presumably read by the reset helpers -- confirm).

    Returns:
        tuple[bool, str]: ``(success, message)``.
    """
    try:
        logging.info("\n=== 初始化重置流程 ===")
        greater_than_0_45 = check_cursor_version()

        logging.info("正在从API获取账号信息...")
        success, account_data = get_account_from_api()
        if not success:
            return False, "获取账号信息失败"

        email = account_data.get("email", "")
        access_token = account_data.get("access_token", "")
        refresh_token = account_data.get("refresh_token", "")

        if not all([email, access_token, refresh_token]):
            return False, "账号信息不完整"

        logging.info(f"获取到账号信息:\n邮箱: {email}")

        # Update the stored authentication information.
        logging.info("正在更新认证信息...")
        auth_manager = CursorAuthManager()
        if not auth_manager.update_auth(email, access_token, refresh_token):
            return False, "更新认证信息失败"
        logging.info("认证信息更新成功")

        # Reset the machine ID.
        logging.info("正在重置机器码...")
        if password:
            # Export the password through the environment when one was given.
            os.environ['SUDO_PASSWORD'] = password

        reset_machine_id(greater_than_0_45)
        logging.info("重置完成")
        return True, "重置成功"

    except Exception as e:
        logging.error(f"重置过程出错: {str(e)}")
        logging.error(traceback.format_exc())
        return False, f"重置失败: {str(e)}"


def main() -> None:
    """Interactive entry point: fetch an account, apply it and reset the machine ID."""
    print_logo()
    greater_than_0_45 = check_cursor_version()

    try:
        logging.info("\n=== 初始化程序 ===")
        # ExitCursor()  -- closing Cursor before the reset is currently disabled.

        logging.info("正在从API获取账号信息...")
        success, account_data = get_account_from_api()
        if not success:
            logging.error("获取账号信息失败")
            sys.exit(1)

        email = account_data.get("email", "")
        access_token = account_data.get("access_token", "")
        refresh_token = account_data.get("refresh_token", "")
        expire_time = account_data.get("expire_time", "")
        days_left = account_data.get("days_left", 0)

        if not all([email, access_token, refresh_token]):
            logging.error("账号信息不完整")
            sys.exit(1)

        logging.info(f"获取到账号信息:\n邮箱: {email}\n到期时间: {expire_time}\n剩余天数: {days_left}")

        # Update the stored authentication information.
        logging.info("正在更新认证信息...")
        auth_manager = CursorAuthManager()
        if auth_manager.update_auth(email, access_token, refresh_token):
            logging.info("认证信息更新成功")

            # Reset the machine ID.
            logging.info("正在重置机器码...")
            reset_machine_id(greater_than_0_45)
            logging.info("所有操作已完成")
            print_end_message()
        else:
            logging.error("更新认证信息失败")

    except Exception as e:
        logging.error(f"程序执行出现错误: {str(e)}")
        logging.error(traceback.format_exc())
    finally:
        # Keep the console window open until the user acknowledges; this also
        # runs after sys.exit() above.
        input("\n程序执行完毕,按回车键退出...")


if __name__ == "__main__":
    main()