420 lines
14 KiB
Python
420 lines
14 KiB
Python
"""通用工具函数库"""
|
||
|
||
import os
|
||
import sys
|
||
import hashlib
|
||
import subprocess
|
||
import platform
|
||
import json
|
||
import requests
|
||
import urllib3
|
||
import uuid
|
||
import winreg
|
||
import sqlite3
|
||
from typing import List, Tuple, Optional, Dict
|
||
from datetime import datetime
|
||
|
||
from logger import logger
|
||
from config import config
|
||
|
||
# 获取日志记录器
|
||
_logger = logger.get_logger("CommonUtils")
|
||
|
||
# 全局用户状态
|
||
_global_user_state = {
|
||
"is_activated": False,
|
||
"expire_time": None,
|
||
"days_left": 0,
|
||
"total_days": 0,
|
||
"device_info": {},
|
||
"status": "inactive"
|
||
}
|
||
|
||
# 设备信息缓存
|
||
_device_info_cache = None
|
||
|
||
def _get_cached_device_info() -> Dict:
|
||
"""
|
||
获取设备信息(使用缓存)
|
||
只在首次调用时获取系统信息,后续使用缓存
|
||
|
||
Returns:
|
||
Dict: 设备基础信息
|
||
"""
|
||
global _device_info_cache
|
||
|
||
if _device_info_cache is None:
|
||
try:
|
||
# 基础系统信息
|
||
device_info = {
|
||
"os": platform.system(),
|
||
"device_name": platform.node(),
|
||
"ip": "未知",
|
||
"location": "未知"
|
||
}
|
||
|
||
# 获取IP和位置信息
|
||
try:
|
||
# 使用ip-api.com的免费API获取IP和位置信息
|
||
response = requests.get('http://ip-api.com/json/?lang=zh-CN', timeout=5)
|
||
if response.status_code == 200:
|
||
data = response.json()
|
||
if data.get('status') == 'success':
|
||
device_info.update({
|
||
"ip": data.get('query', '未知'),
|
||
"location": f"{data.get('country', '')} {data.get('regionName', '')} {data.get('city', '')}"
|
||
})
|
||
except Exception as e:
|
||
_logger.warning(f"获取IP和位置信息失败: {str(e)}")
|
||
|
||
_device_info_cache = device_info
|
||
_logger.debug("已初始化设备信息缓存")
|
||
|
||
except Exception as e:
|
||
_logger.error(f"获取设备信息失败: {str(e)}")
|
||
_device_info_cache = {
|
||
"os": "Windows",
|
||
"device_name": "未知",
|
||
"ip": "未知",
|
||
"location": "未知"
|
||
}
|
||
|
||
return _device_info_cache.copy()
|
||
|
||
def update_user_state(state_data: Dict) -> None:
|
||
"""
|
||
更新全局用户状态
|
||
Args:
|
||
state_data: 新的状态数据
|
||
"""
|
||
global _global_user_state
|
||
|
||
# 保留现有的设备信息
|
||
current_device_info = _global_user_state.get("device_info", {})
|
||
|
||
# 更新状态数据
|
||
_global_user_state.update(state_data)
|
||
|
||
# 合并设备信息
|
||
if "device_info" in state_data:
|
||
new_device_info = state_data["device_info"]
|
||
# 只更新可能变动的信息(IP和地区)
|
||
current_device_info.update({
|
||
"ip": new_device_info.get("ip", current_device_info.get("ip", "未知")),
|
||
"location": new_device_info.get("location", current_device_info.get("location", "未知"))
|
||
})
|
||
_global_user_state["device_info"] = current_device_info
|
||
|
||
_logger.info(f"用户状态已更新: {state_data}")
|
||
|
||
def get_user_state() -> Dict:
|
||
"""
|
||
获取当前用户状态
|
||
Returns:
|
||
Dict: 用户状态信息
|
||
"""
|
||
return _global_user_state.copy()
|
||
|
||
def check_user_state() -> bool:
|
||
"""
|
||
检查用户是否处于可操作状态
|
||
Returns:
|
||
bool: 是否可以操作
|
||
"""
|
||
return _global_user_state.get("is_activated", False)
|
||
|
||
def refresh_user_state(machine_id: str) -> Tuple[bool, str, Dict]:
|
||
"""
|
||
检查设备激活状态
|
||
|
||
通过设备ID向服务器请求验证设备的激活状态、剩余时间等信息。
|
||
服务器会进行以下检查:
|
||
1. 设备ID是否合法
|
||
2. 是否已激活
|
||
3. 激活是否过期
|
||
4. 剩余使用时间
|
||
|
||
Args:
|
||
machine_id: 设备唯一标识
|
||
|
||
Returns:
|
||
Tuple[bool, str, Dict]:
|
||
- bool: 是否验证通过
|
||
- str: 状态消息
|
||
- Dict: 状态数据
|
||
"""
|
||
try:
|
||
# 禁用SSL警告
|
||
urllib3.disable_warnings()
|
||
|
||
# 准备请求数据
|
||
data = {"machine_id": machine_id}
|
||
|
||
# 发送状态检查请求
|
||
response = requests.post(
|
||
config.status_url,
|
||
json=data,
|
||
headers={"Content-Type": "application/json"},
|
||
timeout=30,
|
||
verify=False
|
||
)
|
||
|
||
# 解析响应
|
||
result = response.json()
|
||
|
||
if result.get("code") in [1, 200]:
|
||
api_data = result.get("data", {})
|
||
|
||
# 获取缓存的设备信息
|
||
device_info = _get_cached_device_info()
|
||
# 只更新可能变动的信息
|
||
device_info.update({
|
||
"ip": api_data.get("ip", device_info["ip"]),
|
||
"location": api_data.get("location", device_info["location"])
|
||
})
|
||
|
||
# 更新状态数据
|
||
state_data = {
|
||
"is_activated": api_data.get("status") == "active",
|
||
"status": api_data.get("status", "inactive"),
|
||
"expire_time": api_data.get("expire_time", ""),
|
||
"days_left": api_data.get("days_left", 0),
|
||
"total_days": api_data.get("total_days", 0),
|
||
"device_info": device_info
|
||
}
|
||
|
||
# 更新全局状态
|
||
update_user_state(state_data)
|
||
|
||
# 根据状态返回对应消息
|
||
if state_data["is_activated"]:
|
||
msg = f"设备已激活,剩余{state_data['days_left']}天"
|
||
else:
|
||
if api_data.get("status") == "expired":
|
||
msg = "设备授权已过期"
|
||
else:
|
||
msg = "设备未激活"
|
||
|
||
return True, msg, state_data
|
||
|
||
else:
|
||
error_msg = result.get("msg", "未知错误")
|
||
_logger.error(f"设备状态检查失败: {error_msg}")
|
||
return False, f"状态检查失败: {error_msg}", {}
|
||
|
||
except Exception as e:
|
||
error_msg = str(e)
|
||
_logger.error(f"设备状态检查异常: {error_msg}")
|
||
return False, f"状态检查异常: {error_msg}", {}
|
||
|
||
def get_hardware_id() -> str:
|
||
"""获取硬件唯一标识
|
||
方案1: CPU ID + 主板序列号 + BIOS序列号
|
||
方案2: 系统盘序列号 + Windows安装时间
|
||
方案3: 计算机名(最后的备选方案)
|
||
|
||
Returns:
|
||
str: 硬件ID的MD5哈希值
|
||
|
||
Raises:
|
||
RuntimeError: 当无法获取任何可用的硬件信息时
|
||
"""
|
||
try:
|
||
# 创建startupinfo对象来隐藏命令行窗口
|
||
startupinfo = None
|
||
if sys.platform == "win32":
|
||
startupinfo = subprocess.STARTUPINFO()
|
||
startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
|
||
startupinfo.wShowWindow = subprocess.SW_HIDE
|
||
|
||
# 方案1: 尝试获取硬件信息
|
||
try:
|
||
# 获取CPU ID
|
||
cpu_info = subprocess.check_output('wmic cpu get ProcessorId', startupinfo=startupinfo).decode()
|
||
cpu_id = cpu_info.split('\n')[1].strip()
|
||
|
||
# 获取主板序列号
|
||
board_info = subprocess.check_output('wmic baseboard get SerialNumber', startupinfo=startupinfo).decode()
|
||
board_id = board_info.split('\n')[1].strip()
|
||
|
||
# 获取BIOS序列号
|
||
bios_info = subprocess.check_output('wmic bios get SerialNumber', startupinfo=startupinfo).decode()
|
||
bios_id = bios_info.split('\n')[1].strip()
|
||
|
||
# 如果所有信息都获取成功且有效
|
||
if all([cpu_id, board_id, bios_id]) and not all(x in ['', '0', 'None', 'To be filled by O.E.M.'] for x in [cpu_id, board_id, bios_id]):
|
||
combined = f"{cpu_id}:{board_id}:{bios_id}"
|
||
hardware_id = hashlib.md5(combined.encode()).hexdigest()
|
||
_logger.info("使用硬件信息生成ID成功")
|
||
return hardware_id
|
||
|
||
except Exception as e:
|
||
_logger.warning(f"方案1失败: {str(e)}")
|
||
|
||
# 方案2: 系统盘序列号 + Windows安装时间
|
||
try:
|
||
backup_info = []
|
||
|
||
# 获取系统盘序列号
|
||
volume_info = subprocess.check_output('wmic logicaldisk where "DeviceID=\'C:\'" get VolumeSerialNumber', startupinfo=startupinfo).decode()
|
||
volume_serial = volume_info.split('\n')[1].strip()
|
||
if volume_serial and volume_serial not in ['', '0']:
|
||
backup_info.append(("volume", volume_serial))
|
||
|
||
# 获取Windows安装时间
|
||
os_info = subprocess.check_output('wmic os get InstallDate', startupinfo=startupinfo).decode()
|
||
install_date = os_info.split('\n')[1].strip()
|
||
if install_date:
|
||
backup_info.append(("install", install_date))
|
||
|
||
if backup_info:
|
||
combined = "|".join(f"{k}:{v}" for k, v in sorted(backup_info))
|
||
hardware_id = hashlib.md5(combined.encode()).hexdigest()
|
||
_logger.info("使用系统信息生成ID成功")
|
||
return hardware_id
|
||
|
||
except Exception as e:
|
||
_logger.warning(f"方案2失败: {str(e)}")
|
||
|
||
# 方案3: 使用计算机名(最后的备选方案)
|
||
computer_name = platform.node()
|
||
if computer_name:
|
||
hardware_id = hashlib.md5(computer_name.encode()).hexdigest()
|
||
_logger.info("使用计算机名生成ID成功")
|
||
return hardware_id
|
||
|
||
raise ValueError("无法获取任何可用信息来生成硬件ID")
|
||
|
||
except Exception as e:
|
||
error_msg = f"生成硬件ID失败: {str(e)}"
|
||
_logger.error(error_msg)
|
||
raise RuntimeError(error_msg)
|
||
|
||
def verify_hardware_id(stored_id: str) -> bool:
|
||
"""
|
||
验证当前硬件ID是否与存储的ID匹配
|
||
|
||
Args:
|
||
stored_id: 存储的硬件ID
|
||
|
||
Returns:
|
||
bool: 如果匹配返回True,否则返回False
|
||
"""
|
||
try:
|
||
current_id = get_hardware_id()
|
||
return current_id == stored_id
|
||
except Exception as e:
|
||
_logger.error(f"验证硬件ID失败: {str(e)}")
|
||
return False
|
||
|
||
def get_system_info() -> dict:
|
||
"""
|
||
获取系统信息
|
||
|
||
Returns:
|
||
dict: 包含系统信息的字典
|
||
"""
|
||
info = {
|
||
"os": platform.system(),
|
||
"os_version": platform.version(),
|
||
"machine": platform.machine(),
|
||
"processor": platform.processor(),
|
||
"node": platform.node()
|
||
}
|
||
|
||
if sys.platform == "win32":
|
||
try:
|
||
# 获取更多Windows特定信息
|
||
startupinfo = subprocess.STARTUPINFO()
|
||
startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
|
||
startupinfo.wShowWindow = subprocess.SW_HIDE
|
||
|
||
# Windows产品ID
|
||
windows_info = subprocess.check_output('wmic os get SerialNumber', startupinfo=startupinfo).decode()
|
||
info["windows_id"] = windows_info.split('\n')[1].strip()
|
||
|
||
except Exception as e:
|
||
_logger.warning(f"获取Windows特定信息失败: {str(e)}")
|
||
|
||
return info
|
||
|
||
def activate_device(machine_id: str, activation_code: str) -> Tuple[bool, str, Optional[Dict]]:
|
||
"""
|
||
激活设备
|
||
|
||
Args:
|
||
machine_id: 设备ID
|
||
activation_code: 激活码
|
||
|
||
Returns:
|
||
Tuple[bool, str, Optional[Dict]]:
|
||
- bool: 是否成功
|
||
- str: 消息
|
||
- Optional[Dict]: 激活数据
|
||
"""
|
||
try:
|
||
# 禁用SSL警告
|
||
urllib3.disable_warnings()
|
||
|
||
# 准备请求数据
|
||
data = {
|
||
"machine_id": machine_id,
|
||
"code": activation_code
|
||
}
|
||
|
||
# 发送激活请求
|
||
response = requests.post(
|
||
config.activate_url,
|
||
json=data,
|
||
headers={
|
||
"Content-Type": "application/json",
|
||
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
|
||
"Accept": "*/*"
|
||
},
|
||
timeout=30,
|
||
verify=False
|
||
)
|
||
|
||
# 解析响应
|
||
result = response.json()
|
||
|
||
if result.get("code") == 200:
|
||
api_data = result.get("data", {})
|
||
|
||
# 获取缓存的设备信息
|
||
device_info = _get_cached_device_info()
|
||
|
||
# 构造状态数据
|
||
state_data = {
|
||
"is_activated": True,
|
||
"status": "active",
|
||
"expire_time": api_data.get("expire_time", ""),
|
||
"days_left": api_data.get("days_left", 0),
|
||
"total_days": api_data.get("total_days", 0),
|
||
"device_info": device_info
|
||
}
|
||
|
||
# 更新全局状态
|
||
update_user_state(state_data)
|
||
|
||
return True, f"激活成功,剩余{state_data['days_left']}天", state_data
|
||
|
||
elif result.get("code") == 400:
|
||
error_msg = result.get("msg", "激活码无效或已被使用")
|
||
_logger.warning(f"激活失败: {error_msg}")
|
||
return False, error_msg, None
|
||
|
||
else:
|
||
error_msg = result.get("msg", "未知错误")
|
||
_logger.error(f"激活失败: {error_msg}")
|
||
return False, f"激活失败: {error_msg}", None
|
||
|
||
except requests.exceptions.RequestException as e:
|
||
error_msg = f"网络连接失败: {str(e)}"
|
||
_logger.error(error_msg)
|
||
return False, error_msg, None
|
||
|
||
except Exception as e:
|
||
error_msg = f"激活失败: {str(e)}"
|
||
_logger.error(error_msg)
|
||
return False, error_msg, None |