diff --git a/fastapi_modules/fastapi_leaudit/leaudit_bridge/nativeRunner.py b/fastapi_modules/fastapi_leaudit/leaudit_bridge/nativeRunner.py index 2748993..5b73981 100644 --- a/fastapi_modules/fastapi_leaudit/leaudit_bridge/nativeRunner.py +++ b/fastapi_modules/fastapi_leaudit/leaudit_bridge/nativeRunner.py @@ -107,6 +107,7 @@ class NativeRunner: await self.storage.save_extraction_result( document_id, ctx.extraction, + ocr_result=ctx.normalized_doc, run_id=run_id, ) if ctx.evaluation is not None and ctx.rules_file is not None and ctx.extraction is not None: @@ -115,6 +116,7 @@ class NativeRunner: ctx.rules_file, ctx.evaluation, ctx.extraction, + ocr_result=ctx.normalized_doc, run_id=run_id, rule_version_id=result.metadata.rule_version_id, ) diff --git a/fastapi_modules/fastapi_leaudit/leaudit_bridge/storage_adapter.py b/fastapi_modules/fastapi_leaudit/leaudit_bridge/storage_adapter.py index 499b3e9..87c6fd4 100644 --- a/fastapi_modules/fastapi_leaudit/leaudit_bridge/storage_adapter.py +++ b/fastapi_modules/fastapi_leaudit/leaudit_bridge/storage_adapter.py @@ -99,11 +99,13 @@ class StorageAdapter: self, document_id: int, bundle: ExtractionBundle, + ocr_result: OcrResult | None = None, *, run_id: int | None = None, ) -> None: """Save extraction result to leaudit_field_results table.""" - extracted = _bundle_to_extracted(bundle) + inferred_positions = _build_inferred_field_positions(bundle, ocr_result) + extracted = _bundle_to_extracted(bundle, inferred_positions=inferred_positions) resolved_run_id = await self._ensure_run_id(document_id, run_id) async with GetAsyncSession() as session: @@ -111,7 +113,8 @@ class StorageAdapter: field_data = extracted.get("fields", {}).get(name, {}) raw_value = fv.raw_value if isinstance(fv, FieldValue) else None meta_json = { - "position": _field_value_position_payload(fv), + "position": (_field_value_position_payload(fv) if isinstance(fv, FieldValue) else None) + or inferred_positions.get(name), "reasons": list(fv.reasons or []), 
"type_name": fv.type_name, } if isinstance(fv, FieldValue) else None @@ -153,6 +156,7 @@ class StorageAdapter: rules_file: RulesFile, evaluation: EvaluationResult, bundle: ExtractionBundle, + ocr_result: OcrResult | None = None, *, run_id: int | None = None, rule_version_id: int | None = None, @@ -163,6 +167,7 @@ class StorageAdapter: then inserts fresh rows. """ resolved_run_id = await self._ensure_run_id(document_id, run_id) + inferred_positions = _build_inferred_field_positions(bundle, ocr_result) async with GetAsyncSession() as session: # Delete existing results for this document+run await session.execute( @@ -178,7 +183,14 @@ class StorageAdapter: # Insert one row per rule result for rule_result in evaluation.rules: rule = rule_meta.get(rule_result.rule_id) - row = _rule_result_to_row(document_id, resolved_run_id, rule_result, rule, bundle) + row = _rule_result_to_row( + document_id, + resolved_run_id, + rule_result, + rule, + bundle, + inferred_positions=inferred_positions, + ) if rule_version_id is not None: row["rule_version_id"] = rule_version_id json_columns = {"stages", "extracted_fields", "field_positions", "remediation", "rule_meta"} @@ -550,8 +562,13 @@ def _ocr_to_dict(ocr: OcrResult) -> dict[str, Any]: return result -def _bundle_to_extracted(bundle: ExtractionBundle) -> dict[str, Any]: +def _bundle_to_extracted( + bundle: ExtractionBundle, + *, + inferred_positions: dict[str, dict[str, Any]] | None = None, +) -> dict[str, Any]: """Convert ExtractionBundle to docauditai's extracted_results format.""" + inferred_positions = inferred_positions or {} fields: dict[str, Any] = {} for name, fv in bundle.fields.items(): if isinstance(fv, FieldValue): @@ -559,7 +576,7 @@ def _bundle_to_extracted(bundle: ExtractionBundle) -> dict[str, Any]: "value": fv.value, "confidence": float(fv.confidence) if fv.confidence else 0.0, } - position_payload = _field_value_position_payload(fv) + position_payload = _field_value_position_payload(fv) or 
# Folds full-width / CJK punctuation to its ASCII counterpart before text
# comparison.  Keys are spelled as escapes on purpose: a literal full-width
# character is easily rewritten to its ASCII twin by an editor or a Unicode
# normaliser, which silently turns the entry into a no-op identity mapping.
_POSITION_PUNCT_TABLE = str.maketrans({
    "\uff0c": ",",   # FULLWIDTH COMMA
    "\u3002": ".",   # IDEOGRAPHIC FULL STOP
    "\uff1b": ";",   # FULLWIDTH SEMICOLON
    "\uff1a": ":",   # FULLWIDTH COLON
    "\uff01": "!",   # FULLWIDTH EXCLAMATION MARK
    "\uff1f": "?",   # FULLWIDTH QUESTION MARK
    "\uff08": "(",   # FULLWIDTH LEFT PARENTHESIS
    "\uff09": ")",   # FULLWIDTH RIGHT PARENTHESIS
    "\u3010": "[",   # LEFT BLACK LENTICULAR BRACKET
    "\u3011": "]",   # RIGHT BLACK LENTICULAR BRACKET
    "\u300c": '"',   # LEFT CORNER BRACKET
    "\u300d": '"',   # RIGHT CORNER BRACKET
    "\u2018": "'",   # LEFT SINGLE QUOTATION MARK
    "\u2019": "'",   # RIGHT SINGLE QUOTATION MARK
    "\u201c": '"',   # LEFT DOUBLE QUOTATION MARK
    "\u201d": '"',   # RIGHT DOUBLE QUOTATION MARK
})


