535d97a70c
17-table PostgreSQL schema with full Chinese column comments, FastAPI project structure (admin/common/modules), DSL rule files, and schema migration scripts.
273 lines
8.5 KiB
Python
273 lines
8.5 KiB
Python
"""Bridge-side OCR post-processing for leaudit integration.
|
|
|
|
Keeps docauditai-specific fixes outside ``services/leaudit/**``:
|
|
- DOCX embedded-image visuals can be refined once more with the VLM after
|
|
the merged ``OcrResult`` is built.
|
|
- Cross-page seals with missing completeness flags are normalized so the
|
|
legacy compatibility checks have a stable shape to consume.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
from io import BytesIO
|
|
from pathlib import Path
|
|
|
|
from leaudit.ocr.base import BaseOCRClient
|
|
from leaudit.ocr.models import OcrResult, VisualManifestItem
|
|
|
|
log = logging.getLogger(__name__)
|
|
|
|
|
|
class BridgeOCRClient(BaseOCRClient):
    """OCR client wrapper that adds integration-side post-processing.

    Recognition itself is delegated to the wrapped ``inner`` client; the
    result it returns is then passed through ``postprocess_ocr_result``.
    """

    def __init__(
        self,
        inner: BaseOCRClient,
        *,
        vlm_client: object | None = None,
        vlm_concurrency: int = 6,
    ) -> None:
        """Remember the wrapped client and the VLM settings.

        Args:
            inner: OCR client that performs the actual recognition.
            vlm_client: Optional VLM client forwarded to the post-processing
                step; ``None`` is forwarded as-is.
            vlm_concurrency: Concurrency value forwarded to the
                post-processing step.
        """
        self.inner = inner
        self.vlm_client = vlm_client
        self.vlm_concurrency = vlm_concurrency

    async def ocr(self, file_path: Path | str) -> OcrResult:
        """Run the inner OCR client on *file_path* and post-process the result.

        Args:
            file_path: Document location; coerced to ``Path`` before use.

        Returns:
            The object returned by ``inner.ocr`` after it has been handed to
            ``postprocess_ocr_result``.
        """
        source = Path(file_path)
        recognized = await self.inner.ocr(source)
        # The return value of the post-processing call is ignored here; the
        # same ``recognized`` object is what gets returned to the caller.
        await postprocess_ocr_result(
            recognized,
            file_path=source,
            vlm_client=self.vlm_client,
            vlm_concurrency=self.vlm_concurrency,
        )
        return recognized
|
|
|
|
|
|
async def postprocess_ocr_result(
    ocr_result: OcrResult,
    *,
    file_path: Path,
    vlm_client: object | None = None,
    vlm_concurrency: int = 6,
) -> OcrResult:
    """Apply bridge-side visual repairs without touching leaudit core.

    Only files whose lower-cased suffix is ``.docx``, ``.doc`` or ``.wps``
    are processed; anything else is returned untouched.

    Args:
        ocr_result: OCR result to repair in place.
        file_path: Path of the source document; only its suffix is read.
        vlm_client: Optional VLM client handed to the repair steps.
        vlm_concurrency: Concurrency value handed to the visual refinement
            step.

    Returns:
        The same ``ocr_result`` object that was passed in.
    """
    word_like_suffixes = (".docx", ".doc", ".wps")
    if file_path.suffix.lower() in word_like_suffixes:
        # Order matters: refine existing visuals, then probe for additional
        # signatures, and only then normalize cross-page seal flags.
        await _maybe_refine_docx_visuals(
            ocr_result,
            vlm_client=vlm_client,
            concurrency=vlm_concurrency,
        )
        await _inject_docx_signature_candidates(
            ocr_result,
            vlm_client=vlm_client,
        )
        _normalize_cross_page_seals(ocr_result)
    return ocr_result
|
|
|
|
|
|
async def _maybe_refine_docx_visuals(
    ocr_result: OcrResult,
    *,
    vlm_client: object | None,
    concurrency: int,
) -> None:
    """Run the leaudit visual-manifest refinement once more, best-effort.

    Nothing happens when there is no VLM client, no visual manifest, or the
    manifest has no seals, signatures or cross-page seals. Any exception
    raised while importing or running the refinement is logged as a warning
    and swallowed.

    Args:
        ocr_result: OCR result whose visual manifest is refined.
        vlm_client: VLM client passed to ``refine_visual_manifest``.
        concurrency: Value passed as ``concurrency`` to the refinement.
    """
    manifest = ocr_result.visual_manifest
    if vlm_client is None or manifest is None:
        return

    has_visuals = manifest.seals or manifest.signatures or manifest.cross_page_seals
    if not has_visuals:
        return

    try:
        # Imported lazily, inside the guarded region, so an import failure is
        # treated like any other refinement failure.
        from leaudit.ocr.visual_classifier import refine_visual_manifest as refine

        await refine(ocr_result, vlm_client, concurrency=concurrency)
    except Exception as err:
        log.warning("bridge visual refinement skipped: %s", err)
|
|
|
|
|
|
async def _inject_docx_signature_candidates(
    ocr_result: OcrResult,
    *,
    vlm_client: object | None,
) -> None:
    """Probe likely handwritten-signature zones on DOCX parent images.

    Visual items from the manifest's seals, signatures and cross-page seals
    are grouped by ``parent_image_key``. For each parent image that has no
    item labelled ``"signature"`` yet, candidate regions computed by
    ``_signature_candidate_boxes`` are cropped, encoded as PNG and sent to
    the VLM. The first region classified as ``"signature"`` is appended to
    ``visual_manifest.signatures`` and probing of that parent stops.

    The step is best-effort: it returns silently when there is no VLM
    client, no visual manifest, or no item carrying a parent image key; a
    missing Pillow install, an unreadable parent image, or a failing probe
    is logged as a warning and skipped.

    Args:
        ocr_result: OCR result whose visual manifest may receive new
            signature items (mutated in place).
        vlm_client: VLM client used for classification, or ``None`` to
            disable probing.
    """
    if vlm_client is None:
        return

    # The sibling refinement step treats a missing manifest as a valid state,
    # so do the same here instead of failing on attribute access.
    manifest = ocr_result.visual_manifest
    if manifest is None:
        return

    parent_to_items: dict[str, list[VisualManifestItem]] = {}
    for bucket in (manifest.seals, manifest.signatures, manifest.cross_page_seals):
        for item in bucket or []:
            parent_key = getattr(item, "parent_image_key", None)
            if parent_key:
                parent_to_items.setdefault(parent_key, []).append(item)
    if not parent_to_items:
        # Nothing to probe: skip the Pillow import (and its warning) entirely.
        return

    try:
        from PIL import Image
    except ImportError:
        log.warning("Pillow unavailable, skip DOCX signature candidate probing")
        return

    for parent_key, items in parent_to_items.items():
        if any((it.label or "") == "signature" for it in items):
            continue
        parent_bytes = ocr_result.get_image_bytes(parent_key)
        if not parent_bytes:
            continue

        try:
            image = Image.open(BytesIO(parent_bytes))
        except Exception as exc:
            log.warning("failed to open parent image %s: %s", parent_key, exc)
            continue

        # Close the decoded image once every candidate of this parent has
        # been handled, whichever way the loop below exits.
        with image:
            width, height = image.size
            # Identical for every candidate of this parent, so compute once.
            page_num = _infer_parent_page_num(items)

            for candidate_bbox in _signature_candidate_boxes(items, width, height):
                try:
                    crop = image.crop(tuple(candidate_bbox))
                    buf = BytesIO()
                    crop.save(buf, format="PNG")
                    result = await _classify_signature_candidate(
                        vlm_client,
                        buf.getvalue(),
                        "这是合同签章页里疑似法人签名的候选区域,请优先判断是否为手写签名。",
                    )
                except Exception as exc:
                    log.warning("signature probe failed for %s: %s", parent_key, exc)
                    continue

                if getattr(result, "kind", None) != "signature":
                    continue

                # The bucket itself may be unset (it is read with ``or []``
                # above), so create it before appending.
                if manifest.signatures is None:
                    manifest.signatures = []
                manifest.signatures.append(
                    VisualManifestItem(
                        page_num=page_num,
                        bbox=candidate_bbox,
                        label="signature",
                        confidence=getattr(result, "confidence", 0.9) or 0.9,
                        text_match=(getattr(result, "text", None) or "").strip() or None,
                        alt_text="docx_signature_candidate",
                        image_key=parent_key,
                        parent_image_key=parent_key,
                    )
                )
                # One injected signature per parent image is enough.
                break
|
|
|
|
|
|
async def _classify_signature_candidate(
    vlm_client: object,
    image_bytes: bytes,
    user_hint: str,
) -> object:
    """Classify a crop, retrying once with a freshly built VLM client.

    The supplied client is tried first. If it raises, the failure is logged
    and a new ``QwenVLMClient`` is created from the ``base_url``,
    ``api_key``, ``model`` and ``timeout`` attributes of the supplied client
    (``api_key`` defaults to ``""`` and ``timeout`` to ``90.0`` when absent).
    The fresh client is always closed after its single attempt.

    Args:
        vlm_client: Client exposing an awaitable ``classify_visual``.
        image_bytes: Encoded image passed to the classifier.
        user_hint: Hint text passed as ``user_hint``.

    Returns:
        Whatever ``classify_visual`` returns.

    Raises:
        RuntimeError: If the retry path fails for any reason; the underlying
            exception is attached as the cause.
    """
    try:
        return await vlm_client.classify_visual(image_bytes, user_hint=user_hint)
    except Exception as primary_error:
        log.warning(
            "signature probe primary VLM failed, retrying fresh client: %s",
            primary_error,
        )

    try:
        from leaudit.llm.qwen_vlm_client import QwenVLMClient

        retry_client = QwenVLMClient(
            base_url=vlm_client.base_url,
            api_key=getattr(vlm_client, "api_key", ""),
            model=vlm_client.model,
            timeout=getattr(vlm_client, "timeout", 90.0),
        )
        try:
            verdict = await retry_client.classify_visual(image_bytes, user_hint=user_hint)
        finally:
            await retry_client.close()
        return verdict
    except Exception as retry_error:
        raise RuntimeError(retry_error) from retry_error
|
|
|
|
|
|
def _signature_candidate_boxes(
    items: list[VisualManifestItem],
    width: int,
    height: int,
) -> list[list[int]]:
    """Derive crop regions to probe for a signature from the given items.

    An item contributes a region only when its ``bbox`` has four values, it
    is not a ``"法人章"`` (by ``seal_type`` or ``label``), its box is
    near-square (width/height between 0.75 and 1.35) and the box spans at
    least 10% of the image in both dimensions.

    The region is the box widened by 25% of its width on each side, starting
    at the box's vertical midpoint and extending 95% of the box height below
    its bottom edge, clamped to the image. Regions narrower or shorter than
    24 pixels are dropped, and duplicates are emitted once.

    Args:
        items: Visual items located on the same parent image.
        width: Parent image width in pixels.
        height: Parent image height in pixels.

    Returns:
        ``[x1, y1, x2, y2]`` regions in first-seen order.
    """
    regions: list[list[int]] = []
    emitted: set[tuple[int, int, int, int]] = set()

    for visual in items:
        kind = getattr(visual, "seal_type", None)
        tag = getattr(visual, "label", None)
        coords = getattr(visual, "bbox", None) or []
        if len(coords) != 4:
            continue

        left, top, right, bottom = coords
        # Degenerate or inverted boxes are treated as one pixel wide/tall.
        span_w = max(1, right - left)
        span_h = max(1, bottom - top)
        aspect = span_w / span_h

        if (
            kind == "法人章"
            or tag == "法人章"
            or not (0.75 <= aspect <= 1.35)
            or span_w < width * 0.10
            or span_h < height * 0.10
        ):
            continue

        region = [
            max(0, int(left - span_w * 0.25)),
            max(0, int(top + span_h * 0.50)),
            min(width, int(right + span_w * 0.25)),
            min(height, int(bottom + span_h * 0.95)),
        ]
        region_w = region[2] - region[0]
        region_h = region[3] - region[1]
        if region_w < 24 or region_h < 24:
            continue

        fingerprint = tuple(region)
        if fingerprint in emitted:
            continue
        emitted.add(fingerprint)
        regions.append(region)

    return regions
|
|
|
|
|
|
def _infer_parent_page_num(items: list[VisualManifestItem]) -> int:
    """Return the first integer ``page_num`` found among *items*.

    Items without a ``page_num`` attribute, or with a non-integer one, are
    skipped. Falls back to ``0`` when no item qualifies.
    """
    page_numbers = (getattr(entry, "page_num", None) for entry in items)
    return next((num for num in page_numbers if isinstance(num, int)), 0)
|
|
|
|
|
|
def _normalize_cross_page_seals(ocr_result: OcrResult) -> None:
    """Fill obvious completeness defaults for bridge-side checks.

    Each cross-page seal in the visual manifest is updated in place:

    1. A seal listed on two or more pages is marked complete.
    2. Otherwise, a seal whose four-value ``bbox`` is near-square
       (width/height between 0.65 and 1.35) is marked complete.
    3. Otherwise, an already set ``is_complete`` is kept; when it is unset
       and the seal is listed on exactly one page, it is marked incomplete.

    Rules 1 and 2 apply even when ``is_complete`` was already set.

    Does nothing when the result has no visual manifest or the manifest has
    no cross-page seals.

    Args:
        ocr_result: OCR result whose cross-page seal items are mutated.
    """
    manifest = ocr_result.visual_manifest
    if manifest is None:
        # A result without a visual manifest has nothing to normalize; the
        # refinement step in this module accepts that state as well.
        return

    for item in manifest.cross_page_seals or []:
        pages = item.pages or []
        if len(pages) >= 2:
            item.is_complete = True
            continue

        bbox = item.bbox or []
        if len(bbox) == 4:
            width = max(1, bbox[2] - bbox[0])
            height = max(1, bbox[3] - bbox[1])
            ratio = width / height
            # DOCX embedded images often contain a complete round seal near
            # the page edge; Chandra may still classify it as a seam-seal
            # half by geometry. A near-square crop is a strong signal that
            # the visible stamp is already complete.
            if 0.65 <= ratio <= 1.35:
                item.is_complete = True
                continue

        if item.is_complete is None and len(pages) == 1:
            item.is_complete = False
|