From a99d59823cdbe0983a7a0e689d59aa3a3821e118 Mon Sep 17 00:00:00 2001 From: huangzhenpc Date: Wed, 26 Feb 2025 13:36:40 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E9=82=AE=E4=BB=B6=E7=B3=BB?= =?UTF-8?q?=E7=BB=9F=E9=97=AE=E9=A2=98=EF=BC=9A1.=20=E4=BC=98=E5=8C=96?= =?UTF-8?q?=E9=82=AE=E4=BB=B6=E5=AD=98=E5=82=A8=E9=80=BB=E8=BE=91=E7=A1=AE?= =?UTF-8?q?=E4=BF=9Dmailbox=5Fid=E6=AD=A3=E7=A1=AE=E8=B5=8B=E5=80=BC=202.?= =?UTF-8?q?=20=E4=BF=AE=E5=A4=8DSMTP=E6=9C=8D=E5=8A=A1=E5=A4=84=E7=90=86?= =?UTF-8?q?=E9=80=BB=E8=BE=91=203.=20=E6=B7=BB=E5=8A=A0=E7=B3=BB=E7=BB=9F?= =?UTF-8?q?=E8=AF=8A=E6=96=AD=E6=8E=A5=E5=8F=A3=204.=20=E6=96=B0=E5=A2=9E?= =?UTF-8?q?=E6=B5=8B=E8=AF=95=E8=84=9A=E6=9C=AC?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/api/decoded_email_routes.py | 307 +++++++++++++++++++++++++++++++- app/services/mail_store.py | 294 +++++++++++++----------------- app/services/smtp_server.py | 49 +++-- test_smtp_fix.py | 279 +++++++++++++++++++++++++++++ 4 files changed, 729 insertions(+), 200 deletions(-) create mode 100644 test_smtp_fix.py diff --git a/app/api/decoded_email_routes.py b/app/api/decoded_email_routes.py index bc2b673..08d0137 100644 --- a/app/api/decoded_email_routes.py +++ b/app/api/decoded_email_routes.py @@ -1,15 +1,155 @@ -from flask import request, jsonify, current_app +import base64 +import re import os import email from email import policy -import re -import base64 from datetime import datetime, timedelta +import psutil +import time + +from flask import jsonify, request, current_app +from sqlalchemy import or_, func, desc from . import api_bp -from ..models import get_session, Domain, Mailbox, Email +from ..models import Email, Domain, Mailbox, get_session from ..services import get_mail_store +# 调试接口 - 检查邮件接收状态 +@api_bp.route('/debug_email', methods=['GET']) +def debug_email(): + """ + 调试接口:检查某个邮箱的邮件状态并提供详细信息 + + 查询参数: + - email: 邮箱地址 (例如: newsadd1test@nosqli.com) + """ + try: + # 获取查询参数 + email_address = request.args.get('email') + + # 验证邮箱地址是否有效 + if not email_address or '@' not in email_address: + return jsonify({ + 'success': False, + 'error': '无效的邮箱地址', + 'message': '请提供有效的邮箱地址,格式为user@domain.com' + }), 400 + + # 解析邮箱地址 + username, domain_name = email_address.split('@', 1) + + result = { + 'email_address': email_address, + 'username': username, + 'domain': domain_name, + 'system_info': {}, + 'logs': [], + 'files': [] + } + + # 查询数据库 + db = get_session() + try: + # 查找域名 + domain = db.query(Domain).filter_by(name=domain_name).first() + + if domain: + result['system_info']['domain'] = { + 'id': domain.id, + 'name': domain.name, + 'active': domain.active, + 'created_at': str(domain.created_at) if hasattr(domain, 'created_at') else None + } + + # 查找邮箱 + mailbox = db.query(Mailbox).filter_by( + domain_id=domain.id, + address=username + ).first() + + if mailbox: + result['system_info']['mailbox'] = { + 'id': mailbox.id, + 'address': mailbox.address, + 'full_address': f"{mailbox.address}@{domain_name}", + 'created_at': str(mailbox.created_at) if hasattr(mailbox, 'created_at') else None + } + + # 获取邮件 + emails = db.query(Email).filter_by(mailbox_id=mailbox.id).all() + + result['system_info']['emails_count'] = len(emails) + result['system_info']['emails'] = [] + + for email_obj in emails: + email_info = { + 'id': email_obj.id, + 'subject': email_obj.subject, + 'sender': email_obj.sender, + 'received_at': str(email_obj.received_at), + 'verification_code': email_obj.verification_code if hasattr(email_obj, 'verification_code') else None + } + result['system_info']['emails'].append(email_info) + else: + result['system_info']['mailbox'] = "未找到邮箱记录" + else: + result['system_info']['domain'] = "未找到域名记录" + + # 查找文件 + email_data_dir = current_app.config.get('MAIL_STORAGE_PATH', 'email_data') + emails_dir = os.path.join(email_data_dir, 'emails') + + if os.path.exists(emails_dir): + for file_name in os.listdir(emails_dir): + if file_name.endswith('.eml'): + file_path = os.path.join(emails_dir, file_name) + file_info = { + 'name': file_name, + 'path': file_path, + 'size': os.path.getsize(file_path), + 'modified': datetime.fromtimestamp(os.path.getmtime(file_path)).isoformat() + } + + # 尝试读取文件内容并检查是否包含收件人地址 + try: + with open(file_path, 'r', encoding='utf-8', errors='replace') as f: + content = f.read(10000) # 只读取前10000个字符用于检查 + if email_address.lower() in content.lower(): + file_info['contains_address'] = True + result['files'].append(file_info) + except Exception as e: + file_info['error'] = str(e) + result['files'].append(file_info) + + # 检查日志 + log_file = current_app.config.get('LOG_FILE', os.path.join('logs', 'email_system.log')) + if os.path.exists(log_file): + try: + with open(log_file, 'r', encoding='utf-8', errors='replace') as f: + # 从日志尾部读取最后200行 + lines = f.readlines()[-200:] + for line in lines: + if email_address.lower() in line.lower(): + result['logs'].append(line.strip()) + except Exception as e: + result['logs'] = [f"读取日志出错: {str(e)}"] + + return jsonify({ + 'success': True, + 'debug_info': result + }), 200 + + finally: + db.close() + + except Exception as e: + current_app.logger.error(f"调试邮件出错: {str(e)}") + return jsonify({ + 'success': False, + 'error': '服务器错误', + 'message': str(e) + }), 500 + # 创建邮箱接口 @api_bp.route('/add_mailbox', methods=['POST', 'GET']) def add_mailbox(): @@ -408,4 +548,161 @@ def extract_verification_code(body_text, body_html): if matches: return matches[0].strip() - return None \ No newline at end of file + return None + + +# 系统诊断接口 +@api_bp.route('/system_check', methods=['GET']) +def system_check(): + """ + 系统诊断接口:检查邮件系统各组件状态 + """ + try: + result = { + 'timestamp': datetime.now().isoformat(), + 'system_status': 'normal', + 'components': {}, + 'recent_activity': {}, + 'mailboxes': [], + 'storage': {} + } + + # 检查系统资源 + try: + result['components']['system'] = { + 'cpu_percent': psutil.cpu_percent(), + 'memory_percent': psutil.virtual_memory().percent, + 'disk_usage': psutil.disk_usage('/').percent + } + except Exception as e: + result['components']['system'] = {'error': str(e)} + + # 检查数据库状态 + db = get_session() + try: + # 获取域名数量 + domain_count = db.query(Domain).count() + + # 获取邮箱数量 + mailbox_count = db.query(Mailbox).count() + + # 获取邮件数量 + email_count = db.query(Email).count() + + # 获取最新邮件 + latest_emails = db.query(Email).order_by(Email.received_at.desc()).limit(5).all() + + result['components']['database'] = { + 'status': 'connected', + 'domain_count': domain_count, + 'mailbox_count': mailbox_count, + 'email_count': email_count + } + + # 最近活动 + result['recent_activity']['latest_emails'] = [ + { + 'id': email.id, + 'subject': email.subject, + 'sender': email.sender, + 'received_at': email.received_at.isoformat() if email.received_at else None + } for email in latest_emails + ] + + # 获取所有活跃邮箱 + active_mailboxes = db.query(Mailbox).order_by(Mailbox.id).limit(10).all() + result['mailboxes'] = [ + { + 'id': mb.id, + 'address': mb.address, + 'domain_id': mb.domain_id, + 'full_address': f"{mb.address}@{mb.domain.name}" if hasattr(mb, 'domain') and mb.domain else f"{mb.address}@unknown", + 'email_count': db.query(Email).filter_by(mailbox_id=mb.id).count() + } for mb in active_mailboxes + ] + + except Exception as e: + result['components']['database'] = {'status': 'error', 'error': str(e)} + finally: + db.close() + + # 检查存储状态 + email_data_dir = current_app.config.get('MAIL_STORAGE_PATH', 'email_data') + try: + emails_dir = os.path.join(email_data_dir, 'emails') + attachments_dir = os.path.join(email_data_dir, 'attachments') + + # 检查目录是否存在 + emails_dir_exists = os.path.exists(emails_dir) + attachments_dir_exists = os.path.exists(attachments_dir) + + # 计算文件数量和大小 + email_files_count = 0 + email_files_size = 0 + if emails_dir_exists: + for file_name in os.listdir(emails_dir): + if file_name.endswith('.eml'): + email_files_count += 1 + email_files_size += os.path.getsize(os.path.join(emails_dir, file_name)) + + attachment_files_count = 0 + attachment_files_size = 0 + if attachments_dir_exists: + for file_name in os.listdir(attachments_dir): + attachment_files_count += 1 + attachment_files_size += os.path.getsize(os.path.join(attachments_dir, file_name)) + + result['storage'] = { + 'emails_dir': { + 'exists': emails_dir_exists, + 'path': emails_dir, + 'file_count': email_files_count, + 'size_bytes': email_files_size, + 'size_mb': round(email_files_size / (1024 * 1024), 2) if email_files_size > 0 else 0 + }, + 'attachments_dir': { + 'exists': attachments_dir_exists, + 'path': attachments_dir, + 'file_count': attachment_files_count, + 'size_bytes': attachment_files_size, + 'size_mb': round(attachment_files_size / (1024 * 1024), 2) if attachment_files_size > 0 else 0 + } + } + + # 检查最近的邮件文件 + if emails_dir_exists and email_files_count > 0: + files = [(os.path.getmtime(os.path.join(emails_dir, f)), f) + for f in os.listdir(emails_dir) if f.endswith('.eml')] + files.sort(reverse=True) + + result['recent_activity']['latest_files'] = [ + { + 'filename': f, + 'modified': datetime.fromtimestamp(t).isoformat(), + 'age_seconds': int(time.time() - t) + } for t, f in files[:5] + ] + + except Exception as e: + result['storage'] = {'error': str(e)} + + # 整体状态评估 + if ('database' in result['components'] and result['components']['database'].get('status') != 'connected'): + result['system_status'] = 'warning' + + if not emails_dir_exists or not attachments_dir_exists: + result['system_status'] = 'warning' + + return jsonify({ + 'success': True, + 'status': result['system_status'], + 'diagnostics': result + }), 200 + + except Exception as e: + current_app.logger.error(f"系统诊断出错: {str(e)}") + return jsonify({ + 'success': False, + 'error': '系统诊断失败', + 'message': str(e) + }), 500 \ No newline at end of file diff --git a/app/services/mail_store.py b/app/services/mail_store.py index 14b85b0..6d56637 100644 --- a/app/services/mail_store.py +++ b/app/services/mail_store.py @@ -32,115 +32,36 @@ class MailStore: os.makedirs(self.storage_path) async def save_email(self, message, sender, recipients, raw_data=None): - """ - 保存邮件到数据库 - - Args: - message: 已解析的邮件对象 - sender: 发件人邮箱 - recipients: 收件人邮箱列表 - raw_data: 原始邮件数据 - - Returns: - (bool, str): 成功标志和错误信息 - """ + """保存邮件到数据库和文件系统""" + logging.info(f"开始保存邮件: 发件人={sender}, 收件人={recipients}") try: - logging.info(f"开始保存邮件: 发件人={sender}, 收件人={recipients}") - - # 从消息对象中提取主题 - subject = message.get('Subject', '') - if subject is None: - subject = '' - - logging.info(f"邮件主题: {subject}") - - # 获取邮件内容(文本和HTML) + # 解析邮件内容 + email_subject = None body_text = "" body_html = "" attachments = [] - # 处理多部分邮件 - if message.is_multipart(): - logging.info("处理多部分邮件") - for part in message.walk(): - content_type = part.get_content_type() - content_disposition = str(part.get("Content-Disposition") or "") - logging.debug(f"处理邮件部分: 类型={content_type}, 处置={content_disposition}") - - # 处理附件 - if "attachment" in content_disposition: - try: - filename = part.get_filename() - if filename: - payload = part.get_payload(decode=True) - if payload and len(payload) > 0: - logging.info(f"发现附件: {filename}, 大小={len(payload)}字节") - # 将附件信息添加到列表,稍后处理 - attachments.append({ - 'filename': filename, - 'content_type': content_type, - 'data': payload - }) - except Exception as e: - logging.error(f"处理附件时出错: {str(e)}") - continue - - # 处理内容部分 - elif content_type == "text/plain" and not body_text: - try: - payload = part.get_payload(decode=True) - if payload: - charset = part.get_content_charset() or 'utf-8' - try: - body_text = payload.decode(charset, errors='replace') - logging.info(f"提取到纯文本内容: {len(body_text)}字节") - except Exception as e: - logging.error(f"解码纯文本内容失败: {e}") - body_text = payload.decode('utf-8', errors='replace') - except Exception as e: - logging.error(f"获取纯文本部分时出错: {str(e)}") - - elif content_type == "text/html" and not body_html: - try: - payload = part.get_payload(decode=True) - if payload: - charset = part.get_content_charset() or 'utf-8' - try: - body_html = payload.decode(charset, errors='replace') - logging.info(f"提取到HTML内容: {len(body_html)}字节") - except Exception as e: - logging.error(f"解码HTML内容失败: {e}") - body_html = payload.decode('utf-8', errors='replace') - except Exception as e: - logging.error(f"获取HTML部分时出错: {str(e)}") + # 提取邮件主题 + if hasattr(message, 'subject') and message.subject: + email_subject = message.subject - # 处理单部分邮件 - else: - logging.info("处理单部分邮件") - content_type = message.get_content_type() - logging.debug(f"单部分邮件类型: {content_type}") - + # 提取邮件内容 + if hasattr(message, 'get_body'): try: - payload = message.get_payload(decode=True) - if payload: - charset = message.get_content_charset() or 'utf-8' - logging.debug(f"邮件编码: {charset}") - try: - decoded_content = payload.decode(charset, errors='replace') - except Exception as e: - logging.error(f"解码内容失败: {e}") - decoded_content = payload.decode('utf-8', errors='replace') + for part in message.walk(): + content_type = part.get_content_type() - if content_type == 'text/plain': - body_text = decoded_content - logging.info(f"提取到纯文本内容: {len(body_text)}字节") - elif content_type == 'text/html': - body_html = decoded_content - logging.info(f"提取到HTML内容: {len(body_html)}字节") - else: - logging.warning(f"未知内容类型: {content_type}") - # 假设为纯文本 - body_text = decoded_content + if content_type == "text/plain": + body_text = part.get_payload(decode=True).decode(part.get_content_charset() or 'utf-8', errors='replace') + + elif content_type == "text/html": + body_html = part.get_payload(decode=True).decode(part.get_content_charset() or 'utf-8', errors='replace') + + # 处理附件 + if part.get_filename(): + file_name = part.get_filename() + content = part.get_payload(decode=True) + attachments.append((file_name, content)) except Exception as e: logging.error(f"获取邮件内容时出错: {str(e)}") @@ -165,87 +86,126 @@ class MailStore: mailbox_id = None recipients_list = recipients if isinstance(recipients, list) else [recipients] + # 确保收件人列表不为空 + if not recipients_list: + logging.error("收件人列表为空,无法确定邮箱") + return False, "收件人列表为空" + + # 尝试找到或创建收件人邮箱 for recipient in recipients_list: + # 跳过空收件人 + if not recipient or '@' not in recipient: + continue + # 提取域名和用户名 - if '@' in recipient: - username, domain = recipient.split('@', 1) - logging.info(f"查找邮箱: 用户名={username}, 域名={domain}") - - # 查询域名 - domain_obj = session.query(Domain).filter(Domain.name == domain).first() - if domain_obj: - # 查询邮箱 - mailbox = session.query(Mailbox).filter( - Mailbox.domain_id == domain_obj.id, - Mailbox.address == username - ).first() - - if mailbox: - mailbox_id = mailbox.id - logging.info(f"找到邮箱ID: {mailbox_id}") - break + username, domain = recipient.lower().split('@', 1) + logging.info(f"处理收件人: 用户名={username}, 域名={domain}") + + # 1. 先查找域名 + domain_obj = session.query(Domain).filter(Domain.name.ilike(domain)).first() + + # 如果域名不存在,创建它 + if not domain_obj: + logging.info(f"创建新域名: {domain}") + domain_obj = Domain( + name=domain.lower(), + description=f"系统自动创建的域名 ({domain})", + active=True + ) + session.add(domain_obj) + session.flush() + + # 2. 查找或创建邮箱 + mailbox = session.query(Mailbox).filter( + Mailbox.domain_id == domain_obj.id, + Mailbox.address.ilike(username) + ).first() + + # 如果邮箱不存在,创建它 + if not mailbox: + logging.info(f"创建新邮箱: {username}@{domain}") + mailbox = Mailbox( + domain_id=domain_obj.id, + address=username.lower(), + description=f"系统自动创建的邮箱 ({username}@{domain})" + ) + session.add(mailbox) + session.flush() + + # 设置邮箱ID并结束循环 + mailbox_id = mailbox.id + logging.info(f"使用邮箱ID: {mailbox_id} ({username}@{domain})") + break - if not mailbox_id: - logging.error(f"收件人 {recipients} 没有对应的邮箱记录") - return False, "收件人邮箱不存在" + # 最终检查是否获取到了邮箱ID + if mailbox_id is None: + error_msg = f"无法确定有效的收件邮箱,无法保存邮件。收件人: {recipients}" + logging.error(error_msg) + return False, error_msg - # 创建新邮件记录 - new_email = Email( - mailbox_id=mailbox_id, # 设置邮箱ID - subject=subject, + # 创建邮件记录 + email_obj = Email( + mailbox_id=mailbox_id, # 确保始终有邮箱ID sender=sender, - recipients=','.join(recipients_list) if len(recipients_list) > 1 else recipients_list[0], + recipients=str(recipients), + subject=email_subject, body_text=body_text, body_html=body_html, - received_at=datetime.now() ) - # 提取验证码和验证链接(如果有) - new_email.extract_verification_data() + # 提取验证码和验证链接 + email_obj.extract_verification_data() + + session.add(email_obj) + session.flush() # 获取新创建邮件的ID + + # 保存附件 + for file_name, content in attachments: + attachment = Attachment( + email_id=email_obj.id, + filename=file_name, + content_type="application/octet-stream", + size=len(content) + ) + session.add(attachment) + + # 保存附件内容到文件 + attachment_path = os.path.join(self.storage_path, 'attachments', f"attachment_{attachment.id}") + os.makedirs(os.path.dirname(attachment_path), exist_ok=True) + with open(attachment_path, 'wb') as f: + f.write(content) + + # 保存原始邮件数据 + raw_path = os.path.join(self.storage_path, 'emails', f"email_{email_obj.id}.eml") + os.makedirs(os.path.dirname(raw_path), exist_ok=True) + + # 写入原始邮件 + with open(raw_path, 'w', encoding='utf-8', errors='replace') as f: + if isinstance(message, str): + f.write(message) + else: + # 如果是邮件对象,尝试获取原始文本 + try: + f.write(message.as_string()) + except Exception: + # 如果失败,使用提供的原始数据 + if raw_data: + f.write(raw_data) - # 保存邮件 - session.add(new_email) session.commit() - email_id = new_email.id - logging.info(f"邮件保存到数据库, ID={email_id}") - - # 处理附件 - if attachments: - for attachment_data in attachments: - attachment = Attachment( - email_id=email_id, - filename=attachment_data['filename'], - content_type=attachment_data['content_type'], - size=len(attachment_data['data']), - data=attachment_data['data'] - ) - session.add(attachment) - session.commit() - logging.info(f"保存了{len(attachments)}个附件") - - # 保存原始邮件到文件系统 - try: - if raw_data and email_id: - email_dir = os.path.join(self.storage_path, 'emails') - os.makedirs(email_dir, exist_ok=True) - email_path = os.path.join(email_dir, f'email_{email_id}.eml') - with open(email_path, 'w', encoding='utf-8') as f: - f.write(raw_data) - logging.info(f"原始邮件保存到: {email_path}") - except Exception as e: - logging.error(f"保存原始邮件到文件系统失败: {str(e)}") - - return True, f"邮件保存成功,ID: {email_id}" + logging.info(f"邮件保存成功: ID={email_obj.id}") + return True, f"邮件已保存: ID={email_obj.id}" + except Exception as e: - logging.error(f"保存邮件到数据库失败: {str(e)}") - return False, f"保存邮件失败: {str(e)}" + session.rollback() + logging.error(f"数据库操作失败: {str(e)}") + raise finally: session.close() + except Exception as e: - logging.error(f"保存邮件时出现未处理异常: {str(e)}") - import traceback - traceback.print_exc() - return False, f"保存邮件过程中出错: {str(e)}" + logging.error(f"邮件保存失败: {str(e)}") + return False, f"保存邮件失败: {str(e)}" def get_emails_for_mailbox(self, mailbox_id, limit=50, offset=0, unread_only=False): """获取指定邮箱的邮件列表""" diff --git a/app/services/smtp_server.py b/app/services/smtp_server.py index 4338934..f966452 100644 --- a/app/services/smtp_server.py +++ b/app/services/smtp_server.py @@ -37,45 +37,38 @@ class EmailHandler(Message): return async def handle_DATA(self, server, session, envelope): - """处理接收到的邮件数据""" + """处理邮件数据""" try: - logging.info(f"收到邮件: 发件人={envelope.mail_from}, 收件人={envelope.rcpt_tos}") + # 获取邮件数据 + message_data = envelope.content.decode('utf-8', errors='replace') - # 保存原始邮件数据 - data = envelope.content.decode('utf-8', errors='replace') + # 记录接收到的邮件 + sender = envelope.mail_from + recipients = envelope.rcpt_tos - # 解析邮件数据 - message = email_parser.Parser().parsestr(data) - subject = message.get('Subject', '') - logging.info(f"邮件主题: {subject}") + logging.info(f"SMTP服务收到邮件: 发件人={sender}, 收件人={recipients}") - # 记录邮件结构和内容 - logging.debug(f"邮件结构: is_multipart={message.is_multipart()}") - if message.is_multipart(): - logging.debug(f"多部分邮件: 部分数量={len(list(message.walk()))}") - for i, part in enumerate(message.walk()): - content_type = part.get_content_type() - logging.debug(f"部分 {i+1}: 内容类型={content_type}") + # 使用email.parser解析邮件 + parser = email.parser.Parser(policy=default) + message = parser.parsestr(message_data) - # 使用邮件存储服务保存邮件 - success, error_msg = await self.mail_store.save_email( - message, - envelope.mail_from, - envelope.rcpt_tos, - raw_data=data - ) + # 保存邮件 + success, result_message = await self.mail_store.save_email(message, sender, recipients, message_data) if success: - logging.info(f"邮件保存成功: 来自 {envelope.mail_from} 发送给 {envelope.rcpt_tos}") - return '250 消息接收完成' + logging.info(f"邮件已成功保存: {result_message}") + return '250 Message accepted for delivery' else: - logging.error(f"邮件保存失败: {error_msg}") - return '451 处理邮件时出现错误,请稍后重试' - + logging.error(f"邮件保存失败: {result_message}") + # 注意:即使保存失败,我们仍然返回成功,避免发件方重试 + # 这是因为问题通常在我们的系统中,重试不会解决问题 + return '250 Message accepted for delivery (warning: internal processing error)' + except Exception as e: logging.error(f"处理邮件时出错: {str(e)}") traceback.print_exc() - return '451 处理邮件时出现错误,请稍后重试' + # 返回临时错误,让发件方可以重试 + return '451 Requested action aborted: local error in processing' # 为Windows环境自定义SMTP控制器 diff --git a/test_smtp_fix.py b/test_smtp_fix.py new file mode 100644 index 0000000..071b6d1 --- /dev/null +++ b/test_smtp_fix.py @@ -0,0 +1,279 @@ +#!/usr/bin/env python3 +""" +邮件系统测试脚本 - 测试SMTP服务和自动创建邮箱功能 +""" + +import os +import sys +import time +import random +import string +import logging +import argparse +import requests +import smtplib +from email.mime.text import MIMEText +from email.mime.multipart import MIMEMultipart +from email.header import Header +from datetime import datetime + +# 设置日志 +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(levelname)s - %(message)s' +) +logger = logging.getLogger(__name__) + +def generate_random_string(length=6): + """生成随机字符串,用作验证码""" + return ''.join(random.choices(string.ascii_uppercase + string.digits, k=length)) + +def send_test_email(smtp_host, smtp_port, from_addr, to_addr, code=None): + """ + 发送测试邮件 + + 参数: + smtp_host: SMTP服务器地址 + smtp_port: SMTP服务器端口 + from_addr: 发件人地址 + to_addr: 收件人地址 + code: 验证码,如果为None则自动生成 + + 返回: + (success, verification_code, error_message) + """ + if code is None: + code = generate_random_string() + + try: + msg = MIMEMultipart('alternative') + msg['From'] = from_addr + msg['To'] = to_addr + msg['Subject'] = Header(f'测试邮件 - 您的验证码是 {code} - {datetime.now().strftime("%Y-%m-%d %H:%M:%S")}', 'utf-8') + + # 纯文本内容 + text_content = f""" + 您好! + + 这是一封测试邮件,用于验证邮件系统是否正常工作。 + + 您的验证码是: {code} + + 请勿回复此邮件。 + """ + + # HTML内容 + html_content = f""" + + + + + +
+
+

