Files
leaudit-platform-backend/fastapi_modules/fastapi_leaudit/models/leauditDocumentFile.py
T
2026-04-28 16:53:16 +08:00

56 lines
2.5 KiB
Python

"""LeAudit 文档文件模型 —— leaudit_document_files 表。"""
from __future__ import annotations
from sqlalchemy import BigInteger, Boolean, String
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import Mapped, mapped_column
from fastapi_common.fastapi_common_web.models import BaseModel
class LeauditDocumentFile(BaseModel):
"""文档文件表。"""
__tablename__ = "leaudit_document_files"
Id: Mapped[int] = mapped_column("id", BigInteger, primary_key=True, autoincrement=True)
documentId: Mapped[int] = mapped_column("document_id", BigInteger, comment="关联 leaudit_documents.id")
fileRole: Mapped[str] = mapped_column("file_role", String(64), comment="original/converted_pdf/merged_pdf/temp_input")
fileName: Mapped[str] = mapped_column("file_name", String(512), comment="文件名")
fileExt: Mapped[str | None] = mapped_column("file_ext", String(32), comment="扩展名")
mimeType: Mapped[str | None] = mapped_column("mime_type", String(128), comment="MIME")
fileSize: Mapped[int | None] = mapped_column("file_size", BigInteger, comment="文件大小")
sha256: Mapped[str | None] = mapped_column("sha256", String(64), comment="SHA256")
localPath: Mapped[str | None] = mapped_column("local_path", String(1024), comment="本地路径")
ossUrl: Mapped[str | None] = mapped_column("oss_url", String(2048), comment="OSS 地址")
storageProvider: Mapped[str | None] = mapped_column("storage_provider", String(32), comment="oss/minio/local")
isActive: Mapped[bool] = mapped_column("is_active", Boolean, default=True, comment="当前生效文件")
createdBy: Mapped[int | None] = mapped_column("created_by", BigInteger, comment="上传人")
@classmethod
async def deactivate_active_by_document(cls, session: AsyncSession, documentId: int) -> None:
"""把指定文档当前激活文件全部置为非激活。"""
from sqlalchemy import update
await session.execute(
update(cls)
.where(
cls.documentId == documentId,
cls.isActive.is_(True),
)
.values(isActive=False)
)
@classmethod
async def count_by_document(cls, session: AsyncSession, documentId: int) -> int:
"""统计指定文档历史文件版本数。"""
from sqlalchemy import func, select
return int(
await session.scalar(
select(func.count()).select_from(cls).where(cls.documentId == documentId)
)
or 0
)