import base64
import email
import os
import re
import time
from collections import deque
from datetime import datetime, timedelta
from email import policy

import psutil
from flask import jsonify, request, current_app
from sqlalchemy import or_, func, desc

from . import api_bp
from ..models import Email, Domain, Mailbox, get_session
from ..services import get_mail_store


# Number of trailing log lines inspected by the debug endpoint.
_LOG_TAIL_LINES = 200

# Number of leading characters of each .eml file scanned for the address.
_EML_SCAN_CHARS = 10000

# Verification-code patterns tried, in order, against the HTML body.
_HTML_CODE_PATTERNS = tuple(re.compile(p) for p in (
    r'letter-spacing:\s*\d+px[^>]*>([^<]+)<',  # code rendered with letter-spacing styling
    r'<div[^>]*>(\d{6})</div>',                # 6-digit code alone inside a <div>
    r'验证码[::]\s*([A-Z0-9]{4,8})',            # code following the Chinese label
    r'code[^\d]+(\d{4,8})',                    # code following the English label
    r'\b([A-Z0-9]{6})\b',                      # any 6 upper-case letters/digits
))

# Verification-code patterns tried, in order, against the plain-text body.
_TEXT_CODE_PATTERNS = tuple(re.compile(p) for p in (
    r'验证码[::]\s*([A-Z0-9]{4,8})',            # Chinese label
    r'code[^\d]+(\d{4,8})',                    # English label
    r'\b(\d{6})\b',                            # any standalone 6-digit number
))


def _split_email_address(email_address):
    """Split ``user@domain`` into ``(username, domain_name)``.

    Returns ``None`` when the value is not a string, has no ``@``, or has an
    empty local part or domain. The split happens on the first ``@`` only.
    """
    if not isinstance(email_address, str) or '@' not in email_address:
        return None
    username, domain_name = email_address.split('@', 1)
    if not username or not domain_name:
        return None
    return username, domain_name


def _invalid_email_response():
    """Build the shared 400 response for a missing or malformed address."""
    return jsonify({
        'success': False,
        'error': '无效的邮箱地址',
        'message': '请提供有效的邮箱地址,格式为user@domain.com'
    }), 400


def _email_file_path(email_id):
    """Return the path of the raw .eml file stored for the given email id."""
    return os.path.join(
        current_app.config.get('MAIL_STORAGE_PATH', 'email_data'),
        'emails',
        f'email_{email_id}.eml'
    )


def _mailbox_payload(mailbox, domain_name):
    """Serialize a mailbox row for the add_mailbox responses."""
    return {
        'id': mailbox.id,
        'address': mailbox.address,
        'domain_id': mailbox.domain_id,
        'full_address': f"{mailbox.address}@{domain_name}",
        'description': mailbox.description
    }


