fix: enforce fine-grained rbac permissions on role CRUD endpoints
Add _assertPermission() that checks role_permissions table for specific permission keys (super_admin bypasses). Wire it into CreateRole (rbac:roles:create), UpdateRole (rbac:roles:update), and DeleteRole (rbac:roles:delete). Previously only the coarse can_manage role check was enforced, making the permission grants in role_permissions purely cosmetic for these endpoints.
This commit is contained in:
@@ -186,6 +186,7 @@ class RbacAdminServiceImpl(IRbacAdminService):
|
||||
async def CreateRole(self, CurrentUserId: int, Body: RoleCreateDTO) -> RoleVO:
|
||||
"""创建角色。"""
|
||||
await self._assertManagePermission(CurrentUserId)
|
||||
await self._assertPermission(CurrentUserId, "rbac:roles:create")
|
||||
async with GetAsyncSession() as Session:
|
||||
row = (
|
||||
await Session.execute(
|
||||
@@ -211,6 +212,7 @@ class RbacAdminServiceImpl(IRbacAdminService):
|
||||
async def UpdateRole(self, CurrentUserId: int, RoleId: int, Body: RoleUpdateDTO) -> RoleVO:
|
||||
"""更新角色。"""
|
||||
await self._assertManagePermission(CurrentUserId)
|
||||
await self._assertPermission(CurrentUserId, "rbac:roles:update")
|
||||
async with GetAsyncSession() as Session:
|
||||
current = await self._getRoleRow(Session, RoleId)
|
||||
row = (
|
||||
@@ -244,6 +246,7 @@ class RbacAdminServiceImpl(IRbacAdminService):
|
||||
async def DeleteRole(self, CurrentUserId: int, RoleId: int, Force: bool) -> None:
|
||||
"""删除角色。"""
|
||||
await self._assertManagePermission(CurrentUserId)
|
||||
await self._assertPermission(CurrentUserId, "rbac:roles:delete")
|
||||
async with GetAsyncSession() as Session:
|
||||
role = await self._getRoleRow(Session, RoleId)
|
||||
if role["is_system_role"]:
|
||||
@@ -565,6 +568,32 @@ class RbacAdminServiceImpl(IRbacAdminService):
|
||||
if not context["can_manage"]:
|
||||
raise LeauditException(StatusCodeEnum.HTTP_403_FORBIDDEN, "当前用户没有系统设置管理权限")
|
||||
|
||||
async def _assertPermission(self, CurrentUserId: int, PermissionKey: str) -> None:
|
||||
"""校验当前用户是否具备特定细粒度权限。super_admin 自动放行。"""
|
||||
context = await self._getCurrentUserContext(CurrentUserId)
|
||||
if context["is_super_admin"]:
|
||||
return
|
||||
async with GetAsyncSession() as Session:
|
||||
row = (
|
||||
await Session.execute(
|
||||
text(
|
||||
"""
|
||||
SELECT 1
|
||||
FROM role_permissions rp
|
||||
JOIN permissions p ON p.id = rp.permission_id
|
||||
JOIN user_role ur ON ur.role_id = rp.role_id
|
||||
WHERE ur.user_id = :user_id
|
||||
AND p.permission_key = :permission_key
|
||||
AND rp.grant_type = 'GRANT'
|
||||
LIMIT 1
|
||||
"""
|
||||
),
|
||||
{"user_id": CurrentUserId, "permission_key": PermissionKey},
|
||||
)
|
||||
).first()
|
||||
if not row:
|
||||
raise LeauditException(StatusCodeEnum.HTTP_403_FORBIDDEN, f"缺少权限: {PermissionKey}")
|
||||
|
||||
async def _getCurrentUserContext(self, CurrentUserId: int) -> dict[str, Any]:
|
||||
"""加载当前用户上下文。"""
|
||||
async with GetAsyncSession() as Session:
|
||||
@@ -576,7 +605,8 @@ class RbacAdminServiceImpl(IRbacAdminService):
|
||||
u.id,
|
||||
COALESCE(u.area, '') AS area,
|
||||
COALESCE(bool_or(r.role_key IN ('super_admin', 'provincial_admin')), FALSE) AS is_global,
|
||||
COALESCE(bool_or(r.role_key IN ('super_admin', 'provincial_admin', 'admin')), FALSE) AS can_manage
|
||||
COALESCE(bool_or(r.role_key IN ('super_admin', 'provincial_admin', 'admin')), FALSE) AS can_manage,
|
||||
COALESCE(bool_or(r.role_key = 'super_admin'), FALSE) AS is_super_admin
|
||||
FROM sso_users u
|
||||
LEFT JOIN user_role ur ON ur.user_id = u.id
|
||||
LEFT JOIN roles r ON r.id = ur.role_id
|
||||
@@ -589,7 +619,7 @@ class RbacAdminServiceImpl(IRbacAdminService):
|
||||
).mappings().first()
|
||||
if not row:
|
||||
raise LeauditException(StatusCodeEnum.HTTP_404_NOT_FOUND, "当前用户不存在")
|
||||
return {"area": str(row["area"] or ""), "is_global": bool(row["is_global"]), "can_manage": bool(row["can_manage"])}
|
||||
return {"area": str(row["area"] or ""), "is_global": bool(row["is_global"]), "can_manage": bool(row["can_manage"]), "is_super_admin": bool(row["is_super_admin"])}
|
||||
|
||||
async def _ensureAdminSeeds(self, Session) -> dict[str, int]:
|
||||
"""确保系统设置所需路由和权限定义已存在。"""
|
||||
|
||||
Reference in New Issue
Block a user