#!/usr/bin/env python # -*- coding: utf-8 -*- import requests import re import json import logging from datetime import datetime import smtplib from email.mime.text import MIMEText from email.mime.multipart import MIMEMultipart # 配置日志 logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logger = logging.getLogger('EmailAnalyzer') class EmailAnalyzer: """邮件分析类,用于操作邮箱系统并分析邮件内容""" def __init__(self, api_base_url="http://localhost:5000/api", smtp_host="localhost", smtp_port=3825): """初始化邮件分析器""" self.api_base_url = api_base_url self.smtp_host = smtp_host self.smtp_port = smtp_port logger.info(f"初始化邮件分析器: API={api_base_url}, SMTP={smtp_host}:{smtp_port}") # API相关方法 def get_all_domains(self): """获取所有域名""" try: response = requests.get(f"{self.api_base_url}/domains") if response.status_code == 200: domains = response.json().get('domains', []) logger.info(f"获取到 {len(domains)} 个域名") return domains else: logger.error(f"获取域名失败: HTTP {response.status_code}") return [] except Exception as e: logger.error(f"获取域名出错: {str(e)}") return [] def create_domain(self, domain_name, description=""): """创建新域名""" try: data = { "name": domain_name, "description": description } response = requests.post(f"{self.api_base_url}/domains", json=data) if response.status_code in (200, 201): result = response.json() domain = result.get("domain", {}) logger.info(f"成功创建域名: {domain_name} (ID: {domain.get('id')})") return domain else: logger.error(f"创建域名失败: HTTP {response.status_code} - {response.text}") return None except Exception as e: logger.error(f"创建域名出错: {str(e)}") return None def get_all_mailboxes(self): """获取所有邮箱""" try: response = requests.get(f"{self.api_base_url}/mailboxes") if response.status_code == 200: mailboxes = response.json().get('mailboxes', []) logger.info(f"获取到 {len(mailboxes)} 个邮箱") return mailboxes else: logger.error(f"获取邮箱失败: HTTP {response.status_code}") return [] except Exception as e: logger.error(f"获取邮箱出错: {str(e)}") return [] def create_mailbox(self, domain_id, address): """创建新邮箱""" try: data = { "domain_id": domain_id, "address": address } response = requests.post(f"{self.api_base_url}/mailboxes", json=data) if response.status_code in (200, 201): result = response.json() mailbox = result.get("mailbox", {}) logger.info(f"成功创建邮箱: {mailbox.get('full_address')} (ID: {mailbox.get('id')})") return mailbox else: logger.error(f"创建邮箱失败: HTTP {response.status_code} - {response.text}") return None except Exception as e: logger.error(f"创建邮箱出错: {str(e)}") return None def get_mailbox_emails(self, mailbox_id): """获取指定邮箱的所有邮件""" try: response = requests.get(f"{self.api_base_url}/mailboxes/{mailbox_id}/emails") if response.status_code == 200: emails = response.json().get('emails', []) logger.info(f"获取到邮箱ID={mailbox_id}的 {len(emails)} 封邮件") return emails else: logger.error(f"获取邮件失败: HTTP {response.status_code}") return [] except Exception as e: logger.error(f"获取邮件出错: {str(e)}") return [] def get_email_detail(self, email_id): """获取指定邮件的详细信息""" try: response = requests.get(f"{self.api_base_url}/emails/{email_id}") if response.status_code == 200: email = response.json().get('email', {}) logger.info(f"获取到邮件ID={email_id}的详细信息") return email else: logger.error(f"获取邮件详情失败: HTTP {response.status_code}") return {} except Exception as e: logger.error(f"获取邮件详情出错: {str(e)}") return {} # SMTP相关方法 def send_email(self, from_addr, to_addr, subject, body_text="", body_html=""): """发送邮件""" try: # 创建邮件 msg = MIMEMultipart('alternative') msg['From'] = from_addr msg['To'] = to_addr msg['Subject'] = subject # 添加文本内容 if body_text: msg.attach(MIMEText(body_text, 'plain')) # 添加HTML内容 if body_html: msg.attach(MIMEText(body_html, 'html')) # 连接SMTP服务器并发送 server = smtplib.SMTP(self.smtp_host, self.smtp_port) server.set_debuglevel(1) # 开启调试 server.sendmail(from_addr, to_addr, msg.as_string()) server.quit() logger.info(f"成功发送邮件: {from_addr} -> {to_addr}, 主题: {subject}") return True except Exception as e: logger.error(f"发送邮件失败: {str(e)}") return False def send_verification_code(self, to_addr, code_length=6): """发送验证码邮件""" # 生成随机验证码 import random import string code = ''.join(random.choices(string.digits, k=code_length)) # 邮件主题 subject = f'您的验证码 - {datetime.now().strftime("%Y-%m-%d %H:%M:%S")}' # 邮件HTML内容 html = f'''

