Files
macm1new/cursor_account_manager.py

472 lines
17 KiB
Python
Raw Permalink Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import platform
import json
import sys
import hashlib
import subprocess
import uuid
import requests
from exit_cursor import ExitCursor
from logger import logging
from cursor_auth_manager import CursorAuthManager
import go_cursor_help
import patch_cursor_get_machine_id
from reset_machine import MachineIDResetter
from logo import print_logo
from typing import Optional, Tuple, Dict, Any
from urllib3.exceptions import InsecureRequestWarning
import urllib3
import time
# 禁用不安全请求警告
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
# 定义 EMOJI 字典
EMOJI = {"ERROR": "", "WARNING": "⚠️", "INFO": ""}
def check_cursor_version():
"""检查cursor版本"""
pkg_path, main_path = patch_cursor_get_machine_id.get_cursor_paths()
with open(pkg_path, "r", encoding="utf-8") as f:
version = json.load(f)["version"]
return patch_cursor_get_machine_id.version_check(version, min_version="0.45.0")
def reset_machine_id(greater_than_0_45):
if greater_than_0_45:
# 提示请手动执行脚本 https://github.com/chengazhen/cursor-auto-free/blob/main/patch_cursor_get_machine_id.py
go_cursor_help.go_cursor_help()
else:
MachineIDResetter().reset_machine_ids()
def print_end_message():
logging.info("\n\n\n\n\n")
logging.info("=" * 30)
logging.info("所有操作已完成")
def get_account_from_api() -> tuple[bool, dict]:
"""从API获取账号信息
Returns:
tuple[bool, dict]: (是否成功, 账号信息)
"""
try:
# 获取设备唯一ID
hardware_id = get_mac_unique_id()
logging.info(f"设备唯一ID: {hardware_id}")
endpoint = "https://cursorapi.nosqli.com/admin/api.account/getUnused"
data = {
"machine_id": hardware_id
}
headers = {
"Content-Type": "application/json"
}
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
request_kwargs = {
"json": data,
"headers": headers,
"timeout": 30,
"verify": False
}
try:
try:
response = requests.post(endpoint, **request_kwargs)
except requests.exceptions.SSLError:
import ssl
ssl_context = ssl.create_default_context()
ssl_context.check_hostname = False
ssl_context.verify_mode = ssl.CERT_NONE
session = requests.Session()
session.verify = False
response = session.post(endpoint, **request_kwargs)
response_data = response.json()
if response_data.get("code") == 200:
account_data = response_data.get("data", {})
return True, account_data
else:
error_msg = response_data.get("msg", "未知错误")
logging.error(f"获取未使用账号失败: {error_msg}")
return False, {}
except Exception as e:
logging.error(f"API请求失败: {str(e)}")
return False, {}
except Exception as e:
logging.error(f"获取账号过程出错: {str(e)}")
return False, {}
def get_mac_unique_id() -> str:
"""
获取Mac设备的唯一ID32位MD5
组合以下信息生成唯一标识:
1. 硬件UUID
2. 系统序列号
3. 主板序列号
4. CPU信息
"""
def run_cmd(cmd: str) -> str:
try:
result = subprocess.check_output(cmd, shell=True).decode('utf-8').strip()
return result
except:
return ""
# 收集系统信息
identifiers = []
# 1. 获取硬件UUID
hw_uuid = run_cmd("ioreg -d2 -c IOPlatformExpertDevice | awk -F\\\" '/IOPlatformUUID/{print $(NF-1)}'")
identifiers.append(hw_uuid)
# 2. 获取系统序列号
serial = run_cmd("ioreg -d2 -c IOPlatformExpertDevice | awk -F\\\" '/IOPlatformSerialNumber/{print $(NF-1)}'")
identifiers.append(serial)
# 3. 获取主板信息
board_id = run_cmd("ioreg -d2 -c IOPlatformExpertDevice | awk -F\\\" '/board-id/{print $(NF-1)}'")
identifiers.append(board_id)
# 4. 获取CPU信息
cpu_info = run_cmd("sysctl -n machdep.cpu.brand_string")
identifiers.append(cpu_info)
# 如果以上方法都失败,使用备用方法
if not any(identifiers):
# 使用 UUID 模块获取 UUID不太稳定仅作为备用
identifiers = [
str(uuid.getnode()), # MAC 地址的整数表示
platform.machine(), # CPU 架构
platform.system(), # 操作系统名称
platform.version() # 操作系统版本
]
# 组合所有标识符并生成MD5
unique_string = "".join(filter(None, identifiers))
return hashlib.md5(unique_string.encode()).hexdigest()
class CursorAccountManager:
def __init__(self):
self.base_url = "https://cursorapi.nosqli.com"
self.api_endpoints = {
"activate": f"{self.base_url}/admin/api.member/activate",
"status": f"{self.base_url}/admin/api.member/status",
"get_unused": f"{self.base_url}/admin/api.account/getUnused",
"heartbeat": f"{self.base_url}/admin/api.member/heartbeat"
}
self.hardware_id = get_mac_unique_id()
def get_device_info(self) -> dict:
"""获取设备信息"""
return {
"system": platform.system(),
"device_name": platform.node(),
"ip": self._get_ip_address(),
"location": self._get_location()
}
def _get_ip_address(self) -> str:
"""获取IP地址"""
try:
response = requests.get('https://api.ipify.org?format=json', timeout=5)
return response.json()['ip']
except:
return "未知"
def _get_location(self) -> str:
"""获取地理位置"""
try:
ip = self._get_ip_address()
if ip != "未知":
response = requests.get(f'http://ip-api.com/json/{ip}', timeout=5)
data = response.json()
if data.get('status') == 'success':
return f"{data.get('country', '')} {data.get('city', '')}"
except:
pass
return "未知"
def check_member_status(self) -> tuple[bool, dict]:
"""检查会员状态
Returns:
tuple[bool, dict]: (是否成功, 状态信息)
"""
try:
data = {
"machine_id": self.hardware_id
}
api_url = self.api_endpoints["status"]
logging.info(f"正在检查会员状态...")
request_kwargs = {
"json": data,
"headers": {"Content-Type": "application/json"},
"timeout": 2,
"verify": False
}
session = requests.Session()
session.verify = False
try:
response = session.post(api_url, **request_kwargs)
except requests.exceptions.Timeout:
logging.warning("首次请求超时,正在重试...")
response = session.post(api_url, **request_kwargs)
result = response.json()
logging.info(f"状态检查响应: {result}")
if result.get("code") in [1, 200]:
api_data = result.get("data", {})
status_data = {
"is_active": api_data.get("status") == "active",
"expire_time": api_data.get("expire_time", ""),
"total_days": api_data.get("total_days", 0),
"days_left": api_data.get("days_left", 0),
"device_info": self.get_device_info()
}
return True, status_data
else:
error_msg = result.get("msg", "未知错误")
logging.error(f"获取状态失败: {error_msg}")
return False, {
"is_active": False,
"expire_time": "",
"total_days": 0,
"days_left": 0,
"device_info": self.get_device_info()
}
except Exception as e:
logging.error(f"获取会员状态失败: {str(e)}")
return False, {
"is_active": False,
"expire_time": "",
"total_days": 0,
"days_left": 0,
"device_info": self.get_device_info()
}
def check_activation_code(self, code: str) -> tuple[bool, str, dict | None]:
"""检查激活码
Args:
code: 激活码
Returns:
tuple: (成功标志, 消息, 账号信息)
"""
max_retries = 3 # 最大重试次数
retry_delay = 1 # 重试间隔(秒)
for attempt in range(max_retries):
try:
data = {
"machine_id": self.hardware_id,
"code": code
}
# 设置请求参数
request_kwargs = {
"json": data,
"headers": {
"Content-Type": "application/json",
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36",
"Accept": "*/*",
"Connection": "keep-alive"
},
"timeout": 10,
"verify": False
}
# 创建session
session = requests.Session()
session.verify = False
# 设置重试策略
retry_strategy = urllib3.Retry(
total=3,
backoff_factor=0.5,
status_forcelist=[500, 502, 503, 504]
)
adapter = requests.adapters.HTTPAdapter(max_retries=retry_strategy)
session.mount("http://", adapter)
session.mount("https://", adapter)
try:
# 尝试发送请求
response = session.post(
self.api_endpoints["activate"],
**request_kwargs
)
response.raise_for_status()
result = response.json()
logging.info(f"激活响应: {result}")
# 激活成功
if result["code"] == 200:
api_data = result["data"]
account_info = {
"status": "active",
"expire_time": api_data.get("expire_time", ""),
"total_days": api_data.get("total_days", 0),
"days_left": api_data.get("days_left", 0),
"device_info": self.get_device_info()
}
return True, result["msg"], account_info
# 激活码无效或已被使用
elif result["code"] == 400:
logging.warning(f"激活码无效或已被使用: {result.get('msg', '未知错误')}")
return False, result.get("msg", "激活码无效或已被使用"), None
# 其他错误情况
else:
error_msg = result.get("msg", "未知错误")
if attempt < max_retries - 1:
logging.warning(f"{attempt + 1}次尝试失败: {error_msg}, 准备重试...")
time.sleep(retry_delay)
continue
logging.error(f"激活失败: {error_msg}")
return False, error_msg, None
except requests.exceptions.RequestException as e:
if attempt < max_retries - 1:
logging.warning(f"{attempt + 1}次网络请求失败: {str(e)}, 准备重试...")
time.sleep(retry_delay)
continue
logging.error(f"网络请求失败: {str(e)}")
return False, f"网络连接失败: {str(e)}", None
except Exception as e:
if attempt < max_retries - 1:
logging.warning(f"{attempt + 1}次请求发生错误: {str(e)}, 准备重试...")
time.sleep(retry_delay)
continue
logging.error(f"激活失败: {str(e)}")
return False, f"激活失败: {str(e)}", None
return False, "多次尝试后激活失败,请检查网络连接或稍后重试", None
def reset_auth_with_password(password: str = None) -> tuple[bool, str]:
"""
封装重置授权的完整流程
Args:
password: 系统密码(可选)
Returns:
tuple[bool, str]: (是否成功, 消息)
"""
try:
logging.info("\n=== 初始化重置流程 ===")
greater_than_0_45 = check_cursor_version()
logging.info("正在从API获取账号信息...")
success, account_data = get_account_from_api()
if not success:
return False, "获取账号信息失败"
email = account_data.get("email", "")
access_token = account_data.get("access_token", "")
refresh_token = account_data.get("refresh_token", "")
if not all([email, access_token, refresh_token]):
return False, "账号信息不完整"
logging.info(f"获取到账号信息:\n邮箱: {email}")
# 更新认证信息
logging.info("正在更新认证信息...")
auth_manager = CursorAuthManager()
if auth_manager.update_auth(email, access_token, refresh_token):
logging.info("认证信息更新成功")
# 重置机器码
logging.info("正在重置机器码...")
# 如果提供了密码,设置环境变量
if password:
import os
os.environ['SUDO_PASSWORD'] = password
reset_machine_id(greater_than_0_45)
logging.info("重置完成")
return True, "重置成功"
else:
return False, "更新认证信息失败"
except Exception as e:
logging.error(f"重置过程出错: {str(e)}")
import traceback
logging.error(traceback.format_exc())
return False, f"重置失败: {str(e)}"
if __name__ == "__main__":
print_logo()
greater_than_0_45 = check_cursor_version()
browser_manager = None
try:
logging.info("\n=== 初始化程序 ===")
# ExitCursor()
logging.info("正在从API获取账号信息...")
success, account_data = get_account_from_api()
if not success:
logging.error("获取账号信息失败")
sys.exit(1)
email = account_data.get("email", "")
access_token = account_data.get("access_token", "")
refresh_token = account_data.get("refresh_token", "")
expire_time = account_data.get("expire_time", "")
days_left = account_data.get("days_left", 0)
if not all([email, access_token, refresh_token]):
logging.error("账号信息不完整")
sys.exit(1)
logging.info(f"获取到账号信息:\n邮箱: {email}\n到期时间: {expire_time}\n剩余天数: {days_left}")
# 更新认证信息
logging.info("正在更新认证信息...")
auth_manager = CursorAuthManager()
if auth_manager.update_auth(email, access_token, refresh_token):
logging.info("认证信息更新成功")
# 重置机器码
logging.info("正在重置机器码...")
reset_machine_id(greater_than_0_45)
logging.info("所有操作已完成")
print_end_message()
else:
logging.error("更新认证信息失败")
except Exception as e:
logging.error(f"程序执行出现错误: {str(e)}")
import traceback
logging.error(traceback.format_exc())
finally:
if browser_manager:
browser_manager.quit()
input("\n程序执行完毕,按回车键退出...")