Files
emailsystemv2/app/utils.py
huangzhenpc a8d1b41381 first commit
2025-02-26 18:29:10 +08:00

194 lines
6.6 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import smtplib
import os
import json
import logging
from email.parser import BytesParser
from email.policy import default
from datetime import datetime
from .models import db, Email
import redis
import smtpd
import asyncore
import base64
from .config import Config
# Configure logging for the whole process: INFO level and a format carrying
# timestamp, logger name, level and message. Note this runs at import time.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
# Module-wide logger used by the SMTP server code below.
logger = logging.getLogger('smtp_server')
# Initialise the shared Redis client from the URL in the project Config.
# The read helpers in this module call .decode() on returned keys/values,
# so they presumably expect the client to return bytes — TODO confirm that
# Config.REDIS_URL does not enable decode_responses.
redis_client = redis.from_url(Config.REDIS_URL)
def receive_email():
    """Parse a raw email and store its basic fields in a Redis hash.

    This is still a placeholder: ``raw_email`` is a hard-coded stub rather
    than data received from an SMTP server. The parsed subject, sender,
    recipient and plain-text body are written to the hash
    ``email:<subject>``.
    """
    # TODO: replace this stub with the raw bytes of the received message.
    raw_email = b'...'
    email = BytesParser(policy=default).parsebytes(raw_email)

    # preferencelist must be a sequence of subtypes; ('plain') is just the
    # string 'plain' and only worked by accident of substring matching.
    body_part = email.get_body(preferencelist=('plain',))

    email_data = {
        # Missing headers are returned as None; store '' instead so the
        # mapping only contains values the Redis client can encode.
        'subject': str(email['subject'] or ''),
        'sender': str(email['from'] or ''),
        'recipient': str(email['to'] or ''),
        # get_body() returns None when there is no text/plain candidate.
        'body': body_part.get_content() if body_part is not None else '',
    }

    # Store the email fields in Redis. Double quotes are used inside the
    # f-string because reusing the outer quote character is a SyntaxError
    # on Python versions before 3.12.
    redis_client.hmset(f'email:{email_data["subject"]}', email_data)
class CustomSMTPServer(smtpd.SMTPServer):
    """SMTP server that parses incoming mail and stores it in Redis.

    Only recipients in ``allowed_domain`` are accepted. Each stored message
    is written as a Redis hash plus per-recipient and per-timestamp index
    keys (see ``_store_email``).
    """

    # Domain (without the leading '@') that recipients must belong to.
    allowed_domain = 'nosqli.com'
    # Lifetime of a stored email, in seconds (30 days).
    email_ttl = 30 * 24 * 60 * 60

    def process_message(self, peer, mailfrom, rcpttos, data, **kwargs):
        """Handle one message handed over by the SMTP channel.

        Args:
            peer: Address of the remote client; stored JSON-encoded.
            mailfrom: Envelope sender address.
            rcpttos: List of envelope recipient addresses.
            data: Raw message content; parsed with ``BytesParser``, so it
                is expected to be bytes.
            **kwargs: Extra keyword arguments supplied by smtpd
                (``mail_options`` / ``rcpt_options`` when the server runs
                with ``decode_data`` false). They are accepted and ignored;
                without this parameter the call itself fails with
                ``TypeError``.

        Returns:
            ``None`` when the message was stored (smtpd then answers
            ``250 OK``), or an SMTP error reply string when no recipient
            belongs to the accepted domain.

        Raises:
            Exception: Any parsing or storage error is logged with its
                traceback and re-raised.
        """
        try:
            # Log the envelope of the received message.
            logger.info(f'Received mail from {mailfrom} to {rcpttos}')

            # Validate recipient domains. Domains are case-insensitive, so
            # compare in lower case. Only accepted recipients are stored
            # and indexed; the rest are logged and dropped.
            suffix = f'@{self.allowed_domain}'.lower()
            accepted = []
            for rcpt in rcpttos:
                if rcpt.lower().endswith(suffix):
                    accepted.append(rcpt)
                else:
                    logger.warning(f'Rejected mail to {rcpt}: invalid domain')
            if not accepted:
                # A non-empty return value is sent to the client as the
                # SMTP reply instead of "250 OK".
                return '550 No valid recipients'

            # Parse the message and pull out body and attachments.
            email = BytesParser(policy=default).parsebytes(data)
            body = self._get_email_body(email)
            attachments = self._process_attachments(email)

            # Build the record. The Message-ID falls back to a
            # timestamp-based id when the header is absent.
            timestamp = datetime.now().isoformat()
            message_id = str(
                email.get('Message-ID', f'<{timestamp}@{self.allowed_domain}>')
            )
            email_data = {
                'message_id': message_id,
                'subject': str(email.get('subject', '')),
                'sender': mailfrom,
                'recipients': json.dumps(accepted),
                'body': body,
                'timestamp': timestamp,
                'attachments': json.dumps(attachments),
                'headers': json.dumps({k: str(v) for k, v in email.items()}),
                'peer': json.dumps(peer)
            }

            self._store_email(email_data)
            logger.info(f'Successfully processed mail: {message_id}')
            return None
        except Exception as e:
            logger.error(f'Error processing email: {str(e)}', exc_info=True)
            raise

    @staticmethod
    def _decode_payload(part):
        """Decode a message part's payload to text using its declared charset."""
        payload = part.get_payload(decode=True)
        if payload is None:
            # get_payload(decode=True) yields None for multipart containers.
            return ""
        charset = part.get_content_charset() or 'utf-8'
        try:
            return payload.decode(charset, errors='replace')
        except LookupError:
            # Unknown charset name in the header: fall back to UTF-8.
            return payload.decode('utf-8', errors='replace')

    def _get_email_body(self, email):
        """Extract the message body as text.

        For multipart messages the first ``text/plain`` part found by
        ``walk()`` is used; otherwise the single payload is decoded.
        Returns ``""`` when no body is found or extraction fails.
        """
        try:
            if email.is_multipart():
                for part in email.walk():
                    if part.get_content_type() == "text/plain":
                        return self._decode_payload(part)
                return ""
            return self._decode_payload(email)
        except Exception as e:
            logger.error(f'Error extracting email body: {str(e)}')
            return ""

    def _process_attachments(self, email):
        """Collect the attachments of a multipart message.

        Returns a list of dicts with ``filename``, base64-encoded
        ``content``, ``content_type`` and ``size`` (in bytes). Errors are
        logged and whatever was collected so far is returned.
        """
        attachments = []
        try:
            if email.is_multipart():
                for part in email.walk():
                    if part.get_content_maintype() == 'multipart':
                        continue
                    if part.get('Content-Disposition') is None:
                        continue
                    filename = part.get_filename()
                    if not filename:
                        continue
                    attachment_data = part.get_payload(decode=True)
                    if attachment_data is None:
                        # No decodable payload (e.g. a nested message);
                        # skip it instead of aborting the remaining parts.
                        logger.warning(
                            f'Skipped attachment without payload: {filename}'
                        )
                        continue
                    attachments.append({
                        'filename': filename,
                        'content': base64.b64encode(attachment_data).decode(),
                        'content_type': part.get_content_type(),
                        'size': len(attachment_data)
                    })
        except Exception as e:
            logger.error(f'Error processing attachments: {str(e)}')
        return attachments

    def _store_email(self, email_data):
        """Store an email record and its index entries in Redis.

        Keys written:
            ``email:<message_id>``   hash holding the record, with a TTL
            ``recipient:<address>``  list of email keys, newest first
            ``time:<timestamp>``     email key, same TTL as the record

        Raises:
            Exception: Any Redis error is logged and re-raised.
        """
        try:
            # The message id is the primary key.
            email_key = f'email:{email_data["message_id"]}'
            # NOTE(review): hmset is deprecated in recent redis-py releases
            # in favour of hset(name, mapping=...); kept here until the
            # installed client version is confirmed.
            redis_client.hmset(email_key, email_data)
            # Set the expiry right away so a failure while indexing cannot
            # leave a record that never expires.
            redis_client.expire(email_key, self.email_ttl)

            # One index list per recipient. List entries cannot expire
            # individually, so entries may outlive the record they point to.
            for recipient in json.loads(email_data['recipients']):
                redis_client.lpush(f'recipient:{recipient}', email_key)

            # Time index; expires together with the record it points to.
            time_key = f'time:{email_data["timestamp"]}'
            redis_client.set(time_key, email_key, ex=self.email_ttl)
        except Exception as e:
            logger.error(f'Error storing email: {str(e)}')
            raise