# Debug endpoint - inspect the receiving state of a mailbox
@api_bp.route('/debug_email', methods=['GET'])
def debug_email():
    """Collect diagnostic information about one mailbox address.

    Query parameters:
    - email: mailbox address (e.g. newsadd1test@nosqli.com)

    The response contains the matching domain/mailbox/email rows, the stored
    .eml files whose first 10000 characters mention the address, and the
    lines among the last 200 log lines that mention it.

    Returns 400 for a malformed address and 500 on unexpected errors.
    """
    try:
        email_address = request.args.get('email')

        parts = _split_email_address(email_address)
        if parts is None:
            return _invalid_email_response()
        username, domain_name = parts

        # Case-insensitive needle reused for the file and log scans.
        needle = email_address.lower()

        result = {
            'email_address': email_address,
            'username': username,
            'domain': domain_name,
            'system_info': {},
            'logs': [],
            'files': []
        }

        db = get_session()
        try:
            # Look up the domain, then the mailbox, then its emails.
            domain = db.query(Domain).filter_by(name=domain_name).first()
            if domain:
                result['system_info']['domain'] = {
                    'id': domain.id,
                    'name': domain.name,
                    'active': domain.active,
                    'created_at': str(domain.created_at) if hasattr(domain, 'created_at') else None
                }

                mailbox = db.query(Mailbox).filter_by(
                    domain_id=domain.id,
                    address=username
                ).first()

                if mailbox:
                    result['system_info']['mailbox'] = {
                        'id': mailbox.id,
                        'address': mailbox.address,
                        'full_address': f"{mailbox.address}@{domain_name}",
                        'created_at': str(mailbox.created_at) if hasattr(mailbox, 'created_at') else None
                    }

                    emails = db.query(Email).filter_by(mailbox_id=mailbox.id).all()
                    result['system_info']['emails_count'] = len(emails)
                    result['system_info']['emails'] = [
                        {
                            'id': email_obj.id,
                            'subject': email_obj.subject,
                            'sender': email_obj.sender,
                            'received_at': str(email_obj.received_at),
                            'verification_code': email_obj.verification_code if hasattr(email_obj, 'verification_code') else None
                        }
                        for email_obj in emails
                    ]
                else:
                    result['system_info']['mailbox'] = "未找到邮箱记录"
            else:
                result['system_info']['domain'] = "未找到域名记录"

            # Scan stored .eml files for the address.
            email_data_dir = current_app.config.get('MAIL_STORAGE_PATH', 'email_data')
            emails_dir = os.path.join(email_data_dir, 'emails')

            if os.path.exists(emails_dir):
                for file_name in os.listdir(emails_dir):
                    if not file_name.endswith('.eml'):
                        continue

                    file_path = os.path.join(emails_dir, file_name)
                    file_info = {'name': file_name, 'path': file_path}

                    # stat + read live in one try so a file that vanishes
                    # mid-scan is reported instead of failing the request.
                    try:
                        file_info['size'] = os.path.getsize(file_path)
                        file_info['modified'] = datetime.fromtimestamp(
                            os.path.getmtime(file_path)
                        ).isoformat()
                        with open(file_path, 'r', encoding='utf-8', errors='replace') as f:
                            # Only the head of the file is inspected.
                            content = f.read(_EML_SCAN_CHARS)
                    except OSError as e:
                        file_info['error'] = str(e)
                        result['files'].append(file_info)
                        continue

                    # Files that do not mention the address are left out.
                    if needle in content.lower():
                        file_info['contains_address'] = True
                        result['files'].append(file_info)

            # Scan the tail of the log file for the address.
            log_file = current_app.config.get('LOG_FILE', os.path.join('logs', 'email_system.log'))
            if os.path.exists(log_file):
                try:
                    with open(log_file, 'r', encoding='utf-8', errors='replace') as f:
                        # A bounded deque keeps only the last lines in memory
                        # instead of loading the entire log file.
                        tail = deque(f, maxlen=_LOG_TAIL_LINES)
                    result['logs'].extend(
                        line.strip() for line in tail if needle in line.lower()
                    )
                except OSError as e:
                    result['logs'] = [f"读取日志出错: {str(e)}"]

            return jsonify({
                'success': True,
                'debug_info': result
            }), 200

        finally:
            db.close()

    except Exception as e:
        current_app.logger.error(f"调试邮件出错: {str(e)}")
        return jsonify({
            'success': False,
            'error': '服务器错误',
            'message': str(e)
        }), 500


