Files
emailsystem/email_api_client.py
huangzhenpc 5cc138b69a 修复
2025-02-26 10:21:48 +08:00

245 lines
8.1 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import requests
import json
import time
import logging
import re
class EmailApiClient:
"""
邮件系统API客户端
用于在批量注册场景中与邮件系统API交互
"""
def __init__(self, api_base_url="http://74.48.75.19:5000/api", timeout=10):
"""
初始化API客户端
参数:
api_base_url: API基础URL
timeout: 请求超时时间(秒)
"""
self.api_base_url = api_base_url
self.timeout = timeout
# 设置日志
self.logger = logging.getLogger("EmailApiClient")
if not self.logger.handlers:
handler = logging.StreamHandler()
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
self.logger.addHandler(handler)
self.logger.setLevel(logging.INFO)
self.logger.info(f"初始化API客户端: {api_base_url}")
def get_emails_by_address(self, email_address, limit=10, unread_only=False, since=None):
"""
通过邮箱地址获取最新邮件
参数:
email_address: 完整邮箱地址 (例如: user@example.com)
limit: 返回的邮件数量 (默认: 10)
unread_only: 是否只返回未读邮件 (默认: False)
since: 从指定时间戳后获取邮件 (可选)
返回:
API响应的JSON数据
"""
url = f"{self.api_base_url}/emails/by-address"
# 构建参数
params = {
'email_address': email_address,
'limit': limit
}
if unread_only:
params['unread_only'] = 'true'
if since:
params['since'] = since
# 发送请求
try:
self.logger.debug(f"请求 URL: {url} 参数: {params}")
response = requests.get(url, params=params, timeout=self.timeout)
if response.status_code == 200:
data = response.json()
if data.get('success'):
self.logger.info(f"成功获取邮件: {email_address}, 数量: {data.get('count', 0)}")
else:
self.logger.warning(f"获取邮件失败: {data.get('error')}")
return data
else:
self.logger.error(f"请求失败: {response.status_code}, {response.text}")
return {
'success': False,
'error': f'HTTP错误: {response.status_code}',
'details': response.text
}
except Exception as e:
self.logger.error(f"请求出错: {str(e)}")
return {
'success': False,
'error': f'请求出错: {str(e)}'
}
def wait_for_email(self, email_address, timeout=60, check_interval=2, keyword=None, subject=None):
"""
等待接收特定邮件
参数:
email_address: 邮箱地址
timeout: 超时时间(秒)
check_interval: 检查间隔(秒)
keyword: 邮件内容关键词
subject: 邮件主题关键词
返回:
找到的第一封匹配邮件或超时后返回None
"""
self.logger.info(f"等待邮件: {email_address}, 超时: {timeout}s, 关键词: {keyword}, 主题: {subject}")
start_time = time.time()
since = start_time - 60 # 获取最近1分钟的邮件
while time.time() - start_time < timeout:
# 获取最新邮件
result = self.get_emails_by_address(email_address, limit=5, since=since)
if result and result.get('success'):
emails = result.get('emails', [])
# 更新时间戳
since = result.get('timestamp')
# 检查是否有匹配的邮件
for email in emails:
# 检查主题
if subject and subject.lower() not in email.get('subject', '').lower():
continue
# 检查内容(如果提供了关键词)
if keyword:
email_text = (email.get('body_text', '') or '') + (email.get('body_html', '') or '')
if keyword.lower() not in email_text.lower():
continue
# 找到匹配的邮件
self.logger.info(f"找到匹配的邮件: ID={email.get('id')}, 主题={email.get('subject')}")
return email
# 等待下一次检查
time.sleep(check_interval)
# 超时
self.logger.warning(f"等待邮件超时: {email_address}")
return None
def extract_verification_code(self, email, patterns=None):
"""
从邮件中提取验证码
参数:
email: 邮件对象
patterns: 自定义正则表达式列表
返回:
提取到的验证码或None
"""
# 如果邮件已经包含验证码字段,直接返回
if email.get('verification_code'):
return email.get('verification_code')
# 默认验证码模式
default_patterns = [
r'验证码[:\s]+(\d{4,8})',
r'verification code[:\s]+(\d{4,8})',
r'code[:\s]+(\d{4,8})',
r'(\d{6})</div>',
r'>(\d{6})<',
r'[\s>](\d{6})[\s<]'
]
search_patterns = patterns or default_patterns
# 构建搜索文本
text = f"{email.get('subject', '')} {email.get('body_text', '')} {email.get('body_html', '')}"
# 尝试每个模式
for pattern in search_patterns:
matches = re.findall(pattern, text)
if matches:
code = matches[0]
self.logger.info(f"从邮件中提取到验证码: {code}")
return code
self.logger.warning("未能从邮件中提取验证码")
return None
def check_system_status(self):
"""
检查邮件系统状态
返回:
系统状态信息
"""
url = f"{self.api_base_url}/status"
try:
response = requests.get(url, timeout=self.timeout)
if response.status_code == 200:
return response.json()
else:
self.logger.error(f"检查系统状态失败: {response.status_code}, {response.text}")
return {
'success': False,
'error': f'HTTP错误: {response.status_code}'
}
except Exception as e:
self.logger.error(f"检查系统状态出错: {str(e)}")
return {
'success': False,
'error': f'请求出错: {str(e)}'
}
# 演示用法
def demo():
"""演示API客户端使用方法"""
# 创建客户端
client = EmailApiClient()
# 检查系统状态
status = client.check_system_status()
print(f"系统状态: {status}")
# 使用测试邮箱
test_email = "testuser@nosqli.com"
# 获取最新邮件
emails = client.get_emails_by_address(test_email, limit=5)
print(f"邮件查询结果: {json.dumps(emails, indent=2, ensure_ascii=False)}")
# 等待特定邮件
print(f"等待新邮件...")
email = client.wait_for_email(test_email, timeout=10, subject="验证")
if email:
print(f"收到新邮件: {email.get('subject')}")
verification_code = client.extract_verification_code(email)
print(f"提取到的验证码: {verification_code}")
else:
print("未收到新邮件")
if __name__ == "__main__":
# 配置日志
logging.basicConfig(level=logging.INFO)
# 演示
demo()