Files
leaudit-platform-backend/fastapi_modules/fastapi_leaudit/services/impl/documentServiceImpl.py
T
wren 52c2bed4f9 feat: add document type CRUD with inline rule set binding
- GET/POST /api/document-types, GET/PUT/DELETE /api/document-types/{id}
- DocumentTypeItemVO extended with description, entryModuleId,
  isEnabled, ruleSetIds
- Create/Update DTOs accept ruleSetIds array for automatic
  leaudit_rule_type_bindings sync (full replace on update)
- Soft delete cascades to rule_type_bindings
2026-04-30 12:50:56 +08:00

706 lines
29 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""文档服务实现。"""
from __future__ import annotations
from datetime import datetime
import hashlib
import mimetypes
import re
import time
import unicodedata
import uuid
from pathlib import Path
from sqlalchemy import text
from fastapi_common.fastapi_common_sqlalchemy.database import GetAsyncSession
from fastapi_common.fastapi_common_web.domain.responses import StatusCodeEnum
from fastapi_common.fastapi_common_web.exception.LeauditException import LeauditException
from fastapi_common.fastapi_common_storage.oss_path_utils import OssPathUtils
from fastapi_modules.fastapi_leaudit.domian.vo.documentVo import (
DocumentHistoryVersionVO,
DocumentListItemVO,
DocumentListPageVO,
DocumentTypeCreateDTO,
DocumentTypeItemVO,
DocumentTypeUpdateDTO,
DocumentUploadVO,
)
from fastapi_modules.fastapi_leaudit.models import LeauditDocument, LeauditDocumentFile
from fastapi_modules.fastapi_leaudit.services import IAuditService, IDocumentService, IOssService
from fastapi_modules.fastapi_leaudit.services.impl.auditServiceImpl import AuditServiceImpl
from fastapi_modules.fastapi_leaudit.services.impl.ossServiceImpl import OssServiceImpl
class DocumentServiceImpl(IDocumentService):
"""文档服务实现。"""
def __init__(
    self,
    OssService: IOssService | None = None,
    AuditService: IAuditService | None = None,
) -> None:
    """Store the collaborating services, creating defaults when omitted.

    Args:
        OssService: Object-storage service. A fresh ``OssServiceImpl`` is
            used when the argument is falsy.
        AuditService: Audit service. A fresh ``AuditServiceImpl`` is used
            when the argument is falsy.
    """
    if not OssService:
        OssService = OssServiceImpl()
    if not AuditService:
        AuditService = AuditServiceImpl()
    self.OssService = OssService
    self.AuditService = AuditService
async def Upload(
self,
FileName: str,
FileContent: bytes,
ContentType: str | None,
TypeId: int | None = None,
TypeCode: str | None = None,
Region: str = "default",
FileRole: str = "primary",
CreatedBy: int | None = None,
AutoRun: bool = False,
Speed: str = "normal",
) -> DocumentUploadVO:
"""Upload a document and create the LeAudit document/file records.

Resolves the document type from ``TypeId`` and/or ``TypeCode``, then either
reuses the latest version (when the uploaded bytes have the same SHA-256 as
the latest primary file of the same type/region/normalized name) or creates
a new document version, uploads the bytes to object storage and records a
new file row. When ``AutoRun`` is set, an audit run is started afterwards.

Raises:
    LeauditException: 400 for an empty file name/content or when neither
        type identifier is given; 404 when the document type is not found;
        500 when a detected duplicate cannot be loaded again.
