diff --git a/fastapi_modules/fastapi_leaudit/services/impl/rbacAdminServiceImpl.py b/fastapi_modules/fastapi_leaudit/services/impl/rbacAdminServiceImpl.py index 8130d5e..9e0591c 100644 --- a/fastapi_modules/fastapi_leaudit/services/impl/rbacAdminServiceImpl.py +++ b/fastapi_modules/fastapi_leaudit/services/impl/rbacAdminServiceImpl.py @@ -1212,6 +1212,7 @@ class RbacAdminServiceImpl(IRbacAdminService): routeIds = sorted(set(Body.route_ids)) async with GetAsyncSession() as Session: await self._getAdminRouteMap(Session, EnsureSeeds=True) + await self._assertCanManageTargetRole(Session, CurrentUserId, RoleId) result = await self._updateRoleRoutesInSession(Session, RoleId, routeIds, Body.permission) await Session.commit() PermissionServiceImpl.InvalidateAll() @@ -1243,6 +1244,7 @@ class RbacAdminServiceImpl(IRbacAdminService): await self._assertManageAndPermission(CurrentUserId, "rbac:role_permissions:write") async with GetAsyncSession() as Session: await self._getAdminRouteMap(Session, EnsureSeeds=True) + await self._assertCanManageTargetRole(Session, CurrentUserId, Body.role_id) await self._saveRolePermissionsInSession(Session, Body.role_id, Body.permissions, Body.replace, Body.replace_scope_permission_ids) await Session.commit() PermissionServiceImpl.InvalidateAll() @@ -1259,6 +1261,7 @@ class RbacAdminServiceImpl(IRbacAdminService): ] async with GetAsyncSession() as Session: await self._getAdminRouteMap(Session, EnsureSeeds=True) + await self._assertCanManageTargetRole(Session, CurrentUserId, RoleId) routeResult = await self._updateRoleRoutesInSession(Session, RoleId, routeIds, Body.route_permission) await self._saveRolePermissionsInSession(Session, RoleId, permissionConfigs, True, Body.replace_scope_permission_ids) permissionResult = await self._getRolePermissionsInSession(Session, RoleId) @@ -1539,7 +1542,8 @@ class RbacAdminServiceImpl(IRbacAdminService): {tenant_name_select}, COALESCE(bool_or(r.role_key IN ('super_admin', 'provincial_admin')), FALSE) AS is_global, COALESCE(bool_or(r.role_key IN ('super_admin', 'provincial_admin', 'admin')), FALSE) AS can_manage, - COALESCE(bool_or(r.role_key = 'super_admin'), FALSE) AS is_super_admin + COALESCE(bool_or(r.role_key = 'super_admin'), FALSE) AS is_super_admin, + COALESCE(MAX(COALESCE(r.priority, 0)), 0) AS max_role_priority FROM sso_users u LEFT JOIN user_role ur ON ur.user_id = u.id LEFT JOIN roles r ON r.id = ur.role_id @@ -1566,6 +1570,7 @@ class RbacAdminServiceImpl(IRbacAdminService): "is_global": bool(row["is_global"]), "can_manage": bool(row["can_manage"]), "is_super_admin": bool(row["is_super_admin"]), + "max_role_priority": int(row["max_role_priority"] or 0), } async def _resolve_requested_scope(self, Session, RawValue: str | None) -> dict[str, str]: @@ -1869,6 +1874,21 @@ class RbacAdminServiceImpl(IRbacAdminService): raise LeauditException(StatusCodeEnum.HTTP_404_NOT_FOUND, "角色不存在") return row + async def _assertCanManageTargetRole(self, Session, CurrentUserId: int, RoleId: int) -> None: + """禁止低优先级角色编辑更高优先级角色的权限配置。""" + context = await self._getCurrentUserContext(CurrentUserId) + if context.get("is_super_admin"): + return + + target = await self._getRoleRow(Session, RoleId) + currentPriority = int(context.get("max_role_priority") or 0) + targetPriority = int(target.get("priority") or 0) + if targetPriority > currentPriority: + raise LeauditException( + StatusCodeEnum.HTTP_403_FORBIDDEN, + "下级角色不可编辑上级角色的路由权限", + ) + def _toRoleVo(self, Row) -> RoleVO: """角色记录转 VO。""" return RoleVO( diff --git a/fastapi_modules/fastapi_leaudit/services/impl/rbacServiceImpl.py b/fastapi_modules/fastapi_leaudit/services/impl/rbacServiceImpl.py index 0565b0e..5abf423 100644 --- a/fastapi_modules/fastapi_leaudit/services/impl/rbacServiceImpl.py +++ b/fastapi_modules/fastapi_leaudit/services/impl/rbacServiceImpl.py @@ -849,6 +849,7 @@ class RbacServiceImpl(IRbacService): "/chat-with-llm", "/contract-template", "/cross-checking", + "/govdoc", } return any(path in frontendPaths for path in expected) diff --git a/legal-platform-frontend b/legal-platform-frontend index 40ef991..76795e2 160000 --- a/legal-platform-frontend +++ b/legal-platform-frontend @@ -1 +1 @@ -Subproject commit 40ef991434f4890382f32ed6146e0ef076cf53fa +Subproject commit 76795e2f1543627965ec5033353a93281a6017d1 diff --git a/tests/test_govdoc_permissions.py b/tests/test_govdoc_permissions.py index 2d4200d..c264562 100644 --- a/tests/test_govdoc_permissions.py +++ b/tests/test_govdoc_permissions.py @@ -4,6 +4,8 @@ import pytest from starlette.responses import JSONResponse from fastapi_modules.fastapi_leaudit.controllers.govdocController import GovdocController +from fastapi_modules.fastapi_leaudit.domian.vo.rbacVo import RbacRouteVO +from fastapi_modules.fastapi_leaudit.services.impl.rbacServiceImpl import RbacServiceImpl class _DenyPermissionService: @@ -126,3 +128,20 @@ async def test_govdoc_list_calls_service_when_permission_granted(): assert response.data["total"] == 0 assert service.list_called is True + + +def test_govdoc_root_route_marks_frontend_route_set_ready(): + """只有内部公文模块路由时也视为新版路由,避免兼容菜单补出列表和上传。""" + service = RbacServiceImpl() + routes = [ + RbacRouteVO( + id=1, + route_path="/govdoc", + route_name="govdoc", + component="govdoc", + parent_id=None, + route_title="内部公文处理", + ) + ] + + assert service._isFrontendRouteSetReady(routes) is True diff --git a/tests/test_rule_write_scope.py b/tests/test_rule_write_scope.py index 419926c..160f95b 100644 --- a/tests/test_rule_write_scope.py +++ b/tests/test_rule_write_scope.py @@ -216,6 +216,41 @@ def test_rbac_seed_cache_reuses_recent_route_map(): assert service._get_cached_admin_seed_route_map() == route_map +@pytest.mark.asyncio +async def test_rbac_rejects_lower_role_editing_higher_role_permissions(monkeypatch): + service = RbacAdminServiceImpl() + + async def fake_context(_current_user_id): + return {"is_super_admin": False, "max_role_priority": 50} + + async def fake_role_row(_session, _role_id): + return {"id": 1, "priority": 90} + + monkeypatch.setattr(service, "_getCurrentUserContext", fake_context) + monkeypatch.setattr(service, "_getRoleRow", fake_role_row) + + with pytest.raises(LeauditException) as exc: + await service._assertCanManageTargetRole(None, 100, 1) + + assert exc.value.status == StatusCodeEnum.HTTP_403_FORBIDDEN + + +@pytest.mark.asyncio +async def test_rbac_allows_super_admin_editing_higher_role_permissions(monkeypatch): + service = RbacAdminServiceImpl() + + async def fake_context(_current_user_id): + return {"is_super_admin": True, "max_role_priority": 100} + + async def fail_if_called(_session, _role_id): + raise AssertionError("super_admin should bypass target role lookup") + + monkeypatch.setattr(service, "_getCurrentUserContext", fake_context) + monkeypatch.setattr(service, "_getRoleRow", fail_if_called) + + await service._assertCanManageTargetRole(None, 100, 1) + + def test_permission_cache_is_shared_and_can_invalidate_user(): first = PermissionServiceImpl() second = PermissionServiceImpl()