# Colour definitions (terminal output)
class Colors:
    """ANSI escape sequences used to colour and emphasise terminal output."""

    GREEN = '\033[92m'
    YELLOW = '\033[93m'
    RED = '\033[91m'
    BLUE = '\033[94m'
    ENDC = '\033[0m'  # terminates a coloured span
    BOLD = '\033[1m'


# Logging configuration
def setup_logging(log_enabled):
    """Return the ``email_system_monitor`` logger with its handlers attached.

    A console handler is always present.  When ``log_enabled`` is true a file
    handler writing to ``logs/email_system_monitor.log`` (relative to the
    current working directory) is attached as well; the ``logs`` directory is
    created if missing.

    The function is idempotent: ``logging.getLogger`` returns the same logger
    object for the same name, so handlers that are already attached are not
    added a second time (which would otherwise duplicate every record).

    Args:
        log_enabled: Whether records should also be written to the log file.

    Returns:
        The configured ``logging.Logger``.
    """
    logger = logging.getLogger('email_system_monitor')
    logger.setLevel(logging.INFO)
    formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')

    # Console handler.  FileHandler subclasses StreamHandler, so compare the
    # exact type rather than using isinstance().
    if not any(type(h) is logging.StreamHandler for h in logger.handlers):
        console_handler = logging.StreamHandler()
        console_handler.setLevel(logging.INFO)
        console_handler.setFormatter(formatter)
        logger.addHandler(console_handler)

    # File handler (only when enabled)
    if log_enabled:
        log_dir = Path('logs')
        log_dir.mkdir(exist_ok=True)
        log_file = log_dir / 'email_system_monitor.log'

        # FileHandler stores the absolute path of its target in baseFilename.
        target = os.path.abspath(log_file)
        already_attached = any(
            isinstance(h, logging.FileHandler) and h.baseFilename == target
            for h in logger.handlers
        )
        if not already_attached:
            # Log messages contain non-ASCII text; do not depend on the
            # platform's default encoding.
            file_handler = logging.FileHandler(log_file, encoding='utf-8')
            file_handler.setLevel(logging.INFO)
            file_handler.setFormatter(formatter)
            logger.addHandler(file_handler)

    return logger


def print_header(title):
    """Print ``title`` centred in a 50-column banner framed by ``=`` rules."""
    style = f"{Colors.BLUE}{Colors.BOLD}"
    rule = '=' * 50
    print(f"\n{style}{rule}{Colors.ENDC}")
    print(f"{style}{title.center(50)}{Colors.ENDC}")
    print(f"{style}{rule}{Colors.ENDC}\n")


def print_status(name, status, message=""):
    """Print one coloured status line and return ``status`` unchanged.

    Args:
        name: Label of the check; left-justified to 30 columns.
        status: ``"OK"`` or ``"WARNING"``; any other value is displayed as
            ``ERROR``.
        message: Optional detail text appended after the status.

    Returns:
        The ``status`` argument exactly as passed in.
    """
    if status == "OK":
        status_color = f"{Colors.GREEN}OK{Colors.ENDC}"
    elif status == "WARNING":
        status_color = f"{Colors.YELLOW}WARNING{Colors.ENDC}"
    else:
        status_color = f"{Colors.RED}ERROR{Colors.ENDC}"

    print(f"{name.ljust(30)}: {status_color} {message}")
    return status


def check_api_status(host, port, logger):
    """Check the API service by requesting ``http://host:port/api/status``.

    Args:
        host: Host name or IP address of the API server.
        port: TCP port of the API server.
        logger: Logger that receives the outcome.

    Returns:
        A ``(status, data)`` tuple.  ``("OK", <decoded JSON>)`` for a 200
        response, ``("WARNING", {"message": ...})`` for any other status code
        and ``("ERROR", {"message": ...})`` when the request fails or the body
        of a 200 response is not valid JSON.
    """
    url = f"http://{host}:{port}/api/status"
    try:
        response = requests.get(url, timeout=5)
        if response.status_code == 200:
            data = response.json()
            # The payload is only inspected for logging; tolerate JSON that
            # is not an object (e.g. a list or a bare string).
            reported = data.get('status', 'OK') if isinstance(data, dict) else 'OK'
            logger.info(f"API服务正常运行 - 状态: {reported}")
            return "OK", data
        logger.warning(f"API服务返回非正常状态码: {response.status_code}")
        return "WARNING", {"message": f"状态码: {response.status_code}"}
    except (requests.RequestException, ValueError) as e:
        # ValueError covers JSON decoding failures raised by response.json()
        # on requests versions where that error is not a RequestException.
        logger.error(f"API服务连接失败: {str(e)}")
        return "ERROR", {"message": f"连接错误: {str(e)}"}


