1142 lines
48 KiB
Python
1142 lines
48 KiB
Python
from __future__ import annotations
|
|
|
|
import json
|
|
import os
|
|
from datetime import datetime
|
|
from decimal import Decimal
|
|
from typing import Any
|
|
from urllib.parse import quote_plus
|
|
|
|
from sqlalchemy import bindparam, text
|
|
from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine
|
|
|
|
from fastapi_admin.config import DB_HOST, DB_PASSWORD, DB_PORT, DB_USER
|
|
from fastapi_common.fastapi_common_sqlalchemy.database import GetAsyncSession
|
|
from fastapi_common.fastapi_common_web.domain.responses import StatusCodeEnum
|
|
from fastapi_common.fastapi_common_web.exception.LeauditException import LeauditException
|
|
from fastapi_modules.fastapi_leaudit.domian.Dto.evaluationPointDto import (
|
|
EvaluationPointCreateDTO,
|
|
EvaluationPointUpdateDTO,
|
|
)
|
|
from fastapi_modules.fastapi_leaudit.domian.vo.evaluationPointVo import (
|
|
AttributeTypeListVO,
|
|
AttributeTypeVO,
|
|
EvaluationPointDeleteVO,
|
|
EvaluationPointListVO,
|
|
EvaluationPointVO,
|
|
)
|
|
from fastapi_modules.fastapi_leaudit.services.evaluationPointService import IEvaluationPointService
|
|
from fastapi_modules.fastapi_leaudit.services.impl.permissionServiceImpl import PermissionServiceImpl
|
|
from fastapi_modules.fastapi_leaudit.services.impl.ssoUserCompat import SsoUserCompat
|
|
from fastapi_modules.fastapi_leaudit.services.impl.tenantResolver import TenantResolver
|
|
|
|
# Name of the legacy rule database. Read from the LEGACY_RULE_DB_NAME
# environment variable, falling back to "docauditai" when it is not set.
_LEGACY_DB_NAME = os.getenv("LEGACY_RULE_DB_NAME", "docauditai")
# asyncpg connection URL for that database. User, password and database name
# are URL-quoted; host and port are interpolated as-is.
_LEGACY_DB_URL = (
    f"postgresql+asyncpg://{quote_plus(str(DB_USER))}:{quote_plus(str(DB_PASSWORD))}"
    f"@{DB_HOST}:{DB_PORT}/{quote_plus(_LEGACY_DB_NAME)}"
)
# Engine created once at import time, with pool_pre_ping enabled.
_LEGACY_ENGINE = create_async_engine(_LEGACY_DB_URL, pool_pre_ping=True)
# Session factory bound to the legacy engine; expire_on_commit is disabled.
_LegacySession = async_sessionmaker(_LEGACY_ENGINE, expire_on_commit=False)
|
|
|
|
|
|
class EvaluationPointServiceImpl(IEvaluationPointService):
    """Evaluation point service implementation."""

    # Permission keys handed to PermissionService.HasAnyPermission when the
    # current user's "can_manage" flag is computed.
    _WRITE_PERMISSION_KEYS: tuple[str, ...] = (
        "evaluation_point:create:write",
        "evaluation_point:update:write",
        "evaluation_point:delete:delete",
    )
|
|
|
|
def __init__(self) -> None:
    """Create the collaborating services and an empty column cache."""
    self.PermissionService = PermissionServiceImpl()
    self.TenantResolver = TenantResolver()
    # Table name -> set of column names; starts empty.
    # NOTE(review): presumably filled by _load_legacy_table_columns, which is
    # not visible in this part of the file - confirm.
    self._legacy_columns_cache: dict[str, set[str]] = {}
|
|
|
|
async def ListPoints(
    self,
    CurrentUserId: int,
    UserArea: str | None,
    UserRole: str | None,
    CurrentTenantCode: str | None,
    CurrentTenantName: str | None,
    Name: str | None,
    Code: str | None,
    Risk: str | None,
    IsEnabled: bool | None,
    GroupPid: int | None,
    GroupId: int | None,
    DocumentAttributeType: str | None,
    FilterArea: str | None,
    FilterTenantCode: str | None,
    FilterTenantName: str | None,
    Page: int,
    PageSize: int,
) -> EvaluationPointListVO:
    """Return one page of evaluation points matching the given filters.

    The WHERE clause and its bind parameters are produced by
    ``_build_list_filters`` and shared by the COUNT query and the page query.
    Rows are ordered by ``sort`` (NULL treated as 0), then most recently
    updated, then highest id.

    Args:
        CurrentUserId, UserArea, UserRole, CurrentTenantCode,
        CurrentTenantName: identity of the caller, passed to
            ``_get_current_user_context``.
        Name, Code, Risk, IsEnabled, GroupPid, GroupId,
        DocumentAttributeType: optional record filters.
        FilterArea, FilterTenantCode, FilterTenantName: optional tenant
            scope requested by the caller.
        Page: 1-based page number; values below 1 behave like page 1.
        PageSize: number of rows per page.

    Returns:
        The page of records together with the total match count.
    """
    row_offset = max(Page - 1, 0) * PageSize

    viewer = await self._get_current_user_context(
        CurrentUserId=CurrentUserId,
        UserArea=UserArea,
        UserRole=UserRole,
        TenantCode=CurrentTenantCode,
        TenantName=CurrentTenantName,
    )
    scope = await self._resolve_requested_scope(
        Area=FilterArea,
        TenantCode=FilterTenantCode,
        TenantName=FilterTenantName,
        Source="evaluation_point_list",
    )
    filter_columns = await self._load_legacy_table_columns("evaluation_points")
    where_sql, binds = self._build_list_filters(
        Name=Name,
        Code=Code,
        Risk=Risk,
        IsEnabled=IsEnabled,
        GroupPid=GroupPid,
        GroupId=GroupId,
        DocumentAttributeType=DocumentAttributeType,
        RequestedScope=scope,
        CurrentUser=viewer,
        EvaluationPointColumns=filter_columns,
    )
    binds["limit"] = PageSize
    binds["offset"] = row_offset

    async with _LegacySession() as session:
        select_columns = await self._load_legacy_table_columns("evaluation_points", Session=session)
        tenant_code_sql = self._evaluation_point_tenant_code_select(select_columns, alias="ep")
        tenant_name_sql = self._evaluation_point_tenant_name_select(select_columns, alias="ep")

        count_stmt = text(
            f"""
            SELECT COUNT(*)
            FROM evaluation_points ep
            LEFT JOIN evaluation_point_groups child_group ON child_group.id = ep.evaluation_point_groups_id
            LEFT JOIN evaluation_point_groups parent_group ON parent_group.id = COALESCE(ep.evaluation_point_groups_pid, child_group.pid)
            WHERE {where_sql}
            """
        )
        page_stmt = text(
            f"""
            SELECT
                ep.id,
                ep.code,
                ep.name,
                ep.evaluation_point_groups_id,
                ep.evaluation_point_groups_pid,
                ep.risk,
                ep.description,
                ep.is_enabled,
                ep.document_attribute_type,
                ep.references_laws,
                ep.extraction_config,
                ep.evaluation_config,
                ep.pass_message,
                ep.fail_message,
                ep.suggestion_message,
                ep.suggestion_message_type,
                ep.post_action,
                ep.action_config,
                ep.score,
                ep.area,
                {tenant_code_sql},
                {tenant_name_sql},
                ep.created_at,
                ep.updated_at,
                child_group.name AS group_name,
                parent_group.name AS rule_type
            FROM evaluation_points ep
            LEFT JOIN evaluation_point_groups child_group ON child_group.id = ep.evaluation_point_groups_id
            LEFT JOIN evaluation_point_groups parent_group ON parent_group.id = COALESCE(ep.evaluation_point_groups_pid, child_group.pid)
            WHERE {where_sql}
            ORDER BY COALESCE(ep.sort, 0) ASC, ep.updated_at DESC, ep.id DESC
            LIMIT :limit OFFSET :offset
            """
        )

        # The count statement receives the same bind dict; it simply has no
        # :limit / :offset placeholders.
        count_result = await session.execute(count_stmt, binds)
        total = int(count_result.scalar_one())
        page_result = await session.execute(page_stmt, binds)
        rows = page_result.mappings().all()

    items = []
    for row in rows:
        items.append(await self._to_point_vo(row))
    return EvaluationPointListVO(
        data=items,
        total=total,
        page=Page,
        page_size=PageSize,
    )
