3161 lines
130 KiB
Python
3161 lines
130 KiB
Python
"""文档服务实现。"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import json
|
||
from typing import Any
|
||
from datetime import date as date_type, datetime
|
||
import hashlib
|
||
import mimetypes
|
||
import re
|
||
import time
|
||
import unicodedata
|
||
import uuid
|
||
from pathlib import Path
|
||
|
||
from sqlalchemy import text
|
||
|
||
from fastapi_common.fastapi_common_sqlalchemy.database import GetAsyncSession
|
||
from fastapi_common.fastapi_common_web.domain.responses import StatusCodeEnum
|
||
from fastapi_common.fastapi_common_web.exception.LeauditException import LeauditException
|
||
from fastapi_common.fastapi_common_storage.oss_path_utils import OssPathUtils
|
||
|
||
from fastapi_modules.fastapi_leaudit.domian.vo.documentVo import (
|
||
DocumentAttachmentVO,
|
||
DocumentDetailVO,
|
||
DocumentHistoryVersionVO,
|
||
DocumentListItemVO,
|
||
DocumentListPageVO,
|
||
DocumentStatusItemVO,
|
||
DocumentTypeRootCreateDTO,
|
||
DocumentTypeRootItemVO,
|
||
DocumentTypeRootUpdateDTO,
|
||
DocumentUpdateDTO,
|
||
DocumentTypeCreateDTO,
|
||
DocumentTypeItemVO,
|
||
DocumentTypeUpdateDTO,
|
||
DocumentUploadVO,
|
||
)
|
||
from fastapi_modules.fastapi_leaudit.domian.vo.reviewPointVo import (
|
||
DocumentConfirmVO,
|
||
ReviewPointAuditVO,
|
||
ReviewPointInfoVO,
|
||
ReviewPointResultVO,
|
||
ReviewPointStatsVO,
|
||
ReviewPointsAggregateVO,
|
||
)
|
||
from fastapi_modules.fastapi_leaudit.models import LeauditDocument, LeauditDocumentFile
|
||
from fastapi_modules.fastapi_leaudit.services import IAuditService, IDocumentService, IOssService
|
||
from fastapi_modules.fastapi_leaudit.services.impl.auditServiceImpl import AuditServiceImpl
|
||
from fastapi_modules.fastapi_leaudit.services.impl.ossServiceImpl import OssServiceImpl
|
||
from fastapi_modules.fastapi_leaudit.services.impl.ruleGroupSupport import sync_group_bindings_from_doc_type
|
||
|
||
|
||
class DocumentServiceImpl(IDocumentService):
|
||
"""文档服务实现。"""
|
||
|
||
def __init__(
|
||
self,
|
||
OssService: IOssService | None = None,
|
||
AuditService: IAuditService | None = None,
|
||
) -> None:
|
||
self.OssService = OssService or OssServiceImpl()
|
||
self.AuditService = AuditService or AuditServiceImpl()
|
||
|
||
async def Upload(
|
||
self,
|
||
FileName: str,
|
||
FileContent: bytes,
|
||
ContentType: str | None,
|
||
TypeId: int | None = None,
|
||
TypeCode: str | None = None,
|
||
GroupId: int | None = None,
|
||
Region: str = "default",
|
||
FileRole: str = "primary",
|
||
CreatedBy: int | None = None,
|
||
Attachments: list[tuple[str, bytes, str | None]] | None = None,
|
||
AutoRun: bool = False,
|
||
Speed: str = "normal",
|
||
) -> DocumentUploadVO:
|
||
"""上传文档并建立 LeAudit document/file 记录。"""
|
||
if not FileName:
|
||
raise LeauditException(StatusCodeEnum.HTTP_400_BAD_REQUEST, "上传文件名不能为空")
|
||
if not FileContent:
|
||
raise LeauditException(StatusCodeEnum.HTTP_400_BAD_REQUEST, "上传文件内容不能为空")
|
||
if not TypeId and not TypeCode:
|
||
raise LeauditException(StatusCodeEnum.HTTP_400_BAD_REQUEST, "typeId 与 typeCode 至少传一个")
|
||
|
||
normalizedRegion = (Region or "default").strip() or "default"
|
||
normalizedFileRole = (FileRole or "primary").strip() or "primary"
|
||
fileExt = Path(FileName).suffix.lstrip(".").lower() or None
|
||
mimeType = ContentType or mimetypes.guess_type(FileName)[0] or "application/octet-stream"
|
||
fileSha256 = hashlib.sha256(FileContent).hexdigest()
|
||
fileSize = len(FileContent)
|
||
normalizedSpeed = _normalize_speed(Speed)
|
||
normalizedName = _normalize_document_name(FileName)
|
||
uploadedAt = datetime.now()
|
||
|
||
async with GetAsyncSession() as Session:
|
||
await self._ensureDocumentGroupColumn(Session)
|
||
if TypeId is not None and TypeCode is not None:
|
||
typeResult = await Session.execute(
|
||
text(
|
||
"""
|
||
SELECT id, code
|
||
FROM leaudit_document_types
|
||
WHERE id = :type_id
|
||
AND code = :type_code
|
||
AND deleted_at IS NULL
|
||
LIMIT 1
|
||
"""
|
||
),
|
||
{"type_id": TypeId, "type_code": TypeCode},
|
||
)
|
||
elif TypeId is not None:
|
||
typeResult = await Session.execute(
|
||
text(
|
||
"""
|
||
SELECT id, code
|
||
FROM leaudit_document_types
|
||
WHERE id = :type_id
|
||
AND deleted_at IS NULL
|
||
LIMIT 1
|
||
"""
|
||
),
|
||
{"type_id": TypeId},
|
||
)
|
||
else:
|
||
typeResult = await Session.execute(
|
||
text(
|
||
"""
|
||
SELECT id, code
|
||
FROM leaudit_document_types
|
||
WHERE code = :type_code
|
||
AND deleted_at IS NULL
|
||
LIMIT 1
|
||
"""
|
||
),
|
||
{"type_code": TypeCode},
|
||
)
|
||
typeRow = typeResult.mappings().first()
|
||
if not typeRow:
|
||
raise LeauditException(StatusCodeEnum.HTTP_404_NOT_FOUND, "文档类型不存在或已停用")
|
||
|
||
resolvedTypeId = int(typeRow["id"])
|
||
resolvedTypeCode = str(typeRow["code"])
|
||
resolvedGroupId = await self._resolveDocumentGroupId(Session, resolvedTypeId, GroupId)
|
||
resolvedRootGroupId = await self._resolveDocumentRootGroupId(Session, resolvedTypeId, resolvedGroupId)
|
||
duplicateUpload = False
|
||
previousVersionId: int | None = None
|
||
rootVersionId: int | None = None
|
||
versionGroupKey: str | None = None
|
||
versionNo = 1
|
||
|
||
latestCandidate = None
|
||
if normalizedFileRole == "primary":
|
||
latestCandidate = await _find_latest_version_candidate(
|
||
Session,
|
||
type_id=resolvedTypeId,
|
||
root_group_id=resolvedRootGroupId,
|
||
region=normalizedRegion,
|
||
normalized_name=normalizedName,
|
||
)
|
||
|
||
internalDocumentNo = time.time_ns()
|
||
if latestCandidate:
|
||
previousVersionId = int(latestCandidate["document_id"])
|
||
rootVersionId = int(latestCandidate["root_version_id"] or latestCandidate["document_id"])
|
||
versionGroupKey = str(latestCandidate["version_group_key"])
|
||
versionNo = int(latestCandidate["version_no"]) + 1
|
||
previousDocument = await Session.get(LeauditDocument, previousVersionId)
|
||
if previousDocument is not None:
|
||
previousDocument.isLatestVersion = False
|
||
else:
|
||
versionGroupKey = uuid.uuid4().hex
|
||
|
||
document = await LeauditDocument.create_new(
|
||
Session,
|
||
bizDocumentId=internalDocumentNo,
|
||
typeId=resolvedTypeId,
|
||
groupId=resolvedGroupId,
|
||
region=normalizedRegion,
|
||
processingStatus="waiting",
|
||
versionGroupKey=versionGroupKey,
|
||
versionNo=versionNo,
|
||
previousVersionId=previousVersionId,
|
||
rootVersionId=rootVersionId,
|
||
isLatestVersion=True,
|
||
normalizedName=normalizedName,
|
||
)
|
||
if document.rootVersionId is None:
|
||
document.rootVersionId = document.Id
|
||
rootVersionId = document.Id
|
||
else:
|
||
rootVersionId = document.rootVersionId
|
||
|
||
versionLabel = f"v{document.versionNo}"
|
||
objectKey = OssPathUtils.BuildBusinessDocKey(
|
||
Region=normalizedRegion,
|
||
TypeCode=resolvedTypeCode,
|
||
DocumentId=document.Id,
|
||
Version=versionLabel,
|
||
FileRole=normalizedFileRole,
|
||
FileName=FileName,
|
||
Year=uploadedAt.year,
|
||
Month=uploadedAt.month,
|
||
)
|
||
ossUrl = await self.OssService.UploadBytes(
|
||
ObjectKey=objectKey,
|
||
Content=FileContent,
|
||
ContentType=mimeType,
|
||
)
|
||
|
||
versionCount = await LeauditDocumentFile.count_by_document(Session, document.Id)
|
||
_ = versionCount # single-version-per-document in current model; kept for future extension
|
||
await LeauditDocumentFile.deactivate_active_by_document(Session, document.Id)
|
||
documentFile = LeauditDocumentFile(
|
||
documentId=document.Id,
|
||
fileRole=normalizedFileRole,
|
||
fileName=FileName,
|
||
fileExt=fileExt,
|
||
mimeType=mimeType,
|
||
fileSize=fileSize,
|
||
sha256=fileSha256,
|
||
localPath=None,
|
||
ossUrl=ossUrl,
|
||
storageProvider="minio",
|
||
isActive=True,
|
||
createdBy=CreatedBy,
|
||
)
|
||
Session.add(documentFile)
|
||
await Session.flush()
|
||
await Session.commit()
|
||
await Session.refresh(document)
|
||
await Session.refresh(documentFile)
|
||
|
||
if normalizedFileRole == "primary" and Attachments:
|
||
await self._appendAttachmentFiles(
|
||
Session=Session,
|
||
DocumentId=document.Id,
|
||
TypeCode=resolvedTypeCode,
|
||
Region=normalizedRegion,
|
||
VersionNo=int(document.versionNo or 1),
|
||
Files=Attachments,
|
||
CreatedBy=CreatedBy,
|
||
)
|
||
|
||
ossUrl = documentFile.ossUrl or ""
|
||
|
||
run = None
|
||
processingStatus = document.processingStatus or "waiting"
|
||
if AutoRun:
|
||
run = await self.AuditService.Run(
|
||
DocumentId=document.Id,
|
||
Speed=Speed,
|
||
Force=duplicateUpload,
|
||
TriggerUserId=CreatedBy,
|
||
)
|
||
processingStatus = "running" if run.status in {"pending", "running"} else run.status
|
||
|
||
return DocumentUploadVO(
|
||
documentId=document.Id,
|
||
internalDocumentNo=document.bizDocumentId,
|
||
versionGroupKey=document.versionGroupKey or "",
|
||
versionNo=int(document.versionNo or 1),
|
||
previousVersionId=document.previousVersionId,
|
||
rootVersionId=int(document.rootVersionId or document.Id),
|
||
duplicateUpload=duplicateUpload,
|
||
fileId=documentFile.Id,
|
||
typeId=resolvedTypeId,
|
||
typeCode=resolvedTypeCode,
|
||
groupId=resolvedGroupId,
|
||
region=normalizedRegion,
|
||
fileName=documentFile.fileName,
|
||
ossUrl=ossUrl,
|
||
speed=normalizedSpeed,
|
||
processingStatus=processingStatus,
|
||
autoRunTriggered=AutoRun,
|
||
run=run,
|
||
)
|
||
|
||
async def ListDocuments(
|
||
self,
|
||
CurrentUserId: int,
|
||
Page: int = 1,
|
||
PageSize: int = 20,
|
||
Keyword: str | None = None,
|
||
DocumentNumber: str | None = None,
|
||
TypeCode: str | None = None,
|
||
TypeIds: list[int] | None = None,
|
||
EntryModuleId: int | None = None,
|
||
Region: str | None = None,
|
||
ProcessingStatus: str | None = None,
|
||
ResultStatus: str | None = None,
|
||
AuditStatus: int | None = None,
|
||
UserId: int | None = None,
|
||
DateFrom: str | None = None,
|
||
DateTo: str | None = None,
|
||
) -> DocumentListPageVO:
|
||
"""获取文档列表(仅最新版本,附历史版本摘要)。"""
|
||
page = max(1, int(Page))
|
||
page_size = max(1, min(int(PageSize), 100))
|
||
offset = (page - 1) * page_size
|
||
|
||
async with GetAsyncSession() as Session:
|
||
await self._ensureDocumentGroupColumn(Session)
|
||
documentColumns = await self._loadDocumentColumns(Session)
|
||
|
||
currentUser = await self._getCurrentUserContext(CurrentUserId)
|
||
filters = ["d.is_latest_version = true", "d.deleted_at IS NULL", "f.is_active = true", "f.file_role = 'primary'"]
|
||
params: dict[str, object] = {"limit": page_size, "offset": offset}
|
||
filters.extend(
|
||
self._buildDocumentScopeFilters(
|
||
CurrentUserId=CurrentUserId,
|
||
CurrentUser=currentUser,
|
||
Params=params,
|
||
RequestedRegion=Region,
|
||
RequestedUserId=UserId,
|
||
DocumentAlias="d",
|
||
FileAlias="f",
|
||
)
|
||
)
|
||
|
||
if Keyword:
|
||
if "document_number" in documentColumns:
|
||
filters.append("(f.file_name ILIKE :keyword OR d.normalized_name ILIKE :keyword OR d.document_number ILIKE :keyword)")
|
||
else:
|
||
filters.append("(f.file_name ILIKE :keyword OR d.normalized_name ILIKE :keyword)")
|
||
params["keyword"] = f"%{Keyword.strip()}%"
|
||
if DocumentNumber and "document_number" in documentColumns:
|
||
filters.append("d.document_number ILIKE :document_number")
|
||
params["document_number"] = f"%{DocumentNumber.strip()}%"
|
||
if TypeCode:
|
||
filters.append("dt.code = :type_code")
|
||
params["type_code"] = TypeCode.strip()
|
||
if TypeIds:
|
||
normalizedTypeIds = [int(typeId) for typeId in TypeIds if int(typeId) > 0]
|
||
if normalizedTypeIds:
|
||
filters.append("d.type_id = ANY(:type_ids)")
|
||
params["type_ids"] = normalizedTypeIds
|
||
if EntryModuleId is not None and int(EntryModuleId) > 0:
|
||
filters.append(
|
||
"COALESCE(eg.entry_module_id, eg_parent.entry_module_id, dt.entry_module_id) = :entry_module_id"
|
||
)
|
||
params["entry_module_id"] = int(EntryModuleId)
|
||
if ProcessingStatus:
|
||
filters.append("d.processing_status = :processing_status")
|
||
params["processing_status"] = ProcessingStatus.strip()
|
||
if ResultStatus:
|
||
filters.append("ar.result_status = :result_status")
|
||
params["result_status"] = ResultStatus.strip()
|
||
if DateFrom:
|
||
filters.append("d.created_at >= :date_from")
|
||
params["date_from"] = date_type.fromisoformat(DateFrom.strip())
|
||
if DateTo:
|
||
filters.append("d.created_at < (CAST(:date_to AS date) + INTERVAL '1 day')")
|
||
params["date_to"] = date_type.fromisoformat(DateTo.strip())
|
||
|
||
where_clause = " AND ".join(filters)
|
||
|
||
derivedAuditStatusExpr = """
|
||
CASE
|
||
WHEN LOWER(COALESCE(ar.status, '')) IN ('queued', 'running')
|
||
OR LOWER(COALESCE(d.processing_status, '')) = 'running' THEN 2
|
||
WHEN LOWER(COALESCE(d.processing_status, '')) IN ('waiting', 'pending') THEN 0
|
||
WHEN LOWER(COALESCE(d.processing_status, '')) = 'failed' THEN -1
|
||
WHEN COALESCE(ar.failed_count, 0) > 0 THEN -1
|
||
WHEN COALESCE(ar.passed_count, 0) > 0 THEN 1
|
||
ELSE 0
|
||
END
|
||
""".strip()
|
||
persistedOrDerivedAuditStatusExpr = (
|
||
f"COALESCE(d.audit_status, {derivedAuditStatusExpr})"
|
||
if "audit_status" in documentColumns
|
||
else derivedAuditStatusExpr
|
||
)
|
||
|
||
if AuditStatus is not None:
|
||
filters.append(f"{persistedOrDerivedAuditStatusExpr} = :audit_status")
|
||
params["audit_status"] = int(AuditStatus)
|
||
where_clause = " AND ".join(filters)
|
||
|
||
groupIdSelectExpr = "d.group_id" if "group_id" in documentColumns else "NULL::bigint"
|
||
# Transitional display fallback: if the document itself has not yet persisted a child group,
|
||
# but its document type currently maps to exactly one active child group, surface it in list/detail UI.
|
||
resolvedGroupIdExpr = f"COALESCE({groupIdSelectExpr}, dg.inferred_group_id)"
|
||
resolvedGroupNameExpr = (
|
||
f"CASE WHEN {groupIdSelectExpr} IS NOT NULL THEN eg.name ELSE dg.inferred_group_name END"
|
||
)
|
||
|
||
optionalSelects = [
|
||
f"{resolvedGroupIdExpr} AS group_id",
|
||
"d.document_number AS document_number" if "document_number" in documentColumns else "NULL::text AS document_number",
|
||
f"{persistedOrDerivedAuditStatusExpr} AS audit_status",
|
||
"d.is_test_document AS is_test_document" if "is_test_document" in documentColumns else "FALSE AS is_test_document",
|
||
"dt.name AS type_name",
|
||
f"{resolvedGroupNameExpr} AS group_name",
|
||
]
|
||
|
||
count_sql = text(
|
||
f"""
|
||
SELECT COUNT(*)
|
||
FROM leaudit_documents d
|
||
JOIN leaudit_document_files f
|
||
ON f.document_id = d.id
|
||
LEFT JOIN leaudit_document_types dt
|
||
ON dt.id = d.type_id
|
||
LEFT JOIN leaudit_evaluation_point_groups eg
|
||
ON eg.id = d.group_id
|
||
LEFT JOIN leaudit_evaluation_point_groups eg_parent
|
||
ON eg_parent.id = eg.pid
|
||
LEFT JOIN (
|
||
SELECT
|
||
document_type_id,
|
||
CASE WHEN COUNT(*) = 1 THEN MIN(id) END AS inferred_group_id,
|
||
CASE WHEN COUNT(*) = 1 THEN MIN(name) END AS inferred_group_name
|
||
FROM leaudit_evaluation_point_groups
|
||
WHERE pid <> 0
|
||
AND deleted_at IS NULL
|
||
AND is_enabled = true
|
||
AND document_type_id IS NOT NULL
|
||
GROUP BY document_type_id
|
||
) dg
|
||
ON dg.document_type_id = d.type_id
|
||
LEFT JOIN leaudit_audit_runs ar
|
||
ON ar.id = d.current_run_id
|
||
WHERE {where_clause}
|
||
"""
|
||
)
|
||
|
||
list_sql = text(
|
||
f"""
|
||
SELECT
|
||
d.id AS document_id,
|
||
d.biz_document_id AS internal_document_no,
|
||
d.version_group_key,
|
||
d.version_no,
|
||
d.root_version_id,
|
||
d.previous_version_id,
|
||
d.type_id,
|
||
dt.code AS type_code,
|
||
d.region,
|
||
d.normalized_name,
|
||
d.processing_status,
|
||
d.current_run_id,
|
||
d.updated_at,
|
||
f.id AS file_id,
|
||
f.file_name,
|
||
f.file_ext,
|
||
f.mime_type,
|
||
f.file_size,
|
||
f.oss_url,
|
||
ar.status AS run_status,
|
||
ar.result_status,
|
||
ar.total_score,
|
||
ar.passed_count,
|
||
ar.failed_count,
|
||
ar.skipped_count,
|
||
{', '.join(optionalSelects)},
|
||
vc.total_versions,
|
||
COALESCE(vc.total_versions, 1) > 1 AS has_history
|
||
FROM leaudit_documents d
|
||
JOIN leaudit_document_files f
|
||
ON f.document_id = d.id
|
||
LEFT JOIN leaudit_document_types dt
|
||
ON dt.id = d.type_id
|
||
LEFT JOIN leaudit_evaluation_point_groups eg
|
||
ON eg.id = d.group_id
|
||
LEFT JOIN leaudit_evaluation_point_groups eg_parent
|
||
ON eg_parent.id = eg.pid
|
||
LEFT JOIN (
|
||
SELECT
|
||
document_type_id,
|
||
CASE WHEN COUNT(*) = 1 THEN MIN(id) END AS inferred_group_id,
|
||
CASE WHEN COUNT(*) = 1 THEN MIN(name) END AS inferred_group_name
|
||
FROM leaudit_evaluation_point_groups
|
||
WHERE pid <> 0
|
||
AND deleted_at IS NULL
|
||
AND is_enabled = true
|
||
AND document_type_id IS NOT NULL
|
||
GROUP BY document_type_id
|
||
) dg
|
||
ON dg.document_type_id = d.type_id
|
||
LEFT JOIN leaudit_audit_runs ar
|
||
ON ar.id = d.current_run_id
|
||
LEFT JOIN (
|
||
SELECT version_group_key, COUNT(*) AS total_versions
|
||
FROM leaudit_documents
|
||
WHERE deleted_at IS NULL
|
||
GROUP BY version_group_key
|
||
) vc
|
||
ON vc.version_group_key = d.version_group_key
|
||
WHERE {where_clause}
|
||
ORDER BY d.updated_at DESC, d.id DESC
|
||
LIMIT :limit OFFSET :offset
|
||
"""
|
||
)
|
||
|
||
history_sql = text(
|
||
"""
|
||
SELECT
|
||
d.version_group_key,
|
||
d.id AS document_id,
|
||
d.version_no,
|
||
d.processing_status,
|
||
d.updated_at,
|
||
f.id AS file_id,
|
||
f.file_name,
|
||
f.file_ext,
|
||
f.file_size,
|
||
ar.status AS run_status,
|
||
ar.result_status,
|
||
ar.total_score,
|
||
ar.passed_count,
|
||
ar.failed_count,
|
||
ar.skipped_count
|
||
FROM leaudit_documents d
|
||
JOIN leaudit_document_files f
|
||
ON f.document_id = d.id
|
||
AND f.is_active = true
|
||
AND f.file_role = 'primary'
|
||
LEFT JOIN leaudit_audit_runs ar
|
||
ON ar.id = d.current_run_id
|
||
WHERE d.version_group_key = ANY(:group_keys)
|
||
AND d.is_latest_version = false
|
||
AND d.deleted_at IS NULL
|
||
ORDER BY d.version_group_key, d.version_no DESC, d.id DESC
|
||
"""
|
||
)
|
||
|
||
async with GetAsyncSession() as Session:
|
||
total = int((await Session.execute(count_sql, params)).scalar_one())
|
||
rows = (await Session.execute(list_sql, params)).mappings().all()
|
||
|
||
history_by_group: dict[str, list[DocumentHistoryVersionVO]] = {}
|
||
group_keys = [str(row["version_group_key"]) for row in rows if row["version_group_key"]]
|
||
if group_keys:
|
||
history_rows = (
|
||
await Session.execute(history_sql, {"group_keys": group_keys})
|
||
).mappings().all()
|
||
for row in history_rows:
|
||
history_by_group.setdefault(str(row["version_group_key"]), []).append(
|
||
DocumentHistoryVersionVO(
|
||
documentId=int(row["document_id"]),
|
||
fileId=int(row["file_id"]) if row["file_id"] is not None else None,
|
||
versionNo=int(row["version_no"]),
|
||
fileName=row["file_name"],
|
||
fileExt=row["file_ext"],
|
||
fileSize=int(row["file_size"]) if row["file_size"] is not None else None,
|
||
processingStatus=row["processing_status"],
|
||
runStatus=row["run_status"],
|
||
resultStatus=row["result_status"],
|
||
totalScore=int(row["total_score"]) if row["total_score"] is not None else None,
|
||
passedCount=int(row["passed_count"]) if row["passed_count"] is not None else None,
|
||
failedCount=int(row["failed_count"]) if row["failed_count"] is not None else None,
|
||
skippedCount=int(row["skipped_count"]) if row["skipped_count"] is not None else None,
|
||
updatedAt=row["updated_at"].isoformat() if row["updated_at"] else None,
|
||
)
|
||
)
|
||
|
||
documents: list[DocumentListItemVO] = []
|
||
for row in rows:
|
||
group_key = str(row["version_group_key"] or "")
|
||
documents.append(
|
||
DocumentListItemVO(
|
||
documentId=int(row["document_id"]),
|
||
internalDocumentNo=int(row["internal_document_no"]),
|
||
versionGroupKey=group_key,
|
||
versionNo=int(row["version_no"] or 1),
|
||
rootVersionId=int(row["root_version_id"] or row["document_id"]),
|
||
previousVersionId=int(row["previous_version_id"]) if row["previous_version_id"] is not None else None,
|
||
typeId=int(row["type_id"]) if row["type_id"] is not None else None,
|
||
typeCode=row["type_code"],
|
||
typeName=row["type_name"],
|
||
groupId=int(row["group_id"]) if row["group_id"] is not None else None,
|
||
groupName=row["group_name"],
|
||
region=row["region"],
|
||
normalizedName=row["normalized_name"],
|
||
fileId=int(row["file_id"]) if row["file_id"] is not None else None,
|
||
fileName=row["file_name"],
|
||
fileExt=row["file_ext"],
|
||
mimeType=row["mime_type"],
|
||
fileSize=int(row["file_size"]) if row["file_size"] is not None else None,
|
||
ossUrl=row["oss_url"],
|
||
processingStatus=row["processing_status"],
|
||
currentRunId=int(row["current_run_id"]) if row["current_run_id"] is not None else None,
|
||
runStatus=row["run_status"],
|
||
resultStatus=row["result_status"],
|
||
totalScore=float(row["total_score"]) if row["total_score"] is not None else None,
|
||
passedCount=int(row["passed_count"]) if row["passed_count"] is not None else None,
|
||
failedCount=int(row["failed_count"]) if row["failed_count"] is not None else None,
|
||
skippedCount=int(row["skipped_count"]) if row["skipped_count"] is not None else None,
|
||
documentNumber=row["document_number"],
|
||
auditStatus=int(row["audit_status"]) if row["audit_status"] is not None else None,
|
||
isTestDocument=bool(row["is_test_document"]),
|
||
updatedAt=row["updated_at"].isoformat() if row["updated_at"] else None,
|
||
hasHistory=bool(row["has_history"]),
|
||
totalVersions=int(row["total_versions"] or 1),
|
||
historyVersions=history_by_group.get(group_key, []),
|
||
)
|
||
)
|
||
|
||
total_pages = (total + page_size - 1) // page_size if total else 0
|
||
return DocumentListPageVO(
|
||
total=total,
|
||
page=page,
|
||
pageSize=page_size,
|
||
totalPages=total_pages,
|
||
documents=documents,
|
||
)
|
||
|
||
async def GetDocument(self, CurrentUserId: int, Id: int) -> DocumentDetailVO:
|
||
"""获取单个文档详情,并执行数据隔离校验。"""
|
||
async with GetAsyncSession() as Session:
|
||
await self._ensureDocumentGroupColumn(Session)
|
||
currentUser = await self._getCurrentUserContext(CurrentUserId)
|
||
documentColumns = await self._loadDocumentColumns(Session)
|
||
detail = await self._getDocumentDetail(Session, Id, CurrentUserId, currentUser, documentColumns)
|
||
if not detail:
|
||
raise LeauditException(StatusCodeEnum.HTTP_404_NOT_FOUND, "文档不存在或无权访问")
|
||
return detail
|
||
|
||
async def GetReviewPoints(self, CurrentUserId: int, DocumentId: int) -> ReviewPointsAggregateVO:
|
||
"""获取评查详情页聚合数据。"""
|
||
async with GetAsyncSession() as Session:
|
||
await self._ensureDocumentGroupColumn(Session)
|
||
await self._ensureReviewPointAuditTable(Session)
|
||
currentUser = await self._getCurrentUserContext(CurrentUserId)
|
||
documentColumns = await self._loadDocumentColumns(Session)
|
||
detail = await self._getDocumentDetail(Session, DocumentId, CurrentUserId, currentUser, documentColumns)
|
||
if not detail and await self._hasCrossReviewDocumentAccess(Session, DocumentId, CurrentUserId):
|
||
detail = await self._getDocumentDetail(
|
||
Session,
|
||
DocumentId,
|
||
CurrentUserId,
|
||
currentUser,
|
||
documentColumns,
|
||
BypassScopeCheck=True,
|
||
)
|
||
if not detail:
|
||
raise LeauditException(StatusCodeEnum.HTTP_404_NOT_FOUND, "文档不存在或无权访问")
|
||
|
||
runRow = await self._loadReviewRunRow(Session, detail)
|
||
reviewPoints: list[ReviewPointResultVO] = []
|
||
stats = ReviewPointStatsVO()
|
||
|
||
if runRow:
|
||
reviewPoints = await self._loadReviewPointResults(Session, detail, int(runRow["id"]))
|
||
stats = self._buildReviewPointStats(reviewPoints)
|
||
|
||
documentPayload = await self._buildReviewDocumentPayload(Session, detail, runRow)
|
||
reviewInfo = self._buildReviewInfo(runRow, reviewPoints, stats)
|
||
comparisonDocument = await self._loadComparisonDocument(Session, detail.documentId)
|
||
scoringProposals = await self._loadScoringProposals(Session, detail.documentId)
|
||
|
||
return ReviewPointsAggregateVO(
|
||
data=reviewPoints,
|
||
stats=stats,
|
||
reviewInfo=reviewInfo,
|
||
document=documentPayload,
|
||
comparison_document=comparisonDocument,
|
||
scoring_proposals=scoringProposals,
|
||
)
|
||
|
||
async def AuditReviewPoint(
|
||
self,
|
||
CurrentUserId: int,
|
||
ReviewPointResultId: int,
|
||
Result: str,
|
||
Message: str,
|
||
) -> ReviewPointAuditVO:
|
||
"""更新单个评查点的人工审核状态。"""
|
||
normalizedResult = (Result or "").strip().lower()
|
||
if normalizedResult not in {"true", "false", "review"}:
|
||
raise LeauditException(StatusCodeEnum.HTTP_400_BAD_REQUEST, "result 仅支持 true、false、review")
|
||
|
||
async with GetAsyncSession() as Session:
|
||
await self._ensureDocumentGroupColumn(Session)
|
||
await self._ensureReviewPointAuditTable(Session)
|
||
|
||
row = (
|
||
await Session.execute(
|
||
text(
|
||
"""
|
||
SELECT document_id
|
||
FROM leaudit_rule_results
|
||
WHERE id = :result_id
|
||
LIMIT 1
|
||
"""
|
||
),
|
||
{"result_id": ReviewPointResultId},
|
||
)
|
||
).mappings().first()
|
||
if not row:
|
||
raise LeauditException(StatusCodeEnum.HTTP_404_NOT_FOUND, "评查点结果不存在")
|
||
|
||
documentId = int(row["document_id"])
|
||
currentUser = await self._getCurrentUserContext(CurrentUserId)
|
||
documentColumns = await self._loadDocumentColumns(Session)
|
||
detail = await self._getDocumentDetail(Session, documentId, CurrentUserId, currentUser, documentColumns)
|
||
if not detail and await self._hasCrossReviewDocumentAccess(Session, documentId, CurrentUserId):
|
||
detail = await self._getDocumentDetail(
|
||
Session,
|
||
documentId,
|
||
CurrentUserId,
|
||
currentUser,
|
||
documentColumns,
|
||
BypassScopeCheck=True,
|
||
)
|
||
if not detail:
|
||
raise LeauditException(StatusCodeEnum.HTTP_404_NOT_FOUND, "文档不存在或无权访问")
|
||
|
||
payload = {
|
||
"rule_result_id": ReviewPointResultId,
|
||
"document_id": documentId,
|
||
"edit_audit_status": 0 if normalizedResult == "review" else 1,
|
||
"override_result": None if normalizedResult == "review" else (normalizedResult == "true"),
|
||
"message": "" if normalizedResult == "review" else (Message or ""),
|
||
"reviewed_by": CurrentUserId,
|
||
}
|
||
auditRow = (
|
||
await Session.execute(
|
||
text(
|
||
"""
|
||
INSERT INTO leaudit_review_point_audits (
|
||
rule_result_id,
|
||
document_id,
|
||
edit_audit_status,
|
||
override_result,
|
||
message,
|
||
reviewed_by,
|
||
created_at,
|
||
updated_at
|
||
) VALUES (
|
||
:rule_result_id,
|
||
:document_id,
|
||
:edit_audit_status,
|
||
:override_result,
|
||
:message,
|
||
:reviewed_by,
|
||
NOW(),
|
||
NOW()
|
||
)
|
||
ON CONFLICT (rule_result_id) DO UPDATE SET
|
||
edit_audit_status = EXCLUDED.edit_audit_status,
|
||
override_result = EXCLUDED.override_result,
|
||
message = EXCLUDED.message,
|
||
reviewed_by = EXCLUDED.reviewed_by,
|
||
updated_at = NOW()
|
||
RETURNING id, edit_audit_status, override_result, message
|
||
"""
|
||
),
|
||
payload,
|
||
)
|
||
).mappings().first()
|
||
await Session.commit()
|
||
|
||
return ReviewPointAuditVO(
|
||
reviewPointResultId=ReviewPointResultId,
|
||
editAuditStatusId=int(auditRow["id"]),
|
||
editAuditStatus=int(auditRow["edit_audit_status"] or 0),
|
||
overrideResult=bool(auditRow["override_result"]) if auditRow["override_result"] is not None else None,
|
||
message=str(auditRow["message"] or ""),
|
||
)
|
||
|
||
async def ConfirmReviewResults(self, CurrentUserId: int, DocumentId: int) -> DocumentConfirmVO:
|
||
"""确认文档评查结果。"""
|
||
async with GetAsyncSession() as Session:
|
||
await self._ensureDocumentGroupColumn(Session)
|
||
currentUser = await self._getCurrentUserContext(CurrentUserId)
|
||
documentColumns = await self._loadDocumentColumns(Session)
|
||
detail = await self._getDocumentDetail(Session, DocumentId, CurrentUserId, currentUser, documentColumns)
|
||
if not detail and await self._hasCrossReviewDocumentAccess(Session, DocumentId, CurrentUserId):
|
||
detail = await self._getDocumentDetail(
|
||
Session,
|
||
DocumentId,
|
||
CurrentUserId,
|
||
currentUser,
|
||
documentColumns,
|
||
BypassScopeCheck=True,
|
||
)
|
||
if not detail:
|
||
raise LeauditException(StatusCodeEnum.HTTP_404_NOT_FOUND, "文档不存在或无权访问")
|
||
|
||
if "audit_status" in documentColumns:
|
||
await Session.execute(
|
||
text(
|
||
"""
|
||
UPDATE leaudit_documents
|
||
SET audit_status = 1, updated_at = NOW()
|
||
WHERE id = :document_id
|
||
AND deleted_at IS NULL
|
||
"""
|
||
),
|
||
{"document_id": DocumentId},
|
||
)
|
||
await Session.commit()
|
||
|
||
return DocumentConfirmVO(documentId=DocumentId, auditStatus=1)
|
||
|
||
async def GetDocumentsStatus(self, CurrentUserId: int, Ids: list[int]) -> list[DocumentStatusItemVO]:
|
||
"""批量获取文档状态,并执行数据隔离校验。"""
|
||
normalizedIds = [int(docId) for docId in Ids if int(docId) > 0]
|
||
if not normalizedIds:
|
||
return []
|
||
|
||
async with GetAsyncSession() as Session:
|
||
currentUser = await self._getCurrentUserContext(CurrentUserId)
|
||
params: dict[str, object] = {"ids": normalizedIds}
|
||
filters = [
|
||
"d.id = ANY(:ids)",
|
||
"d.deleted_at IS NULL",
|
||
"f.is_active = true",
|
||
"f.file_role = 'primary'",
|
||
]
|
||
filters.extend(
|
||
self._buildDocumentScopeFilters(
|
||
CurrentUserId=CurrentUserId,
|
||
CurrentUser=currentUser,
|
||
Params=params,
|
||
DocumentAlias="d",
|
||
FileAlias="f",
|
||
)
|
||
)
|
||
|
||
rows = (
|
||
await Session.execute(
|
||
text(
|
||
f"""
|
||
SELECT
|
||
d.id AS document_id,
|
||
d.processing_status,
|
||
ar.status AS run_status,
|
||
ar.phase,
|
||
ar.result_status,
|
||
d.updated_at
|
||
FROM leaudit_documents d
|
||
JOIN leaudit_document_files f
|
||
ON f.document_id = d.id
|
||
LEFT JOIN leaudit_audit_runs ar
|
||
ON ar.id = d.current_run_id
|
||
WHERE {' AND '.join(filters)}
|
||
ORDER BY d.updated_at DESC, d.id DESC
|
||
"""
|
||
),
|
||
params,
|
||
)
|
||
).mappings().all()
|
||
|
||
return [
|
||
DocumentStatusItemVO(
|
||
documentId=int(row["document_id"]),
|
||
processingStatus=row["processing_status"],
|
||
runStatus=row["run_status"],
|
||
phase=row["phase"],
|
||
resultStatus=row["result_status"],
|
||
updatedAt=row["updated_at"].isoformat() if row["updated_at"] else None,
|
||
)
|
||
for row in rows
|
||
]
|
||
|
||
async def UpdateDocument(self, CurrentUserId: int, Id: int, Body: DocumentUpdateDTO) -> DocumentDetailVO:
|
||
"""更新文档元数据,并执行数据隔离校验。"""
|
||
async with GetAsyncSession() as Session:
|
||
await self._ensureDocumentGroupColumn(Session)
|
||
currentUser = await self._getCurrentUserContext(CurrentUserId)
|
||
documentColumns = await self._loadDocumentColumns(Session)
|
||
detail = await self._getDocumentDetail(Session, Id, CurrentUserId, currentUser, documentColumns)
|
||
if not detail:
|
||
raise LeauditException(StatusCodeEnum.HTTP_404_NOT_FOUND, "文档不存在或无权访问")
|
||
|
||
assignments: list[str] = []
|
||
params: dict[str, object] = {"id": Id}
|
||
|
||
if Body.documentNumber is not None and "document_number" in documentColumns:
|
||
assignments.append("document_number = :document_number")
|
||
params["document_number"] = Body.documentNumber.strip() or None
|
||
if Body.remark is not None and "remark" in documentColumns:
|
||
assignments.append("remark = :remark")
|
||
params["remark"] = Body.remark.strip() or None
|
||
if Body.isTestDocument is not None and "is_test_document" in documentColumns:
|
||
assignments.append("is_test_document = :is_test_document")
|
||
params["is_test_document"] = Body.isTestDocument
|
||
if Body.auditStatus is not None and "audit_status" in documentColumns:
|
||
assignments.append("audit_status = :audit_status")
|
||
params["audit_status"] = Body.auditStatus
|
||
|
||
if assignments:
|
||
assignments.append("updated_at = NOW()")
|
||
await Session.execute(
|
||
text(f"UPDATE leaudit_documents SET {', '.join(assignments)} WHERE id = :id AND deleted_at IS NULL"),
|
||
params,
|
||
)
|
||
await Session.commit()
|
||
|
||
refreshed = await self._getDocumentDetail(Session, Id, CurrentUserId, currentUser, documentColumns)
|
||
if not refreshed:
|
||
raise LeauditException(StatusCodeEnum.HTTP_404_NOT_FOUND, "文档不存在或无权访问")
|
||
return refreshed
|
||
|
||
async def _appendAttachmentFiles(
|
||
self,
|
||
Session,
|
||
*,
|
||
DocumentId: int,
|
||
TypeCode: str,
|
||
Region: str,
|
||
VersionNo: int,
|
||
Files: list[tuple[str, bytes, str | None]],
|
||
CreatedBy: int | None,
|
||
) -> None:
|
||
"""为文档追加附件文件记录。"""
|
||
versionLabel = f"v{VersionNo}"
|
||
uploadedAt = datetime.now()
|
||
|
||
for fileName, fileContent, contentType in Files:
|
||
if not fileName:
|
||
raise LeauditException(StatusCodeEnum.HTTP_400_BAD_REQUEST, "附件文件名不能为空")
|
||
if not fileContent:
|
||
raise LeauditException(StatusCodeEnum.HTTP_400_BAD_REQUEST, f"附件 {fileName} 内容不能为空")
|
||
|
||
fileExt = Path(fileName).suffix.lstrip(".").lower() or None
|
||
mimeType = contentType or mimetypes.guess_type(fileName)[0] or "application/octet-stream"
|
||
fileSha256 = hashlib.sha256(fileContent).hexdigest()
|
||
fileSize = len(fileContent)
|
||
objectKey = OssPathUtils.BuildBusinessDocKey(
|
||
Region=Region,
|
||
TypeCode=TypeCode,
|
||
DocumentId=DocumentId,
|
||
Version=versionLabel,
|
||
FileRole="attachment",
|
||
FileName=fileName,
|
||
Year=uploadedAt.year,
|
||
Month=uploadedAt.month,
|
||
)
|
||
ossUrl = await self.OssService.UploadBytes(
|
||
ObjectKey=objectKey,
|
||
Content=fileContent,
|
||
ContentType=mimeType,
|
||
)
|
||
|
||
attachment = LeauditDocumentFile(
|
||
documentId=DocumentId,
|
||
fileRole="attachment",
|
||
fileName=fileName,
|
||
fileExt=fileExt,
|
||
mimeType=mimeType,
|
||
fileSize=fileSize,
|
||
sha256=fileSha256,
|
||
localPath=None,
|
||
ossUrl=ossUrl,
|
||
storageProvider="minio",
|
||
isActive=True,
|
||
createdBy=CreatedBy,
|
||
)
|
||
Session.add(attachment)
|
||
|
||
await Session.commit()
|
||
|
||
async def DeleteDocument(self, CurrentUserId: int, Id: int) -> None:
|
||
"""软删除文档,并执行数据隔离校验。"""
|
||
async with GetAsyncSession() as Session:
|
||
currentUser = await self._getCurrentUserContext(CurrentUserId)
|
||
filters = ["d.id = :id", "d.deleted_at IS NULL", "f.is_active = true", "f.file_role = 'primary'"]
|
||
params: dict[str, object] = {"id": Id}
|
||
filters.extend(
|
||
self._buildDocumentScopeFilters(
|
||
CurrentUserId=CurrentUserId,
|
||
CurrentUser=currentUser,
|
||
Params=params,
|
||
DocumentAlias="d",
|
||
FileAlias="f",
|
||
)
|
||
)
|
||
row = (
|
||
await Session.execute(
|
||
text(
|
||
f"""
|
||
SELECT d.id, d.version_group_key, d.is_latest_version
|
||
FROM leaudit_documents d
|
||
JOIN leaudit_document_files f
|
||
ON f.document_id = d.id
|
||
WHERE {' AND '.join(filters)}
|
||
LIMIT 1
|
||
"""
|
||
),
|
||
params,
|
||
)
|
||
).mappings().first()
|
||
if not row:
|
||
raise LeauditException(StatusCodeEnum.HTTP_404_NOT_FOUND, "文档不存在或无权访问")
|
||
|
||
versionGroupKey = str(row["version_group_key"] or "")
|
||
isLatestVersion = bool(row["is_latest_version"])
|
||
|
||
await Session.execute(
|
||
text(
|
||
"""
|
||
UPDATE leaudit_documents
|
||
SET deleted_at = NOW(), is_latest_version = false, updated_at = NOW()
|
||
WHERE id = :id AND deleted_at IS NULL
|
||
"""
|
||
),
|
||
{"id": Id},
|
||
)
|
||
await Session.execute(
|
||
text(
|
||
"""
|
||
UPDATE leaudit_document_files
|
||
SET is_active = false
|
||
WHERE document_id = :id AND is_active = true
|
||
"""
|
||
),
|
||
{"id": Id},
|
||
)
|
||
|
||
if isLatestVersion and versionGroupKey:
|
||
nextLatest = (
|
||
await Session.execute(
|
||
text(
|
||
"""
|
||
SELECT id
|
||
FROM leaudit_documents
|
||
WHERE version_group_key = :group_key
|
||
AND deleted_at IS NULL
|
||
ORDER BY version_no DESC, id DESC
|
||
LIMIT 1
|
||
"""
|
||
),
|
||
{"group_key": versionGroupKey},
|
||
)
|
||
).scalar_one_or_none()
|
||
if nextLatest is not None:
|
||
await Session.execute(
|
||
text(
|
||
"""
|
||
UPDATE leaudit_documents
|
||
SET is_latest_version = true, updated_at = NOW()
|
||
WHERE id = :id
|
||
"""
|
||
),
|
||
{"id": int(nextLatest)},
|
||
)
|
||
|
||
await Session.commit()
|
||
|
||
async def AppendAttachments(
|
||
self,
|
||
CurrentUserId: int,
|
||
Id: int,
|
||
Files: list[tuple[str, bytes, str | None]],
|
||
) -> DocumentDetailVO:
|
||
"""为现有文档追加附件,并执行数据隔离校验。"""
|
||
if not Files:
|
||
raise LeauditException(StatusCodeEnum.HTTP_400_BAD_REQUEST, "至少上传一个附件文件")
|
||
|
||
async with GetAsyncSession() as Session:
|
||
await self._ensureDocumentGroupColumn(Session)
|
||
currentUser = await self._getCurrentUserContext(CurrentUserId)
|
||
documentColumns = await self._loadDocumentColumns(Session)
|
||
detail = await self._getDocumentDetail(Session, Id, CurrentUserId, currentUser, documentColumns)
|
||
if not detail:
|
||
raise LeauditException(StatusCodeEnum.HTTP_404_NOT_FOUND, "文档不存在或无权访问")
|
||
|
||
documentMeta = (
|
||
await Session.execute(
|
||
text(
|
||
"""
|
||
SELECT
|
||
d.id AS document_id,
|
||
d.type_id,
|
||
d.version_no,
|
||
d.region,
|
||
dt.code AS type_code
|
||
FROM leaudit_documents d
|
||
LEFT JOIN leaudit_document_types dt
|
||
ON dt.id = d.type_id
|
||
WHERE d.id = :document_id
|
||
AND d.deleted_at IS NULL
|
||
LIMIT 1
|
||
"""
|
||
),
|
||
{"document_id": Id},
|
||
)
|
||
).mappings().first()
|
||
if not documentMeta:
|
||
raise LeauditException(StatusCodeEnum.HTTP_404_NOT_FOUND, "文档不存在或无权访问")
|
||
|
||
resolvedTypeCode = str(documentMeta["type_code"] or "").strip()
|
||
if not resolvedTypeCode:
|
||
raise LeauditException(StatusCodeEnum.HTTP_400_BAD_REQUEST, "当前文档缺少文档类型编码,无法追加附件")
|
||
|
||
normalizedRegion = str(documentMeta["region"] or "default").strip() or "default"
|
||
await self._appendAttachmentFiles(
|
||
Session=Session,
|
||
DocumentId=int(documentMeta["document_id"]),
|
||
TypeCode=resolvedTypeCode,
|
||
Region=normalizedRegion,
|
||
VersionNo=int(documentMeta["version_no"] or 1),
|
||
Files=Files,
|
||
CreatedBy=CurrentUserId,
|
||
)
|
||
|
||
refreshed = await self._getDocumentDetail(Session, Id, CurrentUserId, currentUser, documentColumns)
|
||
if not refreshed:
|
||
raise LeauditException(StatusCodeEnum.HTTP_404_NOT_FOUND, "文档不存在或无权访问")
|
||
return refreshed
|
||
|
||
|
||
async def ListDocumentTypes(self, Ids: list[int] | None = None, EntryModuleId: int | None = None) -> list[DocumentTypeItemVO]:
|
||
"""获取文档类型列表。"""
|
||
async with GetAsyncSession() as Session:
|
||
rows, bindingsMap = await self._queryDocumentTypes(Session, Ids, EntryModuleId)
|
||
return [
|
||
DocumentTypeItemVO(
|
||
id=int(r["id"]), name=str(r["name"] or ""), code=str(r["code"] or ""),
|
||
description=r.get("description"), entryModuleId=r.get("entry_module_id"),
|
||
isEnabled=bool(r.get("is_enabled", True)),
|
||
ruleSetIds=bindingsMap.get(int(r["id"]), []),
|
||
)
|
||
for r in rows
|
||
]
|
||
|
||
async def GetDocumentType(self, Id: int) -> DocumentTypeItemVO:
|
||
"""获取文档类型详情。"""
|
||
async with GetAsyncSession() as Session:
|
||
rows, bindingsMap = await self._queryDocumentTypes(Session, [Id])
|
||
if not rows:
|
||
raise LeauditException(StatusCodeEnum.HTTP_404_NOT_FOUND, "文档类型不存在")
|
||
r = rows[0]
|
||
return DocumentTypeItemVO(
|
||
id=int(r["id"]), name=str(r["name"] or ""), code=str(r["code"] or ""),
|
||
description=r.get("description"), entryModuleId=r.get("entry_module_id"),
|
||
isEnabled=bool(r.get("is_enabled", True)),
|
||
ruleSetIds=bindingsMap.get(int(r["id"]), []),
|
||
)
|
||
|
||
async def CreateDocumentType(self, Body: DocumentTypeCreateDTO) -> DocumentTypeItemVO:
|
||
"""创建文档类型。"""
|
||
async with GetAsyncSession() as Session:
|
||
existing = (
|
||
await Session.execute(
|
||
text("SELECT 1 FROM leaudit_document_types WHERE code = :code AND deleted_at IS NULL LIMIT 1"),
|
||
{"code": Body.code.strip()},
|
||
)
|
||
).first()
|
||
if existing:
|
||
raise LeauditException(StatusCodeEnum.HTTP_409_CONFLICT, f"文档类型编码 {Body.code} 已存在")
|
||
|
||
row = (
|
||
await Session.execute(
|
||
text(
|
||
"""
|
||
INSERT INTO leaudit_document_types (code, name, description, entry_module_id, is_enabled, sort_order, created_at, updated_at)
|
||
VALUES (:code, :name, :description, :entry_module_id, :is_enabled, :sort_order, NOW(), NOW())
|
||
RETURNING id
|
||
"""
|
||
),
|
||
{
|
||
"code": Body.code.strip(),
|
||
"name": Body.name.strip(),
|
||
"description": Body.description.strip() or None,
|
||
"entry_module_id": Body.entryModuleId,
|
||
"is_enabled": Body.isEnabled,
|
||
"sort_order": Body.sortOrder,
|
||
},
|
||
)
|
||
).scalar_one()
|
||
await self._syncRuleBindings(Session, int(row), Body.ruleSetIds, "default")
|
||
await sync_group_bindings_from_doc_type(Session, int(row), Body.ruleSetIds)
|
||
await Session.commit()
|
||
|
||
return await self.GetDocumentType(int(row))
|
||
|
||
async def UpdateDocumentType(self, Id: int, Body: DocumentTypeUpdateDTO) -> DocumentTypeItemVO:
|
||
"""更新文档类型。"""
|
||
async with GetAsyncSession() as Session:
|
||
current = (
|
||
await Session.execute(
|
||
text("SELECT id FROM leaudit_document_types WHERE id = :id AND deleted_at IS NULL"),
|
||
{"id": Id},
|
||
)
|
||
).first()
|
||
if not current:
|
||
raise LeauditException(StatusCodeEnum.HTTP_404_NOT_FOUND, "文档类型不存在")
|
||
|
||
providedFields = set(getattr(Body, "model_fields_set", set()))
|
||
sets: list[str] = []
|
||
params: dict[str, object] = {"id": Id}
|
||
if "name" in providedFields and Body.name is not None:
|
||
sets.append("name = :name")
|
||
params["name"] = Body.name.strip()
|
||
if "description" in providedFields:
|
||
sets.append("description = :description")
|
||
params["description"] = Body.description.strip() if Body.description is not None else None
|
||
if params["description"] == "":
|
||
params["description"] = None
|
||
if "entryModuleId" in providedFields:
|
||
sets.append("entry_module_id = :entry_module_id")
|
||
params["entry_module_id"] = Body.entryModuleId
|
||
if "isEnabled" in providedFields and Body.isEnabled is not None:
|
||
sets.append("is_enabled = :is_enabled")
|
||
params["is_enabled"] = Body.isEnabled
|
||
if "sortOrder" in providedFields and Body.sortOrder is not None:
|
||
sets.append("sort_order = :sort_order")
|
||
params["sort_order"] = Body.sortOrder
|
||
if sets:
|
||
sets.append("updated_at = NOW()")
|
||
await Session.execute(text(f"UPDATE leaudit_document_types SET {', '.join(sets)} WHERE id = :id"), params)
|
||
|
||
if "ruleSetIds" in providedFields and Body.ruleSetIds is not None:
|
||
await self._syncRuleBindings(Session, Id, Body.ruleSetIds, "default")
|
||
await sync_group_bindings_from_doc_type(Session, Id, Body.ruleSetIds)
|
||
|
||
await Session.commit()
|
||
return await self.GetDocumentType(Id)
|
||
|
||
async def DeleteDocumentType(self, Id: int) -> None:
|
||
"""软删除文档类型。"""
|
||
async with GetAsyncSession() as Session:
|
||
current = (
|
||
await Session.execute(
|
||
text("SELECT 1 FROM leaudit_document_types WHERE id = :id AND deleted_at IS NULL"),
|
||
{"id": Id},
|
||
)
|
||
).first()
|
||
if not current:
|
||
raise LeauditException(StatusCodeEnum.HTTP_404_NOT_FOUND, "文档类型不存在")
|
||
await Session.execute(text("UPDATE leaudit_document_types SET deleted_at = NOW() WHERE id = :id"), {"id": Id})
|
||
await Session.execute(text("UPDATE leaudit_rule_type_bindings SET deleted_at = NOW() WHERE doc_type_id = :id AND deleted_at IS NULL"), {"id": Id})
|
||
await Session.commit()
|
||
|
||
async def ListDocumentTypeRoots(self, EntryModuleId: int | None = None) -> list[DocumentTypeRootItemVO]:
|
||
"""获取一级文档类型(业务大类)列表。"""
|
||
async with GetAsyncSession() as Session:
|
||
rows = await self._queryDocumentTypeRoots(Session, EntryModuleId=EntryModuleId)
|
||
return [self._toDocumentTypeRootVo(row) for row in rows]
|
||
|
||
async def GetDocumentTypeRoot(self, Id: int) -> DocumentTypeRootItemVO:
|
||
"""获取一级文档类型(业务大类)详情。"""
|
||
async with GetAsyncSession() as Session:
|
||
rows = await self._queryDocumentTypeRoots(Session, Ids=[Id])
|
||
if not rows:
|
||
raise LeauditException(StatusCodeEnum.HTTP_404_NOT_FOUND, "一级文档类型不存在")
|
||
return self._toDocumentTypeRootVo(rows[0])
|
||
|
||
async def CreateDocumentTypeRoot(self, Body: DocumentTypeRootCreateDTO) -> DocumentTypeRootItemVO:
|
||
"""创建一级文档类型(业务大类)。"""
|
||
async with GetAsyncSession() as Session:
|
||
await self._ensureDocumentTypeRootCodeUnique(Session, Body.code.strip(), None)
|
||
root_id = (
|
||
await Session.execute(
|
||
text(
|
||
"""
|
||
INSERT INTO leaudit_evaluation_point_groups
|
||
(pid, name, code, description, document_type_id, entry_module_id, sort_order, is_enabled, created_at, updated_at)
|
||
VALUES
|
||
(0, :name, :code, :description, NULL, :entry_module_id, :sort_order, :is_enabled, NOW(), NOW())
|
||
RETURNING id
|
||
"""
|
||
),
|
||
{
|
||
"name": Body.name.strip(),
|
||
"code": Body.code.strip(),
|
||
"description": Body.description.strip() or None,
|
||
"entry_module_id": Body.entryModuleId,
|
||
"sort_order": Body.sortOrder,
|
||
"is_enabled": Body.isEnabled,
|
||
},
|
||
)
|
||
).scalar_one()
|
||
await Session.commit()
|
||
return await self.GetDocumentTypeRoot(int(root_id))
|
||
|
||
async def UpdateDocumentTypeRoot(self, Id: int, Body: DocumentTypeRootUpdateDTO) -> DocumentTypeRootItemVO:
|
||
"""更新一级文档类型(业务大类)。"""
|
||
async with GetAsyncSession() as Session:
|
||
current = (
|
||
await Session.execute(
|
||
text(
|
||
"""
|
||
SELECT 1
|
||
FROM leaudit_evaluation_point_groups
|
||
WHERE id = :id
|
||
AND deleted_at IS NULL
|
||
AND COALESCE(pid, 0) = 0
|
||
"""
|
||
),
|
||
{"id": Id},
|
||
)
|
||
).first()
|
||
if not current:
|
||
raise LeauditException(StatusCodeEnum.HTTP_404_NOT_FOUND, "一级文档类型不存在")
|
||
|
||
providedFields = set(getattr(Body, "model_fields_set", set()))
|
||
sets: list[str] = []
|
||
params: dict[str, object] = {"id": Id}
|
||
if "name" in providedFields and Body.name is not None:
|
||
sets.append("name = :name")
|
||
params["name"] = Body.name.strip()
|
||
if "description" in providedFields:
|
||
sets.append("description = :description")
|
||
params["description"] = Body.description.strip() if Body.description is not None else None
|
||
if params["description"] == "":
|
||
params["description"] = None
|
||
if "entryModuleId" in providedFields:
|
||
sets.append("entry_module_id = :entry_module_id")
|
||
params["entry_module_id"] = Body.entryModuleId
|
||
if "isEnabled" in providedFields and Body.isEnabled is not None:
|
||
sets.append("is_enabled = :is_enabled")
|
||
params["is_enabled"] = Body.isEnabled
|
||
if "sortOrder" in providedFields and Body.sortOrder is not None:
|
||
sets.append("sort_order = :sort_order")
|
||
params["sort_order"] = Body.sortOrder
|
||
if sets:
|
||
sets.append("updated_at = NOW()")
|
||
await Session.execute(
|
||
text(f"UPDATE leaudit_evaluation_point_groups SET {', '.join(sets)} WHERE id = :id"),
|
||
params,
|
||
)
|
||
await Session.commit()
|
||
return await self.GetDocumentTypeRoot(Id)
|
||
|
||
async def _queryDocumentTypes(self, Session, Ids: list[int] | None = None, EntryModuleId: int | None = None):
|
||
"""查询文档类型及其规则绑定。"""
|
||
if Ids:
|
||
rows = (
|
||
await Session.execute(
|
||
text(
|
||
"""
|
||
SELECT id, code, name, description, entry_module_id, is_enabled, sort_order
|
||
FROM leaudit_document_types
|
||
WHERE deleted_at IS NULL AND id = ANY(:ids)
|
||
ORDER BY sort_order ASC, id ASC
|
||
"""
|
||
),
|
||
{"ids": Ids},
|
||
)
|
||
).mappings().all()
|
||
elif EntryModuleId is not None:
|
||
rows = (
|
||
await Session.execute(
|
||
text(
|
||
"""
|
||
SELECT id, code, name, description, entry_module_id, is_enabled, sort_order
|
||
FROM leaudit_document_types
|
||
WHERE deleted_at IS NULL
|
||
AND entry_module_id = :entry_module_id
|
||
ORDER BY sort_order ASC, id ASC
|
||
"""
|
||
),
|
||
{"entry_module_id": EntryModuleId},
|
||
)
|
||
).mappings().all()
|
||
else:
|
||
rows = (
|
||
await Session.execute(
|
||
text(
|
||
"""
|
||
SELECT id, code, name, description, entry_module_id, is_enabled, sort_order
|
||
FROM leaudit_document_types
|
||
WHERE deleted_at IS NULL
|
||
ORDER BY sort_order ASC, id ASC
|
||
"""
|
||
)
|
||
)
|
||
).mappings().all()
|
||
|
||
allIds = [int(r["id"]) for r in rows]
|
||
bindingsMap: dict[int, list[int]] = {i: [] for i in allIds}
|
||
if allIds:
|
||
bindingRows = (
|
||
await Session.execute(
|
||
text(
|
||
"""
|
||
SELECT doc_type_id, rule_set_id
|
||
FROM leaudit_rule_type_bindings
|
||
WHERE doc_type_id = ANY(:ids) AND deleted_at IS NULL AND is_active = true
|
||
ORDER BY priority DESC
|
||
"""
|
||
),
|
||
{"ids": allIds},
|
||
)
|
||
).fetchall()
|
||
for b in bindingRows:
|
||
bindingsMap.setdefault(int(b[0]), []).append(int(b[1]))
|
||
return rows, bindingsMap
|
||
|
||
async def _queryDocumentTypeRoots(self, Session, Ids: list[int] | None = None, EntryModuleId: int | None = None):
|
||
filters = ["g.deleted_at IS NULL", "COALESCE(g.pid, 0) = 0"]
|
||
params: dict[str, object] = {}
|
||
if Ids:
|
||
filters.append("g.id = ANY(:ids)")
|
||
params["ids"] = Ids
|
||
if EntryModuleId is not None:
|
||
filters.append("g.entry_module_id = :entry_module_id")
|
||
params["entry_module_id"] = EntryModuleId
|
||
|
||
where_clause = " AND ".join(filters)
|
||
rows = (
|
||
await Session.execute(
|
||
text(
|
||
f"""
|
||
SELECT
|
||
g.id,
|
||
g.name,
|
||
g.code,
|
||
g.description,
|
||
g.entry_module_id,
|
||
em.name AS entry_module_name,
|
||
g.is_enabled,
|
||
COALESCE(child_stats.child_group_count, 0) AS child_group_count,
|
||
COALESCE(rule_stats.rule_set_count, 0) AS rule_set_count,
|
||
COALESCE(rule_stats.rule_set_ids, ARRAY[]::bigint[]) AS rule_set_ids
|
||
FROM leaudit_evaluation_point_groups g
|
||
LEFT JOIN leaudit_entry_modules em ON em.id = g.entry_module_id
|
||
LEFT JOIN (
|
||
SELECT pid, COUNT(*)::int AS child_group_count
|
||
FROM leaudit_evaluation_point_groups
|
||
WHERE deleted_at IS NULL AND COALESCE(pid, 0) <> 0
|
||
GROUP BY pid
|
||
) child_stats ON child_stats.pid = g.id
|
||
LEFT JOIN (
|
||
SELECT
|
||
child.pid,
|
||
COUNT(DISTINCT rgb.rule_set_id)::int AS rule_set_count,
|
||
ARRAY_AGG(DISTINCT rgb.rule_set_id) FILTER (WHERE rgb.rule_set_id IS NOT NULL) AS rule_set_ids
|
||
FROM leaudit_evaluation_point_groups child
|
||
LEFT JOIN leaudit_rule_group_bindings rgb
|
||
ON rgb.group_id = child.id
|
||
AND rgb.deleted_at IS NULL
|
||
WHERE child.deleted_at IS NULL
|
||
AND COALESCE(child.pid, 0) <> 0
|
||
GROUP BY child.pid
|
||
) rule_stats ON rule_stats.pid = g.id
|
||
WHERE {where_clause}
|
||
ORDER BY COALESCE(g.sort_order, 0) ASC, g.id ASC
|
||
"""
|
||
),
|
||
params,
|
||
)
|
||
).mappings().all()
|
||
return rows
|
||
|
||
def _toDocumentTypeRootVo(self, row) -> DocumentTypeRootItemVO:
|
||
rule_set_ids = [int(item) for item in (row.get("rule_set_ids") or []) if item is not None]
|
||
return DocumentTypeRootItemVO(
|
||
id=int(row["id"]),
|
||
name=str(row["name"] or ""),
|
||
code=str(row["code"] or ""),
|
||
description=row.get("description"),
|
||
entryModuleId=int(row["entry_module_id"]) if row.get("entry_module_id") is not None else None,
|
||
entryModuleName=row.get("entry_module_name"),
|
||
isEnabled=bool(row.get("is_enabled", True)),
|
||
childGroupCount=int(row.get("child_group_count") or 0),
|
||
ruleSetCount=int(row.get("rule_set_count") or 0),
|
||
ruleSetIds=rule_set_ids,
|
||
)
|
||
|
||
async def _ensureDocumentTypeRootCodeUnique(self, Session, Code: str, CurrentId: int | None) -> None:
|
||
sql = """
|
||
SELECT 1
|
||
FROM leaudit_evaluation_point_groups
|
||
WHERE deleted_at IS NULL
|
||
AND COALESCE(pid, 0) = 0
|
||
AND code = :code
|
||
"""
|
||
params: dict[str, object] = {"code": Code}
|
||
if CurrentId is not None:
|
||
sql += " AND id <> :current_id"
|
||
params["current_id"] = CurrentId
|
||
exists = (await Session.execute(text(sql), params)).first()
|
||
if exists:
|
||
raise LeauditException(StatusCodeEnum.HTTP_409_CONFLICT, f"一级文档类型编码 {Code} 已存在")
|
||
|
||
async def _loadDocumentColumns(self, Session) -> set[str]:
|
||
"""读取 leaudit_documents 当前真实列,兼容不同环境的渐进式字段上线。"""
|
||
rows = (
|
||
await Session.execute(
|
||
text(
|
||
"""
|
||
SELECT column_name
|
||
FROM information_schema.columns
|
||
WHERE table_schema = 'public'
|
||
AND table_name = 'leaudit_documents'
|
||
"""
|
||
)
|
||
)
|
||
).fetchall()
|
||
return {str(row[0]) for row in rows}
|
||
|
||
async def _ensureReviewPointAuditTable(self, Session) -> None:
|
||
"""补齐详情页人工审核表。"""
|
||
await Session.execute(
|
||
text(
|
||
"""
|
||
CREATE TABLE IF NOT EXISTS leaudit_review_point_audits (
|
||
id BIGSERIAL PRIMARY KEY,
|
||
rule_result_id BIGINT NOT NULL UNIQUE,
|
||
document_id BIGINT NOT NULL,
|
||
edit_audit_status INTEGER NOT NULL DEFAULT 0,
|
||
override_result BOOLEAN NULL,
|
||
message TEXT NOT NULL DEFAULT '',
|
||
reviewed_by BIGINT NULL,
|
||
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
|
||
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
|
||
)
|
||
"""
|
||
)
|
||
)
|
||
await Session.execute(
|
||
text(
|
||
"CREATE INDEX IF NOT EXISTS idx_leaudit_review_point_audits_document_id ON leaudit_review_point_audits(document_id)"
|
||
)
|
||
)
|
||
|
||
async def _tableExists(self, Session, TableName: str) -> bool:
|
||
"""检查表是否存在。"""
|
||
row = (
|
||
await Session.execute(
|
||
text(
|
||
"""
|
||
SELECT EXISTS (
|
||
SELECT 1
|
||
FROM information_schema.tables
|
||
WHERE table_schema = 'public'
|
||
AND table_name = :table_name
|
||
) AS exists_flag
|
||
"""
|
||
),
|
||
{"table_name": TableName},
|
||
)
|
||
).mappings().first()
|
||
return bool(row["exists_flag"]) if row else False
|
||
|
||
async def _loadTableColumns(self, Session, TableName: str) -> set[str]:
|
||
"""读取任意 public 表的真实列。"""
|
||
rows = (
|
||
await Session.execute(
|
||
text(
|
||
"""
|
||
SELECT column_name
|
||
FROM information_schema.columns
|
||
WHERE table_schema = 'public'
|
||
AND table_name = :table_name
|
||
"""
|
||
),
|
||
{"table_name": TableName},
|
||
)
|
||
).fetchall()
|
||
return {str(row[0]) for row in rows}
|
||
|
||
async def _ensureDocumentGroupColumn(self, Session) -> None:
|
||
"""渐进式补齐文档二级分组字段,避免旧环境缺列。"""
|
||
await Session.execute(
|
||
text(
|
||
"""
|
||
ALTER TABLE leaudit_documents
|
||
ADD COLUMN IF NOT EXISTS group_id BIGINT NULL
|
||
REFERENCES leaudit_evaluation_point_groups(id)
|
||
"""
|
||
)
|
||
)
|
||
await Session.execute(
|
||
text("CREATE INDEX IF NOT EXISTS idx_leaudit_documents_group_id ON leaudit_documents(group_id)")
|
||
)
|
||
|
||
async def _resolveDocumentGroupId(self, Session, TypeId: int, GroupId: int | None) -> int | None:
|
||
"""校验上传时选择的二级分组是否属于当前文档类型。"""
|
||
if GroupId is None:
|
||
return None
|
||
row = (
|
||
await Session.execute(
|
||
text(
|
||
"""
|
||
SELECT id
|
||
FROM leaudit_evaluation_point_groups
|
||
WHERE id = :group_id
|
||
AND document_type_id = :doc_type_id
|
||
AND deleted_at IS NULL
|
||
AND COALESCE(pid, 0) <> 0
|
||
LIMIT 1
|
||
"""
|
||
),
|
||
{"group_id": GroupId, "doc_type_id": TypeId},
|
||
)
|
||
).mappings().first()
|
||
if not row:
|
||
raise LeauditException(StatusCodeEnum.HTTP_400_BAD_REQUEST, "当前子类型不属于所选文档类型,无法上传")
|
||
return int(row["id"])
|
||
|
||
async def _resolveDocumentRootGroupId(self, Session, TypeId: int, GroupId: int | None) -> int | None:
|
||
"""解析上传命中的一级分组,用于跨二级类型做版本归档。"""
|
||
if GroupId is not None:
|
||
row = (
|
||
await Session.execute(
|
||
text(
|
||
"""
|
||
SELECT
|
||
CASE
|
||
WHEN COALESCE(pid, 0) = 0 THEN id
|
||
ELSE pid
|
||
END AS root_group_id
|
||
FROM leaudit_evaluation_point_groups
|
||
WHERE id = :group_id
|
||
AND deleted_at IS NULL
|
||
LIMIT 1
|
||
"""
|
||
),
|
||
{"group_id": GroupId},
|
||
)
|
||
).mappings().first()
|
||
if row and row["root_group_id"] is not None:
|
||
return int(row["root_group_id"])
|
||
|
||
row = (
|
||
await Session.execute(
|
||
text(
|
||
"""
|
||
SELECT
|
||
CASE WHEN COUNT(DISTINCT pid) = 1 THEN MIN(pid) END AS root_group_id
|
||
FROM leaudit_evaluation_point_groups
|
||
WHERE document_type_id = :doc_type_id
|
||
AND COALESCE(pid, 0) <> 0
|
||
AND deleted_at IS NULL
|
||
AND is_enabled = true
|
||
"""
|
||
),
|
||
{"doc_type_id": TypeId},
|
||
)
|
||
).mappings().first()
|
||
if row and row["root_group_id"] is not None:
|
||
return int(row["root_group_id"])
|
||
return None
|
||
|
||
async def _getDocumentDetail(
|
||
self,
|
||
Session,
|
||
DocumentId: int,
|
||
CurrentUserId: int,
|
||
CurrentUser: dict[str, Any],
|
||
DocumentColumns: set[str],
|
||
BypassScopeCheck: bool = False,
|
||
) -> DocumentDetailVO | None:
|
||
"""查询单文档详情,并附带历史版本。"""
|
||
params: dict[str, object] = {"id": DocumentId}
|
||
filters = ["d.id = :id", "d.deleted_at IS NULL", "f.is_active = true", "f.file_role = 'primary'"]
|
||
if not BypassScopeCheck:
|
||
filters.extend(
|
||
self._buildDocumentScopeFilters(
|
||
CurrentUserId=CurrentUserId,
|
||
CurrentUser=CurrentUser,
|
||
Params=params,
|
||
DocumentAlias="d",
|
||
FileAlias="f",
|
||
)
|
||
)
|
||
whereClause = " AND ".join(filters)
|
||
|
||
groupIdSelectExpr = "d.group_id" if "group_id" in DocumentColumns else "NULL::bigint"
|
||
resolvedGroupIdExpr = f"COALESCE({groupIdSelectExpr}, dg.inferred_group_id)"
|
||
resolvedGroupNameExpr = (
|
||
f"CASE WHEN {groupIdSelectExpr} IS NOT NULL THEN eg.name ELSE dg.inferred_group_name END"
|
||
)
|
||
|
||
optionalSelects = [
|
||
f"{resolvedGroupIdExpr} AS group_id",
|
||
"d.document_number AS document_number" if "document_number" in DocumentColumns else "NULL::text AS document_number",
|
||
"d.remark AS remark" if "remark" in DocumentColumns else "NULL::text AS remark",
|
||
"d.is_test_document AS is_test_document" if "is_test_document" in DocumentColumns else "FALSE AS is_test_document",
|
||
"d.audit_status AS audit_status" if "audit_status" in DocumentColumns else "NULL::integer AS audit_status",
|
||
]
|
||
|
||
detailRow = (
|
||
await Session.execute(
|
||
text(
|
||
f"""
|
||
SELECT
|
||
d.id AS document_id,
|
||
d.biz_document_id AS internal_document_no,
|
||
d.version_group_key,
|
||
d.version_no,
|
||
d.root_version_id,
|
||
d.previous_version_id,
|
||
d.type_id,
|
||
dt.code AS type_code,
|
||
dt.name AS type_name,
|
||
{resolvedGroupNameExpr} AS group_name,
|
||
d.region,
|
||
d.normalized_name,
|
||
d.processing_status,
|
||
d.current_run_id,
|
||
d.updated_at,
|
||
f.id AS file_id,
|
||
f.file_name,
|
||
f.file_ext,
|
||
f.mime_type,
|
||
f.file_size,
|
||
f.oss_url,
|
||
ar.status AS run_status,
|
||
ar.result_status,
|
||
ar.total_score,
|
||
ar.passed_count,
|
||
ar.failed_count,
|
||
ar.skipped_count,
|
||
vc.total_versions,
|
||
COALESCE(vc.total_versions, 1) > 1 AS has_history,
|
||
{', '.join(optionalSelects)}
|
||
FROM leaudit_documents d
|
||
JOIN leaudit_document_files f
|
||
ON f.document_id = d.id
|
||
LEFT JOIN leaudit_document_types dt
|
||
ON dt.id = d.type_id
|
||
LEFT JOIN leaudit_evaluation_point_groups eg
|
||
ON eg.id = d.group_id
|
||
LEFT JOIN (
|
||
SELECT
|
||
document_type_id,
|
||
CASE WHEN COUNT(*) = 1 THEN MIN(id) END AS inferred_group_id,
|
||
CASE WHEN COUNT(*) = 1 THEN MIN(name) END AS inferred_group_name
|
||
FROM leaudit_evaluation_point_groups
|
||
WHERE pid <> 0
|
||
AND deleted_at IS NULL
|
||
AND is_enabled = true
|
||
AND document_type_id IS NOT NULL
|
||
GROUP BY document_type_id
|
||
) dg
|
||
ON dg.document_type_id = d.type_id
|
||
LEFT JOIN leaudit_audit_runs ar
|
||
ON ar.id = d.current_run_id
|
||
LEFT JOIN (
|
||
SELECT version_group_key, COUNT(*) AS total_versions
|
||
FROM leaudit_documents
|
||
WHERE deleted_at IS NULL
|
||
GROUP BY version_group_key
|
||
) vc
|
||
ON vc.version_group_key = d.version_group_key
|
||
WHERE {whereClause}
|
||
LIMIT 1
|
||
"""
|
||
),
|
||
params,
|
||
)
|
||
).mappings().first()
|
||
if not detailRow:
|
||
return None
|
||
|
||
groupKey = str(detailRow["version_group_key"] or "")
|
||
historyVersions: list[DocumentHistoryVersionVO] = []
|
||
if groupKey:
|
||
historyRows = (
|
||
await Session.execute(
|
||
text(
|
||
"""
|
||
SELECT
|
||
d.id AS document_id,
|
||
d.version_no,
|
||
d.processing_status,
|
||
d.updated_at,
|
||
f.id AS file_id,
|
||
f.file_name,
|
||
f.file_ext,
|
||
ar.status AS run_status,
|
||
ar.result_status
|
||
FROM leaudit_documents d
|
||
JOIN leaudit_document_files f
|
||
ON f.document_id = d.id
|
||
AND f.is_active = true
|
||
AND f.file_role = 'primary'
|
||
LEFT JOIN leaudit_audit_runs ar
|
||
ON ar.id = d.current_run_id
|
||
WHERE d.version_group_key = :group_key
|
||
AND d.id <> :document_id
|
||
AND d.deleted_at IS NULL
|
||
ORDER BY d.version_no DESC, d.id DESC
|
||
"""
|
||
),
|
||
{"group_key": groupKey, "document_id": int(detailRow["document_id"])},
|
||
)
|
||
).mappings().all()
|
||
historyVersions = [
|
||
DocumentHistoryVersionVO(
|
||
documentId=int(row["document_id"]),
|
||
fileId=int(row["file_id"]) if row["file_id"] is not None else None,
|
||
versionNo=int(row["version_no"]),
|
||
fileName=row["file_name"],
|
||
fileExt=row["file_ext"],
|
||
processingStatus=row["processing_status"],
|
||
runStatus=row["run_status"],
|
||
resultStatus=row["result_status"],
|
||
updatedAt=row["updated_at"].isoformat() if row["updated_at"] else None,
|
||
)
|
||
for row in historyRows
|
||
]
|
||
|
||
attachmentRows = (
|
||
await Session.execute(
|
||
text(
|
||
"""
|
||
SELECT
|
||
id,
|
||
file_name,
|
||
file_ext,
|
||
mime_type,
|
||
file_size,
|
||
file_role,
|
||
oss_url,
|
||
created_by,
|
||
created_at
|
||
FROM leaudit_document_files
|
||
WHERE document_id = :document_id
|
||
AND deleted_at IS NULL
|
||
AND is_active = true
|
||
AND file_role = 'attachment'
|
||
ORDER BY id ASC
|
||
"""
|
||
),
|
||
{"document_id": int(detailRow["document_id"])},
|
||
)
|
||
).mappings().all()
|
||
attachments = [
|
||
DocumentAttachmentVO(
|
||
fileId=int(row["id"]),
|
||
fileName=str(row["file_name"] or ""),
|
||
fileExt=row["file_ext"],
|
||
mimeType=row["mime_type"],
|
||
fileSize=int(row["file_size"]) if row["file_size"] is not None else None,
|
||
fileRole=str(row["file_role"] or "attachment"),
|
||
ossUrl=row["oss_url"],
|
||
createdBy=int(row["created_by"]) if row["created_by"] is not None else None,
|
||
createdAt=row["created_at"].isoformat() if row["created_at"] else None,
|
||
)
|
||
for row in attachmentRows
|
||
]
|
||
|
||
return DocumentDetailVO(
|
||
documentId=int(detailRow["document_id"]),
|
||
internalDocumentNo=int(detailRow["internal_document_no"]),
|
||
versionGroupKey=groupKey,
|
||
versionNo=int(detailRow["version_no"] or 1),
|
||
rootVersionId=int(detailRow["root_version_id"] or detailRow["document_id"]),
|
||
previousVersionId=int(detailRow["previous_version_id"]) if detailRow["previous_version_id"] is not None else None,
|
||
typeId=int(detailRow["type_id"]) if detailRow["type_id"] is not None else None,
|
||
typeCode=detailRow["type_code"],
|
||
typeName=detailRow["type_name"],
|
||
groupId=int(detailRow["group_id"]) if detailRow["group_id"] is not None else None,
|
||
groupName=detailRow["group_name"],
|
||
region=str(detailRow["region"] or ""),
|
||
normalizedName=detailRow["normalized_name"],
|
||
fileId=int(detailRow["file_id"]) if detailRow["file_id"] is not None else None,
|
||
fileName=detailRow["file_name"],
|
||
fileExt=detailRow["file_ext"],
|
||
mimeType=detailRow["mime_type"],
|
||
fileSize=int(detailRow["file_size"]) if detailRow["file_size"] is not None else None,
|
||
ossUrl=detailRow["oss_url"],
|
||
processingStatus=detailRow["processing_status"],
|
||
currentRunId=int(detailRow["current_run_id"]) if detailRow["current_run_id"] is not None else None,
|
||
runStatus=detailRow["run_status"],
|
||
resultStatus=detailRow["result_status"],
|
||
totalScore=float(detailRow["total_score"]) if detailRow["total_score"] is not None else None,
|
||
passedCount=int(detailRow["passed_count"]) if detailRow["passed_count"] is not None else None,
|
||
failedCount=int(detailRow["failed_count"]) if detailRow["failed_count"] is not None else None,
|
||
skippedCount=int(detailRow["skipped_count"]) if detailRow["skipped_count"] is not None else None,
|
||
updatedAt=detailRow["updated_at"].isoformat() if detailRow["updated_at"] else None,
|
||
hasHistory=bool(detailRow["has_history"]),
|
||
totalVersions=int(detailRow["total_versions"] or 1),
|
||
historyVersions=historyVersions,
|
||
documentNumber=detailRow["document_number"],
|
||
remark=detailRow["remark"],
|
||
isTestDocument=bool(detailRow["is_test_document"]),
|
||
auditStatus=int(detailRow["audit_status"]) if detailRow["audit_status"] is not None else None,
|
||
pageCount=None,
|
||
attachments=attachments,
|
||
)
|
||
|
||
async def _hasCrossReviewDocumentAccess(self, Session, DocumentId: int, CurrentUserId: int) -> bool:
|
||
"""判断当前用户是否作为交叉评查任务成员拥有文档访问权。"""
|
||
if not await self._tableExists(Session, "leaudit_cross_review_task_documents"):
|
||
return False
|
||
if not await self._tableExists(Session, "leaudit_cross_review_task_members"):
|
||
return False
|
||
if not await self._tableExists(Session, "leaudit_cross_review_tasks"):
|
||
return False
|
||
|
||
row = (
|
||
await Session.execute(
|
||
text(
|
||
"""
|
||
SELECT 1
|
||
FROM leaudit_cross_review_task_documents td
|
||
JOIN leaudit_cross_review_task_members tm
|
||
ON tm.task_id = td.task_id
|
||
JOIN leaudit_cross_review_tasks t
|
||
ON t.id = td.task_id
|
||
WHERE td.document_id = :document_id
|
||
AND tm.user_id = :user_id
|
||
AND td.delete_time IS NULL
|
||
AND tm.delete_time IS NULL
|
||
AND t.delete_time IS NULL
|
||
LIMIT 1
|
||
"""
|
||
),
|
||
{"document_id": DocumentId, "user_id": CurrentUserId},
|
||
)
|
||
).first()
|
||
return bool(row)
|
||
|
||
def _buildDocumentScopeFilters(
|
||
self,
|
||
CurrentUserId: int,
|
||
CurrentUser: dict[str, Any],
|
||
Params: dict[str, object],
|
||
DocumentAlias: str,
|
||
FileAlias: str,
|
||
RequestedRegion: str | None = None,
|
||
RequestedUserId: int | None = None,
|
||
) -> list[str]:
|
||
"""根据当前用户角色与地区构建文档数据范围过滤。"""
|
||
filters: list[str] = []
|
||
requestedRegion = (RequestedRegion or "").strip()
|
||
area = str(CurrentUser["area"] or "").strip()
|
||
|
||
if CurrentUser["is_global"]:
|
||
if requestedRegion:
|
||
filters.append(f"{DocumentAlias}.region = :requested_region")
|
||
Params["requested_region"] = requestedRegion
|
||
if RequestedUserId is not None:
|
||
filters.append(f"{FileAlias}.created_by = :requested_user_id")
|
||
Params["requested_user_id"] = RequestedUserId
|
||
return filters
|
||
|
||
if CurrentUser["can_manage"]:
|
||
if not area:
|
||
filters.append("1 = 0")
|
||
return filters
|
||
if requestedRegion and requestedRegion != area:
|
||
filters.append("1 = 0")
|
||
return filters
|
||
filters.append(f"{DocumentAlias}.region = :scope_region")
|
||
Params["scope_region"] = area
|
||
if RequestedUserId is not None:
|
||
filters.append(f"{FileAlias}.created_by = :requested_user_id")
|
||
Params["requested_user_id"] = RequestedUserId
|
||
return filters
|
||
|
||
filters.append(f"{FileAlias}.created_by = :scope_user_id")
|
||
Params["scope_user_id"] = CurrentUserId
|
||
if requestedRegion:
|
||
filters.append(f"{DocumentAlias}.region = :requested_region")
|
||
Params["requested_region"] = requestedRegion
|
||
if RequestedUserId is not None and RequestedUserId != CurrentUserId:
|
||
filters.append("1 = 0")
|
||
return filters
|
||
|
||
async def _getCurrentUserContext(self, CurrentUserId: int) -> dict[str, Any]:
|
||
"""加载当前用户上下文,用于文档数据隔离。"""
|
||
async with GetAsyncSession() as Session:
|
||
row = (
|
||
await Session.execute(
|
||
text(
|
||
"""
|
||
SELECT
|
||
u.id,
|
||
COALESCE(u.area, '') AS area,
|
||
COALESCE(bool_or(r.role_key IN ('super_admin', 'provincial_admin')), FALSE) AS is_global,
|
||
COALESCE(bool_or(r.role_key IN ('super_admin', 'provincial_admin', 'admin')), FALSE) AS can_manage,
|
||
COALESCE(bool_or(r.role_key = 'super_admin'), FALSE) AS is_super_admin
|
||
FROM sso_users u
|
||
LEFT JOIN user_role ur ON ur.user_id = u.id
|
||
LEFT JOIN roles r ON r.id = ur.role_id
|
||
WHERE u.id = :user_id
|
||
GROUP BY u.id, u.area
|
||
"""
|
||
),
|
||
{"user_id": CurrentUserId},
|
||
)
|
||
).mappings().first()
|
||
if not row:
|
||
raise LeauditException(StatusCodeEnum.HTTP_404_NOT_FOUND, "当前用户不存在")
|
||
return {
|
||
"area": str(row["area"] or ""),
|
||
"is_global": bool(row["is_global"]),
|
||
"can_manage": bool(row["can_manage"]),
|
||
"is_super_admin": bool(row["is_super_admin"]),
|
||
}
|
||
|
||
async def _loadReviewRunRow(self, Session, Detail: DocumentDetailVO) -> dict[str, Any] | None:
|
||
"""定位当前文档可展示的最新评查运行。"""
|
||
params: dict[str, object] = {"document_id": Detail.documentId}
|
||
if Detail.currentRunId is not None:
|
||
params["run_id"] = Detail.currentRunId
|
||
row = (
|
||
await Session.execute(
|
||
text(
|
||
"""
|
||
SELECT
|
||
id,
|
||
status,
|
||
result_status,
|
||
total_score,
|
||
passed_count,
|
||
failed_count,
|
||
skipped_count,
|
||
llm_model,
|
||
finished_at,
|
||
updated_at
|
||
FROM leaudit_audit_runs
|
||
WHERE id = :run_id
|
||
AND document_id = :document_id
|
||
LIMIT 1
|
||
"""
|
||
),
|
||
params,
|
||
)
|
||
).mappings().first()
|
||
if row:
|
||
return dict(row)
|
||
|
||
row = (
|
||
await Session.execute(
|
||
text(
|
||
"""
|
||
SELECT
|
||
id,
|
||
status,
|
||
result_status,
|
||
total_score,
|
||
passed_count,
|
||
failed_count,
|
||
skipped_count,
|
||
llm_model,
|
||
finished_at,
|
||
updated_at
|
||
FROM leaudit_audit_runs
|
||
WHERE document_id = :document_id
|
||
ORDER BY id DESC
|
||
LIMIT 1
|
||
"""
|
||
),
|
||
params,
|
||
)
|
||
).mappings().first()
|
||
return dict(row) if row else None
|
||
|
||
async def _buildReviewDocumentPayload(
|
||
self,
|
||
Session,
|
||
Detail: DocumentDetailVO,
|
||
RunRow: dict[str, Any] | None,
|
||
) -> dict[str, Any]:
|
||
"""把新文档详情转换成旧详情页可直接消费的文档对象。"""
|
||
metaRow = (
|
||
await Session.execute(
|
||
text(
|
||
"""
|
||
SELECT
|
||
d.created_at,
|
||
f.created_by,
|
||
COALESCE(u.nick_name, u.username, '') AS upload_user
|
||
FROM leaudit_documents d
|
||
JOIN leaudit_document_files f
|
||
ON f.document_id = d.id
|
||
AND f.is_active = true
|
||
AND f.file_role = 'primary'
|
||
LEFT JOIN sso_users u
|
||
ON u.id = f.created_by
|
||
WHERE d.id = :document_id
|
||
LIMIT 1
|
||
"""
|
||
),
|
||
{"document_id": Detail.documentId},
|
||
)
|
||
).mappings().first()
|
||
|
||
metricRow = None
|
||
if RunRow:
|
||
metricRow = (
|
||
await Session.execute(
|
||
text(
|
||
"""
|
||
SELECT page_count
|
||
FROM leaudit_run_metrics
|
||
WHERE run_id = :run_id
|
||
ORDER BY id DESC
|
||
LIMIT 1
|
||
"""
|
||
),
|
||
{"run_id": int(RunRow["id"])},
|
||
)
|
||
).mappings().first()
|
||
|
||
createdAt = metaRow["created_at"] if metaRow else None
|
||
uploadUser = str(metaRow["upload_user"] or "") if metaRow else ""
|
||
pageCount = int(metricRow["page_count"]) if metricRow and metricRow["page_count"] is not None else 0
|
||
|
||
ocrResultPayload = await self._buildReviewOcrPayload(Session, Detail.documentId, RunRow, pageCount)
|
||
|
||
return {
|
||
"id": Detail.documentId,
|
||
"documentId": Detail.documentId,
|
||
"name": Detail.fileName or Detail.normalizedName or f"文档{Detail.documentId}",
|
||
"path": Detail.ossUrl or "",
|
||
"fileName": Detail.fileName or "",
|
||
"fileType": (Detail.fileExt or "").lower(),
|
||
"size": Detail.fileSize,
|
||
"file_size": Detail.fileSize,
|
||
"documentNumber": Detail.documentNumber,
|
||
"document_number": Detail.documentNumber,
|
||
"type": str(Detail.typeId or ""),
|
||
"type_id": Detail.typeId,
|
||
"typeName": Detail.typeName,
|
||
"type_name": Detail.typeName,
|
||
"groupId": Detail.groupId,
|
||
"groupName": Detail.groupName,
|
||
"region": Detail.region,
|
||
"auditStatus": Detail.auditStatus or 0,
|
||
"audit_status": Detail.auditStatus or 0,
|
||
"uploadTime": _format_iso_datetime(createdAt) or Detail.updatedAt or "",
|
||
"created_at": _format_iso_datetime(createdAt) or Detail.updatedAt or "",
|
||
"updated_at": Detail.updatedAt or "",
|
||
"uploadUser": uploadUser,
|
||
"pageCount": pageCount,
|
||
"page_count": pageCount,
|
||
"ocrResult": ocrResultPayload,
|
||
"attachments": [item.model_dump() for item in Detail.attachments],
|
||
}
|
||
|
||
async def _buildReviewOcrPayload(
|
||
self,
|
||
Session,
|
||
DocumentId: int,
|
||
RunRow: dict[str, Any] | None,
|
||
PageCount: int,
|
||
) -> dict[str, Any]:
|
||
"""构建旧详情页仍在读取的最小 ocrResult 结构。"""
|
||
pageMap: dict[str, set[int]] = {}
|
||
|
||
def add_page(field_name: str | None, page_value: Any) -> None:
|
||
if not field_name:
|
||
return
|
||
page = _coerce_positive_int(page_value)
|
||
if page is None:
|
||
return
|
||
section = str(field_name).split("-", 1)[0].strip() or "未分组"
|
||
pageMap.setdefault(section, set()).add(page)
|
||
|
||
if RunRow:
|
||
fieldRows = (
|
||
await Session.execute(
|
||
text(
|
||
"""
|
||
SELECT field_name, meta_json, raw_value_json
|
||
FROM leaudit_field_results
|
||
WHERE run_id = :run_id
|
||
AND document_id = :document_id
|
||
ORDER BY id ASC
|
||
"""
|
||
),
|
||
{"run_id": int(RunRow["id"]), "document_id": DocumentId},
|
||
)
|
||
).mappings().all()
|
||
for row in fieldRows:
|
||
metaJson = row["meta_json"] if isinstance(row["meta_json"], dict) else {}
|
||
rawValueJson = row["raw_value_json"] if isinstance(row["raw_value_json"], dict) else {}
|
||
position = metaJson.get("position") if isinstance(metaJson.get("position"), dict) else {}
|
||
add_page(row["field_name"], position.get("pageNum") or position.get("page"))
|
||
rawPosition = rawValueJson.get("position") if isinstance(rawValueJson.get("position"), dict) else {}
|
||
add_page(row["field_name"], rawPosition.get("pageNum") or rawPosition.get("page"))
|
||
|
||
ruleRows = (
|
||
await Session.execute(
|
||
text(
|
||
"""
|
||
SELECT extracted_fields, field_positions
|
||
FROM leaudit_rule_results
|
||
WHERE run_id = :run_id
|
||
AND document_id = :document_id
|
||
ORDER BY id ASC
|
||
"""
|
||
),
|
||
{"run_id": int(RunRow["id"]), "document_id": DocumentId},
|
||
)
|
||
).mappings().all()
|
||
for row in ruleRows:
|
||
extractedFields = row["extracted_fields"] if isinstance(row["extracted_fields"], dict) else {}
|
||
fieldPositions = row["field_positions"] if isinstance(row["field_positions"], dict) else {}
|
||
for field_name, field_payload in extractedFields.items():
|
||
if isinstance(field_payload, dict):
|
||
add_page(field_name, field_payload.get("page"))
|
||
add_page(field_name, field_payload.get("pageNum"))
|
||
position = fieldPositions.get(field_name) if isinstance(fieldPositions, dict) else None
|
||
add_page(field_name, _extract_review_page(position))
|
||
|
||
normalizedMap = {
|
||
key: {"pages": sorted(value)}
|
||
for key, value in pageMap.items()
|
||
if value
|
||
}
|
||
return {
|
||
"__meta": {"page_offset": 0, "page_count": PageCount},
|
||
"ocr_result": normalizedMap,
|
||
}
|
||
|
||
async def _loadComparisonDocument(self, Session, DocumentId: int) -> dict[str, Any]:
|
||
"""读取合同结构比对结果;缺表或空数据时降级。"""
|
||
if not await self._tableExists(Session, "contract_structure_comparison"):
|
||
return {"template_contract_path": ""}
|
||
|
||
columns = await self._loadTableColumns(Session, "contract_structure_comparison")
|
||
selectColumns: list[str] = ["id", "document_id"]
|
||
optionalColumns = [
|
||
"template_contract_path",
|
||
"comparison_results",
|
||
"created_at",
|
||
"updated_at",
|
||
"comparison_id",
|
||
]
|
||
for column in optionalColumns:
|
||
if column in columns:
|
||
selectColumns.append(column)
|
||
|
||
row = (
|
||
await Session.execute(
|
||
text(
|
||
f"""
|
||
SELECT {', '.join(selectColumns)}
|
||
FROM contract_structure_comparison
|
||
WHERE document_id = :document_id
|
||
ORDER BY id DESC
|
||
LIMIT 1
|
||
"""
|
||
),
|
||
{"document_id": DocumentId},
|
||
)
|
||
).mappings().first()
|
||
|
||
if not row:
|
||
return {"template_contract_path": ""}
|
||
|
||
payload = dict(row)
|
||
rawResults = payload.get("comparison_results")
|
||
if isinstance(rawResults, str):
|
||
import json
|
||
|
||
try:
|
||
payload["comparison_results"] = json.loads(rawResults)
|
||
except Exception:
|
||
payload["comparison_results"] = None
|
||
|
||
if "comparisonId" not in payload:
|
||
payload["comparisonId"] = payload.get("comparison_id") or payload.get("id")
|
||
if "template_contract_path" not in payload:
|
||
payload["template_contract_path"] = ""
|
||
return payload
|
||
|
||
async def _loadScoringProposals(self, Session, DocumentId: int) -> list[dict[str, Any]]:
|
||
"""读取交叉评分提案;缺表时降级为空。"""
|
||
if await self._tableExists(Session, "leaudit_cross_review_proposals"):
|
||
rows = (
|
||
await Session.execute(
|
||
text(
|
||
"""
|
||
SELECT
|
||
id,
|
||
rule_result_id AS evaluation_result_id,
|
||
proposer_id,
|
||
proposed_score_delta AS proposed_score,
|
||
reason,
|
||
status,
|
||
create_time AS created_at,
|
||
update_time AS updated_at,
|
||
document_id
|
||
FROM leaudit_cross_review_proposals
|
||
WHERE document_id = :document_id
|
||
AND delete_time IS NULL
|
||
ORDER BY id DESC
|
||
"""
|
||
),
|
||
{"document_id": DocumentId},
|
||
)
|
||
).mappings().all()
|
||
if rows:
|
||
return [dict(row) for row in rows]
|
||
|
||
if not await self._tableExists(Session, "cross_scoring_proposals"):
|
||
return []
|
||
|
||
columns = await self._loadTableColumns(Session, "cross_scoring_proposals")
|
||
selectColumns = [
|
||
column
|
||
for column in (
|
||
"id",
|
||
"evaluation_result_id",
|
||
"proposer_id",
|
||
"proposed_score",
|
||
"reason",
|
||
"status",
|
||
"created_at",
|
||
"updated_at",
|
||
"document_id",
|
||
"deleted_at",
|
||
)
|
||
if column in columns
|
||
]
|
||
if not selectColumns:
|
||
return []
|
||
|
||
filterClauses = ["document_id = :document_id"]
|
||
if "deleted_at" in columns:
|
||
filterClauses.append("deleted_at IS NULL")
|
||
|
||
rows = (
|
||
await Session.execute(
|
||
text(
|
||
f"""
|
||
SELECT {', '.join(selectColumns)}
|
||
FROM cross_scoring_proposals
|
||
WHERE {' AND '.join(filterClauses)}
|
||
ORDER BY id DESC
|
||
"""
|
||
),
|
||
{"document_id": DocumentId},
|
||
)
|
||
).mappings().all()
|
||
return [dict(row) for row in rows]
|
||
|
||
async def _loadReviewPointResults(
|
||
self,
|
||
Session,
|
||
Detail: DocumentDetailVO,
|
||
RunId: int,
|
||
) -> list[ReviewPointResultVO]:
|
||
"""读取评查运行下的规则结果并转换成前端详情页结构。"""
|
||
rows = (
|
||
await Session.execute(
|
||
text(
|
||
"""
|
||
SELECT
|
||
rr.id,
|
||
rr.rule_id,
|
||
rr.rule_name,
|
||
rr.risk,
|
||
rr.score,
|
||
rr.passed,
|
||
rr.status,
|
||
rr.skip_reason,
|
||
rr.pass_message,
|
||
rr.fail_message,
|
||
rr.stages,
|
||
rr.extracted_fields,
|
||
rr.field_positions,
|
||
rr.remediation,
|
||
rr.rule_meta,
|
||
rr.updated_at,
|
||
a.id AS audit_id,
|
||
a.edit_audit_status,
|
||
a.override_result,
|
||
a.message AS audit_message
|
||
FROM leaudit_rule_results rr
|
||
LEFT JOIN leaudit_review_point_audits a
|
||
ON a.rule_result_id = rr.id
|
||
WHERE rr.run_id = :run_id
|
||
AND rr.document_id = :document_id
|
||
ORDER BY id ASC
|
||
"""
|
||
),
|
||
{"run_id": RunId, "document_id": Detail.documentId},
|
||
)
|
||
).mappings().all()
|
||
rows = await self._backfillMissingReviewFieldPositions(
|
||
Session,
|
||
Detail.documentId,
|
||
RunId,
|
||
rows,
|
||
)
|
||
|
||
result: list[ReviewPointResultVO] = []
|
||
for row in rows:
|
||
extractedFields = row["extracted_fields"] if isinstance(row["extracted_fields"], dict) else {}
|
||
fieldPositions = row["field_positions"] if isinstance(row["field_positions"], dict) else {}
|
||
ruleMeta = row["rule_meta"] if isinstance(row["rule_meta"], dict) else {}
|
||
remediation = row["remediation"] if isinstance(row["remediation"], dict) else {}
|
||
rawStages = row["stages"] if isinstance(row["stages"], list) else []
|
||
stages = _enrich_stage_payload_with_positions(rawStages, fieldPositions)
|
||
score = float(row["score"]) if row["score"] is not None else 0.0
|
||
auditedResult = row["override_result"] if row["edit_audit_status"] == 1 else None
|
||
effectivePassed = auditedResult if auditedResult is not None else row["passed"]
|
||
effectiveStatus = row["status"]
|
||
if auditedResult is not None:
|
||
effectiveStatus = "passed" if auditedResult else ("failed" if row["status"] != "error" else "error")
|
||
resultFlag = None
|
||
if effectiveStatus not in {"skipped", "not_applicable"}:
|
||
resultFlag = bool(effectivePassed) if effectivePassed is not None else None
|
||
title = self._buildReviewPointTitle(row)
|
||
if row["edit_audit_status"] == 1 and row["audit_message"]:
|
||
title = str(row["audit_message"])
|
||
|
||
result.append(
|
||
ReviewPointResultVO(
|
||
id=int(row["id"]),
|
||
documentId=str(Detail.documentId),
|
||
pointId=int(row["id"]),
|
||
editAuditStatusId=int(row["audit_id"]) if row["audit_id"] is not None else "",
|
||
editAuditStatus=int(row["edit_audit_status"] or 0),
|
||
editAuditStatusMessage=str(row["audit_message"] or ""),
|
||
title=title,
|
||
pointName=str(row["rule_name"] or row["rule_id"] or ""),
|
||
pointCode=str(row["rule_id"] or ""),
|
||
groupName=str(ruleMeta.get("group") or Detail.groupName or Detail.typeName or "未分组"),
|
||
status=_map_rule_status(effectiveStatus, effectivePassed, row["risk"]),
|
||
content=_normalize_review_content(extractedFields, fieldPositions),
|
||
contentPage=_normalize_review_content_pages(extractedFields, fieldPositions),
|
||
suggestion=_extract_review_suggestion(remediation, row["fail_message"], row["pass_message"]),
|
||
postAction="manual" if effectivePassed is not True else "",
|
||
actionContent=remediation or {},
|
||
legalBasis=ruleMeta.get("references_laws") or {},
|
||
evaluationConfig={"risk": row["risk"], "status": effectiveStatus, "ruleMeta": ruleMeta},
|
||
score=score,
|
||
finalScore=score if resultFlag is True else (0.0 if resultFlag is False else None),
|
||
machineScore=score,
|
||
result=resultFlag,
|
||
failMessage=str(row["fail_message"] or ""),
|
||
passMessage=str(row["pass_message"] or ""),
|
||
evaluatedPointResultsLog={"rules": _normalize_review_stage_logs(stages)},
|
||
)
|
||
)
|
||
return result
|
||
|
||
async def _backfillMissingReviewFieldPositions(
|
||
self,
|
||
Session,
|
||
DocumentId: int,
|
||
RunId: int,
|
||
rows: list[dict[str, Any]],
|
||
) -> list[dict[str, Any]]:
|
||
"""详情页兜底补齐缺失页码,避免长文本字段只能按文本定位。"""
|
||
missingFields: dict[str, str] = {}
|
||
touchedRuleRows: dict[int, dict[str, Any]] = {}
|
||
|
||
for rawRow in rows:
|
||
row = dict(rawRow)
|
||
extractedFields = row["extracted_fields"] if isinstance(row.get("extracted_fields"), dict) else {}
|
||
fieldPositions = dict(row["field_positions"]) if isinstance(row.get("field_positions"), dict) else {}
|
||
row["field_positions"] = fieldPositions
|
||
|
||
rowTouched = False
|
||
for fieldName, rawValue in extractedFields.items():
|
||
if _extract_review_page(fieldPositions.get(fieldName)) is not None:
|
||
continue
|
||
valueText = _stringify_char_position_value(_normalize_review_value(rawValue))
|
||
if not valueText.strip():
|
||
continue
|
||
existing = missingFields.get(str(fieldName))
|
||
if existing is None or len(valueText) > len(existing):
|
||
missingFields[str(fieldName)] = valueText
|
||
rowTouched = True
|
||
|
||
if rowTouched and row.get("id") is not None:
|
||
touchedRuleRows[int(row["id"])] = row
|
||
|
||
if not missingFields:
|
||
return rows
|
||
|
||
inferredPositions = await self._inferFieldPositionsFromDocumentSource(
|
||
Session,
|
||
DocumentId,
|
||
missingFields,
|
||
)
|
||
if not inferredPositions:
|
||
return rows
|
||
|
||
for ruleRowId, row in touchedRuleRows.items():
|
||
fieldPositions = row["field_positions"] if isinstance(row.get("field_positions"), dict) else {}
|
||
changed = False
|
||
extractedFields = row["extracted_fields"] if isinstance(row.get("extracted_fields"), dict) else {}
|
||
for fieldName in extractedFields.keys():
|
||
key = str(fieldName)
|
||
if _extract_review_page(fieldPositions.get(key)) is not None:
|
||
continue
|
||
inferred = inferredPositions.get(key)
|
||
if inferred is None:
|
||
continue
|
||
fieldPositions[key] = inferred
|
||
changed = True
|
||
|
||
if not changed:
|
||
continue
|
||
|
||
await Session.execute(
|
||
text(
|
||
"""
|
||
UPDATE leaudit_rule_results
|
||
SET field_positions = CAST(:field_positions AS JSONB),
|
||
updated_at = NOW()
|
||
WHERE id = :rule_result_id
|
||
"""
|
||
),
|
||
{
|
||
"rule_result_id": ruleRowId,
|
||
"field_positions": json.dumps(fieldPositions, ensure_ascii=False),
|
||
},
|
||
)
|
||
|
||
for fieldName, position in inferredPositions.items():
|
||
await Session.execute(
|
||
text(
|
||
"""
|
||
UPDATE leaudit_field_results
|
||
SET meta_json = jsonb_set(
|
||
COALESCE(meta_json, '{}'::jsonb),
|
||
'{position}',
|
||
CAST(:position AS JSONB),
|
||
true
|
||
),
|
||
updated_at = NOW()
|
||
WHERE run_id = :run_id
|
||
AND document_id = :document_id
|
||
AND field_name = :field_name
|
||
AND (
|
||
meta_json IS NULL
|
||
OR meta_json->'position' IS NULL
|
||
OR meta_json->'position' = 'null'::jsonb
|
||
)
|
||
"""
|
||
),
|
||
{
|
||
"run_id": RunId,
|
||
"document_id": DocumentId,
|
||
"field_name": fieldName,
|
||
"position": json.dumps(position, ensure_ascii=False),
|
||
},
|
||
)
|
||
|
||
await Session.commit()
|
||
return rows
|
||
|
||
async def _inferFieldPositionsFromDocumentSource(
|
||
self,
|
||
Session,
|
||
DocumentId: int,
|
||
FieldTexts: dict[str, str],
|
||
) -> dict[str, dict[str, Any]]:
|
||
"""按当前文档源文件逐页匹配字段文本,推导最小可用 pageNum。"""
|
||
fileRow = (
|
||
await Session.execute(
|
||
text(
|
||
"""
|
||
SELECT
|
||
id,
|
||
file_name,
|
||
file_ext,
|
||
local_path,
|
||
oss_url,
|
||
file_role
|
||
FROM leaudit_document_files
|
||
WHERE document_id = :document_id
|
||
AND is_active = true
|
||
ORDER BY
|
||
CASE file_role
|
||
WHEN 'converted_pdf' THEN 0
|
||
WHEN 'merged_pdf' THEN 1
|
||
WHEN 'primary' THEN 2
|
||
ELSE 9
|
||
END,
|
||
id DESC
|
||
LIMIT 1
|
||
"""
|
||
),
|
||
{"document_id": DocumentId},
|
||
)
|
||
).mappings().first()
|
||
if not fileRow:
|
||
return {}
|
||
|
||
fileName = str(fileRow["file_name"] or "")
|
||
fileExt = f".{str(fileRow['file_ext']).lstrip('.')}" if fileRow.get("file_ext") else Path(fileName).suffix
|
||
tempPath: str | None = None
|
||
localPath = str(fileRow["local_path"] or "").strip()
|
||
try:
|
||
if localPath and Path(localPath).is_file():
|
||
sourcePath = Path(localPath)
|
||
else:
|
||
ossUrl = str(fileRow["oss_url"] or "").strip()
|
||
if not ossUrl:
|
||
return {}
|
||
downloaded = await self.OssService.DownloadToTempFile(
|
||
ossUrl,
|
||
Suffix=fileExt or Path(fileName).suffix or "",
|
||
Prefix=f"review-page-{DocumentId}-",
|
||
)
|
||
tempPath = downloaded
|
||
sourcePath = Path(downloaded)
|
||
|
||
suffix = sourcePath.suffix.lower()
|
||
if suffix == ".pdf":
|
||
pageTexts = _extract_page_texts_from_pdf(sourcePath)
|
||
elif suffix == ".docx":
|
||
pageTexts = _extract_page_texts_from_docx(sourcePath)
|
||
else:
|
||
return {}
|
||
if not pageTexts:
|
||
return {}
|
||
|
||
inferred: dict[str, dict[str, Any]] = {}
|
||
for fieldName, fieldText in FieldTexts.items():
|
||
pageNum = _infer_page_num_from_page_texts(fieldText, pageTexts)
|
||
if pageNum is None:
|
||
continue
|
||
inferred[fieldName] = {"pageNum": pageNum, "matchMethod": "detail_page_fallback"}
|
||
return inferred
|
||
except Exception:
|
||
return {}
|
||
finally:
|
||
if tempPath:
|
||
try:
|
||
Path(tempPath).unlink(missing_ok=True)
|
||
except OSError:
|
||
pass
|
||
|
||
def _buildReviewPointStats(self, ReviewPoints: list[ReviewPointResultVO]) -> ReviewPointStatsVO:
|
||
"""按旧详情页口径汇总统计。"""
|
||
stats = ReviewPointStatsVO(total=len(ReviewPoints), success=0, warning=0, error=0, score=0)
|
||
for item in ReviewPoints:
|
||
if item.status == "success":
|
||
stats.success += 1
|
||
elif item.status == "warning":
|
||
stats.warning += 1
|
||
elif item.status == "error":
|
||
stats.error += 1
|
||
stats.score += float(item.score or 0)
|
||
return stats
|
||
|
||
def _buildReviewInfo(
|
||
self,
|
||
RunRow: dict[str, Any] | None,
|
||
ReviewPoints: list[ReviewPointResultVO],
|
||
Stats: ReviewPointStatsVO,
|
||
) -> ReviewPointInfoVO:
|
||
"""构建详情页右侧的评查摘要。"""
|
||
groupNames = [item.groupName for item in ReviewPoints if item.groupName]
|
||
uniqueGroups = list(dict.fromkeys(groupNames))
|
||
reviewTime = ""
|
||
reviewModel = "DeepSeek"
|
||
if RunRow:
|
||
reviewTime = _format_display_datetime(RunRow.get("finished_at") or RunRow.get("updated_at"))
|
||
reviewModel = str(RunRow.get("llm_model") or "DeepSeek")
|
||
issueCount = Stats.warning + Stats.error
|
||
return ReviewPointInfoVO(
|
||
reviewTime=reviewTime,
|
||
reviewModel=reviewModel,
|
||
ruleGroup="、".join(uniqueGroups),
|
||
result="warning" if issueCount > 0 else "success",
|
||
issueCount=issueCount,
|
||
)
|
||
|
||
def _buildReviewPointTitle(self, Row: dict[str, Any]) -> str:
|
||
"""生成详情页展示标题。"""
|
||
if Row.get("passed") is True:
|
||
return str(Row.get("pass_message") or Row.get("rule_name") or Row.get("rule_id") or "")
|
||
if Row.get("fail_message"):
|
||
return str(Row["fail_message"])
|
||
if Row.get("skip_reason"):
|
||
return str(Row["skip_reason"])
|
||
return str(Row.get("rule_name") or Row.get("rule_id") or "")
|
||
|
||
async def _syncRuleBindings(self, Session, DocTypeId: int, RuleSetIds: list[int], Region: str = "default") -> None:
|
||
"""全量替换规则绑定。"""
|
||
await Session.execute(
|
||
text("UPDATE leaudit_rule_type_bindings SET deleted_at = NOW() WHERE doc_type_id = :id AND deleted_at IS NULL"),
|
||
{"id": DocTypeId},
|
||
)
|
||
for idx, ruleSetId in enumerate(RuleSetIds):
|
||
await Session.execute(
|
||
text(
|
||
"""
|
||
INSERT INTO leaudit_rule_type_bindings (doc_type_id, rule_set_id, binding_mode, priority, region, is_active, created_at, updated_at)
|
||
VALUES (:doc_type_id, :rule_set_id, 'explicit', :priority, :region, true, NOW(), NOW())
|
||
"""
|
||
),
|
||
{"doc_type_id": DocTypeId, "rule_set_id": ruleSetId, "priority": 100 - idx, "region": Region},
|
||
)
|
||
|
||
|
||
async def _find_latest_version_candidate(
|
||
session,
|
||
*,
|
||
type_id: int,
|
||
root_group_id: int | None,
|
||
region: str,
|
||
normalized_name: str,
|
||
) -> dict | None:
|
||
"""Find the latest primary document version candidate by normalized name.
|
||
|
||
Preferred rule: same region + same root group + same normalized name.
|
||
Fallback rule: when a root group cannot be resolved, keep the old same-type behavior.
|
||
"""
|
||
if root_group_id is not None:
|
||
result = await session.execute(
|
||
text(
|
||
"""
|
||
SELECT
|
||
d.id AS document_id,
|
||
d.version_group_key,
|
||
d.version_no,
|
||
d.root_version_id,
|
||
f.id AS file_id,
|
||
f.sha256
|
||
FROM leaudit_documents d
|
||
JOIN leaudit_document_files f
|
||
ON f.document_id = d.id
|
||
AND f.is_active = true
|
||
AND f.file_role = 'primary'
|
||
LEFT JOIN leaudit_evaluation_point_groups eg
|
||
ON eg.id = d.group_id
|
||
LEFT JOIN (
|
||
SELECT
|
||
document_type_id,
|
||
CASE WHEN COUNT(DISTINCT pid) = 1 THEN MIN(pid) END AS inferred_root_group_id
|
||
FROM leaudit_evaluation_point_groups
|
||
WHERE COALESCE(pid, 0) <> 0
|
||
AND deleted_at IS NULL
|
||
AND is_enabled = true
|
||
AND document_type_id IS NOT NULL
|
||
GROUP BY document_type_id
|
||
) dg
|
||
ON dg.document_type_id = d.type_id
|
||
WHERE d.region = :region
|
||
AND d.normalized_name = :normalized_name
|
||
AND d.is_latest_version = true
|
||
AND d.deleted_at IS NULL
|
||
AND COALESCE(
|
||
CASE
|
||
WHEN eg.id IS NULL THEN NULL
|
||
WHEN COALESCE(eg.pid, 0) = 0 THEN eg.id
|
||
ELSE eg.pid
|
||
END,
|
||
dg.inferred_root_group_id
|
||
) = :root_group_id
|
||
ORDER BY d.version_no DESC, d.id DESC
|
||
LIMIT 1
|
||
"""
|
||
),
|
||
{
|
||
"root_group_id": root_group_id,
|
||
"region": region,
|
||
"normalized_name": normalized_name,
|
||
},
|
||
)
|
||
row = result.mappings().first()
|
||
if row:
|
||
return dict(row)
|
||
|
||
result = await session.execute(
|
||
text(
|
||
"""
|
||
SELECT
|
||
d.id AS document_id,
|
||
d.version_group_key,
|
||
d.version_no,
|
||
d.root_version_id,
|
||
f.id AS file_id,
|
||
f.sha256
|
||
FROM leaudit_documents d
|
||
JOIN leaudit_document_files f
|
||
ON f.document_id = d.id
|
||
AND f.is_active = true
|
||
AND f.file_role = 'primary'
|
||
WHERE d.type_id = :type_id
|
||
AND d.region = :region
|
||
AND d.normalized_name = :normalized_name
|
||
AND d.is_latest_version = true
|
||
AND d.deleted_at IS NULL
|
||
ORDER BY d.version_no DESC, d.id DESC
|
||
LIMIT 1
|
||
"""
|
||
),
|
||
{
|
||
"type_id": type_id,
|
||
"region": region,
|
||
"normalized_name": normalized_name,
|
||
},
|
||
)
|
||
row = result.mappings().first()
|
||
return dict(row) if row else None
|
||
|
||
|
||
def _map_rule_status(status: str | None, passed: bool | None, risk: str | None) -> str:
|
||
"""把新规则结果状态转换成旧详情页口径。"""
|
||
normalized = (status or "").strip().lower()
|
||
if normalized in {"skipped", "not_applicable"}:
|
||
return "notApplicable"
|
||
if passed is True or normalized in {"passed", "success"}:
|
||
return "success"
|
||
if normalized in {"error", "failed", "fail"}:
|
||
return "error" if (risk or "").strip().lower() == "high" else "warning"
|
||
return "warning" if passed is False else "success"
|
||
|
||
|
||
def _normalize_review_content(
|
||
extracted_fields: dict[str, Any],
|
||
field_positions: dict[str, Any],
|
||
) -> dict[str, dict[str, Any]]:
|
||
"""归一化字段内容,兼容旧详情页结构。"""
|
||
content: dict[str, dict[str, Any]] = {}
|
||
for field_name, raw_value in extracted_fields.items():
|
||
position = field_positions.get(field_name) if isinstance(field_positions, dict) else None
|
||
page = _extract_review_page(position)
|
||
char_positions = _build_char_positions(position, raw_value)
|
||
content[str(field_name)] = {
|
||
"page": page if page is not None else "",
|
||
"value": _normalize_review_value(raw_value),
|
||
"char_positions": char_positions,
|
||
}
|
||
return content
|
||
|
||
|
||
def _normalize_review_content_pages(
|
||
extracted_fields: dict[str, Any],
|
||
field_positions: dict[str, Any],
|
||
) -> dict[str, str]:
|
||
"""提取字段页码映射。"""
|
||
pages: dict[str, str] = {}
|
||
for field_name in extracted_fields.keys():
|
||
position = field_positions.get(field_name) if isinstance(field_positions, dict) else None
|
||
page = _extract_review_page(position)
|
||
pages[str(field_name)] = str(page) if page is not None else ""
|
||
return pages
|
||
|
||
|
||
def _normalize_review_value(raw_value: Any) -> Any:
|
||
"""尽量保留原值,同时把复杂对象压到旧前端能识别的结构。"""
|
||
if isinstance(raw_value, dict):
|
||
if "value" in raw_value:
|
||
return raw_value["value"]
|
||
if "text" in raw_value:
|
||
return raw_value["text"]
|
||
if "value_text" in raw_value:
|
||
return raw_value["value_text"]
|
||
return raw_value
|
||
return raw_value
|
||
|
||
|
||
def _extract_review_page(position: Any) -> int | None:
|
||
"""从字段位置信息中提取页码。"""
|
||
if not isinstance(position, dict):
|
||
return None
|
||
page_num = position.get("pageNum")
|
||
if isinstance(page_num, int):
|
||
return page_num
|
||
if isinstance(page_num, str) and page_num.isdigit():
|
||
return int(page_num)
|
||
page_nums = position.get("pageNums")
|
||
if isinstance(page_nums, list) and page_nums:
|
||
first = page_nums[0]
|
||
if isinstance(first, int):
|
||
return first
|
||
if isinstance(first, str) and first.isdigit():
|
||
return int(first)
|
||
match_position = position.get("matchPosition")
|
||
if isinstance(match_position, dict):
|
||
for key in ("page", "pageNum"):
|
||
value = match_position.get(key)
|
||
if isinstance(value, int):
|
||
return value
|
||
if isinstance(value, str) and value.isdigit():
|
||
return int(value)
|
||
return None
|
||
|
||
|
||
def _extract_review_suggestion(remediation: dict[str, Any], fail_message: Any, pass_message: Any) -> str:
|
||
"""从 remediation 中提取适合旧详情页展示的一句建议。"""
|
||
suggestions = remediation.get("suggestions")
|
||
if isinstance(suggestions, list) and suggestions:
|
||
first = suggestions[0]
|
||
if isinstance(first, str):
|
||
return first
|
||
if isinstance(first, dict):
|
||
return str(first.get("text") or first.get("message") or first.get("suggestion") or "")
|
||
actions = remediation.get("actions")
|
||
if isinstance(actions, list) and actions:
|
||
first = actions[0]
|
||
if isinstance(first, str):
|
||
return first
|
||
if isinstance(first, dict):
|
||
return str(first.get("text") or first.get("message") or first.get("action") or "")
|
||
if fail_message:
|
||
return str(fail_message)
|
||
if pass_message:
|
||
return str(pass_message)
|
||
return ""
|
||
|
||
|
||
def _normalize_review_stage_logs(stages: list[Any]) -> list[dict[str, Any]]:
|
||
"""把 stages 压成旧前端的 evaluatedPointResultsLog.rules。"""
|
||
result: list[dict[str, Any]] = []
|
||
for index, stage in enumerate(stages):
|
||
if not isinstance(stage, dict):
|
||
continue
|
||
result.append(
|
||
{
|
||
"id": str(stage.get("id") or index),
|
||
"type": str(stage.get("type") or stage.get("stage") or stage.get("name") or "rule"),
|
||
"res": stage.get("passed"),
|
||
"config": stage,
|
||
}
|
||
)
|
||
return result
|
||
|
||
|
||
def _enrich_stage_payload_with_positions(payload: Any, field_positions: dict[str, Any]) -> Any:
|
||
"""递归给 stage payload 注入最小可用 char_positions。"""
|
||
if isinstance(payload, list):
|
||
return [_enrich_stage_payload_with_positions(item, field_positions) for item in payload]
|
||
|
||
if not isinstance(payload, dict):
|
||
return payload
|
||
|
||
enriched: dict[str, Any] = {}
|
||
for key, value in payload.items():
|
||
enriched[key] = _enrich_stage_payload_with_positions(value, field_positions)
|
||
|
||
if isinstance(value, dict):
|
||
position = field_positions.get(key) if isinstance(field_positions, dict) else None
|
||
if position:
|
||
if ("page" not in enriched[key] or not enriched[key].get("page")):
|
||
page = _extract_review_page(position)
|
||
if page is not None:
|
||
enriched[key]["page"] = page
|
||
if "char_positions" not in enriched[key]:
|
||
char_positions = _build_char_positions(position, value.get("value"))
|
||
if char_positions:
|
||
enriched[key]["char_positions"] = char_positions
|
||
return enriched
|
||
|
||
|
||
def _build_char_positions(position: Any, raw_value: Any) -> list[dict[str, Any]] | None:
|
||
"""把后端已有 bbox / matchPosition 压成前端高亮所需的 char_positions。"""
|
||
if not isinstance(position, dict):
|
||
return None
|
||
|
||
bbox = position.get("bbox")
|
||
if bbox is None and isinstance(position.get("matchPosition"), dict):
|
||
bbox = position["matchPosition"].get("bbox")
|
||
|
||
normalized_box = _normalize_bbox_to_points(bbox)
|
||
if not normalized_box:
|
||
return None
|
||
|
||
value_text = _stringify_char_position_value(raw_value)
|
||
return [
|
||
{
|
||
"box": normalized_box,
|
||
"char": value_text,
|
||
"score": 1,
|
||
}
|
||
]
|
||
|
||
|
||
def _normalize_bbox_to_points(bbox: Any) -> list[list[float]] | None:
|
||
"""兼容多种 bbox 形态,统一转成四点坐标。"""
|
||
if not bbox:
|
||
return None
|
||
|
||
if (
|
||
isinstance(bbox, list)
|
||
and len(bbox) == 4
|
||
and all(isinstance(item, (list, tuple)) and len(item) >= 2 for item in bbox)
|
||
):
|
||
return [[float(point[0]), float(point[1])] for point in bbox]
|
||
|
||
if isinstance(bbox, (list, tuple)) and len(bbox) >= 4 and all(isinstance(item, (int, float)) for item in bbox[:4]):
|
||
x1, y1, x2, y2 = [float(item) for item in bbox[:4]]
|
||
return [[x1, y1], [x2, y1], [x2, y2], [x1, y2]]
|
||
|
||
if isinstance(bbox, dict):
|
||
if all(key in bbox for key in ("x1", "y1", "x2", "y2")):
|
||
x1 = float(bbox["x1"])
|
||
y1 = float(bbox["y1"])
|
||
x2 = float(bbox["x2"])
|
||
y2 = float(bbox["y2"])
|
||
return [[x1, y1], [x2, y1], [x2, y2], [x1, y2]]
|
||
if all(key in bbox for key in ("left", "top", "right", "bottom")):
|
||
x1 = float(bbox["left"])
|
||
y1 = float(bbox["top"])
|
||
x2 = float(bbox["right"])
|
||
y2 = float(bbox["bottom"])
|
||
return [[x1, y1], [x2, y1], [x2, y2], [x1, y2]]
|
||
return None
|
||
|
||
|
||
def _stringify_char_position_value(raw_value: Any) -> str:
|
||
"""将字段值压成前端高亮文本。"""
|
||
normalized = _normalize_review_value(raw_value)
|
||
if normalized is None:
|
||
return ""
|
||
if isinstance(normalized, str):
|
||
return normalized
|
||
if isinstance(normalized, (int, float, bool)):
|
||
return str(normalized)
|
||
return ""
|
||
|
||
|
||
_REVIEW_TEXT_PUNCT_TABLE = str.maketrans({
|
||
",": ",", "。": ".", ";": ";", ":": ":",
|
||
"!": "!", "?": "?", "(": "(", ")": ")",
|
||
"【": "[", "】": "]", "「": '"', "」": '"',
|
||
"‘": "'", "’": "'", "“": '"', "”": '"',
|
||
})
|
||
|
||
|
||
def _normalize_review_match_text(text: str) -> str:
|
||
"""统一比较文本,尽量消除 OCR/文档解析带来的空白和标点差异。"""
|
||
if not text:
|
||
return ""
|
||
normalized = re.sub(r"<[^>]+>", "", str(text))
|
||
normalized = re.sub(r"\s+", "", normalized)
|
||
return normalized.translate(_REVIEW_TEXT_PUNCT_TABLE)
|
||
|
||
|
||
def _infer_page_num_from_page_texts(field_text: str, page_texts: list[tuple[int, str]]) -> int | None:
|
||
"""根据字段文本在逐页正文中的命中情况推导 1-based 页码。"""
|
||
normalizedField = _normalize_review_match_text(field_text)
|
||
if not normalizedField:
|
||
return None
|
||
|
||
bestPage: int | None = None
|
||
bestScore = 0.0
|
||
fieldLength = len(normalizedField)
|
||
|
||
for pageNum, pageText in page_texts:
|
||
normalizedPage = _normalize_review_match_text(pageText)
|
||
if not normalizedPage:
|
||
continue
|
||
if normalizedField in normalizedPage:
|
||
return pageNum
|
||
|
||
if fieldLength < 8:
|
||
continue
|
||
|
||
ratio = _partial_similarity(normalizedField, normalizedPage)
|
||
if ratio > bestScore:
|
||
bestScore = ratio
|
||
bestPage = pageNum
|
||
|
||
if bestScore >= 0.92:
|
||
return bestPage
|
||
return None
|
||
|
||
|
||
def _partial_similarity(needle: str, haystack: str) -> float:
|
||
"""简化版 partial similarity,避免详情页兜底依赖额外包。"""
|
||
if not needle or not haystack:
|
||
return 0.0
|
||
if len(needle) > len(haystack):
|
||
needle, haystack = haystack, needle
|
||
|
||
window = len(needle)
|
||
if window <= 0:
|
||
return 0.0
|
||
if needle == haystack:
|
||
return 1.0
|
||
|
||
best = 0.0
|
||
step = max(1, window // 6)
|
||
stop = max(len(haystack) - window + 1, 1)
|
||
for start in range(0, stop, step):
|
||
chunk = haystack[start : start + window]
|
||
if not chunk:
|
||
continue
|
||
matches = sum(1 for left, right in zip(needle, chunk) if left == right)
|
||
best = max(best, matches / window)
|
||
if best >= 0.999:
|
||
return 1.0
|
||
|
||
return best
|
||
|
||
|
||
def _extract_page_texts_from_pdf(path: Path) -> list[tuple[int, str]]:
|
||
"""平台侧直接从 PDF 提取逐页文本,避免依赖 leaudit 内核。"""
|
||
import fitz
|
||
|
||
doc = fitz.open(path)
|
||
try:
|
||
page_texts: list[tuple[int, str]] = []
|
||
for index in range(len(doc)):
|
||
text = doc[index].get_text("text") or ""
|
||
if text.strip():
|
||
page_texts.append((index + 1, text))
|
||
return page_texts
|
||
finally:
|
||
doc.close()
|
||
|
||
|
||
def _extract_page_texts_from_docx(path: Path) -> list[tuple[int, str]]:
|
||
"""DOCX 无稳定真实分页时,平台侧仅退化为整文第 1 页定位。"""
|
||
from docx import Document as DocxDocument
|
||
|
||
doc = DocxDocument(str(path))
|
||
parts: list[str] = []
|
||
|
||
for para in doc.paragraphs:
|
||
text = (para.text or "").strip()
|
||
if text:
|
||
parts.append(text)
|
||
|
||
for table in doc.tables:
|
||
for row in table.rows:
|
||
cells = [cell.text.strip() for cell in row.cells if cell.text and cell.text.strip()]
|
||
if cells:
|
||
parts.append(" | ".join(cells))
|
||
|
||
text = "\n".join(parts).strip()
|
||
return [(1, text)] if text else []
|
||
|
||
|
||
def _format_display_datetime(value: Any) -> str:
|
||
"""格式化详情页展示时间。"""
|
||
if isinstance(value, datetime):
|
||
return value.strftime("%Y-%m-%d %H:%M:%S")
|
||
return ""
|
||
|
||
|
||
def _format_iso_datetime(value: Any) -> str | None:
|
||
"""格式化 ISO 时间字符串。"""
|
||
if isinstance(value, datetime):
|
||
return value.isoformat()
|
||
return None
|
||
|
||
|
||
def _coerce_positive_int(value: Any) -> int | None:
|
||
"""把各种页码值安全转成正整数。"""
|
||
if isinstance(value, int):
|
||
return value if value > 0 else None
|
||
if isinstance(value, float):
|
||
int_value = int(value)
|
||
return int_value if int_value > 0 else None
|
||
if isinstance(value, str):
|
||
match = re.search(r"\d+", value)
|
||
if match:
|
||
int_value = int(match.group(0))
|
||
return int_value if int_value > 0 else None
|
||
return None
|
||
|
||
|
||
def _normalize_speed(speed: str | None) -> str:
|
||
"""Normalize front-end speed selection to urgent/normal."""
|
||
normalized = (speed or "").strip().lower()
|
||
if normalized in {"urgent", "high", "fast", "emergency", "紧急"}:
|
||
return "urgent"
|
||
return "normal"
|
||
|
||
|
||
def _normalize_document_name(file_name: str) -> str:
|
||
"""Build a stable name key for same-name version matching."""
|
||
stem = Path(file_name).stem
|
||
name = unicodedata.normalize("NFKC", stem).strip().lower()
|
||
name = re.sub(r"[\s_\-]+", " ", name)
|
||
name = re.sub(r"(?:\(|()\d+(?:\)|))$", "", name).strip()
|
||
name = re.sub(r"(?:[-_\s]*副本|[-_\s]*copy)$", "", name).strip()
|
||
name = re.sub(r"\s+", " ", name).strip()
|
||
return name or "untitled"
|