"""SMTP receiver that stores incoming mail in Redis and exposes lookup helpers."""

import asyncore
import base64
import json
import logging
import os
import re
import smtpd
import smtplib
import sys
from datetime import datetime
from email.parser import BytesParser
from email.policy import default

import redis

from .config import Config

# Route all log output to the console (stdout) at DEBUG verbosity.
logging.basicConfig(
    stream=sys.stdout,
    level=logging.DEBUG,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
)
logger = logging.getLogger('smtp_server')

# Shared Redis client, created once at import time from the configured URL.
redis_client = redis.from_url(Config.REDIS_URL)


class CustomSMTPServer(smtpd.SMTPServer):
    """SMTP server that stores mail for the allowed domains in Redis.

    Each accepted message is parsed, its plain-text body and attachments are
    extracted, and the result is written to a Redis hash together with
    per-recipient and per-timestamp index keys. All keys share one TTL.
    """

    # Recipient domains this server accepts mail for.
    ALLOWED_DOMAINS = ['nosqli.com', 'email.nosqli.com']

    # Lifetime, in seconds, of a stored message and of its index entries.
    EMAIL_TTL_SECONDS = 10 * 60

    def __init__(self, localaddr, remoteaddr):
        """Bind the server to ``localaddr``; ``remoteaddr`` is passed through."""
        logger.info(f"Initializing SMTP server on {localaddr}")
        super().__init__(localaddr, remoteaddr)

    def process_message(self, peer, mailfrom, rcpttos, data, **kwargs):
        """Handle one received message.

        Args:
            peer: Address of the connected client.
            mailfrom: Envelope sender.
            rcpttos: Envelope recipients.
            data: Raw message bytes.
            **kwargs: Extra options supplied by the SMTP channel (logged only).

        Returns:
            ``None`` when the message was stored (the channel then answers
            ``250 OK``), or an SMTP error reply string when no recipient
            belongs to an allowed domain.

        Raises:
            Exception: Any parsing or storage error is logged and re-raised.
        """
        try:
            logger.debug(f"Connection from peer: {peer}")
            logger.debug(f"Mail from: {mailfrom}")
            logger.debug(f"Recipients: {rcpttos}")
            logger.debug(f"Raw data length: {len(data)} bytes")
            logger.debug(f"Additional kwargs: {kwargs}")

            logger.info(f"Received mail from {mailfrom} to {rcpttos}")

            valid_recipients = self._filter_recipients(rcpttos)
            if not valid_recipients:
                logger.error("No valid recipients found")
                # Returning a reply string makes the channel reject the
                # message instead of acknowledging mail that is then dropped.
                return '550 No valid recipients'

            logger.debug("Parsing email data...")
            email = BytesParser(policy=default).parsebytes(data)

            body = self._get_email_body(email)
            logger.debug(f"Email body length: {len(body) if body else 0}")

            attachments = self._process_attachments(email)
            logger.debug(f"Found {len(attachments)} attachments")

            timestamp = datetime.now().isoformat()
            # Fall back to a generated id when the header is missing or blank.
            # NOTE(review): the Message-ID comes from the sender and is used
            # as the Redis key, so a repeated id overwrites the earlier mail.
            message_id = (
                (email.get('Message-ID') or '').strip()
                or f"<{timestamp}@nosqli.com>"
            )

            email_data = {
                'message_id': message_id,
                'subject': str(email.get('subject', '')),
                'sender': mailfrom,
                'recipients': json.dumps(valid_recipients),
                'body': body,
                'timestamp': timestamp,
                'attachments': json.dumps(attachments),
                'headers': json.dumps(dict(email.items())),
                'peer': json.dumps(peer),
            }

            self._store_email(email_data)

            logger.info(f"Successfully processed mail: {message_id}")

        except Exception as e:
            logger.error(f"Error processing email: {str(e)}", exc_info=True)
            raise

    def _filter_recipients(self, rcpttos):
        """Return the recipients whose domain is in ``ALLOWED_DOMAINS``.

        Domain names are compared case-insensitively; the addresses themselves
        are returned unchanged.
        """
        allowed = {domain.lower() for domain in self.ALLOWED_DOMAINS}
        accepted = []
        for rcpt in rcpttos:
            domain = rcpt.rpartition('@')[2].lower()
            if '@' in rcpt and domain in allowed:
                accepted.append(rcpt)
            else:
                logger.warning(f"Rejected mail to {rcpt}: invalid domain")
        return accepted

    @staticmethod
    def _decode_text(part):
        """Decode a non-multipart MIME part to ``str`` using its charset."""
        payload = part.get_payload(decode=True)
        if payload is None:
            return ""
        charset = part.get_content_charset() or 'utf-8'
        try:
            return payload.decode(charset, errors='replace')
        except LookupError:
            # Unknown charset label: fall back to UTF-8 rather than fail.
            return payload.decode('utf-8', errors='replace')

    def _get_email_body(self, email):
        """Extract the message body as text.

        For a multipart message the first ``text/plain`` part that is not an
        attachment is used; for a single-part message the payload is used
        whatever its content type. Returns ``""`` when nothing usable is
        found or extraction fails.
        """
        try:
            logger.debug("Extracting email body...")
            if email.is_multipart():
                for part in email.walk():
                    if part.get_content_type() != "text/plain":
                        continue
                    # A .txt attachment is not the message body.
                    if part.get_content_disposition() == 'attachment':
                        continue
                    content = self._decode_text(part)
                    logger.debug(f"Found text/plain content: {len(content)} chars")
                    return content
            else:
                content = self._decode_text(email)
                logger.debug(f"Found single part content: {len(content)} chars")
                return content
            logger.warning("No text content found in email")
            return ""
        except Exception as e:
            logger.error(f"Error extracting email body: {str(e)}")
            return ""

    def _process_attachments(self, email):
        """Collect the attachments of a multipart message.

        Returns a list of dicts with ``filename``, base64-encoded ``content``,
        ``content_type`` and ``size`` (bytes before encoding). A part that
        cannot be decoded is skipped without affecting the others.
        """
        attachments = []
        try:
            logger.debug("Processing attachments...")
            if not email.is_multipart():
                return attachments
            for part in email.walk():
                if part.get_content_maintype() == 'multipart':
                    continue
                if part.get('Content-Disposition') is None:
                    continue

                filename = part.get_filename()
                if not filename:
                    continue

                try:
                    logger.debug(f"Processing attachment: {filename}")
                    attachment_data = part.get_payload(decode=True)
                    if attachment_data is None:
                        # Nothing to encode for this part.
                        logger.warning(f"Skipping attachment without payload: {filename}")
                        continue
                    attachments.append({
                        'filename': filename,
                        'content': base64.b64encode(attachment_data).decode(),
                        'content_type': part.get_content_type(),
                        'size': len(attachment_data),
                    })
                    logger.debug(f"Attachment processed: {filename} ({len(attachment_data)} bytes)")
                except Exception as e:
                    logger.error(f"Error processing attachments: {str(e)}")
        except Exception as e:
            logger.error(f"Error processing attachments: {str(e)}")
        return attachments

    def _store_email(self, email_data):
        """Write a message and its index entries to Redis.

        Keys written: ``email:<message_id>`` (hash), ``recipient:<address>``
        (list, newest first) and ``time:<timestamp>`` (string). All of them
        expire after ``EMAIL_TTL_SECONDS`` so the indexes do not outlive the
        message they point to.

        Raises:
            Exception: Any Redis error is logged and re-raised.
        """
        try:
            logger.debug("Storing email in Redis...")
            ttl = self.EMAIL_TTL_SECONDS

            # The message id is the primary key.
            email_key = f"email:{email_data['message_id']}"
            redis_client.hset(email_key, mapping=email_data)
            # Set the TTL right away so a later failure cannot leave a
            # message that never expires.
            redis_client.expire(email_key, ttl)
            logger.debug(f"Stored email with key: {email_key}")

            for recipient in json.loads(email_data['recipients']):
                recipient_key = f"recipient:{recipient}"
                redis_client.lpush(recipient_key, email_key)
                # Refreshed on every push: the list disappears one TTL after
                # the last message, when every entry has expired as well.
                redis_client.expire(recipient_key, ttl)
                logger.debug(f"Created recipient index: {recipient_key}")

            time_key = f"time:{email_data['timestamp']}"
            redis_client.set(time_key, email_key, ex=ttl)
            logger.debug(f"Created time index: {time_key}")

            logger.debug(f"Set expiration time: {ttl} seconds")

        except Exception as e:
            logger.error(f"Error storing email: {str(e)}")
            raise


