Files
emailsystem/app/api/decoded_email_routes.py

708 lines
27 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import base64
import re
import os
import email
from email import policy
from datetime import datetime, timedelta
import psutil
import time
from flask import jsonify, request, current_app
from sqlalchemy import or_, func, desc
from . import api_bp
from ..models import Email, Domain, Mailbox, get_session
from ..services import get_mail_store
# Debug endpoint - inspect the mail-reception state of a single mailbox
@api_bp.route('/debug_email', methods=['GET'])
def debug_email():
    """
    Debug endpoint: collect what the system knows about one mailbox.

    Query parameters:
        email: full mailbox address (e.g. newsadd1test@nosqli.com)

    The ``debug_info`` object of a successful response contains:
        - system_info: the matching Domain and Mailbox rows plus the mailbox's
          emails, or a "not found" message where a row is missing
        - files: ``.eml`` files under ``<MAIL_STORAGE_PATH>/emails`` whose first
          10000 characters mention the address, plus files that could not be read
        - logs: lines among the last 200 of the log file that mention the address

    Returns:
        (JSON response, status): 200 on success, 400 for a missing or malformed
        address, 500 on any unexpected error.
    """
    # Function-scope import so this block does not depend on the file header.
    from collections import deque

    try:
        email_address = request.args.get('email')
        if not email_address or '@' not in email_address:
            return jsonify({
                'success': False,
                'error': '无效的邮箱地址',
                'message': '请提供有效的邮箱地址格式为user@domain.com'
            }), 400

        username, domain_name = email_address.split('@', 1)
        # File and log matching is case-insensitive; lower-case the needle once
        # instead of once per file / per log line.
        needle = email_address.lower()
        result = {
            'email_address': email_address,
            'username': username,
            'domain': domain_name,
            'system_info': {},
            'logs': [],
            'files': []
        }
        system_info = result['system_info']

        db = get_session()
        try:
            # --- database records -------------------------------------------
            domain = db.query(Domain).filter_by(name=domain_name).first()
            if domain:
                system_info['domain'] = {
                    'id': domain.id,
                    'name': domain.name,
                    'active': domain.active,
                    'created_at': str(domain.created_at) if hasattr(domain, 'created_at') else None
                }
                mailbox = db.query(Mailbox).filter_by(
                    domain_id=domain.id,
                    address=username
                ).first()
                if mailbox:
                    system_info['mailbox'] = {
                        'id': mailbox.id,
                        'address': mailbox.address,
                        'full_address': f"{mailbox.address}@{domain_name}",
                        'created_at': str(mailbox.created_at) if hasattr(mailbox, 'created_at') else None
                    }
                    emails = db.query(Email).filter_by(mailbox_id=mailbox.id).all()
                    system_info['emails_count'] = len(emails)
                    system_info['emails'] = [
                        {
                            'id': email_obj.id,
                            'subject': email_obj.subject,
                            'sender': email_obj.sender,
                            'received_at': str(email_obj.received_at),
                            'verification_code': email_obj.verification_code if hasattr(email_obj, 'verification_code') else None
                        }
                        for email_obj in emails
                    ]
                else:
                    system_info['mailbox'] = "未找到邮箱记录"
            else:
                system_info['domain'] = "未找到域名记录"

            # --- raw .eml files ---------------------------------------------
            email_data_dir = current_app.config.get('MAIL_STORAGE_PATH', 'email_data')
            emails_dir = os.path.join(email_data_dir, 'emails')
            if os.path.exists(emails_dir):
                for file_name in os.listdir(emails_dir):
                    if not file_name.endswith('.eml'):
                        continue
                    file_path = os.path.join(emails_dir, file_name)
                    file_info = {
                        'name': file_name,
                        'path': file_path,
                        'size': os.path.getsize(file_path),
                        'modified': datetime.fromtimestamp(os.path.getmtime(file_path)).isoformat()
                    }
                    try:
                        with open(file_path, 'r', encoding='utf-8', errors='replace') as f:
                            # Only the head of the file is inspected; the
                            # recipient headers are expected to be there.
                            content = f.read(10000)
                        if needle in content.lower():
                            file_info['contains_address'] = True
                            result['files'].append(file_info)
                    except Exception as e:
                        # Unreadable files are still reported, with the reason.
                        file_info['error'] = str(e)
                        result['files'].append(file_info)

            # --- log file ---------------------------------------------------
            log_file = current_app.config.get('LOG_FILE', os.path.join('logs', 'email_system.log'))
            if os.path.exists(log_file):
                try:
                    with open(log_file, 'r', encoding='utf-8', errors='replace') as f:
                        # A bounded deque keeps only the last 200 lines while
                        # streaming, instead of loading the entire log.
                        tail = deque(f, maxlen=200)
                    for line in tail:
                        if needle in line.lower():
                            result['logs'].append(line.strip())
                except Exception as e:
                    result['logs'] = [f"读取日志出错: {str(e)}"]

            return jsonify({
                'success': True,
                'debug_info': result
            }), 200
        finally:
            db.close()
    except Exception as e:
        # logger.exception logs at ERROR level and attaches the traceback.
        current_app.logger.exception(f"调试邮件出错: {str(e)}")
        return jsonify({
            'success': False,
            'error': '服务器错误',
            'message': str(e)
        }), 500
