fix: 统一规则配置命名与权限校验 #17

Merged
TanWenyan merged 3 commits from wren-dev into main 2026-05-25 14:55:17 +08:00
10 changed files with 177 additions and 15 deletions
@@ -17,6 +17,9 @@ from fastapi_modules.fastapi_leaudit.services.ruleConfigService import IRuleConf
class RuleConfigController(BaseController):
"""规则配置页聚合控制器。"""
_LIST_PERMISSION = "evaluation_point:list:read"
_CONTENT_PERMISSIONS = ["rules:content:read"]
def __init__(self):
super().__init__(prefix="/v3/rule-config-packs", tags=["规则配置"])
self.RuleConfigService: IRuleConfigService = GetRuleConfigServiceSingleton()
@@ -28,8 +31,8 @@ class RuleConfigController(BaseController):
payload: dict[str, Any] = Depends(verify_access_token),
):
"""列出规则配置页 pack。"""
if not await self._check_permission(int(payload["user_id"])):
return JSONResponse(status_code=403, content={"code": 403, "msg": "当前用户没有规则配置查看权限", "data": None})
if not await self._check_permission(int(payload["user_id"]), [self._LIST_PERMISSION]):
return JSONResponse(status_code=403, content={"code": 403, "msg": "当前账号没有评查点列表读取权限", "data": None})
current_user_id = int(payload["user_id"])
data = await (
self.RuleConfigService.ListPackSummaries(CurrentUserId=current_user_id)
@@ -41,13 +44,13 @@ class RuleConfigController(BaseController):
@self.router.get("/{PackId}")
async def GetRuleConfigPack(PackId: int, payload: dict[str, Any] = Depends(verify_access_token)):
"""获取单个规则配置 pack。"""
if not await self._check_permission(int(payload["user_id"])):
if not await self._check_permission(int(payload["user_id"]), self._CONTENT_PERMISSIONS):
return JSONResponse(status_code=403, content={"code": 403, "msg": "当前用户没有规则配置查看权限", "data": None})
data = await self.RuleConfigService.GetPack(PackId, CurrentUserId=int(payload["user_id"]))
return JSONResponse(status_code=200, content={"code": 200, "message": "success", "data": data.model_dump()})
async def _check_permission(self, user_id: int) -> bool:
async def _check_permission(self, user_id: int, permission_keys: list[str]) -> bool:
return await self.PermissionService.HasAnyPermission(
user_id,
["rules:list:read", "rules:content:read", "evaluation_group:list:read"],
permission_keys,
)
@@ -1,4 +1,4 @@
"""规则管理控制器。"""
"""规则配置控制器。"""
from typing import Any
@@ -30,10 +30,10 @@ from fastapi_modules.fastapi_leaudit.services.permissionService import IPermissi
class RuleController(BaseController):
"""规则管理控制器。"""
"""规则配置控制器。"""
def __init__(self):
super().__init__(prefix="/rule-sets", tags=["规则管理"])
super().__init__(prefix="/rule-sets", tags=["规则配置"])
self.RuleService: IRuleService = GetRuleServiceSingleton()
self.PermissionService: IPermissionService = PermissionServiceImpl()
self._PERMISSIONS = {
@@ -245,7 +245,7 @@ class RbacAdminServiceImpl(IRbacAdminService):
"route_path": "/rules",
"route_name": "rule-management",
"component": "rules",
"route_title": "规则管理",
"route_title": "规则配置",
"icon": "ri-book-3-line",
"sort_order": 70,
"is_hidden": False,
@@ -363,6 +363,7 @@ class RbacAdminServiceImpl(IRbacAdminService):
{"permission_key": "contract_template:create:write", "display_name": "上传合同模板", "module": "contract_template", "resource": "create", "action": "write", "api_method": "POST", "api_path": "/api/v3/contract-templates", "route_path": "/contract-template/list"},
{"permission_key": "contract_template:update:write", "display_name": "更新合同模板", "module": "contract_template", "resource": "update", "action": "write", "api_method": "PUT", "api_path": "/api/v3/contract-templates/{id}", "route_path": "/contract-template/list"},
{"permission_key": "contract_template:delete:delete", "display_name": "删除合同模板", "module": "contract_template", "resource": "delete", "action": "delete", "api_method": "DELETE", "api_path": "/api/v3/contract-templates/{id}", "route_path": "/contract-template/list"},
{"permission_key": "evaluation_point:list:read", "display_name": "评查点列表", "module": "evaluation_point", "resource": "list", "action": "read", "api_method": "GET", "api_path": "/api/v3/rule-config-packs", "route_path": "/rules"},
{"permission_key": "evaluation_group:list:read", "display_name": "评查点分组列表", "module": "evaluation_group", "resource": "list", "action": "read", "api_method": "GET", "api_path": "/api/v3/evaluation-point-groups", "route_path": "/rules"},
{"permission_key": "evaluation_group:create:write", "display_name": "创建评查点分组", "module": "evaluation_group", "resource": "create", "action": "write", "api_method": "POST", "api_path": "/api/v3/evaluation-point-groups", "route_path": "/rules"},
{"permission_key": "evaluation_group:update:write", "display_name": "更新评查点分组与绑定", "module": "evaluation_group", "resource": "update", "action": "write", "api_method": "PUT", "api_path": "/api/v3/evaluation-point-groups/{id}", "route_path": "/rules"},
@@ -159,7 +159,7 @@ class RbacServiceImpl(IRbacService):
"route_name": "rule-management",
"component": "rules",
"parent_id": None,
"route_title": "规则管理",
"route_title": "规则配置",
"icon": "ri-book-3-line",
"sort_order": 5,
"is_hidden": False,
@@ -489,7 +489,7 @@ class RbacServiceImpl(IRbacService):
"route_name": "rule-management",
"component": "rules",
"parent_id": None,
"route_title": "规则管理",
"route_title": "规则配置",
"icon": "ri-book-3-line",
"sort_order": 5,
"is_hidden": False,
Submodule legal-platform-frontend updated: 5d6e6af4a2...469de25dc8
@@ -102,7 +102,7 @@ seed(role_key, permission_key, grant_type, data_scope) AS (
('provincial_admin', 'govdoc:settings:read', 'GRANT', 'ALL'),
('provincial_admin', 'govdoc:settings:update', 'GRANT', 'ALL'),
-- admin: 模块读写 + 规则查看,不含规则管理与配置修改
-- admin: 模块读写 + 规则查看,不含规则配置与配置修改
('admin', 'govdoc:module:read', 'GRANT', 'REGION'),
('admin', 'govdoc:document:create', 'GRANT', 'REGION'),
('admin', 'govdoc:document:read', 'GRANT', 'REGION'),
@@ -135,4 +135,4 @@ ON CONFLICT (role_id, permission_id) DO UPDATE SET
data_scope = EXCLUDED.data_scope,
updated_at = NOW();
COMMIT;
COMMIT;
+1 -1
View File
@@ -34,7 +34,7 @@ VALUES
('/documents/list', 'documents.list', 'documents/list', NULL, '文档列表', 'table', 11, FALSE, TRUE, '{"group":"documents"}'::jsonb, 0, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP, NULL),
('/audit', 'audit', 'Layout', NULL, '评查任务', 'audit', 20, FALSE, TRUE, '{"group":"audit"}'::jsonb, 0, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP, NULL),
('/audit/runs', 'audit.runs', 'audit/runs', NULL, '评查运行', 'history', 21, FALSE, TRUE, '{"group":"audit"}'::jsonb, 0, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP, NULL),
('/rules', 'rules', 'Layout', NULL, '规则管理', 'rule', 30, FALSE, TRUE, '{"group":"rules"}'::jsonb, 0, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP, NULL),
('/rules', 'rules', 'Layout', NULL, '规则配置', 'rule', 30, FALSE, TRUE, '{"group":"rules"}'::jsonb, 0, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP, NULL),
('/system', 'system', 'Layout', NULL, '系统管理', 'setting', 90, FALSE, TRUE, '{"group":"system"}'::jsonb, 0, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP, NULL),
('/system/users', 'system.users', 'system/users', NULL, '用户管理', 'user', 91, FALSE, TRUE, '{"group":"system"}'::jsonb, 0, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP, NULL),
('/system/roles', 'system.roles', 'system/roles', NULL, '角色权限', 'shield', 92, FALSE, TRUE, '{"group":"system"}'::jsonb, 0, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP, NULL),
+27
View File
@@ -1,6 +1,8 @@
"""首页入口可见性测试。"""
from fastapi_modules.fastapi_leaudit.services.impl.entryModuleAdminServiceImpl import EntryModuleAdminServiceImpl
from fastapi_modules.fastapi_leaudit.services.impl.homeServiceImpl import HomeServiceImpl
from fastapi_modules.fastapi_leaudit.services.impl.rbacServiceImpl import RbacServiceImpl
def test_document_entry_targets_are_visible_without_file_management_routes():
@@ -19,3 +21,28 @@ def test_non_document_entry_targets_still_require_route_grant():
assert service._isAllowedTargetPath("/tenants", set()) is False
assert service._isAllowedTargetPath("/cross-checking", set()) is False
assert service._isAllowedTargetPath("/cross-checking", {"/cross-checking"}) is True
def test_govdoc_home_features_use_rule_config_for_legacy_rule_groups():
"""内部公文旧规则分组功能编码应兼容到规则配置。"""
service = HomeServiceImpl()
assert service._parseFeatures(["home", "rule_groups"], "govdoc") == ["home", "rules"]
assert service._parseFeatures([], "govdoc") == ["home", "govdoc_audits", "govdoc_upload", "rules"]
def test_govdoc_admin_features_use_rule_config_for_legacy_rule_groups():
"""入口模块管理读写内部公文功能时兼容旧规则分组编码。"""
service = EntryModuleAdminServiceImpl()
assert service._parseFeatures(["home", "rule_groups"], "govdoc") == ["home", "rules"]
assert service._normalizeFeatures(["rule_groups"], "govdoc") == ["rules"]
def test_rbac_compat_routes_use_rule_config_title():
"""RBAC 兼容菜单里的 /rules 标题统一为规则配置。"""
service = RbacServiceImpl()
routes = service._buildCompatibilityRoutes(["admin"], {"rules:list:read"})
rules_route = next(route for route in routes if route.route_path == "/rules")
assert rules_route.route_title == "规则配置"
+130
View File
@@ -0,0 +1,130 @@
"""规则配置列表权限控制测试。"""
import pytest
from starlette.responses import JSONResponse
from fastapi_modules.fastapi_leaudit.controllers.ruleConfigController import RuleConfigController
from fastapi_modules.fastapi_leaudit.services.impl.rbacAdminServiceImpl import RbacAdminServiceImpl
class _DenyPermissionService:
"""拒绝所有权限的测试权限服务。"""
async def CheckPermission(self, user_id: int, permission_key: str) -> bool:
"""检查单个权限。"""
return False
async def HasAnyPermission(self, UserId: int, PermissionKeys: list[str]) -> bool:
"""检查任一权限。"""
return False
class _EvaluationPointListOnlyPermissionService:
"""只允许评查点列表读取权限。"""
async def CheckPermission(self, user_id: int, permission_key: str) -> bool:
"""检查单个权限。"""
return permission_key == "evaluation_point:list:read"
async def HasAnyPermission(self, UserId: int, PermissionKeys: list[str]) -> bool:
"""检查任一权限。"""
return "evaluation_point:list:read" in PermissionKeys
class _RulesListOnlyPermissionService:
"""只允许规则配置列表权限。"""
async def CheckPermission(self, user_id: int, permission_key: str) -> bool:
"""检查单个权限。"""
return permission_key == "rules:list:read"
async def HasAnyPermission(self, UserId: int, PermissionKeys: list[str]) -> bool:
"""检查任一权限。"""
return "rules:list:read" in PermissionKeys
class _FakeRuleConfigService:
"""记录调用的规则配置服务。"""
def __init__(self) -> None:
self.summary_called = False
async def ListPackSummaries(self, CurrentUserId: int):
"""记录轻量列表调用。"""
self.summary_called = True
return []
async def ListPacks(self, CurrentUserId: int):
"""记录完整列表调用。"""
return []
async def GetPack(self, PackId: int, CurrentUserId: int):
"""不用于本测试。"""
raise AssertionError("GetPack should not be called")
def _find_endpoint(controller: RuleConfigController, path: str, method: str):
"""根据路径和方法查找路由 endpoint。"""
full_path = f"{controller.router.prefix}{path}"
for route in controller.router.routes:
if getattr(route, "path", "") == full_path and method in getattr(route, "methods", set()):
return route.endpoint
raise AssertionError(f"未找到路由 {method} {full_path}")
@pytest.mark.asyncio
async def test_rule_config_pack_list_requires_evaluation_point_list_read_permission():
"""无评查点列表读取权限时返回 403,且不加载评查点规则数据。"""
controller = RuleConfigController()
service = _FakeRuleConfigService()
controller.RuleConfigService = service
controller.PermissionService = _DenyPermissionService()
endpoint = _find_endpoint(controller, "", "GET")
response = await endpoint(summaryOnly=True, payload={"user_id": 7})
assert isinstance(response, JSONResponse)
assert response.status_code == 403
assert service.summary_called is False
@pytest.mark.asyncio
async def test_rule_config_pack_list_rejects_rules_list_without_evaluation_point_list_read():
"""只有规则配置列表权限不能读取评查点列表数据。"""
controller = RuleConfigController()
service = _FakeRuleConfigService()
controller.RuleConfigService = service
controller.PermissionService = _RulesListOnlyPermissionService()
endpoint = _find_endpoint(controller, "", "GET")
response = await endpoint(summaryOnly=True, payload={"user_id": 7})
assert isinstance(response, JSONResponse)
assert response.status_code == 403
assert service.summary_called is False
@pytest.mark.asyncio
async def test_rule_config_pack_list_calls_service_with_evaluation_point_list_read_permission():
"""有评查点列表读取权限时正常加载规则配置摘要。"""
controller = RuleConfigController()
service = _FakeRuleConfigService()
controller.RuleConfigService = service
controller.PermissionService = _EvaluationPointListOnlyPermissionService()
endpoint = _find_endpoint(controller, "", "GET")
response = await endpoint(summaryOnly=True, payload={"user_id": 7})
assert response.status_code == 200
assert service.summary_called is True
def test_rbac_manageable_permissions_include_evaluation_point_list_read():
"""角色权限管理中 /rules 必须包含评查点列表读取权限。"""
permission_keys = {
item["permission_key"]
for item in RbacAdminServiceImpl._MANAGEABLE_PERMISSION_BLUEPRINTS
if item["route_path"] == "/rules"
}
assert "evaluation_point:list:read" in permission_keys
+1
View File
@@ -169,6 +169,7 @@ def test_rbac_manageable_permissions_include_rule_version_lifecycle():
if item["route_path"] == "/rules"
}
assert "evaluation_point:list:read" in permission_keys
assert "rules:list:read" in permission_keys
assert "rules:version_list:read" in permission_keys
assert "rules:content:read" in permission_keys