import logging import os import email from email.policy import default from sqlalchemy.orm import Session from datetime import datetime import re from ..models.domain import Domain from ..models.mailbox import Mailbox from ..models.email import Email from ..models.attachment import Attachment logging.basicConfig( level=logging.DEBUG, # 设置日志级别为DEBUG format='%(asctime)s - %(levelname)s - %(message)s', # 日志格式 filename='app.log', # 日志文件名 filemode='a' # 追加模式 ) logger = logging.getLogger(__name__) class MailStore: """邮件存储服务,负责保存和检索邮件""" def __init__(self, db_session_factory, storage_path=None): """ 初始化邮件存储服务 参数: db_session_factory: 数据库会话工厂函数 storage_path: 附件存储路径 """ self.db_session_factory = db_session_factory self.storage_path = storage_path or os.path.join(os.getcwd(), 'email_data') # 确保存储目录存在 if not os.path.exists(self.storage_path): os.makedirs(self.storage_path) async def save_email(self, message, sender, recipients, raw_data=None): logging.info(f"开始保存邮件: 发件人={sender}, 收件人={recipients}") logging.debug(f"原始邮件数据: {raw_data}") if recipients is None: logging.error("收件人列表为None,无法保存邮件") return False, "收件人列表为None" elif isinstance(recipients, list): recipients_list = recipients # 如果是列表,直接使用 else: recipients_list = recipients.split(",") # 假设是以逗号分隔的字符串 # 确保收件人列表不为空 if recipients_list is None or not recipients_list: logging.error("收件人列表为空,无法保存邮件") return False, "收件人列表为空" # 打印收件人列表以进行调试 logging.debug(f"收件人列表x2: {recipients_list}") if message is None: logging.error("邮件内容无效,无法保存邮件") return False, "邮件内容无效" try: # 解析邮件内容 email_subject = None body_text = "" body_html = "" attachments = [] # 提取邮件主题 if hasattr(message, 'subject') and message.subject: email_subject = message.subject logging.debug(f"提取的邮件主题: {email_subject}") else: logging.warning("邮件主题未找到") # 提取邮件内容 if hasattr(message, 'get_body'): try: for part in message.walk(): content_type = part.get_content_type() logging.debug(f"处理邮件部分: {content_type}") if content_type == "text/plain": body_text = part.get_payload(decode=True).decode(part.get_content_charset() or 'utf-8', errors='replace') logging.debug(f"提取的纯文本内容长度xb: {len(body_text)}") elif content_type == "text/html": body_html = part.get_payload(decode=True).decode(part.get_content_charset() or 'utf-8', errors='replace') logging.debug(f"提取的HTML内容长度: {len(body_html)}") # 处理附件 if part.get_filename(): file_name = part.get_filename() content = part.get_payload(decode=True) attachments.append((file_name, content)) logging.debug(f"提取的附件: {file_name}, 大小: {len(content)} 字节") except Exception as e: logging.error(f"获取邮件内容时出错: {str(e)}") # 如果仍然没有内容,尝试从原始数据中提取 if not body_text and not body_html and raw_data: logging.info("从原始数据中提取内容") try: # 简单提取,可能不适用于所有情况 if '' in raw_data.lower(): body_html = raw_data else: body_text = raw_data except Exception as e: logging.error(f"从原始数据提取内容失败: {str(e)}") logging.info(f"提取完成x3: 纯文本={len(body_text)}字节, HTML={len(body_html)}字节, 附件数={len(attachments)}") # 保存到数据库 session = self.db_session_factory() try: # 查找收件人对应的邮箱 mailbox_id = None # 确保收件人列表不为空 if not recipients_list: logging.error("收件人列表为空,无法确定邮箱") return False, "收件人列表为空" # 尝试找到或创建收件人邮箱 for recipient in recipients_list: # 跳过空收件人 logging.debug(f"处理收件人: {recipient}") if not recipient or '@' not in recipient: continue # 提取域名和用户名 username, domain = recipient.lower().split('@', 1) logging.info(f"处理收件人: 用户名={username}, 域名={domain}") # 1. 先查找域名 domain_obj = session.query(Domain).filter(Domain.name.ilike(domain)).first() # 如果域名不存在,创建它 if not domain_obj: logging.info(f"创建新域名: {domain}") domain_obj = Domain( name=domain.lower(), description=f"系统自动创建的域名 ({domain})", active=True ) session.add(domain_obj) session.flush() # 2. 查找或创建邮箱 mailbox = session.query(Mailbox).filter( Mailbox.domain_id == domain_obj.id, Mailbox.address.ilike(username) ).first() # 如果邮箱不存在,创建它 if not mailbox: logging.info(f"创建新邮箱: {username}@{domain}") mailbox = Mailbox( domain_id=domain_obj.id, address=username.lower(), description=f"系统自动创建的邮箱 ({username}@{domain})" ) session.add(mailbox) session.flush() # 设置邮箱ID并结束循环 mailbox_id = mailbox.id logging.info(f"使用邮箱ID: {mailbox_id} ({username}@{domain})") break # 最终检查是否获取到了邮箱ID if mailbox_id is None: error_msg = f"无法确定有效的收件邮箱,无法保存邮件。收件人: {recipients}" logging.error(error_msg) return False, error_msg # 打印邮箱ID以进行调试 logging.debug(f"使用的邮箱IDx3: {mailbox_id}") # 创建邮件记录 email_obj = Email( mailbox_id=mailbox_id, # 确保始终有邮箱ID sender=sender, recipients=str(recipients), subject=email_subject, body_text=body_text, body_html=body_html, ) logging.debug(f"创建邮件记录: {email_obj}") # 提取验证码和验证链接 email_obj.extract_verification_data() session.add(email_obj) session.flush() # 获取新创建邮件的ID session.commit() logging.info(f"邮件保存成功: ID={email_obj.id}") return True, f"邮件已保存: ID={email_obj.id}" except Exception as e: session.rollback() logging.error(f"数据库操作失败#1: {str(e)}") raise finally: session.close() except Exception as e: logging.error(f"邮件保存失败#2: {str(e)}") return False, f"保存邮件失败#3: {str(e)}" def get_emails_for_mailbox(self, mailbox_id, limit=50, offset=0, unread_only=False): """获取指定邮箱的邮件列表""" db = self.db_session_factory() try: query = db.query(Email).filter(Email.mailbox_id == mailbox_id) if unread_only: query = query.filter(Email.read == False) total = query.count() emails = query.order_by(Email.received_at.desc()) \ .limit(limit) \ .offset(offset) \ .all() return { 'total': total, 'items': [email.to_dict() for email in emails] } except Exception as e: logger.error(f"获取邮件列表时出错: {str(e)}") return {'total': 0, 'items': []} finally: db.close() def get_email_by_id(self, email_id, mark_as_read=True): """获取指定ID的邮件详情""" db = self.db_session_factory() try: email = db.query(Email).filter(Email.id == email_id).first() if not email: return None if mark_as_read and not email.read: email.read = True email.last_read = datetime.now() db.commit() # 获取附件信息 attachments = [attachment.to_dict() for attachment in email.attachments] # 构建完整响应 result = email.to_dict() result['body_text'] = email.body_text result['body_html'] = email.body_html result['attachments'] = attachments return result except Exception as e: db.rollback() logger.error(f"获取邮件详情时出错: {str(e)}") return None finally: db.close() def delete_email(self, email_id): """删除指定ID的邮件""" db = self.db_session_factory() try: email = db.query(Email).filter(Email.id == email_id).first() if not email: return False db.delete(email) db.commit() return True except Exception as e: db.rollback() logger.error(f"删除邮件时出错: {str(e)}") return False finally: db.close() def get_attachment_content(self, attachment_id): """获取附件内容""" db = self.db_session_factory() try: attachment = db.query(Attachment).filter(Attachment.id == attachment_id).first() if not attachment: return None content = attachment.get_content() return { 'content': content, 'filename': attachment.filename, 'content_type': attachment.content_type } except Exception as e: logger.error(f"获取附件内容时出错: {str(e)}") return None finally: db.close()