|
|
|
|
async def GetPoint(
    self,
    CurrentUserId: int,
    UserArea: str | None,
    UserRole: str | None,
    TenantCode: str | None,
    TenantName: str | None,
    PointId: int,
) -> EvaluationPointVO:
    """Fetch a single evaluation point on behalf of the current user.

    Raises:
        LeauditException: NOT_FOUND when ``_get_point_row`` yields no row
            for ``PointId`` and this user.
    """
    viewer = await self._get_current_user_context(
        CurrentUserId=CurrentUserId,
        UserArea=UserArea,
        UserRole=UserRole,
        TenantCode=TenantCode,
        TenantName=TenantName,
    )
    record = await self._get_point_row(PointId=PointId, CurrentUser=viewer)
    if record:
        return await self._to_point_vo(record)
    raise LeauditException(StatusCodeEnum.NOT_FOUND, "评查点不存在")
|
|
|
|
async def CreatePoint(
    self,
    CurrentUserId: int,
    UserArea: str | None,
    UserRole: str | None,
    TenantCode: str | None,
    TenantName: str | None,
    Body: EvaluationPointCreateDTO,
) -> EvaluationPointVO:
    """Insert a new evaluation point and return it as stored.

    Steps: resolve the caller, check write permission, validate the group
    relation and code uniqueness, resolve the tenant scope the record is
    written to, INSERT, then re-read the record through ``GetPoint``.

    Args:
        CurrentUserId, UserArea, UserRole, TenantCode, TenantName: identity
            of the caller.
        Body: field values for the new record, including the requested
            ``area`` / ``tenant_code`` / ``tenant_name``.

    Returns:
        The newly created evaluation point.

    Raises:
        LeauditException: when the caller may not write, the group relation
            is invalid, the code already exists, or the requested scope is
            rejected by ``_resolve_writable_scope``.
    """
    current_user = await self._get_current_user_context(
        CurrentUserId=CurrentUserId,
        UserArea=UserArea,
        UserRole=UserRole,
        TenantCode=TenantCode,
        TenantName=TenantName,
    )
    # Authorise before running any validation query, so a caller without
    # write permission cannot probe groups or existing codes.
    self._assert_can_write(current_user)
    await self._validate_group_relation(Body.evaluation_point_groups_pid, Body.evaluation_point_groups_id)
    await self._ensure_code_unique(str(Body.code).strip())

    # created_at / updated_at are already set to this timestamp by
    # _build_write_params; only "area" needs to be overridden below.
    insert_params = self._build_write_params(Body, datetime.utcnow())
    requested_scope = await self._resolve_requested_scope(
        Area=Body.area,
        TenantCode=Body.tenant_code,
        TenantName=Body.tenant_name,
        Source="evaluation_point_create",
    )
    writable_scope = self._resolve_writable_scope(
        current_user,
        RequestedScope=requested_scope,
    )
    insert_params["area"] = writable_scope["area"]

    insert_fields = [
        "code",
        "name",
        "evaluation_point_groups_id",
        "evaluation_point_groups_pid",
        "risk",
        "description",
        "is_enabled",
        "document_attribute_type",
        "references_laws",
        "extraction_config",
        "evaluation_config",
        "pass_message",
        "fail_message",
        "suggestion_message",
        "suggestion_message_type",
        "post_action",
        "action_config",
        "score",
        "area",
        "created_at",
        "updated_at",
    ]
    # These parameters are bound as JSON text and cast to jsonb by the database.
    jsonb_fields = {"references_laws", "extraction_config", "evaluation_config"}

    async with _LegacySession() as session:
        evaluation_point_columns = await self._load_legacy_table_columns("evaluation_points", Session=session)
        # Tenant columns are optional in the legacy schema; write them only
        # when the table actually has them.
        for tenant_field in ("tenant_code", "tenant_name"):
            if tenant_field in evaluation_point_columns:
                insert_params[tenant_field] = writable_scope[tenant_field]
                insert_fields.append(tenant_field)

        # Derive the VALUES list from the column list so the two cannot drift.
        insert_values = [
            f"CAST(:{field} AS jsonb)" if field in jsonb_fields else f":{field}"
            for field in insert_fields
        ]
        new_id = await session.scalar(
            text(
                f"""
                INSERT INTO evaluation_points (
                    {", ".join(insert_fields)}
                ) VALUES (
                    {", ".join(insert_values)}
                )
                RETURNING id
                """
            ),
            insert_params,
        )
        # Single explicit commit. This works whether or not the column
        # lookup above already auto-began a transaction on this session,
        # and replaces the previous session.begin() + commit() double commit.
        # On an exception the session context manager closes the session,
        # which discards the uncommitted work.
        await session.commit()

    return await self.GetPoint(CurrentUserId, UserArea, UserRole, TenantCode, TenantName, int(new_id))
