"""Bridge-side OCR post-processing for leaudit integration.

Keeps docauditai-specific fixes outside ``services/leaudit/**``:

- DOCX embedded-image visuals can be refined once more with the VLM after the
  merged ``OcrResult`` is built.
- Cross-page seals with missing completeness flags are normalized so the
  legacy compatibility checks have a stable shape to consume.
"""

from __future__ import annotations

import logging
from io import BytesIO
from pathlib import Path

from leaudit.ocr.base import BaseOCRClient
from leaudit.ocr.models import OcrResult, VisualManifestItem

log = logging.getLogger(__name__)

# File suffixes (lower-cased) for which the bridge-side repairs are applied.
_WORD_LIKE_SUFFIXES = frozenset({".docx", ".doc", ".wps"})

# Hint passed to the VLM when probing a signature candidate crop.  This is a
# runtime string sent to the model; do not translate or reformat it.
_SIGNATURE_PROBE_HINT = "这是合同签章页里疑似法人签名的候选区域,请优先判断是否为手写签名。"


class BridgeOCRClient(BaseOCRClient):
    """Wrap an OCR client and apply integration-side post-processing.

    The wrapped client produces the ``OcrResult``; this class then hands it to
    :func:`postprocess_ocr_result` before returning it to the caller.
    """

    def __init__(
        self,
        inner: BaseOCRClient,
        *,
        vlm_client: object | None = None,
        vlm_concurrency: int = 6,
    ) -> None:
        """Store the wrapped client and the VLM settings used afterwards.

        Args:
            inner: OCR client whose ``ocr`` coroutine does the actual work.
            vlm_client: Optional VLM client forwarded to post-processing.
                When ``None`` the VLM-based steps are skipped.
            vlm_concurrency: Concurrency value forwarded to the visual
                refinement step.
        """
        self.inner = inner
        self.vlm_client = vlm_client
        self.vlm_concurrency = vlm_concurrency

    async def ocr(self, file_path: Path | str) -> OcrResult:
        """Run the inner OCR client, post-process its result and return it."""
        path = Path(file_path)
        result = await self.inner.ocr(path)
        await postprocess_ocr_result(
            result,
            file_path=path,
            vlm_client=self.vlm_client,
            vlm_concurrency=self.vlm_concurrency,
        )
        return result


async def postprocess_ocr_result(
    ocr_result: OcrResult,
    *,
    file_path: Path,
    vlm_client: object | None = None,
    vlm_concurrency: int = 6,
) -> OcrResult:
    """Apply bridge-side visual repairs without touching leaudit core.

    Only files whose suffix is ``.docx``, ``.doc`` or ``.wps`` (compared
    case-insensitively) are processed; any other result is returned untouched.

    Args:
        ocr_result: Result to repair; it is mutated in place.
        file_path: Path of the source document, used only for its suffix.
        vlm_client: Optional VLM client for the VLM-backed steps.
        vlm_concurrency: Concurrency forwarded to the refinement step.

    Returns:
        The same ``ocr_result`` object that was passed in.
    """
    if file_path.suffix.lower() not in _WORD_LIKE_SUFFIXES:
        return ocr_result

    await _maybe_refine_docx_visuals(
        ocr_result,
        vlm_client=vlm_client,
        concurrency=vlm_concurrency,
    )
    await _inject_docx_signature_candidates(
        ocr_result,
        vlm_client=vlm_client,
    )
    _normalize_cross_page_seals(ocr_result)
    return ocr_result


async def _maybe_refine_docx_visuals(
    ocr_result: OcrResult,
    *,
    vlm_client: object | None,
    concurrency: int,
) -> None:
    """Re-run ``refine_visual_manifest`` when there is something to refine.

    Does nothing when no VLM client is given, when the result has no visual
    manifest, or when the manifest has no seals, signatures or cross-page
    seals.  Any failure (including a failed import of the refiner) is logged
    as a warning and swallowed: this step is best-effort.
    """
    vm = ocr_result.visual_manifest
    if vlm_client is None or vm is None:
        return
    if not (vm.seals or vm.signatures or vm.cross_page_seals):
        return

    try:
        from leaudit.ocr.visual_classifier import refine_visual_manifest

        await refine_visual_manifest(
            ocr_result,
            vlm_client,
            concurrency=concurrency,
        )
    except Exception as exc:
        log.warning("bridge visual refinement skipped: %s", exc)


