"""Permission service implementation.

Migrated from the legacy project (app/rbac/permission_checker_v2.py) with the
business logic unchanged:

- Database-driven GRANT/DENY mechanism (DENY has the higher priority).
- Wildcard support: ``document:*:*``, ``*:*:*`` and so on.
- No Redis cache: permissions are read from the database and kept only in a
  short-lived, per-instance in-memory cache.

Only adapted to use SQLAlchemy sessions and the project's unified
configuration.
"""

import time

from sqlalchemy import text

from fastapi_common.fastapi_common_logger import logger
from fastapi_common.fastapi_common_sqlalchemy.database import GetAsyncSession
from fastapi_modules.fastapi_leaudit.services.permissionService import IPermissionService


class PermissionServiceImpl(IPermissionService):
    """Permission check service implementation."""

    # Number of seconds a user's permission sets are served from the
    # in-memory cache before the database is queried again.
    _CACHE_TTL_SECONDS: float = 60

    def __init__(self) -> None:
        # UserId -> (time.monotonic() taken at load time, (grants, denies))
        self._permission_cache: dict[int, tuple[float, tuple[set[str], set[str]]]] = {}

    async def CheckPermission(self, UserId: int, PermissionKey: str) -> bool:
        """Check whether a user holds the given permission.

        GRANT/DENY precedence:
            1. Exact DENY     -> rejected
            2. Wildcard DENY  -> rejected
            3. Exact GRANT    -> allowed
            4. Wildcard GRANT -> allowed
            5. No match       -> rejected

        Args:
            UserId: Id of the user whose permissions are evaluated.
            PermissionKey: Colon-separated permission key, e.g.
                ``document:read:all``.

        Returns:
            True when the permission is granted and not denied; False
            otherwise, including when any exception occurs while checking.
        """
        try:
            grants, denies = await self._getUserPermissions(UserId)

            # DENY takes precedence over any GRANT.
            if PermissionKey in denies:
                logger.debug(f"[DENY] 精确拒绝: user={UserId}, perm={PermissionKey}")
                return False
            if self._matchWildcard(PermissionKey, denies):
                logger.debug(f"[DENY] 通配符拒绝: user={UserId}, perm={PermissionKey}")
                return False

            # GRANT
            if PermissionKey in grants:
                logger.debug(f"[GRANT] 精确授权: user={UserId}, perm={PermissionKey}")
                return True
            if self._matchWildcard(PermissionKey, grants):
                logger.debug(f"[GRANT] 通配符授权: user={UserId}, perm={PermissionKey}")
                return True

            logger.debug(f"[DENY] 无匹配权限: user={UserId}, perm={PermissionKey}")
            return False
        except Exception as e:
            logger.error(f"权限检查异常: user={UserId}, perm={PermissionKey}, error={e}")
            return False  # Fail closed: deny whenever the check itself fails.

    async def HasAnyPermission(self, UserId: int, PermissionKeys: list[str]) -> bool:
        """OR logic: return True as soon as any one permission passes.

        Returns False for an empty ``PermissionKeys`` list.
        """
        for key in PermissionKeys:
            if await self.CheckPermission(UserId, key):
                return True
        return False

    async def HasAllPermissions(self, UserId: int, PermissionKeys: list[str]) -> bool:
        """AND logic: return True only when every permission passes.

        Returns True for an empty ``PermissionKeys`` list.
        """
        for key in PermissionKeys:
            if not await self.CheckPermission(UserId, key):
                return False
        return True

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    async def _getUserPermissions(self, UserId: int) -> tuple[set[str], set[str]]:
        """Load the user's GRANT and DENY permission key sets.

        Results are cached per user for ``_CACHE_TTL_SECONDS`` seconds. An
        empty result (user without roles or permissions) is cached as well,
        so such users do not trigger a database query on every check.

        Args:
            UserId: Id of the user to look up.

        Returns:
            A ``(grants, denies)`` tuple of permission key sets. The sets are
            shared with the cache, so callers must not mutate them.
        """
        # Timestamp is taken before the query, so an entry never outlives
        # its TTL measured from the moment the lookup started.
        now = time.monotonic()
        cached = self._permission_cache.get(UserId)
        if cached is not None and now - cached[0] <= self._CACHE_TTL_SECONDS:
            return cached[1]

        grants: set[str] = set()
        denies: set[str] = set()

        async with GetAsyncSession() as session:
            result = await session.execute(
                text(
                    "SELECT p.permission_key, rp.grant_type "
                    "FROM sso_users u "
                    "JOIN user_role ur ON u.id = ur.user_id "
                    "JOIN roles r ON ur.role_id = r.id "
                    "JOIN role_permissions rp ON r.id = rp.role_id "
                    "JOIN permissions p ON rp.permission_id = p.id "
                    "WHERE u.id = :uid"
                ),
                {"uid": UserId},
            )
            rows = result.fetchall()

        if not rows:
            logger.warning(f"用户无角色或权限: user_id={UserId}")
        else:
            for row in rows:
                permKey = row[0]
                grantType = row[1]
                # Rows with any other grant_type value are ignored.
                if grantType == "GRANT":
                    grants.add(permKey)
                elif grantType == "DENY":
                    denies.add(permKey)
            logger.debug(f"用户权限: user={UserId}, grants={len(grants)}, denies={len(denies)}")

        self._permission_cache[UserId] = (now, (grants, denies))
        return grants, denies

    @staticmethod
    def _matchWildcard(PermissionKey: str, PermissionSet: set[str]) -> bool:
        """Return True if PermissionKey matches any pattern in PermissionSet."""
        return any(
            PermissionServiceImpl._wildcardMatch(PermissionKey, pattern)
            for pattern in PermissionSet
        )

    @staticmethod
    def _wildcardMatch(PermissionKey: str, Pattern: str) -> bool:
        """Match a permission key against a colon-separated wildcard pattern.

        Both strings are split on ``:`` and must have the same number of
        segments; a ``*`` pattern segment matches any key segment, every
        other segment must be equal.

            _wildcardMatch("document:read:all", "document:*:*")    -> True
            _wildcardMatch("document:read:all", "document:read:*") -> True
            _wildcardMatch("document:read:all", "evaluation:*:*")  -> False
            _wildcardMatch("document:read:all", "*:*:*")           -> True
        """
        keyParts = PermissionKey.split(":")
        patternParts = Pattern.split(":")

        if len(keyParts) != len(patternParts):
            return False

        return all(
            patternPart == "*" or keyPart == patternPart
            for keyPart, patternPart in zip(keyParts, patternParts)
        )