|
|
|
|
async def UpdatePoint(
    self,
    CurrentUserId: int,
    UserArea: str | None,
    UserRole: str | None,
    TenantCode: str | None,
    TenantName: str | None,
    PointId: int,
    Body: EvaluationPointUpdateDTO,
) -> EvaluationPointVO:
    """Apply a partial update to an evaluation point and return the result.

    Only fields explicitly set on ``Body`` are written. ``updated_at`` is
    always refreshed when at least one field is set.

    Args:
        CurrentUserId, UserArea, UserRole, TenantCode, TenantName: identity
            of the caller.
        PointId: id of the record to update.
        Body: the fields to change.

    Returns:
        The evaluation point after the update (or unchanged, when ``Body``
        sets no field).

    Raises:
        LeauditException: when the caller may not write, the record is not
            found for this caller, the group relation is invalid, the new
            code already exists, or the requested scope is rejected.
    """
    current_user = await self._get_current_user_context(
        CurrentUserId=CurrentUserId,
        UserArea=UserArea,
        UserRole=UserRole,
        TenantCode=TenantCode,
        TenantName=TenantName,
    )
    # Same permission gate as DeletePoint. Previously the check only ran
    # when the payload touched area / tenant fields.
    self._assert_can_write(current_user)
    # Raises NOT_FOUND when the record is missing or not visible; the result
    # is reused below instead of being fetched again.
    current = await self.GetPoint(CurrentUserId, UserArea, UserRole, TenantCode, TenantName, PointId)

    payload = Body.model_dump(exclude_unset=True)
    if not payload:
        return current

    group_pid = payload.get("evaluation_point_groups_pid")
    group_id = payload.get("evaluation_point_groups_id")
    if group_pid is not None or group_id is not None:
        # Validate the combination that will exist after the update: new
        # values where supplied, stored values otherwise.
        await self._validate_group_relation(
            group_pid if group_pid is not None else current.evaluation_point_groups_pid,
            group_id if group_id is not None else current.evaluation_point_groups_id,
        )

    if payload.get("code"):
        await self._ensure_code_unique(str(payload["code"]).strip(), PointId)

    updates: list[str] = []
    params: dict[str, Any] = {"point_id": PointId, "updated_at": datetime.utcnow()}
    simple_fields = (
        "code",
        "name",
        "evaluation_point_groups_id",
        "evaluation_point_groups_pid",
        "risk",
        "description",
        "is_enabled",
        "document_attribute_type",
        "pass_message",
        "fail_message",
        "suggestion_message",
        "suggestion_message_type",
        "post_action",
        "action_config",
        "score",
    )
    json_fields = ("references_laws", "extraction_config", "evaluation_config")

    for field in simple_fields:
        if field in payload:
            params[field] = self._normalize_scalar_field(field, payload[field])
            updates.append(f"{field} = :{field}")

    # Stays None unless the payload asks for a scope change. This replaces
    # the former '"writable_scope" in locals()' probe.
    writable_scope: dict[str, str] | None = None
    if any(key in payload for key in ("area", "tenant_code", "tenant_name")):
        requested_scope = await self._resolve_requested_scope(
            Area=payload.get("area"),
            TenantCode=payload.get("tenant_code"),
            TenantName=payload.get("tenant_name"),
            Source="evaluation_point_update",
        )
        writable_scope = self._resolve_writable_scope(
            current_user,
            RequestedScope=requested_scope,
        )
        params["area"] = writable_scope["area"]
        updates.append("area = :area")

    for field in json_fields:
        if field in payload:
            # An explicit null resets the column to the field's default JSON.
            json_value = payload[field] if payload[field] is not None else self._default_json(field)
            params[field] = json.dumps(json_value, ensure_ascii=False)
            updates.append(f"{field} = CAST(:{field} AS jsonb)")

    updates.append("updated_at = :updated_at")

    async with _LegacySession() as session:
        evaluation_point_columns = await self._load_legacy_table_columns("evaluation_points", Session=session)
        if writable_scope is not None:
            # Tenant columns are optional in the legacy schema.
            for tenant_field in ("tenant_code", "tenant_name"):
                if tenant_field in evaluation_point_columns:
                    params[tenant_field] = writable_scope[tenant_field]
                    updates.append(f"{tenant_field} = :{tenant_field}")
        await session.execute(
            text(f"UPDATE evaluation_points SET {', '.join(updates)} WHERE id = :point_id"),
            params,
        )
        # Single explicit commit; works whether or not the column lookup
        # above already auto-began a transaction on this session.
        await session.commit()

    return await self.GetPoint(CurrentUserId, UserArea, UserRole, TenantCode, TenantName, PointId)
|
|
|
|
async def DeletePoint(
    self,
    CurrentUserId: int,
    UserArea: str | None,
    UserRole: str | None,
    TenantCode: str | None,
    TenantName: str | None,
    PointId: int,
) -> EvaluationPointDeleteVO:
    """Delete one evaluation point.

    Args:
        CurrentUserId, UserArea, UserRole, TenantCode, TenantName: identity
            of the caller.
        PointId: id of the record to delete.

    Returns:
        A success result object.

    Raises:
        LeauditException: when the caller may not write, or when ``GetPoint``
            does not find the record for this caller.
    """
    current_user = await self._get_current_user_context(
        CurrentUserId=CurrentUserId,
        UserArea=UserArea,
        UserRole=UserRole,
        TenantCode=TenantCode,
        TenantName=TenantName,
    )
    self._assert_can_write(current_user)
    # Existence / visibility check; raises NOT_FOUND before anything is deleted.
    await self.GetPoint(CurrentUserId, UserArea, UserRole, TenantCode, TenantName, PointId)

    async with _LegacySession() as session:
        await session.execute(
            text("DELETE FROM evaluation_points WHERE id = :point_id"),
            {"point_id": PointId},
        )
        # One explicit commit instead of session.begin() plus commit().
        await session.commit()
    return EvaluationPointDeleteVO(success=True, message="评查点删除成功")
|
|
|
|
async def GetAttributeTypes(self) -> AttributeTypeListVO:
    """List the distinct document attribute types stored on evaluation points.

    Blank and NULL values are excluded and the remainder is sorted ascending.
    An ``ALL`` entry labelled "通用" is put first when the stored values do
    not already contain the code ``ALL``.
    """
    distinct_types_stmt = text(
        """
        SELECT DISTINCT TRIM(document_attribute_type) AS code
        FROM evaluation_points
        WHERE document_attribute_type IS NOT NULL
        AND TRIM(document_attribute_type) <> ''
        ORDER BY TRIM(document_attribute_type) ASC
        """
    )
    async with _LegacySession() as session:
        result = await session.execute(distinct_types_stmt)
        stored_codes = result.scalars().all()

    types = []
    for stored in stored_codes:
        if stored:
            types.append(AttributeTypeVO(code=str(stored), label=str(stored)))

    if all(entry.code != "ALL" for entry in types):
        types.insert(0, AttributeTypeVO(code="ALL", label="通用"))
    return AttributeTypeListVO(types=types)
