"""Govdoc 公文模块服务实现。""" from __future__ import annotations import hashlib import json import mimetypes import time import uuid from dataclasses import dataclass from datetime import date, datetime from pathlib import Path from typing import Any from fastapi import UploadFile from sqlalchemy import bindparam, text from fastapi_common.fastapi_common_logger import logger from fastapi_common.fastapi_common_sqlalchemy.database import GetAsyncSession from fastapi_common.fastapi_common_storage.oss_path_utils import OssPathUtils from fastapi_common.fastapi_common_web.domain.responses import StatusCodeEnum from fastapi_common.fastapi_common_web.exception.LeauditException import LeauditException from fastapi_modules.fastapi_leaudit.govdoc_bridge.storage_adapter import StorageAdapter from fastapi_modules.fastapi_leaudit.govdoc_engine.engine.result import AuditResult, AuditSummary, CheckedRule, OutlineNode, StructureItem from fastapi_modules.fastapi_leaudit.govdoc_bridge.tasks import dispatch_govdoc_task from fastapi_modules.fastapi_leaudit.govdoc_engine.parser.docx_parser import parse_docx from fastapi_modules.fastapi_leaudit.govdoc_engine.models import Finding, Location from fastapi_modules.fastapi_leaudit.govdoc_engine.reporter.html_paragraph import paragraphs_to_html from fastapi_modules.fastapi_leaudit.govdoc_engine.reporter.html_renderer import render_html from fastapi_modules.fastapi_leaudit.models import LeauditDocument, LeauditDocumentFile from fastapi_modules.fastapi_leaudit.services import IGovdocService, IOssService from fastapi_modules.fastapi_leaudit.services.impl.ossServiceImpl import OssServiceImpl @dataclass(frozen=True) class _GovdocDocumentRow: documentId: int region: str processingStatus: str currentRunId: int | None versionGroupKey: str | None versionNo: int totalVersions: int previousVersionId: int | None rootVersionId: int | None isLatestVersion: bool normalizedName: str | None createdAt: Any updatedAt: Any fileId: int fileName: str fileExt: str | None 
mimeType: str | None fileSize: int | None ossUrl: str | None createdBy: int | None resultStatus: str | None totalScore: float | None passedCount: int | None failedCount: int | None skippedCount: int | None findingCount: int errorCount: int warningCount: int infoCount: int rulesPath: str | None hasHtmlReport: bool hasDocxReport: bool class GovdocServiceImpl(IGovdocService): """公文处理与格式审查服务实现。""" def __init__(self, OssService: IOssService | None = None) -> None: self.OssService = OssService or OssServiceImpl() self.Storage = StorageAdapter() def _parse_date_filter(self, value: str | None, field_name: str) -> date | None: if value is None: return None normalized = value.strip() if not normalized: return None try: return date.fromisoformat(normalized) except ValueError as exc: raise LeauditException( StatusCodeEnum.HTTP_400_BAD_REQUEST, f"{field_name} 格式非法,应为 YYYY-MM-DD", ) from exc # ── 文档 ────────────────────────────────────────────── async def UploadDocument( self, file: UploadFile, typeId: int | None = None, region: str = "default", autoRun: bool = True, speed: str = "normal", ruleVersionId: int | None = None, createdBy: int | None = None, ) -> dict[str, Any]: if createdBy is None: raise LeauditException(StatusCodeEnum.HTTP_401_UNAUTHORIZED, "当前用户未登录") if file is None or not file.filename: raise LeauditException(StatusCodeEnum.HTTP_400_BAD_REQUEST, "上传文件不能为空") content = await file.read() if not content: raise LeauditException(StatusCodeEnum.HTTP_400_BAD_REQUEST, "上传文件内容不能为空") normalizedRegion = (region or "default").strip() or "default" fileName = file.filename fileExt = Path(fileName).suffix.lstrip(".").lower() or None if fileExt != "docx": raise LeauditException( StatusCodeEnum.HTTP_400_BAD_REQUEST, "当前内部公文模块仅支持上传 DOCX 文件", ) mimeType = file.content_type or mimetypes.guess_type(fileName)[0] or "application/octet-stream" fileSha256 = hashlib.sha256(content).hexdigest() uploadedAt = datetime.now() normalizedName = self._normalize_document_name(fileName) async with 
GetAsyncSession() as session: await self._ensureGovdocSchema(session) await self._backfill_missing_version_groups(session) currentUser = await self._getCurrentUserContext(createdBy) resolvedRegion = self._resolve_upload_region(currentUser, normalizedRegion) latestCandidate = await self._find_latest_version_candidate( session, region=resolvedRegion, normalizedName=normalizedName, fileExt=fileExt, ) if latestCandidate and not latestCandidate.get("version_group_key"): latestCandidate = await self._backfill_legacy_version_chain( session, region=resolvedRegion, normalizedName=normalizedName, fileExt=fileExt, ) previousVersionId: int | None = None rootVersionId: int | None = None versionGroupKey: str | None = None versionNo = 1 if latestCandidate: previousVersionId = int(latestCandidate["document_id"]) rootVersionId = int(latestCandidate["root_version_id"] or latestCandidate["document_id"]) versionGroupKey = str(latestCandidate["version_group_key"] or "") versionNo = int(latestCandidate["version_no"] or 0) + 1 previousDocument = await session.get(LeauditDocument, previousVersionId) if previousDocument is not None: previousDocument.isLatestVersion = False else: versionGroupKey = uuid.uuid4().hex document = await LeauditDocument.create_new( session, bizDocumentId=time.time_ns(), typeId=typeId, groupId=None, region=resolvedRegion, processingStatus="waiting", currentRunId=None, versionGroupKey=versionGroupKey, versionNo=versionNo, previousVersionId=previousVersionId, rootVersionId=rootVersionId, isLatestVersion=True, normalizedName=normalizedName, reviewScope="govdoc", ) if document.rootVersionId is None: document.rootVersionId = document.Id await session.flush() objectKey = OssPathUtils.BuildBusinessDocKey( Region=resolvedRegion, TypeCode="govdoc", DocumentId=document.Id, Version=f"v{document.versionNo}", FileRole="original", FileName=fileName, Year=uploadedAt.year, Month=uploadedAt.month, ) ossUrl = await self.OssService.UploadBytes( ObjectKey=objectKey, Content=content, 
ContentType=mimeType, ) documentFile = LeauditDocumentFile( documentId=document.Id, fileRole="original", fileName=fileName, fileExt=fileExt, mimeType=mimeType, fileSize=len(content), sha256=fileSha256, localPath=None, ossUrl=ossUrl, storageProvider="minio", isActive=True, createdBy=createdBy, ) session.add(documentFile) await session.flush() await session.execute( text( """ UPDATE leaudit_documents SET engine_type = 'govdoc' WHERE id = :document_id """ ), {"document_id": document.Id}, ) await session.commit() await session.refresh(document) await session.refresh(documentFile) runPayload: dict[str, Any] | None = None shouldAutoRun = bool(autoRun) if shouldAutoRun: runPayload = await self.CreateRun( documentId=document.Id, ruleVersionId=ruleVersionId, speed=speed, force=False, triggerUserId=createdBy, ) return { "documentId": document.Id, "fileId": documentFile.Id, "fileName": documentFile.fileName, "region": resolvedRegion, "engineType": "govdoc", "processingStatus": "processing" if runPayload else (document.processingStatus or "waiting"), "autoRunTriggered": shouldAutoRun, "latestRunId": runPayload["runId"] if runPayload else None, "run": runPayload, } async def ListDocuments( self, page: int = 1, pageSize: int = 20, keyword: str | None = None, fileExt: str | None = None, region: str | None = None, status: str | None = None, resultStatus: str | None = None, createdBy: int | None = None, dateFrom: str | None = None, dateTo: str | None = None, userId: int | None = None, ) -> dict[str, Any]: if userId is None: raise LeauditException(StatusCodeEnum.HTTP_401_UNAUTHORIZED, "当前用户未登录") currentUser = await self._getCurrentUserContext(userId) page = max(1, int(page)) pageSize = max(1, min(int(pageSize), 100)) offset = (page - 1) * pageSize async with GetAsyncSession() as session: await self._ensureGovdocSchema(session) params: dict[str, Any] = { "limit": pageSize, "offset": offset, } filters = [ "d.deleted_at IS NULL", "f.deleted_at IS NULL", "f.is_active = true", 
"f.file_role = 'original'", "COALESCE(d.engine_type, 'leaudit') = 'govdoc'", ] filters.extend( self._buildDocumentScopeFilters( CurrentUserId=userId, CurrentUser=currentUser, Params=params, DocumentAlias="d", FileAlias="f", RequestedRegion=region, RequestedUserId=createdBy, ) ) if keyword: filters.append("(f.file_name ILIKE :keyword OR COALESCE(d.normalized_name, '') ILIKE :keyword)") params["keyword"] = f"%{keyword.strip()}%" if fileExt: normalizedExt = fileExt.strip().lstrip(".").lower() if normalizedExt: filters.append("LOWER(COALESCE(f.file_ext, '')) = :file_ext") params["file_ext"] = normalizedExt if status: filters.append("COALESCE(d.processing_status, '') = :status") params["status"] = status.strip() if resultStatus: filters.append("COALESCE(gr.result_status, '') = :result_status") params["result_status"] = resultStatus.strip() parsedDateFrom = self._parse_date_filter(dateFrom, "dateFrom") parsedDateTo = self._parse_date_filter(dateTo, "dateTo") if parsedDateFrom: filters.append("d.created_at::date >= :date_from") params["date_from"] = parsedDateFrom if parsedDateTo: filters.append("d.created_at::date <= :date_to") params["date_to"] = parsedDateTo whereClause = " AND ".join(filters) async with GetAsyncSession() as session: await self._backfill_missing_version_groups(session) baseSelect = f""" WITH effective_docs AS ( SELECT d.id AS document_id, COALESCE(d.region, 'default') AS region, COALESCE(d.processing_status, 'waiting') AS processing_status, d.current_run_id, COALESCE(NULLIF(d.version_group_key, ''), fallback_vc.derived_version_group_key, '') AS version_group_key, CASE WHEN COALESCE(d.version_group_key, '') = '' THEN COALESCE(fallback_vc.derived_version_no, 1) ELSE COALESCE(NULLIF(d.version_no, 0), 1) END AS version_no, CASE WHEN COALESCE(d.version_group_key, '') = '' THEN fallback_vc.derived_previous_version_id ELSE d.previous_version_id END AS previous_version_id, CASE WHEN COALESCE(d.version_group_key, '') = '' THEN 
COALESCE(fallback_vc.derived_root_version_id, d.id) ELSE COALESCE(d.root_version_id, d.id) END AS root_version_id, CASE WHEN COALESCE(d.version_group_key, '') = '' THEN COALESCE(fallback_vc.derived_is_latest_version, COALESCE(d.is_latest_version, true)) ELSE COALESCE(d.is_latest_version, true) END AS is_latest_version, COALESCE(d.normalized_name, '') AS normalized_name, COALESCE(vc.total_versions, fallback_vc.total_versions, 1) AS total_versions, fallback_vc.derived_version_no, d.created_at, d.updated_at, f.id AS file_id, f.file_name, f.file_ext, f.mime_type, f.file_size, f.oss_url, f.created_by, gr.result_status, gr.total_score, gr.passed_count, gr.failed_count, gr.skipped_count, gr.rules_path, COALESCE(fc.finding_count, 0) AS finding_count, COALESCE(fc.error_count, 0) AS error_count, COALESCE(fc.warning_count, 0) AS warning_count, COALESCE(fc.info_count, 0) AS info_count, EXISTS( SELECT 1 FROM govdoc_report_artifacts gra WHERE gra.run_id = d.current_run_id AND gra.artifact_type = 'html_report' AND gra.deleted_at IS NULL ) AS has_html_report, EXISTS( SELECT 1 FROM govdoc_report_artifacts gra WHERE gra.run_id = d.current_run_id AND gra.artifact_type = 'annotated_docx' AND gra.deleted_at IS NULL ) AS has_docx_report FROM leaudit_documents d JOIN leaudit_document_files f ON f.document_id = d.id AND f.is_active = true AND f.file_role = 'original' AND f.deleted_at IS NULL LEFT JOIN govdoc_runs gr ON gr.id = d.current_run_id LEFT JOIN ( SELECT run_id, COUNT(*) FILTER (WHERE result = 'fail') AS finding_count, COUNT(*) FILTER (WHERE result = 'fail' AND severity = 'error') AS error_count, COUNT(*) FILTER (WHERE result = 'fail' AND severity = 'warning') AS warning_count, COUNT(*) FILTER (WHERE result = 'fail' AND severity = 'info') AS info_count FROM govdoc_rule_results WHERE deleted_at IS NULL GROUP BY run_id ) fc ON fc.run_id = d.current_run_id LEFT JOIN ( SELECT version_group_key, COUNT(*) AS total_versions FROM leaudit_documents WHERE deleted_at IS NULL AND 
COALESCE(version_group_key, '') <> '' GROUP BY version_group_key ) vc ON vc.version_group_key = d.version_group_key LEFT JOIN ( SELECT d2.id AS document_id, COUNT(*) OVER ( PARTITION BY d2.region, COALESCE(d2.normalized_name, ''), COALESCE(f2.file_ext, '') ) AS total_versions, ROW_NUMBER() OVER ( PARTITION BY d2.region, COALESCE(d2.normalized_name, ''), COALESCE(f2.file_ext, '') ORDER BY d2.created_at ASC, d2.id ASC ) AS derived_version_no, LAG(d2.id) OVER ( PARTITION BY d2.region, COALESCE(d2.normalized_name, ''), COALESCE(f2.file_ext, '') ORDER BY d2.created_at ASC, d2.id ASC ) AS derived_previous_version_id, FIRST_VALUE(d2.id) OVER ( PARTITION BY d2.region, COALESCE(d2.normalized_name, ''), COALESCE(f2.file_ext, '') ORDER BY d2.created_at ASC, d2.id ASC ) AS derived_root_version_id, CASE WHEN ROW_NUMBER() OVER ( PARTITION BY d2.region, COALESCE(d2.normalized_name, ''), COALESCE(f2.file_ext, '') ORDER BY d2.created_at DESC, d2.id DESC ) = 1 THEN true ELSE false END AS derived_is_latest_version, md5(CONCAT_WS('|', d2.region, COALESCE(d2.normalized_name, ''), COALESCE(f2.file_ext, ''))) AS derived_version_group_key FROM leaudit_documents d2 JOIN leaudit_document_files f2 ON f2.document_id = d2.id AND f2.is_active = true AND f2.file_role = 'original' AND f2.deleted_at IS NULL WHERE d2.deleted_at IS NULL AND d2.review_scope = 'govdoc' AND COALESCE(d2.engine_type, 'leaudit') = 'govdoc' ) fallback_vc ON fallback_vc.document_id = d.id WHERE {whereClause} ) """ rows = ( await session.execute( text( f""" {baseSelect} SELECT * FROM effective_docs WHERE is_latest_version = true ORDER BY created_at DESC, document_id DESC LIMIT :limit OFFSET :offset """ ), params, ) ).mappings().all() total = int( ( await session.execute( text( f""" {baseSelect} SELECT COUNT(1) FROM effective_docs WHERE is_latest_version = true """ ), params, ) ).scalar_one() ) historyRowsByGroup: dict[str, list[dict[str, Any]]] = {} versionGroupKeys = [ str(row["version_group_key"]) for row in rows if 
row.get("version_group_key") and int(row.get("total_versions") or 1) > 1 ] if versionGroupKeys: historyRows = ( await session.execute( text( f""" {baseSelect} SELECT * FROM effective_docs WHERE version_group_key IN :version_group_keys AND is_latest_version = false ORDER BY created_at DESC, document_id DESC """ ).bindparams(bindparam("version_group_keys", expanding=True)), {"version_group_keys": versionGroupKeys, **params}, ) ).mappings().all() for historyRow in historyRows: groupKey = str(historyRow.get("version_group_key") or "") if not groupKey: continue historyRowsByGroup.setdefault(groupKey, []).append(dict(historyRow)) items = [] for row in rows: mapped = self._map_document_row(row) item = await self._build_document_list_item(mapped) groupKey = mapped.versionGroupKey or "" historyItems = [ await self._build_document_list_item(self._map_document_row(historyRow)) for historyRow in historyRowsByGroup.get(groupKey, []) ] item["historyCount"] = len(historyItems) item["historyVersions"] = historyItems items.append(item) return {"items": items, "total": total, "page": page, "pageSize": pageSize} async def GetDocumentDetail(self, documentId: int, userId: int | None = None) -> dict[str, Any]: if userId is None: raise LeauditException(StatusCodeEnum.HTTP_401_UNAUTHORIZED, "当前用户未登录") currentUser = await self._getCurrentUserContext(userId) params: dict[str, Any] = {"document_id": documentId, "limit": 20} filters = [ "d.id = :document_id", "d.deleted_at IS NULL", "f.deleted_at IS NULL", "f.is_active = true", "f.file_role = 'original'", "COALESCE(d.engine_type, 'leaudit') = 'govdoc'", ] filters.extend( self._buildDocumentScopeFilters( CurrentUserId=userId, CurrentUser=currentUser, Params=params, DocumentAlias="d", FileAlias="f", ) ) whereClause = " AND ".join(filters) async with GetAsyncSession() as session: await self._ensureGovdocSchema(session) await self._backfill_missing_version_groups(session) row = ( await session.execute( text( f""" SELECT d.id AS document_id, 
COALESCE(d.region, 'default') AS region, COALESCE(d.processing_status, 'waiting') AS processing_status, d.current_run_id, COALESCE(NULLIF(d.version_group_key, ''), fallback_vc.derived_version_group_key, '') AS version_group_key, CASE WHEN COALESCE(d.version_group_key, '') = '' THEN COALESCE(fallback_vc.derived_version_no, 1) ELSE COALESCE(NULLIF(d.version_no, 0), 1) END AS version_no, CASE WHEN COALESCE(d.version_group_key, '') = '' THEN fallback_vc.derived_previous_version_id ELSE d.previous_version_id END AS previous_version_id, CASE WHEN COALESCE(d.version_group_key, '') = '' THEN COALESCE(fallback_vc.derived_root_version_id, d.id) ELSE COALESCE(d.root_version_id, d.id) END AS root_version_id, CASE WHEN COALESCE(d.version_group_key, '') = '' THEN COALESCE(fallback_vc.derived_is_latest_version, COALESCE(d.is_latest_version, true)) ELSE COALESCE(d.is_latest_version, true) END AS is_latest_version, COALESCE(d.normalized_name, '') AS normalized_name, COALESCE(vc.total_versions, fallback_vc.total_versions, 1) AS total_versions, fallback_vc.derived_version_no, d.created_at, d.updated_at, f.id AS file_id, f.file_name, f.file_ext, f.mime_type, f.file_size, f.oss_url, f.created_by, gr.result_status, gr.total_score, gr.passed_count, gr.failed_count, gr.skipped_count, gr.rules_path, COALESCE(fc.finding_count, 0) AS finding_count, COALESCE(fc.error_count, 0) AS error_count, COALESCE(fc.warning_count, 0) AS warning_count, COALESCE(fc.info_count, 0) AS info_count, EXISTS( SELECT 1 FROM govdoc_report_artifacts gra WHERE gra.run_id = d.current_run_id AND gra.artifact_type = 'html_report' AND gra.deleted_at IS NULL ) AS has_html_report, EXISTS( SELECT 1 FROM govdoc_report_artifacts gra WHERE gra.run_id = d.current_run_id AND gra.artifact_type = 'annotated_docx' AND gra.deleted_at IS NULL ) AS has_docx_report FROM leaudit_documents d JOIN leaudit_document_files f ON f.document_id = d.id AND f.is_active = true AND f.file_role = 'original' AND f.deleted_at IS NULL LEFT JOIN 
govdoc_runs gr ON gr.id = d.current_run_id LEFT JOIN ( SELECT run_id, COUNT(*) FILTER (WHERE result = 'fail') AS finding_count, COUNT(*) FILTER (WHERE result = 'fail' AND severity = 'error') AS error_count, COUNT(*) FILTER (WHERE result = 'fail' AND severity = 'warning') AS warning_count, COUNT(*) FILTER (WHERE result = 'fail' AND severity = 'info') AS info_count FROM govdoc_rule_results WHERE deleted_at IS NULL GROUP BY run_id ) fc ON fc.run_id = d.current_run_id LEFT JOIN ( SELECT version_group_key, COUNT(*) AS total_versions FROM leaudit_documents WHERE deleted_at IS NULL AND COALESCE(version_group_key, '') <> '' GROUP BY version_group_key ) vc ON vc.version_group_key = d.version_group_key LEFT JOIN ( SELECT d2.id AS document_id, COUNT(*) OVER ( PARTITION BY d2.region, COALESCE(d2.normalized_name, ''), COALESCE(f2.file_ext, '') ) AS total_versions, ROW_NUMBER() OVER ( PARTITION BY d2.region, COALESCE(d2.normalized_name, ''), COALESCE(f2.file_ext, '') ORDER BY d2.created_at ASC, d2.id ASC ) AS derived_version_no, LAG(d2.id) OVER ( PARTITION BY d2.region, COALESCE(d2.normalized_name, ''), COALESCE(f2.file_ext, '') ORDER BY d2.created_at ASC, d2.id ASC ) AS derived_previous_version_id, FIRST_VALUE(d2.id) OVER ( PARTITION BY d2.region, COALESCE(d2.normalized_name, ''), COALESCE(f2.file_ext, '') ORDER BY d2.created_at ASC, d2.id ASC ) AS derived_root_version_id, CASE WHEN ROW_NUMBER() OVER ( PARTITION BY d2.region, COALESCE(d2.normalized_name, ''), COALESCE(f2.file_ext, '') ORDER BY d2.created_at DESC, d2.id DESC ) = 1 THEN true ELSE false END AS derived_is_latest_version, md5(CONCAT_WS('|', d2.region, COALESCE(d2.normalized_name, ''), COALESCE(f2.file_ext, ''))) AS derived_version_group_key FROM leaudit_documents d2 JOIN leaudit_document_files f2 ON f2.document_id = d2.id AND f2.is_active = true AND f2.file_role = 'original' AND f2.deleted_at IS NULL WHERE d2.deleted_at IS NULL AND d2.review_scope = 'govdoc' AND COALESCE(d2.engine_type, 'leaudit') = 'govdoc' ) 
fallback_vc ON fallback_vc.document_id = d.id WHERE {whereClause} LIMIT 1 """ ), params, ) ).mappings().first() if not row: raise LeauditException(StatusCodeEnum.HTTP_404_NOT_FOUND, "公文文档不存在或无权访问") runs = ( await session.execute( text( """ SELECT id, document_id, status, phase, result_status, total_score, passed_count, failed_count, skipped_count, error_message, created_at, updated_at, started_at, finished_at FROM govdoc_runs WHERE document_id = :document_id AND deleted_at IS NULL ORDER BY id DESC LIMIT :limit """ ), params, ) ).mappings().all() artifactRows = ( await session.execute( text( """ SELECT run_id, artifact_type, file_name, file_ext, mime_type, oss_url, description FROM govdoc_report_artifacts WHERE run_id = ANY( SELECT id FROM govdoc_runs WHERE document_id = :document_id AND deleted_at IS NULL ) AND deleted_at IS NULL ORDER BY id DESC """ ), {"document_id": documentId}, ) ).mappings().all() mapped = self._map_document_row(row) runItems = [self._build_run_summary(item) for item in runs] latestRunId = mapped.currentRunId or (runItems[0]["runId"] if runItems else None) latestRun = next((item for item in runItems if item["runId"] == latestRunId), runItems[0] if runItems else None) artifactsByRun = self._group_artifacts_by_run(artifactRows) return { "documentId": mapped.documentId, "latestRunId": latestRunId, "document": { "documentId": mapped.documentId, "fileId": mapped.fileId, "filename": mapped.fileName, "fileExt": mapped.fileExt, "mimeType": mapped.mimeType, "fileSize": mapped.fileSize, "region": mapped.region, "processingStatus": mapped.processingStatus, "versionGroupKey": mapped.versionGroupKey, "versionNo": mapped.versionNo, "totalVersions": mapped.totalVersions, "previousVersionId": mapped.previousVersionId, "rootVersionId": mapped.rootVersionId, "isLatestVersion": mapped.isLatestVersion, "createdAt": self._iso(mapped.createdAt), "updatedAt": self._iso(mapped.updatedAt), }, "latestRun": latestRun, "currentRun": latestRun, "runs": runItems, 
async def DeleteDocument(self, documentId: int, userId: int | None = None) -> dict[str, Any]:
    """Soft-delete a govdoc document the current user is allowed to see.

    The document is first looked up with the caller's scope filters; only
    when that lookup succeeds is ``deleted_at`` stamped on the row.

    Args:
        documentId: Primary key of the ``leaudit_documents`` row.
        userId: Id of the calling user; required.

    Returns:
        ``{"documentId": documentId, "deleted": True}`` on success.

    Raises:
        LeauditException: HTTP 401 when ``userId`` is ``None``; HTTP 404 when
            no matching govdoc document is visible to the caller.
    """
    if userId is None:
        raise LeauditException(StatusCodeEnum.HTTP_401_UNAUTHORIZED, "当前用户未登录")

    currentUser = await self._getCurrentUserContext(userId)
    params: dict[str, Any] = {"document_id": documentId}
    filters = [
        "d.id = :document_id",
        "d.deleted_at IS NULL",
        "f.is_active = true",
        "f.file_role = 'original'",
        # Same engine guard as ListDocuments / GetDocumentDetail: without it
        # this endpoint could soft-delete documents owned by another engine.
        "COALESCE(d.engine_type, 'leaudit') = 'govdoc'",
    ]
    filters.extend(
        self._buildDocumentScopeFilters(
            CurrentUserId=userId,
            CurrentUser=currentUser,
            Params=params,
            DocumentAlias="d",
            FileAlias="f",
        )
    )
    # Only fixed fragments are joined here; every value travels as a bind parameter.
    whereClause = " AND ".join(filters)

    async with GetAsyncSession() as session:
        # Must run before the lookup: it adds the engine_type column if missing.
        await self._ensureGovdocSchema(session)
        row = (
            await session.execute(
                text(
                    f""" SELECT d.id FROM leaudit_documents d JOIN leaudit_document_files f ON f.document_id = d.id AND f.is_active = true AND f.file_role = 'original' AND f.deleted_at IS NULL WHERE {whereClause} LIMIT 1 """
                ),
                params,
            )
        ).first()
        if not row:
            raise LeauditException(StatusCodeEnum.HTTP_404_NOT_FOUND, "公文文档不存在或无权访问")

        # NOTE(review): deleting the latest version does not promote the previous
        # one, while ListDocuments only lists rows with is_latest_version = true.
        # Confirm whether the remaining history should stay visible.
        await session.execute(
            text("UPDATE leaudit_documents SET deleted_at = now(), updated_at = now() WHERE id = :document_id"),
            {"document_id": documentId},
        )
        await session.commit()

    return {"documentId": documentId, "deleted": True}
async def GetRunResult(self, runId: int) -> dict[str, Any]:
    """Assemble the stored audit result of one run.

    Reads the run row from ``govdoc_runs``, the original file name of its
    document, and every row of ``govdoc_rule_results`` for the run, then
    shapes them into the response payload.

    Args:
        runId: Primary key of the ``govdoc_runs`` row.

    Returns:
        A dict with ``runId``, ``documentId``, ``document``, ``summary``,
        ``checkedRules`` (one entry per distinct rule id, first row wins),
        ``findings`` (one entry per row whose result is ``"fail"``) and the
        ``structure`` / ``outline`` / ``entities`` sections taken from the
        run's ``result_summary_json``.

    Raises:
        LeauditException: HTTP 404 when the run does not exist or is deleted.
    """
    async with GetAsyncSession() as session:
        await self._ensureGovdocSchema(session)
        runRow = (
            await session.execute(
                text(
                    """ SELECT id, document_id, status, phase, total_score, passed_count, failed_count, skipped_count, result_status, result_summary_json, started_at, finished_at FROM govdoc_runs WHERE id = :run_id AND deleted_at IS NULL LIMIT 1 """
                ),
                {"run_id": runId},
            )
        ).mappings().first()
        if not runRow:
            raise LeauditException(StatusCodeEnum.HTTP_404_NOT_FOUND, "审查运行不存在")

        documentRow = (
            await session.execute(
                text(
                    """ SELECT d.id AS document_id, f.file_name FROM leaudit_documents d JOIN leaudit_document_files f ON f.document_id = d.id AND f.is_active = true AND f.file_role = 'original' AND f.deleted_at IS NULL WHERE d.id = :document_id AND d.deleted_at IS NULL LIMIT 1 """
                ),
                {"document_id": int(runRow["document_id"])},
            )
        ).mappings().first()

        rulesRows = (
            await session.execute(
                text(
                    """ SELECT rule_id, rule_name, severity, category, result, skip_reason, message, suggestion, actual, expected, evidence, paragraph_index, paragraph_text, location_path, score FROM govdoc_rule_results WHERE run_id = :run_id AND deleted_at IS NULL ORDER BY id ASC """
                ),
                {"run_id": runId},
            )
        ).mappings().all()

    # The summary column may be empty (e.g. a run that has not finished) or
    # hold something other than an object; fall back to an empty mapping so
    # the section lookups below cannot fail.
    aux = self._parse_json(runRow.get("result_summary_json"))
    if not isinstance(aux, dict):
        aux = {}

    findings: list[dict[str, Any]] = []
    checkedRules: list[dict[str, Any]] = []
    severityStats: dict[str, int] = {}
    categoryStats: dict[str, int] = {}
    seenRuleIds: set[str] = set()

    for rr in rulesRows:
        if rr.get("result") == "fail":
            severity = str(rr.get("severity") or "info")
            category = str(rr.get("category") or "")
            severityStats[severity] = severityStats.get(severity, 0) + 1
            if category:
                categoryStats[category] = categoryStats.get(category, 0) + 1
            findings.append(
                {
                    # A missing or zero paragraph index falls back to the
                    # finding's position in the list.
                    "finding_id": f"{rr['rule_id']}-{rr.get('paragraph_index') or len(findings)}",
                    "rule_id": rr["rule_id"],
                    "rule_name": rr.get("rule_name") or rr["rule_id"],
                    "severity": severity,
                    "category": category,
                    "location": {
                        "paragraph_index": rr["paragraph_index"] or 0,
                        "role": rr.get("location_path"),
                        "char_start": 0,
                        "char_end": 0,
                        "context": rr.get("paragraph_text") or "",
                    },
                    "actual": self._parse_json(rr.get("actual")) or {},
                    "expected": self._parse_json(rr.get("expected")) or {},
                    "message": rr.get("message") or "",
                    "suggestion": rr.get("suggestion") or "",
                    "evidence": rr.get("evidence") or "",
                    "confidence": 1.0,
                }
            )

        # checkedRules carries one entry per rule id; later rows of the same
        # rule only contribute findings.
        ruleId = str(rr["rule_id"])
        if ruleId in seenRuleIds:
            continue
        seenRuleIds.add(ruleId)
        status = str(rr.get("result") or "pass")
        checkedRules.append(
            {
                "rule_id": ruleId,
                "name": rr.get("rule_name") or ruleId,
                "severity": rr.get("severity") or "info",
                "category": rr.get("category") or "",
                # Unknown result values are reported as "pass".
                "status": status if status in {"pass", "fail", "skipped"} else "pass",
                "skip_reason": rr.get("skip_reason") or "",
            }
        )

    totalScore = float(runRow.get("total_score") or 0)
    documentId = int(runRow["document_id"])
    return {
        "runId": runId,
        "documentId": documentId,
        "document": {
            "documentId": documentId,
            "filename": str(documentRow["file_name"] or "") if documentRow else "",
        },
        "summary": {
            "score": totalScore,
            "total_findings": len(findings),
            "by_severity": severityStats,
            "by_category": categoryStats,
            "passed_count": int(runRow.get("passed_count") or 0),
            "failed_count": int(runRow.get("failed_count") or 0),
            "skipped_count": int(runRow.get("skipped_count") or 0),
        },
        "checkedRules": checkedRules,
        "findings": findings,
        "structure": aux.get("structure", []),
        "outline": aux.get("outline", []),
        "entities": aux.get("entities", {}),
    }
async def GetRunStructure(self, runId: int) -> dict[str, Any]:
    """Return only the ``structure`` section of a run's result.

    Delegates to ``GetRunResult`` and keeps its ``structure`` entry, falling
    back to an empty list when that key is absent.
    """
    runResult = await self.GetRunResult(runId)
    structure = runResult.get("structure", [])
    payload: dict[str, Any] = {"runId": runId}
    payload["structure"] = structure
    return payload
artifact = await self._get_report_artifact(runId, "annotated_docx") if not artifact: return {"runId": runId, "docxUrl": ""} return { "runId": runId, "docxUrl": await self.OssService.PresignGetUrl(str(artifact["oss_url"])), } async def DownloadOriginal(self, documentId: int) -> dict[str, Any]: fileRow = await self._get_active_original_file(documentId) ossUrl = getattr(fileRow, "ossUrl", None) or fileRow.get("oss_url") if not ossUrl: raise LeauditException(StatusCodeEnum.HTTP_404_NOT_FOUND, "原始文档不存在") return { "documentId": documentId, "downloadUrl": await self.OssService.PresignGetUrl(str(ossUrl)), } # ── 规则 ────────────────────────────────────────────── async def ListRules(self, rulesPath: str | None = None) -> dict[str, Any]: """从 govdoc 规则 YAML 文件加载规则清单。""" rules = await self._load_rules_list(rulesPath) return {"metadata": {}, "rules": rules, "total_rules": len(rules)} async def GetRuleDetail(self, ruleId: str, rulesPath: str | None = None) -> dict[str, Any]: """获取单条规则完整详情(名称、严重度、stages、消息等)。""" ruleset = await self._load_ruleset(rulesPath) if ruleset is None: return {"rule_id": ruleId, "name": ruleId, "severity": "info", "category": "", "group": ""} for rule in ruleset.all_rules(): if rule.rule_id == ruleId: return { "rule_id": rule.rule_id, "name": rule.name, "severity": rule.severity, "category": rule.category, "group": "", "applies_to": rule.applies_to.model_dump() if rule.applies_to else None, "target": rule.target, "on_missing": rule.on_missing, "stages": [s.model_dump(exclude_none=True) for s in (rule.stages or [])], "messages": rule.messages.model_dump() if rule.messages else {}, } return {"rule_id": ruleId, "name": ruleId, "severity": "info", "category": "", "group": ""} # ── 助手 ────────────────────────────────────────────── async def _resolve_rules_path(self, rulesPath: str | None = None) -> str | None: """解析规则 YAML 文件路径。""" if rulesPath: return rulesPath candidates = [ 
Path("/home/wren-dev/Porject/leaudit-platform/rules/govdoc/govdoc_general/rules.yaml"), Path("/home/wren-dev/Porject/leaudit-platform/rules/govdoc_general/rules.yaml"), ] for candidate in candidates: if candidate.is_file(): return str(candidate) return None async def _load_ruleset(self, rulesPath: str | None = None): resolved = await self._resolve_rules_path(rulesPath) if not resolved: logger.warning("[Govdoc] Cannot resolve rules path for GetRuleDetail/ListRules") return None from fastapi_modules.fastapi_leaudit.govdoc_engine.dsl.loader import load_rules return load_rules(resolved) async def _load_rules_list(self, rulesPath: str | None = None) -> list[dict[str, Any]]: ruleset = await self._load_ruleset(rulesPath) if ruleset is None: return [] result = [] for rule in ruleset.all_rules(): result.append( { "rule_id": rule.rule_id, "name": rule.name, "severity": rule.severity, "category": rule.category, "group": "", } ) return result async def _ensureGovdocSchema(self, session) -> None: statements = [ """ ALTER TABLE leaudit_documents ADD COLUMN IF NOT EXISTS engine_type VARCHAR(32) NOT NULL DEFAULT 'leaudit' """, """ CREATE TABLE IF NOT EXISTS public.govdoc_runs ( id BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY, document_id BIGINT NOT NULL, document_file_id BIGINT, run_no INTEGER NOT NULL DEFAULT 1, trigger_source VARCHAR(64) NOT NULL DEFAULT 'upload', trigger_user_id BIGINT, task_id VARCHAR(128), status VARCHAR(64) NOT NULL DEFAULT 'pending', phase VARCHAR(32), engine_version VARCHAR(64), llm_provider VARCHAR(64), llm_model VARCHAR(128), rules_path VARCHAR(1024), total_score NUMERIC(10, 2), passed_count INTEGER, failed_count INTEGER, skipped_count INTEGER, result_status VARCHAR(32), result_summary_json TEXT, error_message TEXT, started_at TIMESTAMPTZ, finished_at TIMESTAMPTZ, created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), deleted_at TIMESTAMPTZ DEFAULT NULL ) """, """ CREATE TABLE IF NOT EXISTS 
public.govdoc_rule_results ( id BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY, run_id BIGINT NOT NULL, rule_id VARCHAR(128) NOT NULL, rule_name VARCHAR(256), severity VARCHAR(32), category VARCHAR(128), message TEXT, suggestion TEXT, actual TEXT, expected TEXT, evidence TEXT, paragraph_index INTEGER, paragraph_text TEXT, location_path VARCHAR(512), result VARCHAR(32) NOT NULL DEFAULT 'pass', skip_reason TEXT, score NUMERIC(10, 2), created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), deleted_at TIMESTAMPTZ DEFAULT NULL ) """, """ CREATE TABLE IF NOT EXISTS public.govdoc_report_artifacts ( id BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY, run_id BIGINT NOT NULL, artifact_type VARCHAR(64) NOT NULL, file_name VARCHAR(512) NOT NULL, file_ext VARCHAR(32), mime_type VARCHAR(128), file_size BIGINT, sha256 VARCHAR(64), oss_url VARCHAR(2048), storage_provider VARCHAR(32), description VARCHAR(512), created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), deleted_at TIMESTAMPTZ DEFAULT NULL ) """, """ CREATE INDEX IF NOT EXISTS idx_leaudit_documents_engine_type ON public.leaudit_documents(engine_type) WHERE deleted_at IS NULL """, """ CREATE INDEX IF NOT EXISTS idx_govdoc_runs_document_id ON public.govdoc_runs(document_id) WHERE deleted_at IS NULL """, """ CREATE INDEX IF NOT EXISTS idx_govdoc_runs_status ON public.govdoc_runs(status) WHERE deleted_at IS NULL """, """ CREATE INDEX IF NOT EXISTS idx_govdoc_runs_trigger_user_id ON public.govdoc_runs(trigger_user_id) """, """ CREATE INDEX IF NOT EXISTS idx_govdoc_rule_results_run_id ON public.govdoc_rule_results(run_id) WHERE deleted_at IS NULL """, """ CREATE INDEX IF NOT EXISTS idx_govdoc_rule_results_rule_id ON public.govdoc_rule_results(rule_id) WHERE deleted_at IS NULL """, """ CREATE INDEX IF NOT EXISTS idx_govdoc_rule_results_result ON public.govdoc_rule_results(result) WHERE deleted_at IS NULL """, """ CREATE INDEX IF NOT EXISTS 
idx_govdoc_rule_results_paragraph ON public.govdoc_rule_results(run_id, paragraph_index) WHERE deleted_at IS NULL """, """ CREATE INDEX IF NOT EXISTS idx_govdoc_report_artifacts_run_id ON public.govdoc_report_artifacts(run_id) WHERE deleted_at IS NULL """, """ CREATE INDEX IF NOT EXISTS idx_govdoc_report_artifacts_type ON public.govdoc_report_artifacts(run_id, artifact_type) WHERE deleted_at IS NULL """, ] for statement in statements: await session.execute(text(statement)) await session.commit() async def _getCurrentUserContext(self, CurrentUserId: int) -> dict[str, Any]: async with GetAsyncSession() as session: await self._backfill_missing_version_groups(session) row = ( await session.execute( text( """ SELECT u.id, COALESCE(u.area, '') AS area, COALESCE(bool_or(r.role_key IN ('super_admin', 'provincial_admin')), FALSE) AS is_global, COALESCE(bool_or(r.role_key IN ('super_admin', 'provincial_admin', 'admin')), FALSE) AS can_manage, COALESCE(bool_or(r.role_key = 'super_admin'), FALSE) AS is_super_admin FROM sso_users u LEFT JOIN user_role ur ON ur.user_id = u.id LEFT JOIN roles r ON r.id = ur.role_id WHERE u.id = :user_id GROUP BY u.id, u.area """ ), {"user_id": CurrentUserId}, ) ).mappings().first() if not row: raise LeauditException(StatusCodeEnum.HTTP_404_NOT_FOUND, "当前用户不存在") return { "area": str(row["area"] or ""), "is_global": bool(row["is_global"]), "can_manage": bool(row["can_manage"]), "is_super_admin": bool(row["is_super_admin"]), } def _buildDocumentScopeFilters( self, CurrentUserId: int, CurrentUser: dict[str, Any], Params: dict[str, object], DocumentAlias: str, FileAlias: str, RequestedRegion: str | None = None, RequestedUserId: int | None = None, ) -> list[str]: filters: list[str] = [] requestedRegion = (RequestedRegion or "").strip() area = str(CurrentUser["area"] or "").strip() if CurrentUser["is_global"]: if requestedRegion: filters.append(f"{DocumentAlias}.region = :requested_region") Params["requested_region"] = requestedRegion if RequestedUserId 
is not None: filters.append(f"{FileAlias}.created_by = :requested_user_id") Params["requested_user_id"] = RequestedUserId return filters if CurrentUser["can_manage"]: if not area: filters.append("1 = 0") return filters if requestedRegion and requestedRegion != area: filters.append("1 = 0") return filters filters.append(f"{DocumentAlias}.region = :scope_region") Params["scope_region"] = area if RequestedUserId is not None: filters.append(f"{FileAlias}.created_by = :requested_user_id") Params["requested_user_id"] = RequestedUserId return filters filters.append(f"{FileAlias}.created_by = :scope_user_id") Params["scope_user_id"] = CurrentUserId if requestedRegion: filters.append(f"{DocumentAlias}.region = :requested_region") Params["requested_region"] = requestedRegion if RequestedUserId is not None and RequestedUserId != CurrentUserId: filters.append("1 = 0") return filters def _resolve_upload_region(self, currentUser: dict[str, Any], requestedRegion: str) -> str: area = str(currentUser["area"] or "").strip() if currentUser["is_global"]: return requestedRegion or area or "default" if currentUser["can_manage"]: if area and requestedRegion and requestedRegion != area: raise LeauditException(StatusCodeEnum.HTTP_403_FORBIDDEN, "不能上传到非本地区") return area or requestedRegion or "default" if area and requestedRegion and requestedRegion != area: raise LeauditException(StatusCodeEnum.HTTP_403_FORBIDDEN, "不能上传到非本人地区") return area or requestedRegion or "default" def _normalize_document_name(self, fileName: str) -> str: suffix = Path(fileName).suffix return fileName[: -len(suffix)] if suffix else fileName async def _get_document_for_run( self, documentId: int, userId: int, currentUser: dict[str, Any], ) -> _GovdocDocumentRow: params: dict[str, Any] = {"document_id": documentId} filters = [ "d.id = :document_id", "d.deleted_at IS NULL", "f.deleted_at IS NULL", "f.is_active = true", "f.file_role = 'original'", "COALESCE(d.engine_type, 'leaudit') = 'govdoc'", ] filters.extend( 
self._buildDocumentScopeFilters( CurrentUserId=userId, CurrentUser=currentUser, Params=params, DocumentAlias="d", FileAlias="f", ) ) whereClause = " AND ".join(filters) async with GetAsyncSession() as session: await self._backfill_missing_version_groups(session) row = ( await session.execute( text( f""" SELECT d.id AS document_id, COALESCE(d.region, 'default') AS region, COALESCE(d.processing_status, 'waiting') AS processing_status, d.current_run_id, COALESCE(NULLIF(d.version_group_key, ''), fallback_vc.derived_version_group_key, '') AS version_group_key, CASE WHEN COALESCE(d.version_group_key, '') = '' THEN COALESCE(fallback_vc.derived_version_no, 1) ELSE COALESCE(NULLIF(d.version_no, 0), 1) END AS version_no, CASE WHEN COALESCE(d.version_group_key, '') = '' THEN fallback_vc.derived_previous_version_id ELSE d.previous_version_id END AS previous_version_id, CASE WHEN COALESCE(d.version_group_key, '') = '' THEN COALESCE(fallback_vc.derived_root_version_id, d.id) ELSE COALESCE(d.root_version_id, d.id) END AS root_version_id, CASE WHEN COALESCE(d.version_group_key, '') = '' THEN COALESCE(fallback_vc.derived_is_latest_version, COALESCE(d.is_latest_version, true)) ELSE COALESCE(d.is_latest_version, true) END AS is_latest_version, COALESCE(d.normalized_name, '') AS normalized_name, COALESCE(vc.total_versions, fallback_vc.total_versions, 1) AS total_versions, fallback_vc.derived_version_no, d.created_at, d.updated_at, f.id AS file_id, f.file_name, f.file_ext, f.mime_type, f.file_size, f.oss_url, f.created_by, gr.result_status, gr.total_score, gr.passed_count, gr.failed_count, gr.skipped_count, NULL::VARCHAR AS rules_path, 0 AS finding_count, 0 AS error_count, 0 AS warning_count, 0 AS info_count, false AS has_html_report, false AS has_docx_report FROM leaudit_documents d JOIN leaudit_document_files f ON f.document_id = d.id AND f.is_active = true AND f.file_role = 'original' AND f.deleted_at IS NULL LEFT JOIN govdoc_runs gr ON gr.id = d.current_run_id LEFT JOIN ( SELECT 
version_group_key, COUNT(*) AS total_versions FROM leaudit_documents WHERE deleted_at IS NULL AND COALESCE(version_group_key, '') <> '' GROUP BY version_group_key ) vc ON vc.version_group_key = d.version_group_key LEFT JOIN ( SELECT d2.id AS document_id, COUNT(*) OVER ( PARTITION BY d2.region, COALESCE(d2.normalized_name, ''), COALESCE(f2.file_ext, '') ) AS total_versions, ROW_NUMBER() OVER ( PARTITION BY d2.region, COALESCE(d2.normalized_name, ''), COALESCE(f2.file_ext, '') ORDER BY d2.created_at ASC, d2.id ASC ) AS derived_version_no, LAG(d2.id) OVER ( PARTITION BY d2.region, COALESCE(d2.normalized_name, ''), COALESCE(f2.file_ext, '') ORDER BY d2.created_at ASC, d2.id ASC ) AS derived_previous_version_id, FIRST_VALUE(d2.id) OVER ( PARTITION BY d2.region, COALESCE(d2.normalized_name, ''), COALESCE(f2.file_ext, '') ORDER BY d2.created_at ASC, d2.id ASC ) AS derived_root_version_id, CASE WHEN ROW_NUMBER() OVER ( PARTITION BY d2.region, COALESCE(d2.normalized_name, ''), COALESCE(f2.file_ext, '') ORDER BY d2.created_at DESC, d2.id DESC ) = 1 THEN true ELSE false END AS derived_is_latest_version, md5(CONCAT_WS('|', d2.region, COALESCE(d2.normalized_name, ''), COALESCE(f2.file_ext, ''))) AS derived_version_group_key FROM leaudit_documents d2 JOIN leaudit_document_files f2 ON f2.document_id = d2.id AND f2.is_active = true AND f2.file_role = 'original' AND f2.deleted_at IS NULL WHERE d2.deleted_at IS NULL AND d2.review_scope = 'govdoc' AND COALESCE(d2.engine_type, 'leaudit') = 'govdoc' ) fallback_vc ON fallback_vc.document_id = d.id WHERE {whereClause} LIMIT 1 """ ), params, ) ).mappings().first() if not row: raise LeauditException(StatusCodeEnum.HTTP_404_NOT_FOUND, "公文文档不存在或无权访问") return self._map_document_row(row) async def _get_document_for_read(self, documentId: int) -> _GovdocDocumentRow: async with GetAsyncSession() as session: row = ( await session.execute( text( """ SELECT d.id AS document_id, COALESCE(d.region, 'default') AS region, COALESCE(d.processing_status, 
'waiting') AS processing_status, d.current_run_id, COALESCE(NULLIF(d.version_group_key, ''), fallback_vc.derived_version_group_key, '') AS version_group_key, CASE WHEN COALESCE(d.version_group_key, '') = '' THEN COALESCE(fallback_vc.derived_version_no, 1) ELSE COALESCE(NULLIF(d.version_no, 0), 1) END AS version_no, CASE WHEN COALESCE(d.version_group_key, '') = '' THEN fallback_vc.derived_previous_version_id ELSE d.previous_version_id END AS previous_version_id, CASE WHEN COALESCE(d.version_group_key, '') = '' THEN COALESCE(fallback_vc.derived_root_version_id, d.id) ELSE COALESCE(d.root_version_id, d.id) END AS root_version_id, CASE WHEN COALESCE(d.version_group_key, '') = '' THEN COALESCE(fallback_vc.derived_is_latest_version, COALESCE(d.is_latest_version, true)) ELSE COALESCE(d.is_latest_version, true) END AS is_latest_version, COALESCE(d.normalized_name, '') AS normalized_name, COALESCE(vc.total_versions, fallback_vc.total_versions, 1) AS total_versions, fallback_vc.derived_version_no, d.created_at, d.updated_at, f.id AS file_id, f.file_name, f.file_ext, f.mime_type, f.file_size, f.oss_url, f.created_by, gr.result_status, gr.total_score, gr.passed_count, gr.failed_count, gr.skipped_count, NULL::VARCHAR AS rules_path, 0 AS finding_count, 0 AS error_count, 0 AS warning_count, 0 AS info_count, false AS has_html_report, false AS has_docx_report FROM leaudit_documents d JOIN leaudit_document_files f ON f.document_id = d.id AND f.is_active = true AND f.file_role = 'original' AND f.deleted_at IS NULL LEFT JOIN govdoc_runs gr ON gr.id = d.current_run_id LEFT JOIN ( SELECT version_group_key, COUNT(*) AS total_versions FROM leaudit_documents WHERE deleted_at IS NULL AND COALESCE(version_group_key, '') <> '' GROUP BY version_group_key ) vc ON vc.version_group_key = d.version_group_key LEFT JOIN ( SELECT d2.id AS document_id, COUNT(*) OVER ( PARTITION BY d2.region, COALESCE(d2.normalized_name, ''), COALESCE(f2.file_ext, '') ) AS total_versions, ROW_NUMBER() OVER ( PARTITION 
async def _get_active_original_file(self, documentId: int):
    """Fetch the active original-file row of a document.

    Queries ``leaudit_document_files`` for rows of the document that are
    active, have the ``original`` role and are not soft-deleted, and takes
    the one with the highest id.

    Args:
        documentId: Id of the owning document.

    Returns:
        The row mapping with the columns ``id``, ``document_id``,
        ``file_name``, ``file_ext``, ``mime_type``, ``file_size``,
        ``oss_url`` and ``created_by``.

    Raises:
        LeauditException: 404 when no matching row exists.
    """
    query = text(
        " SELECT id, document_id, file_name, file_ext, mime_type, file_size, oss_url, created_by"
        " FROM leaudit_document_files"
        " WHERE document_id = :document_id"
        " AND is_active = true"
        " AND file_role = 'original'"
        " AND deleted_at IS NULL"
        " ORDER BY id DESC"
        " LIMIT 1 "
    )
    async with GetAsyncSession() as session:
        result = await session.execute(query, {"document_id": documentId})
        fileRow = result.mappings().first()
    if fileRow:
        return fileRow
    raise LeauditException(StatusCodeEnum.HTTP_404_NOT_FOUND, "原始文档不存在")
documentId: int) -> int: async with GetAsyncSession() as session: current = ( await session.execute( text( """ SELECT COALESCE(MAX(run_no), 0) FROM govdoc_runs WHERE document_id = :document_id AND deleted_at IS NULL """ ), {"document_id": documentId}, ) ).scalar_one() return int(current or 0) + 1 def _build_run_summary(self, row: Any) -> dict[str, Any]: summary = self._build_summary_payload( row.get("total_score"), row.get("passed_count"), row.get("failed_count"), row.get("skipped_count"), ) return { "runId": int(row["id"]), "documentId": int(row["document_id"]), "status": str(row.get("status") or "pending"), "phase": row.get("phase"), "resultStatus": row.get("result_status"), "score": float(row.get("total_score") or 0), "passedCount": int(row.get("passed_count") or 0), "failedCount": int(row.get("failed_count") or 0), "skippedCount": int(row.get("skipped_count") or 0), "summary": summary, "errorMessage": row.get("error_message"), "taskId": row.get("task_id"), "createdAt": self._iso(row.get("created_at")), "updatedAt": self._iso(row.get("updated_at")), "startedAt": self._iso(row.get("started_at")), "finishedAt": self._iso(row.get("finished_at")), } def _build_summary_payload( self, totalScore: Any, passedCount: Any, failedCount: Any, skippedCount: Any, totalFindings: Any | None = None, bySeverity: dict[str, int] | None = None, byCategory: dict[str, int] | None = None, ) -> dict[str, Any]: return { "score": float(totalScore or 0), "total_findings": int(totalFindings if totalFindings is not None else (failedCount or 0)), "by_severity": bySeverity or {}, "by_category": byCategory or {}, "passed_count": int(passedCount or 0), "failed_count": int(failedCount or 0), "skipped_count": int(skippedCount or 0), } def _group_artifacts_by_run(self, rows: list[Any]) -> dict[int, dict[str, Any]]: grouped: dict[int, dict[str, Any]] = {} artifactTypeMap = { "html_report": "htmlUrl", "annotated_docx": "docxUrl", "paragraph_html": "paragraphHtmlUrl", } for row in rows: runId = 
def _map_document_row(self, row: Any) -> _GovdocDocumentRow:
    """Convert a document query row into a ``_GovdocDocumentRow``.

    The version number uses ``derived_version_no`` when the row carries a
    non-NULL value for it and falls back to ``version_no`` otherwise.
    Nullable columns become ``None``, text columns with empty values become
    ``None``, and the counter columns default to ``0``.

    Args:
        row: Mapping-like row exposing ``[]`` and ``.get``.

    Returns:
        The populated ``_GovdocDocumentRow``.
    """

    def nullable(key: str, cast: Any) -> Any:
        """Apply ``cast`` to the column unless it is NULL or absent."""
        value = row.get(key)
        return None if value is None else cast(value)

    def textOrNone(key: str) -> str | None:
        """Return the column as ``str``; empty or missing values give ``None``."""
        value = row.get(key)
        return str(value) if value else None

    def counter(key: str) -> int:
        """Return the column as ``int``, treating NULL or missing as zero."""
        return int(row.get(key) or 0)

    derivedVersionNo = row.get("derived_version_no")
    if derivedVersionNo is not None:
        effectiveVersionNo = derivedVersionNo
    else:
        effectiveVersionNo = row.get("version_no")

    return _GovdocDocumentRow(
        documentId=int(row["document_id"]),
        region=str(row["region"] or "default"),
        processingStatus=str(row["processing_status"] or "waiting"),
        currentRunId=nullable("current_run_id", int),
        versionGroupKey=textOrNone("version_group_key"),
        versionNo=int(effectiveVersionNo or 1),
        totalVersions=int(row.get("total_versions") or 1),
        previousVersionId=nullable("previous_version_id", int),
        rootVersionId=nullable("root_version_id", int),
        isLatestVersion=bool(row.get("is_latest_version", True)),
        normalizedName=textOrNone("normalized_name"),
        createdAt=row.get("created_at"),
        updatedAt=row.get("updated_at"),
        fileId=int(row["file_id"]),
        fileName=str(row["file_name"] or ""),
        fileExt=textOrNone("file_ext"),
        mimeType=textOrNone("mime_type"),
        fileSize=nullable("file_size", int),
        ossUrl=textOrNone("oss_url"),
        createdBy=nullable("created_by", int),
        resultStatus=textOrNone("result_status"),
        totalScore=nullable("total_score", float),
        passedCount=nullable("passed_count", int),
        failedCount=nullable("failed_count", int),
        skippedCount=nullable("skipped_count", int),
        findingCount=counter("finding_count"),
        errorCount=counter("error_count"),
        warningCount=counter("warning_count"),
        infoCount=counter("info_count"),
        rulesPath=textOrNone("rules_path"),
        hasHtmlReport=bool(row.get("has_html_report")),
        hasDocxReport=bool(row.get("has_docx_report")),
    )
async def _find_latest_version_candidate(
    self,
    session,
    *,
    region: str,
    normalizedName: str,
    fileExt: str | None,
) -> dict[str, Any] | None:
    """Look up the current latest version of a same-named govdoc document.

    Searches non-deleted govdoc documents in ``region`` whose normalized
    name equals ``normalizedName`` and whose ``is_latest_version`` flag is
    true (NULL counts as true), joined to their active original file. When
    ``fileExt`` is non-empty the file extension must match as well. The row
    with the highest ``version_no`` (ties broken by highest id) wins.

    Args:
        session: Database session used to execute the query.
        region: Region the document belongs to.
        normalizedName: Normalized document name to match.
        fileExt: Optional file extension filter.

    Returns:
        A dict with ``document_id``, ``version_group_key``, ``version_no``
        and ``root_version_id``, or ``None`` when nothing matches.
    """
    bindings: dict[str, Any] = {
        "region": region,
        "normalized_name": normalizedName,
    }
    if fileExt:
        bindings["file_ext"] = fileExt
        extensionFilter = " AND f.file_ext = :file_ext"
    else:
        extensionFilter = ""

    statement = text(
        " SELECT d.id AS document_id, d.version_group_key, d.version_no, d.root_version_id"
        " FROM leaudit_documents d"
        " JOIN leaudit_document_files f ON f.document_id = d.id"
        " AND f.is_active = true"
        " AND f.file_role = 'original'"
        " AND f.deleted_at IS NULL"
        " WHERE d.deleted_at IS NULL"
        " AND d.review_scope = 'govdoc'"
        " AND COALESCE(d.engine_type, 'leaudit') = 'govdoc'"
        " AND d.region = :region"
        " AND COALESCE(d.normalized_name, '') = :normalized_name"
        " AND COALESCE(d.is_latest_version, true) = true"
        + extensionFilter
        + " ORDER BY d.version_no DESC, d.id DESC"
        " LIMIT 1 "
    )
    result = await session.execute(statement, bindings)
    candidate = result.mappings().first()
    if not candidate:
        return None
    return dict(candidate)
await session.execute( text( f""" SELECT d.id AS document_id FROM leaudit_documents d JOIN leaudit_document_files f ON f.document_id = d.id AND f.is_active = true AND f.file_role = 'original' AND f.deleted_at IS NULL WHERE d.deleted_at IS NULL AND d.review_scope = 'govdoc' AND COALESCE(d.engine_type, 'leaudit') = 'govdoc' AND d.region = :region AND COALESCE(d.normalized_name, '') = :normalized_name{extClause} ORDER BY d.created_at ASC, d.id ASC """ ), params, ) ).mappings().all() if not rows: return None groupKey = uuid.uuid4().hex rootId = int(rows[0]["document_id"]) previousId: int | None = None for index, row in enumerate(rows, start=1): documentId = int(row["document_id"]) isLatest = index == len(rows) await session.execute( text( """ UPDATE leaudit_documents SET version_group_key = :version_group_key, version_no = :version_no, previous_version_id = :previous_version_id, root_version_id = :root_version_id, is_latest_version = :is_latest_version, updated_at = NOW() WHERE id = :document_id """ ), { "version_group_key": groupKey, "version_no": index, "previous_version_id": previousId, "root_version_id": rootId, "is_latest_version": isLatest, "document_id": documentId, }, ) previousId = documentId latestId = int(rows[-1]["document_id"]) return { "document_id": latestId, "version_group_key": groupKey, "version_no": len(rows), "root_version_id": rootId, } async def _backfill_missing_version_groups(self, session) -> None: groups = ( await session.execute( text( """ SELECT d.region, COALESCE(d.normalized_name, '') AS normalized_name, COALESCE(f.file_ext, '') AS file_ext, ARRAY_AGG(d.id ORDER BY d.created_at ASC, d.id ASC) AS document_ids, MIN(NULLIF(d.version_group_key, '')) AS existing_version_group_key FROM leaudit_documents d JOIN leaudit_document_files f ON f.document_id = d.id AND f.is_active = true AND f.file_role = 'original' AND f.deleted_at IS NULL WHERE d.deleted_at IS NULL AND d.review_scope = 'govdoc' AND COALESCE(d.engine_type, 'leaudit') = 'govdoc' GROUP 
BY d.region, COALESCE(d.normalized_name, ''), COALESCE(f.file_ext, '') HAVING BOOL_OR(COALESCE(d.version_group_key, '') = '') """ ) ) ).mappings().all() if not groups: return for group in groups: region = str(group["region"] or "default") normalizedName = str(group["normalized_name"] or "") fileExt = str(group["file_ext"] or "") documentIds = [int(value) for value in (group["document_ids"] or [])] if not documentIds: continue versionGroupKey = str(group["existing_version_group_key"] or "").strip() or self._derive_version_group_key( region=region, normalizedName=normalizedName, fileExt=fileExt or None, ) rootId = documentIds[0] previousId: int | None = None for index, documentId in enumerate(documentIds, start=1): isLatest = index == len(documentIds) await session.execute( text( """ UPDATE leaudit_documents SET version_group_key = :version_group_key, version_no = :version_no, previous_version_id = :previous_version_id, root_version_id = :root_version_id, is_latest_version = :is_latest_version, updated_at = NOW() WHERE id = :document_id """ ), { "version_group_key": versionGroupKey, "version_no": index, "previous_version_id": previousId, "root_version_id": rootId, "is_latest_version": isLatest, "document_id": documentId, }, ) previousId = documentId await session.commit() def _derive_version_group_key(self, *, region: str, normalizedName: str, fileExt: str | None) -> str: raw = f"{region}|{normalizedName}|{fileExt or ''}" return hashlib.md5(raw.encode("utf-8")).hexdigest() async def _resolve_ruleset_metadata(self, rulesPath: str | None) -> dict[str, str]: ruleset = await self._load_ruleset(rulesPath) if ruleset is not None: return { "typeId": str(ruleset.metadata.type_id or ""), "name": str(ruleset.metadata.name or ""), "version": str(ruleset.metadata.version or ""), } resolved = await self._resolve_rules_path(rulesPath) if not resolved: return {"typeId": "", "name": "", "version": ""} path = Path(resolved) return { "typeId": path.stem, "name": path.parent.name, 
"version": "", } async def _get_report_artifact(self, runId: int, artifactType: str) -> Any | None: async with GetAsyncSession() as session: await self._ensureGovdocSchema(session) return ( await session.execute( text( """ SELECT oss_url FROM govdoc_report_artifacts WHERE run_id = :run_id AND artifact_type = :artifact_type AND deleted_at IS NULL AND COALESCE(oss_url, '') <> '' ORDER BY id DESC LIMIT 1 """ ), {"run_id": runId, "artifact_type": artifactType}, ) ).mappings().first() def _parse_json(self, raw: Any) -> Any: if raw is None or raw == "": return None if isinstance(raw, (dict, list)): return raw try: return json.loads(raw) except Exception: return None def _iso(self, value: Any) -> str | None: if value is None: return None if isinstance(value, datetime): return value.isoformat() return str(value) def _build_audit_result_from_run_result(self, payload: dict[str, Any]) -> AuditResult: summaryPayload = payload.get("summary") or {} findingsPayload = payload.get("findings") or [] checkedRulesPayload = payload.get("checkedRules") or [] structurePayload = payload.get("structure") or [] outlinePayload = payload.get("outline") or [] entitiesPayload = payload.get("entities") or {} documentPayload = payload.get("document") or {} return AuditResult( audit_id=str(payload.get("runId") or ""), document=documentPayload, summary=AuditSummary.model_validate(summaryPayload), findings=[Finding.model_validate(item) for item in findingsPayload], checked_rules=[CheckedRule.model_validate(item) for item in checkedRulesPayload], structure=[StructureItem.model_validate(item) for item in structurePayload], outline=[OutlineNode.model_validate(item) for item in outlinePayload], entities=entitiesPayload, )