Files
tingquanzhushou/common_utils.py
2025-02-20 20:20:19 +08:00

420 lines
14 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""通用工具函数库"""
import os
import sys
import hashlib
import subprocess
import platform
import json
import requests
import urllib3
import uuid
import winreg
import sqlite3
from typing import List, Tuple, Optional, Dict
from datetime import datetime
from logger import logger
from config import config
# 获取日志记录器
_logger = logger.get_logger("CommonUtils")
# 全局用户状态
_global_user_state = {
"is_activated": False,
"expire_time": None,
"days_left": 0,
"total_days": 0,
"device_info": {},
"status": "inactive"
}
# 设备信息缓存
_device_info_cache = None
def _get_cached_device_info() -> Dict:
"""
获取设备信息(使用缓存)
只在首次调用时获取系统信息,后续使用缓存
Returns:
Dict: 设备基础信息
"""
global _device_info_cache
if _device_info_cache is None:
try:
# 基础系统信息
device_info = {
"os": platform.system(),
"device_name": platform.node(),
"ip": "未知",
"location": "未知"
}
# 获取IP和位置信息
try:
# 使用ip-api.com的免费API获取IP和位置信息
response = requests.get('http://ip-api.com/json/?lang=zh-CN', timeout=5)
if response.status_code == 200:
data = response.json()
if data.get('status') == 'success':
device_info.update({
"ip": data.get('query', '未知'),
"location": f"{data.get('country', '')} {data.get('regionName', '')} {data.get('city', '')}"
})
except Exception as e:
_logger.warning(f"获取IP和位置信息失败: {str(e)}")
_device_info_cache = device_info
_logger.debug("已初始化设备信息缓存")
except Exception as e:
_logger.error(f"获取设备信息失败: {str(e)}")
_device_info_cache = {
"os": "Windows",
"device_name": "未知",
"ip": "未知",
"location": "未知"
}
return _device_info_cache.copy()
def update_user_state(state_data: Dict) -> None:
"""
更新全局用户状态
Args:
state_data: 新的状态数据
"""
global _global_user_state
# 保留现有的设备信息
current_device_info = _global_user_state.get("device_info", {})
# 更新状态数据
_global_user_state.update(state_data)
# 合并设备信息
if "device_info" in state_data:
new_device_info = state_data["device_info"]
# 只更新可能变动的信息(IP和地区)
current_device_info.update({
"ip": new_device_info.get("ip", current_device_info.get("ip", "未知")),
"location": new_device_info.get("location", current_device_info.get("location", "未知"))
})
_global_user_state["device_info"] = current_device_info
_logger.info(f"用户状态已更新: {state_data}")
def get_user_state() -> Dict:
"""
获取当前用户状态
Returns:
Dict: 用户状态信息
"""
return _global_user_state.copy()
def check_user_state() -> bool:
"""
检查用户是否处于可操作状态
Returns:
bool: 是否可以操作
"""
return _global_user_state.get("is_activated", False)
def refresh_user_state(machine_id: str) -> Tuple[bool, str, Dict]:
"""
检查设备激活状态
通过设备ID向服务器请求验证设备的激活状态、剩余时间等信息。
服务器会进行以下检查:
1. 设备ID是否合法
2. 是否已激活
3. 激活是否过期
4. 剩余使用时间
Args:
machine_id: 设备唯一标识
Returns:
Tuple[bool, str, Dict]:
- bool: 是否验证通过
- str: 状态消息
- Dict: 状态数据
"""
try:
# 禁用SSL警告
urllib3.disable_warnings()
# 准备请求数据
data = {"machine_id": machine_id}
# 发送状态检查请求
response = requests.post(
config.status_url,
json=data,
headers={"Content-Type": "application/json"},
timeout=30,
verify=False
)
# 解析响应
result = response.json()
if result.get("code") in [1, 200]:
api_data = result.get("data", {})
# 获取缓存的设备信息
device_info = _get_cached_device_info()
# 只更新可能变动的信息
device_info.update({
"ip": api_data.get("ip", device_info["ip"]),
"location": api_data.get("location", device_info["location"])
})
# 更新状态数据
state_data = {
"is_activated": api_data.get("status") == "active",
"status": api_data.get("status", "inactive"),
"expire_time": api_data.get("expire_time", ""),
"days_left": api_data.get("days_left", 0),
"total_days": api_data.get("total_days", 0),
"device_info": device_info
}
# 更新全局状态
update_user_state(state_data)
# 根据状态返回对应消息
if state_data["is_activated"]:
msg = f"设备已激活,剩余{state_data['days_left']}"
else:
if api_data.get("status") == "expired":
msg = "设备授权已过期"
else:
msg = "设备未激活"
return True, msg, state_data
else:
error_msg = result.get("msg", "未知错误")
_logger.error(f"设备状态检查失败: {error_msg}")
return False, f"状态检查失败: {error_msg}", {}
except Exception as e:
error_msg = str(e)
_logger.error(f"设备状态检查异常: {error_msg}")
return False, f"状态检查异常: {error_msg}", {}
def get_hardware_id() -> str:
"""获取硬件唯一标识
方案1: CPU ID + 主板序列号 + BIOS序列号
方案2: 系统盘序列号 + Windows安装时间
方案3: 计算机名(最后的备选方案)
Returns:
str: 硬件ID的MD5哈希值
Raises:
RuntimeError: 当无法获取任何可用的硬件信息时
"""
try:
# 创建startupinfo对象来隐藏命令行窗口
startupinfo = None
if sys.platform == "win32":
startupinfo = subprocess.STARTUPINFO()
startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
startupinfo.wShowWindow = subprocess.SW_HIDE
# 方案1: 尝试获取硬件信息
try:
# 获取CPU ID
cpu_info = subprocess.check_output('wmic cpu get ProcessorId', startupinfo=startupinfo).decode()
cpu_id = cpu_info.split('\n')[1].strip()
# 获取主板序列号
board_info = subprocess.check_output('wmic baseboard get SerialNumber', startupinfo=startupinfo).decode()
board_id = board_info.split('\n')[1].strip()
# 获取BIOS序列号
bios_info = subprocess.check_output('wmic bios get SerialNumber', startupinfo=startupinfo).decode()
bios_id = bios_info.split('\n')[1].strip()
# 如果所有信息都获取成功且有效
if all([cpu_id, board_id, bios_id]) and not all(x in ['', '0', 'None', 'To be filled by O.E.M.'] for x in [cpu_id, board_id, bios_id]):
combined = f"{cpu_id}:{board_id}:{bios_id}"
hardware_id = hashlib.md5(combined.encode()).hexdigest()
_logger.info("使用硬件信息生成ID成功")
return hardware_id
except Exception as e:
_logger.warning(f"方案1失败: {str(e)}")
# 方案2: 系统盘序列号 + Windows安装时间
try:
backup_info = []
# 获取系统盘序列号
volume_info = subprocess.check_output('wmic logicaldisk where "DeviceID=\'C:\'" get VolumeSerialNumber', startupinfo=startupinfo).decode()
volume_serial = volume_info.split('\n')[1].strip()
if volume_serial and volume_serial not in ['', '0']:
backup_info.append(("volume", volume_serial))
# 获取Windows安装时间
os_info = subprocess.check_output('wmic os get InstallDate', startupinfo=startupinfo).decode()
install_date = os_info.split('\n')[1].strip()
if install_date:
backup_info.append(("install", install_date))
if backup_info:
combined = "|".join(f"{k}:{v}" for k, v in sorted(backup_info))
hardware_id = hashlib.md5(combined.encode()).hexdigest()
_logger.info("使用系统信息生成ID成功")
return hardware_id
except Exception as e:
_logger.warning(f"方案2失败: {str(e)}")
# 方案3: 使用计算机名(最后的备选方案)
computer_name = platform.node()
if computer_name:
hardware_id = hashlib.md5(computer_name.encode()).hexdigest()
_logger.info("使用计算机名生成ID成功")
return hardware_id
raise ValueError("无法获取任何可用信息来生成硬件ID")
except Exception as e:
error_msg = f"生成硬件ID失败: {str(e)}"
_logger.error(error_msg)
raise RuntimeError(error_msg)
def verify_hardware_id(stored_id: str) -> bool:
"""
验证当前硬件ID是否与存储的ID匹配
Args:
stored_id: 存储的硬件ID
Returns:
bool: 如果匹配返回True否则返回False
"""
try:
current_id = get_hardware_id()
return current_id == stored_id
except Exception as e:
_logger.error(f"验证硬件ID失败: {str(e)}")
return False
def get_system_info() -> dict:
"""
获取系统信息
Returns:
dict: 包含系统信息的字典
"""
info = {
"os": platform.system(),
"os_version": platform.version(),
"machine": platform.machine(),
"processor": platform.processor(),
"node": platform.node()
}
if sys.platform == "win32":
try:
# 获取更多Windows特定信息
startupinfo = subprocess.STARTUPINFO()
startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
startupinfo.wShowWindow = subprocess.SW_HIDE
# Windows产品ID
windows_info = subprocess.check_output('wmic os get SerialNumber', startupinfo=startupinfo).decode()
info["windows_id"] = windows_info.split('\n')[1].strip()
except Exception as e:
_logger.warning(f"获取Windows特定信息失败: {str(e)}")
return info
def activate_device(machine_id: str, activation_code: str) -> Tuple[bool, str, Optional[Dict]]:
"""
激活设备
Args:
machine_id: 设备ID
activation_code: 激活码
Returns:
Tuple[bool, str, Optional[Dict]]:
- bool: 是否成功
- str: 消息
- Optional[Dict]: 激活数据
"""
try:
# 禁用SSL警告
urllib3.disable_warnings()
# 准备请求数据
data = {
"machine_id": machine_id,
"code": activation_code
}
# 发送激活请求
response = requests.post(
config.activate_url,
json=data,
headers={
"Content-Type": "application/json",
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
"Accept": "*/*"
},
timeout=30,
verify=False
)
# 解析响应
result = response.json()
if result.get("code") == 200:
api_data = result.get("data", {})
# 获取缓存的设备信息
device_info = _get_cached_device_info()
# 构造状态数据
state_data = {
"is_activated": True,
"status": "active",
"expire_time": api_data.get("expire_time", ""),
"days_left": api_data.get("days_left", 0),
"total_days": api_data.get("total_days", 0),
"device_info": device_info
}
# 更新全局状态
update_user_state(state_data)
return True, f"激活成功,剩余{state_data['days_left']}", state_data
elif result.get("code") == 400:
error_msg = result.get("msg", "激活码无效或已被使用")
_logger.warning(f"激活失败: {error_msg}")
return False, error_msg, None
else:
error_msg = result.get("msg", "未知错误")
_logger.error(f"激活失败: {error_msg}")
return False, f"激活失败: {error_msg}", None
except requests.exceptions.RequestException as e:
error_msg = f"网络连接失败: {str(e)}"
_logger.error(error_msg)
return False, error_msg, None
except Exception as e:
error_msg = f"激活失败: {str(e)}"
_logger.error(error_msg)
return False, error_msg, None