Files
leaudit-platform-backend/fastapi_modules/fastapi_leaudit/services/impl/permissionServiceImpl.py
T

152 lines
5.6 KiB
Python

"""权限服务实现。
从旧项目 app/rbac/permission_checker_v2.py 迁移,业务逻辑完全不变。
- 数据库驱动 GRANT/DENY 机制(DENY 优先级更高)
- 支持通配符:document:*:*、*:*:* 等
- 实时查询(无 Redis 缓存)
仅改造为 SQLAlchemy 会话 + 项目统一配置。
"""
from fastapi_common.fastapi_common_logger import logger
from fastapi_common.fastapi_common_sqlalchemy.database import GetAsyncSession
import time
from fastapi_modules.fastapi_leaudit.services.permissionService import IPermissionService
class PermissionServiceImpl(IPermissionService):
"""权限检查服务实现。"""
def __init__(self) -> None:
self._permission_cache: dict[int, tuple[float, tuple[set[str], set[str]]]] = {}
async def CheckPermission(self, UserId: int, PermissionKey: str) -> bool:
"""检查用户是否拥有指定权限。
GRANT/DENY 优先级:
1. 精确 DENY → 拒绝
2. 通配符 DENY → 拒绝
3. 精确 GRANT → 通过
4. 通配符 GRANT → 通过
5. 无匹配 → 拒绝
"""
try:
grants, denies = await self._getUserPermissions(UserId)
# DENY 优先
if PermissionKey in denies:
logger.debug(f"[DENY] 精确拒绝: user={UserId}, perm={PermissionKey}")
return False
if self._matchWildcard(PermissionKey, denies):
logger.debug(f"[DENY] 通配符拒绝: user={UserId}, perm={PermissionKey}")
return False
# GRANT
if PermissionKey in grants:
logger.debug(f"[GRANT] 精确授权: user={UserId}, perm={PermissionKey}")
return True
if self._matchWildcard(PermissionKey, grants):
logger.debug(f"[GRANT] 通配符授权: user={UserId}, perm={PermissionKey}")
return True
logger.debug(f"[DENY] 无匹配权限: user={UserId}, perm={PermissionKey}")
return False
except Exception as e:
logger.error(f"权限检查异常: user={UserId}, perm={PermissionKey}, error={e}")
return False # 安全优先:异常时拒绝
async def HasAnyPermission(self, UserId: int, PermissionKeys: list[str]) -> bool:
"""OR 逻辑:任一权限通过即返回 True。"""
for key in PermissionKeys:
if await self.CheckPermission(UserId, key):
return True
return False
async def HasAllPermissions(self, UserId: int, PermissionKeys: list[str]) -> bool:
"""AND 逻辑:全部权限通过才返回 True。"""
for key in PermissionKeys:
if not await self.CheckPermission(UserId, key):
return False
return True
# ------------------------------------------------------------------
# 内部方法
# ------------------------------------------------------------------
async def _getUserPermissions(self, UserId: int) -> tuple[set[str], set[str]]:
"""从数据库查询用户的 GRANT 和 DENY 权限集合。"""
cached = self._permission_cache.get(UserId)
now = time.monotonic()
if cached and now - cached[0] <= 60:
return cached[1]
grants: set[str] = set()
denies: set[str] = set()
async with GetAsyncSession() as session:
from sqlalchemy import text
result = await session.execute(
text(
"SELECT p.permission_key, rp.grant_type "
"FROM sso_users u "
"JOIN user_role ur ON u.id = ur.user_id "
"JOIN roles r ON ur.role_id = r.id "
"JOIN role_permissions rp ON r.id = rp.role_id "
"JOIN permissions p ON rp.permission_id = p.id "
"WHERE u.id = :uid"
),
{"uid": UserId},
)
rows = result.fetchall()
if not rows:
logger.warning(f"用户无角色或权限: user_id={UserId}")
return grants, denies
for row in rows:
permKey = row[0]
grantType = row[1]
if grantType == "GRANT":
grants.add(permKey)
elif grantType == "DENY":
denies.add(permKey)
logger.debug(f"用户权限: user={UserId}, grants={len(grants)}, denies={len(denies)}")
self._permission_cache[UserId] = (now, (grants, denies))
return grants, denies
@staticmethod
def _matchWildcard(PermissionKey: str, PermissionSet: set[str]) -> bool:
"""检查 PermissionKey 是否匹配集合中的任一通配符模式。"""
for pattern in PermissionSet:
if PermissionServiceImpl._wildcardMatch(PermissionKey, pattern):
return True
return False
@staticmethod
def _wildcardMatch(PermissionKey: str, Pattern: str) -> bool:
"""通配符匹配。
_wildcardMatch("document:read:all", "document:*:*") → True
_wildcardMatch("document:read:all", "document:read:*") → True
_wildcardMatch("document:read:all", "evaluation:*:*") → False
_wildcardMatch("document:read:all", "*:*:*") → True
"""
keyParts = PermissionKey.split(":")
patternParts = Pattern.split(":")
if len(keyParts) != len(patternParts):
return False
for keyPart, patternPart in zip(keyParts, patternParts):
if patternPart == "*":
continue
if keyPart != patternPart:
return False
return True