#!/usr/bin/env python3
"""
IMAP email client module.

Handles IMAP connections (XOAUTH2 over SSL) and message retrieval.
"""

import asyncio
import imaplib
import email
import logging
import re
import threading
import time
from datetime import datetime
from typing import Dict, List, Optional
from email.header import decode_header
from email import utils as email_utils

from fastapi import HTTPException

from config import IMAP_SERVER, IMAP_PORT, INBOX_FOLDER_NAME
from auth import get_access_token

logger = logging.getLogger(__name__)

# Treat the access token as expired this many seconds before its recorded expiry.
_TOKEN_EXPIRY_BUFFER_SECONDS = 300
# Lifetime assumed for a freshly issued token; get_access_token() only returns
# the token string, so the real expiry is not known here.
_ACCESS_TOKEN_LIFETIME_SECONDS = 3600
# Budget for one connect + authenticate + select attempt; also used as the
# socket timeout during that phase.
_IMAP_CONNECT_TIMEOUT_SECONDS = 10.0
# Socket timeout applied once the mailbox is selected, so a stalled server
# cannot block a worker thread forever during search/fetch/logout.
_IMAP_IO_TIMEOUT_SECONDS = 60.0
# Number of characters kept for the plain-text body preview.
_PREVIEW_LENGTH = 150
# Crude tag stripper used only to build the preview text.
_HTML_TAG_RE = re.compile(r'<[^<]+?>')


# ============================================================================
# Helper functions
# ============================================================================

def decode_header_value(header_value):
    """Decode a (possibly RFC 2047 encoded) mail header into a plain string.

    Args:
        header_value: The raw header as ``str``, ``bytes``, an object whose
            ``str()`` is the header text, or ``None``.

    Returns:
        The decoded text; ``""`` for ``None``; ``"[Header Decode Error]"`` if
        the value cannot even be converted to a string.
    """
    if header_value is None:
        return ""
    if isinstance(header_value, bytes):
        # str(bytes) would produce the "b'...'" repr, so decode explicitly.
        header_value = header_value.decode('utf-8', 'replace')

    try:
        fragments = []
        for part, charset in decode_header(str(header_value)):
            if isinstance(part, bytes):
                try:
                    fragments.append(part.decode(charset or 'utf-8', 'replace'))
                except LookupError:
                    # Unknown charset label: fall back to UTF-8.
                    fragments.append(part.decode('utf-8', 'replace'))
            else:
                fragments.append(str(part))
        return "".join(fragments)
    except Exception:
        # Decoding is best-effort; hand back the undecoded text if possible.
        try:
            return str(header_value)
        except Exception:
            return "[Header Decode Error]"


def _split_sender(from_str: str):
    """Split a decoded From header into ``(display_name, address)``."""
    from_name = "(Unknown)"
    if '<' in from_str and '>' in from_str:
        from_name = from_str.split('<')[0].strip().strip('"')
        from_email = from_str.split('<')[1].split('>')[0].strip()
        if not from_name and '@' in from_email:
            # "<user@host>" carries no display name; use the local part.
            from_name = from_email.split('@')[0]
    else:
        from_email = from_str.strip()
        if '@' in from_email:
            from_name = from_email.split('@')[0]
    return from_name, from_email


def _format_date(date_header) -> str:
    """Render a Date header as ``YYYY-MM-DD HH:MM:SS`` (sender's own offset).

    Falls back to the first 25 characters of the raw header when it cannot be
    parsed, and to ``"(Unknown Date)"`` when the header is missing.
    """
    # The header may be a non-str header object; coerce so slicing works and
    # the result stays a plain string.
    date_str = str(date_header) if date_header else "(Unknown Date)"
    try:
        dt_obj = email_utils.parsedate_to_datetime(date_str)
        if dt_obj:
            return dt_obj.strftime('%Y-%m-%d %H:%M:%S')
    except Exception:
        # Date parsing is best-effort; never drop a message over a bad date.
        return date_str[:25]
    return date_str


