Files

2236 lines
99 KiB
Python

"""Govdoc 公文模块服务实现。"""
from __future__ import annotations
import hashlib
import json
import mimetypes
import time
import uuid
from dataclasses import dataclass
from datetime import date, datetime
from pathlib import Path
from typing import Any
from fastapi import UploadFile
from sqlalchemy import bindparam, text
from fastapi_common.fastapi_common_logger import logger
from fastapi_common.fastapi_common_sqlalchemy.database import GetAsyncSession
from fastapi_common.fastapi_common_storage.oss_path_utils import OssPathUtils
from fastapi_common.fastapi_common_web.domain.responses import StatusCodeEnum
from fastapi_common.fastapi_common_web.exception.LeauditException import LeauditException
from fastapi_modules.fastapi_leaudit.govdoc_bridge.storage_adapter import StorageAdapter
from fastapi_modules.fastapi_leaudit.govdoc_engine.engine.result import AuditResult, AuditSummary, CheckedRule, OutlineNode, StructureItem
from fastapi_modules.fastapi_leaudit.govdoc_bridge.tasks import dispatch_govdoc_task
from fastapi_modules.fastapi_leaudit.govdoc_engine.parser.docx_parser import parse_docx
from fastapi_modules.fastapi_leaudit.govdoc_engine.models import Finding, Location
from fastapi_modules.fastapi_leaudit.govdoc_engine.reporter.html_paragraph import paragraphs_to_html
from fastapi_modules.fastapi_leaudit.govdoc_engine.reporter.html_renderer import render_html
from fastapi_modules.fastapi_leaudit.models import LeauditDocument, LeauditDocumentFile
from fastapi_modules.fastapi_leaudit.services import IGovdocService, IOssService
from fastapi_modules.fastapi_leaudit.services.impl.ossServiceImpl import OssServiceImpl
@dataclass(frozen=True)
class _GovdocDocumentRow:
    """Immutable, typed view of one govdoc document row.

    The attributes mirror the column aliases selected by the document
    list/detail queries in this module (``document_id``, ``version_group_key``,
    ``finding_count`` ...), converted to camelCase.
    NOTE(review): instances are presumably produced by ``_map_document_row``;
    confirm against that helper.
    """

    # -- Document identity and processing state --
    documentId: int
    region: str
    processingStatus: str
    currentRunId: int | None
    # -- Version chain (documents sharing one versionGroupKey are versions of each other) --
    versionGroupKey: str | None
    versionNo: int
    totalVersions: int
    previousVersionId: int | None
    rootVersionId: int | None
    isLatestVersion: bool
    normalizedName: str | None
    # -- Timestamps, typed as Any because the value comes straight from the database row --
    createdAt: Any
    updatedAt: Any
    # -- Active "original" file attached to the document --
    fileId: int
    fileName: str
    fileExt: str | None
    mimeType: str | None
    fileSize: int | None
    ossUrl: str | None
    createdBy: int | None
    # -- Outcome of the document's current run (None when no run row is joined) --
    resultStatus: str | None
    totalScore: float | None
    passedCount: int | None
    failedCount: int | None
    skippedCount: int | None
    # -- Number of rule results with result = 'fail', overall and per severity --
    findingCount: int
    errorCount: int
    warningCount: int
    infoCount: int
    rulesPath: str | None
    # -- Whether an html_report / annotated_docx artifact exists for the current run --
    hasHtmlReport: bool
    hasDocxReport: bool
class GovdocServiceImpl(IGovdocService):
"""公文处理与格式审查服务实现。"""
def __init__(self, OssService: IOssService | None = None) -> None:
    """Wire up the collaborators used by the service.

    Args:
        OssService: Object-storage service to use. When it is omitted (or
            falsy) a default ``OssServiceImpl`` is created instead.
    """
    if OssService:
        self.OssService = OssService
    else:
        self.OssService = OssServiceImpl()
    self.Storage = StorageAdapter()
def _parse_date_filter(self, value: str | None, field_name: str) -> date | None:
if value is None:
return None
normalized = value.strip()
if not normalized:
return None
try:
return date.fromisoformat(normalized)
except ValueError as exc:
raise LeauditException(
StatusCodeEnum.HTTP_400_BAD_REQUEST,
f"{field_name} 格式非法,应为 YYYY-MM-DD",
) from exc
# ── 文档 ──────────────────────────────────────────────
async def UploadDocument(
    self,
    file: UploadFile,
    typeId: int | None = None,
    region: str = "default",
    autoRun: bool = True,
    speed: str = "normal",
    ruleVersionId: int | None = None,
    createdBy: int | None = None,
) -> dict[str, Any]:
    """Store an uploaded DOCX as a new govdoc document version and optionally start a run.

    The upload is linked to the latest existing version with the same region,
    normalized name and extension (if any); otherwise it starts a new version
    group. The file bytes are uploaded to object storage and recorded as the
    document's active "original" file.

    Args:
        file: The uploaded file. Only ``.docx`` is accepted.
        typeId: Optional document type id stored on the document.
        region: Requested region; blank values fall back to ``"default"``.
        autoRun: When true, a review run is created right after the upload.
        speed: Forwarded to ``CreateRun``.
        ruleVersionId: Forwarded to ``CreateRun``.
        createdBy: Id of the uploading user (required).

    Returns:
        A dict describing the stored document, its file and the triggered run
        (``run`` / ``latestRunId`` are ``None`` when no run was started).

    Raises:
        LeauditException: 401 when ``createdBy`` is missing; 400 when the file
            is missing, empty, or not a DOCX.
    """
    if createdBy is None:
        raise LeauditException(StatusCodeEnum.HTTP_401_UNAUTHORIZED, "当前用户未登录")
    if file is None or not file.filename:
        raise LeauditException(StatusCodeEnum.HTTP_400_BAD_REQUEST, "上传文件不能为空")

    # The filename is client-controlled: keep only its last path component so
    # directory parts ("../", "C:\\...") never reach the object key or the DB.
    fileName = Path(file.filename.replace("\\", "/")).name
    if not fileName:
        raise LeauditException(StatusCodeEnum.HTTP_400_BAD_REQUEST, "上传文件不能为空")

    # Validate the extension before reading the body so unsupported uploads
    # are rejected without loading them into memory.
    fileExt = Path(fileName).suffix.lstrip(".").lower() or None
    if fileExt != "docx":
        raise LeauditException(
            StatusCodeEnum.HTTP_400_BAD_REQUEST,
            "当前内部公文模块仅支持上传 DOCX 文件",
        )

    content = await file.read()
    if not content:
        raise LeauditException(StatusCodeEnum.HTTP_400_BAD_REQUEST, "上传文件内容不能为空")

    normalizedRegion = (region or "default").strip() or "default"
    mimeType = file.content_type or mimetypes.guess_type(fileName)[0] or "application/octet-stream"
    fileSha256 = hashlib.sha256(content).hexdigest()
    uploadedAt = datetime.now()
    normalizedName = self._normalize_document_name(fileName)

    async with GetAsyncSession() as session:
        await self._ensureGovdocSchema(session)
        await self._backfill_missing_version_groups(session)
        currentUser = await self._getCurrentUserContext(createdBy)
        resolvedRegion = self._resolve_upload_region(currentUser, normalizedRegion)

        latestCandidate = await self._find_latest_version_candidate(
            session,
            region=resolvedRegion,
            normalizedName=normalizedName,
            fileExt=fileExt,
        )
        # A candidate without a group key is a legacy row: materialize its
        # version chain first, then continue from the refreshed candidate.
        if latestCandidate and not latestCandidate.get("version_group_key"):
            latestCandidate = await self._backfill_legacy_version_chain(
                session,
                region=resolvedRegion,
                normalizedName=normalizedName,
                fileExt=fileExt,
            )

        previousVersionId: int | None = None
        rootVersionId: int | None = None
        versionGroupKey: str | None = None
        versionNo = 1
        if latestCandidate:
            previousVersionId = int(latestCandidate["document_id"])
            rootVersionId = int(latestCandidate["root_version_id"] or latestCandidate["document_id"])
            versionGroupKey = str(latestCandidate["version_group_key"] or "")
            versionNo = int(latestCandidate["version_no"] or 0) + 1
            # The previous head of the chain stops being the latest version.
            previousDocument = await session.get(LeauditDocument, previousVersionId)
            if previousDocument is not None:
                previousDocument.isLatestVersion = False
        else:
            versionGroupKey = uuid.uuid4().hex

        document = await LeauditDocument.create_new(
            session,
            bizDocumentId=time.time_ns(),
            typeId=typeId,
            groupId=None,
            region=resolvedRegion,
            processingStatus="waiting",
            currentRunId=None,
            versionGroupKey=versionGroupKey,
            versionNo=versionNo,
            previousVersionId=previousVersionId,
            rootVersionId=rootVersionId,
            isLatestVersion=True,
            normalizedName=normalizedName,
            reviewScope="govdoc",
        )
        # The first version of a group is its own root.
        if document.rootVersionId is None:
            document.rootVersionId = document.Id
            await session.flush()

        objectKey = OssPathUtils.BuildBusinessDocKey(
            Region=resolvedRegion,
            TypeCode="govdoc",
            DocumentId=document.Id,
            Version=f"v{document.versionNo}",
            FileRole="original",
            FileName=fileName,
            Year=uploadedAt.year,
            Month=uploadedAt.month,
        )
        ossUrl = await self.OssService.UploadBytes(
            ObjectKey=objectKey,
            Content=content,
            ContentType=mimeType,
        )

        documentFile = LeauditDocumentFile(
            documentId=document.Id,
            fileRole="original",
            fileName=fileName,
            fileExt=fileExt,
            mimeType=mimeType,
            fileSize=len(content),
            sha256=fileSha256,
            localPath=None,
            ossUrl=ossUrl,
            storageProvider="minio",
            isActive=True,
            createdBy=createdBy,
        )
        session.add(documentFile)
        await session.flush()

        # engine_type is written with raw SQL rather than through the model.
        await session.execute(
            text(
                """
                UPDATE leaudit_documents
                SET engine_type = 'govdoc'
                WHERE id = :document_id
                """
            ),
            {"document_id": document.Id},
        )
        await session.commit()
        await session.refresh(document)
        await session.refresh(documentFile)

        # Copy what the response needs while the instances are still attached.
        documentId = document.Id
        fileId = documentFile.Id
        storedFileName = documentFile.fileName
        storedStatus = document.processingStatus or "waiting"

    runPayload: dict[str, Any] | None = None
    shouldAutoRun = bool(autoRun)
    if shouldAutoRun:
        runPayload = await self.CreateRun(
            documentId=documentId,
            ruleVersionId=ruleVersionId,
            speed=speed,
            force=False,
            triggerUserId=createdBy,
        )

    return {
        "documentId": documentId,
        "fileId": fileId,
        "fileName": storedFileName,
        "region": resolvedRegion,
        "engineType": "govdoc",
        "processingStatus": "processing" if runPayload else storedStatus,
        "autoRunTriggered": shouldAutoRun,
        "latestRunId": runPayload["runId"] if runPayload else None,
        "run": runPayload,
    }
