Files
claude-outlonok/cache.py
huangzhenpc a5fd90cb1e Add notification system for Claude payment status changes
Detect refund/suspension status changes and generate notifications
stored in Redis. Bell icon in navbar shows unread count badge,
click to expand dropdown with dismiss per-item or read-all.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-06 17:00:05 +08:00

164 lines
4.8 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
Redis缓存模块
用于缓存账户列表、邮件数据等,加速页面加载
"""
import json
import logging
import os
import uuid
from datetime import datetime
from typing import Any, Optional
import redis.asyncio as redis
logger = logging.getLogger(__name__)
REDIS_URL = 'redis://:redis_XMiXNa@127.0.0.1:6379/0'
# 缓存过期时间(秒)
TTL_ACCOUNTS = 300 # 账户列表 5分钟
TTL_MESSAGES = 180 # 邮件列表 3分钟
TTL_PAYMENT = 600 # 支付状态 10分钟
TTL_NOTIFICATIONS = 604800 # 通知 7天
NOTIFICATIONS_KEY = 'notifications:unread'
class RedisCache:
"""Redis缓存管理器"""
def __init__(self):
self._redis: Optional[redis.Redis] = None
async def connect(self):
try:
self._redis = redis.from_url(REDIS_URL, decode_responses=True)
await self._redis.ping()
logger.info(f"Redis连接成功: {REDIS_URL}")
except Exception as e:
logger.warning(f"Redis连接失败将跳过缓存: {e}")
self._redis = None
async def close(self):
if self._redis:
await self._redis.aclose()
@property
def available(self) -> bool:
return self._redis is not None
async def get(self, key: str) -> Optional[Any]:
if not self._redis:
return None
try:
val = await self._redis.get(key)
return json.loads(val) if val else None
except Exception as e:
logger.debug(f"Redis get 失败 [{key}]: {e}")
return None
async def set(self, key: str, value: Any, ttl: int = 300):
if not self._redis:
return
try:
await self._redis.set(key, json.dumps(value, ensure_ascii=False), ex=ttl)
except Exception as e:
logger.debug(f"Redis set 失败 [{key}]: {e}")
async def delete(self, key: str):
if not self._redis:
return
try:
await self._redis.delete(key)
except Exception:
pass
async def delete_pattern(self, pattern: str):
"""删除匹配模式的所有key"""
if not self._redis:
return
try:
async for key in self._redis.scan_iter(match=pattern, count=100):
await self._redis.delete(key)
except Exception:
pass
# ---- 业务快捷方法 ----
def accounts_key(self) -> str:
return "cache:accounts"
def messages_key(self, email: str, folder: str) -> str:
return f"cache:messages:{email}:{folder}"
def payment_key(self) -> str:
return "cache:payment_status"
async def invalidate_accounts(self):
await self.delete(self.accounts_key())
async def invalidate_messages(self, email: str):
await self.delete_pattern(f"cache:messages:{email}:*")
async def invalidate_payment(self):
await self.delete(self.payment_key())
# ---- 通知系统 ----
async def add_notification(self, email: str, ntype: str, message: str):
"""添加一条通知LPUSH + EXPIRE"""
if not self._redis:
return
try:
notif = json.dumps({
'id': str(uuid.uuid4()),
'email': email,
'type': ntype,
'message': message,
'created_at': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
}, ensure_ascii=False)
await self._redis.lpush(NOTIFICATIONS_KEY, notif)
await self._redis.expire(NOTIFICATIONS_KEY, TTL_NOTIFICATIONS)
except Exception as e:
logger.debug(f"添加通知失败: {e}")
async def get_notifications(self) -> list:
"""获取所有未读通知"""
if not self._redis:
return []
try:
items = await self._redis.lrange(NOTIFICATIONS_KEY, 0, -1)
return [json.loads(item) for item in items]
except Exception as e:
logger.debug(f"获取通知失败: {e}")
return []
async def dismiss_notification(self, notif_id: str):
"""移除指定 id 的通知"""
if not self._redis:
return
try:
items = await self._redis.lrange(NOTIFICATIONS_KEY, 0, -1)
for item in items:
data = json.loads(item)
if data.get('id') == notif_id:
await self._redis.lrem(NOTIFICATIONS_KEY, 1, item)
break
except Exception as e:
logger.debug(f"移除通知失败: {e}")
async def dismiss_all_notifications(self):
"""清除所有通知"""
if not self._redis:
return
try:
await self._redis.delete(NOTIFICATIONS_KEY)
except Exception:
pass
cache = RedisCache()