# Mailbox creation endpoint
@api_bp.route('/add_mailbox', methods=['POST', 'GET'])
def add_mailbox():
    """
    Create a mailbox, creating its domain first when the domain is unknown.

    Parameters (query string for GET; JSON object or form fields for POST):
        email: mailbox address (e.g. testaa@nosqli.com)
        description: optional mailbox description

    Returns:
        (JSON response, status):
        - 201 with the new mailbox when it was created
        - 200 with the existing mailbox when the address is already registered
        - 400 when the address is missing, not a string, or lacks a local part
          or a domain part
        - 500 on any unexpected error
    """
    try:
        if request.method == 'POST':
            # silent=True returns None for a missing or non-JSON body instead
            # of raising, so form-encoded posts fall through to request.form.
            data = request.get_json(silent=True)
            if not isinstance(data, dict):
                data = request.form
            email_address = data.get('email')
            description = data.get('description', '')
        else:
            email_address = request.args.get('email')
            description = request.args.get('description', '')

        # Reject non-strings and addresses with an empty side ("@x.com", "a@"),
        # which would otherwise create empty-named domains or mailboxes.
        username, separator, domain_name = (
            email_address.partition('@') if isinstance(email_address, str) else ('', '', '')
        )
        if not separator or not username or not domain_name:
            return jsonify({
                'success': False,
                'error': '无效的邮箱地址',
                'message': '请提供有效的邮箱地址格式为user@domain.com'
            }), 400

        def describe(mailbox):
            """Serialize a Mailbox row for the JSON response."""
            return {
                'id': mailbox.id,
                'address': mailbox.address,
                'domain_id': mailbox.domain_id,
                'full_address': f"{mailbox.address}@{domain_name}",
                'description': mailbox.description
            }

        db = get_session()
        try:
            domain = db.query(Domain).filter_by(name=domain_name).first()
            if not domain:
                current_app.logger.info(f"域名 {domain_name} 不存在,开始创建")
                domain = Domain(
                    name=domain_name,
                    description=f"自动创建的域名 {domain_name}",
                    active=True
                )
                db.add(domain)
                # Commit now so domain.id is available for the mailbox lookup.
                db.commit()
                current_app.logger.info(f"域名 {domain_name} 创建成功ID: {domain.id}")

            mailbox = db.query(Mailbox).filter_by(
                domain_id=domain.id,
                address=username
            ).first()
            if mailbox:
                # Existing mailbox: report it rather than failing.
                return jsonify({
                    'success': True,
                    'message': f'邮箱 {email_address} 已存在',
                    'mailbox': describe(mailbox)
                }), 200

            mailbox = Mailbox(
                domain_id=domain.id,
                address=username,
                description=description or f"自动创建的邮箱 {email_address}"
            )
            db.add(mailbox)
            db.commit()
            return jsonify({
                'success': True,
                'message': f'邮箱 {email_address} 创建成功',
                'mailbox': describe(mailbox)
            }), 201
        finally:
            db.close()
    except Exception as e:
        # logger.exception logs at ERROR level and attaches the traceback.
        current_app.logger.exception(f"创建邮箱出错: {str(e)}")
        return jsonify({
            'success': False,
            'error': '服务器错误',
            'message': str(e)
        }), 500
