Files
leaudit-platform-backend/fastapi_modules/fastapi_leaudit/leaudit_bridge/ocr_bridge.py
T

317 lines
10 KiB
Python

"""Bridge-side OCR post-processing for leaudit integration.
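Keeps docauditai-specific fixes outside ``services/leaudit/**``:
- DOCX embedded-image visuals can be refined once more with the VLM after
  the merged ``OcrResult`` is built.
- DOCX parent images that carry no item labelled ``signature`` are probed
  with the VLM for handwritten-signature candidates, using regions derived
  from the boxes of the visual items already detected on them.
- Cross-page seals with missing completeness flags are normalized so the
  legacy compatibility checks have a stable shape to consume.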
"""
from __future__ import annotations
import asyncio
import logging
from io import BytesIO
from pathlib import Path
from fastapi_admin.config import (
LEAUDIT_SIGNATURE_PROBE_CONCURRENCY,
LEAUDIT_SIGNATURE_PROBE_RETRY_BACKOFF_BASE_SECONDS,
LEAUDIT_SIGNATURE_PROBE_RETRY_MAX_ATTEMPTS,
LEAUDIT_SIGNATURE_PROBE_TIMEOUT,
)
from leaudit.ocr.base import BaseOCRClient
from leaudit.ocr.models import OcrResult, VisualManifestItem
log = logging.getLogger(__name__)
class BridgeOCRClient(BaseOCRClient):
    """OCR client decorator that adds integration-side post-processing.

    Recognition itself is delegated to the wrapped client; the result it
    produces is passed through ``postprocess_ocr_result`` before it is
    returned to the caller.
    """

    def __init__(
        self,
        inner: BaseOCRClient,
        *,
        vlm_client: object | None = None,
        vlm_concurrency: int = 6,
    ) -> None:
        """Remember the wrapped client and the VLM settings.

        Args:
            inner: OCR client that performs the actual recognition.
            vlm_client: Optional VLM client forwarded to post-processing.
            vlm_concurrency: Concurrency value forwarded to post-processing.
        """
        self.inner = inner
        self.vlm_client = vlm_client
        self.vlm_concurrency = vlm_concurrency

    async def ocr(self, file_path: Path | str) -> OcrResult:
        """Run the wrapped client on *file_path* and post-process its result.

        Args:
            file_path: Document location; coerced to ``Path`` before use.

        Returns:
            The result object produced by the wrapped client.
        """
        source = Path(file_path)
        inner_result = await self.inner.ocr(source)
        postprocess_options = {
            "file_path": source,
            "vlm_client": self.vlm_client,
            "vlm_concurrency": self.vlm_concurrency,
        }
        await postprocess_ocr_result(inner_result, **postprocess_options)
        return inner_result
async def postprocess_ocr_result(
    ocr_result: OcrResult,
    *,
    file_path: Path,
    vlm_client: object | None = None,
    vlm_concurrency: int = 6,
) -> OcrResult:
    """Run the bridge-side visual repairs without touching leaudit core.

    Only files whose extension is ``.docx``, ``.doc`` or ``.wps`` (compared
    case-insensitively) are processed. For those, the visual manifest is
    refined, probed for signature candidates and finally has its cross-page
    seals normalized, in that order. Every other file is passed through.

    Args:
        ocr_result: OCR result to repair.
        file_path: Path of the source document; only its suffix is read.
        vlm_client: Optional VLM client forwarded to the VLM-based steps.
        vlm_concurrency: Concurrency forwarded to the refinement step.

    Returns:
        The ``ocr_result`` object that was passed in.
    """
    word_family_suffixes = (".docx", ".doc", ".wps")
    if file_path.suffix.lower() in word_family_suffixes:
        await _maybe_refine_docx_visuals(
            ocr_result,
            vlm_client=vlm_client,
            concurrency=vlm_concurrency,
        )
        await _inject_docx_signature_candidates(ocr_result, vlm_client=vlm_client)
        _normalize_cross_page_seals(ocr_result)
    return ocr_result
async def _maybe_refine_docx_visuals(
    ocr_result: OcrResult,
    *,
    vlm_client: object | None,
    concurrency: int,
) -> None:
    """Re-run VLM refinement over the visual manifest when there is work to do.

    Nothing happens without a VLM client, without a visual manifest, or when
    the manifest holds no seals, signatures or cross-page seals. Any failure,
    including a failed import of the refiner, is logged as a warning and
    swallowed so the remaining post-processing still runs.

    Args:
        ocr_result: OCR result handed to ``refine_visual_manifest``.
        vlm_client: VLM client used for the refinement; ``None`` disables it.
        concurrency: Concurrency value forwarded to the refiner.
    """
    manifest = ocr_result.visual_manifest
    if vlm_client is None or manifest is None:
        return
    has_visuals = bool(
        manifest.seals or manifest.signatures or manifest.cross_page_seals
    )
    if not has_visuals:
        return
    try:
        # Imported lazily, inside the guarded region, so an import failure is
        # handled like any other refinement failure.
        from leaudit.ocr.visual_classifier import refine_visual_manifest

        await refine_visual_manifest(ocr_result, vlm_client, concurrency=concurrency)
    except Exception as exc:
        log.warning("bridge visual refinement skipped: %s", exc)
async def _inject_docx_signature_candidates(
    ocr_result: OcrResult,
    *,
    vlm_client: object | None,
) -> None:
    """Probe likely handwritten-signature zones on DOCX parent images.

    Manifest items (seals, signatures, cross-page seals) are grouped by their
    ``parent_image_key``. For every parent image that has no item labelled
    ``"signature"``, candidate regions computed by
    ``_signature_candidate_boxes`` are cropped and classified by the VLM. The
    first region classified as a signature is appended to
    ``visual_manifest.signatures`` and probing of that parent stops.

    The function does nothing when there is no VLM client, no visual
    manifest, no item carrying a parent image key, or when Pillow cannot be
    imported. A failed probe of a single region is logged and skipped.

    Args:
        ocr_result: OCR result whose visual manifest is extended in place.
        vlm_client: Client handed to ``_classify_signature_candidate``;
            ``None`` disables probing.
    """
    if vlm_client is None:
        return
    manifest = ocr_result.visual_manifest
    # The manifest is optional; without one there is nothing to probe.
    if manifest is None:
        return

    parent_to_items: dict[str, list[VisualManifestItem]] = {}
    for bucket in (manifest.seals, manifest.signatures, manifest.cross_page_seals):
        for item in bucket or []:
            parent_key = getattr(item, "parent_image_key", None)
            if parent_key:
                parent_to_items.setdefault(parent_key, []).append(item)
    # Bail out before touching Pillow or the config when no item points at a
    # parent image.
    if not parent_to_items:
        return

    try:
        from PIL import Image
    except ImportError:
        log.warning("Pillow unavailable, skip DOCX signature candidate probing")
        return

    # Shared across all parents so the total number of in-flight VLM calls
    # stays bounded.
    sem = asyncio.Semaphore(max(1, int(LEAUDIT_SIGNATURE_PROBE_CONCURRENCY)))

    async def _probe_parent(parent_key: str, items: list[VisualManifestItem]) -> None:
        """Probe one parent image and record the first signature hit."""
        if any((it.label or "") == "signature" for it in items):
            return
        parent_bytes = ocr_result.get_image_bytes(parent_key)
        if not parent_bytes:
            return
        try:
            image = Image.open(BytesIO(parent_bytes))
        except Exception as exc:
            log.warning("failed to open parent image %s: %s", parent_key, exc)
            return
        width, height = image.size
        for candidate_bbox in _signature_candidate_boxes(items, width, height):
            try:
                crop = image.crop(tuple(candidate_bbox))
                buf = BytesIO()
                crop.save(buf, format="PNG")
                async with sem:
                    result = await _classify_signature_candidate(
                        vlm_client,
                        buf.getvalue(),
                        "这是合同签章页里疑似法人签名的候选区域,请优先判断是否为手写签名。",
                        parent_key=parent_key,
                    )
            except Exception as exc:
                log.warning("signature probe failed for %s: %s", parent_key, exc)
                continue
            if getattr(result, "kind", None) != "signature":
                continue
            # The signatures bucket may be unset (it is read with ``or []``
            # above); create it before the first append.
            if manifest.signatures is None:
                manifest.signatures = []
            manifest.signatures.append(
                VisualManifestItem(
                    page_num=_infer_parent_page_num(items),
                    bbox=candidate_bbox,
                    label="signature",
                    confidence=getattr(result, "confidence", 0.9) or 0.9,
                    text_match=(getattr(result, "text", None) or "").strip() or None,
                    alt_text="docx_signature_candidate",
                    image_key=parent_key,
                    parent_image_key=parent_key,
                )
            )
            # One signature per parent image is enough.
            break

    await asyncio.gather(
        *(_probe_parent(parent_key, items) for parent_key, items in parent_to_items.items()),
        return_exceptions=False,
    )
async def _classify_signature_candidate(
    vlm_client: object,
    image_bytes: bytes,
    user_hint: str,
    *,
    parent_key: str | None = None,
) -> object:
    """Classify with configurable retry using a fresh VLM client when needed.

    The first attempt uses ``vlm_client`` itself. Every later attempt builds
    a throw-away ``ResilientQwenVLMClient`` from the original client's
    ``base_url``, ``api_key``, ``model`` and ``timeout`` with its own retrying
    switched off, and closes it once the attempt is over. Each attempt is
    bounded by the configured probe timeout, and failed attempts are
    separated by an exponentially growing delay.

    Args:
        vlm_client: Client exposing an awaitable ``classify_visual``.
        image_bytes: Encoded image of the candidate region.
        user_hint: Hint text forwarded to ``classify_visual``.
        parent_key: Key of the parent image; used in log messages only.

    Returns:
        Whatever ``classify_visual`` returns on the first successful attempt.

    Raises:
        RuntimeError: When every attempt failed; chained from the last error.
    """
    timeout = max(1, int(LEAUDIT_SIGNATURE_PROBE_TIMEOUT))
    max_attempts = max(1, int(LEAUDIT_SIGNATURE_PROBE_RETRY_MAX_ATTEMPTS))
    backoff_base = max(0.0, float(LEAUDIT_SIGNATURE_PROBE_RETRY_BACKOFF_BASE_SECONDS))
    last_error: Exception | None = None
    for attempt in range(max_attempts):
        fresh = None
        try:
            current_client = vlm_client
            if attempt > 0:
                from fastapi_modules.fastapi_leaudit.leaudit_bridge.resilient_clients import ResilientQwenVLMClient

                fresh = ResilientQwenVLMClient(
                    base_url=getattr(vlm_client, "base_url"),
                    api_key=getattr(vlm_client, "api_key", ""),
                    model=getattr(vlm_client, "model"),
                    timeout=getattr(vlm_client, "timeout", 90.0),
                    retry_max_attempts=1,
                    retry_backoff_base_seconds=0.0,
                )
                current_client = fresh
            return await asyncio.wait_for(
                current_client.classify_visual(image_bytes, user_hint=user_hint),
                timeout=timeout,
            )
        except Exception as exc:
            last_error = exc
        finally:
            # Runs before the backoff below, so the throw-away client is not
            # kept open while this coroutine sleeps.
            if fresh is not None:
                await fresh.close()
        if attempt == max_attempts - 1:
            break
        delay = backoff_base * (2 ** attempt)
        log.warning(
            "signature probe attempt %s/%s failed for %s, retrying after %.2fs (timeout=%ss): %s",
            attempt + 1,
            max_attempts,
            parent_key or "-",
            delay,
            timeout,
            last_error,
        )
        await asyncio.sleep(delay)
    # repr() keeps the message informative for exceptions whose str() is
    # empty, such as the TimeoutError raised by asyncio.wait_for.
    raise RuntimeError(
        f"signature classification failed after {max_attempts} attempt(s): {last_error!r}"
    ) from last_error
def _signature_candidate_boxes(
    items: list[VisualManifestItem],
    width: int,
    height: int,
) -> list[list[int]]:
    """Derive de-duplicated crop boxes to probe for a signature.

    An item contributes a candidate only when all of the following hold:

    - its ``bbox`` has exactly four values;
    - neither its ``seal_type`` nor its ``label`` equals ``"法人章"``;
    - its width/height ratio lies within ``[0.75, 1.35]``;
    - it spans at least 10% of the image in both dimensions.

    The candidate starts at the vertical middle of the item's box, extends
    95% of the box height below it and a quarter of the box width to either
    side, clamped to the image. Candidates narrower or shorter than 24 pixels
    are dropped.

    Args:
        items: Visual items located on one parent image.
        width: Parent image width in pixels.
        height: Parent image height in pixels.

    Returns:
        Candidate boxes as ``[x1, y1, x2, y2]`` in first-seen order.
    """
    # Keyed by the box tuple; insertion order gives first-seen ordering and
    # ``setdefault`` keeps only the first occurrence of a duplicate.
    unique: dict[tuple[int, ...], list[int]] = {}
    for entry in items:
        entry_seal_type = getattr(entry, "seal_type", None)
        entry_label = getattr(entry, "label", None)
        box = getattr(entry, "bbox", None) or []
        if len(box) != 4:
            continue
        left, top, right, bottom = box
        seal_w = max(1, right - left)
        seal_h = max(1, bottom - top)
        aspect = seal_w / seal_h
        if entry_seal_type == "法人章" or entry_label == "法人章":
            continue
        if not 0.75 <= aspect <= 1.35:
            continue
        if seal_w < width * 0.10 or seal_h < height * 0.10:
            continue
        side_pad = seal_w * 0.25
        region = [
            max(0, int(left - side_pad)),
            max(0, int(top + seal_h * 0.50)),
            min(width, int(right + side_pad)),
            min(height, int(bottom + seal_h * 0.95)),
        ]
        region_w = region[2] - region[0]
        region_h = region[3] - region[1]
        if region_w < 24 or region_h < 24:
            continue
        unique.setdefault(tuple(region), region)
    return list(unique.values())
def _infer_parent_page_num(items: list[VisualManifestItem]) -> int:
    """Return the first integer ``page_num`` found among *items*, else ``0``.

    Items without a ``page_num`` attribute, or with a non-integer one, are
    skipped.
    """
    page_nums = (getattr(entry, "page_num", None) for entry in items)
    return next((num for num in page_nums if isinstance(num, int)), 0)
def _normalize_cross_page_seals(ocr_result: OcrResult) -> None:
    """Fill obvious completeness defaults for bridge-side checks.

    Each cross-page seal is updated in place by the first rule that matches:

    1. It lists two or more pages: ``is_complete`` becomes ``True``.
    2. Its ``bbox`` has four values and a width/height ratio within
       ``[0.65, 1.35]``: ``is_complete`` becomes ``True``, even if it was
       explicitly ``False`` before.
    3. ``is_complete`` is already set: it is left alone.
    4. It lists exactly one page: ``is_complete`` becomes ``False``.

    Seals matching none of the rules keep ``is_complete`` as ``None``. The
    function does nothing when the result has no visual manifest or the
    manifest has no cross-page seals.

    Args:
        ocr_result: OCR result whose cross-page seals are normalized.
    """
    manifest = ocr_result.visual_manifest
    # The manifest is optional; a result without one has nothing to normalize.
    if manifest is None:
        return
    for item in manifest.cross_page_seals or []:
        if item.pages and len(item.pages) >= 2:
            item.is_complete = True
            continue
        bbox = item.bbox or []
        if len(bbox) == 4:
            width = max(1, bbox[2] - bbox[0])
            height = max(1, bbox[3] - bbox[1])
            ratio = width / height
            # DOCX embedded images often contain a complete round seal near the
            # page edge; Chandra may still classify it as a seam-seal half by
            # geometry. A near-square crop is a strong signal that the visible
            # stamp is already complete.
            if 0.65 <= ratio <= 1.35:
                item.is_complete = True
                continue
        if item.is_complete is not None:
            continue
        if item.pages and len(item.pages) == 1:
            item.is_complete = False