Files
emailsystem/check_deployment_readiness.py
2025-02-25 19:50:00 +08:00

355 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
邮箱系统部署准备检查脚本
用于验证服务器环境是否满足系统运行的要求
"""
import os
import sys
import platform
import socket
import subprocess
import importlib
import shutil
import json
from datetime import datetime
# 颜色输出
class Colors:
HEADER = '\033[95m'
BLUE = '\033[94m'
GREEN = '\033[92m'
YELLOW = '\033[93m'
RED = '\033[91m'
ENDC = '\033[0m'
BOLD = '\033[1m'
UNDERLINE = '\033[4m'
# 检查结果
class CheckResult:
PASS = "PASS"
WARN = "WARN"
FAIL = "FAIL"
INFO = "INFO"
def print_header(title):
"""打印检查标题"""
print(f"\n{Colors.HEADER}{Colors.BOLD}{'=' * 80}{Colors.ENDC}")
print(f"{Colors.HEADER}{Colors.BOLD}{title.center(80)}{Colors.ENDC}")
print(f"{Colors.HEADER}{Colors.BOLD}{'=' * 80}{Colors.ENDC}\n")
def print_result(check_name, result, message, details=None):
"""打印检查结果"""
if result == CheckResult.PASS:
status = f"{Colors.GREEN}[{result}]{Colors.ENDC}"
elif result == CheckResult.WARN:
status = f"{Colors.YELLOW}[{result}]{Colors.ENDC}"
elif result == CheckResult.FAIL:
status = f"{Colors.RED}[{result}]{Colors.ENDC}"
else:
status = f"{Colors.BLUE}[{result}]{Colors.ENDC}"
print(f"{status} {check_name}: {message}")
if details:
print(f" {Colors.BLUE}Details:{Colors.ENDC} {details}")
def check_python_version():
"""检查Python版本"""
required_version = (3, 7)
current_version = sys.version_info
if current_version >= required_version:
result = CheckResult.PASS
message = f"Python版本 {sys.version.split()[0]} 满足要求 (最低要求: {required_version[0]}.{required_version[1]})"
else:
result = CheckResult.FAIL
message = f"Python版本 {sys.version.split()[0]} 低于要求的最低版本 {required_version[0]}.{required_version[1]}"
print_result("Python版本", result, message)
return result == CheckResult.PASS
def check_os():
"""检查操作系统"""
system = platform.system()
release = platform.release()
result = CheckResult.INFO
message = f"操作系统: {system} {release}"
if system.lower() == "windows":
details = "Windows环境需要特殊配置请确保已正确设置local_settings.py"
else:
details = "Linux环境适合生产部署"
print_result("操作系统", result, message, details)
return True
def check_port_availability(host='0.0.0.0', ports=[25, 5000]):
"""检查端口是否可用"""
all_available = True
for port in ports:
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.bind((host, port))
sock.close()
result = CheckResult.PASS
message = f"端口 {port} 可用"
details = None
except socket.error as e:
all_available = False
result = CheckResult.FAIL
message = f"端口 {port} 已被占用或无权限访问"
details = f"错误: {str(e)}"
# 对于Windows环境和低端口提供额外提示
if platform.system().lower() == "windows" and port < 1024:
details += "\n Windows环境下需要管理员权限才能使用低端口或者使用大于1024的端口"
print_result(f"端口 {port}", result, message, details)
return all_available
def check_dependencies():
"""检查依赖库是否已安装"""
required_packages = [
"flask", "python-dotenv", "aiosmtpd", "sqlalchemy",
"pydantic", "requests", "pytz", "email-validator",
"psutil"
]
missing_packages = []
for package in required_packages:
try:
importlib.import_module(package.replace("-", "_"))
result = CheckResult.PASS
message = f"已安装"
except ImportError:
missing_packages.append(package)
result = CheckResult.FAIL
message = f"未安装"
print_result(f"依赖库 {package}", result, message)
if missing_packages:
print(f"\n{Colors.YELLOW}缺少以下依赖库,可以使用以下命令安装:{Colors.ENDC}")
print(f"pip install {' '.join(missing_packages)}")
return len(missing_packages) == 0
def check_network():
"""检查网络连接"""
try:
# 尝试连接到一个外部DNS服务器
socket.create_connection(("8.8.8.8", 53), timeout=5)
result = CheckResult.PASS
message = "网络连接正常"
except socket.error:
result = CheckResult.WARN
message = "无法连接到互联网,可能会影响部分功能"
print_result("网络连接", result, message)
return result == CheckResult.PASS
def check_disk_space():
"""检查磁盘空间"""
# 获取当前脚本所在目录的可用空间
if platform.system().lower() == "windows":
try:
import ctypes
free_bytes = ctypes.c_ulonglong(0)
ctypes.windll.kernel32.GetDiskFreeSpaceExW(
ctypes.c_wchar_p(os.getcwd()),
None, None,
ctypes.pointer(free_bytes)
)
free_space = free_bytes.value / (1024 * 1024 * 1024) # GB
except:
free_space = None
else:
try:
st = os.statvfs(os.getcwd())
free_space = (st.f_bavail * st.f_frsize) / (1024 * 1024 * 1024) # GB
except:
free_space = None
if free_space is not None:
if free_space > 5:
result = CheckResult.PASS
message = f"磁盘空间充足: {free_space:.2f} GB可用"
elif free_space > 1:
result = CheckResult.WARN
message = f"磁盘空间有限: {free_space:.2f} GB可用"
else:
result = CheckResult.FAIL
message = f"磁盘空间不足: {free_space:.2f} GB可用"
else:
result = CheckResult.INFO
message = "无法检查磁盘空间"
print_result("磁盘空间", result, message)
return result != CheckResult.FAIL
def check_permissions():
"""检查文件和目录权限"""
# 检查当前目录是否可写
try:
test_file = os.path.join(os.getcwd(), ".permission_test")
with open(test_file, "w") as f:
f.write("test")
os.remove(test_file)
result = CheckResult.PASS
message = "当前目录可写"
except:
result = CheckResult.FAIL
message = "当前目录不可写,可能影响系统运行"
print_result("文件权限", result, message)
return result == CheckResult.PASS
def check_firewall():
"""检查防火墙状态"""
result = CheckResult.INFO
if platform.system().lower() == "linux":
# 检查常见的Linux防火墙
if shutil.which("ufw"):
try:
output = subprocess.check_output(["sudo", "ufw", "status"], universal_newlines=True)
if "inactive" in output.lower():
message = "UFW防火墙未启用"
else:
message = "UFW防火墙已启用请确保已开放25和5000端口"
except:
message = "无法检查UFW防火墙状态"
elif shutil.which("firewall-cmd"):
try:
subprocess.check_call(["firewall-cmd", "--state"], stdout=subprocess.DEVNULL)
message = "Firewalld防火墙已启用请确保已开放25和5000端口"
except:
message = "Firewalld防火墙未启用或无法检查状态"
else:
message = "未检测到常见防火墙,请手动确认防火墙状态"
elif platform.system().lower() == "windows":
message = "Windows环境下请确保Windows防火墙已允许Python和相关端口"
else:
message = "未知操作系统,请手动检查防火墙状态"
print_result("防火墙状态", result, message)
return True
def check_smtp_tools():
"""检查是否存在常用SMTP测试工具"""
tools = ["telnet", "nc", "curl"]
available_tools = []
for tool in tools:
if shutil.which(tool):
available_tools.append(tool)
if available_tools:
result = CheckResult.PASS
message = f"可用的网络工具: {', '.join(available_tools)}"
else:
result = CheckResult.WARN
message = "未找到常用网络工具,可能影响故障排查"
print_result("网络工具", result, message)
return True
def generate_report(checks):
"""生成检查报告"""
report = {
"timestamp": datetime.now().isoformat(),
"system": platform.system(),
"platform": platform.platform(),
"python_version": sys.version,
"checks": checks,
"summary": {
"pass": sum(1 for c in checks.values() if c["result"] == CheckResult.PASS),
"warn": sum(1 for c in checks.values() if c["result"] == CheckResult.WARN),
"fail": sum(1 for c in checks.values() if c["result"] == CheckResult.FAIL),
"total": len(checks)
}
}
with open("deployment_check_report.json", "w") as f:
json.dump(report, f, indent=2)
print(f"\n{Colors.BLUE}检查报告已保存到 deployment_check_report.json{Colors.ENDC}")
def main():
"""主函数"""
print_header("邮箱系统部署环境检查")
print(f"检查时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
print(f"检查目录: {os.getcwd()}")
checks = {}
# 基础环境检查
print_header("1. 基础环境检查")
checks["python_version"] = {"result": CheckResult.PASS if check_python_version() else CheckResult.FAIL}
checks["os"] = {"result": CheckResult.INFO}
check_os()
# 网络检查
print_header("2. 网络检查")
checks["network"] = {"result": CheckResult.PASS if check_network() else CheckResult.WARN}
checks["ports"] = {"result": CheckResult.PASS if check_port_availability() else CheckResult.FAIL}
checks["firewall"] = {"result": CheckResult.INFO}
check_firewall()
# 依赖检查
print_header("3. 依赖检查")
checks["dependencies"] = {"result": CheckResult.PASS if check_dependencies() else CheckResult.FAIL}
# 系统资源检查
print_header("4. 系统资源检查")
checks["disk_space"] = {"result": CheckResult.PASS if check_disk_space() else CheckResult.FAIL}
checks["permissions"] = {"result": CheckResult.PASS if check_permissions() else CheckResult.FAIL}
# 工具检查
print_header("5. 工具检查")
checks["smtp_tools"] = {"result": CheckResult.PASS if check_smtp_tools() else CheckResult.WARN}
# 总结
print_header("检查结果摘要")
passes = sum(1 for c in checks.values() if c["result"] == CheckResult.PASS)
warns = sum(1 for c in checks.values() if c["result"] == CheckResult.WARN)
fails = sum(1 for c in checks.values() if c["result"] == CheckResult.FAIL)
infos = sum(1 for c in checks.values() if c["result"] == CheckResult.INFO)
print(f"{Colors.GREEN}通过: {passes}{Colors.ENDC}")
print(f"{Colors.YELLOW}警告: {warns}{Colors.ENDC}")
print(f"{Colors.RED}失败: {fails}{Colors.ENDC}")
print(f"{Colors.BLUE}信息: {infos}{Colors.ENDC}")
print(f"总检查项: {len(checks)}")
if fails > 0:
print(f"\n{Colors.RED}系统存在{fails}项关键问题,需要解决后再部署{Colors.ENDC}")
readiness = "不适合部署"
elif warns > 0:
print(f"\n{Colors.YELLOW}系统存在{warns}项警告,建议处理后再部署{Colors.ENDC}")
readiness = "需谨慎部署"
else:
print(f"\n{Colors.GREEN}系统检查通过,可以进行部署{Colors.ENDC}")
readiness = "可以部署"
print(f"\n{Colors.BOLD}部署就绪状态: {readiness}{Colors.ENDC}")
# 生成报告
generate_report(checks)
if __name__ == "__main__":
try:
main()
except KeyboardInterrupt:
print("\n检查被用户中断")
except Exception as e:
print(f"\n{Colors.RED}检查过程中出错: {str(e)}{Colors.ENDC}")
import traceback
traceback.print_exc()