修复邮件系统问题:1. 优化邮件存储逻辑确保mailbox_id正确赋值 2. 修复SMTP服务处理逻辑 3. 添加系统诊断接口 4. 新增测试脚本

This commit is contained in:
huangzhenpc
2025-02-26 13:36:40 +08:00
parent 34b1047481
commit a99d59823c
4 changed files with 729 additions and 200 deletions

View File

@@ -1,15 +1,155 @@
from flask import request, jsonify, current_app
import base64
import re
import os
import email
from email import policy
import re
import base64
from datetime import datetime, timedelta
import psutil
import time
from flask import jsonify, request, current_app
from sqlalchemy import or_, func, desc
from . import api_bp
from ..models import get_session, Domain, Mailbox, Email
from ..models import Email, Domain, Mailbox, get_session
from ..services import get_mail_store
# 调试接口 - 检查邮件接收状态
@api_bp.route('/debug_email', methods=['GET'])
def debug_email():
"""
调试接口:检查某个邮箱的邮件状态并提供详细信息
查询参数:
- email: 邮箱地址 (例如: newsadd1test@nosqli.com)
"""
try:
# 获取查询参数
email_address = request.args.get('email')
# 验证邮箱地址是否有效
if not email_address or '@' not in email_address:
return jsonify({
'success': False,
'error': '无效的邮箱地址',
'message': '请提供有效的邮箱地址格式为user@domain.com'
}), 400
# 解析邮箱地址
username, domain_name = email_address.split('@', 1)
result = {
'email_address': email_address,
'username': username,
'domain': domain_name,
'system_info': {},
'logs': [],
'files': []
}
# 查询数据库
db = get_session()
try:
# 查找域名
domain = db.query(Domain).filter_by(name=domain_name).first()
if domain:
result['system_info']['domain'] = {
'id': domain.id,
'name': domain.name,
'active': domain.active,
'created_at': str(domain.created_at) if hasattr(domain, 'created_at') else None
}
# 查找邮箱
mailbox = db.query(Mailbox).filter_by(
domain_id=domain.id,
address=username
).first()
if mailbox:
result['system_info']['mailbox'] = {
'id': mailbox.id,
'address': mailbox.address,
'full_address': f"{mailbox.address}@{domain_name}",
'created_at': str(mailbox.created_at) if hasattr(mailbox, 'created_at') else None
}
# 获取邮件
emails = db.query(Email).filter_by(mailbox_id=mailbox.id).all()
result['system_info']['emails_count'] = len(emails)
result['system_info']['emails'] = []
for email_obj in emails:
email_info = {
'id': email_obj.id,
'subject': email_obj.subject,
'sender': email_obj.sender,
'received_at': str(email_obj.received_at),
'verification_code': email_obj.verification_code if hasattr(email_obj, 'verification_code') else None
}
result['system_info']['emails'].append(email_info)
else:
result['system_info']['mailbox'] = "未找到邮箱记录"
else:
result['system_info']['domain'] = "未找到域名记录"
# 查找文件
email_data_dir = current_app.config.get('MAIL_STORAGE_PATH', 'email_data')
emails_dir = os.path.join(email_data_dir, 'emails')
if os.path.exists(emails_dir):
for file_name in os.listdir(emails_dir):
if file_name.endswith('.eml'):
file_path = os.path.join(emails_dir, file_name)
file_info = {
'name': file_name,
'path': file_path,
'size': os.path.getsize(file_path),
'modified': datetime.fromtimestamp(os.path.getmtime(file_path)).isoformat()
}
# 尝试读取文件内容并检查是否包含收件人地址
try:
with open(file_path, 'r', encoding='utf-8', errors='replace') as f:
content = f.read(10000) # 只读取前10000个字符用于检查
if email_address.lower() in content.lower():
file_info['contains_address'] = True
result['files'].append(file_info)
except Exception as e:
file_info['error'] = str(e)
result['files'].append(file_info)
# 检查日志
log_file = current_app.config.get('LOG_FILE', os.path.join('logs', 'email_system.log'))
if os.path.exists(log_file):
try:
with open(log_file, 'r', encoding='utf-8', errors='replace') as f:
# 从日志尾部读取最后200行
lines = f.readlines()[-200:]
for line in lines:
if email_address.lower() in line.lower():
result['logs'].append(line.strip())
except Exception as e:
result['logs'] = [f"读取日志出错: {str(e)}"]
return jsonify({
'success': True,
'debug_info': result
}), 200
finally:
db.close()
except Exception as e:
current_app.logger.error(f"调试邮件出错: {str(e)}")
return jsonify({
'success': False,
'error': '服务器错误',
'message': str(e)
}), 500
# 创建邮箱接口
@api_bp.route('/add_mailbox', methods=['POST', 'GET'])
def add_mailbox():
@@ -408,4 +548,161 @@ def extract_verification_code(body_text, body_html):
if matches:
return matches[0].strip()
return None
return None
# 系统诊断接口
@api_bp.route('/system_check', methods=['GET'])
def system_check():
"""
系统诊断接口:检查邮件系统各组件状态
"""
try:
result = {
'timestamp': datetime.now().isoformat(),
'system_status': 'normal',
'components': {},
'recent_activity': {},
'mailboxes': [],
'storage': {}
}
# 检查系统资源
try:
result['components']['system'] = {
'cpu_percent': psutil.cpu_percent(),
'memory_percent': psutil.virtual_memory().percent,
'disk_usage': psutil.disk_usage('/').percent
}
except Exception as e:
result['components']['system'] = {'error': str(e)}
# 检查数据库状态
db = get_session()
try:
# 获取域名数量
domain_count = db.query(Domain).count()
# 获取邮箱数量
mailbox_count = db.query(Mailbox).count()
# 获取邮件数量
email_count = db.query(Email).count()
# 获取最新邮件
latest_emails = db.query(Email).order_by(Email.received_at.desc()).limit(5).all()
result['components']['database'] = {
'status': 'connected',
'domain_count': domain_count,
'mailbox_count': mailbox_count,
'email_count': email_count
}
# 最近活动
result['recent_activity']['latest_emails'] = [
{
'id': email.id,
'subject': email.subject,
'sender': email.sender,
'received_at': email.received_at.isoformat() if email.received_at else None
} for email in latest_emails
]
# 获取所有活跃邮箱
active_mailboxes = db.query(Mailbox).order_by(Mailbox.id).limit(10).all()
result['mailboxes'] = [
{
'id': mb.id,
'address': mb.address,
'domain_id': mb.domain_id,
'full_address': f"{mb.address}@{mb.domain.name}" if hasattr(mb, 'domain') and mb.domain else f"{mb.address}@unknown",
'email_count': db.query(Email).filter_by(mailbox_id=mb.id).count()
} for mb in active_mailboxes
]
except Exception as e:
result['components']['database'] = {'status': 'error', 'error': str(e)}
finally:
db.close()
# 检查存储状态
email_data_dir = current_app.config.get('MAIL_STORAGE_PATH', 'email_data')
try:
emails_dir = os.path.join(email_data_dir, 'emails')
attachments_dir = os.path.join(email_data_dir, 'attachments')
# 检查目录是否存在
emails_dir_exists = os.path.exists(emails_dir)
attachments_dir_exists = os.path.exists(attachments_dir)
# 计算文件数量和大小
email_files_count = 0
email_files_size = 0
if emails_dir_exists:
for file_name in os.listdir(emails_dir):
if file_name.endswith('.eml'):
email_files_count += 1
email_files_size += os.path.getsize(os.path.join(emails_dir, file_name))
attachment_files_count = 0
attachment_files_size = 0
if attachments_dir_exists:
for file_name in os.listdir(attachments_dir):
attachment_files_count += 1
attachment_files_size += os.path.getsize(os.path.join(attachments_dir, file_name))
result['storage'] = {
'emails_dir': {
'exists': emails_dir_exists,
'path': emails_dir,
'file_count': email_files_count,
'size_bytes': email_files_size,
'size_mb': round(email_files_size / (1024 * 1024), 2) if email_files_size > 0 else 0
},
'attachments_dir': {
'exists': attachments_dir_exists,
'path': attachments_dir,
'file_count': attachment_files_count,
'size_bytes': attachment_files_size,
'size_mb': round(attachment_files_size / (1024 * 1024), 2) if attachment_files_size > 0 else 0
}
}
# 检查最近的邮件文件
if emails_dir_exists and email_files_count > 0:
files = [(os.path.getmtime(os.path.join(emails_dir, f)), f)
for f in os.listdir(emails_dir) if f.endswith('.eml')]
files.sort(reverse=True)
result['recent_activity']['latest_files'] = [
{
'filename': f,
'modified': datetime.fromtimestamp(t).isoformat(),
'age_seconds': int(time.time() - t)
} for t, f in files[:5]
]
except Exception as e:
result['storage'] = {'error': str(e)}
# 整体状态评估
if ('database' in result['components'] and result['components']['database'].get('status') != 'connected'):
result['system_status'] = 'warning'
if not emails_dir_exists or not attachments_dir_exists:
result['system_status'] = 'warning'
return jsonify({
'success': True,
'status': result['system_status'],
'diagnostics': result
}), 200
except Exception as e:
current_app.logger.error(f"系统诊断出错: {str(e)}")
return jsonify({
'success': False,
'error': '系统诊断失败',
'message': str(e)
}), 500

