Files
leaudit-platform-backend/fastapi_modules/fastapi_leaudit/services/impl/documentServiceImpl.py
T
2026-04-29 11:48:50 +08:00

505 lines
20 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""文档服务实现。"""
from __future__ import annotations
from datetime import datetime
import hashlib
import mimetypes
import re
import time
import unicodedata
import uuid
from pathlib import Path
from sqlalchemy import text
from fastapi_common.fastapi_common_sqlalchemy.database import GetAsyncSession
from fastapi_common.fastapi_common_web.domain.responses import StatusCodeEnum
from fastapi_common.fastapi_common_web.exception.LeauditException import LeauditException
from fastapi_common.fastapi_common_storage.oss_path_utils import OssPathUtils
from fastapi_modules.fastapi_leaudit.domian.vo.documentVo import (
DocumentHistoryVersionVO,
DocumentListItemVO,
DocumentListPageVO,
DocumentUploadVO,
)
from fastapi_modules.fastapi_leaudit.models import LeauditDocument, LeauditDocumentFile
from fastapi_modules.fastapi_leaudit.services import IAuditService, IDocumentService, IOssService
from fastapi_modules.fastapi_leaudit.services.impl.auditServiceImpl import AuditServiceImpl
from fastapi_modules.fastapi_leaudit.services.impl.ossServiceImpl import OssServiceImpl
class DocumentServiceImpl(IDocumentService):
    """Document service implementation.

    Provides document upload (with same-name version chaining and
    duplicate-content detection) and a paginated listing of the latest
    version of every document together with its older versions.
    """

    def __init__(
        self,
        OssService: IOssService | None = None,
        AuditService: IAuditService | None = None,
    ) -> None:
        """Store the collaborators, defaulting to the standard implementations.

        Args:
            OssService: Object-storage service; ``OssServiceImpl()`` when omitted.
            AuditService: Audit service; ``AuditServiceImpl()`` when omitted.
        """
        self.OssService = OssService or OssServiceImpl()
        self.AuditService = AuditService or AuditServiceImpl()

    @staticmethod
    def _EscapeLike(value: str) -> str:
        """Escape LIKE/ILIKE metacharacters so *value* is matched literally.

        Backslash is the default escape character of PostgreSQL's LIKE/ILIKE,
        so it has to be escaped first, followed by ``%`` and ``_``.
        """
        return value.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")

    @staticmethod
    def _OptionalInt(value) -> int | None:
        """Return ``int(value)``, or ``None`` when *value* is ``None``."""
        return int(value) if value is not None else None

    @staticmethod
    def _OptionalFloat(value) -> float | None:
        """Return ``float(value)``, or ``None`` when *value* is ``None``."""
        return float(value) if value is not None else None

    @staticmethod
    def _IsoOrNone(value) -> str | None:
        """Return ``value.isoformat()``, or ``None`` when *value* is falsy."""
        return value.isoformat() if value else None

    @staticmethod
    async def _ResolveDocumentType(Session, TypeId: int | None, TypeCode: str | None):
        """Look up one non-deleted document type by id and/or code.

        Only the identifiers that are not ``None`` take part in the WHERE
        clause; when both are given, both must match the same row.

        Returns:
            The first matching row as a mapping with ``id`` and ``code``,
            or ``None`` when nothing matches.
        """
        conditions = ["deleted_at IS NULL"]
        params: dict[str, object] = {}
        if TypeId is not None:
            conditions.append("id = :type_id")
            params["type_id"] = TypeId
        if TypeCode is not None:
            conditions.append("code = :type_code")
            params["type_code"] = TypeCode
        # Only fixed fragments are interpolated; every value is a bound parameter.
        lookupSql = text(
            "SELECT id, code FROM leaudit_document_types "
            f"WHERE {' AND '.join(conditions)} LIMIT 1"
        )
        result = await Session.execute(lookupSql, params)
        return result.mappings().first()

    async def Upload(
        self,
        FileName: str,
        FileContent: bytes,
        ContentType: str | None,
        TypeId: int | None = None,
        TypeCode: str | None = None,
        Region: str = "default",
        FileRole: str = "primary",
        CreatedBy: int | None = None,
        AutoRun: bool = False,
        Speed: str = "normal",
    ) -> DocumentUploadVO:
        """Upload a file and create its LeAudit document / file records.

        For ``primary`` files the latest document with the same type, region
        and normalized name is looked up. If its active file has the same
        SHA-256 the upload is reported as a duplicate and the existing records
        are returned; otherwise a new version is chained onto it. When no
        candidate exists a new version group is started.

        Args:
            FileName: Original file name; must be non-empty.
            FileContent: Raw file bytes; must be non-empty.
            ContentType: MIME type; guessed from ``FileName`` when falsy.
            TypeId: Document type id. ``0``/``None`` means "not supplied".
            TypeCode: Document type code. Blank/``None`` means "not supplied".
            Region: Region key; blank values fall back to ``"default"``.
            FileRole: File role; blank values fall back to ``"primary"``.
            CreatedBy: Id of the uploading user, stored on the file record.
            AutoRun: When true, an audit run is started after the upload.
            Speed: Front-end speed selection, normalized for the response.

        Returns:
            A ``DocumentUploadVO`` describing the document, file and optional run.

        Raises:
            LeauditException: 400 for missing name/content/type identifier,
                404 when the document type cannot be resolved, 500 when the
                records of a detected duplicate cannot be loaded.
        """
        if not FileName:
            raise LeauditException(StatusCodeEnum.HTTP_400_BAD_REQUEST, "上传文件名不能为空")
        if not FileContent:
            raise LeauditException(StatusCodeEnum.HTTP_400_BAD_REQUEST, "上传文件内容不能为空")

        # Normalize once so the guard and the lookup agree on what "missing"
        # means: 0 / "" / whitespace are treated as not supplied in both places.
        typeIdFilter = TypeId or None
        typeCodeFilter = (TypeCode or "").strip() or None
        if typeIdFilter is None and typeCodeFilter is None:
            raise LeauditException(StatusCodeEnum.HTTP_400_BAD_REQUEST, "typeId 与 typeCode 至少传一个")

        normalizedRegion = (Region or "default").strip() or "default"
        normalizedFileRole = (FileRole or "primary").strip() or "primary"
        fileExt = Path(FileName).suffix.lstrip(".").lower() or None
        mimeType = ContentType or mimetypes.guess_type(FileName)[0] or "application/octet-stream"
        fileSha256 = hashlib.sha256(FileContent).hexdigest()
        fileSize = len(FileContent)
        normalizedSpeed = _normalize_speed(Speed)
        normalizedName = _normalize_document_name(FileName)
        uploadedAt = datetime.now()

        async with GetAsyncSession() as Session:
            typeRow = await self._ResolveDocumentType(Session, typeIdFilter, typeCodeFilter)
            if not typeRow:
                raise LeauditException(StatusCodeEnum.HTTP_404_NOT_FOUND, "文档类型不存在或已停用")
            resolvedTypeId = int(typeRow["id"])
            resolvedTypeCode = str(typeRow["code"])

            # Version matching only applies to primary files.
            latestCandidate = None
            if normalizedFileRole == "primary":
                latestCandidate = await _find_latest_version_candidate(
                    Session,
                    type_id=resolvedTypeId,
                    region=normalizedRegion,
                    normalized_name=normalizedName,
                )

            duplicateUpload = bool(latestCandidate) and latestCandidate["sha256"] == fileSha256
            if duplicateUpload:
                # Same content as the current latest version: reuse its records.
                document = await Session.get(LeauditDocument, int(latestCandidate["document_id"]))
                documentFile = await Session.get(LeauditDocumentFile, int(latestCandidate["file_id"]))
                if document is None or documentFile is None:
                    raise LeauditException(StatusCodeEnum.HTTP_500_INTERNAL_SERVER_ERROR, "重复上传版本定位失败")
                await Session.commit()
            else:
                internalDocumentNo = time.time_ns()
                previousVersionId: int | None = None
                rootVersionId: int | None = None
                versionNo = 1
                if latestCandidate:
                    previousVersionId = int(latestCandidate["document_id"])
                    rootVersionId = int(latestCandidate["root_version_id"] or previousVersionId)
                    # version_no / version_group_key are handled as nullable
                    # elsewhere in this class, so do not let a NULL turn into
                    # a TypeError or the literal group key "None".
                    versionNo = int(latestCandidate["version_no"] or 1) + 1
                    versionGroupKey = str(latestCandidate["version_group_key"] or uuid.uuid4().hex)
                    previousDocument = await Session.get(LeauditDocument, previousVersionId)
                    if previousDocument is not None:
                        previousDocument.isLatestVersion = False
                        if not previousDocument.versionGroupKey:
                            # Backfill so both versions end up in the same group.
                            previousDocument.versionGroupKey = versionGroupKey
                else:
                    versionGroupKey = uuid.uuid4().hex

                document = await LeauditDocument.create_new(
                    Session,
                    bizDocumentId=internalDocumentNo,
                    typeId=resolvedTypeId,
                    region=normalizedRegion,
                    processingStatus="waiting",
                    versionGroupKey=versionGroupKey,
                    versionNo=versionNo,
                    previousVersionId=previousVersionId,
                    rootVersionId=rootVersionId,
                    isLatestVersion=True,
                    normalizedName=normalizedName,
                )
                # The first version of a group is its own root.
                if document.rootVersionId is None:
                    document.rootVersionId = document.Id

                objectKey = OssPathUtils.BuildBusinessDocKey(
                    Region=normalizedRegion,
                    TypeCode=resolvedTypeCode,
                    DocumentId=document.Id,
                    Version=f"v{document.versionNo}",
                    FileRole=normalizedFileRole,
                    FileName=FileName,
                    Year=uploadedAt.year,
                    Month=uploadedAt.month,
                )
                # NOTE(review): the object is stored before the DB commit; if the
                # commit fails the uploaded object is left without a DB record.
                uploadedUrl = await self.OssService.UploadBytes(
                    ObjectKey=objectKey,
                    Content=FileContent,
                    ContentType=mimeType,
                )
                await LeauditDocumentFile.deactivate_active_by_document(Session, document.Id)
                documentFile = LeauditDocumentFile(
                    documentId=document.Id,
                    fileRole=normalizedFileRole,
                    fileName=FileName,
                    fileExt=fileExt,
                    mimeType=mimeType,
                    fileSize=fileSize,
                    sha256=fileSha256,
                    localPath=None,
                    ossUrl=uploadedUrl,
                    storageProvider="minio",
                    isActive=True,
                    createdBy=CreatedBy,
                )
                Session.add(documentFile)
                await Session.flush()
                await Session.commit()
                await Session.refresh(document)
                await Session.refresh(documentFile)

            ossUrl = documentFile.ossUrl or ""

        run = None
        processingStatus = document.processingStatus or "waiting"
        if AutoRun:
            # NOTE(review): the raw Speed is forwarded while the response reports
            # the normalized value - confirm which one AuditService.Run expects.
            run = await self.AuditService.Run(
                DocumentId=document.Id,
                Speed=Speed,
                Force=duplicateUpload,
            )
            processingStatus = "running" if run.status in {"pending", "running"} else run.status

        return DocumentUploadVO(
            documentId=document.Id,
            internalDocumentNo=document.bizDocumentId,
            versionGroupKey=document.versionGroupKey or "",
            versionNo=int(document.versionNo or 1),
            previousVersionId=document.previousVersionId,
            rootVersionId=int(document.rootVersionId or document.Id),
            duplicateUpload=duplicateUpload,
            fileId=documentFile.Id,
            typeId=resolvedTypeId,
            typeCode=resolvedTypeCode,
            region=normalizedRegion,
            fileName=documentFile.fileName,
            ossUrl=ossUrl,
            speed=normalizedSpeed,
            processingStatus=processingStatus,
            autoRunTriggered=AutoRun,
            run=run,
        )

    async def ListDocuments(
        self,
        Page: int = 1,
        PageSize: int = 20,
        Keyword: str | None = None,
        TypeCode: str | None = None,
        Region: str | None = None,
        ProcessingStatus: str | None = None,
        ResultStatus: str | None = None,
    ) -> DocumentListPageVO:
        """List documents (latest versions only, with a history summary).

        Args:
            Page: 1-based page number; values below 1 are clamped to 1.
            PageSize: Page size, clamped to the range 1..100.
            Keyword: Case-insensitive substring matched literally against the
                file name or the normalized document name.
            TypeCode: Exact document type code filter.
            Region: Exact region filter.
            ProcessingStatus: Exact processing status filter.
            ResultStatus: Exact result status filter on the current run.

        Blank or whitespace-only filter values are ignored.

        Returns:
            A ``DocumentListPageVO`` with the totals and the page of documents.
        """
        page = max(1, int(Page))
        page_size = max(1, min(int(PageSize), 100))
        offset = (page - 1) * page_size

        filters = [
            "d.is_latest_version = true",
            "d.deleted_at IS NULL",
            "f.is_active = true",
            "f.file_role = 'primary'",
        ]
        params: dict[str, object] = {"limit": page_size, "offset": offset}

        keyword = (Keyword or "").strip()
        if keyword:
            filters.append("(f.file_name ILIKE :keyword OR d.normalized_name ILIKE :keyword)")
            # Escape %, _ and \ so user input cannot act as a wildcard pattern.
            params["keyword"] = f"%{self._EscapeLike(keyword)}%"

        exact_filters = (
            ("dt.code = :type_code", "type_code", TypeCode),
            ("d.region = :region", "region", Region),
            ("d.processing_status = :processing_status", "processing_status", ProcessingStatus),
            ("ar.result_status = :result_status", "result_status", ResultStatus),
        )
        for condition, param_name, raw_value in exact_filters:
            value = (raw_value or "").strip()
            if value:
                filters.append(condition)
                params[param_name] = value

        where_clause = " AND ".join(filters)

        # The SQL text below is kept verbatim (unindented) so the statements
        # sent to the database are unchanged.
        count_sql = text(
            f"""
