Files
emailsystem/app/services/mail_store.py
huangzhenpc d85c1ab23a test
2025-02-26 16:48:15 +08:00

339 lines
14 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import logging
import os
import email
from email.policy import default
from sqlalchemy.orm import Session
from datetime import datetime
import re
from ..models.domain import Domain
from ..models.mailbox import Mailbox
from ..models.email import Email
from ..models.attachment import Attachment
# Module-level logging setup.
# NOTE(review): configuring the root logger from a service module is an import
# side effect; logging.basicConfig is a no-op if the root logger already has
# handlers (e.g. the application configured logging first).
logging.basicConfig(
    level=logging.DEBUG,  # log everything, DEBUG and above
    format='%(asctime)s - %(levelname)s - %(message)s',  # log line format
    # Append to app.log with an explicit UTF-8 encoding: the log messages are
    # Chinese and DEBUG logs include raw mail content, which a platform locale
    # encoding (e.g. cp1252) cannot always represent. delay=True opens the
    # file on first write, so an unused handler (when basicConfig is a no-op)
    # never holds an open file.
    handlers=[logging.FileHandler('app.log', mode='a', encoding='utf-8', delay=True)],
)
logger = logging.getLogger(__name__)
class MailStore:
    """Mail storage service: saves incoming mail and retrieves stored mail.

    Email rows (plus Domain/Mailbox rows created on demand) are written through
    SQLAlchemy sessions obtained from ``db_session_factory``; attachment bodies
    and a copy of each raw message are written as files under ``storage_path``.
    """

    # Escape character for user-supplied text passed to LIKE/ILIKE. "/" needs
    # no quoting inside an SQL string literal in any dialect (unlike "\").
    _LIKE_ESCAPE = "/"

    def __init__(self, db_session_factory, storage_path=None):
        """Initialise the mail store.

        Args:
            db_session_factory: Zero-argument callable returning a new
                database session; every public method opens and closes its own.
            storage_path: Directory for attachment and raw-mail files.
                Defaults to ``<cwd>/email_data``; created if it does not exist.
        """
        self.db_session_factory = db_session_factory
        self.storage_path = storage_path or os.path.join(os.getcwd(), 'email_data')
        # exist_ok avoids the race between an exists() check and makedirs().
        os.makedirs(self.storage_path, exist_ok=True)

    # ------------------------------------------------------------------
    # Pure helpers (no database, no logging)
    # ------------------------------------------------------------------

    @staticmethod
    def _normalize_recipients(recipients):
        """Return *recipients* as a list of (not yet validated) entries.

        Lists, tuples and sets are copied into a list; bytes are decoded as
        UTF-8; anything else is treated as a comma-separated string.
        """
        if isinstance(recipients, (list, tuple, set, frozenset)):
            return list(recipients)
        if isinstance(recipients, (bytes, bytearray)):
            recipients = bytes(recipients).decode('utf-8', errors='replace')
        return str(recipients).split(',')

    @staticmethod
    def _split_address(recipient):
        """Split ``user@domain`` into a lower-cased ``(user, domain)`` tuple.

        Surrounding whitespace is ignored (comma-separated input such as
        ``"a@x, b@y"`` yields entries with a leading space). Returns None for
        non-strings and for entries lacking a non-empty local part or domain.
        """
        if not isinstance(recipient, str):
            return None
        username, sep, domain = recipient.strip().lower().partition('@')
        if not (sep and username and domain):
            return None
        return username, domain

    @classmethod
    def _escape_like(cls, value):
        """Escape LIKE wildcards so *value* only matches itself.

        Without this, a username such as ``john_doe`` would also match an
        existing mailbox ``johnxdoe``, because ``_`` matches any one character.
        """
        esc = cls._LIKE_ESCAPE
        return (
            value.replace(esc, esc + esc)
            .replace('%', esc + '%')
            .replace('_', esc + '_')
        )

    @staticmethod
    def _to_text(data):
        """Return *data* as str: bytes are UTF-8 decoded, None stays None."""
        if data is None or isinstance(data, str):
            return data
        if isinstance(data, (bytes, bytearray)):
            return bytes(data).decode('utf-8', errors='replace')
        return str(data)

    @staticmethod
    def _decode_part(part):
        """Decode a non-multipart MIME part's payload to text.

        Uses the part's declared charset (UTF-8 when none is declared) and
        replaces undecodable bytes; an unknown charset name falls back to
        UTF-8 instead of raising LookupError.
        """
        payload = part.get_payload(decode=True) or b''
        charset = part.get_content_charset() or 'utf-8'
        try:
            return payload.decode(charset, errors='replace')
        except LookupError:
            return payload.decode('utf-8', errors='replace')

    @staticmethod
    def _extract_subject(message):
        """Return the message subject as a str, or None when there is none.

        A ``subject`` attribute is honoured first (for custom message
        objects); otherwise the ``Subject`` header is read via
        ``message.get``, since standard ``email.message`` objects expose
        headers only through the mapping interface.
        """
        subject = getattr(message, 'subject', None)
        if not subject and hasattr(message, 'get'):
            subject = message.get('subject')
        return str(subject) if subject else None

    @classmethod
    def _extract_parts(cls, message):
        """Walk *message* and collect its bodies and attachments.

        Returns ``(body_text, body_html, attachments)`` where ``attachments``
        is a list of ``(filename, content_type, content_bytes)``. Every leaf
        part with a filename is an attachment and is never used as a body;
        the first text/plain and first text/html leaf parts become the bodies,
        so nested parts (e.g. a forwarded message) do not replace them.
        """
        body_text = ''
        body_html = ''
        attachments = []
        for part in message.walk():
            if part.is_multipart():
                continue  # containers only; walk() also visits their children
            content_type = part.get_content_type()
            file_name = part.get_filename()
            if file_name:
                content = part.get_payload(decode=True) or b''
                attachments.append((file_name, content_type, content))
            elif content_type == 'text/plain' and not body_text:
                body_text = cls._decode_part(part)
            elif content_type == 'text/html' and not body_html:
                body_html = cls._decode_part(part)
        return body_text, body_html, attachments

    # ------------------------------------------------------------------
    # Saving
    # ------------------------------------------------------------------

    async def save_email(self, message, sender, recipients, raw_data=None):
        """Persist one incoming message and return ``(ok, detail)``.

        The message is stored under the mailbox of the first usable recipient;
        that recipient's Domain and Mailbox rows are created if missing.
        Subject, bodies and attachments are taken from *message* when it
        supports ``walk()``; if no body is found, *raw_data* is stored as the
        body instead (as HTML if it contains ``<html>``, else as text).

        Args:
            message: Parsed message object, or the raw message as a str.
            sender: Sender address, stored as-is on the Email row.
            recipients: List/tuple/set of addresses or a comma-separated
                string; stored on the Email row as ``str(recipients)``.
            raw_data: Optional raw message (str or bytes), used as the body
                fallback and for the ``.eml`` copy.

        Returns:
            ``(True, "邮件已保存: ID=<id>")`` on success, otherwise
            ``(False, <reason>)``. Exceptions during the save are logged and
            reported through the return value, not raised.

        NOTE(review): the database and file I/O here are blocking calls made
        inside a coroutine, so they stall the event loop while they run.
        """
        logger.info("开始保存邮件: 发件人=%s, 收件人=%s", sender, recipients)
        logger.debug("原始邮件数据: %s", raw_data)
        if recipients is None:
            logger.error("收件人列表为None无法保存邮件")
            return False, "收件人列表为None"
        recipients_list = self._normalize_recipients(recipients)
        if not recipients_list:
            logger.error("收件人列表为空,无法保存邮件")
            return False, "收件人列表为空"
        logger.debug("收件人列表: %s", recipients_list)
        if message is None:
            logger.error("邮件内容无效,无法保存邮件")
            return False, "邮件内容无效"
        try:
            raw_text = self._to_text(raw_data)

            email_subject = self._extract_subject(message)
            if email_subject:
                logger.debug("提取的邮件主题: %s", email_subject)
            else:
                logger.warning("邮件主题未找到")

            body_text, body_html, attachments = '', '', []
            if hasattr(message, 'walk'):
                try:
                    body_text, body_html, attachments = self._extract_parts(message)
                except Exception as e:
                    # Best effort: fall through to the raw-data fallback below.
                    logger.error("获取邮件内容时出错: %s", e)
            for file_name, _content_type, content in attachments:
                logger.debug("提取的附件: %s, 大小: %s 字节", file_name, len(content))

            # No body found: store the raw data. This is a crude heuristic and
            # the stored "body" then includes the message headers.
            if not body_text and not body_html and raw_text:
                logger.info("从原始数据中提取内容")
                if '<html>' in raw_text.lower():
                    body_html = raw_text
                else:
                    body_text = raw_text
            logger.info(
                "提取完成: 纯文本=%s字节, HTML=%s字节, 附件数=%s",
                len(body_text), len(body_html), len(attachments),
            )

            session = self.db_session_factory()
            try:
                mailbox_id = self._resolve_mailbox_id(session, recipients_list)
                if mailbox_id is None:
                    error_msg = f"无法确定有效的收件邮箱,无法保存邮件。收件人: {recipients}"
                    logger.error(error_msg)
                    return False, error_msg

                email_obj = Email(
                    mailbox_id=mailbox_id,
                    sender=sender,
                    # NOTE(review): for list input this stores the Python repr
                    # (e.g. "['a@x']"); kept as-is because readers of
                    # Email.recipients may depend on that format.
                    recipients=str(recipients),
                    subject=email_subject,
                    body_text=body_text,
                    body_html=body_html,
                )
                # Model hook that extracts verification codes/links.
                email_obj.extract_verification_data()
                session.add(email_obj)
                session.flush()  # assign email_obj.id
                email_id = email_obj.id

                self._store_attachments(session, email_id, attachments)
                self._write_raw_email(email_id, message, raw_text)

                session.commit()
                logger.info("邮件保存成功: ID=%s", email_id)
                return True, f"邮件已保存: ID={email_id}"
            except Exception as e:
                session.rollback()
                logger.error("数据库操作失败#1: %s", e)
                raise
            finally:
                session.close()
        except Exception as e:
            logger.error("邮件保存失败#2: %s", e)
            return False, f"保存邮件失败#3: {str(e)}"

    def _resolve_mailbox_id(self, session, recipients_list):
        """Return the mailbox id for the first usable recipient, or None.

        Domain and Mailbox rows are matched case-insensitively (with LIKE
        wildcards escaped) and created, then flushed, when missing.
        """
        esc = self._LIKE_ESCAPE
        for recipient in recipients_list:
            logger.debug("处理收件人: %s", recipient)
            parts = self._split_address(recipient)
            if parts is None:
                continue  # empty or malformed entry
            username, domain = parts
            logger.info("处理收件人: 用户名=%s, 域名=%s", username, domain)

            domain_obj = session.query(Domain).filter(
                Domain.name.ilike(self._escape_like(domain), escape=esc)
            ).first()
            if not domain_obj:
                logger.info("创建新域名: %s", domain)
                domain_obj = Domain(
                    name=domain,
                    description=f"系统自动创建的域名 ({domain})",
                    active=True,
                )
                session.add(domain_obj)
                session.flush()  # assign domain_obj.id

            mailbox = session.query(Mailbox).filter(
                Mailbox.domain_id == domain_obj.id,
                Mailbox.address.ilike(self._escape_like(username), escape=esc),
            ).first()
            if not mailbox:
                logger.info("创建新邮箱: %s@%s", username, domain)
                mailbox = Mailbox(
                    domain_id=domain_obj.id,
                    address=username,
                    description=f"系统自动创建的邮箱 ({username}@{domain})",
                )
                session.add(mailbox)
                session.flush()  # assign mailbox.id

            logger.info("使用邮箱ID: %s (%s@%s)", mailbox.id, username, domain)
            return mailbox.id
        return None

    def _store_attachments(self, session, email_id, attachments):
        """Add an Attachment row per attachment and write its bytes to disk.

        Files are named ``attachments/attachment_<attachment id>`` under the
        storage path.
        """
        if not attachments:
            return
        attachments_dir = os.path.join(self.storage_path, 'attachments')
        os.makedirs(attachments_dir, exist_ok=True)
        for file_name, content_type, content in attachments:
            attachment = Attachment(
                email_id=email_id,
                filename=file_name,
                content_type=content_type or "application/octet-stream",
                size=len(content),
            )
            session.add(attachment)
            # Flush so attachment.id is assigned; without it every file would
            # be written as "attachment_None" and overwrite the previous one.
            session.flush()
            attachment_path = os.path.join(attachments_dir, f"attachment_{attachment.id}")
            with open(attachment_path, 'wb') as f:
                f.write(content)

    def _write_raw_email(self, email_id, message, raw_text):
        """Write the message to ``emails/email_<id>.eml`` under the storage path.

        A str message is written as-is; otherwise ``message.as_string()`` is
        used, falling back to *raw_text* (if any) when serialisation fails.
        """
        raw_path = os.path.join(self.storage_path, 'emails', f"email_{email_id}.eml")
        os.makedirs(os.path.dirname(raw_path), exist_ok=True)
        with open(raw_path, 'w', encoding='utf-8', errors='replace') as f:
            if isinstance(message, str):
                f.write(message)
                return
            try:
                f.write(message.as_string())
            except Exception:
                logger.warning("序列化邮件对象失败,改用原始数据写入: %s", raw_path, exc_info=True)
                if raw_text:
                    f.write(raw_text)

    # ------------------------------------------------------------------
    # Retrieval
    # ------------------------------------------------------------------

    def get_emails_for_mailbox(self, mailbox_id, limit=50, offset=0, unread_only=False):
        """Return one page of a mailbox's emails, newest (received_at) first.

        Returns ``{'total': <count matching the filter>, 'items': [...]}``
        where items are ``Email.to_dict()`` results. On any error the error
        is logged and an empty page is returned.
        """
        db = self.db_session_factory()
        try:
            query = db.query(Email).filter(Email.mailbox_id == mailbox_id)
            if unread_only:
                # SQLAlchemy needs `== False` here to build the SQL comparison.
                query = query.filter(Email.read == False)  # noqa: E712
            total = query.count()
            page = (
                query.order_by(Email.received_at.desc())
                .limit(limit)
                .offset(offset)
                .all()
            )
            return {
                'total': total,
                'items': [item.to_dict() for item in page],
            }
        except Exception as e:
            logger.error("获取邮件列表时出错: %s", e)
            return {'total': 0, 'items': []}
        finally:
            db.close()

    def get_email_by_id(self, email_id, mark_as_read=True):
        """Return full details of one email as a dict, or None.

        The dict is ``Email.to_dict()`` plus ``body_text``, ``body_html`` and
        ``attachments`` (list of ``Attachment.to_dict()``). When
        *mark_as_read* is true and the email is unread, it is marked read with
        ``last_read`` set to the current local time and committed. Returns
        None when the email does not exist or on error (error is logged).
        """
        db = self.db_session_factory()
        try:
            email_obj = db.query(Email).filter(Email.id == email_id).first()
            if not email_obj:
                return None
            if mark_as_read and not email_obj.read:
                email_obj.read = True
                email_obj.last_read = datetime.now()
                db.commit()
            attachments = [attachment.to_dict() for attachment in email_obj.attachments]
            result = email_obj.to_dict()
            result['body_text'] = email_obj.body_text
            result['body_html'] = email_obj.body_html
            result['attachments'] = attachments
            return result
        except Exception as e:
            db.rollback()
            logger.error("获取邮件详情时出错: %s", e)
            return None
        finally:
            db.close()

    def delete_email(self, email_id):
        """Delete the email row with *email_id*.

        Returns True if it was deleted, False if it does not exist or on
        error (error is logged and the session rolled back).

        NOTE(review): only the database row is deleted; the
        ``attachment_<id>`` and ``email_<id>.eml`` files written by
        save_email stay on disk. Whether Attachment rows cascade depends on
        the model definitions (not visible here).
        """
        db = self.db_session_factory()
        try:
            email_obj = db.query(Email).filter(Email.id == email_id).first()
            if not email_obj:
                return False
            db.delete(email_obj)
            db.commit()
            return True
        except Exception as e:
            db.rollback()
            logger.error("删除邮件时出错: %s", e)
            return False
        finally:
            db.close()

    def get_attachment_content(self, attachment_id):
        """Return ``{'content', 'filename', 'content_type'}`` for an attachment.

        The content comes from ``Attachment.get_content()``. Returns None when
        the attachment does not exist or on error (error is logged).
        """
        db = self.db_session_factory()
        try:
            attachment = db.query(Attachment).filter(Attachment.id == attachment_id).first()
            if not attachment:
                return None
            return {
                'content': attachment.get_content(),
                'filename': attachment.filename,
                'content_type': attachment.content_type,
            }
        except Exception as e:
            logger.error("获取附件内容时出错: %s", e)
            return None
        finally:
            db.close()