#!/usr/bin/env python # -*- coding: utf-8 -*- """ 邮箱系统部署准备检查脚本 用于验证服务器环境是否满足系统运行的要求 """ import os import sys import platform import socket import subprocess import importlib import shutil import json from datetime import datetime # 颜色输出 class Colors: HEADER = '\033[95m' BLUE = '\033[94m' GREEN = '\033[92m' YELLOW = '\033[93m' RED = '\033[91m' ENDC = '\033[0m' BOLD = '\033[1m' UNDERLINE = '\033[4m' # 检查结果 class CheckResult: PASS = "PASS" WARN = "WARN" FAIL = "FAIL" INFO = "INFO" def print_header(title): """打印检查标题""" print(f"\n{Colors.HEADER}{Colors.BOLD}{'=' * 80}{Colors.ENDC}") print(f"{Colors.HEADER}{Colors.BOLD}{title.center(80)}{Colors.ENDC}") print(f"{Colors.HEADER}{Colors.BOLD}{'=' * 80}{Colors.ENDC}\n") def print_result(check_name, result, message, details=None): """打印检查结果""" if result == CheckResult.PASS: status = f"{Colors.GREEN}[{result}]{Colors.ENDC}" elif result == CheckResult.WARN: status = f"{Colors.YELLOW}[{result}]{Colors.ENDC}" elif result == CheckResult.FAIL: status = f"{Colors.RED}[{result}]{Colors.ENDC}" else: status = f"{Colors.BLUE}[{result}]{Colors.ENDC}" print(f"{status} {check_name}: {message}") if details: print(f" {Colors.BLUE}Details:{Colors.ENDC} {details}") def check_python_version(): """检查Python版本""" required_version = (3, 7) current_version = sys.version_info if current_version >= required_version: result = CheckResult.PASS message = f"Python版本 {sys.version.split()[0]} 满足要求 (最低要求: {required_version[0]}.{required_version[1]})" else: result = CheckResult.FAIL message = f"Python版本 {sys.version.split()[0]} 低于要求的最低版本 {required_version[0]}.{required_version[1]}" print_result("Python版本", result, message) return result == CheckResult.PASS def check_os(): """检查操作系统""" system = platform.system() release = platform.release() result = CheckResult.INFO message = f"操作系统: {system} {release}" if system.lower() == "windows": details = "Windows环境需要特殊配置,请确保已正确设置local_settings.py" else: details = "Linux环境适合生产部署" print_result("操作系统", result, message, details) return True def check_port_availability(host='0.0.0.0', ports=[25, 5000]): """检查端口是否可用""" all_available = True for port in ports: try: sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.bind((host, port)) sock.close() result = CheckResult.PASS message = f"端口 {port} 可用" details = None except socket.error as e: all_available = False result = CheckResult.FAIL message = f"端口 {port} 已被占用或无权限访问" details = f"错误: {str(e)}" # 对于Windows环境和低端口,提供额外提示 if platform.system().lower() == "windows" and port < 1024: details += "\n Windows环境下需要管理员权限才能使用低端口,或者使用大于1024的端口" print_result(f"端口 {port}", result, message, details) return all_available def check_dependencies(): """检查依赖库是否已安装""" required_packages = [ "flask", "python-dotenv", "aiosmtpd", "sqlalchemy", "pydantic", "requests", "pytz", "email-validator", "psutil" ] missing_packages = [] for package in required_packages: try: importlib.import_module(package.replace("-", "_")) result = CheckResult.PASS message = f"已安装" except ImportError: missing_packages.append(package) result = CheckResult.FAIL message = f"未安装" print_result(f"依赖库 {package}", result, message) if missing_packages: print(f"\n{Colors.YELLOW}缺少以下依赖库,可以使用以下命令安装:{Colors.ENDC}") print(f"pip install {' '.join(missing_packages)}") return len(missing_packages) == 0 def check_network(): """检查网络连接""" try: # 尝试连接到一个外部DNS服务器 socket.create_connection(("8.8.8.8", 53), timeout=5) result = CheckResult.PASS message = "网络连接正常" except socket.error: result = CheckResult.WARN message = "无法连接到互联网,可能会影响部分功能" print_result("网络连接", result, message) return result == CheckResult.PASS def check_disk_space(): """检查磁盘空间""" # 获取当前脚本所在目录的可用空间 if platform.system().lower() == "windows": try: import ctypes free_bytes = ctypes.c_ulonglong(0) ctypes.windll.kernel32.GetDiskFreeSpaceExW( ctypes.c_wchar_p(os.getcwd()), None, None, ctypes.pointer(free_bytes) ) free_space = free_bytes.value / (1024 * 1024 * 1024) # GB except: free_space = None else: try: st = os.statvfs(os.getcwd()) free_space = (st.f_bavail * st.f_frsize) / (1024 * 1024 * 1024) # GB except: free_space = None if free_space is not None: if free_space > 5: result = CheckResult.PASS message = f"磁盘空间充足: {free_space:.2f} GB可用" elif free_space > 1: result = CheckResult.WARN message = f"磁盘空间有限: {free_space:.2f} GB可用" else: result = CheckResult.FAIL message = f"磁盘空间不足: {free_space:.2f} GB可用" else: result = CheckResult.INFO message = "无法检查磁盘空间" print_result("磁盘空间", result, message) return result != CheckResult.FAIL def check_permissions(): """检查文件和目录权限""" # 检查当前目录是否可写 try: test_file = os.path.join(os.getcwd(), ".permission_test") with open(test_file, "w") as f: f.write("test") os.remove(test_file) result = CheckResult.PASS message = "当前目录可写" except: result = CheckResult.FAIL message = "当前目录不可写,可能影响系统运行" print_result("文件权限", result, message) return result == CheckResult.PASS def check_firewall(): """检查防火墙状态""" result = CheckResult.INFO if platform.system().lower() == "linux": # 检查常见的Linux防火墙 if shutil.which("ufw"): try: output = subprocess.check_output(["sudo", "ufw", "status"], universal_newlines=True) if "inactive" in output.lower(): message = "UFW防火墙未启用" else: message = "UFW防火墙已启用,请确保已开放25和5000端口" except: message = "无法检查UFW防火墙状态" elif shutil.which("firewall-cmd"): try: subprocess.check_call(["firewall-cmd", "--state"], stdout=subprocess.DEVNULL) message = "Firewalld防火墙已启用,请确保已开放25和5000端口" except: message = "Firewalld防火墙未启用或无法检查状态" else: message = "未检测到常见防火墙,请手动确认防火墙状态" elif platform.system().lower() == "windows": message = "Windows环境下请确保Windows防火墙已允许Python和相关端口" else: message = "未知操作系统,请手动检查防火墙状态" print_result("防火墙状态", result, message) return True def check_smtp_tools(): """检查是否存在常用SMTP测试工具""" tools = ["telnet", "nc", "curl"] available_tools = [] for tool in tools: if shutil.which(tool): available_tools.append(tool) if available_tools: result = CheckResult.PASS message = f"可用的网络工具: {', '.join(available_tools)}" else: result = CheckResult.WARN message = "未找到常用网络工具,可能影响故障排查" print_result("网络工具", result, message) return True def generate_report(checks): """生成检查报告""" report = { "timestamp": datetime.now().isoformat(), "system": platform.system(), "platform": platform.platform(), "python_version": sys.version, "checks": checks, "summary": { "pass": sum(1 for c in checks.values() if c["result"] == CheckResult.PASS), "warn": sum(1 for c in checks.values() if c["result"] == CheckResult.WARN), "fail": sum(1 for c in checks.values() if c["result"] == CheckResult.FAIL), "total": len(checks) } } with open("deployment_check_report.json", "w") as f: json.dump(report, f, indent=2) print(f"\n{Colors.BLUE}检查报告已保存到 deployment_check_report.json{Colors.ENDC}") def main(): """主函数""" print_header("邮箱系统部署环境检查") print(f"检查时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") print(f"检查目录: {os.getcwd()}") checks = {} # 基础环境检查 print_header("1. 基础环境检查") checks["python_version"] = {"result": CheckResult.PASS if check_python_version() else CheckResult.FAIL} checks["os"] = {"result": CheckResult.INFO} check_os() # 网络检查 print_header("2. 网络检查") checks["network"] = {"result": CheckResult.PASS if check_network() else CheckResult.WARN} checks["ports"] = {"result": CheckResult.PASS if check_port_availability() else CheckResult.FAIL} checks["firewall"] = {"result": CheckResult.INFO} check_firewall() # 依赖检查 print_header("3. 依赖检查") checks["dependencies"] = {"result": CheckResult.PASS if check_dependencies() else CheckResult.FAIL} # 系统资源检查 print_header("4. 系统资源检查") checks["disk_space"] = {"result": CheckResult.PASS if check_disk_space() else CheckResult.FAIL} checks["permissions"] = {"result": CheckResult.PASS if check_permissions() else CheckResult.FAIL} # 工具检查 print_header("5. 工具检查") checks["smtp_tools"] = {"result": CheckResult.PASS if check_smtp_tools() else CheckResult.WARN} # 总结 print_header("检查结果摘要") passes = sum(1 for c in checks.values() if c["result"] == CheckResult.PASS) warns = sum(1 for c in checks.values() if c["result"] == CheckResult.WARN) fails = sum(1 for c in checks.values() if c["result"] == CheckResult.FAIL) infos = sum(1 for c in checks.values() if c["result"] == CheckResult.INFO) print(f"{Colors.GREEN}通过: {passes}{Colors.ENDC}") print(f"{Colors.YELLOW}警告: {warns}{Colors.ENDC}") print(f"{Colors.RED}失败: {fails}{Colors.ENDC}") print(f"{Colors.BLUE}信息: {infos}{Colors.ENDC}") print(f"总检查项: {len(checks)}") if fails > 0: print(f"\n{Colors.RED}系统存在{fails}项关键问题,需要解决后再部署{Colors.ENDC}") readiness = "不适合部署" elif warns > 0: print(f"\n{Colors.YELLOW}系统存在{warns}项警告,建议处理后再部署{Colors.ENDC}") readiness = "需谨慎部署" else: print(f"\n{Colors.GREEN}系统检查通过,可以进行部署{Colors.ENDC}") readiness = "可以部署" print(f"\n{Colors.BOLD}部署就绪状态: {readiness}{Colors.ENDC}") # 生成报告 generate_report(checks) if __name__ == "__main__": try: main() except KeyboardInterrupt: print("\n检查被用户中断") except Exception as e: print(f"\n{Colors.RED}检查过程中出错: {str(e)}{Colors.ENDC}") import traceback traceback.print_exc()