# Mailbox creation endpoint
@api_bp.route('/add_mailbox', methods=['POST', 'GET'])
def add_mailbox():
    """Create a mailbox, creating its domain first when it does not exist.

    Query parameters (GET) or JSON/form parameters (POST):
    - email: mailbox address (e.g. testaa@nosqli.com)
    - description: mailbox description (optional)

    Returns 201 when the mailbox is created, 200 when it already exists,
    400 for a malformed address and 500 on unexpected errors.
    """
    try:
        if request.method == 'POST':
            # silent=True: a non-JSON body yields None instead of raising, so
            # form-encoded POSTs (promised by the docstring) work as well.
            payload = request.get_json(silent=True)
            data = payload if isinstance(payload, dict) else request.form
            email_address = data.get('email')
            description = data.get('description', '')
        else:
            email_address = request.args.get('email')
            description = request.args.get('description', '')

        parts = _split_email_address(email_address)
        if parts is None:
            return _invalid_email_response()
        username, domain_name = parts

        db = get_session()
        try:
            domain = db.query(Domain).filter_by(name=domain_name).first()

            # Auto-create the domain when it is unknown.
            if not domain:
                current_app.logger.info(f"域名 {domain_name} 不存在,开始创建")
                domain = Domain(
                    name=domain_name,
                    description=f"自动创建的域名 {domain_name}",
                    active=True
                )
                db.add(domain)
                db.commit()
                current_app.logger.info(f"域名 {domain_name} 创建成功,ID: {domain.id}")

            mailbox = db.query(Mailbox).filter_by(
                domain_id=domain.id,
                address=username
            ).first()

            # An existing mailbox is reported as success, not as an error.
            if mailbox:
                return jsonify({
                    'success': True,
                    'message': f'邮箱 {email_address} 已存在',
                    'mailbox': _mailbox_payload(mailbox, domain_name)
                }), 200

            mailbox = Mailbox(
                domain_id=domain.id,
                address=username,
                description=description or f"自动创建的邮箱 {email_address}"
            )
            db.add(mailbox)
            db.commit()

            return jsonify({
                'success': True,
                'message': f'邮箱 {email_address} 创建成功',
                'mailbox': _mailbox_payload(mailbox, domain_name)
            }), 201

        finally:
            db.close()

    except Exception as e:
        current_app.logger.error(f"创建邮箱出错: {str(e)}")
        return jsonify({
            'success': False,
            'error': '服务器错误',
            'message': str(e)
        }), 500


# Shorter URL: fetch emails directly by mailbox address
@api_bp.route('/email', methods=['GET'])
def get_email_by_address():
    """Short alias for /decoded_emails.

    Query parameters:
    - email: mailbox address (required)
    - latest: '1' to return only the newest email, otherwise all (default '0')
    """
    # Delegates to the existing handler, which reads the same request args.
    return get_decoded_emails()


@api_bp.route('/decoded_emails', methods=['GET'])
def get_decoded_emails():
    """Return the decoded emails of one mailbox, newest first.

    Query parameters:
    - email: mailbox address (e.g. testaa@nosqli.com)
    - latest: '1' to return only the newest email (default '0')
    - limit: maximum number of emails to return (default 10)
    - offset: number of emails to skip (default 0)

    Returns 400 for a malformed address or invalid paging values, 404 when
    the domain or mailbox is unknown, and 500 on unexpected errors.
    """
    try:
        email_address = request.args.get('email')
        latest = request.args.get('latest', '0') == '1'

        # Bad paging input is a client error, not a server error.
        try:
            limit = int(request.args.get('limit', 10))
            offset = int(request.args.get('offset', 0))
        except (TypeError, ValueError):
            limit = offset = -1
        if limit < 0 or offset < 0:
            return jsonify({
                'success': False,
                'error': '无效的分页参数',
                'message': 'limit 和 offset 必须为非负整数'
            }), 400

        parts = _split_email_address(email_address)
        if parts is None:
            return _invalid_email_response()
        username, domain_name = parts

        db = get_session()
        try:
            domain = db.query(Domain).filter_by(name=domain_name).first()
            if not domain:
                return jsonify({
                    'success': False,
                    'error': '域名不存在',
                    'message': f'域名 {domain_name} 不存在'
                }), 404

            mailbox = db.query(Mailbox).filter_by(
                domain_id=domain.id,
                address=username
            ).first()

            if not mailbox:
                return jsonify({
                    'success': False,
                    'error': '邮箱不存在',
                    'message': f'邮箱 {email_address} 不存在'
                }), 404

            # Newest emails first.
            query = db.query(Email).filter_by(mailbox_id=mailbox.id)
            query = query.order_by(Email.received_at.desc())

            if latest:
                emails = query.limit(1).all()
            else:
                emails = query.limit(limit).offset(offset).all()

            result_emails = [
                decode_email(email_obj, _email_file_path(email_obj.id))
                for email_obj in emails
            ]

            return jsonify({
                'success': True,
                'email_address': email_address,
                # Total for the mailbox, independent of limit/offset/latest.
                'total_emails': query.count(),
                'emails': result_emails
            }), 200

        finally:
            db.close()

    except Exception as e:
        current_app.logger.error(f"获取解码邮件出错: {str(e)}")
        return jsonify({
            'success': False,
            'error': '服务器错误',
            'message': str(e)
        }), 500