|
|
|
|
def _build_list_filters(
|
|
self,
|
|
Name: str | None,
|
|
Code: str | None,
|
|
Risk: str | None,
|
|
IsEnabled: bool | None,
|
|
GroupPid: int | None,
|
|
GroupId: int | None,
|
|
DocumentAttributeType: str | None,
|
|
RequestedScope: dict[str, str],
|
|
CurrentUser: dict[str, Any],
|
|
EvaluationPointColumns: set[str],
|
|
) -> tuple[str, dict[str, Any]]:
|
|
filters = ["1=1"]
|
|
params: dict[str, Any] = {}
|
|
|
|
name = (Name or "").strip()
|
|
code = (Code or "").strip()
|
|
if name and code:
|
|
if name == code:
|
|
filters.append("(ep.name ILIKE :keyword OR ep.code ILIKE :keyword)")
|
|
params["keyword"] = f"%{name}%"
|
|
else:
|
|
filters.append("ep.name ILIKE :name")
|
|
filters.append("ep.code ILIKE :code")
|
|
params["name"] = f"%{name}%"
|
|
params["code"] = f"%{code}%"
|
|
elif name:
|
|
filters.append("ep.name ILIKE :name")
|
|
params["name"] = f"%{name}%"
|
|
elif code:
|
|
filters.append("ep.code ILIKE :code")
|
|
params["code"] = f"%{code}%"
|
|
|
|
if Risk:
|
|
filters.append("ep.risk = :risk")
|
|
params["risk"] = Risk
|
|
if IsEnabled is not None:
|
|
filters.append("ep.is_enabled = :is_enabled")
|
|
params["is_enabled"] = IsEnabled
|
|
if GroupPid is not None:
|
|
filters.append("ep.evaluation_point_groups_pid = :group_pid")
|
|
params["group_pid"] = GroupPid
|
|
if GroupId is not None:
|
|
filters.append("ep.evaluation_point_groups_id = :group_id")
|
|
params["group_id"] = GroupId
|
|
if DocumentAttributeType:
|
|
if DocumentAttributeType == "ALL":
|
|
filters.append("(ep.document_attribute_type = 'ALL' OR ep.document_attribute_type = '通用' OR ep.document_attribute_type IS NULL OR TRIM(ep.document_attribute_type) = '')")
|
|
else:
|
|
filters.append("ep.document_attribute_type = :document_attribute_type")
|
|
params["document_attribute_type"] = DocumentAttributeType
|
|
filters.extend(
|
|
self._build_scope_filters(
|
|
CurrentUser=CurrentUser,
|
|
Params=params,
|
|
RequestedScope=RequestedScope,
|
|
EvaluationPointColumns=EvaluationPointColumns,
|
|
)
|
|
)
|
|
|
|
return " AND ".join(filters), params
|
|
|
|
async def _validate_group_relation(self, GroupPid: int | None, GroupId: int | None) -> None:
    """Check that a parent group and a child group exist and belong together.

    Raises:
        LeauditException: BAD_REQUEST when either id is missing or falsy,
            when either group row does not exist, or when the child's
            ``pid`` differs from ``GroupPid``.
    """
    if not GroupPid:
        raise LeauditException(StatusCodeEnum.BAD_REQUEST, "评查点类型不能为空")
    if not GroupId:
        raise LeauditException(StatusCodeEnum.BAD_REQUEST, "所属规则组不能为空")

    parent_stmt = text("SELECT id, pid, is_enabled FROM evaluation_point_groups WHERE id = :group_pid")
    child_stmt = text("SELECT id, pid, is_enabled FROM evaluation_point_groups WHERE id = :group_id")
    async with _LegacySession() as session:
        parent_result = await session.execute(parent_stmt, {"group_pid": GroupPid})
        parent_row = parent_result.mappings().first()
        child_result = await session.execute(child_stmt, {"group_id": GroupId})
        child_row = child_result.mappings().first()

    if not parent_row:
        raise LeauditException(StatusCodeEnum.BAD_REQUEST, "评查点类型不存在")
    if not child_row:
        raise LeauditException(StatusCodeEnum.BAD_REQUEST, "所属规则组不存在")

    # A NULL pid on the child is compared as 0.
    child_parent_id = int(child_row.get("pid") or 0)
    if child_parent_id != int(GroupPid):
        raise LeauditException(StatusCodeEnum.BAD_REQUEST, "所属规则组与评查点类型不匹配")
|
|
|
|
async def _ensure_code_unique(self, Code: str, PointId: int | None = None) -> None:
    """Reject a code that is already used by another evaluation point.

    The comparison is case-insensitive.

    Args:
        Code: the code to check.
        PointId: when given, the record with this id is excluded from the
            check (used when updating that record).

    Raises:
        LeauditException: BAD_REQUEST when a matching record exists.
    """
    params: dict[str, Any] = {"code": Code}
    sql = "SELECT id FROM evaluation_points WHERE LOWER(code) = LOWER(:code)"
    if PointId is not None:
        sql += " AND id <> :point_id"
        params["point_id"] = PointId
    # Several stored codes may differ only by case, so more than one row can
    # match. One row is enough to answer the question; without the LIMIT,
    # scalar_one_or_none() would raise MultipleResultsFound.
    sql += " LIMIT 1"

    async with _LegacySession() as session:
        result = await session.execute(text(sql), params)
        duplicate_id = result.scalars().first()

    # Compare against None so that an id of 0 still counts as a match.
    if duplicate_id is not None:
        raise LeauditException(StatusCodeEnum.BAD_REQUEST, "评查点编码已存在")
