#!/usr/bin/env python3 """ 邮件系统测试脚本 - 测试SMTP服务和自动创建邮箱功能 """ import os import sys import time import random import string import logging import argparse import requests import smtplib from email.mime.text import MIMEText from email.mime.multipart import MIMEMultipart from email.header import Header from datetime import datetime # 设置日志 logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s' ) logger = logging.getLogger(__name__) def generate_random_string(length=6): """生成随机字符串,用作验证码""" return ''.join(random.choices(string.ascii_uppercase + string.digits, k=length)) def send_test_email(smtp_host, smtp_port, from_addr, to_addr, code=None): """ 发送测试邮件 参数: smtp_host: SMTP服务器地址 smtp_port: SMTP服务器端口 from_addr: 发件人地址 to_addr: 收件人地址 code: 验证码,如果为None则自动生成 返回: (success, verification_code, error_message) """ if code is None: code = generate_random_string() try: msg = MIMEMultipart('alternative') msg['From'] = from_addr msg['To'] = to_addr msg['Subject'] = Header(f'测试邮件 - 您的验证码是 {code} - {datetime.now().strftime("%Y-%m-%d %H:%M:%S")}', 'utf-8') # 纯文本内容 text_content = f""" 您好! 这是一封测试邮件,用于验证邮件系统是否正常工作。 您的验证码是: {code} 请勿回复此邮件。 """ # HTML内容 html_content = f"""

验证码通知

您好!

这是一封测试邮件,用于验证邮件系统是否正常工作。

您的验证码是:

{code}

请在验证页面输入以上验证码。

""" # 添加纯文本和HTML内容 part1 = MIMEText(text_content, 'plain', 'utf-8') part2 = MIMEText(html_content, 'html', 'utf-8') msg.attach(part1) msg.attach(part2) # 连接SMTP服务器并发送 with smtplib.SMTP(smtp_host, smtp_port) as server: # server.set_debuglevel(1) # 调试模式 logger.info(f"连接到SMTP服务器: {smtp_host}:{smtp_port}") server.sendmail(from_addr, to_addr, msg.as_string()) logger.info(f"邮件已发送: 从 {from_addr} 到 {to_addr}") return True, code, None except Exception as e: error_message = f"发送邮件失败: {str(e)}" logger.error(error_message) return False, code, error_message def check_mailbox_exists(api_base_url, email_address): """ 检查邮箱是否存在,如果不存在则创建 参数: api_base_url: API基础URL email_address: 邮箱地址 返回: (success, error_message) """ try: # 尝试创建邮箱 create_url = f"{api_base_url}/api/add_mailbox?email={email_address}" logger.info(f"检查/创建邮箱: {create_url}") response = requests.get(create_url) if response.status_code in [200, 201]: # 同时接受200和201状态码 result = response.json() if response.content else {'success': True} if result.get('success', True): # 默认为True,因为201状态可能没有返回JSON logger.info(f"邮箱检查/创建成功: {email_address}") return True, None else: error_message = f"邮箱创建失败: {result.get('message')}" logger.error(error_message) return False, error_message else: error_message = f"邮箱创建请求失败: HTTP {response.status_code}" logger.error(error_message) return False, error_message except Exception as e: error_message = f"检查邮箱时发生错误: {str(e)}" logger.error(error_message) return False, error_message def wait_for_email(api_base_url, email_address, verification_code, max_attempts=20, delay=3): """ 等待并检查邮件是否收到 参数: api_base_url: API基础URL email_address: 邮箱地址 verification_code: 验证码 max_attempts: 最大尝试次数 delay: 每次尝试间隔时间(秒) 返回: (received, email_data) """ logger.info(f"等待接收邮件,验证码: {verification_code}") for attempt in range(1, max_attempts + 1): try: logger.info(f"检查邮件 (尝试 {attempt}/{max_attempts})...") # 请求最新邮件 url = f"{api_base_url}/api/email?email={email_address}&latest=1" response = requests.get(url) if response.status_code == 200: result = response.json() if result.get('success') and len(result.get('emails', [])) > 0: email_data = result['emails'][0] subject = email_data.get('subject', '') body_text = email_data.get('body_text', '') body_html = email_data.get('body_html', '') logger.info(f"找到邮件: {subject}") # 检查验证码是否匹配 if verification_code in subject or verification_code in body_text or verification_code in body_html: logger.info(f"验证码匹配成功: {verification_code}") return True, email_data else: logger.info(f"验证码不匹配,继续等待...") time.sleep(delay) except Exception as e: logger.error(f"检查邮件时出错: {str(e)}") time.sleep(delay) logger.error(f"等待邮件超时,未收到包含验证码 {verification_code} 的邮件") return False, None def test_email_system(api_base_url, smtp_host, smtp_port): """ 测试邮件系统 参数: api_base_url: API基础URL smtp_host: SMTP服务器地址 smtp_port: SMTP服务器端口 返回: (success, message) """ # 生成测试邮箱地址 timestamp = int(time.time()) test_email = f"test{timestamp}@nosqli.com" from_email = "tester@example.com" logger.info(f"开始测试 - 测试邮箱: {test_email}") # 1. 确保邮箱存在 mailbox_exists, error = check_mailbox_exists(api_base_url, test_email) if not mailbox_exists: return False, f"创建邮箱失败: {error}" # 2. 发送测试邮件 verification_code = generate_random_string() send_success, code, error = send_test_email(smtp_host, smtp_port, from_email, test_email, verification_code) if not send_success: return False, f"发送邮件失败: {error}" # 3. 等待并验证邮件接收 received, email_data = wait_for_email(api_base_url, test_email, verification_code) if not received: return False, "未能接收到发送的邮件" # 4. 检查系统状态 try: status_url = f"{api_base_url}/api/system_check" response = requests.get(status_url) if response.status_code == 200: status_data = response.json() if status_data.get('success'): logger.info("系统状态检查通过") else: logger.warning(f"系统状态异常: {status_data.get('status')}") except Exception as e: logger.error(f"检查系统状态时出错: {str(e)}") return True, f"测试成功完成! 邮箱 {test_email} 成功接收到包含验证码 {verification_code} 的邮件" def main(): """主函数""" parser = argparse.ArgumentParser(description="邮件系统测试工具") parser.add_argument('--api', default="http://localhost:5000", help="API基础URL") parser.add_argument('--smtp', default="localhost", help="SMTP服务器地址") parser.add_argument('--port', type=int, default=25, help="SMTP服务器端口") args = parser.parse_args() logger.info("======= 邮件系统测试开始 =======") logger.info(f"API地址: {args.api}") logger.info(f"SMTP服务器: {args.smtp}:{args.port}") success, message = test_email_system(args.api, args.smtp, args.port) if success: logger.info(f"测试结果: 成功 - {message}") sys.exit(0) else: logger.error(f"测试结果: 失败 - {message}") sys.exit(1) if __name__ == "__main__": main()