"""文档服务实现。""" from __future__ import annotations import json from typing import Any from datetime import date as date_type, datetime import hashlib import mimetypes import re import time import unicodedata import uuid from pathlib import Path from sqlalchemy import text from fastapi_common.fastapi_common_sqlalchemy.database import GetAsyncSession from fastapi_common.fastapi_common_web.domain.responses import StatusCodeEnum from fastapi_common.fastapi_common_web.exception.LeauditException import LeauditException from fastapi_common.fastapi_common_storage.oss_path_utils import OssPathUtils from fastapi_modules.fastapi_leaudit.domian.vo.documentVo import ( DocumentAttachmentVO, DocumentDetailVO, DocumentHistoryVersionVO, DocumentListItemVO, DocumentListPageVO, DocumentStatusItemVO, DocumentTypeRootCreateDTO, DocumentTypeRootItemVO, DocumentTypeRootUpdateDTO, DocumentUpdateDTO, DocumentTypeCreateDTO, DocumentTypeItemVO, DocumentTypeUpdateDTO, DocumentUploadVO, ) from fastapi_modules.fastapi_leaudit.domian.vo.reviewPointVo import ( DocumentConfirmVO, ReviewPointAuditVO, ReviewPointInfoVO, ReviewPointResultVO, ReviewPointStatsVO, ReviewPointsAggregateVO, ) from fastapi_modules.fastapi_leaudit.models import LeauditDocument, LeauditDocumentFile from fastapi_modules.fastapi_leaudit.services import IAuditService, IDocumentService, IOssService from fastapi_modules.fastapi_leaudit.services.impl.auditServiceImpl import AuditServiceImpl from fastapi_modules.fastapi_leaudit.services.impl.ossServiceImpl import OssServiceImpl from fastapi_modules.fastapi_leaudit.services.impl.ruleGroupSupport import sync_group_bindings_from_doc_type class DocumentServiceImpl(IDocumentService): """文档服务实现。""" def __init__( self, OssService: IOssService | None = None, AuditService: IAuditService | None = None, ) -> None: self.OssService = OssService or OssServiceImpl() self.AuditService = AuditService or AuditServiceImpl() async def Upload( self, FileName: str, FileContent: bytes, ContentType: str | 
None, TypeId: int | None = None, TypeCode: str | None = None, GroupId: int | None = None, Region: str = "default", FileRole: str = "primary", CreatedBy: int | None = None, Attachments: list[tuple[str, bytes, str | None]] | None = None, AutoRun: bool = False, Speed: str = "normal", ) -> DocumentUploadVO: """上传文档并建立 LeAudit document/file 记录。""" if not FileName: raise LeauditException(StatusCodeEnum.HTTP_400_BAD_REQUEST, "上传文件名不能为空") if not FileContent: raise LeauditException(StatusCodeEnum.HTTP_400_BAD_REQUEST, "上传文件内容不能为空") if not TypeId and not TypeCode: raise LeauditException(StatusCodeEnum.HTTP_400_BAD_REQUEST, "typeId 与 typeCode 至少传一个") normalizedRegion = (Region or "default").strip() or "default" normalizedFileRole = (FileRole or "primary").strip() or "primary" fileExt = Path(FileName).suffix.lstrip(".").lower() or None mimeType = ContentType or mimetypes.guess_type(FileName)[0] or "application/octet-stream" fileSha256 = hashlib.sha256(FileContent).hexdigest() fileSize = len(FileContent) normalizedSpeed = _normalize_speed(Speed) normalizedName = _normalize_document_name(FileName) uploadedAt = datetime.now() async with GetAsyncSession() as Session: await self._ensureDocumentGroupColumn(Session) if TypeId is not None and TypeCode is not None: typeResult = await Session.execute( text( """ SELECT id, code FROM leaudit_document_types WHERE id = :type_id AND code = :type_code AND deleted_at IS NULL LIMIT 1 """ ), {"type_id": TypeId, "type_code": TypeCode}, ) elif TypeId is not None: typeResult = await Session.execute( text( """ SELECT id, code FROM leaudit_document_types WHERE id = :type_id AND deleted_at IS NULL LIMIT 1 """ ), {"type_id": TypeId}, ) else: typeResult = await Session.execute( text( """ SELECT id, code FROM leaudit_document_types WHERE code = :type_code AND deleted_at IS NULL LIMIT 1 """ ), {"type_code": TypeCode}, ) typeRow = typeResult.mappings().first() if not typeRow: raise LeauditException(StatusCodeEnum.HTTP_404_NOT_FOUND, "文档类型不存在或已停用") 
resolvedTypeId = int(typeRow["id"]) resolvedTypeCode = str(typeRow["code"]) resolvedGroupId = await self._resolveDocumentGroupId(Session, resolvedTypeId, GroupId) resolvedRootGroupId = await self._resolveDocumentRootGroupId(Session, resolvedTypeId, resolvedGroupId) duplicateUpload = False previousVersionId: int | None = None rootVersionId: int | None = None versionGroupKey: str | None = None versionNo = 1 latestCandidate = None if normalizedFileRole == "primary": latestCandidate = await _find_latest_version_candidate( Session, type_id=resolvedTypeId, root_group_id=resolvedRootGroupId, region=normalizedRegion, normalized_name=normalizedName, ) if latestCandidate and latestCandidate["sha256"] == fileSha256: duplicateUpload = True document = await Session.get(LeauditDocument, int(latestCandidate["document_id"])) documentFile = await Session.get(LeauditDocumentFile, int(latestCandidate["file_id"])) if document is None or documentFile is None: raise LeauditException(StatusCodeEnum.HTTP_500_INTERNAL_SERVER_ERROR, "重复上传版本定位失败") await Session.commit() else: internalDocumentNo = time.time_ns() if latestCandidate: previousVersionId = int(latestCandidate["document_id"]) rootVersionId = int(latestCandidate["root_version_id"] or latestCandidate["document_id"]) versionGroupKey = str(latestCandidate["version_group_key"]) versionNo = int(latestCandidate["version_no"]) + 1 previousDocument = await Session.get(LeauditDocument, previousVersionId) if previousDocument is not None: previousDocument.isLatestVersion = False else: versionGroupKey = uuid.uuid4().hex document = await LeauditDocument.create_new( Session, bizDocumentId=internalDocumentNo, typeId=resolvedTypeId, groupId=resolvedGroupId, region=normalizedRegion, processingStatus="waiting", versionGroupKey=versionGroupKey, versionNo=versionNo, previousVersionId=previousVersionId, rootVersionId=rootVersionId, isLatestVersion=True, normalizedName=normalizedName, ) if document.rootVersionId is None: document.rootVersionId = 
document.Id rootVersionId = document.Id else: rootVersionId = document.rootVersionId versionLabel = f"v{document.versionNo}" objectKey = OssPathUtils.BuildBusinessDocKey( Region=normalizedRegion, TypeCode=resolvedTypeCode, DocumentId=document.Id, Version=versionLabel, FileRole=normalizedFileRole, FileName=FileName, Year=uploadedAt.year, Month=uploadedAt.month, ) ossUrl = await self.OssService.UploadBytes( ObjectKey=objectKey, Content=FileContent, ContentType=mimeType, ) versionCount = await LeauditDocumentFile.count_by_document(Session, document.Id) _ = versionCount # single-version-per-document in current model; kept for future extension await LeauditDocumentFile.deactivate_active_by_document(Session, document.Id) documentFile = LeauditDocumentFile( documentId=document.Id, fileRole=normalizedFileRole, fileName=FileName, fileExt=fileExt, mimeType=mimeType, fileSize=fileSize, sha256=fileSha256, localPath=None, ossUrl=ossUrl, storageProvider="minio", isActive=True, createdBy=CreatedBy, ) Session.add(documentFile) await Session.flush() await Session.commit() await Session.refresh(document) await Session.refresh(documentFile) if normalizedFileRole == "primary" and Attachments: await self._appendAttachmentFiles( Session=Session, DocumentId=document.Id, TypeCode=resolvedTypeCode, Region=normalizedRegion, VersionNo=int(document.versionNo or 1), Files=Attachments, CreatedBy=CreatedBy, ) ossUrl = documentFile.ossUrl or "" run = None processingStatus = document.processingStatus or "waiting" if AutoRun: run = await self.AuditService.Run( DocumentId=document.Id, Speed=Speed, Force=duplicateUpload, ) processingStatus = "running" if run.status in {"pending", "running"} else run.status return DocumentUploadVO( documentId=document.Id, internalDocumentNo=document.bizDocumentId, versionGroupKey=document.versionGroupKey or "", versionNo=int(document.versionNo or 1), previousVersionId=document.previousVersionId, rootVersionId=int(document.rootVersionId or document.Id), 
duplicateUpload=duplicateUpload, fileId=documentFile.Id, typeId=resolvedTypeId, typeCode=resolvedTypeCode, groupId=resolvedGroupId, region=normalizedRegion, fileName=documentFile.fileName, ossUrl=ossUrl, speed=normalizedSpeed, processingStatus=processingStatus, autoRunTriggered=AutoRun, run=run, ) async def ListDocuments( self, CurrentUserId: int, Page: int = 1, PageSize: int = 20, Keyword: str | None = None, DocumentNumber: str | None = None, TypeCode: str | None = None, TypeIds: list[int] | None = None, EntryModuleId: int | None = None, Region: str | None = None, ProcessingStatus: str | None = None, ResultStatus: str | None = None, AuditStatus: int | None = None, UserId: int | None = None, DateFrom: str | None = None, DateTo: str | None = None, ) -> DocumentListPageVO: """获取文档列表(仅最新版本,附历史版本摘要)。""" page = max(1, int(Page)) page_size = max(1, min(int(PageSize), 100)) offset = (page - 1) * page_size async with GetAsyncSession() as Session: await self._ensureDocumentGroupColumn(Session) documentColumns = await self._loadDocumentColumns(Session) currentUser = await self._getCurrentUserContext(CurrentUserId) filters = ["d.is_latest_version = true", "d.deleted_at IS NULL", "f.is_active = true", "f.file_role = 'primary'"] params: dict[str, object] = {"limit": page_size, "offset": offset} filters.extend( self._buildDocumentScopeFilters( CurrentUserId=CurrentUserId, CurrentUser=currentUser, Params=params, RequestedRegion=Region, RequestedUserId=UserId, DocumentAlias="d", FileAlias="f", ) ) if Keyword: if "document_number" in documentColumns: filters.append("(f.file_name ILIKE :keyword OR d.normalized_name ILIKE :keyword OR d.document_number ILIKE :keyword)") else: filters.append("(f.file_name ILIKE :keyword OR d.normalized_name ILIKE :keyword)") params["keyword"] = f"%{Keyword.strip()}%" if DocumentNumber and "document_number" in documentColumns: filters.append("d.document_number ILIKE :document_number") params["document_number"] = f"%{DocumentNumber.strip()}%" if 
TypeCode: filters.append("dt.code = :type_code") params["type_code"] = TypeCode.strip() if TypeIds: normalizedTypeIds = [int(typeId) for typeId in TypeIds if int(typeId) > 0] if normalizedTypeIds: filters.append("d.type_id = ANY(:type_ids)") params["type_ids"] = normalizedTypeIds if EntryModuleId is not None and int(EntryModuleId) > 0: filters.append("dt.entry_module_id = :entry_module_id") params["entry_module_id"] = int(EntryModuleId) if ProcessingStatus: filters.append("d.processing_status = :processing_status") params["processing_status"] = ProcessingStatus.strip() if ResultStatus: filters.append("ar.result_status = :result_status") params["result_status"] = ResultStatus.strip() if DateFrom: filters.append("d.created_at >= :date_from") params["date_from"] = date_type.fromisoformat(DateFrom.strip()) if DateTo: filters.append("d.created_at < (CAST(:date_to AS date) + INTERVAL '1 day')") params["date_to"] = date_type.fromisoformat(DateTo.strip()) where_clause = " AND ".join(filters) derivedAuditStatusExpr = """ CASE WHEN LOWER(COALESCE(ar.status, '')) IN ('queued', 'running') OR LOWER(COALESCE(d.processing_status, '')) = 'running' THEN 2 WHEN LOWER(COALESCE(d.processing_status, '')) IN ('waiting', 'pending') THEN 0 WHEN LOWER(COALESCE(d.processing_status, '')) = 'failed' THEN -1 WHEN COALESCE(ar.failed_count, 0) > 0 THEN -1 WHEN COALESCE(ar.passed_count, 0) > 0 THEN 1 ELSE 0 END """.strip() persistedOrDerivedAuditStatusExpr = ( f"COALESCE(d.audit_status, {derivedAuditStatusExpr})" if "audit_status" in documentColumns else derivedAuditStatusExpr ) if AuditStatus is not None: filters.append(f"{persistedOrDerivedAuditStatusExpr} = :audit_status") params["audit_status"] = int(AuditStatus) where_clause = " AND ".join(filters) groupIdSelectExpr = "d.group_id" if "group_id" in documentColumns else "NULL::bigint" # Transitional display fallback: if the document itself has not yet persisted a child group, # but its document type currently maps to exactly one active child 
group, surface it in list/detail UI. resolvedGroupIdExpr = f"COALESCE({groupIdSelectExpr}, dg.inferred_group_id)" resolvedGroupNameExpr = ( f"CASE WHEN {groupIdSelectExpr} IS NOT NULL THEN eg.name ELSE dg.inferred_group_name END" ) optionalSelects = [ f"{resolvedGroupIdExpr} AS group_id", "d.document_number AS document_number" if "document_number" in documentColumns else "NULL::text AS document_number", f"{persistedOrDerivedAuditStatusExpr} AS audit_status", "d.is_test_document AS is_test_document" if "is_test_document" in documentColumns else "FALSE AS is_test_document", "dt.name AS type_name", f"{resolvedGroupNameExpr} AS group_name", ] count_sql = text( f""" SELECT COUNT(*) FROM leaudit_documents d JOIN leaudit_document_files f ON f.document_id = d.id LEFT JOIN leaudit_document_types dt ON dt.id = d.type_id LEFT JOIN leaudit_evaluation_point_groups eg ON eg.id = d.group_id LEFT JOIN ( SELECT document_type_id, CASE WHEN COUNT(*) = 1 THEN MIN(id) END AS inferred_group_id, CASE WHEN COUNT(*) = 1 THEN MIN(name) END AS inferred_group_name FROM leaudit_evaluation_point_groups WHERE pid <> 0 AND deleted_at IS NULL AND is_enabled = true AND document_type_id IS NOT NULL GROUP BY document_type_id ) dg ON dg.document_type_id = d.type_id LEFT JOIN leaudit_audit_runs ar ON ar.id = d.current_run_id WHERE {where_clause} """ ) list_sql = text( f""" SELECT d.id AS document_id, d.biz_document_id AS internal_document_no, d.version_group_key, d.version_no, d.root_version_id, d.previous_version_id, d.type_id, dt.code AS type_code, d.region, d.normalized_name, d.processing_status, d.current_run_id, d.updated_at, f.id AS file_id, f.file_name, f.file_ext, f.mime_type, f.file_size, f.oss_url, ar.status AS run_status, ar.result_status, ar.total_score, ar.passed_count, ar.failed_count, ar.skipped_count, {', '.join(optionalSelects)}, vc.total_versions, COALESCE(vc.total_versions, 1) > 1 AS has_history FROM leaudit_documents d JOIN leaudit_document_files f ON f.document_id = d.id LEFT JOIN 
leaudit_document_types dt ON dt.id = d.type_id LEFT JOIN leaudit_evaluation_point_groups eg ON eg.id = d.group_id LEFT JOIN ( SELECT document_type_id, CASE WHEN COUNT(*) = 1 THEN MIN(id) END AS inferred_group_id, CASE WHEN COUNT(*) = 1 THEN MIN(name) END AS inferred_group_name FROM leaudit_evaluation_point_groups WHERE pid <> 0 AND deleted_at IS NULL AND is_enabled = true AND document_type_id IS NOT NULL GROUP BY document_type_id ) dg ON dg.document_type_id = d.type_id LEFT JOIN leaudit_audit_runs ar ON ar.id = d.current_run_id LEFT JOIN ( SELECT version_group_key, COUNT(*) AS total_versions FROM leaudit_documents WHERE deleted_at IS NULL GROUP BY version_group_key ) vc ON vc.version_group_key = d.version_group_key WHERE {where_clause} ORDER BY d.updated_at DESC, d.id DESC LIMIT :limit OFFSET :offset """ ) history_sql = text( """ SELECT d.version_group_key, d.id AS document_id, d.version_no, d.processing_status, d.updated_at, f.id AS file_id, f.file_name, f.file_ext, ar.status AS run_status, ar.result_status FROM leaudit_documents d JOIN leaudit_document_files f ON f.document_id = d.id AND f.is_active = true AND f.file_role = 'primary' LEFT JOIN leaudit_audit_runs ar ON ar.id = d.current_run_id WHERE d.version_group_key = ANY(:group_keys) AND d.is_latest_version = false AND d.deleted_at IS NULL ORDER BY d.version_group_key, d.version_no DESC, d.id DESC """ ) async with GetAsyncSession() as Session: total = int((await Session.execute(count_sql, params)).scalar_one()) rows = (await Session.execute(list_sql, params)).mappings().all() history_by_group: dict[str, list[DocumentHistoryVersionVO]] = {} group_keys = [str(row["version_group_key"]) for row in rows if row["version_group_key"]] if group_keys: history_rows = ( await Session.execute(history_sql, {"group_keys": group_keys}) ).mappings().all() for row in history_rows: history_by_group.setdefault(str(row["version_group_key"]), []).append( DocumentHistoryVersionVO( documentId=int(row["document_id"]), 
fileId=int(row["file_id"]) if row["file_id"] is not None else None, versionNo=int(row["version_no"]), fileName=row["file_name"], fileExt=row["file_ext"], processingStatus=row["processing_status"], runStatus=row["run_status"], resultStatus=row["result_status"], updatedAt=row["updated_at"].isoformat() if row["updated_at"] else None, ) ) documents: list[DocumentListItemVO] = [] for row in rows: group_key = str(row["version_group_key"] or "") documents.append( DocumentListItemVO( documentId=int(row["document_id"]), internalDocumentNo=int(row["internal_document_no"]), versionGroupKey=group_key, versionNo=int(row["version_no"] or 1), rootVersionId=int(row["root_version_id"] or row["document_id"]), previousVersionId=int(row["previous_version_id"]) if row["previous_version_id"] is not None else None, typeId=int(row["type_id"]) if row["type_id"] is not None else None, typeCode=row["type_code"], typeName=row["type_name"], groupId=int(row["group_id"]) if row["group_id"] is not None else None, groupName=row["group_name"], region=row["region"], normalizedName=row["normalized_name"], fileId=int(row["file_id"]) if row["file_id"] is not None else None, fileName=row["file_name"], fileExt=row["file_ext"], mimeType=row["mime_type"], fileSize=int(row["file_size"]) if row["file_size"] is not None else None, ossUrl=row["oss_url"], processingStatus=row["processing_status"], currentRunId=int(row["current_run_id"]) if row["current_run_id"] is not None else None, runStatus=row["run_status"], resultStatus=row["result_status"], totalScore=float(row["total_score"]) if row["total_score"] is not None else None, passedCount=int(row["passed_count"]) if row["passed_count"] is not None else None, failedCount=int(row["failed_count"]) if row["failed_count"] is not None else None, skippedCount=int(row["skipped_count"]) if row["skipped_count"] is not None else None, documentNumber=row["document_number"], auditStatus=int(row["audit_status"]) if row["audit_status"] is not None else None, 
isTestDocument=bool(row["is_test_document"]), updatedAt=row["updated_at"].isoformat() if row["updated_at"] else None, hasHistory=bool(row["has_history"]), totalVersions=int(row["total_versions"] or 1), historyVersions=history_by_group.get(group_key, []), ) ) total_pages = (total + page_size - 1) // page_size if total else 0 return DocumentListPageVO( total=total, page=page, pageSize=page_size, totalPages=total_pages, documents=documents, ) async def GetDocument(self, CurrentUserId: int, Id: int) -> DocumentDetailVO: """获取单个文档详情,并执行数据隔离校验。""" async with GetAsyncSession() as Session: await self._ensureDocumentGroupColumn(Session) currentUser = await self._getCurrentUserContext(CurrentUserId) documentColumns = await self._loadDocumentColumns(Session) detail = await self._getDocumentDetail(Session, Id, CurrentUserId, currentUser, documentColumns) if not detail: raise LeauditException(StatusCodeEnum.HTTP_404_NOT_FOUND, "文档不存在或无权访问") return detail async def GetReviewPoints(self, CurrentUserId: int, DocumentId: int) -> ReviewPointsAggregateVO: """获取评查详情页聚合数据。""" async with GetAsyncSession() as Session: await self._ensureDocumentGroupColumn(Session) await self._ensureReviewPointAuditTable(Session) currentUser = await self._getCurrentUserContext(CurrentUserId) documentColumns = await self._loadDocumentColumns(Session) detail = await self._getDocumentDetail(Session, DocumentId, CurrentUserId, currentUser, documentColumns) if not detail: raise LeauditException(StatusCodeEnum.HTTP_404_NOT_FOUND, "文档不存在或无权访问") runRow = await self._loadReviewRunRow(Session, detail) reviewPoints: list[ReviewPointResultVO] = [] stats = ReviewPointStatsVO() if runRow: reviewPoints = await self._loadReviewPointResults(Session, detail, int(runRow["id"])) stats = self._buildReviewPointStats(reviewPoints) documentPayload = await self._buildReviewDocumentPayload(Session, detail, runRow) reviewInfo = self._buildReviewInfo(runRow, reviewPoints, stats) comparisonDocument = await 
self._loadComparisonDocument(Session, detail.documentId) scoringProposals = await self._loadScoringProposals(Session, detail.documentId) return ReviewPointsAggregateVO( data=reviewPoints, stats=stats, reviewInfo=reviewInfo, document=documentPayload, comparison_document=comparisonDocument, scoring_proposals=scoringProposals, ) async def AuditReviewPoint( self, CurrentUserId: int, ReviewPointResultId: int, Result: str, Message: str, ) -> ReviewPointAuditVO: """更新单个评查点的人工审核状态。""" normalizedResult = (Result or "").strip().lower() if normalizedResult not in {"true", "false", "review"}: raise LeauditException(StatusCodeEnum.HTTP_400_BAD_REQUEST, "result 仅支持 true、false、review") async with GetAsyncSession() as Session: await self._ensureDocumentGroupColumn(Session) await self._ensureReviewPointAuditTable(Session) row = ( await Session.execute( text( """ SELECT document_id FROM leaudit_rule_results WHERE id = :result_id LIMIT 1 """ ), {"result_id": ReviewPointResultId}, ) ).mappings().first() if not row: raise LeauditException(StatusCodeEnum.HTTP_404_NOT_FOUND, "评查点结果不存在") documentId = int(row["document_id"]) currentUser = await self._getCurrentUserContext(CurrentUserId) documentColumns = await self._loadDocumentColumns(Session) detail = await self._getDocumentDetail(Session, documentId, CurrentUserId, currentUser, documentColumns) if not detail: raise LeauditException(StatusCodeEnum.HTTP_404_NOT_FOUND, "文档不存在或无权访问") payload = { "rule_result_id": ReviewPointResultId, "document_id": documentId, "edit_audit_status": 0 if normalizedResult == "review" else 1, "override_result": None if normalizedResult == "review" else (normalizedResult == "true"), "message": "" if normalizedResult == "review" else (Message or ""), "reviewed_by": CurrentUserId, } auditRow = ( await Session.execute( text( """ INSERT INTO leaudit_review_point_audits ( rule_result_id, document_id, edit_audit_status, override_result, message, reviewed_by, created_at, updated_at ) VALUES ( :rule_result_id, 
:document_id, :edit_audit_status, :override_result, :message, :reviewed_by, NOW(), NOW() ) ON CONFLICT (rule_result_id) DO UPDATE SET edit_audit_status = EXCLUDED.edit_audit_status, override_result = EXCLUDED.override_result, message = EXCLUDED.message, reviewed_by = EXCLUDED.reviewed_by, updated_at = NOW() RETURNING id, edit_audit_status, override_result, message """ ), payload, ) ).mappings().first() await Session.commit() return ReviewPointAuditVO( reviewPointResultId=ReviewPointResultId, editAuditStatusId=int(auditRow["id"]), editAuditStatus=int(auditRow["edit_audit_status"] or 0), overrideResult=bool(auditRow["override_result"]) if auditRow["override_result"] is not None else None, message=str(auditRow["message"] or ""), ) async def ConfirmReviewResults(self, CurrentUserId: int, DocumentId: int) -> DocumentConfirmVO: """确认文档评查结果。""" async with GetAsyncSession() as Session: await self._ensureDocumentGroupColumn(Session) currentUser = await self._getCurrentUserContext(CurrentUserId) documentColumns = await self._loadDocumentColumns(Session) detail = await self._getDocumentDetail(Session, DocumentId, CurrentUserId, currentUser, documentColumns) if not detail: raise LeauditException(StatusCodeEnum.HTTP_404_NOT_FOUND, "文档不存在或无权访问") if "audit_status" in documentColumns: await Session.execute( text( """ UPDATE leaudit_documents SET audit_status = 1, updated_at = NOW() WHERE id = :document_id AND deleted_at IS NULL """ ), {"document_id": DocumentId}, ) await Session.commit() return DocumentConfirmVO(documentId=DocumentId, auditStatus=1) async def GetDocumentsStatus(self, CurrentUserId: int, Ids: list[int]) -> list[DocumentStatusItemVO]: """批量获取文档状态,并执行数据隔离校验。""" normalizedIds = [int(docId) for docId in Ids if int(docId) > 0] if not normalizedIds: return [] async with GetAsyncSession() as Session: currentUser = await self._getCurrentUserContext(CurrentUserId) params: dict[str, object] = {"ids": normalizedIds} filters = [ "d.id = ANY(:ids)", "d.deleted_at IS NULL", 
"f.is_active = true", "f.file_role = 'primary'", ] filters.extend( self._buildDocumentScopeFilters( CurrentUserId=CurrentUserId, CurrentUser=currentUser, Params=params, DocumentAlias="d", FileAlias="f", ) ) rows = ( await Session.execute( text( f""" SELECT d.id AS document_id, d.processing_status, ar.status AS run_status, ar.result_status, d.updated_at FROM leaudit_documents d JOIN leaudit_document_files f ON f.document_id = d.id LEFT JOIN leaudit_audit_runs ar ON ar.id = d.current_run_id WHERE {' AND '.join(filters)} ORDER BY d.updated_at DESC, d.id DESC """ ), params, ) ).mappings().all() return [ DocumentStatusItemVO( documentId=int(row["document_id"]), processingStatus=row["processing_status"], runStatus=row["run_status"], resultStatus=row["result_status"], updatedAt=row["updated_at"].isoformat() if row["updated_at"] else None, ) for row in rows ] async def UpdateDocument(self, CurrentUserId: int, Id: int, Body: DocumentUpdateDTO) -> DocumentDetailVO: """更新文档元数据,并执行数据隔离校验。""" async with GetAsyncSession() as Session: await self._ensureDocumentGroupColumn(Session) currentUser = await self._getCurrentUserContext(CurrentUserId) documentColumns = await self._loadDocumentColumns(Session) detail = await self._getDocumentDetail(Session, Id, CurrentUserId, currentUser, documentColumns) if not detail: raise LeauditException(StatusCodeEnum.HTTP_404_NOT_FOUND, "文档不存在或无权访问") assignments: list[str] = [] params: dict[str, object] = {"id": Id} if Body.documentNumber is not None and "document_number" in documentColumns: assignments.append("document_number = :document_number") params["document_number"] = Body.documentNumber.strip() or None if Body.remark is not None and "remark" in documentColumns: assignments.append("remark = :remark") params["remark"] = Body.remark.strip() or None if Body.isTestDocument is not None and "is_test_document" in documentColumns: assignments.append("is_test_document = :is_test_document") params["is_test_document"] = Body.isTestDocument if 
Body.auditStatus is not None and "audit_status" in documentColumns: assignments.append("audit_status = :audit_status") params["audit_status"] = Body.auditStatus if assignments: assignments.append("updated_at = NOW()") await Session.execute( text(f"UPDATE leaudit_documents SET {', '.join(assignments)} WHERE id = :id AND deleted_at IS NULL"), params, ) await Session.commit() refreshed = await self._getDocumentDetail(Session, Id, CurrentUserId, currentUser, documentColumns) if not refreshed: raise LeauditException(StatusCodeEnum.HTTP_404_NOT_FOUND, "文档不存在或无权访问") return refreshed async def _appendAttachmentFiles( self, Session, *, DocumentId: int, TypeCode: str, Region: str, VersionNo: int, Files: list[tuple[str, bytes, str | None]], CreatedBy: int | None, ) -> None: """为文档追加附件文件记录。""" versionLabel = f"v{VersionNo}" uploadedAt = datetime.now() for fileName, fileContent, contentType in Files: if not fileName: raise LeauditException(StatusCodeEnum.HTTP_400_BAD_REQUEST, "附件文件名不能为空") if not fileContent: raise LeauditException(StatusCodeEnum.HTTP_400_BAD_REQUEST, f"附件 {fileName} 内容不能为空") fileExt = Path(fileName).suffix.lstrip(".").lower() or None mimeType = contentType or mimetypes.guess_type(fileName)[0] or "application/octet-stream" fileSha256 = hashlib.sha256(fileContent).hexdigest() fileSize = len(fileContent) objectKey = OssPathUtils.BuildBusinessDocKey( Region=Region, TypeCode=TypeCode, DocumentId=DocumentId, Version=versionLabel, FileRole="attachment", FileName=fileName, Year=uploadedAt.year, Month=uploadedAt.month, ) ossUrl = await self.OssService.UploadBytes( ObjectKey=objectKey, Content=fileContent, ContentType=mimeType, ) attachment = LeauditDocumentFile( documentId=DocumentId, fileRole="attachment", fileName=fileName, fileExt=fileExt, mimeType=mimeType, fileSize=fileSize, sha256=fileSha256, localPath=None, ossUrl=ossUrl, storageProvider="minio", isActive=True, createdBy=CreatedBy, ) Session.add(attachment) await Session.commit() async def DeleteDocument(self, 
CurrentUserId: int, Id: int) -> None: """软删除文档,并执行数据隔离校验。""" async with GetAsyncSession() as Session: currentUser = await self._getCurrentUserContext(CurrentUserId) filters = ["d.id = :id", "d.deleted_at IS NULL", "f.is_active = true", "f.file_role = 'primary'"] params: dict[str, object] = {"id": Id} filters.extend( self._buildDocumentScopeFilters( CurrentUserId=CurrentUserId, CurrentUser=currentUser, Params=params, DocumentAlias="d", FileAlias="f", ) ) row = ( await Session.execute( text( f""" SELECT d.id, d.version_group_key, d.is_latest_version FROM leaudit_documents d JOIN leaudit_document_files f ON f.document_id = d.id WHERE {' AND '.join(filters)} LIMIT 1 """ ), params, ) ).mappings().first() if not row: raise LeauditException(StatusCodeEnum.HTTP_404_NOT_FOUND, "文档不存在或无权访问") versionGroupKey = str(row["version_group_key"] or "") isLatestVersion = bool(row["is_latest_version"]) await Session.execute( text( """ UPDATE leaudit_documents SET deleted_at = NOW(), is_latest_version = false, updated_at = NOW() WHERE id = :id AND deleted_at IS NULL """ ), {"id": Id}, ) await Session.execute( text( """ UPDATE leaudit_document_files SET is_active = false WHERE document_id = :id AND is_active = true """ ), {"id": Id}, ) if isLatestVersion and versionGroupKey: nextLatest = ( await Session.execute( text( """ SELECT id FROM leaudit_documents WHERE version_group_key = :group_key AND deleted_at IS NULL ORDER BY version_no DESC, id DESC LIMIT 1 """ ), {"group_key": versionGroupKey}, ) ).scalar_one_or_none() if nextLatest is not None: await Session.execute( text( """ UPDATE leaudit_documents SET is_latest_version = true, updated_at = NOW() WHERE id = :id """ ), {"id": int(nextLatest)}, ) await Session.commit() async def AppendAttachments( self, CurrentUserId: int, Id: int, Files: list[tuple[str, bytes, str | None]], ) -> DocumentDetailVO: """为现有文档追加附件,并执行数据隔离校验。""" if not Files: raise LeauditException(StatusCodeEnum.HTTP_400_BAD_REQUEST, "至少上传一个附件文件") async with 
GetAsyncSession() as Session: await self._ensureDocumentGroupColumn(Session) currentUser = await self._getCurrentUserContext(CurrentUserId) documentColumns = await self._loadDocumentColumns(Session) detail = await self._getDocumentDetail(Session, Id, CurrentUserId, currentUser, documentColumns) if not detail: raise LeauditException(StatusCodeEnum.HTTP_404_NOT_FOUND, "文档不存在或无权访问") documentMeta = ( await Session.execute( text( """ SELECT d.id AS document_id, d.type_id, d.version_no, d.region, dt.code AS type_code FROM leaudit_documents d LEFT JOIN leaudit_document_types dt ON dt.id = d.type_id WHERE d.id = :document_id AND d.deleted_at IS NULL LIMIT 1 """ ), {"document_id": Id}, ) ).mappings().first() if not documentMeta: raise LeauditException(StatusCodeEnum.HTTP_404_NOT_FOUND, "文档不存在或无权访问") resolvedTypeCode = str(documentMeta["type_code"] or "").strip() if not resolvedTypeCode: raise LeauditException(StatusCodeEnum.HTTP_400_BAD_REQUEST, "当前文档缺少文档类型编码,无法追加附件") normalizedRegion = str(documentMeta["region"] or "default").strip() or "default" await self._appendAttachmentFiles( Session=Session, DocumentId=int(documentMeta["document_id"]), TypeCode=resolvedTypeCode, Region=normalizedRegion, VersionNo=int(documentMeta["version_no"] or 1), Files=Files, CreatedBy=CurrentUserId, ) refreshed = await self._getDocumentDetail(Session, Id, CurrentUserId, currentUser, documentColumns) if not refreshed: raise LeauditException(StatusCodeEnum.HTTP_404_NOT_FOUND, "文档不存在或无权访问") return refreshed async def ListDocumentTypes(self, Ids: list[int] | None = None, EntryModuleId: int | None = None) -> list[DocumentTypeItemVO]: """获取文档类型列表。""" async with GetAsyncSession() as Session: rows, bindingsMap = await self._queryDocumentTypes(Session, Ids, EntryModuleId) return [ DocumentTypeItemVO( id=int(r["id"]), name=str(r["name"] or ""), code=str(r["code"] or ""), description=r.get("description"), entryModuleId=r.get("entry_module_id"), isEnabled=bool(r.get("is_enabled", True)), 
ruleSetIds=bindingsMap.get(int(r["id"]), []), ) for r in rows ] async def GetDocumentType(self, Id: int) -> DocumentTypeItemVO: """获取文档类型详情。""" async with GetAsyncSession() as Session: rows, bindingsMap = await self._queryDocumentTypes(Session, [Id]) if not rows: raise LeauditException(StatusCodeEnum.HTTP_404_NOT_FOUND, "文档类型不存在") r = rows[0] return DocumentTypeItemVO( id=int(r["id"]), name=str(r["name"] or ""), code=str(r["code"] or ""), description=r.get("description"), entryModuleId=r.get("entry_module_id"), isEnabled=bool(r.get("is_enabled", True)), ruleSetIds=bindingsMap.get(int(r["id"]), []), ) async def CreateDocumentType(self, Body: DocumentTypeCreateDTO) -> DocumentTypeItemVO: """创建文档类型。""" async with GetAsyncSession() as Session: existing = ( await Session.execute( text("SELECT 1 FROM leaudit_document_types WHERE code = :code AND deleted_at IS NULL LIMIT 1"), {"code": Body.code.strip()}, ) ).first() if existing: raise LeauditException(StatusCodeEnum.HTTP_409_CONFLICT, f"文档类型编码 {Body.code} 已存在") row = ( await Session.execute( text( """ INSERT INTO leaudit_document_types (code, name, description, entry_module_id, is_enabled, sort_order, created_at, updated_at) VALUES (:code, :name, :description, :entry_module_id, :is_enabled, :sort_order, NOW(), NOW()) RETURNING id """ ), { "code": Body.code.strip(), "name": Body.name.strip(), "description": Body.description.strip() or None, "entry_module_id": Body.entryModuleId, "is_enabled": Body.isEnabled, "sort_order": Body.sortOrder, }, ) ).scalar_one() await self._syncRuleBindings(Session, int(row), Body.ruleSetIds, "default") await sync_group_bindings_from_doc_type(Session, int(row), Body.ruleSetIds) await Session.commit() return await self.GetDocumentType(int(row)) async def UpdateDocumentType(self, Id: int, Body: DocumentTypeUpdateDTO) -> DocumentTypeItemVO: """更新文档类型。""" async with GetAsyncSession() as Session: current = ( await Session.execute( text("SELECT id FROM leaudit_document_types WHERE id = :id AND 
deleted_at IS NULL"), {"id": Id}, ) ).first() if not current: raise LeauditException(StatusCodeEnum.HTTP_404_NOT_FOUND, "文档类型不存在") providedFields = set(getattr(Body, "model_fields_set", set())) sets: list[str] = [] params: dict[str, object] = {"id": Id} if "name" in providedFields and Body.name is not None: sets.append("name = :name") params["name"] = Body.name.strip() if "description" in providedFields: sets.append("description = :description") params["description"] = Body.description.strip() if Body.description is not None else None if params["description"] == "": params["description"] = None if "entryModuleId" in providedFields: sets.append("entry_module_id = :entry_module_id") params["entry_module_id"] = Body.entryModuleId if "isEnabled" in providedFields and Body.isEnabled is not None: sets.append("is_enabled = :is_enabled") params["is_enabled"] = Body.isEnabled if "sortOrder" in providedFields and Body.sortOrder is not None: sets.append("sort_order = :sort_order") params["sort_order"] = Body.sortOrder if sets: sets.append("updated_at = NOW()") await Session.execute(text(f"UPDATE leaudit_document_types SET {', '.join(sets)} WHERE id = :id"), params) if "ruleSetIds" in providedFields and Body.ruleSetIds is not None: await self._syncRuleBindings(Session, Id, Body.ruleSetIds, "default") await sync_group_bindings_from_doc_type(Session, Id, Body.ruleSetIds) await Session.commit() return await self.GetDocumentType(Id) async def DeleteDocumentType(self, Id: int) -> None: """软删除文档类型。""" async with GetAsyncSession() as Session: current = ( await Session.execute( text("SELECT 1 FROM leaudit_document_types WHERE id = :id AND deleted_at IS NULL"), {"id": Id}, ) ).first() if not current: raise LeauditException(StatusCodeEnum.HTTP_404_NOT_FOUND, "文档类型不存在") await Session.execute(text("UPDATE leaudit_document_types SET deleted_at = NOW() WHERE id = :id"), {"id": Id}) await Session.execute(text("UPDATE leaudit_rule_type_bindings SET deleted_at = NOW() WHERE doc_type_id = :id 
async def CreateDocumentTypeRoot(self, Body: DocumentTypeRootCreateDTO) -> DocumentTypeRootItemVO:
    """Create a first-level document type (business category).

    Root categories are stored in ``leaudit_evaluation_point_groups`` with
    ``pid = 0`` and no ``document_type_id``.

    Args:
        Body: Creation payload; ``name``, ``code``, ``description``,
            ``entryModuleId``, ``sortOrder`` and ``isEnabled`` are read.

    Returns:
        The stored root category, re-read through ``GetDocumentTypeRoot``.

    Raises:
        LeauditException: HTTP 409 (raised by
            ``_ensureDocumentTypeRootCodeUnique``) when another non-deleted
            root already uses the same code.
    """
    code = Body.code.strip()
    # The update path accepts a missing description; accept it here too
    # instead of failing with AttributeError on ``None.strip()``.
    description = (Body.description or "").strip() or None

    async with GetAsyncSession() as Session:
        await self._ensureDocumentTypeRootCodeUnique(Session, code, None)
        rootId = int(
            (
                await Session.execute(
                    text(
                        """
                        INSERT INTO leaudit_evaluation_point_groups
                            (pid, name, code, description, document_type_id, entry_module_id,
                             sort_order, is_enabled, created_at, updated_at)
                        VALUES
                            (0, :name, :code, :description, NULL, :entry_module_id,
                             :sort_order, :is_enabled, NOW(), NOW())
                        RETURNING id
                        """
                    ),
                    {
                        "name": Body.name.strip(),
                        "code": code,
                        "description": description,
                        "entry_module_id": Body.entryModuleId,
                        "sort_order": Body.sortOrder,
                        "is_enabled": Body.isEnabled,
                    },
                )
            ).scalar_one()
        )
        await Session.commit()

    return await self.GetDocumentTypeRoot(rootId)
async def _queryDocumentTypes(self, Session, Ids: list[int] | None = None, EntryModuleId: int | None = None):
    """Query non-deleted document types plus their active rule-set bindings.

    ``Ids`` and ``EntryModuleId`` are independent, optional filters that are
    combined with AND, the same way ``_queryDocumentTypeRoots`` combines
    them. Previously a non-empty ``Ids`` silently discarded
    ``EntryModuleId``. An empty or ``None`` ``Ids`` applies no id filter.

    Args:
        Session: Open async database session.
        Ids: Restrict the result to these type ids when non-empty.
        EntryModuleId: Restrict the result to this entry module when given.

    Returns:
        A ``(rows, bindingsMap)`` tuple: ``rows`` are the type row mappings
        ordered by ``sort_order, id``; ``bindingsMap`` maps every returned
        type id to its rule-set ids ordered by binding priority (highest
        first), with an empty list for types that have no active binding.
    """
    # Only fixed SQL fragments are interpolated; every value is a bind parameter.
    filters = ["deleted_at IS NULL"]
    params: dict[str, object] = {}
    if Ids:
        filters.append("id = ANY(:ids)")
        params["ids"] = Ids
    if EntryModuleId is not None:
        filters.append("entry_module_id = :entry_module_id")
        params["entry_module_id"] = EntryModuleId
    whereClause = " AND ".join(filters)

    rows = (
        await Session.execute(
            text(
                f"""
                SELECT id, code, name, description, entry_module_id, is_enabled, sort_order
                FROM leaudit_document_types
                WHERE {whereClause}
                ORDER BY sort_order ASC, id ASC
                """
            ),
            params,
        )
    ).mappings().all()

    allIds = [int(r["id"]) for r in rows]
    # Pre-seed every type so callers always get a list, even without bindings.
    bindingsMap: dict[int, list[int]] = {i: [] for i in allIds}
    if allIds:
        bindingRows = (
            await Session.execute(
                text(
                    """
                    SELECT doc_type_id, rule_set_id
                    FROM leaudit_rule_type_bindings
                    WHERE doc_type_id = ANY(:ids) AND deleted_at IS NULL AND is_active = true
                    ORDER BY priority DESC
                    """
                ),
                {"ids": allIds},
            )
        ).fetchall()
        for b in bindingRows:
            bindingsMap.setdefault(int(b[0]), []).append(int(b[1]))
    return rows, bindingsMap
def _toDocumentTypeRootVo(self, row) -> DocumentTypeRootItemVO:
    """Convert one root-category row mapping into a ``DocumentTypeRootItemVO``.

    ``None`` entries in ``rule_set_ids`` are dropped, missing counters fall
    back to 0, and a missing ``is_enabled`` is treated as enabled.
    """
    rawRuleSetIds = row.get("rule_set_ids") or []
    ruleSetIds = []
    for candidate in rawRuleSetIds:
        if candidate is not None:
            ruleSetIds.append(int(candidate))

    moduleId = row.get("entry_module_id")
    if moduleId is not None:
        moduleId = int(moduleId)

    childCount = int(row.get("child_group_count") or 0)
    ruleSetCount = int(row.get("rule_set_count") or 0)

    return DocumentTypeRootItemVO(
        id=int(row["id"]),
        name=str(row["name"] or ""),
        code=str(row["code"] or ""),
        description=row.get("description"),
        entryModuleId=moduleId,
        entryModuleName=row.get("entry_module_name"),
        isEnabled=bool(row.get("is_enabled", True)),
        childGroupCount=childCount,
        ruleSetCount=ruleSetCount,
        ruleSetIds=ruleSetIds,
    )
async def _tableExists(self, Session, TableName: str) -> bool:
    """Report whether a table named ``TableName`` exists in the ``public`` schema."""
    probe = text(
        """
        SELECT EXISTS (
            SELECT 1
            FROM information_schema.tables
            WHERE table_schema = 'public' AND table_name = :table_name
        ) AS exists_flag
        """
    )
    outcome = await Session.execute(probe, {"table_name": TableName})
    record = outcome.mappings().first()
    if not record:
        return False
    return bool(record["exists_flag"])
Session, TypeId: int, GroupId: int | None) -> int | None: """解析上传命中的一级分组,用于跨二级类型做版本归档。""" if GroupId is not None: row = ( await Session.execute( text( """ SELECT CASE WHEN COALESCE(pid, 0) = 0 THEN id ELSE pid END AS root_group_id FROM leaudit_evaluation_point_groups WHERE id = :group_id AND deleted_at IS NULL LIMIT 1 """ ), {"group_id": GroupId}, ) ).mappings().first() if row and row["root_group_id"] is not None: return int(row["root_group_id"]) row = ( await Session.execute( text( """ SELECT CASE WHEN COUNT(DISTINCT pid) = 1 THEN MIN(pid) END AS root_group_id FROM leaudit_evaluation_point_groups WHERE document_type_id = :doc_type_id AND COALESCE(pid, 0) <> 0 AND deleted_at IS NULL AND is_enabled = true """ ), {"doc_type_id": TypeId}, ) ).mappings().first() if row and row["root_group_id"] is not None: return int(row["root_group_id"]) return None async def _getDocumentDetail( self, Session, DocumentId: int, CurrentUserId: int, CurrentUser: dict[str, Any], DocumentColumns: set[str], ) -> DocumentDetailVO | None: """查询单文档详情,并附带历史版本。""" params: dict[str, object] = {"id": DocumentId} filters = ["d.id = :id", "d.deleted_at IS NULL", "f.is_active = true", "f.file_role = 'primary'"] filters.extend( self._buildDocumentScopeFilters( CurrentUserId=CurrentUserId, CurrentUser=CurrentUser, Params=params, DocumentAlias="d", FileAlias="f", ) ) whereClause = " AND ".join(filters) groupIdSelectExpr = "d.group_id" if "group_id" in DocumentColumns else "NULL::bigint" resolvedGroupIdExpr = f"COALESCE({groupIdSelectExpr}, dg.inferred_group_id)" resolvedGroupNameExpr = ( f"CASE WHEN {groupIdSelectExpr} IS NOT NULL THEN eg.name ELSE dg.inferred_group_name END" ) optionalSelects = [ f"{resolvedGroupIdExpr} AS group_id", "d.document_number AS document_number" if "document_number" in DocumentColumns else "NULL::text AS document_number", "d.remark AS remark" if "remark" in DocumentColumns else "NULL::text AS remark", "d.is_test_document AS is_test_document" if "is_test_document" in 
DocumentColumns else "FALSE AS is_test_document", "d.audit_status AS audit_status" if "audit_status" in DocumentColumns else "NULL::integer AS audit_status", ] detailRow = ( await Session.execute( text( f""" SELECT d.id AS document_id, d.biz_document_id AS internal_document_no, d.version_group_key, d.version_no, d.root_version_id, d.previous_version_id, d.type_id, dt.code AS type_code, dt.name AS type_name, {resolvedGroupNameExpr} AS group_name, d.region, d.normalized_name, d.processing_status, d.current_run_id, d.updated_at, f.id AS file_id, f.file_name, f.file_ext, f.mime_type, f.file_size, f.oss_url, ar.status AS run_status, ar.result_status, ar.total_score, ar.passed_count, ar.failed_count, ar.skipped_count, vc.total_versions, COALESCE(vc.total_versions, 1) > 1 AS has_history, {', '.join(optionalSelects)} FROM leaudit_documents d JOIN leaudit_document_files f ON f.document_id = d.id LEFT JOIN leaudit_document_types dt ON dt.id = d.type_id LEFT JOIN leaudit_evaluation_point_groups eg ON eg.id = d.group_id LEFT JOIN ( SELECT document_type_id, CASE WHEN COUNT(*) = 1 THEN MIN(id) END AS inferred_group_id, CASE WHEN COUNT(*) = 1 THEN MIN(name) END AS inferred_group_name FROM leaudit_evaluation_point_groups WHERE pid <> 0 AND deleted_at IS NULL AND is_enabled = true AND document_type_id IS NOT NULL GROUP BY document_type_id ) dg ON dg.document_type_id = d.type_id LEFT JOIN leaudit_audit_runs ar ON ar.id = d.current_run_id LEFT JOIN ( SELECT version_group_key, COUNT(*) AS total_versions FROM leaudit_documents WHERE deleted_at IS NULL GROUP BY version_group_key ) vc ON vc.version_group_key = d.version_group_key WHERE {whereClause} LIMIT 1 """ ), params, ) ).mappings().first() if not detailRow: return None groupKey = str(detailRow["version_group_key"] or "") historyVersions: list[DocumentHistoryVersionVO] = [] if groupKey: historyRows = ( await Session.execute( text( """ SELECT d.id AS document_id, d.version_no, d.processing_status, d.updated_at, f.id AS file_id, 
f.file_name, f.file_ext, ar.status AS run_status, ar.result_status FROM leaudit_documents d JOIN leaudit_document_files f ON f.document_id = d.id AND f.is_active = true AND f.file_role = 'primary' LEFT JOIN leaudit_audit_runs ar ON ar.id = d.current_run_id WHERE d.version_group_key = :group_key AND d.id <> :document_id AND d.deleted_at IS NULL ORDER BY d.version_no DESC, d.id DESC """ ), {"group_key": groupKey, "document_id": int(detailRow["document_id"])}, ) ).mappings().all() historyVersions = [ DocumentHistoryVersionVO( documentId=int(row["document_id"]), fileId=int(row["file_id"]) if row["file_id"] is not None else None, versionNo=int(row["version_no"]), fileName=row["file_name"], fileExt=row["file_ext"], processingStatus=row["processing_status"], runStatus=row["run_status"], resultStatus=row["result_status"], updatedAt=row["updated_at"].isoformat() if row["updated_at"] else None, ) for row in historyRows ] attachmentRows = ( await Session.execute( text( """ SELECT id, file_name, file_ext, mime_type, file_size, file_role, oss_url, created_by, created_at FROM leaudit_document_files WHERE document_id = :document_id AND deleted_at IS NULL AND is_active = true AND file_role = 'attachment' ORDER BY id ASC """ ), {"document_id": int(detailRow["document_id"])}, ) ).mappings().all() attachments = [ DocumentAttachmentVO( fileId=int(row["id"]), fileName=str(row["file_name"] or ""), fileExt=row["file_ext"], mimeType=row["mime_type"], fileSize=int(row["file_size"]) if row["file_size"] is not None else None, fileRole=str(row["file_role"] or "attachment"), ossUrl=row["oss_url"], createdBy=int(row["created_by"]) if row["created_by"] is not None else None, createdAt=row["created_at"].isoformat() if row["created_at"] else None, ) for row in attachmentRows ] return DocumentDetailVO( documentId=int(detailRow["document_id"]), internalDocumentNo=int(detailRow["internal_document_no"]), versionGroupKey=groupKey, versionNo=int(detailRow["version_no"] or 1), 
rootVersionId=int(detailRow["root_version_id"] or detailRow["document_id"]), previousVersionId=int(detailRow["previous_version_id"]) if detailRow["previous_version_id"] is not None else None, typeId=int(detailRow["type_id"]) if detailRow["type_id"] is not None else None, typeCode=detailRow["type_code"], typeName=detailRow["type_name"], groupId=int(detailRow["group_id"]) if detailRow["group_id"] is not None else None, groupName=detailRow["group_name"], region=str(detailRow["region"] or ""), normalizedName=detailRow["normalized_name"], fileId=int(detailRow["file_id"]) if detailRow["file_id"] is not None else None, fileName=detailRow["file_name"], fileExt=detailRow["file_ext"], mimeType=detailRow["mime_type"], fileSize=int(detailRow["file_size"]) if detailRow["file_size"] is not None else None, ossUrl=detailRow["oss_url"], processingStatus=detailRow["processing_status"], currentRunId=int(detailRow["current_run_id"]) if detailRow["current_run_id"] is not None else None, runStatus=detailRow["run_status"], resultStatus=detailRow["result_status"], totalScore=float(detailRow["total_score"]) if detailRow["total_score"] is not None else None, passedCount=int(detailRow["passed_count"]) if detailRow["passed_count"] is not None else None, failedCount=int(detailRow["failed_count"]) if detailRow["failed_count"] is not None else None, skippedCount=int(detailRow["skipped_count"]) if detailRow["skipped_count"] is not None else None, updatedAt=detailRow["updated_at"].isoformat() if detailRow["updated_at"] else None, hasHistory=bool(detailRow["has_history"]), totalVersions=int(detailRow["total_versions"] or 1), historyVersions=historyVersions, documentNumber=detailRow["document_number"], remark=detailRow["remark"], isTestDocument=bool(detailRow["is_test_document"]), auditStatus=int(detailRow["audit_status"]) if detailRow["audit_status"] is not None else None, pageCount=None, attachments=attachments, ) def _buildDocumentScopeFilters( self, CurrentUserId: int, CurrentUser: dict[str, 
Any], Params: dict[str, object], DocumentAlias: str, FileAlias: str, RequestedRegion: str | None = None, RequestedUserId: int | None = None, ) -> list[str]: """根据当前用户角色与地区构建文档数据范围过滤。""" filters: list[str] = [] requestedRegion = (RequestedRegion or "").strip() area = str(CurrentUser["area"] or "").strip() if CurrentUser["is_global"]: if requestedRegion: filters.append(f"{DocumentAlias}.region = :requested_region") Params["requested_region"] = requestedRegion if RequestedUserId is not None: filters.append(f"{FileAlias}.created_by = :requested_user_id") Params["requested_user_id"] = RequestedUserId return filters if CurrentUser["can_manage"]: if not area: filters.append("1 = 0") return filters if requestedRegion and requestedRegion != area: filters.append("1 = 0") return filters filters.append(f"{DocumentAlias}.region = :scope_region") Params["scope_region"] = area if RequestedUserId is not None: filters.append(f"{FileAlias}.created_by = :requested_user_id") Params["requested_user_id"] = RequestedUserId return filters filters.append(f"{FileAlias}.created_by = :scope_user_id") Params["scope_user_id"] = CurrentUserId if requestedRegion: filters.append(f"{DocumentAlias}.region = :requested_region") Params["requested_region"] = requestedRegion if RequestedUserId is not None and RequestedUserId != CurrentUserId: filters.append("1 = 0") return filters async def _getCurrentUserContext(self, CurrentUserId: int) -> dict[str, Any]: """加载当前用户上下文,用于文档数据隔离。""" async with GetAsyncSession() as Session: row = ( await Session.execute( text( """ SELECT u.id, COALESCE(u.area, '') AS area, COALESCE(bool_or(r.role_key IN ('super_admin', 'provincial_admin')), FALSE) AS is_global, COALESCE(bool_or(r.role_key IN ('super_admin', 'provincial_admin', 'admin')), FALSE) AS can_manage, COALESCE(bool_or(r.role_key = 'super_admin'), FALSE) AS is_super_admin FROM sso_users u LEFT JOIN user_role ur ON ur.user_id = u.id LEFT JOIN roles r ON r.id = ur.role_id WHERE u.id = :user_id GROUP BY u.id, u.area 
async def _loadReviewRunRow(self, Session, Detail: DocumentDetailVO) -> dict[str, Any] | None:
    """Find the audit run whose results should be shown for ``Detail``.

    The run referenced by ``Detail.currentRunId`` is preferred when it
    exists and belongs to the document; otherwise the run with the highest
    id for the document is used.

    Returns:
        The run row as a plain dict, or ``None`` when the document has no runs.
    """

    async def first_row(sql: str, binds: dict[str, object]) -> dict[str, Any] | None:
        # Run one statement and return its first row as a dict, if any.
        outcome = await Session.execute(text(sql), binds)
        hit = outcome.mappings().first()
        return dict(hit) if hit else None

    binds: dict[str, object] = {"document_id": Detail.documentId}
    if Detail.currentRunId is not None:
        binds["run_id"] = Detail.currentRunId
        pinned = await first_row(
            """
            SELECT id, status, result_status, total_score, passed_count, failed_count,
                   skipped_count, llm_model, finished_at, updated_at
            FROM leaudit_audit_runs
            WHERE id = :run_id AND document_id = :document_id
            LIMIT 1
            """,
            binds,
        )
        if pinned is not None:
            return pinned

    return await first_row(
        """
        SELECT id, status, result_status, total_score, passed_count, failed_count,
               skipped_count, llm_model, finished_at, updated_at
        FROM leaudit_audit_runs
        WHERE document_id = :document_id
        ORDER BY id DESC
        LIMIT 1
        """,
        binds,
    )
createdAt = metaRow["created_at"] if metaRow else None uploadUser = str(metaRow["upload_user"] or "") if metaRow else "" pageCount = int(metricRow["page_count"]) if metricRow and metricRow["page_count"] is not None else 0 ocrResultPayload = await self._buildReviewOcrPayload(Session, Detail.documentId, RunRow, pageCount) return { "id": Detail.documentId, "documentId": Detail.documentId, "name": Detail.fileName or Detail.normalizedName or f"文档{Detail.documentId}", "path": Detail.ossUrl or "", "fileName": Detail.fileName or "", "fileType": (Detail.fileExt or "").lower(), "size": Detail.fileSize, "file_size": Detail.fileSize, "documentNumber": Detail.documentNumber, "document_number": Detail.documentNumber, "type": str(Detail.typeId or ""), "type_id": Detail.typeId, "typeName": Detail.typeName, "type_name": Detail.typeName, "groupId": Detail.groupId, "groupName": Detail.groupName, "region": Detail.region, "auditStatus": Detail.auditStatus or 0, "audit_status": Detail.auditStatus or 0, "uploadTime": _format_iso_datetime(createdAt) or Detail.updatedAt or "", "created_at": _format_iso_datetime(createdAt) or Detail.updatedAt or "", "updated_at": Detail.updatedAt or "", "uploadUser": uploadUser, "pageCount": pageCount, "page_count": pageCount, "ocrResult": ocrResultPayload, "attachments": [item.model_dump() for item in Detail.attachments], } async def _buildReviewOcrPayload( self, Session, DocumentId: int, RunRow: dict[str, Any] | None, PageCount: int, ) -> dict[str, Any]: """构建旧详情页仍在读取的最小 ocrResult 结构。""" pageMap: dict[str, set[int]] = {} def add_page(field_name: str | None, page_value: Any) -> None: if not field_name: return page = _coerce_positive_int(page_value) if page is None: return section = str(field_name).split("-", 1)[0].strip() or "未分组" pageMap.setdefault(section, set()).add(page) if RunRow: fieldRows = ( await Session.execute( text( """ SELECT field_name, meta_json, raw_value_json FROM leaudit_field_results WHERE run_id = :run_id AND document_id = :document_id 
async def _loadComparisonDocument(self, Session, DocumentId: int) -> dict[str, Any]:
    """Load the latest contract-structure comparison for a document.

    Degrades to ``{"template_contract_path": ""}`` when the
    ``contract_structure_comparison`` table does not exist or holds no row
    for the document. Optional columns are selected only when present in
    the table, so older schemas keep working.

    Args:
        Session: Open async database session.
        DocumentId: Document whose comparison record is wanted.

    Returns:
        The newest row as a dict. A string ``comparison_results`` is decoded
        from JSON (``None`` when it cannot be decoded), and ``comparisonId``
        and ``template_contract_path`` keys are always present.
    """
    if not await self._tableExists(Session, "contract_structure_comparison"):
        return {"template_contract_path": ""}

    columns = await self._loadTableColumns(Session, "contract_structure_comparison")
    optionalColumns = (
        "template_contract_path",
        "comparison_results",
        "created_at",
        "updated_at",
        "comparison_id",
    )
    # Column names come from the fixed tuple above, never from user input.
    selectColumns = ["id", "document_id", *(c for c in optionalColumns if c in columns)]

    row = (
        await Session.execute(
            text(
                f"""
                SELECT {', '.join(selectColumns)}
                FROM contract_structure_comparison
                WHERE document_id = :document_id
                ORDER BY id DESC
                LIMIT 1
                """
            ),
            {"document_id": DocumentId},
        )
    ).mappings().first()
    if not row:
        return {"template_contract_path": ""}

    payload = dict(row)
    rawResults = payload.get("comparison_results")
    if isinstance(rawResults, str):
        try:
            payload["comparison_results"] = json.loads(rawResults)
        except (ValueError, RecursionError):
            # Malformed or pathologically nested JSON: keep the best-effort
            # fallback, but let unrelated errors surface.
            payload["comparison_results"] = None

    if "comparisonId" not in payload:
        payload["comparisonId"] = payload.get("comparison_id") or payload.get("id")
    if "template_contract_path" not in payload:
        payload["template_contract_path"] = ""
    return payload
filterClauses.append("deleted_at IS NULL") rows = ( await Session.execute( text( f""" SELECT {', '.join(selectColumns)} FROM cross_scoring_proposals WHERE {' AND '.join(filterClauses)} ORDER BY id DESC """ ), {"document_id": DocumentId}, ) ).mappings().all() return [dict(row) for row in rows] async def _loadReviewPointResults( self, Session, Detail: DocumentDetailVO, RunId: int, ) -> list[ReviewPointResultVO]: """读取评查运行下的规则结果并转换成前端详情页结构。""" rows = ( await Session.execute( text( """ SELECT rr.id, rr.rule_id, rr.rule_name, rr.risk, rr.score, rr.passed, rr.status, rr.skip_reason, rr.pass_message, rr.fail_message, rr.stages, rr.extracted_fields, rr.field_positions, rr.remediation, rr.rule_meta, rr.updated_at, a.id AS audit_id, a.edit_audit_status, a.override_result, a.message AS audit_message FROM leaudit_rule_results rr LEFT JOIN leaudit_review_point_audits a ON a.rule_result_id = rr.id WHERE rr.run_id = :run_id AND rr.document_id = :document_id ORDER BY id ASC """ ), {"run_id": RunId, "document_id": Detail.documentId}, ) ).mappings().all() rows = await self._backfillMissingReviewFieldPositions( Session, Detail.documentId, RunId, rows, ) result: list[ReviewPointResultVO] = [] for row in rows: extractedFields = row["extracted_fields"] if isinstance(row["extracted_fields"], dict) else {} fieldPositions = row["field_positions"] if isinstance(row["field_positions"], dict) else {} ruleMeta = row["rule_meta"] if isinstance(row["rule_meta"], dict) else {} remediation = row["remediation"] if isinstance(row["remediation"], dict) else {} rawStages = row["stages"] if isinstance(row["stages"], list) else [] stages = _enrich_stage_payload_with_positions(rawStages, fieldPositions) score = float(row["score"]) if row["score"] is not None else 0.0 auditedResult = row["override_result"] if row["edit_audit_status"] == 1 else None effectivePassed = auditedResult if auditedResult is not None else row["passed"] effectiveStatus = row["status"] if auditedResult is not None: effectiveStatus = 
async def _backfillMissingReviewFieldPositions(
    self,
    Session,
    DocumentId: int,
    RunId: int,
    rows: list[dict[str, Any]],
) -> list[dict[str, Any]]:
    """Backfill missing page numbers for extracted fields of a review run.

    For every extracted field that has text but no resolvable page, the
    page is inferred from the document source file. Inferred positions are
    persisted to ``leaudit_rule_results.field_positions`` and, where no
    position is stored yet, to ``leaudit_field_results.meta_json``.

    Args:
        Session: Open async database session; committed when positions
            were inferred.
        DocumentId: Document the rule results belong to.
        RunId: Audit run the rule results belong to.
        rows: Rule-result rows (mappings) as loaded by the caller.

    Returns:
        ``rows`` unchanged when nothing is missing or nothing could be
        inferred. Otherwise a list of dict copies, in the same order, whose
        ``field_positions`` include the inferred entries, so the current
        request already sees the backfilled pages. Previously the untouched
        input was returned and the pages only appeared on the next load.
    """
    missingFields: dict[str, str] = {}
    patchedRows: list[dict[str, Any]] = []
    touchedRuleRows: dict[int, dict[str, Any]] = {}

    for rawRow in rows:
        # Work on copies: the incoming rows may be read-only result mappings.
        row = dict(rawRow)
        patchedRows.append(row)
        extractedFields = row["extracted_fields"] if isinstance(row.get("extracted_fields"), dict) else {}
        fieldPositions = dict(row["field_positions"]) if isinstance(row.get("field_positions"), dict) else {}
        row["field_positions"] = fieldPositions

        rowTouched = False
        for fieldName, rawValue in extractedFields.items():
            if _extract_review_page(fieldPositions.get(fieldName)) is not None:
                continue
            valueText = _stringify_char_position_value(_normalize_review_value(rawValue))
            if not valueText.strip():
                continue
            # Keep the longest text seen per field name for matching.
            existing = missingFields.get(str(fieldName))
            if existing is None or len(valueText) > len(existing):
                missingFields[str(fieldName)] = valueText
            rowTouched = True
        if rowTouched and row.get("id") is not None:
            touchedRuleRows[int(row["id"])] = row

    if not missingFields:
        return rows

    inferredPositions = await self._inferFieldPositionsFromDocumentSource(
        Session,
        DocumentId,
        missingFields,
    )
    if not inferredPositions:
        return rows

    for ruleRowId, row in touchedRuleRows.items():
        fieldPositions = row["field_positions"]
        extractedFields = row["extracted_fields"] if isinstance(row.get("extracted_fields"), dict) else {}
        changed = False
        for fieldName in extractedFields:
            key = str(fieldName)
            if _extract_review_page(fieldPositions.get(key)) is not None:
                continue
            inferred = inferredPositions.get(key)
            if inferred is None:
                continue
            fieldPositions[key] = inferred
            changed = True
        if not changed:
            continue
        await Session.execute(
            text(
                """
                UPDATE leaudit_rule_results
                SET field_positions = CAST(:field_positions AS JSONB),
                    updated_at = NOW()
                WHERE id = :rule_result_id
                """
            ),
            {
                "rule_result_id": ruleRowId,
                "field_positions": json.dumps(fieldPositions, ensure_ascii=False),
            },
        )

    for fieldName, position in inferredPositions.items():
        # Only fill field results that have no position at all; existing
        # positions are never overwritten.
        await Session.execute(
            text(
                """
                UPDATE leaudit_field_results
                SET meta_json = jsonb_set(
                        COALESCE(meta_json, '{}'::jsonb),
                        '{position}',
                        CAST(:position AS JSONB),
                        true
                    ),
                    updated_at = NOW()
                WHERE run_id = :run_id
                  AND document_id = :document_id
                  AND field_name = :field_name
                  AND (
                        meta_json IS NULL
                        OR meta_json->'position' IS NULL
                        OR meta_json->'position' = 'null'::jsonb
                  )
                """
            ),
            {
                "run_id": RunId,
                "document_id": DocumentId,
                "field_name": fieldName,
                "position": json.dumps(position, ensure_ascii=False),
            },
        )

    await Session.commit()
    return patchedRows
"detail_page_fallback"} return inferred except Exception: return {} finally: if tempPath: try: Path(tempPath).unlink(missing_ok=True) except OSError: pass def _buildReviewPointStats(self, ReviewPoints: list[ReviewPointResultVO]) -> ReviewPointStatsVO: """按旧详情页口径汇总统计。""" stats = ReviewPointStatsVO(total=len(ReviewPoints), success=0, warning=0, error=0, score=0) for item in ReviewPoints: if item.status == "success": stats.success += 1 elif item.status == "warning": stats.warning += 1 elif item.status == "error": stats.error += 1 stats.score += float(item.score or 0) return stats def _buildReviewInfo( self, RunRow: dict[str, Any] | None, ReviewPoints: list[ReviewPointResultVO], Stats: ReviewPointStatsVO, ) -> ReviewPointInfoVO: """构建详情页右侧的评查摘要。""" groupNames = [item.groupName for item in ReviewPoints if item.groupName] uniqueGroups = list(dict.fromkeys(groupNames)) reviewTime = "" reviewModel = "DeepSeek" if RunRow: reviewTime = _format_display_datetime(RunRow.get("finished_at") or RunRow.get("updated_at")) reviewModel = str(RunRow.get("llm_model") or "DeepSeek") issueCount = Stats.warning + Stats.error return ReviewPointInfoVO( reviewTime=reviewTime, reviewModel=reviewModel, ruleGroup="、".join(uniqueGroups), result="warning" if issueCount > 0 else "success", issueCount=issueCount, ) def _buildReviewPointTitle(self, Row: dict[str, Any]) -> str: """生成详情页展示标题。""" if Row.get("passed") is True: return str(Row.get("pass_message") or Row.get("rule_name") or Row.get("rule_id") or "") if Row.get("fail_message"): return str(Row["fail_message"]) if Row.get("skip_reason"): return str(Row["skip_reason"]) return str(Row.get("rule_name") or Row.get("rule_id") or "") async def _syncRuleBindings(self, Session, DocTypeId: int, RuleSetIds: list[int], Region: str = "default") -> None: """全量替换规则绑定。""" await Session.execute( text("UPDATE leaudit_rule_type_bindings SET deleted_at = NOW() WHERE doc_type_id = :id AND deleted_at IS NULL"), {"id": DocTypeId}, ) for idx, ruleSetId in 
enumerate(RuleSetIds): await Session.execute( text( """ INSERT INTO leaudit_rule_type_bindings (doc_type_id, rule_set_id, binding_mode, priority, region, is_active, created_at, updated_at) VALUES (:doc_type_id, :rule_set_id, 'explicit', :priority, :region, true, NOW(), NOW()) """ ), {"doc_type_id": DocTypeId, "rule_set_id": ruleSetId, "priority": 100 - idx, "region": Region}, ) async def _find_latest_version_candidate( session, *, type_id: int, root_group_id: int | None, region: str, normalized_name: str, ) -> dict | None: """Find the latest primary document version candidate by normalized name. Preferred rule: same region + same root group + same normalized name. Fallback rule: when a root group cannot be resolved, keep the old same-type behavior. """ if root_group_id is not None: result = await session.execute( text( """ SELECT d.id AS document_id, d.version_group_key, d.version_no, d.root_version_id, f.id AS file_id, f.sha256 FROM leaudit_documents d JOIN leaudit_document_files f ON f.document_id = d.id AND f.is_active = true AND f.file_role = 'primary' LEFT JOIN leaudit_evaluation_point_groups eg ON eg.id = d.group_id LEFT JOIN ( SELECT document_type_id, CASE WHEN COUNT(DISTINCT pid) = 1 THEN MIN(pid) END AS inferred_root_group_id FROM leaudit_evaluation_point_groups WHERE COALESCE(pid, 0) <> 0 AND deleted_at IS NULL AND is_enabled = true AND document_type_id IS NOT NULL GROUP BY document_type_id ) dg ON dg.document_type_id = d.type_id WHERE d.region = :region AND d.normalized_name = :normalized_name AND d.is_latest_version = true AND d.deleted_at IS NULL AND COALESCE( CASE WHEN eg.id IS NULL THEN NULL WHEN COALESCE(eg.pid, 0) = 0 THEN eg.id ELSE eg.pid END, dg.inferred_root_group_id ) = :root_group_id ORDER BY d.version_no DESC, d.id DESC LIMIT 1 """ ), { "root_group_id": root_group_id, "region": region, "normalized_name": normalized_name, }, ) row = result.mappings().first() if row: return dict(row) result = await session.execute( text( """ SELECT d.id AS 
document_id, d.version_group_key, d.version_no, d.root_version_id, f.id AS file_id, f.sha256 FROM leaudit_documents d JOIN leaudit_document_files f ON f.document_id = d.id AND f.is_active = true AND f.file_role = 'primary' WHERE d.type_id = :type_id AND d.region = :region AND d.normalized_name = :normalized_name AND d.is_latest_version = true AND d.deleted_at IS NULL ORDER BY d.version_no DESC, d.id DESC LIMIT 1 """ ), { "type_id": type_id, "region": region, "normalized_name": normalized_name, }, ) row = result.mappings().first() return dict(row) if row else None def _map_rule_status(status: str | None, passed: bool | None, risk: str | None) -> str: """把新规则结果状态转换成旧详情页口径。""" normalized = (status or "").strip().lower() if normalized in {"skipped", "not_applicable"}: return "notApplicable" if passed is True or normalized in {"passed", "success"}: return "success" if normalized in {"error", "failed", "fail"}: return "error" if (risk or "").strip().lower() == "high" else "warning" return "warning" if passed is False else "success" def _normalize_review_content( extracted_fields: dict[str, Any], field_positions: dict[str, Any], ) -> dict[str, dict[str, Any]]: """归一化字段内容,兼容旧详情页结构。""" content: dict[str, dict[str, Any]] = {} for field_name, raw_value in extracted_fields.items(): position = field_positions.get(field_name) if isinstance(field_positions, dict) else None page = _extract_review_page(position) char_positions = _build_char_positions(position, raw_value) content[str(field_name)] = { "page": page if page is not None else "", "value": _normalize_review_value(raw_value), "char_positions": char_positions, } return content def _normalize_review_content_pages( extracted_fields: dict[str, Any], field_positions: dict[str, Any], ) -> dict[str, str]: """提取字段页码映射。""" pages: dict[str, str] = {} for field_name in extracted_fields.keys(): position = field_positions.get(field_name) if isinstance(field_positions, dict) else None page = _extract_review_page(position) 
pages[str(field_name)] = str(page) if page is not None else "" return pages def _normalize_review_value(raw_value: Any) -> Any: """尽量保留原值,同时把复杂对象压到旧前端能识别的结构。""" if isinstance(raw_value, dict): if "value" in raw_value: return raw_value["value"] if "text" in raw_value: return raw_value["text"] if "value_text" in raw_value: return raw_value["value_text"] return raw_value return raw_value def _extract_review_page(position: Any) -> int | None: """从字段位置信息中提取页码。""" if not isinstance(position, dict): return None page_num = position.get("pageNum") if isinstance(page_num, int): return page_num if isinstance(page_num, str) and page_num.isdigit(): return int(page_num) page_nums = position.get("pageNums") if isinstance(page_nums, list) and page_nums: first = page_nums[0] if isinstance(first, int): return first if isinstance(first, str) and first.isdigit(): return int(first) match_position = position.get("matchPosition") if isinstance(match_position, dict): for key in ("page", "pageNum"): value = match_position.get(key) if isinstance(value, int): return value if isinstance(value, str) and value.isdigit(): return int(value) return None def _extract_review_suggestion(remediation: dict[str, Any], fail_message: Any, pass_message: Any) -> str: """从 remediation 中提取适合旧详情页展示的一句建议。""" suggestions = remediation.get("suggestions") if isinstance(suggestions, list) and suggestions: first = suggestions[0] if isinstance(first, str): return first if isinstance(first, dict): return str(first.get("text") or first.get("message") or first.get("suggestion") or "") actions = remediation.get("actions") if isinstance(actions, list) and actions: first = actions[0] if isinstance(first, str): return first if isinstance(first, dict): return str(first.get("text") or first.get("message") or first.get("action") or "") if fail_message: return str(fail_message) if pass_message: return str(pass_message) return "" def _normalize_review_stage_logs(stages: list[Any]) -> list[dict[str, Any]]: """把 stages 压成旧前端的 
evaluatedPointResultsLog.rules。""" result: list[dict[str, Any]] = [] for index, stage in enumerate(stages): if not isinstance(stage, dict): continue result.append( { "id": str(stage.get("id") or index), "type": str(stage.get("type") or stage.get("stage") or stage.get("name") or "rule"), "res": stage.get("passed"), "config": stage, } ) return result def _enrich_stage_payload_with_positions(payload: Any, field_positions: dict[str, Any]) -> Any: """递归给 stage payload 注入最小可用 char_positions。""" if isinstance(payload, list): return [_enrich_stage_payload_with_positions(item, field_positions) for item in payload] if not isinstance(payload, dict): return payload enriched: dict[str, Any] = {} for key, value in payload.items(): enriched[key] = _enrich_stage_payload_with_positions(value, field_positions) if isinstance(value, dict): position = field_positions.get(key) if isinstance(field_positions, dict) else None if position: if ("page" not in enriched[key] or not enriched[key].get("page")): page = _extract_review_page(position) if page is not None: enriched[key]["page"] = page if "char_positions" not in enriched[key]: char_positions = _build_char_positions(position, value.get("value")) if char_positions: enriched[key]["char_positions"] = char_positions return enriched def _build_char_positions(position: Any, raw_value: Any) -> list[dict[str, Any]] | None: """把后端已有 bbox / matchPosition 压成前端高亮所需的 char_positions。""" if not isinstance(position, dict): return None bbox = position.get("bbox") if bbox is None and isinstance(position.get("matchPosition"), dict): bbox = position["matchPosition"].get("bbox") normalized_box = _normalize_bbox_to_points(bbox) if not normalized_box: return None value_text = _stringify_char_position_value(raw_value) return [ { "box": normalized_box, "char": value_text, "score": 1, } ] def _normalize_bbox_to_points(bbox: Any) -> list[list[float]] | None: """兼容多种 bbox 形态,统一转成四点坐标。""" if not bbox: return None if ( isinstance(bbox, list) and len(bbox) == 4 and 
all(isinstance(item, (list, tuple)) and len(item) >= 2 for item in bbox) ): return [[float(point[0]), float(point[1])] for point in bbox] if isinstance(bbox, (list, tuple)) and len(bbox) >= 4 and all(isinstance(item, (int, float)) for item in bbox[:4]): x1, y1, x2, y2 = [float(item) for item in bbox[:4]] return [[x1, y1], [x2, y1], [x2, y2], [x1, y2]] if isinstance(bbox, dict): if all(key in bbox for key in ("x1", "y1", "x2", "y2")): x1 = float(bbox["x1"]) y1 = float(bbox["y1"]) x2 = float(bbox["x2"]) y2 = float(bbox["y2"]) return [[x1, y1], [x2, y1], [x2, y2], [x1, y2]] if all(key in bbox for key in ("left", "top", "right", "bottom")): x1 = float(bbox["left"]) y1 = float(bbox["top"]) x2 = float(bbox["right"]) y2 = float(bbox["bottom"]) return [[x1, y1], [x2, y1], [x2, y2], [x1, y2]] return None def _stringify_char_position_value(raw_value: Any) -> str: """将字段值压成前端高亮文本。""" normalized = _normalize_review_value(raw_value) if normalized is None: return "" if isinstance(normalized, str): return normalized if isinstance(normalized, (int, float, bool)): return str(normalized) return "" _REVIEW_TEXT_PUNCT_TABLE = str.maketrans({ ",": ",", "。": ".", ";": ";", ":": ":", "!": "!", "?": "?", "(": "(", ")": ")", "【": "[", "】": "]", "「": '"', "」": '"', "‘": "'", "’": "'", "“": '"', "”": '"', }) def _normalize_review_match_text(text: str) -> str: """统一比较文本,尽量消除 OCR/文档解析带来的空白和标点差异。""" if not text: return "" normalized = re.sub(r"<[^>]+>", "", str(text)) normalized = re.sub(r"\s+", "", normalized) return normalized.translate(_REVIEW_TEXT_PUNCT_TABLE) def _infer_page_num_from_page_texts(field_text: str, page_texts: list[tuple[int, str]]) -> int | None: """根据字段文本在逐页正文中的命中情况推导 1-based 页码。""" normalizedField = _normalize_review_match_text(field_text) if not normalizedField: return None bestPage: int | None = None bestScore = 0.0 fieldLength = len(normalizedField) for pageNum, pageText in page_texts: normalizedPage = _normalize_review_match_text(pageText) if not normalizedPage: 
continue if normalizedField in normalizedPage: return pageNum if fieldLength < 8: continue ratio = _partial_similarity(normalizedField, normalizedPage) if ratio > bestScore: bestScore = ratio bestPage = pageNum if bestScore >= 0.92: return bestPage return None def _partial_similarity(needle: str, haystack: str) -> float: """简化版 partial similarity,避免详情页兜底依赖额外包。""" if not needle or not haystack: return 0.0 if len(needle) > len(haystack): needle, haystack = haystack, needle window = len(needle) if window <= 0: return 0.0 if needle == haystack: return 1.0 best = 0.0 step = max(1, window // 6) stop = max(len(haystack) - window + 1, 1) for start in range(0, stop, step): chunk = haystack[start : start + window] if not chunk: continue matches = sum(1 for left, right in zip(needle, chunk) if left == right) best = max(best, matches / window) if best >= 0.999: return 1.0 return best def _extract_page_texts_from_pdf(path: Path) -> list[tuple[int, str]]: """平台侧直接从 PDF 提取逐页文本,避免依赖 leaudit 内核。""" import fitz doc = fitz.open(path) try: page_texts: list[tuple[int, str]] = [] for index in range(len(doc)): text = doc[index].get_text("text") or "" if text.strip(): page_texts.append((index + 1, text)) return page_texts finally: doc.close() def _extract_page_texts_from_docx(path: Path) -> list[tuple[int, str]]: """DOCX 无稳定真实分页时,平台侧仅退化为整文第 1 页定位。""" from docx import Document as DocxDocument doc = DocxDocument(str(path)) parts: list[str] = [] for para in doc.paragraphs: text = (para.text or "").strip() if text: parts.append(text) for table in doc.tables: for row in table.rows: cells = [cell.text.strip() for cell in row.cells if cell.text and cell.text.strip()] if cells: parts.append(" | ".join(cells)) text = "\n".join(parts).strip() return [(1, text)] if text else [] def _format_display_datetime(value: Any) -> str: """格式化详情页展示时间。""" if isinstance(value, datetime): return value.strftime("%Y-%m-%d %H:%M:%S") return "" def _format_iso_datetime(value: Any) -> str | None: """格式化 ISO 时间字符串。""" 
if isinstance(value, datetime): return value.isoformat() return None def _coerce_positive_int(value: Any) -> int | None: """把各种页码值安全转成正整数。""" if isinstance(value, int): return value if value > 0 else None if isinstance(value, float): int_value = int(value) return int_value if int_value > 0 else None if isinstance(value, str): match = re.search(r"\d+", value) if match: int_value = int(match.group(0)) return int_value if int_value > 0 else None return None def _normalize_speed(speed: str | None) -> str: """Normalize front-end speed selection to urgent/normal.""" normalized = (speed or "").strip().lower() if normalized in {"urgent", "high", "fast", "emergency", "紧急"}: return "urgent" return "normal" def _normalize_document_name(file_name: str) -> str: """Build a stable name key for same-name version matching.""" stem = Path(file_name).stem name = unicodedata.normalize("NFKC", stem).strip().lower() name = re.sub(r"[\s_\-]+", " ", name) name = re.sub(r"(?:\(|()\d+(?:\)|))$", "", name).strip() name = re.sub(r"(?:[-_\s]*副本|[-_\s]*copy)$", "", name).strip() name = re.sub(r"\s+", " ", name).strip() return name or "untitled"