import os import sys import requests from packaging import version from typing import Optional, Dict, Any import json import logging from urllib.parse import quote, unquote class VersionManager: """版本管理器 错误码说明: - 0: 成功 - 1: 一般性错误 - 401: 未授权或授权失败 - 404: 请求的资源不存在 - 500: 服务器内部错误 """ def __init__(self): self.base_url = "https://cursorapi.nosqli.com" self.current_version = self._get_current_version() self.platform = "windows" if sys.platform.startswith("win") else "mac" if sys.platform.startswith("darwin") else "linux" def _get_current_version(self) -> str: """获取当前版本号""" try: with open("version.txt", "r") as f: return f.read().strip() except FileNotFoundError: return "0.0.0" def _handle_response(self, response: requests.Response) -> Dict[str, Any]: """处理API响应 Args: response: API响应对象 Returns: Dict[str, Any]: 处理后的响应数据 Raises: Exception: API调用失败时抛出异常 """ try: data = response.json() code = data.get("code") msg = data.get("msg") or data.get("info", "未知错误") # 兼容 info 字段 if code == 0 or code == 1: # 兼容 code=1 的情况 # 处理空数据情况 if not data.get("data"): logging.warning("API返回空数据") return { "code": 0, "msg": msg, "data": { "has_update": False, "is_force": False, "version_info": None } } return { "code": 0, # 统一返回 code=0 "msg": msg, "data": data.get("data") } elif code == 401: raise Exception("未授权或授权失败") elif code == 404: raise Exception("请求的资源不存在") elif code == 500: raise Exception("服务器内部错误") else: raise Exception(msg) except requests.exceptions.JSONDecodeError: raise Exception("服务器响应格式错误") def check_update(self) -> Dict[str, Any]: """检查是否有更新""" try: url = f"{self.base_url}/admin/api.version/check" params = { "version": self.current_version, "platform": self.platform } logging.info(f"正在请求: {url}") logging.info(f"参数: {params}") response = requests.get( url, params=params, timeout=10 ) logging.info(f"状态码: {response.status_code}") logging.info(f"响应头: {dict(response.headers)}") logging.info(f"响应内容: {response.text}") return self._handle_response(response) except requests.exceptions.Timeout: logging.error("检查更新超时") return {"code": -1, "msg": "请求超时,请检查网络连接", "data": None} except requests.exceptions.ConnectionError as e: logging.error(f"检查更新连接失败: {str(e)}") return {"code": -1, "msg": "连接服务器失败,请检查网络连接", "data": None} except Exception as e: logging.error(f"检查更新失败: {str(e)}") return {"code": -1, "msg": str(e), "data": None} def get_latest_version(self) -> Dict[str, Any]: """获取最新版本信息""" try: url = f"{self.base_url}/admin/api.version/latest" params = {"platform": self.platform} logging.info(f"正在请求: {url}") logging.info(f"参数: {params}") response = requests.get( url, params=params, timeout=10 ) logging.info(f"状态码: {response.status_code}") logging.info(f"响应头: {dict(response.headers)}") logging.info(f"响应内容: {response.text}") return self._handle_response(response) except requests.exceptions.Timeout: logging.error("获取最新版本超时") return {"code": -1, "msg": "请求超时,请检查网络连接", "data": None} except requests.exceptions.ConnectionError as e: logging.error(f"获取最新版本连接失败: {str(e)}") return {"code": -1, "msg": "连接服务器失败,请检查网络连接", "data": None} except Exception as e: logging.error(f"获取最新版本失败: {str(e)}") return {"code": -1, "msg": str(e), "data": None} def needs_update(self) -> tuple[bool, bool, Optional[Dict[str, Any]]]: """检查是否需要更新 Returns: tuple: (是否有更新, 是否强制更新, 版本信息) """ result = self.check_update() if result["code"] == 0 and result["data"]: data = result["data"] return ( data["has_update"], bool(data.get("is_force")), data.get("version_info") ) return False, False, None def download_update(self, download_url: str, save_path: str) -> tuple[bool, str]: """下载更新文件 Args: download_url: 下载地址 save_path: 保存路径 Returns: tuple[bool, str]: (是否下载成功, 错误信息) """ try: if not download_url: error_msg = "下载地址为空,请联系管理员" logging.error(error_msg) return False, error_msg # 处理下载地址中的中文字符 url_parts = download_url.split('/') # 只对最后一部分(文件名)进行编码 url_parts[-1] = quote(url_parts[-1]) encoded_url = '/'.join(url_parts) logging.info(f"原始下载地址: {download_url}") logging.info(f"编码后的地址: {encoded_url}") # 设置请求头,模拟浏览器行为 headers = { 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36', 'Accept': '*/*', 'Accept-Encoding': 'gzip, deflate, br', 'Connection': 'keep-alive' } response = requests.get( encoded_url, stream=True, headers=headers, timeout=30 # 增加下载超时时间 ) # 检查响应状态 if response.status_code == 404: error_msg = "下载地址无效,请联系管理员更新下载地址" logging.error(error_msg) return False, error_msg response.raise_for_status() total_size = int(response.headers.get('content-length', 0)) if total_size == 0: error_msg = "无法获取文件大小,下载地址可能无效,请联系管理员" logging.error(error_msg) return False, error_msg block_size = 8192 downloaded_size = 0 logging.info(f"开始下载文件,总大小: {total_size} 字节") with open(save_path, 'wb') as f: for chunk in response.iter_content(chunk_size=block_size): if chunk: f.write(chunk) downloaded_size += len(chunk) # 打印下载进度 if total_size > 0: progress = (downloaded_size / total_size) * 100 logging.info(f"下载进度: {progress:.2f}%") # 验证文件大小 actual_size = os.path.getsize(save_path) if actual_size != total_size: error_msg = f"文件下载不完整: 预期{total_size}字节,实际{actual_size}字节,请重试或联系管理员" logging.error(error_msg) # 删除不完整文件 try: os.remove(save_path) logging.info(f"已删除不完整的下载文件: {save_path}") except Exception as clean_e: logging.error(f"清理不完整文件失败: {str(clean_e)}") return False, error_msg logging.info(f"文件下载完成: {save_path}") return True, "下载成功" except requests.exceptions.Timeout: error_msg = "下载超时,请检查网络连接后重试" logging.error(error_msg) return False, error_msg except requests.exceptions.ConnectionError as e: error_msg = "下载连接失败,请检查网络连接后重试" logging.error(f"{error_msg}: {str(e)}") return False, error_msg except requests.exceptions.HTTPError as e: error_msg = f"下载地址无效或服务器错误,请联系管理员 (HTTP {e.response.status_code})" logging.error(error_msg) return False, error_msg except Exception as e: error_msg = f"下载失败,请联系管理员: {str(e)}" logging.error(error_msg) # 如果下载失败,删除可能存在的不完整文件 try: if os.path.exists(save_path): os.remove(save_path) logging.info(f"已删除不完整的下载文件: {save_path}") except Exception as clean_e: logging.error(f"清理不完整文件失败: {str(clean_e)}") return False, error_msg