first commit

This commit is contained in:
huangzhenpc
2025-02-26 18:29:10 +08:00
parent 5d21c9468c
commit a8d1b41381
38 changed files with 2878 additions and 0 deletions

148
old/app/models/email.py Normal file
View File

@@ -0,0 +1,148 @@
import os
import json
from sqlalchemy import Column, Integer, String, Text, DateTime, ForeignKey, Boolean, JSON
from sqlalchemy.orm import relationship
from datetime import datetime
import re
import sys
import logging
from . import Base
import config
active_config = config.active_config
class Email(Base):
"""电子邮件模型"""
__tablename__ = 'emails'
id = Column(Integer, primary_key=True)
mailbox_id = Column(Integer, ForeignKey('mailboxes.id'), nullable=False, index=True)
sender = Column(String(255), nullable=False)
recipients = Column(String(1000), nullable=False)
subject = Column(String(500), nullable=True)
body_text = Column(Text, nullable=True)
body_html = Column(Text, nullable=True)
received_at = Column(DateTime, default=datetime.utcnow)
read = Column(Boolean, default=False)
headers = Column(JSON, nullable=True)
# 提取的验证码和链接
verification_code = Column(String(100), nullable=True)
verification_link = Column(String(1000), nullable=True)
# 关联关系
mailbox = relationship("Mailbox", back_populates="emails")
attachments = relationship("Attachment", back_populates="email", cascade="all, delete-orphan")
def save_raw_email(self, raw_content):
"""保存原始邮件内容到文件"""
storage_path = active_config.MAIL_STORAGE_PATH
mailbox_dir = os.path.join(storage_path, str(self.mailbox_id))
os.makedirs(mailbox_dir, exist_ok=True)
# 保存原始邮件内容
file_path = os.path.join(mailbox_dir, f"{self.id}.eml")
with open(file_path, 'wb') as f:
f.write(raw_content)
def extract_verification_data(self):
"""
尝试从邮件内容中提取验证码和验证链接
这个方法会在邮件保存时自动调用
"""
logger = logging.getLogger(__name__)
# 合并文本和HTML内容用于搜索
content = f"{self.subject or ''} {self.body_text or ''} {self.body_html or ''}"
logger.info(f"开始提取邮件ID={self.id}的验证信息,内容长度={len(content)}")
# 首先检查是否是Cursor验证邮件
if "Verify your email" in self.subject and (
"cursor.sh" in self.sender.lower() or
"cursor" in self.sender.lower()
):
logger.info("检测到Cursor验证邮件")
# 从HTML中提取6位数字验证码
cursor_patterns = [
r'(\d{6})</div>', # 匹配Cursor邮件中的6位数字验证码格式
r'<div[^>]*>(\d{6})</div>', # 更宽松的匹配
r'>(\d{6})<', # 最简单的形式
r'(\d{6})' # 任何6位数字
]
for pattern in cursor_patterns:
matches = re.findall(pattern, content)
if matches:
self.verification_code = matches[0]
logger.info(f"从Cursor邮件中提取到验证码: {self.verification_code}")
break
return
# 提取可能的验证码4-8位数字或字母组合
code_patterns = [
r'\b([A-Z0-9]{4,8})\b', # 大写字母和数字
r'验证码[:]\s*([A-Z0-9]{4,8})', # 中文格式
r'验证码是[:]\s*([A-Z0-9]{4,8})', # 中文格式2
r'code[:]\s*([A-Z0-9]{4,8})', # 英文格式
r'code is[:]\s*([A-Z0-9]{4,8})', # 英文格式2
r'code[:]\s*<[^>]*>([A-Z0-9]{4,8})', # HTML格式
r'<div[^>]*>([0-9]{4,8})</div>', # HTML分隔的数字
r'<strong[^>]*>([A-Z0-9]{4,8})</strong>', # 粗体验证码
]
for pattern in code_patterns:
matches = re.findall(pattern, content, re.IGNORECASE)
if matches:
# 过滤掉明显不是验证码的结果
filtered_matches = [m for m in matches if len(m) >= 4 and not m.lower() in ['code', 'verify', 'http', 'https']]
if filtered_matches:
self.verification_code = filtered_matches[0]
logger.info(f"提取到验证码: {self.verification_code}")
break
# 提取验证链接
link_patterns = [
r'https?://\S+(?:verify|confirm|activate)\S+',
r'https?://\S+(?:token|auth|account)\S+',
]
for pattern in link_patterns:
matches = re.findall(pattern, content, re.IGNORECASE)
if matches:
self.verification_link = matches[0]
logger.info(f"提取到验证链接: {self.verification_link}")
break
# 如果没有找到验证码,但邮件主题暗示这是验证邮件
verify_subjects = ['verify', 'confirmation', 'activate', 'validation', '验证', '确认']
if not self.verification_code and any(subj in self.subject.lower() for subj in verify_subjects):
logger.info("根据主题判断这可能是验证邮件,但未能提取到验证码")
# 尝试从HTML中提取明显的数字序列
if self.body_html:
number_matches = re.findall(r'(\d{4,8})', self.body_html)
filtered_numbers = [n for n in number_matches if len(n) >= 4 and len(n) <= 8]
if filtered_numbers:
self.verification_code = filtered_numbers[0]
logger.info(f"从HTML中提取到可能的验证码: {self.verification_code}")
logger.info(f"验证信息提取完成: code={self.verification_code}, link={self.verification_link}")
def __repr__(self):
return f"<Email {self.id}: {self.subject}>"
def to_dict(self):
"""转换为字典用于API响应"""
return {
"id": self.id,
"mailbox_id": self.mailbox_id,
"sender": self.sender,
"recipients": self.recipients,
"subject": self.subject,
"received_at": self.received_at.isoformat() if self.received_at else None,
"read": self.read,
"verification_code": self.verification_code,
"verification_link": self.verification_link,
"has_attachments": len(self.attachments) > 0 if self.attachments else False
}