Files
claude-outlonok/auth.py
huangzhenpc 889f4f15d5 Optimize database and deployment: fix SQL injection, async auth, persist data
- Move SQLite DB to data/ directory and track in git for portability
- Fix SQL injection in cleanup_old_emails (use parameterized query)
- Replace sync requests with async httpx in auth.py
- Enable WAL mode and foreign keys for SQLite
- Add UNIQUE constraint and foreign key to account_tags table
- Remove redundant indexes on primary key columns
- Mount data/ volume in docker-compose for persistence
- Remove unused requests dependency

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-06 00:57:35 +08:00

79 lines
2.7 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
OAuth2认证模块
处理Microsoft OAuth2令牌获取和刷新
"""
import httpx
import logging
from typing import Optional
from fastapi import HTTPException
from config import CLIENT_ID, TOKEN_URL
logger = logging.getLogger(__name__)
# ============================================================================
# OAuth2令牌获取函数
# ============================================================================
async def get_access_token(refresh_token: str, check_only: bool = False, client_id: str = None) -> Optional[str]:
"""使用refresh_token获取access_token
Args:
refresh_token: 刷新令牌
check_only: 如果为True验证失败时返回None而不是抛出异常
client_id: 可选指定client_id默认使用全局CLIENT_ID
Returns:
成功返回access_token如果check_only=True且验证失败则返回None
"""
data = {
'client_id': client_id or CLIENT_ID,
'grant_type': 'refresh_token',
'refresh_token': refresh_token,
'scope': 'https://outlook.office.com/IMAP.AccessAsUser.All offline_access'
}
try:
async with httpx.AsyncClient(timeout=30.0) as client:
response = await client.post(TOKEN_URL, data=data)
response.raise_for_status()
token_data = response.json()
access_token = token_data.get('access_token')
if not access_token:
error_msg = f"获取 access_token 失败: {token_data.get('error_description', '响应中未找到 access_token')}"
logger.error(error_msg)
if check_only:
return None
raise HTTPException(status_code=401, detail=error_msg)
new_refresh_token = token_data.get('refresh_token')
if new_refresh_token and new_refresh_token != refresh_token:
logger.debug("提示: refresh_token 已被服务器更新")
return access_token
except httpx.HTTPStatusError as http_err:
logger.error(f"请求 access_token 时发生HTTP错误: {http_err}")
logger.error(f"服务器响应: {http_err.response.status_code} - {http_err.response.text}")
if check_only:
return None
raise HTTPException(status_code=401, detail="Refresh token已过期或无效需要重新获取授权")
except httpx.RequestError as e:
logger.error(f"请求 access_token 时发生网络错误: {e}")
if check_only:
return None
raise HTTPException(status_code=500, detail="Token acquisition failed")
except Exception as e:
logger.error(f"解析 access_token 响应时出错: {e}")
if check_only:
return None
raise HTTPException(status_code=500, detail="Token acquisition failed")