# Files
#
# 390 lines
# 19 KiB
# Python

"""文档控制器。"""
import json
from typing import Any
from fastapi import Depends, File, Form, Query, UploadFile
from pydantic import BaseModel, Field
from sqlalchemy import text
from fastapi_common.fastapi_common_sqlalchemy.database import GetAsyncSession
from fastapi_common.fastapi_common_web.controller import BaseController
from fastapi_common.fastapi_common_web.domain.responses import Result, StatusCodeEnum
from fastapi_common.fastapi_common_web.exception.LeauditException import LeauditException
from fastapi_common.fastapi_common_security.security import verify_access_token
from fastapi_modules.fastapi_leaudit.domian.vo.documentVo import (
DocumentDetailVO,
DocumentListPageVO,
DocumentStatusItemVO,
DocumentUpdateDTO,
DocumentTypeCreateDTO,
DocumentTypeItemVO,
DocumentTypeRootCreateDTO,
DocumentTypeRootItemVO,
DocumentTypeRootUpdateDTO,
ContractTemplateUploadVO,
DocumentTypeUpdateDTO,
DocumentUploadVO,
)
from fastapi_modules.fastapi_leaudit.domian.vo.reviewPointVo import (
DocumentConfirmVO,
ReviewPointAuditVO,
ReviewPointsAggregateVO,
)
from fastapi_modules.fastapi_leaudit.services import IDocumentService
from fastapi_modules.fastapi_leaudit.services.impl.documentServiceImpl import DocumentServiceImpl
class QueueStatusVO(BaseModel):
    # Response model used by the "/v2/system/queue/status" route.
    # Documented with comments (not a docstring) so the generated schema
    # description stays exactly as it was.

    # Success flag; defaults to True.
    success: bool = Field(default=True)
    # Timestamp string; defaults to the empty string.
    timestamp: str = Field(default="")
    # Integer counters for the queue; the default describes an empty queue
    # with 4 of 4 slots available.
    queue: dict[str, int] = Field(
        default_factory=lambda: dict(
            pending_tasks=0,
            processing_tasks=0,
            available_slots=4,
            max_concurrent=4,
        )
    )
    # Per-document counters plus the list of processing document IDs;
    # the default has zero counts and an empty ID list.
    documents: dict[str, object] = Field(
        default_factory=lambda: dict(
            waiting=0,
            processing=0,
            processing_ids=[],
        )
    )
class ReviewPointAuditDTO(BaseModel):
    # Request body of the review-point audit route.
    # Documented with comments (not a docstring) so the generated schema
    # description stays exactly as it was.

    # Required audit verdict; the field description lists "true/false/review".
    result: str = Field(default=..., description="true/false/review")
    # Optional audit comment; defaults to the empty string.
    message: str = Field(default="", description="审核意见")
