Files
leaudit-platform-backend/fastapi_modules/fastapi_leaudit/services/impl/contractTemplateServiceImpl.py
T

772 lines
31 KiB
Python

from __future__ import annotations
import mimetypes
from pathlib import Path
from typing import Any
from fastapi import UploadFile
from sqlalchemy import bindparam, text
from fastapi_common.fastapi_common_sqlalchemy.database import GetAsyncSession
from fastapi_common.fastapi_common_storage.oss_path_utils import OssPathUtils
from fastapi_common.fastapi_common_web.domain.responses import StatusCodeEnum
from fastapi_common.fastapi_common_web.exception.LeauditException import LeauditException
from fastapi_modules.fastapi_leaudit.domian.Dto.contractTemplateDto import (
ContractTemplateCreateDTO,
ContractTemplateListQueryDTO,
ContractTemplateSearchQueryDTO,
)
from fastapi_modules.fastapi_leaudit.domian.vo.contractTemplateVo import (
ContractTemplateCategoryVO,
ContractTemplateCreateVO,
ContractTemplateDetailVO,
ContractTemplateListItemVO,
ContractTemplatePageVO,
ContractTemplateSearchCategoryVO,
ContractTemplateSearchResultVO,
)
from fastapi_modules.fastapi_leaudit.services.contractTemplateService import IContractTemplateService
from fastapi_modules.fastapi_leaudit.services.ossService import IOssService
from fastapi_modules.fastapi_leaudit.services.impl.ossServiceImpl import OssServiceImpl
# Whitelist that maps client-facing sort keys to the qualified SQL column
# expression interpolated into ORDER BY by _build_order_clause. Because only
# the mapped values are ever formatted into SQL text, a caller-supplied sort
# key can never inject arbitrary SQL.
_ALLOWED_SORT_FIELDS = {
    "id": "t.id",
    "title": "t.title",
    "created_at": "t.created_at",
    "updated_at": "t.updated_at",
}
class ContractTemplateServiceImpl(IContractTemplateService):
    """Contract template service implementation."""

    def __init__(self, OssService: IOssService | None = None) -> None:
        """Store the object-storage service used for template file uploads.

        Args:
            OssService: Storage service to use. When omitted (or falsy), a
                default ``OssServiceImpl`` instance is constructed.
        """
        self.OssService = OssService or OssServiceImpl()
async def ListCategories(self, IncludeDisabled: bool, WithTemplateCount: bool) -> list[ContractTemplateCategoryVO]:
    """Return every non-deleted contract category in display order.

    Args:
        IncludeDisabled: When false, an always-true ``1=1`` predicate is
            appended to the WHERE clause. The query has no real "disabled"
            condition and selects a constant ``TRUE AS is_enabled``, so this
            flag currently does not change the result.
        WithTemplateCount: When true, ``template_count`` is the number of
            joined non-deleted templates; otherwise it is a constant 0.

    Returns:
        One ``ContractTemplateCategoryVO`` per category, ordered by
        ``sort_order`` (NULL treated as 0) and then by name.
    """
    if WithTemplateCount:
        count_column = "COUNT(t.id)::int AS template_count"
    else:
        count_column = "0::int AS template_count"

    conditions = ["c.deleted_at IS NULL"]
    if not IncludeDisabled:
        conditions.append("1=1")
    where_sql = " AND ".join(conditions)

    statement = text(
        f"""
SELECT
c.id,
c.name,
c.icon,
c.description,
COALESCE(c.sort_order, 0) AS sort_order,
{count_column},
TRUE AS is_enabled
FROM contract_categories c
LEFT JOIN contract_templates t
ON t.category_id = c.id
AND t.deleted_at IS NULL
WHERE {where_sql}
GROUP BY c.id, c.name, c.icon, c.description, c.sort_order
ORDER BY COALESCE(c.sort_order, 0) ASC, c.name ASC
"""
    )

    async with GetAsyncSession() as session:
        await self._ensureContractTemplateSchema(session)
        result = await session.execute(statement)
        records = result.mappings().all()

    return list(map(self._to_category_vo, records))
