fix: 修复 govdoc 根路由的前端路由集识别 #13
@@ -1212,6 +1212,7 @@ class RbacAdminServiceImpl(IRbacAdminService):
|
|||||||
routeIds = sorted(set(Body.route_ids))
|
routeIds = sorted(set(Body.route_ids))
|
||||||
async with GetAsyncSession() as Session:
|
async with GetAsyncSession() as Session:
|
||||||
await self._getAdminRouteMap(Session, EnsureSeeds=True)
|
await self._getAdminRouteMap(Session, EnsureSeeds=True)
|
||||||
|
await self._assertCanManageTargetRole(Session, CurrentUserId, RoleId)
|
||||||
result = await self._updateRoleRoutesInSession(Session, RoleId, routeIds, Body.permission)
|
result = await self._updateRoleRoutesInSession(Session, RoleId, routeIds, Body.permission)
|
||||||
await Session.commit()
|
await Session.commit()
|
||||||
PermissionServiceImpl.InvalidateAll()
|
PermissionServiceImpl.InvalidateAll()
|
||||||
@@ -1243,6 +1244,7 @@ class RbacAdminServiceImpl(IRbacAdminService):
|
|||||||
await self._assertManageAndPermission(CurrentUserId, "rbac:role_permissions:write")
|
await self._assertManageAndPermission(CurrentUserId, "rbac:role_permissions:write")
|
||||||
async with GetAsyncSession() as Session:
|
async with GetAsyncSession() as Session:
|
||||||
await self._getAdminRouteMap(Session, EnsureSeeds=True)
|
await self._getAdminRouteMap(Session, EnsureSeeds=True)
|
||||||
|
await self._assertCanManageTargetRole(Session, CurrentUserId, Body.role_id)
|
||||||
await self._saveRolePermissionsInSession(Session, Body.role_id, Body.permissions, Body.replace, Body.replace_scope_permission_ids)
|
await self._saveRolePermissionsInSession(Session, Body.role_id, Body.permissions, Body.replace, Body.replace_scope_permission_ids)
|
||||||
await Session.commit()
|
await Session.commit()
|
||||||
PermissionServiceImpl.InvalidateAll()
|
PermissionServiceImpl.InvalidateAll()
|
||||||
@@ -1259,6 +1261,7 @@ class RbacAdminServiceImpl(IRbacAdminService):
|
|||||||
]
|
]
|
||||||
async with GetAsyncSession() as Session:
|
async with GetAsyncSession() as Session:
|
||||||
await self._getAdminRouteMap(Session, EnsureSeeds=True)
|
await self._getAdminRouteMap(Session, EnsureSeeds=True)
|
||||||
|
await self._assertCanManageTargetRole(Session, CurrentUserId, RoleId)
|
||||||
routeResult = await self._updateRoleRoutesInSession(Session, RoleId, routeIds, Body.route_permission)
|
routeResult = await self._updateRoleRoutesInSession(Session, RoleId, routeIds, Body.route_permission)
|
||||||
await self._saveRolePermissionsInSession(Session, RoleId, permissionConfigs, True, Body.replace_scope_permission_ids)
|
await self._saveRolePermissionsInSession(Session, RoleId, permissionConfigs, True, Body.replace_scope_permission_ids)
|
||||||
permissionResult = await self._getRolePermissionsInSession(Session, RoleId)
|
permissionResult = await self._getRolePermissionsInSession(Session, RoleId)
|
||||||
@@ -1539,7 +1542,8 @@ class RbacAdminServiceImpl(IRbacAdminService):
|
|||||||
{tenant_name_select},
|
{tenant_name_select},
|
||||||
COALESCE(bool_or(r.role_key IN ('super_admin', 'provincial_admin')), FALSE) AS is_global,
|
COALESCE(bool_or(r.role_key IN ('super_admin', 'provincial_admin')), FALSE) AS is_global,
|
||||||
COALESCE(bool_or(r.role_key IN ('super_admin', 'provincial_admin', 'admin')), FALSE) AS can_manage,
|
COALESCE(bool_or(r.role_key IN ('super_admin', 'provincial_admin', 'admin')), FALSE) AS can_manage,
|
||||||
COALESCE(bool_or(r.role_key = 'super_admin'), FALSE) AS is_super_admin
|
COALESCE(bool_or(r.role_key = 'super_admin'), FALSE) AS is_super_admin,
|
||||||
|
COALESCE(MAX(COALESCE(r.priority, 0)), 0) AS max_role_priority
|
||||||
FROM sso_users u
|
FROM sso_users u
|
||||||
LEFT JOIN user_role ur ON ur.user_id = u.id
|
LEFT JOIN user_role ur ON ur.user_id = u.id
|
||||||
LEFT JOIN roles r ON r.id = ur.role_id
|
LEFT JOIN roles r ON r.id = ur.role_id
|
||||||
@@ -1566,6 +1570,7 @@ class RbacAdminServiceImpl(IRbacAdminService):
|
|||||||
"is_global": bool(row["is_global"]),
|
"is_global": bool(row["is_global"]),
|
||||||
"can_manage": bool(row["can_manage"]),
|
"can_manage": bool(row["can_manage"]),
|
||||||
"is_super_admin": bool(row["is_super_admin"]),
|
"is_super_admin": bool(row["is_super_admin"]),
|
||||||
|
"max_role_priority": int(row["max_role_priority"] or 0),
|
||||||
}
|
}
|
||||||
|
|
||||||
async def _resolve_requested_scope(self, Session, RawValue: str | None) -> dict[str, str]:
|
async def _resolve_requested_scope(self, Session, RawValue: str | None) -> dict[str, str]:
|
||||||
@@ -1869,6 +1874,21 @@ class RbacAdminServiceImpl(IRbacAdminService):
|
|||||||
raise LeauditException(StatusCodeEnum.HTTP_404_NOT_FOUND, "角色不存在")
|
raise LeauditException(StatusCodeEnum.HTTP_404_NOT_FOUND, "角色不存在")
|
||||||
return row
|
return row
|
||||||
|
|
||||||
|
async def _assertCanManageTargetRole(self, Session, CurrentUserId: int, RoleId: int) -> None:
|
||||||
|
"""禁止低优先级角色编辑更高优先级角色的权限配置。"""
|
||||||
|
context = await self._getCurrentUserContext(CurrentUserId)
|
||||||
|
if context.get("is_super_admin"):
|
||||||
|
return
|
||||||
|
|
||||||
|
target = await self._getRoleRow(Session, RoleId)
|
||||||
|
currentPriority = int(context.get("max_role_priority") or 0)
|
||||||
|
targetPriority = int(target.get("priority") or 0)
|
||||||
|
if targetPriority > currentPriority:
|
||||||
|
raise LeauditException(
|
||||||
|
StatusCodeEnum.HTTP_403_FORBIDDEN,
|
||||||
|
"下级角色不可编辑上级角色的路由权限",
|
||||||
|
)
|
||||||
|
|
||||||
def _toRoleVo(self, Row) -> RoleVO:
|
def _toRoleVo(self, Row) -> RoleVO:
|
||||||
"""角色记录转 VO。"""
|
"""角色记录转 VO。"""
|
||||||
return RoleVO(
|
return RoleVO(
|
||||||
|
|||||||
@@ -849,6 +849,7 @@ class RbacServiceImpl(IRbacService):
|
|||||||
"/chat-with-llm",
|
"/chat-with-llm",
|
||||||
"/contract-template",
|
"/contract-template",
|
||||||
"/cross-checking",
|
"/cross-checking",
|
||||||
|
"/govdoc",
|
||||||
}
|
}
|
||||||
return any(path in frontendPaths for path in expected)
|
return any(path in frontendPaths for path in expected)
|
||||||
|
|
||||||
|
|||||||
+1
-1
Submodule legal-platform-frontend updated: 40ef991434...76795e2f15
@@ -4,6 +4,8 @@ import pytest
|
|||||||
from starlette.responses import JSONResponse
|
from starlette.responses import JSONResponse
|
||||||
|
|
||||||
from fastapi_modules.fastapi_leaudit.controllers.govdocController import GovdocController
|
from fastapi_modules.fastapi_leaudit.controllers.govdocController import GovdocController
|
||||||
|
from fastapi_modules.fastapi_leaudit.domian.vo.rbacVo import RbacRouteVO
|
||||||
|
from fastapi_modules.fastapi_leaudit.services.impl.rbacServiceImpl import RbacServiceImpl
|
||||||
|
|
||||||
|
|
||||||
class _DenyPermissionService:
|
class _DenyPermissionService:
|
||||||
@@ -126,3 +128,20 @@ async def test_govdoc_list_calls_service_when_permission_granted():
|
|||||||
|
|
||||||
assert response.data["total"] == 0
|
assert response.data["total"] == 0
|
||||||
assert service.list_called is True
|
assert service.list_called is True
|
||||||
|
|
||||||
|
|
||||||
|
def test_govdoc_root_route_marks_frontend_route_set_ready():
|
||||||
|
"""只有内部公文模块路由时也视为新版路由,避免兼容菜单补出列表和上传。"""
|
||||||
|
service = RbacServiceImpl()
|
||||||
|
routes = [
|
||||||
|
RbacRouteVO(
|
||||||
|
id=1,
|
||||||
|
route_path="/govdoc",
|
||||||
|
route_name="govdoc",
|
||||||
|
component="govdoc",
|
||||||
|
parent_id=None,
|
||||||
|
route_title="内部公文处理",
|
||||||
|
)
|
||||||
|
]
|
||||||
|
|
||||||
|
assert service._isFrontendRouteSetReady(routes) is True
|
||||||
|
|||||||
@@ -216,6 +216,41 @@ def test_rbac_seed_cache_reuses_recent_route_map():
|
|||||||
assert service._get_cached_admin_seed_route_map() == route_map
|
assert service._get_cached_admin_seed_route_map() == route_map
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_rbac_rejects_lower_role_editing_higher_role_permissions(monkeypatch):
|
||||||
|
service = RbacAdminServiceImpl()
|
||||||
|
|
||||||
|
async def fake_context(_current_user_id):
|
||||||
|
return {"is_super_admin": False, "max_role_priority": 50}
|
||||||
|
|
||||||
|
async def fake_role_row(_session, _role_id):
|
||||||
|
return {"id": 1, "priority": 90}
|
||||||
|
|
||||||
|
monkeypatch.setattr(service, "_getCurrentUserContext", fake_context)
|
||||||
|
monkeypatch.setattr(service, "_getRoleRow", fake_role_row)
|
||||||
|
|
||||||
|
with pytest.raises(LeauditException) as exc:
|
||||||
|
await service._assertCanManageTargetRole(None, 100, 1)
|
||||||
|
|
||||||
|
assert exc.value.status == StatusCodeEnum.HTTP_403_FORBIDDEN
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_rbac_allows_super_admin_editing_higher_role_permissions(monkeypatch):
|
||||||
|
service = RbacAdminServiceImpl()
|
||||||
|
|
||||||
|
async def fake_context(_current_user_id):
|
||||||
|
return {"is_super_admin": True, "max_role_priority": 100}
|
||||||
|
|
||||||
|
async def fail_if_called(_session, _role_id):
|
||||||
|
raise AssertionError("super_admin should bypass target role lookup")
|
||||||
|
|
||||||
|
monkeypatch.setattr(service, "_getCurrentUserContext", fake_context)
|
||||||
|
monkeypatch.setattr(service, "_getRoleRow", fail_if_called)
|
||||||
|
|
||||||
|
await service._assertCanManageTargetRole(None, 100, 1)
|
||||||
|
|
||||||
|
|
||||||
def test_permission_cache_is_shared_and_can_invalidate_user():
|
def test_permission_cache_is_shared_and_can_invalidate_user():
|
||||||
first = PermissionServiceImpl()
|
first = PermissionServiceImpl()
|
||||||
second = PermissionServiceImpl()
|
second = PermissionServiceImpl()
|
||||||
|
|||||||
Reference in New Issue
Block a user