Files
leaudit-platform-backend/fastapi_modules/fastapi_leaudit/leaudit_bridge/rules_loader.py
T
wren 535d97a70c chore: initial commit — leaudit-platform project skeleton
17-table PostgreSQL schema with full Chinese column comments,
FastAPI project structure (admin/common/modules),
DSL rule files, and schema migration scripts.
2026-04-27 16:48:22 +08:00

56 lines
1.6 KiB
Python

"""Load leaudit YAML RulesFile from filesystem or MinIO."""
from __future__ import annotations
import logging
from pathlib import Path
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from leaudit.dsl.schema import RulesFile
# Module-level logger, named after this module's import path.
log = logging.getLogger(__name__)
# Fallback rules directory used when no ``rules_dir`` is supplied: the
# "rules" folder under parents[2] of this file's resolved path, i.e. two
# directories above the directory that contains this module.
_DEFAULT_RULES_DIR = Path(__file__).resolve().parents[2] / "rules"
class RulesLoader:
    """Load leaudit ``RulesFile`` objects and cache them per loader instance.

    A single in-memory dict backs both :meth:`load` and
    :meth:`load_from_yaml_text`, so an entry stored under a key by one method
    is returned by the other when called with the same key. Entries are kept
    until :meth:`clear_cache` is called.
    """

    def __init__(self, rules_dir: str | Path | None = None) -> None:
        """Create a loader rooted at *rules_dir*.

        Args:
            rules_dir: Base directory against which relative rule paths are
                resolved. Any falsy value (``None`` or ``""``) selects the
                module-level ``_DEFAULT_RULES_DIR``.
        """
        self._rules_dir = Path(rules_dir) if rules_dir else _DEFAULT_RULES_DIR
        self._cache: dict[str, RulesFile] = {}

    def load(self, rules_path: str) -> RulesFile:
        """Return the RulesFile at *rules_path*, loading it on first use.

        Args:
            rules_path: An absolute path, or a path relative to the loader's
                rules directory. The string exactly as passed in is used as
                the cache key (it is not normalised).

        Returns:
            The cached object when *rules_path* is already in the cache,
            otherwise the result of ``leaudit.dsl.loader.load_rules_file``,
            which is stored in the cache before being returned.
        """
        if rules_path in self._cache:
            return self._cache[rules_path]

        # Imported only on a cache miss: a cache hit must not depend on the
        # leaudit package being importable, and the import stays lazy so this
        # module can be imported without pulling in the DSL loader.
        from leaudit.dsl.loader import load_rules_file

        target = Path(rules_path)
        if not target.is_absolute():
            target = self._rules_dir / target

        log.info("Loading RulesFile: %s", target)
        loaded = load_rules_file(target)
        self._cache[rules_path] = loaded
        return loaded

    def load_from_yaml_text(self, yaml_text: str, cache_key: str | None = None) -> RulesFile:
        """Parse a RulesFile from a raw YAML string, optionally caching it.

        Args:
            yaml_text: YAML text passed to
                ``leaudit.dsl.loader.parse_rules_yaml_text``. It is ignored
                when *cache_key* already has a cached entry.
            cache_key: Key under which the parsed result is looked up and
                stored. A falsy key (``None`` or ``""``) disables caching for
                this call.

        Returns:
            The cached object for *cache_key* if present, otherwise the
            freshly parsed RulesFile.
        """
        if cache_key and cache_key in self._cache:
            return self._cache[cache_key]

        # Lazy import on the miss path only, for the same reason as in load().
        from leaudit.dsl.loader import parse_rules_yaml_text

        parsed = parse_rules_yaml_text(yaml_text)
        if cache_key:
            self._cache[cache_key] = parsed
        return parsed

    def clear_cache(self) -> None:
        """Drop every cached RulesFile held by this loader."""
        self._cache.clear()