535d97a70c
17-table PostgreSQL schema with full Chinese column comments, FastAPI project structure (admin/common/modules), DSL rule files, and schema migration scripts.
56 lines
1.6 KiB
Python
56 lines
1.6 KiB
Python
"""Load leaudit YAML RulesFile from filesystem or MinIO."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
from pathlib import Path
|
|
from typing import TYPE_CHECKING
|
|
|
|
if TYPE_CHECKING:
|
|
from leaudit.dsl.schema import RulesFile
|
|
|
|
log = logging.getLogger(__name__)
|
|
|
|
_DEFAULT_RULES_DIR = Path(__file__).resolve().parents[2] / "rules"
|
|
|
|
|
|
class RulesLoader:
|
|
"""Load and cache leaudit RulesFile from YAML files."""
|
|
|
|
def __init__(self, rules_dir: str | Path | None = None) -> None:
|
|
self._rules_dir = Path(rules_dir) if rules_dir else _DEFAULT_RULES_DIR
|
|
self._cache: dict[str, RulesFile] = {}
|
|
|
|
def load(self, rules_path: str) -> RulesFile:
|
|
"""Load a RulesFile by relative path under rules_dir, or absolute path."""
|
|
from leaudit.dsl.loader import load_rules_file
|
|
|
|
if rules_path in self._cache:
|
|
return self._cache[rules_path]
|
|
|
|
p = Path(rules_path)
|
|
if not p.is_absolute():
|
|
p = self._rules_dir / p
|
|
|
|
log.info("Loading RulesFile: %s", p)
|
|
rules_file = load_rules_file(p)
|
|
self._cache[rules_path] = rules_file
|
|
return rules_file
|
|
|
|
def load_from_yaml_text(self, yaml_text: str, cache_key: str | None = None) -> RulesFile:
|
|
"""Parse a RulesFile from raw YAML string."""
|
|
from leaudit.dsl.loader import parse_rules_yaml_text
|
|
|
|
if cache_key and cache_key in self._cache:
|
|
return self._cache[cache_key]
|
|
|
|
rules_file = parse_rules_yaml_text(yaml_text)
|
|
|
|
if cache_key:
|
|
self._cache[cache_key] = rules_file
|
|
|
|
return rules_file
|
|
|
|
def clear_cache(self) -> None:
|
|
self._cache.clear()
|