56 lines
2.5 KiB
Python
56 lines
2.5 KiB
Python
"""LeAudit 文档文件模型 —— leaudit_document_files 表。"""
|
|
|
|
from __future__ import annotations
|
|
|
|
from sqlalchemy import BigInteger, Boolean, String
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
from sqlalchemy.orm import Mapped, mapped_column
|
|
|
|
from fastapi_common.fastapi_common_web.models import BaseModel
|
|
|
|
|
|
class LeauditDocumentFile(BaseModel):
    """ORM model mapped to the ``leaudit_document_files`` table.

    Each row describes one file attached to a document. Python attribute
    names are camelCase while the underlying column names are snake_case;
    the explicit first argument to ``mapped_column`` provides the mapping.
    """

    __tablename__ = "leaudit_document_files"

    # Auto-incrementing primary key.
    Id: Mapped[int] = mapped_column(
        "id", BigInteger, primary_key=True, autoincrement=True
    )
    # References leaudit_documents.id (no ForeignKey constraint is declared here).
    documentId: Mapped[int] = mapped_column(
        "document_id", BigInteger, comment="关联 leaudit_documents.id"
    )
    # Role of the file: original / converted_pdf / merged_pdf / temp_input.
    fileRole: Mapped[str] = mapped_column(
        "file_role",
        String(64),
        comment="original/converted_pdf/merged_pdf/temp_input",
    )
    # File name.
    fileName: Mapped[str] = mapped_column(
        "file_name", String(512), comment="文件名"
    )
    # File extension.
    fileExt: Mapped[str | None] = mapped_column(
        "file_ext", String(32), comment="扩展名"
    )
    # MIME type.
    mimeType: Mapped[str | None] = mapped_column(
        "mime_type", String(128), comment="MIME"
    )
    # File size (unit is not stated in this model — presumably bytes; confirm).
    fileSize: Mapped[int | None] = mapped_column(
        "file_size", BigInteger, comment="文件大小"
    )
    # SHA-256 digest; String(64) fits a hex-encoded digest.
    sha256: Mapped[str | None] = mapped_column(
        "sha256", String(64), comment="SHA256"
    )
    # Local filesystem path.
    localPath: Mapped[str | None] = mapped_column(
        "local_path", String(1024), comment="本地路径"
    )
    # OSS address (URL).
    ossUrl: Mapped[str | None] = mapped_column(
        "oss_url", String(2048), comment="OSS 地址"
    )
    # Storage backend: oss / minio / local.
    storageProvider: Mapped[str | None] = mapped_column(
        "storage_provider", String(32), comment="oss/minio/local"
    )
    # Whether this is the currently effective file; defaults to True on insert.
    isActive: Mapped[bool] = mapped_column(
        "is_active", Boolean, default=True, comment="当前生效文件"
    )
    # Uploader identifier.
    createdBy: Mapped[int | None] = mapped_column(
        "created_by", BigInteger, comment="上传人"
    )

    @classmethod
    async def deactivate_active_by_document(
        cls, session: AsyncSession, documentId: int
    ) -> None:
        """Mark every currently active file of a document as inactive.

        Issues a single UPDATE setting ``is_active`` to false for rows whose
        ``document_id`` equals ``documentId`` and whose ``is_active`` is true.
        The statement is only executed on ``session``; this method does not
        commit.

        Args:
            session: Async session used to execute the UPDATE.
            documentId: Value matched against the ``document_id`` column.
        """
        from sqlalchemy import update

        belongs_to_document = cls.documentId == documentId
        currently_active = cls.isActive.is_(True)
        stmt = (
            update(cls)
            .where(belongs_to_document, currently_active)
            .values(isActive=False)
        )
        await session.execute(stmt)

    @classmethod
    async def count_by_document(
        cls, session: AsyncSession, documentId: int
    ) -> int:
        """Count all file rows recorded for a document.

        Rows are counted regardless of their ``is_active`` flag, so the
        result covers historical file versions as well as the active one.

        Args:
            session: Async session used to run the query.
            documentId: Value matched against the ``document_id`` column.

        Returns:
            The number of matching rows; ``0`` when the query yields no
            value.
        """
        from sqlalchemy import func, select

        stmt = (
            select(func.count())
            .select_from(cls)
            .where(cls.documentId == documentId)
        )
        total = await session.scalar(stmt)
        # ``scalar`` may return None; normalise that to 0.
        return int(total or 0)
|