"""规则配置列表权限控制测试。""" import pytest from starlette.responses import JSONResponse from fastapi_modules.fastapi_leaudit.controllers.ruleConfigController import RuleConfigController from fastapi_modules.fastapi_leaudit.services.impl.rbacAdminServiceImpl import RbacAdminServiceImpl class _DenyPermissionService: """拒绝所有权限的测试权限服务。""" async def CheckPermission(self, user_id: int, permission_key: str) -> bool: """检查单个权限。""" return False async def HasAnyPermission(self, UserId: int, PermissionKeys: list[str]) -> bool: """检查任一权限。""" return False class _EvaluationPointListOnlyPermissionService: """只允许评查点列表读取权限。""" async def CheckPermission(self, user_id: int, permission_key: str) -> bool: """检查单个权限。""" return permission_key == "evaluation_point:list:read" async def HasAnyPermission(self, UserId: int, PermissionKeys: list[str]) -> bool: """检查任一权限。""" return "evaluation_point:list:read" in PermissionKeys class _RulesListOnlyPermissionService: """只允许规则配置列表权限。""" async def CheckPermission(self, user_id: int, permission_key: str) -> bool: """检查单个权限。""" return permission_key == "rules:list:read" async def HasAnyPermission(self, UserId: int, PermissionKeys: list[str]) -> bool: """检查任一权限。""" return "rules:list:read" in PermissionKeys class _FakeRuleConfigService: """记录调用的规则配置服务。""" def __init__(self) -> None: self.summary_called = False async def ListPackSummaries(self, CurrentUserId: int): """记录轻量列表调用。""" self.summary_called = True return [] async def ListPacks(self, CurrentUserId: int): """记录完整列表调用。""" return [] async def GetPack(self, PackId: int, CurrentUserId: int): """不用于本测试。""" raise AssertionError("GetPack should not be called") def _find_endpoint(controller: RuleConfigController, path: str, method: str): """根据路径和方法查找路由 endpoint。""" full_path = f"{controller.router.prefix}{path}" for route in controller.router.routes: if getattr(route, "path", "") == full_path and method in getattr(route, "methods", set()): return route.endpoint raise AssertionError(f"未找到路由 {method} {full_path}") @pytest.mark.asyncio async def test_rule_config_pack_list_requires_evaluation_point_list_read_permission(): """无评查点列表读取权限时返回 403,且不加载评查点规则数据。""" controller = RuleConfigController() service = _FakeRuleConfigService() controller.RuleConfigService = service controller.PermissionService = _DenyPermissionService() endpoint = _find_endpoint(controller, "", "GET") response = await endpoint(summaryOnly=True, payload={"user_id": 7}) assert isinstance(response, JSONResponse) assert response.status_code == 403 assert service.summary_called is False @pytest.mark.asyncio async def test_rule_config_pack_list_rejects_rules_list_without_evaluation_point_list_read(): """只有规则配置列表权限不能读取评查点列表数据。""" controller = RuleConfigController() service = _FakeRuleConfigService() controller.RuleConfigService = service controller.PermissionService = _RulesListOnlyPermissionService() endpoint = _find_endpoint(controller, "", "GET") response = await endpoint(summaryOnly=True, payload={"user_id": 7}) assert isinstance(response, JSONResponse) assert response.status_code == 403 assert service.summary_called is False @pytest.mark.asyncio async def test_rule_config_pack_list_calls_service_with_evaluation_point_list_read_permission(): """有评查点列表读取权限时正常加载规则配置摘要。""" controller = RuleConfigController() service = _FakeRuleConfigService() controller.RuleConfigService = service controller.PermissionService = _EvaluationPointListOnlyPermissionService() endpoint = _find_endpoint(controller, "", "GET") response = await endpoint(summaryOnly=True, payload={"user_id": 7}) assert response.status_code == 200 assert service.summary_called is True def test_rbac_manageable_permissions_include_evaluation_point_list_read(): """角色权限管理中 /rules 必须包含评查点列表读取权限。""" permission_keys = { item["permission_key"] for item in RbacAdminServiceImpl._MANAGEABLE_PERMISSION_BLUEPRINTS if item["route_path"] == "/rules" } assert "evaluation_point:list:read" in permission_keys