def start_smtp_server(host='0.0.0.0', port=25):
    """Create the SMTP server on ``host:port`` and run the asyncore loop.

    Any exception raised while creating the server or running the loop is
    logged and re-raised to the caller.
    """
    bind_address = (host, port)
    try:
        logger.info(f'Starting SMTP server on {host}:{port}')
        # The instance is not used directly afterwards; the name only
        # documents what is being served by the loop below.
        smtp_server = CustomSMTPServer(bind_address, None)
        asyncore.loop()
    except Exception as exc:
        logger.error(f'Error starting SMTP server: {exc!s}')
        raise
def get_emails_by_recipient(recipient, limit=10):
    """Return the most recently indexed emails for a recipient.

    Args:
        recipient: Recipient address; looked up under ``recipient:<address>``.
        limit: Maximum number of emails to return. Zero or a negative
            value yields an empty list.

    Returns:
        A list of dicts (hash fields decoded to ``str``), in the order of
        the recipient's index list. Index entries whose email hash no
        longer exists are skipped. On any error the problem is logged and
        an empty list is returned.
    """
    try:
        # LRANGE's stop index is inclusive, so limit=0 would turn into
        # lrange(key, 0, -1), which returns the entire list.
        if limit <= 0:
            return []

        recipient_key = f'recipient:{recipient}'
        email_keys = redis_client.lrange(recipient_key, 0, limit - 1)

        emails = []
        for key in email_keys:
            email_data = redis_client.hgetall(key.decode())
            if email_data:
                # Convert the bytes keys/values of the hash to str.
                emails.append(
                    {k.decode(): v.decode() for k, v in email_data.items()}
                )
        return emails
    except Exception as e:
        # Use the module logger like the rest of the file instead of print.
        logger.error(f'Error fetching emails: {e}')
        return []
def get_attachment(email_key, attachment_index):
    """Return one attachment of a stored email.

    Args:
        email_key: Redis key of the email hash.
        attachment_index: Zero-based position in the email's attachment list.

    Returns:
        The attachment entry decoded from the hash's JSON ``attachments``
        field, or ``None`` when the email or field does not exist, the
        index is out of range, or an error occurs (errors are logged).
    """
    try:
        # A negative index can never be valid; skip the Redis round trip.
        if attachment_index < 0:
            return None

        # Fetch only the field that is needed instead of the whole hash,
        # which also carries the message body and headers.
        raw_attachments = redis_client.hget(email_key, 'attachments')
        if raw_attachments is None:
            return None

        attachments = json.loads(raw_attachments)
        if attachment_index < len(attachments):
            return attachments[attachment_index]
        return None
    except Exception as e:
        # Use the module logger like the rest of the file instead of print.
        logger.error(f'Error fetching attachment: {e}')
        return None