def _build_inferred_field_positions(
    bundle: ExtractionBundle,
    ocr_result: OcrResult | None,
) -> dict[str, dict[str, Any]]:
    """Infer fallback positions for fields that carry no position payload.

    For every ``FieldValue`` in ``bundle.fields`` whose own position payload
    is empty, the field value is searched first in the OCR chunks (which may
    also yield a bbox) and then in the per-page text.

    Args:
        bundle: Extraction result whose ``fields`` mapping is scanned.
        ocr_result: OCR document providing ``pages``; ``None`` or a document
            without pages yields an empty result.

    Returns:
        Mapping of field name to a position payload containing ``pageNum``,
        ``matchMethod`` and, for chunk matches with a bbox, ``bbox``.  Fields
        that already have a position, or that cannot be located, are absent.
    """
    if ocr_result is None or not ocr_result.pages:
        return {}

    # pageNum is emitted as page.page_num + 1 (presumably page_num is
    # 0-based -- confirm against the OcrResult model).
    page_texts = [
        (int(page.page_num) + 1, str(page.text or ""))
        for page in ocr_result.pages
        if str(page.text or "").strip()
    ]
    # NOTE: an empty ``page_texts`` must not abort here -- chunk matching
    # below only needs ``page.chunks`` and still works without page text.

    inferred: dict[str, dict[str, Any]] = {}
    for field_name, field_value in bundle.fields.items():
        if not isinstance(field_value, FieldValue):
            continue
        if _field_value_position_payload(field_value) is not None:
            # The extractor already produced a position; never override it.
            continue

        value_text = _stringify_position_value(field_value.value)
        if not value_text:
            continue

        chunk_match = _infer_position_from_chunks(value_text, ocr_result)
        if chunk_match is not None:
            inferred[field_name] = chunk_match
            continue

        page_num = _infer_page_num_from_page_texts(value_text, page_texts)
        if page_num is not None:
            inferred[field_name] = {
                "pageNum": page_num,
                "matchMethod": "platform_bridge_page_fallback",
            }

    return inferred


