Add Redis caching, email refresh button, optimize page loading

- Add Redis service (docker-compose) for caching accounts, messages, payment status
- Cache accounts list (5min), messages (3min), payment status (10min)
- Auto-invalidate cache on import/delete/payment-check/note-update
- Add refresh button to email list panel (force re-fetch from IMAP)
- Messages API supports refresh=true param to bypass cache
- New cache.py module with RedisCache class

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-03-06 01:26:42 +08:00
parent 197c969e41
commit e96b2e1b4a
8 changed files with 210 additions and 20 deletions

View File

@@ -29,6 +29,7 @@ from models import (
)
from config import CLIENT_ID, ADMIN_TOKEN, DEFAULT_EMAIL_LIMIT, logger
from imap_client import IMAPEmailClient
from cache import cache, TTL_ACCOUNTS, TTL_MESSAGES, TTL_PAYMENT
# ============================================================================
# 辅助函数
@@ -309,10 +310,12 @@ async def lifespan(app: FastAPI):
"""应用程序生命周期管理"""
logger.info("启动邮件管理系统...")
logger.info("初始化数据库...")
await cache.connect()
yield
logger.info("正在关闭邮件管理系统...")
try:
await email_manager.cleanup_all()
await cache.close()
db_manager.close()
except Exception as e:
logger.error(f"清理系统资源时出错: {e}")
@@ -404,10 +407,12 @@ async def admin_js():
# ============================================================================
@app.get("/api/messages")
async def get_messages(email: str, top: int = None, folder: str = "INBOX") -> ApiResponse:
async def get_messages(email: str, top: int = None, folder: str = "INBOX",
refresh: bool = False) -> ApiResponse:
"""获取邮件列表(包含完整内容)
优化:一次性返回邮件的完整信息,前端可以缓存
优化:一次性返回邮件的完整信息,Redis缓存加速
refresh=true 时跳过缓存强制从IMAP拉取
"""
email = email.strip()
@@ -418,9 +423,19 @@ async def get_messages(email: str, top: int = None, folder: str = "INBOX") -> Ap
if top is None:
top = await get_system_config_value('email_limit', DEFAULT_EMAIL_LIMIT)
# 尝试读缓存
cache_key = cache.messages_key(email, folder)
if not refresh:
cached = await cache.get(cache_key)
if cached:
# 按请求的top截取
return ApiResponse(success=True, data=cached[:top])
try:
# 使用优化后的get_messages返回完整邮件内容
messages = await email_manager.get_messages(email, top, folder)
# 写入缓存
await cache.set(cache_key, messages, TTL_MESSAGES)
return ApiResponse(success=True, data=messages)
except HTTPException as e:
return ApiResponse(success=False, message=e.detail)
@@ -612,28 +627,33 @@ async def get_accounts_detailed(q: Optional[str] = None,
page_size: int = 10) -> ApiResponse:
"""分页返回完整账号信息email, password, client_id, refresh_token"""
try:
accounts_dict = await load_accounts_config()
emails = sorted(accounts_dict.keys())
# 尝试从缓存读取全量账户
cached = await cache.get(cache.accounts_key())
if cached:
all_items = cached
else:
accounts_dict = await load_accounts_config()
all_items = []
for e in sorted(accounts_dict.keys()):
info = accounts_dict[e]
all_items.append({
"email": e,
"password": info.get("password", ""),
"client_id": info.get("client_id", ""),
"refresh_token": info.get("refresh_token", "")
})
await cache.set(cache.accounts_key(), all_items, TTL_ACCOUNTS)
# 搜索过滤
if q:
q_lower = q.strip().lower()
emails = [e for e in emails if q_lower in e.lower()]
all_items = [i for i in all_items if q_lower in i["email"].lower()]
total = len(emails)
total = len(all_items)
page = max(1, page)
page_size = max(1, min(100, page_size))
start = (page - 1) * page_size
end = start + page_size
items = []
for e in emails[start:end]:
info = accounts_dict[e]
items.append({
"email": e,
"password": info.get("password", ""),
"client_id": info.get("client_id", ""),
"refresh_token": info.get("refresh_token", "")
})
items = all_items[start:start + page_size]
return ApiResponse(
success=True,
@@ -669,6 +689,8 @@ async def delete_single_account(email: str) -> ApiResponse:
pass
del email_manager.clients[email]
email_manager._accounts = None # 强制重新加载
await cache.invalidate_accounts()
await cache.invalidate_messages(email)
return ApiResponse(success=True, message=f"已删除账户 {email}")
return ApiResponse(success=False, message="删除失败")
except Exception as e:
@@ -765,6 +787,7 @@ async def import_accounts_simple(request: dict) -> ApiResponse:
errors.append(f"{line_num}行处理失败: {str(ex)}")
email_manager._accounts = None
await cache.invalidate_accounts()
msg = f"导入完成:新增 {added},更新 {updated},跳过 {skipped}"
if errors:
msg += f",错误 {len(errors)}"
@@ -1218,6 +1241,7 @@ async def check_claude_payment():
if i < total:
await asyncio.sleep(0.5)
await cache.invalidate_payment()
yield f"data: {json.dumps({'type': 'done', 'total': total})}\n\n"
return StreamingResponse(event_generator(), media_type="text/event-stream")
@@ -1227,6 +1251,7 @@ async def check_claude_payment_single(email: str) -> ApiResponse:
"""检测单个账户的Claude支付状态"""
email = email.strip()
result = await _check_claude_payment_for_account(email)
await cache.invalidate_payment()
if result.get('status') == 'error':
return ApiResponse(success=False, message=result.get('message', '检测失败'), data=result)
return ApiResponse(success=True, data=result)
@@ -1241,13 +1266,18 @@ async def update_claude_payment_note(email: str, request: dict) -> ApiResponse:
fields[key] = request[key]
ok = await db_manager.update_claude_payment_note(email, **fields)
if ok:
await cache.invalidate_payment()
return ApiResponse(success=True, message="保存成功")
return ApiResponse(success=False, message="保存失败")
@app.get("/api/tools/claude-payment-status")
async def get_claude_payment_status() -> ApiResponse:
"""获取所有账户的Claude支付缓存状态"""
cached = await cache.get(cache.payment_key())
if cached:
return ApiResponse(success=True, data=cached)
statuses = await db_manager.get_all_claude_payment_statuses()
await cache.set(cache.payment_key(), statuses, TTL_PAYMENT)
return ApiResponse(success=True, data=statuses)
# ============================================================================