添加邮件解码API和自动创建邮箱功能:1. 添加/api/email接口用于获取已解码邮件内容 2. 添加/api/add_mailbox接口用于自动创建邮箱

This commit is contained in:
huangzhenpc
2025-02-26 11:47:15 +08:00
parent d523609b12
commit 34b1047481
2 changed files with 413 additions and 0 deletions

View File

@@ -18,6 +18,8 @@ def index():
# 为避免可能的文件读取问题改为从routes.py模块中导入所有路由定义
try:
from .routes import *
# 导入解码邮件路由模块
from .decoded_email_routes import *
except Exception as e:
logging.error(f"导入API路由时出错: {str(e)}")
raise

View File

@@ -0,0 +1,411 @@
from flask import request, jsonify, current_app
import os
import email
from email import policy
import re
import base64
from datetime import datetime, timedelta
from . import api_bp
from ..models import get_session, Domain, Mailbox, Email
from ..services import get_mail_store
# 创建邮箱接口
@api_bp.route('/add_mailbox', methods=['POST', 'GET'])
def add_mailbox():
"""
创建新邮箱,如果域名不存在则自动创建
查询参数GET方式或表单参数POST方式:
- email: 邮箱地址 (例如: testaa@nosqli.com)
- description: 邮箱描述 (可选)
"""
try:
# 获取参数
if request.method == 'POST':
data = request.json or {}
email_address = data.get('email')
description = data.get('description', '')
else: # GET方式
email_address = request.args.get('email')
description = request.args.get('description', '')
# 验证邮箱地址
if not email_address or '@' not in email_address:
return jsonify({
'success': False,
'error': '无效的邮箱地址',
'message': '请提供有效的邮箱地址格式为user@domain.com'
}), 400
# 解析邮箱地址
username, domain_name = email_address.split('@', 1)
# 创建或查找域名和邮箱
db = get_session()
try:
# 查找域名
domain = db.query(Domain).filter_by(name=domain_name).first()
# 如果域名不存在,创建域名
if not domain:
current_app.logger.info(f"域名 {domain_name} 不存在,开始创建")
domain = Domain(
name=domain_name,
description=f"自动创建的域名 {domain_name}",
active=True
)
db.add(domain)
db.commit()
current_app.logger.info(f"域名 {domain_name} 创建成功ID: {domain.id}")
# 查询邮箱是否已存在
mailbox = db.query(Mailbox).filter_by(
domain_id=domain.id,
address=username
).first()
# 如果邮箱已存在,返回已存在信息
if mailbox:
return jsonify({
'success': True,
'message': f'邮箱 {email_address} 已存在',
'mailbox': {
'id': mailbox.id,
'address': mailbox.address,
'domain_id': mailbox.domain_id,
'full_address': f"{mailbox.address}@{domain_name}",
'description': mailbox.description
}
}), 200
# 创建邮箱
mailbox = Mailbox(
domain_id=domain.id,
address=username,
description=description or f"自动创建的邮箱 {email_address}"
)
db.add(mailbox)
db.commit()
# 返回成功信息
return jsonify({
'success': True,
'message': f'邮箱 {email_address} 创建成功',
'mailbox': {
'id': mailbox.id,
'address': mailbox.address,
'domain_id': mailbox.domain_id,
'full_address': f"{mailbox.address}@{domain_name}",
'description': mailbox.description
}
}), 201
finally:
db.close()
except Exception as e:
current_app.logger.error(f"创建邮箱出错: {str(e)}")
return jsonify({
'success': False,
'error': '服务器错误',
'message': str(e)
}), 500
# 简化的URL路径直接通过邮箱地址获取邮件
@api_bp.route('/email', methods=['GET'])
def get_email_by_address():
"""
通过邮箱地址获取邮件的简化URL
等同于 /decoded_emails?email={email_address}&latest=0
查询参数:
- email: 邮箱地址 (必填)
- latest: 是否只返回最新的邮件 (1表示是0表示否默认0)
"""
# 重用已有的解码邮件接口
return get_decoded_emails()
@api_bp.route('/decoded_emails', methods=['GET'])
def get_decoded_emails():
"""
获取指定邮箱地址的所有邮件,并返回解码后的内容
查询参数:
- email: 邮箱地址 (例如: testaa@nosqli.com)
- latest: 是否只返回最新的邮件 (1表示是0表示否默认0)
- limit: 返回邮件数量 (默认10)
- offset: 查询起始位置 (默认0)
"""
try:
# 获取查询参数
email_address = request.args.get('email')
latest = request.args.get('latest', '0') == '1'
limit = int(request.args.get('limit', 10))
offset = int(request.args.get('offset', 0))
# 验证邮箱地址是否有效
if not email_address or '@' not in email_address:
return jsonify({
'success': False,
'error': '无效的邮箱地址',
'message': '请提供有效的邮箱地址格式为user@domain.com'
}), 400
# 解析邮箱地址
username, domain_name = email_address.split('@', 1)
# 查询数据库
db = get_session()
try:
# 查找域名
domain = db.query(Domain).filter_by(name=domain_name).first()
if not domain:
return jsonify({
'success': False,
'error': '域名不存在',
'message': f'域名 {domain_name} 不存在'
}), 404
# 查找邮箱
mailbox = db.query(Mailbox).filter_by(
domain_id=domain.id,
address=username
).first()
if not mailbox:
return jsonify({
'success': False,
'error': '邮箱不存在',
'message': f'邮箱 {email_address} 不存在'
}), 404
# 获取邮件
query = db.query(Email).filter_by(mailbox_id=mailbox.id)
# 按接收时间排序,最新的在前
query = query.order_by(Email.received_at.desc())
# 如果只要最新的一封
if latest:
emails = query.limit(1).all()
else:
emails = query.limit(limit).offset(offset).all()
# 处理结果
result_emails = []
for email_obj in emails:
# 获取原始邮件文件路径
email_file_path = os.path.join(
current_app.config.get('MAIL_STORAGE_PATH', 'email_data'),
'emails',
f'email_{email_obj.id}.eml'
)
# 解码邮件内容
decoded_email = decode_email(email_obj, email_file_path)
result_emails.append(decoded_email)
# 返回结果
return jsonify({
'success': True,
'email_address': email_address,
'total_emails': query.count(),
'emails': result_emails
}), 200
finally:
db.close()
except Exception as e:
current_app.logger.error(f"获取解码邮件出错: {str(e)}")
return jsonify({
'success': False,
'error': '服务器错误',
'message': str(e)
}), 500
@api_bp.route('/decoded_email/<int:email_id>', methods=['GET'])
def get_decoded_email_by_id(email_id):
"""获取指定ID的解码邮件内容"""
try:
db = get_session()
try:
# 获取邮件对象
email_obj = db.query(Email).filter_by(id=email_id).first()
if not email_obj:
return jsonify({
'success': False,
'error': '邮件不存在',
'message': f'ID为{email_id}的邮件不存在'
}), 404
# 获取原始邮件文件路径
email_file_path = os.path.join(
current_app.config.get('MAIL_STORAGE_PATH', 'email_data'),
'emails',
f'email_{email_obj.id}.eml'
)
# 解码邮件内容
decoded_email = decode_email(email_obj, email_file_path)
# 返回结果
return jsonify({
'success': True,
'email': decoded_email
}), 200
finally:
db.close()
except Exception as e:
current_app.logger.error(f"获取解码邮件出错: {str(e)}")
return jsonify({
'success': False,
'error': '服务器错误',
'message': str(e)
}), 500
def decode_email(email_obj, email_file_path):
"""解析并解码邮件内容"""
# 创建基本邮件信息
result = {
'id': email_obj.id,
'subject': email_obj.subject,
'sender': email_obj.sender,
'recipients': email_obj.recipients,
'received_at': email_obj.received_at.isoformat() if email_obj.received_at else None,
'read': email_obj.read,
'has_attachments': len(email_obj.attachments) > 0 if hasattr(email_obj, 'attachments') else False
}
# 从数据库中直接获取验证码
if hasattr(email_obj, 'verification_code') and email_obj.verification_code:
result['verification_code'] = email_obj.verification_code
if hasattr(email_obj, 'verification_link') and email_obj.verification_link:
result['verification_link'] = email_obj.verification_link
# 如果邮件对象有文本内容或HTML内容直接使用
if hasattr(email_obj, 'body_text') and email_obj.body_text:
result['body_text'] = email_obj.body_text
if hasattr(email_obj, 'body_html') and email_obj.body_html:
result['body_html'] = email_obj.body_html
# 如果有原始邮件文件,尝试解析
if os.path.exists(email_file_path):
try:
# 解析.eml文件
with open(email_file_path, 'r', encoding='utf-8', errors='replace') as f:
msg = email.message_from_file(f, policy=policy.default)
# 如果没有从数据库获取到内容,尝试从文件解析
if 'body_text' not in result or 'body_html' not in result:
body_text = ""
body_html = ""
# 处理多部分邮件
if msg.is_multipart():
for part in msg.iter_parts():
content_type = part.get_content_type()
if content_type == "text/plain":
try:
body_text = part.get_content()
except Exception:
payload = part.get_payload(decode=True)
if payload:
charset = part.get_content_charset() or 'utf-8'
try:
body_text = payload.decode(charset, errors='replace')
except:
body_text = payload.decode('utf-8', errors='replace')
elif content_type == "text/html":
try:
body_html = part.get_content()
except Exception:
payload = part.get_payload(decode=True)
if payload:
charset = part.get_content_charset() or 'utf-8'
try:
body_html = payload.decode(charset, errors='replace')
except:
body_html = payload.decode('utf-8', errors='replace')
else:
# 处理单部分邮件
content_type = msg.get_content_type()
try:
if content_type == "text/plain":
body_text = msg.get_content()
elif content_type == "text/html":
body_html = msg.get_content()
except Exception:
payload = msg.get_payload(decode=True)
if payload:
charset = msg.get_content_charset() or 'utf-8'
try:
decoded = payload.decode(charset, errors='replace')
if content_type == "text/plain":
body_text = decoded
elif content_type == "text/html":
body_html = decoded
except:
pass
# 如果找到了内容,添加到结果中
if body_text and 'body_text' not in result:
result['body_text'] = body_text
if body_html and 'body_html' not in result:
result['body_html'] = body_html
# 如果仍然没有提取到验证码,尝试从内容中提取
if 'verification_code' not in result:
verification_code = extract_verification_code(result.get('body_text', ''), result.get('body_html', ''))
if verification_code:
result['verification_code'] = verification_code
except Exception as e:
current_app.logger.error(f"解析邮件文件出错: {str(e)}")
return result
def extract_verification_code(body_text, body_html):
"""从邮件内容中提取验证码"""
# 首先尝试从HTML中提取
if body_html:
# 常用的验证码模式
patterns = [
r'letter-spacing:\s*\d+px[^>]*>([^<]+)<', # 特殊样式的验证码
r'<div[^>]*>(\d{6})</div>', # 6位数字验证码在div中
r'验证码[:]\s*([A-Z0-9]{4,8})', # 中文标记的验证码
r'code[^\d]+(\d{4,8})', # 英文标记的验证码
r'\b([A-Z0-9]{6})\b' # 6位大写字母或数字
]
for pattern in patterns:
matches = re.findall(pattern, body_html)
if matches:
return matches[0].strip()
# 如果HTML中没找到尝试从纯文本中提取
if body_text:
patterns = [
r'验证码[:]\s*([A-Z0-9]{4,8})', # 中文格式
r'code[^\d]+(\d{4,8})', # 英文格式
r'\b(\d{6})\b' # 6位数字
]
for pattern in patterns:
matches = re.findall(pattern, body_text)
if matches:
return matches[0].strip()
return None