150 lines
6.4 KiB
Python
150 lines
6.4 KiB
Python
import os
|
||
import json
|
||
from sqlalchemy import Column, Integer, String, Text, DateTime, ForeignKey, Boolean, JSON
|
||
from sqlalchemy.orm import relationship
|
||
from datetime import datetime
|
||
import re
|
||
import sys
|
||
import logging
|
||
|
||
from . import Base
|
||
import config
|
||
active_config = config.active_config
|
||
|
||
|
||
class Email(Base):
|
||
"""电子邮件模型"""
|
||
__tablename__ = 'emails'
|
||
|
||
id = Column(Integer, primary_key=True)
|
||
mailbox_id = Column(Integer, ForeignKey('mailboxes.id'), nullable=False, index=True)
|
||
sender = Column(String(255), nullable=False)
|
||
recipients = Column(String(1000), nullable=False)
|
||
subject = Column(String(500), nullable=True)
|
||
body_text = Column(Text, nullable=True)
|
||
body_html = Column(Text, nullable=True)
|
||
received_at = Column(DateTime, default=datetime.utcnow)
|
||
read = Column(Boolean, default=False)
|
||
headers = Column(JSON, nullable=True)
|
||
|
||
# 提取的验证码和链接
|
||
verification_code = Column(String(100), nullable=True)
|
||
verification_link = Column(String(1000), nullable=True)
|
||
|
||
# 关联关系
|
||
mailbox = relationship("Mailbox", back_populates="emails")
|
||
attachments = relationship("Attachment", back_populates="email", cascade="all, delete-orphan")
|
||
|
||
def save_raw_email(self, raw_content):
|
||
"""保存原始邮件内容到文件"""
|
||
storage_path = active_config.MAIL_STORAGE_PATH
|
||
mailbox_dir = os.path.join(storage_path, str(self.mailbox_id))
|
||
os.makedirs(mailbox_dir, exist_ok=True)
|
||
|
||
# 保存原始邮件内容
|
||
file_path = os.path.join(mailbox_dir, f"{self.id}.eml")
|
||
with open(file_path, 'wb') as f:
|
||
f.write(raw_content)
|
||
|
||
def extract_verification_data(self):
|
||
"""
|
||
尝试从邮件内容中提取验证码和验证链接
|
||
这个方法会在邮件保存时自动调用
|
||
"""
|
||
logger = logging.getLogger(__name__)
|
||
|
||
# 合并文本和HTML内容用于搜索
|
||
content = f"{self.subject or ''} {self.body_text or ''} {self.body_html or ''}"
|
||
logger.info(f"开始提取邮件ID={self.id}的验证信息,内容长度={len(content)}")
|
||
|
||
# 首先检查是否是Cursor验证邮件
|
||
if "Verify your email" in self.subject and (
|
||
"cursor.sh" in self.sender.lower() or
|
||
"cursor" in self.sender.lower()
|
||
):
|
||
logger.info("检测到Cursor验证邮件")
|
||
# 针对Cursor验证邮件的特定验证码格式
|
||
import re
|
||
# 从HTML中提取6位数字验证码
|
||
cursor_patterns = [
|
||
r'(\d{6})</div>', # 匹配Cursor邮件中的6位数字验证码格式
|
||
r'<div[^>]*>(\d{6})</div>', # 更宽松的匹配
|
||
r'>(\d{6})<', # 最简单的形式
|
||
r'(\d{6})' # 任何6位数字
|
||
]
|
||
|
||
for pattern in cursor_patterns:
|
||
matches = re.findall(pattern, content)
|
||
if matches:
|
||
self.verification_code = matches[0]
|
||
logger.info(f"从Cursor邮件中提取到验证码: {self.verification_code}")
|
||
break
|
||
|
||
return
|
||
|
||
# 提取可能的验证码(4-8位数字或字母组合)
|
||
code_patterns = [
|
||
r'\b([A-Z0-9]{4,8})\b', # 大写字母和数字
|
||
r'验证码[::]\s*([A-Z0-9]{4,8})', # 中文格式
|
||
r'验证码是[::]\s*([A-Z0-9]{4,8})', # 中文格式2
|
||
r'code[::]\s*([A-Z0-9]{4,8})', # 英文格式
|
||
r'code is[::]\s*([A-Z0-9]{4,8})', # 英文格式2
|
||
r'code[::]\s*<[^>]*>([A-Z0-9]{4,8})', # HTML格式
|
||
r'<div[^>]*>([0-9]{4,8})</div>', # HTML分隔的数字
|
||
r'<strong[^>]*>([A-Z0-9]{4,8})</strong>', # 粗体验证码
|
||
]
|
||
|
||
for pattern in code_patterns:
|
||
matches = re.findall(pattern, content, re.IGNORECASE)
|
||
if matches:
|
||
# 过滤掉明显不是验证码的结果
|
||
filtered_matches = [m for m in matches if len(m) >= 4 and not m.lower() in ['code', 'verify', 'http', 'https']]
|
||
if filtered_matches:
|
||
self.verification_code = filtered_matches[0]
|
||
logger.info(f"提取到验证码: {self.verification_code}")
|
||
break
|
||
|
||
# 提取验证链接
|
||
link_patterns = [
|
||
r'https?://\S+(?:verify|confirm|activate)\S+',
|
||
r'https?://\S+(?:token|auth|account)\S+',
|
||
]
|
||
|
||
for pattern in link_patterns:
|
||
matches = re.findall(pattern, content, re.IGNORECASE)
|
||
if matches:
|
||
self.verification_link = matches[0]
|
||
logger.info(f"提取到验证链接: {self.verification_link}")
|
||
break
|
||
|
||
# 如果没有找到验证码,但邮件主题暗示这是验证邮件
|
||
verify_subjects = ['verify', 'confirmation', 'activate', 'validation', '验证', '确认']
|
||
if not self.verification_code and any(subj in self.subject.lower() for subj in verify_subjects):
|
||
logger.info("根据主题判断这可能是验证邮件,但未能提取到验证码")
|
||
# 尝试从HTML中提取明显的数字序列
|
||
if self.body_html:
|
||
number_matches = re.findall(r'(\d{4,8})', self.body_html)
|
||
filtered_numbers = [n for n in number_matches if len(n) >= 4 and len(n) <= 8]
|
||
if filtered_numbers:
|
||
self.verification_code = filtered_numbers[0]
|
||
logger.info(f"从HTML中提取到可能的验证码: {self.verification_code}")
|
||
|
||
logger.info(f"验证信息提取完成: code={self.verification_code}, link={self.verification_link}")
|
||
|
||
def __repr__(self):
|
||
return f"<Email {self.id}: {self.subject}>"
|
||
|
||
def to_dict(self):
|
||
"""转换为字典,用于API响应"""
|
||
return {
|
||
"id": self.id,
|
||
"mailbox_id": self.mailbox_id,
|
||
"sender": self.sender,
|
||
"recipients": self.recipients,
|
||
"subject": self.subject,
|
||
"received_at": self.received_at.isoformat() if self.received_at else None,
|
||
"read": self.read,
|
||
"verification_code": self.verification_code,
|
||
"verification_link": self.verification_link,
|
||
"has_attachments": len(self.attachments) > 0 if self.attachments else False
|
||
} |