def check_smtp_status(host, port, logger):
    """Check the SMTP service by reading its banner and sending ``EHLO``.

    Args:
        host: Host name or IP address of the SMTP server.
        port: TCP port of the SMTP server.
        logger: Logger that receives the outcome.

    Returns:
        A ``(status, data)`` tuple.  ``"OK"`` when the banner starts with
        ``220`` (``data["response"]`` then holds the first line of the EHLO
        reply), ``"WARNING"`` for any other banner and ``"ERROR"`` when the
        connection fails or times out.
    """
    try:
        with socket.create_connection((host, port), timeout=5) as sock:
            # Receive the greeting.  Its bytes are not guaranteed to be valid
            # UTF-8, so never let decoding abort the check.
            data = sock.recv(1024).decode('utf-8', errors='replace')
            if not data.startswith('220'):
                logger.warning(f"SMTP服务返回非标准欢迎消息: {data}")
                return "WARNING", {"message": f"非标准欢迎消息: {data}"}

            # Send the EHLO command
            sock.sendall(b'EHLO example.com\r\n')
            response = sock.recv(1024).decode('utf-8', errors='replace')
            logger.info("SMTP服务正常运行")
            # splitlines() drops the trailing CR that split('\n') would keep.
            first_line = response.splitlines()[0] if response else ""
            return "OK", {"message": "SMTP服务响应正常", "response": first_line}
    except OSError as e:
        # socket.timeout, socket.error and ConnectionRefusedError are all
        # OSError (sub)classes in Python 3.
        logger.error(f"SMTP服务连接失败: {str(e)}")
        return "ERROR", {"message": f"连接错误: {str(e)}"}


def check_disk_usage(logger):
    """Check usage of the disk that holds the current working directory.

    Args:
        logger: Logger that receives the outcome.

    Returns:
        A ``(status, data)`` tuple.  ``status`` is ``"OK"`` below 70 % usage,
        ``"WARNING"`` below 90 % and ``"ERROR"`` otherwise (or when the
        measurement itself fails).  On success ``data`` holds
        ``used_percent``, ``used_gb``, ``free_gb``, ``total_gb`` and
        ``message``; on failure only ``message``.
    """
    try:
        # Usage of the disk containing the current directory
        disk = psutil.disk_usage('.')

        used_percent = disk.percent
        if used_percent < 70:
            status = "OK"
        elif used_percent < 90:
            status = "WARNING"
        else:
            status = "ERROR"

        # Convert byte counts to GiB for display
        gib = 1024**3
        total_gb = disk.total / gib
        used_gb = disk.used / gib
        free_gb = disk.free / gib

        message = f"使用率: {used_percent:.1f}% (已用: {used_gb:.1f}GB, 可用: {free_gb:.1f}GB, 总计: {total_gb:.1f}GB)"
        logger.info(f"磁盘使用情况: {message}")

        return status, {
            "used_percent": used_percent,
            "used_gb": used_gb,
            "free_gb": free_gb,
            "total_gb": total_gb,
            "message": message,
        }
    except Exception as e:
        # Best-effort check: report any failure as an ERROR result instead of
        # letting it stop the monitoring run.
        logger.error(f"检查磁盘使用情况时出错: {str(e)}")
        return "ERROR", {"message": f"错误: {str(e)}"}
