57 lines
2.9 KiB
Python
57 lines
2.9 KiB
Python
"""规则配置页聚合控制器。"""
|
|
|
|
from typing import Any
|
|
|
|
from fastapi import Depends, Query
|
|
from fastapi.responses import JSONResponse
|
|
|
|
from fastapi_common.fastapi_common_security.security import verify_access_token
|
|
from fastapi_common.fastapi_common_web.controller import BaseController
|
|
|
|
from fastapi_modules.fastapi_leaudit.services.impl.permissionServiceImpl import PermissionServiceImpl
|
|
from fastapi_modules.fastapi_leaudit.services.impl.ruleConfigServiceImpl import GetRuleConfigServiceSingleton
|
|
from fastapi_modules.fastapi_leaudit.services.permissionService import IPermissionService
|
|
from fastapi_modules.fastapi_leaudit.services.ruleConfigService import IRuleConfigService
|
|
|
|
|
|
class RuleConfigController(BaseController):
    """Aggregating controller for the rule-configuration page.

    Registers two GET routes under ``/v3/rule-config-packs``: one that lists
    packs (optionally as summaries) and one that returns a single pack.
    Both routes depend on ``verify_access_token`` and check the caller's
    permissions before touching the rule-config service.
    """

    # Permission key checked before listing packs.
    _LIST_PERMISSION = "evaluation_point:list:read"
    # Permission keys handed to HasAnyPermission before returning one pack.
    _CONTENT_PERMISSIONS = ["rules:content:read"]

    def __init__(self):
        """Create the router, wire up the services and register the routes.

        The route handlers are defined as closures so they can reach the
        services and permission helper through ``self``.
        """
        super().__init__(prefix="/v3/rule-config-packs", tags=["规则配置"])
        self.RuleConfigService: IRuleConfigService = GetRuleConfigServiceSingleton()
        self.PermissionService: IPermissionService = PermissionServiceImpl()

        @self.router.get("")
        async def ListRuleConfigPacks(
            summaryOnly: bool = Query(False, description="是否返回轻量规则列表摘要"),
            payload: dict[str, Any] = Depends(verify_access_token),
        ):
            """List the packs shown on the rule-configuration page."""
            # The token payload's user id is used both for the permission
            # check and as the CurrentUserId passed to the service.
            user_id = int(payload["user_id"])
            allowed = await self._check_permission(user_id, [self._LIST_PERMISSION])
            if not allowed:
                # NOTE(review): error bodies use the key "msg" while success
                # bodies use "message" -- confirm with API consumers before
                # unifying them.
                denied = {"code": 403, "msg": "当前账号没有评查点列表读取权限", "data": None}
                return JSONResponse(status_code=403, content=denied)

            # summaryOnly selects the summary listing instead of the full one.
            if summaryOnly:
                packs = await self.RuleConfigService.ListPackSummaries(CurrentUserId=user_id)
            else:
                packs = await self.RuleConfigService.ListPacks(CurrentUserId=user_id)

            body = {
                "code": 200,
                "message": "success",
                "data": [pack.model_dump() for pack in packs],
            }
            return JSONResponse(status_code=200, content=body)

        @self.router.get("/{PackId}")
        async def GetRuleConfigPack(PackId: int, payload: dict[str, Any] = Depends(verify_access_token)):
            """Fetch a single rule-configuration pack."""
            user_id = int(payload["user_id"])
            allowed = await self._check_permission(user_id, self._CONTENT_PERMISSIONS)
            if not allowed:
                denied = {"code": 403, "msg": "当前用户没有规则配置查看权限", "data": None}
                return JSONResponse(status_code=403, content=denied)

            # NOTE(review): assumes GetPack either returns a model or raises;
            # a None result would fail on model_dump() -- confirm against the
            # service implementation.
            pack = await self.RuleConfigService.GetPack(PackId, CurrentUserId=user_id)
            body = {"code": 200, "message": "success", "data": pack.model_dump()}
            return JSONResponse(status_code=200, content=body)

    async def _check_permission(self, user_id: int, permission_keys: list[str]) -> bool:
        """Return the permission service's HasAnyPermission result for the user.

        Args:
            user_id: Id of the user taken from the access-token payload.
            permission_keys: Permission keys forwarded to the permission service.
        """
        return await self.PermissionService.HasAnyPermission(user_id, permission_keys)
|