fix: preserve review field page positions in platform

This commit is contained in:
wren
2026-05-06 16:29:39 +08:00
parent c4694e11f0
commit 0b76dce2a5
3 changed files with 623 additions and 8 deletions
@@ -2,6 +2,7 @@
from __future__ import annotations
import json
from typing import Any
from datetime import date as date_type, datetime
import hashlib
@@ -143,6 +144,7 @@ class DocumentServiceImpl(IDocumentService):
resolvedTypeId = int(typeRow["id"])
resolvedTypeCode = str(typeRow["code"])
resolvedGroupId = await self._resolveDocumentGroupId(Session, resolvedTypeId, GroupId)
resolvedRootGroupId = await self._resolveDocumentRootGroupId(Session, resolvedTypeId, resolvedGroupId)
duplicateUpload = False
previousVersionId: int | None = None
rootVersionId: int | None = None
@@ -154,6 +156,7 @@ class DocumentServiceImpl(IDocumentService):
latestCandidate = await _find_latest_version_candidate(
Session,
type_id=resolvedTypeId,
root_group_id=resolvedRootGroupId,
region=normalizedRegion,
normalized_name=normalizedName,
)
@@ -1547,6 +1550,50 @@ class DocumentServiceImpl(IDocumentService):
raise LeauditException(StatusCodeEnum.HTTP_400_BAD_REQUEST, "当前子类型不属于所选文档类型,无法上传")
return int(row["id"])
async def _resolveDocumentRootGroupId(self, Session, TypeId: int, GroupId: int | None) -> int | None:
"""解析上传命中的一级分组,用于跨二级类型做版本归档。"""
if GroupId is not None:
row = (
await Session.execute(
text(
"""
SELECT
CASE
WHEN COALESCE(pid, 0) = 0 THEN id
ELSE pid
END AS root_group_id
FROM leaudit_evaluation_point_groups
WHERE id = :group_id
AND deleted_at IS NULL
LIMIT 1
"""
),
{"group_id": GroupId},
)
).mappings().first()
if row and row["root_group_id"] is not None:
return int(row["root_group_id"])
row = (
await Session.execute(
text(
"""
SELECT
CASE WHEN COUNT(DISTINCT pid) = 1 THEN MIN(pid) END AS root_group_id
FROM leaudit_evaluation_point_groups
WHERE document_type_id = :doc_type_id
AND COALESCE(pid, 0) <> 0
AND deleted_at IS NULL
AND is_enabled = true
"""
),
{"doc_type_id": TypeId},
)
).mappings().first()
if row and row["root_group_id"] is not None:
return int(row["root_group_id"])
return None
async def _getDocumentDetail(
self,
Session,
@@ -2220,6 +2267,12 @@ class DocumentServiceImpl(IDocumentService):
{"run_id": RunId, "document_id": Detail.documentId},
)
).mappings().all()
rows = await self._backfillMissingReviewFieldPositions(
Session,
Detail.documentId,
RunId,
rows,
)
result: list[ReviewPointResultVO] = []
for row in rows:
@@ -2273,6 +2326,197 @@ class DocumentServiceImpl(IDocumentService):
)
return result
async def _backfillMissingReviewFieldPositions(
self,
Session,
DocumentId: int,
RunId: int,
rows: list[dict[str, Any]],
) -> list[dict[str, Any]]:
"""详情页兜底补齐缺失页码,避免长文本字段只能按文本定位。"""
missingFields: dict[str, str] = {}
touchedRuleRows: dict[int, dict[str, Any]] = {}
for rawRow in rows:
row = dict(rawRow)
extractedFields = row["extracted_fields"] if isinstance(row.get("extracted_fields"), dict) else {}
fieldPositions = dict(row["field_positions"]) if isinstance(row.get("field_positions"), dict) else {}
row["field_positions"] = fieldPositions
rowTouched = False
for fieldName, rawValue in extractedFields.items():
if _extract_review_page(fieldPositions.get(fieldName)) is not None:
continue
valueText = _stringify_char_position_value(_normalize_review_value(rawValue))
if not valueText.strip():
continue
existing = missingFields.get(str(fieldName))
if existing is None or len(valueText) > len(existing):
missingFields[str(fieldName)] = valueText
rowTouched = True
if rowTouched and row.get("id") is not None:
touchedRuleRows[int(row["id"])] = row
if not missingFields:
return rows
inferredPositions = await self._inferFieldPositionsFromDocumentSource(
Session,
DocumentId,
missingFields,
)
if not inferredPositions:
return rows
for ruleRowId, row in touchedRuleRows.items():
fieldPositions = row["field_positions"] if isinstance(row.get("field_positions"), dict) else {}
changed = False
extractedFields = row["extracted_fields"] if isinstance(row.get("extracted_fields"), dict) else {}
for fieldName in extractedFields.keys():
key = str(fieldName)
if _extract_review_page(fieldPositions.get(key)) is not None:
continue
inferred = inferredPositions.get(key)
if inferred is None:
continue
fieldPositions[key] = inferred
changed = True
if not changed:
continue
await Session.execute(
text(
"""
UPDATE leaudit_rule_results
SET field_positions = CAST(:field_positions AS JSONB),
updated_at = NOW()
WHERE id = :rule_result_id
"""
),
{
"rule_result_id": ruleRowId,
"field_positions": json.dumps(fieldPositions, ensure_ascii=False),
},
)
for fieldName, position in inferredPositions.items():
await Session.execute(
text(
"""
UPDATE leaudit_field_results
SET meta_json = jsonb_set(
COALESCE(meta_json, '{}'::jsonb),
'{position}',
CAST(:position AS JSONB),
true
),
updated_at = NOW()
WHERE run_id = :run_id
AND document_id = :document_id
AND field_name = :field_name
AND (
meta_json IS NULL
OR meta_json->'position' IS NULL
OR meta_json->'position' = 'null'::jsonb
)
"""
),
{
"run_id": RunId,
"document_id": DocumentId,
"field_name": fieldName,
"position": json.dumps(position, ensure_ascii=False),
},
)
await Session.commit()
return rows
async def _inferFieldPositionsFromDocumentSource(
self,
Session,
DocumentId: int,
FieldTexts: dict[str, str],
) -> dict[str, dict[str, Any]]:
"""按当前文档源文件逐页匹配字段文本,推导最小可用 pageNum。"""
fileRow = (
await Session.execute(
text(
"""
SELECT
id,
file_name,
file_ext,
local_path,
oss_url,
file_role
FROM leaudit_document_files
WHERE document_id = :document_id
AND is_active = true
ORDER BY
CASE file_role
WHEN 'converted_pdf' THEN 0
WHEN 'merged_pdf' THEN 1
WHEN 'primary' THEN 2
ELSE 9
END,
id DESC
LIMIT 1
"""
),
{"document_id": DocumentId},
)
).mappings().first()
if not fileRow:
return {}
fileName = str(fileRow["file_name"] or "")
fileExt = f".{str(fileRow['file_ext']).lstrip('.')}" if fileRow.get("file_ext") else Path(fileName).suffix
tempPath: str | None = None
localPath = str(fileRow["local_path"] or "").strip()
try:
if localPath and Path(localPath).is_file():
sourcePath = Path(localPath)
else:
ossUrl = str(fileRow["oss_url"] or "").strip()
if not ossUrl:
return {}
downloaded = await self.OssService.DownloadToTempFile(
ossUrl,
Suffix=fileExt or Path(fileName).suffix or "",
Prefix=f"review-page-{DocumentId}-",
)
tempPath = downloaded
sourcePath = Path(downloaded)
suffix = sourcePath.suffix.lower()
if suffix == ".pdf":
pageTexts = _extract_page_texts_from_pdf(sourcePath)
elif suffix == ".docx":
pageTexts = _extract_page_texts_from_docx(sourcePath)
else:
return {}
if not pageTexts:
return {}
inferred: dict[str, dict[str, Any]] = {}
for fieldName, fieldText in FieldTexts.items():
pageNum = _infer_page_num_from_page_texts(fieldText, pageTexts)
if pageNum is None:
continue
inferred[fieldName] = {"pageNum": pageNum, "matchMethod": "detail_page_fallback"}
return inferred
except Exception:
return {}
finally:
if tempPath:
try:
Path(tempPath).unlink(missing_ok=True)
except OSError:
pass
def _buildReviewPointStats(self, ReviewPoints: list[ReviewPointResultVO]) -> ReviewPointStatsVO:
"""按旧详情页口径汇总统计。"""
stats = ReviewPointStatsVO(total=len(ReviewPoints), success=0, warning=0, error=0, score=0)
@@ -2341,10 +2585,71 @@ async def _find_latest_version_candidate(
session,
*,
type_id: int,
root_group_id: int | None,
region: str,
normalized_name: str,
) -> dict | None:
"""Find the latest primary document version candidate by normalized name."""
"""Find the latest primary document version candidate by normalized name.
Preferred rule: same region + same root group + same normalized name.
Fallback rule: when a root group cannot be resolved, keep the old same-type behavior.
"""
if root_group_id is not None:
result = await session.execute(
text(
"""
SELECT
d.id AS document_id,
d.version_group_key,
d.version_no,
d.root_version_id,
f.id AS file_id,
f.sha256
FROM leaudit_documents d
JOIN leaudit_document_files f
ON f.document_id = d.id
AND f.is_active = true
AND f.file_role = 'primary'
LEFT JOIN leaudit_evaluation_point_groups eg
ON eg.id = d.group_id
LEFT JOIN (
SELECT
document_type_id,
CASE WHEN COUNT(DISTINCT pid) = 1 THEN MIN(pid) END AS inferred_root_group_id
FROM leaudit_evaluation_point_groups
WHERE COALESCE(pid, 0) <> 0
AND deleted_at IS NULL
AND is_enabled = true
AND document_type_id IS NOT NULL
GROUP BY document_type_id
) dg
ON dg.document_type_id = d.type_id
WHERE d.region = :region
AND d.normalized_name = :normalized_name
AND d.is_latest_version = true
AND d.deleted_at IS NULL
AND COALESCE(
CASE
WHEN eg.id IS NULL THEN NULL
WHEN COALESCE(eg.pid, 0) = 0 THEN eg.id
ELSE eg.pid
END,
dg.inferred_root_group_id
) = :root_group_id
ORDER BY d.version_no DESC, d.id DESC
LIMIT 1
"""
),
{
"root_group_id": root_group_id,
"region": region,
"normalized_name": normalized_name,
},
)
row = result.mappings().first()
if row:
return dict(row)
result = await session.execute(
text(
"""
@@ -2595,6 +2900,119 @@ def _stringify_char_position_value(raw_value: Any) -> str:
return ""
_REVIEW_TEXT_PUNCT_TABLE = str.maketrans({
"": ",", "": ".", "": ";", "": ":",
"": "!", "": "?", "": "(", "": ")",
"": "[", "": "]", "": '"', "": '"',
"": "'", "": "'", "": '"', "": '"',
})
def _normalize_review_match_text(text: str) -> str:
"""统一比较文本,尽量消除 OCR/文档解析带来的空白和标点差异。"""
if not text:
return ""
normalized = re.sub(r"<[^>]+>", "", str(text))
normalized = re.sub(r"\s+", "", normalized)
return normalized.translate(_REVIEW_TEXT_PUNCT_TABLE)
def _infer_page_num_from_page_texts(field_text: str, page_texts: list[tuple[int, str]]) -> int | None:
"""根据字段文本在逐页正文中的命中情况推导 1-based 页码。"""
normalizedField = _normalize_review_match_text(field_text)
if not normalizedField:
return None
bestPage: int | None = None
bestScore = 0.0
fieldLength = len(normalizedField)
for pageNum, pageText in page_texts:
normalizedPage = _normalize_review_match_text(pageText)
if not normalizedPage:
continue
if normalizedField in normalizedPage:
return pageNum
if fieldLength < 8:
continue
ratio = _partial_similarity(normalizedField, normalizedPage)
if ratio > bestScore:
bestScore = ratio
bestPage = pageNum
if bestScore >= 0.92:
return bestPage
return None
def _partial_similarity(needle: str, haystack: str) -> float:
"""简化版 partial similarity,避免详情页兜底依赖额外包。"""
if not needle or not haystack:
return 0.0
if len(needle) > len(haystack):
needle, haystack = haystack, needle
window = len(needle)
if window <= 0:
return 0.0
if needle == haystack:
return 1.0
best = 0.0
step = max(1, window // 6)
stop = max(len(haystack) - window + 1, 1)
for start in range(0, stop, step):
chunk = haystack[start : start + window]
if not chunk:
continue
matches = sum(1 for left, right in zip(needle, chunk) if left == right)
best = max(best, matches / window)
if best >= 0.999:
return 1.0
return best
def _extract_page_texts_from_pdf(path: Path) -> list[tuple[int, str]]:
"""平台侧直接从 PDF 提取逐页文本,避免依赖 leaudit 内核。"""
import fitz
doc = fitz.open(path)
try:
page_texts: list[tuple[int, str]] = []
for index in range(len(doc)):
text = doc[index].get_text("text") or ""
if text.strip():
page_texts.append((index + 1, text))
return page_texts
finally:
doc.close()
def _extract_page_texts_from_docx(path: Path) -> list[tuple[int, str]]:
"""DOCX 无稳定真实分页时,平台侧仅退化为整文第 1 页定位。"""
from docx import Document as DocxDocument
doc = DocxDocument(str(path))
parts: list[str] = []
for para in doc.paragraphs:
text = (para.text or "").strip()
if text:
parts.append(text)
for table in doc.tables:
for row in table.rows:
cells = [cell.text.strip() for cell in row.cells if cell.text and cell.text.strip()]
if cells:
parts.append(" | ".join(cells))
text = "\n".join(parts).strip()
return [(1, text)] if text else []
def _format_display_datetime(value: Any) -> str:
"""格式化详情页展示时间。"""
if isinstance(value, datetime):