def check_email_data_size(logger, data_dir='email_data', warn_mb=100, error_mb=500):
    """Check the total size of all files under the e-mail data directory.

    Args:
        logger: Logger that receives the outcome.
        data_dir: Directory to measure; defaults to ``email_data`` relative to
            the current working directory.
        warn_mb: Size in MB at which the status becomes ``"WARNING"``.
        error_mb: Size in MB at which the status becomes ``"ERROR"``.

    Returns:
        A ``(status, data)`` tuple.  When the directory does not exist the
        status is ``"WARNING"``.  Otherwise ``data`` holds ``size_mb`` and
        ``message`` and the status follows the two thresholds.  Unexpected
        failures yield ``("ERROR", {"message": ...})``.
    """
    try:
        email_data_path = Path(data_dir)
        if not email_data_path.exists():
            logger.warning("邮件数据目录不存在")
            return "WARNING", {"message": "邮件数据目录不存在"}

        # Sum the size of every regular file below the directory
        total_size = 0
        for path in email_data_path.rglob('*'):
            try:
                if path.is_file():
                    total_size += path.stat().st_size
            except OSError:
                # The file disappeared or became unreadable while scanning;
                # skip it instead of failing the whole check.
                continue

        # Convert to MB
        size_mb = total_size / (1024**2)

        if size_mb < warn_mb:
            status = "OK"
        elif size_mb < error_mb:
            status = "WARNING"
        else:
            status = "ERROR"

        message = f"邮件数据大小: {size_mb:.2f}MB"
        logger.info(message)

        return status, {
            "size_mb": size_mb,
            "message": message,
        }
    except Exception as e:
        # Best-effort check: report any failure as an ERROR result.
        logger.error(f"检查邮件数据大小时出错: {str(e)}")
        return "ERROR", {"message": f"错误: {str(e)}"}


def _usage_level(percent):
    """Map a usage percentage to "OK" (<70), "WARNING" (<90) or "ERROR"."""
    if percent < 70:
        return "OK"
    if percent < 90:
        return "WARNING"
    return "ERROR"


def check_system_resources(logger):
    """Check CPU usage, memory usage and (on non-Windows systems) load average.

    CPU usage is sampled over a one-second interval, so the call blocks for
    about that long.

    Args:
        logger: Logger that receives the outcome.

    Returns:
        A ``(status, data)`` tuple.  ``status`` is the worst of the CPU,
        memory and load statuses.  On success ``data`` holds ``cpu_percent``,
        ``memory_percent``, ``memory_used_gb``, ``memory_total_gb``,
        ``load1``, ``load5``, ``load15``, ``message`` and ``load_message``;
        on failure only ``message``.
    """
    try:
        # CPU usage
        cpu_percent = psutil.cpu_percent(interval=1)

        # Memory usage
        memory = psutil.virtual_memory()
        memory_percent = memory.percent

        # System load
        if platform.system() != 'Windows':
            load1, load5, load15 = os.getloadavg()

            # cpu_count() may return None when the count cannot be
            # determined; fall back to a single CPU so the comparison below
            # stays well-defined.
            cpu_count = psutil.cpu_count() or 1
            if load5 > cpu_count:
                load_status = "ERROR"
            elif load5 > cpu_count * 0.7:
                load_status = "WARNING"
            else:
                load_status = "OK"

            load_message = f"系统负载: {load1:.2f}, {load5:.2f}, {load15:.2f} (1, 5, 15 min)"
        else:
            load_status = "OK"
            load1, load5, load15 = 0, 0, 0
            load_message = "Windows系统不支持负载检查"

        cpu_status = _usage_level(cpu_percent)
        memory_status = _usage_level(memory_percent)

        # Overall status is the most severe of the three
        partial = (cpu_status, memory_status, load_status)
        if "ERROR" in partial:
            status = "ERROR"
        elif "WARNING" in partial:
            status = "WARNING"
        else:
            status = "OK"

        gib = 1024**3
        message = (
            f"CPU使用率: {cpu_percent:.1f}%, "
            f"内存使用率: {memory_percent:.1f}% "
            f"({memory.used / (1024**3):.1f}GB/{memory.total / (1024**3):.1f}GB)"
        )

        logger.info(f"系统资源: {message}")
        logger.info(load_message)

        return status, {
            "cpu_percent": cpu_percent,
            "memory_percent": memory_percent,
            "memory_used_gb": memory.used / gib,
            "memory_total_gb": memory.total / gib,
            "load1": load1,
            "load5": load5,
            "load15": load15,
            "message": message,
            "load_message": load_message,
        }
    except Exception as e:
        # Best-effort check: report any failure as an ERROR result.
        logger.error(f"检查系统资源时出错: {str(e)}")
        return "ERROR", {"message": f"错误: {str(e)}"}
