1509 lines
61 KiB
Python
1509 lines
61 KiB
Python
"""Govdoc 公文模块服务实现。"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import hashlib
|
|
import json
|
|
import mimetypes
|
|
import time
|
|
from dataclasses import dataclass
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
from fastapi import UploadFile
|
|
from sqlalchemy import text
|
|
|
|
from fastapi_common.fastapi_common_logger import logger
|
|
from fastapi_common.fastapi_common_sqlalchemy.database import GetAsyncSession
|
|
from fastapi_common.fastapi_common_storage.oss_path_utils import OssPathUtils
|
|
from fastapi_common.fastapi_common_web.domain.responses import StatusCodeEnum
|
|
from fastapi_common.fastapi_common_web.exception.LeauditException import LeauditException
|
|
|
|
from fastapi_modules.fastapi_leaudit.govdoc_bridge.storage_adapter import StorageAdapter
|
|
from fastapi_modules.fastapi_leaudit.govdoc_bridge.tasks import dispatch_govdoc_task
|
|
from fastapi_modules.fastapi_leaudit.govdoc_engine.parser.docx_parser import parse_docx
|
|
from fastapi_modules.fastapi_leaudit.govdoc_engine.reporter.html_paragraph import paragraphs_to_html
|
|
from fastapi_modules.fastapi_leaudit.models import LeauditDocument, LeauditDocumentFile
|
|
from fastapi_modules.fastapi_leaudit.services import IGovdocService, IOssService
|
|
from fastapi_modules.fastapi_leaudit.services.impl.ossServiceImpl import OssServiceImpl
|
|
|
|
|
|
@dataclass(frozen=True)
class _GovdocDocumentRow:
    """Read-only, typed view of one govdoc document joined with its original file.

    The fields mirror the columns selected by the document list/detail queries
    in ``GovdocServiceImpl`` (``leaudit_documents`` d, ``leaudit_document_files`` f,
    ``govdoc_runs`` gr). Because the dataclass is frozen, fields cannot be
    reassigned after construction. The row -> instance mapping lives in
    ``_map_document_row`` (not visible here — TODO confirm whether it builds
    instances positionally; if so, do not reorder fields).
    """

    # -- leaudit_documents (d) --
    documentId: int
    region: str
    processingStatus: str
    currentRunId: int | None
    createdAt: Any  # presumably a datetime from the DB driver — TODO confirm
    updatedAt: Any  # presumably a datetime from the DB driver — TODO confirm
    # -- leaudit_document_files (f), active "original" file --
    fileId: int
    fileName: str
    fileExt: str | None
    mimeType: str | None
    fileSize: int | None
    ossUrl: str | None
    createdBy: int | None
    # -- govdoc_runs (gr) for d.current_run_id; None when there is no run --
    resultStatus: str | None
    totalScore: float | None
    passedCount: int | None
    failedCount: int | None
    skippedCount: int | None
    # -- existence flags for report artifacts of the current run --
    # NOTE(review): the detail query does not select these columns; the mapper
    # must supply a default there — verify in _map_document_row.
    hasHtmlReport: bool
    hasDocxReport: bool
|
|
|
|
|
|
class GovdocServiceImpl(IGovdocService):
|
|
"""公文处理与格式审查服务实现。"""
|
|
|
|
def __init__(self, OssService: IOssService | None = None) -> None:
|
|
self.OssService = OssService or OssServiceImpl()
|
|
self.Storage = StorageAdapter()
|
|
|
|
# ── 文档 ──────────────────────────────────────────────
|
|
|
|
    async def UploadDocument(
        self,
        file: UploadFile,
        typeId: int | None = None,
        region: str = "default",
        autoRun: bool = True,
        speed: str = "normal",
        ruleVersionId: int | None = None,
        createdBy: int | None = None,
    ) -> dict[str, Any]:
        """Upload a DOCX file as a new govdoc document and optionally start a review.

        Creates a version-1 ``LeauditDocument`` and its active "original"
        ``LeauditDocumentFile``, stores the bytes in object storage, tags the
        document with ``engine_type = 'govdoc'`` and, when ``autoRun`` is truthy,
        starts a run through ``CreateRun``.

        Args:
            file: Uploaded file; must have a filename, non-empty content and a
                ``.docx`` extension (case-insensitive).
            typeId: Optional document type id stored on the document.
            region: Requested region; blank values become "default" before
                ``_resolve_upload_region`` picks the final region for the user.
            autoRun: Whether to trigger a review run right after the upload.
            speed: Forwarded to ``CreateRun``.
            ruleVersionId: Forwarded to ``CreateRun``.
            createdBy: Id of the uploading user; required.

        Returns:
            Dict with document/file ids, file name, resolved region, engine type,
            processing status, whether a run was triggered, the latest run id and
            the ``CreateRun`` payload (or None).

        Raises:
            LeauditException: 401 if ``createdBy`` is None; 400 for a missing file,
                empty content or a non-DOCX extension; plus whatever the helpers
                (user lookup, ``CreateRun``) raise.
        """
        if createdBy is None:
            raise LeauditException(StatusCodeEnum.HTTP_401_UNAUTHORIZED, "当前用户未登录")
        if file is None or not file.filename:
            raise LeauditException(StatusCodeEnum.HTTP_400_BAD_REQUEST, "上传文件不能为空")

        # The whole upload is read into memory; it is hashed and uploaded below.
        content = await file.read()
        if not content:
            raise LeauditException(StatusCodeEnum.HTTP_400_BAD_REQUEST, "上传文件内容不能为空")

        normalizedRegion = (region or "default").strip() or "default"
        fileName = file.filename
        fileExt = Path(fileName).suffix.lstrip(".").lower() or None
        if fileExt != "docx":
            raise LeauditException(
                StatusCodeEnum.HTTP_400_BAD_REQUEST,
                "当前内部公文模块仅支持上传 DOCX 文件",
            )
        # Prefer the client-declared type, then a guess from the file name.
        mimeType = file.content_type or mimetypes.guess_type(fileName)[0] or "application/octet-stream"
        fileSha256 = hashlib.sha256(content).hexdigest()
        uploadedAt = datetime.now()
        normalizedName = self._normalize_document_name(fileName)

        async with GetAsyncSession() as session:
            await self._ensureGovdocSchema(session)
            currentUser = await self._getCurrentUserContext(createdBy)
            resolvedRegion = self._resolve_upload_region(currentUser, normalizedRegion)

            document = await LeauditDocument.create_new(
                session,
                bizDocumentId=time.time_ns(),
                typeId=typeId,
                groupId=None,
                region=resolvedRegion,
                processingStatus="waiting",
                currentRunId=None,
                versionGroupKey=None,
                versionNo=1,
                previousVersionId=None,
                rootVersionId=None,
                isLatestVersion=True,
                normalizedName=normalizedName,
                reviewScope="govdoc",
            )
            # A first version is its own version root. document.Id is used right
            # away, so create_new presumably flushes to assign it — TODO confirm.
            document.rootVersionId = document.Id
            await session.flush()

            # The object key embeds the document id, so it is built after the flush.
            objectKey = OssPathUtils.BuildBusinessDocKey(
                Region=resolvedRegion,
                TypeCode="govdoc",
                DocumentId=document.Id,
                Version="v1",
                FileRole="original",
                FileName=fileName,
                Year=uploadedAt.year,
                Month=uploadedAt.month,
            )
            # NOTE(review): the object is uploaded before the DB commit; if the
            # commit below fails, the stored object is left orphaned.
            ossUrl = await self.OssService.UploadBytes(
                ObjectKey=objectKey,
                Content=content,
                ContentType=mimeType,
            )

            documentFile = LeauditDocumentFile(
                documentId=document.Id,
                fileRole="original",
                fileName=fileName,
                fileExt=fileExt,
                mimeType=mimeType,
                fileSize=len(content),
                sha256=fileSha256,
                localPath=None,
                ossUrl=ossUrl,
                storageProvider="minio",
                isActive=True,
                createdBy=createdBy,
            )
            session.add(documentFile)
            await session.flush()

            # engine_type is written with raw SQL; presumably the ORM model does
            # not map this column (it is added by _ensureGovdocSchema) — verify.
            await session.execute(
                text(
                    """
                    UPDATE leaudit_documents
                    SET engine_type = 'govdoc'
                    WHERE id = :document_id
                    """
                ),
                {"document_id": document.Id},
            )
            await session.commit()

            # Reload server-side values after the commit.
            await session.refresh(document)
            await session.refresh(documentFile)

        runPayload: dict[str, Any] | None = None
        shouldAutoRun = bool(autoRun)
        if shouldAutoRun:
            runPayload = await self.CreateRun(
                documentId=document.Id,
                ruleVersionId=ruleVersionId,
                speed=speed,
                force=False,
                triggerUserId=createdBy,
            )

        return {
            "documentId": document.Id,
            "fileId": documentFile.Id,
            "fileName": documentFile.fileName,
            "region": resolvedRegion,
            "engineType": "govdoc",
            # CreateRun sets the document to "processing" when it dispatches a run.
            "processingStatus": "processing" if runPayload else (document.processingStatus or "waiting"),
            "autoRunTriggered": shouldAutoRun,
            "latestRunId": runPayload["runId"] if runPayload else None,
            "run": runPayload,
        }
|
|
|
|
async def ListDocuments(
|
|
self,
|
|
page: int = 1,
|
|
pageSize: int = 20,
|
|
keyword: str | None = None,
|
|
fileExt: str | None = None,
|
|
region: str | None = None,
|
|
status: str | None = None,
|
|
resultStatus: str | None = None,
|
|
createdBy: int | None = None,
|
|
dateFrom: str | None = None,
|
|
dateTo: str | None = None,
|
|
userId: int | None = None,
|
|
) -> dict[str, Any]:
|
|
if userId is None:
|
|
raise LeauditException(StatusCodeEnum.HTTP_401_UNAUTHORIZED, "当前用户未登录")
|
|
|
|
currentUser = await self._getCurrentUserContext(userId)
|
|
page = max(1, int(page))
|
|
pageSize = max(1, min(int(pageSize), 100))
|
|
offset = (page - 1) * pageSize
|
|
|
|
async with GetAsyncSession() as session:
|
|
await self._ensureGovdocSchema(session)
|
|
|
|
params: dict[str, Any] = {
|
|
"limit": pageSize,
|
|
"offset": offset,
|
|
}
|
|
filters = [
|
|
"d.deleted_at IS NULL",
|
|
"f.deleted_at IS NULL",
|
|
"f.is_active = true",
|
|
"f.file_role = 'original'",
|
|
"COALESCE(d.engine_type, 'leaudit') = 'govdoc'",
|
|
]
|
|
filters.extend(
|
|
self._buildDocumentScopeFilters(
|
|
CurrentUserId=userId,
|
|
CurrentUser=currentUser,
|
|
Params=params,
|
|
DocumentAlias="d",
|
|
FileAlias="f",
|
|
RequestedRegion=region,
|
|
RequestedUserId=createdBy,
|
|
)
|
|
)
|
|
if keyword:
|
|
filters.append("(f.file_name ILIKE :keyword OR COALESCE(d.normalized_name, '') ILIKE :keyword)")
|
|
params["keyword"] = f"%{keyword.strip()}%"
|
|
if fileExt:
|
|
normalizedExt = fileExt.strip().lstrip(".").lower()
|
|
if normalizedExt:
|
|
filters.append("LOWER(COALESCE(f.file_ext, '')) = :file_ext")
|
|
params["file_ext"] = normalizedExt
|
|
if status:
|
|
filters.append("COALESCE(d.processing_status, '') = :status")
|
|
params["status"] = status.strip()
|
|
if resultStatus:
|
|
filters.append("COALESCE(gr.result_status, '') = :result_status")
|
|
params["result_status"] = resultStatus.strip()
|
|
if dateFrom:
|
|
filters.append("d.created_at >= CAST(:date_from AS date)")
|
|
params["date_from"] = dateFrom.strip()
|
|
if dateTo:
|
|
filters.append("d.created_at < (CAST(:date_to AS date) + INTERVAL '1 day')")
|
|
params["date_to"] = dateTo.strip()
|
|
|
|
whereClause = " AND ".join(filters)
|
|
|
|
async with GetAsyncSession() as session:
|
|
rows = (
|
|
await session.execute(
|
|
text(
|
|
f"""
|
|
SELECT
|
|
d.id AS document_id,
|
|
COALESCE(d.region, 'default') AS region,
|
|
COALESCE(d.processing_status, 'waiting') AS processing_status,
|
|
d.current_run_id,
|
|
d.created_at,
|
|
d.updated_at,
|
|
f.id AS file_id,
|
|
f.file_name,
|
|
f.file_ext,
|
|
f.mime_type,
|
|
f.file_size,
|
|
f.oss_url,
|
|
f.created_by,
|
|
gr.result_status,
|
|
gr.total_score,
|
|
gr.passed_count,
|
|
gr.failed_count,
|
|
gr.skipped_count,
|
|
EXISTS(
|
|
SELECT 1
|
|
FROM govdoc_report_artifacts gra
|
|
WHERE gra.run_id = d.current_run_id
|
|
AND gra.artifact_type = 'html_report'
|
|
AND gra.deleted_at IS NULL
|
|
) AS has_html_report,
|
|
EXISTS(
|
|
SELECT 1
|
|
FROM govdoc_report_artifacts gra
|
|
WHERE gra.run_id = d.current_run_id
|
|
AND gra.artifact_type = 'annotated_docx'
|
|
AND gra.deleted_at IS NULL
|
|
) AS has_docx_report
|
|
FROM leaudit_documents d
|
|
JOIN leaudit_document_files f
|
|
ON f.document_id = d.id
|
|
AND f.is_active = true
|
|
AND f.file_role = 'original'
|
|
AND f.deleted_at IS NULL
|
|
LEFT JOIN govdoc_runs gr
|
|
ON gr.id = d.current_run_id
|
|
WHERE {whereClause}
|
|
ORDER BY d.created_at DESC
|
|
LIMIT :limit OFFSET :offset
|
|
"""
|
|
),
|
|
params,
|
|
)
|
|
).mappings().all()
|
|
|
|
total = int(
|
|
(
|
|
await session.execute(
|
|
text(
|
|
f"""
|
|
SELECT COUNT(1)
|
|
FROM leaudit_documents d
|
|
JOIN leaudit_document_files f
|
|
ON f.document_id = d.id
|
|
AND f.is_active = true
|
|
AND f.file_role = 'original'
|
|
AND f.deleted_at IS NULL
|
|
LEFT JOIN govdoc_runs gr
|
|
ON gr.id = d.current_run_id
|
|
WHERE {whereClause}
|
|
"""
|
|
),
|
|
params,
|
|
)
|
|
).scalar_one()
|
|
)
|
|
|
|
items = []
|
|
for row in rows:
|
|
mapped = self._map_document_row(row)
|
|
summary = self._build_summary_payload(
|
|
mapped.totalScore,
|
|
mapped.passedCount,
|
|
mapped.failedCount,
|
|
mapped.skippedCount,
|
|
)
|
|
items.append(
|
|
{
|
|
"documentId": mapped.documentId,
|
|
"fileId": mapped.fileId,
|
|
"fileName": mapped.fileName,
|
|
"fileExt": mapped.fileExt,
|
|
"mimeType": mapped.mimeType,
|
|
"fileSize": mapped.fileSize,
|
|
"region": mapped.region,
|
|
"processingStatus": mapped.processingStatus,
|
|
"currentRunId": mapped.currentRunId,
|
|
"latestRunId": mapped.currentRunId,
|
|
"resultStatus": mapped.resultStatus,
|
|
"score": float(mapped.totalScore) if mapped.totalScore is not None else None,
|
|
"passedCount": mapped.passedCount or 0,
|
|
"failedCount": mapped.failedCount or 0,
|
|
"skippedCount": mapped.skippedCount or 0,
|
|
"latestRun": {
|
|
"runId": mapped.currentRunId,
|
|
"summary": summary,
|
|
} if mapped.currentRunId else None,
|
|
"reports": {
|
|
"hasHtmlReport": mapped.hasHtmlReport,
|
|
"hasDocxReport": mapped.hasDocxReport,
|
|
},
|
|
"createdAt": self._iso(mapped.createdAt),
|
|
"updatedAt": self._iso(mapped.updatedAt),
|
|
}
|
|
)
|
|
|
|
return {"items": items, "total": total, "page": page, "pageSize": pageSize}
|
|
|
|
async def GetDocumentDetail(self, documentId: int, userId: int | None = None) -> dict[str, Any]:
|
|
if userId is None:
|
|
raise LeauditException(StatusCodeEnum.HTTP_401_UNAUTHORIZED, "当前用户未登录")
|
|
|
|
currentUser = await self._getCurrentUserContext(userId)
|
|
params: dict[str, Any] = {"document_id": documentId, "limit": 20}
|
|
filters = [
|
|
"d.id = :document_id",
|
|
"d.deleted_at IS NULL",
|
|
"f.deleted_at IS NULL",
|
|
"f.is_active = true",
|
|
"f.file_role = 'original'",
|
|
"COALESCE(d.engine_type, 'leaudit') = 'govdoc'",
|
|
]
|
|
filters.extend(
|
|
self._buildDocumentScopeFilters(
|
|
CurrentUserId=userId,
|
|
CurrentUser=currentUser,
|
|
Params=params,
|
|
DocumentAlias="d",
|
|
FileAlias="f",
|
|
)
|
|
)
|
|
whereClause = " AND ".join(filters)
|
|
|
|
async with GetAsyncSession() as session:
|
|
await self._ensureGovdocSchema(session)
|
|
row = (
|
|
await session.execute(
|
|
text(
|
|
f"""
|
|
SELECT
|
|
d.id AS document_id,
|
|
COALESCE(d.region, 'default') AS region,
|
|
COALESCE(d.processing_status, 'waiting') AS processing_status,
|
|
d.current_run_id,
|
|
d.created_at,
|
|
d.updated_at,
|
|
f.id AS file_id,
|
|
f.file_name,
|
|
f.file_ext,
|
|
f.mime_type,
|
|
f.file_size,
|
|
f.oss_url,
|
|
f.created_by,
|
|
gr.result_status,
|
|
gr.total_score,
|
|
gr.passed_count,
|
|
gr.failed_count,
|
|
gr.skipped_count
|
|
FROM leaudit_documents d
|
|
JOIN leaudit_document_files f
|
|
ON f.document_id = d.id
|
|
AND f.is_active = true
|
|
AND f.file_role = 'original'
|
|
AND f.deleted_at IS NULL
|
|
LEFT JOIN govdoc_runs gr
|
|
ON gr.id = d.current_run_id
|
|
WHERE {whereClause}
|
|
LIMIT 1
|
|
"""
|
|
),
|
|
params,
|
|
)
|
|
).mappings().first()
|
|
if not row:
|
|
raise LeauditException(StatusCodeEnum.HTTP_404_NOT_FOUND, "公文文档不存在或无权访问")
|
|
|
|
runs = (
|
|
await session.execute(
|
|
text(
|
|
"""
|
|
SELECT
|
|
id,
|
|
document_id,
|
|
status,
|
|
phase,
|
|
result_status,
|
|
total_score,
|
|
passed_count,
|
|
failed_count,
|
|
skipped_count,
|
|
error_message,
|
|
created_at,
|
|
updated_at,
|
|
started_at,
|
|
finished_at
|
|
FROM govdoc_runs
|
|
WHERE document_id = :document_id
|
|
AND deleted_at IS NULL
|
|
ORDER BY id DESC
|
|
LIMIT :limit
|
|
"""
|
|
),
|
|
params,
|
|
)
|
|
).mappings().all()
|
|
|
|
artifactRows = (
|
|
await session.execute(
|
|
text(
|
|
"""
|
|
SELECT run_id, artifact_type, file_name, file_ext, mime_type, oss_url, description
|
|
FROM govdoc_report_artifacts
|
|
WHERE run_id = ANY(
|
|
SELECT id
|
|
FROM govdoc_runs
|
|
WHERE document_id = :document_id
|
|
AND deleted_at IS NULL
|
|
)
|
|
AND deleted_at IS NULL
|
|
ORDER BY id DESC
|
|
"""
|
|
),
|
|
{"document_id": documentId},
|
|
)
|
|
).mappings().all()
|
|
|
|
mapped = self._map_document_row(row)
|
|
runItems = [self._build_run_summary(item) for item in runs]
|
|
latestRunId = mapped.currentRunId or (runItems[0]["runId"] if runItems else None)
|
|
latestRun = next((item for item in runItems if item["runId"] == latestRunId), runItems[0] if runItems else None)
|
|
artifactsByRun = self._group_artifacts_by_run(artifactRows)
|
|
|
|
return {
|
|
"documentId": mapped.documentId,
|
|
"latestRunId": latestRunId,
|
|
"document": {
|
|
"documentId": mapped.documentId,
|
|
"fileId": mapped.fileId,
|
|
"filename": mapped.fileName,
|
|
"fileExt": mapped.fileExt,
|
|
"mimeType": mapped.mimeType,
|
|
"fileSize": mapped.fileSize,
|
|
"region": mapped.region,
|
|
"processingStatus": mapped.processingStatus,
|
|
"createdAt": self._iso(mapped.createdAt),
|
|
"updatedAt": self._iso(mapped.updatedAt),
|
|
},
|
|
"latestRun": latestRun,
|
|
"currentRun": latestRun,
|
|
"runs": runItems,
|
|
"reports": artifactsByRun.get(int(latestRunId), {}) if latestRunId else {},
|
|
}
|
|
|
|
async def UpdateDocument(self, documentId: int, body: dict[str, Any], userId: int | None = None) -> dict[str, Any]:
|
|
raise LeauditException(StatusCodeEnum.HTTP_403_FORBIDDEN, "当前阶段暂不开放修改公文信息")
|
|
|
|
async def DeleteDocument(self, documentId: int, userId: int | None = None) -> dict[str, Any]:
|
|
if userId is None:
|
|
raise LeauditException(StatusCodeEnum.HTTP_401_UNAUTHORIZED, "当前用户未登录")
|
|
|
|
currentUser = await self._getCurrentUserContext(userId)
|
|
params: dict[str, Any] = {"document_id": documentId}
|
|
filters = ["d.id = :document_id", "d.deleted_at IS NULL", "f.is_active = true", "f.file_role = 'original'"]
|
|
filters.extend(
|
|
self._buildDocumentScopeFilters(
|
|
CurrentUserId=userId,
|
|
CurrentUser=currentUser,
|
|
Params=params,
|
|
DocumentAlias="d",
|
|
FileAlias="f",
|
|
)
|
|
)
|
|
whereClause = " AND ".join(filters)
|
|
async with GetAsyncSession() as session:
|
|
await self._ensureGovdocSchema(session)
|
|
row = (
|
|
await session.execute(
|
|
text(
|
|
f"""
|
|
SELECT d.id
|
|
FROM leaudit_documents d
|
|
JOIN leaudit_document_files f
|
|
ON f.document_id = d.id
|
|
AND f.is_active = true
|
|
AND f.file_role = 'original'
|
|
AND f.deleted_at IS NULL
|
|
WHERE {whereClause}
|
|
LIMIT 1
|
|
"""
|
|
),
|
|
params,
|
|
)
|
|
).first()
|
|
if not row:
|
|
raise LeauditException(StatusCodeEnum.HTTP_404_NOT_FOUND, "公文文档不存在或无权访问")
|
|
|
|
await session.execute(
|
|
text("UPDATE leaudit_documents SET deleted_at = now(), updated_at = now() WHERE id = :document_id"),
|
|
{"document_id": documentId},
|
|
)
|
|
await session.commit()
|
|
|
|
return {"documentId": documentId, "deleted": True}
|
|
|
|
# ── 审查运行 ──────────────────────────────────────────
|
|
|
|
async def CreateRun(
|
|
self,
|
|
documentId: int,
|
|
ruleVersionId: int | None = None,
|
|
speed: str = "normal",
|
|
force: bool = False,
|
|
triggerUserId: int | None = None,
|
|
) -> dict[str, Any]:
|
|
if triggerUserId is None:
|
|
raise LeauditException(StatusCodeEnum.HTTP_401_UNAUTHORIZED, "当前用户未登录")
|
|
|
|
async with GetAsyncSession() as session:
|
|
await self._ensureGovdocSchema(session)
|
|
|
|
currentUser = await self._getCurrentUserContext(triggerUserId)
|
|
documentMeta = await self._get_document_for_run(documentId, triggerUserId, currentUser)
|
|
if documentMeta.currentRunId and not force:
|
|
currentRun = await self.GetRunStatus(documentMeta.currentRunId)
|
|
if currentRun["status"] in {"pending", "processing"}:
|
|
return {
|
|
"runId": documentMeta.currentRunId,
|
|
"documentId": documentId,
|
|
"status": currentRun["status"],
|
|
"phase": currentRun.get("phase"),
|
|
"reused": True,
|
|
}
|
|
|
|
runId = await self.Storage.CreateRun(
|
|
{
|
|
"documentId": documentId,
|
|
"documentFileId": documentMeta.fileId,
|
|
"runNo": await self._next_run_no(documentId),
|
|
"triggerSource": "upload" if not documentMeta.currentRunId else "manual",
|
|
"triggerUserId": triggerUserId,
|
|
}
|
|
)
|
|
rulesPath = await self._resolve_rules_path()
|
|
await self.Storage.UpdateDocumentStatus(documentId, "processing", runId)
|
|
task = dispatch_govdoc_task(
|
|
documentId=documentId,
|
|
runId=runId,
|
|
rulesPath=rulesPath,
|
|
triggerUserId=triggerUserId,
|
|
speed=speed,
|
|
)
|
|
|
|
async with GetAsyncSession() as session:
|
|
await session.execute(
|
|
text("UPDATE govdoc_runs SET task_id = :task_id, started_at = now(), updated_at = now() WHERE id = :run_id"),
|
|
{"task_id": str(getattr(task, "id", "") or ""), "run_id": runId},
|
|
)
|
|
await session.commit()
|
|
|
|
return {
|
|
"runId": runId,
|
|
"documentId": documentId,
|
|
"status": "pending",
|
|
"phase": "dispatch",
|
|
"taskId": str(getattr(task, "id", "") or ""),
|
|
}
|
|
|
|
async def GetRunStatus(self, runId: int) -> dict[str, Any]:
|
|
async with GetAsyncSession() as session:
|
|
await self._ensureGovdocSchema(session)
|
|
row = (
|
|
await session.execute(
|
|
text(
|
|
"""
|
|
SELECT
|
|
id,
|
|
document_id,
|
|
status,
|
|
phase,
|
|
result_status,
|
|
total_score,
|
|
passed_count,
|
|
failed_count,
|
|
skipped_count,
|
|
error_message,
|
|
task_id,
|
|
created_at,
|
|
updated_at,
|
|
started_at,
|
|
finished_at
|
|
FROM govdoc_runs
|
|
WHERE id = :run_id
|
|
AND deleted_at IS NULL
|
|
LIMIT 1
|
|
"""
|
|
),
|
|
{"run_id": runId},
|
|
)
|
|
).mappings().first()
|
|
if not row:
|
|
raise LeauditException(StatusCodeEnum.HTTP_404_NOT_FOUND, "审查运行不存在")
|
|
return self._build_run_summary(row)
|
|
|
|
# ── 结果与报告 ────────────────────────────────────────
|
|
|
|
async def GetRunResult(self, runId: int) -> dict[str, Any]:
|
|
"""从 govdoc_runs + govdoc_rule_results 读取审查结果,含 structure/outline。"""
|
|
async with GetAsyncSession() as session:
|
|
await self._ensureGovdocSchema(session)
|
|
runRow = (
|
|
await session.execute(
|
|
text(
|
|
"""
|
|
SELECT
|
|
id,
|
|
document_id,
|
|
status,
|
|
phase,
|
|
total_score,
|
|
passed_count,
|
|
failed_count,
|
|
skipped_count,
|
|
result_status,
|
|
result_summary_json,
|
|
started_at,
|
|
finished_at
|
|
FROM govdoc_runs
|
|
WHERE id = :run_id
|
|
AND deleted_at IS NULL
|
|
LIMIT 1
|
|
"""
|
|
),
|
|
{"run_id": runId},
|
|
)
|
|
).mappings().first()
|
|
if not runRow:
|
|
raise LeauditException(StatusCodeEnum.HTTP_404_NOT_FOUND, "审查运行不存在")
|
|
|
|
documentRow = (
|
|
await session.execute(
|
|
text(
|
|
"""
|
|
SELECT
|
|
d.id AS document_id,
|
|
f.file_name
|
|
FROM leaudit_documents d
|
|
JOIN leaudit_document_files f
|
|
ON f.document_id = d.id
|
|
AND f.is_active = true
|
|
AND f.file_role = 'original'
|
|
AND f.deleted_at IS NULL
|
|
WHERE d.id = :document_id
|
|
AND d.deleted_at IS NULL
|
|
LIMIT 1
|
|
"""
|
|
),
|
|
{"document_id": int(runRow["document_id"])},
|
|
)
|
|
).mappings().first()
|
|
|
|
rulesRows = (
|
|
await session.execute(
|
|
text(
|
|
"""
|
|
SELECT
|
|
rule_id,
|
|
rule_name,
|
|
severity,
|
|
category,
|
|
result,
|
|
skip_reason,
|
|
message,
|
|
suggestion,
|
|
actual,
|
|
expected,
|
|
evidence,
|
|
paragraph_index,
|
|
paragraph_text,
|
|
location_path,
|
|
score
|
|
FROM govdoc_rule_results
|
|
WHERE run_id = :run_id
|
|
AND deleted_at IS NULL
|
|
ORDER BY id ASC
|
|
"""
|
|
),
|
|
{"run_id": runId},
|
|
)
|
|
).mappings().all()
|
|
|
|
aux = self._parse_json(runRow.get("result_summary_json"))
|
|
findings = []
|
|
checkedRules = []
|
|
severityStats: dict[str, int] = {}
|
|
categoryStats: dict[str, int] = {}
|
|
seenRuleIds: set[str] = set()
|
|
for rr in rulesRows:
|
|
location = {
|
|
"paragraph_index": rr["paragraph_index"] or 0,
|
|
"role": rr.get("location_path"),
|
|
"char_start": 0,
|
|
"char_end": 0,
|
|
"context": rr.get("paragraph_text") or "",
|
|
}
|
|
if rr.get("result") == "fail":
|
|
severity = str(rr.get("severity") or "info")
|
|
category = str(rr.get("category") or "")
|
|
severityStats[severity] = severityStats.get(severity, 0) + 1
|
|
if category:
|
|
categoryStats[category] = categoryStats.get(category, 0) + 1
|
|
findings.append(
|
|
{
|
|
"finding_id": f"{rr['rule_id']}-{rr.get('paragraph_index') or len(findings)}",
|
|
"rule_id": rr["rule_id"],
|
|
"rule_name": rr.get("rule_name") or rr["rule_id"],
|
|
"severity": severity,
|
|
"category": category,
|
|
"location": location,
|
|
"actual": self._parse_json(rr.get("actual")) or {},
|
|
"expected": self._parse_json(rr.get("expected")) or {},
|
|
"message": rr.get("message") or "",
|
|
"suggestion": rr.get("suggestion") or "",
|
|
"evidence": rr.get("evidence") or "",
|
|
"confidence": 1.0,
|
|
}
|
|
)
|
|
ruleId = str(rr["rule_id"])
|
|
if ruleId in seenRuleIds:
|
|
continue
|
|
seenRuleIds.add(ruleId)
|
|
status = str(rr.get("result") or "pass")
|
|
checkedRules.append(
|
|
{
|
|
"rule_id": ruleId,
|
|
"name": rr.get("rule_name") or ruleId,
|
|
"severity": rr.get("severity") or "info",
|
|
"category": rr.get("category") or "",
|
|
"status": status if status in {"pass", "fail", "skipped"} else "pass",
|
|
"skip_reason": rr.get("skip_reason") or "",
|
|
}
|
|
)
|
|
|
|
totalScore = float(runRow.get("total_score") or 0)
|
|
return {
|
|
"runId": runId,
|
|
"documentId": int(runRow["document_id"]),
|
|
"document": {
|
|
"documentId": int(runRow["document_id"]),
|
|
"filename": str(documentRow["file_name"] or "") if documentRow else "",
|
|
},
|
|
"summary": {
|
|
"score": totalScore,
|
|
"total_findings": len(findings),
|
|
"by_severity": severityStats,
|
|
"by_category": categoryStats,
|
|
"passed_count": int(runRow.get("passed_count") or 0),
|
|
"failed_count": int(runRow.get("failed_count") or 0),
|
|
"skipped_count": int(runRow.get("skipped_count") or 0),
|
|
},
|
|
"checkedRules": checkedRules,
|
|
"findings": findings,
|
|
"structure": aux.get("structure", []),
|
|
"outline": aux.get("outline", []),
|
|
"entities": aux.get("entities", {}),
|
|
}
|
|
|
|
async def GetRunFindings(self, runId: int) -> dict[str, Any]:
|
|
result = await self.GetRunResult(runId)
|
|
return {"runId": runId, "findings": result["findings"]}
|
|
|
|
async def GetRunEntities(self, runId: int) -> dict[str, Any]:
|
|
result = await self.GetRunResult(runId)
|
|
return {"runId": runId, "entities": result.get("entities", {})}
|
|
|
|
async def GetRunParagraphs(self, runId: int) -> str:
|
|
runStatus = await self.GetRunStatus(runId)
|
|
if runStatus["status"] != "completed":
|
|
raise LeauditException(
|
|
StatusCodeEnum.HTTP_409_CONFLICT,
|
|
"当前审查尚未完成,暂时无法加载文档视图",
|
|
)
|
|
|
|
paragraphArtifact = await self._get_report_artifact(runId, "paragraph_html")
|
|
if paragraphArtifact:
|
|
content = await self.OssService.DownloadBytes(str(paragraphArtifact["oss_url"]))
|
|
return content.decode("utf-8")
|
|
|
|
documentMeta = await self._get_document_for_read(int(runStatus["documentId"]))
|
|
fileRow = await self._get_active_original_file(documentMeta.documentId)
|
|
|
|
ossUrl = getattr(fileRow, "ossUrl", None) or fileRow.get("oss_url")
|
|
fileExt = getattr(fileRow, "fileExt", None) or fileRow.get("file_ext")
|
|
|
|
if not ossUrl:
|
|
raise LeauditException(StatusCodeEnum.HTTP_404_NOT_FOUND, "原始文档未找到可用存储地址")
|
|
if (fileExt or "").strip().lower() != "docx":
|
|
raise LeauditException(
|
|
StatusCodeEnum.HTTP_400_BAD_REQUEST,
|
|
"当前文档视图仅支持 DOCX 原文预览,请上传 DOCX 文件",
|
|
)
|
|
|
|
tempPath = await self.OssService.DownloadToTempFile(
|
|
Source=ossUrl,
|
|
Suffix=f".{fileExt or 'docx'}",
|
|
Prefix="govdoc-run-",
|
|
)
|
|
try:
|
|
doc = parse_docx(tempPath)
|
|
findingsResult = await self.GetRunFindings(runId)
|
|
findingMap: dict[int, list[str]] = {}
|
|
for finding in findingsResult["findings"]:
|
|
pi = int(finding.get("location", {}).get("paragraph_index") or 0)
|
|
findingMap.setdefault(pi, []).append(str(finding["finding_id"]))
|
|
return paragraphs_to_html(doc, findingMap)
|
|
finally:
|
|
try:
|
|
Path(tempPath).unlink(missing_ok=True)
|
|
except Exception:
|
|
pass
|
|
|
|
async def GetRunStructure(self, runId: int) -> dict[str, Any]:
|
|
result = await self.GetRunResult(runId)
|
|
return {"runId": runId, "structure": result.get("structure", [])}
|
|
|
|
async def GetRunOutline(self, runId: int) -> dict[str, Any]:
|
|
result = await self.GetRunResult(runId)
|
|
return {"runId": runId, "outline": result.get("outline", [])}
|
|
|
|
async def GetReportHtml(self, runId: int) -> dict[str, Any]:
|
|
artifact = await self._get_report_artifact(runId, "html_report")
|
|
if not artifact:
|
|
return {"runId": runId, "htmlUrl": ""}
|
|
return {
|
|
"runId": runId,
|
|
"htmlUrl": await self.OssService.PresignGetUrl(str(artifact["oss_url"])),
|
|
}
|
|
|
|
async def GetReportDocx(self, runId: int) -> dict[str, Any]:
|
|
artifact = await self._get_report_artifact(runId, "annotated_docx")
|
|
if not artifact:
|
|
return {"runId": runId, "docxUrl": ""}
|
|
return {
|
|
"runId": runId,
|
|
"docxUrl": await self.OssService.PresignGetUrl(str(artifact["oss_url"])),
|
|
}
|
|
|
|
async def DownloadOriginal(self, documentId: int) -> dict[str, Any]:
|
|
fileRow = await self._get_active_original_file(documentId)
|
|
ossUrl = getattr(fileRow, "ossUrl", None) or fileRow.get("oss_url")
|
|
if not ossUrl:
|
|
raise LeauditException(StatusCodeEnum.HTTP_404_NOT_FOUND, "原始文档不存在")
|
|
return {
|
|
"documentId": documentId,
|
|
"downloadUrl": await self.OssService.PresignGetUrl(str(ossUrl)),
|
|
}
|
|
|
|
# ── 规则 ──────────────────────────────────────────────
|
|
|
|
async def ListRules(self, rulesPath: str | None = None) -> dict[str, Any]:
|
|
"""从 govdoc 规则 YAML 文件加载规则清单。"""
|
|
rules = await self._load_rules_list(rulesPath)
|
|
return {"metadata": {}, "rules": rules, "total_rules": len(rules)}
|
|
|
|
async def GetRuleDetail(self, ruleId: str, rulesPath: str | None = None) -> dict[str, Any]:
|
|
"""获取单条规则完整详情(名称、严重度、stages、消息等)。"""
|
|
ruleset = await self._load_ruleset(rulesPath)
|
|
if ruleset is None:
|
|
return {"rule_id": ruleId, "name": ruleId, "severity": "info", "category": "", "group": ""}
|
|
for rule in ruleset.all_rules():
|
|
if rule.rule_id == ruleId:
|
|
return {
|
|
"rule_id": rule.rule_id,
|
|
"name": rule.name,
|
|
"severity": rule.severity,
|
|
"category": rule.category,
|
|
"group": "",
|
|
"applies_to": rule.applies_to.model_dump() if rule.applies_to else None,
|
|
"target": rule.target,
|
|
"on_missing": rule.on_missing,
|
|
"stages": [s.model_dump(exclude_none=True) for s in (rule.stages or [])],
|
|
"messages": rule.messages.model_dump() if rule.messages else {},
|
|
}
|
|
return {"rule_id": ruleId, "name": ruleId, "severity": "info", "category": "", "group": ""}
|
|
|
|
# ── 助手 ──────────────────────────────────────────────
|
|
|
|
async def _resolve_rules_path(self, rulesPath: str | None = None) -> str | None:
|
|
"""解析规则 YAML 文件路径。"""
|
|
if rulesPath:
|
|
return rulesPath
|
|
candidates = [
|
|
Path("/home/wren-dev/Porject/leaudit-platform/rules/govdoc/govdoc_general/rules.yaml"),
|
|
Path("/home/wren-dev/Porject/leaudit-platform/rules/govdoc_general/rules.yaml"),
|
|
]
|
|
for candidate in candidates:
|
|
if candidate.is_file():
|
|
return str(candidate)
|
|
return None
|
|
|
|
async def _load_ruleset(self, rulesPath: str | None = None):
|
|
resolved = await self._resolve_rules_path(rulesPath)
|
|
if not resolved:
|
|
logger.warning("[Govdoc] Cannot resolve rules path for GetRuleDetail/ListRules")
|
|
return None
|
|
from fastapi_modules.fastapi_leaudit.govdoc_engine.dsl.loader import load_rules
|
|
|
|
return load_rules(resolved)
|
|
|
|
async def _load_rules_list(self, rulesPath: str | None = None) -> list[dict[str, Any]]:
|
|
ruleset = await self._load_ruleset(rulesPath)
|
|
if ruleset is None:
|
|
return []
|
|
result = []
|
|
for rule in ruleset.all_rules():
|
|
result.append(
|
|
{
|
|
"rule_id": rule.rule_id,
|
|
"name": rule.name,
|
|
"severity": rule.severity,
|
|
"category": rule.category,
|
|
"group": "",
|
|
}
|
|
)
|
|
return result
|
|
|
|
async def _ensureGovdocSchema(self, session) -> None:
|
|
statements = [
|
|
"""
|
|
ALTER TABLE leaudit_documents
|
|
ADD COLUMN IF NOT EXISTS engine_type VARCHAR(32) NOT NULL DEFAULT 'leaudit'
|
|
""",
|
|
"""
|
|
CREATE TABLE IF NOT EXISTS public.govdoc_runs (
|
|
id BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
|
|
document_id BIGINT NOT NULL,
|
|
document_file_id BIGINT,
|
|
run_no INTEGER NOT NULL DEFAULT 1,
|
|
trigger_source VARCHAR(64) NOT NULL DEFAULT 'upload',
|
|
trigger_user_id BIGINT,
|
|
task_id VARCHAR(128),
|
|
status VARCHAR(64) NOT NULL DEFAULT 'pending',
|
|
phase VARCHAR(32),
|
|
engine_version VARCHAR(64),
|
|
llm_provider VARCHAR(64),
|
|
llm_model VARCHAR(128),
|
|
rules_path VARCHAR(1024),
|
|
total_score NUMERIC(10, 2),
|
|
passed_count INTEGER,
|
|
failed_count INTEGER,
|
|
skipped_count INTEGER,
|
|
result_status VARCHAR(32),
|
|
result_summary_json TEXT,
|
|
error_message TEXT,
|
|
started_at TIMESTAMPTZ,
|
|
finished_at TIMESTAMPTZ,
|
|
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
|
|
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
|
|
deleted_at TIMESTAMPTZ DEFAULT NULL
|
|
)
|
|
""",
|
|
"""
|
|
CREATE TABLE IF NOT EXISTS public.govdoc_rule_results (
|
|
id BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
|
|
run_id BIGINT NOT NULL,
|
|
rule_id VARCHAR(128) NOT NULL,
|
|
rule_name VARCHAR(256),
|
|
severity VARCHAR(32),
|
|
category VARCHAR(128),
|
|
message TEXT,
|
|
suggestion TEXT,
|
|
actual TEXT,
|
|
expected TEXT,
|
|
evidence TEXT,
|
|
paragraph_index INTEGER,
|
|
paragraph_text TEXT,
|
|
location_path VARCHAR(512),
|
|
result VARCHAR(32) NOT NULL DEFAULT 'pass',
|
|
skip_reason TEXT,
|
|
score NUMERIC(10, 2),
|
|
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
|
|
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
|
|
deleted_at TIMESTAMPTZ DEFAULT NULL
|
|
)
|
|
""",
|
|
"""
|
|
CREATE TABLE IF NOT EXISTS public.govdoc_report_artifacts (
|
|
id BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
|
|
run_id BIGINT NOT NULL,
|
|
artifact_type VARCHAR(64) NOT NULL,
|
|
file_name VARCHAR(512) NOT NULL,
|
|
file_ext VARCHAR(32),
|
|
mime_type VARCHAR(128),
|
|
file_size BIGINT,
|
|
sha256 VARCHAR(64),
|
|
oss_url VARCHAR(2048),
|
|
storage_provider VARCHAR(32),
|
|
description VARCHAR(512),
|
|
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
|
|
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
|
|
deleted_at TIMESTAMPTZ DEFAULT NULL
|
|
)
|
|
""",
|
|
"""
|
|
CREATE INDEX IF NOT EXISTS idx_leaudit_documents_engine_type
|
|
ON public.leaudit_documents(engine_type) WHERE deleted_at IS NULL
|
|
""",
|
|
"""
|
|
CREATE INDEX IF NOT EXISTS idx_govdoc_runs_document_id
|
|
ON public.govdoc_runs(document_id) WHERE deleted_at IS NULL
|
|
""",
|
|
"""
|
|
CREATE INDEX IF NOT EXISTS idx_govdoc_runs_status
|
|
ON public.govdoc_runs(status) WHERE deleted_at IS NULL
|
|
""",
|
|
"""
|
|
CREATE INDEX IF NOT EXISTS idx_govdoc_runs_trigger_user_id
|
|
ON public.govdoc_runs(trigger_user_id)
|
|
""",
|
|
"""
|
|
CREATE INDEX IF NOT EXISTS idx_govdoc_rule_results_run_id
|
|
ON public.govdoc_rule_results(run_id) WHERE deleted_at IS NULL
|
|
""",
|
|
"""
|
|
CREATE INDEX IF NOT EXISTS idx_govdoc_rule_results_rule_id
|
|
ON public.govdoc_rule_results(rule_id) WHERE deleted_at IS NULL
|
|
""",
|
|
"""
|
|
CREATE INDEX IF NOT EXISTS idx_govdoc_rule_results_result
|
|
ON public.govdoc_rule_results(result) WHERE deleted_at IS NULL
|
|
""",
|
|
"""
|
|
CREATE INDEX IF NOT EXISTS idx_govdoc_rule_results_paragraph
|
|
ON public.govdoc_rule_results(run_id, paragraph_index) WHERE deleted_at IS NULL
|
|
""",
|
|
"""
|
|
CREATE INDEX IF NOT EXISTS idx_govdoc_report_artifacts_run_id
|
|
ON public.govdoc_report_artifacts(run_id) WHERE deleted_at IS NULL
|
|
""",
|
|
"""
|
|
CREATE INDEX IF NOT EXISTS idx_govdoc_report_artifacts_type
|
|
ON public.govdoc_report_artifacts(run_id, artifact_type) WHERE deleted_at IS NULL
|
|
""",
|
|
]
|
|
for statement in statements:
|
|
await session.execute(text(statement))
|
|
await session.commit()
|
|
|
|
async def _getCurrentUserContext(self, CurrentUserId: int) -> dict[str, Any]:
    """Load the caller's area and role-derived permission flags.

    Role keys are aggregated (``bool_or``) across every role the user holds:

    * ``is_global``      -- holds ``super_admin`` or ``provincial_admin``
    * ``can_manage``     -- holds one of the above or ``admin``
    * ``is_super_admin`` -- holds ``super_admin``

    A user with no roles still yields a row (LEFT JOINs) with all flags False.

    Args:
        CurrentUserId: Primary key in ``sso_users``.

    Returns:
        ``{"area": str, "is_global": bool, "can_manage": bool,
        "is_super_admin": bool}``; ``area`` is ``""`` when NULL.

    Raises:
        LeauditException: 404 when no ``sso_users`` row has this id.
    """
    query = text(
        """
        SELECT
            u.id,
            COALESCE(u.area, '') AS area,
            COALESCE(bool_or(r.role_key IN ('super_admin', 'provincial_admin')), FALSE) AS is_global,
            COALESCE(bool_or(r.role_key IN ('super_admin', 'provincial_admin', 'admin')), FALSE) AS can_manage,
            COALESCE(bool_or(r.role_key = 'super_admin'), FALSE) AS is_super_admin
        FROM sso_users u
        LEFT JOIN user_role ur ON ur.user_id = u.id
        LEFT JOIN roles r ON r.id = ur.role_id
        WHERE u.id = :user_id
        GROUP BY u.id, u.area
        """
    )
    async with GetAsyncSession() as session:
        result = await session.execute(query, {"user_id": CurrentUserId})
        record = result.mappings().first()

    if not record:
        raise LeauditException(StatusCodeEnum.HTTP_404_NOT_FOUND, "当前用户不存在")

    context: dict[str, Any] = {"area": str(record["area"] or "")}
    for flag in ("is_global", "can_manage", "is_super_admin"):
        context[flag] = bool(record[flag])
    return context
|
|
|
|
def _buildDocumentScopeFilters(
|
|
self,
|
|
CurrentUserId: int,
|
|
CurrentUser: dict[str, Any],
|
|
Params: dict[str, object],
|
|
DocumentAlias: str,
|
|
FileAlias: str,
|
|
RequestedRegion: str | None = None,
|
|
RequestedUserId: int | None = None,
|
|
) -> list[str]:
|
|
filters: list[str] = []
|
|
requestedRegion = (RequestedRegion or "").strip()
|
|
area = str(CurrentUser["area"] or "").strip()
|
|
|
|
if CurrentUser["is_global"]:
|
|
if requestedRegion:
|
|
filters.append(f"{DocumentAlias}.region = :requested_region")
|
|
Params["requested_region"] = requestedRegion
|
|
if RequestedUserId is not None:
|
|
filters.append(f"{FileAlias}.created_by = :requested_user_id")
|
|
Params["requested_user_id"] = RequestedUserId
|
|
return filters
|
|
|
|
if CurrentUser["can_manage"]:
|
|
if not area:
|
|
filters.append("1 = 0")
|
|
return filters
|
|
if requestedRegion and requestedRegion != area:
|
|
filters.append("1 = 0")
|
|
return filters
|
|
filters.append(f"{DocumentAlias}.region = :scope_region")
|
|
Params["scope_region"] = area
|
|
if RequestedUserId is not None:
|
|
filters.append(f"{FileAlias}.created_by = :requested_user_id")
|
|
Params["requested_user_id"] = RequestedUserId
|
|
return filters
|
|
|
|
filters.append(f"{FileAlias}.created_by = :scope_user_id")
|
|
Params["scope_user_id"] = CurrentUserId
|
|
if requestedRegion:
|
|
filters.append(f"{DocumentAlias}.region = :requested_region")
|
|
Params["requested_region"] = requestedRegion
|
|
if RequestedUserId is not None and RequestedUserId != CurrentUserId:
|
|
filters.append("1 = 0")
|
|
return filters
|
|
|
|
def _resolve_upload_region(self, currentUser: dict[str, Any], requestedRegion: str) -> str:
|
|
area = str(currentUser["area"] or "").strip()
|
|
if currentUser["is_global"]:
|
|
return requestedRegion or area or "default"
|
|
if currentUser["can_manage"]:
|
|
if area and requestedRegion and requestedRegion != area:
|
|
raise LeauditException(StatusCodeEnum.HTTP_403_FORBIDDEN, "不能上传到非本地区")
|
|
return area or requestedRegion or "default"
|
|
if area and requestedRegion and requestedRegion != area:
|
|
raise LeauditException(StatusCodeEnum.HTTP_403_FORBIDDEN, "不能上传到非本人地区")
|
|
return area or requestedRegion or "default"
|
|
|
|
def _normalize_document_name(self, fileName: str) -> str:
|
|
suffix = Path(fileName).suffix
|
|
return fileName[: -len(suffix)] if suffix else fileName
|
|
|
|
async def _get_document_for_run(
    self,
    documentId: int,
    userId: int,
    currentUser: dict[str, Any],
) -> _GovdocDocumentRow:
    """Fetch a govdoc document the caller is allowed to act on.

    Joins the document with its active ``original`` file and (LEFT JOIN) the
    ``govdoc_runs`` row referenced by ``current_run_id``. Visibility is
    restricted by ``_buildDocumentScopeFilters`` for ``currentUser``.

    The query does not select ``has_html_report``/``has_docx_report``, so
    those fields of the returned row are always False.

    Raises:
        LeauditException: 404 when the document is missing, soft-deleted,
            not a govdoc document, or outside the caller's scope.
    """
    bindings: dict[str, Any] = {"document_id": documentId}
    conditions = [
        "d.id = :document_id",
        "d.deleted_at IS NULL",
        # These three file predicates repeat the JOIN condition; redundant but harmless.
        "f.deleted_at IS NULL",
        "f.is_active = true",
        "f.file_role = 'original'",
        "COALESCE(d.engine_type, 'leaudit') = 'govdoc'",
        *self._buildDocumentScopeFilters(
            CurrentUserId=userId,
            CurrentUser=currentUser,
            Params=bindings,
            DocumentAlias="d",
            FileAlias="f",
        ),
    ]
    whereClause = " AND ".join(conditions)
    query = text(
        f"""
        SELECT
            d.id AS document_id,
            COALESCE(d.region, 'default') AS region,
            COALESCE(d.processing_status, 'waiting') AS processing_status,
            d.current_run_id,
            d.created_at,
            d.updated_at,
            f.id AS file_id,
            f.file_name,
            f.file_ext,
            f.mime_type,
            f.file_size,
            f.oss_url,
            f.created_by,
            gr.result_status,
            gr.total_score,
            gr.passed_count,
            gr.failed_count,
            gr.skipped_count
        FROM leaudit_documents d
        JOIN leaudit_document_files f
            ON f.document_id = d.id
            AND f.is_active = true
            AND f.file_role = 'original'
            AND f.deleted_at IS NULL
        LEFT JOIN govdoc_runs gr
            ON gr.id = d.current_run_id
        WHERE {whereClause}
        LIMIT 1
        """
    )
    async with GetAsyncSession() as session:
        result = await session.execute(query, bindings)
        record = result.mappings().first()

    if not record:
        raise LeauditException(StatusCodeEnum.HTTP_404_NOT_FOUND, "公文文档不存在或无权访问")
    return self._map_document_row(record)
|
|
|
|
async def _get_document_for_read(self, documentId: int) -> _GovdocDocumentRow:
    """Fetch a govdoc document by id with its active original file and current run.

    Unlike ``_get_document_for_run`` this applies no user/region scope filter.

    NOTE(review): any caller can resolve any govdoc document id here; presumably
    access is enforced by the caller -- confirm.

    Raises:
        LeauditException: 404 when the document is missing, soft-deleted, or
            not a govdoc document (or has no active original file).
    """
    query = text(
        """
        SELECT
            d.id AS document_id,
            COALESCE(d.region, 'default') AS region,
            COALESCE(d.processing_status, 'waiting') AS processing_status,
            d.current_run_id,
            d.created_at,
            d.updated_at,
            f.id AS file_id,
            f.file_name,
            f.file_ext,
            f.mime_type,
            f.file_size,
            f.oss_url,
            f.created_by,
            gr.result_status,
            gr.total_score,
            gr.passed_count,
            gr.failed_count,
            gr.skipped_count
        FROM leaudit_documents d
        JOIN leaudit_document_files f
            ON f.document_id = d.id
            AND f.is_active = true
            AND f.file_role = 'original'
            AND f.deleted_at IS NULL
        LEFT JOIN govdoc_runs gr
            ON gr.id = d.current_run_id
        WHERE d.id = :document_id
            AND d.deleted_at IS NULL
            AND COALESCE(d.engine_type, 'leaudit') = 'govdoc'
        LIMIT 1
        """
    )
    async with GetAsyncSession() as session:
        result = await session.execute(query, {"document_id": documentId})
        record = result.mappings().first()

    if not record:
        raise LeauditException(StatusCodeEnum.HTTP_404_NOT_FOUND, "公文文档不存在")
    return self._map_document_row(record)
|
|
|
|
async def _get_active_original_file(self, documentId: int):
    """Return the newest active, non-deleted ``original`` file row of a document.

    The result is a row mapping with ``id``, ``document_id``, ``file_name``,
    ``file_ext``, ``mime_type``, ``file_size``, ``oss_url`` and ``created_by``.
    If several rows qualify, the highest ``id`` wins.

    Raises:
        LeauditException: 404 when no such file exists.
    """
    query = text(
        """
        SELECT id, document_id, file_name, file_ext, mime_type, file_size, oss_url, created_by
        FROM leaudit_document_files
        WHERE document_id = :document_id
            AND is_active = true
            AND file_role = 'original'
            AND deleted_at IS NULL
        ORDER BY id DESC
        LIMIT 1
        """
    )
    async with GetAsyncSession() as session:
        result = await session.execute(query, {"document_id": documentId})
        fileRecord = result.mappings().first()

    if not fileRecord:
        raise LeauditException(StatusCodeEnum.HTTP_404_NOT_FOUND, "原始文档不存在")
    return fileRecord
|
|
|
|
async def _next_run_no(self, documentId: int) -> int:
    """Return the next sequential ``run_no`` for a document (1 for its first run).

    Soft-deleted runs are excluded from the MAX, so the number of a deleted
    highest run can be handed out again.

    NOTE(review): MAX()+1 is read in its own session with no lock, so two
    concurrent callers can get the same number unless a unique constraint or
    lock elsewhere serialises run creation -- confirm.
    """
    query = text(
        """
        SELECT COALESCE(MAX(run_no), 0)
        FROM govdoc_runs
        WHERE document_id = :document_id
            AND deleted_at IS NULL
        """
    )
    async with GetAsyncSession() as session:
        result = await session.execute(query, {"document_id": documentId})
        highest = result.scalar_one()
    return int(highest or 0) + 1
|
|
|
|
def _build_run_summary(self, row: Any) -> dict[str, Any]:
    """Serialise a ``govdoc_runs`` row mapping into the API run-summary payload.

    ``id`` and ``document_id`` are required. Missing or NULL counters and scores
    become 0, a missing ``status`` becomes ``"pending"``, and timestamps are
    rendered via ``_iso``.
    """
    score = row.get("total_score")
    passed = row.get("passed_count")
    failed = row.get("failed_count")
    skipped = row.get("skipped_count")
    summary = self._build_summary_payload(score, passed, failed, skipped)

    payload: dict[str, Any] = {
        "runId": int(row["id"]),
        "documentId": int(row["document_id"]),
        "status": str(row.get("status") or "pending"),
        "phase": row.get("phase"),
        "resultStatus": row.get("result_status"),
        "score": float(score or 0),
        "passedCount": int(passed or 0),
        "failedCount": int(failed or 0),
        "skippedCount": int(skipped or 0),
        "summary": summary,
        "errorMessage": row.get("error_message"),
        "taskId": row.get("task_id"),
    }
    for outKey, column in (
        ("createdAt", "created_at"),
        ("updatedAt", "updated_at"),
        ("startedAt", "started_at"),
        ("finishedAt", "finished_at"),
    ):
        payload[outKey] = self._iso(row.get(column))
    return payload
|
|
|
|
def _build_summary_payload(
|
|
self,
|
|
totalScore: Any,
|
|
passedCount: Any,
|
|
failedCount: Any,
|
|
skippedCount: Any,
|
|
) -> dict[str, Any]:
|
|
return {
|
|
"score": float(totalScore or 0),
|
|
"total_findings": int(failedCount or 0),
|
|
"by_severity": {},
|
|
"by_category": {},
|
|
"passed_count": int(passedCount or 0),
|
|
"failed_count": int(failedCount or 0),
|
|
"skipped_count": int(skippedCount or 0),
|
|
}
|
|
|
|
def _group_artifacts_by_run(self, rows: list[Any]) -> dict[int, dict[str, Any]]:
|
|
grouped: dict[int, dict[str, Any]] = {}
|
|
artifactTypeMap = {
|
|
"html_report": "htmlUrl",
|
|
"annotated_docx": "docxUrl",
|
|
"paragraph_html": "paragraphHtmlUrl",
|
|
}
|
|
for row in rows:
|
|
runId = int(row["run_id"])
|
|
payload = grouped.setdefault(
|
|
runId,
|
|
{
|
|
"artifacts": [],
|
|
"hasHtmlReport": False,
|
|
"hasDocxReport": False,
|
|
"hasParagraphHtml": False,
|
|
},
|
|
)
|
|
artifactType = str(row.get("artifact_type") or "")
|
|
if artifactType in artifactTypeMap and row.get("oss_url") and artifactTypeMap[artifactType] not in payload:
|
|
payload[artifactTypeMap[artifactType]] = row["oss_url"]
|
|
if artifactType == "html_report":
|
|
payload["hasHtmlReport"] = True
|
|
elif artifactType == "annotated_docx":
|
|
payload["hasDocxReport"] = True
|
|
elif artifactType == "paragraph_html":
|
|
payload["hasParagraphHtml"] = True
|
|
payload["artifacts"].append(
|
|
{
|
|
"artifactType": artifactType,
|
|
"fileName": row.get("file_name") or "",
|
|
"fileExt": row.get("file_ext") or "",
|
|
"mimeType": row.get("mime_type") or "",
|
|
"ossUrl": row.get("oss_url") or "",
|
|
"description": row.get("description") or "",
|
|
}
|
|
)
|
|
return grouped
|
|
|
|
def _map_document_row(self, row: Any) -> _GovdocDocumentRow:
    """Convert a joined document/file/run row mapping into ``_GovdocDocumentRow``.

    ``document_id``, ``region``, ``processing_status``, ``file_id`` and
    ``file_name`` must be present. Nullable numeric columns keep 0 and become
    None only when NULL. Nullable text columns become None when NULL *or*
    empty. The ``has_*_report`` flags default to False when absent.
    """

    def whenNotNull(column: str, cast: Any) -> Any:
        # NULL -> None; any other value (including 0) is converted.
        value = row.get(column)
        return None if value is None else cast(value)

    def whenTruthy(column: str, cast: Any) -> Any:
        # NULL and "" both -> None.
        value = row.get(column)
        return cast(value) if value else None

    return _GovdocDocumentRow(
        documentId=int(row["document_id"]),
        region=str(row["region"] or "default"),
        processingStatus=str(row["processing_status"] or "waiting"),
        currentRunId=whenNotNull("current_run_id", int),
        createdAt=row.get("created_at"),
        updatedAt=row.get("updated_at"),
        fileId=int(row["file_id"]),
        fileName=str(row["file_name"] or ""),
        fileExt=whenTruthy("file_ext", str),
        mimeType=whenTruthy("mime_type", str),
        fileSize=whenNotNull("file_size", int),
        ossUrl=whenTruthy("oss_url", str),
        createdBy=whenNotNull("created_by", int),
        resultStatus=whenTruthy("result_status", str),
        totalScore=whenNotNull("total_score", float),
        passedCount=whenNotNull("passed_count", int),
        failedCount=whenNotNull("failed_count", int),
        skippedCount=whenNotNull("skipped_count", int),
        hasHtmlReport=bool(row.get("has_html_report")),
        hasDocxReport=bool(row.get("has_docx_report")),
    )
|
|
|
|
async def _get_report_artifact(self, runId: int, artifactType: str) -> Any | None:
    """Return the newest non-deleted artifact of a type for a run, or None.

    Only rows with a non-empty ``oss_url`` qualify. The result is a row mapping
    containing just ``oss_url``.
    """
    query = text(
        """
        SELECT oss_url
        FROM govdoc_report_artifacts
        WHERE run_id = :run_id
            AND artifact_type = :artifact_type
            AND deleted_at IS NULL
            AND COALESCE(oss_url, '') <> ''
        ORDER BY id DESC
        LIMIT 1
        """
    )
    async with GetAsyncSession() as session:
        # NOTE(review): _ensureGovdocSchema presumably runs the CREATE ... IF NOT
        # EXISTS bootstrap plus a commit; if so, that DDL executes on every
        # artifact lookup -- consider running it once at startup. TODO confirm.
        await self._ensureGovdocSchema(session)
        result = await session.execute(query, {"run_id": runId, "artifact_type": artifactType})
        return result.mappings().first()
|
|
|
|
def _parse_json(self, raw: Any) -> Any:
|
|
if raw is None or raw == "":
|
|
return None
|
|
if isinstance(raw, (dict, list)):
|
|
return raw
|
|
try:
|
|
return json.loads(raw)
|
|
except Exception:
|
|
return None
|
|
|
|
def _iso(self, value: Any) -> str | None:
|
|
if value is None:
|
|
return None
|
|
if isinstance(value, datetime):
|
|
return value.isoformat()
|
|
return str(value)
|