feat: bootstrap user rbac foundation
@@ -5,19 +5,40 @@ from __future__ import annotations
|
||||
from typing import Any
|
||||
|
||||
import jwt
|
||||
from fastapi import Request
|
||||
from fastapi import HTTPException, Request, status
|
||||
|
||||
from fastapi_admin.config import JWT_SECRET_KEY, JWT_ALGORITHM
|
||||
from fastapi_common.fastapi_common_security.jwtService import JwtService
|
||||
|
||||
|
||||
def verify_access_token(RequestObj: Request) -> dict[str, Any]:
|
||||
"""验证 JWT access token 并返回 payload。"""
|
||||
"""验证 JWT access token 并返回 payload。
|
||||
|
||||
认证失败必须直接返回 401,不能静默放行为空 payload,
|
||||
否则受保护接口会在后续逻辑里变成“假鉴权”。
|
||||
"""
|
||||
auth = RequestObj.headers.get("Authorization", "")
|
||||
if not auth.startswith("Bearer "):
|
||||
return {}
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_401_UNAUTHORIZED,
|
||||
detail="缺少有效的 Authorization Bearer Token",
|
||||
)
|
||||
|
||||
token = auth.removeprefix("Bearer ").strip()
|
||||
try:
|
||||
payload = jwt.decode(token, JWT_SECRET_KEY, algorithms=[JWT_ALGORITHM])
|
||||
payload = JwtService.verify(token)
|
||||
if payload.get("type") != "access":
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_401_UNAUTHORIZED,
|
||||
detail="无效的访问令牌类型",
|
||||
)
|
||||
return payload
|
||||
except jwt.ExpiredSignatureError as exc:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_401_UNAUTHORIZED,
|
||||
detail="访问令牌已过期",
|
||||
) from exc
|
||||
except jwt.PyJWTError:
|
||||
return {}
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_401_UNAUTHORIZED,
|
||||
detail="访问令牌无效",
|
||||
)
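Review bot: The `except jwt.ExpiredSignatureError` and `except jwt.PyJWTError` clauses around the new `payload = JwtService.verify(token)` call only turn a bad token into a 401 if that service lets PyJWT's own exception types propagate; its contract is not visible in this hunk, and if it wraps failures in its own exception class or returns `None`, an invalid token would surface as a 500 (or `payload.get("type")` would raise `AttributeError` on the next line) rather than being rejected. Confirm the service's behaviour, and consider an explicit guard before the type check, `if not isinstance(payload, dict): raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="访问令牌无效")`, so that any non-dict result is treated as an authentication failure. As a consistency point, the expired-token handler chains the cause with `from exc` while the generic `PyJWTError` handler does not; `except jwt.PyJWTError as exc:` with `) from exc` on its raise would keep the original failure reason in logs for both paths.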
|
||||
|
||||