|
|
|
|
def _build_write_params(self, Body: EvaluationPointCreateDTO, Now: datetime) -> dict[str, Any]:
|
|
return {
|
|
"code": str(Body.code).strip(),
|
|
"name": str(Body.name).strip(),
|
|
"evaluation_point_groups_id": Body.evaluation_point_groups_id,
|
|
"evaluation_point_groups_pid": Body.evaluation_point_groups_pid,
|
|
"risk": Body.risk,
|
|
"description": Body.description or "",
|
|
"is_enabled": bool(Body.is_enabled),
|
|
"document_attribute_type": self._normalize_document_attribute_type(Body.document_attribute_type),
|
|
"references_laws": json.dumps(Body.references_laws or self._default_json("references_laws"), ensure_ascii=False),
|
|
"extraction_config": json.dumps(Body.extraction_config or self._default_json("extraction_config"), ensure_ascii=False),
|
|
"evaluation_config": json.dumps(Body.evaluation_config or self._default_json("evaluation_config"), ensure_ascii=False),
|
|
"pass_message": Body.pass_message or "",
|
|
"fail_message": Body.fail_message or "",
|
|
"suggestion_message": Body.suggestion_message or "",
|
|
"suggestion_message_type": Body.suggestion_message_type or "warning",
|
|
"post_action": Body.post_action or "none",
|
|
"action_config": Body.action_config or "",
|
|
"score": float(Body.score or 0),
|
|
"area": None,
|
|
"created_at": Now,
|
|
"updated_at": Now,
|
|
}
|
|
|
|
async def _to_point_vo(self, row: dict[str, Any]) -> EvaluationPointVO:
    """Convert one evaluation point row into an ``EvaluationPointVO``.

    Missing or empty text columns become empty strings (or the documented
    default), JSON columns are parsed with a per-field default, and tenant
    code / name are taken from ``_resolve_record_tenant`` with the row's own
    values as fallback for the name.
    """

    def text_of(column: str, fallback: str = "") -> str:
        return str(row.get(column) or fallback)

    def json_of(column: str) -> Any:
        return self._parse_json(row.get(column), self._default_json(column))

    child_group_id = row.get("evaluation_point_groups_id")
    parent_group_id = row.get("evaluation_point_groups_pid")
    tenant = await self._resolve_record_tenant(
        Area=row.get("area"),
        TenantCode=row.get("tenant_code"),
        TenantName=row.get("tenant_name"),
    )

    return EvaluationPointVO(
        id=int(row["id"]),
        code=text_of("code"),
        name=text_of("name"),
        evaluation_point_groups_id=None if child_group_id is None else int(child_group_id),
        evaluation_point_groups_pid=None if parent_group_id is None else int(parent_group_id),
        ruleType=text_of("rule_type"),
        groupName=text_of("group_name"),
        groupId="" if child_group_id is None else str(child_group_id),
        risk=text_of("risk"),
        description=text_of("description"),
        is_enabled=bool(row.get("is_enabled")),
        document_attribute_type=self._normalize_document_attribute_type(row.get("document_attribute_type")),
        references_laws=json_of("references_laws"),
        extraction_config=json_of("extraction_config"),
        evaluation_config=json_of("evaluation_config"),
        pass_message=text_of("pass_message"),
        fail_message=text_of("fail_message"),
        suggestion_message=text_of("suggestion_message"),
        suggestion_message_type=text_of("suggestion_message_type", "warning"),
        post_action=text_of("post_action", "none"),
        action_config=text_of("action_config"),
        score=self._normalize_score(row.get("score")),
        area=text_of("area"),
        tenantCode=tenant.tenant_code or "",
        # Prefer the resolved name; otherwise the stored name, then the area.
        tenantName=tenant.tenant_name or str(row.get("tenant_name") or row.get("area") or ""),
        created_at=self._format_datetime(row.get("created_at")),
        updated_at=self._format_datetime(row.get("updated_at")),
    )
|
|
|
|
def _parse_json(self, value: Any, default: Any) -> Any:
|
|
if value is None:
|
|
return default
|
|
if isinstance(value, (dict, list)):
|
|
return value
|
|
if isinstance(value, str):
|
|
try:
|
|
return json.loads(value)
|
|
except json.JSONDecodeError:
|
|
return default
|
|
return value
|
|
|
|
def _format_datetime(self, value: Any) -> str | None:
|
|
if value is None:
|
|
return None
|
|
if isinstance(value, datetime):
|
|
return value.isoformat()
|
|
return str(value)
|
|
|
|
def _normalize_document_attribute_type(self, value: Any) -> str:
|
|
normalized = str(value or "").strip()
|
|
if not normalized:
|
|
return "通用"
|
|
if normalized == "ALL":
|
|
return "通用"
|
|
return normalized
|
|
|
|
def _normalize_score(self, value: Any) -> float:
|
|
if value is None:
|
|
return 0
|
|
if isinstance(value, Decimal):
|
|
return float(value)
|
|
if isinstance(value, (int, float)):
|
|
return float(value)
|
|
try:
|
|
return float(value)
|
|
except (TypeError, ValueError):
|
|
return 0
|
|
|
|
def _normalize_scalar_field(self, field: str, value: Any) -> Any:
|
|
if field in {"code", "name"}:
|
|
return str(value or "").strip()
|
|
if field == "document_attribute_type":
|
|
return self._normalize_document_attribute_type(value)
|
|
if field == "score":
|
|
return self._normalize_score(value)
|
|
return value
|
|
|
|
async def _resolve_area_value(
    self,
    *,
    Area: str | None,
    TenantCode: str | None,
    TenantName: str | None,
    Source: str,
) -> str | None:
    """Resolve an area value to its normalised text via the tenant resolver.

    The first non-empty of the resolved tenant name, the resolved normalised
    value and the trimmed input area is used.

    Returns:
        The trimmed result, or ``None`` when it is empty.
    """
    preferred_code = str(TenantCode or "").strip() or None
    resolved = await self.TenantResolver.Resolve(
        RawValue=Area,
        Source=Source,
        PreferredTenantCode=preferred_code,
        FallbackTenantName=TenantName,
    )
    candidate = resolved.tenant_name or resolved.normalized_value or str(Area or "").strip()
    cleaned = candidate.strip()
    return cleaned if cleaned else None
|
|
|
|
async def _resolve_requested_scope(
    self,
    *,
    Area: str | None,
    TenantCode: str | None,
    TenantName: str | None,
    Source: str,
) -> dict[str, str]:
    """Resolve the tenant scope a caller asked for.

    Returns:
        A dict with ``tenant_code``, ``tenant_name`` and ``normalized_value``,
        each a trimmed string (possibly empty). Resolver results take
        precedence over the raw inputs. When the scope is recognised by
        ``_normalize_shared_writable_scope``, its code and name replace the
        resolved ones while ``normalized_value`` is kept.
    """
    preferred_code = str(TenantCode or "").strip() or None
    resolved = await self.TenantResolver.Resolve(
        RawValue=Area,
        Source=Source,
        PreferredTenantCode=preferred_code,
        FallbackTenantName=TenantName,
    )
    scope = {
        "tenant_code": str(resolved.tenant_code or TenantCode or "").strip(),
        "tenant_name": str(resolved.tenant_name or TenantName or "").strip(),
        "normalized_value": str(resolved.normalized_value or Area or "").strip(),
    }

    shared = self._normalize_shared_writable_scope(scope)
    if not shared:
        return scope
    return {
        "tenant_code": shared["tenant_code"],
        "tenant_name": shared["tenant_name"],
        "normalized_value": scope["normalized_value"],
    }
|
|
|
|
async def _resolve_record_tenant(
    self,
    Area: str | None,
    TenantCode: str | None = None,
    TenantName: str | None = None,
):
    """Resolve the tenant of a stored record through the tenant resolver.

    The tenant code is trimmed and passed as the preferred code (``None``
    when empty); the tenant name is passed as the fallback name. The
    resolver's result is returned as-is.
    """
    preferred_code = str(TenantCode or "").strip() or None
    resolution = await self.TenantResolver.Resolve(
        RawValue=Area,
        Source="evaluation_point_record",
        PreferredTenantCode=preferred_code,
        FallbackTenantName=TenantName,
    )
    return resolution
