Compare commits

...

18 Commits

Author SHA1 Message Date
huangzhenpc
ea0af99c47 保存现有功能 增加域名和添加时间关联 2025-03-01 11:26:31 +08:00
huangzhenpc
0720ee9110 保存现有功能 增加域名和添加时间关联 2025-03-01 11:25:11 +08:00
huangzhenpc
34eb2528c9 保存现有功能 增加域名和添加时间关联 2025-03-01 11:23:28 +08:00
huangzhenpc
a14ed60b83 huanx 2025-03-01 11:19:58 +08:00
huangzhenpc
75b43e046a reids 域名添加 添加时间映射 2025-03-01 11:13:27 +08:00
huangzhenpc
c5d8533fad reids 域名添加 添加时间映射 2025-03-01 11:11:44 +08:00
huangzhenpc
6d9293cb9d reids 动态thinkadmin后台增加域名 2025-02-27 11:13:22 +08:00
huangzhenpc
dab8c386e8 testapiaddyuming 2025-02-27 10:08:58 +08:00
huangzhenpc
c158e42ded test 2025-02-27 09:27:44 +08:00
huangzhenpc
064b44c3b3 test 2025-02-26 19:14:40 +08:00
huangzhenpc
10ccab1da6 test 2025-02-26 19:05:53 +08:00
huangzhenpc
90cf18264b test 2025-02-26 19:02:18 +08:00
huangzhenpc
4518168641 test 2025-02-26 18:52:05 +08:00
huangzhenpc
ba57cfc588 test 2025-02-26 18:50:40 +08:00
huangzhenpc
d3797bcb60 test 2025-02-26 18:48:31 +08:00
huangzhenpc
ce450544b1 test 2025-02-26 18:42:59 +08:00
huangzhenpc
1b81f4eebd test 2025-02-26 18:38:10 +08:00
huangzhenpc
36087b1e05 test 2025-02-26 18:35:28 +08:00
7 changed files with 340 additions and 93 deletions

129
README.md
View File