# Short URL: fetch a mailbox's emails directly by address
@api_bp.route('/email', methods=['GET'])
def get_email_by_address():
    """
    Short alias for the decoded-emails endpoint.

    Behaves like ``/decoded_emails?email={email_address}&latest=0``; the query
    string of the current request is read by the delegated handler.

    Query parameters:
        email: mailbox address (required)
        latest: "1" to return only the newest email, "0" otherwise (default "0")
    """
    # Delegate to the existing handler; it reads the same request context.
    response = get_decoded_emails()
    return response
@api_bp.route('/decoded_emails', methods=['GET'])
def get_decoded_emails():
    """
    Return the decoded emails of one mailbox, newest first.

    Query parameters:
        email: mailbox address (e.g. testaa@nosqli.com)
        latest: "1" to return only the newest email, "0" otherwise (default "0")
        limit: maximum number of emails to return (default 10)
        offset: number of emails to skip (default 0)

    Returns:
        (JSON response, status):
        - 200 with ``emails`` (decoded) and ``total_emails`` (count for the
          whole mailbox, ignoring limit/offset)
        - 400 for a malformed address or a non-integer / negative limit or offset
        - 404 when the domain or the mailbox does not exist
        - 500 on any unexpected error
    """
    try:
        email_address = request.args.get('email')
        latest = request.args.get('latest', '0') == '1'

        # Bad paging input is a client error, not a server error.
        try:
            limit = int(request.args.get('limit', 10))
            offset = int(request.args.get('offset', 0))
        except (TypeError, ValueError):
            limit = offset = -1
        if limit < 0 or offset < 0:
            return jsonify({
                'success': False,
                'error': '无效的分页参数',
                'message': 'limit 和 offset 必须是非负整数'
            }), 400

        if not email_address or '@' not in email_address:
            return jsonify({
                'success': False,
                'error': '无效的邮箱地址',
                'message': '请提供有效的邮箱地址格式为user@domain.com'
            }), 400

        username, domain_name = email_address.split('@', 1)

        db = get_session()
        try:
            domain = db.query(Domain).filter_by(name=domain_name).first()
            if not domain:
                return jsonify({
                    'success': False,
                    'error': '域名不存在',
                    'message': f'域名 {domain_name} 不存在'
                }), 404

            mailbox = db.query(Mailbox).filter_by(
                domain_id=domain.id,
                address=username
            ).first()
            if not mailbox:
                return jsonify({
                    'success': False,
                    'error': '邮箱不存在',
                    'message': f'邮箱 {email_address} 不存在'
                }), 404

            # Newest first; `query` itself stays unpaged so count() below
            # reports the mailbox total.
            query = db.query(Email).filter_by(mailbox_id=mailbox.id)
            query = query.order_by(Email.received_at.desc())
            if latest:
                emails = query.limit(1).all()
            else:
                emails = query.limit(limit).offset(offset).all()

            emails_dir = os.path.join(
                current_app.config.get('MAIL_STORAGE_PATH', 'email_data'),
                'emails'
            )
            # Raw messages are looked up on disk as email_<id>.eml.
            result_emails = [
                decode_email(email_obj, os.path.join(emails_dir, f'email_{email_obj.id}.eml'))
                for email_obj in emails
            ]

            return jsonify({
                'success': True,
                'email_address': email_address,
                'total_emails': query.count(),
                'emails': result_emails
            }), 200
        finally:
            db.close()
    except Exception as e:
        # logger.exception logs at ERROR level and attaches the traceback.
        current_app.logger.exception(f"获取解码邮件出错: {str(e)}")
        return jsonify({
            'success': False,
            'error': '服务器错误',
            'message': str(e)
        }), 500