|
|
|
|
async def _get_current_user_context(
    self,
    *,
    CurrentUserId: int,
    UserArea: str | None,
    UserRole: str | None,
    TenantCode: str | None,
    TenantName: str | None,
) -> dict[str, Any]:
    """Build the context dict describing the current user.

    The user is looked up in ``sso_users``. When a row exists, its area and
    tenant columns are used and ``is_global`` is true if any of the user's
    roles has ``data_scope = 'ALL'``. When no row exists, the values passed
    in by the caller are used instead and ``is_global`` is false.

    Returns:
        A dict with the keys ``id``, ``area``, ``tenant_code``,
        ``tenant_name``, ``tenant_scope_value``, ``is_global`` and
        ``can_manage``.
    """
    async with GetAsyncSession() as session:
        user_columns = await SsoUserCompat.get_columns(session)
        tenant_code_sql = SsoUserCompat.optional_coalesce_as(
            user_columns,
            alias="u",
            column="tenant_code",
            fallback_sql="''",
        )
        tenant_name_sql = SsoUserCompat.optional_coalesce_as(
            user_columns,
            alias="u",
            column="tenant_name",
            fallback_sql="''",
        )
        user_stmt = text(
            f"""
            SELECT
                u.id,
                COALESCE(u.area, '') AS area,
                {tenant_code_sql},
                {tenant_name_sql},
                COALESCE(bool_or(COALESCE(r.data_scope, '') = 'ALL'), FALSE) AS is_global
            FROM sso_users u
            LEFT JOIN user_role ur ON ur.user_id = u.id
            LEFT JOIN roles r ON r.id = ur.role_id
            WHERE u.id = :user_id
            GROUP BY u.id, u.area
            """
        )
        result = await session.execute(user_stmt, {"user_id": CurrentUserId})
        user_row = result.mappings().first()

    # Collect the branch-specific inputs first; the resolver call, permission
    # lookup and result dict are then the same for both cases.
    if user_row:
        area_text = str(user_row["area"] or "")
        fallback_code = str(user_row.get("tenant_code") or "") or None
        fallback_name = str(user_row.get("tenant_name") or "") or None
        resolver_kwargs = {
            "Area": area_text,
            "TenantCode": fallback_code,
            "TenantName": fallback_name,
            "Source": "evaluation_point_user_context",
        }
        raw_id = user_row["id"]
        raw_is_global = user_row["is_global"]
    else:
        area_text = str(UserArea or "")
        fallback_code = str(TenantCode or "").strip() or None
        fallback_name = str(TenantName or "").strip() or None
        # The resolver receives the caller's values untouched in this case.
        resolver_kwargs = {
            "Area": UserArea,
            "TenantCode": TenantCode,
            "TenantName": TenantName,
            "Source": "evaluation_point_user_context_fallback",
        }
        raw_id = CurrentUserId
        raw_is_global = False

    tenant = await self.TenantResolver.ResolveUserContext(**resolver_kwargs)
    can_manage = await self.PermissionService.HasAnyPermission(CurrentUserId, list(self._WRITE_PERMISSION_KEYS))
    return {
        "id": int(raw_id),
        "area": area_text,
        "tenant_code": tenant.tenant_code or fallback_code,
        "tenant_name": tenant.tenant_name or fallback_name,
        "tenant_scope_value": tenant.tenant_name or tenant.normalized_value or area_text,
        "is_global": bool(raw_is_global),
        "can_manage": can_manage,
    }
