Files
emailsystem/test_email_by_address.py
huangzhenpc 5cc138b69a 修复
2025-02-26 10:21:48 +08:00

126 lines
4.0 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import requests
import json
import sys
import time
def get_emails_by_address(email_address, limit=10, unread_only=False, since=None):
"""
通过邮箱地址获取最新邮件
参数:
email_address: 完整邮箱地址 (例如: user@example.com)
limit: 返回的邮件数量 (默认: 10)
unread_only: 是否只返回未读邮件 (默认: False)
since: 从指定时间戳后获取邮件 (可选)
返回:
API响应的JSON数据
"""
# 构建API URL
api_url = f"http://74.48.75.19:5000/api/emails/by-address"
# 构建查询参数
params = {
"email_address": email_address,
"limit": limit
}
# 添加可选参数
if unread_only:
params["unread_only"] = "true"
if since:
params["since"] = since
# 打印请求信息
print(f"请求URL: {api_url}")
print(f"参数: {params}")
try:
# 发送API请求
response = requests.get(api_url, params=params, timeout=15)
# 打印响应状态
print(f"响应状态码: {response.status_code}")
# 返回JSON数据
if response.status_code == 200:
return response.json()
else:
print(f"API请求失败: {response.text}")
return {
"success": False,
"error": f"请求失败: HTTP {response.status_code}"
}
except Exception as e:
print(f"发生错误: {str(e)}")
return {
"success": False,
"error": f"发生异常: {str(e)}"
}
def format_email_info(email):
"""格式化邮件信息显示"""
result = []
result.append(f"ID: {email.get('id')}")
result.append(f"主题: {email.get('subject')}")
result.append(f"发件人: {email.get('sender')}")
result.append(f"收件人: {email.get('recipients')}")
result.append(f"接收时间: {email.get('received_at')}")
if 'verification_code' in email and email['verification_code']:
result.append(f"验证码: {email.get('verification_code')}")
if 'verification_link' in email and email['verification_link']:
result.append(f"验证链接: {email.get('verification_link')}")
return "\n".join(result)
def main():
# 检查命令行参数
if len(sys.argv) < 2:
print("用法: python test_email_by_address.py <邮箱地址> [limit] [unread_only] [since]")
print("例如: python test_email_by_address.py test@example.com 5 true")
return
# 解析参数
email_address = sys.argv[1]
limit = int(sys.argv[2]) if len(sys.argv) > 2 else 10
unread_only = sys.argv[3].lower() == 'true' if len(sys.argv) > 3 else False
since = float(sys.argv[4]) if len(sys.argv) > 4 else None
print(f"获取邮箱 {email_address} 的最新邮件")
print(f"参数: limit={limit}, unread_only={unread_only}, since={since}")
# 获取邮件
result = get_emails_by_address(email_address, limit, unread_only, since)
# 显示结果
if result and result.get('success'):
print("\n===== 查询结果 =====")
print(f"邮箱地址: {result.get('email_address')}")
print(f"邮箱ID: {result.get('mailbox_id')}")
print(f"总邮件数: {result.get('total')}")
print(f"返回邮件数: {result.get('count')}")
# 显示邮件详情
emails = result.get('emails', [])
if emails:
print("\n----- 邮件列表 -----")
for i, email in enumerate(emails, 1):
print(f"\n邮件 {i}:")
print(format_email_info(email))
print("-" * 40)
else:
print("\n没有找到邮件")
# 返回时间戳,可用于下次查询
print(f"\n当前时间戳: {result.get('timestamp')}")
print("在下次查询时可以使用此时间戳作为since参数仅获取新邮件")
else:
print("查询失败或没有结果")
if __name__ == "__main__":
main()