import smtplib import os import json import logging import sys from email.parser import BytesParser from email.policy import default from datetime import datetime import redis import smtpd import asyncore import base64 from .config import Config # 配置日志输出到控制台 logging.basicConfig( level=logging.DEBUG, # 改为 DEBUG 级别 format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', handlers=[ logging.StreamHandler(sys.stdout) ] ) logger = logging.getLogger('smtp_server') # 初始化 Redis 客户端 redis_client = redis.from_url(Config.REDIS_URL) class CustomSMTPServer(smtpd.SMTPServer): def __init__(self, localaddr, remoteaddr): logger.info(f"Initializing SMTP server on {localaddr}") super().__init__(localaddr, remoteaddr) def process_message(self, peer, mailfrom, rcpttos, data, **kwargs): try: logger.debug(f"Connection from peer: {peer}") logger.debug(f"Mail from: {mailfrom}") logger.debug(f"Recipients: {rcpttos}") logger.debug(f"Raw data length: {len(data)} bytes") logger.debug(f"Additional kwargs: {kwargs}") # 记录额外的参数 # 记录接收到的邮件基本信息 logger.info(f"Received mail from {mailfrom} to {rcpttos}") # 验证收件人域名 valid_recipients = [] for rcpt in rcpttos: if not rcpt.endswith('@nosqli.com'): logger.warning(f"Rejected mail to {rcpt}: invalid domain") else: valid_recipients.append(rcpt) if not valid_recipients: logger.error("No valid recipients found") return # 解析邮件 logger.debug("Parsing email data...") email = BytesParser(policy=default).parsebytes(data) # 获取邮件正文 body = self._get_email_body(email) logger.debug(f"Email body length: {len(body) if body else 0}") # 处理附件 attachments = self._process_attachments(email) logger.debug(f"Found {len(attachments)} attachments") # 构建邮件数据 timestamp = datetime.now().isoformat() message_id = email.get('Message-ID', f"<{timestamp}@nosqli.com>") email_data = { 'message_id': message_id, 'subject': email.get('subject', ''), 'sender': mailfrom, 'recipients': json.dumps(valid_recipients), 'body': body, 'timestamp': timestamp, 'attachments': json.dumps(attachments), 'headers': json.dumps(dict(email.items())), 'peer': json.dumps(peer) } # 存储邮件 self._store_email(email_data) logger.info(f"Successfully processed mail: {message_id}") except Exception as e: logger.error(f"Error processing email: {str(e)}", exc_info=True) raise def _get_email_body(self, email): """提取邮件正文""" try: logger.debug("Extracting email body...") if email.is_multipart(): for part in email.walk(): if part.get_content_type() == "text/plain": content = part.get_payload(decode=True).decode() logger.debug(f"Found text/plain content: {len(content)} chars") return content else: content = email.get_payload(decode=True).decode() logger.debug(f"Found single part content: {len(content)} chars") return content logger.warning("No text content found in email") return "" except Exception as e: logger.error(f"Error extracting email body: {str(e)}") return "" def _process_attachments(self, email): """处理邮件附件""" attachments = [] try: logger.debug("Processing attachments...") if email.is_multipart(): for part in email.walk(): if part.get_content_maintype() == 'multipart': continue if part.get('Content-Disposition') is None: continue filename = part.get_filename() if filename: logger.debug(f"Processing attachment: {filename}") attachment_data = part.get_payload(decode=True) attachments.append({ 'filename': filename, 'content': base64.b64encode(attachment_data).decode(), 'content_type': part.get_content_type(), 'size': len(attachment_data) }) logger.debug(f"Attachment processed: {filename} ({len(attachment_data)} bytes)") except Exception as e: logger.error(f"Error processing attachments: {str(e)}") return attachments def _store_email(self, email_data): """存储邮件到 Redis""" try: logger.debug("Storing email in Redis...") # 使用 message_id 作为主键 email_key = f"email:{email_data['message_id']}" redis_client.hmset(email_key, email_data) logger.debug(f"Stored email with key: {email_key}") # 为每个收件人创建索引 recipients = json.loads(email_data['recipients']) for recipient in recipients: recipient_key = f"recipient:{recipient}" redis_client.lpush(recipient_key, email_key) logger.debug(f"Created recipient index: {recipient_key}") # 创建时间索引 time_key = f"time:{email_data['timestamp']}" redis_client.set(time_key, email_key) logger.debug(f"Created time index: {time_key}") # 设置过期时间(可选,这里设置为30天) redis_client.expire(email_key, 30 * 24 * 60 * 60) logger.debug("Set expiration time: 30 days") except Exception as e: logger.error(f"Error storing email: {str(e)}") raise def start_smtp_server(host='0.0.0.0', port=25): """启动 SMTP 服务器""" try: logger.info(f"Starting SMTP server on {host}:{port}") server = CustomSMTPServer((host, port), None) logger.info("SMTP server initialized, entering main loop...") asyncore.loop() except Exception as e: logger.error(f"Error starting SMTP server: {str(e)}") raise def get_emails_by_recipient(recipient, limit=10): """获取指定收件人的最新邮件""" try: recipient_key = f'recipient:{recipient}' email_keys = redis_client.lrange(recipient_key, 0, limit - 1) emails = [] for key in email_keys: email_data = redis_client.hgetall(key.decode()) if email_data: # 转换数据为字符串 email_data = {k.decode(): v.decode() for k, v in email_data.items()} emails.append(email_data) return emails except Exception as e: print(f'Error fetching emails: {e}') return [] def get_attachment(email_key, attachment_index): """获取指定邮件的附件""" try: email_data = redis_client.hgetall(email_key) if email_data: attachments = json.loads(email_data[b'attachments'].decode()) if 0 <= attachment_index < len(attachments): return attachments[attachment_index] return None except Exception as e: print(f'Error fetching attachment: {e}') return None