@api_bp.route('/decoded_email/<int:email_id>', methods=['GET'])
def get_decoded_email_by_id(email_id):
    """
    Return the decoded content of the email with the given id.

    Responds with 200 and the decoded email, 404 when no email has that id,
    and 500 when anything unexpected goes wrong.
    """
    try:
        session = get_session()
        try:
            record = session.query(Email).filter_by(id=email_id).first()
            if not record:
                not_found = {
                    'success': False,
                    'error': '邮件不存在',
                    'message': f'ID为{email_id}的邮件不存在'
                }
                return jsonify(not_found), 404

            # The raw message is looked up on disk as email_<id>.eml.
            storage_root = current_app.config.get('MAIL_STORAGE_PATH', 'email_data')
            eml_path = os.path.join(storage_root, 'emails', f'email_{record.id}.eml')

            payload = {
                'success': True,
                'email': decode_email(record, eml_path)
            }
            return jsonify(payload), 200
        finally:
            session.close()
    except Exception as e:
        current_app.logger.error(f"获取解码邮件出错: {str(e)}")
        failure = {
            'success': False,
            'error': '服务器错误',
            'message': str(e)
        }
        return jsonify(failure), 500
def _decode_text_part(part):
    """Return the text of one non-multipart MIME part ('' when it has no payload)."""
    try:
        return part.get_content()
    except Exception:
        # get_content() can fail on unknown charsets or damaged transfer
        # encodings; fall back to decoding the raw payload by hand.
        payload = part.get_payload(decode=True)
        if not payload:
            return ""
        charset = part.get_content_charset() or 'utf-8'
        try:
            return payload.decode(charset, errors='replace')
        except LookupError:
            # The declared charset is unknown to Python.
            return payload.decode('utf-8', errors='replace')


def _extract_bodies(msg):
    """Return (body_text, body_html) from the first non-attachment text parts of msg."""
    body_text = ""
    body_html = ""
    # walk() descends into nested containers (e.g. multipart/mixed holding
    # multipart/alternative) and also yields msg itself when it is single-part.
    for part in msg.walk():
        if part.is_multipart():
            continue
        if part.get_content_disposition() == 'attachment':
            # A .txt/.html attachment must not be mistaken for the body.
            continue
        content_type = part.get_content_type()
        if content_type == "text/plain" and not body_text:
            body_text = _decode_text_part(part)
        elif content_type == "text/html" and not body_html:
            body_html = _decode_text_part(part)
    return body_text, body_html


def decode_email(email_obj, email_file_path):
    """
    Build a JSON-serializable dict describing one email.

    Args:
        email_obj: email record; ``id``, ``subject``, ``sender``, ``recipients``,
            ``received_at`` and ``read`` are read directly, while
            ``attachments``, ``verification_code``, ``verification_link``,
            ``body_text`` and ``body_html`` are used only when present.
        email_file_path: path of the raw ``.eml`` file; it is parsed only when
            it exists and the record lacks a text or HTML body.

    Returns:
        dict with the basic fields plus, when available, ``body_text``,
        ``body_html``, ``verification_code`` and ``verification_link``.
        Values stored on the record take precedence over values parsed from
        the file. Errors while parsing the file are logged, not raised.
    """
    result = {
        'id': email_obj.id,
        'subject': email_obj.subject,
        'sender': email_obj.sender,
        'recipients': email_obj.recipients,
        'received_at': email_obj.received_at.isoformat() if email_obj.received_at else None,
        'read': email_obj.read,
        'has_attachments': len(email_obj.attachments) > 0 if hasattr(email_obj, 'attachments') else False
    }

    # Copy optional fields stored on the record, skipping missing/empty ones.
    for field in ('verification_code', 'verification_link', 'body_text', 'body_html'):
        value = getattr(email_obj, field, None)
        if value:
            result[field] = value

    missing_body = 'body_text' not in result or 'body_html' not in result
    if missing_body and os.path.exists(email_file_path):
        try:
            # Parse from bytes so each part is decoded with its own declared
            # charset; reading the file as UTF-8 text would corrupt 8-bit
            # messages written in other encodings.
            with open(email_file_path, 'rb') as f:
                msg = email.message_from_binary_file(f, policy=policy.default)
            body_text, body_html = _extract_bodies(msg)
            if body_text and 'body_text' not in result:
                result['body_text'] = body_text
            if body_html and 'body_html' not in result:
                result['body_html'] = body_html
        except Exception as e:
            current_app.logger.error(f"解析邮件文件出错: {str(e)}")

    # Fall back to scanning the bodies for a code; this no longer depends on
    # the .eml file being present.
    if 'verification_code' not in result:
        verification_code = extract_verification_code(
            result.get('body_text', ''), result.get('body_html', '')
        )
        if verification_code:
            result['verification_code'] = verification_code

    return result
