#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Mail system monitoring script.

Runs the following checks and summarises them in a report:
- API availability
- SMTP connectivity
- Disk usage
- Mail data directory size
- System resource usage (CPU, memory, load average)
- Log files (size and recent error lines)
- Related process status

Usage:
    python monitor_email_system.py [--host HOST] [--api-port API_PORT]
                                   [--smtp-port SMTP_PORT] [--alert]
                                   [--interval SECONDS] [--log] [--daemon]

Options:
    --host HOST            Server host name or IP address (default: localhost)
    --api-port API_PORT    API service port (default: 5000)
    --smtp-port SMTP_PORT  SMTP service port (default: 25)
    --alert                Enable alerts (sent by e-mail)
    --interval SECONDS     Monitoring interval in seconds (default: 300)
    --log                  Also write results to a log file
    --daemon               Keep running, repeating the checks every interval
    --help                 Show help
"""

import os
import sys
import time
import socket
import smtplib
import argparse
import platform
import json
import logging
import datetime
import subprocess
from collections import deque
from html import escape
from pathlib import Path
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart

import requests
import psutil


class Colors:
    """ANSI escape sequences used to colour terminal output."""

    GREEN = '\033[92m'
    YELLOW = '\033[93m'
    RED = '\033[91m'
    BLUE = '\033[94m'
    ENDC = '\033[0m'
    BOLD = '\033[1m'


def setup_logging(log_enabled):
    """Create the monitor's logger.

    Args:
        log_enabled: When true, also log to ``logs/email_system_monitor.log``
            (the directory is created if missing).

    Returns:
        The configured ``email_system_monitor`` logger. A console handler is
        always attached.
    """
    logger = logging.getLogger('email_system_monitor')
    logger.setLevel(logging.INFO)

    # Console handler.
    console_handler = logging.StreamHandler()
    console_handler.setLevel(logging.INFO)
    console_format = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
    console_handler.setFormatter(console_format)
    logger.addHandler(console_handler)

    # File handler (only when enabled).
    if log_enabled:
        log_dir = Path('logs')
        log_dir.mkdir(exist_ok=True)
        log_file = log_dir / 'email_system_monitor.log'
        # Log messages contain non-ASCII text, so do not rely on the locale's
        # default encoding.
        file_handler = logging.FileHandler(log_file, encoding='utf-8')
        file_handler.setLevel(logging.INFO)
        file_format = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
        file_handler.setFormatter(file_format)
        logger.addHandler(file_handler)

    return logger


def print_header(title):
    """Print ``title`` centred between two 50-character rules."""
    print(f"\n{Colors.BLUE}{Colors.BOLD}{'=' * 50}{Colors.ENDC}")
    print(f"{Colors.BLUE}{Colors.BOLD}{title.center(50)}{Colors.ENDC}")
    print(f"{Colors.BLUE}{Colors.BOLD}{'=' * 50}{Colors.ENDC}\n")


def print_status(name, status, message=""):
    """Print one coloured status line and return ``status`` unchanged.

    Any status other than "OK" or "WARNING" is displayed as ERROR.
    """
    if status == "OK":
        status_color = f"{Colors.GREEN}OK{Colors.ENDC}"
    elif status == "WARNING":
        status_color = f"{Colors.YELLOW}WARNING{Colors.ENDC}"
    else:
        status_color = f"{Colors.RED}ERROR{Colors.ENDC}"

    print(f"{name.ljust(30)}: {status_color} {message}")
    return status


def _threshold_status(value, warning_at, error_at):
    """Map a measurement to "OK" / "WARNING" / "ERROR" using two lower bounds."""
    if value < warning_at:
        return "OK"
    if value < error_at:
        return "WARNING"
    return "ERROR"


def _worst_status(statuses):
    """Return the most severe of the given statuses ("ERROR" > "WARNING" > "OK")."""
    seen = set(statuses)
    if "ERROR" in seen:
        return "ERROR"
    if "WARNING" in seen:
        return "WARNING"
    return "OK"


def check_api_status(host, port, logger):
    """Check the API service by requesting ``/api/status``.

    Returns:
        ``(status, data)`` where ``data`` is always a dict. On success it is
        the decoded JSON body (wrapped under ``"response"`` if the body is not
        a JSON object); otherwise it holds a ``"message"`` describing the
        problem.
    """
    url = f"http://{host}:{port}/api/status"
    try:
        response = requests.get(url, timeout=5)
    except requests.RequestException as e:
        logger.error(f"API服务连接失败: {str(e)}")
        return "ERROR", {"message": f"连接错误: {str(e)}"}

    if response.status_code != 200:
        logger.warning(f"API服务返回非正常状态码: {response.status_code}")
        return "WARNING", {"message": f"状态码: {response.status_code}"}

    try:
        data = response.json()
    except ValueError as e:
        # Depending on the requests version a malformed body raises either a
        # plain ValueError or a RequestException subclass that also derives
        # from ValueError; catching ValueError covers both.
        logger.error(f"API服务响应解析失败: {str(e)}")
        return "ERROR", {"message": f"响应解析错误: {str(e)}"}

    # Callers use dict methods on the result, so wrap non-object JSON bodies.
    if not isinstance(data, dict):
        data = {"response": data}

    logger.info(f"API服务正常运行 - 状态: {data.get('status', 'OK')}")
    return "OK", data


def check_smtp_status(host, port, logger):
    """Check the SMTP service by reading its banner and sending EHLO.

    Returns:
        ``(status, data)``: "OK" when the banner starts with ``220``,
        "WARNING" for any other banner, "ERROR" when the connection fails.
    """
    try:
        with socket.create_connection((host, port), timeout=5) as sock:
            # Read the greeting. Decode leniently: a banner that is not valid
            # UTF-8 must not abort the check.
            data = sock.recv(1024).decode('utf-8', errors='replace')

            if not data.startswith('220'):
                logger.warning(f"SMTP服务返回非标准欢迎消息: {data}")
                return "WARNING", {"message": f"非标准欢迎消息: {data}"}

            # Send EHLO and keep the first line of the reply.
            sock.sendall(b'EHLO example.com\r\n')
            response = sock.recv(1024).decode('utf-8', errors='replace')

            logger.info("SMTP服务正常运行")
            return "OK", {"message": "SMTP服务响应正常", "response": response.split('\n')[0]}
    except OSError as e:
        # socket.timeout, socket.error and ConnectionRefusedError are all
        # OSError (sub)classes.
        logger.error(f"SMTP服务连接失败: {str(e)}")
        return "ERROR", {"message": f"连接错误: {str(e)}"}


def check_disk_usage(logger):
    """Check usage of the disk holding the current working directory.

    Returns:
        ``(status, data)``: "OK" below 70% used, "WARNING" below 90%,
        otherwise "ERROR". ``data`` carries the percentage and sizes in GB.
    """
    try:
        disk = psutil.disk_usage('.')

        used_percent = disk.percent
        status = _threshold_status(used_percent, 70, 90)

        # Convert byte counts to GB for display.
        total_gb = disk.total / (1024**3)
        used_gb = disk.used / (1024**3)
        free_gb = disk.free / (1024**3)

        message = f"使用率: {used_percent:.1f}% (已用: {used_gb:.1f}GB, 可用: {free_gb:.1f}GB, 总计: {total_gb:.1f}GB)"
        logger.info(f"磁盘使用情况: {message}")

        return status, {
            "used_percent": used_percent,
            "used_gb": used_gb,
            "free_gb": free_gb,
            "total_gb": total_gb,
            "message": message
        }
    except Exception as e:
        logger.error(f"检查磁盘使用情况时出错: {str(e)}")
        return "ERROR", {"message": f"错误: {str(e)}"}


def check_email_data_size(logger):
    """Check the total size of the ``email_data`` directory.

    Returns:
        ``(status, data)``: "WARNING" if the directory is missing; otherwise
        "OK" below 100 MB, "WARNING" below 500 MB, else "ERROR".
    """
    try:
        email_data_path = Path('email_data')
        if not email_data_path.exists():
            logger.warning("邮件数据目录不存在")
            return "WARNING", {"message": "邮件数据目录不存在"}

        # Sum the size of every regular file under the directory.
        total_size = 0
        for path in email_data_path.rglob('*'):
            try:
                if path.is_file():
                    total_size += path.stat().st_size
            except OSError:
                # The file vanished or became unreadable while walking;
                # skip it instead of failing the whole check.
                continue

        size_mb = total_size / (1024**2)
        status = _threshold_status(size_mb, 100, 500)

        message = f"邮件数据大小: {size_mb:.2f}MB"
        logger.info(message)

        return status, {
            "size_mb": size_mb,
            "message": message
        }
    except Exception as e:
        logger.error(f"检查邮件数据大小时出错: {str(e)}")
        return "ERROR", {"message": f"错误: {str(e)}"}


def check_system_resources(logger):
    """Check CPU usage, memory usage and (non-Windows) load average.

    Returns:
        ``(status, data)`` where ``status`` is the worst of the CPU, memory
        and load statuses. CPU and memory use the 70% / 90% thresholds; the
        5-minute load is compared against 0.7x and 1x the CPU count.
    """
    try:
        # CPU usage sampled over one second.
        cpu_percent = psutil.cpu_percent(interval=1)

        memory = psutil.virtual_memory()
        memory_percent = memory.percent

        if platform.system() != 'Windows':
            load1, load5, load15 = os.getloadavg()
            load_status = "OK"

            # psutil.cpu_count() may return None when undetermined.
            cpu_count = psutil.cpu_count() or 1
            if load5 > cpu_count * 0.7:
                load_status = "WARNING"
            if load5 > cpu_count:
                load_status = "ERROR"

            load_message = f"系统负载: {load1:.2f}, {load5:.2f}, {load15:.2f} (1, 5, 15 min)"
        else:
            load_status = "OK"
            load1, load5, load15 = 0, 0, 0
            load_message = "Windows系统不支持负载检查"

        cpu_status = _threshold_status(cpu_percent, 70, 90)
        memory_status = _threshold_status(memory_percent, 70, 90)
        status = _worst_status((cpu_status, memory_status, load_status))

        memory_used_gb = memory.used / (1024**3)
        memory_total_gb = memory.total / (1024**3)
        message = (
            f"CPU使用率: {cpu_percent:.1f}%, "
            f"内存使用率: {memory_percent:.1f}% "
            f"({memory_used_gb:.1f}GB/{memory_total_gb:.1f}GB)"
        )

        logger.info(f"系统资源: {message}")
        logger.info(load_message)

        return status, {
            "cpu_percent": cpu_percent,
            "memory_percent": memory_percent,
            "memory_used_gb": memory_used_gb,
            "memory_total_gb": memory_total_gb,
            "load1": load1,
            "load5": load5,
            "load15": load15,
            "message": message,
            "load_message": load_message
        }
    except Exception as e:
        logger.error(f"检查系统资源时出错: {str(e)}")
        return "ERROR", {"message": f"错误: {str(e)}"}


def check_log_files(logger):
    """Check ``logs/*.log`` for oversized files and recent error lines.

    Only the last 100 lines of each file are scanned for the markers
    "ERROR", "Exception" and "Error".

    Returns:
        ``(status, data)``: "ERROR" with more than 10 error lines or more
        than 2 files over 100 MB; "WARNING" with any error line or large
        file (or when no logs exist); otherwise "OK".
    """
    error_markers = ("ERROR", "Exception", "Error")
    try:
        log_dir = Path('logs')
        if not log_dir.exists():
            logger.warning("日志目录不存在")
            return "WARNING", {"message": "日志目录不存在"}

        log_files = list(log_dir.glob('*.log'))
        if not log_files:
            logger.warning("未找到日志文件")
            return "WARNING", {"message": "未找到日志文件"}

        total_size = 0
        error_count = 0
        large_files = []

        for log_file in log_files:
            size_mb = log_file.stat().st_size / (1024**2)
            total_size += size_mb

            if size_mb > 100:
                large_files.append((log_file.name, size_mb))

            try:
                with log_file.open('r', encoding='utf-8', errors='ignore') as f:
                    # A bounded deque keeps only the final 100 lines while
                    # streaming, so large logs are never held in memory.
                    tail = deque(f, maxlen=100)
                error_count += sum(
                    1 for line in tail
                    if any(marker in line for marker in error_markers)
                )
            except Exception as e:
                logger.error(f"读取日志文件 {log_file} 时出错: {str(e)}")

        if error_count > 10 or len(large_files) > 2:
            status = "ERROR"
        elif error_count > 0 or large_files:
            status = "WARNING"
        else:
            status = "OK"

        message = f"日志总大小: {total_size:.2f}MB, 最近错误数: {error_count}"
        if large_files:
            message += f", 大文件: {', '.join([f'{name} ({size:.1f}MB)' for name, size in large_files])}"

        logger.info(f"日志文件状态: {message}")

        return status, {
            "total_size_mb": total_size,
            "error_count": error_count,
            "large_files": large_files,
            "message": message
        }
    except Exception as e:
        logger.error(f"检查日志文件时出错: {str(e)}")
        return "ERROR", {"message": f"错误: {str(e)}"}


def check_process_status(logger):
    """Look for Python processes whose command line mentions ``run.py``.

    Returns:
        ``(status, data)``: "WARNING" when no such process exists or when any
        of them uses more than 50% CPU or 30% memory; otherwise "OK".
        ``data["processes"]`` lists pid, command line, CPU%, memory% and
        uptime for each match.
    """
    try:
        now = datetime.datetime.now()
        python_processes = []
        attrs = ['pid', 'name', 'cmdline', 'cpu_percent', 'memory_percent', 'create_time']
        for proc in psutil.process_iter(attrs):
            try:
                info = proc.info
                # process_iter stores None for any attribute it was denied
                # access to, so every field needs a fallback.
                name = info.get('name') or ''
                cmdline = info.get('cmdline') or []
                if 'python' not in name.lower():
                    continue
                if not any('run.py' in arg for arg in cmdline):
                    continue

                create_time = info.get('create_time')
                if create_time is None:
                    uptime = "未知"
                else:
                    elapsed = now - datetime.datetime.fromtimestamp(create_time)
                    uptime = str(elapsed).split('.')[0]  # drop microseconds

                python_processes.append({
                    'pid': info['pid'],
                    'cmdline': ' '.join(cmdline),
                    'cpu_percent': info.get('cpu_percent') or 0.0,
                    'memory_percent': info.get('memory_percent') or 0.0,
                    'uptime': uptime
                })
            except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
                pass

        if not python_processes:
            logger.warning("未找到邮件系统相关进程")
            return "WARNING", {"message": "未找到邮件系统相关进程"}

        message = f"找到 {len(python_processes)} 个邮件系统相关进程"
        logger.info(message)

        # Flag processes using an unusually large share of CPU or memory.
        high_resource_procs = [
            proc for proc in python_processes
            if proc['cpu_percent'] > 50 or proc['memory_percent'] > 30
        ]
        status = "WARNING" if high_resource_procs else "OK"

        if high_resource_procs:
            logger.warning(f"发现 {len(high_resource_procs)} 个进程使用较高资源")

        return status, {
            "processes": python_processes,
            "high_resource": high_resource_procs,
            "message": message
        }
    except Exception as e:
        logger.error(f"检查进程状态时出错: {str(e)}")
        return "ERROR", {"message": f"错误: {str(e)}"}


def send_alert_email(subject, message, to_email, from_email, smtp_host, smtp_port, logger):
    """Send an HTML alert e-mail.

    Args:
        subject: Mail subject.
        message: HTML fragment placed inside the mail body as-is.
        to_email: Recipient address.
        from_email: Sender address.
        smtp_host: SMTP server used for delivery.
        smtp_port: SMTP server port.
        logger: Logger for the outcome.

    Returns:
        True when the mail was handed to the SMTP server, False on any error.
    """
    try:
        msg = MIMEMultipart()
        msg['Subject'] = subject
        msg['From'] = from_email
        msg['To'] = to_email

        html_content = f"""
<html>
<body>
<h2>邮件系统监控警报</h2>
<div>
{message}
</div>
</body>
</html>
"""

        msg.attach(MIMEText(html_content, 'html', 'utf-8'))

        # A timeout keeps an unresponsive SMTP server from blocking the
        # monitor indefinitely.
        with smtplib.SMTP(smtp_host, smtp_port, timeout=10) as server:
            server.sendmail(from_email, to_email, msg.as_string())

        logger.info(f"成功发送警报邮件到 {to_email}")
        return True
    except Exception as e:
        logger.error(f"发送警报邮件失败: {str(e)}")
        return False


def format_duration(seconds):
    """Format a duration in seconds as days/hours/minutes/seconds text.

    Leading zero units are omitted; a zero seconds part is omitted unless it
    would leave the result empty.
    """
    days, remainder = divmod(seconds, 86400)
    hours, remainder = divmod(remainder, 3600)
    minutes, seconds = divmod(remainder, 60)

    parts = []
    if days > 0:
        parts.append(f"{int(days)}天")
    if hours > 0 or parts:
        parts.append(f"{int(hours)}小时")
    if minutes > 0 or parts:
        parts.append(f"{int(minutes)}分钟")
    if seconds > 0 or not parts:
        parts.append(f"{int(seconds)}秒")

    return "".join(parts)


def generate_report(results, host, api_port, smtp_port, start_time, logger):
    """Build the report dict and save it as JSON under ``logs/``.

    Args:
        results: Mapping of check name to ``(status, data)``.
        host: Monitored host.
        api_port: Monitored API port.
        smtp_port: Monitored SMTP port.
        start_time: ``time.time()`` value taken when the run started.
        logger: Logger for the outcome of saving the file.

    Returns:
        ``(report, overall_status)``; the overall status is the worst status
        among all checks. A failure to save the file is logged, not raised.
    """
    duration = time.time() - start_time
    overall_status = _worst_status(status for status, _ in results.values())

    report = {
        "timestamp": datetime.datetime.now().isoformat(),
        "host": host,
        "api_port": api_port,
        "smtp_port": smtp_port,
        "duration": f"{duration:.2f}秒",
        "overall_status": overall_status,
        "checks": {
            check_name: {"status": status, "data": data}
            for check_name, (status, data) in results.items()
        }
    }

    report_dir = Path('logs')
    report_file = report_dir / f"monitor_report_{datetime.datetime.now().strftime('%Y%m%d_%H%M%S')}.json"

    try:
        report_dir.mkdir(exist_ok=True)
        # ensure_ascii=False writes non-ASCII text verbatim, so the file
        # encoding must be explicit rather than locale-dependent.
        with open(report_file, 'w', encoding='utf-8') as f:
            json.dump(report, f, indent=2, ensure_ascii=False)
        logger.info(f"监控报告已保存到 {report_file}")
    except Exception as e:
        logger.error(f"保存监控报告失败: {str(e)}")

    return report, overall_status


def format_html_report(report):
    """Render a report dict (as built by ``generate_report``) as HTML.

    Returns:
        An HTML fragment with a summary and one table row per check. All
        dynamic values are HTML-escaped because check messages may contain
        text received from remote services.
    """
    status_colors = {
        "OK": "green",
        "WARNING": "orange",
        "ERROR": "red"
    }

    overall_status = report["overall_status"]
    overall_color = status_colors.get(overall_status, "black")
    timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    host_text = escape(str(report["host"]))
    overall_text = escape(str(overall_status))

    parts = [
        "<div>",
        "<h2>邮件系统监控报告</h2>",
        f"<p>时间: {timestamp}</p>",
        f"<p>主机: {host_text}</p>",
        f'<p>总体状态: <span style="color: {overall_color}; font-weight: bold;">{overall_text}</span></p>',
        "<h3>检查结果:</h3>",
        '<table border="1" cellpadding="5" cellspacing="0">',
        "<tr><th>检查项</th><th>状态</th><th>详情</th></tr>",
    ]

    for check_name, check_data in report["checks"].items():
        status = check_data["status"]
        color = status_colors.get(status, "black")
        data = check_data["data"]

        if "message" in data:
            message = data["message"]
        else:
            # No ready-made message: summarise the scalar fields instead.
            message = ", ".join(
                f"{key}: {value}"
                for key, value in data.items()
                if not isinstance(value, (dict, list))
            )

        name_text = escape(str(check_name))
        status_text = escape(str(status))
        message_text = escape(str(message))
        parts.append(
            f'<tr><td>{name_text}</td>'
            f'<td style="color: {color};">{status_text}</td>'
            f'<td>{message_text}</td></tr>'
        )

    parts.append("</table>")
    parts.append("</div>")
    return "\n".join(parts)


def _run_checks(args, logger, verbose):
    """Run every check once and return ``{check name: (status, data)}``.

    When ``verbose`` is true each result is also printed to the terminal as
    soon as its check finishes.
    """
    results = {}

    def record(label, outcome):
        status, data = outcome
        results[label] = (status, data)
        if verbose:
            print_status(label, status, data.get("message", ""))
        return data

    record("API服务", check_api_status(args.host, args.api_port, logger))
    record("SMTP服务", check_smtp_status(args.host, args.smtp_port, logger))
    record("磁盘使用情况", check_disk_usage(logger))
    record("邮件数据", check_email_data_size(logger))

    resources_data = record("系统资源", check_system_resources(logger))
    if verbose and platform.system() != 'Windows':
        print(f"  {resources_data.get('load_message', '')}")

    record("日志文件", check_log_files(logger))

    process_data = record("进程状态", check_process_status(logger))
    if verbose and process_data.get("processes"):
        print("\n进程详情:")
        for i, proc in enumerate(process_data["processes"]):
            print(f"  进程 {i+1}: PID {proc['pid']}, CPU {proc['cpu_percent']:.1f}%, "
                  f"内存 {proc['memory_percent']:.1f}%, 运行时间 {proc['uptime']}")

    return results


def main():
    """Parse the command line and run the checks once or, with --daemon, forever."""
    parser = argparse.ArgumentParser(description='邮件系统监控脚本')
    parser.add_argument('--host', default='localhost', help='服务器主机名或IP地址')
    parser.add_argument('--api-port', type=int, default=5000, help='API服务端口')
    parser.add_argument('--smtp-port', type=int, default=25, help='SMTP服务端口')
    parser.add_argument('--alert', action='store_true', help='启用警报')
    parser.add_argument('--interval', type=int, default=300, help='监控间隔,单位为秒')
    parser.add_argument('--log', action='store_true', help='将结果记录到日志文件')
    parser.add_argument('--daemon', action='store_true', help='以守护进程模式运行')

    args = parser.parse_args()

    logger = setup_logging(args.log)

    # Alert delivery settings.
    alert_settings = {
        'to_email': 'admin@example.com',
        'from_email': 'monitor@example.com',
        'smtp_host': 'localhost',
        'smtp_port': args.smtp_port
    }

    logger.info("邮件系统监控启动")
    logger.info(f"主机: {args.host}, API端口: {args.api_port}, SMTP端口: {args.smtp_port}")
    if args.daemon:
        logger.info(f"守护进程模式,间隔: {args.interval}秒")

    run_count = 0

    # The KeyboardInterrupt handler wraps the whole loop so that Ctrl-C also
    # stops the monitor cleanly while it sleeps after a failed run.
    try:
        while True:
            run_count += 1
            start_time = time.time()
            # In daemon mode only the first run is echoed to the terminal.
            verbose = not args.daemon or run_count == 1

            try:
                if verbose:
                    print_header("邮件系统监控")

                results = _run_checks(args, logger, verbose)

                report, overall_status = generate_report(
                    results, args.host, args.api_port, args.smtp_port, start_time, logger
                )

                if verbose:
                    print("\n" + "-" * 50)
                    print_status("总体状态", overall_status)
                    print("-" * 50)

                # Send an alert mail when enabled and something is wrong.
                if args.alert and overall_status in ("WARNING", "ERROR"):
                    logger.warning("发现问题,正在发送警报邮件...")

                    html_report = format_html_report(report)

                    subject = f"[{'警告' if overall_status == 'WARNING' else '错误'}] 邮件系统监控报告"
                    send_alert_email(
                        subject,
                        html_report,
                        alert_settings['to_email'],
                        alert_settings['from_email'],
                        alert_settings['smtp_host'],
                        alert_settings['smtp_port'],
                        logger
                    )

                if not args.daemon:
                    break

                # Daemon mode: wait out the remainder of the interval.
                execution_time = time.time() - start_time
                sleep_time = max(1, args.interval - execution_time)

                if run_count == 1:
                    print(f"\n守护进程模式已启动,间隔 {args.interval} 秒")
                    print(f"下次检查将在 {format_duration(sleep_time)} 后进行...")

                time.sleep(sleep_time)

            except Exception as e:
                logger.error(f"监控过程中发生错误: {str(e)}")
                if not args.daemon:
                    raise
                # time.sleep() rejects negative values, so clamp the interval.
                time.sleep(max(1, args.interval))
    except KeyboardInterrupt:
        logger.info("监控被用户中断")
        print("\n监控已停止")


if __name__ == "__main__":
    main()