feat: 增加加密

This commit is contained in:
chengchongzhen
2024-12-31 12:56:59 +08:00
parent 331ebe035a
commit e8bc3e08f8
3 changed files with 174 additions and 37 deletions

View File

@@ -9,6 +9,8 @@ from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.backends import default_backend
import base64
import time
from Crypto.Cipher import AES
import binascii
class LicenseManager:
@@ -24,7 +26,11 @@ class LicenseManager:
self.license_file = os.path.join(config_dir, "CursorPro", "license.json")
self.activation_url = "http://119.8.35.41:3003/activate"
self.verify_url = "http://119.8.35.41:3003/verify"
# 开发环境
# self.activation_url = "http://127.0.0.1:3000/activate"
# self.verify_url = "http://127.0.0.1:3000/verify"
self.key = b"Kj8nP9x2Qs5mY7vR4wL1hC3fA6tD0iB8"
self.encryption_key = b"f1e2d3c4b5a6978899aabbccddeeff00112233445566778899aabbccddeeff00" # 与服务器端相同的密钥
def encrypt(self, text):
"""使用AES-256-CBC加密"""
@@ -96,20 +102,27 @@ class LicenseManager:
)
if response.status_code == 200:
result = response.json()
if result.get("success"):
license_data = {
"license_key": license_key,
"machine_code": machine_code,
"activation_date": activation_data["activation_date"],
"expiry_date": result.get("expiry_date"),
"is_active": True,
}
self._save_license(license_data)
return True, "激活成功!"
else:
error_msg = result.get("message", "激活失败")
return False, error_msg
try:
encrypted_response = response.json()
result = self.decrypt_response(
encrypted_response["encrypted_data"]
)
if result.get("success"):
license_data = {
"license_key": license_key,
"machine_code": machine_code,
"activation_date": activation_data["activation_date"],
"expiry_date": result.get("expiry_date"),
"is_active": True,
}
self._save_license(license_data)
return True, "激活成功!"
else:
error_msg = result.get("message", "激活失败")
return False, error_msg
except Exception as e:
return False, f"解密响应失败: {str(e)}"
else:
return False, f"服务器响应错误: HTTP {response.status_code}"
@@ -117,10 +130,39 @@ class LicenseManager:
return False, "无法连接到激活服务器,请检查网络连接"
except requests.exceptions.Timeout:
return False, "服务器响应超时"
except Exception as e:
return False, f"激活请求失败: {str(e)}"
except Exception as e:
return False, f"激活过程出错: {str(e)}"
def decrypt_response(self, encrypted_data):
"""解密服务器响应的数据"""
try:
# 分离 IV 和加密数据
iv_hex, encrypted_text = encrypted_data.split(":")
# 将十六进制转换为字节
iv = binascii.unhexlify(iv_hex)
encrypted_bytes = binascii.unhexlify(encrypted_text)
# 创建解密器
cipher = AES.new(binascii.unhexlify(self.encryption_key), AES.MODE_CBC, iv)
# 解密数据
decrypted_bytes = cipher.decrypt(encrypted_bytes)
# 移除填充
padding_length = decrypted_bytes[-1]
decrypted_data = decrypted_bytes[:-padding_length]
# 转换为字符串并解析 JSON
decrypted_str = decrypted_data.decode("utf-8")
return json.loads(decrypted_str)
except Exception as e:
raise Exception(f"解密失败: {str(e)}")
def verify_license(self):
"""验证许可证"""
try:
@@ -142,18 +184,19 @@ class LicenseManager:
try:
response = requests.post(self.verify_url, json=verify_data, timeout=30)
time.sleep(2)
try:
result = response.json()
except Exception:
return False, "服务器响应格式错误"
if response.status_code == 200:
if result.get("success"):
return True, "许可证有效"
return False, result.get("message", "许可证无效")
try:
encrypted_response = response.json()
result = self.decrypt_response(
encrypted_response["encrypted_data"]
)
if result.get("success"):
return True, "许可证有效"
return False, result.get("message", "许可证无效")
except Exception as e:
return False, f"解密响应失败: {str(e)}"
return False, f"服务器响应错误: HTTP {response.status_code}"