#!/usr/bin/env python3
"""
Redis cache module.

Caches the account list, mail data, payment status and notifications to
speed up page loads. Every operation is best-effort: when Redis (or the
``redis`` package itself) is unavailable the methods log and return an
empty result instead of raising.
"""

import json
import logging
import os
import uuid
from datetime import datetime
from typing import Any, Optional
from urllib.parse import urlsplit, urlunsplit

try:
    import redis.asyncio as redis
except ImportError:
    # The cache is optional by design; connect() reports the missing package
    # and the rest of the module degrades to no-ops.
    redis = None

logger = logging.getLogger(__name__)

# NOTE(review): the fallback below embeds a password in source control.
# Prefer setting the REDIS_URL environment variable and rotating this secret.
REDIS_URL = os.environ.get('REDIS_URL') or 'redis://:redis_XMiXNa@127.0.0.1:6379/0'

# Cache expiry times (seconds)
TTL_ACCOUNTS = 300  # account list: 5 minutes
TTL_MESSAGES = 180  # message list: 3 minutes
TTL_PAYMENT = 600  # payment status: 10 minutes
TTL_NOTIFICATIONS = 604800  # notifications: 7 days

NOTIFICATIONS_KEY = 'notifications:unread'


def _redact_url(url: str) -> str:
    """Return *url* with any password replaced by ``***`` for safe logging."""
    try:
        parts = urlsplit(url)
    except ValueError:
        return 'redis://***'
    userinfo, sep, hostinfo = parts.netloc.rpartition('@')
    if not sep or ':' not in userinfo:
        # No credentials (or user name only): nothing secret to hide.
        return url
    user = userinfo.split(':', 1)[0]
    return urlunsplit(parts._replace(netloc=f"{user}:***@{hostinfo}"))


class RedisCache:
    """Redis cache manager storing JSON-encoded values.

    All methods are no-ops (returning ``None`` / ``[]``) while no client is
    connected, so callers never need to guard their cache calls.
    """

    def __init__(self):
        self._redis: Optional[redis.Redis] = None

    async def connect(self):
        """Create the client and verify it with PING.

        On any failure a warning is logged and the cache stays disabled.
        """
        if redis is None:
            logger.warning("Redis连接失败,将跳过缓存: %s", "redis 包未安装")
            self._redis = None
            return
        try:
            self._redis = redis.from_url(REDIS_URL, decode_responses=True)
            await self._redis.ping()
            # Log the redacted URL so the password never reaches the logs.
            logger.info("Redis连接成功: %s", _redact_url(REDIS_URL))
        except Exception as e:
            logger.warning("Redis连接失败,将跳过缓存: %s", e)
            self._redis = None

    async def close(self):
        """Close the client and mark the cache as unavailable."""
        # Detach first so `available` is False even if closing raises.
        client, self._redis = self._redis, None
        if client is not None:
            await client.aclose()

    @property
    def available(self) -> bool:
        """Whether a Redis client is currently attached."""
        return self._redis is not None

    async def get(self, key: str) -> Optional[Any]:
        """Return the decoded JSON value stored at *key*, or ``None``.

        ``None`` is returned when the key is missing, the cache is disabled,
        or the read/decode fails.
        """
        if not self._redis:
            return None
        try:
            val = await self._redis.get(key)
            return json.loads(val) if val else None
        except Exception as e:
            logger.debug("Redis get 失败 [%s]: %s", key, e)
            return None

    async def set(self, key: str, value: Any, ttl: int = 300):
        """Store *value* as JSON at *key*, expiring after *ttl* seconds."""
        if not self._redis:
            return
        try:
            payload = json.dumps(value, ensure_ascii=False)
            await self._redis.set(key, payload, ex=ttl)
        except Exception as e:
            logger.debug("Redis set 失败 [%s]: %s", key, e)

    async def delete(self, key: str):
        """Delete *key*; failures are logged, never raised."""
        if not self._redis:
            return
        try:
            await self._redis.delete(key)
        except Exception as e:
            logger.debug("Redis delete 失败 [%s]: %s", key, e)

    async def delete_pattern(self, pattern: str):
        """Delete every key matching the glob *pattern* (iterated via SCAN)."""
        if not self._redis:
            return
        try:
            async for key in self._redis.scan_iter(match=pattern, count=100):
                await self._redis.delete(key)
        except Exception as e:
            logger.debug("Redis delete_pattern 失败 [%s]: %s", pattern, e)

    # ---- Business shortcut helpers ----

    def accounts_key(self) -> str:
        """Return the cache key of the account list."""
        return "cache:accounts"

    def messages_key(self, email: str, folder: str) -> str:
        """Return the cache key of the message list of *email* / *folder*."""
        return f"cache:messages:{email}:{folder}"

    def payment_key(self) -> str:
        """Return the cache key of the payment status."""
        return "cache:payment_status"

    async def invalidate_accounts(self):
        """Drop the cached account list."""
        await self.delete(self.accounts_key())

    async def invalidate_messages(self, email: str):
        """Drop the cached message lists of every folder of *email*."""
        await self.delete_pattern(f"cache:messages:{email}:*")

    async def invalidate_payment(self):
        """Drop the cached payment status."""
        await self.delete(self.payment_key())

    # ---- Notification system ----

    async def add_notification(self, email: str, ntype: str, message: str):
        """Push a notification to the head of the list (LPUSH + EXPIRE).

        Each notification gets a fresh UUID ``id`` and a local-time
        ``created_at`` stamp; the list TTL is refreshed on every push.
        """
        if not self._redis:
            return
        try:
            notif = json.dumps({
                'id': str(uuid.uuid4()),
                'email': email,
                'type': ntype,
                'message': message,
                'created_at': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
            }, ensure_ascii=False)
            await self._redis.lpush(NOTIFICATIONS_KEY, notif)
            await self._redis.expire(NOTIFICATIONS_KEY, TTL_NOTIFICATIONS)
        except Exception as e:
            logger.debug("添加通知失败: %s", e)

    async def get_notifications(self) -> list:
        """Return all unread notifications, newest first.

        Entries that are not valid JSON are skipped so that one corrupt item
        cannot hide every other notification.
        """
        if not self._redis:
            return []
        try:
            items = await self._redis.lrange(NOTIFICATIONS_KEY, 0, -1)
        except Exception as e:
            logger.debug("获取通知失败: %s", e)
            return []
        notifications = []
        for item in items:
            try:
                notifications.append(json.loads(item))
            except (TypeError, ValueError) as e:
                logger.debug("获取通知失败: %s", e)
        return notifications

    async def dismiss_notification(self, notif_id: str):
        """Remove the first notification whose ``id`` equals *notif_id*."""
        if not self._redis:
            return
        try:
            items = await self._redis.lrange(NOTIFICATIONS_KEY, 0, -1)
            for item in items:
                try:
                    data = json.loads(item)
                except (TypeError, ValueError):
                    # Corrupt entry: keep scanning for the requested id.
                    continue
                if isinstance(data, dict) and data.get('id') == notif_id:
                    # LREM needs the exact stored string, not the decoded dict.
                    await self._redis.lrem(NOTIFICATIONS_KEY, 1, item)
                    break
        except Exception as e:
            logger.debug("移除通知失败: %s", e)

    async def dismiss_all_notifications(self):
        """Remove every notification by deleting the whole list."""
        if not self._redis:
            return
        try:
            await self._redis.delete(NOTIFICATIONS_KEY)
        except Exception as e:
            logger.debug("清除通知失败: %s", e)


cache = RedisCache()