298 lines
11 KiB
Python
298 lines
11 KiB
Python
#!/usr/bin/env python
|
||
# -*- coding: utf-8 -*-
|
||
|
||
import requests
|
||
import re
|
||
import json
|
||
import logging
|
||
from datetime import datetime
|
||
import smtplib
|
||
from email.mime.text import MIMEText
|
||
from email.mime.multipart import MIMEMultipart
|
||
|
||
# 配置日志
|
||
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
|
||
logger = logging.getLogger('EmailAnalyzer')
|
||
|
||
class EmailAnalyzer:
|
||
"""邮件分析类,用于操作邮箱系统并分析邮件内容"""
|
||
|
||
def __init__(self, api_base_url="http://localhost:5000/api", smtp_host="localhost", smtp_port=3825):
|
||
"""初始化邮件分析器"""
|
||
self.api_base_url = api_base_url
|
||
self.smtp_host = smtp_host
|
||
self.smtp_port = smtp_port
|
||
logger.info(f"初始化邮件分析器: API={api_base_url}, SMTP={smtp_host}:{smtp_port}")
|
||
|
||
# API相关方法
|
||
def get_all_domains(self):
|
||
"""获取所有域名"""
|
||
try:
|
||
response = requests.get(f"{self.api_base_url}/domains")
|
||
if response.status_code == 200:
|
||
domains = response.json().get('domains', [])
|
||
logger.info(f"获取到 {len(domains)} 个域名")
|
||
return domains
|
||
else:
|
||
logger.error(f"获取域名失败: HTTP {response.status_code}")
|
||
return []
|
||
except Exception as e:
|
||
logger.error(f"获取域名出错: {str(e)}")
|
||
return []
|
||
|
||
def create_domain(self, domain_name, description=""):
|
||
"""创建新域名"""
|
||
try:
|
||
data = {
|
||
"name": domain_name,
|
||
"description": description
|
||
}
|
||
response = requests.post(f"{self.api_base_url}/domains", json=data)
|
||
if response.status_code in (200, 201):
|
||
result = response.json()
|
||
domain = result.get("domain", {})
|
||
logger.info(f"成功创建域名: {domain_name} (ID: {domain.get('id')})")
|
||
return domain
|
||
else:
|
||
logger.error(f"创建域名失败: HTTP {response.status_code} - {response.text}")
|
||
return None
|
||
except Exception as e:
|
||
logger.error(f"创建域名出错: {str(e)}")
|
||
return None
|
||
|
||
def get_all_mailboxes(self):
|
||
"""获取所有邮箱"""
|
||
try:
|
||
response = requests.get(f"{self.api_base_url}/mailboxes")
|
||
if response.status_code == 200:
|
||
mailboxes = response.json().get('mailboxes', [])
|
||
logger.info(f"获取到 {len(mailboxes)} 个邮箱")
|
||
return mailboxes
|
||
else:
|
||
logger.error(f"获取邮箱失败: HTTP {response.status_code}")
|
||
return []
|
||
except Exception as e:
|
||
logger.error(f"获取邮箱出错: {str(e)}")
|
||
return []
|
||
|
||
def create_mailbox(self, domain_id, address):
|
||
"""创建新邮箱"""
|
||
try:
|
||
data = {
|
||
"domain_id": domain_id,
|
||
"address": address
|
||
}
|
||
response = requests.post(f"{self.api_base_url}/mailboxes", json=data)
|
||
if response.status_code in (200, 201):
|
||
result = response.json()
|
||
mailbox = result.get("mailbox", {})
|
||
logger.info(f"成功创建邮箱: {mailbox.get('full_address')} (ID: {mailbox.get('id')})")
|
||
return mailbox
|
||
else:
|
||
logger.error(f"创建邮箱失败: HTTP {response.status_code} - {response.text}")
|
||
return None
|
||
except Exception as e:
|
||
logger.error(f"创建邮箱出错: {str(e)}")
|
||
return None
|
||
|
||
def get_mailbox_emails(self, mailbox_id):
|
||
"""获取指定邮箱的所有邮件"""
|
||
try:
|
||
response = requests.get(f"{self.api_base_url}/mailboxes/{mailbox_id}/emails")
|
||
if response.status_code == 200:
|
||
emails = response.json().get('emails', [])
|
||
logger.info(f"获取到邮箱ID={mailbox_id}的 {len(emails)} 封邮件")
|
||
return emails
|
||
else:
|
||
logger.error(f"获取邮件失败: HTTP {response.status_code}")
|
||
return []
|
||
except Exception as e:
|
||
logger.error(f"获取邮件出错: {str(e)}")
|
||
return []
|
||
|
||
def get_email_detail(self, email_id):
|
||
"""获取指定邮件的详细信息"""
|
||
try:
|
||
response = requests.get(f"{self.api_base_url}/emails/{email_id}")
|
||
if response.status_code == 200:
|
||
email = response.json().get('email', {})
|
||
logger.info(f"获取到邮件ID={email_id}的详细信息")
|
||
return email
|
||
else:
|
||
logger.error(f"获取邮件详情失败: HTTP {response.status_code}")
|
||
return {}
|
||
except Exception as e:
|
||
logger.error(f"获取邮件详情出错: {str(e)}")
|
||
return {}
|
||
|
||
# SMTP相关方法
|
||
def send_email(self, from_addr, to_addr, subject, body_text="", body_html=""):
|
||
"""发送邮件"""
|
||
try:
|
||
# 创建邮件
|
||
msg = MIMEMultipart('alternative')
|
||
msg['From'] = from_addr
|
||
msg['To'] = to_addr
|
||
msg['Subject'] = subject
|
||
|
||
# 添加文本内容
|
||
if body_text:
|
||
msg.attach(MIMEText(body_text, 'plain'))
|
||
|
||
# 添加HTML内容
|
||
if body_html:
|
||
msg.attach(MIMEText(body_html, 'html'))
|
||
|
||
# 连接SMTP服务器并发送
|
||
server = smtplib.SMTP(self.smtp_host, self.smtp_port)
|
||
server.set_debuglevel(1) # 开启调试
|
||
server.sendmail(from_addr, to_addr, msg.as_string())
|
||
server.quit()
|
||
|
||
logger.info(f"成功发送邮件: {from_addr} -> {to_addr}, 主题: {subject}")
|
||
return True
|
||
except Exception as e:
|
||
logger.error(f"发送邮件失败: {str(e)}")
|
||
return False
|
||
|
||
def send_verification_code(self, to_addr, code_length=6):
|
||
"""发送验证码邮件"""
|
||
# 生成随机验证码
|
||
import random
|
||
import string
|
||
code = ''.join(random.choices(string.digits, k=code_length))
|
||
|
||
# 邮件主题
|
||
subject = f'您的验证码 - {datetime.now().strftime("%Y-%m-%d %H:%M:%S")}'
|
||
|
||
# 邮件HTML内容
|
||
html = f'''
|
||
<html>
|
||
<body>
|
||
<div style="font-family: Arial, sans-serif; max-width: 600px; margin: 0 auto; padding: 20px; border: 1px solid #eee; border-radius: 5px;">
|
||
<h2 style="color: #333;">验证码</h2>
|
||
<p>您好!</p>
|
||
<p>您的验证码是:</p>
|
||
<div style="background-color: #f7f7f7; padding: 15px; font-size: 24px; font-weight: bold; text-align: center; letter-spacing: 5px; margin: 20px 0; border-radius: 4px;">
|
||
{code}
|
||
</div>
|
||
<p>此验证码将在30分钟内有效。</p>
|
||
<p>如果这不是您请求的,请忽略此邮件。</p>
|
||
<p>谢谢!</p>
|
||
<div style="margin-top: 30px; padding-top: 15px; border-top: 1px solid #eee; font-size: 12px; color: #999;">
|
||
此邮件由系统自动发送,请勿回复。
|
||
</div>
|
||
</div>
|
||
</body>
|
||
</html>
|
||
'''
|
||
|
||
# 发送邮件
|
||
result = self.send_email('noreply@system.com', to_addr, subject, body_html=html)
|
||
|
||
return result, code
|
||
|
||
# 邮件分析方法
|
||
def extract_verification_code(self, email):
|
||
"""从邮件中提取验证码"""
|
||
# 如果邮件对象已经包含验证码字段,直接返回
|
||
if email.get('verification_code'):
|
||
return email.get('verification_code')
|
||
|
||
# 尝试从HTML内容中提取
|
||
body_html = email.get('body_html', '')
|
||
if body_html:
|
||
# 尝试匹配验证码样式
|
||
code_match = re.search(r'letter-spacing: 5px[^>]*>([^<]+)<', body_html)
|
||
if code_match:
|
||
return code_match.group(1).strip()
|
||
|
||
# 尝试从文本内容中提取6位数字验证码
|
||
body_text = email.get('body_text', '')
|
||
if body_text:
|
||
code_match = re.search(r'\b(\d{6})\b', body_text)
|
||
if code_match:
|
||
return code_match.group(1)
|
||
|
||
# 无法提取到验证码
|
||
return None
|
||
|
||
def extract_verification_link(self, email):
|
||
"""从邮件中提取验证链接"""
|
||
# 如果邮件对象已经包含验证链接字段,直接返回
|
||
if email.get('verification_link'):
|
||
return email.get('verification_link')
|
||
|
||
# 尝试从HTML内容中提取链接
|
||
body_html = email.get('body_html', '')
|
||
if body_html:
|
||
link_match = re.search(r'href=[\'"]([^\'"]*verify[^\'"]*)[\'"]', body_html)
|
||
if link_match:
|
||
return link_match.group(1)
|
||
|
||
# 尝试从文本内容中提取链接
|
||
body_text = email.get('body_text', '')
|
||
if body_text:
|
||
link_match = re.search(r'https?://\S+?(?:verify|confirm|activate)\S+', body_text)
|
||
if link_match:
|
||
return link_match.group(0)
|
||
|
||
# 无法提取到验证链接
|
||
return None
|
||
|
||
# 使用示例
|
||
def main():
|
||
# 创建邮件分析器实例
|
||
analyzer = EmailAnalyzer()
|
||
|
||
# 获取所有域名
|
||
domains = analyzer.get_all_domains()
|
||
for domain in domains:
|
||
print(f"域名: {domain.get('name')} (ID: {domain.get('id')})")
|
||
|
||
# 如果没有域名,创建一个
|
||
if not domains:
|
||
domain = analyzer.create_domain("test-domain.com", "测试域名")
|
||
print(f"创建域名: {domain.get('name')} (ID: {domain.get('id')}")
|
||
else:
|
||
domain = domains[0]
|
||
|
||
# 获取所有邮箱
|
||
mailboxes = analyzer.get_all_mailboxes()
|
||
for mailbox in mailboxes:
|
||
print(f"邮箱: {mailbox.get('full_address')} (ID: {mailbox.get('id')})")
|
||
|
||
# 如果没有邮箱,创建一个
|
||
if not mailboxes:
|
||
mailbox = analyzer.create_mailbox(domain.get('id'), "test-user")
|
||
print(f"创建邮箱: {mailbox.get('full_address')} (ID: {mailbox.get('id')}")
|
||
else:
|
||
mailbox = mailboxes[0]
|
||
|
||
# 发送一封测试邮件
|
||
to_addr = mailbox.get('full_address')
|
||
sent, code = analyzer.send_verification_code(to_addr)
|
||
print(f"验证码邮件发送{'成功' if sent else '失败'}, 验证码: {code}")
|
||
|
||
# 等待2秒,让邮件被处理
|
||
import time
|
||
time.sleep(2)
|
||
|
||
# 获取邮箱中的邮件
|
||
mailbox_id = mailbox.get('id')
|
||
emails = analyzer.get_mailbox_emails(mailbox_id)
|
||
print(f"获取到 {len(emails)} 封邮件")
|
||
|
||
# 分析最新的邮件
|
||
if emails:
|
||
latest_email = emails[0] # 假设按时间降序排列
|
||
print(f"最新邮件: {latest_email.get('subject')}")
|
||
|
||
verification_code = analyzer.extract_verification_code(latest_email)
|
||
print(f"提取到的验证码: {verification_code}")
|
||
|
||
verification_link = analyzer.extract_verification_link(latest_email)
|
||
print(f"提取到的验证链接: {verification_link}")
|
||
|
||
if __name__ == "__main__":
|
||
main() |