126 lines
4.0 KiB
Python
126 lines
4.0 KiB
Python
#!/usr/bin/env python
|
||
# -*- coding: utf-8 -*-
|
||
|
||
import requests
|
||
import json
|
||
import sys
|
||
import time
|
||
|
||
def get_emails_by_address(email_address, limit=10, unread_only=False, since=None):
|
||
"""
|
||
通过邮箱地址获取最新邮件
|
||
|
||
参数:
|
||
email_address: 完整邮箱地址 (例如: user@example.com)
|
||
limit: 返回的邮件数量 (默认: 10)
|
||
unread_only: 是否只返回未读邮件 (默认: False)
|
||
since: 从指定时间戳后获取邮件 (可选)
|
||
|
||
返回:
|
||
API响应的JSON数据
|
||
"""
|
||
# 构建API URL
|
||
api_url = f"http://74.48.75.19:5000/api/emails/by-address"
|
||
|
||
# 构建查询参数
|
||
params = {
|
||
"email_address": email_address,
|
||
"limit": limit
|
||
}
|
||
|
||
# 添加可选参数
|
||
if unread_only:
|
||
params["unread_only"] = "true"
|
||
if since:
|
||
params["since"] = since
|
||
|
||
# 打印请求信息
|
||
print(f"请求URL: {api_url}")
|
||
print(f"参数: {params}")
|
||
|
||
try:
|
||
# 发送API请求
|
||
response = requests.get(api_url, params=params, timeout=15)
|
||
|
||
# 打印响应状态
|
||
print(f"响应状态码: {response.status_code}")
|
||
|
||
# 返回JSON数据
|
||
if response.status_code == 200:
|
||
return response.json()
|
||
else:
|
||
print(f"API请求失败: {response.text}")
|
||
return {
|
||
"success": False,
|
||
"error": f"请求失败: HTTP {response.status_code}"
|
||
}
|
||
except Exception as e:
|
||
print(f"发生错误: {str(e)}")
|
||
return {
|
||
"success": False,
|
||
"error": f"发生异常: {str(e)}"
|
||
}
|
||
|
||
def format_email_info(email):
|
||
"""格式化邮件信息显示"""
|
||
result = []
|
||
result.append(f"ID: {email.get('id')}")
|
||
result.append(f"主题: {email.get('subject')}")
|
||
result.append(f"发件人: {email.get('sender')}")
|
||
result.append(f"收件人: {email.get('recipients')}")
|
||
result.append(f"接收时间: {email.get('received_at')}")
|
||
|
||
if 'verification_code' in email and email['verification_code']:
|
||
result.append(f"验证码: {email.get('verification_code')}")
|
||
|
||
if 'verification_link' in email and email['verification_link']:
|
||
result.append(f"验证链接: {email.get('verification_link')}")
|
||
|
||
return "\n".join(result)
|
||
|
||
def main():
|
||
# 检查命令行参数
|
||
if len(sys.argv) < 2:
|
||
print("用法: python test_email_by_address.py <邮箱地址> [limit] [unread_only] [since]")
|
||
print("例如: python test_email_by_address.py test@example.com 5 true")
|
||
return
|
||
|
||
# 解析参数
|
||
email_address = sys.argv[1]
|
||
limit = int(sys.argv[2]) if len(sys.argv) > 2 else 10
|
||
unread_only = sys.argv[3].lower() == 'true' if len(sys.argv) > 3 else False
|
||
since = float(sys.argv[4]) if len(sys.argv) > 4 else None
|
||
|
||
print(f"获取邮箱 {email_address} 的最新邮件")
|
||
print(f"参数: limit={limit}, unread_only={unread_only}, since={since}")
|
||
|
||
# 获取邮件
|
||
result = get_emails_by_address(email_address, limit, unread_only, since)
|
||
|
||
# 显示结果
|
||
if result and result.get('success'):
|
||
print("\n===== 查询结果 =====")
|
||
print(f"邮箱地址: {result.get('email_address')}")
|
||
print(f"邮箱ID: {result.get('mailbox_id')}")
|
||
print(f"总邮件数: {result.get('total')}")
|
||
print(f"返回邮件数: {result.get('count')}")
|
||
|
||
# 显示邮件详情
|
||
emails = result.get('emails', [])
|
||
if emails:
|
||
print("\n----- 邮件列表 -----")
|
||
for i, email in enumerate(emails, 1):
|
||
print(f"\n邮件 {i}:")
|
||
print(format_email_info(email))
|
||
print("-" * 40)
|
||
else:
|
||
print("\n没有找到邮件")
|
||
|
||
# 返回时间戳,可用于下次查询
|
||
print(f"\n当前时间戳: {result.get('timestamp')}")
|
||
print("在下次查询时可以使用此时间戳作为since参数,仅获取新邮件")
|
||
else:
|
||
print("查询失败或没有结果")
|
||
|
||
if __name__ == "__main__":
|
||
main() |