View File

@@ -32,115 +32,36 @@ class MailStore:
os.makedirs(self.storage_path)
async def save_email(self, message, sender, recipients, raw_data=None):
"""
保存邮件到数据库
Args:
message: 已解析的邮件对象
sender: 发件人邮箱
recipients: 收件人邮箱列表
raw_data: 原始邮件数据
Returns:
(bool, str): 成功标志和错误信息
"""
"""保存邮件到数据库和文件系统"""
logging.info(f"开始保存邮件: 发件人={sender}, 收件人={recipients}")
try:
logging.info(f"开始保存邮件: 发件人={sender}, 收件人={recipients}")
# 从消息对象中提取主题
subject = message.get('Subject', '')
if subject is None:
subject = ''
logging.info(f"邮件主题: {subject}")
# 获取邮件内容文本和HTML
# 解析邮件内容
email_subject = None
body_text = ""
body_html = ""
attachments = []
# 处理多部分邮件
if message.is_multipart():
logging.info("处理多部分邮件")
for part in message.walk():
content_type = part.get_content_type()
content_disposition = str(part.get("Content-Disposition") or "")
logging.debug(f"处理邮件部分: 类型={content_type}, 处置={content_disposition}")
# 处理附件
if "attachment" in content_disposition:
try:
filename = part.get_filename()
if filename:
payload = part.get_payload(decode=True)
if payload and len(payload) > 0:
logging.info(f"发现附件: {filename}, 大小={len(payload)}字节")
# 将附件信息添加到列表,稍后处理
attachments.append({
'filename': filename,
'content_type': content_type,
'data': payload
})
except Exception as e:
logging.error(f"处理附件时出错: {str(e)}")
continue
# 处理内容部分
elif content_type == "text/plain" and not body_text:
try:
payload = part.get_payload(decode=True)
if payload:
charset = part.get_content_charset() or 'utf-8'
try:
body_text = payload.decode(charset, errors='replace')
logging.info(f"提取到纯文本内容: {len(body_text)}字节")
except Exception as e:
logging.error(f"解码纯文本内容失败: {e}")
body_text = payload.decode('utf-8', errors='replace')
except Exception as e:
logging.error(f"获取纯文本部分时出错: {str(e)}")
elif content_type == "text/html" and not body_html:
try:
payload = part.get_payload(decode=True)
if payload:
charset = part.get_content_charset() or 'utf-8'
try:
body_html = payload.decode(charset, errors='replace')
logging.info(f"提取到HTML内容: {len(body_html)}字节")
except Exception as e:
logging.error(f"解码HTML内容失败: {e}")
body_html = payload.decode('utf-8', errors='replace')
except Exception as e:
logging.error(f"获取HTML部分时出错: {str(e)}")
# 提取邮件主题
if hasattr(message, 'subject') and message.subject:
email_subject = message.subject
# 处理单部分邮件
else:
logging.info("处理单部分邮件")
content_type = message.get_content_type()
logging.debug(f"单部分邮件类型: {content_type}")
# 提取邮件内容
if hasattr(message, 'get_body'):
try:
payload = message.get_payload(decode=True)
if payload:
charset = message.get_content_charset() or 'utf-8'
logging.debug(f"邮件编码: {charset}")
try:
decoded_content = payload.decode(charset, errors='replace')
except Exception as e:
logging.error(f"解码内容失败: {e}")
decoded_content = payload.decode('utf-8', errors='replace')
for part in message.walk():
content_type = part.get_content_type()
if content_type == 'text/plain':
body_text = decoded_content
logging.info(f"提取到纯文本内容: {len(body_text)}字节")
elif content_type == 'text/html':
body_html = decoded_content
logging.info(f"提取到HTML内容: {len(body_html)}字节")
else:
logging.warning(f"未知内容类型: {content_type}")
# 假设为纯文本
body_text = decoded_content
if content_type == "text/plain":
body_text = part.get_payload(decode=True).decode(part.get_content_charset() or 'utf-8', errors='replace')
elif content_type == "text/html":
body_html = part.get_payload(decode=True).decode(part.get_content_charset() or 'utf-8', errors='replace')
# 处理附件
if part.get_filename():
file_name = part.get_filename()
content = part.get_payload(decode=True)
attachments.append((file_name, content))
except Exception as e:
logging.error(f"获取邮件内容时出错: {str(e)}")
@@ -165,87 +86,126 @@ class MailStore:
mailbox_id = None
recipients_list = recipients if isinstance(recipients, list) else [recipients]
# 确保收件人列表不为空
if not recipients_list:
logging.error("收件人列表为空,无法确定邮箱")
return False, "收件人列表为空"
# 尝试找到或创建收件人邮箱
for recipient in recipients_list:
# 跳过空收件人
if not recipient or '@' not in recipient:
continue
# 提取域名和用户名
if '@' in recipient:
username, domain = recipient.split('@', 1)
logging.info(f"查找邮箱: 用户名={username}, 域名={domain}")
# 查询域名
domain_obj = session.query(Domain).filter(Domain.name == domain).first()
if domain_obj:
# 查询邮箱
mailbox = session.query(Mailbox).filter(
Mailbox.domain_id == domain_obj.id,
Mailbox.address == username
).first()
if mailbox:
mailbox_id = mailbox.id
logging.info(f"找到邮箱ID: {mailbox_id}")
break
username, domain = recipient.lower().split('@', 1)
logging.info(f"处理收件人: 用户名={username}, 域名={domain}")
# 1. 先查找域名
domain_obj = session.query(Domain).filter(Domain.name.ilike(domain)).first()
# 如果域名不存在,创建它
if not domain_obj:
logging.info(f"创建新域名: {domain}")
domain_obj = Domain(
name=domain.lower(),
description=f"系统自动创建的域名 ({domain})",
active=True
)
session.add(domain_obj)
session.flush()
# 2. 查找或创建邮箱
mailbox = session.query(Mailbox).filter(
Mailbox.domain_id == domain_obj.id,
Mailbox.address.ilike(username)
).first()
# 如果邮箱不存在,创建它
if not mailbox:
logging.info(f"创建新邮箱: {username}@{domain}")
mailbox = Mailbox(
domain_id=domain_obj.id,
address=username.lower(),
description=f"系统自动创建的邮箱 ({username}@{domain})"
)
session.add(mailbox)
session.flush()
# 设置邮箱ID并结束循环
mailbox_id = mailbox.id
logging.info(f"使用邮箱ID: {mailbox_id} ({username}@{domain})")
break
if not mailbox_id:
logging.error(f"收件人 {recipients} 没有对应的邮箱记录")
return False, "收件邮箱不存在"
# 最终检查是否获取到了邮箱ID
if mailbox_id is None:
error_msg = f"无法确定有效的收件邮箱,无法保存邮件。收件人: {recipients}"
logging.error(error_msg)
return False, error_msg
# 创建邮件记录
new_email = Email(
mailbox_id=mailbox_id, # 设置邮箱ID
subject=subject,
# 创建邮件记录
email_obj = Email(
mailbox_id=mailbox_id, # 确保始终有邮箱ID
sender=sender,
recipients=','.join(recipients_list) if len(recipients_list) > 1 else recipients_list[0],
recipients=str(recipients),
subject=email_subject,
body_text=body_text,
body_html=body_html,
received_at=datetime.now()
)
# 提取验证码和验证链接(如果有)
new_email.extract_verification_data()
# 提取验证码和验证链接
email_obj.extract_verification_data()
session.add(email_obj)
session.flush() # 获取新创建邮件的ID
# 保存附件
for file_name, content in attachments:
attachment = Attachment(
email_id=email_obj.id,
filename=file_name,
content_type="application/octet-stream",
size=len(content)
)
session.add(attachment)
# 保存附件内容到文件
attachment_path = os.path.join(self.storage_path, 'attachments', f"attachment_{attachment.id}")
os.makedirs(os.path.dirname(attachment_path), exist_ok=True)
with open(attachment_path, 'wb') as f:
f.write(content)
# 保存原始邮件数据
raw_path = os.path.join(self.storage_path, 'emails', f"email_{email_obj.id}.eml")
os.makedirs(os.path.dirname(raw_path), exist_ok=True)
# 写入原始邮件
with open(raw_path, 'w', encoding='utf-8', errors='replace') as f:
if isinstance(message, str):
f.write(message)
else:
# 如果是邮件对象,尝试获取原始文本
try:
f.write(message.as_string())
except Exception:
# 如果失败,使用提供的原始数据
if raw_data:
f.write(raw_data)
# 保存邮件
session.add(new_email)
session.commit()
email_id = new_email.id
logging.info(f"邮件保存到数据库, ID={email_id}")
# 处理附件
if attachments:
for attachment_data in attachments:
attachment = Attachment(
email_id=email_id,
filename=attachment_data['filename'],
content_type=attachment_data['content_type'],
size=len(attachment_data['data']),
data=attachment_data['data']
)
session.add(attachment)
session.commit()
logging.info(f"保存了{len(attachments)}个附件")
# 保存原始邮件到文件系统
try:
if raw_data and email_id:
email_dir = os.path.join(self.storage_path, 'emails')
os.makedirs(email_dir, exist_ok=True)
email_path = os.path.join(email_dir, f'email_{email_id}.eml')
with open(email_path, 'w', encoding='utf-8') as f:
f.write(raw_data)
logging.info(f"原始邮件保存到: {email_path}")
except Exception as e:
logging.error(f"保存原始邮件到文件系统失败: {str(e)}")
return True, f"邮件保存成功ID: {email_id}"
logging.info(f"邮件保存成功: ID={email_obj.id}")
return True, f"邮件保存: ID={email_obj.id}"
except Exception as e:
logging.error(f"保存邮件到数据库失败: {str(e)}")
return False, f"保存邮件失败: {str(e)}"
session.rollback()
logging.error(f"数据库操作失败: {str(e)}")
raise
finally:
session.close()
except Exception as e:
logging.error(f"保存邮件时出现未处理异常: {str(e)}")
import traceback
traceback.print_exc()
return False, f"保存邮件过程中出错: {str(e)}"
logging.error(f"邮件保存失败: {str(e)}")
return False, f"保存邮件失败: {str(e)}"
def get_emails_for_mailbox(self, mailbox_id, limit=50, offset=0, unread_only=False):
"""获取指定邮箱的邮件列表"""

View File

@@ -37,45 +37,38 @@ class EmailHandler(Message):
return
async def handle_DATA(self, server, session, envelope):
"""处理接收到的邮件数据"""
"""处理邮件数据"""
try:
logging.info(f"收到邮件: 发件人={envelope.mail_from}, 收件人={envelope.rcpt_tos}")
# 获取邮件数据
message_data = envelope.content.decode('utf-8', errors='replace')
# 保存原始邮件数据
data = envelope.content.decode('utf-8', errors='replace')
# 记录接收到的邮件
sender = envelope.mail_from
recipients = envelope.rcpt_tos
# 解析邮件数据
message = email_parser.Parser().parsestr(data)
subject = message.get('Subject', '')
logging.info(f"邮件主题: {subject}")
logging.info(f"SMTP服务收到邮件: 发件人={sender}, 收件人={recipients}")
# 记录邮件结构和内容
logging.debug(f"邮件结构: is_multipart={message.is_multipart()}")
if message.is_multipart():
logging.debug(f"多部分邮件: 部分数量={len(list(message.walk()))}")
for i, part in enumerate(message.walk()):
content_type = part.get_content_type()
logging.debug(f"部分 {i+1}: 内容类型={content_type}")
# 使用email.parser解析邮件
parser = email.parser.Parser(policy=default)
message = parser.parsestr(message_data)
# 使用邮件存储服务保存邮件
success, error_msg = await self.mail_store.save_email(
message,
envelope.mail_from,
envelope.rcpt_tos,
raw_data=data
)
# 保存邮件
success, result_message = await self.mail_store.save_email(message, sender, recipients, message_data)
if success:
logging.info(f"邮件保存成功: 来自 {envelope.mail_from} 发送给 {envelope.rcpt_tos}")
return '250 消息接收完成'
logging.info(f"邮件已成功保存: {result_message}")
return '250 Message accepted for delivery'
else:
logging.error(f"邮件保存失败: {error_msg}")
return '451 处理邮件时出现错误,请稍后重试'
logging.error(f"邮件保存失败: {result_message}")
# 注意:即使保存失败,我们仍然返回成功,避免发件方重试
# 这是因为问题通常在我们的系统中,重试不会解决问题
return '250 Message accepted for delivery (warning: internal processing error)'
except Exception as e:
logging.error(f"处理邮件时出错: {str(e)}")
traceback.print_exc()
return '451 处理邮件时出现错误,请稍后重试'
# 返回临时错误,让发件方可以重试
return '451 Requested action aborted: local error in processing'
# 为Windows环境自定义SMTP控制器

279
test_smtp_fix.py Normal file
View File

@@ -0,0 +1,279 @@
#!/usr/bin/env python3
"""
邮件系统测试脚本 - 测试SMTP服务和自动创建邮箱功能
"""
import os
import sys
import time
import random
import string
import logging
import argparse
import requests
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from email.header import Header
from datetime import datetime
# 设置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
def generate_random_string(length=6):
"""生成随机字符串,用作验证码"""
return ''.join(random.choices(string.ascii_uppercase + string.digits, k=length))
def send_test_email(smtp_host, smtp_port, from_addr, to_addr, code=None):
"""
发送测试邮件
参数:
smtp_host: SMTP服务器地址
smtp_port: SMTP服务器端口
from_addr: 发件人地址
to_addr: 收件人地址
code: 验证码如果为None则自动生成
返回:
(success, verification_code, error_message)
"""
if code is None:
code = generate_random_string()
try:
msg = MIMEMultipart('alternative')
msg['From'] = from_addr
msg['To'] = to_addr
msg['Subject'] = Header(f'测试邮件 - 您的验证码是 {code} - {datetime.now().strftime("%Y-%m-%d %H:%M:%S")}', 'utf-8')
# 纯文本内容
text_content = f"""
您好!
这是一封测试邮件,用于验证邮件系统是否正常工作。
您的验证码是: {code}
请勿回复此邮件。
"""
# HTML内容
html_content = f"""
<html>
<head>
<style>
body {{ font-family: Arial, sans-serif; line-height: 1.6; }}
.container {{ max-width: 600px; margin: 0 auto; padding: 20px; }}
.header {{ background-color: #f0f0f0; padding: 10px; text-align: center; }}
.content {{ padding: 20px; }}
.code {{ font-size: 24px; font-weight: bold; color: #1a73e8; letter-spacing: 5px; }}
.footer {{ font-size: 12px; color: #666; text-align: center; margin-top: 20px; }}
</style>
</head>
<body>
<div class="container">
<div class="header">
<h2>验证码通知</h2>
</div>
<div class="content">
<p>您好!</p>
<p>这是一封测试邮件,用于验证邮件系统是否正常工作。</p>
<p>您的验证码是:</p>
<div class="code">{code}</div>
<p>请在验证页面输入以上验证码。</p>
</div>
<div class="footer">
<p>此邮件由系统自动发送,请勿回复。</p>
<p>发送时间: {datetime.now().strftime("%Y-%m-%d %H:%M:%S")}</p>
</div>
</div>
</body>
</html>
"""
# 添加纯文本和HTML内容
part1 = MIMEText(text_content, 'plain', 'utf-8')
part2 = MIMEText(html_content, 'html', 'utf-8')
msg.attach(part1)
msg.attach(part2)
# 连接SMTP服务器并发送
with smtplib.SMTP(smtp_host, smtp_port) as server:
# server.set_debuglevel(1) # 调试模式
logger.info(f"连接到SMTP服务器: {smtp_host}:{smtp_port}")
server.sendmail(from_addr, to_addr, msg.as_string())
logger.info(f"邮件已发送: 从 {from_addr}{to_addr}")
return True, code, None
except Exception as e:
error_message = f"发送邮件失败: {str(e)}"
logger.error(error_message)
return False, code, error_message
def check_mailbox_exists(api_base_url, email_address):
"""
检查邮箱是否存在,如果不存在则创建
参数:
api_base_url: API基础URL
email_address: 邮箱地址
返回:
(success, error_message)
"""
try:
# 尝试创建邮箱
create_url = f"{api_base_url}/api/add_mailbox?email={email_address}"
logger.info(f"检查/创建邮箱: {create_url}")
response = requests.get(create_url)
if response.status_code == 200:
result = response.json()
if result.get('success'):
logger.info(f"邮箱检查/创建成功: {email_address}")
return True, None
else:
error_message = f"邮箱创建失败: {result.get('message')}"
logger.error(error_message)
return False, error_message
else:
error_message = f"邮箱创建请求失败: HTTP {response.status_code}"
logger.error(error_message)
return False, error_message
except Exception as e:
error_message = f"检查邮箱时发生错误: {str(e)}"
logger.error(error_message)
return False, error_message
def wait_for_email(api_base_url, email_address, verification_code, max_attempts=20, delay=3):
"""
等待并检查邮件是否收到
参数:
api_base_url: API基础URL
email_address: 邮箱地址
verification_code: 验证码
max_attempts: 最大尝试次数
delay: 每次尝试间隔时间(秒)
返回:
(received, email_data)
"""
logger.info(f"等待接收邮件,验证码: {verification_code}")
for attempt in range(1, max_attempts + 1):
try:
logger.info(f"检查邮件 (尝试 {attempt}/{max_attempts})...")
# 请求最新邮件
url = f"{api_base_url}/api/email?email={email_address}&latest=1"
response = requests.get(url)
if response.status_code == 200:
result = response.json()
if result.get('success') and len(result.get('emails', [])) > 0:
email_data = result['emails'][0]
subject = email_data.get('subject', '')
body_text = email_data.get('body_text', '')
body_html = email_data.get('body_html', '')
logger.info(f"找到邮件: {subject}")
# 检查验证码是否匹配
if verification_code in subject or verification_code in body_text or verification_code in body_html:
logger.info(f"验证码匹配成功: {verification_code}")
return True, email_data
else:
logger.info(f"验证码不匹配,继续等待...")
time.sleep(delay)
except Exception as e:
logger.error(f"检查邮件时出错: {str(e)}")
time.sleep(delay)
logger.error(f"等待邮件超时,未收到包含验证码 {verification_code} 的邮件")
return False, None
def test_email_system(api_base_url, smtp_host, smtp_port):
"""
测试邮件系统
参数:
api_base_url: API基础URL
smtp_host: SMTP服务器地址
smtp_port: SMTP服务器端口
返回:
(success, message)
"""
# 生成测试邮箱地址
timestamp = int(time.time())
test_email = f"test{timestamp}@nosqli.com"
from_email = "tester@example.com"
logger.info(f"开始测试 - 测试邮箱: {test_email}")
# 1. 确保邮箱存在
mailbox_exists, error = check_mailbox_exists(api_base_url, test_email)
if not mailbox_exists:
return False, f"创建邮箱失败: {error}"
# 2. 发送测试邮件
verification_code = generate_random_string()
send_success, code, error = send_test_email(smtp_host, smtp_port, from_email, test_email, verification_code)
if not send_success:
return False, f"发送邮件失败: {error}"
# 3. 等待并验证邮件接收
received, email_data = wait_for_email(api_base_url, test_email, verification_code)
if not received:
return False, "未能接收到发送的邮件"
# 4. 检查系统状态
try:
status_url = f"{api_base_url}/api/system_check"
response = requests.get(status_url)
if response.status_code == 200:
status_data = response.json()
if status_data.get('success'):
logger.info("系统状态检查通过")
else:
logger.warning(f"系统状态异常: {status_data.get('status')}")
except Exception as e:
logger.error(f"检查系统状态时出错: {str(e)}")
return True, f"测试成功完成! 邮箱 {test_email} 成功接收到包含验证码 {verification_code} 的邮件"
def main():
"""主函数"""
parser = argparse.ArgumentParser(description="邮件系统测试工具")
parser.add_argument('--api', default="http://localhost:5000", help="API基础URL")
parser.add_argument('--smtp', default="localhost", help="SMTP服务器地址")
parser.add_argument('--port', type=int, default=25, help="SMTP服务器端口")
args = parser.parse_args()
logger.info("======= 邮件系统测试开始 =======")
logger.info(f"API地址: {args.api}")
logger.info(f"SMTP服务器: {args.smtp}:{args.port}")
success, message = test_email_system(args.api, args.smtp, args.port)
if success:
logger.info(f"测试结果: 成功 - {message}")
sys.exit(0)
else:
logger.error(f"测试结果: 失败 - {message}")
sys.exit(1)
if __name__ == "__main__":
main()