async def ListDocuments(
    self,
    page: int = 1,
    pageSize: int = 20,
    keyword: str | None = None,
    fileExt: str | None = None,
    region: str | None = None,
    status: str | None = None,
    resultStatus: str | None = None,
    createdBy: int | None = None,
    dateFrom: str | None = None,
    dateTo: str | None = None,
    userId: int | None = None,
) -> dict[str, Any]:
    """Return one page of govdoc documents (latest versions) with their history.

    Only the latest version of each version group is paged; older versions of
    the groups on the page are attached to each item as ``historyVersions``.

    Args:
        page: 1-based page number (values below 1 are clamped to 1).
        pageSize: Page size, clamped to the range 1..100.
        keyword: Substring matched case-insensitively against the file name
            and the normalized document name. ``%``, ``_`` and ``\\`` are
            matched literally.
        fileExt: File extension filter (leading dot and case are ignored).
        region: Requested region, forwarded to the scope-filter builder.
        status: Exact ``processing_status`` filter.
        resultStatus: Exact ``result_status`` filter on the current run.
        createdBy: Requested uploader id, forwarded to the scope-filter builder.
        dateFrom: Inclusive lower bound on the creation date (``YYYY-MM-DD``).
        dateTo: Inclusive upper bound on the creation date (``YYYY-MM-DD``).
        userId: Id of the calling user (required).

    Returns:
        ``{"items": [...], "total": int, "page": int, "pageSize": int}``.

    Raises:
        LeauditException: 401 when ``userId`` is missing; 400 for malformed dates.
    """
    if userId is None:
        raise LeauditException(StatusCodeEnum.HTTP_401_UNAUTHORIZED, "当前用户未登录")
    currentUser = await self._getCurrentUserContext(userId)

    page = max(1, int(page))
    pageSize = max(1, min(int(pageSize), 100))
    params: dict[str, Any] = {
        "limit": pageSize,
        "offset": (page - 1) * pageSize,
    }
    filters = [
        "d.deleted_at IS NULL",
        "f.deleted_at IS NULL",
        "f.is_active = true",
        "f.file_role = 'original'",
        "COALESCE(d.engine_type, 'leaudit') = 'govdoc'",
    ]
    filters.extend(
        self._buildDocumentScopeFilters(
            CurrentUserId=userId,
            CurrentUser=currentUser,
            Params=params,
            DocumentAlias="d",
            FileAlias="f",
            RequestedRegion=region,
            RequestedUserId=createdBy,
        )
    )

    normalizedKeyword = (keyword or "").strip()
    if normalizedKeyword:
        # Backslash is PostgreSQL's default LIKE/ILIKE escape character, so
        # escaping it together with % and _ makes the keyword match literally
        # instead of letting user input act as a wildcard.
        escapedKeyword = (
            normalizedKeyword.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")
        )
        filters.append("(f.file_name ILIKE :keyword OR COALESCE(d.normalized_name, '') ILIKE :keyword)")
        params["keyword"] = f"%{escapedKeyword}%"
    if fileExt:
        normalizedExt = fileExt.strip().lstrip(".").lower()
        if normalizedExt:
            filters.append("LOWER(COALESCE(f.file_ext, '')) = :file_ext")
            params["file_ext"] = normalizedExt
    if status:
        filters.append("COALESCE(d.processing_status, '') = :status")
        params["status"] = status.strip()
    if resultStatus:
        filters.append("COALESCE(gr.result_status, '') = :result_status")
        params["result_status"] = resultStatus.strip()
    parsedDateFrom = self._parse_date_filter(dateFrom, "dateFrom")
    parsedDateTo = self._parse_date_filter(dateTo, "dateTo")
    if parsedDateFrom:
        filters.append("d.created_at::date >= :date_from")
        params["date_from"] = parsedDateFrom
    if parsedDateTo:
        filters.append("d.created_at::date <= :date_to")
        params["date_to"] = parsedDateTo
    whereClause = " AND ".join(filters)

    # Every query below runs against the same CTE so that paging, counting
    # and the history lookup all see identical filtering.
    baseSelect = f"""
        WITH effective_docs AS (
            {self._effective_document_select_sql(whereClause)}
        )
    """

    async with GetAsyncSession() as session:
        await self._ensureGovdocSchema(session)
        await self._backfill_missing_version_groups(session)

        rows = (
            await session.execute(
                text(
                    f"""
                    {baseSelect}
                    SELECT *
                    FROM effective_docs
                    WHERE is_latest_version = true
                    ORDER BY created_at DESC, document_id DESC
                    LIMIT :limit OFFSET :offset
                    """
                ),
                params,
            )
        ).mappings().all()

        total = int(
            (
                await session.execute(
                    text(
                        f"""
                        {baseSelect}
                        SELECT COUNT(1)
                        FROM effective_docs
                        WHERE is_latest_version = true
                        """
                    ),
                    params,
                )
            ).scalar_one()
        )

        # Only groups that actually have more than one version need a history
        # lookup; dict.fromkeys removes duplicates while keeping page order.
        versionGroupKeys = list(
            dict.fromkeys(
                str(row["version_group_key"])
                for row in rows
                if row.get("version_group_key") and int(row.get("total_versions") or 1) > 1
            )
        )
        historyRowsByGroup: dict[str, list[dict[str, Any]]] = {}
        if versionGroupKeys:
            historyRows = (
                await session.execute(
                    text(
                        f"""
                        {baseSelect}
                        SELECT *
                        FROM effective_docs
                        WHERE version_group_key IN :version_group_keys
                          AND is_latest_version = false
                        ORDER BY created_at DESC, document_id DESC
                        """
                    ).bindparams(bindparam("version_group_keys", expanding=True)),
                    {"version_group_keys": versionGroupKeys, **params},
                )
            ).mappings().all()
            for historyRow in historyRows:
                groupKey = str(historyRow.get("version_group_key") or "")
                if not groupKey:
                    continue
                historyRowsByGroup.setdefault(groupKey, []).append(dict(historyRow))

    items = []
    for row in rows:
        mapped = self._map_document_row(row)
        item = await self._build_document_list_item(mapped)
        groupKey = mapped.versionGroupKey or ""
        historyItems = [
            await self._build_document_list_item(self._map_document_row(historyRow))
            for historyRow in historyRowsByGroup.get(groupKey, [])
        ]
        item["historyCount"] = len(historyItems)
        item["historyVersions"] = historyItems
        items.append(item)
    return {"items": items, "total": total, "page": page, "pageSize": pageSize}

async def GetDocumentDetail(self, documentId: int, userId: int | None = None) -> dict[str, Any]:
    """Return one govdoc document with its recent runs and current reports.

    Args:
        documentId: Id of the document to load.
        userId: Id of the calling user (required); it drives the scope filters.

    Returns:
        A dict with the document metadata, up to 20 most recent runs, the
        latest/current run summary and the report artifacts of that run.

    Raises:
        LeauditException: 401 when ``userId`` is missing; 404 when the document
            does not exist or is outside the caller's scope.
    """
    if userId is None:
        raise LeauditException(StatusCodeEnum.HTTP_401_UNAUTHORIZED, "当前用户未登录")
    currentUser = await self._getCurrentUserContext(userId)

    params: dict[str, Any] = {"document_id": documentId, "limit": 20}
    filters = [
        "d.id = :document_id",
        "d.deleted_at IS NULL",
        "f.deleted_at IS NULL",
        "f.is_active = true",
        "f.file_role = 'original'",
        "COALESCE(d.engine_type, 'leaudit') = 'govdoc'",
    ]
    filters.extend(
        self._buildDocumentScopeFilters(
            CurrentUserId=userId,
            CurrentUser=currentUser,
            Params=params,
            DocumentAlias="d",
            FileAlias="f",
        )
    )
    whereClause = " AND ".join(filters)

    async with GetAsyncSession() as session:
        await self._ensureGovdocSchema(session)
        await self._backfill_missing_version_groups(session)

        row = (
            await session.execute(
                text(
                    f"""
                    {self._effective_document_select_sql(whereClause)}
                    LIMIT 1
                    """
                ),
                params,
            )
        ).mappings().first()
        if not row:
            raise LeauditException(StatusCodeEnum.HTTP_404_NOT_FOUND, "公文文档不存在或无权访问")

        runs = (
            await session.execute(
                text(
                    """
                    SELECT
                        id,
                        document_id,
                        status,
                        phase,
                        result_status,
                        total_score,
                        passed_count,
                        failed_count,
                        skipped_count,
                        error_message,
                        created_at,
                        updated_at,
                        started_at,
                        finished_at
                    FROM govdoc_runs
                    WHERE document_id = :document_id
                      AND deleted_at IS NULL
                    ORDER BY id DESC
                    LIMIT :limit
                    """
                ),
                {"document_id": documentId, "limit": params["limit"]},
            )
        ).mappings().all()

        artifactRows = (
            await session.execute(
                text(
                    """
                    SELECT run_id, artifact_type, file_name, file_ext, mime_type, oss_url, description
                    FROM govdoc_report_artifacts
                    WHERE run_id = ANY(
                        SELECT id
                        FROM govdoc_runs
                        WHERE document_id = :document_id
                          AND deleted_at IS NULL
                    )
                      AND deleted_at IS NULL
                    ORDER BY id DESC
                    """
                ),
                {"document_id": documentId},
            )
        ).mappings().all()

    mapped = self._map_document_row(row)
    runItems = [self._build_run_summary(item) for item in runs]
    # Prefer the run the document points at; otherwise fall back to the newest run.
    latestRunId = mapped.currentRunId or (runItems[0]["runId"] if runItems else None)
    latestRun = next(
        (item for item in runItems if item["runId"] == latestRunId),
        runItems[0] if runItems else None,
    )
    artifactsByRun = self._group_artifacts_by_run(artifactRows)
    return {
        "documentId": mapped.documentId,
        "latestRunId": latestRunId,
        "document": {
            "documentId": mapped.documentId,
            "fileId": mapped.fileId,
            "filename": mapped.fileName,
            "fileExt": mapped.fileExt,
            "mimeType": mapped.mimeType,
            "fileSize": mapped.fileSize,
            "region": mapped.region,
            "processingStatus": mapped.processingStatus,
            "versionGroupKey": mapped.versionGroupKey,
            "versionNo": mapped.versionNo,
            "totalVersions": mapped.totalVersions,
            "previousVersionId": mapped.previousVersionId,
            "rootVersionId": mapped.rootVersionId,
            "isLatestVersion": mapped.isLatestVersion,
            "createdAt": self._iso(mapped.createdAt),
            "updatedAt": self._iso(mapped.updatedAt),
        },
        "latestRun": latestRun,
        "currentRun": latestRun,
        "runs": runItems,
        "reports": artifactsByRun.get(int(latestRunId), {}) if latestRunId else {},
    }