SELECT COUNT(*)
FROM leaudit_documents d
JOIN leaudit_document_files f
ON f.document_id = d.id
LEFT JOIN leaudit_document_types dt
ON dt.id = d.type_id
LEFT JOIN leaudit_audit_runs ar
ON ar.id = d.current_run_id
WHERE {where_clause}
"""
        )
        list_sql = text(
            f"""
SELECT
d.id AS document_id,
d.biz_document_id AS internal_document_no,
d.version_group_key,
d.version_no,
d.root_version_id,
d.previous_version_id,
d.type_id,
dt.code AS type_code,
d.region,
d.normalized_name,
d.processing_status,
d.current_run_id,
d.updated_at,
f.id AS file_id,
f.file_name,
f.file_ext,
f.mime_type,
f.file_size,
f.oss_url,
ar.status AS run_status,
ar.result_status,
ar.total_score,
ar.passed_count,
ar.failed_count,
ar.skipped_count,
vc.total_versions,
COALESCE(vc.total_versions, 1) > 1 AS has_history
FROM leaudit_documents d
JOIN leaudit_document_files f
ON f.document_id = d.id
LEFT JOIN leaudit_document_types dt
ON dt.id = d.type_id
LEFT JOIN leaudit_audit_runs ar
ON ar.id = d.current_run_id
LEFT JOIN (
SELECT version_group_key, COUNT(*) AS total_versions
FROM leaudit_documents
WHERE deleted_at IS NULL
GROUP BY version_group_key
) vc
ON vc.version_group_key = d.version_group_key
WHERE {where_clause}
ORDER BY d.updated_at DESC, d.id DESC
LIMIT :limit OFFSET :offset
"""
        )
        history_sql = text(
            """
