"""Load leaudit YAML RulesFile from filesystem or MinIO.""" from __future__ import annotations import logging from pathlib import Path from typing import TYPE_CHECKING if TYPE_CHECKING: from leaudit.dsl.schema import RulesFile log = logging.getLogger(__name__) _DEFAULT_RULES_DIR = Path(__file__).resolve().parents[2] / "rules" class RulesLoader: """Load and cache leaudit RulesFile from YAML files.""" def __init__(self, rules_dir: str | Path | None = None) -> None: self._rules_dir = Path(rules_dir) if rules_dir else _DEFAULT_RULES_DIR self._cache: dict[str, RulesFile] = {} def load(self, rules_path: str) -> RulesFile: """Load a RulesFile by relative path under rules_dir, or absolute path.""" from leaudit.dsl.loader import load_rules_file if rules_path in self._cache: return self._cache[rules_path] p = Path(rules_path) if not p.is_absolute(): p = self._rules_dir / p log.info("Loading RulesFile: %s", p) rules_file = load_rules_file(p) self._cache[rules_path] = rules_file return rules_file def load_from_yaml_text(self, yaml_text: str, cache_key: str | None = None) -> RulesFile: """Parse a RulesFile from raw YAML string.""" from leaudit.dsl.loader import parse_rules_yaml_text if cache_key and cache_key in self._cache: return self._cache[cache_key] rules_file = parse_rules_yaml_text(yaml_text) if cache_key: self._cache[cache_key] = rules_file return rules_file def clear_cache(self) -> None: self._cache.clear()