"""交叉评查服务实现(第一阶段骨架)。""" from __future__ import annotations import io import logging from datetime import datetime from math import floor from urllib.parse import quote from xml.sax.saxutils import escape from zipfile import ZIP_DEFLATED, ZipFile from sqlalchemy import bindparam, text logger = logging.getLogger(__name__) from fastapi_common.fastapi_common_sqlalchemy.database import GetAsyncSession from fastapi_common.fastapi_common_web.domain.responses import StatusCodeEnum from fastapi_common.fastapi_common_web.exception.LeauditException import LeauditException from fastapi_modules.fastapi_leaudit.domian.Dto.crossReviewDto import ( CrossReviewProposalCreateDTO, CrossReviewProposalVoteDTO, CrossReviewTaskCreateDTO, CrossReviewTaskDocumentQueryDTO, CrossReviewTaskQueryDTO, ) from fastapi_modules.fastapi_leaudit.domian.vo.crossReviewVo import ( CrossReviewTaskDocumentAppendVO, CrossReviewTaskHistoryVersionVO, CrossReviewPendingProposalVO, CrossReviewPendingVotesVO, CrossReviewPermissionVO, CrossReviewProposalCancelVO, CrossReviewProposalCreateVO, CrossReviewProposalItemVO, CrossReviewProposalPageVO, CrossReviewProposalVoteItemVO, CrossReviewProposalVoteVO, CrossReviewTaskCompleteVO, CrossReviewTaskCreateVO, CrossReviewTaskDocumentPageVO, CrossReviewTaskDocumentUploadVO, CrossReviewTaskDocumentVO, CrossReviewTaskItemVO, CrossReviewTaskPageVO, CrossReviewTaskProgressVO, ) from fastapi_modules.fastapi_leaudit.services.crossReviewService import ICrossReviewService from fastapi_modules.fastapi_leaudit.services.documentService import IDocumentService from fastapi_modules.fastapi_leaudit.services.auditService import IAuditService from fastapi_modules.fastapi_leaudit.services.impl.auditServiceImpl import AuditServiceImpl from fastapi_modules.fastapi_leaudit.services.impl.documentServiceImpl import DocumentServiceImpl class CrossReviewServiceImpl(ICrossReviewService): """交叉评查服务实现。""" _PROPOSAL_EXPORT_HEADERS: tuple[str, ...] = ( "任务名称", "文档名称", "意见分类", "评查点名称", "问题依据/命中内容", "原问题说明", "评查意见", "分数调整", "提案状态", "发起人", "提出时间", "同意人数", "同意人员", "不同意人数", "不同意人员", "未投人数", "未投人员", ) _VOTE_EXPORT_HEADERS: tuple[str, ...] = ( "任务名称", "文档名称", "意见分类", "评查点名称", "发起人", "投票人", "投票状态", "投票时间", ) _SCHEMA_BOOTSTRAP_STATEMENTS: tuple[str, ...] = ( """ CREATE TABLE IF NOT EXISTS leaudit_cross_review_tasks ( id BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY, task_name VARCHAR(255) NOT NULL, task_type VARCHAR(32) NOT NULL, doc_type_id BIGINT, doc_type_code VARCHAR(64), assigner_id BIGINT NOT NULL, status VARCHAR(32) NOT NULL DEFAULT 'in_progress', create_time TIMESTAMPTZ NOT NULL DEFAULT NOW(), update_time TIMESTAMPTZ NOT NULL DEFAULT NOW(), delete_time TIMESTAMPTZ ) """, "CREATE INDEX IF NOT EXISTS idx_lcr_tasks_assigner_id ON leaudit_cross_review_tasks (assigner_id)", "CREATE INDEX IF NOT EXISTS idx_lcr_tasks_status ON leaudit_cross_review_tasks (status)", "CREATE INDEX IF NOT EXISTS idx_lcr_tasks_doc_type_id ON leaudit_cross_review_tasks (doc_type_id)", """ CREATE TABLE IF NOT EXISTS leaudit_cross_review_task_members ( id BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY, task_id BIGINT NOT NULL, user_id BIGINT NOT NULL, member_role VARCHAR(32) NOT NULL DEFAULT 'participant', create_time TIMESTAMPTZ NOT NULL DEFAULT NOW(), update_time TIMESTAMPTZ NOT NULL DEFAULT NOW(), delete_time TIMESTAMPTZ ) """, "CREATE INDEX IF NOT EXISTS idx_lcr_task_members_task_id ON leaudit_cross_review_task_members (task_id)", "CREATE INDEX IF NOT EXISTS idx_lcr_task_members_user_id ON leaudit_cross_review_task_members (user_id)", "CREATE INDEX IF NOT EXISTS idx_lcr_task_members_role ON leaudit_cross_review_task_members (member_role)", """ CREATE UNIQUE INDEX IF NOT EXISTS uq_lcr_task_members_task_user_active ON leaudit_cross_review_task_members (task_id, user_id) WHERE delete_time IS NULL """, """ CREATE TABLE IF NOT EXISTS leaudit_cross_review_task_documents ( id BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY, task_id BIGINT NOT NULL, document_id BIGINT NOT NULL, audit_status INTEGER NOT NULL DEFAULT 0, create_time TIMESTAMPTZ NOT NULL DEFAULT NOW(), update_time TIMESTAMPTZ NOT NULL DEFAULT NOW(), delete_time TIMESTAMPTZ ) """, "CREATE INDEX IF NOT EXISTS idx_lcr_task_documents_task_id ON leaudit_cross_review_task_documents (task_id)", "CREATE INDEX IF NOT EXISTS idx_lcr_task_documents_document_id ON leaudit_cross_review_task_documents (document_id)", "CREATE INDEX IF NOT EXISTS idx_lcr_task_documents_task_status ON leaudit_cross_review_task_documents (task_id, audit_status)", """ CREATE UNIQUE INDEX IF NOT EXISTS uq_lcr_task_documents_task_document_active ON leaudit_cross_review_task_documents (task_id, document_id) WHERE delete_time IS NULL """, """ CREATE TABLE IF NOT EXISTS leaudit_cross_review_proposals ( id BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY, task_id BIGINT NOT NULL, document_id BIGINT NOT NULL, proposal_type VARCHAR(32) NOT NULL DEFAULT 'review_point', rule_result_id BIGINT, proposer_id BIGINT NOT NULL, evaluation_point_name VARCHAR(255), extraction_result_text TEXT, proposed_score_delta NUMERIC(10, 2) NOT NULL, reason TEXT NOT NULL, status VARCHAR(32) NOT NULL DEFAULT 'pending', create_time TIMESTAMPTZ NOT NULL DEFAULT NOW(), update_time TIMESTAMPTZ NOT NULL DEFAULT NOW(), delete_time TIMESTAMPTZ ) """, "CREATE INDEX IF NOT EXISTS idx_lcr_proposals_task_id ON leaudit_cross_review_proposals (task_id)", "CREATE INDEX IF NOT EXISTS idx_lcr_proposals_document_id ON leaudit_cross_review_proposals (document_id)", "CREATE INDEX IF NOT EXISTS idx_lcr_proposals_rule_result_id ON leaudit_cross_review_proposals (rule_result_id)", "CREATE INDEX IF NOT EXISTS idx_lcr_proposals_proposer_id ON leaudit_cross_review_proposals (proposer_id)", "CREATE INDEX IF NOT EXISTS idx_lcr_proposals_status ON leaudit_cross_review_proposals (status)", """ CREATE TABLE IF NOT EXISTS leaudit_cross_review_votes ( id BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY, proposal_id BIGINT NOT NULL, voter_id BIGINT NOT NULL, vote_type VARCHAR(16) NOT NULL, create_time TIMESTAMPTZ NOT NULL DEFAULT NOW(), update_time TIMESTAMPTZ NOT NULL DEFAULT NOW(), delete_time TIMESTAMPTZ ) """, "CREATE INDEX IF NOT EXISTS idx_lcr_votes_proposal_id ON leaudit_cross_review_votes (proposal_id)", "CREATE INDEX IF NOT EXISTS idx_lcr_votes_voter_id ON leaudit_cross_review_votes (voter_id)", """ CREATE UNIQUE INDEX IF NOT EXISTS uq_lcr_votes_proposal_voter_active ON leaudit_cross_review_votes (proposal_id, voter_id) WHERE delete_time IS NULL """, ) def __init__(self): self.DocumentService: IDocumentService = DocumentServiceImpl() self.AuditService: IAuditService = AuditServiceImpl() async def CreateTask(self, CurrentUserId: int, Body: CrossReviewTaskCreateDTO) -> CrossReviewTaskCreateVO: """创建交叉评查任务。""" async with GetAsyncSession() as session: await self._ensure_tables_ready(session) memberUserIds = self._unique_int_list(Body.memberUserIds + [CurrentUserId]) principalUserIds = self._unique_int_list(Body.principalUserIds) documentIds = self._unique_int_list(Body.documentIds) await self._reset_transaction_for_write(session) async with session.begin(): taskRow = ( await session.execute( text( """ INSERT INTO leaudit_cross_review_tasks (task_name, task_type, doc_type_id, doc_type_code, assigner_id, status) VALUES (:task_name, :task_type, :doc_type_id, :doc_type_code, :assigner_id, 'in_progress') RETURNING id, task_name """ ), { "task_name": Body.taskName.strip(), "task_type": Body.taskType, "doc_type_id": Body.docTypeId, "doc_type_code": Body.docTypeCode, "assigner_id": CurrentUserId, }, ) ).mappings().first() if not taskRow: raise LeauditException(StatusCodeEnum.HTTP_500_INTERNAL_SERVER_ERROR, "创建交叉评查任务失败") taskId = int(taskRow["id"]) for userId in memberUserIds: await session.execute( text( """ INSERT INTO leaudit_cross_review_task_members (task_id, user_id, member_role) VALUES (:task_id, :user_id, :member_role) """ ), { "task_id": taskId, "user_id": userId, "member_role": "principal" if userId in principalUserIds else "participant", }, ) for documentId in documentIds: await session.execute( text( """ INSERT INTO leaudit_cross_review_task_documents (task_id, document_id, audit_status) VALUES (:task_id, :document_id, 0) """ ), {"task_id": taskId, "document_id": documentId}, ) await session.execute( text( "UPDATE leaudit_documents SET review_scope = 'cross_review' WHERE id = :id" ), {"id": documentId}, ) for documentId in documentIds: try: await self.AuditService.Run( DocumentId=documentId, Force=False, Speed="normal", TriggerUserId=CurrentUserId, ) except Exception as exc: logger.warning( "交叉评查任务创建时触发评查失败: task_id=%s document_id=%s error=%s", taskId, documentId, exc, ) return CrossReviewTaskCreateVO( taskId=taskId, taskName=str(taskRow["task_name"]), memberCount=len(memberUserIds), documentCount=len(documentIds), ) async def GetUserTasks(self, CurrentUserId: int, Body: CrossReviewTaskQueryDTO) -> CrossReviewTaskPageVO: """查询当前用户参与的交叉评查任务。""" async with GetAsyncSession() as session: await self._ensure_tables_ready(session) offset = (Body.page - 1) * Body.pageSize params: dict[str, object] = { "current_user_id": CurrentUserId, "limit": Body.pageSize, "offset": offset, } whereClauses = [ "tm.user_id = :current_user_id", "tm.delete_time IS NULL", "t.delete_time IS NULL", ] if Body.keyword: whereClauses.append("t.task_name ILIKE :keyword") params["keyword"] = f"%{Body.keyword.strip()}%" if Body.status: whereClauses.append("t.status = :status") params["status"] = Body.status if Body.taskType: whereClauses.append("t.task_type = :task_type") params["task_type"] = Body.taskType if Body.docTypeCode: whereClauses.append("t.doc_type_code = :doc_type_code") params["doc_type_code"] = Body.docTypeCode whereSql = " AND ".join(whereClauses) total = int( ( await session.scalar( text( f""" SELECT COUNT(DISTINCT t.id) FROM leaudit_cross_review_tasks t JOIN leaudit_cross_review_task_members tm ON tm.task_id = t.id WHERE {whereSql} """ ), params, ) ) or 0 ) rows = ( await session.execute( text( f""" WITH doc_stats AS ( SELECT td.task_id, COUNT(*)::int AS total_documents, COUNT(*) FILTER (WHERE td.audit_status = 1)::int AS completed_documents FROM leaudit_cross_review_task_documents td WHERE td.delete_time IS NULL GROUP BY td.task_id ), task_regions AS ( SELECT tm.task_id, ARRAY_AGG(DISTINCT u.area ORDER BY u.area) FILTER ( WHERE u.area IS NOT NULL AND u.area != '' ) AS evaluation_regions FROM leaudit_cross_review_task_members tm JOIN sso_users u ON u.id = tm.user_id WHERE tm.delete_time IS NULL GROUP BY tm.task_id ) SELECT t.id AS task_id, t.task_name, t.task_type, t.doc_type_id, t.doc_type_code, t.status, t.create_time, COALESCE(ds.total_documents, 0) AS total_documents, COALESCE(ds.completed_documents, 0) AS completed_documents, COALESCE(tr.evaluation_regions, ARRAY[]::varchar[]) AS evaluation_regions FROM leaudit_cross_review_tasks t JOIN leaudit_cross_review_task_members tm ON tm.task_id = t.id LEFT JOIN doc_stats ds ON ds.task_id = t.id LEFT JOIN task_regions tr ON tr.task_id = t.id WHERE {whereSql} GROUP BY t.id, t.task_name, t.task_type, t.doc_type_id, t.doc_type_code, t.status, t.create_time, ds.total_documents, ds.completed_documents, tr.evaluation_regions ORDER BY t.create_time DESC, t.id DESC LIMIT :limit OFFSET :offset """ ), params, ) ).mappings().all() items = [] for row in rows: totalDocuments = int(row["total_documents"] or 0) completedDocuments = int(row["completed_documents"] or 0) progress = round((completedDocuments / totalDocuments * 100) if totalDocuments > 0 else 0, 2) rawRegions = row.get("evaluation_regions") if rawRegions is None: evaluationRegion: list[str] = [] elif isinstance(rawRegions, list): evaluationRegion = [str(r) for r in rawRegions] else: evaluationRegion = [str(rawRegions)] items.append( CrossReviewTaskItemVO( taskId=int(row["task_id"]), taskName=str(row["task_name"]), taskType=str(row["task_type"]), docTypeId=self._to_int(row.get("doc_type_id")), docTypeCode=row.get("doc_type_code"), status=str(row["status"]), progress=progress, totalDocuments=totalDocuments, completedDocuments=completedDocuments, createdAt=row.get("create_time"), evaluationRegion=evaluationRegion, ) ) return CrossReviewTaskPageVO(total=total, page=Body.page, pageSize=Body.pageSize, items=items) async def GetTaskProgress(self, CurrentUserId: int, TaskId: int) -> CrossReviewTaskProgressVO: """查询任务进度。""" async with GetAsyncSession() as session: await self._ensure_tables_ready(session) await self._ensure_task_member(session, TaskId, CurrentUserId) row = ( await session.execute( text( """ SELECT COUNT(*)::int AS total_documents, COUNT(*) FILTER (WHERE audit_status = 1)::int AS completed_documents FROM leaudit_cross_review_task_documents WHERE task_id = :task_id AND delete_time IS NULL """ ), {"task_id": TaskId}, ) ).mappings().first() totalDocuments = int((row or {}).get("total_documents") or 0) completedDocuments = int((row or {}).get("completed_documents") or 0) progress = round((completedDocuments / totalDocuments * 100) if totalDocuments > 0 else 0, 2) return CrossReviewTaskProgressVO( taskId=TaskId, totalDocuments=totalDocuments, completedDocuments=completedDocuments, progress=progress, ) async def GetTaskDocuments( self, CurrentUserId: int, TaskId: int, Body: CrossReviewTaskDocumentQueryDTO, ) -> CrossReviewTaskDocumentPageVO: """查询任务文档列表。""" async with GetAsyncSession() as session: await self._ensure_tables_ready(session) await self._ensure_task_member(session, TaskId, CurrentUserId) params: dict[str, object] = { "task_id": TaskId, "limit": Body.pageSize, "offset": (Body.page - 1) * Body.pageSize, } baseWhereClauses = [ "td.task_id = :task_id", "td.delete_time IS NULL", "d.deleted_at IS NULL", ] if Body.keyword: baseWhereClauses.append("(d.normalized_name ILIKE :keyword OR CAST(d.biz_document_id AS TEXT) ILIKE :keyword)") params["keyword"] = f"%{Body.keyword.strip()}%" baseWhereSql = " AND ".join(baseWhereClauses) latestWhereSql = f"{baseWhereSql} AND COALESCE(d.is_latest_version, false) = true" total = int( ( await session.scalar( text( f""" SELECT COUNT(*) FROM leaudit_cross_review_task_documents td JOIN leaudit_documents d ON d.id = td.document_id WHERE {latestWhereSql} """ ), params, ) ) or 0 ) rows = ( await session.execute( text( f""" SELECT d.id AS document_id, COALESCE(d.normalized_name, '') AS name, CAST(d.biz_document_id AS TEXT) AS document_number, d.type_id, COALESCE( CASE WHEN LOWER(COALESCE(ar.status, '')) IN ('pending', 'queued', 'running', 'retrying') THEN ar.status ELSE d.processing_status END, d.processing_status, 'waiting' ) AS processing_status, d.version_no, d.is_latest_version, COALESCE(NULLIF(d.version_group_key, ''), CONCAT('root:', COALESCE(d.root_version_id, d.id)::text)) AS version_group_key, COALESCE(vc.total_versions, 1)::int AS total_versions, d.created_at, td.audit_status, COALESCE(dt.name, '') AS type_name, COALESCE(df.file_size, 0) AS file_size, COALESCE(df.file_path, '') AS path, df.file_upload_time AS upload_time, COALESCE(df.file_ext, '') AS file_ext, COALESCE(es.total_evaluation_points, 0) AS total_evaluation_points, COALESCE(es.pass_count, 0) AS pass_count, COALESCE(es.warning_count, 0) AS warning_count, COALESCE(es.error_count, 0) AS error_count, COALESCE(es.manual_count, 0) AS manual_count, COALESCE(es.issue_count, 0) AS issue_count, COALESCE(es.warning_messages, ARRAY[]::text[]) AS warning_messages, COALESCE(es.error_messages, ARRAY[]::text[]) AS error_messages, COALESCE(es.issue_messages, ARRAY[]::text[]) AS issue_messages, COALESCE(es.manual_messages, ARRAY[]::text[]) AS manual_messages, COALESCE(es.final_score, 0) AS final_score, COALESCE(es.full_score, 0) AS full_score, COALESCE(es.score_summary, '') AS score_summary, COALESCE(es.score_percent, 0) AS score_percent FROM leaudit_cross_review_task_documents td JOIN leaudit_documents d ON d.id = td.document_id LEFT JOIN leaudit_audit_runs ar ON ar.id = d.current_run_id LEFT JOIN leaudit_document_types dt ON dt.id = d.type_id LEFT JOIN LATERAL ( SELECT file_size, local_path AS file_path, created_at AS file_upload_time, COALESCE(file_ext, '') AS file_ext FROM leaudit_document_files WHERE document_id = d.id ORDER BY id ASC LIMIT 1 ) df ON TRUE LEFT JOIN ( SELECT COALESCE(NULLIF(d2.version_group_key, ''), CONCAT('root:', COALESCE(d2.root_version_id, d2.id)::text)) AS version_group_key, COUNT(*) AS total_versions FROM leaudit_documents d2 JOIN leaudit_cross_review_task_documents td2 ON td2.document_id = d2.id AND td2.delete_time IS NULL AND td2.task_id = :task_id WHERE d2.deleted_at IS NULL GROUP BY d2.version_group_key , COALESCE(d2.root_version_id, d2.id) ) vc ON vc.version_group_key = COALESCE(NULLIF(d.version_group_key, ''), CONCAT('root:', COALESCE(d.root_version_id, d.id)::text)) LEFT JOIN LATERAL ( SELECT COUNT(*)::int AS total_evaluation_points, COUNT(*) FILTER (WHERE rr.passed IS TRUE)::int AS pass_count, COUNT(*) FILTER (WHERE rr.passed IS FALSE AND rr.risk = 'high')::int AS error_count, COUNT(*) FILTER (WHERE rr.passed IS FALSE AND rr.risk IN ('low', 'medium'))::int AS warning_count, 0::int AS manual_count, COUNT(*) FILTER (WHERE rr.passed IS FALSE)::int AS issue_count, ARRAY_AGG(rr.fail_message ORDER BY rr.id) FILTER ( WHERE rr.passed IS FALSE AND rr.risk = 'high' AND rr.fail_message IS NOT NULL AND rr.fail_message != '' ) AS error_messages, ARRAY_AGG(rr.fail_message ORDER BY rr.id) FILTER ( WHERE rr.passed IS FALSE AND rr.risk IN ('low', 'medium') AND rr.fail_message IS NOT NULL AND rr.fail_message != '' ) AS warning_messages, ARRAY_AGG(rr.fail_message ORDER BY rr.id) FILTER ( WHERE rr.passed IS FALSE AND rr.fail_message IS NOT NULL AND rr.fail_message != '' ) AS issue_messages, ARRAY[]::text[] AS manual_messages, COALESCE(SUM(rr.score) FILTER (WHERE rr.passed IS TRUE), 0) AS final_score, COALESCE(SUM(rr.score), 0) AS full_score, CASE WHEN COALESCE(SUM(rr.score), 0) > 0 THEN CONCAT( ROUND( COALESCE(SUM(rr.score) FILTER (WHERE rr.passed IS TRUE), 0)::numeric, 1 )::text, '/', ROUND(COALESCE(SUM(rr.score), 0)::numeric, 1)::text ) ELSE '0/0' END AS score_summary, CASE WHEN COALESCE(SUM(rr.score), 0) > 0 THEN ROUND( COALESCE(SUM(rr.score) FILTER (WHERE rr.passed IS TRUE), 0) / SUM(rr.score) * 100, 1 ) ELSE 0 END AS score_percent FROM leaudit_rule_results rr WHERE rr.document_id = d.id AND rr.run_id = d.current_run_id ) es ON TRUE LEFT JOIN LATERAL ( SELECT COALESCE(SUM(proposed_score_delta), 0) AS approved_delta FROM leaudit_cross_review_proposals p WHERE p.document_id = d.id AND p.status = 'approved' AND p.delete_time IS NULL ) pd ON TRUE WHERE {latestWhereSql} ORDER BY d.created_at DESC, d.id DESC LIMIT :limit OFFSET :offset """ ), params, ) ).mappings().all() latestDocumentIds = [int(row["document_id"]) for row in rows] historyByGroup: dict[str, list[CrossReviewTaskHistoryVersionVO]] = {} if latestDocumentIds: historyRows = ( await session.execute( text( """ WITH target_groups AS ( SELECT DISTINCT COALESCE(NULLIF(d.version_group_key, ''), CONCAT('root:', COALESCE(d.root_version_id, d.id)::text)) AS version_group_key FROM leaudit_cross_review_task_documents td JOIN leaudit_documents d ON d.id = td.document_id WHERE td.task_id = :task_id AND td.delete_time IS NULL AND d.id = ANY(:document_ids) ) SELECT d.id AS document_id, COALESCE(d.normalized_name, '') AS name, CAST(d.biz_document_id AS TEXT) AS document_number, d.type_id, COALESCE( CASE WHEN LOWER(COALESCE(ar.status, '')) IN ('pending', 'queued', 'running', 'retrying') THEN ar.status ELSE d.processing_status END, d.processing_status, 'waiting' ) AS processing_status, d.version_no, d.is_latest_version, COALESCE(NULLIF(d.version_group_key, ''), CONCAT('root:', COALESCE(d.root_version_id, d.id)::text)) AS version_group_key, td.audit_status, d.created_at, COALESCE(dt.name, '') AS type_name, COALESCE(df.file_size, 0) AS file_size, COALESCE(df.file_path, '') AS path, df.file_upload_time AS upload_time, COALESCE(df.file_ext, '') AS file_ext, COALESCE(es.total_evaluation_points, 0) AS total_evaluation_points, COALESCE(es.pass_count, 0) AS pass_count, COALESCE(es.warning_count, 0) AS warning_count, COALESCE(es.error_count, 0) AS error_count, COALESCE(es.manual_count, 0) AS manual_count, COALESCE(es.issue_count, 0) AS issue_count, COALESCE(es.warning_messages, ARRAY[]::text[]) AS warning_messages, COALESCE(es.error_messages, ARRAY[]::text[]) AS error_messages, COALESCE(es.issue_messages, ARRAY[]::text[]) AS issue_messages, COALESCE(es.manual_messages, ARRAY[]::text[]) AS manual_messages, COALESCE(es.final_score, 0) AS final_score, COALESCE(es.full_score, 0) AS full_score, COALESCE(pd.approved_delta, 0) AS approved_delta FROM leaudit_cross_review_task_documents td JOIN leaudit_documents d ON d.id = td.document_id JOIN target_groups tg ON tg.version_group_key = COALESCE(NULLIF(d.version_group_key, ''), CONCAT('root:', COALESCE(d.root_version_id, d.id)::text)) LEFT JOIN leaudit_audit_runs ar ON ar.id = d.current_run_id LEFT JOIN leaudit_document_types dt ON dt.id = d.type_id LEFT JOIN LATERAL ( SELECT file_size, local_path AS file_path, created_at AS file_upload_time, COALESCE(file_ext, '') AS file_ext FROM leaudit_document_files WHERE document_id = d.id ORDER BY id ASC LIMIT 1 ) df ON TRUE LEFT JOIN LATERAL ( SELECT COUNT(*)::int AS total_evaluation_points, COUNT(*) FILTER (WHERE rr.passed IS TRUE)::int AS pass_count, COUNT(*) FILTER (WHERE rr.passed IS FALSE AND rr.risk = 'high')::int AS error_count, COUNT(*) FILTER (WHERE rr.passed IS FALSE AND rr.risk IN ('low', 'medium'))::int AS warning_count, 0::int AS manual_count, COUNT(*) FILTER (WHERE rr.passed IS FALSE)::int AS issue_count, ARRAY_AGG(rr.fail_message ORDER BY rr.id) FILTER ( WHERE rr.passed IS FALSE AND rr.risk = 'high' AND rr.fail_message IS NOT NULL AND rr.fail_message != '' ) AS error_messages, ARRAY_AGG(rr.fail_message ORDER BY rr.id) FILTER ( WHERE rr.passed IS FALSE AND rr.risk IN ('low', 'medium') AND rr.fail_message IS NOT NULL AND rr.fail_message != '' ) AS warning_messages, ARRAY_AGG(rr.fail_message ORDER BY rr.id) FILTER ( WHERE rr.passed IS FALSE AND rr.fail_message IS NOT NULL AND rr.fail_message != '' ) AS issue_messages, ARRAY[]::text[] AS manual_messages, COALESCE(SUM(rr.score) FILTER (WHERE rr.passed IS TRUE), 0) AS final_score, COALESCE(SUM(rr.score), 0) AS full_score FROM leaudit_rule_results rr WHERE rr.document_id = d.id AND rr.run_id = d.current_run_id ) es ON TRUE LEFT JOIN LATERAL ( SELECT COALESCE(SUM(proposed_score_delta), 0) AS approved_delta FROM leaudit_cross_review_proposals p WHERE p.document_id = d.id AND p.status = 'approved' AND p.delete_time IS NULL ) pd ON TRUE WHERE td.task_id = :task_id AND td.delete_time IS NULL AND d.deleted_at IS NULL ORDER BY COALESCE(NULLIF(d.version_group_key, ''), CONCAT('root:', COALESCE(d.root_version_id, d.id)::text)), d.version_no DESC, d.id DESC """ ).bindparams(bindparam("document_ids", expanding=False)), {"task_id": TaskId, "document_ids": latestDocumentIds}, ) ).mappings().all() groupedRows: dict[str, list[dict]] = {} for row in historyRows: groupedRows.setdefault(str(row.get("version_group_key") or ""), []).append(dict(row)) for groupKey, groupRows in groupedRows.items(): orderedRows = sorted( groupRows, key=lambda item: (int(item.get("version_no") or 1), int(item.get("document_id") or 0)), ) localVersionMap = { int(item["document_id"]): index + 1 for index, item in enumerate(orderedRows) } historyItems: list[CrossReviewTaskHistoryVersionVO] = [] for item in reversed(orderedRows[:-1]): finalScore = float(item.get("final_score") or 0) + float(item.get("approved_delta") or 0) fullScore = float(item.get("full_score") or 0) historyItems.append( CrossReviewTaskHistoryVersionVO( documentId=int(item["document_id"]), name=str(item.get("name") or ""), documentNumber=item.get("document_number"), typeId=self._to_int(item.get("type_id")), typeName=item.get("type_name"), processingStatus=item.get("processing_status"), versionNo=int(localVersionMap.get(int(item["document_id"]), 1)), auditStatus=int(item.get("audit_status") or 0), createdAt=item.get("created_at"), fileSize=int(item.get("file_size") or 0), path=str(item.get("path") or ""), uploadTime=item.get("upload_time"), fileExt=str(item.get("file_ext") or "") or None, totalEvaluationPoints=int(item.get("total_evaluation_points") or 0), passCount=int(item.get("pass_count") or 0), warningCount=int(item.get("warning_count") or 0), errorCount=int(item.get("error_count") or 0), manualCount=int(item.get("manual_count") or 0), issueCount=int(item.get("issue_count") or 0), warningMessages=self._parse_text_array(item.get("warning_messages")), errorMessages=self._parse_text_array(item.get("error_messages")), issueMessages=self._parse_text_array(item.get("issue_messages")), manualMessages=self._parse_text_array(item.get("manual_messages")), finalScore=finalScore, fullScore=fullScore, scoreSummary=self._build_score_summary(finalScore, fullScore), scorePercent=self._build_score_percent(finalScore, fullScore), ) ) historyByGroup[groupKey] = historyItems items: list[CrossReviewTaskDocumentVO] = [] for row in rows: groupKey = str(row.get("version_group_key") or "") historyVersions = historyByGroup.get(groupKey, []) finalScore = float(row.get("final_score") or 0) + float(row.get("approved_delta") or 0) fullScore = float(row.get("full_score") or 0) versionNo = int(row.get("total_versions") or 1) items.append( CrossReviewTaskDocumentVO( documentId=int(row["document_id"]), name=str(row["name"] or ""), documentNumber=row.get("document_number"), typeId=self._to_int(row.get("type_id")), typeName=row.get("type_name"), processingStatus=row.get("processing_status"), versionNo=versionNo, isLatestVersion=True, versionGroupKey=groupKey, totalVersions=int(row.get("total_versions") or 1), auditStatus=int(row.get("audit_status") or 0), createdAt=row.get("created_at"), fileSize=int(row.get("file_size") or 0), path=str(row.get("path") or ""), uploadTime=row.get("upload_time"), fileExt=str(row.get("file_ext") or "") or None, totalEvaluationPoints=int(row.get("total_evaluation_points") or 0), passCount=int(row.get("pass_count") or 0), warningCount=int(row.get("warning_count") or 0), errorCount=int(row.get("error_count") or 0), manualCount=int(row.get("manual_count") or 0), issueCount=int(row.get("issue_count") or 0), warningMessages=self._parse_text_array(row.get("warning_messages")), errorMessages=self._parse_text_array(row.get("error_messages")), issueMessages=self._parse_text_array(row.get("issue_messages")), manualMessages=self._parse_text_array(row.get("manual_messages")), finalScore=finalScore, fullScore=fullScore, scoreSummary=self._build_score_summary(finalScore, fullScore), scorePercent=self._build_score_percent(finalScore, fullScore), historyVersions=historyVersions, ) ) return CrossReviewTaskDocumentPageVO( taskId=TaskId, total=total, page=Body.page, pageSize=Body.pageSize, items=items, ) async def CanConfirmTaskDocument(self, CurrentUserId: int, TaskId: int) -> CrossReviewPermissionVO: """判断当前用户是否有权确认任务文档完成。""" async with GetAsyncSession() as session: await self._ensure_tables_ready(session) row = ( await session.execute( text( """ SELECT t.assigner_id, COALESCE( MAX(CASE WHEN tm.user_id = :current_user_id AND tm.member_role = 'principal' THEN 1 ELSE 0 END), 0 ) AS is_principal FROM leaudit_cross_review_tasks t LEFT JOIN leaudit_cross_review_task_members tm ON tm.task_id = t.id AND tm.delete_time IS NULL WHERE t.id = :task_id AND t.delete_time IS NULL GROUP BY t.assigner_id """ ), {"task_id": TaskId, "current_user_id": CurrentUserId}, ) ).mappings().first() if not row: raise LeauditException(StatusCodeEnum.HTTP_404_NOT_FOUND, "交叉评查任务不存在") if int(row["assigner_id"]) == CurrentUserId: return CrossReviewPermissionVO(canConfirm=True, reason="您是任务创建者,可确认完成") if int(row["is_principal"] or 0) == 1: return CrossReviewPermissionVO(canConfirm=True, reason="您是任务负责人,可确认完成") return CrossReviewPermissionVO(canConfirm=False, reason="您不是任务创建者或负责人") async def CompleteTaskDocument(self, CurrentUserId: int, TaskId: int, DocumentId: int) -> CrossReviewTaskCompleteVO: """确认任务文档完成。""" async with GetAsyncSession() as session: await self._ensure_tables_ready(session) permission = await self.CanConfirmTaskDocument(CurrentUserId, TaskId) if not permission.canConfirm: raise LeauditException(StatusCodeEnum.HTTP_403_FORBIDDEN, permission.reason) await self._reset_transaction_for_write(session) async with session.begin(): mapping = ( await session.execute( text( """ SELECT id, audit_status FROM leaudit_cross_review_task_documents WHERE task_id = :task_id AND document_id = :document_id AND delete_time IS NULL LIMIT 1 """ ), {"task_id": TaskId, "document_id": DocumentId}, ) ).mappings().first() if not mapping: raise LeauditException(StatusCodeEnum.HTTP_404_NOT_FOUND, "任务文档不存在") await session.execute( text( """ UPDATE leaudit_cross_review_task_documents SET audit_status = 1, update_time = NOW() WHERE id = :id """ ), {"id": int(mapping["id"])}, ) stats = ( await session.execute( text( """ SELECT COUNT(*)::int AS total_documents, COUNT(*) FILTER (WHERE audit_status = 1)::int AS completed_documents FROM leaudit_cross_review_task_documents WHERE task_id = :task_id AND delete_time IS NULL """ ), {"task_id": TaskId}, ) ).mappings().first() totalDocuments = int((stats or {}).get("total_documents") or 0) completedDocuments = int((stats or {}).get("completed_documents") or 0) taskCompleted = totalDocuments > 0 and totalDocuments == completedDocuments taskStatus = "completed" if taskCompleted else "in_progress" await session.execute( text( """ UPDATE leaudit_cross_review_tasks SET status = :status, update_time = NOW() WHERE id = :task_id AND delete_time IS NULL """ ), {"task_id": TaskId, "status": taskStatus}, ) return CrossReviewTaskCompleteVO( taskId=TaskId, documentId=DocumentId, auditStatus=1, taskStatus=taskStatus, taskCompleted=taskCompleted, ) async def CreateProposal(self, CurrentUserId: int, Body: CrossReviewProposalCreateDTO) -> CrossReviewProposalCreateVO: """创建交叉评查提案。""" async with GetAsyncSession() as session: await self._ensure_tables_ready(session) proposalType = (Body.proposalType or "review_point").strip().lower() if proposalType not in {"review_point", "supplement"}: raise LeauditException(StatusCodeEnum.HTTP_400_BAD_REQUEST, "proposalType 仅支持 review_point、supplement") if Body.deductionScore == 0: raise LeauditException(StatusCodeEnum.HTTP_400_BAD_REQUEST, "不允许创建 0 分提案") taskId = await self._resolve_task_id_for_document(session, Body.documentId, CurrentUserId) await self._ensure_task_member(session, taskId, CurrentUserId) await self._ensure_task_document(session, taskId, Body.documentId) ruleResult = None reviewPointResultId: int | None = None evaluationPointName = (Body.evaluationPointName or "").strip() extractionResultText = (Body.extractionResultText or "").strip() if proposalType == "review_point": if Body.reviewPointResultId is None: raise LeauditException(StatusCodeEnum.HTTP_400_BAD_REQUEST, "reviewPointResultId 不能为空") reviewPointResultId = int(Body.reviewPointResultId) ruleResult = await self._load_rule_result(session, reviewPointResultId, Body.documentId) if not evaluationPointName: evaluationPointName = str(ruleResult.get("rule_name") or ruleResult.get("rule_id") or "") else: if not evaluationPointName: raise LeauditException(StatusCodeEnum.HTTP_400_BAD_REQUEST, "补充意见必须填写评查点名称") if not extractionResultText: raise LeauditException(StatusCodeEnum.HTTP_400_BAD_REQUEST, "补充意见必须填写抽取结果") duplicateExists = bool( await session.scalar( text( """ SELECT 1 FROM leaudit_cross_review_proposals WHERE task_id = :task_id AND document_id = :document_id AND proposal_type = :proposal_type AND proposer_id = :proposer_id AND status IN ('pending', 'approved') AND delete_time IS NULL AND ( (:proposal_type = 'review_point' AND rule_result_id = :rule_result_id) OR ( :proposal_type = 'supplement' AND COALESCE(evaluation_point_name, '') = :evaluation_point_name AND COALESCE(extraction_result_text, '') = :extraction_result_text ) ) LIMIT 1 """ ), { "task_id": taskId, "document_id": Body.documentId, "proposal_type": proposalType, "rule_result_id": reviewPointResultId, "evaluation_point_name": evaluationPointName, "extraction_result_text": extractionResultText, "proposer_id": CurrentUserId, }, ) ) if duplicateExists: raise LeauditException(StatusCodeEnum.HTTP_400_BAD_REQUEST, "当前内容已存在您的有效提案") if proposalType == "review_point": currentScore, fullScore = await self._calculate_current_score(session, reviewPointResultId, Body.documentId) if Body.deductionScore < 0 and currentScore <= 0: raise LeauditException(StatusCodeEnum.HTTP_400_BAD_REQUEST, "当前分值已为 0,不能继续扣分") if Body.deductionScore > 0 and currentScore >= fullScore: raise LeauditException(StatusCodeEnum.HTTP_400_BAD_REQUEST, "当前分值已满分,不能继续加分") await self._reset_transaction_for_write(session) async with session.begin(): proposalRow = ( await session.execute( text( """ INSERT INTO leaudit_cross_review_proposals ( task_id, document_id, proposal_type, rule_result_id, proposer_id, evaluation_point_name, extraction_result_text, proposed_score_delta, reason, status ) VALUES ( :task_id, :document_id, :proposal_type, :rule_result_id, :proposer_id, :evaluation_point_name, :extraction_result_text, :proposed_score_delta, :reason, 'pending' ) RETURNING id, create_time """ ), { "task_id": taskId, "document_id": Body.documentId, "proposal_type": proposalType, "rule_result_id": reviewPointResultId, "proposer_id": CurrentUserId, "evaluation_point_name": evaluationPointName or None, "extraction_result_text": extractionResultText or None, "proposed_score_delta": Body.deductionScore, "reason": Body.auditOpinion.strip(), }, ) ).mappings().first() if not proposalRow: raise LeauditException(StatusCodeEnum.HTTP_500_INTERNAL_SERVER_ERROR, "创建交叉评查提案失败") proposalId = int(proposalRow["id"]) await session.execute( text( """ INSERT INTO leaudit_cross_review_votes (proposal_id, voter_id, vote_type) VALUES (:proposal_id, :voter_id, 'agree') """ ), {"proposal_id": proposalId, "voter_id": CurrentUserId}, ) await self._refresh_proposal_status(session, proposalId) return CrossReviewProposalCreateVO(proposalId=proposalId, createdAt=proposalRow.get("create_time")) async def VoteProposal( self, CurrentUserId: int, ProposalId: int, Body: CrossReviewProposalVoteDTO, ) -> CrossReviewProposalVoteVO: """对交叉评查提案投票。""" voteType = Body.voteType.strip().lower() if voteType not in {"agree", "disagree", "cancel"}: raise LeauditException(StatusCodeEnum.HTTP_400_BAD_REQUEST, "投票类型仅支持 agree/disagree/cancel") async with GetAsyncSession() as session: await self._ensure_tables_ready(session) proposal = await self._load_proposal(session, ProposalId) await self._ensure_task_member(session, int(proposal["task_id"]), CurrentUserId) if str(proposal["status"]) in {"approved", "rejected", "cancelled"}: raise LeauditException(StatusCodeEnum.HTTP_400_BAD_REQUEST, "当前提案状态不允许继续投票") await self._reset_transaction_for_write(session) async with session.begin(): if voteType == "cancel": deleted = await session.execute( text( """ UPDATE leaudit_cross_review_votes SET delete_time = NOW(), update_time = NOW() WHERE proposal_id = :proposal_id AND voter_id = :voter_id AND delete_time IS NULL """ ), {"proposal_id": ProposalId, "voter_id": CurrentUserId}, ) if deleted.rowcount == 0: raise LeauditException(StatusCodeEnum.HTTP_400_BAD_REQUEST, "当前没有可撤销的投票") else: await session.execute( text( """ UPDATE leaudit_cross_review_votes SET delete_time = NOW(), update_time = NOW() WHERE proposal_id = :proposal_id AND voter_id = :voter_id AND delete_time IS NULL """ ), {"proposal_id": ProposalId, "voter_id": CurrentUserId}, ) await session.execute( text( """ INSERT INTO leaudit_cross_review_votes (proposal_id, voter_id, vote_type) VALUES (:proposal_id, :voter_id, :vote_type) """ ), {"proposal_id": ProposalId, "voter_id": CurrentUserId, "vote_type": voteType}, ) proposalStatus = await self._refresh_proposal_status(session, ProposalId) return CrossReviewProposalVoteVO( proposalId=ProposalId, voterId=CurrentUserId, voteType=voteType, proposalStatus=proposalStatus, ) async def CancelProposal(self, CurrentUserId: int, ProposalId: int) -> CrossReviewProposalCancelVO: """撤销交叉评查提案。""" async with GetAsyncSession() as session: await self._ensure_tables_ready(session) proposal = await self._load_proposal(session, ProposalId) if int(proposal["proposer_id"]) != CurrentUserId: raise LeauditException(StatusCodeEnum.HTTP_403_FORBIDDEN, "仅提案人可撤销提案") if str(proposal["status"]) not in {"pending"}: raise LeauditException(StatusCodeEnum.HTTP_400_BAD_REQUEST, "当前提案状态不允许撤销") await self._reset_transaction_for_write(session) async with session.begin(): await session.execute( text( """ UPDATE leaudit_cross_review_proposals SET status = 'cancelled', update_time = NOW() WHERE id = :proposal_id AND delete_time IS NULL """ ), {"proposal_id": ProposalId}, ) await session.execute( text( """ UPDATE leaudit_cross_review_votes SET delete_time = NOW(), update_time = NOW() WHERE proposal_id = :proposal_id AND delete_time IS NULL """ ), {"proposal_id": ProposalId}, ) return CrossReviewProposalCancelVO(proposalId=ProposalId, status="cancelled") async def GetDocumentProposals( self, CurrentUserId: int, DocumentId: int, Page: int, PageSize: int, ) -> CrossReviewProposalPageVO: """获取文档提案列表。""" async with GetAsyncSession() as session: await self._ensure_tables_ready(session) taskId = await self._resolve_task_id_for_document(session, DocumentId, CurrentUserId) await self._ensure_task_member(session, taskId, CurrentUserId) return await self._build_document_proposals_page(session, CurrentUserId, taskId, DocumentId, Page, PageSize) async def GetDocumentPendingVotes(self, CurrentUserId: int, DocumentId: int) -> CrossReviewPendingVotesVO: """获取文档待投票信息。""" async with GetAsyncSession() as session: await self._ensure_tables_ready(session) taskId = await self._resolve_task_id_for_document(session, DocumentId, CurrentUserId) await self._ensure_task_member(session, taskId, CurrentUserId) proposalPage = await self._build_document_proposals_page(session, CurrentUserId, taskId, DocumentId, 1, 200) pendingProposals = [ CrossReviewPendingProposalVO( evaluationPointName=item.evaluationPointName, pendingVotersNum=len(item.pendingVoters), ) for item in proposalPage.items if item.status == "pending" and len(item.pendingVoters) > 0 ] return CrossReviewPendingVotesVO( hasPendingVotes=len(pendingProposals) > 0, pendingProposals=pendingProposals, ) async def ExportDocumentProposals(self, CurrentUserId: int, DocumentId: int) -> tuple[bytes, str]: """导出文档交叉评查意见 Excel。""" async with GetAsyncSession() as session: await self._ensure_tables_ready(session) taskId = await self._resolve_task_id_for_document(session, DocumentId, CurrentUserId) await self._ensure_task_member(session, taskId, CurrentUserId) documentMeta = await self._load_document_export_meta(session, taskId, DocumentId) proposalPage = await self._build_document_proposals_page(session, CurrentUserId, taskId, DocumentId, 1, 1000) defaultRows: list[dict[str, object]] = [] supplementRows: list[dict[str, object]] = [] voteRows: list[dict[str, object]] = [] taskMembers = await self._load_task_member_names(session, taskId) for item in proposalPage.items: row = self._build_export_summary_row( taskId=taskId, taskName=documentMeta["task_name"], documentId=DocumentId, documentName=documentMeta["document_name"], item=item, ) if item.proposalType == "supplement": supplementRows.append(row) else: defaultRows.append(row) votedNames = {vote.voter for vote in item.votes} for vote in item.votes: voteRows.append( { "任务名称": documentMeta["task_name"], "文档名称": documentMeta["document_name"], "意见分类": "补充意见" if item.proposalType == "supplement" else "默认规则意见", "评查点名称": item.evaluationPointName, "发起人": item.proposer, "投票人": vote.voter, "投票结果": self._translate_vote_type(vote.voteType), "投票时间": "", } ) for _, memberName in taskMembers.items(): if memberName in votedNames: continue voteRows.append( { "任务名称": documentMeta["task_name"], "文档名称": documentMeta["document_name"], "意见分类": "补充意见" if item.proposalType == "supplement" else "默认规则意见", "评查点名称": item.evaluationPointName, "发起人": item.proposer, "投票人": memberName, "投票结果": "未投", "投票时间": "", } ) workbook = self._build_simple_xlsx( sheets=[ ("默认规则意见", list(self._PROPOSAL_EXPORT_HEADERS), defaultRows), ("补充意见", list(self._PROPOSAL_EXPORT_HEADERS), supplementRows), ("投票明细", list(self._VOTE_EXPORT_HEADERS), voteRows), ] ) safeName = quote(f"交叉评查意见_{documentMeta['document_name']}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.xlsx") return workbook, safeName async def UploadTaskDocument( self, CurrentUserId: int, TaskId: int, FileName: str, FileContent: bytes, ContentType: str | None, TypeId: int | None = None, GroupId: int | None = None, ) -> CrossReviewTaskDocumentUploadVO: """向交叉评查任务补传文档。""" async with GetAsyncSession() as session: await self._ensure_tables_ready(session) permission = await self.CanConfirmTaskDocument(CurrentUserId, TaskId) if not permission.canConfirm: raise LeauditException(StatusCodeEnum.HTTP_403_FORBIDDEN, permission.reason) taskMeta = ( await session.execute( text( """ SELECT id, doc_type_id, doc_type_code FROM leaudit_cross_review_tasks WHERE id = :task_id AND delete_time IS NULL LIMIT 1 """ ), {"task_id": TaskId}, ) ).mappings().first() if not taskMeta: raise LeauditException(StatusCodeEnum.HTTP_404_NOT_FOUND, "交叉评查任务不存在") resolvedTypeId = int(TypeId) if TypeId is not None else self._to_int(taskMeta.get("doc_type_id")) resolvedGroupId = int(GroupId) if GroupId is not None else None uploadResult = await self.DocumentService.Upload( FileName=FileName, FileContent=FileContent, ContentType=ContentType, TypeId=resolvedTypeId, TypeCode=None if resolvedTypeId is not None else taskMeta.get("doc_type_code"), GroupId=resolvedGroupId, CreatedBy=CurrentUserId, AutoRun=True, ReviewScope="cross_review", ) async with GetAsyncSession() as session: await self._ensure_tables_ready(session) await self._reset_transaction_for_write(session) async with session.begin(): exists = bool( await session.scalar( text( """ SELECT 1 FROM leaudit_cross_review_task_documents WHERE task_id = :task_id AND document_id = :document_id AND delete_time IS NULL LIMIT 1 """ ), {"task_id": TaskId, "document_id": uploadResult.documentId}, ) ) if not exists: await session.execute( text( """ INSERT INTO leaudit_cross_review_task_documents (task_id, document_id, audit_status) VALUES (:task_id, :document_id, 0) """ ), {"task_id": TaskId, "document_id": uploadResult.documentId}, ) await session.execute( text( """ UPDATE leaudit_cross_review_tasks SET status = 'in_progress', update_time = NOW() WHERE id = :task_id AND delete_time IS NULL """ ), {"task_id": TaskId}, ) return CrossReviewTaskDocumentUploadVO( taskId=TaskId, documentId=uploadResult.documentId, auditStatus=0, processingStatus=uploadResult.processingStatus, ) async def AppendTaskDocumentAttachments( self, CurrentUserId: int, TaskId: int, DocumentId: int, Files: list[tuple[str, bytes, str | None]], Remark: str | None = None, ) -> CrossReviewTaskDocumentAppendVO: """为交叉评查任务文档追加附件,并生成同版本链新版本。""" async with GetAsyncSession() as session: await self._ensure_tables_ready(session) permission = await self.CanConfirmTaskDocument(CurrentUserId, TaskId) if not permission.canConfirm: raise LeauditException(StatusCodeEnum.HTTP_403_FORBIDDEN, permission.reason) await self._ensure_task_document(session, TaskId, DocumentId) appendResult = await self.DocumentService.AppendAttachments( CurrentUserId=CurrentUserId, Id=DocumentId, Files=Files, MergeMode="new", Remark=Remark, ) async with GetAsyncSession() as session: await self._ensure_tables_ready(session) await self._reset_transaction_for_write(session) async with session.begin(): exists = bool( await session.scalar( text( """ SELECT 1 FROM leaudit_cross_review_task_documents WHERE task_id = :task_id AND document_id = :document_id AND delete_time IS NULL LIMIT 1 """ ), {"task_id": TaskId, "document_id": appendResult.documentId}, ) ) if not exists: await session.execute( text( """ INSERT INTO leaudit_cross_review_task_documents (task_id, document_id, audit_status) VALUES (:task_id, :document_id, 0) """ ), {"task_id": TaskId, "document_id": appendResult.documentId}, ) await session.execute( text( """ UPDATE leaudit_cross_review_tasks SET status = 'in_progress', update_time = NOW() WHERE id = :task_id AND delete_time IS NULL """ ), {"task_id": TaskId}, ) return CrossReviewTaskDocumentAppendVO( taskId=TaskId, originalDocumentId=DocumentId, documentId=appendResult.documentId, versionNo=appendResult.versionNo, versionGroupKey=appendResult.versionGroupKey, auditStatus=0, processingStatus=appendResult.processingStatus, ) async def _build_document_proposals_page( self, session, CurrentUserId: int, TaskId: int, DocumentId: int, Page: int, PageSize: int, ) -> CrossReviewProposalPageVO: params = { "task_id": TaskId, "document_id": DocumentId, "limit": PageSize, "offset": (Page - 1) * PageSize, } total = int( ( await session.scalar( text( """ SELECT COUNT(*) FROM leaudit_cross_review_proposals p WHERE p.task_id = :task_id AND p.document_id = :document_id AND p.delete_time IS NULL """ ), params, ) ) or 0 ) proposalRows = ( await session.execute( text( """ SELECT p.id, p.task_id, p.document_id, p.proposal_type, p.rule_result_id, p.proposer_id, p.evaluation_point_name, p.extraction_result_text, p.proposed_score_delta, p.reason, p.status, p.create_time, rr.rule_name, rr.fail_message, COALESCE(u.nick_name, u.username, '') AS proposer_name FROM leaudit_cross_review_proposals p LEFT JOIN leaudit_rule_results rr ON rr.id = p.rule_result_id LEFT JOIN sso_users u ON u.id = p.proposer_id WHERE p.task_id = :task_id AND p.document_id = :document_id AND p.delete_time IS NULL ORDER BY p.id DESC LIMIT :limit OFFSET :offset """ ), params, ) ).mappings().all() taskMembers = await self._load_task_member_names(session, TaskId) proposalIds = [int(row["id"]) for row in proposalRows] votesMap = await self._load_votes_map(session, proposalIds) items: list[CrossReviewProposalItemVO] = [] for row in proposalRows: proposalId = int(row["id"]) voteRows = votesMap.get(proposalId, []) agreeVoters = [str(vote["voter_name"]) for vote in voteRows if vote["vote_type"] == "agree"] disagreeVoters = [str(vote["voter_name"]) for vote in voteRows if vote["vote_type"] == "disagree"] votedUserIds = {int(vote["voter_id"]) for vote in voteRows} pendingVoters = [name for userId, name in taskMembers.items() if userId not in votedUserIds] canVote = CurrentUserId in taskMembers and CurrentUserId not in votedUserIds and str(row["status"]) == "pending" items.append( CrossReviewProposalItemVO( proposalId=proposalId, proposalType=str(row.get("proposal_type") or "review_point"), reviewPointResultId=self._to_int(row.get("rule_result_id")), evaluationPointName=str(row.get("evaluation_point_name") or row.get("rule_name") or ""), extractionResultText=str(row.get("extraction_result_text") or ""), proposedScore=float(row["proposed_score_delta"] or 0), reason=str(row.get("reason") or ""), proposer=str(row.get("proposer_name") or ""), votes=[ CrossReviewProposalVoteItemVO( voter=str(vote["voter_name"]), voteType=str(vote["vote_type"]), ) for vote in voteRows ], agreeVoters=agreeVoters, disagreeVoters=disagreeVoters, pendingVoters=pendingVoters, canVote=canVote, problemMessage=str(row.get("fail_message") or row.get("extraction_result_text") or row.get("rule_name") or ""), proposerId=int(row["proposer_id"]), createdAt=row.get("create_time"), status=str(row.get("status") or "pending"), ) ) return CrossReviewProposalPageVO(total=total, page=Page, pageSize=PageSize, items=items) async def _resolve_task_id_for_document(self, session, documentId: int, userId: int) -> int: row = ( await session.execute( text( """ SELECT td.task_id FROM leaudit_cross_review_task_documents td JOIN leaudit_cross_review_task_members tm ON tm.task_id = td.task_id JOIN leaudit_cross_review_tasks t ON t.id = td.task_id WHERE td.document_id = :document_id AND td.delete_time IS NULL AND tm.user_id = :user_id AND tm.delete_time IS NULL AND t.delete_time IS NULL ORDER BY td.task_id DESC LIMIT 1 """ ), {"document_id": documentId, "user_id": userId}, ) ).mappings().first() if not row: raise LeauditException(StatusCodeEnum.HTTP_404_NOT_FOUND, "当前文档不在您的交叉评查任务中") return int(row["task_id"]) async def _ensure_task_document(self, session, taskId: int, documentId: int) -> None: exists = bool( await session.scalar( text( """ SELECT 1 FROM leaudit_cross_review_task_documents WHERE task_id = :task_id AND document_id = :document_id AND delete_time IS NULL LIMIT 1 """ ), {"task_id": taskId, "document_id": documentId}, ) ) if not exists: raise LeauditException(StatusCodeEnum.HTTP_404_NOT_FOUND, "当前文档不在交叉评查任务内") async def _load_rule_result(self, session, ruleResultId: int, documentId: int): row = ( await session.execute( text( """ SELECT id, document_id, rule_id, rule_name, score, passed FROM leaudit_rule_results WHERE id = :rule_result_id AND document_id = :document_id LIMIT 1 """ ), {"rule_result_id": ruleResultId, "document_id": documentId}, ) ).mappings().first() if not row: raise LeauditException(StatusCodeEnum.HTTP_404_NOT_FOUND, "对应评查点结果不存在") return row async def _calculate_current_score(self, session, ruleResultId: int, documentId: int) -> tuple[float, float]: row = ( await session.execute( text( """ WITH approved_delta AS ( SELECT COALESCE(SUM(proposed_score_delta), 0) AS delta FROM leaudit_cross_review_proposals WHERE rule_result_id = :rule_result_id AND document_id = :document_id AND status = 'approved' AND delete_time IS NULL ) SELECT COALESCE(rr.score, 0) AS full_score, CASE WHEN a.edit_audit_status = 1 THEN CASE WHEN a.override_result IS TRUE THEN COALESCE(rr.score, 0) ELSE 0 END WHEN rr.passed IS TRUE THEN COALESCE(rr.score, 0) ELSE 0 END + COALESCE(ad.delta, 0) AS current_score FROM leaudit_rule_results rr LEFT JOIN leaudit_review_point_audits a ON a.rule_result_id = rr.id CROSS JOIN approved_delta ad WHERE rr.id = :rule_result_id AND rr.document_id = :document_id LIMIT 1 """ ), {"rule_result_id": ruleResultId, "document_id": documentId}, ) ).mappings().first() if not row: raise LeauditException(StatusCodeEnum.HTTP_404_NOT_FOUND, "对应评查点结果不存在") return float(row.get("current_score") or 0), float(row.get("full_score") or 0) async def _load_proposal(self, session, proposalId: int): row = ( await session.execute( text( """ SELECT id, task_id, document_id, proposer_id, status FROM leaudit_cross_review_proposals WHERE id = :proposal_id AND delete_time IS NULL LIMIT 1 """ ), {"proposal_id": proposalId}, ) ).mappings().first() if not row: raise LeauditException(StatusCodeEnum.HTTP_404_NOT_FOUND, "交叉评查提案不存在") return row async def _load_document_export_meta(self, session, taskId: int, documentId: int) -> dict[str, str]: row = ( await session.execute( text( """ SELECT t.task_name, COALESCE(f.file_name, d.normalized_name, CAST(d.id AS TEXT)) AS document_name FROM leaudit_cross_review_tasks t JOIN leaudit_documents d ON d.id = :document_id LEFT JOIN leaudit_document_files f ON f.document_id = d.id AND f.is_active = true AND f.deleted_at IS NULL AND f.file_role IN ('original', 'primary') WHERE t.id = :task_id AND t.delete_time IS NULL AND d.deleted_at IS NULL LIMIT 1 """ ), {"task_id": taskId, "document_id": documentId}, ) ).mappings().first() if not row: raise LeauditException(StatusCodeEnum.HTTP_404_NOT_FOUND, "导出目标不存在") return { "task_name": str(row.get("task_name") or f"任务{taskId}"), "document_name": str(row.get("document_name") or f"文档{documentId}"), } async def _load_task_member_names(self, session, taskId: int) -> dict[int, str]: rows = ( await session.execute( text( """ SELECT tm.user_id, COALESCE(u.nick_name, u.username, CAST(tm.user_id AS TEXT)) AS display_name FROM leaudit_cross_review_task_members tm LEFT JOIN sso_users u ON u.id = tm.user_id WHERE tm.task_id = :task_id AND tm.delete_time IS NULL ORDER BY tm.id ASC """ ), {"task_id": taskId}, ) ).mappings().all() return {int(row["user_id"]): str(row.get("display_name") or row["user_id"]) for row in rows} async def _load_votes_map(self, session, proposalIds: list[int]) -> dict[int, list[dict[str, object]]]: if not proposalIds: return {} rows = ( await session.execute( text( """ SELECT v.proposal_id, v.voter_id, v.vote_type, COALESCE(u.nick_name, u.username, CAST(v.voter_id AS TEXT)) AS voter_name FROM leaudit_cross_review_votes v LEFT JOIN sso_users u ON u.id = v.voter_id WHERE v.proposal_id IN :proposal_ids AND v.delete_time IS NULL ORDER BY v.id ASC """ ).bindparams(bindparam("proposal_ids", expanding=True)), {"proposal_ids": proposalIds}, ) ).mappings().all() result: dict[int, list[dict[str, object]]] = {} for row in rows: result.setdefault(int(row["proposal_id"]), []).append(dict(row)) return result async def _refresh_proposal_status(self, session, proposalId: int) -> str: row = ( await session.execute( text( """ SELECT p.id, p.task_id, p.status FROM leaudit_cross_review_proposals p WHERE p.id = :proposal_id AND p.delete_time IS NULL LIMIT 1 """ ), {"proposal_id": proposalId}, ) ).mappings().first() if not row: raise LeauditException(StatusCodeEnum.HTTP_404_NOT_FOUND, "交叉评查提案不存在") currentStatus = str(row["status"]) if currentStatus == "cancelled": return currentStatus taskId = int(row["task_id"]) memberCount = int( ( await session.scalar( text( """ SELECT COUNT(*) FROM leaudit_cross_review_task_members WHERE task_id = :task_id AND delete_time IS NULL """ ), {"task_id": taskId}, ) ) or 0 ) threshold = floor(memberCount / 2) + 1 if memberCount > 0 else 1 voteStats = ( await session.execute( text( """ SELECT COUNT(*) FILTER (WHERE vote_type = 'agree')::int AS agree_count, COUNT(*) FILTER (WHERE vote_type = 'disagree')::int AS disagree_count FROM leaudit_cross_review_votes WHERE proposal_id = :proposal_id AND delete_time IS NULL """ ), {"proposal_id": proposalId}, ) ).mappings().first() agreeCount = int((voteStats or {}).get("agree_count") or 0) disagreeCount = int((voteStats or {}).get("disagree_count") or 0) remaining = max(memberCount - agreeCount - disagreeCount, 0) if agreeCount >= threshold: nextStatus = "approved" elif disagreeCount >= threshold: nextStatus = "rejected" elif agreeCount + remaining < threshold: nextStatus = "rejected" else: nextStatus = "pending" if nextStatus != currentStatus: await session.execute( text( """ UPDATE leaudit_cross_review_proposals SET status = :status, update_time = NOW() WHERE id = :proposal_id """ ), {"proposal_id": proposalId, "status": nextStatus}, ) return nextStatus async def _ensure_tables_ready(self, session) -> None: """确保第一阶段所需表已初始化。""" required = [ "leaudit_cross_review_tasks", "leaudit_cross_review_task_members", "leaudit_cross_review_task_documents", "leaudit_cross_review_proposals", "leaudit_cross_review_votes", ] missing_tables: list[str] = [] for tableName in required: exists = bool( await session.scalar( text( """ SELECT EXISTS ( SELECT 1 FROM information_schema.tables WHERE table_schema = current_schema() AND table_name = :table_name ) """ ), {"table_name": tableName}, ) ) if not exists: missing_tables.append(tableName) if missing_tables: for statement in self._SCHEMA_BOOTSTRAP_STATEMENTS: await session.execute(text(statement)) await session.commit() for tableName in required: exists = bool( await session.scalar( text( """ SELECT EXISTS ( SELECT 1 FROM information_schema.tables WHERE table_schema = current_schema() AND table_name = :table_name ) """ ), {"table_name": tableName}, ) ) if not exists: raise LeauditException(StatusCodeEnum.HTTP_500_INTERNAL_SERVER_ERROR, f"交叉评查表未初始化: {tableName}") proposalColumns = await self._load_table_columns(session, "leaudit_cross_review_proposals") if "proposal_type" not in proposalColumns: await session.execute( text("ALTER TABLE leaudit_cross_review_proposals ADD COLUMN proposal_type VARCHAR(32) NOT NULL DEFAULT 'review_point'") ) if "evaluation_point_name" not in proposalColumns: await session.execute( text("ALTER TABLE leaudit_cross_review_proposals ADD COLUMN evaluation_point_name VARCHAR(255)") ) if "extraction_result_text" not in proposalColumns: await session.execute( text("ALTER TABLE leaudit_cross_review_proposals ADD COLUMN extraction_result_text TEXT") ) if proposalColumns: isRuleResultNotNull = bool( await session.scalar( text( """ SELECT 1 FROM information_schema.columns WHERE table_schema = current_schema() AND table_name = 'leaudit_cross_review_proposals' AND column_name = 'rule_result_id' AND is_nullable = 'NO' LIMIT 1 """ ) ) ) if isRuleResultNotNull: await session.execute( text("ALTER TABLE leaudit_cross_review_proposals ALTER COLUMN rule_result_id DROP NOT NULL") ) await session.commit() async def _ensure_task_member(self, session, taskId: int, userId: int) -> None: """校验当前用户是否是任务成员。""" exists = bool( await session.scalar( text( """ SELECT 1 FROM leaudit_cross_review_task_members tm JOIN leaudit_cross_review_tasks t ON t.id = tm.task_id WHERE tm.task_id = :task_id AND tm.user_id = :user_id AND tm.delete_time IS NULL AND t.delete_time IS NULL LIMIT 1 """ ), {"task_id": taskId, "user_id": userId}, ) ) if not exists: raise LeauditException(StatusCodeEnum.HTTP_403_FORBIDDEN, "当前用户不是交叉评查任务成员") async def _reset_transaction_for_write(self, session) -> None: """显式写事务前清理查询阶段开启的隐式事务。""" if session.in_transaction(): await session.rollback() def _unique_int_list(self, values: list[int]) -> list[int]: """去重并保留原顺序。""" seen: set[int] = set() result: list[int] = [] for value in values: intValue = int(value) if intValue in seen: continue seen.add(intValue) result.append(intValue) return result def _to_int(self, value) -> int | None: """安全转 int。""" if value is None: return None return int(value) def _parse_text_array(self, value) -> list[str]: """安全解析 PostgreSQL text[] 为字符串列表。""" if value is None: return [] if isinstance(value, list): return [str(v) for v in value] return [str(value)] def _build_score_summary(self, finalScore: float, fullScore: float) -> str: if fullScore <= 0: return "0/0" return f"{round(finalScore, 1)}/{round(fullScore, 1)}" def _build_score_percent(self, finalScore: float, fullScore: float) -> float: if fullScore <= 0: return 0.0 return round(finalScore / fullScore * 100, 1) def _build_export_summary_row( self, taskId: int, taskName: str, documentId: int, documentName: str, item: CrossReviewProposalItemVO, ) -> dict[str, object]: return { "任务名称": taskName, "文档名称": documentName, "意见分类": "补充意见" if item.proposalType == "supplement" else "默认规则意见", "评查点名称": item.evaluationPointName, "问题依据/命中内容": item.extractionResultText or item.problemMessage or "", "原问题说明": item.problemMessage or "", "评查意见": item.reason, "分数调整": item.proposedScore, "提案状态": self._translate_proposal_status(item.status), "发起人": item.proposer, "提出时间": self._format_datetime(item.createdAt), "同意人数": len(item.agreeVoters), "同意人员": "、".join(item.agreeVoters), "不同意人数": len(item.disagreeVoters), "不同意人员": "、".join(item.disagreeVoters), "未投人数": len(item.pendingVoters), "未投人员": "、".join(item.pendingVoters), } def _translate_vote_type(self, voteType: str) -> str: return { "agree": "同意", "disagree": "不同意", "cancel": "撤销", }.get(voteType, voteType or "") def _translate_proposal_status(self, status: str) -> str: return { "pending": "待定", "approved": "已通过", "rejected": "已驳回", "cancelled": "已撤销", }.get(status, status or "") def _format_datetime(self, value) -> str: if value is None: return "" if isinstance(value, datetime): return value.strftime("%Y-%m-%d %H:%M:%S") return str(value) def _build_simple_xlsx(self, sheets: list[tuple[str, list[str], list[dict[str, object]]]]) -> bytes: normalized_sheets: list[tuple[str, list[str], list[list[str]]]] = [] for sheetName, headers, rows in sheets: tableRows = [[self._excel_cell_text(row.get(header)) for header in headers] for row in rows] normalized_sheets.append((sheetName, headers, tableRows)) sharedStrings: list[str] = [] stringIndex: dict[str, int] = {} def intern(value: str) -> int: if value in stringIndex: return stringIndex[value] idx = len(sharedStrings) stringIndex[value] = idx sharedStrings.append(value) return idx for _, headers, rows in normalized_sheets: for header in headers: intern(header) for row in rows: for cell in row: intern(cell) buffer = io.BytesIO() with ZipFile(buffer, "w", ZIP_DEFLATED) as zf: zf.writestr("[Content_Types].xml", self._xlsx_content_types(len(normalized_sheets))) zf.writestr("_rels/.rels", self._xlsx_root_rels()) zf.writestr("xl/workbook.xml", self._xlsx_workbook([sheet[0] for sheet in normalized_sheets])) zf.writestr("xl/_rels/workbook.xml.rels", self._xlsx_workbook_rels(len(normalized_sheets))) zf.writestr("xl/styles.xml", self._xlsx_styles()) zf.writestr("xl/sharedStrings.xml", self._xlsx_shared_strings(sharedStrings)) for index, (_, headers, rows) in enumerate(normalized_sheets, start=1): zf.writestr(f"xl/worksheets/sheet{index}.xml", self._xlsx_sheet(headers, rows, intern)) zf.writestr("docProps/core.xml", self._xlsx_core_props()) zf.writestr("docProps/app.xml", self._xlsx_app_props([sheet[0] for sheet in normalized_sheets])) return buffer.getvalue() def _excel_cell_text(self, value: object) -> str: if value is None: return "" if isinstance(value, float): return str(round(value, 2)) return str(value) def _xlsx_content_types(self, sheetCount: int) -> str: overrides = [ '', '', '', '', '', ] overrides.extend( f'' for index in range(1, sheetCount + 1) ) return ( '' '' '' '' + "".join(overrides) + "" ) def _xlsx_root_rels(self) -> str: return ( '' '' '' '' '' "" ) def _xlsx_workbook(self, sheetNames: list[str]) -> str: sheets = "".join( f'' for index, name in enumerate(sheetNames, start=1) ) return ( '' '' f"{sheets}" "" ) def _xlsx_workbook_rels(self, sheetCount: int) -> str: relationships = [] for index in range(1, sheetCount + 1): relationships.append( f'' ) relationships.append( f'' ) relationships.append( f'' ) return ( '' '' + "".join(relationships) + "" ) def _xlsx_styles(self) -> str: return ( '' '' '' '' '' '' '' '' '' '' '' '' '' '' '' '' '' '' '' '' '' '' '' '' '' '' '' '' '' '' ) def _xlsx_shared_strings(self, values: list[str]) -> str: items = "".join(f"{escape(value)}" for value in values) return ( '' f'{items}' ) def _xlsx_sheet(self, headers: list[str], rows: list[list[str]], intern) -> str: all_rows: list[list[str]] = [] if headers: all_rows.append(headers) all_rows.extend(rows) sheet_rows = [] for rowIndex, row in enumerate(all_rows, start=1): cells = [] styleIndex = "1" if headers and rowIndex == 1 else "0" for colIndex, value in enumerate(row, start=1): colRef = self._xlsx_column_name(colIndex) cells.append(f'{intern(value)}') sheet_rows.append(f'{"".join(cells)}') return ( '' '' f'{"".join(sheet_rows)}' '' ) def _xlsx_column_name(self, columnIndex: int) -> str: result = "" current = columnIndex while current > 0: current, remainder = divmod(current - 1, 26) result = chr(65 + remainder) + result return result def _xlsx_core_props(self) -> str: now = datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ") return ( '' '' 'LeAudit' 'LeAudit' f'{now}' f'{now}' '' ) def _xlsx_app_props(self, sheetNames: list[str]) -> str: parts = "".join(f"{escape(name)}" for name in sheetNames) return ( '' '' 'LeAudit' f'{parts}' '' ) async def _load_table_columns(self, session, tableName: str) -> set[str]: rows = ( await session.execute( text( """ SELECT column_name FROM information_schema.columns WHERE table_schema = current_schema() AND table_name = :table_name """ ), {"table_name": tableName}, ) ).mappings().all() return {str(row["column_name"]) for row in rows if row.get("column_name")}