Files

188 lines
6.8 KiB
Python

"""Native AuditCtx runner for the platform bridge.
This module is the target execution path after deprecating the old
platform-side hand-written pipeline orchestration.
"""
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Any
from leaudit.services.audit_ctx import AuditCtx
from fastapi_modules.fastapi_leaudit.leaudit_bridge.auditCtxBuilder import (
AuditCtxBuilder,
NativeAuditBuildInput,
NativeAuditMetadata,
)
from fastapi_modules.fastapi_leaudit.leaudit_bridge.auditServiceFactory import (
AuditServiceFactory,
NativeServiceBundle,
)
from fastapi_modules.fastapi_leaudit.leaudit_bridge.storage_adapter import StorageAdapter
@dataclass(frozen=True)
class NativeRunRequest:
    """Platform-side request payload for one native leaudit run.

    Instances are immutable (``frozen=True``). Only ``metadata`` and
    ``local_file_path`` are required; every other field is optional.
    """

    # Run metadata; forwarded to the ctx builder and echoed on the run result.
    metadata: NativeAuditMetadata
    # Path of the file to audit; handed to the ctx builder as ``file_path``.
    local_file_path: str
    # Optional rules object forwarded unchanged to the ctx builder.
    rules_file: Any | None = None
    # Rules path used for the service bundle when no override is given;
    # also forwarded to the ctx builder.
    rule_source_path: str | None = None
    # Takes precedence over ``rule_source_path`` when creating the service
    # bundle; forwarded to the ctx builder as ``force_rules_path``.
    rules_path_override: str | None = None
    # Optional page selection forwarded unchanged to the ctx builder.
    page_range: tuple[int, ...] | None = None
    # Per-run configuration overrides; ``default_factory`` gives each request
    # its own dict instead of one shared mutable default.
    config_overrides: dict[str, Any] = field(default_factory=dict)
@dataclass(frozen=True)
class NativeRunResult:
    """Result of one native ``AuditService.audit(ctx)`` execution.

    Instances are immutable (``frozen=True``).
    """

    # Context object returned by the native audit service.
    ctx: AuditCtx
    # Service bundle that was created for, and used by, this run.
    service_bundle: NativeServiceBundle
    # Metadata copied from the originating request (document/run identifiers).
    metadata: NativeAuditMetadata