def _infer_position_from_chunks(
    value_text: str,
    ocr_result: OcrResult,
) -> dict[str, Any] | None:
    """Return the position of the first OCR chunk containing ``value_text``.

    Both sides are normalised with ``_normalize_position_text`` before the
    substring test.  Chunks may be dicts or objects exposing ``content`` and
    ``bbox``.

    Returns:
        ``{"pageNum", "matchMethod"[, "bbox"]}`` for the first matching chunk
        in page/chunk order, or ``None`` when nothing matches.
    """
    normalized_value = _normalize_position_text(value_text)
    if not normalized_value:
        return None

    for page in ocr_result.pages:
        for chunk in page.chunks or []:
            if isinstance(chunk, dict):
                content = str(chunk.get("content") or "")
                bbox = chunk.get("bbox")
            else:
                content = str(getattr(chunk, "content", "") or "")
                bbox = getattr(chunk, "bbox", None)

            if not content:
                continue
            if normalized_value not in _normalize_position_text(content):
                continue

            payload: dict[str, Any] = {
                "pageNum": int(page.page_num) + 1,
                "matchMethod": "platform_bridge_chunk_fallback",
            }
            if bbox:
                payload["bbox"] = bbox
            return payload
    return None


def _infer_page_num_from_page_texts(
    value_text: str,
    page_texts: list[tuple[int, str]],
) -> int | None:
    """Find the page whose text contains (or closely resembles) a value.

    An exact normalised substring hit returns that page immediately.  Values
    of at least 8 normalised characters additionally take part in fuzzy
    matching; the best-scoring page is returned when its score is >= 0.92.

    Args:
        value_text: Field value to locate.
        page_texts: ``(page_number, text)`` pairs; page numbers are returned
            unchanged.

    Returns:
        The matching page number, or ``None`` when no page qualifies.
    """
    normalized_value = _normalize_position_text(value_text)
    if not normalized_value:
        return None

    min_length_for_fuzzy = 8
    # Short values produce too many accidental fuzzy hits; exact match only.
    allow_fuzzy = len(normalized_value) >= min_length_for_fuzzy

    best_page: int | None = None
    best_score = 0.0
    for page_num, page_text in page_texts:
        normalized_page = _normalize_position_text(page_text)
        if not normalized_page:
            continue
        if normalized_value in normalized_page:
            return page_num
        if not allow_fuzzy:
            continue

        score = _partial_similarity(normalized_value, normalized_page)
        if score > best_score:
            best_score = score
            best_page = page_num

    if best_score >= 0.92:
        return best_page
    return None


def _normalize_position_text(text: str) -> str:
    """Normalise text for comparison.

    Removes ``<...>`` tag-like spans and all whitespace, then folds the
    punctuation listed in ``_POSITION_PUNCT_TABLE`` to ASCII.  Falsy input
    yields ``""``.
    """
    if not text:
        return ""
    normalized = re.sub(r"<[^>]+>", "", str(text))
    normalized = re.sub(r"\s+", "", normalized)
    return normalized.translate(_POSITION_PUNCT_TABLE)


