Files
emailsystem/email_analyzer.py
2025-02-25 19:50:00 +08:00

298 lines
11 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import requests
import re
import json
import logging
from datetime import datetime
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger('EmailAnalyzer')
class EmailAnalyzer:
"""邮件分析类,用于操作邮箱系统并分析邮件内容"""
def __init__(self, api_base_url="http://localhost:5000/api", smtp_host="localhost", smtp_port=3825):
"""初始化邮件分析器"""
self.api_base_url = api_base_url
self.smtp_host = smtp_host
self.smtp_port = smtp_port
logger.info(f"初始化邮件分析器: API={api_base_url}, SMTP={smtp_host}:{smtp_port}")
# API相关方法
def get_all_domains(self):
"""获取所有域名"""
try:
response = requests.get(f"{self.api_base_url}/domains")
if response.status_code == 200:
domains = response.json().get('domains', [])
logger.info(f"获取到 {len(domains)} 个域名")
return domains
else:
logger.error(f"获取域名失败: HTTP {response.status_code}")
return []
except Exception as e:
logger.error(f"获取域名出错: {str(e)}")
return []
def create_domain(self, domain_name, description=""):
"""创建新域名"""
try:
data = {
"name": domain_name,
"description": description
}
response = requests.post(f"{self.api_base_url}/domains", json=data)
if response.status_code in (200, 201):
result = response.json()
domain = result.get("domain", {})
logger.info(f"成功创建域名: {domain_name} (ID: {domain.get('id')})")
return domain
else:
logger.error(f"创建域名失败: HTTP {response.status_code} - {response.text}")
return None
except Exception as e:
logger.error(f"创建域名出错: {str(e)}")
return None
def get_all_mailboxes(self):
"""获取所有邮箱"""
try:
response = requests.get(f"{self.api_base_url}/mailboxes")
if response.status_code == 200:
mailboxes = response.json().get('mailboxes', [])
logger.info(f"获取到 {len(mailboxes)} 个邮箱")
return mailboxes
else:
logger.error(f"获取邮箱失败: HTTP {response.status_code}")
return []
except Exception as e:
logger.error(f"获取邮箱出错: {str(e)}")
return []
def create_mailbox(self, domain_id, address):
"""创建新邮箱"""
try:
data = {
"domain_id": domain_id,
"address": address
}
response = requests.post(f"{self.api_base_url}/mailboxes", json=data)
if response.status_code in (200, 201):
result = response.json()
mailbox = result.get("mailbox", {})
logger.info(f"成功创建邮箱: {mailbox.get('full_address')} (ID: {mailbox.get('id')})")
return mailbox
else:
logger.error(f"创建邮箱失败: HTTP {response.status_code} - {response.text}")
return None
except Exception as e:
logger.error(f"创建邮箱出错: {str(e)}")
return None
def get_mailbox_emails(self, mailbox_id):
"""获取指定邮箱的所有邮件"""
try:
response = requests.get(f"{self.api_base_url}/mailboxes/{mailbox_id}/emails")
if response.status_code == 200:
emails = response.json().get('emails', [])
logger.info(f"获取到邮箱ID={mailbox_id}{len(emails)} 封邮件")
return emails
else:
logger.error(f"获取邮件失败: HTTP {response.status_code}")
return []
except Exception as e:
logger.error(f"获取邮件出错: {str(e)}")
return []
def get_email_detail(self, email_id):
"""获取指定邮件的详细信息"""
try:
response = requests.get(f"{self.api_base_url}/emails/{email_id}")
if response.status_code == 200:
email = response.json().get('email', {})
logger.info(f"获取到邮件ID={email_id}的详细信息")
return email
else:
logger.error(f"获取邮件详情失败: HTTP {response.status_code}")
return {}
except Exception as e:
logger.error(f"获取邮件详情出错: {str(e)}")
return {}
# SMTP相关方法
def send_email(self, from_addr, to_addr, subject, body_text="", body_html=""):
"""发送邮件"""
try:
# 创建邮件
msg = MIMEMultipart('alternative')
msg['From'] = from_addr
msg['To'] = to_addr
msg['Subject'] = subject
# 添加文本内容
if body_text:
msg.attach(MIMEText(body_text, 'plain'))
# 添加HTML内容
if body_html:
msg.attach(MIMEText(body_html, 'html'))
# 连接SMTP服务器并发送
server = smtplib.SMTP(self.smtp_host, self.smtp_port)
server.set_debuglevel(1) # 开启调试
server.sendmail(from_addr, to_addr, msg.as_string())
server.quit()
logger.info(f"成功发送邮件: {from_addr} -> {to_addr}, 主题: {subject}")
return True
except Exception as e:
logger.error(f"发送邮件失败: {str(e)}")
return False
def send_verification_code(self, to_addr, code_length=6):
"""发送验证码邮件"""
# 生成随机验证码
import random
import string
code = ''.join(random.choices(string.digits, k=code_length))
# 邮件主题
subject = f'您的验证码 - {datetime.now().strftime("%Y-%m-%d %H:%M:%S")}'
# 邮件HTML内容
html = f'''
<html>
<body>
<div style="font-family: Arial, sans-serif; max-width: 600px; margin: 0 auto; padding: 20px; border: 1px solid #eee; border-radius: 5px;">
<h2 style="color: #333;">验证码</h2>
<p>您好!</p>
<p>您的验证码是:</p>
<div style="background-color: #f7f7f7; padding: 15px; font-size: 24px; font-weight: bold; text-align: center; letter-spacing: 5px; margin: 20px 0; border-radius: 4px;">
{code}
</div>
<p>此验证码将在30分钟内有效。</p>
<p>如果这不是您请求的,请忽略此邮件。</p>
<p>谢谢!</p>
<div style="margin-top: 30px; padding-top: 15px; border-top: 1px solid #eee; font-size: 12px; color: #999;">
此邮件由系统自动发送,请勿回复。
</div>
</div>
</body>
</html>
'''
# 发送邮件
result = self.send_email('noreply@system.com', to_addr, subject, body_html=html)
return result, code
# 邮件分析方法
def extract_verification_code(self, email):
"""从邮件中提取验证码"""
# 如果邮件对象已经包含验证码字段,直接返回
if email.get('verification_code'):
return email.get('verification_code')
# 尝试从HTML内容中提取
body_html = email.get('body_html', '')
if body_html:
# 尝试匹配验证码样式
code_match = re.search(r'letter-spacing: 5px[^>]*>([^<]+)<', body_html)
if code_match:
return code_match.group(1).strip()
# 尝试从文本内容中提取6位数字验证码
body_text = email.get('body_text', '')
if body_text:
code_match = re.search(r'\b(\d{6})\b', body_text)
if code_match:
return code_match.group(1)
# 无法提取到验证码
return None
def extract_verification_link(self, email):
"""从邮件中提取验证链接"""
# 如果邮件对象已经包含验证链接字段,直接返回
if email.get('verification_link'):
return email.get('verification_link')
# 尝试从HTML内容中提取链接
body_html = email.get('body_html', '')
if body_html:
link_match = re.search(r'href=[\'"]([^\'"]*verify[^\'"]*)[\'"]', body_html)
if link_match:
return link_match.group(1)
# 尝试从文本内容中提取链接
body_text = email.get('body_text', '')
if body_text:
link_match = re.search(r'https?://\S+?(?:verify|confirm|activate)\S+', body_text)
if link_match:
return link_match.group(0)
# 无法提取到验证链接
return None
# 使用示例
def main():
# 创建邮件分析器实例
analyzer = EmailAnalyzer()
# 获取所有域名
domains = analyzer.get_all_domains()
for domain in domains:
print(f"域名: {domain.get('name')} (ID: {domain.get('id')})")
# 如果没有域名,创建一个
if not domains:
domain = analyzer.create_domain("test-domain.com", "测试域名")
print(f"创建域名: {domain.get('name')} (ID: {domain.get('id')}")
else:
domain = domains[0]
# 获取所有邮箱
mailboxes = analyzer.get_all_mailboxes()
for mailbox in mailboxes:
print(f"邮箱: {mailbox.get('full_address')} (ID: {mailbox.get('id')})")
# 如果没有邮箱,创建一个
if not mailboxes:
mailbox = analyzer.create_mailbox(domain.get('id'), "test-user")
print(f"创建邮箱: {mailbox.get('full_address')} (ID: {mailbox.get('id')}")
else:
mailbox = mailboxes[0]
# 发送一封测试邮件
to_addr = mailbox.get('full_address')
sent, code = analyzer.send_verification_code(to_addr)
print(f"验证码邮件发送{'成功' if sent else '失败'}, 验证码: {code}")
# 等待2秒让邮件被处理
import time
time.sleep(2)
# 获取邮箱中的邮件
mailbox_id = mailbox.get('id')
emails = analyzer.get_mailbox_emails(mailbox_id)
print(f"获取到 {len(emails)} 封邮件")
# 分析最新的邮件
if emails:
latest_email = emails[0] # 假设按时间降序排列
print(f"最新邮件: {latest_email.get('subject')}")
verification_code = analyzer.extract_verification_code(latest_email)
print(f"提取到的验证码: {verification_code}")
verification_link = analyzer.extract_verification_link(latest_email)
print(f"提取到的验证链接: {verification_link}")
if __name__ == "__main__":
main()