@staticmethod
def _effective_document_select_sql(whereClause: str) -> str:
    """Build the SELECT shared by the document list and detail queries.

    The statement joins each document with its active original file, its
    current run, per-run failure counters and version information. Documents
    without a stored ``version_group_key`` get their version fields derived
    on the fly (``fallback_vc``) by grouping on region, normalized name and
    file extension and ordering by creation time.

    Args:
        whereClause: Pre-built boolean SQL expression over the aliases ``d``
            (document), ``f`` (file) and ``gr`` (current run). It must only
            reference values through bind parameters.

    Returns:
        The SQL text ending with ``WHERE <whereClause>`` (no ORDER BY / LIMIT).
    """
    return f"""
        SELECT
            d.id AS document_id,
            COALESCE(d.region, 'default') AS region,
            COALESCE(d.processing_status, 'waiting') AS processing_status,
            d.current_run_id,
            COALESCE(NULLIF(d.version_group_key, ''), fallback_vc.derived_version_group_key, '') AS version_group_key,
            CASE
                WHEN COALESCE(d.version_group_key, '') = ''
                    THEN COALESCE(fallback_vc.derived_version_no, 1)
                ELSE COALESCE(NULLIF(d.version_no, 0), 1)
            END AS version_no,
            CASE
                WHEN COALESCE(d.version_group_key, '') = ''
                    THEN fallback_vc.derived_previous_version_id
                ELSE d.previous_version_id
            END AS previous_version_id,
            CASE
                WHEN COALESCE(d.version_group_key, '') = ''
                    THEN COALESCE(fallback_vc.derived_root_version_id, d.id)
                ELSE COALESCE(d.root_version_id, d.id)
            END AS root_version_id,
            CASE
                WHEN COALESCE(d.version_group_key, '') = ''
                    THEN COALESCE(fallback_vc.derived_is_latest_version, COALESCE(d.is_latest_version, true))
                ELSE COALESCE(d.is_latest_version, true)
            END AS is_latest_version,
            COALESCE(d.normalized_name, '') AS normalized_name,
            COALESCE(vc.total_versions, fallback_vc.total_versions, 1) AS total_versions,
            fallback_vc.derived_version_no,
            d.created_at,
            d.updated_at,
            f.id AS file_id,
            f.file_name,
            f.file_ext,
            f.mime_type,
            f.file_size,
            f.oss_url,
            f.created_by,
            gr.result_status,
            gr.total_score,
            gr.passed_count,
            gr.failed_count,
            gr.skipped_count,
            gr.rules_path,
            COALESCE(fc.finding_count, 0) AS finding_count,
            COALESCE(fc.error_count, 0) AS error_count,
            COALESCE(fc.warning_count, 0) AS warning_count,
            COALESCE(fc.info_count, 0) AS info_count,
            EXISTS(
                SELECT 1
                FROM govdoc_report_artifacts gra
                WHERE gra.run_id = d.current_run_id
                  AND gra.artifact_type = 'html_report'
                  AND gra.deleted_at IS NULL
            ) AS has_html_report,
            EXISTS(
                SELECT 1
                FROM govdoc_report_artifacts gra
                WHERE gra.run_id = d.current_run_id
                  AND gra.artifact_type = 'annotated_docx'
                  AND gra.deleted_at IS NULL
            ) AS has_docx_report
        FROM leaudit_documents d
        JOIN leaudit_document_files f
            ON f.document_id = d.id
           AND f.is_active = true
           AND f.file_role = 'original'
           AND f.deleted_at IS NULL
        LEFT JOIN govdoc_runs gr
            ON gr.id = d.current_run_id
        LEFT JOIN (
            SELECT
                run_id,
                COUNT(*) FILTER (WHERE result = 'fail') AS finding_count,
                COUNT(*) FILTER (WHERE result = 'fail' AND severity = 'error') AS error_count,
                COUNT(*) FILTER (WHERE result = 'fail' AND severity = 'warning') AS warning_count,
                COUNT(*) FILTER (WHERE result = 'fail' AND severity = 'info') AS info_count
            FROM govdoc_rule_results
            WHERE deleted_at IS NULL
            GROUP BY run_id
        ) fc
            ON fc.run_id = d.current_run_id
        LEFT JOIN (
            SELECT version_group_key, COUNT(*) AS total_versions
            FROM leaudit_documents
            WHERE deleted_at IS NULL
              AND COALESCE(version_group_key, '') <> ''
            GROUP BY version_group_key
        ) vc
            ON vc.version_group_key = d.version_group_key
        LEFT JOIN (
            SELECT
                d2.id AS document_id,
                COUNT(*) OVER (
                    PARTITION BY d2.region, COALESCE(d2.normalized_name, ''), COALESCE(f2.file_ext, '')
                ) AS total_versions,
                ROW_NUMBER() OVER (
                    PARTITION BY d2.region, COALESCE(d2.normalized_name, ''), COALESCE(f2.file_ext, '')
                    ORDER BY d2.created_at ASC, d2.id ASC
                ) AS derived_version_no,
                LAG(d2.id) OVER (
                    PARTITION BY d2.region, COALESCE(d2.normalized_name, ''), COALESCE(f2.file_ext, '')
                    ORDER BY d2.created_at ASC, d2.id ASC
                ) AS derived_previous_version_id,
                FIRST_VALUE(d2.id) OVER (
                    PARTITION BY d2.region, COALESCE(d2.normalized_name, ''), COALESCE(f2.file_ext, '')
                    ORDER BY d2.created_at ASC, d2.id ASC
                ) AS derived_root_version_id,
                CASE
                    WHEN ROW_NUMBER() OVER (
                        PARTITION BY d2.region, COALESCE(d2.normalized_name, ''), COALESCE(f2.file_ext, '')
                        ORDER BY d2.created_at DESC, d2.id DESC
                    ) = 1 THEN true
                    ELSE false
                END AS derived_is_latest_version,
                md5(CONCAT_WS('|', d2.region, COALESCE(d2.normalized_name, ''), COALESCE(f2.file_ext, ''))) AS derived_version_group_key
            FROM leaudit_documents d2
            JOIN leaudit_document_files f2
                ON f2.document_id = d2.id
               AND f2.is_active = true
               AND f2.file_role = 'original'
               AND f2.deleted_at IS NULL
            WHERE d2.deleted_at IS NULL
              AND d2.review_scope = 'govdoc'
              AND COALESCE(d2.engine_type, 'leaudit') = 'govdoc'
        ) fallback_vc
            ON fallback_vc.document_id = d.id
        WHERE {whereClause}
    """
async def UpdateDocument(self, documentId: int, body: dict[str, Any], userId: int | None = None) -> dict[str, Any]:
    """Reject every update request: editing govdoc metadata is not enabled.

    Raises:
        LeauditException: Always, with HTTP 403.
    """
    rejection = LeauditException(StatusCodeEnum.HTTP_403_FORBIDDEN, "当前阶段暂不开放修改公文信息")
    raise rejection
async def DeleteDocument(self, documentId: int, userId: int | None = None) -> dict[str, Any]:
    """Soft-delete a govdoc document that is visible to the calling user.

    When the deleted document was the latest version of its version group,
    the newest remaining version of that group is promoted to latest so the
    group does not vanish from listings that show only latest versions.

    Args:
        documentId: Id of the document to delete.
        userId: Id of the calling user (required); it drives the scope filters.

    Returns:
        ``{"documentId": documentId, "deleted": True}``.

    Raises:
        LeauditException: 401 when ``userId`` is missing; 404 when the document
            does not exist, is not a govdoc document, or is out of scope.
    """
    if userId is None:
        raise LeauditException(StatusCodeEnum.HTTP_401_UNAUTHORIZED, "当前用户未登录")
    currentUser = await self._getCurrentUserContext(userId)

    params: dict[str, Any] = {"document_id": documentId}
    filters = [
        "d.id = :document_id",
        "d.deleted_at IS NULL",
        "f.deleted_at IS NULL",
        "f.is_active = true",
        "f.file_role = 'original'",
        # Same engine guard as the list/detail queries: this endpoint must
        # never delete documents that belong to another engine.
        "COALESCE(d.engine_type, 'leaudit') = 'govdoc'",
    ]
    filters.extend(
        self._buildDocumentScopeFilters(
            CurrentUserId=userId,
            CurrentUser=currentUser,
            Params=params,
            DocumentAlias="d",
            FileAlias="f",
        )
    )
    whereClause = " AND ".join(filters)

    async with GetAsyncSession() as session:
        await self._ensureGovdocSchema(session)
        row = (
            await session.execute(
                text(
                    f"""
                    SELECT
                        d.id,
                        d.version_group_key,
                        COALESCE(d.is_latest_version, true) AS is_latest_version
                    FROM leaudit_documents d
                    JOIN leaudit_document_files f
                        ON f.document_id = d.id
                       AND f.is_active = true
                       AND f.file_role = 'original'
                       AND f.deleted_at IS NULL
                    WHERE {whereClause}
                    LIMIT 1
                    """
                ),
                params,
            )
        ).mappings().first()
        if not row:
            raise LeauditException(StatusCodeEnum.HTTP_404_NOT_FOUND, "公文文档不存在或无权访问")

        await session.execute(
            text("UPDATE leaudit_documents SET deleted_at = now(), updated_at = now() WHERE id = :document_id"),
            {"document_id": documentId},
        )

        groupKey = str(row["version_group_key"] or "")
        if groupKey and row["is_latest_version"]:
            # Runs after the soft delete in the same transaction, so the
            # sub-select only sees the surviving versions; it updates nothing
            # when the group has no versions left.
            await session.execute(
                text(
                    """
                    UPDATE leaudit_documents
                    SET is_latest_version = true
                    WHERE id = (
                        SELECT p.id
                        FROM leaudit_documents p
                        WHERE p.version_group_key = :group_key
                          AND p.deleted_at IS NULL
                        ORDER BY p.version_no DESC, p.id DESC
                        LIMIT 1
                    )
                    """
                ),
                {"group_key": groupKey},
            )
        await session.commit()
    return {"documentId": documentId, "deleted": True}
# ── 审查运行 ──────────────────────────────────────────
async def CreateRun(
    self,
    documentId: int,
    ruleVersionId: int | None = None,
    speed: str = "normal",
    force: bool = False,
    triggerUserId: int | None = None,
) -> dict[str, Any]:
    """Create a review run for a document and dispatch it to the task queue.

    Unless ``force`` is set, a run that is still pending or processing is
    returned again (``"reused": True``) instead of starting a second one.

    Args:
        documentId: Document to review.
        ruleVersionId: Accepted for interface compatibility; not used here.
        speed: Forwarded to the dispatched task.
        force: Start a new run even if the current one is still active.
        triggerUserId: Id of the user starting the run (required).

    Returns:
        A dict with the run id, document id, status, phase and either the
        ``reused`` flag or the dispatched ``taskId``.

    Raises:
        LeauditException: 401 when ``triggerUserId`` is missing.
    """
    if triggerUserId is None:
        raise LeauditException(StatusCodeEnum.HTTP_401_UNAUTHORIZED, "当前用户未登录")

    async with GetAsyncSession() as session:
        await self._ensureGovdocSchema(session)

    currentUser = await self._getCurrentUserContext(triggerUserId)
    documentMeta = await self._get_document_for_run(documentId, triggerUserId, currentUser)

    activeRunId = documentMeta.currentRunId
    if activeRunId and not force:
        activeRun = await self.GetRunStatus(activeRunId)
        activeStatus = activeRun["status"]
        if activeStatus in {"pending", "processing"}:
            return {
                "runId": activeRunId,
                "documentId": documentId,
                "status": activeStatus,
                "phase": activeRun.get("phase"),
                "reused": True,
            }

    runNo = await self._next_run_no(documentId)
    # A document without any current run is being reviewed for the first time.
    triggerSource = "manual" if activeRunId else "upload"
    runSpec = {
        "documentId": documentId,
        "documentFileId": documentMeta.fileId,
        "runNo": runNo,
        "triggerSource": triggerSource,
        "triggerUserId": triggerUserId,
    }
    runId = await self.Storage.CreateRun(runSpec)

    rulesPath = await self._resolve_rules_path()
    await self.Storage.UpdateDocumentStatus(documentId, "processing", runId)
    task = dispatch_govdoc_task(
        documentId=documentId,
        runId=runId,
        rulesPath=rulesPath,
        triggerUserId=triggerUserId,
        speed=speed,
    )
    # The dispatcher may return an object without an id; store "" in that case.
    taskId = str(getattr(task, "id", "") or "")

    async with GetAsyncSession() as session:
        await session.execute(
            text("UPDATE govdoc_runs SET task_id = :task_id, started_at = now(), updated_at = now() WHERE id = :run_id"),
            {"task_id": taskId, "run_id": runId},
        )
        await session.commit()

    return {
        "runId": runId,
        "documentId": documentId,
        "status": "pending",
        "phase": "dispatch",
        "taskId": taskId,
    }
