279 lines
9.8 KiB
Python
279 lines
9.8 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
邮件系统测试脚本 - 测试SMTP服务和自动创建邮箱功能
|
||
"""
|
||
|
||
import os
|
||
import sys
|
||
import time
|
||
import random
|
||
import string
|
||
import logging
|
||
import argparse
|
||
import requests
|
||
import smtplib
|
||
from email.mime.text import MIMEText
|
||
from email.mime.multipart import MIMEMultipart
|
||
from email.header import Header
|
||
from datetime import datetime
|
||
|
||
# 设置日志
|
||
logging.basicConfig(
|
||
level=logging.INFO,
|
||
format='%(asctime)s - %(levelname)s - %(message)s'
|
||
)
|
||
logger = logging.getLogger(__name__)
|
||
|
||
def generate_random_string(length=6):
|
||
"""生成随机字符串,用作验证码"""
|
||
return ''.join(random.choices(string.ascii_uppercase + string.digits, k=length))
|
||
|
||
def send_test_email(smtp_host, smtp_port, from_addr, to_addr, code=None):
|
||
"""
|
||
发送测试邮件
|
||
|
||
参数:
|
||
smtp_host: SMTP服务器地址
|
||
smtp_port: SMTP服务器端口
|
||
from_addr: 发件人地址
|
||
to_addr: 收件人地址
|
||
code: 验证码,如果为None则自动生成
|
||
|
||
返回:
|
||
(success, verification_code, error_message)
|
||
"""
|
||
if code is None:
|
||
code = generate_random_string()
|
||
|
||
try:
|
||
msg = MIMEMultipart('alternative')
|
||
msg['From'] = from_addr
|
||
msg['To'] = to_addr
|
||
msg['Subject'] = Header(f'测试邮件 - 您的验证码是 {code} - {datetime.now().strftime("%Y-%m-%d %H:%M:%S")}', 'utf-8')
|
||
|
||
# 纯文本内容
|
||
text_content = f"""
|
||
您好!
|
||
|
||
这是一封测试邮件,用于验证邮件系统是否正常工作。
|
||
|
||
您的验证码是: {code}
|
||
|
||
请勿回复此邮件。
|
||
"""
|
||
|
||
# HTML内容
|
||
html_content = f"""
|
||
<html>
|
||
<head>
|
||
<style>
|
||
body {{ font-family: Arial, sans-serif; line-height: 1.6; }}
|
||
.container {{ max-width: 600px; margin: 0 auto; padding: 20px; }}
|
||
.header {{ background-color: #f0f0f0; padding: 10px; text-align: center; }}
|
||
.content {{ padding: 20px; }}
|
||
.code {{ font-size: 24px; font-weight: bold; color: #1a73e8; letter-spacing: 5px; }}
|
||
.footer {{ font-size: 12px; color: #666; text-align: center; margin-top: 20px; }}
|
||
</style>
|
||
</head>
|
||
<body>
|
||
<div class="container">
|
||
<div class="header">
|
||
<h2>验证码通知</h2>
|
||
</div>
|
||
<div class="content">
|
||
<p>您好!</p>
|
||
<p>这是一封测试邮件,用于验证邮件系统是否正常工作。</p>
|
||
<p>您的验证码是:</p>
|
||
<div class="code">{code}</div>
|
||
<p>请在验证页面输入以上验证码。</p>
|
||
</div>
|
||
<div class="footer">
|
||
<p>此邮件由系统自动发送,请勿回复。</p>
|
||
<p>发送时间: {datetime.now().strftime("%Y-%m-%d %H:%M:%S")}</p>
|
||
</div>
|
||
</div>
|
||
</body>
|
||
</html>
|
||
"""
|
||
|
||
# 添加纯文本和HTML内容
|
||
part1 = MIMEText(text_content, 'plain', 'utf-8')
|
||
part2 = MIMEText(html_content, 'html', 'utf-8')
|
||
msg.attach(part1)
|
||
msg.attach(part2)
|
||
|
||
# 连接SMTP服务器并发送
|
||
with smtplib.SMTP(smtp_host, smtp_port) as server:
|
||
# server.set_debuglevel(1) # 调试模式
|
||
logger.info(f"连接到SMTP服务器: {smtp_host}:{smtp_port}")
|
||
server.sendmail(from_addr, to_addr, msg.as_string())
|
||
logger.info(f"邮件已发送: 从 {from_addr} 到 {to_addr}")
|
||
|
||
return True, code, None
|
||
|
||
except Exception as e:
|
||
error_message = f"发送邮件失败: {str(e)}"
|
||
logger.error(error_message)
|
||
return False, code, error_message
|
||
|
||
def check_mailbox_exists(api_base_url, email_address):
|
||
"""
|
||
检查邮箱是否存在,如果不存在则创建
|
||
|
||
参数:
|
||
api_base_url: API基础URL
|
||
email_address: 邮箱地址
|
||
|
||
返回:
|
||
(success, error_message)
|
||
"""
|
||
try:
|
||
# 尝试创建邮箱
|
||
create_url = f"{api_base_url}/api/add_mailbox?email={email_address}"
|
||
logger.info(f"检查/创建邮箱: {create_url}")
|
||
|
||
response = requests.get(create_url)
|
||
if response.status_code in [200, 201]: # 同时接受200和201状态码
|
||
result = response.json() if response.content else {'success': True}
|
||
if result.get('success', True): # 默认为True,因为201状态可能没有返回JSON
|
||
logger.info(f"邮箱检查/创建成功: {email_address}")
|
||
return True, None
|
||
else:
|
||
error_message = f"邮箱创建失败: {result.get('message')}"
|
||
logger.error(error_message)
|
||
return False, error_message
|
||
else:
|
||
error_message = f"邮箱创建请求失败: HTTP {response.status_code}"
|
||
logger.error(error_message)
|
||
return False, error_message
|
||
|
||
except Exception as e:
|
||
error_message = f"检查邮箱时发生错误: {str(e)}"
|
||
logger.error(error_message)
|
||
return False, error_message
|
||
|
||
def wait_for_email(api_base_url, email_address, verification_code, max_attempts=20, delay=3):
|
||
"""
|
||
等待并检查邮件是否收到
|
||
|
||
参数:
|
||
api_base_url: API基础URL
|
||
email_address: 邮箱地址
|
||
verification_code: 验证码
|
||
max_attempts: 最大尝试次数
|
||
delay: 每次尝试间隔时间(秒)
|
||
|
||
返回:
|
||
(received, email_data)
|
||
"""
|
||
logger.info(f"等待接收邮件,验证码: {verification_code}")
|
||
|
||
for attempt in range(1, max_attempts + 1):
|
||
try:
|
||
logger.info(f"检查邮件 (尝试 {attempt}/{max_attempts})...")
|
||
|
||
# 请求最新邮件
|
||
url = f"{api_base_url}/api/email?email={email_address}&latest=1"
|
||
response = requests.get(url)
|
||
|
||
if response.status_code == 200:
|
||
result = response.json()
|
||
|
||
if result.get('success') and len(result.get('emails', [])) > 0:
|
||
email_data = result['emails'][0]
|
||
subject = email_data.get('subject', '')
|
||
body_text = email_data.get('body_text', '')
|
||
body_html = email_data.get('body_html', '')
|
||
|
||
logger.info(f"找到邮件: {subject}")
|
||
|
||
# 检查验证码是否匹配
|
||
if verification_code in subject or verification_code in body_text or verification_code in body_html:
|
||
logger.info(f"验证码匹配成功: {verification_code}")
|
||
return True, email_data
|
||
else:
|
||
logger.info(f"验证码不匹配,继续等待...")
|
||
|
||
time.sleep(delay)
|
||
|
||
except Exception as e:
|
||
logger.error(f"检查邮件时出错: {str(e)}")
|
||
time.sleep(delay)
|
||
|
||
logger.error(f"等待邮件超时,未收到包含验证码 {verification_code} 的邮件")
|
||
return False, None
|
||
|
||
def test_email_system(api_base_url, smtp_host, smtp_port):
|
||
"""
|
||
测试邮件系统
|
||
|
||
参数:
|
||
api_base_url: API基础URL
|
||
smtp_host: SMTP服务器地址
|
||
smtp_port: SMTP服务器端口
|
||
|
||
返回:
|
||
(success, message)
|
||
"""
|
||
# 生成测试邮箱地址
|
||
timestamp = int(time.time())
|
||
test_email = f"test{timestamp}@nosqli.com"
|
||
from_email = "tester@example.com"
|
||
|
||
logger.info(f"开始测试 - 测试邮箱: {test_email}")
|
||
|
||
# 1. 确保邮箱存在
|
||
mailbox_exists, error = check_mailbox_exists(api_base_url, test_email)
|
||
if not mailbox_exists:
|
||
return False, f"创建邮箱失败: {error}"
|
||
|
||
# 2. 发送测试邮件
|
||
verification_code = generate_random_string()
|
||
send_success, code, error = send_test_email(smtp_host, smtp_port, from_email, test_email, verification_code)
|
||
if not send_success:
|
||
return False, f"发送邮件失败: {error}"
|
||
|
||
# 3. 等待并验证邮件接收
|
||
received, email_data = wait_for_email(api_base_url, test_email, verification_code)
|
||
if not received:
|
||
return False, "未能接收到发送的邮件"
|
||
|
||
# 4. 检查系统状态
|
||
try:
|
||
status_url = f"{api_base_url}/api/system_check"
|
||
response = requests.get(status_url)
|
||
if response.status_code == 200:
|
||
status_data = response.json()
|
||
if status_data.get('success'):
|
||
logger.info("系统状态检查通过")
|
||
else:
|
||
logger.warning(f"系统状态异常: {status_data.get('status')}")
|
||
except Exception as e:
|
||
logger.error(f"检查系统状态时出错: {str(e)}")
|
||
|
||
return True, f"测试成功完成! 邮箱 {test_email} 成功接收到包含验证码 {verification_code} 的邮件"
|
||
|
||
def main():
|
||
"""主函数"""
|
||
parser = argparse.ArgumentParser(description="邮件系统测试工具")
|
||
parser.add_argument('--api', default="http://localhost:5000", help="API基础URL")
|
||
parser.add_argument('--smtp', default="localhost", help="SMTP服务器地址")
|
||
parser.add_argument('--port', type=int, default=25, help="SMTP服务器端口")
|
||
|
||
args = parser.parse_args()
|
||
|
||
logger.info("======= 邮件系统测试开始 =======")
|
||
logger.info(f"API地址: {args.api}")
|
||
logger.info(f"SMTP服务器: {args.smtp}:{args.port}")
|
||
|
||
success, message = test_email_system(args.api, args.smtp, args.port)
|
||
|
||
if success:
|
||
logger.info(f"测试结果: 成功 - {message}")
|
||
sys.exit(0)
|
||
else:
|
||
logger.error(f"测试结果: 失败 - {message}")
|
||
sys.exit(1)
|
||
|
||
if __name__ == "__main__":
|
||
main() |