From a9e29c9cf5a4381cdb4ae24066a9b3bd090bd6c4 Mon Sep 17 00:00:00 2001 From: huangzhenpc Date: Wed, 26 Feb 2025 10:16:12 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=BC=BA=EF=BC=9A=E6=B7=BB=E5=8A=A0?= =?UTF-8?q?=E6=89=B9=E9=87=8F=E6=B3=A8=E5=86=8C=E5=8A=9F=E8=83=BD=E5=92=8C?= =?UTF-8?q?=E6=94=AF=E6=8C=81=E6=96=87=E4=BB=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 168 ++++++++++++++++++--- app/api/email_routes.py | 256 +++++++++++++++++++++++++++----- app/models/email.py | 60 +++++++- app/services/mail_store.py | 272 ++++++++++++++++++++-------------- app/services/smtp_server.py | 124 +++++++++++++--- app/utils/__init__.py | 9 ++ batch_registration_example.py | 196 ++++++++++++++++++++++++ config.example.py | 120 +++++++++++++++ email-system.service | 19 +++ email_api_client.py | 244 ++++++++++++++++++++++++++++++ test_email_by_address.py | 116 +++++++++++++++ 11 files changed, 1392 insertions(+), 192 deletions(-) create mode 100644 app/utils/__init__.py create mode 100644 batch_registration_example.py create mode 100644 config.example.py create mode 100644 email-system.service create mode 100644 email_api_client.py create mode 100644 test_email_by_address.py diff --git a/README.md b/README.md index b2b247c..ee6e741 100644 --- a/README.md +++ b/README.md @@ -1,32 +1,154 @@ -# 邮件系统 +# 邮件系统使用指南 -这是一个基于Python的完整邮件系统解决方案,提供SMTP服务和API接口管理。 +这是一个自托管的邮件系统,用于接收和处理电子邮件,特别适用于批量注册和验证场景。 -## 功能特点 +## 系统特性 -- SMTP服务器接收和发送邮件 -- 邮箱管理API接口 -- 配置灵活,支持多域名 -- 支持Docker部署 -- 包含监控和性能测试工具 +- 自动接收邮件并存储到数据库 +- 提供API接口查询和管理邮件 +- 支持通过邮箱地址直接查询最新邮件 +- 提供验证码自动提取功能 +- 支持批量注册和验证操作 +- 支持邮件附件的处理和下载 -## 目录结构 +## 安装和配置 -- `app/`: 应用主代码 - - `api/`: API接口实现 - - `models/`: 数据模型 - - `services/`: 业务逻辑服务 - - `templates/`: 模板文件 -- `config.py`: 配置文件 -- `run.py`: 主程序入口 -- `monitor_email_system.py`: 监控工具 -- `performance_test.py`: 性能测试工具 -- `deploy_production.sh`: 生产环境部署脚本 +### 系统要求 -## 部署方法 +- Python 3.7+ +- SQLite 或 MySQL 数据库 +- 开放的网络端口(SMTP: 25, HTTP: 5000) -详细部署步骤请参考 `DEPLOYMENT_GUIDE.md`。 +### 安装步骤 -## 许可证 +1. 克隆代码库: +```bash +git clone https://github.com/yourusername/emailsystem.git +cd emailsystem +``` -此项目采用MIT许可证。 \ No newline at end of file +2. 安装依赖: +```bash +pip install -r requirements.txt +``` + +3. 配置系统: +```bash +cp config.example.py config.py +# 编辑config.py文件设置相关参数 +``` + +4. 启动系统: +```bash +python run.py --host 0.0.0.0 --port 5000 --smtp-port 25 +``` + +5. 设置为系统服务(可选): +```bash +sudo cp email-system.service /etc/systemd/system/ +sudo systemctl enable email-system +sudo systemctl start email-system +``` + +## API使用指南 + +### 获取邮箱列表 + +``` +GET /api/domains/{domain_id}/mailboxes +``` + +### 获取指定邮箱的邮件 + +``` +GET /api/mailboxes/{mailbox_id}/emails +``` + +### 直接通过邮箱地址获取邮件 + +``` +GET /api/emails/by-address?email_address=user@example.com&limit=10&unread_only=true +``` + +参数说明: +- `email_address`: 邮箱地址 +- `limit`: 返回邮件数量限制(可选,默认10) +- `unread_only`: 是否只返回未读邮件(可选,默认false) +- `since`: 获取指定时间之后的邮件(可选,格式:YYYY-MM-DDTHH:MM:SS) + +### 获取邮件详情 + +``` +GET /api/emails/{email_id} +``` + +### 检查系统状态 + +``` +GET /api/status +``` + +## 使用示例 + +### 使用API客户端 + +```python +from email_api_client import EmailApiClient + +# 创建客户端实例 +client = EmailApiClient(base_url="http://localhost:5000/api") + +# 检查系统状态 +status = client.check_system_status() +print(f"系统状态: {status}") + +# 获取指定邮箱的最新邮件 +emails = client.get_emails_by_address("user@example.com", limit=5) +print(f"获取到 {len(emails)} 封邮件") + +# 等待特定邮件并提取验证码 +email = client.wait_for_email( + "user@example.com", + timeout=30, + subject="验证", + keyword="验证码" +) +if email: + verification_code = client.extract_verification_code(email) + print(f"验证码: {verification_code}") +``` + +### 运行批量注册示例 + +```bash +python batch_registration_example.py --domain example.com --count 10 --concurrent 3 +``` + +## 注意事项 + +1. 系统需要开放25端口用于接收邮件,请确保服务器防火墙已配置 +2. 默认情况下,系统会接收任何发往配置域名的邮件 +3. 邮件存储在数据库中,附件存储在文件系统中 +4. 系统不支持发送邮件功能 + +## 故障排除 + +1. 如果无法接收邮件,请检查: + - SMTP端口是否开放 + - DNS MX记录是否正确配置 + - 防火墙设置 + +2. 如果API无法访问,请检查: + - HTTP端口是否开放 + - 应用日志中是否有错误信息 + - 数据库连接是否正常 + +## 更多资源 + +- [API文档](docs/api.md) +- [配置选项](docs/configuration.md) +- [开发指南](docs/development.md) + +## 维护与支持 + +如有问题或需要支持,请创建GitHub issue或联系维护人员。 \ No newline at end of file diff --git a/app/api/email_routes.py b/app/api/email_routes.py index 848123b..8bef3b4 100644 --- a/app/api/email_routes.py +++ b/app/api/email_routes.py @@ -1,9 +1,13 @@ from flask import request, jsonify, current_app, send_file from io import BytesIO import time +import os +import logging from . import api_bp -from ..models import get_session, Email, Mailbox +from ..models import get_session, Email, Mailbox, Domain +from ..utils import email_parser +from ..config import config # 获取邮箱的所有邮件 @api_bp.route('/mailboxes//emails', methods=['GET']) @@ -39,6 +43,7 @@ def get_mailbox_emails(mailbox_id): # 返回结果 result = { + 'success': True, 'total': total, 'page': page, 'limit': limit, @@ -50,49 +55,122 @@ def get_mailbox_emails(mailbox_id): db.close() except Exception as e: current_app.logger.error(f"获取邮件列表出错: {str(e)}") - return jsonify({'error': '获取邮件列表失败', 'details': str(e)}), 500 + return jsonify({'success': False, 'error': '获取邮件列表失败', 'details': str(e)}), 500 # 获取特定邮件详情 @api_bp.route('/emails/', methods=['GET']) def get_email(email_id): - """获取特定邮件的详细信息""" + """ + 获取单个邮件的详细信息 + """ try: - mark_as_read = request.args.get('mark_as_read', 'true').lower() == 'true' + email_id = int(email_id) + session = get_session() + email = session.query(Email).filter(Email.id == email_id).first() + + if not email: + return jsonify({ + 'success': False, + 'error': f'未找到ID为{email_id}的邮件' + }), 404 + + # 获取邮件正文内容 + body_text = None + body_html = None - db = get_session() try: - email = db.query(Email).filter_by(id=email_id).first() + # 尝试从文件中读取邮件内容 + if email.id: + email_path = os.path.join(config.DATA_DIR, 'emails', f'email_{email.id}.eml') + if os.path.exists(email_path): + logging.info(f"从文件读取邮件内容: {email_path}") + with open(email_path, 'r', encoding='utf-8', errors='ignore') as f: + try: + raw_email = f.read() + msg = email_parser.parsestr(raw_email) + + if msg.is_multipart(): + # 处理多部分邮件 + for part in msg.walk(): + content_type = part.get_content_type() + content_disposition = str(part.get("Content-Disposition")) + + # 跳过附件 + if "attachment" in content_disposition: + continue + + # 处理文本内容 + if content_type == "text/plain": + payload = part.get_payload(decode=True) + charset = part.get_content_charset() or 'utf-8' + try: + body_text = payload.decode(charset, errors='replace') + except Exception as e: + logging.error(f"解码纯文本内容失败: {e}") + body_text = payload.decode('utf-8', errors='replace') + + # 处理HTML内容 + elif content_type == "text/html": + payload = part.get_payload(decode=True) + charset = part.get_content_charset() or 'utf-8' + try: + body_html = payload.decode(charset, errors='replace') + except Exception as e: + logging.error(f"解码HTML内容失败: {e}") + body_html = payload.decode('utf-8', errors='replace') + else: + # 处理单部分邮件 + content_type = msg.get_content_type() + payload = msg.get_payload(decode=True) + charset = msg.get_content_charset() or 'utf-8' + + try: + decoded_content = payload.decode(charset, errors='replace') + except Exception as e: + logging.error(f"解码内容失败: {e}") + decoded_content = payload.decode('utf-8', errors='replace') + + if content_type == "text/plain": + body_text = decoded_content + elif content_type == "text/html": + body_html = decoded_content + except Exception as e: + logging.error(f"解析邮件文件失败: {e}") + except Exception as e: + logging.error(f"读取邮件内容时出错: {e}") + + # 如果文件读取失败,使用数据库中的内容 + if body_text is None: + body_text = email.body_text + + if body_html is None: + body_html = email.body_html - if not email: - return jsonify({'error': '邮件不存在', 'success': False}), 404 - - # 标记为已读 - if mark_as_read and not email.read: - email.read = True - db.commit() - - # 构建详细响应 - result = email.to_dict() - result['body_text'] = email.body_text - result['body_html'] = email.body_html - - # 获取附件信息 - attachments = [] - for attachment in email.attachments: - attachments.append({ - 'id': attachment.id, - 'filename': attachment.filename, - 'content_type': attachment.content_type, - 'size': attachment.size - }) - result['attachments'] = attachments - - return jsonify({'email': result, 'success': True}), 200 - finally: - db.close() + logging.info(f"邮件ID={email_id} 正文长度: text={len(body_text or '')}字节, html={len(body_html or '')}字节") + + # 返回邮件信息,包括正文内容 + return jsonify({ + 'success': True, + 'email': { + 'id': email.id, + 'subject': email.subject, + 'sender': email.sender, + 'recipients': email.recipients, + 'received_at': email.received_at.isoformat(), + 'verification_code': email.verification_code, + 'verification_link': email.verification_link, + 'body_text': body_text, + 'body_html': body_html + } + }) except Exception as e: - current_app.logger.error(f"获取邮件详情出错: {str(e)}") - return jsonify({'error': '获取邮件详情失败', 'details': str(e), 'success': False}), 500 + logging.error(f"获取邮件时出错: {str(e)}") + return jsonify({ + 'success': False, + 'error': f'获取邮件时发生错误: {str(e)}' + }), 500 + finally: + session.close() # 删除邮件 @api_bp.route('/emails/', methods=['DELETE']) @@ -195,4 +273,110 @@ def poll_new_emails(mailbox_id): db.close() except Exception as e: current_app.logger.error(f"轮询新邮件出错: {str(e)}") - return jsonify({'error': '轮询新邮件失败', 'details': str(e)}), 500 \ No newline at end of file + return jsonify({'error': '轮询新邮件失败', 'details': str(e)}), 500 + +# 通过邮箱地址获取最新邮件 +@api_bp.route('/emails/by-address', methods=['GET']) +def get_emails_by_address(): + """ + 通过邮箱地址获取最新邮件 + + 参数: + email_address: 完整邮箱地址 (例如: user@example.com) + limit: 返回的邮件数量 (默认: 10) + since: 从指定时间戳后获取邮件 (可选) + unread_only: 是否只返回未读邮件 (默认: false) + + 返回: + 最新的邮件列表 + """ + try: + email_address = request.args.get('email_address') + if not email_address or '@' not in email_address: + return jsonify({ + 'success': False, + 'error': '请提供有效的邮箱地址 (格式: user@example.com)' + }), 400 + + limit = int(request.args.get('limit', 10)) + unread_only = request.args.get('unread_only', 'false').lower() == 'true' + since = request.args.get('since') + + # 解析邮箱地址 + try: + username, domain_name = email_address.split('@', 1) + except ValueError: + return jsonify({ + 'success': False, + 'error': '邮箱地址格式无效' + }), 400 + + db = get_session() + try: + # 查找域名 + domain = db.query(Domain).filter_by(name=domain_name, active=True).first() + if not domain: + return jsonify({ + 'success': False, + 'error': f'域名 {domain_name} 不存在或未激活' + }), 404 + + # 查找邮箱 + mailbox = db.query(Mailbox).filter_by(address=username, domain_id=domain.id).first() + if not mailbox: + # 自动创建邮箱 - 批量注册场景 + mailbox = Mailbox( + address=username, + domain_id=domain.id, + description=f"自动创建 ({email_address})", + active=True + ) + db.add(mailbox) + db.flush() # 获取新创建邮箱的ID + logging.info(f"自动创建邮箱: {email_address}, ID={mailbox.id}") + + # 查询邮件 + query = db.query(Email).filter(Email.mailbox_id == mailbox.id) + + if unread_only: + query = query.filter(Email.read == False) + + if since: + try: + since_time = float(since) + query = query.filter( + Email.received_at >= time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(since_time)) + ) + except ValueError: + logging.warning(f"无效的since参数: {since}") + + # 获取最新的邮件 + emails = query.order_by(Email.received_at.desc()).limit(limit).all() + + # 获取总数 + total = query.count() + + # 提交数据库变更 + db.commit() + + # 返回结果 + return jsonify({ + 'success': True, + 'email_address': email_address, + 'mailbox_id': mailbox.id, + 'total': total, + 'count': len(emails), + 'emails': [email.to_dict() for email in emails], + 'timestamp': time.time() + }), 200 + except Exception as e: + db.rollback() + raise + finally: + db.close() + except Exception as e: + logging.error(f"获取邮件出错: {str(e)}") + return jsonify({ + 'success': False, + 'error': f'获取邮件失败: {str(e)}' + }), 500 \ No newline at end of file diff --git a/app/models/email.py b/app/models/email.py index 163a566..7d31879 100644 --- a/app/models/email.py +++ b/app/models/email.py @@ -5,6 +5,7 @@ from sqlalchemy.orm import relationship from datetime import datetime import re import sys +import logging from . import Base import config @@ -50,22 +51,58 @@ class Email(Base): 尝试从邮件内容中提取验证码和验证链接 这个方法会在邮件保存时自动调用 """ + logger = logging.getLogger(__name__) + # 合并文本和HTML内容用于搜索 - content = f"{self.subject} {self.body_text or ''}" + content = f"{self.subject or ''} {self.body_text or ''} {self.body_html or ''}" + logger.info(f"开始提取邮件ID={self.id}的验证信息,内容长度={len(content)}") + + # 首先检查是否是Cursor验证邮件 + if "Verify your email" in self.subject and ( + "cursor.sh" in self.sender.lower() or + "cursor" in self.sender.lower() + ): + logger.info("检测到Cursor验证邮件") + # 针对Cursor验证邮件的特定验证码格式 + import re + # 从HTML中提取6位数字验证码 + cursor_patterns = [ + r'(\d{6})', # 匹配Cursor邮件中的6位数字验证码格式 + r']*>(\d{6})', # 更宽松的匹配 + r'>(\d{6})<', # 最简单的形式 + r'(\d{6})' # 任何6位数字 + ] + + for pattern in cursor_patterns: + matches = re.findall(pattern, content) + if matches: + self.verification_code = matches[0] + logger.info(f"从Cursor邮件中提取到验证码: {self.verification_code}") + break + + return # 提取可能的验证码(4-8位数字或字母组合) code_patterns = [ - r'\b[A-Z0-9]{4,8}\b', # 大写字母和数字 + r'\b([A-Z0-9]{4,8})\b', # 大写字母和数字 r'验证码[::]\s*([A-Z0-9]{4,8})', # 中文格式 r'验证码是[::]\s*([A-Z0-9]{4,8})', # 中文格式2 r'code[::]\s*([A-Z0-9]{4,8})', # 英文格式 + r'code is[::]\s*([A-Z0-9]{4,8})', # 英文格式2 + r'code[::]\s*<[^>]*>([A-Z0-9]{4,8})', # HTML格式 + r']*>([0-9]{4,8})', # HTML分隔的数字 + r']*>([A-Z0-9]{4,8})', # 粗体验证码 ] for pattern in code_patterns: matches = re.findall(pattern, content, re.IGNORECASE) if matches: - self.verification_code = matches[0] - break + # 过滤掉明显不是验证码的结果 + filtered_matches = [m for m in matches if len(m) >= 4 and not m.lower() in ['code', 'verify', 'http', 'https']] + if filtered_matches: + self.verification_code = filtered_matches[0] + logger.info(f"提取到验证码: {self.verification_code}") + break # 提取验证链接 link_patterns = [ @@ -77,7 +114,22 @@ class Email(Base): matches = re.findall(pattern, content, re.IGNORECASE) if matches: self.verification_link = matches[0] + logger.info(f"提取到验证链接: {self.verification_link}") break + + # 如果没有找到验证码,但邮件主题暗示这是验证邮件 + verify_subjects = ['verify', 'confirmation', 'activate', 'validation', '验证', '确认'] + if not self.verification_code and any(subj in self.subject.lower() for subj in verify_subjects): + logger.info("根据主题判断这可能是验证邮件,但未能提取到验证码") + # 尝试从HTML中提取明显的数字序列 + if self.body_html: + number_matches = re.findall(r'(\d{4,8})', self.body_html) + filtered_numbers = [n for n in number_matches if len(n) >= 4 and len(n) <= 8] + if filtered_numbers: + self.verification_code = filtered_numbers[0] + logger.info(f"从HTML中提取到可能的验证码: {self.verification_code}") + + logger.info(f"验证信息提取完成: code={self.verification_code}, link={self.verification_link}") def __repr__(self): return f"" diff --git a/app/services/mail_store.py b/app/services/mail_store.py index 366a253..d36711d 100644 --- a/app/services/mail_store.py +++ b/app/services/mail_store.py @@ -31,138 +31,192 @@ class MailStore: if not os.path.exists(self.storage_path): os.makedirs(self.storage_path) - async def save_email(self, sender, recipient, message, raw_data): + async def save_email(self, message, sender, recipients, raw_data=None): """ - 保存一封电子邮件 + 保存邮件到数据库 - 参数: - sender: 发件人地址 - recipient: 收件人地址 - message: 解析后的邮件对象 + Args: + message: 已解析的邮件对象 + sender: 发件人邮箱 + recipients: 收件人邮箱列表 raw_data: 原始邮件数据 - 返回: - 成功返回邮件ID,失败返回None + Returns: + (bool, str): 成功标志和错误信息 """ - # 从收件人地址中提取用户名和域名 try: - address, domain_name = recipient.split('@', 1) - except ValueError: - logger.warning(f"无效的收件人地址格式: {recipient}") - return None + logging.info(f"开始保存邮件: 发件人={sender}, 收件人={recipients}") - # 获取数据库会话 - db = self.db_session_factory() - - try: - # 检查域名是否存在且活跃 - domain = db.query(Domain).filter_by(name=domain_name, active=True).first() - if not domain: - logger.warning(f"不支持的域名: {domain_name}") - return None + # 从消息对象中提取主题 + subject = message.get('Subject', '') + if subject is None: + subject = '' - # 查找或创建邮箱 - mailbox = db.query(Mailbox).filter_by(address=address, domain_id=domain.id).first() - if not mailbox: - # 自动创建新邮箱 - mailbox = Mailbox( - address=address, - domain_id=domain.id, - active=True - ) - db.add(mailbox) - db.flush() # 获取ID但不提交 - logger.info(f"已为 {recipient} 自动创建邮箱") - - # 提取邮件内容 - subject = message.get('subject', '') + logging.info(f"邮件主题: {subject}") - # 获取文本和HTML内容 - body_text = None - body_html = None - attachments_data = [] + # 获取邮件内容(文本和HTML) + body_text = "" + body_html = "" + attachments = [] + # 处理多部分邮件 if message.is_multipart(): + logging.info("处理多部分邮件") for part in message.walk(): content_type = part.get_content_type() - content_disposition = part.get_content_disposition() - - # 处理文本内容 - if content_disposition is None or content_disposition == 'inline': - if content_type == 'text/plain' and not body_text: - body_text = part.get_content() - elif content_type == 'text/html' and not body_html: - body_html = part.get_content() + content_disposition = str(part.get("Content-Disposition") or "") + logging.debug(f"处理邮件部分: 类型={content_type}, 处置={content_disposition}") # 处理附件 - elif content_disposition == 'attachment': - filename = part.get_filename() - if filename: - content = part.get_payload(decode=True) - if content: - attachments_data.append({ - 'filename': filename, - 'content_type': content_type, - 'data': content, - 'size': len(content) - }) + if "attachment" in content_disposition: + try: + filename = part.get_filename() + if filename: + payload = part.get_payload(decode=True) + if payload and len(payload) > 0: + logging.info(f"发现附件: {filename}, 大小={len(payload)}字节") + # 将附件信息添加到列表,稍后处理 + attachments.append({ + 'filename': filename, + 'content_type': content_type, + 'data': payload + }) + except Exception as e: + logging.error(f"处理附件时出错: {str(e)}") + continue + + # 处理内容部分 + elif content_type == "text/plain" and not body_text: + try: + payload = part.get_payload(decode=True) + if payload: + charset = part.get_content_charset() or 'utf-8' + try: + body_text = payload.decode(charset, errors='replace') + logging.info(f"提取到纯文本内容: {len(body_text)}字节") + except Exception as e: + logging.error(f"解码纯文本内容失败: {e}") + body_text = payload.decode('utf-8', errors='replace') + except Exception as e: + logging.error(f"获取纯文本部分时出错: {str(e)}") + + elif content_type == "text/html" and not body_html: + try: + payload = part.get_payload(decode=True) + if payload: + charset = part.get_content_charset() or 'utf-8' + try: + body_html = payload.decode(charset, errors='replace') + logging.info(f"提取到HTML内容: {len(body_html)}字节") + except Exception as e: + logging.error(f"解码HTML内容失败: {e}") + body_html = payload.decode('utf-8', errors='replace') + except Exception as e: + logging.error(f"获取HTML部分时出错: {str(e)}") + + # 处理单部分邮件 else: - # 非多部分邮件 + logging.info("处理单部分邮件") content_type = message.get_content_type() - if content_type == 'text/plain': - body_text = message.get_content() - elif content_type == 'text/html': - body_html = message.get_content() + logging.debug(f"单部分邮件类型: {content_type}") + + try: + payload = message.get_payload(decode=True) + if payload: + charset = message.get_content_charset() or 'utf-8' + logging.debug(f"邮件编码: {charset}") + try: + decoded_content = payload.decode(charset, errors='replace') + except Exception as e: + logging.error(f"解码内容失败: {e}") + decoded_content = payload.decode('utf-8', errors='replace') + + if content_type == 'text/plain': + body_text = decoded_content + logging.info(f"提取到纯文本内容: {len(body_text)}字节") + elif content_type == 'text/html': + body_html = decoded_content + logging.info(f"提取到HTML内容: {len(body_html)}字节") + else: + logging.warning(f"未知内容类型: {content_type}") + # 假设为纯文本 + body_text = decoded_content + except Exception as e: + logging.error(f"获取邮件内容时出错: {str(e)}") - # 创建邮件记录 - email_obj = Email( - mailbox_id=mailbox.id, - sender=sender, - recipients=recipient, - subject=subject, - body_text=body_text, - body_html=body_html, - headers={k: v for k, v in message.items()} - ) + # 如果仍然没有内容,尝试从原始数据中提取 + if not body_text and not body_html and raw_data: + logging.info("从原始数据中提取内容") + try: + # 简单提取,可能不适用于所有情况 + if '' in raw_data.lower(): + body_html = raw_data + else: + body_text = raw_data + except Exception as e: + logging.error(f"从原始数据提取内容失败: {str(e)}") - # 保存邮件 - db.add(email_obj) - db.flush() # 获取ID但不提交 + logging.info(f"提取完成: 纯文本={len(body_text)}字节, HTML={len(body_html)}字节, 附件数={len(attachments)}") - # 提取验证信息 - email_obj.extract_verification_data() - - # 保存附件 - for attachment_data in attachments_data: - attachment = Attachment( - email_id=email_obj.id, - filename=attachment_data['filename'], - content_type=attachment_data['content_type'], - size=attachment_data['size'] + # 保存到数据库 + session = self.db_session_factory() + try: + # 创建新邮件记录 + new_email = Email( + subject=subject, + sender=sender, + recipients=','.join(recipients) if isinstance(recipients, list) else recipients, + body_text=body_text, + body_html=body_html, + received_at=datetime.datetime.now() ) - db.add(attachment) - db.flush() + # 提取验证码和验证链接(如果有) + new_email.extract_verification_data() - # 决定存储位置 - if attachment_data['size'] > 1024 * 1024: # 大于1MB的存储到文件系统 - attachments_dir = os.path.join(self.storage_path, 'attachments') - attachment.save_to_filesystem(attachment_data['data'], attachments_dir) - else: - # 小附件直接存储在数据库 - attachment.content = attachment_data['data'] - - # 提交所有更改 - db.commit() - logger.info(f"邮件已成功保存: {sender} -> {recipient}, ID: {email_obj.id}") - return email_obj.id - + # 保存邮件 + session.add(new_email) + session.commit() + email_id = new_email.id + logging.info(f"邮件保存到数据库, ID={email_id}") + + # 处理附件 + if attachments: + for attachment_data in attachments: + attachment = Attachment( + email_id=email_id, + filename=attachment_data['filename'], + content_type=attachment_data['content_type'], + size=len(attachment_data['data']), + data=attachment_data['data'] + ) + session.add(attachment) + session.commit() + logging.info(f"保存了{len(attachments)}个附件") + + # 保存原始邮件到文件系统 + try: + if raw_data and email_id: + email_dir = os.path.join(self.storage_path, 'emails') + os.makedirs(email_dir, exist_ok=True) + email_path = os.path.join(email_dir, f'email_{email_id}.eml') + with open(email_path, 'w', encoding='utf-8') as f: + f.write(raw_data) + logging.info(f"原始邮件保存到: {email_path}") + except Exception as e: + logging.error(f"保存原始邮件到文件系统失败: {str(e)}") + + return True, f"邮件保存成功,ID: {email_id}" + except Exception as e: + logging.error(f"保存邮件到数据库失败: {str(e)}") + return False, f"保存邮件失败: {str(e)}" + finally: + session.close() except Exception as e: - db.rollback() - logger.error(f"保存邮件时出错: {str(e)}") - return None - finally: - db.close() + logging.error(f"保存邮件时出现未处理异常: {str(e)}") + import traceback + traceback.print_exc() + return False, f"保存邮件过程中出错: {str(e)}" def get_emails_for_mailbox(self, mailbox_id, limit=50, offset=0, unread_only=False): """获取指定邮箱的邮件列表""" @@ -201,7 +255,7 @@ class MailStore: if mark_as_read and not email.read: email.read = True - email.last_read = datetime.utcnow() + email.last_read = datetime.datetime.now() db.commit() # 获取附件信息 diff --git a/app/services/smtp_server.py b/app/services/smtp_server.py index 32443df..4338934 100644 --- a/app/services/smtp_server.py +++ b/app/services/smtp_server.py @@ -9,9 +9,14 @@ from aiosmtpd.handlers import Message import os import sys import threading +import traceback from ..models.domain import Domain from ..models.mailbox import Mailbox +from ..utils import email_parser +from ..models import Email + +from aiosmtpd.smtp import SMTP, Session, Envelope logger = logging.getLogger(__name__) @@ -34,29 +39,43 @@ class EmailHandler(Message): async def handle_DATA(self, server, session, envelope): """处理接收到的邮件数据""" try: - # 获取收件人和发件人 - peer = session.peer - mail_from = envelope.mail_from - rcpt_tos = envelope.rcpt_tos + logging.info(f"收到邮件: 发件人={envelope.mail_from}, 收件人={envelope.rcpt_tos}") - # 获取原始邮件内容 - data = envelope.content - mail = email.message_from_bytes(data, policy=default) + # 保存原始邮件数据 + data = envelope.content.decode('utf-8', errors='replace') - # 保存邮件到存储服务 - for rcpt in rcpt_tos: - result = await self.mail_store.save_email(mail_from, rcpt, mail, data) + # 解析邮件数据 + message = email_parser.Parser().parsestr(data) + subject = message.get('Subject', '') + logging.info(f"邮件主题: {subject}") + + # 记录邮件结构和内容 + logging.debug(f"邮件结构: is_multipart={message.is_multipart()}") + if message.is_multipart(): + logging.debug(f"多部分邮件: 部分数量={len(list(message.walk()))}") + for i, part in enumerate(message.walk()): + content_type = part.get_content_type() + logging.debug(f"部分 {i+1}: 内容类型={content_type}") + + # 使用邮件存储服务保存邮件 + success, error_msg = await self.mail_store.save_email( + message, + envelope.mail_from, + envelope.rcpt_tos, + raw_data=data + ) + + if success: + logging.info(f"邮件保存成功: 来自 {envelope.mail_from} 发送给 {envelope.rcpt_tos}") + return '250 消息接收完成' + else: + logging.error(f"邮件保存失败: {error_msg}") + return '451 处理邮件时出现错误,请稍后重试' - # 记录日志 - if result: - logger.info(f"邮件已保存: {mail_from} -> {rcpt}, 主题: {mail.get('Subject')}") - else: - logger.warning(f"邮件未保存: {mail_from} -> {rcpt}, 可能是无效地址") - - return '250 Message accepted for delivery' except Exception as e: - logger.error(f"处理邮件时出错: {str(e)}") - return '451 Requested action aborted: error in processing' + logging.error(f"处理邮件时出错: {str(e)}") + traceback.print_exc() + return '451 处理邮件时出现错误,请稍后重试' # 为Windows环境自定义SMTP控制器 @@ -133,4 +152,69 @@ class SMTPServer: return True except Exception as e: logger.error(f"停止SMTP服务器失败: {str(e)}") - return False \ No newline at end of file + return False + +class MailHandler: + """邮件处理器,用于处理接收的SMTP邮件""" + + def __init__(self, mail_store): + self.mail_store = mail_store + + async def handle_EHLO(self, server, session, envelope, hostname): + session.host_name = hostname + return '250-AUTH PLAIN\n250-SIZE 52428800\n250 SMTPUTF8' + + async def handle_MAIL(self, server, session, envelope, address, mail_options=None): + if not mail_options: + mail_options = [] + envelope.mail_from = address + envelope.mail_options.extend(mail_options) + return '250 OK' + + async def handle_RCPT(self, server, session, envelope, address, rcpt_options=None): + if not rcpt_options: + rcpt_options = [] + envelope.rcpt_tos.append(address) + envelope.rcpt_options.extend(rcpt_options) + return '250 OK' + + async def handle_DATA(self, server, session, envelope): + """处理接收到的邮件数据""" + try: + logging.info(f"收到邮件: 发件人={envelope.mail_from}, 收件人={envelope.rcpt_tos}") + + # 保存原始邮件数据 + raw_data = envelope.content.decode('utf-8', errors='replace') + + # 解析邮件数据 + message = email_parser.parsestr(raw_data) + subject = message.get('Subject', '') + logging.info(f"邮件主题: {subject}") + + # 记录邮件结构和内容 + logging.debug(f"邮件结构: is_multipart={message.is_multipart()}") + if message.is_multipart(): + logging.debug(f"多部分邮件: 部分数量={len(list(message.walk()))}") + for i, part in enumerate(message.walk()): + content_type = part.get_content_type() + logging.debug(f"部分 {i+1}: 内容类型={content_type}") + + # 使用邮件存储服务保存邮件 + success, error_msg = await self.mail_store.save_email( + message, + envelope.mail_from, + envelope.rcpt_tos, + raw_data=raw_data + ) + + if success: + logging.info(f"邮件保存成功: 来自 {envelope.mail_from} 发送给 {envelope.rcpt_tos}") + return '250 消息接收完成' + else: + logging.error(f"邮件保存失败: {error_msg}") + return '451 处理邮件时出现错误,请稍后重试' + + except Exception as e: + logging.error(f"处理邮件时出错: {str(e)}") + traceback.print_exc() + return '451 处理邮件时出现错误,请稍后重试' \ No newline at end of file diff --git a/app/utils/__init__.py b/app/utils/__init__.py new file mode 100644 index 0000000..54838ee --- /dev/null +++ b/app/utils/__init__.py @@ -0,0 +1,9 @@ +""" +工具类和辅助函数 +""" +import email.parser as email_parser +import email.policy + +# 创建邮件解析器实例,用于解析邮件 +parser = email_parser.Parser() +parsestr = parser.parsestr \ No newline at end of file diff --git a/batch_registration_example.py b/batch_registration_example.py new file mode 100644 index 0000000..1daac05 --- /dev/null +++ b/batch_registration_example.py @@ -0,0 +1,196 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +import time +import logging +import random +import string +import json +import concurrent.futures +import argparse +from email_api_client import EmailApiClient + +# 配置日志 +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' +) +logger = logging.getLogger("BatchRegistration") + +def generate_random_username(length=8): + """生成随机用户名""" + letters = string.ascii_lowercase + string.digits + return ''.join(random.choice(letters) for _ in range(length)) + +def register_account(client, domain, username=None): + """ + 模拟注册账号流程 + + 参数: + client: EmailApiClient实例 + domain: 邮箱域名 + username: 指定用户名,不指定则随机生成 + + 返回: + 注册结果字典 + """ + # 生成或使用提供的用户名 + username = username or generate_random_username() + email_address = f"{username}@{domain}" + + logger.info(f"开始注册账号: {email_address}") + + # 模拟注册操作 + logger.info(f"模拟向注册服务发送注册请求...") + # 这里是模拟,实际中应该调用真实的注册API + time.sleep(1) # 模拟API调用延迟 + + # 等待验证邮件 + logger.info(f"等待验证邮件...") + start_time = time.time() + email = client.wait_for_email( + email_address, + timeout=30, # 30秒超时 + subject="验证", # 查找主题包含"验证"的邮件 + keyword="验证码" # 内容包含"验证码"的邮件 + ) + + wait_time = time.time() - start_time + + if email: + # 提取验证码 + verification_code = client.extract_verification_code(email) + + # 模拟验证过程 + if verification_code: + logger.info(f"提取到验证码: {verification_code},正在验证...") + # 模拟提交验证码 + time.sleep(0.5) # 模拟API调用延迟 + + return { + 'success': True, + 'email_address': email_address, + 'verification_code': verification_code, + 'wait_time': wait_time, + 'message': '注册成功' + } + else: + logger.warning(f"未能提取验证码") + return { + 'success': False, + 'email_address': email_address, + 'wait_time': wait_time, + 'message': '未能提取验证码' + } + else: + logger.warning(f"未收到验证邮件") + return { + 'success': False, + 'email_address': email_address, + 'wait_time': wait_time, + 'message': '未收到验证邮件' + } + +def batch_register(domain, count, concurrent=2): + """ + 批量注册账号 + + 参数: + domain: 邮箱域名 + count: 注册数量 + concurrent: 并发数 + + 返回: + 注册结果列表 + """ + logger.info(f"开始批量注册 {count} 个账号,域名: {domain},并发数: {concurrent}") + + # 创建API客户端 + client = EmailApiClient() + + # 检查系统状态 + status = client.check_system_status() + if not status.get('success', False): + logger.error(f"邮件系统状态异常: {status}") + return [] + + # 生成用户名列表 + usernames = [generate_random_username() for _ in range(count)] + + results = [] + + # 使用线程池并发注册 + if concurrent > 1: + with concurrent.futures.ThreadPoolExecutor(max_workers=concurrent) as executor: + # 提交任务 + future_to_username = { + executor.submit(register_account, client, domain, username): username + for username in usernames + } + + # 获取结果 + for future in concurrent.futures.as_completed(future_to_username): + username = future_to_username[future] + try: + result = future.result() + results.append(result) + logger.info(f"账号 {username}@{domain} 注册完成: {result['success']}") + except Exception as e: + logger.error(f"账号 {username}@{domain} 注册出错: {str(e)}") + results.append({ + 'success': False, + 'email_address': f"{username}@{domain}", + 'message': f'注册过程出错: {str(e)}' + }) + else: + # 串行注册 + for username in usernames: + try: + result = register_account(client, domain, username) + results.append(result) + logger.info(f"账号 {username}@{domain} 注册完成: {result['success']}") + except Exception as e: + logger.error(f"账号 {username}@{domain} 注册出错: {str(e)}") + results.append({ + 'success': False, + 'email_address': f"{username}@{domain}", + 'message': f'注册过程出错: {str(e)}' + }) + + # 统计结果 + success_count = sum(1 for r in results if r.get('success', False)) + fail_count = len(results) - success_count + + logger.info(f"批量注册完成: 成功 {success_count} 个, 失败 {fail_count} 个") + + return results + +def save_results(results, filename): + """将结果保存到文件""" + with open(filename, 'w', encoding='utf-8') as f: + json.dump(results, f, ensure_ascii=False, indent=2) + logger.info(f"结果已保存到文件: {filename}") + +def main(): + """主函数""" + parser = argparse.ArgumentParser(description='批量注册账号示例') + parser.add_argument('--domain', type=str, default='nosqli.com', help='邮箱域名') + parser.add_argument('--count', type=int, default=5, help='注册数量') + parser.add_argument('--concurrent', type=int, default=2, help='并发数') + parser.add_argument('--output', type=str, default='registration_results.json', help='结果输出文件') + + args = parser.parse_args() + + # 执行批量注册 + results = batch_register(args.domain, args.count, args.concurrent) + + # 保存结果 + if results: + save_results(results, args.output) + + # 退出代码 + success_count = sum(1 for r in results if r.get('success', False)) + return 0 if success_count == args.count else 1 + +if __name__ == "__main__": + exit(main()) \ No newline at end of file diff --git a/config.example.py b/config.example.py new file mode 100644 index 0000000..9cf5872 --- /dev/null +++ b/config.example.py @@ -0,0 +1,120 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +""" +邮件系统配置文件示例 +复制此文件为 config.py 并根据实际情况修改配置 +""" + +import os +import logging +from datetime import timedelta + +# 基础配置 +DEBUG = True # 是否启用调试模式 +SECRET_KEY = "请替换为随机生成的安全密钥" # Flask应用密钥 +TESTING = False # 是否为测试环境 + +# 目录配置 +BASE_DIR = os.path.abspath(os.path.dirname(__file__)) +TEMP_DIR = os.path.join(BASE_DIR, "temp") # 临时文件目录 +UPLOAD_DIR = os.path.join(BASE_DIR, "uploads") # 上传文件目录 +ATTACHMENT_DIR = os.path.join(BASE_DIR, "attachments") # 邮件附件存储目录 +EMAIL_STORAGE_DIR = os.path.join(BASE_DIR, "emails") # 原始邮件存储目录 + +# 确保目录存在 +for dir_path in [TEMP_DIR, UPLOAD_DIR, ATTACHMENT_DIR, EMAIL_STORAGE_DIR]: + os.makedirs(dir_path, exist_ok=True) + +# 数据库配置 +# SQLite配置示例 +DB_TYPE = "sqlite" +DB_PATH = os.path.join(BASE_DIR, "email_system.db") +SQLALCHEMY_DATABASE_URI = f"sqlite:///{DB_PATH}" + +# MySQL配置示例 (取消注释并修改配置以使用MySQL) +# DB_TYPE = "mysql" +# MYSQL_HOST = "localhost" +# MYSQL_PORT = 3306 +# MYSQL_USER = "emailsystem" +# MYSQL_PASSWORD = "your_password" +# MYSQL_DB = "emailsystem" +# SQLALCHEMY_DATABASE_URI = f"mysql+pymysql://{MYSQL_USER}:{MYSQL_PASSWORD}@{MYSQL_HOST}:{MYSQL_PORT}/{MYSQL_DB}" + +SQLALCHEMY_TRACK_MODIFICATIONS = False +SQLALCHEMY_ECHO = False # 是否显示SQL语句 (调试用) + +# SMTP服务器配置 +SMTP_HOST = "0.0.0.0" # SMTP服务器地址 +SMTP_PORT = 25 # SMTP服务器端口 +SMTP_SSL_PORT = 465 # SMTP SSL端口 (如需启用) +SMTP_USE_SSL = False # 是否启用SSL +SMTP_USERNAME = None # SMTP用户名 (如需认证) +SMTP_PASSWORD = None # SMTP密码 (如需认证) +SMTP_MAX_MESSAGE_SIZE = 20 * 1024 * 1024 # 最大邮件大小 (20MB) +SMTP_TIMEOUT = 60 # SMTP超时时间 (秒) + +# API配置 +API_PREFIX = "/api" # API前缀 +API_DEFAULT_LIMIT = 10 # 默认分页大小 +API_MAX_LIMIT = 100 # 最大分页大小 +API_RATE_LIMIT = "60/minute" # API速率限制 +API_VERSION = "1.0.0" # API版本 + +# 监控和日志配置 +LOG_LEVEL = logging.INFO # 日志级别 +LOG_FILE = os.path.join(BASE_DIR, "email_system.log") # 日志文件路径 +LOG_FORMAT = "%(asctime)s - %(name)s - %(levelname)s - %(message)s" # 日志格式 +LOG_MAX_SIZE = 10 * 1024 * 1024 # 日志文件最大大小 (10MB) +LOG_BACKUP_COUNT = 5 # 日志文件备份数 + +# Web界面配置 +WEB_HOST = "0.0.0.0" # Web服务器地址 +WEB_PORT = 5000 # Web服务器端口 +WEB_USE_SSL = False # 是否启用SSL +WEB_SSL_CERT = None # SSL证书路径 +WEB_SSL_KEY = None # SSL密钥路径 + +# 邮件系统配置 +ALLOWED_DOMAINS = ["example.com", "test.com"] # 允许接收的域名列表 +AUTO_CREATE_DOMAIN = True # 是否自动创建域名 +AUTO_CREATE_MAILBOX = True # 是否自动创建邮箱 +DEFAULT_ENCODING = "utf-8" # 默认编码 + +# 缓存配置 +CACHE_TYPE = "simple" # 缓存类型 (simple, redis, memcached等) +CACHE_DEFAULT_TIMEOUT = 300 # 缓存默认超时时间 (秒) + +# Redis配置 (如使用Redis缓存) +# REDIS_HOST = "localhost" +# REDIS_PORT = 6379 +# REDIS_PASSWORD = None +# REDIS_DB = 0 + +# 验证码提取配置 +VERIFICATION_CODE_PATTERNS = [ + r"验证码[::\s]*([0-9]{4,6})", # 中文验证码格式 + r"verification code[::\s]*([0-9]{4,6})", # 英文验证码格式 + r"code[::\s]*([0-9a-zA-Z]{4,8})", # 通用验证码格式 + r"([0-9]{6})" # 纯数字验证码 +] + +# 邮件清理配置 +EMAIL_RETENTION_DAYS = 30 # 邮件保留天数 +EMAIL_CLEANUP_INTERVAL = 60 * 60 * 24 # 邮件清理间隔 (秒) +ATTACHMENT_MAX_SIZE = 50 * 1024 * 1024 # 附件最大大小 (50MB) + +# 安全配置 +TRUSTED_HOSTS = ["localhost", "127.0.0.1"] # 信任的主机列表 +CORS_ORIGINS = ["http://localhost:3000", "http://localhost:5000"] # CORS允许的源 +JWT_SECRET_KEY = "请替换为随机生成的JWT密钥" # JWT密钥 +JWT_ACCESS_TOKEN_EXPIRES = timedelta(hours=1) # JWT访问令牌过期时间 +JWT_REFRESH_TOKEN_EXPIRES = timedelta(days=30) # JWT刷新令牌过期时间 + +# 性能配置 +WORKER_PROCESSES = os.cpu_count() or 4 # 工作进程数 +WORKER_THREADS = 2 # 每个进程的线程数 +MAX_REQUESTS = 1000 # 每个工作进程处理的最大请求数 + +# 自定义配置 +# 在此处添加您的自定义配置 \ No newline at end of file diff --git a/email-system.service b/email-system.service new file mode 100644 index 0000000..de4d3b0 --- /dev/null +++ b/email-system.service @@ -0,0 +1,19 @@ +[Unit] +Description=Email System Service +After=network.target +Wants=network.target + +[Service] +Type=simple +User=root +WorkingDirectory=/opt/emailsystem +ExecStart=/usr/bin/python3 run.py --host 0.0.0.0 --port 5000 --smtp-port 25 +Restart=always +RestartSec=10 +StandardOutput=syslog +StandardError=syslog +SyslogIdentifier=email-system +Environment=PYTHONUNBUFFERED=1 + +[Install] +WantedBy=multi-user.target \ No newline at end of file diff --git a/email_api_client.py b/email_api_client.py new file mode 100644 index 0000000..c30c562 --- /dev/null +++ b/email_api_client.py @@ -0,0 +1,244 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +import requests +import json +import time +import logging + +class EmailApiClient: + """ + 邮件系统API客户端 + 用于在批量注册场景中与邮件系统API交互 + """ + + def __init__(self, api_base_url="http://localhost:5000/api", timeout=10): + """ + 初始化API客户端 + + 参数: + api_base_url: API基础URL + timeout: 请求超时时间(秒) + """ + self.api_base_url = api_base_url + self.timeout = timeout + self.logger = logging.getLogger("EmailApiClient") + + # 设置日志 + if not self.logger.handlers: + handler = logging.StreamHandler() + formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') + handler.setFormatter(formatter) + self.logger.addHandler(handler) + self.logger.setLevel(logging.INFO) + + def get_emails_by_address(self, email_address, limit=10, unread_only=False, since=None): + """ + 通过邮箱地址获取最新邮件 + + 参数: + email_address: 完整邮箱地址 (例如: user@example.com) + limit: 返回的邮件数量 (默认: 10) + unread_only: 是否只返回未读邮件 (默认: False) + since: 从指定时间戳后获取邮件 (可选) + + 返回: + API响应的JSON数据 + """ + url = f"{self.api_base_url}/emails/by-address" + + # 构建参数 + params = { + 'email_address': email_address, + 'limit': limit + } + + if unread_only: + params['unread_only'] = 'true' + + if since: + params['since'] = since + + # 发送请求 + try: + self.logger.debug(f"请求 URL: {url} 参数: {params}") + response = requests.get(url, params=params, timeout=self.timeout) + + if response.status_code == 200: + data = response.json() + if data.get('success'): + self.logger.info(f"成功获取邮件: {email_address}, 数量: {data.get('count', 0)}") + else: + self.logger.warning(f"获取邮件失败: {data.get('error')}") + return data + else: + self.logger.error(f"请求失败: {response.status_code}, {response.text}") + return { + 'success': False, + 'error': f'HTTP错误: {response.status_code}', + 'details': response.text + } + except Exception as e: + self.logger.error(f"请求出错: {str(e)}") + return { + 'success': False, + 'error': f'请求出错: {str(e)}' + } + + def wait_for_email(self, email_address, timeout=60, check_interval=2, keyword=None, subject=None): + """ + 等待接收特定邮件 + + 参数: + email_address: 邮箱地址 + timeout: 超时时间(秒) + check_interval: 检查间隔(秒) + keyword: 邮件内容关键词 + subject: 邮件主题关键词 + + 返回: + 找到的第一封匹配邮件,或超时后返回None + """ + self.logger.info(f"等待邮件: {email_address}, 超时: {timeout}s, 关键词: {keyword}, 主题: {subject}") + start_time = time.time() + since = start_time - 60 # 获取最近1分钟的邮件 + + while time.time() - start_time < timeout: + # 获取最新邮件 + result = self.get_emails_by_address(email_address, limit=5, since=since) + + if result and result.get('success'): + emails = result.get('emails', []) + + # 更新时间戳 + since = result.get('timestamp') + + # 检查是否有匹配的邮件 + for email in emails: + # 检查主题 + if subject and subject.lower() not in email.get('subject', '').lower(): + continue + + # 检查内容(如果提供了关键词) + if keyword: + email_text = (email.get('body_text', '') or '') + (email.get('body_html', '') or '') + if keyword.lower() not in email_text.lower(): + continue + + # 找到匹配的邮件 + self.logger.info(f"找到匹配的邮件: ID={email.get('id')}, 主题={email.get('subject')}") + return email + + # 等待下一次检查 + time.sleep(check_interval) + + # 超时 + self.logger.warning(f"等待邮件超时: {email_address}") + return None + + def extract_verification_code(self, email, patterns=None): + """ + 从邮件中提取验证码 + + 参数: + email: 邮件对象 + patterns: 自定义正则表达式列表 + + 返回: + 提取到的验证码,或None + """ + # 如果邮件已经包含验证码字段,直接返回 + if email.get('verification_code'): + return email.get('verification_code') + + import re + + # 默认验证码模式 + default_patterns = [ + r'验证码[::\s]+(\d{4,8})', + r'verification code[::\s]+(\d{4,8})', + r'code[::\s]+(\d{4,8})', + r'(\d{6})', + r'>(\d{6})<', + r'[\s>](\d{6})[\s<]' + ] + + search_patterns = patterns or default_patterns + + # 构建搜索文本 + text = f"{email.get('subject', '')} {email.get('body_text', '')} {email.get('body_html', '')}" + + # 尝试每个模式 + for pattern in search_patterns: + matches = re.findall(pattern, text) + if matches: + code = matches[0] + self.logger.info(f"从邮件中提取到验证码: {code}") + return code + + self.logger.warning("未能从邮件中提取验证码") + return None + + def check_system_status(self): + """ + 检查邮件系统状态 + + 返回: + 系统状态信息 + """ + url = f"{self.api_base_url}/status" + + try: + response = requests.get(url, timeout=self.timeout) + + if response.status_code == 200: + return response.json() + else: + self.logger.error(f"检查系统状态失败: {response.status_code}, {response.text}") + return { + 'success': False, + 'error': f'HTTP错误: {response.status_code}' + } + except Exception as e: + self.logger.error(f"检查系统状态出错: {str(e)}") + return { + 'success': False, + 'error': f'请求出错: {str(e)}' + } + + +# 演示用法 +def demo(): + """演示API客户端使用方法""" + # 创建客户端 + client = EmailApiClient() + + # 检查系统状态 + status = client.check_system_status() + print(f"系统状态: {status}") + + # 使用测试邮箱 + test_email = "testuser@nosqli.com" + + # 获取最新邮件 + emails = client.get_emails_by_address(test_email, limit=5) + print(f"邮件查询结果: {json.dumps(emails, indent=2, ensure_ascii=False)}") + + # 等待特定邮件 + print(f"等待新邮件...") + email = client.wait_for_email(test_email, timeout=10, subject="验证") + + if email: + print(f"收到新邮件: {email.get('subject')}") + verification_code = client.extract_verification_code(email) + print(f"提取到的验证码: {verification_code}") + else: + print("未收到新邮件") + + +if __name__ == "__main__": + # 配置日志 + logging.basicConfig(level=logging.INFO) + + # 演示 + demo() \ No newline at end of file diff --git a/test_email_by_address.py b/test_email_by_address.py new file mode 100644 index 0000000..036d785 --- /dev/null +++ b/test_email_by_address.py @@ -0,0 +1,116 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +import requests +import json +import sys +import time + +def get_emails_by_address(email_address, limit=10, unread_only=False, since=None): + """ + 通过邮箱地址获取最新邮件 + + 参数: + email_address: 完整邮箱地址 (例如: user@example.com) + limit: 返回的邮件数量 (默认: 10) + unread_only: 是否只返回未读邮件 (默认: False) + since: 从指定时间戳后获取邮件 (可选) + + 返回: + API响应的JSON数据 + """ + base_url = "http://localhost:5000/api/emails/by-address" + + # 构建参数 + params = { + 'email_address': email_address, + 'limit': limit + } + + if unread_only: + params['unread_only'] = 'true' + + if since: + params['since'] = since + + # 发送请求 + try: + print(f"请求 URL: {base_url}?{'&'.join([f'{k}={v}' for k, v in params.items()])}") + response = requests.get(base_url, params=params) + + # 打印响应信息 + print(f"状态码: {response.status_code}") + + if response.status_code == 200: + data = response.json() + return data + else: + print(f"请求失败: {response.text}") + return None + except Exception as e: + print(f"请求出错: {str(e)}") + return None + +def format_email_info(email): + """格式化邮件信息显示""" + result = [] + result.append(f"ID: {email.get('id')}") + result.append(f"主题: {email.get('subject')}") + result.append(f"发件人: {email.get('sender')}") + result.append(f"收件人: {email.get('recipients')}") + result.append(f"接收时间: {email.get('received_at')}") + + if 'verification_code' in email and email['verification_code']: + result.append(f"验证码: {email.get('verification_code')}") + + if 'verification_link' in email and email['verification_link']: + result.append(f"验证链接: {email.get('verification_link')}") + + return "\n".join(result) + +def main(): + # 检查命令行参数 + if len(sys.argv) < 2: + print("用法: python test_email_by_address.py <邮箱地址> [limit] [unread_only] [since]") + print("例如: python test_email_by_address.py test@example.com 5 true") + return + + # 解析参数 + email_address = sys.argv[1] + limit = int(sys.argv[2]) if len(sys.argv) > 2 else 10 + unread_only = sys.argv[3].lower() == 'true' if len(sys.argv) > 3 else False + since = float(sys.argv[4]) if len(sys.argv) > 4 else None + + print(f"获取邮箱 {email_address} 的最新邮件") + print(f"参数: limit={limit}, unread_only={unread_only}, since={since}") + + # 获取邮件 + result = get_emails_by_address(email_address, limit, unread_only, since) + + # 显示结果 + if result and result.get('success'): + print("\n===== 查询结果 =====") + print(f"邮箱地址: {result.get('email_address')}") + print(f"邮箱ID: {result.get('mailbox_id')}") + print(f"总邮件数: {result.get('total')}") + print(f"返回邮件数: {result.get('count')}") + + # 显示邮件详情 + emails = result.get('emails', []) + if emails: + print("\n----- 邮件列表 -----") + for i, email in enumerate(emails, 1): + print(f"\n邮件 {i}:") + print(format_email_info(email)) + print("-" * 40) + else: + print("\n没有找到邮件") + + # 返回时间戳,可用于下次查询 + print(f"\n当前时间戳: {result.get('timestamp')}") + print("在下次查询时可以使用此时间戳作为since参数,仅获取新邮件") + else: + print("查询失败或没有结果") + +if __name__ == "__main__": + main() \ No newline at end of file