# The <int:email_id> converter supplies the view's required argument; without
# it Flask would call the view with no arguments and fail on every request.
@api_bp.route('/decoded_email/<int:email_id>', methods=['GET'])
def get_decoded_email_by_id(email_id):
    """Return the decoded content of the email with the given id.

    Returns 404 when no such email exists and 500 on unexpected errors.
    """
    try:
        db = get_session()
        try:
            email_obj = db.query(Email).filter_by(id=email_id).first()
            if not email_obj:
                return jsonify({
                    'success': False,
                    'error': '邮件不存在',
                    'message': f'ID为{email_id}的邮件不存在'
                }), 404

            decoded_email = decode_email(email_obj, _email_file_path(email_obj.id))

            return jsonify({
                'success': True,
                'email': decoded_email
            }), 200

        finally:
            db.close()

    except Exception as e:
        current_app.logger.error(f"获取解码邮件出错: {str(e)}")
        return jsonify({
            'success': False,
            'error': '服务器错误',
            'message': str(e)
        }), 500


def _part_text(part):
    """Return the decoded text of one MIME part, or '' when it has no payload.

    ``get_content()`` is tried first; if it fails the raw payload is decoded
    with the declared charset, falling back to UTF-8 for unknown charsets.
    """
    try:
        return part.get_content()
    except Exception:
        # Deliberately broad: malformed parts should degrade, not abort.
        payload = part.get_payload(decode=True)
        if not payload:
            return ""
        charset = part.get_content_charset() or 'utf-8'
        try:
            return payload.decode(charset, errors='replace')
        except LookupError:
            # Unknown charset name declared by the sender.
            return payload.decode('utf-8', errors='replace')


def _extract_bodies(msg):
    """Return ``(body_text, body_html)`` extracted from a parsed message.

    For multipart messages the whole tree is walked so bodies nested inside
    e.g. multipart/mixed -> multipart/alternative are found; attachments are
    skipped and the first part of each type wins. Missing bodies are ''.
    """
    body_text = ""
    body_html = ""

    if msg.is_multipart():
        for part in msg.walk():
            if part.is_multipart() or part.is_attachment():
                continue
            content_type = part.get_content_type()
            if content_type == "text/plain" and not body_text:
                body_text = _part_text(part)
            elif content_type == "text/html" and not body_html:
                body_html = _part_text(part)
    else:
        content_type = msg.get_content_type()
        if content_type == "text/plain":
            body_text = _part_text(msg)
        elif content_type == "text/html":
            body_html = _part_text(msg)

    return body_text, body_html


