Files
emailsystem/test_core_features.py
2025-02-25 19:50:00 +08:00

284 lines
9.3 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import requests
import smtplib
import time
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
import logging
import sys
# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger('CoreFeatureTest')
# API配置
API_BASE_URL = "http://localhost:5000/api"
SMTP_HOST = "localhost"
SMTP_PORT = 3825
def print_divider(title=""):
"""打印分隔线"""
line = "=" * 80
if title:
print(f"\n{line}\n{title.center(80)}\n{line}\n")
else:
print(f"\n{line}\n")
def get_all_domains():
"""获取所有域名"""
try:
response = requests.get(f"{API_BASE_URL}/domains")
if response.status_code == 200:
domains = response.json().get('domains', [])
logger.info(f"获取到 {len(domains)} 个域名")
return domains
else:
logger.error(f"获取域名失败: HTTP {response.status_code}")
return []
except Exception as e:
logger.error(f"获取域名出错: {str(e)}")
return []
def create_domain(domain_name, description=""):
"""创建新域名"""
try:
data = {
"name": domain_name,
"description": description
}
response = requests.post(f"{API_BASE_URL}/domains", json=data)
if response.status_code in (200, 201):
result = response.json()
domain = result.get("domain", {})
logger.info(f"成功创建域名: {domain_name} (ID: {domain.get('id')})")
return domain
else:
logger.error(f"创建域名失败: HTTP {response.status_code} - {response.text}")
return None
except Exception as e:
logger.error(f"创建域名出错: {str(e)}")
return None
def get_all_mailboxes():
"""获取所有邮箱"""
try:
response = requests.get(f"{API_BASE_URL}/mailboxes")
if response.status_code == 200:
mailboxes = response.json().get('mailboxes', [])
logger.info(f"获取到 {len(mailboxes)} 个邮箱")
return mailboxes
else:
logger.error(f"获取邮箱失败: HTTP {response.status_code}")
return []
except Exception as e:
logger.error(f"获取邮箱出错: {str(e)}")
return []
def create_mailbox(domain_id, address):
"""创建新邮箱"""
try:
data = {
"domain_id": domain_id,
"address": address
}
response = requests.post(f"{API_BASE_URL}/mailboxes", json=data)
if response.status_code in (200, 201):
result = response.json()
mailbox = result.get("mailbox", {})
logger.info(f"成功创建邮箱: {mailbox.get('full_address')} (ID: {mailbox.get('id')})")
return mailbox
else:
logger.error(f"创建邮箱失败: HTTP {response.status_code} - {response.text}")
return None
except Exception as e:
logger.error(f"创建邮箱出错: {str(e)}")
return None
def get_mailbox_emails(mailbox_id):
"""获取指定邮箱的所有邮件"""
try:
response = requests.get(f"{API_BASE_URL}/mailboxes/{mailbox_id}/emails")
if response.status_code == 200:
emails = response.json().get('emails', [])
logger.info(f"获取到邮箱ID={mailbox_id}{len(emails)} 封邮件")
return emails
else:
logger.error(f"获取邮件失败: HTTP {response.status_code}")
return []
except Exception as e:
logger.error(f"获取邮件出错: {str(e)}")
return []
def send_email(from_addr, to_addr, subject, body_text="", body_html=""):
"""发送邮件"""
try:
# 创建邮件
msg = MIMEMultipart('alternative')
msg['From'] = from_addr
msg['To'] = to_addr
msg['Subject'] = subject
# 添加文本内容
if body_text:
msg.attach(MIMEText(body_text, 'plain'))
# 添加HTML内容
if body_html:
msg.attach(MIMEText(body_html, 'html'))
# 连接SMTP服务器并发送
server = smtplib.SMTP(SMTP_HOST, SMTP_PORT)
server.set_debuglevel(1) # 开启调试
server.sendmail(from_addr, to_addr, msg.as_string())
server.quit()
logger.info(f"成功发送邮件: {from_addr} -> {to_addr}, 主题: {subject}")
return True
except Exception as e:
logger.error(f"发送邮件失败: {str(e)}")
return False
def test_create_multiple_domains():
"""测试创建多个域名"""
print_divider("测试创建多个域名")
# 创建3个不同的域名
domains = []
test_domains = [
{"name": "example1.com", "desc": "测试域名1"},
{"name": "example2.com", "desc": "测试域名2"},
{"name": "example3.com", "desc": "测试域名3"}
]
for domain_info in test_domains:
domain = create_domain(domain_info["name"], domain_info["desc"])
if domain:
domains.append(domain)
# 验证域名是否创建成功
all_domains = get_all_domains()
print(f"\n当前系统中共有 {len(all_domains)} 个域名:")
for domain in all_domains:
print(f" - {domain.get('name')} (ID: {domain.get('id')})")
return domains
def test_create_multiple_mailboxes(domains):
"""测试为每个域名创建多个邮箱"""
print_divider("测试创建多个邮箱")
mailboxes = []
# 为每个域名创建多个邮箱
for domain in domains:
domain_id = domain.get('id')
domain_name = domain.get('name')
print(f"\n为域名 {domain_name} 创建邮箱:")
# 为每个域名创建2个邮箱
users = ["user1", "user2"]
for user in users:
mailbox = create_mailbox(domain_id, user)
if mailbox:
mailboxes.append(mailbox)
print(f" - 创建了邮箱: {mailbox.get('full_address')} (ID: {mailbox.get('id')})")
# 验证邮箱是否创建成功
all_mailboxes = get_all_mailboxes()
print(f"\n当前系统中共有 {len(all_mailboxes)} 个邮箱:")
for mailbox in all_mailboxes:
print(f" - {mailbox.get('full_address')} (ID: {mailbox.get('id')})")
return mailboxes
def test_email_receiving(mailboxes):
"""测试邮件接收功能"""
print_divider("测试邮件接收功能")
if not mailboxes:
print("没有可用的邮箱,无法测试邮件接收功能")
return False
# 选择第一个邮箱作为发件人
sender_mailbox = mailboxes[0]
from_addr = sender_mailbox.get('full_address')
# 发送邮件到其他所有邮箱
for recipient_mailbox in mailboxes[1:]:
to_addr = recipient_mailbox.get('full_address')
subject = f"测试邮件 - 从 {from_addr} 发送到 {to_addr}"
body_text = f"""
你好!
这是一封测试邮件,发送时间:{time.strftime('%Y-%m-%d %H:%M:%S')}
此邮件用于测试邮件系统的接收功能。
来自: {from_addr}
"""
success = send_email(from_addr, to_addr, subject, body_text)
if success:
print(f"成功发送邮件: {from_addr} -> {to_addr}")
else:
print(f"发送邮件失败: {from_addr} -> {to_addr}")
# 等待邮件处理
print("\n等待5秒钟让系统处理邮件...")
time.sleep(5)
# 检查每个邮箱是否收到邮件
print("\n检查邮箱收件情况:")
for mailbox in mailboxes[1:]: # 跳过发件人邮箱
mailbox_id = mailbox.get('id')
mailbox_address = mailbox.get('full_address')
emails = get_mailbox_emails(mailbox_id)
if emails:
print(f" - 邮箱 {mailbox_address} 收到了 {len(emails)} 封邮件")
for email in emails:
print(f" * 邮件ID: {email.get('id')}, 主题: {email.get('subject')}")
print(f" 发件人: {email.get('sender')}, 时间: {email.get('received_at')}")
else:
print(f" - 邮箱 {mailbox_address} 没有收到任何邮件")
return True
def main():
try:
print_divider("邮件系统核心功能测试")
print("此脚本将测试以下功能:")
print(" 1. 创建多个不同的域名")
print(" 2. 为每个域名创建多个邮箱")
print(" 3. 测试邮件接收功能")
# 测试创建多个域名
domains = test_create_multiple_domains()
# 测试创建多个邮箱
if domains:
mailboxes = test_create_multiple_mailboxes(domains)
# 测试邮件接收功能
if mailboxes:
test_email_receiving(mailboxes)
else:
print("邮箱创建失败,无法测试邮件接收功能")
else:
print("域名创建失败,无法测试邮箱创建和邮件接收功能")
print_divider("测试完成")
except Exception as e:
print(f"测试过程中发生错误: {str(e)}")
import traceback
traceback.print_exc()
return 1
return 0
if __name__ == "__main__":
sys.exit(main())