def _decode_part(part) -> Optional[str]:
    """Return the decoded text payload of a message part, or ``None`` if it has none."""
    payload = part.get_payload(decode=True)
    if payload is None:
        return None
    charset = part.get_content_charset() or 'utf-8'
    try:
        return payload.decode(charset, errors='replace')
    except LookupError:
        # Unknown charset label: fall back instead of losing the part.
        return payload.decode('utf-8', errors='replace')


def _html_preview(html: str) -> str:
    """Strip tags from *html* and truncate it to the preview length."""
    return _HTML_TAG_RE.sub('', html)[:_PREVIEW_LENGTH]


def _extract_body(email_message):
    """Pick the displayable body of a parsed message, preferring HTML.

    Returns:
        A ``(content, content_type, preview)`` tuple where ``content_type`` is
        ``"html"`` or ``"text"``. Placeholder strings are returned when no
        readable body is found.
    """
    if email_message.is_multipart():
        html_content = None
        text_content = None
        for part in email_message.walk():
            if part.is_multipart():
                # Container parts have no payload of their own.
                continue
            content_disposition = str(part.get("Content-Disposition"))
            if 'attachment' in content_disposition.lower():
                continue
            content_type = part.get_content_type()
            try:
                if content_type == 'text/html' and not html_content:
                    html_content = _decode_part(part)
                elif content_type == 'text/plain' and not text_content:
                    text_content = _decode_part(part)
            except Exception:
                # A broken part must not prevent using the remaining parts.
                continue

        if html_content:
            return html_content, "html", _html_preview(html_content)
        if text_content:
            return text_content, "text", text_content[:_PREVIEW_LENGTH]
        return "[未找到可读的邮件内容]", "text", "[未找到可读的邮件内容]"

    # Single-part message.
    try:
        body_content = _decode_part(email_message) or ""
    except Exception:
        return "[Failed to decode email body]", "text", "[Failed to decode email body]"

    if not body_content:
        return "[未找到可读的文本内容]", "text", "[未找到可读的文本内容]"

    # NOTE(review): the original HTML check was garbled in the source; this is
    # a reconstruction (declared content type, else markup sniffing) — confirm.
    looks_like_html = email_message.get_content_type() == 'text/html'
    if not looks_like_html:
        lowered = body_content.lower()
        looks_like_html = '<html' in lowered or '<body' in lowered

    if looks_like_html:
        return body_content, "html", _html_preview(body_content)
    return body_content, "text", body_content[:_PREVIEW_LENGTH]


def _build_message(uid_bytes: bytes, raw_email_bytes: bytes) -> Dict:
    """Parse one raw RFC 822 message into the dict shape returned to callers."""
    email_message = email.message_from_bytes(raw_email_bytes)

    subject = decode_header_value(email_message['Subject']) or "(No Subject)"
    from_str = decode_header_value(email_message['From']) or "(Unknown Sender)"
    to_str = decode_header_value(email_message['To']) or ""

    from_name, from_email = _split_sender(from_str)
    date_str = _format_date(email_message['Date'])
    body_content, body_type, body_preview = _extract_body(email_message)

    return {
        'id': uid_bytes.decode('utf-8'),
        'subject': subject,
        'receivedDateTime': date_str,
        'sender': {
            'emailAddress': {
                'address': from_email,
                'name': from_name
            }
        },
        'from': {
            'emailAddress': {
                'address': from_email,
                'name': from_name
            }
        },
        'toRecipients': [{'emailAddress': {'address': to_str, 'name': to_str}}] if to_str else [],
        'body': {
            'content': body_content,
            'contentType': body_type
        },
        'bodyPreview': body_preview
    }


# ============================================================================
# IMAP client class
# ============================================================================

