Files
leaudit-platform-backend/fastapi_modules/fastapi_leaudit/services/impl/contractTemplateServiceImpl.py
T

318 lines
12 KiB
Python

from __future__ import annotations
from typing import Any
from sqlalchemy import bindparam, text
from fastapi_common.fastapi_common_sqlalchemy.database import GetAsyncSession
from fastapi_modules.fastapi_leaudit.domian.Dto.contractTemplateDto import (
ContractTemplateListQueryDTO,
ContractTemplateSearchQueryDTO,
)
from fastapi_modules.fastapi_leaudit.domian.vo.contractTemplateVo import (
ContractTemplateCategoryVO,
ContractTemplateDetailVO,
ContractTemplateListItemVO,
ContractTemplatePageVO,
ContractTemplateSearchCategoryVO,
ContractTemplateSearchResultVO,
)
from fastapi_modules.fastapi_leaudit.services.contractTemplateService import IContractTemplateService
# Whitelist of client-facing sort keys mapped to their qualified column on the
# ``t`` (contract_templates) alias. _build_order_clause looks sort keys up here
# instead of interpolating the caller's value into ORDER BY.
_ALLOWED_SORT_FIELDS = {
    column: f"t.{column}"
    for column in ("id", "title", "created_at", "updated_at")
}
class ContractTemplateServiceImpl(IContractTemplateService):
    """Contract template service implementation.

    Reads contract categories and contract templates with raw SQL executed
    through ``GetAsyncSession`` and maps the result rows onto the
    ``ContractTemplate*VO`` view objects.
    """
async def ListCategories(self, IncludeDisabled: bool, WithTemplateCount: bool) -> list[ContractTemplateCategoryVO]:
    """Return all contract categories ordered by sort order, then name.

    Args:
        IncludeDisabled: Accepted for interface compatibility but not applied:
            the query has no enabled/disabled predicate and reports every
            category with ``is_enabled = TRUE``.
        WithTemplateCount: When true, ``template_count`` is the number of
            templates whose ``category_id`` points at the category. When
            false, ``template_count`` is always 0.

    Returns:
        One ``ContractTemplateCategoryVO`` per category row.
    """
    if WithTemplateCount:
        count_select = "COUNT(t.id)::int AS template_count"
        template_join = "LEFT JOIN contract_templates t ON t.category_id = c.id"
    else:
        # The count is a literal 0 here, so joining and aggregating the
        # templates table would be wasted work.
        count_select = "0::int AS template_count"
        template_join = ""

    # GROUP BY is kept in both variants so the set of category rows returned
    # is the same whether or not the templates table is joined.
    sql = text(
        f"""
        SELECT
            c.id,
            c.name,
            c.icon,
            c.description,
            COALESCE(c.sort_order, 0) AS sort_order,
            {count_select},
            TRUE AS is_enabled
        FROM contract_categories c
        {template_join}
        GROUP BY c.id, c.name, c.icon, c.description, c.sort_order
        ORDER BY COALESCE(c.sort_order, 0) ASC, c.name ASC
        """
    )
    async with GetAsyncSession() as session:
        rows = (await session.execute(sql)).mappings().all()
    return [self._to_category_vo(row) for row in rows]
