246c0e5ded
- M1: unified OSS client (upload/download/presign) + path utils + config - M2: rule service with validate/create/publish/rollback + binding CRUD endpoints - M3: native AuditCtx runner, file/rule resolvers, storage adapter with full persistence - docs: SYSTEM_OVERVIEW.md as comprehensive architecture reference - fix: double finalize — terminal state now written once by finalize_run
186 lines
6.7 KiB
Python
186 lines
6.7 KiB
Python
"""Native AuditCtx runner for the platform bridge.
|
|
|
|
This module is the intended execution path now that the old, hand-written
platform-side pipeline orchestration is deprecated.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
from dataclasses import dataclass, field
|
|
from typing import Any
|
|
|
|
from leaudit.services.audit_ctx import AuditCtx
|
|
|
|
from fastapi_modules.fastapi_leaudit.leaudit_bridge.auditCtxBuilder import (
|
|
AuditCtxBuilder,
|
|
NativeAuditBuildInput,
|
|
NativeAuditMetadata,
|
|
)
|
|
from fastapi_modules.fastapi_leaudit.leaudit_bridge.auditServiceFactory import (
|
|
AuditServiceFactory,
|
|
NativeServiceBundle,
|
|
)
|
|
from fastapi_modules.fastapi_leaudit.leaudit_bridge.storage_adapter import StorageAdapter
|
|
|
|
|
|
@dataclass(frozen=True)
class NativeRunRequest:
    """Immutable platform-side description of a single native leaudit run.

    Only ``metadata`` and ``local_file_path`` are required; every other
    field is optional and defaults to "not provided".
    """

    # Run/document identity forwarded to the ctx builder and persistence.
    metadata: NativeAuditMetadata
    # Path of the document on the local filesystem to audit.
    local_file_path: str
    # Pre-loaded rules object, if the caller already has one.
    rules_file: Any | None = field(default=None)
    # Path the rules were sourced from.
    rule_source_path: str | None = field(default=None)
    # Explicit rules path; when set it is preferred over ``rule_source_path``
    # by ``NativeRunner.run`` when creating the service bundle.
    rules_path_override: str | None = field(default=None)
    # Optional subset of pages to process.
    page_range: tuple[int, ...] | None = field(default=None)
    # Extra config values; each instance gets its own fresh dict by default.
    config_overrides: dict[str, Any] = field(default_factory=dict)
|
|
|
|
|
|
@dataclass(frozen=True)
class NativeRunResult:
    """Immutable outcome of one native ``AuditService.audit(ctx)`` call.

    Bundles the ctx returned by the audit service together with the service
    bundle that produced it and the request metadata, so the result can be
    persisted later without re-deriving any of them.
    """

    # The ctx as returned by ``AuditService.audit``.
    ctx: AuditCtx
    # Services used for this run.
    service_bundle: NativeServiceBundle
    # Metadata copied from the originating request.
    metadata: NativeAuditMetadata
|
|
|
|
|
|
class NativeRunner:
    """Bridge-side runner that delegates orchestration to native leaudit.

    ``run`` builds an ``AuditCtx`` and hands it to the native audit service;
    ``persist_result`` writes the finished ctx to platform storage. The two
    steps are kept separate so callers decide when to persist.
    """

    def __init__(
        self,
        *,
        service_factory: AuditServiceFactory | None = None,
        ctx_builder: AuditCtxBuilder | None = None,
        storage_adapter: StorageAdapter | None = None,
    ) -> None:
        """Wire collaborators, constructing defaults for any that are omitted.

        Args:
            service_factory: Creates the service bundle used for a run.
            ctx_builder: Builds the ``AuditCtx`` passed to the audit service.
            storage_adapter: Receives all persistence calls.
        """
        self.service_factory = service_factory or AuditServiceFactory()
        self.ctx_builder = ctx_builder or AuditCtxBuilder()
        self.storage = storage_adapter or StorageAdapter()

    async def run(self, request: NativeRunRequest) -> NativeRunResult:
        """Execute one native leaudit run and return the populated ctx.

        Persistence is intentionally not mixed into the orchestration step;
        the caller chooses when to persist the final ctx (see
        ``persist_result``).

        Args:
            request: The platform-side run request.

        Returns:
            A ``NativeRunResult`` holding the ctx returned by the audit
            service, the service bundle used, and the request metadata.
        """
        # An explicit override takes precedence over the rule source path.
        bundle = self.service_factory.create_bundle(
            rules_path=request.rules_path_override or request.rule_source_path
        )
        ctx = self.ctx_builder.build(
            NativeAuditBuildInput(
                metadata=request.metadata,
                file_path=request.local_file_path,
                services=bundle.audit_services,
                rules_file=request.rules_file,
                page_range=request.page_range,
                rule_source_path=request.rule_source_path,
                force_rules_path=request.rules_path_override,
                config_overrides=request.config_overrides,
            )
        )
        ctx = await bundle.audit_service.audit(ctx)
        return NativeRunResult(
            ctx=ctx,
            service_bundle=bundle,
            metadata=request.metadata,
        )

    async def persist_result(self, result: NativeRunResult) -> None:
        """Persist a native run into platform-owned ``leaudit_*`` tables.

        Saves, in order: OCR output, extraction output, and evaluation
        results (each only when present on the ctx); extraction warnings;
        rescue outcomes; and run metrics. It then calls ``finalize_run``
        exactly once, last, to record the terminal run state.

        Args:
            result: The value returned by ``run``.
        """
        metadata = result.metadata
        document_id = metadata.document_id
        run_id = metadata.run_id
        ctx = result.ctx
        # Treat a missing task list like an empty one; a non-empty list is
        # passed through unchanged.
        fallback_tasks = ctx.fallback_tasks or []
        extraction_errors = self._collect_extraction_errors(ctx)

        await self._save_stage_outputs(document_id, run_id, metadata, ctx)

        if extraction_errors:
            await self.storage.save_run_errors(
                document_id,
                run_id=run_id,
                # NOTE(review): these are extraction messages, but the stage
                # is labelled with the ctx's final phase whenever one is set;
                # confirm that is intended.
                stage=ctx.phase or "extract",
                messages=extraction_errors,
                level="warning",
                error_code="EXTRACTION_WARNING",
            )
        if fallback_tasks:
            await self.storage.save_rescue_outcomes(
                document_id,
                run_id=run_id,
                tasks=fallback_tasks,
            )

        await self._save_run_metrics(document_id, run_id, ctx, fallback_tasks)

        # Any rescue task that needs human review overrides the status
        # derived from the evaluation.
        if any(task.requires_human_review for task in fallback_tasks):
            result_status = "review"
        else:
            result_status = self._resolve_result_status(ctx)
        await self.storage.finalize_run(
            document_id,
            run_id=run_id,
            result_status=result_status,
            rescue_applied=bool(fallback_tasks),
            phase=ctx.phase,
            finished=True,
        )

    @staticmethod
    def _collect_extraction_errors(ctx: AuditCtx) -> list[Any]:
        """Return ctx-level extraction errors, else those on the extraction."""
        errors = list(ctx.extraction_errors or [])
        if not errors and ctx.extraction is not None:
            errors = list(ctx.extraction.all_errors)
        return errors

    async def _save_stage_outputs(
        self,
        document_id: Any,
        run_id: Any,
        metadata: NativeAuditMetadata,
        ctx: AuditCtx,
    ) -> None:
        """Save the OCR, extraction and evaluation outputs present on ``ctx``."""
        if ctx.normalized_doc is not None:
            await self.storage.save_ocr_result(
                document_id,
                ctx.normalized_doc,
                run_id=run_id,
            )
        if ctx.extraction is not None:
            await self.storage.save_extraction_result(
                document_id,
                ctx.extraction,
                run_id=run_id,
            )
        # The evaluation save also takes the rules file and the extraction,
        # so all three must be available.
        if (
            ctx.evaluation is not None
            and ctx.rules_file is not None
            and ctx.extraction is not None
        ):
            await self.storage.save_evaluation_results(
                document_id,
                ctx.rules_file,
                ctx.evaluation,
                ctx.extraction,
                run_id=run_id,
                rule_version_id=metadata.rule_version_id,
            )

    async def _save_run_metrics(
        self,
        document_id: Any,
        run_id: Any,
        ctx: AuditCtx,
        fallback_tasks: list[Any],
    ) -> None:
        """Save timing and size/count metrics describing this run."""
        extraction = ctx.extraction
        page_count = (
            len(ctx.normalized_doc.pages) if ctx.normalized_doc is not None else None
        )
        # ``sub_documents`` may be absent on the extraction object.
        sub_documents = getattr(extraction, "sub_documents", None)
        sub_document_count = len(sub_documents) if sub_documents else 0
        field_count = len(extraction.fields) if extraction is not None else 0
        # Prefer the evaluated rules; fall back to the loaded rule set.
        if ctx.evaluation is not None:
            rule_count = len(ctx.evaluation.rules)
        elif ctx.rules_file is not None:
            rule_count = len(ctx.rules_file.flat_rules)
        else:
            rule_count = 0
        await self.storage.save_run_metrics(
            document_id,
            run_id=run_id,
            timing=dict(ctx.timing or {}),
            page_count=page_count,
            sub_document_count=sub_document_count,
            field_count=field_count,
            rule_count=rule_count,
            rescue_rule_count=len(fallback_tasks),
            artifact_count=self._estimate_artifact_count(ctx),
        )

    def _estimate_artifact_count(self, ctx: AuditCtx) -> int:
        """Roughly estimate how many platform artifacts this run produced.

        Counts one per present stage output (normalized document, extraction,
        evaluation) plus one per fallback (rescue) task.
        """
        stage_outputs = (ctx.normalized_doc, ctx.extraction, ctx.evaluation)
        present = sum(1 for output in stage_outputs if output is not None)
        return present + len(ctx.fallback_tasks or ())

    def _resolve_result_status(self, ctx: AuditCtx) -> str:
        """Derive the run status from the native ``AuditCtx`` evaluation.

        Returns:
            ``"error"`` if there is no evaluation or it reports errors;
            ``"pass"`` if nothing failed or was skipped; ``"fail"`` if any
            rule failed; otherwise ``"partial"``.
        """
        evaluation = ctx.evaluation
        if evaluation is None or evaluation.errors:
            return "error"
        if evaluation.failed_count == 0 and evaluation.skipped_count == 0:
            return "pass"
        if evaluation.failed_count > 0:
            return "fail"
        return "partial"
|
|
|
|
|
|
# Public API of this module.
__all__ = [
    "NativeRunRequest",
    "NativeRunResult",
    "NativeRunner",
]
|