async def GetRunStatus(self, runId: int) -> dict[str, Any]:
    """Load one non-deleted run and return its summary.

    Args:
        runId: Id of the run in ``govdoc_runs``.

    Returns:
        The run summary produced by ``_build_run_summary``.

    Raises:
        LeauditException: 404 when the run does not exist or is deleted.
    """
    statusQuery = text(
        """
SELECT
id,
document_id,
status,
phase,
result_status,
total_score,
passed_count,
failed_count,
skipped_count,
error_message,
task_id,
created_at,
updated_at,
started_at,
finished_at
FROM govdoc_runs
WHERE id = :run_id
AND deleted_at IS NULL
LIMIT 1
"""
    )
    async with GetAsyncSession() as session:
        await self._ensureGovdocSchema(session)
        queryResult = await session.execute(statusQuery, {"run_id": runId})
        runRow = queryResult.mappings().first()
    if not runRow:
        raise LeauditException(StatusCodeEnum.HTTP_404_NOT_FOUND, "审查运行不存在")
    return self._build_run_summary(runRow)
# ── 结果与报告 ────────────────────────────────────────
async def GetRunResult(self, runId: int) -> dict[str, Any]:
    """Read a run's review result from ``govdoc_runs`` and ``govdoc_rule_results``.

    Failed rule rows become findings; every distinct rule id contributes one
    entry to ``checkedRules`` (taken from its first row). ``structure``,
    ``outline`` and ``entities`` come from the run's ``result_summary_json``.

    Args:
        runId: Id of the run to load.

    Returns:
        A dict with ``runId``, ``documentId``, ``document``, ``summary``,
        ``checkedRules``, ``findings``, ``structure``, ``outline`` and
        ``entities``.

    Raises:
        LeauditException: 404 when the run does not exist or is deleted.
    """
    async with GetAsyncSession() as session:
        await self._ensureGovdocSchema(session)
        runRow = (
            await session.execute(
                text(
                    """
                    SELECT
                        id,
                        document_id,
                        status,
                        phase,
                        total_score,
                        passed_count,
                        failed_count,
                        skipped_count,
                        result_status,
                        result_summary_json,
                        started_at,
                        finished_at
                    FROM govdoc_runs
                    WHERE id = :run_id
                      AND deleted_at IS NULL
                    LIMIT 1
                    """
                ),
                {"run_id": runId},
            )
        ).mappings().first()
        if not runRow:
            raise LeauditException(StatusCodeEnum.HTTP_404_NOT_FOUND, "审查运行不存在")

        documentRow = (
            await session.execute(
                text(
                    """
                    SELECT
                        d.id AS document_id,
                        f.file_name
                    FROM leaudit_documents d
                    JOIN leaudit_document_files f
                        ON f.document_id = d.id
                       AND f.is_active = true
                       AND f.file_role = 'original'
                       AND f.deleted_at IS NULL
                    WHERE d.id = :document_id
                      AND d.deleted_at IS NULL
                    LIMIT 1
                    """
                ),
                {"document_id": int(runRow["document_id"])},
            )
        ).mappings().first()

        rulesRows = (
            await session.execute(
                text(
                    """
                    SELECT
                        rule_id,
                        rule_name,
                        severity,
                        category,
                        result,
                        skip_reason,
                        message,
                        suggestion,
                        actual,
                        expected,
                        evidence,
                        paragraph_index,
                        paragraph_text,
                        location_path,
                        score
                    FROM govdoc_rule_results
                    WHERE run_id = :run_id
                      AND deleted_at IS NULL
                    ORDER BY id ASC
                    """
                ),
                {"run_id": runId},
            )
        ).mappings().all()

    # The summary column can be empty (for example before a run has produced
    # one) or hold something other than an object; fall back to an empty dict
    # so the .get() lookups below cannot fail. The per-row uses of _parse_json
    # further down already guard its result with "or {}".
    aux = self._parse_json(runRow.get("result_summary_json"))
    if not isinstance(aux, dict):
        aux = {}

    findings = []
    checkedRules = []
    severityStats: dict[str, int] = {}
    categoryStats: dict[str, int] = {}
    seenRuleIds: set[str] = set()
    for rr in rulesRows:
        if rr.get("result") == "fail":
            severity = str(rr.get("severity") or "info")
            category = str(rr.get("category") or "")
            severityStats[severity] = severityStats.get(severity, 0) + 1
            if category:
                categoryStats[category] = categoryStats.get(category, 0) + 1
            findings.append(
                {
                    # NOTE(review): this id is not guaranteed unique (two
                    # failures of one rule in the same paragraph collide).
                    # Left unchanged because other consumers may rely on
                    # the current format - confirm before changing.
                    "finding_id": f"{rr['rule_id']}-{rr.get('paragraph_index') or len(findings)}",
                    "rule_id": rr["rule_id"],
                    "rule_name": rr.get("rule_name") or rr["rule_id"],
                    "severity": severity,
                    "category": category,
                    "location": {
                        "paragraph_index": rr["paragraph_index"] or 0,
                        "role": rr.get("location_path"),
                        "char_start": 0,
                        "char_end": 0,
                        "context": rr.get("paragraph_text") or "",
                    },
                    "actual": self._parse_json(rr.get("actual")) or {},
                    "expected": self._parse_json(rr.get("expected")) or {},
                    "message": rr.get("message") or "",
                    "suggestion": rr.get("suggestion") or "",
                    "evidence": rr.get("evidence") or "",
                    "confidence": 1.0,
                }
            )

        # One checked-rule entry per rule id; later rows of the same rule
        # only contribute findings.
        ruleId = str(rr["rule_id"])
        if ruleId in seenRuleIds:
            continue
        seenRuleIds.add(ruleId)
        status = str(rr.get("result") or "pass")
        checkedRules.append(
            {
                "rule_id": ruleId,
                "name": rr.get("rule_name") or ruleId,
                "severity": rr.get("severity") or "info",
                "category": rr.get("category") or "",
                # Unknown result values are reported as "pass".
                "status": status if status in {"pass", "fail", "skipped"} else "pass",
                "skip_reason": rr.get("skip_reason") or "",
            }
        )

    documentId = int(runRow["document_id"])
    return {
        "runId": runId,
        "documentId": documentId,
        "document": {
            "documentId": documentId,
            "filename": str(documentRow["file_name"] or "") if documentRow else "",
        },
        "summary": {
            "score": float(runRow.get("total_score") or 0),
            "total_findings": len(findings),
            "by_severity": severityStats,
            "by_category": categoryStats,
            "passed_count": int(runRow.get("passed_count") or 0),
            "failed_count": int(runRow.get("failed_count") or 0),
            "skipped_count": int(runRow.get("skipped_count") or 0),
        },
        "checkedRules": checkedRules,
        "findings": findings,
        "structure": aux.get("structure", []),
        "outline": aux.get("outline", []),
        "entities": aux.get("entities", {}),
    }
async def GetRunFindings(self, runId: int) -> dict[str, Any]:
    """Return only the findings of a run, taken from ``GetRunResult``."""
    runResult = await self.GetRunResult(runId)
    payload: dict[str, Any] = {"runId": runId}
    payload["findings"] = runResult["findings"]
    return payload
async def GetRunEntities(self, runId: int) -> dict[str, Any]:
    """Return only the extracted entities of a run (empty dict when absent)."""
    runResult = await self.GetRunResult(runId)
    entities = runResult.get("entities", {})
    return {"runId": runId, "entities": entities}
async def GetRunParagraphs(self, runId: int) -> str:
    """Return the paragraph-level HTML view of the document reviewed by a run.

    A stored ``paragraph_html`` artifact is served as-is when available.
    Otherwise the original DOCX is downloaded to a temporary file, parsed and
    rendered, with each finding id attached to its paragraph index.

    Args:
        runId: Id of a completed run.

    Returns:
        The HTML text.

    Raises:
        LeauditException: 409 when the run is not completed; 404 when the
            original file has no storage address; 400 when the original file
            is not a DOCX.
    """
    runStatus = await self.GetRunStatus(runId)
    if runStatus["status"] != "completed":
        raise LeauditException(
            StatusCodeEnum.HTTP_409_CONFLICT,
            "当前审查尚未完成,暂时无法加载文档视图",
        )

    paragraphArtifact = await self._get_report_artifact(runId, "paragraph_html")
    if paragraphArtifact:
        content = await self.OssService.DownloadBytes(str(paragraphArtifact["oss_url"]))
        return content.decode("utf-8")

    documentMeta = await self._get_document_for_read(int(runStatus["documentId"]))
    fileRow = await self._get_active_original_file(documentMeta.documentId)

    def readField(attrName: str, columnName: str) -> Any:
        # The row may be an object (camelCase attribute) or a mapping
        # (snake_case key). Only fall back to .get() when the row has one,
        # so an empty attribute leads to the intended HTTP error below
        # instead of an AttributeError.
        value = getattr(fileRow, attrName, None)
        if not value and hasattr(fileRow, "get"):
            value = fileRow.get(columnName)
        return value

    ossUrl = readField("ossUrl", "oss_url")
    fileExt = readField("fileExt", "file_ext")
    if not ossUrl:
        raise LeauditException(StatusCodeEnum.HTTP_404_NOT_FOUND, "原始文档未找到可用存储地址")
    normalizedExt = (fileExt or "").strip().lower()
    if normalizedExt != "docx":
        raise LeauditException(
            StatusCodeEnum.HTTP_400_BAD_REQUEST,
            "当前文档视图仅支持 DOCX 原文预览,请上传 DOCX 文件",
        )

    tempPath = await self.OssService.DownloadToTempFile(
        Source=ossUrl,
        # Use the normalized extension so stray case/whitespace in the stored
        # value cannot end up in the temp-file suffix.
        Suffix=f".{normalizedExt}",
        Prefix="govdoc-run-",
    )
    try:
        doc = parse_docx(tempPath)
        findingsResult = await self.GetRunFindings(runId)
        findingMap: dict[int, list[str]] = {}
        for finding in findingsResult["findings"]:
            paragraphIndex = int(finding.get("location", {}).get("paragraph_index") or 0)
            findingMap.setdefault(paragraphIndex, []).append(str(finding["finding_id"]))
        return paragraphs_to_html(doc, findingMap)
    finally:
        # Cleanup stays best-effort (it must not mask the real outcome), but a
        # failure is logged so leftover temp files can be noticed.
        try:
            Path(tempPath).unlink(missing_ok=True)
        except Exception as exc:
            logger.warning(f"Failed to remove govdoc temp file {tempPath}: {exc}")
async def GetRunStructure(self, runId: int) -> dict[str, Any]:
    """Return only the document structure of a run (empty list when absent)."""
    runResult = await self.GetRunResult(runId)
    structure = runResult.get("structure", [])
    return {"runId": runId, "structure": structure}
async def GetRunOutline(self, runId: int) -> dict[str, Any]:
    """Return only the document outline of a run (empty list when absent)."""
    runResult = await self.GetRunResult(runId)
    outline = runResult.get("outline", [])
    return {"runId": runId, "outline": outline}
async def GetReportHtml(self, runId: int) -> dict[str, Any]:
    """Render the HTML report of a run from its stored result."""
    runResult = await self.GetRunResult(runId)
    auditResult = self._build_audit_result_from_run_result(runResult)
    return {"runId": runId, "html": render_html(auditResult)}
