diff --git a/fastapi_modules/fastapi_leaudit/controllers/documentController.py b/fastapi_modules/fastapi_leaudit/controllers/documentController.py index 2450d01..6506067 100644 --- a/fastapi_modules/fastapi_leaudit/controllers/documentController.py +++ b/fastapi_modules/fastapi_leaudit/controllers/documentController.py @@ -2,21 +2,30 @@ from typing import Any -from fastapi import File, Form, Query, UploadFile +from fastapi import Depends, File, Form, Query, UploadFile from pydantic import BaseModel, Field from sqlalchemy import text from fastapi_common.fastapi_common_sqlalchemy.database import GetAsyncSession from fastapi_common.fastapi_common_web.controller import BaseController from fastapi_common.fastapi_common_web.domain.responses import Result +from fastapi_common.fastapi_common_security.security import verify_access_token from fastapi_modules.fastapi_leaudit.domian.vo.documentVo import ( + DocumentDetailVO, DocumentListPageVO, + DocumentStatusItemVO, + DocumentUpdateDTO, DocumentTypeCreateDTO, DocumentTypeItemVO, DocumentTypeUpdateDTO, DocumentUploadVO, ) +from fastapi_modules.fastapi_leaudit.domian.vo.reviewPointVo import ( + DocumentConfirmVO, + ReviewPointAuditVO, + ReviewPointsAggregateVO, +) from fastapi_modules.fastapi_leaudit.services import IDocumentService from fastapi_modules.fastapi_leaudit.services.impl.documentServiceImpl import DocumentServiceImpl @@ -28,6 +37,11 @@ class QueueStatusVO(BaseModel): documents: dict[str, object] = Field(default_factory=lambda: {"waiting": 0, "processing": 0, "processing_ids": []}) +class ReviewPointAuditDTO(BaseModel): + result: str = Field(..., description="true/false/review") + message: str = Field("", description="审核意见") + + class DocumentController(BaseController): """文档控制器。""" @@ -38,25 +52,40 @@ class DocumentController(BaseController): @self.router.post("/upload", response_model=Result[DocumentUploadVO]) async def UploadDocument( file: UploadFile = File(..., description="上传文档"), + attachments: list[UploadFile] | None = File(None, description="合同附件列表"), typeId: int | None = Form(None, description="文档类型ID"), typeCode: str | None = Form(None, description="文档类型编码"), + groupId: int | None = Form(None, description="二级分组ID"), region: str = Form("default", description="所属地区"), fileRole: str = Form("primary", description="文件角色"), createdBy: int | None = Form(None, description="上传用户ID"), autoRun: bool = Form(False, description="是否上传后自动触发评查"), speed: str = Form("normal", description="执行速度档位:urgent/normal"), + payload: dict[str, Any] = Depends(verify_access_token), ): """上传文档并建立评查输入。""" Content = await file.read() + attachmentPayloads: list[tuple[str, bytes, str | None]] = [] + for attachment in attachments or []: + attachmentContent = await attachment.read() + attachmentPayloads.append( + ( + attachment.filename or "attachment.bin", + attachmentContent, + attachment.content_type, + ) + ) Data = await self.DocumentService.Upload( FileName=file.filename or "upload.bin", FileContent=Content, ContentType=file.content_type, TypeId=typeId, TypeCode=typeCode, + GroupId=groupId, Region=region, FileRole=fileRole, - CreatedBy=createdBy, + CreatedBy=int(payload["user_id"]), + Attachments=attachmentPayloads, AutoRun=autoRun, Speed=speed, ) @@ -67,38 +96,145 @@ class DocumentController(BaseController): page: int = 1, pageSize: int = 20, keyword: str | None = None, + documentNumber: str | None = None, typeCode: str | None = None, + type_ids: str | None = Query(None, description="逗号分隔的文档类型ID列表"), + entry_module_id: int | None = Query(None, description="按入口模块ID过滤文档"), region: str | None = None, processingStatus: str | None = None, resultStatus: str | None = None, + auditStatus: int | None = Query(None, description="按人工审核状态过滤"), userId: int | None = Query(None, description="按用户ID过滤"), dateFrom: str | None = Query(None, description="起始日期 (YYYY-MM-DD)"), dateTo: str | None = Query(None, description="结束日期 (YYYY-MM-DD)"), + payload: dict[str, Any] = Depends(verify_access_token), ): - """获取文档列表(仅返回最新版本,附历史版本摘要)。""" + """获取文档列表(带数据隔离:省级全量、地市仅本地区、普通用户仅自己)。""" + typeIdList: list[int] | None = None + if type_ids: + typeIdList = [int(x.strip()) for x in type_ids.split(",") if x.strip().isdigit()] Data = await self.DocumentService.ListDocuments( + CurrentUserId=int(payload["user_id"]), Page=page, PageSize=pageSize, Keyword=keyword, + DocumentNumber=documentNumber, TypeCode=typeCode, + TypeIds=typeIdList, + EntryModuleId=entry_module_id, Region=region, ProcessingStatus=processingStatus, ResultStatus=resultStatus, + AuditStatus=auditStatus, UserId=userId, DateFrom=dateFrom, DateTo=dateTo, ) return Result.success(data=Data) + @self.router.get("/documents/status", response_model=Result[list[DocumentStatusItemVO]]) + async def GetDocumentsStatus( + ids: str = Query(..., description="逗号分隔的文档ID列表"), + payload: dict[str, Any] = Depends(verify_access_token), + ): + """批量获取文档状态(带数据隔离校验)。""" + idList = [int(x.strip()) for x in ids.split(",") if x.strip().isdigit()] + Data = await self.DocumentService.GetDocumentsStatus(CurrentUserId=int(payload["user_id"]), Ids=idList) + return Result.success(data=Data) + + @self.router.get("/documents/{DocumentId}", response_model=Result[DocumentDetailVO]) + async def GetDocument( + DocumentId: int, + payload: dict[str, Any] = Depends(verify_access_token), + ): + """获取单个文档详情(带数据隔离校验)。""" + Data = await self.DocumentService.GetDocument(CurrentUserId=int(payload["user_id"]), Id=DocumentId) + return Result.success(data=Data) + + @self.router.get("/v3/review-points/{DocumentId}", response_model=Result[ReviewPointsAggregateVO]) + async def GetReviewPoints( + DocumentId: int, + payload: dict[str, Any] = Depends(verify_access_token), + ): + """获取评查详情页聚合数据(带数据隔离校验)。""" + Data = await self.DocumentService.GetReviewPoints(CurrentUserId=int(payload["user_id"]), DocumentId=DocumentId) + return Result.success(data=Data) + + @self.router.patch("/v3/review-points/{ReviewPointResultId}/audit", response_model=Result[ReviewPointAuditVO]) + async def AuditReviewPoint( + ReviewPointResultId: int, + Body: ReviewPointAuditDTO, + payload: dict[str, Any] = Depends(verify_access_token), + ): + """更新单个评查点的人工审核状态。""" + Data = await self.DocumentService.AuditReviewPoint( + CurrentUserId=int(payload["user_id"]), + ReviewPointResultId=ReviewPointResultId, + Result=Body.result, + Message=Body.message, + ) + return Result.success(data=Data, message="评查点审核状态已更新") + + @self.router.patch("/v3/documents/{DocumentId}/confirm", response_model=Result[DocumentConfirmVO]) + async def ConfirmReviewResults( + DocumentId: int, + payload: dict[str, Any] = Depends(verify_access_token), + ): + """确认文档评查结果。""" + Data = await self.DocumentService.ConfirmReviewResults(CurrentUserId=int(payload["user_id"]), DocumentId=DocumentId) + return Result.success(data=Data, message="评查结果已确认") + + @self.router.post("/documents/{DocumentId}/attachments", response_model=Result[DocumentDetailVO]) + async def AppendAttachments( + DocumentId: int, + files: list[UploadFile] = File(..., description="附件文件列表"), + payload: dict[str, Any] = Depends(verify_access_token), + ): + """为现有文档追加附件(带数据隔离校验)。""" + filePayloads: list[tuple[str, bytes, str | None]] = [] + for file in files: + content = await file.read() + filePayloads.append((file.filename or "attachment.bin", content, file.content_type)) + Data = await self.DocumentService.AppendAttachments( + CurrentUserId=int(payload["user_id"]), + Id=DocumentId, + Files=filePayloads, + ) + return Result.success(data=Data, message="附件上传成功") + + @self.router.put("/documents/{DocumentId}", response_model=Result[DocumentDetailVO]) + async def UpdateDocument( + DocumentId: int, + Body: DocumentUpdateDTO, + payload: dict[str, Any] = Depends(verify_access_token), + ): + """更新文档元数据(带数据隔离校验)。""" + Data = await self.DocumentService.UpdateDocument( + CurrentUserId=int(payload["user_id"]), + Id=DocumentId, + Body=Body, + ) + return Result.success(data=Data, message="文档更新成功") + + @self.router.delete("/documents/{DocumentId}", response_model=Result[None]) + async def DeleteDocument( + DocumentId: int, + payload: dict[str, Any] = Depends(verify_access_token), + ): + """软删除文档(带数据隔离校验)。""" + await self.DocumentService.DeleteDocument(CurrentUserId=int(payload["user_id"]), Id=DocumentId) + return Result.success(message="文档已删除") + @self.router.get("/document-types", response_model=Result[list[DocumentTypeItemVO]]) async def ListDocumentTypes( ids: str | None = Query(None, description="逗号分隔的ID列表,不传则返回全部"), + entry_module_id: int | None = Query(None, description="按入口模块ID过滤文档类型"), ): """获取文档类型列表。""" idList: list[int] | None = None if ids: idList = [int(x.strip()) for x in ids.split(",") if x.strip().isdigit()] - Data = await self.DocumentService.ListDocumentTypes(Ids=idList) + Data = await self.DocumentService.ListDocumentTypes(Ids=idList, EntryModuleId=entry_module_id) return Result.success(data=Data) @self.router.get("/document-types/{TypeId}", response_model=Result[DocumentTypeItemVO]) diff --git a/fastapi_modules/fastapi_leaudit/domian/vo/reviewPointVo.py b/fastapi_modules/fastapi_leaudit/domian/vo/reviewPointVo.py new file mode 100644 index 0000000..4ab7af4 --- /dev/null +++ b/fastapi_modules/fastapi_leaudit/domian/vo/reviewPointVo.py @@ -0,0 +1,85 @@ +"""评查详情聚合 VO。""" + +from __future__ import annotations + +from typing import Any + +from pydantic import BaseModel, Field + + +class ReviewPointResultVO(BaseModel): + """前端评查详情页所需的单条评查点结果。""" + + id: str | int = Field(..., description="评查结果ID") + documentId: str = Field(..., description="文档ID") + pointId: str | int = Field(..., description="评查点/规则ID") + editAuditStatusId: str | int = Field("", description="人工审核状态ID") + editAuditStatus: int = Field(0, description="人工审核状态") + editAuditStatusMessage: str = Field("", description="人工审核意见") + title: str = Field("", description="标题") + pointName: str = Field("", description="评查点名称") + pointCode: str = Field("", description="评查点编码") + groupName: str = Field("", description="所属分组名") + status: str = Field("success", description="success/warning/error/notApplicable") + content: dict[str, Any] = Field(default_factory=dict, description="抽取字段内容") + contentPage: dict[str, str] = Field(default_factory=dict, description="字段页码映射") + suggestion: str = Field("", description="建议内容") + postAction: str = Field("", description="后续动作") + actionContent: Any = Field(default_factory=dict, description="动作配置") + legalBasis: Any = Field(default_factory=dict, description="法律依据") + evaluationConfig: Any = Field(default_factory=dict, description="评查配置") + score: float = Field(0, description="分值") + finalScore: float | None = Field(None, description="最终得分") + machineScore: float | None = Field(None, description="机器得分") + result: bool | None = Field(None, description="是否通过") + failMessage: str = Field("", description="失败提示") + passMessage: str = Field("", description="通过提示") + evaluatedPointResultsLog: dict[str, Any] = Field(default_factory=dict, description="规则执行日志") + + +class ReviewPointStatsVO(BaseModel): + """统计信息。""" + + total: int = Field(0, description="总数") + success: int = Field(0, description="通过数") + warning: int = Field(0, description="警告数") + error: int = Field(0, description="错误数") + score: float = Field(0, description="总分") + + +class ReviewPointInfoVO(BaseModel): + """评查摘要信息。""" + + reviewTime: str = Field("", description="评查时间") + reviewModel: str = Field("DeepSeek", description="评查模型") + ruleGroup: str = Field("", description="规则组") + result: str = Field("success", description="总体结果") + issueCount: int = Field(0, description="问题数") + + +class ReviewPointsAggregateVO(BaseModel): + """评查详情聚合响应。""" + + data: list[ReviewPointResultVO] = Field(default_factory=list, description="评查点结果") + stats: ReviewPointStatsVO = Field(default_factory=ReviewPointStatsVO, description="统计信息") + reviewInfo: ReviewPointInfoVO = Field(default_factory=ReviewPointInfoVO, description="评查摘要") + document: dict[str, Any] | None = Field(default=None, description="文档信息") + comparison_document: dict[str, Any] | None = Field(default=None, description="比对文档信息") + scoring_proposals: list[dict[str, Any]] = Field(default_factory=list, description="评分提案") + + +class ReviewPointAuditVO(BaseModel): + """评查点人工审核更新结果。""" + + reviewPointResultId: int = Field(..., description="规则结果ID") + editAuditStatusId: int = Field(..., description="审核记录ID") + editAuditStatus: int = Field(..., description="审核状态") + overrideResult: bool | None = Field(default=None, description="人工覆盖结果") + message: str = Field("", description="审核意见") + + +class DocumentConfirmVO(BaseModel): + """文档确认评查结果。""" + + documentId: int = Field(..., description="文档ID") + auditStatus: int = Field(..., description="文档审核状态") diff --git a/fastapi_modules/fastapi_leaudit/services/documentService.py b/fastapi_modules/fastapi_leaudit/services/documentService.py index 6388bfe..218f6b7 100644 --- a/fastapi_modules/fastapi_leaudit/services/documentService.py +++ b/fastapi_modules/fastapi_leaudit/services/documentService.py @@ -3,12 +3,20 @@ from abc import ABC, abstractmethod from fastapi_modules.fastapi_leaudit.domian.vo.documentVo import ( + DocumentDetailVO, DocumentListPageVO, + DocumentStatusItemVO, + DocumentUpdateDTO, DocumentTypeCreateDTO, DocumentTypeItemVO, DocumentTypeUpdateDTO, DocumentUploadVO, ) +from fastapi_modules.fastapi_leaudit.domian.vo.reviewPointVo import ( + DocumentConfirmVO, + ReviewPointAuditVO, + ReviewPointsAggregateVO, +) class IDocumentService(ABC): @@ -22,9 +30,11 @@ class IDocumentService(ABC): ContentType: str | None, TypeId: int | None = None, TypeCode: str | None = None, + GroupId: int | None = None, Region: str = "default", FileRole: str = "primary", CreatedBy: int | None = None, + Attachments: list[tuple[str, bytes, str | None]] | None = None, AutoRun: bool = False, Speed: str = "normal", ) -> DocumentUploadVO: @@ -34,13 +44,18 @@ class IDocumentService(ABC): @abstractmethod async def ListDocuments( self, + CurrentUserId: int, Page: int = 1, PageSize: int = 20, Keyword: str | None = None, + DocumentNumber: str | None = None, TypeCode: str | None = None, + TypeIds: list[int] | None = None, + EntryModuleId: int | None = None, Region: str | None = None, ProcessingStatus: str | None = None, ResultStatus: str | None = None, + AuditStatus: int | None = None, UserId: int | None = None, DateFrom: str | None = None, DateTo: str | None = None, @@ -49,7 +64,58 @@ class IDocumentService(ABC): ... @abstractmethod - async def ListDocumentTypes(self, Ids: list[int] | None = None) -> list[DocumentTypeItemVO]: + async def GetDocument(self, CurrentUserId: int, Id: int) -> DocumentDetailVO: + """获取单个文档详情,并执行数据隔离校验。""" + ... + + @abstractmethod + async def GetReviewPoints(self, CurrentUserId: int, DocumentId: int) -> ReviewPointsAggregateVO: + """获取评查详情页所需的聚合数据。""" + ... + + @abstractmethod + async def AuditReviewPoint( + self, + CurrentUserId: int, + ReviewPointResultId: int, + Result: str, + Message: str, + ) -> ReviewPointAuditVO: + """更新单个评查点的人工审核状态。""" + ... + + @abstractmethod + async def ConfirmReviewResults(self, CurrentUserId: int, DocumentId: int) -> DocumentConfirmVO: + """确认文档评查结果。""" + ... + + @abstractmethod + async def GetDocumentsStatus(self, CurrentUserId: int, Ids: list[int]) -> list[DocumentStatusItemVO]: + """批量获取文档状态,并执行数据隔离校验。""" + ... + + @abstractmethod + async def UpdateDocument(self, CurrentUserId: int, Id: int, Body: DocumentUpdateDTO) -> DocumentDetailVO: + """更新文档元数据,并执行数据隔离校验。""" + ... + + @abstractmethod + async def DeleteDocument(self, CurrentUserId: int, Id: int) -> None: + """软删除文档,并执行数据隔离校验。""" + ... + + @abstractmethod + async def AppendAttachments( + self, + CurrentUserId: int, + Id: int, + Files: list[tuple[str, bytes, str | None]], + ) -> DocumentDetailVO: + """为现有文档追加附件,并执行数据隔离校验。""" + ... + + @abstractmethod + async def ListDocumentTypes(self, Ids: list[int] | None = None, EntryModuleId: int | None = None) -> list[DocumentTypeItemVO]: """获取文档类型列表。""" ... diff --git a/fastapi_modules/fastapi_leaudit/services/impl/documentServiceImpl.py b/fastapi_modules/fastapi_leaudit/services/impl/documentServiceImpl.py index 3e55f84..f865332 100644 --- a/fastapi_modules/fastapi_leaudit/services/impl/documentServiceImpl.py +++ b/fastapi_modules/fastapi_leaudit/services/impl/documentServiceImpl.py @@ -2,6 +2,7 @@ from __future__ import annotations +from typing import Any from datetime import date as date_type, datetime import hashlib import mimetypes @@ -19,18 +20,31 @@ from fastapi_common.fastapi_common_web.exception.LeauditException import Leaudit from fastapi_common.fastapi_common_storage.oss_path_utils import OssPathUtils from fastapi_modules.fastapi_leaudit.domian.vo.documentVo import ( + DocumentAttachmentVO, + DocumentDetailVO, DocumentHistoryVersionVO, DocumentListItemVO, DocumentListPageVO, + DocumentStatusItemVO, + DocumentUpdateDTO, DocumentTypeCreateDTO, DocumentTypeItemVO, DocumentTypeUpdateDTO, DocumentUploadVO, ) +from fastapi_modules.fastapi_leaudit.domian.vo.reviewPointVo import ( + DocumentConfirmVO, + ReviewPointAuditVO, + ReviewPointInfoVO, + ReviewPointResultVO, + ReviewPointStatsVO, + ReviewPointsAggregateVO, +) from fastapi_modules.fastapi_leaudit.models import LeauditDocument, LeauditDocumentFile from fastapi_modules.fastapi_leaudit.services import IAuditService, IDocumentService, IOssService from fastapi_modules.fastapi_leaudit.services.impl.auditServiceImpl import AuditServiceImpl from fastapi_modules.fastapi_leaudit.services.impl.ossServiceImpl import OssServiceImpl +from fastapi_modules.fastapi_leaudit.services.impl.ruleGroupSupport import sync_group_bindings_from_doc_type class DocumentServiceImpl(IDocumentService): @@ -51,9 +65,11 @@ class DocumentServiceImpl(IDocumentService): ContentType: str | None, TypeId: int | None = None, TypeCode: str | None = None, + GroupId: int | None = None, Region: str = "default", FileRole: str = "primary", CreatedBy: int | None = None, + Attachments: list[tuple[str, bytes, str | None]] | None = None, AutoRun: bool = False, Speed: str = "normal", ) -> DocumentUploadVO: @@ -76,6 +92,7 @@ class DocumentServiceImpl(IDocumentService): uploadedAt = datetime.now() async with GetAsyncSession() as Session: + await self._ensureDocumentGroupColumn(Session) if TypeId is not None and TypeCode is not None: typeResult = await Session.execute( text( @@ -122,6 +139,7 @@ class DocumentServiceImpl(IDocumentService): resolvedTypeId = int(typeRow["id"]) resolvedTypeCode = str(typeRow["code"]) + resolvedGroupId = await self._resolveDocumentGroupId(Session, resolvedTypeId, GroupId) duplicateUpload = False previousVersionId: int | None = None rootVersionId: int | None = None @@ -161,6 +179,7 @@ class DocumentServiceImpl(IDocumentService): Session, bizDocumentId=internalDocumentNo, typeId=resolvedTypeId, + groupId=resolvedGroupId, region=normalizedRegion, processingStatus="waiting", versionGroupKey=versionGroupKey, @@ -216,6 +235,17 @@ class DocumentServiceImpl(IDocumentService): await Session.refresh(document) await Session.refresh(documentFile) + if normalizedFileRole == "primary" and Attachments: + await self._appendAttachmentFiles( + Session=Session, + DocumentId=document.Id, + TypeCode=resolvedTypeCode, + Region=normalizedRegion, + VersionNo=int(document.versionNo or 1), + Files=Attachments, + CreatedBy=CreatedBy, + ) + ossUrl = documentFile.ossUrl or "" run = None @@ -239,6 +269,7 @@ class DocumentServiceImpl(IDocumentService): fileId=documentFile.Id, typeId=resolvedTypeId, typeCode=resolvedTypeCode, + groupId=resolvedGroupId, region=normalizedRegion, fileName=documentFile.fileName, ossUrl=ossUrl, @@ -250,13 +281,18 @@ class DocumentServiceImpl(IDocumentService): async def ListDocuments( self, + CurrentUserId: int, Page: int = 1, PageSize: int = 20, Keyword: str | None = None, + DocumentNumber: str | None = None, TypeCode: str | None = None, + TypeIds: list[int] | None = None, + EntryModuleId: int | None = None, Region: str | None = None, ProcessingStatus: str | None = None, ResultStatus: str | None = None, + AuditStatus: int | None = None, UserId: int | None = None, DateFrom: str | None = None, DateTo: str | None = None, @@ -266,27 +302,51 @@ class DocumentServiceImpl(IDocumentService): page_size = max(1, min(int(PageSize), 100)) offset = (page - 1) * page_size + async with GetAsyncSession() as Session: + await self._ensureDocumentGroupColumn(Session) + documentColumns = await self._loadDocumentColumns(Session) + + currentUser = await self._getCurrentUserContext(CurrentUserId) filters = ["d.is_latest_version = true", "d.deleted_at IS NULL", "f.is_active = true", "f.file_role = 'primary'"] params: dict[str, object] = {"limit": page_size, "offset": offset} + filters.extend( + self._buildDocumentScopeFilters( + CurrentUserId=CurrentUserId, + CurrentUser=currentUser, + Params=params, + RequestedRegion=Region, + RequestedUserId=UserId, + DocumentAlias="d", + FileAlias="f", + ) + ) if Keyword: - filters.append("(f.file_name ILIKE :keyword OR d.normalized_name ILIKE :keyword)") + if "document_number" in documentColumns: + filters.append("(f.file_name ILIKE :keyword OR d.normalized_name ILIKE :keyword OR d.document_number ILIKE :keyword)") + else: + filters.append("(f.file_name ILIKE :keyword OR d.normalized_name ILIKE :keyword)") params["keyword"] = f"%{Keyword.strip()}%" + if DocumentNumber and "document_number" in documentColumns: + filters.append("d.document_number ILIKE :document_number") + params["document_number"] = f"%{DocumentNumber.strip()}%" if TypeCode: filters.append("dt.code = :type_code") params["type_code"] = TypeCode.strip() - if Region: - filters.append("d.region = :region") - params["region"] = Region.strip() + if TypeIds: + normalizedTypeIds = [int(typeId) for typeId in TypeIds if int(typeId) > 0] + if normalizedTypeIds: + filters.append("d.type_id = ANY(:type_ids)") + params["type_ids"] = normalizedTypeIds + if EntryModuleId is not None and int(EntryModuleId) > 0: + filters.append("dt.entry_module_id = :entry_module_id") + params["entry_module_id"] = int(EntryModuleId) if ProcessingStatus: filters.append("d.processing_status = :processing_status") params["processing_status"] = ProcessingStatus.strip() if ResultStatus: filters.append("ar.result_status = :result_status") params["result_status"] = ResultStatus.strip() - if UserId is not None: - filters.append("f.created_by = :user_id") - params["user_id"] = UserId if DateFrom: filters.append("d.created_at >= :date_from") params["date_from"] = date_type.fromisoformat(DateFrom.strip()) @@ -296,6 +356,45 @@ class DocumentServiceImpl(IDocumentService): where_clause = " AND ".join(filters) + derivedAuditStatusExpr = """ + CASE + WHEN LOWER(COALESCE(ar.status, '')) IN ('queued', 'running') + OR LOWER(COALESCE(d.processing_status, '')) = 'running' THEN 2 + WHEN LOWER(COALESCE(d.processing_status, '')) IN ('waiting', 'pending') THEN 0 + WHEN LOWER(COALESCE(d.processing_status, '')) = 'failed' THEN -1 + WHEN COALESCE(ar.failed_count, 0) > 0 THEN -1 + WHEN COALESCE(ar.passed_count, 0) > 0 THEN 1 + ELSE 0 + END + """.strip() + persistedOrDerivedAuditStatusExpr = ( + f"COALESCE(d.audit_status, {derivedAuditStatusExpr})" + if "audit_status" in documentColumns + else derivedAuditStatusExpr + ) + + if AuditStatus is not None: + filters.append(f"{persistedOrDerivedAuditStatusExpr} = :audit_status") + params["audit_status"] = int(AuditStatus) + where_clause = " AND ".join(filters) + + groupIdSelectExpr = "d.group_id" if "group_id" in documentColumns else "NULL::bigint" + # Transitional display fallback: if the document itself has not yet persisted a child group, + # but its document type currently maps to exactly one active child group, surface it in list/detail UI. + resolvedGroupIdExpr = f"COALESCE({groupIdSelectExpr}, dg.inferred_group_id)" + resolvedGroupNameExpr = ( + f"CASE WHEN {groupIdSelectExpr} IS NOT NULL THEN eg.name ELSE dg.inferred_group_name END" + ) + + optionalSelects = [ + f"{resolvedGroupIdExpr} AS group_id", + "d.document_number AS document_number" if "document_number" in documentColumns else "NULL::text AS document_number", + f"{persistedOrDerivedAuditStatusExpr} AS audit_status", + "d.is_test_document AS is_test_document" if "is_test_document" in documentColumns else "FALSE AS is_test_document", + "dt.name AS type_name", + f"{resolvedGroupNameExpr} AS group_name", + ] + count_sql = text( f""" SELECT COUNT(*) @@ -304,6 +403,21 @@ class DocumentServiceImpl(IDocumentService): ON f.document_id = d.id LEFT JOIN leaudit_document_types dt ON dt.id = d.type_id + LEFT JOIN leaudit_evaluation_point_groups eg + ON eg.id = d.group_id + LEFT JOIN ( + SELECT + document_type_id, + CASE WHEN COUNT(*) = 1 THEN MIN(id) END AS inferred_group_id, + CASE WHEN COUNT(*) = 1 THEN MIN(name) END AS inferred_group_name + FROM leaudit_evaluation_point_groups + WHERE pid <> 0 + AND deleted_at IS NULL + AND is_enabled = true + AND document_type_id IS NOT NULL + GROUP BY document_type_id + ) dg + ON dg.document_type_id = d.type_id LEFT JOIN leaudit_audit_runs ar ON ar.id = d.current_run_id WHERE {where_clause} @@ -338,6 +452,7 @@ class DocumentServiceImpl(IDocumentService): ar.passed_count, ar.failed_count, ar.skipped_count, + {', '.join(optionalSelects)}, vc.total_versions, COALESCE(vc.total_versions, 1) > 1 AS has_history FROM leaudit_documents d @@ -345,6 +460,21 @@ class DocumentServiceImpl(IDocumentService): ON f.document_id = d.id LEFT JOIN leaudit_document_types dt ON dt.id = d.type_id + LEFT JOIN leaudit_evaluation_point_groups eg + ON eg.id = d.group_id + LEFT JOIN ( + SELECT + document_type_id, + CASE WHEN COUNT(*) = 1 THEN MIN(id) END AS inferred_group_id, + CASE WHEN COUNT(*) = 1 THEN MIN(name) END AS inferred_group_name + FROM leaudit_evaluation_point_groups + WHERE pid <> 0 + AND deleted_at IS NULL + AND is_enabled = true + AND document_type_id IS NOT NULL + GROUP BY document_type_id + ) dg + ON dg.document_type_id = d.type_id LEFT JOIN leaudit_audit_runs ar ON ar.id = d.current_run_id LEFT JOIN ( @@ -425,6 +555,9 @@ class DocumentServiceImpl(IDocumentService): previousVersionId=int(row["previous_version_id"]) if row["previous_version_id"] is not None else None, typeId=int(row["type_id"]) if row["type_id"] is not None else None, typeCode=row["type_code"], + typeName=row["type_name"], + groupId=int(row["group_id"]) if row["group_id"] is not None else None, + groupName=row["group_name"], region=row["region"], normalizedName=row["normalized_name"], fileId=int(row["file_id"]) if row["file_id"] is not None else None, @@ -441,6 +574,9 @@ class DocumentServiceImpl(IDocumentService): passedCount=int(row["passed_count"]) if row["passed_count"] is not None else None, failedCount=int(row["failed_count"]) if row["failed_count"] is not None else None, skippedCount=int(row["skipped_count"]) if row["skipped_count"] is not None else None, + documentNumber=row["document_number"], + auditStatus=int(row["audit_status"]) if row["audit_status"] is not None else None, + isTestDocument=bool(row["is_test_document"]), updatedAt=row["updated_at"].isoformat() if row["updated_at"] else None, hasHistory=bool(row["has_history"]), totalVersions=int(row["total_versions"] or 1), @@ -457,11 +593,479 @@ class DocumentServiceImpl(IDocumentService): documents=documents, ) + async def GetDocument(self, CurrentUserId: int, Id: int) -> DocumentDetailVO: + """获取单个文档详情,并执行数据隔离校验。""" + async with GetAsyncSession() as Session: + await self._ensureDocumentGroupColumn(Session) + currentUser = await self._getCurrentUserContext(CurrentUserId) + documentColumns = await self._loadDocumentColumns(Session) + detail = await self._getDocumentDetail(Session, Id, CurrentUserId, currentUser, documentColumns) + if not detail: + raise LeauditException(StatusCodeEnum.HTTP_404_NOT_FOUND, "文档不存在或无权访问") + return detail - async def ListDocumentTypes(self, Ids: list[int] | None = None) -> list[DocumentTypeItemVO]: + async def GetReviewPoints(self, CurrentUserId: int, DocumentId: int) -> ReviewPointsAggregateVO: + """获取评查详情页聚合数据。""" + async with GetAsyncSession() as Session: + await self._ensureDocumentGroupColumn(Session) + await self._ensureReviewPointAuditTable(Session) + currentUser = await self._getCurrentUserContext(CurrentUserId) + documentColumns = await self._loadDocumentColumns(Session) + detail = await self._getDocumentDetail(Session, DocumentId, CurrentUserId, currentUser, documentColumns) + if not detail: + raise LeauditException(StatusCodeEnum.HTTP_404_NOT_FOUND, "文档不存在或无权访问") + + runRow = await self._loadReviewRunRow(Session, detail) + reviewPoints: list[ReviewPointResultVO] = [] + stats = ReviewPointStatsVO() + + if runRow: + reviewPoints = await self._loadReviewPointResults(Session, detail, int(runRow["id"])) + stats = self._buildReviewPointStats(reviewPoints) + + documentPayload = await self._buildReviewDocumentPayload(Session, detail, runRow) + reviewInfo = self._buildReviewInfo(runRow, reviewPoints, stats) + comparisonDocument = await self._loadComparisonDocument(Session, detail.documentId) + scoringProposals = await self._loadScoringProposals(Session, detail.documentId) + + return ReviewPointsAggregateVO( + data=reviewPoints, + stats=stats, + reviewInfo=reviewInfo, + document=documentPayload, + comparison_document=comparisonDocument, + scoring_proposals=scoringProposals, + ) + + async def AuditReviewPoint( + self, + CurrentUserId: int, + ReviewPointResultId: int, + Result: str, + Message: str, + ) -> ReviewPointAuditVO: + """更新单个评查点的人工审核状态。""" + normalizedResult = (Result or "").strip().lower() + if normalizedResult not in {"true", "false", "review"}: + raise LeauditException(StatusCodeEnum.HTTP_400_BAD_REQUEST, "result 仅支持 true、false、review") + + async with GetAsyncSession() as Session: + await self._ensureDocumentGroupColumn(Session) + await self._ensureReviewPointAuditTable(Session) + + row = ( + await Session.execute( + text( + """ + SELECT document_id + FROM leaudit_rule_results + WHERE id = :result_id + LIMIT 1 + """ + ), + {"result_id": ReviewPointResultId}, + ) + ).mappings().first() + if not row: + raise LeauditException(StatusCodeEnum.HTTP_404_NOT_FOUND, "评查点结果不存在") + + documentId = int(row["document_id"]) + currentUser = await self._getCurrentUserContext(CurrentUserId) + documentColumns = await self._loadDocumentColumns(Session) + detail = await self._getDocumentDetail(Session, documentId, CurrentUserId, currentUser, documentColumns) + if not detail: + raise LeauditException(StatusCodeEnum.HTTP_404_NOT_FOUND, "文档不存在或无权访问") + + payload = { + "rule_result_id": ReviewPointResultId, + "document_id": documentId, + "edit_audit_status": 0 if normalizedResult == "review" else 1, + "override_result": None if normalizedResult == "review" else (normalizedResult == "true"), + "message": "" if normalizedResult == "review" else (Message or ""), + "reviewed_by": CurrentUserId, + } + auditRow = ( + await Session.execute( + text( + """ + INSERT INTO leaudit_review_point_audits ( + rule_result_id, + document_id, + edit_audit_status, + override_result, + message, + reviewed_by, + created_at, + updated_at + ) VALUES ( + :rule_result_id, + :document_id, + :edit_audit_status, + :override_result, + :message, + :reviewed_by, + NOW(), + NOW() + ) + ON CONFLICT (rule_result_id) DO UPDATE SET + edit_audit_status = EXCLUDED.edit_audit_status, + override_result = EXCLUDED.override_result, + message = EXCLUDED.message, + reviewed_by = EXCLUDED.reviewed_by, + updated_at = NOW() + RETURNING id, edit_audit_status, override_result, message + """ + ), + payload, + ) + ).mappings().first() + await Session.commit() + + return ReviewPointAuditVO( + reviewPointResultId=ReviewPointResultId, + editAuditStatusId=int(auditRow["id"]), + editAuditStatus=int(auditRow["edit_audit_status"] or 0), + overrideResult=bool(auditRow["override_result"]) if auditRow["override_result"] is not None else None, + message=str(auditRow["message"] or ""), + ) + + async def ConfirmReviewResults(self, CurrentUserId: int, DocumentId: int) -> DocumentConfirmVO: + """确认文档评查结果。""" + async with GetAsyncSession() as Session: + await self._ensureDocumentGroupColumn(Session) + currentUser = await self._getCurrentUserContext(CurrentUserId) + documentColumns = await self._loadDocumentColumns(Session) + detail = await self._getDocumentDetail(Session, DocumentId, CurrentUserId, currentUser, documentColumns) + if not detail: + raise LeauditException(StatusCodeEnum.HTTP_404_NOT_FOUND, "文档不存在或无权访问") + + if "audit_status" in documentColumns: + await Session.execute( + text( + """ + UPDATE leaudit_documents + SET audit_status = 1, updated_at = NOW() + WHERE id = :document_id + AND deleted_at IS NULL + """ + ), + {"document_id": DocumentId}, + ) + await Session.commit() + + return DocumentConfirmVO(documentId=DocumentId, auditStatus=1) + + async def GetDocumentsStatus(self, CurrentUserId: int, Ids: list[int]) -> list[DocumentStatusItemVO]: + """批量获取文档状态,并执行数据隔离校验。""" + normalizedIds = [int(docId) for docId in Ids if int(docId) > 0] + if not normalizedIds: + return [] + + async with GetAsyncSession() as Session: + currentUser = await self._getCurrentUserContext(CurrentUserId) + params: dict[str, object] = {"ids": normalizedIds} + filters = [ + "d.id = ANY(:ids)", + "d.deleted_at IS NULL", + "f.is_active = true", + "f.file_role = 'primary'", + ] + filters.extend( + self._buildDocumentScopeFilters( + CurrentUserId=CurrentUserId, + CurrentUser=currentUser, + Params=params, + DocumentAlias="d", + FileAlias="f", + ) + ) + + rows = ( + await Session.execute( + text( + f""" + SELECT + d.id AS document_id, + d.processing_status, + ar.status AS run_status, + ar.result_status, + d.updated_at + FROM leaudit_documents d + JOIN leaudit_document_files f + ON f.document_id = d.id + LEFT JOIN leaudit_audit_runs ar + ON ar.id = d.current_run_id + WHERE {' AND '.join(filters)} + ORDER BY d.updated_at DESC, d.id DESC + """ + ), + params, + ) + ).mappings().all() + + return [ + DocumentStatusItemVO( + documentId=int(row["document_id"]), + processingStatus=row["processing_status"], + runStatus=row["run_status"], + resultStatus=row["result_status"], + updatedAt=row["updated_at"].isoformat() if row["updated_at"] else None, + ) + for row in rows + ] + + async def UpdateDocument(self, CurrentUserId: int, Id: int, Body: DocumentUpdateDTO) -> DocumentDetailVO: + """更新文档元数据,并执行数据隔离校验。""" + async with GetAsyncSession() as Session: + await self._ensureDocumentGroupColumn(Session) + currentUser = await self._getCurrentUserContext(CurrentUserId) + documentColumns = await self._loadDocumentColumns(Session) + detail = await self._getDocumentDetail(Session, Id, CurrentUserId, currentUser, documentColumns) + if not detail: + raise LeauditException(StatusCodeEnum.HTTP_404_NOT_FOUND, "文档不存在或无权访问") + + assignments: list[str] = [] + params: dict[str, object] = {"id": Id} + + if Body.documentNumber is not None and "document_number" in documentColumns: + assignments.append("document_number = :document_number") + params["document_number"] = Body.documentNumber.strip() or None + if Body.remark is not None and "remark" in documentColumns: + assignments.append("remark = :remark") + params["remark"] = Body.remark.strip() or None + if Body.isTestDocument is not None and "is_test_document" in documentColumns: + assignments.append("is_test_document = :is_test_document") + params["is_test_document"] = Body.isTestDocument + if Body.auditStatus is not None and "audit_status" in documentColumns: + assignments.append("audit_status = :audit_status") + params["audit_status"] = Body.auditStatus + + if assignments: + assignments.append("updated_at = NOW()") + await Session.execute( + text(f"UPDATE leaudit_documents SET {', '.join(assignments)} WHERE id = :id AND deleted_at IS NULL"), + params, + ) + await Session.commit() + + refreshed = await self._getDocumentDetail(Session, Id, CurrentUserId, currentUser, documentColumns) + if not refreshed: + raise LeauditException(StatusCodeEnum.HTTP_404_NOT_FOUND, "文档不存在或无权访问") + return refreshed + + async def _appendAttachmentFiles( + self, + Session, + *, + DocumentId: int, + TypeCode: str, + Region: str, + VersionNo: int, + Files: list[tuple[str, bytes, str | None]], + CreatedBy: int | None, + ) -> None: + """为文档追加附件文件记录。""" + versionLabel = f"v{VersionNo}" + uploadedAt = datetime.now() + + for fileName, fileContent, contentType in Files: + if not fileName: + raise LeauditException(StatusCodeEnum.HTTP_400_BAD_REQUEST, "附件文件名不能为空") + if not fileContent: + raise LeauditException(StatusCodeEnum.HTTP_400_BAD_REQUEST, f"附件 {fileName} 内容不能为空") + + fileExt = Path(fileName).suffix.lstrip(".").lower() or None + mimeType = contentType or mimetypes.guess_type(fileName)[0] or "application/octet-stream" + fileSha256 = hashlib.sha256(fileContent).hexdigest() + fileSize = len(fileContent) + objectKey = OssPathUtils.BuildBusinessDocKey( + Region=Region, + TypeCode=TypeCode, + DocumentId=DocumentId, + Version=versionLabel, + FileRole="attachment", + FileName=fileName, + Year=uploadedAt.year, + Month=uploadedAt.month, + ) + ossUrl = await self.OssService.UploadBytes( + ObjectKey=objectKey, + Content=fileContent, + ContentType=mimeType, + ) + + attachment = LeauditDocumentFile( + documentId=DocumentId, + fileRole="attachment", + fileName=fileName, + fileExt=fileExt, + mimeType=mimeType, + fileSize=fileSize, + sha256=fileSha256, + localPath=None, + ossUrl=ossUrl, + storageProvider="minio", + isActive=True, + createdBy=CreatedBy, + ) + Session.add(attachment) + + await Session.commit() + + async def DeleteDocument(self, CurrentUserId: int, Id: int) -> None: + """软删除文档,并执行数据隔离校验。""" + async with GetAsyncSession() as Session: + currentUser = await self._getCurrentUserContext(CurrentUserId) + filters = ["d.id = :id", "d.deleted_at IS NULL", "f.is_active = true", "f.file_role = 'primary'"] + params: dict[str, object] = {"id": Id} + filters.extend( + self._buildDocumentScopeFilters( + CurrentUserId=CurrentUserId, + CurrentUser=currentUser, + Params=params, + DocumentAlias="d", + FileAlias="f", + ) + ) + row = ( + await Session.execute( + text( + f""" + SELECT d.id, d.version_group_key, d.is_latest_version + FROM leaudit_documents d + JOIN leaudit_document_files f + ON f.document_id = d.id + WHERE {' AND '.join(filters)} + LIMIT 1 + """ + ), + params, + ) + ).mappings().first() + if not row: + raise LeauditException(StatusCodeEnum.HTTP_404_NOT_FOUND, "文档不存在或无权访问") + + versionGroupKey = str(row["version_group_key"] or "") + isLatestVersion = bool(row["is_latest_version"]) + + await Session.execute( + text( + """ + UPDATE leaudit_documents + SET deleted_at = NOW(), is_latest_version = false, updated_at = NOW() + WHERE id = :id AND deleted_at IS NULL + """ + ), + {"id": Id}, + ) + await Session.execute( + text( + """ + UPDATE leaudit_document_files + SET is_active = false + WHERE document_id = :id AND is_active = true + """ + ), + {"id": Id}, + ) + + if isLatestVersion and versionGroupKey: + nextLatest = ( + await Session.execute( + text( + """ + SELECT id + FROM leaudit_documents + WHERE version_group_key = :group_key + AND deleted_at IS NULL + ORDER BY version_no DESC, id DESC + LIMIT 1 + """ + ), + {"group_key": versionGroupKey}, + ) + ).scalar_one_or_none() + if nextLatest is not None: + await Session.execute( + text( + """ + UPDATE leaudit_documents + SET is_latest_version = true, updated_at = NOW() + WHERE id = :id + """ + ), + {"id": int(nextLatest)}, + ) + + await Session.commit() + + async def AppendAttachments( + self, + CurrentUserId: int, + Id: int, + Files: list[tuple[str, bytes, str | None]], + ) -> DocumentDetailVO: + """为现有文档追加附件,并执行数据隔离校验。""" + if not Files: + raise LeauditException(StatusCodeEnum.HTTP_400_BAD_REQUEST, "至少上传一个附件文件") + + async with GetAsyncSession() as Session: + await self._ensureDocumentGroupColumn(Session) + currentUser = await self._getCurrentUserContext(CurrentUserId) + documentColumns = await self._loadDocumentColumns(Session) + detail = await self._getDocumentDetail(Session, Id, CurrentUserId, currentUser, documentColumns) + if not detail: + raise LeauditException(StatusCodeEnum.HTTP_404_NOT_FOUND, "文档不存在或无权访问") + + documentMeta = ( + await Session.execute( + text( + """ + SELECT + d.id AS document_id, + d.type_id, + d.version_no, + d.region, + dt.code AS type_code + FROM leaudit_documents d + LEFT JOIN leaudit_document_types dt + ON dt.id = d.type_id + WHERE d.id = :document_id + AND d.deleted_at IS NULL + LIMIT 1 + """ + ), + {"document_id": Id}, + ) + ).mappings().first() + if not documentMeta: + raise LeauditException(StatusCodeEnum.HTTP_404_NOT_FOUND, "文档不存在或无权访问") + + resolvedTypeCode = str(documentMeta["type_code"] or "").strip() + if not resolvedTypeCode: + raise LeauditException(StatusCodeEnum.HTTP_400_BAD_REQUEST, "当前文档缺少文档类型编码,无法追加附件") + + normalizedRegion = str(documentMeta["region"] or "default").strip() or "default" + await self._appendAttachmentFiles( + Session=Session, + DocumentId=int(documentMeta["document_id"]), + TypeCode=resolvedTypeCode, + Region=normalizedRegion, + VersionNo=int(documentMeta["version_no"] or 1), + Files=Files, + CreatedBy=CurrentUserId, + ) + + refreshed = await self._getDocumentDetail(Session, Id, CurrentUserId, currentUser, documentColumns) + if not refreshed: + raise LeauditException(StatusCodeEnum.HTTP_404_NOT_FOUND, "文档不存在或无权访问") + return refreshed + + + async def ListDocumentTypes(self, Ids: list[int] | None = None, EntryModuleId: int | None = None) -> list[DocumentTypeItemVO]: """获取文档类型列表。""" async with GetAsyncSession() as Session: - rows, bindingsMap = await self._queryDocumentTypes(Session, Ids) + rows, bindingsMap = await self._queryDocumentTypes(Session, Ids, EntryModuleId) return [ DocumentTypeItemVO( id=int(r["id"]), name=str(r["name"] or ""), code=str(r["code"] or ""), @@ -518,6 +1122,7 @@ class DocumentServiceImpl(IDocumentService): ) ).scalar_one() await self._syncRuleBindings(Session, int(row), Body.ruleSetIds, "default") + await sync_group_bindings_from_doc_type(Session, int(row), Body.ruleSetIds) await Session.commit() return await self.GetDocumentType(int(row)) @@ -534,29 +1139,33 @@ class DocumentServiceImpl(IDocumentService): if not current: raise LeauditException(StatusCodeEnum.HTTP_404_NOT_FOUND, "文档类型不存在") + providedFields = set(getattr(Body, "model_fields_set", set())) sets: list[str] = [] params: dict[str, object] = {"id": Id} - if Body.name is not None: + if "name" in providedFields and Body.name is not None: sets.append("name = :name") params["name"] = Body.name.strip() - if Body.description is not None: + if "description" in providedFields: sets.append("description = :description") - params["description"] = Body.description.strip() or None - if Body.entryModuleId is not None: + params["description"] = Body.description.strip() if Body.description is not None else None + if params["description"] == "": + params["description"] = None + if "entryModuleId" in providedFields: sets.append("entry_module_id = :entry_module_id") params["entry_module_id"] = Body.entryModuleId - if Body.isEnabled is not None: + if "isEnabled" in providedFields and Body.isEnabled is not None: sets.append("is_enabled = :is_enabled") params["is_enabled"] = Body.isEnabled - if Body.sortOrder is not None: + if "sortOrder" in providedFields and Body.sortOrder is not None: sets.append("sort_order = :sort_order") params["sort_order"] = Body.sortOrder if sets: sets.append("updated_at = NOW()") await Session.execute(text(f"UPDATE leaudit_document_types SET {', '.join(sets)} WHERE id = :id"), params) - if Body.ruleSetIds is not None: + if "ruleSetIds" in providedFields and Body.ruleSetIds is not None: await self._syncRuleBindings(Session, Id, Body.ruleSetIds, "default") + await sync_group_bindings_from_doc_type(Session, Id, Body.ruleSetIds) await Session.commit() return await self.GetDocumentType(Id) @@ -576,7 +1185,7 @@ class DocumentServiceImpl(IDocumentService): await Session.execute(text("UPDATE leaudit_rule_type_bindings SET deleted_at = NOW() WHERE doc_type_id = :id AND deleted_at IS NULL"), {"id": Id}) await Session.commit() - async def _queryDocumentTypes(self, Session, Ids: list[int] | None = None): + async def _queryDocumentTypes(self, Session, Ids: list[int] | None = None, EntryModuleId: int | None = None): """查询文档类型及其规则绑定。""" if Ids: rows = ( @@ -592,6 +1201,21 @@ class DocumentServiceImpl(IDocumentService): {"ids": Ids}, ) ).mappings().all() + elif EntryModuleId is not None: + rows = ( + await Session.execute( + text( + """ + SELECT id, code, name, description, entry_module_id, is_enabled, sort_order + FROM leaudit_document_types + WHERE deleted_at IS NULL + AND entry_module_id = :entry_module_id + ORDER BY sort_order ASC, id ASC + """ + ), + {"entry_module_id": EntryModuleId}, + ) + ).mappings().all() else: rows = ( await Session.execute( @@ -626,6 +1250,894 @@ class DocumentServiceImpl(IDocumentService): bindingsMap.setdefault(int(b[0]), []).append(int(b[1])) return rows, bindingsMap + async def _loadDocumentColumns(self, Session) -> set[str]: + """读取 leaudit_documents 当前真实列,兼容不同环境的渐进式字段上线。""" + rows = ( + await Session.execute( + text( + """ + SELECT column_name + FROM information_schema.columns + WHERE table_schema = 'public' + AND table_name = 'leaudit_documents' + """ + ) + ) + ).fetchall() + return {str(row[0]) for row in rows} + + async def _ensureReviewPointAuditTable(self, Session) -> None: + """补齐详情页人工审核表。""" + await Session.execute( + text( + """ + CREATE TABLE IF NOT EXISTS leaudit_review_point_audits ( + id BIGSERIAL PRIMARY KEY, + rule_result_id BIGINT NOT NULL UNIQUE, + document_id BIGINT NOT NULL, + edit_audit_status INTEGER NOT NULL DEFAULT 0, + override_result BOOLEAN NULL, + message TEXT NOT NULL DEFAULT '', + reviewed_by BIGINT NULL, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW() + ) + """ + ) + ) + await Session.execute( + text( + "CREATE INDEX IF NOT EXISTS idx_leaudit_review_point_audits_document_id ON leaudit_review_point_audits(document_id)" + ) + ) + + async def _tableExists(self, Session, TableName: str) -> bool: + """检查表是否存在。""" + row = ( + await Session.execute( + text( + """ + SELECT EXISTS ( + SELECT 1 + FROM information_schema.tables + WHERE table_schema = 'public' + AND table_name = :table_name + ) AS exists_flag + """ + ), + {"table_name": TableName}, + ) + ).mappings().first() + return bool(row["exists_flag"]) if row else False + + async def _loadTableColumns(self, Session, TableName: str) -> set[str]: + """读取任意 public 表的真实列。""" + rows = ( + await Session.execute( + text( + """ + SELECT column_name + FROM information_schema.columns + WHERE table_schema = 'public' + AND table_name = :table_name + """ + ), + {"table_name": TableName}, + ) + ).fetchall() + return {str(row[0]) for row in rows} + + async def _ensureDocumentGroupColumn(self, Session) -> None: + """渐进式补齐文档二级分组字段,避免旧环境缺列。""" + await Session.execute( + text( + """ + ALTER TABLE leaudit_documents + ADD COLUMN IF NOT EXISTS group_id BIGINT NULL + REFERENCES leaudit_evaluation_point_groups(id) + """ + ) + ) + await Session.execute( + text("CREATE INDEX IF NOT EXISTS idx_leaudit_documents_group_id ON leaudit_documents(group_id)") + ) + + async def _resolveDocumentGroupId(self, Session, TypeId: int, GroupId: int | None) -> int | None: + """校验上传时选择的二级分组是否属于当前文档类型。""" + if GroupId is None: + return None + row = ( + await Session.execute( + text( + """ + SELECT id + FROM leaudit_evaluation_point_groups + WHERE id = :group_id + AND document_type_id = :doc_type_id + AND deleted_at IS NULL + AND COALESCE(pid, 0) <> 0 + LIMIT 1 + """ + ), + {"group_id": GroupId, "doc_type_id": TypeId}, + ) + ).mappings().first() + if not row: + raise LeauditException(StatusCodeEnum.HTTP_400_BAD_REQUEST, "当前子类型不属于所选文档类型,无法上传") + return int(row["id"]) + + async def _getDocumentDetail( + self, + Session, + DocumentId: int, + CurrentUserId: int, + CurrentUser: dict[str, Any], + DocumentColumns: set[str], + ) -> DocumentDetailVO | None: + """查询单文档详情,并附带历史版本。""" + params: dict[str, object] = {"id": DocumentId} + filters = ["d.id = :id", "d.deleted_at IS NULL", "f.is_active = true", "f.file_role = 'primary'"] + filters.extend( + self._buildDocumentScopeFilters( + CurrentUserId=CurrentUserId, + CurrentUser=CurrentUser, + Params=params, + DocumentAlias="d", + FileAlias="f", + ) + ) + whereClause = " AND ".join(filters) + + groupIdSelectExpr = "d.group_id" if "group_id" in DocumentColumns else "NULL::bigint" + resolvedGroupIdExpr = f"COALESCE({groupIdSelectExpr}, dg.inferred_group_id)" + resolvedGroupNameExpr = ( + f"CASE WHEN {groupIdSelectExpr} IS NOT NULL THEN eg.name ELSE dg.inferred_group_name END" + ) + + optionalSelects = [ + f"{resolvedGroupIdExpr} AS group_id", + "d.document_number AS document_number" if "document_number" in DocumentColumns else "NULL::text AS document_number", + "d.remark AS remark" if "remark" in DocumentColumns else "NULL::text AS remark", + "d.is_test_document AS is_test_document" if "is_test_document" in DocumentColumns else "FALSE AS is_test_document", + "d.audit_status AS audit_status" if "audit_status" in DocumentColumns else "NULL::integer AS audit_status", + ] + + detailRow = ( + await Session.execute( + text( + f""" + SELECT + d.id AS document_id, + d.biz_document_id AS internal_document_no, + d.version_group_key, + d.version_no, + d.root_version_id, + d.previous_version_id, + d.type_id, + dt.code AS type_code, + dt.name AS type_name, + {resolvedGroupNameExpr} AS group_name, + d.region, + d.normalized_name, + d.processing_status, + d.current_run_id, + d.updated_at, + f.id AS file_id, + f.file_name, + f.file_ext, + f.mime_type, + f.file_size, + f.oss_url, + ar.status AS run_status, + ar.result_status, + ar.total_score, + ar.passed_count, + ar.failed_count, + ar.skipped_count, + vc.total_versions, + COALESCE(vc.total_versions, 1) > 1 AS has_history, + {', '.join(optionalSelects)} + FROM leaudit_documents d + JOIN leaudit_document_files f + ON f.document_id = d.id + LEFT JOIN leaudit_document_types dt + ON dt.id = d.type_id + LEFT JOIN leaudit_evaluation_point_groups eg + ON eg.id = d.group_id + LEFT JOIN ( + SELECT + document_type_id, + CASE WHEN COUNT(*) = 1 THEN MIN(id) END AS inferred_group_id, + CASE WHEN COUNT(*) = 1 THEN MIN(name) END AS inferred_group_name + FROM leaudit_evaluation_point_groups + WHERE pid <> 0 + AND deleted_at IS NULL + AND is_enabled = true + AND document_type_id IS NOT NULL + GROUP BY document_type_id + ) dg + ON dg.document_type_id = d.type_id + LEFT JOIN leaudit_audit_runs ar + ON ar.id = d.current_run_id + LEFT JOIN ( + SELECT version_group_key, COUNT(*) AS total_versions + FROM leaudit_documents + WHERE deleted_at IS NULL + GROUP BY version_group_key + ) vc + ON vc.version_group_key = d.version_group_key + WHERE {whereClause} + LIMIT 1 + """ + ), + params, + ) + ).mappings().first() + if not detailRow: + return None + + groupKey = str(detailRow["version_group_key"] or "") + historyVersions: list[DocumentHistoryVersionVO] = [] + if groupKey: + historyRows = ( + await Session.execute( + text( + """ + SELECT + d.id AS document_id, + d.version_no, + d.processing_status, + d.updated_at, + f.id AS file_id, + f.file_name, + f.file_ext, + ar.status AS run_status, + ar.result_status + FROM leaudit_documents d + JOIN leaudit_document_files f + ON f.document_id = d.id + AND f.is_active = true + AND f.file_role = 'primary' + LEFT JOIN leaudit_audit_runs ar + ON ar.id = d.current_run_id + WHERE d.version_group_key = :group_key + AND d.id <> :document_id + AND d.deleted_at IS NULL + ORDER BY d.version_no DESC, d.id DESC + """ + ), + {"group_key": groupKey, "document_id": int(detailRow["document_id"])}, + ) + ).mappings().all() + historyVersions = [ + DocumentHistoryVersionVO( + documentId=int(row["document_id"]), + fileId=int(row["file_id"]) if row["file_id"] is not None else None, + versionNo=int(row["version_no"]), + fileName=row["file_name"], + fileExt=row["file_ext"], + processingStatus=row["processing_status"], + runStatus=row["run_status"], + resultStatus=row["result_status"], + updatedAt=row["updated_at"].isoformat() if row["updated_at"] else None, + ) + for row in historyRows + ] + + attachmentRows = ( + await Session.execute( + text( + """ + SELECT + id, + file_name, + file_ext, + mime_type, + file_size, + file_role, + oss_url, + created_by, + created_at + FROM leaudit_document_files + WHERE document_id = :document_id + AND deleted_at IS NULL + AND is_active = true + AND file_role = 'attachment' + ORDER BY rr.id ASC + """ + ), + {"document_id": int(detailRow["document_id"])}, + ) + ).mappings().all() + attachments = [ + DocumentAttachmentVO( + fileId=int(row["id"]), + fileName=str(row["file_name"] or ""), + fileExt=row["file_ext"], + mimeType=row["mime_type"], + fileSize=int(row["file_size"]) if row["file_size"] is not None else None, + fileRole=str(row["file_role"] or "attachment"), + ossUrl=row["oss_url"], + createdBy=int(row["created_by"]) if row["created_by"] is not None else None, + createdAt=row["created_at"].isoformat() if row["created_at"] else None, + ) + for row in attachmentRows + ] + + return DocumentDetailVO( + documentId=int(detailRow["document_id"]), + internalDocumentNo=int(detailRow["internal_document_no"]), + versionGroupKey=groupKey, + versionNo=int(detailRow["version_no"] or 1), + rootVersionId=int(detailRow["root_version_id"] or detailRow["document_id"]), + previousVersionId=int(detailRow["previous_version_id"]) if detailRow["previous_version_id"] is not None else None, + typeId=int(detailRow["type_id"]) if detailRow["type_id"] is not None else None, + typeCode=detailRow["type_code"], + typeName=detailRow["type_name"], + groupId=int(detailRow["group_id"]) if detailRow["group_id"] is not None else None, + groupName=detailRow["group_name"], + region=str(detailRow["region"] or ""), + normalizedName=detailRow["normalized_name"], + fileId=int(detailRow["file_id"]) if detailRow["file_id"] is not None else None, + fileName=detailRow["file_name"], + fileExt=detailRow["file_ext"], + mimeType=detailRow["mime_type"], + fileSize=int(detailRow["file_size"]) if detailRow["file_size"] is not None else None, + ossUrl=detailRow["oss_url"], + processingStatus=detailRow["processing_status"], + currentRunId=int(detailRow["current_run_id"]) if detailRow["current_run_id"] is not None else None, + runStatus=detailRow["run_status"], + resultStatus=detailRow["result_status"], + totalScore=float(detailRow["total_score"]) if detailRow["total_score"] is not None else None, + passedCount=int(detailRow["passed_count"]) if detailRow["passed_count"] is not None else None, + failedCount=int(detailRow["failed_count"]) if detailRow["failed_count"] is not None else None, + skippedCount=int(detailRow["skipped_count"]) if detailRow["skipped_count"] is not None else None, + updatedAt=detailRow["updated_at"].isoformat() if detailRow["updated_at"] else None, + hasHistory=bool(detailRow["has_history"]), + totalVersions=int(detailRow["total_versions"] or 1), + historyVersions=historyVersions, + documentNumber=detailRow["document_number"], + remark=detailRow["remark"], + isTestDocument=bool(detailRow["is_test_document"]), + auditStatus=int(detailRow["audit_status"]) if detailRow["audit_status"] is not None else None, + pageCount=None, + attachments=attachments, + ) + + def _buildDocumentScopeFilters( + self, + CurrentUserId: int, + CurrentUser: dict[str, Any], + Params: dict[str, object], + DocumentAlias: str, + FileAlias: str, + RequestedRegion: str | None = None, + RequestedUserId: int | None = None, + ) -> list[str]: + """根据当前用户角色与地区构建文档数据范围过滤。""" + filters: list[str] = [] + requestedRegion = (RequestedRegion or "").strip() + area = str(CurrentUser["area"] or "").strip() + + if CurrentUser["is_global"]: + if requestedRegion: + filters.append(f"{DocumentAlias}.region = :requested_region") + Params["requested_region"] = requestedRegion + if RequestedUserId is not None: + filters.append(f"{FileAlias}.created_by = :requested_user_id") + Params["requested_user_id"] = RequestedUserId + return filters + + if CurrentUser["can_manage"]: + if not area: + filters.append("1 = 0") + return filters + if requestedRegion and requestedRegion != area: + filters.append("1 = 0") + return filters + filters.append(f"{DocumentAlias}.region = :scope_region") + Params["scope_region"] = area + if RequestedUserId is not None: + filters.append(f"{FileAlias}.created_by = :requested_user_id") + Params["requested_user_id"] = RequestedUserId + return filters + + filters.append(f"{FileAlias}.created_by = :scope_user_id") + Params["scope_user_id"] = CurrentUserId + if requestedRegion: + filters.append(f"{DocumentAlias}.region = :requested_region") + Params["requested_region"] = requestedRegion + if RequestedUserId is not None and RequestedUserId != CurrentUserId: + filters.append("1 = 0") + return filters + + async def _getCurrentUserContext(self, CurrentUserId: int) -> dict[str, Any]: + """加载当前用户上下文,用于文档数据隔离。""" + async with GetAsyncSession() as Session: + row = ( + await Session.execute( + text( + """ + SELECT + u.id, + COALESCE(u.area, '') AS area, + COALESCE(bool_or(r.role_key IN ('super_admin', 'provincial_admin')), FALSE) AS is_global, + COALESCE(bool_or(r.role_key IN ('super_admin', 'provincial_admin', 'admin')), FALSE) AS can_manage, + COALESCE(bool_or(r.role_key = 'super_admin'), FALSE) AS is_super_admin + FROM sso_users u + LEFT JOIN user_role ur ON ur.user_id = u.id + LEFT JOIN roles r ON r.id = ur.role_id + WHERE u.id = :user_id + GROUP BY u.id, u.area + """ + ), + {"user_id": CurrentUserId}, + ) + ).mappings().first() + if not row: + raise LeauditException(StatusCodeEnum.HTTP_404_NOT_FOUND, "当前用户不存在") + return { + "area": str(row["area"] or ""), + "is_global": bool(row["is_global"]), + "can_manage": bool(row["can_manage"]), + "is_super_admin": bool(row["is_super_admin"]), + } + + async def _loadReviewRunRow(self, Session, Detail: DocumentDetailVO) -> dict[str, Any] | None: + """定位当前文档可展示的最新评查运行。""" + params: dict[str, object] = {"document_id": Detail.documentId} + if Detail.currentRunId is not None: + params["run_id"] = Detail.currentRunId + row = ( + await Session.execute( + text( + """ + SELECT + id, + status, + result_status, + total_score, + passed_count, + failed_count, + skipped_count, + llm_model, + finished_at, + updated_at + FROM leaudit_audit_runs + WHERE id = :run_id + AND document_id = :document_id + LIMIT 1 + """ + ), + params, + ) + ).mappings().first() + if row: + return dict(row) + + row = ( + await Session.execute( + text( + """ + SELECT + id, + status, + result_status, + total_score, + passed_count, + failed_count, + skipped_count, + llm_model, + finished_at, + updated_at + FROM leaudit_audit_runs + WHERE document_id = :document_id + ORDER BY id DESC + LIMIT 1 + """ + ), + params, + ) + ).mappings().first() + return dict(row) if row else None + + async def _buildReviewDocumentPayload( + self, + Session, + Detail: DocumentDetailVO, + RunRow: dict[str, Any] | None, + ) -> dict[str, Any]: + """把新文档详情转换成旧详情页可直接消费的文档对象。""" + metaRow = ( + await Session.execute( + text( + """ + SELECT + d.created_at, + f.created_by, + COALESCE(u.nick_name, u.username, '') AS upload_user + FROM leaudit_documents d + JOIN leaudit_document_files f + ON f.document_id = d.id + AND f.is_active = true + AND f.file_role = 'primary' + LEFT JOIN sso_users u + ON u.id = f.created_by + WHERE d.id = :document_id + LIMIT 1 + """ + ), + {"document_id": Detail.documentId}, + ) + ).mappings().first() + + metricRow = None + if RunRow: + metricRow = ( + await Session.execute( + text( + """ + SELECT page_count + FROM leaudit_run_metrics + WHERE run_id = :run_id + ORDER BY id DESC + LIMIT 1 + """ + ), + {"run_id": int(RunRow["id"])}, + ) + ).mappings().first() + + createdAt = metaRow["created_at"] if metaRow else None + uploadUser = str(metaRow["upload_user"] or "") if metaRow else "" + pageCount = int(metricRow["page_count"]) if metricRow and metricRow["page_count"] is not None else 0 + + ocrResultPayload = await self._buildReviewOcrPayload(Session, Detail.documentId, RunRow, pageCount) + + return { + "id": Detail.documentId, + "documentId": Detail.documentId, + "name": Detail.fileName or Detail.normalizedName or f"文档{Detail.documentId}", + "path": Detail.ossUrl or "", + "fileName": Detail.fileName or "", + "fileType": (Detail.fileExt or "").lower(), + "size": Detail.fileSize, + "file_size": Detail.fileSize, + "documentNumber": Detail.documentNumber, + "document_number": Detail.documentNumber, + "type": str(Detail.typeId or ""), + "type_id": Detail.typeId, + "typeName": Detail.typeName, + "type_name": Detail.typeName, + "groupId": Detail.groupId, + "groupName": Detail.groupName, + "region": Detail.region, + "auditStatus": Detail.auditStatus or 0, + "audit_status": Detail.auditStatus or 0, + "uploadTime": _format_iso_datetime(createdAt) or Detail.updatedAt or "", + "created_at": _format_iso_datetime(createdAt) or Detail.updatedAt or "", + "updated_at": Detail.updatedAt or "", + "uploadUser": uploadUser, + "pageCount": pageCount, + "page_count": pageCount, + "ocrResult": ocrResultPayload, + "attachments": [item.model_dump() for item in Detail.attachments], + } + + async def _buildReviewOcrPayload( + self, + Session, + DocumentId: int, + RunRow: dict[str, Any] | None, + PageCount: int, + ) -> dict[str, Any]: + """构建旧详情页仍在读取的最小 ocrResult 结构。""" + pageMap: dict[str, set[int]] = {} + + def add_page(field_name: str | None, page_value: Any) -> None: + if not field_name: + return + page = _coerce_positive_int(page_value) + if page is None: + return + section = str(field_name).split("-", 1)[0].strip() or "未分组" + pageMap.setdefault(section, set()).add(page) + + if RunRow: + fieldRows = ( + await Session.execute( + text( + """ + SELECT field_name, meta_json, raw_value_json + FROM leaudit_field_results + WHERE run_id = :run_id + AND document_id = :document_id + ORDER BY id ASC + """ + ), + {"run_id": int(RunRow["id"]), "document_id": DocumentId}, + ) + ).mappings().all() + for row in fieldRows: + metaJson = row["meta_json"] if isinstance(row["meta_json"], dict) else {} + rawValueJson = row["raw_value_json"] if isinstance(row["raw_value_json"], dict) else {} + position = metaJson.get("position") if isinstance(metaJson.get("position"), dict) else {} + add_page(row["field_name"], position.get("pageNum") or position.get("page")) + rawPosition = rawValueJson.get("position") if isinstance(rawValueJson.get("position"), dict) else {} + add_page(row["field_name"], rawPosition.get("pageNum") or rawPosition.get("page")) + + ruleRows = ( + await Session.execute( + text( + """ + SELECT extracted_fields, field_positions + FROM leaudit_rule_results + WHERE run_id = :run_id + AND document_id = :document_id + ORDER BY id ASC + """ + ), + {"run_id": int(RunRow["id"]), "document_id": DocumentId}, + ) + ).mappings().all() + for row in ruleRows: + extractedFields = row["extracted_fields"] if isinstance(row["extracted_fields"], dict) else {} + fieldPositions = row["field_positions"] if isinstance(row["field_positions"], dict) else {} + for field_name, field_payload in extractedFields.items(): + if isinstance(field_payload, dict): + add_page(field_name, field_payload.get("page")) + add_page(field_name, field_payload.get("pageNum")) + position = fieldPositions.get(field_name) if isinstance(fieldPositions, dict) else None + add_page(field_name, _extract_review_page(position)) + + normalizedMap = { + key: {"pages": sorted(value)} + for key, value in pageMap.items() + if value + } + return { + "__meta": {"page_offset": 0, "page_count": PageCount}, + "ocr_result": normalizedMap, + } + + async def _loadComparisonDocument(self, Session, DocumentId: int) -> dict[str, Any]: + """读取合同结构比对结果;缺表或空数据时降级。""" + if not await self._tableExists(Session, "contract_structure_comparison"): + return {"template_contract_path": ""} + + columns = await self._loadTableColumns(Session, "contract_structure_comparison") + selectColumns: list[str] = ["id", "document_id"] + optionalColumns = [ + "template_contract_path", + "comparison_results", + "created_at", + "updated_at", + "comparison_id", + ] + for column in optionalColumns: + if column in columns: + selectColumns.append(column) + + row = ( + await Session.execute( + text( + f""" + SELECT {', '.join(selectColumns)} + FROM contract_structure_comparison + WHERE document_id = :document_id + ORDER BY id DESC + LIMIT 1 + """ + ), + {"document_id": DocumentId}, + ) + ).mappings().first() + + if not row: + return {"template_contract_path": ""} + + payload = dict(row) + rawResults = payload.get("comparison_results") + if isinstance(rawResults, str): + import json + + try: + payload["comparison_results"] = json.loads(rawResults) + except Exception: + payload["comparison_results"] = None + + if "comparisonId" not in payload: + payload["comparisonId"] = payload.get("comparison_id") or payload.get("id") + if "template_contract_path" not in payload: + payload["template_contract_path"] = "" + return payload + + async def _loadScoringProposals(self, Session, DocumentId: int) -> list[dict[str, Any]]: + """读取交叉评分提案;缺表时降级为空。""" + if not await self._tableExists(Session, "cross_scoring_proposals"): + return [] + + columns = await self._loadTableColumns(Session, "cross_scoring_proposals") + selectColumns = [ + column + for column in ( + "id", + "evaluation_result_id", + "proposer_id", + "proposed_score", + "reason", + "status", + "created_at", + "updated_at", + "document_id", + "deleted_at", + ) + if column in columns + ] + if not selectColumns: + return [] + + filterClauses = ["document_id = :document_id"] + if "deleted_at" in columns: + filterClauses.append("deleted_at IS NULL") + + rows = ( + await Session.execute( + text( + f""" + SELECT {', '.join(selectColumns)} + FROM cross_scoring_proposals + WHERE {' AND '.join(filterClauses)} + ORDER BY id DESC + """ + ), + {"document_id": DocumentId}, + ) + ).mappings().all() + return [dict(row) for row in rows] + + async def _loadReviewPointResults( + self, + Session, + Detail: DocumentDetailVO, + RunId: int, + ) -> list[ReviewPointResultVO]: + """读取评查运行下的规则结果并转换成前端详情页结构。""" + rows = ( + await Session.execute( + text( + """ + SELECT + rr.id, + rr.rule_id, + rr.rule_name, + rr.risk, + rr.score, + rr.passed, + rr.status, + rr.skip_reason, + rr.pass_message, + rr.fail_message, + rr.stages, + rr.extracted_fields, + rr.field_positions, + rr.remediation, + rr.rule_meta, + rr.updated_at, + a.id AS audit_id, + a.edit_audit_status, + a.override_result, + a.message AS audit_message + FROM leaudit_rule_results rr + LEFT JOIN leaudit_review_point_audits a + ON a.rule_result_id = rr.id + WHERE rr.run_id = :run_id + AND rr.document_id = :document_id + ORDER BY id ASC + """ + ), + {"run_id": RunId, "document_id": Detail.documentId}, + ) + ).mappings().all() + + result: list[ReviewPointResultVO] = [] + for row in rows: + extractedFields = row["extracted_fields"] if isinstance(row["extracted_fields"], dict) else {} + fieldPositions = row["field_positions"] if isinstance(row["field_positions"], dict) else {} + ruleMeta = row["rule_meta"] if isinstance(row["rule_meta"], dict) else {} + remediation = row["remediation"] if isinstance(row["remediation"], dict) else {} + rawStages = row["stages"] if isinstance(row["stages"], list) else [] + stages = _enrich_stage_payload_with_positions(rawStages, fieldPositions) + score = float(row["score"]) if row["score"] is not None else 0.0 + auditedResult = row["override_result"] if row["edit_audit_status"] == 1 else None + effectivePassed = auditedResult if auditedResult is not None else row["passed"] + effectiveStatus = row["status"] + if auditedResult is not None: + effectiveStatus = "passed" if auditedResult else ("failed" if row["status"] != "error" else "error") + resultFlag = None + if effectiveStatus not in {"skipped", "not_applicable"}: + resultFlag = bool(effectivePassed) if effectivePassed is not None else None + title = self._buildReviewPointTitle(row) + if row["edit_audit_status"] == 1 and row["audit_message"]: + title = str(row["audit_message"]) + + result.append( + ReviewPointResultVO( + id=int(row["id"]), + documentId=str(Detail.documentId), + pointId=int(row["id"]), + editAuditStatusId=int(row["audit_id"]) if row["audit_id"] is not None else "", + editAuditStatus=int(row["edit_audit_status"] or 0), + editAuditStatusMessage=str(row["audit_message"] or ""), + title=title, + pointName=str(row["rule_name"] or row["rule_id"] or ""), + pointCode=str(row["rule_id"] or ""), + groupName=str(ruleMeta.get("group") or Detail.groupName or Detail.typeName or "未分组"), + status=_map_rule_status(effectiveStatus, effectivePassed, row["risk"]), + content=_normalize_review_content(extractedFields, fieldPositions), + contentPage=_normalize_review_content_pages(extractedFields, fieldPositions), + suggestion=_extract_review_suggestion(remediation, row["fail_message"], row["pass_message"]), + postAction="manual" if effectivePassed is not True else "", + actionContent=remediation or {}, + legalBasis=ruleMeta.get("references_laws") or {}, + evaluationConfig={"risk": row["risk"], "status": effectiveStatus, "ruleMeta": ruleMeta}, + score=score, + finalScore=score if resultFlag is True else (0.0 if resultFlag is False else None), + machineScore=score, + result=resultFlag, + failMessage=str(row["fail_message"] or ""), + passMessage=str(row["pass_message"] or ""), + evaluatedPointResultsLog={"rules": _normalize_review_stage_logs(stages)}, + ) + ) + return result + + def _buildReviewPointStats(self, ReviewPoints: list[ReviewPointResultVO]) -> ReviewPointStatsVO: + """按旧详情页口径汇总统计。""" + stats = ReviewPointStatsVO(total=len(ReviewPoints), success=0, warning=0, error=0, score=0) + for item in ReviewPoints: + if item.status == "success": + stats.success += 1 + elif item.status == "warning": + stats.warning += 1 + elif item.status == "error": + stats.error += 1 + stats.score += float(item.score or 0) + return stats + + def _buildReviewInfo( + self, + RunRow: dict[str, Any] | None, + ReviewPoints: list[ReviewPointResultVO], + Stats: ReviewPointStatsVO, + ) -> ReviewPointInfoVO: + """构建详情页右侧的评查摘要。""" + groupNames = [item.groupName for item in ReviewPoints if item.groupName] + uniqueGroups = list(dict.fromkeys(groupNames)) + reviewTime = "" + reviewModel = "DeepSeek" + if RunRow: + reviewTime = _format_display_datetime(RunRow.get("finished_at") or RunRow.get("updated_at")) + reviewModel = str(RunRow.get("llm_model") or "DeepSeek") + issueCount = Stats.warning + Stats.error + return ReviewPointInfoVO( + reviewTime=reviewTime, + reviewModel=reviewModel, + ruleGroup="、".join(uniqueGroups), + result="warning" if issueCount > 0 else "success", + issueCount=issueCount, + ) + + def _buildReviewPointTitle(self, Row: dict[str, Any]) -> str: + """生成详情页展示标题。""" + if Row.get("passed") is True: + return str(Row.get("pass_message") or Row.get("rule_name") or Row.get("rule_id") or "") + if Row.get("fail_message"): + return str(Row["fail_message"]) + if Row.get("skip_reason"): + return str(Row["skip_reason"]) + return str(Row.get("rule_name") or Row.get("rule_id") or "") + async def _syncRuleBindings(self, Session, DocTypeId: int, RuleSetIds: list[int], Region: str = "default") -> None: """全量替换规则绑定。""" await Session.execute( @@ -686,6 +2198,251 @@ async def _find_latest_version_candidate( return dict(row) if row else None +def _map_rule_status(status: str | None, passed: bool | None, risk: str | None) -> str: + """把新规则结果状态转换成旧详情页口径。""" + normalized = (status or "").strip().lower() + if normalized in {"skipped", "not_applicable"}: + return "notApplicable" + if passed is True or normalized in {"passed", "success"}: + return "success" + if normalized in {"error", "failed", "fail"}: + return "error" if (risk or "").strip().lower() == "high" else "warning" + return "warning" if passed is False else "success" + + +def _normalize_review_content( + extracted_fields: dict[str, Any], + field_positions: dict[str, Any], +) -> dict[str, dict[str, Any]]: + """归一化字段内容,兼容旧详情页结构。""" + content: dict[str, dict[str, Any]] = {} + for field_name, raw_value in extracted_fields.items(): + position = field_positions.get(field_name) if isinstance(field_positions, dict) else None + page = _extract_review_page(position) + char_positions = _build_char_positions(position, raw_value) + content[str(field_name)] = { + "page": page if page is not None else "", + "value": _normalize_review_value(raw_value), + "char_positions": char_positions, + } + return content + + +def _normalize_review_content_pages( + extracted_fields: dict[str, Any], + field_positions: dict[str, Any], +) -> dict[str, str]: + """提取字段页码映射。""" + pages: dict[str, str] = {} + for field_name in extracted_fields.keys(): + position = field_positions.get(field_name) if isinstance(field_positions, dict) else None + page = _extract_review_page(position) + pages[str(field_name)] = str(page) if page is not None else "" + return pages + + +def _normalize_review_value(raw_value: Any) -> Any: + """尽量保留原值,同时把复杂对象压到旧前端能识别的结构。""" + if isinstance(raw_value, dict): + if "value" in raw_value: + return raw_value["value"] + if "text" in raw_value: + return raw_value["text"] + if "value_text" in raw_value: + return raw_value["value_text"] + return raw_value + return raw_value + + +def _extract_review_page(position: Any) -> int | None: + """从字段位置信息中提取页码。""" + if not isinstance(position, dict): + return None + page_num = position.get("pageNum") + if isinstance(page_num, int): + return page_num + if isinstance(page_num, str) and page_num.isdigit(): + return int(page_num) + page_nums = position.get("pageNums") + if isinstance(page_nums, list) and page_nums: + first = page_nums[0] + if isinstance(first, int): + return first + if isinstance(first, str) and first.isdigit(): + return int(first) + match_position = position.get("matchPosition") + if isinstance(match_position, dict): + for key in ("page", "pageNum"): + value = match_position.get(key) + if isinstance(value, int): + return value + if isinstance(value, str) and value.isdigit(): + return int(value) + return None + + +def _extract_review_suggestion(remediation: dict[str, Any], fail_message: Any, pass_message: Any) -> str: + """从 remediation 中提取适合旧详情页展示的一句建议。""" + suggestions = remediation.get("suggestions") + if isinstance(suggestions, list) and suggestions: + first = suggestions[0] + if isinstance(first, str): + return first + if isinstance(first, dict): + return str(first.get("text") or first.get("message") or first.get("suggestion") or "") + actions = remediation.get("actions") + if isinstance(actions, list) and actions: + first = actions[0] + if isinstance(first, str): + return first + if isinstance(first, dict): + return str(first.get("text") or first.get("message") or first.get("action") or "") + if fail_message: + return str(fail_message) + if pass_message: + return str(pass_message) + return "" + + +def _normalize_review_stage_logs(stages: list[Any]) -> list[dict[str, Any]]: + """把 stages 压成旧前端的 evaluatedPointResultsLog.rules。""" + result: list[dict[str, Any]] = [] + for index, stage in enumerate(stages): + if not isinstance(stage, dict): + continue + result.append( + { + "id": str(stage.get("id") or index), + "type": str(stage.get("type") or stage.get("stage") or stage.get("name") or "rule"), + "res": stage.get("passed"), + "config": stage, + } + ) + return result + + +def _enrich_stage_payload_with_positions(payload: Any, field_positions: dict[str, Any]) -> Any: + """递归给 stage payload 注入最小可用 char_positions。""" + if isinstance(payload, list): + return [_enrich_stage_payload_with_positions(item, field_positions) for item in payload] + + if not isinstance(payload, dict): + return payload + + enriched: dict[str, Any] = {} + for key, value in payload.items(): + enriched[key] = _enrich_stage_payload_with_positions(value, field_positions) + + if isinstance(value, dict): + position = field_positions.get(key) if isinstance(field_positions, dict) else None + if position: + if ("page" not in enriched[key] or not enriched[key].get("page")): + page = _extract_review_page(position) + if page is not None: + enriched[key]["page"] = page + if "char_positions" not in enriched[key]: + char_positions = _build_char_positions(position, value.get("value")) + if char_positions: + enriched[key]["char_positions"] = char_positions + return enriched + + +def _build_char_positions(position: Any, raw_value: Any) -> list[dict[str, Any]] | None: + """把后端已有 bbox / matchPosition 压成前端高亮所需的 char_positions。""" + if not isinstance(position, dict): + return None + + bbox = position.get("bbox") + if bbox is None and isinstance(position.get("matchPosition"), dict): + bbox = position["matchPosition"].get("bbox") + + normalized_box = _normalize_bbox_to_points(bbox) + if not normalized_box: + return None + + value_text = _stringify_char_position_value(raw_value) + return [ + { + "box": normalized_box, + "char": value_text, + "score": 1, + } + ] + + +def _normalize_bbox_to_points(bbox: Any) -> list[list[float]] | None: + """兼容多种 bbox 形态,统一转成四点坐标。""" + if not bbox: + return None + + if ( + isinstance(bbox, list) + and len(bbox) == 4 + and all(isinstance(item, (list, tuple)) and len(item) >= 2 for item in bbox) + ): + return [[float(point[0]), float(point[1])] for point in bbox] + + if isinstance(bbox, (list, tuple)) and len(bbox) >= 4 and all(isinstance(item, (int, float)) for item in bbox[:4]): + x1, y1, x2, y2 = [float(item) for item in bbox[:4]] + return [[x1, y1], [x2, y1], [x2, y2], [x1, y2]] + + if isinstance(bbox, dict): + if all(key in bbox for key in ("x1", "y1", "x2", "y2")): + x1 = float(bbox["x1"]) + y1 = float(bbox["y1"]) + x2 = float(bbox["x2"]) + y2 = float(bbox["y2"]) + return [[x1, y1], [x2, y1], [x2, y2], [x1, y2]] + if all(key in bbox for key in ("left", "top", "right", "bottom")): + x1 = float(bbox["left"]) + y1 = float(bbox["top"]) + x2 = float(bbox["right"]) + y2 = float(bbox["bottom"]) + return [[x1, y1], [x2, y1], [x2, y2], [x1, y2]] + return None + + +def _stringify_char_position_value(raw_value: Any) -> str: + """将字段值压成前端高亮文本。""" + normalized = _normalize_review_value(raw_value) + if normalized is None: + return "" + if isinstance(normalized, str): + return normalized + if isinstance(normalized, (int, float, bool)): + return str(normalized) + return "" + + +def _format_display_datetime(value: Any) -> str: + """格式化详情页展示时间。""" + if isinstance(value, datetime): + return value.strftime("%Y-%m-%d %H:%M:%S") + return "" + + +def _format_iso_datetime(value: Any) -> str | None: + """格式化 ISO 时间字符串。""" + if isinstance(value, datetime): + return value.isoformat() + return None + + +def _coerce_positive_int(value: Any) -> int | None: + """把各种页码值安全转成正整数。""" + if isinstance(value, int): + return value if value > 0 else None + if isinstance(value, float): + int_value = int(value) + return int_value if int_value > 0 else None + if isinstance(value, str): + match = re.search(r"\d+", value) + if match: + int_value = int(match.group(0)) + return int_value if int_value > 0 else None + return None + + def _normalize_speed(speed: str | None) -> str: """Normalize front-end speed selection to urgent/normal.""" normalized = (speed or "").strip().lower()