246c0e5ded
- M1: unified OSS client (upload/download/presign) + path utils + config - M2: rule service with validate/create/publish/rollback + binding CRUD endpoints - M3: native AuditCtx runner, file/rule resolvers, storage adapter with full persistence - docs: SYSTEM_OVERVIEW.md as comprehensive architecture reference - fix: double finalize — terminal state now written once by finalize_run
76 lines
2.3 KiB
Python
76 lines
2.3 KiB
Python
"""Build native leaudit ``AuditCtx`` instances from platform-side inputs."""
|
|
|
|
from __future__ import annotations
|
|
|
|
from dataclasses import dataclass, field
|
|
from typing import Any
|
|
|
|
from leaudit.config.audit_config import AuditConfig
|
|
from leaudit.services.audit_ctx import AuditCtx
|
|
from leaudit.services.audit_services import AuditServices
|
|
|
|
|
|
@dataclass(frozen=True)
|
|
class NativeAuditMetadata:
|
|
"""Platform-side metadata kept outside the native ``AuditCtx`` model."""
|
|
|
|
run_id: int
|
|
document_id: int
|
|
document_file_id: int | None = None
|
|
rule_set_id: int | None = None
|
|
rule_version_id: int | None = None
|
|
trigger_user_id: int | None = None
|
|
extras: dict[str, Any] = field(default_factory=dict)
|
|
|
|
|
|
@dataclass(frozen=True)
|
|
class NativeAuditBuildInput:
|
|
"""Everything the bridge knows before constructing a native ``AuditCtx``."""
|
|
|
|
metadata: NativeAuditMetadata
|
|
file_path: str
|
|
services: AuditServices
|
|
rules_file: Any | None = None
|
|
page_range: tuple[int, ...] | None = None
|
|
rule_source_path: str | None = None
|
|
force_rules_path: str | None = None
|
|
config_overrides: dict[str, Any] = field(default_factory=dict)
|
|
|
|
|
|
class AuditCtxBuilder:
|
|
"""Translate platform-side run inputs into leaudit's native ``AuditCtx``."""
|
|
|
|
def build(self, payload: NativeAuditBuildInput) -> AuditCtx:
|
|
"""Create a native ``AuditCtx`` ready for ``AuditService.audit``."""
|
|
config = self.build_config(
|
|
force_rules_path=payload.force_rules_path or payload.rule_source_path,
|
|
overrides=payload.config_overrides,
|
|
)
|
|
return AuditCtx(
|
|
document_id=str(payload.metadata.document_id),
|
|
rules_file=payload.rules_file,
|
|
services=payload.services,
|
|
file_path=payload.file_path,
|
|
page_range=payload.page_range,
|
|
config=config,
|
|
)
|
|
|
|
def build_config(
|
|
self,
|
|
*,
|
|
force_rules_path: str | None = None,
|
|
overrides: dict[str, Any] | None = None,
|
|
) -> AuditConfig:
|
|
"""Build native ``AuditConfig`` from platform-side overrides."""
|
|
raw = dict(overrides or {})
|
|
if force_rules_path and "force_rules_path" not in raw:
|
|
raw["force_rules_path"] = force_rules_path
|
|
return AuditConfig(**raw)
|
|
|
|
|
|
__all__ = [
|
|
"AuditCtxBuilder",
|
|
"NativeAuditBuildInput",
|
|
"NativeAuditMetadata",
|
|
]
|