from datetime import datetime, timedelta from typing import Optional from jose import JWTError, jwt from passlib.context import CryptContext from fastapi import HTTPException, Depends, status from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials from app.config import settings pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") security = HTTPBearer() def verify_password(plain_password: str, hashed_password: str) -> bool: return pwd_context.verify(plain_password, hashed_password) def get_password_hash(password: str) -> str: return pwd_context.hash(password) def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str: to_encode = data.copy() if expires_delta: expire = datetime.utcnow() + expires_delta else: expire = datetime.utcnow() + timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES) to_encode.update({"exp": expire}) encoded_jwt = jwt.encode(to_encode, settings.SECRET_KEY, algorithm=settings.ALGORITHM) return encoded_jwt def verify_token(token: str) -> dict: try: payload = jwt.decode(token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM]) return payload except JWTError: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="无效的认证凭据", headers={"WWW-Authenticate": "Bearer"}, ) def authenticate_admin(username: str, password: str) -> bool: """验证管理员账号""" return username == settings.ADMIN_USERNAME and password == settings.ADMIN_PASSWORD async def get_current_user(credentials: HTTPAuthorizationCredentials = Depends(security)) -> dict: """获取当前用户""" token = credentials.credentials payload = verify_token(token) username = payload.get("sub") if username is None: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="无效的认证凭据" ) return {"username": username}