From 33255e823f1a0ceab0bf127227d8c31246183701 Mon Sep 17 00:00:00 2001 From: wren <“porlong@qq.com”> Date: Thu, 30 Apr 2026 10:36:38 +0800 Subject: [PATCH] fix: enforce fine-grained rbac permissions on role CRUD endpoints Add _assertPermission() that checks role_permissions table for specific permission keys (super_admin bypasses). Wire it into CreateRole (rbac:roles:create), UpdateRole (rbac:roles:update), and DeleteRole (rbac:roles:delete). Previously only the coarse can_manage role check was enforced, making the permission grants in role_permissions purely cosmetic for these endpoints. --- .../services/impl/rbacAdminServiceImpl.py | 34 +++++++++++++++++-- 1 file changed, 32 insertions(+), 2 deletions(-) diff --git a/fastapi_modules/fastapi_leaudit/services/impl/rbacAdminServiceImpl.py b/fastapi_modules/fastapi_leaudit/services/impl/rbacAdminServiceImpl.py index a86c964..6e1e028 100644 --- a/fastapi_modules/fastapi_leaudit/services/impl/rbacAdminServiceImpl.py +++ b/fastapi_modules/fastapi_leaudit/services/impl/rbacAdminServiceImpl.py @@ -186,6 +186,7 @@ class RbacAdminServiceImpl(IRbacAdminService): async def CreateRole(self, CurrentUserId: int, Body: RoleCreateDTO) -> RoleVO: """创建角色。""" await self._assertManagePermission(CurrentUserId) + await self._assertPermission(CurrentUserId, "rbac:roles:create") async with GetAsyncSession() as Session: row = ( await Session.execute( @@ -211,6 +212,7 @@ class RbacAdminServiceImpl(IRbacAdminService): async def UpdateRole(self, CurrentUserId: int, RoleId: int, Body: RoleUpdateDTO) -> RoleVO: """更新角色。""" await self._assertManagePermission(CurrentUserId) + await self._assertPermission(CurrentUserId, "rbac:roles:update") async with GetAsyncSession() as Session: current = await self._getRoleRow(Session, RoleId) row = ( @@ -244,6 +246,7 @@ class RbacAdminServiceImpl(IRbacAdminService): async def DeleteRole(self, CurrentUserId: int, RoleId: int, Force: bool) -> None: """删除角色。""" await self._assertManagePermission(CurrentUserId) + await self._assertPermission(CurrentUserId, "rbac:roles:delete") async with GetAsyncSession() as Session: role = await self._getRoleRow(Session, RoleId) if role["is_system_role"]: @@ -565,6 +568,32 @@ class RbacAdminServiceImpl(IRbacAdminService): if not context["can_manage"]: raise LeauditException(StatusCodeEnum.HTTP_403_FORBIDDEN, "当前用户没有系统设置管理权限") + async def _assertPermission(self, CurrentUserId: int, PermissionKey: str) -> None: + """校验当前用户是否具备特定细粒度权限。super_admin 自动放行。""" + context = await self._getCurrentUserContext(CurrentUserId) + if context["is_super_admin"]: + return + async with GetAsyncSession() as Session: + row = ( + await Session.execute( + text( + """ + SELECT 1 + FROM role_permissions rp + JOIN permissions p ON p.id = rp.permission_id + JOIN user_role ur ON ur.role_id = rp.role_id + WHERE ur.user_id = :user_id + AND p.permission_key = :permission_key + AND rp.grant_type = 'GRANT' + LIMIT 1 + """ + ), + {"user_id": CurrentUserId, "permission_key": PermissionKey}, + ) + ).first() + if not row: + raise LeauditException(StatusCodeEnum.HTTP_403_FORBIDDEN, f"缺少权限: {PermissionKey}") + async def _getCurrentUserContext(self, CurrentUserId: int) -> dict[str, Any]: """加载当前用户上下文。""" async with GetAsyncSession() as Session: @@ -576,7 +605,8 @@ class RbacAdminServiceImpl(IRbacAdminService): u.id, COALESCE(u.area, '') AS area, COALESCE(bool_or(r.role_key IN ('super_admin', 'provincial_admin')), FALSE) AS is_global, - COALESCE(bool_or(r.role_key IN ('super_admin', 'provincial_admin', 'admin')), FALSE) AS can_manage + COALESCE(bool_or(r.role_key IN ('super_admin', 'provincial_admin', 'admin')), FALSE) AS can_manage, + COALESCE(bool_or(r.role_key = 'super_admin'), FALSE) AS is_super_admin FROM sso_users u LEFT JOIN user_role ur ON ur.user_id = u.id LEFT JOIN roles r ON r.id = ur.role_id @@ -589,7 +619,7 @@ class RbacAdminServiceImpl(IRbacAdminService): ).mappings().first() if not row: raise LeauditException(StatusCodeEnum.HTTP_404_NOT_FOUND, "当前用户不存在") - return {"area": str(row["area"] or ""), "is_global": bool(row["is_global"]), "can_manage": bool(row["can_manage"])} + return {"area": str(row["area"] or ""), "is_global": bool(row["is_global"]), "can_manage": bool(row["can_manage"]), "is_super_admin": bool(row["is_super_admin"])} async def _ensureAdminSeeds(self, Session) -> dict[str, int]: """确保系统设置所需路由和权限定义已存在。"""