初始化提交,包含完整的邮件系统代码

This commit is contained in:
huangzhenpc
2025-02-25 19:50:00 +08:00
commit aeffc4f8b8
52 changed files with 6673 additions and 0 deletions

377
test_smtp_server.py Normal file
View File

@@ -0,0 +1,377 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
SMTP服务器测试脚本
用于验证邮件系统的发送和接收功能
"""
import smtplib
import argparse
import socket
import time
import sys
import json
import random
import string
import requests
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from datetime import datetime
# ANSI颜色代码
class Colors:
GREEN = '\033[92m'
YELLOW = '\033[93m'
RED = '\033[91m'
BLUE = '\033[94m'
BOLD = '\033[1m'
ENDC = '\033[0m'
def generate_random_string(length=8):
"""生成随机字符串"""
return ''.join(random.choices(string.ascii_letters + string.digits, k=length))
def print_step(step, message):
"""打印测试步骤"""
print(f"\n{Colors.BLUE}{Colors.BOLD}[步骤 {step}]{Colors.ENDC} {message}")
def print_success(message):
"""打印成功消息"""
print(f"{Colors.GREEN}{message}{Colors.ENDC}")
def print_warning(message):
"""打印警告消息"""
print(f"{Colors.YELLOW}{message}{Colors.ENDC}")
def print_error(message):
"""打印错误消息"""
print(f"{Colors.RED}{message}{Colors.ENDC}")
def check_smtp_connection(host, port, timeout=5):
"""检查SMTP服务器连接"""
print_step(1, f"检查SMTP服务器连接 ({host}:{port})")
try:
# 尝试创建TCP连接
sock = socket.create_connection((host, port), timeout)
banner = sock.recv(1024).decode('utf-8', errors='ignore')
sock.close()
if banner and banner.startswith('2'):
print_success(f"SMTP服务器连接成功服务器响应: {banner.strip()}")
return True
else:
print_warning(f"SMTP服务器连接成功但响应不标准: {banner.strip()}")
return True
except socket.timeout:
print_error(f"连接超时请检查SMTP服务器是否运行在 {host}:{port}")
return False
except ConnectionRefusedError:
print_error(f"连接被拒绝请检查SMTP服务器是否运行在 {host}:{port}")
return False
except Exception as e:
print_error(f"连接失败: {str(e)}")
return False
def check_smtp_handshake(host, port):
"""测试SMTP握手过程"""
print_step(2, "测试SMTP协议握手")
try:
# 创建SMTP连接并打印通信过程
smtp = smtplib.SMTP(host, port, timeout=10)
smtp.set_debuglevel(1) # 打开调试模式
# 发送EHLO命令
_, ehlo_response = smtp.ehlo()
ehlo_success = all(code == 250 for code, _ in ehlo_response)
if ehlo_success:
print_success("SMTP握手成功服务器支持的功能:")
for code, feature in ehlo_response:
print(f" {feature.decode('utf-8')}")
else:
print_warning("SMTP握手部分成功可能有功能受限")
smtp.quit()
return ehlo_success
except Exception as e:
print_error(f"SMTP握手失败: {str(e)}")
return False
def check_api_status(api_base_url):
"""检查API服务器状态"""
print_step(3, f"检查API服务器状态 ({api_base_url})")
try:
response = requests.get(f"{api_base_url}/status", timeout=5)
response.raise_for_status()
data = response.json()
print_success("API服务器状态正常")
print(f" 状态信息: {json.dumps(data, indent=2)}")
return True
except requests.exceptions.RequestException as e:
print_error(f"API服务器连接失败: {str(e)}")
return False
def get_test_mailbox(api_base_url):
"""获取或创建测试邮箱"""
print_step(4, "获取测试邮箱")
try:
# 获取所有域名
response = requests.get(f"{api_base_url}/domains")
response.raise_for_status()
domains = response.json().get("domains", [])
if not domains:
print_error("没有可用的域名,请先创建域名")
return None
# 获取第一个域名
domain = domains[0]
domain_id = domain.get("id")
domain_name = domain.get("name")
print_success(f"使用域名: {domain_name} (ID: {domain_id})")
# 生成随机地址前缀
address_prefix = f"test-{generate_random_string(6)}"
# 创建新邮箱
mailbox_data = {
"domain_id": domain_id,
"address": address_prefix
}
response = requests.post(f"{api_base_url}/mailboxes", json=mailbox_data)
response.raise_for_status()
mailbox = response.json().get("mailbox", {})
mailbox_id = mailbox.get("id")
full_address = mailbox.get("full_address")
if not mailbox_id or not full_address:
print_error("创建邮箱失败,使用现有邮箱")
# 尝试获取现有邮箱
response = requests.get(f"{api_base_url}/mailboxes")
response.raise_for_status()
mailboxes = response.json().get("mailboxes", [])
if not mailboxes:
print_error("没有可用的邮箱")
return None
mailbox = mailboxes[0]
mailbox_id = mailbox.get("id")
full_address = mailbox.get("full_address")
print_success(f"使用邮箱: {full_address} (ID: {mailbox_id})")
return {"id": mailbox_id, "address": full_address}
except Exception as e:
print_error(f"获取邮箱失败: {str(e)}")
return None
def send_test_email(smtp_host, smtp_port, recipient, verbose=False):
"""发送测试邮件"""
print_step(5, f"发送测试邮件到 {recipient}")
# 生成随机验证码
verification_code = ''.join(random.choices(string.digits, k=6))
# 生成随机主题标识
subject_id = generate_random_string(8)
try:
# 创建邮件
msg = MIMEMultipart()
msg['From'] = 'test@example.com'
msg['To'] = recipient
msg['Subject'] = f'测试邮件 {subject_id} - 验证码 {verification_code}'
# 添加HTML内容
html_content = f"""
<html>
<body>
<div style="font-family: Arial, sans-serif; max-width: 600px; margin: 0 auto; padding: 20px; border: 1px solid #eee; border-radius: 5px;">
<h2 style="color: #333;">邮箱系统测试</h2>
<p>您好!</p>
<p>这是一封测试邮件,用于验证邮箱系统的功能。</p>
<p>您的验证码是:</p>
<div style="background-color: #f7f7f7; padding: 15px; font-size: 24px; font-weight: bold; text-align: center; letter-spacing: 5px; margin: 20px 0; border-radius: 4px;">
{verification_code}
</div>
<p>此测试邮件发送时间:{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}</p>
<p>测试ID: {subject_id}</p>
<p>请不要回复此邮件。</p>
<div style="margin-top: 30px; padding-top: 15px; border-top: 1px solid #eee; font-size: 12px; color: #999;">
此邮件由系统自动发送,用于测试目的。
</div>
</div>
</body>
</html>
"""
msg.attach(MIMEText(html_content, 'html'))
# 连接到SMTP服务器并发送
if verbose:
server = smtplib.SMTP(smtp_host, smtp_port)
server.set_debuglevel(1) # 开启详细输出
else:
server = smtplib.SMTP(smtp_host, smtp_port)
server.send_message(msg)
server.quit()
print_success(f"邮件发送成功!")
print(f" 验证码: {verification_code}")
print(f" 测试ID: {subject_id}")
return {"verification_code": verification_code, "subject_id": subject_id}
except Exception as e:
print_error(f"邮件发送失败: {str(e)}")
return None
def check_email_received(api_base_url, mailbox_id, test_data, max_attempts=10, delay=2):
"""检查邮件是否被接收"""
print_step(6, "检查邮件接收状态")
verification_code = test_data.get("verification_code")
subject_id = test_data.get("subject_id")
print(f"正在查找包含验证码 {verification_code} 和测试ID {subject_id} 的邮件...")
for attempt in range(max_attempts):
try:
print(f"尝试 {attempt + 1}/{max_attempts}...")
response = requests.get(f"{api_base_url}/mailboxes/{mailbox_id}/emails")
response.raise_for_status()
emails = response.json().get("emails", [])
if not emails:
print_warning("未找到邮件,等待中...")
time.sleep(delay)
continue
# 查找包含测试ID的邮件
test_email = None
for email in emails:
subject = email.get("subject", "")
if subject_id in subject:
test_email = email
break
if test_email:
email_id = test_email.get("id")
received_at = test_email.get("received_at") or test_email.get("created_at")
print_success(f"找到测试邮件! ID: {email_id}, 接收时间: {received_at}")
# 获取详细信息
response = requests.get(f"{api_base_url}/emails/{email_id}")
response.raise_for_status()
email_detail = response.json().get("email", {})
# 尝试从邮件中提取验证码
body_html = email_detail.get("body_html", "")
extracted_code = None
if verification_code in body_html:
print_success("邮件内容中包含正确的验证码")
extracted_code = verification_code
else:
print_warning("无法在邮件内容中找到验证码")
return {
"success": True,
"email_id": email_id,
"verification_code": extracted_code,
"received_at": received_at
}
else:
print_warning("未找到包含测试ID的邮件等待中...")
time.sleep(delay)
except Exception as e:
print_error(f"检查邮件失败: {str(e)}")
time.sleep(delay)
print_error(f"{max_attempts} 次尝试后仍未找到测试邮件")
return {"success": False}
def main():
"""主函数"""
parser = argparse.ArgumentParser(description='SMTP服务器测试工具')
parser.add_argument('--smtp-host', default='localhost', help='SMTP服务器地址默认为localhost')
parser.add_argument('--smtp-port', type=int, default=3825, help='SMTP服务器端口默认为3825')
parser.add_argument('--api-url', default='http://localhost:5000/api', help='API服务器URL默认为http://localhost:5000/api')
parser.add_argument('--verbose', action='store_true', help='显示详细的SMTP通信过程')
args = parser.parse_args()
print(f"{Colors.BOLD}邮箱系统SMTP服务器测试{Colors.ENDC}")
print(f"测试时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
print(f"SMTP服务器: {args.smtp_host}:{args.smtp_port}")
print(f"API服务器: {args.api_url}")
# 检查SMTP连接
if not check_smtp_connection(args.smtp_host, args.smtp_port):
print_error("SMTP服务器连接测试失败无法继续后续测试")
return 1
# 测试SMTP握手
if not check_smtp_handshake(args.smtp_host, args.smtp_port):
print_warning("SMTP握手测试失败但将继续后续测试")
# 检查API状态
if not check_api_status(args.api_url):
print_error("API服务器连接测试失败无法继续后续测试")
return 1
# 获取测试邮箱
mailbox = get_test_mailbox(args.api_url)
if not mailbox:
print_error("无法获取测试邮箱,测试终止")
return 1
# 发送测试邮件
test_data = send_test_email(args.smtp_host, args.smtp_port, mailbox["address"], args.verbose)
if not test_data:
print_error("发送测试邮件失败,测试终止")
return 1
# 检查邮件接收
result = check_email_received(args.api_url, mailbox["id"], test_data)
# 打印测试结果摘要
print(f"\n{Colors.BOLD}测试结果摘要{Colors.ENDC}")
if result.get("success"):
print_success("邮件发送和接收测试全部成功!")
print(f" 邮箱: {mailbox['address']}")
print(f" 邮件ID: {result.get('email_id')}")
print(f" 验证码: {test_data.get('verification_code')}")
print(f" 接收时间: {result.get('received_at')}")
print(f"\n{Colors.GREEN}{Colors.BOLD}恭喜!您的邮箱系统运行正常,可以进行部署。{Colors.ENDC}")
return 0
else:
print_error("测试失败: 发送的邮件未被接收")
print(f" 邮箱: {mailbox['address']}")
print(f" 验证码: {test_data.get('verification_code')}")
print(f"\n{Colors.RED}{Colors.BOLD}请检查系统配置并解决问题后再次测试。{Colors.ENDC}")
return 1
if __name__ == "__main__":
try:
sys.exit(main())
except KeyboardInterrupt:
print("\n测试被用户中断")
sys.exit(130)
except Exception as e:
print(f"\n{Colors.RED}测试过程中出现未处理的异常: {str(e)}{Colors.ENDC}")
import traceback
traceback.print_exc()
sys.exit(1)