355 lines
12 KiB
Python
355 lines
12 KiB
Python
#!/usr/bin/env python
|
||
# -*- coding: utf-8 -*-
|
||
|
||
"""
|
||
邮箱系统部署准备检查脚本
|
||
用于验证服务器环境是否满足系统运行的要求
|
||
"""
|
||
|
||
import os
|
||
import sys
|
||
import platform
|
||
import socket
|
||
import subprocess
|
||
import importlib
|
||
import shutil
|
||
import json
|
||
from datetime import datetime
|
||
|
||
# 颜色输出
|
||
class Colors:
|
||
HEADER = '\033[95m'
|
||
BLUE = '\033[94m'
|
||
GREEN = '\033[92m'
|
||
YELLOW = '\033[93m'
|
||
RED = '\033[91m'
|
||
ENDC = '\033[0m'
|
||
BOLD = '\033[1m'
|
||
UNDERLINE = '\033[4m'
|
||
|
||
# 检查结果
|
||
class CheckResult:
|
||
PASS = "PASS"
|
||
WARN = "WARN"
|
||
FAIL = "FAIL"
|
||
INFO = "INFO"
|
||
|
||
def print_header(title):
|
||
"""打印检查标题"""
|
||
print(f"\n{Colors.HEADER}{Colors.BOLD}{'=' * 80}{Colors.ENDC}")
|
||
print(f"{Colors.HEADER}{Colors.BOLD}{title.center(80)}{Colors.ENDC}")
|
||
print(f"{Colors.HEADER}{Colors.BOLD}{'=' * 80}{Colors.ENDC}\n")
|
||
|
||
def print_result(check_name, result, message, details=None):
|
||
"""打印检查结果"""
|
||
if result == CheckResult.PASS:
|
||
status = f"{Colors.GREEN}[{result}]{Colors.ENDC}"
|
||
elif result == CheckResult.WARN:
|
||
status = f"{Colors.YELLOW}[{result}]{Colors.ENDC}"
|
||
elif result == CheckResult.FAIL:
|
||
status = f"{Colors.RED}[{result}]{Colors.ENDC}"
|
||
else:
|
||
status = f"{Colors.BLUE}[{result}]{Colors.ENDC}"
|
||
|
||
print(f"{status} {check_name}: {message}")
|
||
|
||
if details:
|
||
print(f" {Colors.BLUE}Details:{Colors.ENDC} {details}")
|
||
|
||
def check_python_version():
|
||
"""检查Python版本"""
|
||
required_version = (3, 7)
|
||
current_version = sys.version_info
|
||
|
||
if current_version >= required_version:
|
||
result = CheckResult.PASS
|
||
message = f"Python版本 {sys.version.split()[0]} 满足要求 (最低要求: {required_version[0]}.{required_version[1]})"
|
||
else:
|
||
result = CheckResult.FAIL
|
||
message = f"Python版本 {sys.version.split()[0]} 低于要求的最低版本 {required_version[0]}.{required_version[1]}"
|
||
|
||
print_result("Python版本", result, message)
|
||
return result == CheckResult.PASS
|
||
|
||
def check_os():
|
||
"""检查操作系统"""
|
||
system = platform.system()
|
||
release = platform.release()
|
||
|
||
result = CheckResult.INFO
|
||
message = f"操作系统: {system} {release}"
|
||
|
||
if system.lower() == "windows":
|
||
details = "Windows环境需要特殊配置,请确保已正确设置local_settings.py"
|
||
else:
|
||
details = "Linux环境适合生产部署"
|
||
|
||
print_result("操作系统", result, message, details)
|
||
return True
|
||
|
||
def check_port_availability(host='0.0.0.0', ports=[25, 5000]):
|
||
"""检查端口是否可用"""
|
||
all_available = True
|
||
|
||
for port in ports:
|
||
try:
|
||
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||
sock.bind((host, port))
|
||
sock.close()
|
||
result = CheckResult.PASS
|
||
message = f"端口 {port} 可用"
|
||
details = None
|
||
except socket.error as e:
|
||
all_available = False
|
||
result = CheckResult.FAIL
|
||
message = f"端口 {port} 已被占用或无权限访问"
|
||
details = f"错误: {str(e)}"
|
||
|
||
# 对于Windows环境和低端口,提供额外提示
|
||
if platform.system().lower() == "windows" and port < 1024:
|
||
details += "\n Windows环境下需要管理员权限才能使用低端口,或者使用大于1024的端口"
|
||
|
||
print_result(f"端口 {port}", result, message, details)
|
||
|
||
return all_available
|
||
|
||
def check_dependencies():
|
||
"""检查依赖库是否已安装"""
|
||
required_packages = [
|
||
"flask", "python-dotenv", "aiosmtpd", "sqlalchemy",
|
||
"pydantic", "requests", "pytz", "email-validator",
|
||
"psutil"
|
||
]
|
||
|
||
missing_packages = []
|
||
|
||
for package in required_packages:
|
||
try:
|
||
importlib.import_module(package.replace("-", "_"))
|
||
result = CheckResult.PASS
|
||
message = f"已安装"
|
||
except ImportError:
|
||
missing_packages.append(package)
|
||
result = CheckResult.FAIL
|
||
message = f"未安装"
|
||
|
||
print_result(f"依赖库 {package}", result, message)
|
||
|
||
if missing_packages:
|
||
print(f"\n{Colors.YELLOW}缺少以下依赖库,可以使用以下命令安装:{Colors.ENDC}")
|
||
print(f"pip install {' '.join(missing_packages)}")
|
||
|
||
return len(missing_packages) == 0
|
||
|
||
def check_network():
|
||
"""检查网络连接"""
|
||
try:
|
||
# 尝试连接到一个外部DNS服务器
|
||
socket.create_connection(("8.8.8.8", 53), timeout=5)
|
||
result = CheckResult.PASS
|
||
message = "网络连接正常"
|
||
except socket.error:
|
||
result = CheckResult.WARN
|
||
message = "无法连接到互联网,可能会影响部分功能"
|
||
|
||
print_result("网络连接", result, message)
|
||
return result == CheckResult.PASS
|
||
|
||
def check_disk_space():
|
||
"""检查磁盘空间"""
|
||
# 获取当前脚本所在目录的可用空间
|
||
if platform.system().lower() == "windows":
|
||
try:
|
||
import ctypes
|
||
free_bytes = ctypes.c_ulonglong(0)
|
||
ctypes.windll.kernel32.GetDiskFreeSpaceExW(
|
||
ctypes.c_wchar_p(os.getcwd()),
|
||
None, None,
|
||
ctypes.pointer(free_bytes)
|
||
)
|
||
free_space = free_bytes.value / (1024 * 1024 * 1024) # GB
|
||
except:
|
||
free_space = None
|
||
else:
|
||
try:
|
||
st = os.statvfs(os.getcwd())
|
||
free_space = (st.f_bavail * st.f_frsize) / (1024 * 1024 * 1024) # GB
|
||
except:
|
||
free_space = None
|
||
|
||
if free_space is not None:
|
||
if free_space > 5:
|
||
result = CheckResult.PASS
|
||
message = f"磁盘空间充足: {free_space:.2f} GB可用"
|
||
elif free_space > 1:
|
||
result = CheckResult.WARN
|
||
message = f"磁盘空间有限: {free_space:.2f} GB可用"
|
||
else:
|
||
result = CheckResult.FAIL
|
||
message = f"磁盘空间不足: {free_space:.2f} GB可用"
|
||
else:
|
||
result = CheckResult.INFO
|
||
message = "无法检查磁盘空间"
|
||
|
||
print_result("磁盘空间", result, message)
|
||
return result != CheckResult.FAIL
|
||
|
||
def check_permissions():
|
||
"""检查文件和目录权限"""
|
||
# 检查当前目录是否可写
|
||
try:
|
||
test_file = os.path.join(os.getcwd(), ".permission_test")
|
||
with open(test_file, "w") as f:
|
||
f.write("test")
|
||
os.remove(test_file)
|
||
result = CheckResult.PASS
|
||
message = "当前目录可写"
|
||
except:
|
||
result = CheckResult.FAIL
|
||
message = "当前目录不可写,可能影响系统运行"
|
||
|
||
print_result("文件权限", result, message)
|
||
return result == CheckResult.PASS
|
||
|
||
def check_firewall():
|
||
"""检查防火墙状态"""
|
||
result = CheckResult.INFO
|
||
|
||
if platform.system().lower() == "linux":
|
||
# 检查常见的Linux防火墙
|
||
if shutil.which("ufw"):
|
||
try:
|
||
output = subprocess.check_output(["sudo", "ufw", "status"], universal_newlines=True)
|
||
if "inactive" in output.lower():
|
||
message = "UFW防火墙未启用"
|
||
else:
|
||
message = "UFW防火墙已启用,请确保已开放25和5000端口"
|
||
except:
|
||
message = "无法检查UFW防火墙状态"
|
||
elif shutil.which("firewall-cmd"):
|
||
try:
|
||
subprocess.check_call(["firewall-cmd", "--state"], stdout=subprocess.DEVNULL)
|
||
message = "Firewalld防火墙已启用,请确保已开放25和5000端口"
|
||
except:
|
||
message = "Firewalld防火墙未启用或无法检查状态"
|
||
else:
|
||
message = "未检测到常见防火墙,请手动确认防火墙状态"
|
||
elif platform.system().lower() == "windows":
|
||
message = "Windows环境下请确保Windows防火墙已允许Python和相关端口"
|
||
else:
|
||
message = "未知操作系统,请手动检查防火墙状态"
|
||
|
||
print_result("防火墙状态", result, message)
|
||
return True
|
||
|
||
def check_smtp_tools():
|
||
"""检查是否存在常用SMTP测试工具"""
|
||
tools = ["telnet", "nc", "curl"]
|
||
available_tools = []
|
||
|
||
for tool in tools:
|
||
if shutil.which(tool):
|
||
available_tools.append(tool)
|
||
|
||
if available_tools:
|
||
result = CheckResult.PASS
|
||
message = f"可用的网络工具: {', '.join(available_tools)}"
|
||
else:
|
||
result = CheckResult.WARN
|
||
message = "未找到常用网络工具,可能影响故障排查"
|
||
|
||
print_result("网络工具", result, message)
|
||
return True
|
||
|
||
def generate_report(checks):
|
||
"""生成检查报告"""
|
||
report = {
|
||
"timestamp": datetime.now().isoformat(),
|
||
"system": platform.system(),
|
||
"platform": platform.platform(),
|
||
"python_version": sys.version,
|
||
"checks": checks,
|
||
"summary": {
|
||
"pass": sum(1 for c in checks.values() if c["result"] == CheckResult.PASS),
|
||
"warn": sum(1 for c in checks.values() if c["result"] == CheckResult.WARN),
|
||
"fail": sum(1 for c in checks.values() if c["result"] == CheckResult.FAIL),
|
||
"total": len(checks)
|
||
}
|
||
}
|
||
|
||
with open("deployment_check_report.json", "w") as f:
|
||
json.dump(report, f, indent=2)
|
||
|
||
print(f"\n{Colors.BLUE}检查报告已保存到 deployment_check_report.json{Colors.ENDC}")
|
||
|
||
def main():
|
||
"""主函数"""
|
||
print_header("邮箱系统部署环境检查")
|
||
print(f"检查时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
|
||
print(f"检查目录: {os.getcwd()}")
|
||
|
||
checks = {}
|
||
|
||
# 基础环境检查
|
||
print_header("1. 基础环境检查")
|
||
checks["python_version"] = {"result": CheckResult.PASS if check_python_version() else CheckResult.FAIL}
|
||
checks["os"] = {"result": CheckResult.INFO}
|
||
check_os()
|
||
|
||
# 网络检查
|
||
print_header("2. 网络检查")
|
||
checks["network"] = {"result": CheckResult.PASS if check_network() else CheckResult.WARN}
|
||
checks["ports"] = {"result": CheckResult.PASS if check_port_availability() else CheckResult.FAIL}
|
||
checks["firewall"] = {"result": CheckResult.INFO}
|
||
check_firewall()
|
||
|
||
# 依赖检查
|
||
print_header("3. 依赖检查")
|
||
checks["dependencies"] = {"result": CheckResult.PASS if check_dependencies() else CheckResult.FAIL}
|
||
|
||
# 系统资源检查
|
||
print_header("4. 系统资源检查")
|
||
checks["disk_space"] = {"result": CheckResult.PASS if check_disk_space() else CheckResult.FAIL}
|
||
checks["permissions"] = {"result": CheckResult.PASS if check_permissions() else CheckResult.FAIL}
|
||
|
||
# 工具检查
|
||
print_header("5. 工具检查")
|
||
checks["smtp_tools"] = {"result": CheckResult.PASS if check_smtp_tools() else CheckResult.WARN}
|
||
|
||
# 总结
|
||
print_header("检查结果摘要")
|
||
passes = sum(1 for c in checks.values() if c["result"] == CheckResult.PASS)
|
||
warns = sum(1 for c in checks.values() if c["result"] == CheckResult.WARN)
|
||
fails = sum(1 for c in checks.values() if c["result"] == CheckResult.FAIL)
|
||
infos = sum(1 for c in checks.values() if c["result"] == CheckResult.INFO)
|
||
|
||
print(f"{Colors.GREEN}通过: {passes}{Colors.ENDC}")
|
||
print(f"{Colors.YELLOW}警告: {warns}{Colors.ENDC}")
|
||
print(f"{Colors.RED}失败: {fails}{Colors.ENDC}")
|
||
print(f"{Colors.BLUE}信息: {infos}{Colors.ENDC}")
|
||
print(f"总检查项: {len(checks)}")
|
||
|
||
if fails > 0:
|
||
print(f"\n{Colors.RED}系统存在{fails}项关键问题,需要解决后再部署{Colors.ENDC}")
|
||
readiness = "不适合部署"
|
||
elif warns > 0:
|
||
print(f"\n{Colors.YELLOW}系统存在{warns}项警告,建议处理后再部署{Colors.ENDC}")
|
||
readiness = "需谨慎部署"
|
||
else:
|
||
print(f"\n{Colors.GREEN}系统检查通过,可以进行部署{Colors.ENDC}")
|
||
readiness = "可以部署"
|
||
|
||
print(f"\n{Colors.BOLD}部署就绪状态: {readiness}{Colors.ENDC}")
|
||
|
||
# 生成报告
|
||
generate_report(checks)
|
||
|
||
if __name__ == "__main__":
|
||
try:
|
||
main()
|
||
except KeyboardInterrupt:
|
||
print("\n检查被用户中断")
|
||
except Exception as e:
|
||
print(f"\n{Colors.RED}检查过程中出错: {str(e)}{Colors.ENDC}")
|
||
import traceback
|
||
traceback.print_exc() |