152 lines
5.6 KiB
Python
152 lines
5.6 KiB
Python
"""权限服务实现。
|
|
|
|
从旧项目 app/rbac/permission_checker_v2.py 迁移,业务逻辑完全不变。
|
|
- 数据库驱动 GRANT/DENY 机制(DENY 优先级更高)
|
|
- 支持通配符:document:*:*、*:*:* 等
|
|
- 进程内缓存 60 秒(无 Redis 缓存),权限变更最多延迟 60 秒生效
|
|
仅改造为 SQLAlchemy 会话 + 项目统一配置。
|
|
"""
|
|
|
|
from fastapi_common.fastapi_common_logger import logger
|
|
from fastapi_common.fastapi_common_sqlalchemy.database import GetAsyncSession
|
|
import time
|
|
|
|
from fastapi_modules.fastapi_leaudit.services.permissionService import IPermissionService
|
|
|
|
|
|
class PermissionServiceImpl(IPermissionService):
    """Database-backed permission checker with GRANT/DENY semantics.

    Permission keys are colon-separated strings such as
    ``document:read:all``.  A stored key may use ``*`` in any segment as a
    wildcard.  DENY entries always take precedence over GRANT entries.

    Each user's permission sets are kept in an in-process cache for
    ``_CACHE_TTL_SECONDS`` seconds, so a change in the database can take up
    to that long to become visible through this instance.
    """

    # How long (in seconds) a user's permission sets are served from the
    # in-process cache before the database is queried again.
    _CACHE_TTL_SECONDS: float = 60

    def __init__(self) -> None:
        """Create the service with an empty per-user permission cache."""
        # user id -> (time.monotonic() taken at fetch, (grants, denies))
        # NOTE(review): entries are only ever overwritten, never evicted, so
        # the dict grows with the number of distinct user ids seen.
        self._permission_cache: dict[int, tuple[float, tuple[set[str], set[str]]]] = {}

    async def CheckPermission(self, UserId: int, PermissionKey: str) -> bool:
        """Return whether the user holds the given permission.

        Evaluation order:
            1. exact DENY     -> False
            2. wildcard DENY  -> False
            3. exact GRANT    -> True
            4. wildcard GRANT -> True
            5. no match       -> False

        Args:
            UserId: Id of the user to check.
            PermissionKey: Colon-separated permission key being requested.

        Returns:
            True only when a GRANT matches and no DENY matches.  Any
            exception raised while evaluating is logged and reported as
            False.
        """
        try:
            grants, denies = await self._getUserPermissions(UserId)

            # DENY always wins, so it is evaluated before any GRANT.
            if PermissionKey in denies:
                logger.debug(f"[DENY] 精确拒绝: user={UserId}, perm={PermissionKey}")
                return False

            if self._matchWildcard(PermissionKey, denies):
                logger.debug(f"[DENY] 通配符拒绝: user={UserId}, perm={PermissionKey}")
                return False

            if PermissionKey in grants:
                logger.debug(f"[GRANT] 精确授权: user={UserId}, perm={PermissionKey}")
                return True

            if self._matchWildcard(PermissionKey, grants):
                logger.debug(f"[GRANT] 通配符授权: user={UserId}, perm={PermissionKey}")
                return True

            logger.debug(f"[DENY] 无匹配权限: user={UserId}, perm={PermissionKey}")
            return False

        except Exception as e:
            # Deliberately broad: a permission check must fail closed, so any
            # error (database, driver, ...) is treated as "not allowed".
            logger.error(f"权限检查异常: user={UserId}, perm={PermissionKey}, error={e}")
            return False

    async def HasAnyPermission(self, UserId: int, PermissionKeys: list[str]) -> bool:
        """Return True as soon as one of the keys passes (OR logic).

        An empty ``PermissionKeys`` list yields False.
        """
        for key in PermissionKeys:
            if await self.CheckPermission(UserId, key):
                return True
        return False

    async def HasAllPermissions(self, UserId: int, PermissionKeys: list[str]) -> bool:
        """Return True only if every key passes (AND logic).

        An empty ``PermissionKeys`` list yields True.
        """
        for key in PermissionKeys:
            if not await self.CheckPermission(UserId, key):
                return False
        return True

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    async def _getUserPermissions(self, UserId: int) -> tuple[set[str], set[str]]:
        """Return the user's ``(grants, denies)`` permission-key sets.

        A cached result younger than ``_CACHE_TTL_SECONDS`` is returned
        as-is; otherwise the sets are rebuilt from the database and cached.
        An empty result is cached as well, so a user without roles does not
        cause a query on every single check.

        The returned sets are the cached objects themselves; callers must
        not mutate them.
        """
        now = time.monotonic()
        cached = self._permission_cache.get(UserId)
        if cached is not None and now - cached[0] <= self._CACHE_TTL_SECONDS:
            return cached[1]

        # Function-scope import, as in the original implementation.
        from sqlalchemy import text

        # Hold the session only for the query itself; the rows are fully
        # fetched before the session is released.
        async with GetAsyncSession() as session:
            result = await session.execute(
                text(
                    "SELECT p.permission_key, rp.grant_type "
                    "FROM sso_users u "
                    "JOIN user_role ur ON u.id = ur.user_id "
                    "JOIN roles r ON ur.role_id = r.id "
                    "JOIN role_permissions rp ON r.id = rp.role_id "
                    "JOIN permissions p ON rp.permission_id = p.id "
                    "WHERE u.id = :uid"
                ),
                {"uid": UserId},
            )
            rows = result.fetchall()

        grants: set[str] = set()
        denies: set[str] = set()

        if not rows:
            logger.warning(f"用户无角色或权限: user_id={UserId}")
        else:
            for permKey, grantType in rows:
                if grantType == "GRANT":
                    grants.add(permKey)
                elif grantType == "DENY":
                    denies.add(permKey)
                # Rows with any other grant_type value are ignored.
            logger.debug(f"用户权限: user={UserId}, grants={len(grants)}, denies={len(denies)}")

        # The timestamp is the one taken before the query, so the entry
        # never outlives the TTL measured from when the fetch started.
        self._permission_cache[UserId] = (now, (grants, denies))
        return grants, denies

    @staticmethod
    def _matchWildcard(PermissionKey: str, PermissionSet: set[str]) -> bool:
        """Return True if any pattern in ``PermissionSet`` matches the key."""
        return any(
            PermissionServiceImpl._wildcardMatch(PermissionKey, pattern)
            for pattern in PermissionSet
        )

    @staticmethod
    def _wildcardMatch(PermissionKey: str, Pattern: str) -> bool:
        """Match a permission key against a pattern, segment by segment.

        Both strings are split on ``:``.  They match when they have the same
        number of segments and every pattern segment is either ``*`` or
        equal to the corresponding key segment.

            _wildcardMatch("document:read:all", "document:*:*")    -> True
            _wildcardMatch("document:read:all", "document:read:*") -> True
            _wildcardMatch("document:read:all", "evaluation:*:*")  -> False
            _wildcardMatch("document:read:all", "*:*:*")           -> True
        """
        keyParts = PermissionKey.split(":")
        patternParts = Pattern.split(":")

        # A wildcard stands for exactly one segment, never several.
        if len(keyParts) != len(patternParts):
            return False

        return all(
            patternPart == "*" or keyPart == patternPart
            for keyPart, patternPart in zip(keyParts, patternParts)
        )
|