Files
leaudit-platform-backend/fastapi_modules/fastapi_leaudit/leaudit_bridge/ruleValidator.py
T
wren 246c0e5ded feat: complete M1-M3 infrastructure — OSS client, native execution chain, rule lifecycle API, system docs
- M1: unified OSS client (upload/download/presign) + path utils + config
- M2: rule service with validate/create/publish/rollback + binding CRUD endpoints
- M3: native AuditCtx runner, file/rule resolvers, storage adapter with full persistence
- docs: SYSTEM_OVERVIEW.md as comprehensive architecture reference
- fix: double finalize — terminal state now written once by finalize_run
2026-04-28 11:49:55 +08:00

87 lines
3.0 KiB
Python

"""规则 YAML 校验器。"""
from __future__ import annotations
import importlib
import sys
from dataclasses import dataclass
import yaml
from pydantic import ValidationError
from leaudit.dsl.loader import parse_rules_yaml_text
from leaudit.dsl.validator import DSLValidationError, validate as validate_rules
@dataclass(frozen=True)
class RuleValidationPayload:
"""规则校验结果。"""
valid: bool
ruleType: str | None = None
ruleName: str | None = None
versionNo: str | None = None
ruleCount: int = 0
extractCount: int = 0
errors: list[str] | None = None
class RuleValidator:
"""负责规则 YAML 的语法与 DSL 语义校验。"""
_CHECK_MODULES = (
"leaudit.engine.checks.required",
"leaudit.engine.checks.compare",
"leaudit.engine.checks.format_check",
"leaudit.engine.checks.text",
"leaudit.engine.checks.multi_entity",
"leaudit.engine.checks.visual",
"leaudit.engine.checks.external",
"leaudit.engine.checks.assert_check",
"leaudit.engine.checks.code_check",
"leaudit.engine.checks.ai_check",
)
def ValidateYaml(self, YamlText: str) -> RuleValidationPayload:
"""校验 YAML 并返回摘要结果。"""
try:
RulesFile = parse_rules_yaml_text(YamlText)
self._EnsureChecksImported()
validate_rules(RulesFile, registered_primitives=None)
except yaml.YAMLError as Error:
return RuleValidationPayload(valid=False, errors=[f"YAML 语法错误: {Error}"])
except ValidationError as Error:
return RuleValidationPayload(valid=False, errors=[f"Schema 校验失败: {Error}"])
except DSLValidationError as Error:
return RuleValidationPayload(valid=False, errors=[f"DSL 校验失败: {Error}"])
except Exception as Error:
return RuleValidationPayload(valid=False, errors=[f"规则校验失败: {Error}"])
return RuleValidationPayload(
valid=True,
ruleType=RulesFile.metadata.type_id,
ruleName=RulesFile.metadata.name,
versionNo=RulesFile.metadata.version,
ruleCount=len(RulesFile.flat_rules),
extractCount=len(RulesFile.flat_extract),
errors=[],
)
def ParseValidated(self, YamlText: str):
"""解析并返回已通过完整校验的 RulesFile。"""
Validation = self.ValidateYaml(YamlText)
if not Validation.valid:
raise ValueError("; ".join(Validation.errors or ["规则校验失败"]))
return parse_rules_yaml_text(YamlText)
def _EnsureChecksImported(self) -> None:
"""确保所有检查器模块已注册。"""
for ModuleName in self._CHECK_MODULES:
try:
if ModuleName in sys.modules:
importlib.reload(sys.modules[ModuleName])
else:
importlib.import_module(ModuleName)
except Exception:
continue