async def GetReportDocx(self, runId: int) -> dict[str, Any]:
    """Return a presigned URL for the run's annotated DOCX, or "" when none is stored."""
    docxUrl = ""
    artifact = await self._get_report_artifact(runId, "annotated_docx")
    if artifact:
        docxUrl = await self.OssService.PresignGetUrl(str(artifact["oss_url"]))
    return {"runId": runId, "docxUrl": docxUrl}
async def DownloadOriginal(self, documentId: int) -> dict[str, Any]:
    """Return a presigned download URL for a document's active original file.

    Raises:
        LeauditException: 404 when the file row carries no storage URL.
    """
    fileRow = await self._get_active_original_file(documentId)
    # Accept either an attribute-style or a mapping-style row.
    ossUrl = getattr(fileRow, "ossUrl", None)
    if not ossUrl:
        ossUrl = fileRow.get("oss_url")
    if not ossUrl:
        raise LeauditException(StatusCodeEnum.HTTP_404_NOT_FOUND, "原始文档不存在")
    downloadUrl = await self.OssService.PresignGetUrl(str(ossUrl))
    return {"documentId": documentId, "downloadUrl": downloadUrl}
# ── 规则 ──────────────────────────────────────────────
async def ListRules(self, rulesPath: str | None = None) -> dict[str, Any]:
    """Load the rule listing from the govdoc rules YAML file."""
    ruleItems = await self._load_rules_list(rulesPath)
    payload: dict[str, Any] = {"metadata": {}, "rules": ruleItems}
    payload["total_rules"] = len(ruleItems)
    return payload
async def GetRuleDetail(self, ruleId: str, rulesPath: str | None = None) -> dict[str, Any]:
    """Return the full detail of one rule (name, severity, stages, messages, ...).

    When the ruleset cannot be loaded or holds no rule with ``ruleId``, a
    minimal placeholder keyed by ``ruleId`` is returned instead.
    """
    ruleset = await self._load_ruleset(rulesPath)
    matched = None
    if ruleset is not None:
        matched = next(
            (candidate for candidate in ruleset.all_rules() if candidate.rule_id == ruleId),
            None,
        )
    if matched is None:
        return {"rule_id": ruleId, "name": ruleId, "severity": "info", "category": "", "group": ""}
    appliesTo = matched.applies_to.model_dump() if matched.applies_to else None
    stages = [stage.model_dump(exclude_none=True) for stage in (matched.stages or [])]
    messages = matched.messages.model_dump() if matched.messages else {}
    return {
        "rule_id": matched.rule_id,
        "name": matched.name,
        "severity": matched.severity,
        "category": matched.category,
        "group": "",
        "applies_to": appliesTo,
        "target": matched.target,
        "on_missing": matched.on_missing,
        "stages": stages,
        "messages": messages,
    }
# ── 助手 ──────────────────────────────────────────────
async def _resolve_rules_path(self, rulesPath: str | None = None) -> str | None:
"""解析规则 YAML 文件路径。"""
if rulesPath:
return rulesPath
candidates = [
Path("/home/wren-dev/Porject/leaudit-platform/rules/govdoc/govdoc_general/rules.yaml"),
Path("/home/wren-dev/Porject/leaudit-platform/rules/govdoc_general/rules.yaml"),
]
for candidate in candidates:
if candidate.is_file():
return str(candidate)
return None
async def _load_ruleset(self, rulesPath: str | None = None):
    """Load the govdoc ruleset, or return None (with a warning) when no rules file resolves."""
    resolvedPath = await self._resolve_rules_path(rulesPath)
    if resolvedPath:
        # Imported lazily, only once a rules file is known to exist.
        from fastapi_modules.fastapi_leaudit.govdoc_engine.dsl.loader import load_rules

        return load_rules(resolvedPath)
    logger.warning("[Govdoc] Cannot resolve rules path for GetRuleDetail/ListRules")
    return None
async def _load_rules_list(self, rulesPath: str | None = None) -> list[dict[str, Any]]:
    """Return a compact summary dict per rule, or an empty list when no ruleset loads."""
    ruleset = await self._load_ruleset(rulesPath)
    if ruleset is None:
        return []
    return [
        {
            "rule_id": entry.rule_id,
            "name": entry.name,
            "severity": entry.severity,
            "category": entry.category,
            "group": "",
        }
        for entry in ruleset.all_rules()
    ]
async def _ensureGovdocSchema(self, session) -> None:
    """Create the govdoc tables, column and indexes if they do not exist yet.

    Every statement uses ``IF NOT EXISTS``, so repeated calls are expected to
    be no-ops once the schema is in place. The statements are executed in
    list order on ``session`` and committed at the end.

    Args:
        session: Async database session used to execute and commit the DDL.
    """
    statements = [
        # Engine discriminator column on the shared documents table.
        """
ALTER TABLE leaudit_documents
ADD COLUMN IF NOT EXISTS engine_type VARCHAR(32) NOT NULL DEFAULT 'leaudit'
""",
        # One row per audit run of a document.
        """
CREATE TABLE IF NOT EXISTS public.govdoc_runs (
id BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
document_id BIGINT NOT NULL,
document_file_id BIGINT,
run_no INTEGER NOT NULL DEFAULT 1,
trigger_source VARCHAR(64) NOT NULL DEFAULT 'upload',
trigger_user_id BIGINT,
task_id VARCHAR(128),
status VARCHAR(64) NOT NULL DEFAULT 'pending',
phase VARCHAR(32),
engine_version VARCHAR(64),
llm_provider VARCHAR(64),
llm_model VARCHAR(128),
rules_path VARCHAR(1024),
total_score NUMERIC(10, 2),
passed_count INTEGER,
failed_count INTEGER,
skipped_count INTEGER,
result_status VARCHAR(32),
result_summary_json TEXT,
error_message TEXT,
started_at TIMESTAMPTZ,
finished_at TIMESTAMPTZ,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
deleted_at TIMESTAMPTZ DEFAULT NULL
)
""",
        # Per-rule outcome rows belonging to a run.
        """
CREATE TABLE IF NOT EXISTS public.govdoc_rule_results (
id BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
run_id BIGINT NOT NULL,
rule_id VARCHAR(128) NOT NULL,
rule_name VARCHAR(256),
severity VARCHAR(32),
category VARCHAR(128),
message TEXT,
suggestion TEXT,
actual TEXT,
expected TEXT,
evidence TEXT,
paragraph_index INTEGER,
paragraph_text TEXT,
location_path VARCHAR(512),
result VARCHAR(32) NOT NULL DEFAULT 'pass',
skip_reason TEXT,
score NUMERIC(10, 2),
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
deleted_at TIMESTAMPTZ DEFAULT NULL
)
""",
        # Stored report files (type, name, storage URL) belonging to a run.
        """
CREATE TABLE IF NOT EXISTS public.govdoc_report_artifacts (
id BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
run_id BIGINT NOT NULL,
artifact_type VARCHAR(64) NOT NULL,
file_name VARCHAR(512) NOT NULL,
file_ext VARCHAR(32),
mime_type VARCHAR(128),
file_size BIGINT,
sha256 VARCHAR(64),
oss_url VARCHAR(2048),
storage_provider VARCHAR(32),
description VARCHAR(512),
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
deleted_at TIMESTAMPTZ DEFAULT NULL
)
""",
        # Indexes. All except the trigger_user_id one are partial indexes
        # restricted to rows that are not soft-deleted.
        """
CREATE INDEX IF NOT EXISTS idx_leaudit_documents_engine_type
ON public.leaudit_documents(engine_type) WHERE deleted_at IS NULL
""",
        """
CREATE INDEX IF NOT EXISTS idx_govdoc_runs_document_id
ON public.govdoc_runs(document_id) WHERE deleted_at IS NULL
""",
        """
CREATE INDEX IF NOT EXISTS idx_govdoc_runs_status
ON public.govdoc_runs(status) WHERE deleted_at IS NULL
""",
        """
CREATE INDEX IF NOT EXISTS idx_govdoc_runs_trigger_user_id
ON public.govdoc_runs(trigger_user_id)
""",
        """
CREATE INDEX IF NOT EXISTS idx_govdoc_rule_results_run_id
ON public.govdoc_rule_results(run_id) WHERE deleted_at IS NULL
""",
        """
CREATE INDEX IF NOT EXISTS idx_govdoc_rule_results_rule_id
ON public.govdoc_rule_results(rule_id) WHERE deleted_at IS NULL
""",
        """
CREATE INDEX IF NOT EXISTS idx_govdoc_rule_results_result
ON public.govdoc_rule_results(result) WHERE deleted_at IS NULL
""",
        """
CREATE INDEX IF NOT EXISTS idx_govdoc_rule_results_paragraph
ON public.govdoc_rule_results(run_id, paragraph_index) WHERE deleted_at IS NULL
""",
        """
CREATE INDEX IF NOT EXISTS idx_govdoc_report_artifacts_run_id
ON public.govdoc_report_artifacts(run_id) WHERE deleted_at IS NULL
""",
        """
CREATE INDEX IF NOT EXISTS idx_govdoc_report_artifacts_type
ON public.govdoc_report_artifacts(run_id, artifact_type) WHERE deleted_at IS NULL
""",
    ]
    # Tables come before their indexes in the list, so order matters here.
    for statement in statements:
        await session.execute(text(statement))
    await session.commit()
async def _getCurrentUserContext(self, CurrentUserId: int) -> dict[str, Any]:
    """Load the area and role-derived permission flags of a user.

    Returns:
        A dict with ``area``, ``is_global``, ``can_manage`` and
        ``is_super_admin``.

    Raises:
        LeauditException: 404 when no user row matches ``CurrentUserId``.
    """
    userContextQuery = text(
        """
SELECT
u.id,
COALESCE(u.area, '') AS area,
COALESCE(bool_or(r.role_key IN ('super_admin', 'provincial_admin')), FALSE) AS is_global,
COALESCE(bool_or(r.role_key IN ('super_admin', 'provincial_admin', 'admin')), FALSE) AS can_manage,
COALESCE(bool_or(r.role_key = 'super_admin'), FALSE) AS is_super_admin
FROM sso_users u
LEFT JOIN user_role ur ON ur.user_id = u.id
LEFT JOIN roles r ON r.id = ur.role_id
WHERE u.id = :user_id
GROUP BY u.id, u.area
"""
    )
    async with GetAsyncSession() as session:
        # Version-group backfill runs first, on the same session.
        await self._backfill_missing_version_groups(session)
        queryResult = await session.execute(userContextQuery, {"user_id": CurrentUserId})
        userRow = queryResult.mappings().first()
    if not userRow:
        raise LeauditException(StatusCodeEnum.HTTP_404_NOT_FOUND, "当前用户不存在")
    context: dict[str, Any] = {"area": str(userRow["area"] or "")}
    for flagName in ("is_global", "can_manage", "is_super_admin"):
        context[flagName] = bool(userRow[flagName])
    return context