async def ListTemplates(self, Query: ContractTemplateListQueryDTO) -> ContractTemplatePageVO:
    """Return one page of templates matching the filters carried by ``Query``.

    Two statements share the same FROM/WHERE fragment: a ``COUNT(*)`` for the
    total and a ``LIMIT``/``OFFSET`` select for the requested page. Sorting
    defaults to ``updated_at`` descending.

    Args:
        Query: Filter, paging and sorting options.

    Returns:
        A ``ContractTemplatePageVO`` with the total, the echoed page and page
        size, the page count (0 when nothing matches) and the page's items.
    """
    where_sql, bind_values, references_category = self._build_template_filters(
        keyword=Query.keyword,
        category_id=Query.category_id,
        category_name=Query.category_name,
        file_format=Query.file_format,
        is_featured=Query.is_featured,
    )
    order_by_sql = self._build_order_clause(
        Query.sort_by,
        Query.sort_order,
        default_field="updated_at",
        default_order="desc",
    )
    source_sql = self._build_template_from_sql(references_category)

    # Pages are 1-based; anything below 1 is treated as the first page.
    bind_values["limit"] = Query.page_size
    bind_values["offset"] = max(Query.page - 1, 0) * Query.page_size

    count_stmt = text(
        f"""
        SELECT COUNT(*)
        {source_sql}
        WHERE {where_sql}
        """
    )
    page_stmt = text(
        f"""
        SELECT
            t.id,
            t.template_code,
            t.title,
            t.category_id,
            c.name AS category_name,
            c.icon AS category_icon,
            c.description AS category_description,
            t.description,
            t.file_path,
            t.pdf_file_path,
            t.file_format,
            COALESCE(t.is_featured, FALSE) AS is_featured,
            t.created_at,
            t.updated_at
        {source_sql}
        WHERE {where_sql}
        ORDER BY {order_by_sql}
        LIMIT :limit OFFSET :offset
        """
    )
    count_stmt, page_stmt = self._bind_expanding(count_stmt, page_stmt, bind_values)

    async with GetAsyncSession() as session:
        count_result = await session.execute(count_stmt, bind_values)
        total = int(count_result.scalar_one())
        page_result = await session.execute(page_stmt, bind_values)
        rows = page_result.mappings().all()

    if total:
        # Ceiling division of total by page size, never less than one page.
        total_pages = max((total + Query.page_size - 1) // Query.page_size, 1)
    else:
        total_pages = 0

    items = [self._to_list_item_vo(row) for row in rows]
    return ContractTemplatePageVO(
        total=total,
        page=Query.page,
        page_size=Query.page_size,
        total_pages=total_pages,
        templates=items,
    )
async def SearchTemplates(self, Query: ContractTemplateSearchQueryDTO) -> ContractTemplateSearchResultVO:
    """Run a keyword search and attach per-category hit counts.

    The paged result is produced by ``ListTemplates`` using ``Query.q`` as the
    keyword. The category statistics are computed from ``Query.q`` alone, so
    they are not narrowed by the category filter of the search.

    Args:
        Query: Search text plus optional category, paging and sorting options.

    Returns:
        The page of matching templates together with ``category_stats``.
    """
    page = await self.ListTemplates(
        ContractTemplateListQueryDTO(
            keyword=Query.q,
            category_id=Query.category_id,
            category_name=Query.category_name,
            page=Query.page,
            page_size=Query.page_size,
            sort_by=Query.sort_by,
            sort_order=Query.sort_order,
        )
    )
    stats = await self._load_search_category_stats(Query.q)

    # Copy the paging fields across unchanged and add the statistics.
    paging_fields = {
        name: getattr(page, name)
        for name in ("total", "page", "page_size", "total_pages", "templates")
    }
    return ContractTemplateSearchResultVO(**paging_fields, category_stats=stats)
async def GetTemplateDetail(self, TemplateId: int) -> ContractTemplateDetailVO | None:
    """Load a single template, with its category columns, by primary id.

    Args:
        TemplateId: Value matched against ``contract_templates.id``.

    Returns:
        The detail view object, or ``None`` when no row has that id.
    """
    detail_stmt = text(
        """
        SELECT
            t.id,
            t.template_code,
            t.title,
            t.category_id,
            c.name AS category_name,
            c.icon AS category_icon,
            c.description AS category_description,
            t.description,
            t.file_path,
            t.pdf_file_path,
            t.file_format,
            COALESCE(t.is_featured, FALSE) AS is_featured,
            t.created_at,
            t.updated_at
        FROM contract_templates t
        LEFT JOIN contract_categories c ON c.id = t.category_id
        WHERE t.id = :template_id
        LIMIT 1
        """
    )
    bind_values = {"template_id": TemplateId}
    async with GetAsyncSession() as session:
        result = await session.execute(detail_stmt, bind_values)
        row = result.mappings().first()
    return self._to_detail_vo(row) if row else None
async def _load_search_category_stats(
    self,
    keyword: str,
) -> list[ContractTemplateSearchCategoryVO]:
    """Count, per category, the templates that match a search keyword.

    Categories are left-joined to their templates. A joined row is kept when
    the template title, description or code, or the category name, matches the
    keyword case-insensitively (``ILIKE``) as a substring.

    Args:
        keyword: Raw search text; surrounding whitespace is ignored.

    Returns:
        One entry per category with its match count, ordered by category
        name. An empty list when the keyword is empty or blank.
    """
    term = (keyword or "").strip()
    if not term:
        return []

    keyword_predicate = (
        "("
        "t.title ILIKE :keyword "
        "OR COALESCE(t.description, '') ILIKE :keyword "
        "OR COALESCE(t.template_code, '') ILIKE :keyword "
        "OR COALESCE(c.name, '') ILIKE :keyword"
        ")"
    )
    stats_stmt = text(
        f"""
        SELECT
            c.id,
            c.name,
            COUNT(t.id)::int AS search_count
        FROM contract_categories c
        LEFT JOIN contract_templates t ON t.category_id = c.id
        WHERE {keyword_predicate}
        GROUP BY c.id, c.name
        ORDER BY c.name ASC
        """
    )
    bind_values: dict[str, Any] = {"keyword": f"%{term}%"}
    async with GetAsyncSession() as session:
        result = await session.execute(stats_stmt, bind_values)
        rows = result.mappings().all()

    stats: list[ContractTemplateSearchCategoryVO] = []
    for row in rows:
        # Rows without a category id are skipped.
        if row.get("id") is None:
            continue
        stats.append(
            ContractTemplateSearchCategoryVO(
                id=int(row["id"]),
                name=str(row["name"] or ""),
                search_count=int(row["search_count"] or 0),
            )
        )
    return stats
def _build_template_filters(
    self,
    keyword: str | None,
    category_id: int | None,
    category_name: str | None,
    file_format: str | None,
    is_featured: bool | None,
) -> tuple[str, dict[str, Any], bool]:
    """Build the WHERE fragment and bind parameters for template queries.

    Every text filter is stripped before it is tested, so a value made only
    of whitespace is treated as "no filter" rather than as a match against
    the empty string.

    Args:
        keyword: Substring matched case-insensitively (``ILIKE``) against the
            template title, description and code and the category name.
        category_id: Exact category id. Takes precedence over
            ``category_name`` when both are given.
        category_name: Exact category name, used only when ``category_id`` is
            ``None``.
        file_format: Exact file format, compared in lower case.
        is_featured: Featured flag; a NULL column value counts as ``FALSE``.

    Returns:
        A tuple ``(where_sql, params, needs_category_name_filter)``.
        ``where_sql`` always starts with ``1=1`` so it is valid when no
        filter applies. The flag is true when the clause references the
        ``c`` (category) alias.
    """
    filters = ["1=1"]
    params: dict[str, Any] = {}
    needs_category_name_filter = False

    clean_category_name = (category_name or "").strip()
    if category_id is not None:
        filters.append("t.category_id = :category_id")
        params["category_id"] = category_id
    elif clean_category_name:
        filters.append("c.name = :category_name")
        params["category_name"] = clean_category_name
        needs_category_name_filter = True

    clean_file_format = (file_format or "").strip().lower()
    if clean_file_format:
        filters.append("t.file_format = :file_format")
        params["file_format"] = clean_file_format

    if is_featured is not None:
        filters.append("COALESCE(t.is_featured, FALSE) = :is_featured")
        params["is_featured"] = is_featured

    clean_keyword = (keyword or "").strip()
    if clean_keyword:
        filters.append(
            "("
            "t.title ILIKE :keyword "
            "OR COALESCE(t.description, '') ILIKE :keyword "
            "OR COALESCE(t.template_code, '') ILIKE :keyword "
            "OR COALESCE(c.name, '') ILIKE :keyword"
            ")"
        )
        params["keyword"] = f"%{clean_keyword}%"
        needs_category_name_filter = True

    return " AND ".join(filters), params, needs_category_name_filter
def _build_template_from_sql(self, needs_category_name_filter: bool) -> str:
    """Return the FROM/JOIN fragment used by template queries.

    The category join is always emitted; ``needs_category_name_filter`` is
    accepted but has no effect on the result.
    """
    del needs_category_name_filter  # deliberately unused
    from_clause = """
        FROM contract_templates t
        LEFT JOIN contract_categories c ON c.id = t.category_id
    """
    return from_clause
def _build_order_clause(self, sort_by: str | None, sort_order: str | None, default_field: str, default_order: str) -> str:
    """Build a safe ORDER BY expression from caller-supplied sort options.

    Args:
        sort_by: Requested sort key, matched case-insensitively against
            ``_ALLOWED_SORT_FIELDS``. Unknown or missing keys use
            ``default_field``.
        sort_order: ``"asc"`` or ``"desc"`` in any case. Any other value,
            including blank, uses ``default_order``.
        default_field: Fallback key; must be a key of
            ``_ALLOWED_SORT_FIELDS``.
        default_order: Fallback direction; anything other than ``"desc"``
            means ascending.

    Returns:
        ``"<column> <ASC|DESC>, t.id ASC"``. The trailing ``t.id`` term keeps
        the ordering deterministic when the main column has ties.

    Raises:
        KeyError: If ``default_field`` is not an allowed sort key.
    """
    requested_field = str(sort_by or "").strip().lower()
    # Only whitelisted column expressions ever reach the SQL text.
    field = _ALLOWED_SORT_FIELDS.get(requested_field, _ALLOWED_SORT_FIELDS[default_field])

    direction = str(sort_order or "").strip().lower()
    if direction not in ("asc", "desc"):
        # An unrecognised direction falls back to the default instead of
        # silently becoming ascending.
        direction = str(default_order or "").strip().lower()
    direction_sql = "DESC" if direction == "desc" else "ASC"

    return f"{field} {direction_sql}, t.id ASC"
def _bind_expanding(self, *sql_objects_and_params: Any):
    """Mark ``category_ids`` as an expanding bind parameter when it is used.

    The last positional argument is the bind-parameter mapping; every
    argument before it is a SQL statement object.

    Returns:
        A tuple of the statements in their original order. They are returned
        unchanged unless ``"category_ids"`` is a key of the mapping, in which
        case each one has an expanding ``category_ids`` bindparam attached.
    """
    statements = sql_objects_and_params[:-1]
    params = sql_objects_and_params[-1]
    if "category_ids" not in params:
        return tuple(statements)
    return tuple(
        statement.bindparams(bindparam("category_ids", expanding=True))
        for statement in statements
    )
def _to_category_vo(self, row: Any) -> ContractTemplateCategoryVO:
    """Map one category result row onto ``ContractTemplateCategoryVO``.

    ``id`` and ``name`` must be present in the row. Missing or NULL optional
    values fall back to 0 for the numeric fields and ``True`` for
    ``is_enabled``.
    """
    lookup = row.get
    category_id = int(row["id"])
    category_name = str(row["name"] or "")
    return ContractTemplateCategoryVO(
        id=category_id,
        name=category_name,
        icon=lookup("icon"),
        description=lookup("description"),
        sort_order=int(lookup("sort_order") or 0),
        template_count=int(lookup("template_count") or 0),
        is_enabled=bool(lookup("is_enabled", True)),
    )
def _to_list_item_vo(self, row: Any) -> ContractTemplateListItemVO:
    """Map one template result row onto ``ContractTemplateListItemVO``.

    Text columns that are missing or NULL become ``""``, a missing category
    id becomes 0, and the timestamps are rendered via ``_stringify_time``.
    """
    fields: dict[str, Any] = {
        "id": int(row["id"]),
        "template_code": str(row.get("template_code") or ""),
        "title": str(row.get("title") or ""),
        "category_id": int(row.get("category_id") or 0),
    }
    # These columns are passed through as-is, including None.
    for column in ("category_name", "category_icon", "description", "file_path", "pdf_file_path"):
        fields[column] = row.get(column)
    fields["file_format"] = str(row.get("file_format") or "")
    fields["is_featured"] = bool(row.get("is_featured", False))
    for column in ("created_at", "updated_at"):
        fields[column] = self._stringify_time(row.get(column))
    return ContractTemplateListItemVO(**fields)
def _to_detail_vo(self, row: Any) -> ContractTemplateDetailVO:
    """Map one template result row onto ``ContractTemplateDetailVO``.

    The list-item fields are reused and extended with the category
    description. ``placeholder_schema`` is always ``None``.
    """
    list_fields = self._to_list_item_vo(row).model_dump()
    return ContractTemplateDetailVO(
        **list_fields,
        category_description=row.get("category_description"),
        placeholder_schema=None,
    )
def _stringify_time(self, value: Any) -> str | None:
    """Return ``str(value)``, or ``None`` when ``value`` is ``None``."""
    return None if value is None else str(value)