# Patterns are tried in order and the first non-blank capture wins.
# Compiled once at import time instead of on every call.
_HTML_CODE_PATTERNS = tuple(re.compile(p) for p in (
    r'letter-spacing:\s*\d+px[^>]*>([^<]+)<',   # code rendered with letter-spacing styling
    r'<div[^>]*>(\d{6})</div>',                 # 6-digit code alone in a <div>
    r'验证码[:\uFF1A]\s*([A-Z0-9]{4,8})',        # Chinese label, ASCII or full-width colon
    r'code[^\d]+(\d{4,8})',                     # English "code" label followed by digits
    r'\b([A-Z0-9]{6})\b',                       # any 6 upper-case letters/digits
))
_TEXT_CODE_PATTERNS = tuple(re.compile(p) for p in (
    r'验证码[:\uFF1A]\s*([A-Z0-9]{4,8})',        # Chinese label, ASCII or full-width colon
    r'code[^\d]+(\d{4,8})',                     # English "code" label followed by digits
    r'\b(\d{6})\b',                             # any standalone 6-digit number
))


def _first_code(patterns, content):
    """Return the first non-blank capture of any pattern in content, else None."""
    for pattern in patterns:
        for candidate in pattern.findall(content):
            candidate = candidate.strip()
            # A whitespace-only capture (e.g. styled markup wrapping another
            # tag) is not a code; keep looking instead of returning ''.
            if candidate:
                return candidate
    return None


def extract_verification_code(body_text, body_html):
    """
    Extract a verification code from an email's content.

    The HTML body is searched first, then the plain-text body. Within each
    body the patterns are tried from most to least specific.

    Args:
        body_text: plain-text body (may be empty or None)
        body_html: HTML body (may be empty or None)

    Returns:
        The stripped code as a string, or None when nothing matches.
    """
    for patterns, content in (
        (_HTML_CODE_PATTERNS, body_html),
        (_TEXT_CODE_PATTERNS, body_text),
    ):
        if not content:
            continue
        code = _first_code(patterns, content)
        if code:
            return code
    return None
def _scan_directory(directory, suffix=None):
    """Return (file_count, total_bytes, [(mtime, name), ...]) for entries of directory.

    When suffix is given, only names ending with it are counted.
    """
    count = 0
    total_size = 0
    stamped = []
    for name in os.listdir(directory):
        if suffix is not None and not name.endswith(suffix):
            continue
        # One stat() call yields both the size and the modification time.
        info = os.stat(os.path.join(directory, name))
        count += 1
        total_size += info.st_size
        stamped.append((info.st_mtime, name))
    return count, total_size, stamped


def _size_mb(size_bytes):
    """Convert a byte count to megabytes rounded to two decimals (0 for an empty total)."""
    return round(size_bytes / (1024 * 1024), 2) if size_bytes > 0 else 0


