diff --git a/fastapi_modules/fastapi_leaudit/controllers/documentController.py b/fastapi_modules/fastapi_leaudit/controllers/documentController.py index 41e8d08..287de6f 100644 --- a/fastapi_modules/fastapi_leaudit/controllers/documentController.py +++ b/fastapi_modules/fastapi_leaudit/controllers/documentController.py @@ -55,6 +55,13 @@ class DocumentController(BaseController): """文档控制器。""" _CROSS_REVIEW_DOCUMENT_READ_PERMISSION = "cross_review:document:read" + _DOCUMENT_TYPE_PERMISSIONS = { + "list": "doc_type:list:read", + "detail": "doc_type:detail:read", + "create": "doc_type:create:write", + "update": "doc_type:update:write", + "delete": "doc_type:delete:delete", + } @staticmethod def _tenant_context(payload: dict[str, Any]) -> dict[str, str | None]: @@ -296,8 +303,16 @@ class DocumentController(BaseController): async def ListDocumentTypes( ids: str | None = Query(None, description="逗号分隔的ID列表,不传则返回全部"), entry_module_id: int | None = Query(None, description="按入口模块ID过滤文档类型"), + payload: dict[str, Any] = Depends(verify_access_token), ): """获取文档类型列表。""" + deniedResponse = await self._deny_document_type_without_permission( + int(payload["user_id"]), + self._DOCUMENT_TYPE_PERMISSIONS["list"], + "当前用户没有文档类型列表权限", + ) + if deniedResponse: + return deniedResponse idList: list[int] | None = None if ids: idList = [int(x.strip()) for x in ids.split(",") if x.strip().isdigit()] @@ -305,52 +320,109 @@ class DocumentController(BaseController): return Result.success(data=Data) @self.router.get("/document-types/{TypeId}", response_model=Result[DocumentTypeItemVO]) - async def GetDocumentType(TypeId: int): + async def GetDocumentType(TypeId: int, payload: dict[str, Any] = Depends(verify_access_token)): """获取文档类型详情。""" + deniedResponse = await self._deny_document_type_without_permission( + int(payload["user_id"]), + self._DOCUMENT_TYPE_PERMISSIONS["detail"], + "当前用户没有文档类型详情权限", + ) + if deniedResponse: + return deniedResponse Data = await self.DocumentService.GetDocumentType(Id=TypeId) return Result.success(data=Data) @self.router.post("/document-types", response_model=Result[DocumentTypeItemVO]) - async def CreateDocumentType(Body: DocumentTypeCreateDTO): + async def CreateDocumentType(Body: DocumentTypeCreateDTO, payload: dict[str, Any] = Depends(verify_access_token)): """创建文档类型。""" + deniedResponse = await self._deny_document_type_without_permission( + int(payload["user_id"]), + self._DOCUMENT_TYPE_PERMISSIONS["create"], + "当前用户没有创建文档类型权限", + ) + if deniedResponse: + return deniedResponse Data = await self.DocumentService.CreateDocumentType(Body=Body) return Result.success(data=Data, message="文档类型创建成功") @self.router.put("/document-types/{TypeId}", response_model=Result[DocumentTypeItemVO]) - async def UpdateDocumentType(TypeId: int, Body: DocumentTypeUpdateDTO): + async def UpdateDocumentType(TypeId: int, Body: DocumentTypeUpdateDTO, payload: dict[str, Any] = Depends(verify_access_token)): """更新文档类型。""" + deniedResponse = await self._deny_document_type_without_permission( + int(payload["user_id"]), + self._DOCUMENT_TYPE_PERMISSIONS["update"], + "当前用户没有更新文档类型权限", + ) + if deniedResponse: + return deniedResponse Data = await self.DocumentService.UpdateDocumentType(Id=TypeId, Body=Body) return Result.success(data=Data, message="文档类型更新成功") @self.router.delete("/document-types/{TypeId}", response_model=Result[None]) - async def DeleteDocumentType(TypeId: int): + async def DeleteDocumentType(TypeId: int, payload: dict[str, Any] = Depends(verify_access_token)): """删除文档类型(软删除)。""" + deniedResponse = await self._deny_document_type_without_permission( + int(payload["user_id"]), + self._DOCUMENT_TYPE_PERMISSIONS["delete"], + "当前用户没有删除文档类型权限", + ) + if deniedResponse: + return deniedResponse await self.DocumentService.DeleteDocumentType(Id=TypeId) return Result.success(message="文档类型已删除") @self.router.get("/v3/document-type-roots", response_model=Result[list[DocumentTypeRootItemVO]]) async def ListDocumentTypeRoots( entry_module_id: int | None = Query(None, description="按入口模块过滤一级大类"), + payload: dict[str, Any] = Depends(verify_access_token), ): """获取一级文档类型(业务大类)列表。""" + deniedResponse = await self._deny_document_type_without_permission( + int(payload["user_id"]), + self._DOCUMENT_TYPE_PERMISSIONS["list"], + "当前用户没有业务大类列表权限", + ) + if deniedResponse: + return deniedResponse Data = await self.DocumentService.ListDocumentTypeRoots(EntryModuleId=entry_module_id) return Result.success(data=Data) @self.router.get("/v3/document-type-roots/{RootId}", response_model=Result[DocumentTypeRootItemVO]) - async def GetDocumentTypeRoot(RootId: int): + async def GetDocumentTypeRoot(RootId: int, payload: dict[str, Any] = Depends(verify_access_token)): """获取一级文档类型(业务大类)详情。""" + deniedResponse = await self._deny_document_type_without_permission( + int(payload["user_id"]), + self._DOCUMENT_TYPE_PERMISSIONS["detail"], + "当前用户没有业务大类详情权限", + ) + if deniedResponse: + return deniedResponse Data = await self.DocumentService.GetDocumentTypeRoot(Id=RootId) return Result.success(data=Data) @self.router.post("/v3/document-type-roots", response_model=Result[DocumentTypeRootItemVO]) - async def CreateDocumentTypeRoot(Body: DocumentTypeRootCreateDTO): + async def CreateDocumentTypeRoot(Body: DocumentTypeRootCreateDTO, payload: dict[str, Any] = Depends(verify_access_token)): """创建一级文档类型(业务大类)。""" + deniedResponse = await self._deny_document_type_without_permission( + int(payload["user_id"]), + self._DOCUMENT_TYPE_PERMISSIONS["create"], + "当前用户没有创建业务大类权限", + ) + if deniedResponse: + return deniedResponse Data = await self.DocumentService.CreateDocumentTypeRoot(Body=Body) return Result.success(data=Data, message="一级文档类型创建成功") @self.router.put("/v3/document-type-roots/{RootId}", response_model=Result[DocumentTypeRootItemVO]) - async def UpdateDocumentTypeRoot(RootId: int, Body: DocumentTypeRootUpdateDTO): + async def UpdateDocumentTypeRoot(RootId: int, Body: DocumentTypeRootUpdateDTO, payload: dict[str, Any] = Depends(verify_access_token)): """更新一级文档类型(业务大类)。""" + deniedResponse = await self._deny_document_type_without_permission( + int(payload["user_id"]), + self._DOCUMENT_TYPE_PERMISSIONS["update"], + "当前用户没有更新业务大类权限", + ) + if deniedResponse: + return deniedResponse Data = await self.DocumentService.UpdateDocumentTypeRoot(Id=RootId, Body=Body) return Result.success(data=Data, message="一级文档类型更新成功") @@ -431,3 +503,11 @@ class DocumentController(BaseController): status_code=403, content={"code": 403, "msg": "当前用户没有查看交叉评查结果权限", "data": None}, ) + + async def _deny_document_type_without_permission(self, UserId: int, PermissionKey: str, Message: str) -> JSONResponse | None: + if await self.PermissionService.CheckPermission(UserId, PermissionKey): + return None + return JSONResponse( + status_code=403, + content={"code": 403, "msg": Message, "data": None}, + ) diff --git a/fastapi_modules/fastapi_leaudit/services/impl/entryModuleAdminServiceImpl.py b/fastapi_modules/fastapi_leaudit/services/impl/entryModuleAdminServiceImpl.py index 56cefb8..2346db8 100644 --- a/fastapi_modules/fastapi_leaudit/services/impl/entryModuleAdminServiceImpl.py +++ b/fastapi_modules/fastapi_leaudit/services/impl/entryModuleAdminServiceImpl.py @@ -51,7 +51,7 @@ _ALLOWED_FEATURES = { _DEFAULT_FEATURES_BY_PROFILE = { "document_review": ["home", "documents", "upload", "rules", "rule_groups"], "contract": ["home", "documents", "upload", "rules", "contract_template_search", "contract_template_list"], - "govdoc": ["home", "govdoc_audits", "govdoc_upload", "rule_groups"], + "govdoc": ["home", "govdoc_audits", "govdoc_upload", "rules"], "cross_checking": ["cross_checking", "cross_checking_upload", "cross_checking_list"], "custom": ["home", "documents"], } @@ -884,6 +884,8 @@ class EntryModuleAdminServiceImpl(IEntryModuleAdminService): feature = str(item or "").strip() if not feature: continue + if MenuProfile == "govdoc" and feature == "rule_groups": + feature = "rules" if feature not in _ALLOWED_FEATURES: invalid.append(feature) continue @@ -915,6 +917,8 @@ class EntryModuleAdminServiceImpl(IEntryModuleAdminService): normalized: list[str] = [] for item in Features: feature = str(item or "").strip() + if MenuProfile == "govdoc" and feature == "rule_groups": + feature = "rules" if feature in _ALLOWED_FEATURES and feature not in normalized: normalized.append(feature) return normalized or list(_DEFAULT_FEATURES_BY_PROFILE.get(MenuProfile, _DEFAULT_FEATURES_BY_PROFILE["document_review"])) diff --git a/fastapi_modules/fastapi_leaudit/services/impl/homeServiceImpl.py b/fastapi_modules/fastapi_leaudit/services/impl/homeServiceImpl.py index 5ce2d16..221f288 100644 --- a/fastapi_modules/fastapi_leaudit/services/impl/homeServiceImpl.py +++ b/fastapi_modules/fastapi_leaudit/services/impl/homeServiceImpl.py @@ -54,7 +54,7 @@ class HomeServiceImpl(IHomeService): _DEFAULT_FEATURES_BY_PROFILE: dict[str, list[str]] = { "document_review": ["home", "documents", "upload", "rules", "rule_groups"], "contract": ["home", "documents", "upload", "rules", "contract_template_search", "contract_template_list"], - "govdoc": ["home", "govdoc_audits", "govdoc_upload", "rule_groups"], + "govdoc": ["home", "govdoc_audits", "govdoc_upload", "rules"], "cross_checking": ["cross_checking", "cross_checking_upload", "cross_checking_list"], "custom": ["home", "documents"], } @@ -553,6 +553,8 @@ class HomeServiceImpl(IHomeService): normalized: list[str] = [] for item in parsed: feature = str(item or "").strip() + if menu_profile == "govdoc" and feature == "rule_groups": + feature = "rules" if feature in allowed_features and feature not in normalized: normalized.append(feature) return normalized or list(cls._DEFAULT_FEATURES_BY_PROFILE[menu_profile]) diff --git a/fastapi_modules/fastapi_leaudit/services/impl/rbacAdminServiceImpl.py b/fastapi_modules/fastapi_leaudit/services/impl/rbacAdminServiceImpl.py index 005ccdd..b71cf89 100644 --- a/fastapi_modules/fastapi_leaudit/services/impl/rbacAdminServiceImpl.py +++ b/fastapi_modules/fastapi_leaudit/services/impl/rbacAdminServiceImpl.py @@ -323,6 +323,18 @@ class RbacAdminServiceImpl(IRbacAdminService): "is_cache": True, "meta": {"group": "settings"}, }, + { + "route_path": "/rule-groups", + "route_name": "rule-groups", + "component": "rule-groups", + "route_title": "评查点分组", + "icon": "ri-node-tree", + "sort_order": 6, + "parent_path": "/settings", + "is_hidden": False, + "is_cache": True, + "meta": {"group": "settings"}, + }, ] _MANAGEABLE_PERMISSION_BLUEPRINTS: list[dict[str, Any]] = [ @@ -332,10 +344,10 @@ class RbacAdminServiceImpl(IRbacAdminService): {"permission_key": "entry_module:update:write", "display_name": "更新入口模块", "module": "entry_module", "resource": "update", "action": "write", "api_method": "PUT", "api_path": "/api/v3/entry-modules/{id}", "route_path": "/entry-modules"}, {"permission_key": "entry_module:delete:delete", "display_name": "删除入口模块", "module": "entry_module", "resource": "delete", "action": "delete", "api_method": "DELETE", "api_path": "/api/v3/entry-modules/{id}", "route_path": "/entry-modules"}, {"permission_key": "entry_module:image:write", "display_name": "上传入口模块图标", "module": "entry_module", "resource": "image", "action": "write", "api_method": "POST", "api_path": "/api/v3/entry-modules/{id}/image", "route_path": "/entry-modules"}, - {"permission_key": "doc_type:list:read", "display_name": "文档类型列表", "module": "doc_type", "resource": "list", "action": "read", "api_method": "GET", "api_path": "/api/document-types", "route_path": "/document-types"}, - {"permission_key": "doc_type:detail:read", "display_name": "文档类型详情", "module": "doc_type", "resource": "detail", "action": "read", "api_method": "GET", "api_path": "/api/document-types/{id}", "route_path": "/document-types"}, - {"permission_key": "doc_type:create:write", "display_name": "创建文档类型", "module": "doc_type", "resource": "create", "action": "write", "api_method": "POST", "api_path": "/api/document-types", "route_path": "/document-types"}, - {"permission_key": "doc_type:update:write", "display_name": "更新文档类型", "module": "doc_type", "resource": "update", "action": "write", "api_method": "PUT", "api_path": "/api/document-types/{id}", "route_path": "/document-types"}, + {"permission_key": "doc_type:list:read", "display_name": "业务大类列表", "module": "doc_type", "resource": "list", "action": "read", "api_method": "GET", "api_path": "/api/v3/document-type-roots", "route_path": "/document-types"}, + {"permission_key": "doc_type:detail:read", "display_name": "业务大类详情", "module": "doc_type", "resource": "detail", "action": "read", "api_method": "GET", "api_path": "/api/v3/document-type-roots/{id}", "route_path": "/document-types"}, + {"permission_key": "doc_type:create:write", "display_name": "创建业务大类", "module": "doc_type", "resource": "create", "action": "write", "api_method": "POST", "api_path": "/api/v3/document-type-roots", "route_path": "/document-types"}, + {"permission_key": "doc_type:update:write", "display_name": "更新业务大类", "module": "doc_type", "resource": "update", "action": "write", "api_method": "PUT", "api_path": "/api/v3/document-type-roots/{id}", "route_path": "/document-types"}, {"permission_key": "doc_type:delete:delete", "display_name": "删除文档类型", "module": "doc_type", "resource": "delete", "action": "delete", "api_method": "DELETE", "api_path": "/api/document-types/{id}", "route_path": "/document-types"}, {"permission_key": "rbac:tenants:read", "display_name": "查看租户列表", "module": "rbac", "resource": "tenants", "action": "read", "api_method": "GET", "api_path": "/api/v3/tenants", "route_path": "/tenants"}, {"permission_key": "rbac:tenants:create", "display_name": "创建租户", "module": "rbac", "resource": "tenants", "action": "create", "api_method": "POST", "api_path": "/api/v3/tenants", "route_path": "/tenants"}, diff --git a/fastapi_modules/fastapi_leaudit/services/impl/rbacServiceImpl.py b/fastapi_modules/fastapi_leaudit/services/impl/rbacServiceImpl.py index 5986fd3..80e0bc6 100644 --- a/fastapi_modules/fastapi_leaudit/services/impl/rbacServiceImpl.py +++ b/fastapi_modules/fastapi_leaudit/services/impl/rbacServiceImpl.py @@ -26,6 +26,7 @@ class RbacServiceImpl(IRbacService): "/files", "/documents", "/rules", + "/rule-groups", "/rules-files", "/settings", "/entry-modules", @@ -322,6 +323,20 @@ class RbacServiceImpl(IRbacService): "meta": {"group": "settings"}, "children": None, }, + { + "id": 1019, + "route_path": "/rule-groups", + "route_name": "rule-groups", + "component": "rule-groups", + "parent_id": 1013, + "route_title": "评查点分组", + "icon": "ri-node-tree", + "sort_order": 6, + "is_hidden": False, + "is_cache": True, + "meta": {"group": "settings"}, + "children": None, + }, ], }, { @@ -738,7 +753,8 @@ class RbacServiceImpl(IRbacService): databaseRoutes = await self._loadDatabaseRoutes(Session, roleIds, grantedPermissions) if self._isFrontendRouteSetReady(databaseRoutes): - routes = self._filterRoutesByMinimalScope(databaseRoutes) + grantedRoutePaths = self._collectCurrentFrontendRoutePaths(databaseRoutes) + routes = self._filterRoutesByRouteAndPermissionScope(databaseRoutes, grantedRoutePaths, grantedPermissions) else: routes = self._buildCompatibilityRoutes(roleKeys, grantedPermissions) @@ -872,6 +888,30 @@ class RbacServiceImpl(IRbacService): filtered.append(routeCopy) return filtered + def _filterRoutesByRouteAndPermissionScope( + self, + Routes: list[RbacRouteVO], + GrantedRoutePaths: set[str], + GrantedPermissions: set[str], + ) -> list[RbacRouteVO]: + """按角色已勾选路由裁剪,接口权限不能替代子路由勾选。""" + filtered: list[RbacRouteVO] = [] + for route in Routes: + if not self._isRoutePathEnabled(route.route_path): + continue + if route.route_path not in GrantedRoutePaths: + continue + + routeCopy = route.model_copy(deep=True) + routeCopy.permissions = self._resolvePermissionsForPath(route.route_path, GrantedPermissions) + routeCopy.children = self._filterRoutesByRouteAndPermissionScope( + route.children or [], + GrantedRoutePaths, + GrantedPermissions, + ) or None + filtered.append(routeCopy) + return filtered + def _filterBlueprintsByMinimalScope(self, Blueprints: list[dict[str, Any]]) -> list[dict[str, Any]]: """按当前最小可用范围裁剪兼容蓝图。""" filtered: list[dict[str, Any]] = [] @@ -953,6 +993,14 @@ class RbacServiceImpl(IRbacService): paths.update(self._collectRoutePaths(route.children)) return paths + def _collectCurrentFrontendRoutePaths(self, Routes: list[RbacRouteVO]) -> set[str]: + """收集当前前端真实路由,旧 govdoc-audit 残留授权不映射成新版子路由。""" + return { + path + for path in self._collectRoutePaths(Routes) + if not path.startswith("/govdoc-audit/") + } + @staticmethod def _normalizeMeta(Meta: Any) -> dict | None: """兼容 meta 为 JSON 字符串、字典或空值的情况。""" diff --git a/tests/test_document_type_permissions.py b/tests/test_document_type_permissions.py new file mode 100644 index 0000000..11b3ab9 --- /dev/null +++ b/tests/test_document_type_permissions.py @@ -0,0 +1,169 @@ +"""文档类型权限控制测试。""" + +import pytest +from starlette.responses import JSONResponse + +from fastapi_modules.fastapi_leaudit.controllers.documentController import DocumentController +from fastapi_modules.fastapi_leaudit.domian.vo.documentVo import ( + DocumentTypeCreateDTO, + DocumentTypeItemVO, + DocumentTypeRootCreateDTO, + DocumentTypeRootItemVO, + DocumentTypeRootUpdateDTO, + DocumentTypeUpdateDTO, +) + + +class _DenyPermissionService: + """拒绝所有权限的测试权限服务。""" + + async def CheckPermission(self, UserId: int, PermissionKey: str) -> bool: + return False + + +class _AllowOnlyPermissionService: + """只允许指定权限的测试权限服务。""" + + def __init__(self, allowed: set[str]) -> None: + self.allowed = allowed + + async def CheckPermission(self, UserId: int, PermissionKey: str) -> bool: + return PermissionKey in self.allowed + + +class _FakeDocumentService: + """记录调用的测试文档服务。""" + + def __init__(self) -> None: + self.calls: list[str] = [] + + async def ListDocumentTypes(self, **kwargs): + self.calls.append("ListDocumentTypes") + return [ + DocumentTypeItemVO( + id=1, + name="合同", + code="contract", + description=None, + entryModuleId=None, + isEnabled=True, + ruleSetIds=[], + ) + ] + + async def GetDocumentType(self, **kwargs): + self.calls.append("GetDocumentType") + return DocumentTypeItemVO( + id=1, + name="合同", + code="contract", + description=None, + entryModuleId=None, + isEnabled=True, + ruleSetIds=[], + ) + + async def CreateDocumentType(self, **kwargs): + self.calls.append("CreateDocumentType") + return await self.GetDocumentType() + + async def UpdateDocumentType(self, **kwargs): + self.calls.append("UpdateDocumentType") + return await self.GetDocumentType() + + async def DeleteDocumentType(self, **kwargs): + self.calls.append("DeleteDocumentType") + + async def ListDocumentTypeRoots(self, **kwargs): + self.calls.append("ListDocumentTypeRoots") + return [ + DocumentTypeRootItemVO( + id=11, + name="合同", + code="root.contract", + description=None, + entryModuleId=None, + entryModuleName=None, + isEnabled=True, + childGroupCount=0, + ruleSetCount=0, + ruleSetIds=[], + ) + ] + + async def GetDocumentTypeRoot(self, **kwargs): + self.calls.append("GetDocumentTypeRoot") + return DocumentTypeRootItemVO( + id=11, + name="合同", + code="root.contract", + description=None, + entryModuleId=None, + entryModuleName=None, + isEnabled=True, + childGroupCount=0, + ruleSetCount=0, + ruleSetIds=[], + ) + + async def CreateDocumentTypeRoot(self, **kwargs): + self.calls.append("CreateDocumentTypeRoot") + return await self.GetDocumentTypeRoot() + + async def UpdateDocumentTypeRoot(self, **kwargs): + self.calls.append("UpdateDocumentTypeRoot") + return await self.GetDocumentTypeRoot() + + +def _find_endpoint(controller: DocumentController, path: str, method: str): + """根据路径和方法查找路由 endpoint。""" + full_path = f"{controller.router.prefix}{path}" + for route in controller.router.routes: + if getattr(route, "path", "") == full_path and method in getattr(route, "methods", set()): + return route.endpoint + raise AssertionError(f"未找到路由 {method} {full_path}") + + +@pytest.mark.asyncio +@pytest.mark.parametrize( + ("path", "method", "kwargs", "expected_call"), + [ + ("/document-types", "GET", {"ids": None, "entry_module_id": None}, "ListDocumentTypes"), + ("/document-types/{TypeId}", "GET", {"TypeId": 1}, "GetDocumentType"), + ("/document-types", "POST", {"Body": DocumentTypeCreateDTO(code="contract", name="合同")}, "CreateDocumentType"), + ("/document-types/{TypeId}", "PUT", {"TypeId": 1, "Body": DocumentTypeUpdateDTO(name="合同")}, "UpdateDocumentType"), + ("/document-types/{TypeId}", "DELETE", {"TypeId": 1}, "DeleteDocumentType"), + ("/v3/document-type-roots", "GET", {"entry_module_id": None}, "ListDocumentTypeRoots"), + ("/v3/document-type-roots/{RootId}", "GET", {"RootId": 11}, "GetDocumentTypeRoot"), + ("/v3/document-type-roots", "POST", {"Body": DocumentTypeRootCreateDTO(code="root.contract", name="合同")}, "CreateDocumentTypeRoot"), + ("/v3/document-type-roots/{RootId}", "PUT", {"RootId": 11, "Body": DocumentTypeRootUpdateDTO(name="合同")}, "UpdateDocumentTypeRoot"), + ], +) +async def test_document_type_endpoints_require_permission(path, method, kwargs, expected_call): + """文档类型和业务大类接口无权限时返回 403,且不调用业务服务。""" + controller = DocumentController() + service = _FakeDocumentService() + controller.DocumentService = service + controller.PermissionService = _DenyPermissionService() + endpoint = _find_endpoint(controller, path, method) + + response = await endpoint(**kwargs, payload={"user_id": 7}) + + assert isinstance(response, JSONResponse) + assert response.status_code == 403 + assert expected_call not in service.calls + + +@pytest.mark.asyncio +async def test_document_type_root_list_calls_service_when_permission_granted(): + """业务大类列表有查看权限时正常调用业务服务。""" + controller = DocumentController() + service = _FakeDocumentService() + controller.DocumentService = service + controller.PermissionService = _AllowOnlyPermissionService({"doc_type:list:read"}) + endpoint = _find_endpoint(controller, "/v3/document-type-roots", "GET") + + response = await endpoint(entry_module_id=None, payload={"user_id": 7}) + + assert response.data[0].id == 11 + assert service.calls == ["ListDocumentTypeRoots"] diff --git a/tests/test_govdoc_permissions.py b/tests/test_govdoc_permissions.py index c264562..cd9c8bb 100644 --- a/tests/test_govdoc_permissions.py +++ b/tests/test_govdoc_permissions.py @@ -145,3 +145,91 @@ def test_govdoc_root_route_marks_frontend_route_set_ready(): ] assert service._isFrontendRouteSetReady(routes) is True + + +def test_govdoc_parent_route_does_not_expose_ungranted_child_routes(): + """只有内部公文父路由和接口权限时,不应补出未勾选的列表/上传子路由。""" + service = RbacServiceImpl() + routes = [ + RbacRouteVO( + id=1, + route_path="/govdoc", + route_name="govdoc", + component="govdoc", + parent_id=None, + route_title="内部公文处理", + children=[ + RbacRouteVO( + id=2, + route_path="/govdoc/audits", + route_name="govdoc-audits", + component="govdoc.audits", + parent_id=1, + route_title="公文列表", + ), + RbacRouteVO( + id=3, + route_path="/govdoc/upload", + route_name="govdoc-upload", + component="govdoc.upload", + parent_id=1, + route_title="公文上传", + ), + ], + ) + ] + + filtered = service._filterRoutesByRouteAndPermissionScope( + routes, + {"/govdoc"}, + {"govdoc:document:read", "govdoc:document:create"}, + ) + paths = service._collectRoutePaths(filtered) + + assert "/govdoc" in paths + assert "/govdoc/audits" not in paths + assert "/govdoc/upload" not in paths + + +def test_legacy_govdoc_audit_route_does_not_grant_current_govdoc_child_route(): + """旧 /govdoc-audit 残留授权不应继续放行当前 /govdoc 子路由。""" + service = RbacServiceImpl() + routes = [ + RbacRouteVO( + id=1, + route_path="/govdoc", + route_name="govdoc", + component="govdoc", + parent_id=None, + route_title="内部公文处理", + ), + RbacRouteVO( + id=2, + route_path="/govdoc-audit/audits", + route_name="legacy-govdoc-audits", + component="govdoc-audit.audits", + parent_id=None, + route_title="旧公文列表", + ), + RbacRouteVO( + id=3, + route_path="/govdoc-audit/upload", + route_name="legacy-govdoc-upload", + component="govdoc-audit.upload", + parent_id=None, + route_title="旧公文上传", + ), + ] + + filtered = service._filterRoutesByRouteAndPermissionScope( + routes, + service._collectCurrentFrontendRoutePaths(routes), + {"govdoc:document:read", "govdoc:document:create"}, + ) + paths = service._collectRoutePaths(filtered) + + assert "/govdoc" in paths + assert "/govdoc-audit/audits" not in paths + assert "/govdoc-audit/upload" not in paths + assert "/govdoc/audits" not in paths + assert "/govdoc/upload" not in paths diff --git a/tests/test_rule_write_scope.py b/tests/test_rule_write_scope.py index add7f40..8eb0d22 100644 --- a/tests/test_rule_write_scope.py +++ b/tests/test_rule_write_scope.py @@ -183,7 +183,7 @@ def test_rbac_manageable_permissions_include_rule_version_lifecycle(): assert "rules:binding_delete:delete" in permission_keys -def test_rbac_rule_group_permissions_are_folded_into_rules_menu(): +def test_rbac_rule_groups_route_is_exposed_under_settings(): route_paths = {item["route_path"] for item in RbacAdminServiceImpl._MANAGEABLE_ROUTE_BLUEPRINTS} group_permission_paths = { item["route_path"] @@ -191,17 +191,19 @@ def test_rbac_rule_group_permissions_are_folded_into_rules_menu(): if item["permission_key"].startswith("evaluation_group:") } - assert "/rule-groups" not in route_paths + assert "/rule-groups" in route_paths assert group_permission_paths == {"/rules"} - -def test_user_route_compat_menu_does_not_expose_rule_groups(): +def test_user_route_compat_menu_exposes_rule_groups_under_settings(): service = RbacServiceImpl() routes = service._buildCompatibilityRoutes(["admin"], {"evaluation_group:list:read", "rules:list:read"}) paths = service._collectRoutePaths(routes) rules_route = next(route for route in routes if route.route_path == "/rules") + settings_route = next(route for route in routes if route.route_path == "/settings") + rule_groups_route = next(route for route in (settings_route.children or []) if route.route_path == "/rule-groups") - assert "/rule-groups" not in paths + assert "/rule-groups" in paths + assert rule_groups_route.parent_id == settings_route.id assert "evaluation_group:list:read" in rules_route.permissions