SELECT
d.version_group_key,
d.id AS document_id,
d.version_no,
d.processing_status,
d.updated_at,
f.id AS file_id,
f.file_name,
f.file_ext,
ar.status AS run_status,
ar.result_status
FROM leaudit_documents d
JOIN leaudit_document_files f
ON f.document_id = d.id
AND f.is_active = true
AND f.file_role = 'primary'
LEFT JOIN leaudit_audit_runs ar
ON ar.id = d.current_run_id
WHERE d.version_group_key = ANY(:group_keys)
AND d.is_latest_version = false
AND d.deleted_at IS NULL
ORDER BY d.version_group_key, d.version_no DESC, d.id DESC
"""
        )

        async with GetAsyncSession() as Session:
            total = int((await Session.execute(count_sql, params)).scalar_one())
            rows = (await Session.execute(list_sql, params)).mappings().all()

            # Fetch the older versions of every group on this page in one query.
            history_by_group: dict[str, list[DocumentHistoryVersionVO]] = {}
            group_keys = list(
                dict.fromkeys(
                    str(row["version_group_key"]) for row in rows if row["version_group_key"]
                )
            )
            if group_keys:
                history_rows = (
                    await Session.execute(history_sql, {"group_keys": group_keys})
                ).mappings().all()
                for history_row in history_rows:
                    history_by_group.setdefault(str(history_row["version_group_key"]), []).append(
                        DocumentHistoryVersionVO(
                            documentId=int(history_row["document_id"]),
                            fileId=self._OptionalInt(history_row["file_id"]),
                            versionNo=int(history_row["version_no"]),
                            fileName=history_row["file_name"],
                            fileExt=history_row["file_ext"],
                            processingStatus=history_row["processing_status"],
                            runStatus=history_row["run_status"],
                            resultStatus=history_row["result_status"],
                            updatedAt=self._IsoOrNone(history_row["updated_at"]),
                        )
                    )

        documents: list[DocumentListItemVO] = []
        for row in rows:
            group_key = str(row["version_group_key"] or "")
            documents.append(
                DocumentListItemVO(
                    documentId=int(row["document_id"]),
                    internalDocumentNo=int(row["internal_document_no"]),
                    versionGroupKey=group_key,
                    versionNo=int(row["version_no"] or 1),
                    rootVersionId=int(row["root_version_id"] or row["document_id"]),
                    previousVersionId=self._OptionalInt(row["previous_version_id"]),
                    typeId=self._OptionalInt(row["type_id"]),
                    typeCode=row["type_code"],
                    region=row["region"],
                    normalizedName=row["normalized_name"],
                    fileId=self._OptionalInt(row["file_id"]),
                    fileName=row["file_name"],
                    fileExt=row["file_ext"],
                    mimeType=row["mime_type"],
                    fileSize=self._OptionalInt(row["file_size"]),
                    ossUrl=row["oss_url"],
                    processingStatus=row["processing_status"],
                    currentRunId=self._OptionalInt(row["current_run_id"]),
                    runStatus=row["run_status"],
                    resultStatus=row["result_status"],
                    totalScore=self._OptionalFloat(row["total_score"]),
                    passedCount=self._OptionalInt(row["passed_count"]),
                    failedCount=self._OptionalInt(row["failed_count"]),
                    skippedCount=self._OptionalInt(row["skipped_count"]),
                    updatedAt=self._IsoOrNone(row["updated_at"]),
                    hasHistory=bool(row["has_history"]),
                    totalVersions=int(row["total_versions"] or 1),
                    historyVersions=history_by_group.get(group_key, []),
                )
            )

        # Ceiling division; an empty result set has zero pages.
        total_pages = (total + page_size - 1) // page_size if total else 0
        return DocumentListPageVO(
            total=total,
            page=page,
            pageSize=page_size,
            totalPages=total_pages,
            documents=documents,
        )
async def _find_latest_version_candidate(
    session,
    *,
    type_id: int,
    region: str,
    normalized_name: str,
) -> dict | None:
    """Return the current latest primary version matching a normalized name.

    The lookup is restricted to non-deleted documents flagged as the latest
    version with the given type, region and normalized name, joined to their
    active ``primary`` file. When several rows qualify, the one with the
    highest version number (then highest id) wins.

    Args:
        session: Session object exposing an awaitable ``execute``.
        type_id: Document type id to match.
        region: Region key to match.
        normalized_name: Normalized document name to match.

    Returns:
        A plain ``dict`` with ``document_id``, ``version_group_key``,
        ``version_no``, ``root_version_id``, ``file_id`` and ``sha256``,
        or ``None`` when no document qualifies.
    """
    bind_params = {
        "type_id": type_id,
        "region": region,
        "normalized_name": normalized_name,
    }
    # SQL text kept verbatim so the statement sent to the database is unchanged.
    candidate_sql = text(
        """