def _buildDocumentScopeFilters(
self,
CurrentUserId: int,
CurrentUser: dict[str, Any],
Params: dict[str, object],
DocumentAlias: str,
FileAlias: str,
RequestedRegion: str | None = None,
RequestedUserId: int | None = None,
) -> list[str]:
filters: list[str] = []
requestedRegion = (RequestedRegion or "").strip()
area = str(CurrentUser["area"] or "").strip()
if CurrentUser["is_global"]:
if requestedRegion:
filters.append(f"{DocumentAlias}.region = :requested_region")
Params["requested_region"] = requestedRegion
if RequestedUserId is not None:
filters.append(f"{FileAlias}.created_by = :requested_user_id")
Params["requested_user_id"] = RequestedUserId
return filters
if CurrentUser["can_manage"]:
if not area:
filters.append("1 = 0")
return filters
if requestedRegion and requestedRegion != area:
filters.append("1 = 0")
return filters
filters.append(f"{DocumentAlias}.region = :scope_region")
Params["scope_region"] = area
if RequestedUserId is not None:
filters.append(f"{FileAlias}.created_by = :requested_user_id")
Params["requested_user_id"] = RequestedUserId
return filters
filters.append(f"{FileAlias}.created_by = :scope_user_id")
Params["scope_user_id"] = CurrentUserId
if requestedRegion:
filters.append(f"{DocumentAlias}.region = :requested_region")
Params["requested_region"] = requestedRegion
if RequestedUserId is not None and RequestedUserId != CurrentUserId:
filters.append("1 = 0")
return filters
def _resolve_upload_region(self, currentUser: dict[str, Any], requestedRegion: str) -> str:
    """Decide which region an upload is stored under.

    Global users may pick any region. Everyone else is bound to their own
    area; the requested region is used only when they have no area.

    Raises:
        LeauditException: 403 when a non-global user with an area requests
            a different region.
    """
    ownArea = str(currentUser["area"] or "").strip()
    if currentUser["is_global"]:
        return requestedRegion or ownArea or "default"
    isManager = currentUser["can_manage"]
    crossesArea = bool(ownArea and requestedRegion and requestedRegion != ownArea)
    if crossesArea:
        # Managers and regular users differ only in the rejection message.
        rejection = "不能上传到非本地区" if isManager else "不能上传到非本人地区"
        raise LeauditException(StatusCodeEnum.HTTP_403_FORBIDDEN, rejection)
    return ownArea or requestedRegion or "default"
def _normalize_document_name(self, fileName: str) -> str:
    """Return ``fileName`` with its final extension (as seen by ``Path.suffix``) removed."""
    extension = Path(fileName).suffix
    if not extension:
        return fileName
    keepLength = len(fileName) - len(extension)
    return fileName[:keepLength]
async def _get_document_for_run(
    self,
    documentId: int,
    userId: int,
    currentUser: dict[str, Any],
) -> _GovdocDocumentRow:
    """Load one govdoc document, restricted to what the caller may access.

    The query joins the document with its active original file, its current
    govdoc run and two version-count sub-selects, then applies the base
    filters plus the per-user scope filters.

    Args:
        documentId: Identifier of the document to load.
        userId: Identifier of the calling user, used for scope filtering.
        currentUser: User context dict passed on to
            ``_buildDocumentScopeFilters``.

    Returns:
        The mapped document row.

    Raises:
        LeauditException: 404 when no row matches, whether the document is
            missing or outside the caller's scope.
    """
    params: dict[str, Any] = {"document_id": documentId}
    # Base filters: a live govdoc document with a live, active original file.
    filters = [
        "d.id = :document_id",
        "d.deleted_at IS NULL",
        "f.deleted_at IS NULL",
        "f.is_active = true",
        "f.file_role = 'original'",
        "COALESCE(d.engine_type, 'leaudit') = 'govdoc'",
    ]
    # Scope filters; this call also adds their bind values to ``params``.
    filters.extend(
        self._buildDocumentScopeFilters(
            CurrentUserId=userId,
            CurrentUser=currentUser,
            Params=params,
            DocumentAlias="d",
            FileAlias="f",
        )
    )
    whereClause = " AND ".join(filters)
    async with GetAsyncSession() as session:
        # Version-group backfill runs before the read, on the same session.
        await self._backfill_missing_version_groups(session)
        # ``vc`` counts documents per stored version_group_key. ``fallback_vc``
        # derives version data per (region, normalized_name, file_ext) and is
        # used when the stored version_group_key is empty. The report and
        # finding counters are selected as constant placeholders.
        row = (
            await session.execute(
                text(
                    f"""
SELECT
d.id AS document_id,
COALESCE(d.region, 'default') AS region,
COALESCE(d.processing_status, 'waiting') AS processing_status,
d.current_run_id,
COALESCE(NULLIF(d.version_group_key, ''), fallback_vc.derived_version_group_key, '') AS version_group_key,
CASE
WHEN COALESCE(d.version_group_key, '') = ''
THEN COALESCE(fallback_vc.derived_version_no, 1)
ELSE COALESCE(NULLIF(d.version_no, 0), 1)
END AS version_no,
CASE
WHEN COALESCE(d.version_group_key, '') = ''
THEN fallback_vc.derived_previous_version_id
ELSE d.previous_version_id
END AS previous_version_id,
CASE
WHEN COALESCE(d.version_group_key, '') = ''
THEN COALESCE(fallback_vc.derived_root_version_id, d.id)
ELSE COALESCE(d.root_version_id, d.id)
END AS root_version_id,
CASE
WHEN COALESCE(d.version_group_key, '') = ''
THEN COALESCE(fallback_vc.derived_is_latest_version, COALESCE(d.is_latest_version, true))
ELSE COALESCE(d.is_latest_version, true)
END AS is_latest_version,
COALESCE(d.normalized_name, '') AS normalized_name,
COALESCE(vc.total_versions, fallback_vc.total_versions, 1) AS total_versions,
fallback_vc.derived_version_no,
d.created_at,
d.updated_at,
f.id AS file_id,
f.file_name,
f.file_ext,
f.mime_type,
f.file_size,
f.oss_url,
f.created_by,
gr.result_status,
gr.total_score,
gr.passed_count,
gr.failed_count,
gr.skipped_count,
NULL::VARCHAR AS rules_path,
0 AS finding_count,
0 AS error_count,
0 AS warning_count,
0 AS info_count,
false AS has_html_report,
false AS has_docx_report
FROM leaudit_documents d
JOIN leaudit_document_files f
ON f.document_id = d.id
AND f.is_active = true
AND f.file_role = 'original'
AND f.deleted_at IS NULL
LEFT JOIN govdoc_runs gr
ON gr.id = d.current_run_id
LEFT JOIN (
SELECT version_group_key, COUNT(*) AS total_versions
FROM leaudit_documents
WHERE deleted_at IS NULL
AND COALESCE(version_group_key, '') <> ''
GROUP BY version_group_key
) vc
ON vc.version_group_key = d.version_group_key
LEFT JOIN (
SELECT
d2.id AS document_id,
COUNT(*) OVER (
PARTITION BY d2.region, COALESCE(d2.normalized_name, ''), COALESCE(f2.file_ext, '')
) AS total_versions,
ROW_NUMBER() OVER (
PARTITION BY d2.region, COALESCE(d2.normalized_name, ''), COALESCE(f2.file_ext, '')
ORDER BY d2.created_at ASC, d2.id ASC
) AS derived_version_no,
LAG(d2.id) OVER (
PARTITION BY d2.region, COALESCE(d2.normalized_name, ''), COALESCE(f2.file_ext, '')
ORDER BY d2.created_at ASC, d2.id ASC
) AS derived_previous_version_id,
FIRST_VALUE(d2.id) OVER (
PARTITION BY d2.region, COALESCE(d2.normalized_name, ''), COALESCE(f2.file_ext, '')
ORDER BY d2.created_at ASC, d2.id ASC
) AS derived_root_version_id,
CASE
WHEN ROW_NUMBER() OVER (
PARTITION BY d2.region, COALESCE(d2.normalized_name, ''), COALESCE(f2.file_ext, '')
ORDER BY d2.created_at DESC, d2.id DESC
) = 1 THEN true
ELSE false
END AS derived_is_latest_version,
md5(CONCAT_WS('|', d2.region, COALESCE(d2.normalized_name, ''), COALESCE(f2.file_ext, ''))) AS derived_version_group_key
FROM leaudit_documents d2
JOIN leaudit_document_files f2
ON f2.document_id = d2.id
AND f2.is_active = true
AND f2.file_role = 'original'
AND f2.deleted_at IS NULL
WHERE d2.deleted_at IS NULL
AND d2.review_scope = 'govdoc'
AND COALESCE(d2.engine_type, 'leaudit') = 'govdoc'
) fallback_vc
ON fallback_vc.document_id = d.id
WHERE {whereClause}
LIMIT 1
"""
                ),
                params,
            )
        ).mappings().first()
    if not row:
        raise LeauditException(StatusCodeEnum.HTTP_404_NOT_FOUND, "公文文档不存在或无权访问")
    return self._map_document_row(row)
async def _get_document_for_read(self, documentId: int) -> _GovdocDocumentRow:
    """Load one govdoc document by id without any per-user scope filter.

    The projection and joins are the same as in ``_get_document_for_run``.
    Here the WHERE clause only requires a live document with
    ``engine_type = 'govdoc'``, and no version-group backfill runs first.

    Args:
        documentId: Identifier of the document to load.

    Returns:
        The mapped document row.

    Raises:
        LeauditException: 404 when no matching document exists.
    """
    async with GetAsyncSession() as session:
        # NOTE(review): no caller-scope filter is applied in this query;
        # callers presumably authorise access beforehand - confirm.
        row = (
            await session.execute(
                text(
                    """
SELECT
d.id AS document_id,
COALESCE(d.region, 'default') AS region,
COALESCE(d.processing_status, 'waiting') AS processing_status,
d.current_run_id,
COALESCE(NULLIF(d.version_group_key, ''), fallback_vc.derived_version_group_key, '') AS version_group_key,
CASE
WHEN COALESCE(d.version_group_key, '') = ''
THEN COALESCE(fallback_vc.derived_version_no, 1)
ELSE COALESCE(NULLIF(d.version_no, 0), 1)
END AS version_no,
CASE
WHEN COALESCE(d.version_group_key, '') = ''
THEN fallback_vc.derived_previous_version_id
ELSE d.previous_version_id
END AS previous_version_id,
CASE
WHEN COALESCE(d.version_group_key, '') = ''
THEN COALESCE(fallback_vc.derived_root_version_id, d.id)
ELSE COALESCE(d.root_version_id, d.id)
END AS root_version_id,
CASE
WHEN COALESCE(d.version_group_key, '') = ''
THEN COALESCE(fallback_vc.derived_is_latest_version, COALESCE(d.is_latest_version, true))
ELSE COALESCE(d.is_latest_version, true)
END AS is_latest_version,
COALESCE(d.normalized_name, '') AS normalized_name,
COALESCE(vc.total_versions, fallback_vc.total_versions, 1) AS total_versions,
fallback_vc.derived_version_no,
d.created_at,
d.updated_at,
f.id AS file_id,
f.file_name,
f.file_ext,
f.mime_type,
f.file_size,
f.oss_url,
f.created_by,
gr.result_status,
gr.total_score,
gr.passed_count,
gr.failed_count,
gr.skipped_count,
NULL::VARCHAR AS rules_path,
0 AS finding_count,
0 AS error_count,
0 AS warning_count,
0 AS info_count,
false AS has_html_report,
false AS has_docx_report
FROM leaudit_documents d
JOIN leaudit_document_files f
ON f.document_id = d.id
AND f.is_active = true
AND f.file_role = 'original'
AND f.deleted_at IS NULL
LEFT JOIN govdoc_runs gr
ON gr.id = d.current_run_id
LEFT JOIN (
SELECT version_group_key, COUNT(*) AS total_versions
FROM leaudit_documents
WHERE deleted_at IS NULL
AND COALESCE(version_group_key, '') <> ''
GROUP BY version_group_key
) vc
ON vc.version_group_key = d.version_group_key
LEFT JOIN (
SELECT
d2.id AS document_id,
COUNT(*) OVER (
PARTITION BY d2.region, COALESCE(d2.normalized_name, ''), COALESCE(f2.file_ext, '')
) AS total_versions,
ROW_NUMBER() OVER (
PARTITION BY d2.region, COALESCE(d2.normalized_name, ''), COALESCE(f2.file_ext, '')
ORDER BY d2.created_at ASC, d2.id ASC
) AS derived_version_no,
LAG(d2.id) OVER (
PARTITION BY d2.region, COALESCE(d2.normalized_name, ''), COALESCE(f2.file_ext, '')
ORDER BY d2.created_at ASC, d2.id ASC
) AS derived_previous_version_id,
FIRST_VALUE(d2.id) OVER (
PARTITION BY d2.region, COALESCE(d2.normalized_name, ''), COALESCE(f2.file_ext, '')
ORDER BY d2.created_at ASC, d2.id ASC
) AS derived_root_version_id,
CASE
WHEN ROW_NUMBER() OVER (
PARTITION BY d2.region, COALESCE(d2.normalized_name, ''), COALESCE(f2.file_ext, '')
ORDER BY d2.created_at DESC, d2.id DESC
) = 1 THEN true
ELSE false
END AS derived_is_latest_version,
md5(CONCAT_WS('|', d2.region, COALESCE(d2.normalized_name, ''), COALESCE(f2.file_ext, ''))) AS derived_version_group_key
FROM leaudit_documents d2
JOIN leaudit_document_files f2
ON f2.document_id = d2.id
AND f2.is_active = true
AND f2.file_role = 'original'
AND f2.deleted_at IS NULL
WHERE d2.deleted_at IS NULL
AND d2.review_scope = 'govdoc'
AND COALESCE(d2.engine_type, 'leaudit') = 'govdoc'
) fallback_vc
ON fallback_vc.document_id = d.id
WHERE d.id = :document_id
AND d.deleted_at IS NULL
AND COALESCE(d.engine_type, 'leaudit') = 'govdoc'
LIMIT 1
"""
                ),
                {"document_id": documentId},
            )
        ).mappings().first()
    if not row:
        raise LeauditException(StatusCodeEnum.HTTP_404_NOT_FOUND, "公文文档不存在")
    return self._map_document_row(row)
