"""LeAudit 文档文件模型 —— leaudit_document_files 表。""" from __future__ import annotations from sqlalchemy import BigInteger, Boolean, String from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.orm import Mapped, mapped_column from fastapi_common.fastapi_common_web.models import BaseModel class LeauditDocumentFile(BaseModel): """文档文件表。""" __tablename__ = "leaudit_document_files" Id: Mapped[int] = mapped_column("id", BigInteger, primary_key=True, autoincrement=True) documentId: Mapped[int] = mapped_column("document_id", BigInteger, comment="关联 leaudit_documents.id") fileRole: Mapped[str] = mapped_column("file_role", String(64), comment="original/converted_pdf/merged_pdf/temp_input") fileName: Mapped[str] = mapped_column("file_name", String(512), comment="文件名") fileExt: Mapped[str | None] = mapped_column("file_ext", String(32), comment="扩展名") mimeType: Mapped[str | None] = mapped_column("mime_type", String(128), comment="MIME") fileSize: Mapped[int | None] = mapped_column("file_size", BigInteger, comment="文件大小") sha256: Mapped[str | None] = mapped_column("sha256", String(64), comment="SHA256") localPath: Mapped[str | None] = mapped_column("local_path", String(1024), comment="本地路径") ossUrl: Mapped[str | None] = mapped_column("oss_url", String(2048), comment="OSS 地址") storageProvider: Mapped[str | None] = mapped_column("storage_provider", String(32), comment="oss/minio/local") isActive: Mapped[bool] = mapped_column("is_active", Boolean, default=True, comment="当前生效文件") createdBy: Mapped[int | None] = mapped_column("created_by", BigInteger, comment="上传人") @classmethod async def deactivate_active_by_document(cls, session: AsyncSession, documentId: int) -> None: """把指定文档当前激活文件全部置为非激活。""" from sqlalchemy import update await session.execute( update(cls) .where( cls.documentId == documentId, cls.isActive.is_(True), ) .values(isActive=False) ) @classmethod async def count_by_document(cls, session: AsyncSession, documentId: int) -> int: """统计指定文档历史文件版本数。""" from sqlalchemy import func, select return int( await session.scalar( select(func.count()).select_from(cls).where(cls.documentId == documentId) ) or 0 )