async def _inject_docx_signature_candidates(
    ocr_result: OcrResult,
    *,
    vlm_client: object | None,
) -> None:
    """Probe likely handwritten-signature zones on DOCX parent images.

    Manifest items are grouped by ``parent_image_key``.  For every parent
    image that has no item labelled ``"signature"`` yet, candidate regions
    derived from the existing items are cropped and classified by the VLM.
    The first candidate classified as a signature is appended to the
    manifest's ``signatures`` list; at most one item is added per parent.

    Does nothing when no VLM client is given, when the result has no visual
    manifest, or when Pillow cannot be imported.
    """
    vm = ocr_result.visual_manifest
    # Guard the manifest the same way _maybe_refine_docx_visuals does; without
    # it a result lacking a manifest raised AttributeError here.
    if vlm_client is None or vm is None:
        return

    try:
        from PIL import Image
    except ImportError:
        log.warning("Pillow unavailable, skip DOCX signature candidate probing")
        return

    parent_to_items: dict[str, list[VisualManifestItem]] = {}
    for bucket in (
        vm.seals or [],
        vm.signatures or [],
        vm.cross_page_seals or [],
    ):
        for item in bucket:
            parent_key = getattr(item, "parent_image_key", None)
            if parent_key:
                parent_to_items.setdefault(parent_key, []).append(item)

    for parent_key, items in parent_to_items.items():
        # A parent that already carries a signature needs no probing.
        if any((it.label or "") == "signature" for it in items):
            continue

        parent_bytes = ocr_result.get_image_bytes(parent_key)
        if not parent_bytes:
            continue

        try:
            image = Image.open(BytesIO(parent_bytes))
        except Exception as exc:
            log.warning("failed to open parent image %s: %s", parent_key, exc)
            continue

        # Close the image once this parent has been probed.
        with image:
            found = await _probe_parent_for_signature(
                image,
                parent_key=parent_key,
                items=items,
                vlm_client=vlm_client,
            )

        if found is None:
            continue
        # ``signatures`` is treated as possibly-None above, so make sure
        # there is a list to append to.
        if vm.signatures is None:
            vm.signatures = []
        vm.signatures.append(found)


async def _probe_parent_for_signature(
    image: object,
    *,
    parent_key: str,
    items: list[VisualManifestItem],
    vlm_client: object,
) -> VisualManifestItem | None:
    """Return a signature item for the first candidate the VLM accepts.

    Each candidate box is cropped from ``image``, encoded as PNG and sent to
    the VLM.  A failure on one candidate is logged and the next candidate is
    tried.  Returns ``None`` when no candidate is classified as a signature.
    """
    width, height = image.size
    for candidate_bbox in _signature_candidate_boxes(items, width, height):
        try:
            crop = image.crop(tuple(candidate_bbox))
            buf = BytesIO()
            crop.save(buf, format="PNG")
            result = await _classify_signature_candidate(
                vlm_client,
                buf.getvalue(),
                _SIGNATURE_PROBE_HINT,
            )
        except Exception as exc:
            log.warning("signature probe failed for %s: %s", parent_key, exc)
            continue

        if getattr(result, "kind", None) != "signature":
            continue

        return VisualManifestItem(
            page_num=_infer_parent_page_num(items),
            bbox=candidate_bbox,
            label="signature",
            # Fall back to 0.9 when the VLM reports no (or a falsy) confidence.
            confidence=getattr(result, "confidence", 0.9) or 0.9,
            text_match=(getattr(result, "text", None) or "").strip() or None,
            alt_text="docx_signature_candidate",
            image_key=parent_key,
            parent_image_key=parent_key,
        )
    return None


async def _classify_signature_candidate(
    vlm_client: object,
    image_bytes: bytes,
    user_hint: str,
) -> object:
    """Classify with one retry using a fresh VLM client when needed.

    The given client is tried first.  If it raises, a new ``QwenVLMClient`` is
    built from the original client's ``base_url``, ``api_key``, ``model`` and
    ``timeout`` attributes, used for a single retry, and closed afterwards.

    Raises:
        RuntimeError: If the retry (or building the fresh client) fails; the
            underlying exception is chained as the cause.
    """
    try:
        return await vlm_client.classify_visual(image_bytes, user_hint=user_hint)
    except Exception as exc:
        log.warning("signature probe primary VLM failed, retrying fresh client: %s", exc)

    try:
        from leaudit.llm.qwen_vlm_client import QwenVLMClient

        fresh = QwenVLMClient(
            base_url=getattr(vlm_client, "base_url"),
            api_key=getattr(vlm_client, "api_key", ""),
            model=getattr(vlm_client, "model"),
            timeout=getattr(vlm_client, "timeout", 90.0),
        )
        try:
            return await fresh.classify_visual(image_bytes, user_hint=user_hint)
        finally:
            await fresh.close()
    except Exception as exc:
        raise RuntimeError(exc) from exc


