fix(rules): require evaluation point list read permission for rule packs
This commit is contained in:
@@ -17,6 +17,9 @@ from fastapi_modules.fastapi_leaudit.services.ruleConfigService import IRuleConf
|
|||||||
class RuleConfigController(BaseController):
|
class RuleConfigController(BaseController):
|
||||||
"""规则配置页聚合控制器。"""
|
"""规则配置页聚合控制器。"""
|
||||||
|
|
||||||
|
_LIST_PERMISSION = "evaluation_point:list:read"
|
||||||
|
_CONTENT_PERMISSIONS = ["rules:content:read"]
|
||||||
|
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
super().__init__(prefix="/v3/rule-config-packs", tags=["规则配置"])
|
super().__init__(prefix="/v3/rule-config-packs", tags=["规则配置"])
|
||||||
self.RuleConfigService: IRuleConfigService = GetRuleConfigServiceSingleton()
|
self.RuleConfigService: IRuleConfigService = GetRuleConfigServiceSingleton()
|
||||||
@@ -28,8 +31,8 @@ class RuleConfigController(BaseController):
|
|||||||
payload: dict[str, Any] = Depends(verify_access_token),
|
payload: dict[str, Any] = Depends(verify_access_token),
|
||||||
):
|
):
|
||||||
"""列出规则配置页 pack。"""
|
"""列出规则配置页 pack。"""
|
||||||
if not await self._check_permission(int(payload["user_id"])):
|
if not await self._check_permission(int(payload["user_id"]), [self._LIST_PERMISSION]):
|
||||||
return JSONResponse(status_code=403, content={"code": 403, "msg": "当前用户没有规则配置查看权限", "data": None})
|
return JSONResponse(status_code=403, content={"code": 403, "msg": "当前账号没有评查点列表读取权限", "data": None})
|
||||||
current_user_id = int(payload["user_id"])
|
current_user_id = int(payload["user_id"])
|
||||||
data = await (
|
data = await (
|
||||||
self.RuleConfigService.ListPackSummaries(CurrentUserId=current_user_id)
|
self.RuleConfigService.ListPackSummaries(CurrentUserId=current_user_id)
|
||||||
@@ -41,13 +44,13 @@ class RuleConfigController(BaseController):
|
|||||||
@self.router.get("/{PackId}")
|
@self.router.get("/{PackId}")
|
||||||
async def GetRuleConfigPack(PackId: int, payload: dict[str, Any] = Depends(verify_access_token)):
|
async def GetRuleConfigPack(PackId: int, payload: dict[str, Any] = Depends(verify_access_token)):
|
||||||
"""获取单个规则配置 pack。"""
|
"""获取单个规则配置 pack。"""
|
||||||
if not await self._check_permission(int(payload["user_id"])):
|
if not await self._check_permission(int(payload["user_id"]), self._CONTENT_PERMISSIONS):
|
||||||
return JSONResponse(status_code=403, content={"code": 403, "msg": "当前用户没有规则配置查看权限", "data": None})
|
return JSONResponse(status_code=403, content={"code": 403, "msg": "当前用户没有规则配置查看权限", "data": None})
|
||||||
data = await self.RuleConfigService.GetPack(PackId, CurrentUserId=int(payload["user_id"]))
|
data = await self.RuleConfigService.GetPack(PackId, CurrentUserId=int(payload["user_id"]))
|
||||||
return JSONResponse(status_code=200, content={"code": 200, "message": "success", "data": data.model_dump()})
|
return JSONResponse(status_code=200, content={"code": 200, "message": "success", "data": data.model_dump()})
|
||||||
|
|
||||||
async def _check_permission(self, user_id: int) -> bool:
|
async def _check_permission(self, user_id: int, permission_keys: list[str]) -> bool:
|
||||||
return await self.PermissionService.HasAnyPermission(
|
return await self.PermissionService.HasAnyPermission(
|
||||||
user_id,
|
user_id,
|
||||||
["rules:list:read", "rules:content:read", "evaluation_group:list:read"],
|
permission_keys,
|
||||||
)
|
)
|
||||||
|
|||||||
@@ -363,6 +363,7 @@ class RbacAdminServiceImpl(IRbacAdminService):
|
|||||||
{"permission_key": "contract_template:create:write", "display_name": "上传合同模板", "module": "contract_template", "resource": "create", "action": "write", "api_method": "POST", "api_path": "/api/v3/contract-templates", "route_path": "/contract-template/list"},
|
{"permission_key": "contract_template:create:write", "display_name": "上传合同模板", "module": "contract_template", "resource": "create", "action": "write", "api_method": "POST", "api_path": "/api/v3/contract-templates", "route_path": "/contract-template/list"},
|
||||||
{"permission_key": "contract_template:update:write", "display_name": "更新合同模板", "module": "contract_template", "resource": "update", "action": "write", "api_method": "PUT", "api_path": "/api/v3/contract-templates/{id}", "route_path": "/contract-template/list"},
|
{"permission_key": "contract_template:update:write", "display_name": "更新合同模板", "module": "contract_template", "resource": "update", "action": "write", "api_method": "PUT", "api_path": "/api/v3/contract-templates/{id}", "route_path": "/contract-template/list"},
|
||||||
{"permission_key": "contract_template:delete:delete", "display_name": "删除合同模板", "module": "contract_template", "resource": "delete", "action": "delete", "api_method": "DELETE", "api_path": "/api/v3/contract-templates/{id}", "route_path": "/contract-template/list"},
|
{"permission_key": "contract_template:delete:delete", "display_name": "删除合同模板", "module": "contract_template", "resource": "delete", "action": "delete", "api_method": "DELETE", "api_path": "/api/v3/contract-templates/{id}", "route_path": "/contract-template/list"},
|
||||||
|
{"permission_key": "evaluation_point:list:read", "display_name": "评查点列表", "module": "evaluation_point", "resource": "list", "action": "read", "api_method": "GET", "api_path": "/api/v3/rule-config-packs", "route_path": "/rules"},
|
||||||
{"permission_key": "evaluation_group:list:read", "display_name": "评查点分组列表", "module": "evaluation_group", "resource": "list", "action": "read", "api_method": "GET", "api_path": "/api/v3/evaluation-point-groups", "route_path": "/rules"},
|
{"permission_key": "evaluation_group:list:read", "display_name": "评查点分组列表", "module": "evaluation_group", "resource": "list", "action": "read", "api_method": "GET", "api_path": "/api/v3/evaluation-point-groups", "route_path": "/rules"},
|
||||||
{"permission_key": "evaluation_group:create:write", "display_name": "创建评查点分组", "module": "evaluation_group", "resource": "create", "action": "write", "api_method": "POST", "api_path": "/api/v3/evaluation-point-groups", "route_path": "/rules"},
|
{"permission_key": "evaluation_group:create:write", "display_name": "创建评查点分组", "module": "evaluation_group", "resource": "create", "action": "write", "api_method": "POST", "api_path": "/api/v3/evaluation-point-groups", "route_path": "/rules"},
|
||||||
{"permission_key": "evaluation_group:update:write", "display_name": "更新评查点分组与绑定", "module": "evaluation_group", "resource": "update", "action": "write", "api_method": "PUT", "api_path": "/api/v3/evaluation-point-groups/{id}", "route_path": "/rules"},
|
{"permission_key": "evaluation_group:update:write", "display_name": "更新评查点分组与绑定", "module": "evaluation_group", "resource": "update", "action": "write", "api_method": "PUT", "api_path": "/api/v3/evaluation-point-groups/{id}", "route_path": "/rules"},
|
||||||
|
|||||||
@@ -0,0 +1,130 @@
|
|||||||
|
"""规则配置列表权限控制测试。"""
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
from starlette.responses import JSONResponse
|
||||||
|
|
||||||
|
from fastapi_modules.fastapi_leaudit.controllers.ruleConfigController import RuleConfigController
|
||||||
|
from fastapi_modules.fastapi_leaudit.services.impl.rbacAdminServiceImpl import RbacAdminServiceImpl
|
||||||
|
|
||||||
|
|
||||||
|
class _DenyPermissionService:
|
||||||
|
"""拒绝所有权限的测试权限服务。"""
|
||||||
|
|
||||||
|
async def CheckPermission(self, user_id: int, permission_key: str) -> bool:
|
||||||
|
"""检查单个权限。"""
|
||||||
|
return False
|
||||||
|
|
||||||
|
async def HasAnyPermission(self, UserId: int, PermissionKeys: list[str]) -> bool:
|
||||||
|
"""检查任一权限。"""
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
class _EvaluationPointListOnlyPermissionService:
|
||||||
|
"""只允许评查点列表读取权限。"""
|
||||||
|
|
||||||
|
async def CheckPermission(self, user_id: int, permission_key: str) -> bool:
|
||||||
|
"""检查单个权限。"""
|
||||||
|
return permission_key == "evaluation_point:list:read"
|
||||||
|
|
||||||
|
async def HasAnyPermission(self, UserId: int, PermissionKeys: list[str]) -> bool:
|
||||||
|
"""检查任一权限。"""
|
||||||
|
return "evaluation_point:list:read" in PermissionKeys
|
||||||
|
|
||||||
|
|
||||||
|
class _RulesListOnlyPermissionService:
|
||||||
|
"""只允许规则配置列表权限。"""
|
||||||
|
|
||||||
|
async def CheckPermission(self, user_id: int, permission_key: str) -> bool:
|
||||||
|
"""检查单个权限。"""
|
||||||
|
return permission_key == "rules:list:read"
|
||||||
|
|
||||||
|
async def HasAnyPermission(self, UserId: int, PermissionKeys: list[str]) -> bool:
|
||||||
|
"""检查任一权限。"""
|
||||||
|
return "rules:list:read" in PermissionKeys
|
||||||
|
|
||||||
|
|
||||||
|
class _FakeRuleConfigService:
|
||||||
|
"""记录调用的规则配置服务。"""
|
||||||
|
|
||||||
|
def __init__(self) -> None:
|
||||||
|
self.summary_called = False
|
||||||
|
|
||||||
|
async def ListPackSummaries(self, CurrentUserId: int):
|
||||||
|
"""记录轻量列表调用。"""
|
||||||
|
self.summary_called = True
|
||||||
|
return []
|
||||||
|
|
||||||
|
async def ListPacks(self, CurrentUserId: int):
|
||||||
|
"""记录完整列表调用。"""
|
||||||
|
return []
|
||||||
|
|
||||||
|
async def GetPack(self, PackId: int, CurrentUserId: int):
|
||||||
|
"""不用于本测试。"""
|
||||||
|
raise AssertionError("GetPack should not be called")
|
||||||
|
|
||||||
|
|
||||||
|
def _find_endpoint(controller: RuleConfigController, path: str, method: str):
|
||||||
|
"""根据路径和方法查找路由 endpoint。"""
|
||||||
|
full_path = f"{controller.router.prefix}{path}"
|
||||||
|
for route in controller.router.routes:
|
||||||
|
if getattr(route, "path", "") == full_path and method in getattr(route, "methods", set()):
|
||||||
|
return route.endpoint
|
||||||
|
raise AssertionError(f"未找到路由 {method} {full_path}")
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_rule_config_pack_list_requires_evaluation_point_list_read_permission():
|
||||||
|
"""无评查点列表读取权限时返回 403,且不加载评查点规则数据。"""
|
||||||
|
controller = RuleConfigController()
|
||||||
|
service = _FakeRuleConfigService()
|
||||||
|
controller.RuleConfigService = service
|
||||||
|
controller.PermissionService = _DenyPermissionService()
|
||||||
|
endpoint = _find_endpoint(controller, "", "GET")
|
||||||
|
|
||||||
|
response = await endpoint(summaryOnly=True, payload={"user_id": 7})
|
||||||
|
|
||||||
|
assert isinstance(response, JSONResponse)
|
||||||
|
assert response.status_code == 403
|
||||||
|
assert service.summary_called is False
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_rule_config_pack_list_rejects_rules_list_without_evaluation_point_list_read():
|
||||||
|
"""只有规则配置列表权限不能读取评查点列表数据。"""
|
||||||
|
controller = RuleConfigController()
|
||||||
|
service = _FakeRuleConfigService()
|
||||||
|
controller.RuleConfigService = service
|
||||||
|
controller.PermissionService = _RulesListOnlyPermissionService()
|
||||||
|
endpoint = _find_endpoint(controller, "", "GET")
|
||||||
|
|
||||||
|
response = await endpoint(summaryOnly=True, payload={"user_id": 7})
|
||||||
|
|
||||||
|
assert isinstance(response, JSONResponse)
|
||||||
|
assert response.status_code == 403
|
||||||
|
assert service.summary_called is False
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_rule_config_pack_list_calls_service_with_evaluation_point_list_read_permission():
|
||||||
|
"""有评查点列表读取权限时正常加载规则配置摘要。"""
|
||||||
|
controller = RuleConfigController()
|
||||||
|
service = _FakeRuleConfigService()
|
||||||
|
controller.RuleConfigService = service
|
||||||
|
controller.PermissionService = _EvaluationPointListOnlyPermissionService()
|
||||||
|
endpoint = _find_endpoint(controller, "", "GET")
|
||||||
|
|
||||||
|
response = await endpoint(summaryOnly=True, payload={"user_id": 7})
|
||||||
|
|
||||||
|
assert response.status_code == 200
|
||||||
|
assert service.summary_called is True
|
||||||
|
|
||||||
|
|
||||||
|
def test_rbac_manageable_permissions_include_evaluation_point_list_read():
|
||||||
|
"""角色权限管理中 /rules 必须包含评查点列表读取权限。"""
|
||||||
|
permission_keys = {
|
||||||
|
item["permission_key"]
|
||||||
|
for item in RbacAdminServiceImpl._MANAGEABLE_PERMISSION_BLUEPRINTS
|
||||||
|
if item["route_path"] == "/rules"
|
||||||
|
}
|
||||||
|
|
||||||
|
assert "evaluation_point:list:read" in permission_keys
|
||||||
@@ -169,6 +169,7 @@ def test_rbac_manageable_permissions_include_rule_version_lifecycle():
|
|||||||
if item["route_path"] == "/rules"
|
if item["route_path"] == "/rules"
|
||||||
}
|
}
|
||||||
|
|
||||||
|
assert "evaluation_point:list:read" in permission_keys
|
||||||
assert "rules:list:read" in permission_keys
|
assert "rules:list:read" in permission_keys
|
||||||
assert "rules:version_list:read" in permission_keys
|
assert "rules:version_list:read" in permission_keys
|
||||||
assert "rules:content:read" in permission_keys
|
assert "rules:content:read" in permission_keys
|
||||||
|
|||||||
Reference in New Issue
Block a user