def decode_email(email_obj, email_file_path):
    """Build a JSON-serializable dict describing an email.

    Args:
        email_obj: Email row; ``id``, ``subject``, ``sender``, ``recipients``,
            ``received_at`` and ``read`` are read directly, while
            ``attachments``, ``verification_code``, ``verification_link``,
            ``body_text`` and ``body_html`` are used when present.
        email_file_path: path of the raw .eml file; it may not exist.

    Returns:
        dict with the basic fields plus, when available, ``body_text``,
        ``body_html``, ``verification_code`` and ``verification_link``.
        Values stored on ``email_obj`` take precedence over values parsed
        from the file. Errors while parsing the file are logged and the
        partially filled dict is returned.
    """
    result = {
        'id': email_obj.id,
        'subject': email_obj.subject,
        'sender': email_obj.sender,
        'recipients': email_obj.recipients,
        'received_at': email_obj.received_at.isoformat() if email_obj.received_at else None,
        'read': email_obj.read,
        'has_attachments': len(email_obj.attachments) > 0 if hasattr(email_obj, 'attachments') else False
    }

    # Copy optional, non-empty fields straight from the database row.
    for field in ('verification_code', 'verification_link', 'body_text', 'body_html'):
        value = getattr(email_obj, field, None)
        if value:
            result[field] = value

    if not os.path.exists(email_file_path):
        return result

    try:
        # Parse from bytes so each part is decoded with its own declared
        # charset instead of being force-read as UTF-8.
        with open(email_file_path, 'rb') as f:
            msg = email.message_from_binary_file(f, policy=policy.default)

        # Only fall back to the file for bodies the database did not provide.
        if 'body_text' not in result or 'body_html' not in result:
            body_text, body_html = _extract_bodies(msg)
            if body_text and 'body_text' not in result:
                result['body_text'] = body_text
            if body_html and 'body_html' not in result:
                result['body_html'] = body_html

        # Last resort: try to pull a verification code out of the bodies.
        if 'verification_code' not in result:
            verification_code = extract_verification_code(
                result.get('body_text', ''),
                result.get('body_html', '')
            )
            if verification_code:
                result['verification_code'] = verification_code

    except Exception as e:
        current_app.logger.error(f"解析邮件文件出错: {str(e)}")

    return result


def extract_verification_code(body_text, body_html):
    """Extract a verification code from an email's bodies.

    The HTML body is searched first, then the plain-text body; within each
    body the patterns are tried in order and the first non-empty capture
    (stripped of surrounding whitespace) is returned.

    Returns:
        The code as a string, or ``None`` when nothing matches.
    """
    for body, patterns in ((body_html, _HTML_CODE_PATTERNS),
                           (body_text, _TEXT_CODE_PATTERNS)):
        if not body:
            continue
        for pattern in patterns:
            match = pattern.search(body)
            if match:
                code = match.group(1).strip()
                # A whitespace-only capture is not a code; keep looking.
                if code:
                    return code

    return None


def _list_files(directory, suffix=''):
    """Return ``[(name, size_bytes, mtime)]`` for entries ending with suffix.

    Every directory entry is included when ``suffix`` is empty.
    """
    entries = []
    for name in os.listdir(directory):
        if name.endswith(suffix):
            stat_result = os.stat(os.path.join(directory, name))
            entries.append((name, stat_result.st_size, stat_result.st_mtime))
    return entries


def _dir_summary(path, exists, files):
    """Summarize one storage directory for the system_check response."""
    total_size = sum(size for _, size, _ in files)
    return {
        'exists': exists,
        'path': path,
        'file_count': len(files),
        'size_bytes': total_size,
        'size_mb': round(total_size / (1024 * 1024), 2) if total_size > 0 else 0
    }