# System diagnostics endpoint
@api_bp.route('/system_check', methods=['GET'])
def system_check():
    """
    System diagnostics: report the state of the mail system's components.

    The ``diagnostics`` object of the response contains:
        - components.system: CPU, memory and root-disk usage percentages
        - components.database: row counts for domains, mailboxes and emails
        - recent_activity: the 5 newest emails and the 5 most recently
          modified ``.eml`` files
        - mailboxes: the first 10 mailboxes by id with their email counts
        - storage: existence, file count and size of the emails and
          attachments directories under ``MAIL_STORAGE_PATH``

    Each section is collected independently; a failing section stores its
    error message instead of aborting the request. ``status`` becomes
    ``'warning'`` when the database section failed or a storage directory is
    missing (or could not be inspected).

    Returns:
        (JSON response, status): 200 with the diagnostics, 500 on an
        unexpected error outside the guarded sections.
    """
    try:
        result = {
            'timestamp': datetime.now().isoformat(),
            'system_status': 'normal',
            'components': {},
            'recent_activity': {},
            'mailboxes': [],
            'storage': {}
        }

        # --- host resources -------------------------------------------------
        try:
            result['components']['system'] = {
                'cpu_percent': psutil.cpu_percent(),
                'memory_percent': psutil.virtual_memory().percent,
                'disk_usage': psutil.disk_usage('/').percent
            }
        except Exception as e:
            result['components']['system'] = {'error': str(e)}

        # --- database -------------------------------------------------------
        db = get_session()
        try:
            domain_count = db.query(Domain).count()
            mailbox_count = db.query(Mailbox).count()
            email_count = db.query(Email).count()
            latest_emails = db.query(Email).order_by(Email.received_at.desc()).limit(5).all()

            result['components']['database'] = {
                'status': 'connected',
                'domain_count': domain_count,
                'mailbox_count': mailbox_count,
                'email_count': email_count
            }
            # Loop variable is not named `email`, to avoid shadowing the
            # imported `email` module inside the comprehension.
            result['recent_activity']['latest_emails'] = [
                {
                    'id': item.id,
                    'subject': item.subject,
                    'sender': item.sender,
                    'received_at': item.received_at.isoformat() if item.received_at else None
                } for item in latest_emails
            ]
            # First 10 mailboxes by id (no "active" filter is applied).
            listed_mailboxes = db.query(Mailbox).order_by(Mailbox.id).limit(10).all()
            result['mailboxes'] = [
                {
                    'id': mb.id,
                    'address': mb.address,
                    'domain_id': mb.domain_id,
                    'full_address': f"{mb.address}@{mb.domain.name}" if hasattr(mb, 'domain') and mb.domain else f"{mb.address}@unknown",
                    'email_count': db.query(Email).filter_by(mailbox_id=mb.id).count()
                } for mb in listed_mailboxes
            ]
        except Exception as e:
            result['components']['database'] = {'status': 'error', 'error': str(e)}
        finally:
            db.close()

        # --- storage --------------------------------------------------------
        email_data_dir = current_app.config.get('MAIL_STORAGE_PATH', 'email_data')
        # Bound before the try block so the status evaluation below never
        # hits an unbound name if storage inspection fails early.
        emails_dir_exists = False
        attachments_dir_exists = False
        try:
            emails_dir = os.path.join(email_data_dir, 'emails')
            attachments_dir = os.path.join(email_data_dir, 'attachments')
            emails_dir_exists = os.path.exists(emails_dir)
            attachments_dir_exists = os.path.exists(attachments_dir)

            email_files_count, email_files_size, email_files = (
                _scan_directory(emails_dir, '.eml') if emails_dir_exists else (0, 0, [])
            )
            attachment_files_count, attachment_files_size, _ = (
                _scan_directory(attachments_dir) if attachments_dir_exists else (0, 0, [])
            )

            result['storage'] = {
                'emails_dir': {
                    'exists': emails_dir_exists,
                    'path': emails_dir,
                    'file_count': email_files_count,
                    'size_bytes': email_files_size,
                    'size_mb': _size_mb(email_files_size)
                },
                'attachments_dir': {
                    'exists': attachments_dir_exists,
                    'path': attachments_dir,
                    'file_count': attachment_files_count,
                    'size_bytes': attachment_files_size,
                    'size_mb': _size_mb(attachment_files_size)
                }
            }

            if email_files:
                # Newest first; ties on mtime fall back to the file name.
                email_files.sort(reverse=True)
                now = time.time()
                result['recent_activity']['latest_files'] = [
                    {
                        'filename': name,
                        'modified': datetime.fromtimestamp(mtime).isoformat(),
                        'age_seconds': int(now - mtime)
                    } for mtime, name in email_files[:5]
                ]
        except Exception as e:
            result['storage'] = {'error': str(e)}

        # --- overall status -------------------------------------------------
        if ('database' in result['components'] and result['components']['database'].get('status') != 'connected'):
            result['system_status'] = 'warning'
        if not emails_dir_exists or not attachments_dir_exists:
            result['system_status'] = 'warning'

        return jsonify({
            'success': True,
            'status': result['system_status'],
            'diagnostics': result
        }), 200
    except Exception as e:
        # logger.exception logs at ERROR level and attaches the traceback.
        current_app.logger.exception(f"系统诊断出错: {str(e)}")
        return jsonify({
            'success': False,
            'error': '系统诊断失败',
            'message': str(e)
        }), 500