feat: 完善版本管理功能 1. 优化使用步骤说明的样式,调整行高和字体大小,提升可读性 2. 新增版本截图 3. 更新依赖包版本 4. 添加版本管理相关文件

This commit is contained in:
huangzhenpc
2025-02-13 15:16:55 +08:00
parent e3b5820d8b
commit d81244c7e4
8 changed files with 936 additions and 82 deletions

265
utils/version_manager.py Normal file
View File

@@ -0,0 +1,265 @@
import os
import sys
import requests
from packaging import version
from typing import Optional, Dict, Any
import json
import logging
from urllib.parse import quote, unquote
class VersionManager:
"""版本管理器
错误码说明:
- 0: 成功
- 1: 一般性错误
- 401: 未授权或授权失败
- 404: 请求的资源不存在
- 500: 服务器内部错误
"""
def __init__(self):
self.base_url = "https://cursorapi.nosqli.com"
self.current_version = self._get_current_version()
self.platform = "windows" if sys.platform.startswith("win") else "mac" if sys.platform.startswith("darwin") else "linux"
def _get_current_version(self) -> str:
"""获取当前版本号"""
try:
with open("version.txt", "r") as f:
return f.read().strip()
except FileNotFoundError:
return "0.0.0"
def _handle_response(self, response: requests.Response) -> Dict[str, Any]:
"""处理API响应
Args:
response: API响应对象
Returns:
Dict[str, Any]: 处理后的响应数据
Raises:
Exception: API调用失败时抛出异常
"""
try:
data = response.json()
code = data.get("code")
msg = data.get("msg") or data.get("info", "未知错误") # 兼容 info 字段
if code == 0 or code == 1: # 兼容 code=1 的情况
# 处理空数据情况
if not data.get("data"):
logging.warning("API返回空数据")
return {
"code": 0,
"msg": msg,
"data": {
"has_update": False,
"is_force": False,
"version_info": None
}
}
return {
"code": 0, # 统一返回 code=0
"msg": msg,
"data": data.get("data")
}
elif code == 401:
raise Exception("未授权或授权失败")
elif code == 404:
raise Exception("请求的资源不存在")
elif code == 500:
raise Exception("服务器内部错误")
else:
raise Exception(msg)
except requests.exceptions.JSONDecodeError:
raise Exception("服务器响应格式错误")
def check_update(self) -> Dict[str, Any]:
"""检查是否有更新"""
try:
url = f"{self.base_url}/admin/api.version/check"
params = {
"version": self.current_version,
"platform": self.platform
}
logging.info(f"正在请求: {url}")
logging.info(f"参数: {params}")
response = requests.get(
url,
params=params,
timeout=10
)
logging.info(f"状态码: {response.status_code}")
logging.info(f"响应头: {dict(response.headers)}")
logging.info(f"响应内容: {response.text}")
return self._handle_response(response)
except requests.exceptions.Timeout:
logging.error("检查更新超时")
return {"code": -1, "msg": "请求超时,请检查网络连接", "data": None}
except requests.exceptions.ConnectionError as e:
logging.error(f"检查更新连接失败: {str(e)}")
return {"code": -1, "msg": "连接服务器失败,请检查网络连接", "data": None}
except Exception as e:
logging.error(f"检查更新失败: {str(e)}")
return {"code": -1, "msg": str(e), "data": None}
def get_latest_version(self) -> Dict[str, Any]:
"""获取最新版本信息"""
try:
url = f"{self.base_url}/admin/api.version/latest"
params = {"platform": self.platform}
logging.info(f"正在请求: {url}")
logging.info(f"参数: {params}")
response = requests.get(
url,
params=params,
timeout=10
)
logging.info(f"状态码: {response.status_code}")
logging.info(f"响应头: {dict(response.headers)}")
logging.info(f"响应内容: {response.text}")
return self._handle_response(response)
except requests.exceptions.Timeout:
logging.error("获取最新版本超时")
return {"code": -1, "msg": "请求超时,请检查网络连接", "data": None}
except requests.exceptions.ConnectionError as e:
logging.error(f"获取最新版本连接失败: {str(e)}")
return {"code": -1, "msg": "连接服务器失败,请检查网络连接", "data": None}
except Exception as e:
logging.error(f"获取最新版本失败: {str(e)}")
return {"code": -1, "msg": str(e), "data": None}
def needs_update(self) -> tuple[bool, bool, Optional[Dict[str, Any]]]:
"""检查是否需要更新
Returns:
tuple: (是否有更新, 是否强制更新, 版本信息)
"""
result = self.check_update()
if result["code"] == 0 and result["data"]:
data = result["data"]
return (
data["has_update"],
bool(data.get("is_force")),
data.get("version_info")
)
return False, False, None
def download_update(self, download_url: str, save_path: str) -> tuple[bool, str]:
"""下载更新文件
Args:
download_url: 下载地址
save_path: 保存路径
Returns:
tuple[bool, str]: (是否下载成功, 错误信息)
"""
try:
if not download_url:
error_msg = "下载地址为空,请联系管理员"
logging.error(error_msg)
return False, error_msg
# 处理下载地址中的中文字符
url_parts = download_url.split('/')
# 只对最后一部分(文件名)进行编码
url_parts[-1] = quote(url_parts[-1])
encoded_url = '/'.join(url_parts)
logging.info(f"原始下载地址: {download_url}")
logging.info(f"编码后的地址: {encoded_url}")
# 设置请求头,模拟浏览器行为
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36',
'Accept': '*/*',
'Accept-Encoding': 'gzip, deflate, br',
'Connection': 'keep-alive'
}
response = requests.get(
encoded_url,
stream=True,
headers=headers,
timeout=30 # 增加下载超时时间
)
# 检查响应状态
if response.status_code == 404:
error_msg = "下载地址无效,请联系管理员更新下载地址"
logging.error(error_msg)
return False, error_msg
response.raise_for_status()
total_size = int(response.headers.get('content-length', 0))
if total_size == 0:
error_msg = "无法获取文件大小,下载地址可能无效,请联系管理员"
logging.error(error_msg)
return False, error_msg
block_size = 8192
downloaded_size = 0
logging.info(f"开始下载文件,总大小: {total_size} 字节")
with open(save_path, 'wb') as f:
for chunk in response.iter_content(chunk_size=block_size):
if chunk:
f.write(chunk)
downloaded_size += len(chunk)
# 打印下载进度
if total_size > 0:
progress = (downloaded_size / total_size) * 100
logging.info(f"下载进度: {progress:.2f}%")
# 验证文件大小
actual_size = os.path.getsize(save_path)
if actual_size != total_size:
error_msg = f"文件下载不完整: 预期{total_size}字节,实际{actual_size}字节,请重试或联系管理员"
logging.error(error_msg)
# 删除不完整文件
try:
os.remove(save_path)
logging.info(f"已删除不完整的下载文件: {save_path}")
except Exception as clean_e:
logging.error(f"清理不完整文件失败: {str(clean_e)}")
return False, error_msg
logging.info(f"文件下载完成: {save_path}")
return True, "下载成功"
except requests.exceptions.Timeout:
error_msg = "下载超时,请检查网络连接后重试"
logging.error(error_msg)
return False, error_msg
except requests.exceptions.ConnectionError as e:
error_msg = "下载连接失败,请检查网络连接后重试"
logging.error(f"{error_msg}: {str(e)}")
return False, error_msg
except requests.exceptions.HTTPError as e:
error_msg = f"下载地址无效或服务器错误,请联系管理员 (HTTP {e.response.status_code})"
logging.error(error_msg)
return False, error_msg
except Exception as e:
error_msg = f"下载失败,请联系管理员: {str(e)}"
logging.error(error_msg)
# 如果下载失败,删除可能存在的不完整文件
try:
if os.path.exists(save_path):
os.remove(save_path)
logging.info(f"已删除不完整的下载文件: {save_path}")
except Exception as clean_e:
logging.error(f"清理不完整文件失败: {str(clean_e)}")
return False, error_msg