def start_smtp_server(host='0.0.0.0', port=25):
    """Start the SMTP server and block in the asyncore event loop.

    Args:
        host: Interface to bind to.
        port: TCP port to listen on.

    Raises:
        Exception: Any error while binding or running is logged with its
            traceback and re-raised.
    """
    server = None
    try:
        logger.info(f"Starting SMTP server on {host}:{port}")
        server = CustomSMTPServer((host, port), None)
        logger.info("SMTP server initialized, entering main loop...")
        asyncore.loop()
    except Exception as e:
        logger.error(f"Error starting SMTP server: {str(e)}", exc_info=True)
        raise
    finally:
        # Release the listening socket however the loop ended.
        if server is not None:
            server.close()


def get_emails_by_recipient(recipient, limit=10):
    """Return up to ``limit`` stored emails for ``recipient``.

    Args:
        recipient: Address used to build the ``recipient:<address>`` key.
        limit: Maximum number of emails to return; a value of zero or less
            yields an empty list.

    Returns:
        A list of dicts mapping field names to values, both decoded to
        ``str``, in the order the keys appear in the recipient list. Keys
        whose hash no longer exists are skipped. An empty list is returned
        on any error.
    """
    try:
        # LRANGE treats a negative end index as "from the tail", so without
        # this guard limit=0 would return the entire list.
        if limit <= 0:
            return []

        recipient_key = f'recipient:{recipient}'
        email_keys = redis_client.lrange(recipient_key, 0, limit - 1)

        emails = []
        for key in email_keys:
            email_data = redis_client.hgetall(key.decode())
            if email_data:
                emails.append(
                    {k.decode(): v.decode() for k, v in email_data.items()}
                )
        return emails
    except Exception as e:
        logger.error(f'Error fetching emails: {e}')
        return []


