初始提交

This commit is contained in:
huangzhenpc
2025-03-05 10:42:37 +08:00
parent 14c7e6d7f9
commit ce5785c481
9 changed files with 545 additions and 0 deletions

42
.gitignore vendored Normal file
View File

@@ -0,0 +1,42 @@
# 忽略 old 文件夹
old/
old/*
# Python
__pycache__/
*.py[cod]
*$py.class
*.so
.Python
env/
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
*.egg-info/
.installed.cfg
*.egg
# 虚拟环境
venv/
ENV/
# IDE
.idea/
.vscode/
*.swp
*.swo
# 日志文件
*.log
# 本地配置文件
.env
.env.local

54
app/__init__.py Normal file
View File

@@ -0,0 +1,54 @@
from flask import Flask, request, jsonify
from .config import Config
from .utils import get_latest_emails, get_latest_email_with_code, add_allowed_domain, remove_allowed_domain, get_allowed_domains, get_allowed_domains_with_time
def create_app():
app = Flask(__name__)
app.config.from_object(Config)
@app.route('/emails', methods=['GET'])
def get_emails():
recipient = request.args.get('recipient')
limit = request.args.get('limit', default=10, type=int)
if not recipient:
return jsonify({'error': 'Recipient email is required'}), 400
emails = get_latest_emails(recipient, limit)
return jsonify(emails)
@app.route('/latest_email', methods=['GET'])
def get_latest_email():
recipient = request.args.get('recipient')
if not recipient:
return jsonify({'error': 'Recipient email is required'}), 400
email_data = get_latest_email_with_code(recipient)
if email_data:
return jsonify(email_data)
return jsonify({'error': 'No emails found for this recipient'}), 404
@app.route('/allowed_domains/add', methods=['POST'])
def add_domain():
domain = request.json.get('domain')
if not domain:
return jsonify({'error': 'Domain is required'}), 400
add_allowed_domain(domain)
return jsonify({'message': 'Domain added successfully'}), 201
@app.route('/allowed_domains/remove', methods=['POST'])
def remove_domain():
domain = request.json.get('domain')
if not domain:
return jsonify({'error': 'Domain is required'}), 400
remove_allowed_domain(domain)
return jsonify({'message': 'Domain removed successfully'}), 200
@app.route('/allowed_domains/list', methods=['GET'])
def list_domains():
domains = get_allowed_domains()
return jsonify(domains), 200
@app.route('/allowed_domains/list_with_time', methods=['GET'])
def list_domains_with_time():
domains_with_time = get_allowed_domains_with_time()
return jsonify(domains_with_time), 200
return app

7
app/config.py Normal file
View File

@@ -0,0 +1,7 @@
class Config:
# Redis 连接配置
# 优先使用 1Panel Redis 内部连接
REDIS_URL = "redis://localhost:6379/0"
# 备用配置:如果内部连接不通,可以使用以下外部连接配置
# REDIS_URL = "redis://localhost:6380/0"

28
app/init.py Normal file
View File

@@ -0,0 +1,28 @@
from flask import Flask, request, jsonify, render_template
from .utils import get_latest_emails, get_latest_email_with_code, add_allowed_domain, remove_allowed_domain, get_allowed_domains, get_allowed_domains_with_time
import redis
from .config import Config
from .smtp_server import start_smtp_server
import threading
def create_app():
app = Flask(__name__)
app.config.from_object(Config)
# 现有的路由保持不变...
@app.route('/')
def list_emails():
# 获取所有邮件
emails = get_latest_emails(count=50) # 获取最新50封邮件
return render_template('email_list.html', emails=emails)
@app.route('/email/<email_id>')
def view_email(email_id):
# 获取单个邮件详情
email = get_latest_email_with_code(email_id)
if email:
return render_template('email_detail.html', email=email)
return "邮件不存在", 404
return app

View File

@@ -0,0 +1,43 @@
<!DOCTYPE html>
<html>
<head>
<title>邮件详情</title>
<style>
body { font-family: Arial, sans-serif; margin: 20px; }
.email-container {
max-width: 800px;
margin: 0 auto;
}
.email-header {
border-bottom: 1px solid #ddd;
padding-bottom: 10px;
margin-bottom: 20px;
}
.email-meta { color: #666; margin: 5px 0; }
.email-content {
white-space: pre-wrap;
padding: 20px;
background: #f9f9f9;
border-radius: 4px;
}
.back-link {
display: inline-block;
margin-bottom: 20px;
color: #0066cc;
text-decoration: none;
}
</style>
</head>
<body>
<div class="email-container">
<a href="{{ url_for('list_emails') }}" class="back-link">← 返回列表</a>
<div class="email-header">
<h2>{{ email.subject }}</h2>
<div class="email-meta">发件人: {{ email.sender }}</div>
<div class="email-meta">收件人: {{ email.recipient }}</div>
<div class="email-meta">时间: {{ email.timestamp }}</div>
</div>
<div class="email-content">{{ email.content }}</div>
</div>
</body>
</html>

View File

@@ -0,0 +1,37 @@
<!DOCTYPE html>
<html>
<head>
<title>邮件列表</title>
<style>
body { font-family: Arial, sans-serif; margin: 20px; }
.email-list { list-style: none; padding: 0; }
.email-item {
border: 1px solid #ddd;
margin: 10px 0;
padding: 10px;
border-radius: 4px;
}
.email-item:hover { background-color: #f5f5f5; }
.email-subject { font-weight: bold; }
.email-meta { color: #666; font-size: 0.9em; }
a { text-decoration: none; color: inherit; }
</style>
</head>
<body>
<h1>邮件列表</h1>
<ul class="email-list">
{% for email in emails %}
<li class="email-item">
<a href="{{ url_for('view_email', email_id=email.id) }}">
<div class="email-subject">{{ email.subject }}</div>
<div class="email-meta">
发件人: {{ email.sender }} |
收件人: {{ email.recipient }} |
时间: {{ email.timestamp }}
</div>
</a>
</li>
{% endfor %}
</ul>
</body>
</html>

305
app/utils.py Normal file
View File

@@ -0,0 +1,305 @@
import smtplib
import os
import json
import logging
import sys
from email.parser import BytesParser
from email.policy import default
from datetime import datetime
import redis
import smtpd
import asyncore
import base64
from .config import Config
# 配置日志输出到控制台
logging.basicConfig(
level=logging.DEBUG, # 改为 DEBUG 级别
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.StreamHandler(sys.stdout)
]
)
logger = logging.getLogger('smtp_server')
# 初始化 Redis 客户端
redis_client = redis.from_url(Config.REDIS_URL)
class CustomSMTPServer(smtpd.SMTPServer):
def __init__(self, localaddr, remoteaddr):
logger.info(f"Initializing SMTP server on {localaddr}")
super().__init__(localaddr, remoteaddr)
def process_message(self, peer, mailfrom, rcpttos, data, **kwargs):
try:
logger.debug(f"Connection from peer: {peer}")
logger.debug(f"Mail from: {mailfrom}")
logger.debug(f"Recipients: {rcpttos}")
logger.debug(f"Raw data length: {len(data)} bytes")
logger.debug(f"Additional kwargs: {kwargs}")
# 记录接收到的邮件基本信息
logger.info(f"Received mail from {mailfrom} to {rcpttos}")
# 从 Redis 获取允许的域名
allowed_domains = get_allowed_domains()
valid_recipients = []
for rcpt in rcpttos:
is_valid = any(rcpt.endswith(f'@{domain}') for domain in allowed_domains)
if not is_valid:
logger.warning(f"Rejected mail to {rcpt}: invalid domain")
else:
valid_recipients.append(rcpt)
if not valid_recipients:
logger.error("No valid recipients found")
return
# 解析邮件
logger.debug("Parsing email data...")
email = BytesParser(policy=default).parsebytes(data)
# 获取邮件正文
body = self._get_email_body(email)
logger.debug(f"Email body length: {len(body) if body else 0}")
# 处理附件
attachments = self._process_attachments(email)
logger.debug(f"Found {len(attachments)} attachments")
# 构建邮件数据
timestamp = datetime.now().isoformat()
message_id = email.get('Message-ID', f"<{timestamp}@nosqli.com>")
email_data = {
'message_id': message_id,
'subject': email.get('subject', ''),
'sender': mailfrom,
'recipients': json.dumps(valid_recipients),
'body': body,
'timestamp': timestamp,
'attachments': json.dumps(attachments),
'headers': json.dumps(dict(email.items())),
'peer': json.dumps(peer)
}
# 存储邮件
self._store_email(email_data)
logger.info(f"Successfully processed mail: {message_id}")
except Exception as e:
logger.error(f"Error processing email: {str(e)}", exc_info=True)
raise
def _get_email_body(self, email):
"""提取邮件正文"""
try:
logger.debug("Extracting email body...")
if email.is_multipart():
for part in email.walk():
if part.get_content_type() == "text/plain":
content = part.get_payload(decode=True).decode()
logger.debug(f"Found text/plain content: {len(content)} chars")
return content
else:
content = email.get_payload(decode=True).decode()
logger.debug(f"Found single part content: {len(content)} chars")
return content
logger.warning("No text content found in email")
return ""
except Exception as e:
logger.error(f"Error extracting email body: {str(e)}")
return ""
def _process_attachments(self, email):
"""处理邮件附件"""
attachments = []
try:
logger.debug("Processing attachments...")
if email.is_multipart():
for part in email.walk():
if part.get_content_maintype() == 'multipart':
continue
if part.get('Content-Disposition') is None:
continue
filename = part.get_filename()
if filename:
logger.debug(f"Processing attachment: {filename}")
attachment_data = part.get_payload(decode=True)
attachments.append({
'filename': filename,
'content': base64.b64encode(attachment_data).decode(),
'content_type': part.get_content_type(),
'size': len(attachment_data)
})
logger.debug(f"Attachment processed: {filename} ({len(attachment_data)} bytes)")
except Exception as e:
logger.error(f"Error processing attachments: {str(e)}")
return attachments
def _store_email(self, email_data):
"""存储邮件到 Redis"""
try:
logger.debug("Storing email in Redis...")
# 使用 message_id 作为主键
email_key = f"email:{email_data['message_id']}"
redis_client.hmset(email_key, email_data)
logger.debug(f"Stored email with key: {email_key}")
# 为每个收件人创建索引
recipients = json.loads(email_data['recipients'])
for recipient in recipients:
recipient_key = f"recipient:{recipient}"
redis_client.lpush(recipient_key, email_key)
logger.debug(f"Created recipient index: {recipient_key}")
# 创建时间索引
time_key = f"time:{email_data['timestamp']}"
redis_client.set(time_key, email_key)
logger.debug(f"Created time index: {time_key}")
# 设置过期时间可选这里设置为10分钟
redis_client.expire(email_key, 10 * 60)
logger.debug("Set expiration time: 10 minutes")
except Exception as e:
logger.error(f"Error storing email: {str(e)}")
raise
def start_smtp_server(host='0.0.0.0', port=25):
"""启动 SMTP 服务器"""
try:
logger.info(f"Starting SMTP server on {host}:{port}")
server = CustomSMTPServer((host, port), None)
logger.info("SMTP server initialized, entering main loop...")
asyncore.loop()
except Exception as e:
logger.error(f"Error starting SMTP server: {str(e)}")
raise
def get_emails_by_recipient(recipient, limit=10):
"""获取指定收件人的最新邮件"""
try:
recipient_key = f'recipient:{recipient}'
email_keys = redis_client.lrange(recipient_key, 0, limit - 1)
emails = []
for key in email_keys:
email_data = redis_client.hgetall(key.decode())
if email_data:
# 转换数据为字符串
email_data = {k.decode(): v.decode() for k, v in email_data.items()}
emails.append(email_data)
return emails
except Exception as e:
print(f'Error fetching emails: {e}')
return []
def get_attachment(email_key, attachment_index):
"""获取指定邮件的附件"""
try:
email_data = redis_client.hgetall(email_key)
if email_data:
attachments = json.loads(email_data[b'attachments'].decode())
if 0 <= attachment_index < len(attachments):
return attachments[attachment_index]
return None
except Exception as e:
print(f'Error fetching attachment: {e}')
return None
def get_latest_emails(recipient, limit=10):
"""获取指定收件人的最新邮件"""
try:
recipient_key = f'recipient:{recipient}'
email_keys = redis_client.lrange(recipient_key, 0, limit - 1)
emails = []
for key in email_keys:
email_data = redis_client.hgetall(key.decode())
if email_data:
email_data = {k.decode(): v.decode() for k, v in email_data.items()}
emails.append(email_data)
return emails
except Exception as e:
logger.error(f'Error fetching emails: {e}')
return []
def get_latest_email_with_code(recipient):
"""获取指定收件人的最新邮件并提取验证码"""
try:
recipient_key = f'recipient:{recipient}'
email_key = redis_client.lindex(recipient_key, 0) # 获取最新邮件的键
if email_key:
email_data = redis_client.hgetall(email_key.decode())
if email_data:
email_data = {k.decode(): v.decode() for k, v in email_data.items()}
body = email_data.get('body', '')
# 假设验证码是以某种格式存在于邮件正文中,例如 "验证码: 123456"
code = extract_code_from_body(body)
email_data['code'] = code # 将验证码添加到返回数据中
return email_data
return None
except Exception as e:
logger.error(f'Error fetching latest email with code: {e}')
return None
def extract_code_from_body(body):
"""从邮件正文中提取验证码"""
import re
match = re.search(r'\b(\d{6})\b', body)
return match.group(1) if match else None
def add_allowed_domain(domain):
"""添加允许的域名并记录添加时间"""
try:
timestamp = datetime.now().isoformat()
redis_client.sadd('allowed_domains', domain)
redis_client.hset(f'domain:{domain}', 'status', 'allowed') # 新增状态键
redis_client.hset(f'domain_time:{domain}', 'added_at', timestamp) # 记录添加时间
logger.info(f'Added allowed domain: {domain} at {timestamp}')
except Exception as e:
logger.error(f'Error adding allowed domain: {e}')
def remove_allowed_domain(domain):
"""删除允许的域名"""
try:
redis_client.srem('allowed_domains', domain)
logger.info(f'Removed allowed domain: {domain}')
except Exception as e:
logger.error(f'Error removing allowed domain: {e}')
def get_allowed_domains():
"""获取当前允许的域名列表"""
try:
domains = redis_client.smembers('allowed_domains')
return [domain.decode() for domain in domains]
except Exception as e:
logger.error(f'Error fetching allowed domains: {e}')
return []
def get_allowed_domains_with_time():
"""获取当前允许的域名及其添加时间"""
try:
domains = redis_client.smembers('allowed_domains')
domain_info = {}
for domain in domains:
domain = domain.decode()
added_at = redis_client.hget(f'domain_time:{domain}', 'added_at')
domain_info[domain] = added_at.decode() if added_at else None
return domain_info
except Exception as e:
logger.error(f'Error fetching allowed domains with time: {e}')
return {}

4
requirements.txt Normal file
View File

@@ -0,0 +1,4 @@
Flask==2.0.1
Werkzeug==2.0.1
redis==3.5.3
Flask-Mail==0.9.1

25
run.py Normal file
View File

@@ -0,0 +1,25 @@
import threading
import argparse
from app import create_app
from app.utils import start_smtp_server
app = create_app()
def run_smtp_server(host, port):
start_smtp_server(host=host, port=port)
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='Email System Server')
parser.add_argument('--host', default='0.0.0.0', help='Host to bind')
parser.add_argument('--port', type=int, default=5000, help='Port for web server')
parser.add_argument('--smtp-port', type=int, default=25, help='Port for SMTP server')
args = parser.parse_args()
# 在单独的线程中启动 SMTP 服务器
smtp_thread = threading.Thread(target=run_smtp_server, args=(args.host, args.smtp_port))
smtp_thread.daemon = True
smtp_thread.start()
# 启动 Flask 应用
app.run(host=args.host, port=args.port)