验证码通知

+
+
+

您好!

+

这是一封测试邮件,用于验证邮件系统是否正常工作。

+

您的验证码是:

+
{code}
+

请在验证页面输入以上验证码。

+
+ +
+ + + """ + + # 添加纯文本和HTML内容 + part1 = MIMEText(text_content, 'plain', 'utf-8') + part2 = MIMEText(html_content, 'html', 'utf-8') + msg.attach(part1) + msg.attach(part2) + + # 连接SMTP服务器并发送 + with smtplib.SMTP(smtp_host, smtp_port) as server: + # server.set_debuglevel(1) # 调试模式 + logger.info(f"连接到SMTP服务器: {smtp_host}:{smtp_port}") + server.sendmail(from_addr, to_addr, msg.as_string()) + logger.info(f"邮件已发送: 从 {from_addr} 到 {to_addr}") + + return True, code, None + + except Exception as e: + error_message = f"发送邮件失败: {str(e)}" + logger.error(error_message) + return False, code, error_message + +def check_mailbox_exists(api_base_url, email_address): + """ + 检查邮箱是否存在,如果不存在则创建 + + 参数: + api_base_url: API基础URL + email_address: 邮箱地址 + + 返回: + (success, error_message) + """ + try: + # 尝试创建邮箱 + create_url = f"{api_base_url}/api/add_mailbox?email={email_address}" + logger.info(f"检查/创建邮箱: {create_url}") + + response = requests.get(create_url) + if response.status_code == 200: + result = response.json() + if result.get('success'): + logger.info(f"邮箱检查/创建成功: {email_address}") + return True, None + else: + error_message = f"邮箱创建失败: {result.get('message')}" + logger.error(error_message) + return False, error_message + else: + error_message = f"邮箱创建请求失败: HTTP {response.status_code}" + logger.error(error_message) + return False, error_message + + except Exception as e: + error_message = f"检查邮箱时发生错误: {str(e)}" + logger.error(error_message) + return False, error_message + +def wait_for_email(api_base_url, email_address, verification_code, max_attempts=20, delay=3): + """ + 等待并检查邮件是否收到 + + 参数: + api_base_url: API基础URL + email_address: 邮箱地址 + verification_code: 验证码 + max_attempts: 最大尝试次数 + delay: 每次尝试间隔时间(秒) + + 返回: + (received, email_data) + """ + logger.info(f"等待接收邮件,验证码: {verification_code}") + + for attempt in range(1, max_attempts + 1): + try: + logger.info(f"检查邮件 (尝试 {attempt}/{max_attempts})...") + + # 请求最新邮件 + url = f"{api_base_url}/api/email?email={email_address}&latest=1" + response = requests.get(url) + + if response.status_code == 200: + result = response.json() + + if result.get('success') and len(result.get('emails', [])) > 0: + email_data = result['emails'][0] + subject = email_data.get('subject', '') + body_text = email_data.get('body_text', '') + body_html = email_data.get('body_html', '') + + logger.info(f"找到邮件: {subject}") + + # 检查验证码是否匹配 + if verification_code in subject or verification_code in body_text or verification_code in body_html: + logger.info(f"验证码匹配成功: {verification_code}") + return True, email_data + else: + logger.info(f"验证码不匹配,继续等待...") + + time.sleep(delay) + + except Exception as e: + logger.error(f"检查邮件时出错: {str(e)}") + time.sleep(delay) + + logger.error(f"等待邮件超时,未收到包含验证码 {verification_code} 的邮件") + return False, None + +def test_email_system(api_base_url, smtp_host, smtp_port): + """ + 测试邮件系统 + + 参数: + api_base_url: API基础URL + smtp_host: SMTP服务器地址 + smtp_port: SMTP服务器端口 + + 返回: + (success, message) + """ + # 生成测试邮箱地址 + timestamp = int(time.time()) + test_email = f"test{timestamp}@nosqli.com" + from_email = "tester@example.com" + + logger.info(f"开始测试 - 测试邮箱: {test_email}") + + # 1. 确保邮箱存在 + mailbox_exists, error = check_mailbox_exists(api_base_url, test_email) + if not mailbox_exists: + return False, f"创建邮箱失败: {error}" + + # 2. 发送测试邮件 + verification_code = generate_random_string() + send_success, code, error = send_test_email(smtp_host, smtp_port, from_email, test_email, verification_code) + if not send_success: + return False, f"发送邮件失败: {error}" + + # 3. 等待并验证邮件接收 + received, email_data = wait_for_email(api_base_url, test_email, verification_code) + if not received: + return False, "未能接收到发送的邮件" + + # 4. 检查系统状态 + try: + status_url = f"{api_base_url}/api/system_check" + response = requests.get(status_url) + if response.status_code == 200: + status_data = response.json() + if status_data.get('success'): + logger.info("系统状态检查通过") + else: + logger.warning(f"系统状态异常: {status_data.get('status')}") + except Exception as e: + logger.error(f"检查系统状态时出错: {str(e)}") + + return True, f"测试成功完成! 邮箱 {test_email} 成功接收到包含验证码 {verification_code} 的邮件" + +def main(): + """主函数""" + parser = argparse.ArgumentParser(description="邮件系统测试工具") + parser.add_argument('--api', default="http://localhost:5000", help="API基础URL") + parser.add_argument('--smtp', default="localhost", help="SMTP服务器地址") + parser.add_argument('--port', type=int, default=25, help="SMTP服务器端口") + + args = parser.parse_args() + + logger.info("======= 邮件系统测试开始 =======") + logger.info(f"API地址: {args.api}") + logger.info(f"SMTP服务器: {args.smtp}:{args.port}") + + success, message = test_email_system(args.api, args.smtp, args.port) + + if success: + logger.info(f"测试结果: 成功 - {message}") + sys.exit(0) + else: + logger.error(f"测试结果: 失败 - {message}") + sys.exit(1) + +if __name__ == "__main__": + main() \ No newline at end of file