def check_log_files(logger, log_dir='logs', tail_lines=100):
    """Check the size of the ``*.log`` files and count recent error lines.

    For every ``*.log`` file directly inside ``log_dir`` the size is added to
    a total, files larger than 100 MB are recorded, and the last
    ``tail_lines`` lines are searched for the markers ``ERROR``,
    ``Exception`` or ``Error``.

    Args:
        logger: Logger that receives the outcome.
        log_dir: Directory to scan; defaults to ``logs`` relative to the
            current working directory.
        tail_lines: Number of trailing lines per file to inspect.

    Returns:
        A ``(status, data)`` tuple.  ``"WARNING"`` when the directory or the
        log files are missing.  Otherwise ``"ERROR"`` for more than 10 error
        lines or more than 2 large files, ``"WARNING"`` for any error line or
        large file, else ``"OK"``; ``data`` then holds ``total_size_mb``,
        ``error_count``, ``large_files`` (list of ``(name, size_mb)``) and
        ``message``.  Unexpected failures yield ``("ERROR", {"message": ...})``.
    """
    # Local import keeps the block self-contained.
    from collections import deque

    markers = ("ERROR", "Exception", "Error")
    try:
        log_root = Path(log_dir)
        if not log_root.exists():
            logger.warning("日志目录不存在")
            return "WARNING", {"message": "日志目录不存在"}

        log_files = list(log_root.glob('*.log'))
        if not log_files:
            logger.warning("未找到日志文件")
            return "WARNING", {"message": "未找到日志文件"}

        total_size = 0
        error_count = 0
        large_files = []

        for log_file in log_files:
            # File size
            size_mb = log_file.stat().st_size / (1024**2)
            total_size += size_mb
            if size_mb > 100:
                large_files.append((log_file.name, size_mb))

            # Errors in the tail of the file
            try:
                with log_file.open('r', encoding='utf-8', errors='ignore') as f:
                    # A bounded deque keeps only the last ``tail_lines`` lines
                    # while streaming the file once.  (Reading the file twice
                    # with readlines() yields an empty list the second time.)
                    tail = deque(f, maxlen=tail_lines)
                error_count += sum(
                    1 for line in tail if any(marker in line for marker in markers)
                )
            except OSError as e:
                logger.error(f"读取日志文件 {log_file} 时出错: {str(e)}")

        # Determine the status
        if error_count > 10 or len(large_files) > 2:
            status = "ERROR"
        elif error_count > 0 or large_files:
            status = "WARNING"
        else:
            status = "OK"

        message = f"日志总大小: {total_size:.2f}MB, 最近错误数: {error_count}"
        if large_files:
            message += f", 大文件: {', '.join([f'{name} ({size:.1f}MB)' for name, size in large_files])}"

        logger.info(f"日志文件状态: {message}")

        return status, {
            "total_size_mb": total_size,
            "error_count": error_count,
            "large_files": large_files,
            "message": message,
        }
    except Exception as e:
        # Best-effort check: report any failure as an ERROR result.
        logger.error(f"检查日志文件时出错: {str(e)}")
        return "ERROR", {"message": f"错误: {str(e)}"}