class NativeRunner:
    """Bridge-side runner that delegates orchestration to native leaudit.

    ``run`` builds an ``AuditCtx`` and executes the native audit service;
    ``persist_result`` writes the outcome through the storage adapter. The
    two steps are separate so the caller decides when persistence happens.
    """

    def __init__(
        self,
        *,
        service_factory: AuditServiceFactory | None = None,
        ctx_builder: AuditCtxBuilder | None = None,
        storage_adapter: StorageAdapter | None = None,
    ) -> None:
        """Wire up collaborators, creating defaults for any that are omitted.

        Args:
            service_factory: Factory used to create the per-run service bundle.
            ctx_builder: Builder used to assemble the ``AuditCtx``.
            storage_adapter: Adapter used by ``persist_result``.
        """
        self.service_factory = service_factory or AuditServiceFactory()
        self.ctx_builder = ctx_builder or AuditCtxBuilder()
        self.storage = storage_adapter or StorageAdapter()

    async def run(self, request: NativeRunRequest) -> NativeRunResult:
        """Execute one native leaudit run and return the populated ctx.

        Persistence is intentionally not mixed into the orchestration step.
        The caller can choose when to persist the final ctx to platform tables.

        Args:
            request: Description of the document, rules and overrides to run.

        Returns:
            The ctx returned by the audit service, together with the service
            bundle that produced it and the request's metadata.
        """
        # An explicit override wins over the regular rule source path.
        bundle = self.service_factory.create_bundle(
            rules_path=request.rules_path_override or request.rule_source_path
        )
        ctx = self.ctx_builder.build(
            NativeAuditBuildInput(
                metadata=request.metadata,
                file_path=request.local_file_path,
                services=bundle.audit_services,
                rules_file=request.rules_file,
                page_range=request.page_range,
                rule_source_path=request.rule_source_path,
                force_rules_path=request.rules_path_override,
                config_overrides=request.config_overrides,
            )
        )
        ctx = await bundle.audit_service.audit(ctx)
        return NativeRunResult(
            ctx=ctx,
            service_bundle=bundle,
            metadata=request.metadata,
        )

    async def persist_result(self, result: NativeRunResult) -> None:
        """Persist a native run into platform-owned ``leaudit_*`` tables.

        Storage calls are issued in a fixed order: OCR result, extraction
        result, evaluation results, extraction warnings, rescue outcomes,
        run metrics, and finally run finalisation. Each artifact is only
        written when the corresponding ctx attribute is present.

        Args:
            result: The outcome of a previous ``run`` call.
        """
        document_id = result.metadata.document_id
        run_id = result.metadata.run_id
        ctx = result.ctx

        # Normalise optional collections once so that a missing (``None``)
        # value cannot crash the metrics/finalisation steps after earlier
        # artifacts have already been written.
        fallback_tasks = ctx.fallback_tasks or []
        extraction_errors = self._collect_extraction_errors(ctx)

        if ctx.normalized_doc is not None:
            await self.storage.save_ocr_result(
                document_id,
                ctx.normalized_doc,
                run_id=run_id,
            )

        if ctx.extraction is not None:
            await self.storage.save_extraction_result(
                document_id,
                ctx.extraction,
                ocr_result=ctx.normalized_doc,
                run_id=run_id,
            )

        # Evaluation rows need the rules file and the extraction as well.
        if (
            ctx.evaluation is not None
            and ctx.rules_file is not None
            and ctx.extraction is not None
        ):
            await self.storage.save_evaluation_results(
                document_id,
                ctx.rules_file,
                ctx.evaluation,
                ctx.extraction,
                ocr_result=ctx.normalized_doc,
                run_id=run_id,
                rule_version_id=result.metadata.rule_version_id,
            )

        if extraction_errors:
            await self.storage.save_run_errors(
                document_id,
                run_id=run_id,
                stage=ctx.phase or "extract",
                messages=extraction_errors,
                level="warning",
                error_code="EXTRACTION_WARNING",
            )

        if fallback_tasks:
            await self.storage.save_rescue_outcomes(
                document_id,
                run_id=run_id,
                tasks=fallback_tasks,
            )

        page_count = (
            len(ctx.normalized_doc.pages) if ctx.normalized_doc is not None else None
        )
        # ``sub_documents`` may be absent on the extraction object.
        sub_documents = getattr(ctx.extraction, "sub_documents", None)
        sub_document_count = len(sub_documents) if sub_documents else 0
        field_count = len(ctx.extraction.fields) if ctx.extraction is not None else 0
        # Prefer the evaluated rule count; fall back to the rules file.
        if ctx.evaluation is not None:
            rule_count = len(ctx.evaluation.rules)
        elif ctx.rules_file is not None:
            rule_count = len(ctx.rules_file.flat_rules)
        else:
            rule_count = 0

        await self.storage.save_run_metrics(
            document_id,
            run_id=run_id,
            timing=dict(ctx.timing or {}),
            page_count=page_count,
            sub_document_count=sub_document_count,
            field_count=field_count,
            rule_count=rule_count,
            rescue_rule_count=len(fallback_tasks),
            artifact_count=self._estimate_artifact_count(ctx),
        )

        # Any rescue task that needs a human forces the run into "review".
        if any(task.requires_human_review for task in fallback_tasks):
            result_status = "review"
        else:
            result_status = self._resolve_result_status(ctx)

        await self.storage.finalize_run(
            document_id,
            run_id=run_id,
            result_status=result_status,
            rescue_applied=bool(fallback_tasks),
            phase=ctx.phase,
            finished=True,
        )

    def _collect_extraction_errors(self, ctx: AuditCtx) -> list[Any]:
        """Return ctx-level extraction errors, else the extraction's own errors."""
        errors = list(ctx.extraction_errors or ())
        if not errors and ctx.extraction is not None:
            errors = list(ctx.extraction.all_errors or ())
        return errors

    def _estimate_artifact_count(self, ctx: AuditCtx) -> int:
        """Roughly estimate how many platform artifacts this run has produced.

        Counts one each for the normalized document, the extraction and the
        evaluation when present, plus one per fallback task.
        """
        count = sum(
            artifact is not None
            for artifact in (ctx.normalized_doc, ctx.extraction, ctx.evaluation)
        )
        if ctx.fallback_tasks:
            count += len(ctx.fallback_tasks)
        return count

    def _resolve_result_status(self, ctx: AuditCtx) -> str:
        """Derive the run status from the native ``AuditCtx`` result.

        Returns:
            ``"error"`` when there is no evaluation or it reported errors,
            ``"pass"`` when nothing failed and nothing was skipped,
            ``"fail"`` when at least one rule failed, otherwise ``"partial"``.
        """
        evaluation = ctx.evaluation
        if evaluation is None or evaluation.errors:
            return "error"
        if evaluation.failed_count == 0 and evaluation.skipped_count == 0:
            return "pass"
        if evaluation.failed_count > 0:
            return "fail"
        return "partial"
# Public API of this module: the request/result payloads and the runner itself.
__all__ = ["NativeRunRequest", "NativeRunResult", "NativeRunner"]