#!/usr/bin/env python # -*- coding: utf-8 -*- import requests import json import time import logging class EmailApiClient: """ 邮件系统API客户端 用于在批量注册场景中与邮件系统API交互 """ def __init__(self, api_base_url="http://localhost:5000/api", timeout=10): """ 初始化API客户端 参数: api_base_url: API基础URL timeout: 请求超时时间(秒) """ self.api_base_url = api_base_url self.timeout = timeout self.logger = logging.getLogger("EmailApiClient") # 设置日志 if not self.logger.handlers: handler = logging.StreamHandler() formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') handler.setFormatter(formatter) self.logger.addHandler(handler) self.logger.setLevel(logging.INFO) def get_emails_by_address(self, email_address, limit=10, unread_only=False, since=None): """ 通过邮箱地址获取最新邮件 参数: email_address: 完整邮箱地址 (例如: user@example.com) limit: 返回的邮件数量 (默认: 10) unread_only: 是否只返回未读邮件 (默认: False) since: 从指定时间戳后获取邮件 (可选) 返回: API响应的JSON数据 """ url = f"{self.api_base_url}/emails/by-address" # 构建参数 params = { 'email_address': email_address, 'limit': limit } if unread_only: params['unread_only'] = 'true' if since: params['since'] = since # 发送请求 try: self.logger.debug(f"请求 URL: {url} 参数: {params}") response = requests.get(url, params=params, timeout=self.timeout) if response.status_code == 200: data = response.json() if data.get('success'): self.logger.info(f"成功获取邮件: {email_address}, 数量: {data.get('count', 0)}") else: self.logger.warning(f"获取邮件失败: {data.get('error')}") return data else: self.logger.error(f"请求失败: {response.status_code}, {response.text}") return { 'success': False, 'error': f'HTTP错误: {response.status_code}', 'details': response.text } except Exception as e: self.logger.error(f"请求出错: {str(e)}") return { 'success': False, 'error': f'请求出错: {str(e)}' } def wait_for_email(self, email_address, timeout=60, check_interval=2, keyword=None, subject=None): """ 等待接收特定邮件 参数: email_address: 邮箱地址 timeout: 超时时间(秒) check_interval: 检查间隔(秒) keyword: 邮件内容关键词 subject: 邮件主题关键词 返回: 找到的第一封匹配邮件,或超时后返回None """ self.logger.info(f"等待邮件: {email_address}, 超时: {timeout}s, 关键词: {keyword}, 主题: {subject}") start_time = time.time() since = start_time - 60 # 获取最近1分钟的邮件 while time.time() - start_time < timeout: # 获取最新邮件 result = self.get_emails_by_address(email_address, limit=5, since=since) if result and result.get('success'): emails = result.get('emails', []) # 更新时间戳 since = result.get('timestamp') # 检查是否有匹配的邮件 for email in emails: # 检查主题 if subject and subject.lower() not in email.get('subject', '').lower(): continue # 检查内容(如果提供了关键词) if keyword: email_text = (email.get('body_text', '') or '') + (email.get('body_html', '') or '') if keyword.lower() not in email_text.lower(): continue # 找到匹配的邮件 self.logger.info(f"找到匹配的邮件: ID={email.get('id')}, 主题={email.get('subject')}") return email # 等待下一次检查 time.sleep(check_interval) # 超时 self.logger.warning(f"等待邮件超时: {email_address}") return None def extract_verification_code(self, email, patterns=None): """ 从邮件中提取验证码 参数: email: 邮件对象 patterns: 自定义正则表达式列表 返回: 提取到的验证码,或None """ # 如果邮件已经包含验证码字段,直接返回 if email.get('verification_code'): return email.get('verification_code') import re # 默认验证码模式 default_patterns = [ r'验证码[::\s]+(\d{4,8})', r'verification code[::\s]+(\d{4,8})', r'code[::\s]+(\d{4,8})', r'(\d{6})', r'>(\d{6})<', r'[\s>](\d{6})[\s<]' ] search_patterns = patterns or default_patterns # 构建搜索文本 text = f"{email.get('subject', '')} {email.get('body_text', '')} {email.get('body_html', '')}" # 尝试每个模式 for pattern in search_patterns: matches = re.findall(pattern, text) if matches: code = matches[0] self.logger.info(f"从邮件中提取到验证码: {code}") return code self.logger.warning("未能从邮件中提取验证码") return None def check_system_status(self): """ 检查邮件系统状态 返回: 系统状态信息 """ url = f"{self.api_base_url}/status" try: response = requests.get(url, timeout=self.timeout) if response.status_code == 200: return response.json() else: self.logger.error(f"检查系统状态失败: {response.status_code}, {response.text}") return { 'success': False, 'error': f'HTTP错误: {response.status_code}' } except Exception as e: self.logger.error(f"检查系统状态出错: {str(e)}") return { 'success': False, 'error': f'请求出错: {str(e)}' } # 演示用法 def demo(): """演示API客户端使用方法""" # 创建客户端 client = EmailApiClient() # 检查系统状态 status = client.check_system_status() print(f"系统状态: {status}") # 使用测试邮箱 test_email = "testuser@nosqli.com" # 获取最新邮件 emails = client.get_emails_by_address(test_email, limit=5) print(f"邮件查询结果: {json.dumps(emails, indent=2, ensure_ascii=False)}") # 等待特定邮件 print(f"等待新邮件...") email = client.wait_for_email(test_email, timeout=10, subject="验证") if email: print(f"收到新邮件: {email.get('subject')}") verification_code = client.extract_verification_code(email) print(f"提取到的验证码: {verification_code}") else: print("未收到新邮件") if __name__ == "__main__": # 配置日志 logging.basicConfig(level=logging.INFO) # 演示 demo()