@@ -1,32 +1,139 @@
# 邮件系统使用指南
# 自托管邮件系统文档
这是一个自托管的邮件系统,用于接收和处理电子邮件,特别适用于批量注册和验证场景。
## 概述
这是一个自托管的邮件系统,旨在接收和处理电子邮件,特别适用于批量注册和验证场景。该系统支持通过 API 接口查询和管理邮件,并提供验证码自动提取功能。
## 系统特性
- 自动接收邮件并存储到数据库
- 提供API接口查询和管理邮件
- 自动接收邮件并存储到 Redis 数据库
- 提供 API 接口查询和管理邮件
- 支持通过邮箱地址直接查询最新邮件
- 提供验证码自动提取功能
- 支持批量注册和验证操作
- 支持邮件附件的处理和下载
- 动态管理允许的域名列表
## 系统架构
该系统主要由以下组件构成:
- **Flask**: 用于构建 Web 应用和 API 接口
- **Redis**: 用于存储邮件数据和允许的域名列表
- **SMTP 服务器**: 用于接收邮件
## 安装和配置
### 系统要求
- Python 3.7+
- redis 和MySQL 数据库
- Redis 数据库
- 开放的网络端口SMTP: 25, HTTP: 5000
### 启动系统:
```bash
python run.py --host 0.0.0.0 --port 5000 --smtp-port 25
```
### 安装步骤
1. 克隆代码库:
```bash
git clone <repository-url>
cd emailsystem2
```
2. 安装依赖:
```bash
pip install -r requirements.txt
```
3. 启动 Redis 服务器(确保 Redis 已安装并运行):
```bash
redis-server
```
4. 启动邮件系统:
```bash
python run.py --host 0.0.0.0 --port 5000 --smtp-port 25
```
### 设置为系统服务(可选)
如果希望将邮件系统设置为系统服务,可以使用以下命令:
### 设置为系统服务(可选):
```bash
sudo cp email-system.service /etc/systemd/system/
sudo systemctl enable email-system
sudo systemctl start email-system
```
```
## API 接口
### 1. 获取最新邮件
- **请求方法**: `GET`
- **请求路径**: `/latest_email`
- **请求参数**:
- `recipient`: 收件邮箱地址
- **成功响应**:
```json
{
"message_id": "<message-id>",
"subject": "邮件主题",
"sender": "发件人邮箱",
"recipients": ["收件人邮箱"],
"body": "邮件正文",
"timestamp": "2025-02-26T19:03:53.838745",
"code": "验证码"
}
```
### 2. 管理允许的域名
- **添加域名**:
- **请求方法**: `POST`
- **请求路径**: `/allowed_domains/add`
- **请求体**:
```json
{
"domain": "email.nosqli.com"
}
```
- **成功响应**:
```json
{
"message": "Domain added successfully"
}
```
- **删除域名**:
- **请求方法**: `POST`
- **请求路径**: `/allowed_domains/remove`
- **请求体**:
```json
{
"domain": "email.nosqli.com"
}
```
- **成功响应**:
```json
{
"message": "Domain removed successfully"
}
```
- **获取当前允许的域名及其创建时间**:
- **请求方法**: `GET`
- **请求路径**: `/allowed_domains/list`
- **成功响应**:
```json
{
"email.nosqli.com": "2025-02-26T19:03:53.838745",
"tw.nosqli.com": "2025-02-26T19:05:00.123456"
}
```
## 日志记录
系统使用 Python 的 `logging` 模块记录操作日志,所有日志信息将输出到控制台,便于调试和监控。
## 注意事项
- 确保 Redis 服务器正常运行,并且网络端口未被其他服务占用。
- 在生产环境中,建议使用 WSGI 服务器(如 Gunicorn来运行 Flask 应用.

View File

@@ -1,15 +1,54 @@
from flask import Flask
from flask import Flask, request, jsonify
from .config import Config
from .models import db
from .utils import get_latest_emails, get_latest_email_with_code, add_allowed_domain, remove_allowed_domain, get_allowed_domains, get_allowed_domains_with_time
def create_app():
app = Flask(__name__)
app.config.from_object(Config)
# 初始化数据库
db.init_app(app)
with app.app_context():
db.create_all()
@app.route('/emails', methods=['GET'])
def get_emails():
recipient = request.args.get('recipient')
limit = request.args.get('limit', default=10, type=int)
if not recipient:
return jsonify({'error': 'Recipient email is required'}), 400
emails = get_latest_emails(recipient, limit)
return jsonify(emails)
@app.route('/latest_email', methods=['GET'])
def get_latest_email():
recipient = request.args.get('recipient')
if not recipient:
return jsonify({'error': 'Recipient email is required'}), 400
email_data = get_latest_email_with_code(recipient)
if email_data:
return jsonify(email_data)
return jsonify({'error': 'No emails found for this recipient'}), 404
@app.route('/allowed_domains/add', methods=['POST'])
def add_domain():
domain = request.json.get('domain')
if not domain:
return jsonify({'error': 'Domain is required'}), 400
add_allowed_domain(domain)
return jsonify({'message': 'Domain added successfully'}), 201
@app.route('/allowed_domains/remove', methods=['POST'])
def remove_domain():
domain = request.json.get('domain')
if not domain:
return jsonify({'error': 'Domain is required'}), 400
remove_allowed_domain(domain)
return jsonify({'message': 'Domain removed successfully'}), 200
@app.route('/allowed_domains/list', methods=['GET'])
def list_domains():
domains = get_allowed_domains()
return jsonify(domains), 200
@app.route('/allowed_domains/list_with_time', methods=['GET'])
def list_domains_with_time():
domains_with_time = get_allowed_domains_with_time()
return jsonify(domains_with_time), 200
return app

View File

@@ -1,10 +1,5 @@
class Config:
# 使用 1Panel MySQL 内部连接
SQLALCHEMY_DATABASE_URI = 'mysql://gitea_HN5jYh:mysql_KbBZTN@1Panel-mysql-vjz9:3306/gitea_2l82ep'
# 如果内部连接不通,可以尝试使用外部连接
# SQLALCHEMY_DATABASE_URI = 'mysql://gitea_HN5jYh:mysql_KbBZTN@rnpanel.586vip.cn:3306/gitea_2l82ep'
SQLALCHEMY_TRACK_MODIFICATIONS = False
# 使用 1Panel Redis 内部连接
REDIS_URL = "redis://1Panel-redis-r3Pz:6379/0"
# 如果内部连接不通,可以尝试使用外部连接
# REDIS_URL = "redis://rnpanel.586vip.cn:6379/0"
# REDIS_URL = "redis://1Panel-redis-r3Pz:6379/0"
# 如果内部连接不通,可以尝试使用外部连接,添加密码认证
REDIS_URL = "redis://localhost:6380/0"

View File

@@ -1,11 +0,0 @@
from flask_sqlalchemy import SQLAlchemy
db = SQLAlchemy()
class Email(db.Model):
id = db.Column(db.Integer, primary_key=True)
subject = db.Column(db.String(255))
sender = db.Column(db.String(255))
recipient = db.Column(db.String(255))
body = db.Column(db.Text)
timestamp = db.Column(db.DateTime, server_default=db.func.now())

View File

@@ -2,74 +2,80 @@ import smtplib
import os
import json
import logging
import sys
from email.parser import BytesParser
from email.policy import default
from datetime import datetime
from .models import db, Email
import redis
import smtpd
import asyncore
import base64
from .config import Config
# 配置日志
# 配置日志输出到控制台
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
level=logging.DEBUG, # 改为 DEBUG 级别
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.StreamHandler(sys.stdout)
]
)
logger = logging.getLogger('smtp_server')
# 初始化 Redis 客户端
redis_client = redis.from_url(Config.REDIS_URL)
def receive_email():
# 这里实现邮件接收逻辑
# 假设我们从某个 SMTP 服务器接收邮件
# 解析邮件并存储到 Redis
# 示例:
raw_email = b'...' # 这里应该是接收到的原始邮件内容
email = BytesParser(policy=default).parsebytes(raw_email)
email_data = {
'subject': email['subject'],
'sender': email['from'],
'recipient': email['to'],
'body': email.get_body(preferencelist=('plain')).get_content()
}
# 将邮件信息存储到 Redis
redis_client.hmset(f'email:{email_data['subject']}', email_data)
class CustomSMTPServer(smtpd.SMTPServer):
def process_message(self, peer, mailfrom, rcpttos, data):
def __init__(self, localaddr, remoteaddr):
logger.info(f"Initializing SMTP server on {localaddr}")
super().__init__(localaddr, remoteaddr)
def process_message(self, peer, mailfrom, rcpttos, data, **kwargs):
try:
# 记录接收到的邮件基本信息
logger.info(f'Received mail from {mailfrom} to {rcpttos}')
logger.debug(f"Connection from peer: {peer}")
logger.debug(f"Mail from: {mailfrom}")
logger.debug(f"Recipients: {rcpttos}")
logger.debug(f"Raw data length: {len(data)} bytes")
logger.debug(f"Additional kwargs: {kwargs}")
# 验证收件人域名
# 记录接收到的邮件基本信息
logger.info(f"Received mail from {mailfrom} to {rcpttos}")
# 从 Redis 获取允许的域名
allowed_domains = get_allowed_domains()
valid_recipients = []
for rcpt in rcpttos:
if not rcpt.endswith('@nosqli.com'):
logger.warning(f'Rejected mail to {rcpt}: invalid domain')
continue
is_valid = any(rcpt.endswith(f'@{domain}') for domain in allowed_domains)
if not is_valid:
logger.warning(f"Rejected mail to {rcpt}: invalid domain")
else:
valid_recipients.append(rcpt)
if not valid_recipients:
logger.error("No valid recipients found")
return
# 解析邮件
logger.debug("Parsing email data...")
email = BytesParser(policy=default).parsebytes(data)
# 获取邮件正文
body = self._get_email_body(email)
logger.debug(f"Email body length: {len(body) if body else 0}")
# 处理附件
attachments = self._process_attachments(email)
logger.debug(f"Found {len(attachments)} attachments")
# 构建邮件数据
timestamp = datetime.now().isoformat()
message_id = email.get('Message-ID', f'<{timestamp}@nosqli.com>')
message_id = email.get('Message-ID', f"<{timestamp}@nosqli.com>")
email_data = {
'message_id': message_id,
'subject': email.get('subject', ''),
'sender': mailfrom,
'recipients': json.dumps(rcpttos),
'recipients': json.dumps(valid_recipients),
'body': body,
'timestamp': timestamp,
'attachments': json.dumps(attachments),
@@ -80,30 +86,37 @@ class CustomSMTPServer(smtpd.SMTPServer):
# 存储邮件
self._store_email(email_data)
logger.info(f'Successfully processed mail: {message_id}')
logger.info(f"Successfully processed mail: {message_id}")
except Exception as e:
logger.error(f'Error processing email: {str(e)}', exc_info=True)
logger.error(f"Error processing email: {str(e)}", exc_info=True)
raise
def _get_email_body(self, email):
"""提取邮件正文"""
try:
logger.debug("Extracting email body...")
if email.is_multipart():
for part in email.walk():
if part.get_content_type() == "text/plain":
return part.get_payload(decode=True).decode()
content = part.get_payload(decode=True).decode()
logger.debug(f"Found text/plain content: {len(content)} chars")
return content
else:
return email.get_payload(decode=True).decode()
content = email.get_payload(decode=True).decode()
logger.debug(f"Found single part content: {len(content)} chars")
return content
logger.warning("No text content found in email")
return ""
except Exception as e:
logger.error(f'Error extracting email body: {str(e)}')
logger.error(f"Error extracting email body: {str(e)}")
return ""
def _process_attachments(self, email):
"""处理邮件附件"""
attachments = []
try:
logger.debug("Processing attachments...")
if email.is_multipart():
for part in email.walk():
if part.get_content_maintype() == 'multipart':
@@ -113,6 +126,7 @@ class CustomSMTPServer(smtpd.SMTPServer):
filename = part.get_filename()
if filename:
logger.debug(f"Processing attachment: {filename}")
attachment_data = part.get_payload(decode=True)
attachments.append({
'filename': filename,
@@ -120,43 +134,50 @@ class CustomSMTPServer(smtpd.SMTPServer):
'content_type': part.get_content_type(),
'size': len(attachment_data)
})
logger.debug(f"Attachment processed: {filename} ({len(attachment_data)} bytes)")
except Exception as e:
logger.error(f'Error processing attachments: {str(e)}')
logger.error(f"Error processing attachments: {str(e)}")
return attachments
def _store_email(self, email_data):
"""存储邮件到 Redis"""
try:
logger.debug("Storing email in Redis...")
# 使用 message_id 作为主键
email_key = f'email:{email_data["message_id"]}'
email_key = f"email:{email_data['message_id']}"
redis_client.hmset(email_key, email_data)
logger.debug(f"Stored email with key: {email_key}")
# 为每个收件人创建索引
recipients = json.loads(email_data['recipients'])
for recipient in recipients:
recipient_key = f'recipient:{recipient}'
recipient_key = f"recipient:{recipient}"
redis_client.lpush(recipient_key, email_key)
logger.debug(f"Created recipient index: {recipient_key}")
# 创建时间索引
time_key = f'time:{email_data["timestamp"]}'
time_key = f"time:{email_data['timestamp']}"
redis_client.set(time_key, email_key)
logger.debug(f"Created time index: {time_key}")
# 设置过期时间(可选,这里设置为30天
redis_client.expire(email_key, 30 * 24 * 60 * 60)
# 设置过期时间(可选,这里设置为10分钟
redis_client.expire(email_key, 10 * 60)
logger.debug("Set expiration time: 10 minutes")
except Exception as e:
logger.error(f'Error storing email: {str(e)}')
logger.error(f"Error storing email: {str(e)}")
raise
def start_smtp_server(host='0.0.0.0', port=25):
"""启动 SMTP 服务器"""
try:
logger.info(f'Starting SMTP server on {host}:{port}')
logger.info(f"Starting SMTP server on {host}:{port}")
server = CustomSMTPServer((host, port), None)
logger.info("SMTP server initialized, entering main loop...")
asyncore.loop()
except Exception as e:
logger.error(f'Error starting SMTP server: {str(e)}')
logger.error(f"Error starting SMTP server: {str(e)}")
raise
@@ -191,4 +212,94 @@ def get_attachment(email_key, attachment_index):
return None
except Exception as e:
print(f'Error fetching attachment: {e}')
return None
return None
def get_latest_emails(recipient, limit=10):
"""获取指定收件人的最新邮件"""
try:
recipient_key = f'recipient:{recipient}'
email_keys = redis_client.lrange(recipient_key, 0, limit - 1)
emails = []
for key in email_keys:
email_data = redis_client.hgetall(key.decode())
if email_data:
email_data = {k.decode(): v.decode() for k, v in email_data.items()}
emails.append(email_data)
return emails
except Exception as e:
logger.error(f'Error fetching emails: {e}')
return []
def get_latest_email_with_code(recipient):
"""获取指定收件人的最新邮件并提取验证码"""
try:
recipient_key = f'recipient:{recipient}'
email_key = redis_client.lindex(recipient_key, 0) # 获取最新邮件的键
if email_key:
email_data = redis_client.hgetall(email_key.decode())
if email_data:
email_data = {k.decode(): v.decode() for k, v in email_data.items()}
body = email_data.get('body', '')
# 假设验证码是以某种格式存在于邮件正文中,例如 "验证码: 123456"
code = extract_code_from_body(body)
email_data['code'] = code # 将验证码添加到返回数据中
return email_data
return None
except Exception as e:
logger.error(f'Error fetching latest email with code: {e}')
return None
def extract_code_from_body(body):
"""从邮件正文中提取验证码"""
import re
match = re.search(r'\b(\d{6})\b', body)
return match.group(1) if match else None
def add_allowed_domain(domain):
"""添加允许的域名并记录添加时间"""
try:
timestamp = datetime.now().isoformat()
redis_client.sadd('allowed_domains', domain)
redis_client.hset(f'domain:{domain}', 'status', 'allowed') # 新增状态键
redis_client.hset(f'domain_time:{domain}', 'added_at', timestamp) # 记录添加时间
logger.info(f'Added allowed domain: {domain} at {timestamp}')
except Exception as e:
logger.error(f'Error adding allowed domain: {e}')
def remove_allowed_domain(domain):
"""删除允许的域名"""
try:
redis_client.srem('allowed_domains', domain)
logger.info(f'Removed allowed domain: {domain}')
except Exception as e:
logger.error(f'Error removing allowed domain: {e}')
def get_allowed_domains():
"""获取当前允许的域名列表"""
try:
domains = redis_client.smembers('allowed_domains')
return [domain.decode() for domain in domains]
except Exception as e:
logger.error(f'Error fetching allowed domains: {e}')
return []
def get_allowed_domains_with_time():
"""获取当前允许的域名及其添加时间"""
try:
domains = redis_client.smembers('allowed_domains')
domain_info = {}
for domain in domains:
domain = domain.decode()
added_at = redis_client.hget(f'domain_time:{domain}', 'added_at')
domain_info[domain] = added_at.decode() if added_at else None
return domain_info
except Exception as e:
logger.error(f'Error fetching allowed domains with time: {e}')
return {}

View File

@@ -1,5 +1,3 @@
Flask==2.0.1
Flask-SQLAlchemy==2.5.1
Flask-Mail==0.9.1
redis==3.5.3
mysqlclient==2.0.3
Flask-Mail==0.9.1

16
run.py
View File

@@ -1,17 +1,25 @@
import threading
import argparse
from app import create_app
from app.utils import start_smtp_server
app = create_app()
def run_smtp_server():
start_smtp_server(host='0.0.0.0', port=25)
def run_smtp_server(host, port):
start_smtp_server(host=host, port=port)
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='Email System Server')
parser.add_argument('--host', default='0.0.0.0', help='Host to bind')
parser.add_argument('--port', type=int, default=5000, help='Port for web server')
parser.add_argument('--smtp-port', type=int, default=25, help='Port for SMTP server')
args = parser.parse_args()
# 在单独的线程中启动 SMTP 服务器
smtp_thread = threading.Thread(target=run_smtp_server)
smtp_thread = threading.Thread(target=run_smtp_server, args=(args.host, args.smtp_port))
smtp_thread.daemon = True
smtp_thread.start()
# 启动 Flask 应用
app.run(host='0.0.0.0', port=5000, debug=True)
app.run(host=args.host, port=args.port)