246c0e5ded
- M1: unified OSS client (upload/download/presign) + path utils + config - M2: rule service with validate/create/publish/rollback + binding CRUD endpoints - M3: native AuditCtx runner, file/rule resolvers, storage adapter with full persistence - docs: SYSTEM_OVERVIEW.md as comprehensive architecture reference - fix: double finalize — terminal state now written once by finalize_run
87 lines
3.0 KiB
Python
87 lines
3.0 KiB
Python
"""规则 YAML 校验器。"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import importlib
|
|
import sys
|
|
from dataclasses import dataclass
|
|
|
|
import yaml
|
|
from pydantic import ValidationError
|
|
|
|
from leaudit.dsl.loader import parse_rules_yaml_text
|
|
from leaudit.dsl.validator import DSLValidationError, validate as validate_rules
|
|
|
|
|
|
@dataclass(frozen=True)
|
|
class RuleValidationPayload:
|
|
"""规则校验结果。"""
|
|
|
|
valid: bool
|
|
ruleType: str | None = None
|
|
ruleName: str | None = None
|
|
versionNo: str | None = None
|
|
ruleCount: int = 0
|
|
extractCount: int = 0
|
|
errors: list[str] | None = None
|
|
|
|
|
|
class RuleValidator:
|
|
"""负责规则 YAML 的语法与 DSL 语义校验。"""
|
|
|
|
_CHECK_MODULES = (
|
|
"leaudit.engine.checks.required",
|
|
"leaudit.engine.checks.compare",
|
|
"leaudit.engine.checks.format_check",
|
|
"leaudit.engine.checks.text",
|
|
"leaudit.engine.checks.multi_entity",
|
|
"leaudit.engine.checks.visual",
|
|
"leaudit.engine.checks.external",
|
|
"leaudit.engine.checks.assert_check",
|
|
"leaudit.engine.checks.code_check",
|
|
"leaudit.engine.checks.ai_check",
|
|
)
|
|
|
|
def ValidateYaml(self, YamlText: str) -> RuleValidationPayload:
|
|
"""校验 YAML 并返回摘要结果。"""
|
|
try:
|
|
RulesFile = parse_rules_yaml_text(YamlText)
|
|
self._EnsureChecksImported()
|
|
validate_rules(RulesFile, registered_primitives=None)
|
|
except yaml.YAMLError as Error:
|
|
return RuleValidationPayload(valid=False, errors=[f"YAML 语法错误: {Error}"])
|
|
except ValidationError as Error:
|
|
return RuleValidationPayload(valid=False, errors=[f"Schema 校验失败: {Error}"])
|
|
except DSLValidationError as Error:
|
|
return RuleValidationPayload(valid=False, errors=[f"DSL 校验失败: {Error}"])
|
|
except Exception as Error:
|
|
return RuleValidationPayload(valid=False, errors=[f"规则校验失败: {Error}"])
|
|
|
|
return RuleValidationPayload(
|
|
valid=True,
|
|
ruleType=RulesFile.metadata.type_id,
|
|
ruleName=RulesFile.metadata.name,
|
|
versionNo=RulesFile.metadata.version,
|
|
ruleCount=len(RulesFile.flat_rules),
|
|
extractCount=len(RulesFile.flat_extract),
|
|
errors=[],
|
|
)
|
|
|
|
def ParseValidated(self, YamlText: str):
|
|
"""解析并返回已通过完整校验的 RulesFile。"""
|
|
Validation = self.ValidateYaml(YamlText)
|
|
if not Validation.valid:
|
|
raise ValueError("; ".join(Validation.errors or ["规则校验失败"]))
|
|
return parse_rules_yaml_text(YamlText)
|
|
|
|
def _EnsureChecksImported(self) -> None:
|
|
"""确保所有检查器模块已注册。"""
|
|
for ModuleName in self._CHECK_MODULES:
|
|
try:
|
|
if ModuleName in sys.modules:
|
|
importlib.reload(sys.modules[ModuleName])
|
|
else:
|
|
importlib.import_module(ModuleName)
|
|
except Exception:
|
|
continue
|