from flask import request, jsonify, current_app, send_file from io import BytesIO import time import os import logging from . import api_bp from ..models import get_session, Email, Mailbox, Domain from ..utils import email_parser from ..config import config # 获取邮箱的所有邮件 @api_bp.route('/mailboxes//emails', methods=['GET']) def get_mailbox_emails(mailbox_id): """获取指定邮箱的所有邮件""" try: page = int(request.args.get('page', 1)) limit = int(request.args.get('limit', 50)) unread_only = request.args.get('unread_only', 'false').lower() == 'true' offset = (page - 1) * limit db = get_session() try: # 检查邮箱是否存在 mailbox = db.query(Mailbox).filter_by(id=mailbox_id).first() if not mailbox: return jsonify({'error': '邮箱不存在'}), 404 # 查询邮件 query = db.query(Email).filter(Email.mailbox_id == mailbox_id) if unread_only: query = query.filter(Email.read == False) # 获取总数 total = query.count() # 分页获取邮件 emails = query.order_by(Email.received_at.desc()) \ .limit(limit) \ .offset(offset) \ .all() # 返回结果 result = { 'success': True, 'total': total, 'page': page, 'limit': limit, 'emails': [email.to_dict() for email in emails] } return jsonify(result), 200 finally: db.close() except Exception as e: current_app.logger.error(f"获取邮件列表出错: {str(e)}") return jsonify({'success': False, 'error': '获取邮件列表失败', 'details': str(e)}), 500 # 获取特定邮件详情 @api_bp.route('/emails/', methods=['GET']) def get_email(email_id): """ 获取单个邮件的详细信息 """ try: email_id = int(email_id) session = get_session() email = session.query(Email).filter(Email.id == email_id).first() if not email: return jsonify({ 'success': False, 'error': f'未找到ID为{email_id}的邮件' }), 404 # 获取邮件正文内容 body_text = None body_html = None try: # 尝试从文件中读取邮件内容 if email.id: email_path = os.path.join(config.DATA_DIR, 'emails', f'email_{email.id}.eml') if os.path.exists(email_path): logging.info(f"从文件读取邮件内容: {email_path}") with open(email_path, 'r', encoding='utf-8', errors='ignore') as f: try: raw_email = f.read() msg = email_parser.parsestr(raw_email) if msg.is_multipart(): # 处理多部分邮件 for part in msg.walk(): content_type = part.get_content_type() content_disposition = str(part.get("Content-Disposition")) # 跳过附件 if "attachment" in content_disposition: continue # 处理文本内容 if content_type == "text/plain": payload = part.get_payload(decode=True) charset = part.get_content_charset() or 'utf-8' try: body_text = payload.decode(charset, errors='replace') except Exception as e: logging.error(f"解码纯文本内容失败: {e}") body_text = payload.decode('utf-8', errors='replace') # 处理HTML内容 elif content_type == "text/html": payload = part.get_payload(decode=True) charset = part.get_content_charset() or 'utf-8' try: body_html = payload.decode(charset, errors='replace') except Exception as e: logging.error(f"解码HTML内容失败: {e}") body_html = payload.decode('utf-8', errors='replace') else: # 处理单部分邮件 content_type = msg.get_content_type() payload = msg.get_payload(decode=True) charset = msg.get_content_charset() or 'utf-8' try: decoded_content = payload.decode(charset, errors='replace') except Exception as e: logging.error(f"解码内容失败: {e}") decoded_content = payload.decode('utf-8', errors='replace') if content_type == "text/plain": body_text = decoded_content elif content_type == "text/html": body_html = decoded_content except Exception as e: logging.error(f"解析邮件文件失败: {e}") except Exception as e: logging.error(f"读取邮件内容时出错: {e}") # 如果文件读取失败,使用数据库中的内容 if body_text is None: body_text = email.body_text if body_html is None: body_html = email.body_html logging.info(f"邮件ID={email_id} 正文长度: text={len(body_text or '')}字节, html={len(body_html or '')}字节") # 返回邮件信息,包括正文内容 return jsonify({ 'success': True, 'email': { 'id': email.id, 'subject': email.subject, 'sender': email.sender, 'recipients': email.recipients, 'received_at': email.received_at.isoformat(), 'verification_code': email.verification_code, 'verification_link': email.verification_link, 'body_text': body_text, 'body_html': body_html } }) except Exception as e: logging.error(f"获取邮件时出错: {str(e)}") return jsonify({ 'success': False, 'error': f'获取邮件时发生错误: {str(e)}' }), 500 finally: session.close() # 删除邮件 @api_bp.route('/emails/', methods=['DELETE']) def delete_email(email_id): """删除特定邮件""" try: db = get_session() try: email = db.query(Email).filter_by(id=email_id).first() if not email: return jsonify({'error': '邮件不存在'}), 404 db.delete(email) db.commit() return jsonify({'message': '邮件已删除'}), 200 except Exception as e: db.rollback() raise finally: db.close() except Exception as e: current_app.logger.error(f"删除邮件出错: {str(e)}") return jsonify({'error': '删除邮件失败', 'details': str(e)}), 500 # 下载附件 @api_bp.route('/attachments/', methods=['GET']) def download_attachment(attachment_id): """下载特定的附件""" try: from ..models import Attachment db = get_session() try: attachment = db.query(Attachment).filter_by(id=attachment_id).first() if not attachment: return jsonify({'error': '附件不存在'}), 404 # 获取附件内容 content = attachment.get_content() if not content: return jsonify({'error': '附件内容不可用'}), 404 # 创建内存文件对象 file_obj = BytesIO(content) # 返回文件下载响应 return send_file( file_obj, mimetype=attachment.content_type, as_attachment=True, download_name=attachment.filename ) finally: db.close() except Exception as e: current_app.logger.error(f"下载附件出错: {str(e)}") return jsonify({'error': '下载附件失败', 'details': str(e)}), 500 # 获取最新邮件 (轮询API) @api_bp.route('/mailboxes//poll', methods=['GET']) def poll_new_emails(mailbox_id): """轮询指定邮箱的新邮件""" try: # 获取上次检查时间 last_check = request.args.get('last_check') if last_check: try: last_check_time = float(last_check) except ValueError: return jsonify({'error': '无效的last_check参数'}), 400 else: last_check_time = time.time() - 300 # 默认检查最近5分钟 db = get_session() try: # 检查邮箱是否存在 mailbox = db.query(Mailbox).filter_by(id=mailbox_id).first() if not mailbox: return jsonify({'error': '邮箱不存在'}), 404 # 查询新邮件 new_emails = db.query(Email).filter( Email.mailbox_id == mailbox_id, Email.received_at >= time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(last_check_time)) ).order_by(Email.received_at.desc()).all() # 返回结果 result = { 'mailbox_id': mailbox_id, 'count': len(new_emails), 'emails': [email.to_dict() for email in new_emails], 'timestamp': time.time() } return jsonify(result), 200 finally: db.close() except Exception as e: current_app.logger.error(f"轮询新邮件出错: {str(e)}") return jsonify({'error': '轮询新邮件失败', 'details': str(e)}), 500 # 通过邮箱地址获取最新邮件 @api_bp.route('/emails/by-address', methods=['GET']) def get_emails_by_address(): """ 通过邮箱地址获取最新邮件 参数: email_address: 完整邮箱地址 (例如: user@example.com) limit: 返回的邮件数量 (默认: 10) since: 从指定时间戳后获取邮件 (可选) unread_only: 是否只返回未读邮件 (默认: false) 返回: 最新的邮件列表 """ try: email_address = request.args.get('email_address') if not email_address or '@' not in email_address: return jsonify({ 'success': False, 'error': '请提供有效的邮箱地址 (格式: user@example.com)' }), 400 limit = int(request.args.get('limit', 10)) unread_only = request.args.get('unread_only', 'false').lower() == 'true' since = request.args.get('since') # 解析邮箱地址 try: username, domain_name = email_address.split('@', 1) except ValueError: return jsonify({ 'success': False, 'error': '邮箱地址格式无效' }), 400 db = get_session() try: # 查找域名 domain = db.query(Domain).filter_by(name=domain_name, active=True).first() if not domain: return jsonify({ 'success': False, 'error': f'域名 {domain_name} 不存在或未激活' }), 404 # 查找邮箱 mailbox = db.query(Mailbox).filter_by(address=username, domain_id=domain.id).first() if not mailbox: # 自动创建邮箱 - 批量注册场景 mailbox = Mailbox( address=username, domain_id=domain.id, description=f"自动创建 ({email_address})", active=True ) db.add(mailbox) db.flush() # 获取新创建邮箱的ID logging.info(f"自动创建邮箱: {email_address}, ID={mailbox.id}") # 查询邮件 query = db.query(Email).filter(Email.mailbox_id == mailbox.id) if unread_only: query = query.filter(Email.read == False) if since: try: since_time = float(since) query = query.filter( Email.received_at >= time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(since_time)) ) except ValueError: logging.warning(f"无效的since参数: {since}") # 获取最新的邮件 emails = query.order_by(Email.received_at.desc()).limit(limit).all() # 获取总数 total = query.count() # 提交数据库变更 db.commit() # 返回结果 return jsonify({ 'success': True, 'email_address': email_address, 'mailbox_id': mailbox.id, 'total': total, 'count': len(emails), 'emails': [email.to_dict() for email in emails], 'timestamp': time.time() }), 200 except Exception as e: db.rollback() raise finally: db.close() except Exception as e: logging.error(f"获取邮件出错: {str(e)}") return jsonify({ 'success': False, 'error': f'获取邮件失败: {str(e)}' }), 500