class DocumentController(BaseController):
    """Document controller.

    Registers all document-related HTTP routes on ``self.router`` when the
    controller is constructed. The handlers are closures over ``self``; all of
    them delegate to ``self.DocumentService`` except the queue-status route,
    which queries the database directly.
    """

    def __init__(self):
        """Create the document service and register every route."""
        # ``self.router`` is presumably created by BaseController.__init__ -- confirm.
        super().__init__(prefix="", tags=["文档"])
        self.DocumentService: IDocumentService = DocumentServiceImpl()

        def _parseIdList(raw: str) -> list[int]:
            """Parse a comma-separated ID string, skipping invalid tokens.

            ``str.isdecimal`` is used instead of ``str.isdigit`` because
            ``isdigit`` also accepts characters such as superscript digits
            that ``int()`` rejects with ``ValueError``; such a token must be
            skipped like any other invalid token rather than crash the request.
            """
            tokens = (part.strip() for part in raw.split(","))
            return [int(token) for token in tokens if token.isdecimal()]

        @self.router.post("/upload", response_model=Result[DocumentUploadVO])
        async def UploadDocument(
            file: UploadFile = File(..., description="上传文档"),
            attachments: list[UploadFile] | None = File(None, description="合同附件列表"),
            typeId: int | None = Form(None, description="文档类型ID"),
            typeCode: str | None = Form(None, description="文档类型编码"),
            groupId: int | None = Form(None, description="二级分组ID"),
            region: str = Form("default", description="所属地区"),
            fileRole: str = Form("primary", description="文件角色"),
            createdBy: int | None = Form(None, description="上传用户ID"),
            autoRun: bool = Form(False, description="是否上传后自动触发评查"),
            speed: str = Form("normal", description="执行速度档位:urgent/normal"),
            payload: dict[str, Any] = Depends(verify_access_token),
        ):
            """Upload a document and create the review input.

            The main file and every attachment are read fully into memory and
            handed to ``DocumentService.Upload``.
            """
            # NOTE: ``createdBy`` is accepted but never used; the creator is
            # always the authenticated user from the token payload. The
            # parameter is presumably kept so existing clients can still send
            # it -- confirm before removing.
            Content = await file.read()
            attachmentPayloads: list[tuple[str, bytes, str | None]] = []
            for attachment in attachments or []:
                attachmentContent = await attachment.read()
                attachmentPayloads.append(
                    (
                        attachment.filename or "attachment.bin",
                        attachmentContent,
                        attachment.content_type,
                    )
                )
            Data = await self.DocumentService.Upload(
                FileName=file.filename or "upload.bin",
                FileContent=Content,
                ContentType=file.content_type,
                TypeId=typeId,
                TypeCode=typeCode,
                GroupId=groupId,
                Region=region,
                FileRole=fileRole,
                CreatedBy=int(payload["user_id"]),
                Attachments=attachmentPayloads,
                AutoRun=autoRun,
                Speed=speed,
            )
            return Result.success(data=Data)

        @self.router.post("/upload/upload_contract_template", response_model=Result[ContractTemplateUploadVO])
        async def UploadContractTemplate(
            file: UploadFile = File(..., description="合同模板文件"),
            upload_info: str = Form(..., description="模板上传信息 JSON,包含 document_id/comparison_id"),
            payload: dict[str, Any] = Depends(verify_access_token),
        ):
            """Contract-template upload endpoint kept compatible with the legacy front end.

            ``upload_info`` must be a JSON object. ``document_id`` defaults to
            0 when missing or empty; ``comparison_id`` is optional.

            Raises:
                LeauditException: HTTP 400 when ``upload_info`` is not valid
                    JSON, is not a JSON object, or carries non-integer IDs.
            """
            try:
                uploadInfo = json.loads(upload_info or "{}")
            except json.JSONDecodeError as error:
                raise LeauditException(StatusCodeEnum.HTTP_400_BAD_REQUEST, "upload_info 不是合法 JSON") from error
            # Valid JSON is not necessarily an object ("[]", "null", "1", ...);
            # reject those explicitly instead of failing on ``.get`` below.
            if not isinstance(uploadInfo, dict):
                raise LeauditException(StatusCodeEnum.HTTP_400_BAD_REQUEST, "upload_info 必须是 JSON 对象")
            comparisonIdRaw = uploadInfo.get("comparison_id")
            try:
                documentId = int(uploadInfo.get("document_id") or 0)
                comparisonId = int(comparisonIdRaw) if comparisonIdRaw not in (None, "") else None
            except (TypeError, ValueError, OverflowError) as error:
                # Client-supplied IDs that are not integers are a bad request,
                # not a server error.
                raise LeauditException(
                    StatusCodeEnum.HTTP_400_BAD_REQUEST,
                    "document_id/comparison_id 必须是整数",
                ) from error
            content = await file.read()
            data = await self.DocumentService.UploadContractTemplate(
                CurrentUserId=int(payload["user_id"]),
                DocumentId=documentId,
                FileName=file.filename or "template.bin",
                FileContent=content,
                ContentType=file.content_type,
                ComparisonId=comparisonId,
            )
            return Result.success(data=data, message="合同模板上传成功")

        # The static "/documents/list" and "/documents/status" routes are
        # registered before "/documents/{DocumentId}" so that they are matched
        # first; keep this order.
        @self.router.get("/documents/list", response_model=Result[DocumentListPageVO])
        async def ListDocuments(
            page: int = 1,
            pageSize: int = 20,
            keyword: str | None = None,
            documentNumber: str | None = None,
            typeCode: str | None = None,
            type_ids: str | None = Query(None, description="逗号分隔的文档类型ID列表"),
            entry_module_id: int | None = Query(None, description="按入口模块ID过滤文档"),
            region: str | None = None,
            processingStatus: str | None = None,
            resultStatus: str | None = None,
            auditStatus: int | None = Query(None, description="按人工审核状态过滤"),
            userId: int | None = Query(None, description="按用户ID过滤"),
            dateFrom: str | None = Query(None, description="起始日期 (YYYY-MM-DD)"),
            dateTo: str | None = Query(None, description="结束日期 (YYYY-MM-DD)"),
            payload: dict[str, Any] = Depends(verify_access_token),
        ):
            """List documents with data isolation.

            Provincial users see everything, city-level users only their own
            region, and ordinary users only their own documents.
            """
            # An absent/empty ``type_ids`` means "no type filter" (None); a
            # non-empty value always yields a list, possibly empty when every
            # token is invalid.
            typeIdList: list[int] | None = _parseIdList(type_ids) if type_ids else None
            Data = await self.DocumentService.ListDocuments(
                CurrentUserId=int(payload["user_id"]),
                Page=page,
                PageSize=pageSize,
                Keyword=keyword,
                DocumentNumber=documentNumber,
                TypeCode=typeCode,
                TypeIds=typeIdList,
                EntryModuleId=entry_module_id,
                Region=region,
                ProcessingStatus=processingStatus,
                ResultStatus=resultStatus,
                AuditStatus=auditStatus,
                UserId=userId,
                DateFrom=dateFrom,
                DateTo=dateTo,
            )
            return Result.success(data=Data)

        @self.router.get("/documents/status", response_model=Result[list[DocumentStatusItemVO]])
        async def GetDocumentsStatus(
            ids: str = Query(..., description="逗号分隔的文档ID列表"),
            payload: dict[str, Any] = Depends(verify_access_token),
        ):
            """Fetch the status of several documents at once (with data-isolation checks)."""
            idList = _parseIdList(ids)
            Data = await self.DocumentService.GetDocumentsStatus(CurrentUserId=int(payload["user_id"]), Ids=idList)
            return Result.success(data=Data)

        @self.router.get("/documents/{DocumentId}", response_model=Result[DocumentDetailVO])
        async def GetDocument(
            DocumentId: int,
            payload: dict[str, Any] = Depends(verify_access_token),
        ):
            """Return the details of a single document (with data-isolation checks)."""
            Data = await self.DocumentService.GetDocument(CurrentUserId=int(payload["user_id"]), Id=DocumentId)
            return Result.success(data=Data)

        @self.router.get("/v3/review-points/{DocumentId}", response_model=Result[ReviewPointsAggregateVO])
        async def GetReviewPoints(
            DocumentId: int,
            payload: dict[str, Any] = Depends(verify_access_token),
        ):
            """Return the aggregated data for the review detail page (with data-isolation checks)."""
            Data = await self.DocumentService.GetReviewPoints(CurrentUserId=int(payload["user_id"]), DocumentId=DocumentId)
            return Result.success(data=Data)

        @self.router.patch("/v3/review-points/{ReviewPointResultId}/audit", response_model=Result[ReviewPointAuditVO])
        async def AuditReviewPoint(
            ReviewPointResultId: int,
            Body: ReviewPointAuditDTO,
            payload: dict[str, Any] = Depends(verify_access_token),
        ):
            """Update the manual audit status of a single review point."""
            Data = await self.DocumentService.AuditReviewPoint(
                CurrentUserId=int(payload["user_id"]),
                ReviewPointResultId=ReviewPointResultId,
                Result=Body.result,
                Message=Body.message,
            )
            return Result.success(data=Data, message="评查点审核状态已更新")

        @self.router.patch("/v3/documents/{DocumentId}/confirm", response_model=Result[DocumentConfirmVO])
        async def ConfirmReviewResults(
            DocumentId: int,
            payload: dict[str, Any] = Depends(verify_access_token),
        ):
            """Confirm the review results of a document."""
            Data = await self.DocumentService.ConfirmReviewResults(CurrentUserId=int(payload["user_id"]), DocumentId=DocumentId)
            return Result.success(data=Data, message="评查结果已确认")

        @self.router.post("/documents/{DocumentId}/attachments", response_model=Result[DocumentDetailVO])
        async def AppendAttachments(
            DocumentId: int,
            files: list[UploadFile] = File(..., description="附件文件列表"),
            mergeMode: str = Form("new", description="附件合并模式:overwrite/new"),
            remark: str | None = Form(None, description="本次追加附件备注"),
            payload: dict[str, Any] = Depends(verify_access_token),
        ):
            """Append attachments to an existing document (with data-isolation checks).

            Every uploaded file is read fully into memory before being passed
            to the service.
            """
            filePayloads: list[tuple[str, bytes, str | None]] = []
            for file in files:
                content = await file.read()
                filePayloads.append((file.filename or "attachment.bin", content, file.content_type))
            Data = await self.DocumentService.AppendAttachments(
                CurrentUserId=int(payload["user_id"]),
                Id=DocumentId,
                Files=filePayloads,
                MergeMode=mergeMode,
                Remark=remark,
            )
            return Result.success(data=Data, message="附件上传成功")

        @self.router.put("/documents/{DocumentId}", response_model=Result[DocumentDetailVO])
        async def UpdateDocument(
            DocumentId: int,
            Body: DocumentUpdateDTO,
            payload: dict[str, Any] = Depends(verify_access_token),
        ):
            """Update document metadata (with data-isolation checks)."""
            Data = await self.DocumentService.UpdateDocument(
                CurrentUserId=int(payload["user_id"]),
                Id=DocumentId,
                Body=Body,
            )
            return Result.success(data=Data, message="文档更新成功")

        @self.router.delete("/documents/{DocumentId}", response_model=Result[None])
        async def DeleteDocument(
            DocumentId: int,
            payload: dict[str, Any] = Depends(verify_access_token),
        ):
            """Soft-delete a document (with data-isolation checks)."""
            await self.DocumentService.DeleteDocument(CurrentUserId=int(payload["user_id"]), Id=DocumentId)
            return Result.success(message="文档已删除")

        # NOTE(review): none of the document-type and document-type-root routes
        # below declare the ``verify_access_token`` dependency, including the
        # create/update/delete ones. Confirm that authentication is enforced
        # elsewhere or that this is intended.
        @self.router.get("/document-types", response_model=Result[list[DocumentTypeItemVO]])
        async def ListDocumentTypes(
            ids: str | None = Query(None, description="逗号分隔的ID列表,不传则返回全部"),
            entry_module_id: int | None = Query(None, description="按入口模块ID过滤文档类型"),
        ):
            """List document types, optionally restricted to the given IDs."""
            # An absent/empty ``ids`` means "no ID filter" (None).
            idList: list[int] | None = _parseIdList(ids) if ids else None
            Data = await self.DocumentService.ListDocumentTypes(Ids=idList, EntryModuleId=entry_module_id)
            return Result.success(data=Data)

        @self.router.get("/document-types/{TypeId}", response_model=Result[DocumentTypeItemVO])
        async def GetDocumentType(TypeId: int):
            """Return the details of a document type."""
            Data = await self.DocumentService.GetDocumentType(Id=TypeId)
            return Result.success(data=Data)

        @self.router.post("/document-types", response_model=Result[DocumentTypeItemVO])
        async def CreateDocumentType(Body: DocumentTypeCreateDTO):
            """Create a document type."""
            Data = await self.DocumentService.CreateDocumentType(Body=Body)
            return Result.success(data=Data, message="文档类型创建成功")

        @self.router.put("/document-types/{TypeId}", response_model=Result[DocumentTypeItemVO])
        async def UpdateDocumentType(TypeId: int, Body: DocumentTypeUpdateDTO):
            """Update a document type."""
            Data = await self.DocumentService.UpdateDocumentType(Id=TypeId, Body=Body)
            return Result.success(data=Data, message="文档类型更新成功")

        @self.router.delete("/document-types/{TypeId}", response_model=Result[None])
        async def DeleteDocumentType(TypeId: int):
            """Delete a document type (soft delete)."""
            await self.DocumentService.DeleteDocumentType(Id=TypeId)
            return Result.success(message="文档类型已删除")

        @self.router.get("/v3/document-type-roots", response_model=Result[list[DocumentTypeRootItemVO]])
        async def ListDocumentTypeRoots(
            entry_module_id: int | None = Query(None, description="按入口模块过滤一级大类"),
        ):
            """List first-level document types (business categories)."""
            Data = await self.DocumentService.ListDocumentTypeRoots(EntryModuleId=entry_module_id)
            return Result.success(data=Data)

        @self.router.get("/v3/document-type-roots/{RootId}", response_model=Result[DocumentTypeRootItemVO])
        async def GetDocumentTypeRoot(RootId: int):
            """Return the details of a first-level document type (business category)."""
            Data = await self.DocumentService.GetDocumentTypeRoot(Id=RootId)
            return Result.success(data=Data)

        @self.router.post("/v3/document-type-roots", response_model=Result[DocumentTypeRootItemVO])
        async def CreateDocumentTypeRoot(Body: DocumentTypeRootCreateDTO):
            """Create a first-level document type (business category)."""
            Data = await self.DocumentService.CreateDocumentTypeRoot(Body=Body)
            return Result.success(data=Data, message="一级文档类型创建成功")

        @self.router.put("/v3/document-type-roots/{RootId}", response_model=Result[DocumentTypeRootItemVO])
        async def UpdateDocumentTypeRoot(RootId: int, Body: DocumentTypeRootUpdateDTO):
            """Update a first-level document type (business category)."""
            Data = await self.DocumentService.UpdateDocumentTypeRoot(Id=RootId, Body=Body)
            return Result.success(data=Data, message="一级文档类型更新成功")

        # NOTE(review): this route has no ``verify_access_token`` dependency
        # and returns document IDs -- confirm that is intended.
        @self.router.get("/v2/system/queue/status", response_model=Result[QueueStatusVO])
        async def GetQueueStatus():
            """Return the state of the document-processing queue.

            Counts are taken from the current run of every non-deleted,
            latest-version document: runs with status queued/pending/retrying
            count as pending, runs with status running count as processing.
            At most 50 processing document IDs are returned, most recently
            started first.
            """
            # Imported at call time, as in the original code; presumably this
            # avoids an import cycle with ``fastapi_admin`` -- confirm before
            # hoisting to module level.
            from datetime import datetime
            from fastapi_admin.config import LEAUDIT_WORKER_CONCURRENCY

            async with GetAsyncSession() as Session:
                runStats = (
                    await Session.execute(
                        text(
                            """
                            SELECT
                            COUNT(*) FILTER (WHERE r.status IN ('queued', 'pending', 'retrying'))::int AS pending_tasks,
                            COUNT(*) FILTER (WHERE r.status = 'running')::int AS processing_tasks
                            FROM leaudit_audit_runs r
                            JOIN leaudit_documents d
                            ON d.current_run_id = r.id
                            WHERE d.deleted_at IS NULL
                            AND d.is_latest_version = true
                            """
                        )
                    )
                ).mappings().first()
                # ``first()`` may return None; treat that as zero counts.
                pending = int((runStats or {}).get("pending_tasks") or 0)
                processing = int((runStats or {}).get("processing_tasks") or 0)
                processingIdsRows = (
                    await Session.execute(
                        text(
                            """
                            SELECT d.id
                            FROM leaudit_documents d
                            JOIN leaudit_audit_runs r
                            ON r.id = d.current_run_id
                            WHERE d.deleted_at IS NULL
                            AND d.is_latest_version = true
                            AND r.status = 'running'
                            ORDER BY COALESCE(r.started_at, r.created_at) DESC, d.id DESC
                            LIMIT 50
                            """
                        )
                    )
                ).fetchall()
                processingIds = [int(row[0]) for row in processingIdsRows]
            return Result.success(
                data=QueueStatusVO(
                    success=True,
                    # NOTE(review): naive local time without a UTC offset.
                    timestamp=datetime.now().isoformat(),
                    queue={
                        "pending_tasks": pending,
                        "processing_tasks": processing,
                        # Never report a negative number of free slots.
                        "available_slots": max(0, LEAUDIT_WORKER_CONCURRENCY - processing),
                        "max_concurrent": LEAUDIT_WORKER_CONCURRENCY,
                    },
                    documents={
                        "waiting": pending,
                        "processing": processing,
                        "processing_ids": processingIds,
                    },
                )
            )