265 lines
10 KiB
Python
265 lines
10 KiB
Python
import os
|
|
import sys
|
|
import requests
|
|
from packaging import version
|
|
from typing import Optional, Dict, Any
|
|
import json
|
|
import logging
|
|
from urllib.parse import quote, unquote
|
|
|
|
class VersionManager:
|
|
"""版本管理器
|
|
|
|
错误码说明:
|
|
- 0: 成功
|
|
- 1: 一般性错误
|
|
- 401: 未授权或授权失败
|
|
- 404: 请求的资源不存在
|
|
- 500: 服务器内部错误
|
|
"""
|
|
|
|
def __init__(self):
|
|
self.base_url = "https://cursorapi.nosqli.com"
|
|
self.current_version = self._get_current_version()
|
|
self.platform = "windows" if sys.platform.startswith("win") else "mac" if sys.platform.startswith("darwin") else "linux"
|
|
|
|
def _get_current_version(self) -> str:
|
|
"""获取当前版本号"""
|
|
try:
|
|
with open("version.txt", "r") as f:
|
|
return f.read().strip()
|
|
except FileNotFoundError:
|
|
return "0.0.0"
|
|
|
|
def _handle_response(self, response: requests.Response) -> Dict[str, Any]:
|
|
"""处理API响应
|
|
|
|
Args:
|
|
response: API响应对象
|
|
|
|
Returns:
|
|
Dict[str, Any]: 处理后的响应数据
|
|
|
|
Raises:
|
|
Exception: API调用失败时抛出异常
|
|
"""
|
|
try:
|
|
data = response.json()
|
|
code = data.get("code")
|
|
msg = data.get("msg") or data.get("info", "未知错误") # 兼容 info 字段
|
|
|
|
if code == 0 or code == 1: # 兼容 code=1 的情况
|
|
# 处理空数据情况
|
|
if not data.get("data"):
|
|
logging.warning("API返回空数据")
|
|
return {
|
|
"code": 0,
|
|
"msg": msg,
|
|
"data": {
|
|
"has_update": False,
|
|
"is_force": False,
|
|
"version_info": None
|
|
}
|
|
}
|
|
return {
|
|
"code": 0, # 统一返回 code=0
|
|
"msg": msg,
|
|
"data": data.get("data")
|
|
}
|
|
elif code == 401:
|
|
raise Exception("未授权或授权失败")
|
|
elif code == 404:
|
|
raise Exception("请求的资源不存在")
|
|
elif code == 500:
|
|
raise Exception("服务器内部错误")
|
|
else:
|
|
raise Exception(msg)
|
|
|
|
except requests.exceptions.JSONDecodeError:
|
|
raise Exception("服务器响应格式错误")
|
|
|
|
def check_update(self) -> Dict[str, Any]:
|
|
"""检查是否有更新"""
|
|
try:
|
|
url = f"{self.base_url}/admin/api.version/check"
|
|
params = {
|
|
"version": self.current_version,
|
|
"platform": self.platform
|
|
}
|
|
logging.info(f"正在请求: {url}")
|
|
logging.info(f"参数: {params}")
|
|
|
|
response = requests.get(
|
|
url,
|
|
params=params,
|
|
timeout=10
|
|
)
|
|
|
|
logging.info(f"状态码: {response.status_code}")
|
|
logging.info(f"响应头: {dict(response.headers)}")
|
|
logging.info(f"响应内容: {response.text}")
|
|
|
|
return self._handle_response(response)
|
|
except requests.exceptions.Timeout:
|
|
logging.error("检查更新超时")
|
|
return {"code": -1, "msg": "请求超时,请检查网络连接", "data": None}
|
|
except requests.exceptions.ConnectionError as e:
|
|
logging.error(f"检查更新连接失败: {str(e)}")
|
|
return {"code": -1, "msg": "连接服务器失败,请检查网络连接", "data": None}
|
|
except Exception as e:
|
|
logging.error(f"检查更新失败: {str(e)}")
|
|
return {"code": -1, "msg": str(e), "data": None}
|
|
|
|
def get_latest_version(self) -> Dict[str, Any]:
|
|
"""获取最新版本信息"""
|
|
try:
|
|
url = f"{self.base_url}/admin/api.version/latest"
|
|
params = {"platform": self.platform}
|
|
logging.info(f"正在请求: {url}")
|
|
logging.info(f"参数: {params}")
|
|
|
|
response = requests.get(
|
|
url,
|
|
params=params,
|
|
timeout=10
|
|
)
|
|
|
|
logging.info(f"状态码: {response.status_code}")
|
|
logging.info(f"响应头: {dict(response.headers)}")
|
|
logging.info(f"响应内容: {response.text}")
|
|
|
|
return self._handle_response(response)
|
|
except requests.exceptions.Timeout:
|
|
logging.error("获取最新版本超时")
|
|
return {"code": -1, "msg": "请求超时,请检查网络连接", "data": None}
|
|
except requests.exceptions.ConnectionError as e:
|
|
logging.error(f"获取最新版本连接失败: {str(e)}")
|
|
return {"code": -1, "msg": "连接服务器失败,请检查网络连接", "data": None}
|
|
except Exception as e:
|
|
logging.error(f"获取最新版本失败: {str(e)}")
|
|
return {"code": -1, "msg": str(e), "data": None}
|
|
|
|
def needs_update(self) -> tuple[bool, bool, Optional[Dict[str, Any]]]:
|
|
"""检查是否需要更新
|
|
|
|
Returns:
|
|
tuple: (是否有更新, 是否强制更新, 版本信息)
|
|
"""
|
|
result = self.check_update()
|
|
if result["code"] == 0 and result["data"]:
|
|
data = result["data"]
|
|
return (
|
|
data["has_update"],
|
|
bool(data.get("is_force")),
|
|
data.get("version_info")
|
|
)
|
|
return False, False, None
|
|
|
|
def download_update(self, download_url: str, save_path: str) -> tuple[bool, str]:
|
|
"""下载更新文件
|
|
|
|
Args:
|
|
download_url: 下载地址
|
|
save_path: 保存路径
|
|
|
|
Returns:
|
|
tuple[bool, str]: (是否下载成功, 错误信息)
|
|
"""
|
|
try:
|
|
if not download_url:
|
|
error_msg = "下载地址为空,请联系管理员"
|
|
logging.error(error_msg)
|
|
return False, error_msg
|
|
|
|
# 处理下载地址中的中文字符
|
|
url_parts = download_url.split('/')
|
|
# 只对最后一部分(文件名)进行编码
|
|
url_parts[-1] = quote(url_parts[-1])
|
|
encoded_url = '/'.join(url_parts)
|
|
|
|
logging.info(f"原始下载地址: {download_url}")
|
|
logging.info(f"编码后的地址: {encoded_url}")
|
|
|
|
# 设置请求头,模拟浏览器行为
|
|
headers = {
|
|
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36',
|
|
'Accept': '*/*',
|
|
'Accept-Encoding': 'gzip, deflate, br',
|
|
'Connection': 'keep-alive'
|
|
}
|
|
|
|
response = requests.get(
|
|
encoded_url,
|
|
stream=True,
|
|
headers=headers,
|
|
timeout=30 # 增加下载超时时间
|
|
)
|
|
|
|
# 检查响应状态
|
|
if response.status_code == 404:
|
|
error_msg = "下载地址无效,请联系管理员更新下载地址"
|
|
logging.error(error_msg)
|
|
return False, error_msg
|
|
|
|
response.raise_for_status()
|
|
|
|
total_size = int(response.headers.get('content-length', 0))
|
|
if total_size == 0:
|
|
error_msg = "无法获取文件大小,下载地址可能无效,请联系管理员"
|
|
logging.error(error_msg)
|
|
return False, error_msg
|
|
|
|
block_size = 8192
|
|
downloaded_size = 0
|
|
|
|
logging.info(f"开始下载文件,总大小: {total_size} 字节")
|
|
|
|
with open(save_path, 'wb') as f:
|
|
for chunk in response.iter_content(chunk_size=block_size):
|
|
if chunk:
|
|
f.write(chunk)
|
|
downloaded_size += len(chunk)
|
|
# 打印下载进度
|
|
if total_size > 0:
|
|
progress = (downloaded_size / total_size) * 100
|
|
logging.info(f"下载进度: {progress:.2f}%")
|
|
|
|
# 验证文件大小
|
|
actual_size = os.path.getsize(save_path)
|
|
if actual_size != total_size:
|
|
error_msg = f"文件下载不完整: 预期{total_size}字节,实际{actual_size}字节,请重试或联系管理员"
|
|
logging.error(error_msg)
|
|
# 删除不完整文件
|
|
try:
|
|
os.remove(save_path)
|
|
logging.info(f"已删除不完整的下载文件: {save_path}")
|
|
except Exception as clean_e:
|
|
logging.error(f"清理不完整文件失败: {str(clean_e)}")
|
|
return False, error_msg
|
|
|
|
logging.info(f"文件下载完成: {save_path}")
|
|
return True, "下载成功"
|
|
|
|
except requests.exceptions.Timeout:
|
|
error_msg = "下载超时,请检查网络连接后重试"
|
|
logging.error(error_msg)
|
|
return False, error_msg
|
|
except requests.exceptions.ConnectionError as e:
|
|
error_msg = "下载连接失败,请检查网络连接后重试"
|
|
logging.error(f"{error_msg}: {str(e)}")
|
|
return False, error_msg
|
|
except requests.exceptions.HTTPError as e:
|
|
error_msg = f"下载地址无效或服务器错误,请联系管理员 (HTTP {e.response.status_code})"
|
|
logging.error(error_msg)
|
|
return False, error_msg
|
|
except Exception as e:
|
|
error_msg = f"下载失败,请联系管理员: {str(e)}"
|
|
logging.error(error_msg)
|
|
# 如果下载失败,删除可能存在的不完整文件
|
|
try:
|
|
if os.path.exists(save_path):
|
|
os.remove(save_path)
|
|
logging.info(f"已删除不完整的下载文件: {save_path}")
|
|
except Exception as clean_e:
|
|
logging.error(f"清理不完整文件失败: {str(clean_e)}")
|
|
return False, error_msg |