|
|
|
|
def _build_scope_filters(
|
|
self,
|
|
*,
|
|
CurrentUser: dict[str, Any],
|
|
Params: dict[str, Any],
|
|
RequestedScope: dict[str, str],
|
|
EvaluationPointColumns: set[str],
|
|
) -> list[str]:
|
|
current_tenant_code = str(CurrentUser.get("tenant_code") or "").strip()
|
|
current_tenant_name = str(CurrentUser.get("tenant_name") or CurrentUser.get("tenant_scope_value") or CurrentUser.get("area") or "").strip()
|
|
requested_tenant_code = str(RequestedScope.get("tenant_code") or "").strip()
|
|
requested_tenant_name = str(
|
|
RequestedScope.get("tenant_name") or RequestedScope.get("normalized_value") or ""
|
|
).strip()
|
|
shared_scope_codes = {"PUBLIC", "PROVINCIAL"}
|
|
shared_scope_names = {"公共", "省级"}
|
|
|
|
if CurrentUser.get("is_global"):
|
|
if requested_tenant_code or requested_tenant_name:
|
|
return [
|
|
self._tenant_scope_match_sql(
|
|
alias="ep",
|
|
columns=EvaluationPointColumns,
|
|
params=Params,
|
|
prefix="requested_scope",
|
|
tenant_code=requested_tenant_code or None,
|
|
tenant_name=requested_tenant_name or requested_tenant_code,
|
|
)
|
|
]
|
|
return ["1=1"]
|
|
|
|
if not current_tenant_code and not current_tenant_name:
|
|
return ["1=0"]
|
|
|
|
if requested_tenant_code or requested_tenant_name:
|
|
allowed = False
|
|
effective_tenant_code = requested_tenant_code
|
|
effective_tenant_name = requested_tenant_name or requested_tenant_code
|
|
if requested_tenant_code in shared_scope_codes or effective_tenant_name in shared_scope_names:
|
|
allowed = True
|
|
elif current_tenant_code and requested_tenant_code == current_tenant_code:
|
|
allowed = True
|
|
effective_tenant_code = current_tenant_code
|
|
effective_tenant_name = effective_tenant_name or current_tenant_name
|
|
elif requested_tenant_name and requested_tenant_name == current_tenant_name:
|
|
allowed = True
|
|
effective_tenant_code = effective_tenant_code or current_tenant_code
|
|
effective_tenant_name = current_tenant_name
|
|
if not allowed:
|
|
return ["1=0"]
|
|
return [
|
|
self._tenant_scope_match_sql(
|
|
alias="ep",
|
|
columns=EvaluationPointColumns,
|
|
params=Params,
|
|
prefix="requested_scope",
|
|
tenant_code=effective_tenant_code or None,
|
|
tenant_name=effective_tenant_name,
|
|
)
|
|
]
|
|
|
|
visible_scopes: list[tuple[str | None, str]] = [
|
|
("PROVINCIAL", "省级"),
|
|
("PUBLIC", "公共"),
|
|
]
|
|
if current_tenant_code or current_tenant_name:
|
|
visible_scopes.append((current_tenant_code or None, current_tenant_name))
|
|
scope_clauses = [
|
|
self._tenant_scope_match_sql(
|
|
alias="ep",
|
|
columns=EvaluationPointColumns,
|
|
params=Params,
|
|
prefix=f"visible_scope_{index}",
|
|
tenant_code=tenant_code,
|
|
tenant_name=tenant_name,
|
|
)
|
|
for index, (tenant_code, tenant_name) in enumerate(visible_scopes)
|
|
if tenant_name
|
|
]
|
|
return [f"({' OR '.join(scope_clauses)})"] if scope_clauses else ["1=0"]
|
|
|
|
def _assert_can_write(self, current_user: dict[str, Any]) -> None:
|
|
if current_user.get("can_manage"):
|
|
return
|
|
raise LeauditException(StatusCodeEnum.HTTP_403_FORBIDDEN, "当前用户没有跨租户评查点管理权限")
|
|
|
|
def _normalize_shared_writable_scope(self, requested_scope: dict[str, str]) -> dict[str, str] | None:
|
|
requested_tenant_code = str(requested_scope.get("tenant_code") or "").strip()
|
|
requested_tenant_name = str(requested_scope.get("tenant_name") or "").strip()
|
|
requested_normalized_value = str(requested_scope.get("normalized_value") or "").strip()
|
|
|
|
if requested_tenant_code == "PROVINCIAL":
|
|
return {"area": "省级", "tenant_code": "PROVINCIAL", "tenant_name": requested_tenant_name or "省级"}
|
|
if requested_tenant_code == "PUBLIC":
|
|
return {"area": "公共", "tenant_code": "PUBLIC", "tenant_name": requested_tenant_name or "公共"}
|
|
if requested_tenant_name == "省级" or requested_normalized_value == "省级":
|
|
return {"area": "省级", "tenant_code": "PROVINCIAL", "tenant_name": "省级"}
|
|
if requested_tenant_name == "公共" or requested_normalized_value in {"公共", "default"}:
|
|
return {"area": "公共", "tenant_code": "PUBLIC", "tenant_name": "公共"}
|
|
return None
|
|
|
|
def _resolve_writable_scope(
|
|
self,
|
|
current_user: dict[str, Any],
|
|
*,
|
|
RequestedScope: dict[str, str],
|
|
) -> dict[str, str]:
|
|
self._assert_can_write(current_user)
|
|
current_tenant_code = str(current_user.get("tenant_code") or "").strip()
|
|
current_tenant_name = str(
|
|
current_user.get("tenant_name") or current_user.get("tenant_scope_value") or current_user.get("area") or ""
|
|
).strip()
|
|
current_area = str(current_user.get("tenant_scope_value") or current_user.get("area") or current_tenant_name).strip()
|
|
requested_tenant_code = str(RequestedScope.get("tenant_code") or "").strip()
|
|
requested_tenant_name = str(RequestedScope.get("tenant_name") or "").strip()
|
|
requested_normalized_value = str(RequestedScope.get("normalized_value") or "").strip()
|
|
shared_scope = self._normalize_shared_writable_scope(RequestedScope)
|
|
|
|
if current_user.get("is_global"):
|
|
if not requested_tenant_code and not requested_tenant_name and not requested_normalized_value:
|
|
raise LeauditException(StatusCodeEnum.HTTP_400_BAD_REQUEST, "请显式指定评查点所属租户或共享域")
|
|
if shared_scope:
|
|
return shared_scope
|
|
effective_tenant_code = requested_tenant_code
|
|
effective_tenant_name = requested_tenant_name or requested_normalized_value or effective_tenant_code
|
|
if not effective_tenant_code:
|
|
raise LeauditException(StatusCodeEnum.HTTP_400_BAD_REQUEST, "请使用标准 tenant_code 维护非公共评查点")
|
|
return {
|
|
"area": effective_tenant_name,
|
|
"tenant_code": effective_tenant_code,
|
|
"tenant_name": effective_tenant_name,
|
|
}
|
|
if not current_tenant_code and not current_tenant_name:
|
|
raise LeauditException(StatusCodeEnum.HTTP_403_FORBIDDEN, "当前租户管理员未配置所属租户,无法写入评查点")
|
|
current_scope = self._normalize_shared_writable_scope(
|
|
{
|
|
"tenant_code": current_tenant_code,
|
|
"tenant_name": current_tenant_name,
|
|
"normalized_value": current_area,
|
|
}
|
|
) or {
|
|
"area": current_tenant_name or current_area or current_tenant_code,
|
|
"tenant_code": current_tenant_code,
|
|
"tenant_name": current_tenant_name or current_area or current_tenant_code,
|
|
}
|
|
if not requested_tenant_code and not requested_tenant_name and not requested_normalized_value:
|
|
return current_scope
|
|
if shared_scope and shared_scope["tenant_code"] != current_scope["tenant_code"]:
|
|
raise LeauditException(StatusCodeEnum.HTTP_403_FORBIDDEN, "当前仅允许维护本人所属租户的评查点")
|
|
if requested_tenant_code and current_tenant_code and requested_tenant_code != current_tenant_code:
|
|
raise LeauditException(StatusCodeEnum.HTTP_403_FORBIDDEN, "当前仅允许维护本人所属租户的评查点")
|
|
if requested_tenant_name and not requested_tenant_code and requested_tenant_name != current_tenant_name:
|
|
raise LeauditException(StatusCodeEnum.HTTP_403_FORBIDDEN, "当前仅允许维护本人所属租户的评查点")
|
|
if (
|
|
requested_normalized_value
|
|
and not requested_tenant_code
|
|
and not requested_tenant_name
|
|
and requested_normalized_value != current_area
|
|
and requested_normalized_value != current_tenant_name
|
|
):
|
|
raise LeauditException(StatusCodeEnum.HTTP_403_FORBIDDEN, "当前仅允许维护本人所属租户的评查点")
|
|
return current_scope
|
|
|
|
async def _get_point_row(self, *, PointId: int, CurrentUser: dict[str, Any]) -> Any | None:
    """Fetch a single evaluation point row from the legacy database.

    The row is selected by id and additionally constrained by the WHERE
    clause that ``_build_list_filters`` returns when called with no business
    filters and an empty requested scope (presumably the visibility filter
    for ``CurrentUser`` -- confirm against that helper).

    Args:
        PointId: Primary key of the evaluation point.
        CurrentUser: User context forwarded to ``_build_list_filters``.

    Returns:
        The first matching row as a mapping, including the joined child group
        name (``group_name``) and parent group name (``rule_type``), or
        ``None`` when no row matches.
    """
    ep_columns = await self._load_legacy_table_columns("evaluation_points")
    blank_scope = {"tenant_code": "", "tenant_name": "", "normalized_value": ""}
    visibility_sql, bind_values = self._build_list_filters(
        Name=None,
        Code=None,
        Risk=None,
        IsEnabled=None,
        GroupPid=None,
        GroupId=None,
        DocumentAttributeType=None,
        RequestedScope=blank_scope,
        CurrentUser=CurrentUser,
        EvaluationPointColumns=ep_columns,
    )
    bind_values["point_id"] = PointId

    # The tenant columns are selected through helper-built expressions that
    # depend on which columns the legacy table actually has.
    tenant_code_column = self._evaluation_point_tenant_code_select(ep_columns, alias="ep")
    tenant_name_column = self._evaluation_point_tenant_name_select(ep_columns, alias="ep")
    statement = text(
        f"""
        SELECT
            ep.id,
            ep.code,
            ep.name,
            ep.evaluation_point_groups_id,
            ep.evaluation_point_groups_pid,
            ep.risk,
            ep.description,
            ep.is_enabled,
            ep.document_attribute_type,
            ep.references_laws,
            ep.extraction_config,
            ep.evaluation_config,
            ep.pass_message,
            ep.fail_message,
            ep.suggestion_message,
            ep.suggestion_message_type,
            ep.post_action,
            ep.action_config,
            ep.score,
            ep.area,
            {tenant_code_column},
            {tenant_name_column},
            ep.created_at,
            ep.updated_at,
            child_group.name AS group_name,
            parent_group.name AS rule_type
        FROM evaluation_points ep
        LEFT JOIN evaluation_point_groups child_group ON child_group.id = ep.evaluation_point_groups_id
        LEFT JOIN evaluation_point_groups parent_group ON parent_group.id = COALESCE(ep.evaluation_point_groups_pid, child_group.pid)
        WHERE ep.id = :point_id
          AND {visibility_sql}
        """
    )

    async with _LegacySession() as session:
        result = await session.execute(statement, bind_values)
        return result.mappings().first()