def get_attachment(email_key, attachment_index):
    """Return one attachment of a stored email.

    Args:
        email_key: Redis key of the email hash.
        attachment_index: Zero-based position in the stored attachment list.

    Returns:
        The attachment entry at ``attachment_index``, or ``None`` when the
        email or its ``attachments`` field is missing, the index is out of
        range, or an error occurs.
    """
    try:
        # Only the attachments field is needed, so avoid loading the whole hash.
        raw_attachments = redis_client.hget(email_key, 'attachments')
        if raw_attachments is None:
            return None

        attachments = json.loads(raw_attachments)
        if 0 <= attachment_index < len(attachments):
            return attachments[attachment_index]
        return None
    except Exception as e:
        logger.error(f'Error fetching attachment: {e}')
        return None


def get_latest_emails(recipient, limit=10):
    """Return up to ``limit`` stored emails for ``recipient``.

    Args:
        recipient: Address used to build the ``recipient:<address>`` key.
        limit: Maximum number of emails to return; a value of zero or less
            yields an empty list.

    Returns:
        A list of dicts mapping field names to values, both decoded to
        ``str``, in the order the keys appear in the recipient list. Keys
        whose hash no longer exists are skipped. An empty list is returned
        on any error.
    """
    # NOTE(review): duplicates the lookup done by get_emails_by_recipient;
    # consider consolidating the two entry points.
    try:
        # LRANGE treats a negative end index as "from the tail", so without
        # this guard limit=0 would return the entire list.
        if limit <= 0:
            return []

        recipient_key = f'recipient:{recipient}'
        email_keys = redis_client.lrange(recipient_key, 0, limit - 1)

        emails = []
        for key in email_keys:
            email_data = redis_client.hgetall(key.decode())
            if email_data:
                emails.append(
                    {k.decode(): v.decode() for k, v in email_data.items()}
                )
        return emails
    except Exception as e:
        logger.error(f'Error fetching emails: {e}')
        return []


def get_latest_email_with_code(recipient):
    """Return the email at the head of the recipient list, plus its code.

    The stored fields are decoded to ``str`` and a ``code`` entry is added
    holding whatever ``extract_code_from_body`` finds in the body.

    Returns:
        The decoded email dict with the extra ``code`` key, or ``None`` when
        the recipient has no entry, the email hash is gone, or an error
        occurs.
    """
    try:
        # Index 0 is the head of the recipient list.
        head_key = redis_client.lindex(f'recipient:{recipient}', 0)
        if not head_key:
            return None

        raw_fields = redis_client.hgetall(head_key.decode())
        if not raw_fields:
            return None

        result = {}
        for field, value in raw_fields.items():
            result[field.decode()] = value.decode()

        # Expose the extracted code alongside the stored fields.
        result['code'] = extract_code_from_body(result.get('body', ''))
        return result
    except Exception as e:
        logger.error(f'Error fetching latest email with code: {e}')
        return None


# Exactly six consecutive digits that are not part of a longer digit run.
# Digit lookarounds are used instead of ``\b`` because ``\b`` is
# Unicode-aware: CJK characters count as word characters, so a code that
# directly follows Chinese text would never match.
_CODE_PATTERN = re.compile(r'(?<!\d)(\d{6})(?!\d)')


def extract_code_from_body(body):
    """Extract the first six-digit verification code from an email body.

    Args:
        body: Email body text; may be empty or ``None``.

    Returns:
        The first run of exactly six digits as a string, or ``None`` when the
        body is empty or contains no such run.
    """
    if not body:
        return None
    match = _CODE_PATTERN.search(body)
    return match.group(1) if match else None