# System diagnostics endpoint
@api_bp.route('/system_check', methods=['GET'])
def system_check():
    """Report the state of the mail system's components.

    Collects host resource usage, database row counts, the five newest
    emails, the first ten mailboxes (ordered by id), and storage directory
    statistics. ``system_status`` becomes 'warning' when the database check
    failed or a storage directory is missing or could not be inspected.

    Returns 200 with the diagnostics, or 500 on unexpected errors.
    """
    try:
        result = {
            'timestamp': datetime.now().isoformat(),
            'system_status': 'normal',
            'components': {},
            'recent_activity': {},
            'mailboxes': [],
            'storage': {}
        }

        # Host resources.
        try:
            result['components']['system'] = {
                'cpu_percent': psutil.cpu_percent(),
                'memory_percent': psutil.virtual_memory().percent,
                'disk_usage': psutil.disk_usage('/').percent
            }
        except Exception as e:
            result['components']['system'] = {'error': str(e)}

        # Database.
        db = get_session()
        try:
            domain_count = db.query(Domain).count()
            mailbox_count = db.query(Mailbox).count()
            email_count = db.query(Email).count()
            latest_emails = db.query(Email).order_by(Email.received_at.desc()).limit(5).all()

            result['components']['database'] = {
                'status': 'connected',
                'domain_count': domain_count,
                'mailbox_count': mailbox_count,
                'email_count': email_count
            }

            result['recent_activity']['latest_emails'] = [
                {
                    'id': email_obj.id,
                    'subject': email_obj.subject,
                    'sender': email_obj.sender,
                    'received_at': email_obj.received_at.isoformat() if email_obj.received_at else None
                }
                for email_obj in latest_emails
            ]

            # First ten mailboxes by id, each with its email count.
            mailboxes = db.query(Mailbox).order_by(Mailbox.id).limit(10).all()
            result['mailboxes'] = [
                {
                    'id': mb.id,
                    'address': mb.address,
                    'domain_id': mb.domain_id,
                    'full_address': f"{mb.address}@{mb.domain.name}" if hasattr(mb, 'domain') and mb.domain else f"{mb.address}@unknown",
                    'email_count': db.query(Email).filter_by(mailbox_id=mb.id).count()
                }
                for mb in mailboxes
            ]
        except Exception as e:
            result['components']['database'] = {'status': 'error', 'error': str(e)}
        finally:
            db.close()

        # Storage. The flags are initialised up front so the status
        # evaluation below is safe even if this section fails early.
        emails_dir_exists = False
        attachments_dir_exists = False
        email_data_dir = current_app.config.get('MAIL_STORAGE_PATH', 'email_data')
        try:
            emails_dir = os.path.join(email_data_dir, 'emails')
            attachments_dir = os.path.join(email_data_dir, 'attachments')

            emails_dir_exists = os.path.exists(emails_dir)
            attachments_dir_exists = os.path.exists(attachments_dir)

            # Each directory is listed and stat'ed exactly once.
            email_files = _list_files(emails_dir, '.eml') if emails_dir_exists else []
            attachment_files = _list_files(attachments_dir) if attachments_dir_exists else []

            result['storage'] = {
                'emails_dir': _dir_summary(emails_dir, emails_dir_exists, email_files),
                'attachments_dir': _dir_summary(attachments_dir, attachments_dir_exists, attachment_files)
            }

            # Five most recently modified .eml files.
            if email_files:
                newest = sorted(
                    email_files,
                    key=lambda entry: (entry[2], entry[0]),
                    reverse=True
                )[:5]
                now = time.time()
                result['recent_activity']['latest_files'] = [
                    {
                        'filename': name,
                        'modified': datetime.fromtimestamp(mtime).isoformat(),
                        'age_seconds': int(now - mtime)
                    }
                    for name, _, mtime in newest
                ]
        except Exception as e:
            result['storage'] = {'error': str(e)}

        # Overall status.
        if ('database' in result['components'] and
                result['components']['database'].get('status') != 'connected'):
            result['system_status'] = 'warning'

        if not emails_dir_exists or not attachments_dir_exists:
            result['system_status'] = 'warning'

        return jsonify({
            'success': True,
            'status': result['system_status'],
            'diagnostics': result
        }), 200

    except Exception as e:
        current_app.logger.error(f"系统诊断出错: {str(e)}")
        return jsonify({
            'success': False,
            'error': '系统诊断失败',
            'message': str(e)
        }), 500