SELECT
d.id AS document_id,
d.version_group_key,
d.version_no,
d.root_version_id,
f.id AS file_id,
f.sha256
FROM leaudit_documents d
JOIN leaudit_document_files f
ON f.document_id = d.id
AND f.is_active = true
AND f.file_role = 'primary'
WHERE d.type_id = :type_id
AND d.region = :region
AND d.normalized_name = :normalized_name
AND d.is_latest_version = true
AND d.deleted_at IS NULL
ORDER BY d.version_no DESC, d.id DESC
LIMIT 1
"""
    )
    query_result = await session.execute(candidate_sql, bind_params)
    candidate = query_result.mappings().first()
    if not candidate:
        return None
    return dict(candidate)
def _normalize_speed(speed: str | None) -> str:
"""Normalize front-end speed selection to urgent/normal."""
normalized = (speed or "").strip().lower()
if normalized in {"urgent", "high", "fast", "emergency", "紧急"}:
return "urgent"
return "normal"
def _normalize_document_name(file_name: str) -> str:
"""Build a stable name key for same-name version matching."""
stem = Path(file_name).stem
name = unicodedata.normalize("NFKC", stem).strip().lower()
name = re.sub(r"[\s_\-]+", " ", name)
name = re.sub(r"(?:\(|)\d+(?:\)|)$", "", name).strip()
name = re.sub(r"(?:[-_\s]*副本|[-_\s]*copy)$", "", name).strip()
name = re.sub(r"\s+", " ", name).strip()
return name or "untitled"