"""
# NOTE(review): block nesting is not visible in this flattened listing; the
# comments below describe the statements in the order they appear.
# --- Argument validation -------------------------------------------------
if not FileName:
raise LeauditException(StatusCodeEnum.HTTP_400_BAD_REQUEST, "上传文件名不能为空")
if not FileContent:
raise LeauditException(StatusCodeEnum.HTTP_400_BAD_REQUEST, "上传文件内容不能为空")
if not TypeId and not TypeCode:
raise LeauditException(StatusCodeEnum.HTTP_400_BAD_REQUEST, "typeId 与 typeCode 至少传一个")
# Blank region / role fall back to "default" / "primary".
normalizedRegion = (Region or "default").strip() or "default"
normalizedFileRole = (FileRole or "primary").strip() or "primary"
# Lower-cased extension without the dot; None when the name has no suffix.
fileExt = Path(FileName).suffix.lstrip(".").lower() or None
# Prefer the caller's content type, then a guess from the name, then a generic binary type.
mimeType = ContentType or mimetypes.guess_type(FileName)[0] or "application/octet-stream"
fileSha256 = hashlib.sha256(FileContent).hexdigest()
fileSize = len(FileContent)
normalizedSpeed = _normalize_speed(Speed)
normalizedName = _normalize_document_name(FileName)
uploadedAt = datetime.now()
async with GetAsyncSession() as Session:
# --- Resolve the document type -------------------------------------------
# Three query variants: by id AND code, by id only, or by code only.
# NOTE(review): the guard above tests truthiness while these branches test
# `is not None`, so an empty-string TypeCode together with a valid TypeId
# takes the first branch and matches nothing — confirm callers never send "".
if TypeId is not None and TypeCode is not None:
typeResult = await Session.execute(
text(
"""
SELECT id, code
FROM leaudit_document_types
WHERE id = :type_id
AND code = :type_code
AND deleted_at IS NULL
LIMIT 1
"""
),
{"type_id": TypeId, "type_code": TypeCode},
)
elif TypeId is not None:
typeResult = await Session.execute(
text(
"""
SELECT id, code
FROM leaudit_document_types
WHERE id = :type_id
AND deleted_at IS NULL
LIMIT 1
"""
),
{"type_id": TypeId},
)
else:
typeResult = await Session.execute(
text(
"""
SELECT id, code
FROM leaudit_document_types
WHERE code = :type_code
AND deleted_at IS NULL
LIMIT 1
"""
),
{"type_code": TypeCode},
)
typeRow = typeResult.mappings().first()
if not typeRow:
raise LeauditException(StatusCodeEnum.HTTP_404_NOT_FOUND, "文档类型不存在或已停用")
resolvedTypeId = int(typeRow["id"])
resolvedTypeCode = str(typeRow["code"])
# --- Version bookkeeping defaults (first version of a new group) ---------
duplicateUpload = False
previousVersionId: int | None = None
rootVersionId: int | None = None
versionGroupKey: str | None = None
versionNo = 1
latestCandidate = None
# Only "primary" files are matched against an existing version chain.
if normalizedFileRole == "primary":
latestCandidate = await _find_latest_version_candidate(
Session,
type_id=resolvedTypeId,
region=normalizedRegion,
normalized_name=normalizedName,
)
# Same bytes as the current latest version: reuse the existing rows instead
# of creating a new version.
if latestCandidate and latestCandidate["sha256"] == fileSha256:
duplicateUpload = True
document = await Session.get(LeauditDocument, int(latestCandidate["document_id"]))
documentFile = await Session.get(LeauditDocumentFile, int(latestCandidate["file_id"]))
if document is None or documentFile is None:
raise LeauditException(StatusCodeEnum.HTTP_500_INTERNAL_SERVER_ERROR, "重复上传版本定位失败")
await Session.commit()
else:
# Nanosecond timestamp is used as the internal business document number.
internalDocumentNo = time.time_ns()
if latestCandidate:
# Extend the existing chain: link to the previous version, keep its
# group key, bump the version number and demote the previous latest row.
previousVersionId = int(latestCandidate["document_id"])
rootVersionId = int(latestCandidate["root_version_id"] or latestCandidate["document_id"])
versionGroupKey = str(latestCandidate["version_group_key"])
versionNo = int(latestCandidate["version_no"]) + 1
previousDocument = await Session.get(LeauditDocument, previousVersionId)
if previousDocument is not None:
previousDocument.isLatestVersion = False
else:
# No candidate: start a new version group with a random key.
versionGroupKey = uuid.uuid4().hex
document = await LeauditDocument.create_new(
Session,
bizDocumentId=internalDocumentNo,
typeId=resolvedTypeId,
region=normalizedRegion,
processingStatus="waiting",
versionGroupKey=versionGroupKey,
versionNo=versionNo,
previousVersionId=previousVersionId,
rootVersionId=rootVersionId,
isLatestVersion=True,
normalizedName=normalizedName,
)
# The first version of a group is its own root.
if document.rootVersionId is None:
document.rootVersionId = document.Id
rootVersionId = document.Id
else:
rootVersionId = document.rootVersionId
versionLabel = f"v{document.versionNo}"
objectKey = OssPathUtils.BuildBusinessDocKey(
Region=normalizedRegion,
TypeCode=resolvedTypeCode,
DocumentId=document.Id,
Version=versionLabel,
FileRole=normalizedFileRole,
FileName=FileName,
Year=uploadedAt.year,
Month=uploadedAt.month,
)
# NOTE(review): the object is uploaded before the DB commit below and no
# cleanup is visible here if a later statement fails — confirm orphaned
# objects are acceptable or handled elsewhere.
ossUrl = await self.OssService.UploadBytes(
ObjectKey=objectKey,
Content=FileContent,
ContentType=mimeType,
)
versionCount = await LeauditDocumentFile.count_by_document(Session, document.Id)
_ = versionCount # single-version-per-document in current model; kept for future extension
# Deactivate any currently active file rows of this document before adding the new one.
await LeauditDocumentFile.deactivate_active_by_document(Session, document.Id)
documentFile = LeauditDocumentFile(
documentId=document.Id,
fileRole=normalizedFileRole,
fileName=FileName,
fileExt=fileExt,
mimeType=mimeType,
fileSize=fileSize,
sha256=fileSha256,
localPath=None,
ossUrl=ossUrl,
storageProvider="minio",
isActive=True,
createdBy=CreatedBy,
)
Session.add(documentFile)
await Session.flush()
await Session.commit()
# Reload both rows after the commit so the attributes read below are populated.
await Session.refresh(document)
await Session.refresh(documentFile)
# Taken from the file row so the duplicate path (which uploads nothing) also has a URL.
ossUrl = documentFile.ossUrl or ""
run = None
processingStatus = document.processingStatus or "waiting"
if AutoRun:
# A duplicate upload forces a re-run of the existing document.
# NOTE(review): the raw Speed is forwarded here while the response reports
# normalizedSpeed — confirm AuditService.Run normalizes it itself.
run = await self.AuditService.Run(
DocumentId=document.Id,
Speed=Speed,
Force=duplicateUpload,
)
# Pending and running are both reported as "running"; any other run status is passed through.
processingStatus = "running" if run.status in {"pending", "running"} else run.status
return DocumentUploadVO(
documentId=document.Id,
internalDocumentNo=document.bizDocumentId,
versionGroupKey=document.versionGroupKey or "",
versionNo=int(document.versionNo or 1),
previousVersionId=document.previousVersionId,
rootVersionId=int(document.rootVersionId or document.Id),
duplicateUpload=duplicateUpload,
fileId=documentFile.Id,
typeId=resolvedTypeId,
typeCode=resolvedTypeCode,
region=normalizedRegion,
fileName=documentFile.fileName,
ossUrl=ossUrl,
speed=normalizedSpeed,
processingStatus=processingStatus,
autoRunTriggered=AutoRun,
run=run,
)
async def ListDocuments(
    self,
    Page: int = 1,
    PageSize: int = 20,
    Keyword: str | None = None,
    TypeCode: str | None = None,
    Region: str | None = None,
    ProcessingStatus: str | None = None,
    ResultStatus: str | None = None,
    UserId: int | None = None,
    DateFrom: str | None = None,
    DateTo: str | None = None,
) -> DocumentListPageVO:
    """Return one page of latest-version documents plus their older versions.

    Only documents flagged as the latest version, not soft-deleted, and having
    an active ``primary`` file are listed. For every listed version group the
    non-latest versions are attached as ``historyVersions``.

    Args:
        Page: 1-based page number; values below 1 become 1.
        PageSize: Rows per page, clamped to 1..100.
        Keyword: Case-insensitive substring (ILIKE) matched against the file
            name or the normalized document name.
        TypeCode: Exact document type code (stripped).
        Region: Exact region (stripped).
        ProcessingStatus: Exact document processing status (stripped).
        ResultStatus: Exact result status of the current audit run (stripped).
        UserId: Matches the ``created_by`` of the file row.
        DateFrom: Inclusive lower bound on the document creation time.
        DateTo: Inclusive end day; rows created before the following day match.

    Returns:
        The page of documents with total count and page metadata.
    """
    pageNo = max(1, int(Page))
    pageSize = max(1, min(int(PageSize), 100))

    conditions = [
        "d.is_latest_version = true",
        "d.deleted_at IS NULL",
        "f.is_active = true",
        "f.file_role = 'primary'",
    ]
    binds: dict[str, object] = {"limit": pageSize, "offset": (pageNo - 1) * pageSize}

    if Keyword:
        conditions.append("(f.file_name ILIKE :keyword OR d.normalized_name ILIKE :keyword)")
        binds["keyword"] = f"%{Keyword.strip()}%"

    # Optional exact-match filters share one shape: condition, bind name, raw value.
    exactFilters = (
        ("dt.code = :type_code", "type_code", TypeCode),
        ("d.region = :region", "region", Region),
        ("d.processing_status = :processing_status", "processing_status", ProcessingStatus),
        ("ar.result_status = :result_status", "result_status", ResultStatus),
    )
    for condition, bindName, rawValue in exactFilters:
        if rawValue:
            conditions.append(condition)
            binds[bindName] = rawValue.strip()

    if UserId is not None:
        conditions.append("f.created_by = :user_id")
        binds["user_id"] = UserId
    if DateFrom:
        conditions.append("d.created_at >= :date_from")
        binds["date_from"] = DateFrom.strip()
    if DateTo:
        conditions.append("d.created_at < (CAST(:date_to AS date) + INTERVAL '1 day')")
        binds["date_to"] = DateTo.strip()

    whereSql = " AND ".join(conditions)

    countStmt = text(
        f"""
        SELECT COUNT(*)
        FROM leaudit_documents d
        JOIN leaudit_document_files f
        ON f.document_id = d.id
        LEFT JOIN leaudit_document_types dt
        ON dt.id = d.type_id
        LEFT JOIN leaudit_audit_runs ar
        ON ar.id = d.current_run_id
        WHERE {whereSql}
        """
    )
    pageStmt = text(
        f"""
        SELECT
        d.id AS document_id,
        d.biz_document_id AS internal_document_no,
        d.version_group_key,
        d.version_no,
        d.root_version_id,
        d.previous_version_id,
        d.type_id,
        dt.code AS type_code,
        d.region,
        d.normalized_name,
        d.processing_status,
        d.current_run_id,
        d.updated_at,
        f.id AS file_id,
        f.file_name,
        f.file_ext,
        f.mime_type,
        f.file_size,
        f.oss_url,
        ar.status AS run_status,
        ar.result_status,
        ar.total_score,
        ar.passed_count,
        ar.failed_count,
        ar.skipped_count,
        vc.total_versions,
        COALESCE(vc.total_versions, 1) > 1 AS has_history
        FROM leaudit_documents d
        JOIN leaudit_document_files f
        ON f.document_id = d.id
        LEFT JOIN leaudit_document_types dt
        ON dt.id = d.type_id
        LEFT JOIN leaudit_audit_runs ar
        ON ar.id = d.current_run_id
        LEFT JOIN (
        SELECT version_group_key, COUNT(*) AS total_versions
        FROM leaudit_documents
        WHERE deleted_at IS NULL
        GROUP BY version_group_key
        ) vc
        ON vc.version_group_key = d.version_group_key
        WHERE {whereSql}
        ORDER BY d.updated_at DESC, d.id DESC
        LIMIT :limit OFFSET :offset
        """
    )
    historyStmt = text(
        """
        SELECT
        d.version_group_key,
        d.id AS document_id,
        d.version_no,
        d.processing_status,
        d.updated_at,
        f.id AS file_id,
        f.file_name,
        f.file_ext,
        ar.status AS run_status,
        ar.result_status
        FROM leaudit_documents d
        JOIN leaudit_document_files f
        ON f.document_id = d.id
        AND f.is_active = true
        AND f.file_role = 'primary'
        LEFT JOIN leaudit_audit_runs ar
        ON ar.id = d.current_run_id
        WHERE d.version_group_key = ANY(:group_keys)
        AND d.is_latest_version = false
        AND d.deleted_at IS NULL
        ORDER BY d.version_group_key, d.version_no DESC, d.id DESC
        """
    )

    def optInt(value):
        # NULL columns stay None instead of being coerced.
        return int(value) if value is not None else None

    def isoOrNone(stamp):
        return stamp.isoformat() if stamp else None

    async with GetAsyncSession() as Session:
        total = int((await Session.execute(countStmt, binds)).scalar_one())
        pageRows = (await Session.execute(pageStmt, binds)).mappings().all()

        # Older versions are fetched in one query for all groups on this page.
        historyByGroup: dict[str, list[DocumentHistoryVersionVO]] = {}
        groupKeys = [
            str(pageRow["version_group_key"])
            for pageRow in pageRows
            if pageRow["version_group_key"]
        ]
        if groupKeys:
            historyResult = await Session.execute(historyStmt, {"group_keys": groupKeys})
            for oldRow in historyResult.mappings().all():
                bucket = historyByGroup.setdefault(str(oldRow["version_group_key"]), [])
                bucket.append(
                    DocumentHistoryVersionVO(
                        documentId=int(oldRow["document_id"]),
                        fileId=optInt(oldRow["file_id"]),
                        versionNo=int(oldRow["version_no"]),
                        fileName=oldRow["file_name"],
                        fileExt=oldRow["file_ext"],
                        processingStatus=oldRow["processing_status"],
                        runStatus=oldRow["run_status"],
                        resultStatus=oldRow["result_status"],
                        updatedAt=isoOrNone(oldRow["updated_at"]),
                    )
                )

        documents: list[DocumentListItemVO] = []
        for pageRow in pageRows:
            groupKey = str(pageRow["version_group_key"] or "")
            score = pageRow["total_score"]
            item = DocumentListItemVO(
                documentId=int(pageRow["document_id"]),
                internalDocumentNo=int(pageRow["internal_document_no"]),
                versionGroupKey=groupKey,
                versionNo=int(pageRow["version_no"] or 1),
                rootVersionId=int(pageRow["root_version_id"] or pageRow["document_id"]),
                previousVersionId=optInt(pageRow["previous_version_id"]),
                typeId=optInt(pageRow["type_id"]),
                typeCode=pageRow["type_code"],
                region=pageRow["region"],
                normalizedName=pageRow["normalized_name"],
                fileId=optInt(pageRow["file_id"]),
                fileName=pageRow["file_name"],
                fileExt=pageRow["file_ext"],
                mimeType=pageRow["mime_type"],
                fileSize=optInt(pageRow["file_size"]),
                ossUrl=pageRow["oss_url"],
                processingStatus=pageRow["processing_status"],
                currentRunId=optInt(pageRow["current_run_id"]),
                runStatus=pageRow["run_status"],
                resultStatus=pageRow["result_status"],
                totalScore=float(score) if score is not None else None,
                passedCount=optInt(pageRow["passed_count"]),
                failedCount=optInt(pageRow["failed_count"]),
                skippedCount=optInt(pageRow["skipped_count"]),
                updatedAt=isoOrNone(pageRow["updated_at"]),
                hasHistory=bool(pageRow["has_history"]),
                totalVersions=int(pageRow["total_versions"] or 1),
                historyVersions=historyByGroup.get(groupKey, []),
            )
            documents.append(item)

    # Ceiling division; an empty result reports zero pages.
    if total:
        totalPages = (total + pageSize - 1) // pageSize
    else:
        totalPages = 0
    return DocumentListPageVO(
        total=total,
        page=pageNo,
        pageSize=pageSize,
        totalPages=totalPages,
        documents=documents,
    )
async def ListDocumentTypes(self, Ids: list[int] | None = None) -> list[DocumentTypeItemVO]:
    """Return the non-deleted document types with their bound rule set ids.

    Args:
        Ids: Optional id filter, forwarded unchanged to ``_queryDocumentTypes``.

    Returns:
        One ``DocumentTypeItemVO`` per row, in the order the query returns them.
    """
    async with GetAsyncSession() as Session:
        typeRows, ruleSetsByType = await self._queryDocumentTypes(Session, Ids)

    items: list[DocumentTypeItemVO] = []
    for typeRow in typeRows:
        typeId = int(typeRow["id"])
        items.append(
            DocumentTypeItemVO(
                id=typeId,
                name=str(typeRow["name"] or ""),
                code=str(typeRow["code"] or ""),
                description=typeRow.get("description"),
                entryModuleId=typeRow.get("entry_module_id"),
                isEnabled=bool(typeRow.get("is_enabled", True)),
                ruleSetIds=ruleSetsByType.get(typeId, []),
            )
        )
    return items
async def GetDocumentType(self, Id: int) -> DocumentTypeItemVO:
    """Return a single non-deleted document type with its bound rule set ids.

    Args:
        Id: Primary key of the document type.

    Raises:
        LeauditException: 404 when no non-deleted type has this id.
    """
    async with GetAsyncSession() as Session:
        typeRows, ruleSetsByType = await self._queryDocumentTypes(Session, [Id])

    if not typeRows:
        raise LeauditException(StatusCodeEnum.HTTP_404_NOT_FOUND, "文档类型不存在")

    typeRow = typeRows[0]
    typeId = int(typeRow["id"])
    return DocumentTypeItemVO(
        id=typeId,
        name=str(typeRow["name"] or ""),
        code=str(typeRow["code"] or ""),
        description=typeRow.get("description"),
        entryModuleId=typeRow.get("entry_module_id"),
        isEnabled=bool(typeRow.get("is_enabled", True)),
        ruleSetIds=ruleSetsByType.get(typeId, []),
    )
async def CreateDocumentType(self, Body: DocumentTypeCreateDTO) -> DocumentTypeItemVO:
    """Create a document type and bind the given rule sets to it.

    The code must be unique among non-deleted types. The type row and its
    rule bindings are written in one transaction; the created type is then
    read back through ``GetDocumentType``.

    Args:
        Body: Creation payload. ``description`` and ``ruleSetIds`` may be
            ``None``; they are treated as "no description" / "no bindings".

    Raises:
        LeauditException: 409 when a non-deleted type already uses the code.
    """
    code = Body.code.strip()
    # None-safe: an omitted description must not crash on .strip(); blank
    # text is stored as NULL, as before.
    description = (Body.description or "").strip() or None
    # None-safe: _syncRuleBindings iterates this list.
    ruleSetIds = list(Body.ruleSetIds or [])

    async with GetAsyncSession() as Session:
        # NOTE(review): check-then-insert is not atomic; presumably a unique
        # constraint on the table backs this up — confirm.
        existing = (
            await Session.execute(
                text("SELECT 1 FROM leaudit_document_types WHERE code = :code AND deleted_at IS NULL LIMIT 1"),
                {"code": code},
            )
        ).first()
        if existing:
            raise LeauditException(StatusCodeEnum.HTTP_409_CONFLICT, f"文档类型编码 {Body.code} 已存在")

        insertResult = await Session.execute(
            text(
                """
                INSERT INTO leaudit_document_types (code, name, description, entry_module_id, is_enabled, sort_order, created_at, updated_at)
                VALUES (:code, :name, :description, :entry_module_id, :is_enabled, :sort_order, NOW(), NOW())
                RETURNING id
                """
            ),
            {
                "code": code,
                "name": Body.name.strip(),
                "description": description,
                "entry_module_id": Body.entryModuleId,
                "is_enabled": Body.isEnabled,
                "sort_order": Body.sortOrder,
            },
        )
        newTypeId = int(insertResult.scalar_one())
        await self._syncRuleBindings(Session, newTypeId, ruleSetIds, "default")
        await Session.commit()

    # Read back after the write session is closed; GetDocumentType opens its own.
    return await self.GetDocumentType(newTypeId)
async def UpdateDocumentType(self, Id: int, Body: DocumentTypeUpdateDTO) -> DocumentTypeItemVO:
    """Partially update a document type and optionally replace its rule bindings.

    Only fields that are not ``None`` in ``Body`` are written. When at least
    one column changes, ``updated_at`` is refreshed too. A non-``None``
    ``ruleSetIds`` replaces all bindings of the type.

    Args:
        Id: Primary key of the document type.
        Body: Update payload with optional fields.

    Raises:
        LeauditException: 404 when no non-deleted type has this id.
    """
    async with GetAsyncSession() as Session:
        lookup = await Session.execute(
            text("SELECT id FROM leaudit_document_types WHERE id = :id AND deleted_at IS NULL"),
            {"id": Id},
        )
        if not lookup.first():
            raise LeauditException(StatusCodeEnum.HTTP_404_NOT_FOUND, "文档类型不存在")

        # Column name -> new value; bind names equal column names.
        changes: dict[str, object] = {}
        if Body.name is not None:
            changes["name"] = Body.name.strip()
        if Body.description is not None:
            # Blank description is stored as NULL.
            changes["description"] = Body.description.strip() or None
        passthrough = (
            ("entry_module_id", Body.entryModuleId),
            ("is_enabled", Body.isEnabled),
            ("sort_order", Body.sortOrder),
        )
        for column, value in passthrough:
            if value is not None:
                changes[column] = value

        if changes:
            assignments = [f"{column} = :{column}" for column in changes]
            assignments.append("updated_at = NOW()")
            await Session.execute(
                text(f"UPDATE leaudit_document_types SET {', '.join(assignments)} WHERE id = :id"),
                {"id": Id, **changes},
            )

        if Body.ruleSetIds is not None:
            await self._syncRuleBindings(Session, Id, Body.ruleSetIds, "default")
        await Session.commit()

    return await self.GetDocumentType(Id)
async def DeleteDocumentType(self, Id: int) -> None:
    """Soft-delete a document type together with its rule bindings.

    Sets ``deleted_at`` on the type row and on every not-yet-deleted binding
    of that type, then commits.

    Args:
        Id: Primary key of the document type.

    Raises:
        LeauditException: 404 when no non-deleted type has this id.
    """
    byId = {"id": Id}
    async with GetAsyncSession() as Session:
        lookup = await Session.execute(
            text("SELECT 1 FROM leaudit_document_types WHERE id = :id AND deleted_at IS NULL"),
            {"id": Id},
        )
        found = lookup.first()
        if not found:
            raise LeauditException(StatusCodeEnum.HTTP_404_NOT_FOUND, "文档类型不存在")

        softDeleteType = text("UPDATE leaudit_document_types SET deleted_at = NOW() WHERE id = :id")
        softDeleteBindings = text(
            "UPDATE leaudit_rule_type_bindings SET deleted_at = NOW() WHERE doc_type_id = :id AND deleted_at IS NULL"
        )
        await Session.execute(softDeleteType, dict(byId))
        await Session.execute(softDeleteBindings, dict(byId))
        await Session.commit()
async def _queryDocumentTypes(self, Session, Ids: list[int] | None = None):
    """Load non-deleted document types and their active rule set bindings.

    Args:
        Session: Open async session used for both queries.
        Ids: When non-empty, restricts the result to these type ids. ``None``
            and an empty list both return every non-deleted type.

    Returns:
        A pair ``(rows, bindingsMap)``: the type rows ordered by
        ``sort_order`` then ``id``, and a dict mapping each returned type id
        to its rule set ids ordered by descending binding priority.
    """
    if Ids:
        typeResult = await Session.execute(
            text(
                """
                SELECT id, code, name, description, entry_module_id, is_enabled, sort_order
                FROM leaudit_document_types
                WHERE deleted_at IS NULL AND id = ANY(:ids)
                ORDER BY sort_order ASC, id ASC
                """
            ),
            {"ids": Ids},
        )
    else:
        typeResult = await Session.execute(
            text(
                """
                SELECT id, code, name, description, entry_module_id, is_enabled, sort_order
                FROM leaudit_document_types
                WHERE deleted_at IS NULL
                ORDER BY sort_order ASC, id ASC
                """
            )
        )
    typeRows = typeResult.mappings().all()

    # Every returned type gets an entry, even when it has no bindings.
    ruleSetsByType: dict[int, list[int]] = {}
    for typeRow in typeRows:
        ruleSetsByType[int(typeRow["id"])] = []
    if not ruleSetsByType:
        return typeRows, ruleSetsByType

    bindingResult = await Session.execute(
        text(
            """
            SELECT doc_type_id, rule_set_id
            FROM leaudit_rule_type_bindings
            WHERE doc_type_id = ANY(:ids) AND deleted_at IS NULL AND is_active = true
            ORDER BY priority DESC
            """
        ),
        {"ids": list(ruleSetsByType)},
    )
    for docTypeId, ruleSetId in bindingResult.fetchall():
        ruleSetsByType.setdefault(int(docTypeId), []).append(int(ruleSetId))
    return typeRows, ruleSetsByType
async def _syncRuleBindings(self, Session, DocTypeId: int, RuleSetIds: list[int], Region: str = "default") -> None:
    """Replace all rule bindings of a document type with the given rule sets.

    Every not-yet-deleted binding of the type is soft-deleted, then one
    ``explicit`` binding is inserted per rule set id. Earlier ids get a higher
    priority (100, 99, ...). The caller is responsible for committing.

    Args:
        Session: Open async session; statements run in its transaction.
        DocTypeId: Document type whose bindings are replaced.
        RuleSetIds: Rule set ids in priority order. ``None`` is treated as an
            empty list, and repeated ids are bound once at their first
            position so no duplicate active bindings are created.
        Region: Region stored on the inserted bindings.
    """
    # NOTE(review): the soft delete is not restricted to Region, so bindings
    # of every region are replaced — confirm this is intended.
    await Session.execute(
        text("UPDATE leaudit_rule_type_bindings SET deleted_at = NOW() WHERE doc_type_id = :id AND deleted_at IS NULL"),
        {"id": DocTypeId},
    )

    # dict.fromkeys keeps first-seen order while dropping repeats.
    uniqueRuleSetIds = list(dict.fromkeys(RuleSetIds or []))
    if not uniqueRuleSetIds:
        return

    # Built once; only the bound values differ per iteration.
    insertStmt = text(
        """
        INSERT INTO leaudit_rule_type_bindings (doc_type_id, rule_set_id, binding_mode, priority, region, is_active, created_at, updated_at)
        VALUES (:doc_type_id, :rule_set_id, 'explicit', :priority, :region, true, NOW(), NOW())
        """
    )
    for idx, ruleSetId in enumerate(uniqueRuleSetIds):
        await Session.execute(
            insertStmt,
            {"doc_type_id": DocTypeId, "rule_set_id": ruleSetId, "priority": 100 - idx, "region": Region},
        )
async def _find_latest_version_candidate(
    session,
    *,
    type_id: int,
    region: str,
    normalized_name: str,
) -> dict | None:
    """Look up the current latest version that a new upload would extend.

    Matches a non-deleted document flagged as latest version with the same
    type, region and normalized name, joined to its active ``primary`` file.
    When several rows match, the highest version number (then highest id) wins.

    Returns:
        A plain dict with ``document_id``, ``version_group_key``,
        ``version_no``, ``root_version_id``, ``file_id`` and ``sha256``, or
        ``None`` when nothing matches.
    """
    lookup_sql = text(
        """
        SELECT
        d.id AS document_id,
        d.version_group_key,
        d.version_no,
        d.root_version_id,
        f.id AS file_id,
        f.sha256
        FROM leaudit_documents d
        JOIN leaudit_document_files f
        ON f.document_id = d.id
        AND f.is_active = true
        AND f.file_role = 'primary'
        WHERE d.type_id = :type_id
        AND d.region = :region
        AND d.normalized_name = :normalized_name
        AND d.is_latest_version = true
        AND d.deleted_at IS NULL
        ORDER BY d.version_no DESC, d.id DESC
        LIMIT 1
        """
    )
    bind_values = {
        "type_id": type_id,
        "region": region,
        "normalized_name": normalized_name,
    }
    candidate = (await session.execute(lookup_sql, bind_values)).mappings().first()
    if not candidate:
        return None
    return dict(candidate)
def _normalize_speed(speed: str | None) -> str:
    """Map a front-end speed selection onto ``"urgent"`` or ``"normal"``.

    The value is trimmed and lower-cased first. A small set of synonyms maps
    to ``"urgent"``; everything else, including ``None`` and blank input, maps
    to ``"normal"``.
    """
    urgent_aliases = ("urgent", "high", "fast", "emergency", "紧急")
    cleaned = "" if not speed else speed.strip().lower()
    return "urgent" if cleaned in urgent_aliases else "normal"
def _normalize_document_name(file_name: str) -> str:
    """Build a stable name key used to match re-uploads of the same document.

    Steps, in order: take the file stem, apply NFKC normalization, trim and
    lower-case it, collapse runs of whitespace/underscore/hyphen into one
    space, drop a trailing parenthesised counter such as ``(1)``, drop a
    trailing ``副本`` / ``copy`` marker, and collapse whitespace again.

    Only a counter wrapped in parentheses is removed. Bare trailing digits are
    part of the name, so ``report2023`` and ``report2024`` stay distinct and a
    purely numeric name such as ``2024`` is kept.

    Args:
        file_name: Original file name, with or without an extension.

    Returns:
        The normalized key, or ``"untitled"`` when nothing is left.
    """
    stem = Path(file_name).stem
    # NFKC folds full-width forms (including full-width parentheses) to ASCII,
    # so the counter pattern below only needs the ASCII parentheses.
    name = unicodedata.normalize("NFKC", stem).strip().lower()
    name = re.sub(r"[\s_\-]+", " ", name)
    # Both parentheses are required; optional parentheses would also strip
    # meaningful trailing digits.
    name = re.sub(r"\(\d+\)$", "", name).strip()
    name = re.sub(r"(?:[-_\s]*副本|[-_\s]*copy)$", "", name).strip()
    name = re.sub(r"\s+", " ", name).strip()
    return name or "untitled"