Detect refund/suspension status changes and generate notifications stored in Redis. Bell icon in navbar shows unread count badge, click to expand dropdown with dismiss per-item or read-all. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
164 lines
4.8 KiB
Python
164 lines
4.8 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
Redis缓存模块
|
||
用于缓存账户列表、邮件数据等,加速页面加载
|
||
"""
|
||
|
||
import json
|
||
import logging
|
||
import os
|
||
import uuid
|
||
from datetime import datetime
|
||
from typing import Any, Optional
|
||
|
||
import redis.asyncio as redis
|
||
|
||
logger = logging.getLogger(__name__)

# Redis connection URL. The REDIS_URL environment variable takes precedence so
# the credential does not have to live in source control; the literal below is
# kept only as a backward-compatible fallback for existing deployments.
# NOTE(review): the fallback embeds a password - rotate it and drop the
# default once every deployment sets REDIS_URL.
REDIS_URL = os.environ.get('REDIS_URL') or 'redis://:redis_XMiXNa@127.0.0.1:6379/0'

# Cache expiry times (seconds)
TTL_ACCOUNTS = 300  # account list: 5 minutes
TTL_MESSAGES = 180  # message list: 3 minutes
TTL_PAYMENT = 600  # payment status: 10 minutes
TTL_NOTIFICATIONS = 604800  # notifications: 7 days

# Redis list key that holds the unread notifications.
NOTIFICATIONS_KEY = 'notifications:unread'
|
||
|
||
|
||
class RedisCache:
    """Best-effort asynchronous Redis cache manager.

    Every operation degrades to a no-op (or an empty result) when no Redis
    connection is available or when a command fails, so callers do not need
    to handle Redis errors themselves. Values are stored as JSON strings.
    """

    def __init__(self):
        # None means "no usable connection": every operation short-circuits.
        self._redis: Optional[redis.Redis] = None

    @staticmethod
    def _redact_url(url: str) -> str:
        """Return *url* with any ``user:password@`` part replaced by ``***@``.

        Everything between ``://`` and the last ``@`` is dropped, so a
        password can never reach the logs even if it contains odd characters.
        URLs without credentials are returned unchanged.
        """
        scheme, sep, rest = url.partition('://')
        _, at, location = rest.rpartition('@')
        if not sep or not at:
            return url
        return f"{scheme}://***@{location}"

    @staticmethod
    def _decode_notification(raw: str) -> Optional[dict]:
        """Parse one stored notification; return None if it is not a JSON object."""
        try:
            data = json.loads(raw)
        except (TypeError, ValueError):  # JSONDecodeError is a ValueError
            return None
        return data if isinstance(data, dict) else None

    async def connect(self) -> None:
        """Open the Redis connection and verify it with PING.

        On any failure the cache stays disabled (``available`` is False) and
        a warning is logged; no exception is propagated.
        """
        client = None
        try:
            client = redis.from_url(REDIS_URL, decode_responses=True)
            await client.ping()
        except Exception as e:
            logger.warning(f"Redis连接失败,将跳过缓存: {e}")
            self._redis = None
            if client is not None:
                # Release the half-initialised client instead of leaking it.
                try:
                    await client.aclose()
                except Exception as close_err:
                    logger.debug("关闭失败的Redis连接时出错: %s", close_err)
            return
        self._redis = client
        # Never log the raw URL: it may carry the Redis password.
        logger.info("Redis连接成功: %s", self._redact_url(REDIS_URL))

    async def close(self) -> None:
        """Close the connection (if any) and mark the cache unavailable."""
        # Reset first so `available` is False even if aclose() raises.
        client, self._redis = self._redis, None
        if client is not None:
            await client.aclose()

    @property
    def available(self) -> bool:
        """True while a Redis client is attached."""
        return self._redis is not None

    async def get(self, key: str) -> Optional[Any]:
        """Return the JSON-decoded value stored at *key*.

        Returns None when the cache is unavailable, the key is missing or
        empty, or the lookup/decoding fails.
        """
        if not self._redis:
            return None
        try:
            val = await self._redis.get(key)
            return json.loads(val) if val else None
        except Exception as e:
            logger.debug("Redis get 失败 [%s]: %s", key, e)
            return None

    async def set(self, key: str, value: Any, ttl: int = 300) -> None:
        """Store *value* as JSON at *key*, expiring after *ttl* seconds."""
        if not self._redis:
            return
        try:
            await self._redis.set(key, json.dumps(value, ensure_ascii=False), ex=ttl)
        except Exception as e:
            logger.debug("Redis set 失败 [%s]: %s", key, e)

    async def delete(self, key: str) -> None:
        """Delete *key*; failures are logged at debug level and ignored."""
        if not self._redis:
            return
        try:
            await self._redis.delete(key)
        except Exception as e:
            logger.debug("Redis delete 失败 [%s]: %s", key, e)

    async def delete_pattern(self, pattern: str) -> None:
        """Delete every key matching the glob-style *pattern* (via SCAN)."""
        if not self._redis:
            return
        try:
            async for key in self._redis.scan_iter(match=pattern, count=100):
                await self._redis.delete(key)
        except Exception as e:
            logger.debug("Redis delete_pattern 失败 [%s]: %s", pattern, e)

    # ---- Business shortcut methods ----

    def accounts_key(self) -> str:
        """Cache key for the account list."""
        return "cache:accounts"

    def messages_key(self, email: str, folder: str) -> str:
        """Cache key for the message list of *folder* in mailbox *email*."""
        return f"cache:messages:{email}:{folder}"

    def payment_key(self) -> str:
        """Cache key for the payment status."""
        return "cache:payment_status"

    async def invalidate_accounts(self) -> None:
        """Drop the cached account list."""
        await self.delete(self.accounts_key())

    async def invalidate_messages(self, email: str) -> None:
        """Drop the cached message lists of every folder of *email*."""
        await self.delete_pattern(f"cache:messages:{email}:*")

    async def invalidate_payment(self) -> None:
        """Drop the cached payment status."""
        await self.delete(self.payment_key())

    # ---- Notification system ----

    async def add_notification(self, email: str, ntype: str, message: str) -> None:
        """Prepend a notification to the unread list and refresh the list TTL.

        The stored JSON object has the keys ``id`` (a fresh UUID4 string),
        ``email``, ``type``, ``message`` and ``created_at`` (local time,
        ``YYYY-MM-DD HH:MM:SS``).
        """
        if not self._redis:
            return
        try:
            notif = json.dumps({
                'id': str(uuid.uuid4()),
                'email': email,
                'type': ntype,
                'message': message,
                'created_at': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
            }, ensure_ascii=False)
            # LPUSH and EXPIRE go out in one MULTI/EXEC transaction so the
            # list can never be left behind without a TTL.
            async with self._redis.pipeline(transaction=True) as pipe:
                pipe.lpush(NOTIFICATIONS_KEY, notif)
                pipe.expire(NOTIFICATIONS_KEY, TTL_NOTIFICATIONS)
                await pipe.execute()
        except Exception as e:
            logger.debug("添加通知失败: %s", e)

    async def get_notifications(self) -> list:
        """Return all unread notifications, newest first.

        Entries that are not valid JSON objects are skipped rather than
        discarding the whole list. Returns ``[]`` when the cache is
        unavailable or the read fails.
        """
        if not self._redis:
            return []
        try:
            items = await self._redis.lrange(NOTIFICATIONS_KEY, 0, -1)
        except Exception as e:
            logger.debug("获取通知失败: %s", e)
            return []
        decoded = [self._decode_notification(raw) for raw in items]
        valid = [entry for entry in decoded if entry is not None]
        if len(valid) != len(decoded):
            logger.debug("获取通知: 跳过 %d 条无法解析的通知", len(decoded) - len(valid))
        return valid

    async def dismiss_notification(self, notif_id: str) -> None:
        """Remove the first notification whose ``id`` equals *notif_id*."""
        if not self._redis:
            return
        try:
            items = await self._redis.lrange(NOTIFICATIONS_KEY, 0, -1)
            for raw in items:
                data = self._decode_notification(raw)
                # Malformed entries are skipped so they cannot block
                # dismissal of the notifications stored after them.
                if data is not None and data.get('id') == notif_id:
                    await self._redis.lrem(NOTIFICATIONS_KEY, 1, raw)
                    break
        except Exception as e:
            logger.debug("移除通知失败: %s", e)

    async def dismiss_all_notifications(self) -> None:
        """Delete the whole unread-notification list."""
        if not self._redis:
            return
        try:
            await self._redis.delete(NOTIFICATIONS_KEY)
        except Exception as e:
            logger.debug("清除通知失败: %s", e)
|
||
|
||
|
||
# Module-level shared RedisCache instance. Constructing it opens no
# connection (__init__ only sets _redis to None); connect() must be awaited
# before the cache becomes available.
cache = RedisCache()
|