Files
leaudit-platform-backend/fastapi_modules/fastapi_leaudit/services/impl/auditServiceImpl.py
T
wren c776af598a refactor: region from document, not app config
- Add region column to leaudit_documents + LeauditDocument model
- AuditServiceImpl: read region from document.region, not APP_REGION
- RuleServiceImpl: ListBindings/CreateBinding accept Region parameter
- RuleBindingCreateDTO: add region field
- RuleController: pass region from query param/DTO to service
- APP_REGION removed from binding queries; region flows from document

Region is now per-document: each document carries its region at upload
time, and rules are matched to the document's region at run time.
2026-04-28 14:19:29 +08:00

206 lines
8.4 KiB
Python

"""评查服务实现。
编排 LeAudit 引擎执行链路:
文档 → OCR → Extract → Evaluate → Rescue → Persist
"""
from datetime import datetime
from fastapi_common.fastapi_common_logger import logger
from fastapi_common.fastapi_common_sqlalchemy.database import GetAsyncSession
from fastapi_common.fastapi_common_web.domain.responses import StatusCodeEnum
from fastapi_common.fastapi_common_web.exception.LeauditException import LeauditException
from sqlalchemy import select, text
from fastapi_modules.fastapi_leaudit.domian.vo.auditVo import AuditRunVO, AuditResultVO
from fastapi_modules.fastapi_leaudit.leaudit_bridge.fileSourceResolver import FileSourceResolver
from fastapi_modules.fastapi_leaudit.leaudit_bridge.tasks import dispatch_leaudit_task
from fastapi_modules.fastapi_leaudit.models import (
LeauditAuditRun,
LeauditDocument,
LeauditDocumentFile,
)
from fastapi_modules.fastapi_leaudit.services import IAuditService
class AuditServiceImpl(IAuditService):
"""评查服务实现。"""
async def Run(self, DocumentId: int, RuleType: str | None = None, Force: bool = False) -> AuditRunVO:
"""触发文档评查。
当前阶段同步触发 bridge 执行链,后续再切换为 Celery 异步分发。
"""
async with GetAsyncSession() as session:
logger.info(f"触发评查: documentId={DocumentId}, ruleType={RuleType}")
document = await session.get(LeauditDocument, DocumentId)
if not document:
raise LeauditException(StatusCodeEnum.HTTP_404_NOT_FOUND, "评查文档不存在")
fileResult = await session.execute(
select(LeauditDocumentFile)
.where(
LeauditDocumentFile.documentId == DocumentId,
LeauditDocumentFile.isActive.is_(True),
)
.order_by(LeauditDocumentFile.Id.desc())
.limit(1)
)
documentFile = fileResult.scalar_one_or_none()
if not documentFile:
raise LeauditException(StatusCodeEnum.HTTP_400_BAD_REQUEST, "当前文档没有可执行文件版本")
runNoResult = await session.execute(
select(LeauditAuditRun.runNo)
.where(LeauditAuditRun.documentId == DocumentId)
.order_by(LeauditAuditRun.runNo.desc())
.limit(1)
)
latestRunNo = runNoResult.scalar_one_or_none() or 0
bindingResult = await session.execute(
text(
"""
SELECT
rs.id AS rule_set_id,
rs.current_version_id AS rule_version_id,
rv.oss_url AS rule_source_oss_url,
rv.file_sha256 AS rule_source_sha256,
rv.metadata_type_id AS rule_type_id
FROM leaudit_rule_type_bindings b
JOIN leaudit_rule_sets rs ON rs.id = b.rule_set_id
LEFT JOIN leaudit_rule_versions rv ON rv.id = rs.current_version_id
WHERE b.doc_type_id = :doc_type_id
AND b.is_active = true
AND b.region = :region
ORDER BY b.priority DESC, b.id DESC
LIMIT 1
"""
),
{"doc_type_id": document.typeId, "region": document.region},
)
binding = bindingResult.mappings().first()
if not binding or not binding["rule_set_id"] or not binding["rule_version_id"]:
raise LeauditException(StatusCodeEnum.HTTP_400_BAD_REQUEST, "当前文档类型未绑定可用规则版本")
run = LeauditAuditRun(
documentId=DocumentId,
documentFileId=documentFile.Id,
runNo=int(latestRunNo) + 1,
triggerSource="manual" if not Force else "retry",
status="pending",
ruleSetId=int(binding["rule_set_id"]),
ruleVersionId=int(binding["rule_version_id"]),
ruleTypeId=binding["rule_type_id"],
ruleSourceOssUrl=binding["rule_source_oss_url"],
ruleSourceSha256=binding["rule_source_sha256"],
startedAt=datetime.now(),
)
session.add(run)
await session.flush()
document.currentRunId = run.Id
document.processingStatus = "running"
await session.commit()
await session.refresh(run)
try:
Resolver = FileSourceResolver()
Payload = await Resolver.ResolvePayload(documentFile)
except Exception as Error:
raise LeauditException(
StatusCodeEnum.HTTP_500_INTERNAL_SERVER_ERROR,
f"读取评查文件失败: {Error}",
) from Error
dispatch_leaudit_task(
document_id=DocumentId,
file_content=Payload.fileContent,
filename=Payload.fileName,
upload_info={
"run_id": run.Id,
"rule_version_id": run.ruleVersionId,
"rule_source_oss_url": run.ruleSourceOssUrl,
"source_type": Payload.sourceType,
"source_path": Payload.sourcePath,
},
rules_path=RuleType,
)
await session.refresh(run)
return AuditRunVO(
runId=run.Id,
documentId=run.documentId,
runNo=run.runNo,
status=run.status,
phase=run.phase,
totalScore=float(run.totalScore) if run.totalScore else None,
passedCount=run.passedCount,
failedCount=run.failedCount,
startedAt=run.startedAt,
finishedAt=run.finishedAt,
)
async def GetRunStatus(self, RunId: int) -> AuditRunVO:
"""查询评查运行状态。"""
async with GetAsyncSession() as session:
run = await session.get(LeauditAuditRun, RunId)
if not run:
raise LeauditException(StatusCodeEnum.HTTP_404_NOT_FOUND, "评查运行记录不存在")
return AuditRunVO(
runId=run.Id,
documentId=run.documentId,
runNo=run.runNo,
status=run.status,
phase=run.phase,
totalScore=float(run.totalScore) if run.totalScore else None,
passedCount=run.passedCount,
failedCount=run.failedCount,
startedAt=run.startedAt,
finishedAt=run.finishedAt,
)
async def GetResult(self, RunId: int) -> AuditResultVO:
"""获取评查结果。"""
async with GetAsyncSession() as session:
run = await session.get(LeauditAuditRun, RunId)
if not run:
raise LeauditException(StatusCodeEnum.HTTP_404_NOT_FOUND, "评查运行记录不存在")
result = await session.execute(
text(
"""
SELECT
rule_id,
rule_name,
risk,
score,
passed,
status,
skip_reason,
confidence,
pass_message,
fail_message,
remediation,
extracted_fields,
field_positions,
rescue_applied,
rescue_passed
FROM leaudit_rule_results
WHERE run_id = :run_id
ORDER BY id ASC
"""
),
{"run_id": RunId},
)
rules = [dict(row) for row in result.mappings().all()]
return AuditResultVO(
runId=run.Id,
totalScore=float(run.totalScore) if run.totalScore else None,
passedCount=run.passedCount or 0,
failedCount=run.failedCount or 0,
skippedCount=run.skippedCount or 0,
phase=run.phase,
rescueApplied=run.rescueApplied or False,
rules=rules,
)