Files
cursornew2026/backend/app/services/auth_service.py
huangzhenpc ac19d029da backend v2.1: 公告管理功能 + 系统重构
- 新增 Announcement 数据模型,支持公告的增删改查
- 后台管理新增"公告管理"Tab(创建/编辑/删除/启用禁用)
- 客户端 /api/announcement 改为从数据库读取
- 账号服务重构,新增无感换号、自动分析等功能
- 新增后台任务调度器、数据库迁移脚本
- Schema/Service/Config 全面升级至 v2.1

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-09 19:58:05 +08:00

60 lines
2.0 KiB
Python

from datetime import datetime, timedelta, timezone
from typing import Optional
from jose import JWTError, jwt
from passlib.context import CryptContext
from fastapi import HTTPException, Depends, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from app.config import settings
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
security = HTTPBearer()
def verify_password(plain_password: str, hashed_password: str) -> bool:
return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password: str) -> str:
return pwd_context.hash(password)
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str:
to_encode = data.copy()
if expires_delta:
expire = datetime.now(timezone.utc) + expires_delta
else:
expire = datetime.now(timezone.utc) + timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, settings.SECRET_KEY, algorithm=settings.ALGORITHM)
return encoded_jwt
def verify_token(token: str) -> dict:
try:
payload = jwt.decode(token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM])
return payload
except JWTError:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="无效的认证凭据",
headers={"WWW-Authenticate": "Bearer"},
)
def authenticate_admin(username: str, password: str) -> bool:
"""验证管理员账号"""
return username == settings.ADMIN_USERNAME and password == settings.ADMIN_PASSWORD
async def get_current_user(credentials: HTTPAuthorizationCredentials = Depends(security)) -> dict:
"""获取当前用户"""
token = credentials.credentials
payload = verify_token(token)
username = payload.get("sub")
if username is None:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="无效的认证凭据"
)
return {"username": username}