class IMAPEmailClient:
    """IMAP email client that opens a fresh connection for each operation."""

    def __init__(self, email: str, account_info: Dict):
        """Initialise the client.

        Args:
            email: Mailbox address, used as the XOAUTH2 user.
            account_info: Account data; must contain ``refresh_token`` and may
                contain ``client_id``.

        Raises:
            KeyError: If ``account_info`` has no ``refresh_token``.
        """
        self.email = email
        self.refresh_token = account_info['refresh_token']
        self.client_id = account_info.get('client_id', '')
        self.access_token = ''
        self.expires_at = 0

        # Serialises token refreshes so concurrent callers refresh only once.
        self._token_lock = asyncio.Lock()

        logger.debug(f"IMAPEmailClient初始化 ({email}),采用按需连接策略")

    def is_token_expired(self) -> bool:
        """Return True if the access token has expired or is about to expire."""
        return time.time() + _TOKEN_EXPIRY_BUFFER_SECONDS >= self.expires_at

    async def ensure_token_valid(self):
        """Refresh the access token if it is missing or (nearly) expired.

        Raises:
            Exception: Whatever ``refresh_access_token`` raises.
        """
        async with self._token_lock:
            if not self.access_token or self.is_token_expired():
                logger.info(f"{self.email} access token已过期或不存在,需要刷新")
                await self.refresh_access_token()

    async def refresh_access_token(self) -> None:
        """Obtain a new access token via ``get_access_token`` and store it.

        Raises:
            HTTPException: 401 if no token was returned.
            Exception: Any error raised by ``get_access_token`` (logged, then re-raised).
        """
        try:
            logger.info(f"🔑 正在刷新 {self.email} 的访问令牌...")

            access_token = await get_access_token(self.refresh_token, client_id=self.client_id)
            if not access_token:
                raise HTTPException(status_code=401, detail="Failed to refresh access token")

            self.access_token = access_token
            # Only the token string is returned, so a one-hour lifetime is assumed.
            self.expires_at = time.time() + _ACCESS_TOKEN_LIFETIME_SECONDS
            expires_at_str = datetime.fromtimestamp(self.expires_at).strftime('%Y-%m-%d %H:%M:%S')
            logger.info(f"✓ Token刷新成功(有效期至: {expires_at_str})")
        except Exception as e:
            logger.error(f"✗ Token刷新失败 {self.email}: {e}")
            raise

    def _open_connection_sync(self, mailbox_to_select, abandoned: threading.Event):
        """Blocking connect + XOAUTH2 authenticate + read-only select.

        Runs in a worker thread. The connection is closed again if any step
        fails, or if *abandoned* was set because the awaiting coroutine
        already gave up on this attempt (in which case ``None`` is returned).
        """
        imap_conn = imaplib.IMAP4_SSL(IMAP_SERVER, IMAP_PORT, timeout=_IMAP_CONNECT_TIMEOUT_SECONDS)
        try:
            # XOAUTH2 initial response; fields are separated by Ctrl-A (0x01).
            auth_string = f"user={self.email}\x01auth=Bearer {self.access_token}\x01\x01"
            typ, data = imap_conn.authenticate('XOAUTH2', lambda _challenge: auth_string.encode('utf-8'))
            if typ != 'OK':
                error_message = data[0].decode('utf-8', 'replace') if data and data[0] else "未知认证错误"
                raise Exception(f"IMAP XOAUTH2 认证失败: {error_message} (Type: {typ})")

            stat_select, data_select = imap_conn.select(mailbox_to_select, readonly=True)
            if stat_select != 'OK':
                error_msg = data_select[0].decode('utf-8', 'replace') if data_select and data_select[0] else "未知错误"
                raise Exception(f"选择邮箱 '{mailbox_to_select}' 失败: {error_msg}")

            # Connected: allow slower operations than the connect budget.
            imap_conn.socket().settimeout(_IMAP_IO_TIMEOUT_SECONDS)
        except BaseException:
            # Do not leak the socket when authentication or selection fails.
            self.close_imap_connection(imap_conn)
            raise

        if abandoned.is_set():
            # Nobody is waiting for this connection any more.
            self.close_imap_connection(imap_conn)
            return None
        return imap_conn

    async def create_imap_connection(self, mailbox_to_select=INBOX_FOLDER_NAME):
        """Open an authenticated IMAP connection with *mailbox_to_select* selected read-only.

        Each attempt runs in a worker thread with a 10 second budget; up to
        three attempts are made, one second apart.

        Args:
            mailbox_to_select: Mailbox name passed to IMAP SELECT.
                NOTE(review): imaplib appears to send the name unquoted, so
                names containing spaces may need pre-quoting — confirm.

        Returns:
            The connected ``imaplib.IMAP4_SSL`` instance. The caller must
            release it with ``close_imap_connection``.

        Raises:
            HTTPException: 500 if every attempt fails; token refresh errors
                from ``ensure_token_valid`` propagate unchanged.
        """
        await self.ensure_token_valid()

        max_retries = 3
        for attempt in range(max_retries):
            # One flag per attempt: a timed-out worker thread may still be
            # running while the next attempt is already under way.
            abandoned = threading.Event()
            try:
                if attempt > 0:
                    logger.info(f"🔄 重试连接 IMAP (第{attempt+1}次)")

                imap_conn = await asyncio.wait_for(
                    asyncio.to_thread(self._open_connection_sync, mailbox_to_select, abandoned),
                    timeout=_IMAP_CONNECT_TIMEOUT_SECONDS,
                )
                logger.info(f"🔌 IMAP连接已建立 → {mailbox_to_select}")
                return imap_conn

            except asyncio.CancelledError:
                # The thread cannot be cancelled; tell it to discard its result.
                abandoned.set()
                raise
            except asyncio.TimeoutError:
                abandoned.set()
                logger.error(f"创建IMAP连接超时 ({self.email}), 第{attempt+1}次尝试")
            except Exception as e:
                logger.error(f"创建IMAP连接失败 ({self.email}), 第{attempt+1}次尝试: {e}")

            if attempt < max_retries - 1:
                await asyncio.sleep(1)

        logger.error(f"经过{max_retries}次尝试,仍无法创建IMAP连接 ({self.email})")
        raise HTTPException(status_code=500, detail=f"Failed to connect to IMAP server for {self.email}")

    def close_imap_connection(self, imap_conn):
        """Close the selected mailbox (if any) and log out, never raising.

        Args:
            imap_conn: Connection returned by ``create_imap_connection``;
                falsy values are ignored.
        """
        if not imap_conn:
            return
        try:
            current_state = getattr(imap_conn, 'state', None)

            try:
                # CLOSE is only valid while a mailbox is selected.
                if current_state == 'SELECTED':
                    imap_conn.close()
            except Exception as e:
                logger.debug(f"关闭邮箱时出现预期错误: {e}")

            try:
                if current_state != 'LOGOUT':
                    imap_conn.logout()
            except Exception as e:
                logger.debug(f"登出时出现预期错误: {e}")

            logger.info("🔌 IMAP连接已关闭")
        except Exception as e:
            logger.debug(f"关闭IMAP连接时发生预期错误: {e}")

    def _fetch_latest_messages_sync(self, imap_conn, folder_id: str, top: int) -> List[Dict]:
        """Blocking fetch of the newest *top* messages (newest first).

        Runs in a worker thread. Messages that fail to fetch or parse are
        logged and skipped.

        Raises:
            Exception: If the UID SEARCH command does not return OK.
        """
        scan_start = time.time()
        typ, uid_data = imap_conn.uid('search', None, "ALL")
        if typ != 'OK':
            raise Exception(f"在 '{folder_id}' 中搜索邮件失败 (status: {typ})。")

        if not uid_data or not uid_data[0]:
            return []

        uids = uid_data[0].split()
        scan_time = (time.time() - scan_start) * 1000
        logger.info(f"📋 扫描完成: 共 {len(uids)} 封邮件 (耗时: {scan_time:.0f}ms)")

        # Keep only the newest `top` UIDs (top > 0 is guaranteed by the caller;
        # a zero would make the slice return everything), newest first.
        uids = uids[-top:]
        uids.reverse()

        fetch_start = time.time()
        logger.info(f"📥 开始获取 {len(uids)} 封邮件的完整内容(包含正文和附件)...")

        messages = []
        for uid_bytes in uids:
            try:
                # Fetch the complete message (headers + body) in one round trip.
                typ, msg_data = imap_conn.uid('fetch', uid_bytes, '(RFC822)')
                if typ != 'OK' or not msg_data:
                    continue

                # The literal arrives as a (envelope, bytes) tuple; other
                # response items may be plain bytes or None.
                raw_email_bytes = next(
                    (item[1] for item in msg_data if isinstance(item, tuple) and len(item) == 2),
                    None,
                )
                if raw_email_bytes:
                    messages.append(_build_message(uid_bytes, raw_email_bytes))
            except Exception as e:
                logger.error(f" ✗ 处理邮件UID {uid_bytes}时出错: {e}")
                continue

        fetch_time = (time.time() - fetch_start) * 1000
        logger.info(f"📬 内容获取完成: {len(messages)} 封邮件 (耗时: {fetch_time:.0f}ms, 平均: {fetch_time/len(messages) if messages else 0:.0f}ms/封)")

        return messages

    async def get_messages_with_content(self, folder_id: str = INBOX_FOLDER_NAME, top: int = 5) -> List[Dict]:
        """Fetch the newest messages of a folder, including their full bodies.

        Headers and body are retrieved together so a client can cache the
        result and show message details without another request.

        Args:
            folder_id: Mailbox to read; defaults to ``INBOX_FOLDER_NAME``.
            top: Maximum number of messages to return; values <= 0 yield ``[]``.

        Returns:
            Message dicts, newest first, with the keys ``id``, ``subject``,
            ``receivedDateTime``, ``sender``, ``from``, ``toRecipients``,
            ``body`` and ``bodyPreview``.

        Raises:
            HTTPException: Raised by token refresh or connection setup
                (propagated unchanged), or 500 for any other failure.
            asyncio.CancelledError: If the surrounding task is cancelled.
        """
        start_time = time.time()
        logger.info(f"📧 开始获取 {self.email} 的邮件(文件夹: {folder_id}, 请求数量: {top})")

        if top <= 0:
            # Nothing requested; avoid opening a connection at all.
            return []

        imap_conn = None
        try:
            imap_conn = await self.create_imap_connection(folder_id)

            # imaplib is blocking, so run the fetch in a worker thread.
            messages = await asyncio.to_thread(
                self._fetch_latest_messages_sync, imap_conn, folder_id, top
            )

            total_time = (time.time() - start_time) * 1000
            logger.info(f"✅ 完成!总耗时: {total_time:.0f}ms | 获取 {len(messages)} 封完整邮件(已包含正文,前端可缓存)")
            return messages

        except asyncio.CancelledError:
            logger.warning(f"获取邮件操作被取消 ({self.email})")
            raise
        except HTTPException:
            # Keep the specific status (e.g. 401 from token refresh).
            raise
        except Exception as e:
            logger.error(f"获取邮件失败 {self.email}: {e}")
            raise HTTPException(status_code=500, detail="Failed to retrieve emails") from e
        finally:
            if imap_conn:
                # CLOSE/LOGOUT are blocking network calls; keep them off the event loop.
                await asyncio.to_thread(self.close_imap_connection, imap_conn)

    async def cleanup(self):
        """Release resources; connections are per-operation, so this only logs."""
        logger.debug(f"IMAPEmailClient清理完成 ({self.email})")