def check_process_status(logger):
    """Check the Python processes whose command line mentions ``run.py``.

    Args:
        logger: Logger that receives the outcome.

    Returns:
        A ``(status, data)`` tuple.  ``"WARNING"`` when no matching process is
        found or when any match uses more than 50 % CPU or 30 % memory,
        otherwise ``"OK"``.  On a match ``data`` holds ``processes``,
        ``high_resource`` and ``message``.  Unexpected failures yield
        ``("ERROR", {"message": ...})``.
    """
    try:
        python_processes = []
        attrs = ['pid', 'name', 'cmdline', 'cpu_percent', 'memory_percent', 'create_time']
        for proc in psutil.process_iter(attrs):
            try:
                info = proc.info
                # psutil may report None for fields it could not read; treat
                # those as empty/zero so one unreadable process does not fail
                # the whole check.
                name = info['name'] or ''
                cmdline = info['cmdline'] or []

                # Keep only Python processes whose command line contains run.py
                if 'python' not in name.lower():
                    continue
                if not any('run.py' in cmd for cmd in cmdline):
                    continue

                # Running time
                created = info['create_time']
                if created is None:
                    uptime_text = 'unknown'
                else:
                    uptime = datetime.datetime.now() - datetime.datetime.fromtimestamp(created)
                    uptime_text = str(uptime).split('.')[0]  # drop microseconds

                python_processes.append({
                    'pid': info['pid'],
                    'cmdline': ' '.join(cmdline),
                    'cpu_percent': info['cpu_percent'] or 0.0,
                    'memory_percent': info['memory_percent'] or 0.0,
                    'uptime': uptime_text,
                })
            except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
                # The process ended or is off-limits; ignore it.
                pass

        if not python_processes:
            logger.warning("未找到邮件系统相关进程")
            return "WARNING", {"message": "未找到邮件系统相关进程"}

        message = f"找到 {len(python_processes)} 个邮件系统相关进程"
        logger.info(message)

        # Flag processes that use a lot of CPU or memory
        high_resource_procs = [
            entry for entry in python_processes
            if entry['cpu_percent'] > 50 or entry['memory_percent'] > 30
        ]
        status = "WARNING" if high_resource_procs else "OK"

        if high_resource_procs:
            logger.warning(f"发现 {len(high_resource_procs)} 个进程使用较高资源")

        return status, {
            "processes": python_processes,
            "high_resource": high_resource_procs,
            "message": message,
        }
    except Exception as e:
        # Best-effort check: report any failure as an ERROR result.
        logger.error(f"检查进程状态时出错: {str(e)}")
        return "ERROR", {"message": f"错误: {str(e)}"}
psutil.process_iter(['pid', 'name', 'cmdline', 'cpu_percent', 'memory_percent', 'create_time']): try: # 过滤出Python进程,并且命令行中包含run.py if 'python' in proc.info['name'].lower() and proc.info['cmdline'] and any('run.py' in cmd for cmd in proc.info['cmdline']): # 计算运行时间 uptime = datetime.datetime.now() - datetime.datetime.fromtimestamp(proc.info['create_time']) # 获取CPU和内存使用情况 cpu_percent = proc.info['cpu_percent'] memory_percent = proc.info['memory_percent'] python_processes.append({ 'pid': proc.info['pid'], 'cmdline': ' '.join(proc.info['cmdline']), 'cpu_percent': cpu_percent, 'memory_percent': memory_percent, 'uptime': str(uptime).split('.')[0] # 去除微秒 }) except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess): pass if not python_processes: logger.warning("未找到邮件系统相关进程") return "WARNING", {"message": "未找到邮件系统相关进程"} status = "OK" message = f"找到 {len(python_processes)} 个邮件系统相关进程" logger.info(message) # 检查是否有进程使用过高的资源 high_resource_procs = [] for proc in python_processes: if proc['cpu_percent'] > 50 or proc['memory_percent'] > 30: high_resource_procs.append(proc) status = "WARNING" if high_resource_procs: logger.warning(f"发现 {len(high_resource_procs)} 个进程使用较高资源") return status, { "processes": python_processes, "high_resource": high_resource_procs, "message": message } except Exception as e: logger.error(f"检查进程状态时出错: {str(e)}") return "ERROR", {"message": f"错误: {str(e)}"} def send_alert_email(subject, message, to_email, from_email, smtp_host, smtp_port, logger): """发送警报邮件""" try: msg = MIMEMultipart() msg['Subject'] = subject msg['From'] = from_email msg['To'] = to_email # 添加HTML内容 html_content = f"""
时间: {datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")}
主机: {report["host"]}
总体状态: {report["overall_status"]}
| 检查项 | 状态 | 详情 |
|---|---|---|
| {check_name} | {status} | {message} |