"""Build leaudit execution context from docauditai document data.

Currently leaudit's pipeline in docauditai bypasses leaudit's own
``AuditCtx`` / ``AuditService`` and calls engine modules directly.

This module encapsulates the pre-execution setup that currently lives
inlined in ``pipeline.py`` and ``tasks.py``:

- Resolve local file path (download from OSS to temp if needed)
- Determine RulesFile (from document metadata, type binding, or content
  classification)
- Prepare OCR/LLM/VLM client references
"""

from __future__ import annotations

import logging
import os
import tempfile
from dataclasses import dataclass, field
from pathlib import Path
from typing import TYPE_CHECKING

from leaudit.dsl.schema import RulesFile
from leaudit.llm.base import BaseLLMClient
from leaudit.ocr.base import BaseOCRClient

if TYPE_CHECKING:
    from leaudit.llm.vlm_base import BaseVLMClient

log = logging.getLogger(__name__)


@dataclass
class ExecutionContext:
    """Everything leaudit needs to run for one document."""

    # docauditai document ID this context belongs to.
    document_id: int
    # Local path of the document file (may point at a temporary file).
    file_path: Path
    # Rules to apply; CtxBuilder.build() may pass None through here when the
    # caller resolves the rules later.
    rules_file: RulesFile
    ocr_client: BaseOCRClient
    llm_client: BaseLLMClient | None = None
    vlm_client: object | None = None
    source_port: int = 8000
    # Set only when file_path is a temporary file that cleanup() should remove.
    tmp_path: Path | None = None
    metadata: dict = field(default_factory=dict)

    def cleanup(self) -> None:
        """Remove the temporary file if one was created.

        This is best-effort and never raises: a file that is already gone is
        ignored, and any other ``OSError`` is logged instead of propagated.
        Calling it more than once is harmless.
        """
        if self.tmp_path is None:
            return
        try:
            os.remove(self.tmp_path)
        except FileNotFoundError:
            # Already removed (e.g. by an earlier cleanup() call).
            pass
        except OSError as exc:
            log.warning(
                "Could not remove temporary file %s: %s", self.tmp_path, exc
            )


class CtxBuilder:
    """Build :class:`ExecutionContext` from docauditai document data.

    Handles the glue between docauditai's document model and leaudit's
    execution expectations — primarily file-path resolution and rules
    selection.
    """

    def __init__(
        self,
        ocr_client: BaseOCRClient | None = None,
        llm_client: BaseLLMClient | None = None,
        vlm_client: object | None = None,
    ) -> None:
        """Store the client references handed to every built context."""
        self.ocr_client = ocr_client
        self.llm_client = llm_client
        self.vlm_client = vlm_client

    async def build(
        self,
        document_id: int,
        file_path: str | Path | None = None,
        file_content: bytes | None = None,
        filename: str | None = None,
        rules_file: RulesFile | None = None,
        *,
        source_port: int = 8000,
    ) -> ExecutionContext:
        """Build a ready-to-use execution context.

        At least one of *file_path* or (*file_content* + *filename*) must be
        provided.  When both are given, *file_path* wins and no temporary
        file is created.

        Args:
            document_id: docauditai document ID.
            file_path: Existing local path to the document file.
            file_content: Raw bytes (from DB or OSS) — a temp file is created.
            filename: Required when *file_content* is given.
            rules_file: Pre-loaded RulesFile. When None, the caller must
                resolve after OCR classification.
            source_port: Instance port.

        Returns:
            ExecutionContext ready for pipeline.run().  If a temporary file
            was created, its path is recorded in ``tmp_path`` so that
            ``ExecutionContext.cleanup()`` can remove it.

        Raises:
            ValueError: If neither *file_path* nor the
                (*file_content*, *filename*) pair is supplied.
            OSError: If the temporary file cannot be created or written.
        """
        tmp_path: Path | None = None
        if file_path is not None:
            resolved = Path(file_path)
        elif file_content is not None and filename is not None:
            tmp_path = self._write_temp(file_content, self._suffix(filename))
            resolved = tmp_path
        else:
            raise ValueError(
                "Either file_path or (file_content + filename) is required"
            )

        # rules_file and ocr_client may legitimately be None at this point,
        # although ExecutionContext declares them non-optional; hence the
        # type: ignore markers.
        return ExecutionContext(
            document_id=document_id,
            file_path=resolved,
            rules_file=rules_file,  # type: ignore[arg-type]
            ocr_client=self.ocr_client,  # type: ignore[arg-type]
            llm_client=self.llm_client,
            vlm_client=self.vlm_client,
            source_port=source_port,
            tmp_path=tmp_path,
        )

    @staticmethod
    def _write_temp(content: bytes, suffix: str) -> Path:
        """Write *content* to a new persistent temp file and return its path.

        The handle is always closed.  If writing fails, the partially written
        file is removed before the error is re-raised, so a failed build does
        not leave an orphaned temp file behind.
        """
        tmp = tempfile.NamedTemporaryFile(suffix=suffix, delete=False)
        path = Path(tmp.name)
        try:
            with tmp:
                tmp.write(content)
        except BaseException:
            # The handle is closed by the ``with`` block above, so the file
            # can be removed on every platform.
            try:
                os.remove(path)
            except OSError:
                pass
            raise
        return path

    @staticmethod
    def _suffix(filename: str) -> str:
        """Return the extension of *filename*, or ``".pdf"`` if it has none."""
        _, ext = os.path.splitext(filename)
        return ext if ext else ".pdf"