def _signature_candidate_boxes(
    items: list[VisualManifestItem],
    width: int,
    height: int,
) -> list[list[int]]:
    """Derive de-duplicated candidate crop boxes from existing manifest items.

    An item contributes a candidate only when it has a four-value bbox, is not
    a "法人章" (by ``seal_type`` or ``label``), is roughly square (aspect ratio
    within 0.75-1.35) and spans at least 10% of the image in both dimensions.
    The candidate widens the item's box by 25% on each side and runs from the
    item's vertical midpoint to 95% of its height below its bottom edge,
    clamped to the image.  Candidates smaller than 24 px on either side are
    dropped.

    Args:
        items: Manifest items that share one parent image.
        width: Parent image width in pixels.
        height: Parent image height in pixels.

    Returns:
        Candidate boxes as ``[x1, y1, x2, y2]`` lists, in first-seen order.
    """
    candidates: list[list[int]] = []
    seen: set[tuple[int, ...]] = set()

    for item in items:
        bbox = getattr(item, "bbox", None) or []
        if len(bbox) != 4:
            continue
        if "法人章" in (getattr(item, "seal_type", None), getattr(item, "label", None)):
            continue

        x1, y1, x2, y2 = bbox
        # Clamp to 1 so degenerate boxes cannot cause a division by zero.
        box_w = max(1, x2 - x1)
        box_h = max(1, y2 - y1)
        if not (0.75 <= box_w / box_h <= 1.35):
            continue
        if box_w < width * 0.10 or box_h < height * 0.10:
            continue

        cand = [
            max(0, int(x1 - box_w * 0.25)),
            max(0, int(y1 + box_h * 0.50)),
            min(width, int(x2 + box_w * 0.25)),
            min(height, int(y2 + box_h * 0.95)),
        ]
        if cand[2] - cand[0] < 24 or cand[3] - cand[1] < 24:
            continue

        key = tuple(cand)
        if key not in seen:
            seen.add(key)
            candidates.append(cand)

    return candidates


def _infer_parent_page_num(items: list[VisualManifestItem]) -> int:
    """Return the first integer ``page_num`` among ``items``, else ``0``."""
    for item in items:
        page_num = getattr(item, "page_num", None)
        if isinstance(page_num, int):
            return page_num
    return 0


def _normalize_cross_page_seals(ocr_result: OcrResult) -> None:
    """Fill obvious completeness defaults for bridge-side checks.

    For each cross-page seal, in order of precedence:

    1. Two or more pages -> ``is_complete = True``.
    2. A four-value bbox with aspect ratio within 0.65-1.35 ->
       ``is_complete = True``.
    3. Otherwise an already-set ``is_complete`` is left alone.
    4. Otherwise exactly one page -> ``is_complete = False``.

    Rules 1 and 2 overwrite any existing value.  Does nothing when the result
    has no visual manifest.
    """
    vm = ocr_result.visual_manifest
    # The manifest can be None (see _maybe_refine_docx_visuals); without this
    # guard such a result raised AttributeError here.
    if vm is None:
        return

    for item in vm.cross_page_seals or []:
        if item.pages and len(item.pages) >= 2:
            item.is_complete = True
            continue

        bbox = item.bbox or []
        if len(bbox) == 4:
            width = max(1, bbox[2] - bbox[0])
            height = max(1, bbox[3] - bbox[1])
            ratio = width / height
            # DOCX embedded images often contain a complete round seal near the
            # page edge; Chandra may still classify it as a seam-seal half by
            # geometry. A near-square crop is a strong signal that the visible
            # stamp is already complete.
            if 0.65 <= ratio <= 1.35:
                item.is_complete = True
                continue

        if item.is_complete is not None:
            continue
        if item.pages and len(item.pages) == 1:
            item.is_complete = False