def _partial_similarity(needle: str, haystack: str) -> float:
    """Score how well the shorter string aligns somewhere in the longer one.

    The shorter string is slid across the longer one and compared position
    by position; the result is the best fraction of equal characters, in
    ``[0.0, 1.0]``.  Windows are sampled every ``max(1, window // 6)``
    characters, so the score is an approximation: an alignment that starts
    between two sampled offsets is not evaluated.
    """
    if not needle or not haystack:
        return 0.0
    if len(needle) > len(haystack):
        needle, haystack = haystack, needle

    window = len(needle)
    if window <= 0:
        return 0.0
    if needle == haystack:
        return 1.0

    best = 0.0
    step = max(1, window // 6)
    stop = max(len(haystack) - window + 1, 1)
    for start in range(0, stop, step):
        chunk = haystack[start : start + window]
        if not chunk:
            continue
        matches = sum(1 for left, right in zip(needle, chunk) if left == right)
        best = max(best, matches / window)
        if best >= 0.999:
            return 1.0
    return best


def _stringify_position_value(raw_value: Any) -> str:
    """Render a field value as the text to search for in the document.

    * ``None`` -> ``""``
    * ``str`` -> stripped
    * ``int`` / ``float`` / ``bool`` -> ``str(value)``
    * ``dict`` -> first non-blank string under ``value``, ``text`` or
      ``value_text`` (stripped), else ``""``
    * ``list`` -> non-empty renderings of the items joined by one space
    * anything else -> ``str(value).strip()``
    """
    if raw_value is None:
        return ""
    if isinstance(raw_value, str):
        return raw_value.strip()
    if isinstance(raw_value, (int, float, bool)):
        return str(raw_value)
    if isinstance(raw_value, dict):
        for key in ("value", "text", "value_text"):
            value = raw_value.get(key)
            if isinstance(value, str) and value.strip():
                return value.strip()
        return ""
    if isinstance(raw_value, list):
        parts = [_stringify_position_value(item) for item in raw_value]
        return " ".join(part for part in parts if part).strip()
    return str(raw_value).strip()
async def _resolveDocumentRootGroupId(self, Session, TypeId: int, GroupId: int | None) -> int | None:
    """Resolve the top-level group an upload belongs to (method of DocumentServiceImpl).

    Used so that version archiving can span second-level types.

    Resolution order:
      1. When ``GroupId`` is given, look the group up and return its own id
         if it has no parent (``pid`` NULL/0), otherwise its ``pid``.
      2. Otherwise (or when the lookup finds nothing) inspect the enabled,
         non-deleted child groups of ``TypeId``; if they all share exactly
         one parent, return that parent.

    Returns:
        The root group id, or ``None`` when it cannot be determined.
    """
    if GroupId is not None:
        row = (
            await Session.execute(
                text(
                    """
                    SELECT
                        CASE
                            WHEN COALESCE(pid, 0) = 0 THEN id
                            ELSE pid
                        END AS root_group_id
                    FROM leaudit_evaluation_point_groups
                    WHERE id = :group_id
                      AND deleted_at IS NULL
                    LIMIT 1
                    """
                ),
                {"group_id": GroupId},
            )
        ).mappings().first()
        if row and row["root_group_id"] is not None:
            return int(row["root_group_id"])

    # Fallback: the aggregate yields NULL unless every child group of the
    # document type points at the same parent.
    row = (
        await Session.execute(
            text(
                """
                SELECT
                    CASE WHEN COUNT(DISTINCT pid) = 1 THEN MIN(pid) END AS root_group_id
                FROM leaudit_evaluation_point_groups
                WHERE document_type_id = :doc_type_id
                  AND COALESCE(pid, 0) <> 0
                  AND deleted_at IS NULL
                  AND is_enabled = true
                """
            ),
            {"doc_type_id": TypeId},
        )
    ).mappings().first()
    if row and row["root_group_id"] is not None:
        return int(row["root_group_id"])
    return None


async def _backfillMissingReviewFieldPositions(
    self,
    Session,
    DocumentId: int,
    RunId: int,
    rows: list[dict[str, Any]],
) -> list[dict[str, Any]]:
    """Fill in missing page numbers for review rows (method of DocumentServiceImpl).

    For every rule-result row, fields listed in ``extracted_fields`` that
    have no resolvable page in ``field_positions`` are collected, located in
    the document source file, and the inferred positions are

      * written into the returned rows,
      * persisted to ``leaudit_rule_results.field_positions`` (rows with an
        ``id`` only), and
      * persisted to ``leaudit_field_results.meta_json.position`` where that
        position is still empty.

    Args:
        Session: Async DB session; committed when something was inferred.
        DocumentId: Document whose source file is searched.
        RunId: Run whose field results are updated.
        rows: Rule-result rows (mappings) as loaded for the detail page.

    Returns:
        ``rows`` unchanged when nothing is missing or nothing could be
        inferred; otherwise a list of plain-dict copies, in the same order,
        with ``field_positions`` patched.  (Returning the untouched input
        here would hide the inferred pages from the current request.)
    """
    missingFields: dict[str, str] = {}
    patchedRows: list[dict[str, Any]] = []
    # (row copy, its extracted fields, working copy of its positions)
    touchedRows: list[tuple[dict[str, Any], dict[str, Any], dict[str, Any]]] = []

    for rawRow in rows:
        row = dict(rawRow)
        patchedRows.append(row)
        extractedFields = row["extracted_fields"] if isinstance(row.get("extracted_fields"), dict) else {}
        fieldPositions = dict(row["field_positions"]) if isinstance(row.get("field_positions"), dict) else {}

        rowTouched = False
        for fieldName, rawValue in extractedFields.items():
            if _extract_review_page(fieldPositions.get(fieldName)) is not None:
                continue
            valueText = _stringify_char_position_value(_normalize_review_value(rawValue))
            if not valueText.strip():
                continue
            # The same field can appear in several rules; search with the
            # longest text seen, which is the most specific needle.
            existing = missingFields.get(str(fieldName))
            if existing is None or len(valueText) > len(existing):
                missingFields[str(fieldName)] = valueText
            rowTouched = True

        if rowTouched:
            touchedRows.append((row, extractedFields, fieldPositions))

    if not missingFields:
        return rows

    inferredPositions = await self._inferFieldPositionsFromDocumentSource(
        Session,
        DocumentId,
        missingFields,
    )
    if not inferredPositions:
        return rows

    for row, extractedFields, fieldPositions in touchedRows:
        changed = False
        for fieldName in extractedFields:
            key = str(fieldName)
            if _extract_review_page(fieldPositions.get(key)) is not None:
                continue
            inferred = inferredPositions.get(key)
            if inferred is None:
                continue
            fieldPositions[key] = inferred
            changed = True

        if not changed:
            continue

        # Only now replace the row's positions, so untouched rows keep the
        # exact value that was loaded from the database.
        row["field_positions"] = fieldPositions
        if row.get("id") is None:
            continue

        await Session.execute(
            text(
                """
                UPDATE leaudit_rule_results
                SET field_positions = CAST(:field_positions AS JSONB),
                    updated_at = NOW()
                WHERE id = :rule_result_id
                """
            ),
            {
                "rule_result_id": int(row["id"]),
                "field_positions": json.dumps(fieldPositions, ensure_ascii=False),
            },
        )

    for fieldName, position in inferredPositions.items():
        # The WHERE clause only fills positions that are still empty.
        await Session.execute(
            text(
                """
                UPDATE leaudit_field_results
                SET meta_json = jsonb_set(
                        COALESCE(meta_json, '{}'::jsonb),
                        '{position}',
                        CAST(:position AS JSONB),
                        true
                    ),
                    updated_at = NOW()
                WHERE run_id = :run_id
                  AND document_id = :document_id
                  AND field_name = :field_name
                  AND (
                      meta_json IS NULL
                      OR meta_json->'position' IS NULL
                      OR meta_json->'position' = 'null'::jsonb
                  )
                """
            ),
            {
                "run_id": RunId,
                "document_id": DocumentId,
                "field_name": fieldName,
                "position": json.dumps(position, ensure_ascii=False),
            },
        )

    await Session.commit()
    return patchedRows


async def _inferFieldPositionsFromDocumentSource(
    self,
    Session,
    DocumentId: int,
    FieldTexts: dict[str, str],
) -> dict[str, dict[str, Any]]:
    """Match field texts against the document's source file, page by page
    (method of DocumentServiceImpl).

    The active file is chosen by role priority ``converted_pdf`` >
    ``merged_pdf`` > ``primary`` > others, newest id first.  A readable
    ``local_path`` is used directly; otherwise the file is downloaded from
    ``oss_url`` to a temporary file that is removed afterwards.  Only
    ``.pdf`` and ``.docx`` sources are supported.

    Returns:
        Mapping of field name to
        ``{"pageNum": <int>, "matchMethod": "detail_page_fallback"}`` for
        every field that could be located.  This is a best-effort helper:
        any failure is logged and reported as an empty mapping.
    """
    import logging

    fileRow = (
        await Session.execute(
            text(
                """
                SELECT
                    id,
                    file_name,
                    file_ext,
                    local_path,
                    oss_url,
                    file_role
                FROM leaudit_document_files
                WHERE document_id = :document_id
                  AND is_active = true
                ORDER BY
                    CASE file_role
                        WHEN 'converted_pdf' THEN 0
                        WHEN 'merged_pdf' THEN 1
                        WHEN 'primary' THEN 2
                        ELSE 9
                    END,
                    id DESC
                LIMIT 1
                """
            ),
            {"document_id": DocumentId},
        )
    ).mappings().first()
    if not fileRow:
        return {}

    fileName = str(fileRow["file_name"] or "")
    fileExt = f".{str(fileRow['file_ext']).lstrip('.')}" if fileRow.get("file_ext") else Path(fileName).suffix
    tempPath: str | None = None
    localPath = str(fileRow["local_path"] or "").strip()
    try:
        if localPath and Path(localPath).is_file():
            sourcePath = Path(localPath)
        else:
            ossUrl = str(fileRow["oss_url"] or "").strip()
            if not ossUrl:
                return {}
            downloaded = await self.OssService.DownloadToTempFile(
                ossUrl,
                Suffix=fileExt or Path(fileName).suffix or "",
                Prefix=f"review-page-{DocumentId}-",
            )
            tempPath = downloaded
            sourcePath = Path(downloaded)

        # NOTE(review): text extraction below runs synchronously inside this
        # coroutine; for large files consider moving it off the event loop.
        suffix = sourcePath.suffix.lower()
        if suffix == ".pdf":
            pageTexts = _extract_page_texts_from_pdf(sourcePath)
        elif suffix == ".docx":
            pageTexts = _extract_page_texts_from_docx(sourcePath)
        else:
            return {}
        if not pageTexts:
            return {}

        inferred: dict[str, dict[str, Any]] = {}
        for fieldName, fieldText in FieldTexts.items():
            pageNum = _infer_page_num_from_page_texts(fieldText, pageTexts)
            if pageNum is None:
                continue
            inferred[fieldName] = {"pageNum": pageNum, "matchMethod": "detail_page_fallback"}
        return inferred
    except Exception:
        # Deliberately best-effort: the detail page must still render when
        # the source file is missing or unreadable -- but leave a trace.
        logging.getLogger(__name__).warning(
            "Failed to infer field positions from source file of document %s",
            DocumentId,
            exc_info=True,
        )
        return {}
    finally:
        if tempPath:
            try:
                Path(tempPath).unlink(missing_ok=True)
            except OSError:
                pass
# Folds full-width / CJK punctuation to its ASCII counterpart before text
# comparison.  Keys are spelled as escapes on purpose: a literal full-width
# character is easily rewritten to its ASCII twin by an editor or a Unicode
# normaliser, which silently turns the entry into a no-op identity mapping.
_REVIEW_TEXT_PUNCT_TABLE = str.maketrans({
    "\uff0c": ",",   # FULLWIDTH COMMA
    "\u3002": ".",   # IDEOGRAPHIC FULL STOP
    "\uff1b": ";",   # FULLWIDTH SEMICOLON
    "\uff1a": ":",   # FULLWIDTH COLON
    "\uff01": "!",   # FULLWIDTH EXCLAMATION MARK
    "\uff1f": "?",   # FULLWIDTH QUESTION MARK
    "\uff08": "(",   # FULLWIDTH LEFT PARENTHESIS
    "\uff09": ")",   # FULLWIDTH RIGHT PARENTHESIS
    "\u3010": "[",   # LEFT BLACK LENTICULAR BRACKET
    "\u3011": "]",   # RIGHT BLACK LENTICULAR BRACKET
    "\u300c": '"',   # LEFT CORNER BRACKET
    "\u300d": '"',   # RIGHT CORNER BRACKET
    "\u2018": "'",   # LEFT SINGLE QUOTATION MARK
    "\u2019": "'",   # RIGHT SINGLE QUOTATION MARK
    "\u201c": '"',   # LEFT DOUBLE QUOTATION MARK
    "\u201d": '"',   # RIGHT DOUBLE QUOTATION MARK
})


def _normalize_review_match_text(text: str) -> str:
    """Normalise text for comparison.

    Removes ``<...>`` tag-like spans and all whitespace, then folds the
    punctuation listed in ``_REVIEW_TEXT_PUNCT_TABLE`` to ASCII, so that
    whitespace and punctuation differences introduced by OCR or document
    parsing do not prevent a match.  Falsy input yields ``""``.
    """
    if not text:
        return ""
    normalized = re.sub(r"<[^>]+>", "", str(text))
    normalized = re.sub(r"\s+", "", normalized)
    return normalized.translate(_REVIEW_TEXT_PUNCT_TABLE)


def _infer_page_num_from_page_texts(field_text: str, page_texts: list[tuple[int, str]]) -> int | None:
    """Derive a page number from where a field's text occurs in the pages.

    An exact normalised substring hit returns that page immediately.  Texts
    of at least 8 normalised characters additionally take part in fuzzy
    matching; the best-scoring page is returned when its score is >= 0.92.

    Args:
        field_text: Field text to locate.
        page_texts: ``(page_number, text)`` pairs; page numbers are returned
            unchanged (the extractors in this module produce 1-based ones).

    Returns:
        The matching page number, or ``None`` when no page qualifies.
    """
    normalizedField = _normalize_review_match_text(field_text)
    if not normalizedField:
        return None

    # Short texts produce too many accidental fuzzy hits; exact match only.
    allowFuzzy = len(normalizedField) >= 8

    bestPage: int | None = None
    bestScore = 0.0
    for pageNum, pageText in page_texts:
        normalizedPage = _normalize_review_match_text(pageText)
        if not normalizedPage:
            continue
        if normalizedField in normalizedPage:
            return pageNum
        if not allowFuzzy:
            continue

        ratio = _partial_similarity(normalizedField, normalizedPage)
        if ratio > bestScore:
            bestScore = ratio
            bestPage = pageNum

    if bestScore >= 0.92:
        return bestPage
    return None


def _partial_similarity(needle: str, haystack: str) -> float:
    """Dependency-free approximation of a partial similarity ratio.

    The shorter string is slid across the longer one and compared position
    by position; the result is the best fraction of equal characters, in
    ``[0.0, 1.0]``.  Windows are sampled every ``max(1, window // 6)``
    characters, so an alignment that starts between two sampled offsets is
    not evaluated.
    """
    if not needle or not haystack:
        return 0.0
    if len(needle) > len(haystack):
        needle, haystack = haystack, needle

    window = len(needle)
    if window <= 0:
        return 0.0
    if needle == haystack:
        return 1.0

    best = 0.0
    step = max(1, window // 6)
    stop = max(len(haystack) - window + 1, 1)
    for start in range(0, stop, step):
        chunk = haystack[start : start + window]
        if not chunk:
            continue
        matches = sum(1 for left, right in zip(needle, chunk) if left == right)
        best = max(best, matches / window)
        if best >= 0.999:
            return 1.0

    return best


def _extract_page_texts_from_pdf(path: Path) -> list[tuple[int, str]]:
    """Extract per-page plain text from a PDF.

    Returns:
        ``(1-based page number, text)`` for every page whose text is not
        blank; blank pages are omitted.
    """
    # Imported lazily so the module loads even where PyMuPDF is unavailable.
    import fitz

    doc = fitz.open(path)
    try:
        pageTexts: list[tuple[int, str]] = []
        for index in range(len(doc)):
            # Named ``pageText`` rather than ``text`` to avoid shadowing the
            # module-level ``text`` SQL helper used elsewhere in this file.
            pageText = doc[index].get_text("text") or ""
            if pageText.strip():
                pageTexts.append((index + 1, pageText))
        return pageTexts
    finally:
        doc.close()


def _extract_page_texts_from_docx(path: Path) -> list[tuple[int, str]]:
    """Extract the text of a DOCX file as a single pseudo-page.

    DOCX carries no stable pagination, so all paragraph text followed by all
    table rows (cells joined with ``" | "``) is reported as page 1.

    Returns:
        ``[(1, text)]``, or ``[]`` when the document has no text.
    """
    # Imported lazily so the module loads even where python-docx is missing.
    from docx import Document as DocxDocument

    doc = DocxDocument(str(path))
    parts: list[str] = []

    for para in doc.paragraphs:
        paraText = (para.text or "").strip()
        if paraText:
            parts.append(paraText)

    for table in doc.tables:
        for row in table.rows:
            cells = [cell.text.strip() for cell in row.cells if cell.text and cell.text.strip()]
            if cells:
                parts.append(" | ".join(cells))

    fullText = "\n".join(parts).strip()
    return [(1, fullText)] if fullText else []