async def ListTemplates(self, Query: ContractTemplateListQueryDTO, CurrentUserId: int) -> ContractTemplatePageVO:
    """Return one page of the templates the current user is allowed to see.

    The WHERE clause combines the caller's filters with the region scope
    derived from the user's roles and area. A COUNT query and a paged SELECT
    are executed with the same bind values.

    Args:
        Query: Filters, paging (``page``, ``page_size``) and sort options.
        CurrentUserId: Id of the user whose visibility scope applies.

    Returns:
        A ``ContractTemplatePageVO`` holding the total, paging data and the
        rows of the requested page; ``total_pages`` is 0 when nothing matches.
    """
    page_size = Query.page_size

    async with GetAsyncSession() as session:
        await self._ensureContractTemplateSchema(session)
        user_ctx = await self._getCurrentUserContext(CurrentUserId, session)

        where_sql, bind_values, joins_category = self._build_template_filters(
            keyword=Query.keyword,
            category_id=Query.category_id,
            category_name=Query.category_name,
            region=Query.region,
            file_format=Query.file_format,
            is_featured=Query.is_featured,
            currentUser=user_ctx,
        )
        ordering = self._build_order_clause(
            Query.sort_by,
            Query.sort_order,
            default_field="updated_at",
            default_order="desc",
        )

        # Pages are 1-based; anything below page 1 is clamped to offset 0.
        bind_values["limit"] = page_size
        bind_values["offset"] = max(Query.page - 1, 0) * page_size

        from_clause = self._build_template_from_sql(joins_category)

        count_stmt = text(
            f"""
SELECT COUNT(*)
{from_clause}
WHERE {where_sql}
"""
        )
        page_stmt = text(
            f"""
SELECT
t.id,
t.template_code,
t.title,
t.category_id,
c.name AS category_name,
c.icon AS category_icon,
c.description AS category_description,
t.region,
t.description,
t.file_path,
t.pdf_file_path,
t.file_format,
t.original_file_name,
t.mime_type,
COALESCE(t.file_size, 0) AS file_size,
t.pdf_file_size,
COALESCE(t.is_featured, FALSE) AS is_featured,
t.created_by,
t.updated_by,
t.created_at,
t.updated_at
{from_clause}
WHERE {where_sql}
ORDER BY {ordering}
LIMIT :limit OFFSET :offset
"""
        )

        # Turn the list-valued region parameter (if present) into an
        # expanding IN bind on both statements.
        count_stmt, page_stmt = self._bind_expanding(count_stmt, page_stmt, bind_values)

        count_result = await session.execute(count_stmt, bind_values)
        total = int(count_result.scalar_one())

        page_result = await session.execute(page_stmt, bind_values)
        records = page_result.mappings().all()

        if total:
            page_count = max((total + page_size - 1) // page_size, 1)
        else:
            page_count = 0

        return ContractTemplatePageVO(
            total=total,
            page=Query.page,
            page_size=page_size,
            total_pages=page_count,
            templates=list(map(self._to_list_item_vo, records)),
        )
async def SearchTemplates(self, Query: ContractTemplateSearchQueryDTO, CurrentUserId: int) -> ContractTemplateSearchResultVO:
    """Search templates by keyword and attach per-category hit counts.

    The search query is translated into a list query and delegated to
    ``ListTemplates``; the category statistics are loaded separately using
    only the keyword and region of the search.

    Args:
        Query: Search text (``q``), optional filters, paging and sorting.
        CurrentUserId: Id of the user whose visibility scope applies.

    Returns:
        The paged result of ``ListTemplates`` plus ``category_stats``.
    """
    paging_query = ContractTemplateListQueryDTO(
        keyword=Query.q,
        category_id=Query.category_id,
        category_name=Query.category_name,
        region=Query.region,
        page=Query.page,
        page_size=Query.page_size,
        sort_by=Query.sort_by,
        sort_order=Query.sort_order,
    )
    page = await self.ListTemplates(paging_query, CurrentUserId)
    stats = await self._load_search_category_stats(Query.q, Query.region, CurrentUserId)

    return ContractTemplateSearchResultVO(
        total=page.total,
        page=page.page,
        page_size=page.page_size,
        total_pages=page.total_pages,
        templates=page.templates,
        category_stats=stats,
    )
async def GetTemplateDetail(self, TemplateId: int, CurrentUserId: int) -> ContractTemplateDetailVO | None:
    """Load a single template, restricted to the current user's region scope.

    Args:
        TemplateId: Primary key of the template.
        CurrentUserId: Id of the user whose visibility scope applies.

    Returns:
        The template detail, or ``None`` when the template does not exist,
        is soft-deleted, belongs to a soft-deleted category, or lies outside
        the user's visible regions.
    """
    async with GetAsyncSession() as session:
        await self._ensureContractTemplateSchema(session)
        currentUser = await self._getCurrentUserContext(CurrentUserId, session)
        params: dict[str, Any] = {"template_id": TemplateId}
        scope_filters = self._build_template_scope_filters(currentUser, params, requestedRegion=None)
        sql = text(
            f"""
SELECT
t.id,
t.template_code,
t.title,
t.category_id,
c.name AS category_name,
c.icon AS category_icon,
c.description AS category_description,
t.region,
t.description,
t.file_path,
t.pdf_file_path,
t.file_format,
t.original_file_name,
t.mime_type,
COALESCE(t.file_size, 0) AS file_size,
t.pdf_file_size,
COALESCE(t.is_featured, FALSE) AS is_featured,
t.created_by,
t.updated_by,
t.created_at,
t.updated_at
FROM contract_templates t
LEFT JOIN contract_categories c ON c.id = t.category_id
WHERE t.id = :template_id
AND t.deleted_at IS NULL
AND c.deleted_at IS NULL
AND {' AND '.join(scope_filters)}
LIMIT 1
"""
        )
        # For non-global users the scope filter is
        # "t.region IN :visible_regions" with a list value. A text() statement
        # only renders that correctly when the parameter is declared as an
        # expanding bind, exactly as ListTemplates does for its statements.
        (sql,) = self._bind_expanding(sql, params)
        row = (await session.execute(sql, params)).mappings().first()
        if not row:
            return None
        return self._to_detail_vo(row)
async def CreateTemplate(
    self,
    Body: ContractTemplateCreateDTO,
    File: UploadFile,
    PdfFile: UploadFile | None,
    CurrentUserId: int,
) -> ContractTemplateCreateVO:
    """Validate an uploaded template, store its files and insert its row.

    Steps, in order: validate the main file and the code/title fields,
    validate the optional preview PDF, resolve the upload region from the
    current user, check the category and the (region, template_code)
    uniqueness, upload the file(s) through the storage service, insert the
    database row, commit, and finally re-read the row via
    ``GetTemplateDetail``.

    Args:
        Body: Template metadata (code, title, category, description, ...).
        File: Main template file; only ``doc``, ``docx`` and ``pdf``
            extensions are accepted.
        PdfFile: Optional preview file; must have a ``pdf`` extension.
        CurrentUserId: Id of the uploading user; stored as ``created_by``
            and ``updated_by``.

    Returns:
        The created template as a ``ContractTemplateCreateVO``.

    Raises:
        LeauditException: 400 for invalid input, 404 for a missing category,
            409 for a duplicate template code in the resolved region, 500
            when the insert returns no row or the detail cannot be re-read.
            Errors raised by ``_getCurrentUserContext`` and
            ``_resolve_upload_region`` propagate unchanged.
    """
    if File is None or not File.filename:
        raise LeauditException(StatusCodeEnum.HTTP_400_BAD_REQUEST, "模板主文件不能为空")
    # The whole upload is read into memory; its length is later stored as
    # file_size.
    fileContent = await File.read()
    if not fileContent:
        raise LeauditException(StatusCodeEnum.HTTP_400_BAD_REQUEST, "模板主文件内容不能为空")
    normalizedCode = (Body.template_code or "").strip()
    normalizedTitle = (Body.title or "").strip()
    if not normalizedCode:
        raise LeauditException(StatusCodeEnum.HTTP_400_BAD_REQUEST, "模板编码不能为空")
    if not normalizedTitle:
        raise LeauditException(StatusCodeEnum.HTTP_400_BAD_REQUEST, "模板标题不能为空")
    # The format is derived from the file name extension only; the content
    # itself is not inspected.
    fileExt = Path(File.filename).suffix.lstrip(".").lower()
    if fileExt not in {"doc", "docx", "pdf"}:
        raise LeauditException(StatusCodeEnum.HTTP_400_BAD_REQUEST, "当前仅支持上传 DOC、DOCX、PDF 模板")
    # Prefer the client-declared content type, then a guess from the file
    # name, then a generic binary type.
    mimeType = File.content_type or mimetypes.guess_type(File.filename)[0] or "application/octet-stream"
    pdfContent: bytes | None = None
    pdfMimeType: str | None = None
    pdfFileName: str | None = None
    # The preview PDF is optional; it is validated only when a named file
    # was actually supplied.
    if PdfFile and PdfFile.filename:
        pdfContent = await PdfFile.read()
        if not pdfContent:
            raise LeauditException(StatusCodeEnum.HTTP_400_BAD_REQUEST, "预览 PDF 文件内容不能为空")
        pdfExt = Path(PdfFile.filename).suffix.lstrip(".").lower()
        if pdfExt != "pdf":
            raise LeauditException(StatusCodeEnum.HTTP_400_BAD_REQUEST, "预览文件仅支持 PDF")
        pdfMimeType = PdfFile.content_type or "application/pdf"
        pdfFileName = PdfFile.filename
    async with GetAsyncSession() as session:
        await self._ensureContractTemplateSchema(session)
        currentUser = await self._getCurrentUserContext(CurrentUserId, session)
        # Raises 403 for users who may not upload. Body.region is passed
        # along, but the resolver in this class returns the user's own area
        # regardless of it.
        resolvedRegion = self._resolve_upload_region(currentUser, Body.region)
        categoryRow = (
            await session.execute(
                text(
                    """
SELECT id, name
FROM contract_categories
WHERE id = :category_id
AND deleted_at IS NULL
LIMIT 1
"""
                ),
                {"category_id": Body.category_id},
            )
        ).mappings().first()
        if not categoryRow:
            raise LeauditException(StatusCodeEnum.HTTP_404_NOT_FOUND, "合同模板分类不存在")
        # Template codes must be unique per region among non-deleted rows.
        # NOTE(review): this is a check-then-insert; whether a database
        # unique constraint backs it is not visible here.
        duplicateRow = (
            await session.execute(
                text(
                    """
SELECT id
FROM contract_templates
WHERE region = :region
AND template_code = :template_code
AND deleted_at IS NULL
LIMIT 1
"""
                ),
                {"region": resolvedRegion, "template_code": normalizedCode},
            )
        ).mappings().first()
        if duplicateRow:
            raise LeauditException(StatusCodeEnum.HTTP_409_CONFLICT, f"当前地区已存在模板编码 {normalizedCode}")
        categoryName = str(categoryRow["name"] or "未分类")
        # NOTE(review): files are uploaded before the INSERT; nothing in this
        # method removes the uploaded objects if the INSERT or commit fails.
        objectKey = OssPathUtils.BuildContractTemplateKey(
            Region=resolvedRegion,
            CategoryName=categoryName,
            TemplateCode=normalizedCode,
            FileRole="source",
            FileName=File.filename,
        )
        filePath = await self.OssService.UploadBytes(
            ObjectKey=objectKey,
            Content=fileContent,
            ContentType=mimeType,
        )
        pdfPath: str | None = None
        if pdfContent is not None and pdfFileName:
            pdfObjectKey = OssPathUtils.BuildContractTemplateKey(
                Region=resolvedRegion,
                CategoryName=categoryName,
                TemplateCode=normalizedCode,
                FileRole="preview",
                FileName=pdfFileName,
            )
            pdfPath = await self.OssService.UploadBytes(
                ObjectKey=pdfObjectKey,
                Content=pdfContent,
                ContentType=pdfMimeType or "application/pdf",
            )
        createdRow = (
            await session.execute(
                text(
                    """
INSERT INTO contract_templates (
template_code,
title,
category_id,
region,
description,
file_path,
pdf_file_path,
file_format,
original_file_name,
mime_type,
file_size,
pdf_file_size,
is_featured,
created_by,
updated_by,
created_at,
updated_at
) VALUES (
:template_code,
:title,
:category_id,
:region,
:description,
:file_path,
:pdf_file_path,
:file_format,
:original_file_name,
:mime_type,
:file_size,
:pdf_file_size,
:is_featured,
:created_by,
:updated_by,
NOW(),
NOW()
)
RETURNING id
"""
                ),
                {
                    "template_code": normalizedCode,
                    "title": normalizedTitle,
                    "category_id": Body.category_id,
                    "region": resolvedRegion,
                    # A blank description is stored as NULL.
                    "description": (Body.description or "").strip() or None,
                    "file_path": filePath,
                    "pdf_file_path": pdfPath,
                    "file_format": fileExt,
                    "original_file_name": File.filename,
                    "mime_type": mimeType,
                    "file_size": len(fileContent),
                    "pdf_file_size": len(pdfContent) if pdfContent is not None else None,
                    "is_featured": Body.is_featured,
                    "created_by": CurrentUserId,
                    "updated_by": CurrentUserId,
                },
            )
        ).mappings().first()
        await session.commit()
    if not createdRow:
        raise LeauditException(StatusCodeEnum.HTTP_500_INTERNAL_SERVER_ERROR, "合同模板创建失败")
    # Re-read through the regular detail path, which opens its own session
    # and applies the caller's visibility scope.
    detail = await self.GetTemplateDetail(int(createdRow["id"]), CurrentUserId)
    if not detail:
        raise LeauditException(StatusCodeEnum.HTTP_500_INTERNAL_SERVER_ERROR, "合同模板创建成功但详情读取失败")
    return ContractTemplateCreateVO(**detail.model_dump())
async def DeleteTemplate(self, TemplateId: int, CurrentUserId: int) -> None:
    """Soft-delete a template that lies inside the user's writable scope.

    The template is first looked up with the writable region scope applied;
    if found, ``deleted_at`` is set and the audit columns are updated.

    Args:
        TemplateId: Primary key of the template to delete.
        CurrentUserId: Id of the acting user; stored as ``updated_by``.

    Raises:
        LeauditException: 404 when no non-deleted template with this id is
            found inside the user's writable scope.
    """
    async with GetAsyncSession() as session:
        await self._ensureContractTemplateSchema(session)
        user_ctx = await self._getCurrentUserContext(CurrentUserId, session)

        lookup_params: dict[str, Any] = {"template_id": TemplateId}
        scope_sql = " AND ".join(
            self._build_template_scope_filters(
                user_ctx,
                lookup_params,
                requestedRegion=None,
                writable=True,
            )
        )
        lookup_stmt = text(
            f"""
SELECT id
FROM contract_templates t
WHERE t.id = :template_id
AND t.deleted_at IS NULL
AND {scope_sql}
LIMIT 1
"""
        )
        lookup_result = await session.execute(lookup_stmt, lookup_params)
        if not lookup_result.mappings().first():
            raise LeauditException(StatusCodeEnum.HTTP_404_NOT_FOUND, "合同模板不存在或无权删除")

        soft_delete_stmt = text(
            """
UPDATE contract_templates
SET deleted_at = NOW(),
updated_at = NOW(),
updated_by = :updated_by
WHERE id = :template_id
AND deleted_at IS NULL
"""
        )
        await session.execute(
            soft_delete_stmt,
            {"template_id": TemplateId, "updated_by": CurrentUserId},
        )
        await session.commit()
async def _load_search_category_stats(
    self,
    keyword: str,
    requestedRegion: str | None,
    CurrentUserId: int,
) -> list[ContractTemplateSearchCategoryVO]:
    """Count keyword hits per category within the user's visible regions.

    Args:
        keyword: Search text; matched case-insensitively as a substring of
            the template title, description, code and the category name.
        requestedRegion: Optional region requested by the caller.
        CurrentUserId: Id of the user whose visibility scope applies.

    Returns:
        One entry per matching category ordered by name, or an empty list
        when the keyword is blank.
    """
    clean_keyword = (keyword or "").strip()
    if not clean_keyword:
        return []
    async with GetAsyncSession() as session:
        await self._ensureContractTemplateSchema(session)
        currentUser = await self._getCurrentUserContext(CurrentUserId, session)
        params: dict[str, Any] = {"keyword": f"%{clean_keyword}%"}
        scope_filters = self._build_template_scope_filters(currentUser, params, requestedRegion=requestedRegion)
        filters = [
            "c.deleted_at IS NULL",
            "t.deleted_at IS NULL",
            "("
            "t.title ILIKE :keyword "
            "OR COALESCE(t.description, '') ILIKE :keyword "
            "OR COALESCE(t.template_code, '') ILIKE :keyword "
            "OR COALESCE(c.name, '') ILIKE :keyword"
            ")",
            *scope_filters,
        ]
        sql = text(
            f"""
SELECT
c.id,
c.name,
COUNT(t.id)::int AS search_count
FROM contract_categories c
LEFT JOIN contract_templates t ON t.category_id = c.id
WHERE {' AND '.join(filters)}
GROUP BY c.id, c.name
ORDER BY c.name ASC
"""
        )
        # The scope filter may be "t.region IN :visible_regions" with a list
        # value; it must be declared as an expanding bind, as ListTemplates
        # does, or the statement cannot be rendered for non-global users.
        (sql,) = self._bind_expanding(sql, params)
        rows = (await session.execute(sql, params)).mappings().all()
    return [
        ContractTemplateSearchCategoryVO(
            id=int(row["id"]),
            name=str(row["name"] or ""),
            search_count=int(row["search_count"] or 0),
        )
        for row in rows
        if row.get("id") is not None
    ]
def _build_template_filters(
self,
keyword: str | None,
category_id: int | None,
category_name: str | None,
region: str | None,
file_format: str | None,
is_featured: bool | None,
currentUser: dict[str, Any],
) -> tuple[str, dict[str, Any], bool]:
filters = ["t.deleted_at IS NULL", "c.deleted_at IS NULL"]
params: dict[str, Any] = {}
needs_category_name_filter = False
filters.extend(self._build_template_scope_filters(currentUser, params, region))
if category_id is not None:
filters.append("t.category_id = :category_id")
params["category_id"] = category_id
elif category_name:
filters.append("c.name = :category_name")
params["category_name"] = category_name.strip()
needs_category_name_filter = True
if file_format:
filters.append("LOWER(t.file_format) = :file_format")
params["file_format"] = file_format.strip().lower()
if is_featured is not None:
filters.append("COALESCE(t.is_featured, FALSE) = :is_featured")
params["is_featured"] = is_featured
clean_keyword = (keyword or "").strip()
if clean_keyword:
filters.append(
"("
"t.title ILIKE :keyword "
"OR COALESCE(t.description, '') ILIKE :keyword "
"OR COALESCE(t.template_code, '') ILIKE :keyword "
"OR COALESCE(c.name, '') ILIKE :keyword"
")"
)
params["keyword"] = f"%{clean_keyword}%"
needs_category_name_filter = True
return " AND ".join(filters), params, needs_category_name_filter
def _build_template_from_sql(self, needs_category_name_filter: bool) -> str:
_ = needs_category_name_filter
return """
FROM contract_templates t
LEFT JOIN contract_categories c ON c.id = t.category_id
"""
def _build_order_clause(self, sort_by: str | None, sort_order: str | None, default_field: str, default_order: str) -> str:
    """Translate sort options into a safe ORDER BY expression.

    Args:
        sort_by: Requested sort key; looked up (trimmed, lower-cased) in
            ``_ALLOWED_SORT_FIELDS``. Unknown keys use ``default_field``.
        sort_order: Requested direction; anything other than ``desc``
            (case-insensitive) yields ascending order.
        default_field: Whitelisted key used when ``sort_by`` is not allowed.
        default_order: Direction used when ``sort_order`` is empty.

    Returns:
        ``"<column> <ASC|DESC>, t.id ASC"``; the trailing id term is a fixed
        tie-breaker.

    Raises:
        KeyError: If ``default_field`` is not a whitelisted key.
    """
    requested_key = str(sort_by or "").strip().lower()
    fallback_column = _ALLOWED_SORT_FIELDS[default_field]
    column = _ALLOWED_SORT_FIELDS.get(requested_key, fallback_column)

    normalized_order = str(sort_order or default_order).strip().lower()
    if normalized_order == "desc":
        direction = "DESC"
    else:
        direction = "ASC"

    return f"{column} {direction}, t.id ASC"
def _bind_expanding(self, *sql_objects_and_params: Any):
sql_objects = list(sql_objects_and_params[:-1])
params = sql_objects_and_params[-1]
if "visible_regions" in params:
sql_objects = [sql.bindparams(bindparam("visible_regions", expanding=True)) for sql in sql_objects]
return tuple(sql_objects)
def _to_category_vo(self, row: Any) -> ContractTemplateCategoryVO:
    """Convert a category result row into its view object.

    Args:
        row: Mapping-style row; ``id`` and ``name`` are read by index, the
            remaining columns via ``get``.

    Returns:
        The populated ``ContractTemplateCategoryVO``. A missing name becomes
        an empty string, missing numeric columns become 0 and a missing
        ``is_enabled`` column counts as enabled.
    """
    category_id = int(row["id"])
    category_name = str(row["name"] or "")
    sort_position = int(row.get("sort_order") or 0)
    template_total = int(row.get("template_count") or 0)
    enabled = bool(row.get("is_enabled", True))

    return ContractTemplateCategoryVO(
        id=category_id,
        name=category_name,
        icon=row.get("icon"),
        description=row.get("description"),
        sort_order=sort_position,
        template_count=template_total,
        is_enabled=enabled,
    )
def _to_list_item_vo(self, row: Any) -> ContractTemplateListItemVO:
    """Convert a template result row into a list-item view object.

    Args:
        row: Mapping-style row; ``id`` is read by index, every other column
            via ``get``.

    Returns:
        The populated ``ContractTemplateListItemVO``. Falsy text columns
        become ``""`` (the region falls back to ``"省级"``), falsy numeric
        columns become 0, and nullable integer columns stay ``None``.
    """

    def text_of(key: str, fallback: str = "") -> str:
        # Falsy values (None, "") are replaced by the fallback.
        return str(row.get(key) or fallback)

    def optional_int(key: str) -> int | None:
        # Preserve NULL instead of coercing it to 0.
        if row.get(key) is None:
            return None
        return int(row[key])

    return ContractTemplateListItemVO(
        id=int(row["id"]),
        template_code=text_of("template_code"),
        title=text_of("title"),
        category_id=int(row.get("category_id") or 0),
        category_name=row.get("category_name"),
        category_icon=row.get("category_icon"),
        description=row.get("description"),
        region=text_of("region", "省级"),
        file_path=row.get("file_path"),
        pdf_file_path=row.get("pdf_file_path"),
        file_format=text_of("file_format"),
        original_file_name=row.get("original_file_name"),
        mime_type=row.get("mime_type"),
        file_size=int(row.get("file_size") or 0),
        pdf_file_size=optional_int("pdf_file_size"),
        is_featured=bool(row.get("is_featured", False)),
        created_by=optional_int("created_by"),
        updated_by=optional_int("updated_by"),
        created_at=self._stringify_time(row.get("created_at")),
        updated_at=self._stringify_time(row.get("updated_at")),
    )
def _to_detail_vo(self, row: Any) -> ContractTemplateDetailVO:
    """Convert a template result row into a detail view object.

    The list-item conversion supplies the shared fields; the category
    description is added from the row and ``placeholder_schema`` is always
    ``None``.

    Args:
        row: Mapping-style row as accepted by ``_to_list_item_vo``.

    Returns:
        The populated ``ContractTemplateDetailVO``.
    """
    shared_fields = self._to_list_item_vo(row).model_dump()
    category_description = row.get("category_description")
    return ContractTemplateDetailVO(
        **shared_fields,
        category_description=category_description,
        placeholder_schema=None,
    )
def _stringify_time(self, value: Any) -> str | None:
if value is None:
return None
return str(value)
def _build_template_scope_filters(
self,
currentUser: dict[str, Any],
params: dict[str, Any],
requestedRegion: str | None,
writable: bool = False,
) -> list[str]:
requested = (requestedRegion or "").strip()
area = str(currentUser["area"] or "").strip()
if currentUser["is_global"]:
if requested:
params["requested_region"] = requested
return ["t.region = :requested_region"]
return ["1=1"]
if writable:
if not area:
return ["1=0"]
if requested and requested != area:
return ["1=0"]
params["scope_region"] = area
return ["t.region = :scope_region"]
if currentUser["can_manage"]:
if not area:
return ["1=0"]
if requested:
if requested == "省级":
params["requested_region"] = requested
return ["t.region = :requested_region"]
if requested != area:
return ["1=0"]
params["requested_region"] = requested
return ["t.region = :requested_region"]
params["visible_regions"] = ["省级", area]
return ["t.region IN :visible_regions"]
if requested:
if requested == "省级":
params["requested_region"] = requested
return ["t.region = :requested_region"]
if area and requested == area:
params["requested_region"] = requested
return ["t.region = :requested_region"]
return ["1=0"]
if area:
params["visible_regions"] = ["省级", area]
return ["t.region IN :visible_regions"]
params["requested_region"] = "省级"
return ["t.region = :requested_region"]
async def _getCurrentUserContext(self, CurrentUserId: int, session=None) -> dict[str, Any]:
    """Load the area and role flags that drive template visibility.

    Args:
        CurrentUserId: Id of the user to look up.
        session: Optional open session to reuse. When omitted, a session is
            opened for the duration of the lookup and closed afterwards.

    Returns:
        A dict with ``id``, ``area`` (empty string when unset),
        ``is_global`` (user holds ``super_admin`` or ``provincial_admin``)
        and ``can_manage`` (user holds one of those roles or ``admin``).

    Raises:
        LeauditException: 404 when the user does not exist.
    """
    if session is None:
        # Use ``async with`` so that an exception raised by the lookup is
        # handed to the session context manager on exit, rather than
        # signalling a clean exit unconditionally.
        async with GetAsyncSession() as own_session:
            return await self._getCurrentUserContext(CurrentUserId, own_session)

    row = (
        await session.execute(
            text(
                """
SELECT
u.id,
COALESCE(u.area, '') AS area,
COALESCE(bool_or(r.role_key IN ('super_admin', 'provincial_admin')), FALSE) AS is_global,
COALESCE(bool_or(r.role_key IN ('super_admin', 'provincial_admin', 'admin')), FALSE) AS can_manage
FROM sso_users u
LEFT JOIN user_role ur ON ur.user_id = u.id
LEFT JOIN roles r ON r.id = ur.role_id
WHERE u.id = :user_id
GROUP BY u.id, u.area
"""
            ),
            {"user_id": CurrentUserId},
        )
    ).mappings().first()
    if not row:
        raise LeauditException(StatusCodeEnum.HTTP_404_NOT_FOUND, "当前用户不存在")
    return {
        "id": int(row["id"]),
        "area": str(row["area"] or ""),
        "is_global": bool(row["is_global"]),
        "can_manage": bool(row["can_manage"]),
    }
def _resolve_upload_region(self, currentUser: dict[str, Any], requestedRegion: str | None) -> str:
area = str(currentUser["area"] or "").strip()
if currentUser["can_manage"]:
if not area:
raise LeauditException(StatusCodeEnum.HTTP_403_FORBIDDEN, "当前管理员账号未配置地区,无法上传合同模板")
return area
raise LeauditException(StatusCodeEnum.HTTP_403_FORBIDDEN, "当前用户没有上传合同模板权限")
async def _ensureContractTemplateSchema(self, session) -> None:
    """Add missing columns and backfill defaults, once per process.

    The statements are idempotent (``ADD COLUMN IF NOT EXISTS`` plus
    backfills restricted to blank values), but running them is not free:
    every call issues twelve ``ALTER TABLE`` statements, two ``UPDATE``
    statements and a commit, and this method is invoked at the start of
    every service operation. After the first successful run the result is
    remembered on the class and later calls return immediately. A failed
    run is not remembered, so it is retried on the next call.

    NOTE(review): the guard assumes every session of this process targets
    the same database; confirm before using with multiple databases.

    Args:
        session: Open async session used to execute and commit the changes.
    """
    owner = type(self)
    if getattr(owner, "_contractTemplateSchemaReady", False):
        return

    statements = [
        """
ALTER TABLE contract_categories
ADD COLUMN IF NOT EXISTS created_by BIGINT
""",
        """
ALTER TABLE contract_categories
ADD COLUMN IF NOT EXISTS updated_by BIGINT
""",
        """
ALTER TABLE contract_categories
ADD COLUMN IF NOT EXISTS deleted_at TIMESTAMPTZ
""",
        """
ALTER TABLE contract_templates
ADD COLUMN IF NOT EXISTS region VARCHAR(50) NOT NULL DEFAULT '省级'
""",
        """
ALTER TABLE contract_templates
ADD COLUMN IF NOT EXISTS pdf_file_path VARCHAR(500)
""",
        """
ALTER TABLE contract_templates
ADD COLUMN IF NOT EXISTS original_file_name VARCHAR(500) NOT NULL DEFAULT ''
""",
        """
ALTER TABLE contract_templates
ADD COLUMN IF NOT EXISTS mime_type VARCHAR(200)
""",
        """
ALTER TABLE contract_templates
ADD COLUMN IF NOT EXISTS file_size BIGINT NOT NULL DEFAULT 0
""",
        """
ALTER TABLE contract_templates
ADD COLUMN IF NOT EXISTS pdf_file_size BIGINT
""",
        """
ALTER TABLE contract_templates
ADD COLUMN IF NOT EXISTS created_by BIGINT
""",
        """
ALTER TABLE contract_templates
ADD COLUMN IF NOT EXISTS updated_by BIGINT
""",
        """
ALTER TABLE contract_templates
ADD COLUMN IF NOT EXISTS deleted_at TIMESTAMPTZ
""",
    ]
    for statement in statements:
        await session.execute(text(statement))

    # Backfill: rows without a region are assigned the provincial region.
    await session.execute(
        text(
            """
UPDATE contract_templates
SET region = '省级'
WHERE region IS NULL OR BTRIM(region) = ''
"""
        )
    )
    # Backfill: derive a file name from the title and format where missing.
    await session.execute(
        text(
            """
UPDATE contract_templates
SET original_file_name = COALESCE(NULLIF(original_file_name, ''), title || CASE
WHEN file_format IS NOT NULL AND BTRIM(file_format) <> '' THEN '.' || LOWER(file_format)
ELSE ''
END)
WHERE original_file_name IS NULL OR BTRIM(original_file_name) = ''
"""
        )
    )
    await session.commit()

    # Remember success only after the commit went through.
    owner._contractTemplateSchemaReady = True