验证码

您好!

您的验证码是:

{code}

此验证码将在30分钟内有效。

如果这不是您请求的,请忽略此邮件。

谢谢!

此邮件由系统自动发送,请勿回复。
''' # 发送邮件 result = self.send_email('noreply@system.com', to_addr, subject, body_html=html) return result, code # 邮件分析方法 def extract_verification_code(self, email): """从邮件中提取验证码""" # 如果邮件对象已经包含验证码字段,直接返回 if email.get('verification_code'): return email.get('verification_code') # 尝试从HTML内容中提取 body_html = email.get('body_html', '') if body_html: # 尝试匹配验证码样式 code_match = re.search(r'letter-spacing: 5px[^>]*>([^<]+)<', body_html) if code_match: return code_match.group(1).strip() # 尝试从文本内容中提取6位数字验证码 body_text = email.get('body_text', '') if body_text: code_match = re.search(r'\b(\d{6})\b', body_text) if code_match: return code_match.group(1) # 无法提取到验证码 return None def extract_verification_link(self, email): """从邮件中提取验证链接""" # 如果邮件对象已经包含验证链接字段,直接返回 if email.get('verification_link'): return email.get('verification_link') # 尝试从HTML内容中提取链接 body_html = email.get('body_html', '') if body_html: link_match = re.search(r'href=[\'"]([^\'"]*verify[^\'"]*)[\'"]', body_html) if link_match: return link_match.group(1) # 尝试从文本内容中提取链接 body_text = email.get('body_text', '') if body_text: link_match = re.search(r'https?://\S+?(?:verify|confirm|activate)\S+', body_text) if link_match: return link_match.group(0) # 无法提取到验证链接 return None # 使用示例 def main(): # 创建邮件分析器实例 analyzer = EmailAnalyzer() # 获取所有域名 domains = analyzer.get_all_domains() for domain in domains: print(f"域名: {domain.get('name')} (ID: {domain.get('id')})") # 如果没有域名,创建一个 if not domains: domain = analyzer.create_domain("test-domain.com", "测试域名") print(f"创建域名: {domain.get('name')} (ID: {domain.get('id')}") else: domain = domains[0] # 获取所有邮箱 mailboxes = analyzer.get_all_mailboxes() for mailbox in mailboxes: print(f"邮箱: {mailbox.get('full_address')} (ID: {mailbox.get('id')})") # 如果没有邮箱,创建一个 if not mailboxes: mailbox = analyzer.create_mailbox(domain.get('id'), "test-user") print(f"创建邮箱: {mailbox.get('full_address')} (ID: {mailbox.get('id')}") else: mailbox = mailboxes[0] # 发送一封测试邮件 to_addr = mailbox.get('full_address') sent, code = analyzer.send_verification_code(to_addr) print(f"验证码邮件发送{'成功' if sent else '失败'}, 验证码: {code}") # 等待2秒,让邮件被处理 import time time.sleep(2) # 获取邮箱中的邮件 mailbox_id = mailbox.get('id') emails = analyzer.get_mailbox_emails(mailbox_id) print(f"获取到 {len(emails)} 封邮件") # 分析最新的邮件 if emails: latest_email = emails[0] # 假设按时间降序排列 print(f"最新邮件: {latest_email.get('subject')}") verification_code = analyzer.extract_verification_code(latest_email) print(f"提取到的验证码: {verification_code}") verification_link = analyzer.extract_verification_link(latest_email) print(f"提取到的验证链接: {verification_link}") if __name__ == "__main__": main()