async def _get_active_original_file(self, documentId: int):
    """Return the newest active, non-deleted original file row of a document.

    Raises:
        LeauditException: 404 when the document has no such file.
    """
    activeOriginalQuery = text(
        """
SELECT id, document_id, file_name, file_ext, mime_type, file_size, oss_url, created_by
FROM leaudit_document_files
WHERE document_id = :document_id
AND is_active = true
AND file_role = 'original'
AND deleted_at IS NULL
ORDER BY id DESC
LIMIT 1
"""
    )
    async with GetAsyncSession() as session:
        queryResult = await session.execute(activeOriginalQuery, {"document_id": documentId})
        fileRow = queryResult.mappings().first()
    if fileRow:
        return fileRow
    raise LeauditException(StatusCodeEnum.HTTP_404_NOT_FOUND, "原始文档不存在")
async def _next_run_no(self, documentId: int) -> int:
    """Return the next run number of a document: highest live ``run_no`` plus one."""
    highestRunQuery = text(
        """
SELECT COALESCE(MAX(run_no), 0)
FROM govdoc_runs
WHERE document_id = :document_id
AND deleted_at IS NULL
"""
    )
    async with GetAsyncSession() as session:
        queryResult = await session.execute(highestRunQuery, {"document_id": documentId})
        highestRunNo = queryResult.scalar_one()
    return 1 + int(highestRunNo or 0)
def _build_run_summary(self, row: Any) -> dict[str, Any]:
    """Convert a ``govdoc_runs`` row mapping into the API run-summary payload."""
    field = row.get
    summaryPayload = self._build_summary_payload(
        field("total_score"),
        field("passed_count"),
        field("failed_count"),
        field("skipped_count"),
    )
    payload: dict[str, Any] = {
        "runId": int(row["id"]),
        "documentId": int(row["document_id"]),
        "status": str(field("status") or "pending"),
        "phase": field("phase"),
        "resultStatus": field("result_status"),
        "score": float(field("total_score") or 0),
    }
    # Missing or NULL counters are reported as 0.
    for apiKey, column in (
        ("passedCount", "passed_count"),
        ("failedCount", "failed_count"),
        ("skippedCount", "skipped_count"),
    ):
        payload[apiKey] = int(field(column) or 0)
    payload["summary"] = summaryPayload
    payload["errorMessage"] = field("error_message")
    payload["taskId"] = field("task_id")
    for apiKey, column in (
        ("createdAt", "created_at"),
        ("updatedAt", "updated_at"),
        ("startedAt", "started_at"),
        ("finishedAt", "finished_at"),
    ):
        payload[apiKey] = self._iso(field(column))
    return payload
def _build_summary_payload(
self,
totalScore: Any,
passedCount: Any,
failedCount: Any,
skippedCount: Any,
totalFindings: Any | None = None,
bySeverity: dict[str, int] | None = None,
byCategory: dict[str, int] | None = None,
) -> dict[str, Any]:
return {
"score": float(totalScore or 0),
"total_findings": int(totalFindings if totalFindings is not None else (failedCount or 0)),
"by_severity": bySeverity or {},
"by_category": byCategory or {},
"passed_count": int(passedCount or 0),
"failed_count": int(failedCount or 0),
"skipped_count": int(skippedCount or 0),
}
def _group_artifacts_by_run(self, rows: list[Any]) -> dict[int, dict[str, Any]]:
    """Group report-artifact rows by run id.

    Each run maps to a payload holding the list of artifacts, one presence
    flag per known artifact type and, for known types, the URL of the first
    row in ``rows`` order that carries a non-empty ``oss_url``.
    """
    urlKeyByType = {
        "html_report": "htmlUrl",
        "annotated_docx": "docxUrl",
        "paragraph_html": "paragraphHtmlUrl",
    }
    flagKeyByType = {
        "html_report": "hasHtmlReport",
        "annotated_docx": "hasDocxReport",
        "paragraph_html": "hasParagraphHtml",
    }
    byRun: dict[int, dict[str, Any]] = {}
    for record in rows:
        runKey = int(record["run_id"])
        if runKey not in byRun:
            byRun[runKey] = {
                "artifacts": [],
                "hasHtmlReport": False,
                "hasDocxReport": False,
                "hasParagraphHtml": False,
            }
        bucket = byRun[runKey]
        kind = str(record.get("artifact_type") or "")
        urlKey = urlKeyByType.get(kind)
        # First URL wins: later rows of the same type never overwrite it.
        if urlKey is not None and record.get("oss_url") and urlKey not in bucket:
            bucket[urlKey] = record["oss_url"]
        flagKey = flagKeyByType.get(kind)
        if flagKey is not None:
            bucket[flagKey] = True
        bucket["artifacts"].append(
            {
                "artifactType": kind,
                "fileName": record.get("file_name") or "",
                "fileExt": record.get("file_ext") or "",
                "mimeType": record.get("mime_type") or "",
                "ossUrl": record.get("oss_url") or "",
                "description": record.get("description") or "",
            }
        )
    return byRun
def _map_document_row(self, row: Any) -> _GovdocDocumentRow:
    """Convert a document query row mapping into a ``_GovdocDocumentRow``.

    ``derived_version_no`` takes precedence over ``version_no`` whenever it
    is not None.
    """

    def optionalInt(column: str) -> int | None:
        # None stays None; anything else is coerced to int.
        return int(row[column]) if row.get(column) is not None else None

    def optionalStr(column: str) -> str | None:
        # Falsy values (None, "") are reported as None.
        return str(row[column]) if row.get(column) else None

    def counter(column: str) -> int:
        return int(row.get(column) or 0)

    versionNumber = row.get("derived_version_no")
    if versionNumber is None:
        versionNumber = row.get("version_no")
    return _GovdocDocumentRow(
        documentId=int(row["document_id"]),
        region=str(row["region"] or "default"),
        processingStatus=str(row["processing_status"] or "waiting"),
        currentRunId=optionalInt("current_run_id"),
        versionGroupKey=optionalStr("version_group_key"),
        versionNo=int(versionNumber or 1),
        totalVersions=int(row.get("total_versions") or 1),
        previousVersionId=optionalInt("previous_version_id"),
        rootVersionId=optionalInt("root_version_id"),
        isLatestVersion=bool(row.get("is_latest_version", True)),
        normalizedName=optionalStr("normalized_name"),
        createdAt=row.get("created_at"),
        updatedAt=row.get("updated_at"),
        fileId=int(row["file_id"]),
        fileName=str(row["file_name"] or ""),
        fileExt=optionalStr("file_ext"),
        mimeType=optionalStr("mime_type"),
        fileSize=optionalInt("file_size"),
        ossUrl=optionalStr("oss_url"),
        createdBy=optionalInt("created_by"),
        resultStatus=optionalStr("result_status"),
        totalScore=float(row["total_score"]) if row.get("total_score") is not None else None,
        passedCount=optionalInt("passed_count"),
        failedCount=optionalInt("failed_count"),
        skippedCount=optionalInt("skipped_count"),
        findingCount=counter("finding_count"),
        errorCount=counter("error_count"),
        warningCount=counter("warning_count"),
        infoCount=counter("info_count"),
        rulesPath=optionalStr("rules_path"),
        hasHtmlReport=bool(row.get("has_html_report")),
        hasDocxReport=bool(row.get("has_docx_report")),
    )
async def _build_document_list_item(self, mapped: _GovdocDocumentRow) -> dict[str, Any]:
    """Build the API list-item payload for one mapped document row.

    The payload combines file metadata, version-chain fields, ruleset
    metadata resolved from ``mapped.rulesPath``, run counters and a
    ``latestRun`` entry, which is None when the document has no current run.
    """
    severityCounts = {
        "error": mapped.errorCount,
        "warning": mapped.warningCount,
        "info": mapped.infoCount,
    }
    runSummary = self._build_summary_payload(
        mapped.totalScore,
        mapped.passedCount,
        mapped.failedCount,
        mapped.skippedCount,
        totalFindings=mapped.findingCount,
        bySeverity=severityCounts,
    )
    rulesetMeta = await self._resolve_ruleset_metadata(mapped.rulesPath)
    score = None if mapped.totalScore is None else float(mapped.totalScore)
    latestRun = None
    if mapped.currentRunId:
        latestRun = {"runId": mapped.currentRunId, "summary": runSummary}
    reportFlags = {
        "hasHtmlReport": mapped.hasHtmlReport,
        "hasDocxReport": mapped.hasDocxReport,
    }
    return {
        "documentId": mapped.documentId,
        "fileId": mapped.fileId,
        "fileName": mapped.fileName,
        "fileExt": mapped.fileExt,
        "mimeType": mapped.mimeType,
        "fileSize": mapped.fileSize,
        "region": mapped.region,
        "processingStatus": mapped.processingStatus,
        "currentRunId": mapped.currentRunId,
        # The latest run is reported as the document's current run.
        "latestRunId": mapped.currentRunId,
        "resultStatus": mapped.resultStatus,
        "score": score,
        "versionGroupKey": mapped.versionGroupKey,
        "versionNo": mapped.versionNo,
        "totalVersions": mapped.totalVersions,
        "previousVersionId": mapped.previousVersionId,
        "rootVersionId": mapped.rootVersionId,
        "isLatestVersion": mapped.isLatestVersion,
        "rulesetId": rulesetMeta["typeId"],
        "rulesetName": rulesetMeta["name"],
        "rulesetVersion": rulesetMeta["version"],
        "passedCount": mapped.passedCount or 0,
        "failedCount": mapped.failedCount or 0,
        "skippedCount": mapped.skippedCount or 0,
        "latestRun": latestRun,
        "reports": reportFlags,
        "createdAt": self._iso(mapped.createdAt),
        "updatedAt": self._iso(mapped.updatedAt),
    }
