diff --git a/app/api/__init__.py b/app/api/__init__.py index 8da90d4..5d7fbc4 100644 --- a/app/api/__init__.py +++ b/app/api/__init__.py @@ -18,6 +18,8 @@ def index(): # 为避免可能的文件读取问题,改为从routes.py模块中导入所有路由定义 try: from .routes import * + # 导入解码邮件路由模块 + from .decoded_email_routes import * except Exception as e: logging.error(f"导入API路由时出错: {str(e)}") raise \ No newline at end of file diff --git a/app/api/decoded_email_routes.py b/app/api/decoded_email_routes.py new file mode 100644 index 0000000..bc2b673 --- /dev/null +++ b/app/api/decoded_email_routes.py @@ -0,0 +1,411 @@ +from flask import request, jsonify, current_app +import os +import email +from email import policy +import re +import base64 +from datetime import datetime, timedelta + +from . import api_bp +from ..models import get_session, Domain, Mailbox, Email +from ..services import get_mail_store + +# 创建邮箱接口 +@api_bp.route('/add_mailbox', methods=['POST', 'GET']) +def add_mailbox(): + """ + 创建新邮箱,如果域名不存在则自动创建 + + 查询参数(GET方式)或表单参数(POST方式): + - email: 邮箱地址 (例如: testaa@nosqli.com) + - description: 邮箱描述 (可选) + """ + try: + # 获取参数 + if request.method == 'POST': + data = request.json or {} + email_address = data.get('email') + description = data.get('description', '') + else: # GET方式 + email_address = request.args.get('email') + description = request.args.get('description', '') + + # 验证邮箱地址 + if not email_address or '@' not in email_address: + return jsonify({ + 'success': False, + 'error': '无效的邮箱地址', + 'message': '请提供有效的邮箱地址,格式为user@domain.com' + }), 400 + + # 解析邮箱地址 + username, domain_name = email_address.split('@', 1) + + # 创建或查找域名和邮箱 + db = get_session() + try: + # 查找域名 + domain = db.query(Domain).filter_by(name=domain_name).first() + + # 如果域名不存在,创建域名 + if not domain: + current_app.logger.info(f"域名 {domain_name} 不存在,开始创建") + domain = Domain( + name=domain_name, + description=f"自动创建的域名 {domain_name}", + active=True + ) + db.add(domain) + db.commit() + current_app.logger.info(f"域名 {domain_name} 创建成功,ID: {domain.id}") + + # 查询邮箱是否已存在 + mailbox = db.query(Mailbox).filter_by( + domain_id=domain.id, + address=username + ).first() + + # 如果邮箱已存在,返回已存在信息 + if mailbox: + return jsonify({ + 'success': True, + 'message': f'邮箱 {email_address} 已存在', + 'mailbox': { + 'id': mailbox.id, + 'address': mailbox.address, + 'domain_id': mailbox.domain_id, + 'full_address': f"{mailbox.address}@{domain_name}", + 'description': mailbox.description + } + }), 200 + + # 创建邮箱 + mailbox = Mailbox( + domain_id=domain.id, + address=username, + description=description or f"自动创建的邮箱 {email_address}" + ) + db.add(mailbox) + db.commit() + + # 返回成功信息 + return jsonify({ + 'success': True, + 'message': f'邮箱 {email_address} 创建成功', + 'mailbox': { + 'id': mailbox.id, + 'address': mailbox.address, + 'domain_id': mailbox.domain_id, + 'full_address': f"{mailbox.address}@{domain_name}", + 'description': mailbox.description + } + }), 201 + + finally: + db.close() + + except Exception as e: + current_app.logger.error(f"创建邮箱出错: {str(e)}") + return jsonify({ + 'success': False, + 'error': '服务器错误', + 'message': str(e) + }), 500 + +# 简化的URL路径,直接通过邮箱地址获取邮件 +@api_bp.route('/email', methods=['GET']) +def get_email_by_address(): + """ + 通过邮箱地址获取邮件的简化URL + 等同于 /decoded_emails?email={email_address}&latest=0 + + 查询参数: + - email: 邮箱地址 (必填) + - latest: 是否只返回最新的邮件 (1表示是,0表示否,默认0) + """ + # 重用已有的解码邮件接口 + return get_decoded_emails() + +@api_bp.route('/decoded_emails', methods=['GET']) +def get_decoded_emails(): + """ + 获取指定邮箱地址的所有邮件,并返回解码后的内容 + + 查询参数: + - email: 邮箱地址 (例如: testaa@nosqli.com) + - latest: 是否只返回最新的邮件 (1表示是,0表示否,默认0) + - limit: 返回邮件数量 (默认10) + - offset: 查询起始位置 (默认0) + """ + try: + # 获取查询参数 + email_address = request.args.get('email') + latest = request.args.get('latest', '0') == '1' + limit = int(request.args.get('limit', 10)) + offset = int(request.args.get('offset', 0)) + + # 验证邮箱地址是否有效 + if not email_address or '@' not in email_address: + return jsonify({ + 'success': False, + 'error': '无效的邮箱地址', + 'message': '请提供有效的邮箱地址,格式为user@domain.com' + }), 400 + + # 解析邮箱地址 + username, domain_name = email_address.split('@', 1) + + # 查询数据库 + db = get_session() + try: + # 查找域名 + domain = db.query(Domain).filter_by(name=domain_name).first() + if not domain: + return jsonify({ + 'success': False, + 'error': '域名不存在', + 'message': f'域名 {domain_name} 不存在' + }), 404 + + # 查找邮箱 + mailbox = db.query(Mailbox).filter_by( + domain_id=domain.id, + address=username + ).first() + + if not mailbox: + return jsonify({ + 'success': False, + 'error': '邮箱不存在', + 'message': f'邮箱 {email_address} 不存在' + }), 404 + + # 获取邮件 + query = db.query(Email).filter_by(mailbox_id=mailbox.id) + + # 按接收时间排序,最新的在前 + query = query.order_by(Email.received_at.desc()) + + # 如果只要最新的一封 + if latest: + emails = query.limit(1).all() + else: + emails = query.limit(limit).offset(offset).all() + + # 处理结果 + result_emails = [] + for email_obj in emails: + # 获取原始邮件文件路径 + email_file_path = os.path.join( + current_app.config.get('MAIL_STORAGE_PATH', 'email_data'), + 'emails', + f'email_{email_obj.id}.eml' + ) + + # 解码邮件内容 + decoded_email = decode_email(email_obj, email_file_path) + result_emails.append(decoded_email) + + # 返回结果 + return jsonify({ + 'success': True, + 'email_address': email_address, + 'total_emails': query.count(), + 'emails': result_emails + }), 200 + + finally: + db.close() + + except Exception as e: + current_app.logger.error(f"获取解码邮件出错: {str(e)}") + return jsonify({ + 'success': False, + 'error': '服务器错误', + 'message': str(e) + }), 500 + + +@api_bp.route('/decoded_email/', methods=['GET']) +def get_decoded_email_by_id(email_id): + """获取指定ID的解码邮件内容""" + try: + db = get_session() + try: + # 获取邮件对象 + email_obj = db.query(Email).filter_by(id=email_id).first() + + if not email_obj: + return jsonify({ + 'success': False, + 'error': '邮件不存在', + 'message': f'ID为{email_id}的邮件不存在' + }), 404 + + # 获取原始邮件文件路径 + email_file_path = os.path.join( + current_app.config.get('MAIL_STORAGE_PATH', 'email_data'), + 'emails', + f'email_{email_obj.id}.eml' + ) + + # 解码邮件内容 + decoded_email = decode_email(email_obj, email_file_path) + + # 返回结果 + return jsonify({ + 'success': True, + 'email': decoded_email + }), 200 + + finally: + db.close() + + except Exception as e: + current_app.logger.error(f"获取解码邮件出错: {str(e)}") + return jsonify({ + 'success': False, + 'error': '服务器错误', + 'message': str(e) + }), 500 + + +def decode_email(email_obj, email_file_path): + """解析并解码邮件内容""" + # 创建基本邮件信息 + result = { + 'id': email_obj.id, + 'subject': email_obj.subject, + 'sender': email_obj.sender, + 'recipients': email_obj.recipients, + 'received_at': email_obj.received_at.isoformat() if email_obj.received_at else None, + 'read': email_obj.read, + 'has_attachments': len(email_obj.attachments) > 0 if hasattr(email_obj, 'attachments') else False + } + + # 从数据库中直接获取验证码 + if hasattr(email_obj, 'verification_code') and email_obj.verification_code: + result['verification_code'] = email_obj.verification_code + + if hasattr(email_obj, 'verification_link') and email_obj.verification_link: + result['verification_link'] = email_obj.verification_link + + # 如果邮件对象有文本内容或HTML内容,直接使用 + if hasattr(email_obj, 'body_text') and email_obj.body_text: + result['body_text'] = email_obj.body_text + + if hasattr(email_obj, 'body_html') and email_obj.body_html: + result['body_html'] = email_obj.body_html + + # 如果有原始邮件文件,尝试解析 + if os.path.exists(email_file_path): + try: + # 解析.eml文件 + with open(email_file_path, 'r', encoding='utf-8', errors='replace') as f: + msg = email.message_from_file(f, policy=policy.default) + + # 如果没有从数据库获取到内容,尝试从文件解析 + if 'body_text' not in result or 'body_html' not in result: + body_text = "" + body_html = "" + + # 处理多部分邮件 + if msg.is_multipart(): + for part in msg.iter_parts(): + content_type = part.get_content_type() + + if content_type == "text/plain": + try: + body_text = part.get_content() + except Exception: + payload = part.get_payload(decode=True) + if payload: + charset = part.get_content_charset() or 'utf-8' + try: + body_text = payload.decode(charset, errors='replace') + except: + body_text = payload.decode('utf-8', errors='replace') + + elif content_type == "text/html": + try: + body_html = part.get_content() + except Exception: + payload = part.get_payload(decode=True) + if payload: + charset = part.get_content_charset() or 'utf-8' + try: + body_html = payload.decode(charset, errors='replace') + except: + body_html = payload.decode('utf-8', errors='replace') + else: + # 处理单部分邮件 + content_type = msg.get_content_type() + try: + if content_type == "text/plain": + body_text = msg.get_content() + elif content_type == "text/html": + body_html = msg.get_content() + except Exception: + payload = msg.get_payload(decode=True) + if payload: + charset = msg.get_content_charset() or 'utf-8' + try: + decoded = payload.decode(charset, errors='replace') + if content_type == "text/plain": + body_text = decoded + elif content_type == "text/html": + body_html = decoded + except: + pass + + # 如果找到了内容,添加到结果中 + if body_text and 'body_text' not in result: + result['body_text'] = body_text + + if body_html and 'body_html' not in result: + result['body_html'] = body_html + + # 如果仍然没有提取到验证码,尝试从内容中提取 + if 'verification_code' not in result: + verification_code = extract_verification_code(result.get('body_text', ''), result.get('body_html', '')) + if verification_code: + result['verification_code'] = verification_code + + except Exception as e: + current_app.logger.error(f"解析邮件文件出错: {str(e)}") + + return result + + +def extract_verification_code(body_text, body_html): + """从邮件内容中提取验证码""" + # 首先尝试从HTML中提取 + if body_html: + # 常用的验证码模式 + patterns = [ + r'letter-spacing:\s*\d+px[^>]*>([^<]+)<', # 特殊样式的验证码 + r']*>(\d{6})', # 6位数字验证码在div中 + r'验证码[::]\s*([A-Z0-9]{4,8})', # 中文标记的验证码 + r'code[^\d]+(\d{4,8})', # 英文标记的验证码 + r'\b([A-Z0-9]{6})\b' # 6位大写字母或数字 + ] + + for pattern in patterns: + matches = re.findall(pattern, body_html) + if matches: + return matches[0].strip() + + # 如果HTML中没找到,尝试从纯文本中提取 + if body_text: + patterns = [ + r'验证码[::]\s*([A-Z0-9]{4,8})', # 中文格式 + r'code[^\d]+(\d{4,8})', # 英文格式 + r'\b(\d{6})\b' # 6位数字 + ] + + for pattern in patterns: + matches = re.findall(pattern, body_text) + if matches: + return matches[0].strip() + + return None \ No newline at end of file