import os import sys import requests from packaging import version from typing import Optional, Dict, Any import json import logging from urllib.parse import quote, unquote from pathlib import Path class VersionManager: """版本管理器 错误码说明: - 0: 成功 - 1: 一般性错误 - 401: 未授权或授权失败 - 404: 请求的资源不存在 - 500: 服务器内部错误 """ def __init__(self): self.base_url = "https://cursorapi.nosqli.com" # 获取项目根目录路径 self.root_path = Path(__file__).parent.parent self.current_version = self._get_current_version() self.platform = "windows" if sys.platform.startswith("win") else "mac" if sys.platform.startswith("darwin") else "linux" def _get_current_version(self) -> str: """获取当前版本号""" try: version_file = self.root_path / "version.txt" if not version_file.exists(): logging.error(f"版本文件不存在: {version_file}") return "0.0.0" with open(version_file, "r", encoding="utf-8") as f: version = f.read().strip() logging.info(f"当前版本: {version}") return version except Exception as e: logging.error(f"读取版本号失败: {str(e)}") return "0.0.0" def _handle_response(self, response: requests.Response) -> Dict[str, Any]: """处理API响应 Args: response: API响应对象 Returns: Dict[str, Any]: 处理后的响应数据 Raises: Exception: API调用失败时抛出异常 """ try: data = response.json() code = data.get("code") msg = data.get("msg") or data.get("info", "未知错误") # 兼容 info 字段 if code == 0 or code == 1: # 兼容 code=1 的情况 # 处理空数据情况 if not data.get("data"): logging.warning("API返回空数据") return { "code": 0, "msg": msg, "data": { "has_update": False, "is_force": False, "version_info": None } } return { "code": 0, # 统一返回 code=0 "msg": msg, "data": data.get("data") } elif code == 401: raise Exception("未授权或授权失败") elif code == 404: raise Exception("请求的资源不存在") elif code == 500: raise Exception("服务器内部错误") else: raise Exception(msg) except requests.exceptions.JSONDecodeError: raise Exception("服务器响应格式错误") def check_update(self) -> Dict[str, Any]: """检查是否有更新""" try: url = f"{self.base_url}/admin/api.version/check" current_version = self.current_version.lstrip('v') # 移除可能存在的v前缀 params = { "version": current_version, "platform": self.platform } logging.info(f"正在请求: {url}") logging.info(f"参数: {params}") response = requests.get( url, params=params, timeout=10 ) logging.info(f"状态码: {response.status_code}") logging.info(f"响应头: {dict(response.headers)}") logging.info(f"响应内容: {response.text}") result = self._handle_response(response) # 确保返回的数据包含版本信息 if result["code"] == 0 and result.get("data"): data = result["data"] if "version_info" in data: version_info = data["version_info"] # 确保版本号格式一致 if "version_no" in version_info: version_info["version_no"] = version_info["version_no"].lstrip('v') return result except requests.exceptions.Timeout: logging.error("检查更新超时") return {"code": -1, "msg": "请求超时,请检查网络连接", "data": None} except requests.exceptions.ConnectionError as e: logging.error(f"检查更新连接失败: {str(e)}") return {"code": -1, "msg": "连接服务器失败,请检查网络连接", "data": None} except Exception as e: logging.error(f"检查更新失败: {str(e)}") return {"code": -1, "msg": str(e), "data": None} def get_latest_version(self) -> Dict[str, Any]: """获取最新版本信息""" try: url = f"{self.base_url}/admin/api.version/latest" params = {"platform": self.platform} logging.info(f"正在请求: {url}") logging.info(f"参数: {params}") response = requests.get( url, params=params, timeout=10 ) logging.info(f"状态码: {response.status_code}") logging.info(f"响应头: {dict(response.headers)}") logging.info(f"响应内容: {response.text}") return self._handle_response(response) except requests.exceptions.Timeout: logging.error("获取最新版本超时") return {"code": -1, "msg": "请求超时,请检查网络连接", "data": None} except requests.exceptions.ConnectionError as e: logging.error(f"获取最新版本连接失败: {str(e)}") return {"code": -1, "msg": "连接服务器失败,请检查网络连接", "data": None} except Exception as e: logging.error(f"获取最新版本失败: {str(e)}") return {"code": -1, "msg": str(e), "data": None} def needs_update(self) -> tuple[bool, bool, Optional[Dict[str, Any]]]: """检查是否需要更新 Returns: tuple: (是否有更新, 是否强制更新, 版本信息) """ try: result = self.check_update() if result["code"] == 0 and result["data"]: data = result["data"] version_info = data.get("version_info", {}) # 比较版本号(移除v前缀) current = self.current_version.lstrip('v') latest = version_info.get("version_no", "0.0.0").lstrip('v') # 使用packaging.version进行版本比较 has_update = version.parse(latest) > version.parse(current) return ( has_update, bool(data.get("is_force")), version_info ) return False, False, None except Exception as e: logging.error(f"检查更新失败: {str(e)}") return False, False, None def download_update(self, download_url: str, save_path: str) -> tuple[bool, str]: """下载更新文件 Args: download_url: 下载地址 save_path: 保存路径 Returns: tuple[bool, str]: (是否下载成功, 错误信息) """ try: if not download_url: error_msg = "下载地址为空,请联系管理员" logging.error(error_msg) return False, error_msg # 处理下载地址中的中文字符 url_parts = download_url.split('/') # 只对最后一部分(文件名)进行编码 url_parts[-1] = quote(url_parts[-1]) encoded_url = '/'.join(url_parts) logging.info(f"原始下载地址: {download_url}") logging.info(f"编码后的地址: {encoded_url}") # 设置请求头,模拟浏览器行为 headers = { 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36', 'Accept': '*/*', 'Accept-Encoding': 'gzip, deflate, br', 'Connection': 'keep-alive' } response = requests.get( encoded_url, stream=True, headers=headers, timeout=30 # 增加下载超时时间 ) # 检查响应状态 if response.status_code == 404: error_msg = "下载地址无效,请联系管理员更新下载地址" logging.error(error_msg) return False, error_msg response.raise_for_status() total_size = int(response.headers.get('content-length', 0)) if total_size == 0: error_msg = "无法获取文件大小,下载地址可能无效,请联系管理员" logging.error(error_msg) return False, error_msg block_size = 8192 downloaded_size = 0 logging.info(f"开始下载文件,总大小: {total_size} 字节") with open(save_path, 'wb') as f: for chunk in response.iter_content(chunk_size=block_size): if chunk: f.write(chunk) downloaded_size += len(chunk) # 打印下载进度 if total_size > 0: progress = (downloaded_size / total_size) * 100 logging.info(f"下载进度: {progress:.2f}%") # 验证文件大小 actual_size = os.path.getsize(save_path) if actual_size != total_size: error_msg = f"文件下载不完整: 预期{total_size}字节,实际{actual_size}字节,请重试或联系管理员" logging.error(error_msg) # 删除不完整文件 try: os.remove(save_path) logging.info(f"已删除不完整的下载文件: {save_path}") except Exception as clean_e: logging.error(f"清理不完整文件失败: {str(clean_e)}") return False, error_msg logging.info(f"文件下载完成: {save_path}") return True, "下载成功" except requests.exceptions.Timeout: error_msg = "下载超时,请检查网络连接后重试" logging.error(error_msg) return False, error_msg except requests.exceptions.ConnectionError as e: error_msg = "下载连接失败,请检查网络连接后重试" logging.error(f"{error_msg}: {str(e)}") return False, error_msg except requests.exceptions.HTTPError as e: error_msg = f"下载地址无效或服务器错误,请联系管理员 (HTTP {e.response.status_code})" logging.error(error_msg) return False, error_msg except Exception as e: error_msg = f"下载失败,请联系管理员: {str(e)}" logging.error(error_msg) # 如果下载失败,删除可能存在的不完整文件 try: if os.path.exists(save_path): os.remove(save_path) logging.info(f"已删除不完整的下载文件: {save_path}") except Exception as clean_e: logging.error(f"清理不完整文件失败: {str(clean_e)}") return False, error_msg