async def _find_latest_version_candidate(
    self,
    session,
    *,
    region: str,
    normalizedName: str,
    fileExt: str | None,
) -> dict[str, Any] | None:
    """Find the latest existing version of a govdoc document by region and name.

    The extension filter is applied only when ``fileExt`` is non-empty.

    Returns:
        A dict with ``document_id``, ``version_group_key``, ``version_no``
        and ``root_version_id``, or None when no document matches.
    """
    params: dict[str, Any] = {"region": region, "normalized_name": normalizedName}
    extClause = " AND f.file_ext = :file_ext" if fileExt else ""
    if fileExt:
        params["file_ext"] = fileExt
    candidateQuery = text(
        f"""
SELECT
d.id AS document_id,
d.version_group_key,
d.version_no,
d.root_version_id
FROM leaudit_documents d
JOIN leaudit_document_files f
ON f.document_id = d.id
AND f.is_active = true
AND f.file_role = 'original'
AND f.deleted_at IS NULL
WHERE d.deleted_at IS NULL
AND d.review_scope = 'govdoc'
AND COALESCE(d.engine_type, 'leaudit') = 'govdoc'
AND d.region = :region
AND COALESCE(d.normalized_name, '') = :normalized_name
AND COALESCE(d.is_latest_version, true) = true{extClause}
ORDER BY d.version_no DESC, d.id DESC
LIMIT 1
"""
    )
    queryResult = await session.execute(candidateQuery, params)
    candidate = queryResult.mappings().first()
    if not candidate:
        return None
    return dict(candidate)
async def _backfill_legacy_version_chain(
    self,
    session,
    *,
    region: str,
    normalizedName: str,
    fileExt: str | None,
) -> dict[str, Any] | None:
    """Rebuild the version chain of every document sharing a region and name.

    Matching documents are ordered by creation time then id, given a fresh
    random group key and numbered from 1. Each one points at its predecessor
    and at the first document as root, and only the last is flagged latest.
    Nothing is committed here.

    Returns:
        Version info of the newest document, or None when nothing matched.
    """
    params: dict[str, Any] = {"region": region, "normalized_name": normalizedName}
    extClause = " AND f.file_ext = :file_ext" if fileExt else ""
    if fileExt:
        params["file_ext"] = fileExt
    chainQuery = text(
        f"""
SELECT d.id AS document_id
FROM leaudit_documents d
JOIN leaudit_document_files f
ON f.document_id = d.id
AND f.is_active = true
AND f.file_role = 'original'
AND f.deleted_at IS NULL
WHERE d.deleted_at IS NULL
AND d.review_scope = 'govdoc'
AND COALESCE(d.engine_type, 'leaudit') = 'govdoc'
AND d.region = :region
AND COALESCE(d.normalized_name, '') = :normalized_name{extClause}
ORDER BY d.created_at ASC, d.id ASC
"""
    )
    chainRows = (await session.execute(chainQuery, params)).mappings().all()
    if not chainRows:
        return None
    relinkStatement = text(
        """
UPDATE leaudit_documents
SET version_group_key = :version_group_key,
version_no = :version_no,
previous_version_id = :previous_version_id,
root_version_id = :root_version_id,
is_latest_version = :is_latest_version,
updated_at = NOW()
WHERE id = :document_id
"""
    )
    groupKey = uuid.uuid4().hex
    chainLength = len(chainRows)
    rootId = int(chainRows[0]["document_id"])
    predecessorId: int | None = None
    for versionNo, chainRow in enumerate(chainRows, start=1):
        currentId = int(chainRow["document_id"])
        await session.execute(
            relinkStatement,
            {
                "version_group_key": groupKey,
                "version_no": versionNo,
                "previous_version_id": predecessorId,
                "root_version_id": rootId,
                "is_latest_version": versionNo == chainLength,
                "document_id": currentId,
            },
        )
        predecessorId = currentId
    return {
        "document_id": int(chainRows[-1]["document_id"]),
        "version_group_key": groupKey,
        "version_no": chainLength,
        "root_version_id": rootId,
    }
async def _backfill_missing_version_groups(self, session) -> None:
    """Assign version-chain fields to govdoc documents that lack a group key.

    Documents are grouped by (region, normalized name, file extension). Only
    groups with at least one document missing ``version_group_key`` are
    touched. Every document of such a group is renumbered in creation order
    and linked to its predecessor and to the first document as root. The
    group reuses an existing key when one is present, otherwise a derived
    one. Commits once at the end; returns without committing when there is
    nothing to do.
    """
    pendingGroupsQuery = text(
        """
SELECT
d.region,
COALESCE(d.normalized_name, '') AS normalized_name,
COALESCE(f.file_ext, '') AS file_ext,
ARRAY_AGG(d.id ORDER BY d.created_at ASC, d.id ASC) AS document_ids,
MIN(NULLIF(d.version_group_key, '')) AS existing_version_group_key
FROM leaudit_documents d
JOIN leaudit_document_files f
ON f.document_id = d.id
AND f.is_active = true
AND f.file_role = 'original'
AND f.deleted_at IS NULL
WHERE d.deleted_at IS NULL
AND d.review_scope = 'govdoc'
AND COALESCE(d.engine_type, 'leaudit') = 'govdoc'
GROUP BY d.region, COALESCE(d.normalized_name, ''), COALESCE(f.file_ext, '')
HAVING BOOL_OR(COALESCE(d.version_group_key, '') = '')
"""
    )
    pendingGroups = (await session.execute(pendingGroupsQuery)).mappings().all()
    if not pendingGroups:
        return
    relinkStatement = text(
        """
UPDATE leaudit_documents
SET version_group_key = :version_group_key,
version_no = :version_no,
previous_version_id = :previous_version_id,
root_version_id = :root_version_id,
is_latest_version = :is_latest_version,
updated_at = NOW()
WHERE id = :document_id
"""
    )
    for pending in pendingGroups:
        region = str(pending["region"] or "default")
        normalizedName = str(pending["normalized_name"] or "")
        fileExt = str(pending["file_ext"] or "")
        memberIds = [int(rawId) for rawId in (pending["document_ids"] or [])]
        if not memberIds:
            continue
        # Prefer a key already used inside the group; derive one otherwise.
        versionGroupKey = str(pending["existing_version_group_key"] or "").strip()
        if not versionGroupKey:
            versionGroupKey = self._derive_version_group_key(
                region=region,
                normalizedName=normalizedName,
                fileExt=fileExt or None,
            )
        memberCount = len(memberIds)
        rootId = memberIds[0]
        predecessorId: int | None = None
        for versionNo, memberId in enumerate(memberIds, start=1):
            await session.execute(
                relinkStatement,
                {
                    "version_group_key": versionGroupKey,
                    "version_no": versionNo,
                    "previous_version_id": predecessorId,
                    "root_version_id": rootId,
                    "is_latest_version": versionNo == memberCount,
                    "document_id": memberId,
                },
            )
            predecessorId = memberId
    await session.commit()
def _derive_version_group_key(self, *, region: str, normalizedName: str, fileExt: str | None) -> str:
raw = f"{region}|{normalizedName}|{fileExt or ''}"
return hashlib.md5(raw.encode("utf-8")).hexdigest()
async def _resolve_ruleset_metadata(self, rulesPath: str | None) -> dict[str, str]:
    """Return ``typeId`` / ``name`` / ``version`` describing a ruleset.

    Values come from the loaded ruleset's metadata when it loads. Otherwise
    they are derived from the resolved rules file path, and all are empty
    strings when no path resolves.
    """
    ruleset = await self._load_ruleset(rulesPath)
    if ruleset is None:
        resolvedPath = await self._resolve_rules_path(rulesPath)
        if not resolvedPath:
            return {"typeId": "", "name": "", "version": ""}
        # Path-based fallback: file stem as id, parent folder name as name.
        rulesFile = Path(resolvedPath)
        return {
            "typeId": rulesFile.stem,
            "name": rulesFile.parent.name,
            "version": "",
        }
    metadata = ruleset.metadata
    return {
        "typeId": str(metadata.type_id or ""),
        "name": str(metadata.name or ""),
        "version": str(metadata.version or ""),
    }
async def _get_report_artifact(self, runId: int, artifactType: str) -> Any | None:
    """Return the newest stored artifact row of a given type for a run.

    The schema bootstrap is a batch of DDL statements (including an
    ``ALTER TABLE``) followed by a commit. Repeating it on every artifact
    lookup puts avoidable DDL on a read path, so it now runs at most once
    per process for this service class. A failed bootstrap leaves the marker
    unset and is retried on the next call.

    Args:
        runId: Identifier of the govdoc run.
        artifactType: Artifact type, e.g. ``paragraph_html`` or
            ``annotated_docx``.

    Returns:
        A row mapping exposing ``oss_url``, or None when the run has no
        live artifact of that type with a non-empty URL.
    """
    async with GetAsyncSession() as session:
        serviceClass = type(self)
        if not getattr(serviceClass, "_govdocSchemaEnsured", False):
            await self._ensureGovdocSchema(session)
            # Set only after success so a failed bootstrap is retried.
            serviceClass._govdocSchemaEnsured = True
        return (
            await session.execute(
                text(
                    """
SELECT oss_url
FROM govdoc_report_artifacts
WHERE run_id = :run_id
AND artifact_type = :artifact_type
AND deleted_at IS NULL
AND COALESCE(oss_url, '') <> ''
ORDER BY id DESC
LIMIT 1
"""
                ),
                {"run_id": runId, "artifact_type": artifactType},
            )
        ).mappings().first()
def _parse_json(self, raw: Any) -> Any:
    """Decode a JSON value leniently.

    None and "" give None, dicts and lists are returned unchanged, and
    anything that fails to decode gives None instead of raising.
    """
    if raw is None or raw == "":
        return None
    if isinstance(raw, (dict, list)):
        return raw
    try:
        decoded = json.loads(raw)
    except Exception:
        # Deliberately lenient: malformed or non-text input means "no data".
        return None
    return decoded
def _iso(self, value: Any) -> str | None:
if value is None:
return None
if isinstance(value, datetime):
return value.isoformat()
return str(value)
def _build_audit_result_from_run_result(self, payload: dict[str, Any]) -> AuditResult:
    """Rebuild an ``AuditResult`` model from a stored run-result payload.

    Missing or empty sections fall back to empty containers before they are
    validated into their models.
    """

    def section(key: str, emptyValue: Any) -> Any:
        return payload.get(key) or emptyValue

    summaryData = section("summary", {})
    findingItems = section("findings", [])
    checkedRuleItems = section("checkedRules", [])
    structureItems = section("structure", [])
    outlineItems = section("outline", [])
    entityData = section("entities", {})
    documentData = section("document", {})
    auditId = str(payload.get("runId") or "")
    summary = AuditSummary.model_validate(summaryData)
    findings = [Finding.model_validate(entry) for entry in findingItems]
    checkedRules = [CheckedRule.model_validate(entry) for entry in checkedRuleItems]
    structure = [StructureItem.model_validate(entry) for entry in structureItems]
    outline = [OutlineNode.model_validate(entry) for entry in outlineItems]
    return AuditResult(
        audit_id=auditId,
        document=documentData,
        summary=summary,
        findings=findings,
        checked_rules=checkedRules,
        structure=structure,
        outline=outline,
        entities=entityData,
    )