|
|
|
|
async def _load_legacy_table_columns(self, table_name: str, Session=None) -> set[str]:
|
|
cached = self._legacy_columns_cache.get(table_name)
|
|
if cached is not None:
|
|
return cached
|
|
|
|
if Session is not None:
|
|
rows = (
|
|
await Session.execute(
|
|
text(
|
|
"""
|
|
SELECT column_name
|
|
FROM information_schema.columns
|
|
WHERE table_schema = current_schema()
|
|
AND table_name = :table_name
|
|
"""
|
|
),
|
|
{"table_name": table_name},
|
|
)
|
|
).mappings().all()
|
|
columns = {str(row["column_name"]) for row in rows if row.get("column_name")}
|
|
self._legacy_columns_cache[table_name] = columns
|
|
return columns
|
|
|
|
async with _LegacySession() as session:
|
|
return await self._load_legacy_table_columns(table_name, Session=session)
|
|
|
|
@staticmethod
|
|
def _evaluation_point_tenant_code_expr(columns: set[str], *, alias: str) -> str:
|
|
if "tenant_code" in columns:
|
|
return f"COALESCE(NULLIF(BTRIM({alias}.tenant_code), ''), '')"
|
|
return "''"
|
|
|
|
@staticmethod
|
|
def _evaluation_point_tenant_name_expr(columns: set[str], *, alias: str) -> str:
|
|
if "tenant_name" in columns:
|
|
return f"COALESCE(NULLIF(BTRIM({alias}.tenant_name), ''), NULLIF(BTRIM({alias}.area), ''), '')"
|
|
return f"COALESCE(NULLIF(BTRIM({alias}.area), ''), '')"
|
|
|
|
def _evaluation_point_tenant_code_select(self, columns: set[str], *, alias: str) -> str:
|
|
return f"{self._evaluation_point_tenant_code_expr(columns, alias=alias)} AS tenant_code"
|
|
|
|
def _evaluation_point_tenant_name_select(self, columns: set[str], *, alias: str) -> str:
|
|
return f"{self._evaluation_point_tenant_name_expr(columns, alias=alias)} AS tenant_name"
|
|
|
|
def _tenant_scope_match_sql(
|
|
self,
|
|
*,
|
|
alias: str,
|
|
columns: set[str],
|
|
params: dict[str, Any],
|
|
prefix: str,
|
|
tenant_code: str | None,
|
|
tenant_name: str,
|
|
) -> str:
|
|
tenant_code_expr = self._evaluation_point_tenant_code_expr(columns, alias=alias)
|
|
tenant_name_expr = self._evaluation_point_tenant_name_expr(columns, alias=alias)
|
|
normalized_tenant_code = str(tenant_code or "").strip()
|
|
normalized_tenant_name = str(tenant_name or "").strip()
|
|
fallback_names = self._shared_scope_legacy_names(normalized_tenant_code, normalized_tenant_name)
|
|
if normalized_tenant_code:
|
|
params[f"{prefix}_tenant_code"] = normalized_tenant_code
|
|
if fallback_names:
|
|
fallback_clauses: list[str] = []
|
|
for index, fallback_name in enumerate(fallback_names):
|
|
key = f"{prefix}_tenant_name_{index}"
|
|
params[key] = fallback_name
|
|
fallback_clauses.append(f"{tenant_name_expr} = :{key}")
|
|
return (
|
|
"("
|
|
f"{tenant_code_expr} = :{prefix}_tenant_code "
|
|
"OR ("
|
|
f"{tenant_code_expr} = '' "
|
|
f"AND ({' OR '.join(fallback_clauses)})"
|
|
")"
|
|
")"
|
|
)
|
|
params[f"{prefix}_tenant_name"] = normalized_tenant_name
|
|
return (
|
|
"("
|
|
f"{tenant_code_expr} = :{prefix}_tenant_code "
|
|
"OR ("
|
|
f"{tenant_code_expr} = '' "
|
|
f"AND {tenant_name_expr} = :{prefix}_tenant_name"
|
|
")"
|
|
")"
|
|
)
|
|
params[f"{prefix}_tenant_name"] = normalized_tenant_name
|
|
return f"{tenant_name_expr} = :{prefix}_tenant_name"
|
|
|
|
@staticmethod
|
|
def _shared_scope_legacy_names(tenant_code: str, tenant_name: str) -> list[str]:
|
|
if tenant_code == "PUBLIC":
|
|
return ["公共", "default", ""]
|
|
if tenant_code == "PROVINCIAL":
|
|
return ["省级", "省局"]
|
|
if tenant_name == "公共":
|
|
return ["公共", "default", ""]
|
|
if tenant_name == "省级":
|
|
return ["省级", "省局"]
|
|
return []
|
|
|
|
def _default_json(self, field: str) -> dict[str, Any]:
|
|
if field == "references_laws":
|
|
return {"name": "", "content": "", "articles": []}
|
|
if field == "evaluation_config":
|
|
return {"logicType": "and", "customLogic": "", "rules": []}
|
|
return {
|
|
"llm": {"fields": [], "prompt_setting": {"type": "system", "template": ""}},
|
|
"vlm": {"fields": [], "prompt_setting": {"type": "system", "template": ""}},
|
|
"regex": {"fields": []},
|
|
}
|