chore: 整理代码
This commit is contained in:
@@ -11,7 +11,7 @@ import os
|
||||
import sys
|
||||
import logging
|
||||
from browser_utils import BrowserManager
|
||||
from get_veri_code import EmailVerificationHandler
|
||||
from get_email_code import EmailVerificationHandler
|
||||
|
||||
# 在文件开头设置日志
|
||||
logging.basicConfig(
|
||||
|
||||
@@ -1,229 +0,0 @@
|
||||
import requests
|
||||
import json
|
||||
import os
|
||||
import platform
|
||||
import uuid
|
||||
import hashlib
|
||||
from datetime import datetime
|
||||
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
|
||||
from cryptography.hazmat.backends import default_backend
|
||||
import base64
|
||||
import time
|
||||
from Crypto.Cipher import AES
|
||||
import binascii
|
||||
|
||||
|
||||
class LicenseManager:
|
||||
def __init__(self):
|
||||
# 根据不同操作系统获取适当的配置目录
|
||||
if platform.system() == "Windows":
|
||||
config_dir = os.getenv("APPDATA")
|
||||
elif platform.system() == "Darwin": # macOS
|
||||
config_dir = os.path.expanduser("~/Library/Application Support")
|
||||
else: # Linux 和其他类 Unix 系统
|
||||
config_dir = os.path.expanduser("~/.config")
|
||||
|
||||
self.license_file = os.path.join(config_dir, "CursorPro", "license.json")
|
||||
# self.activation_url = "http://cursor.chengazhen.me/activate"
|
||||
# self.verify_url = "http://cursor.chengazhen.me/verify"
|
||||
self.activation_url = "http://119.8.35.41:3004/activate"
|
||||
self.verify_url = "http://119.8.35.41:3004/verify"
|
||||
self.key = b"Kj8nP9x2Qs5mY7vR4wL1hC3fA6tD0iB8"
|
||||
self.encryption_key = b"f1e2d3c4b5a6978899aabbccddeeff00112233445566778899aabbccddeeff00" # 与服务器端相同的密钥
|
||||
|
||||
def encrypt(self, text):
|
||||
"""使用AES-256-CBC加密"""
|
||||
cipher = Cipher(
|
||||
algorithms.AES(self.key),
|
||||
modes.CBC(self.key[:16]),
|
||||
backend=default_backend(),
|
||||
)
|
||||
encryptor = cipher.encryptor()
|
||||
|
||||
# 添加PKCS7填充
|
||||
length = 16 - (len(text) % 16)
|
||||
text += bytes([length]) * length
|
||||
|
||||
encrypted = encryptor.update(text) + encryptor.finalize()
|
||||
return base64.b64encode(encrypted).decode("utf-8")
|
||||
|
||||
def decrypt(self, encrypted_text):
|
||||
"""使用AES-256-CBC解密"""
|
||||
encrypted = base64.b64decode(encrypted_text)
|
||||
cipher = Cipher(
|
||||
algorithms.AES(self.key),
|
||||
modes.CBC(self.key[:16]),
|
||||
backend=default_backend(),
|
||||
)
|
||||
decryptor = cipher.decryptor()
|
||||
|
||||
decrypted = decryptor.update(encrypted) + decryptor.finalize()
|
||||
|
||||
# 移除PKCS7填充
|
||||
padding_length = decrypted[-1]
|
||||
return decrypted[:-padding_length]
|
||||
|
||||
def get_hardware_info(self):
|
||||
"""获取硬件信息作为机器码"""
|
||||
system_info = {
|
||||
"platform": platform.platform(),
|
||||
"machine": platform.machine(),
|
||||
"processor": platform.processor(),
|
||||
"hostname": platform.node(),
|
||||
}
|
||||
|
||||
# 获取MAC地址
|
||||
mac = ":".join(
|
||||
[
|
||||
"{:02x}".format((uuid.getnode() >> elements) & 0xFF)
|
||||
for elements in range(0, 2 * 6, 2)
|
||||
][::-1]
|
||||
)
|
||||
system_info["mac"] = mac
|
||||
|
||||
# 生成机器码
|
||||
machine_code = hashlib.md5(json.dumps(system_info).encode()).hexdigest()
|
||||
return machine_code
|
||||
|
||||
def activate_license(self, license_key):
|
||||
"""在线激活许可证"""
|
||||
try:
|
||||
machine_code = self.get_hardware_info()
|
||||
activation_data = {
|
||||
"license_key": license_key,
|
||||
"machine_code": machine_code,
|
||||
"activation_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
|
||||
}
|
||||
|
||||
try:
|
||||
response = requests.post(
|
||||
self.activation_url, json=activation_data, timeout=10
|
||||
)
|
||||
if response.status_code == 200:
|
||||
try:
|
||||
encrypted_response = response.json()
|
||||
result = self.decrypt_response(
|
||||
encrypted_response["encrypted_data"]
|
||||
)
|
||||
if result.get("code") == 0:
|
||||
license_data = {
|
||||
"license_key": license_key,
|
||||
"machine_code": machine_code,
|
||||
"activation_date": activation_data["activation_date"],
|
||||
"expiry_date": result.get("data").get("expiry_date"),
|
||||
"is_active": True,
|
||||
}
|
||||
self._save_license(license_data)
|
||||
return True, "激活成功!"
|
||||
else:
|
||||
error_msg = result.get("message", "激活失败")
|
||||
return False, error_msg
|
||||
except Exception as e:
|
||||
return False, f"解密响应失败: {str(e)}"
|
||||
|
||||
else:
|
||||
return False, f"服务器响应错误: HTTP {response.status_code}"
|
||||
|
||||
except requests.exceptions.ConnectionError:
|
||||
return False, "无法连接到激活服务器,请检查网络连接"
|
||||
except requests.exceptions.Timeout:
|
||||
return False, "服务器响应超时"
|
||||
except Exception as e:
|
||||
return False, f"激活请求失败: {str(e)}"
|
||||
|
||||
except Exception as e:
|
||||
return False, f"激活过程出错: {str(e)}"
|
||||
|
||||
def decrypt_response(self, encrypted_data):
|
||||
"""解密服务器响应的数据"""
|
||||
try:
|
||||
# 分离 IV 和加密数据
|
||||
iv_hex, encrypted_text = encrypted_data.split(":")
|
||||
|
||||
# 将十六进制转换为字节
|
||||
iv = binascii.unhexlify(iv_hex)
|
||||
encrypted_bytes = binascii.unhexlify(encrypted_text)
|
||||
|
||||
# 创建解密器
|
||||
cipher = AES.new(binascii.unhexlify(self.encryption_key), AES.MODE_CBC, iv)
|
||||
|
||||
# 解密数据
|
||||
decrypted_bytes = cipher.decrypt(encrypted_bytes)
|
||||
|
||||
# 移除填充
|
||||
padding_length = decrypted_bytes[-1]
|
||||
decrypted_data = decrypted_bytes[:-padding_length]
|
||||
|
||||
# 转换为字符串并解析 JSON
|
||||
decrypted_str = decrypted_data.decode("utf-8")
|
||||
return json.loads(decrypted_str)
|
||||
|
||||
except Exception as e:
|
||||
raise Exception(f"解密失败: {str(e)}")
|
||||
|
||||
def verify_license(self):
|
||||
"""验证许可证"""
|
||||
try:
|
||||
if not os.path.exists(self.license_file):
|
||||
return False, "未找到许可证文件"
|
||||
|
||||
license_data = self._load_license()
|
||||
if not license_data:
|
||||
return False, "许可证文件无效"
|
||||
|
||||
current_machine = self.get_hardware_info()
|
||||
if current_machine != license_data.get("machine_code"):
|
||||
return False, "硬件信息不匹配"
|
||||
|
||||
verify_data = {
|
||||
"license_key": license_data.get("license_key"),
|
||||
"machine_code": current_machine,
|
||||
}
|
||||
|
||||
try:
|
||||
response = requests.post(self.verify_url, json=verify_data, timeout=30)
|
||||
time.sleep(2)
|
||||
|
||||
if response.status_code == 200:
|
||||
try:
|
||||
encrypted_response = response.json()
|
||||
result = self.decrypt_response(
|
||||
encrypted_response["encrypted_data"]
|
||||
)
|
||||
if result.get("code") == 0:
|
||||
return True, "许可证有效"
|
||||
return False, result.get("message", "许可证无效")
|
||||
except Exception as e:
|
||||
return False, f"解密响应失败: {str(e)}"
|
||||
|
||||
return False, f"服务器响应错误: HTTP {response.status_code}"
|
||||
|
||||
except requests.exceptions.ConnectionError:
|
||||
return False, "无法连接到验证服务器,请检查网络连接"
|
||||
except requests.exceptions.Timeout:
|
||||
return False, "服务器响应超时"
|
||||
except Exception as e:
|
||||
return False, f"验证请求失败: {str(e)}"
|
||||
|
||||
except Exception as e:
|
||||
return False, f"验证过程出错: {str(e)}"
|
||||
|
||||
def _save_license(self, license_data):
|
||||
"""加密保存许可证数据"""
|
||||
try:
|
||||
os.makedirs(os.path.dirname(self.license_file), exist_ok=True)
|
||||
encrypted_data = self.encrypt(json.dumps(license_data).encode())
|
||||
with open(self.license_file, "w") as f:
|
||||
f.write(encrypted_data)
|
||||
except Exception as e:
|
||||
pass
|
||||
|
||||
def _load_license(self):
|
||||
"""加密读取许可证数据"""
|
||||
try:
|
||||
with open(self.license_file, "r") as f:
|
||||
encrypted_data = f.read()
|
||||
decrypted_data = self.decrypt(encrypted_data)
|
||||
return json.loads(decrypted_data)
|
||||
except Exception:
|
||||
return None
|
||||
@@ -13,6 +13,7 @@ et_xmlfile==2.0.0
|
||||
filelock==3.16.1
|
||||
idna==3.10
|
||||
lxml==5.3.0
|
||||
macholib==1.16.3
|
||||
openpyxl==3.1.5
|
||||
packaging==24.2
|
||||
pefile==2023.2.7
|
||||
|
||||
@@ -1,46 +0,0 @@
|
||||
from cursor_auth_manager import CursorAuthManager
|
||||
|
||||
def update_cursor_auth(email=None, access_token=None, refresh_token=None):
|
||||
"""
|
||||
更新Cursor的认证信息的便捷函数
|
||||
"""
|
||||
auth_manager = CursorAuthManager()
|
||||
return auth_manager.update_auth(email, access_token, refresh_token)
|
||||
|
||||
def main():
|
||||
# 示例用法
|
||||
print("请选择要更新的项目:")
|
||||
print("1. 更新邮箱")
|
||||
print("2. 更新访问令牌")
|
||||
print("3. 更新刷新令牌")
|
||||
print("4. 更新多个值")
|
||||
print("0. 退出")
|
||||
|
||||
choice = input("\n请输入选项数字: ")
|
||||
|
||||
if choice == "1":
|
||||
email = input("请输入新的邮箱: ")
|
||||
update_cursor_auth(email=email)
|
||||
elif choice == "2":
|
||||
token = input("请输入新的访问令牌: ")
|
||||
update_cursor_auth(access_token=token)
|
||||
elif choice == "3":
|
||||
token = input("请输入新的刷新令牌: ")
|
||||
update_cursor_auth(refresh_token=token)
|
||||
elif choice == "4":
|
||||
email = input("请输入新的邮箱 (直接回车跳过): ")
|
||||
access_token = input("请输入新的访问令牌 (直接回车跳过): ")
|
||||
refresh_token = input("请输入新的刷新令牌 (直接回车跳过): ")
|
||||
|
||||
update_cursor_auth(
|
||||
email=email if email else None,
|
||||
access_token=access_token if access_token else None,
|
||||
refresh_token=refresh_token if refresh_token else None
|
||||
)
|
||||
elif choice == "0":
|
||||
print("退出程序")
|
||||
else:
|
||||
print("无效的选项")
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user