from __future__ import annotations import mimetypes from pathlib import Path from typing import Any from fastapi import UploadFile from sqlalchemy import bindparam, text from fastapi_common.fastapi_common_sqlalchemy.database import GetAsyncSession from fastapi_common.fastapi_common_storage.oss_path_utils import OssPathUtils from fastapi_common.fastapi_common_web.domain.responses import StatusCodeEnum from fastapi_common.fastapi_common_web.exception.LeauditException import LeauditException from fastapi_modules.fastapi_leaudit.domian.Dto.contractTemplateDto import ( ContractTemplateCreateDTO, ContractTemplateListQueryDTO, ContractTemplateSearchQueryDTO, ) from fastapi_modules.fastapi_leaudit.domian.vo.contractTemplateVo import ( ContractTemplateCategoryVO, ContractTemplateCreateVO, ContractTemplateDetailVO, ContractTemplateListItemVO, ContractTemplatePageVO, ContractTemplateSearchCategoryVO, ContractTemplateSearchResultVO, ) from fastapi_modules.fastapi_leaudit.services.contractTemplateService import IContractTemplateService from fastapi_modules.fastapi_leaudit.services.ossService import IOssService from fastapi_modules.fastapi_leaudit.services.impl.ossServiceImpl import OssServiceImpl _ALLOWED_SORT_FIELDS = { "id": "t.id", "title": "t.title", "created_at": "t.created_at", "updated_at": "t.updated_at", } class ContractTemplateServiceImpl(IContractTemplateService): """合同模板服务实现。""" def __init__(self, OssService: IOssService | None = None) -> None: self.OssService = OssService or OssServiceImpl() async def ListCategories(self, IncludeDisabled: bool, WithTemplateCount: bool) -> list[ContractTemplateCategoryVO]: count_select = "COUNT(t.id)::int AS template_count" if WithTemplateCount else "0::int AS template_count" filters = ["c.deleted_at IS NULL"] if not IncludeDisabled: filters.append("1=1") sql = text( f""" SELECT c.id, c.name, c.icon, c.description, COALESCE(c.sort_order, 0) AS sort_order, {count_select}, TRUE AS is_enabled FROM contract_categories c LEFT JOIN contract_templates t ON t.category_id = c.id AND t.deleted_at IS NULL WHERE {' AND '.join(filters)} GROUP BY c.id, c.name, c.icon, c.description, c.sort_order ORDER BY COALESCE(c.sort_order, 0) ASC, c.name ASC """ ) async with GetAsyncSession() as session: await self._ensureContractTemplateSchema(session) rows = (await session.execute(sql)).mappings().all() return [self._to_category_vo(row) for row in rows] async def ListTemplates(self, Query: ContractTemplateListQueryDTO, CurrentUserId: int) -> ContractTemplatePageVO: async with GetAsyncSession() as session: await self._ensureContractTemplateSchema(session) currentUser = await self._getCurrentUserContext(CurrentUserId, session) where_clause, params, needs_category_name_filter = self._build_template_filters( keyword=Query.keyword, category_id=Query.category_id, category_name=Query.category_name, region=Query.region, file_format=Query.file_format, is_featured=Query.is_featured, currentUser=currentUser, ) order_sql = self._build_order_clause(Query.sort_by, Query.sort_order, default_field="updated_at", default_order="desc") offset = max(Query.page - 1, 0) * Query.page_size params.update({"limit": Query.page_size, "offset": offset}) from_sql = self._build_template_from_sql(needs_category_name_filter) count_sql = text( f""" SELECT COUNT(*) {from_sql} WHERE {where_clause} """ ) list_sql = text( f""" SELECT t.id, t.template_code, t.title, t.category_id, c.name AS category_name, c.icon AS category_icon, c.description AS category_description, t.region, t.description, t.file_path, t.pdf_file_path, t.file_format, t.original_file_name, t.mime_type, COALESCE(t.file_size, 0) AS file_size, t.pdf_file_size, COALESCE(t.is_featured, FALSE) AS is_featured, t.created_by, t.updated_by, t.created_at, t.updated_at {from_sql} WHERE {where_clause} ORDER BY {order_sql} LIMIT :limit OFFSET :offset """ ) count_sql, list_sql = self._bind_expanding(count_sql, list_sql, params) total = int((await session.execute(count_sql, params)).scalar_one()) rows = (await session.execute(list_sql, params)).mappings().all() return ContractTemplatePageVO( total=total, page=Query.page, page_size=Query.page_size, total_pages=max((total + Query.page_size - 1) // Query.page_size, 1) if total else 0, templates=[self._to_list_item_vo(row) for row in rows], ) async def SearchTemplates(self, Query: ContractTemplateSearchQueryDTO, CurrentUserId: int) -> ContractTemplateSearchResultVO: list_query = ContractTemplateListQueryDTO( keyword=Query.q, category_id=Query.category_id, category_name=Query.category_name, region=Query.region, page=Query.page, page_size=Query.page_size, sort_by=Query.sort_by, sort_order=Query.sort_order, ) page_result = await self.ListTemplates(list_query, CurrentUserId) category_stats = await self._load_search_category_stats(Query.q, Query.region, CurrentUserId) return ContractTemplateSearchResultVO( total=page_result.total, page=page_result.page, page_size=page_result.page_size, total_pages=page_result.total_pages, templates=page_result.templates, category_stats=category_stats, ) async def GetTemplateDetail(self, TemplateId: int, CurrentUserId: int) -> ContractTemplateDetailVO | None: async with GetAsyncSession() as session: await self._ensureContractTemplateSchema(session) currentUser = await self._getCurrentUserContext(CurrentUserId, session) params: dict[str, Any] = {"template_id": TemplateId} scope_filters = self._build_template_scope_filters(currentUser, params, requestedRegion=None) sql = text( f""" SELECT t.id, t.template_code, t.title, t.category_id, c.name AS category_name, c.icon AS category_icon, c.description AS category_description, t.region, t.description, t.file_path, t.pdf_file_path, t.file_format, t.original_file_name, t.mime_type, COALESCE(t.file_size, 0) AS file_size, t.pdf_file_size, COALESCE(t.is_featured, FALSE) AS is_featured, t.created_by, t.updated_by, t.created_at, t.updated_at FROM contract_templates t LEFT JOIN contract_categories c ON c.id = t.category_id WHERE t.id = :template_id AND t.deleted_at IS NULL AND c.deleted_at IS NULL AND {' AND '.join(scope_filters)} LIMIT 1 """ ) row = (await session.execute(sql, params)).mappings().first() if not row: return None return self._to_detail_vo(row) async def CreateTemplate( self, Body: ContractTemplateCreateDTO, File: UploadFile, PdfFile: UploadFile | None, CurrentUserId: int, ) -> ContractTemplateCreateVO: if File is None or not File.filename: raise LeauditException(StatusCodeEnum.HTTP_400_BAD_REQUEST, "模板主文件不能为空") fileContent = await File.read() if not fileContent: raise LeauditException(StatusCodeEnum.HTTP_400_BAD_REQUEST, "模板主文件内容不能为空") normalizedCode = (Body.template_code or "").strip() normalizedTitle = (Body.title or "").strip() if not normalizedCode: raise LeauditException(StatusCodeEnum.HTTP_400_BAD_REQUEST, "模板编码不能为空") if not normalizedTitle: raise LeauditException(StatusCodeEnum.HTTP_400_BAD_REQUEST, "模板标题不能为空") fileExt = Path(File.filename).suffix.lstrip(".").lower() if fileExt not in {"doc", "docx", "pdf"}: raise LeauditException(StatusCodeEnum.HTTP_400_BAD_REQUEST, "当前仅支持上传 DOC、DOCX、PDF 模板") mimeType = File.content_type or mimetypes.guess_type(File.filename)[0] or "application/octet-stream" pdfContent: bytes | None = None pdfMimeType: str | None = None pdfFileName: str | None = None if PdfFile and PdfFile.filename: pdfContent = await PdfFile.read() if not pdfContent: raise LeauditException(StatusCodeEnum.HTTP_400_BAD_REQUEST, "预览 PDF 文件内容不能为空") pdfExt = Path(PdfFile.filename).suffix.lstrip(".").lower() if pdfExt != "pdf": raise LeauditException(StatusCodeEnum.HTTP_400_BAD_REQUEST, "预览文件仅支持 PDF") pdfMimeType = PdfFile.content_type or "application/pdf" pdfFileName = PdfFile.filename async with GetAsyncSession() as session: await self._ensureContractTemplateSchema(session) currentUser = await self._getCurrentUserContext(CurrentUserId, session) resolvedRegion = self._resolve_upload_region(currentUser, Body.region) categoryRow = ( await session.execute( text( """ SELECT id, name FROM contract_categories WHERE id = :category_id AND deleted_at IS NULL LIMIT 1 """ ), {"category_id": Body.category_id}, ) ).mappings().first() if not categoryRow: raise LeauditException(StatusCodeEnum.HTTP_404_NOT_FOUND, "合同模板分类不存在") duplicateRow = ( await session.execute( text( """ SELECT id FROM contract_templates WHERE region = :region AND template_code = :template_code AND deleted_at IS NULL LIMIT 1 """ ), {"region": resolvedRegion, "template_code": normalizedCode}, ) ).mappings().first() if duplicateRow: raise LeauditException(StatusCodeEnum.HTTP_409_CONFLICT, f"当前地区已存在模板编码 {normalizedCode}") categoryName = str(categoryRow["name"] or "未分类") objectKey = OssPathUtils.BuildContractTemplateKey( Region=resolvedRegion, CategoryName=categoryName, TemplateCode=normalizedCode, FileRole="source", FileName=File.filename, ) filePath = await self.OssService.UploadBytes( ObjectKey=objectKey, Content=fileContent, ContentType=mimeType, ) pdfPath: str | None = None if pdfContent is not None and pdfFileName: pdfObjectKey = OssPathUtils.BuildContractTemplateKey( Region=resolvedRegion, CategoryName=categoryName, TemplateCode=normalizedCode, FileRole="preview", FileName=pdfFileName, ) pdfPath = await self.OssService.UploadBytes( ObjectKey=pdfObjectKey, Content=pdfContent, ContentType=pdfMimeType or "application/pdf", ) createdRow = ( await session.execute( text( """ INSERT INTO contract_templates ( template_code, title, category_id, region, description, file_path, pdf_file_path, file_format, original_file_name, mime_type, file_size, pdf_file_size, is_featured, created_by, updated_by, created_at, updated_at ) VALUES ( :template_code, :title, :category_id, :region, :description, :file_path, :pdf_file_path, :file_format, :original_file_name, :mime_type, :file_size, :pdf_file_size, :is_featured, :created_by, :updated_by, NOW(), NOW() ) RETURNING id """ ), { "template_code": normalizedCode, "title": normalizedTitle, "category_id": Body.category_id, "region": resolvedRegion, "description": (Body.description or "").strip() or None, "file_path": filePath, "pdf_file_path": pdfPath, "file_format": fileExt, "original_file_name": File.filename, "mime_type": mimeType, "file_size": len(fileContent), "pdf_file_size": len(pdfContent) if pdfContent is not None else None, "is_featured": Body.is_featured, "created_by": CurrentUserId, "updated_by": CurrentUserId, }, ) ).mappings().first() await session.commit() if not createdRow: raise LeauditException(StatusCodeEnum.HTTP_500_INTERNAL_SERVER_ERROR, "合同模板创建失败") detail = await self.GetTemplateDetail(int(createdRow["id"]), CurrentUserId) if not detail: raise LeauditException(StatusCodeEnum.HTTP_500_INTERNAL_SERVER_ERROR, "合同模板创建成功但详情读取失败") return ContractTemplateCreateVO(**detail.model_dump()) async def DeleteTemplate(self, TemplateId: int, CurrentUserId: int) -> None: async with GetAsyncSession() as session: await self._ensureContractTemplateSchema(session) currentUser = await self._getCurrentUserContext(CurrentUserId, session) params: dict[str, Any] = {"template_id": TemplateId} scope_filters = self._build_template_scope_filters(currentUser, params, requestedRegion=None, writable=True) row = ( await session.execute( text( f""" SELECT id FROM contract_templates t WHERE t.id = :template_id AND t.deleted_at IS NULL AND {' AND '.join(scope_filters)} LIMIT 1 """ ), params, ) ).mappings().first() if not row: raise LeauditException(StatusCodeEnum.HTTP_404_NOT_FOUND, "合同模板不存在或无权删除") await session.execute( text( """ UPDATE contract_templates SET deleted_at = NOW(), updated_at = NOW(), updated_by = :updated_by WHERE id = :template_id AND deleted_at IS NULL """ ), {"template_id": TemplateId, "updated_by": CurrentUserId}, ) await session.commit() async def _load_search_category_stats( self, keyword: str, requestedRegion: str | None, CurrentUserId: int, ) -> list[ContractTemplateSearchCategoryVO]: clean_keyword = (keyword or "").strip() if not clean_keyword: return [] async with GetAsyncSession() as session: await self._ensureContractTemplateSchema(session) currentUser = await self._getCurrentUserContext(CurrentUserId, session) params: dict[str, Any] = {"keyword": f"%{clean_keyword}%"} scope_filters = self._build_template_scope_filters(currentUser, params, requestedRegion=requestedRegion) filters = [ "c.deleted_at IS NULL", "t.deleted_at IS NULL", "(" "t.title ILIKE :keyword " "OR COALESCE(t.description, '') ILIKE :keyword " "OR COALESCE(t.template_code, '') ILIKE :keyword " "OR COALESCE(c.name, '') ILIKE :keyword" ")", *scope_filters, ] sql = text( f""" SELECT c.id, c.name, COUNT(t.id)::int AS search_count FROM contract_categories c LEFT JOIN contract_templates t ON t.category_id = c.id WHERE {' AND '.join(filters)} GROUP BY c.id, c.name ORDER BY c.name ASC """ ) rows = (await session.execute(sql, params)).mappings().all() return [ ContractTemplateSearchCategoryVO( id=int(row["id"]), name=str(row["name"] or ""), search_count=int(row["search_count"] or 0), ) for row in rows if row.get("id") is not None ] def _build_template_filters( self, keyword: str | None, category_id: int | None, category_name: str | None, region: str | None, file_format: str | None, is_featured: bool | None, currentUser: dict[str, Any], ) -> tuple[str, dict[str, Any], bool]: filters = ["t.deleted_at IS NULL", "c.deleted_at IS NULL"] params: dict[str, Any] = {} needs_category_name_filter = False filters.extend(self._build_template_scope_filters(currentUser, params, region)) if category_id is not None: filters.append("t.category_id = :category_id") params["category_id"] = category_id elif category_name: filters.append("c.name = :category_name") params["category_name"] = category_name.strip() needs_category_name_filter = True if file_format: filters.append("LOWER(t.file_format) = :file_format") params["file_format"] = file_format.strip().lower() if is_featured is not None: filters.append("COALESCE(t.is_featured, FALSE) = :is_featured") params["is_featured"] = is_featured clean_keyword = (keyword or "").strip() if clean_keyword: filters.append( "(" "t.title ILIKE :keyword " "OR COALESCE(t.description, '') ILIKE :keyword " "OR COALESCE(t.template_code, '') ILIKE :keyword " "OR COALESCE(c.name, '') ILIKE :keyword" ")" ) params["keyword"] = f"%{clean_keyword}%" needs_category_name_filter = True return " AND ".join(filters), params, needs_category_name_filter def _build_template_from_sql(self, needs_category_name_filter: bool) -> str: _ = needs_category_name_filter return """ FROM contract_templates t LEFT JOIN contract_categories c ON c.id = t.category_id """ def _build_order_clause(self, sort_by: str | None, sort_order: str | None, default_field: str, default_order: str) -> str: field = _ALLOWED_SORT_FIELDS.get(str(sort_by or "").strip().lower(), _ALLOWED_SORT_FIELDS[default_field]) direction = "DESC" if str(sort_order or default_order).strip().lower() == "desc" else "ASC" return f"{field} {direction}, t.id ASC" def _bind_expanding(self, *sql_objects_and_params: Any): sql_objects = list(sql_objects_and_params[:-1]) params = sql_objects_and_params[-1] if "visible_regions" in params: sql_objects = [sql.bindparams(bindparam("visible_regions", expanding=True)) for sql in sql_objects] return tuple(sql_objects) def _to_category_vo(self, row: Any) -> ContractTemplateCategoryVO: return ContractTemplateCategoryVO( id=int(row["id"]), name=str(row["name"] or ""), icon=row.get("icon"), description=row.get("description"), sort_order=int(row.get("sort_order") or 0), template_count=int(row.get("template_count") or 0), is_enabled=bool(row.get("is_enabled", True)), ) def _to_list_item_vo(self, row: Any) -> ContractTemplateListItemVO: return ContractTemplateListItemVO( id=int(row["id"]), template_code=str(row.get("template_code") or ""), title=str(row.get("title") or ""), category_id=int(row.get("category_id") or 0), category_name=row.get("category_name"), category_icon=row.get("category_icon"), description=row.get("description"), region=str(row.get("region") or "省级"), file_path=row.get("file_path"), pdf_file_path=row.get("pdf_file_path"), file_format=str(row.get("file_format") or ""), original_file_name=row.get("original_file_name"), mime_type=row.get("mime_type"), file_size=int(row.get("file_size") or 0), pdf_file_size=int(row["pdf_file_size"]) if row.get("pdf_file_size") is not None else None, is_featured=bool(row.get("is_featured", False)), created_by=int(row["created_by"]) if row.get("created_by") is not None else None, updated_by=int(row["updated_by"]) if row.get("updated_by") is not None else None, created_at=self._stringify_time(row.get("created_at")), updated_at=self._stringify_time(row.get("updated_at")), ) def _to_detail_vo(self, row: Any) -> ContractTemplateDetailVO: base = self._to_list_item_vo(row) return ContractTemplateDetailVO( **base.model_dump(), category_description=row.get("category_description"), placeholder_schema=None, ) def _stringify_time(self, value: Any) -> str | None: if value is None: return None return str(value) def _build_template_scope_filters( self, currentUser: dict[str, Any], params: dict[str, Any], requestedRegion: str | None, writable: bool = False, ) -> list[str]: requested = (requestedRegion or "").strip() area = str(currentUser["area"] or "").strip() if currentUser["is_global"]: if requested: params["requested_region"] = requested return ["t.region = :requested_region"] return ["1=1"] if writable: if not area: return ["1=0"] if requested and requested != area: return ["1=0"] params["scope_region"] = area return ["t.region = :scope_region"] if currentUser["can_manage"]: if not area: return ["1=0"] if requested: if requested == "省级": params["requested_region"] = requested return ["t.region = :requested_region"] if requested != area: return ["1=0"] params["requested_region"] = requested return ["t.region = :requested_region"] params["visible_regions"] = ["省级", area] return ["t.region IN :visible_regions"] if requested: if requested == "省级": params["requested_region"] = requested return ["t.region = :requested_region"] if area and requested == area: params["requested_region"] = requested return ["t.region = :requested_region"] return ["1=0"] if area: params["visible_regions"] = ["省级", area] return ["t.region IN :visible_regions"] params["requested_region"] = "省级" return ["t.region = :requested_region"] async def _getCurrentUserContext(self, CurrentUserId: int, session=None) -> dict[str, Any]: own_session = False if session is None: own_session = True session_cm = GetAsyncSession() session = await session_cm.__aenter__() try: row = ( await session.execute( text( """ SELECT u.id, COALESCE(u.area, '') AS area, COALESCE(bool_or(r.role_key IN ('super_admin', 'provincial_admin')), FALSE) AS is_global, COALESCE(bool_or(r.role_key IN ('super_admin', 'provincial_admin', 'admin')), FALSE) AS can_manage FROM sso_users u LEFT JOIN user_role ur ON ur.user_id = u.id LEFT JOIN roles r ON r.id = ur.role_id WHERE u.id = :user_id GROUP BY u.id, u.area """ ), {"user_id": CurrentUserId}, ) ).mappings().first() if not row: raise LeauditException(StatusCodeEnum.HTTP_404_NOT_FOUND, "当前用户不存在") return { "id": int(row["id"]), "area": str(row["area"] or ""), "is_global": bool(row["is_global"]), "can_manage": bool(row["can_manage"]), } finally: if own_session: await session_cm.__aexit__(None, None, None) def _resolve_upload_region(self, currentUser: dict[str, Any], requestedRegion: str | None) -> str: area = str(currentUser["area"] or "").strip() if currentUser["can_manage"]: if not area: raise LeauditException(StatusCodeEnum.HTTP_403_FORBIDDEN, "当前管理员账号未配置地区,无法上传合同模板") return area raise LeauditException(StatusCodeEnum.HTTP_403_FORBIDDEN, "当前用户没有上传合同模板权限") async def _ensureContractTemplateSchema(self, session) -> None: statements = [ """ ALTER TABLE contract_categories ADD COLUMN IF NOT EXISTS created_by BIGINT """, """ ALTER TABLE contract_categories ADD COLUMN IF NOT EXISTS updated_by BIGINT """, """ ALTER TABLE contract_categories ADD COLUMN IF NOT EXISTS deleted_at TIMESTAMPTZ """, """ ALTER TABLE contract_templates ADD COLUMN IF NOT EXISTS region VARCHAR(50) NOT NULL DEFAULT '省级' """, """ ALTER TABLE contract_templates ADD COLUMN IF NOT EXISTS pdf_file_path VARCHAR(500) """, """ ALTER TABLE contract_templates ADD COLUMN IF NOT EXISTS original_file_name VARCHAR(500) NOT NULL DEFAULT '' """, """ ALTER TABLE contract_templates ADD COLUMN IF NOT EXISTS mime_type VARCHAR(200) """, """ ALTER TABLE contract_templates ADD COLUMN IF NOT EXISTS file_size BIGINT NOT NULL DEFAULT 0 """, """ ALTER TABLE contract_templates ADD COLUMN IF NOT EXISTS pdf_file_size BIGINT """, """ ALTER TABLE contract_templates ADD COLUMN IF NOT EXISTS created_by BIGINT """, """ ALTER TABLE contract_templates ADD COLUMN IF NOT EXISTS updated_by BIGINT """, """ ALTER TABLE contract_templates ADD COLUMN IF NOT EXISTS deleted_at TIMESTAMPTZ """, ] for statement in statements: await session.execute(text(statement)) await session.execute( text( """ UPDATE contract_templates SET region = '省级' WHERE region IS NULL OR BTRIM(region) = '' """ ) ) await session.execute( text( """ UPDATE contract_templates SET original_file_name = COALESCE(NULLIF(original_file_name, ''), title || CASE WHEN file_format IS NOT NULL AND BTRIM(file_format) <> '' THEN '.' || LOWER(file_format) ELSE '' END) WHERE original_file_name IS NULL OR BTRIM(original_file_name) = '' """ ) ) await session.commit()