# Source: leaudit-platform-backend/fastapi_modules/fastapi_leaudit/services/impl/crossReviewServiceImpl.py
# (1370 lines, 58 KiB, Python)

"""交叉评查服务实现(第一阶段骨架)。"""
from __future__ import annotations
from math import floor
from sqlalchemy import bindparam, text
from fastapi_common.fastapi_common_sqlalchemy.database import GetAsyncSession
from fastapi_common.fastapi_common_web.domain.responses import StatusCodeEnum
from fastapi_common.fastapi_common_web.exception.LeauditException import LeauditException
from fastapi_modules.fastapi_leaudit.domian.Dto.crossReviewDto import (
CrossReviewProposalCreateDTO,
CrossReviewProposalVoteDTO,
CrossReviewTaskCreateDTO,
CrossReviewTaskDocumentQueryDTO,
CrossReviewTaskQueryDTO,
)
from fastapi_modules.fastapi_leaudit.domian.vo.crossReviewVo import (
CrossReviewPendingProposalVO,
CrossReviewPendingVotesVO,
CrossReviewPermissionVO,
CrossReviewProposalCancelVO,
CrossReviewProposalCreateVO,
CrossReviewProposalItemVO,
CrossReviewProposalPageVO,
CrossReviewProposalVoteItemVO,
CrossReviewProposalVoteVO,
CrossReviewTaskCompleteVO,
CrossReviewTaskCreateVO,
CrossReviewTaskDocumentPageVO,
CrossReviewTaskDocumentUploadVO,
CrossReviewTaskDocumentVO,
CrossReviewTaskItemVO,
CrossReviewTaskPageVO,
CrossReviewTaskProgressVO,
)
from fastapi_modules.fastapi_leaudit.services.crossReviewService import ICrossReviewService
from fastapi_modules.fastapi_leaudit.services.documentService import IDocumentService
from fastapi_modules.fastapi_leaudit.services.impl.documentServiceImpl import DocumentServiceImpl
class CrossReviewServiceImpl(ICrossReviewService):
"""交叉评查服务实现。"""
# DDL for the cross-review tables and their indexes. Every statement uses
# IF NOT EXISTS, so replaying the whole tuple against an existing schema is a no-op.
# NOTE(review): presumably executed by _ensure_tables_ready — confirm against that helper.
# All tables carry a nullable delete_time column; the partial unique indexes below
# only constrain rows where delete_time IS NULL.
_SCHEMA_BOOTSTRAP_STATEMENTS: tuple[str, ...] = (
# Tasks: one row per cross-review task; status defaults to 'in_progress'.
"""
CREATE TABLE IF NOT EXISTS leaudit_cross_review_tasks (
id BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
task_name VARCHAR(255) NOT NULL,
task_type VARCHAR(32) NOT NULL,
doc_type_id BIGINT,
doc_type_code VARCHAR(64),
assigner_id BIGINT NOT NULL,
status VARCHAR(32) NOT NULL DEFAULT 'in_progress',
create_time TIMESTAMPTZ NOT NULL DEFAULT NOW(),
update_time TIMESTAMPTZ NOT NULL DEFAULT NOW(),
delete_time TIMESTAMPTZ
)
""",
"CREATE INDEX IF NOT EXISTS idx_lcr_tasks_assigner_id ON leaudit_cross_review_tasks (assigner_id)",
"CREATE INDEX IF NOT EXISTS idx_lcr_tasks_status ON leaudit_cross_review_tasks (status)",
"CREATE INDEX IF NOT EXISTS idx_lcr_tasks_doc_type_id ON leaudit_cross_review_tasks (doc_type_id)",
# Task members: (task, user) pairs with a role that defaults to 'participant'.
"""
CREATE TABLE IF NOT EXISTS leaudit_cross_review_task_members (
id BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
task_id BIGINT NOT NULL,
user_id BIGINT NOT NULL,
member_role VARCHAR(32) NOT NULL DEFAULT 'participant',
create_time TIMESTAMPTZ NOT NULL DEFAULT NOW(),
update_time TIMESTAMPTZ NOT NULL DEFAULT NOW(),
delete_time TIMESTAMPTZ
)
""",
"CREATE INDEX IF NOT EXISTS idx_lcr_task_members_task_id ON leaudit_cross_review_task_members (task_id)",
"CREATE INDEX IF NOT EXISTS idx_lcr_task_members_user_id ON leaudit_cross_review_task_members (user_id)",
"CREATE INDEX IF NOT EXISTS idx_lcr_task_members_role ON leaudit_cross_review_task_members (member_role)",
# At most one non-deleted membership row per (task, user).
"""
CREATE UNIQUE INDEX IF NOT EXISTS uq_lcr_task_members_task_user_active
ON leaudit_cross_review_task_members (task_id, user_id)
WHERE delete_time IS NULL
""",
# Task documents: documents attached to a task; audit_status defaults to 0.
"""
CREATE TABLE IF NOT EXISTS leaudit_cross_review_task_documents (
id BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
task_id BIGINT NOT NULL,
document_id BIGINT NOT NULL,
audit_status INTEGER NOT NULL DEFAULT 0,
create_time TIMESTAMPTZ NOT NULL DEFAULT NOW(),
update_time TIMESTAMPTZ NOT NULL DEFAULT NOW(),
delete_time TIMESTAMPTZ
)
""",
"CREATE INDEX IF NOT EXISTS idx_lcr_task_documents_task_id ON leaudit_cross_review_task_documents (task_id)",
"CREATE INDEX IF NOT EXISTS idx_lcr_task_documents_document_id ON leaudit_cross_review_task_documents (document_id)",
"CREATE INDEX IF NOT EXISTS idx_lcr_task_documents_task_status ON leaudit_cross_review_task_documents (task_id, audit_status)",
# At most one non-deleted attachment row per (task, document).
"""
CREATE UNIQUE INDEX IF NOT EXISTS uq_lcr_task_documents_task_document_active
ON leaudit_cross_review_task_documents (task_id, document_id)
WHERE delete_time IS NULL
""",
# Proposals: a proposed score delta on one rule result of one document; status defaults to 'pending'.
"""
CREATE TABLE IF NOT EXISTS leaudit_cross_review_proposals (
id BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
task_id BIGINT NOT NULL,
document_id BIGINT NOT NULL,
rule_result_id BIGINT NOT NULL,
proposer_id BIGINT NOT NULL,
proposed_score_delta NUMERIC(10, 2) NOT NULL,
reason TEXT NOT NULL,
status VARCHAR(32) NOT NULL DEFAULT 'pending',
create_time TIMESTAMPTZ NOT NULL DEFAULT NOW(),
update_time TIMESTAMPTZ NOT NULL DEFAULT NOW(),
delete_time TIMESTAMPTZ
)
""",
"CREATE INDEX IF NOT EXISTS idx_lcr_proposals_task_id ON leaudit_cross_review_proposals (task_id)",
"CREATE INDEX IF NOT EXISTS idx_lcr_proposals_document_id ON leaudit_cross_review_proposals (document_id)",
"CREATE INDEX IF NOT EXISTS idx_lcr_proposals_rule_result_id ON leaudit_cross_review_proposals (rule_result_id)",
"CREATE INDEX IF NOT EXISTS idx_lcr_proposals_proposer_id ON leaudit_cross_review_proposals (proposer_id)",
"CREATE INDEX IF NOT EXISTS idx_lcr_proposals_status ON leaudit_cross_review_proposals (status)",
# Votes: one row per vote cast on a proposal.
"""
CREATE TABLE IF NOT EXISTS leaudit_cross_review_votes (
id BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
proposal_id BIGINT NOT NULL,
voter_id BIGINT NOT NULL,
vote_type VARCHAR(16) NOT NULL,
create_time TIMESTAMPTZ NOT NULL DEFAULT NOW(),
update_time TIMESTAMPTZ NOT NULL DEFAULT NOW(),
delete_time TIMESTAMPTZ
)
""",
"CREATE INDEX IF NOT EXISTS idx_lcr_votes_proposal_id ON leaudit_cross_review_votes (proposal_id)",
"CREATE INDEX IF NOT EXISTS idx_lcr_votes_voter_id ON leaudit_cross_review_votes (voter_id)",
# At most one non-deleted vote per (proposal, voter).
"""
CREATE UNIQUE INDEX IF NOT EXISTS uq_lcr_votes_proposal_voter_active
ON leaudit_cross_review_votes (proposal_id, voter_id)
WHERE delete_time IS NULL
""",
)
def __init__(self) -> None:
    """Create the document service that ``UploadTaskDocument`` delegates uploads to."""
    documentService: IDocumentService = DocumentServiceImpl()
    self.DocumentService = documentService
async def CreateTask(self, CurrentUserId: int, Body: CrossReviewTaskCreateDTO) -> CrossReviewTaskCreateVO:
    """Create a cross-review task together with its members and documents.

    The creator is always stored as a member. Every user listed in
    ``Body.principalUserIds`` is stored with the ``principal`` role and is
    added to the member list even when missing from ``Body.memberUserIds``
    (previously such principals were silently dropped). All other members get
    the ``participant`` role.

    Args:
        CurrentUserId: Id of the user creating the task; recorded as assigner.
        Body: Task name/type, optional document type, member ids, principal
            ids and document ids.

    Returns:
        The new task id and name plus the number of members and documents stored.

    Raises:
        LeauditException: 500 when the task INSERT returns no row.
    """
    insertTaskSql = text(
        """
        INSERT INTO leaudit_cross_review_tasks
        (task_name, task_type, doc_type_id, doc_type_code, assigner_id, status)
        VALUES
        (:task_name, :task_type, :doc_type_id, :doc_type_code, :assigner_id, 'in_progress')
        RETURNING id, task_name
        """
    )
    insertMemberSql = text(
        """
        INSERT INTO leaudit_cross_review_task_members
        (task_id, user_id, member_role)
        VALUES
        (:task_id, :user_id, :member_role)
        """
    )
    insertDocumentSql = text(
        """
        INSERT INTO leaudit_cross_review_task_documents
        (task_id, document_id, audit_status)
        VALUES
        (:task_id, :document_id, 0)
        """
    )

    async with GetAsyncSession() as session:
        await self._ensure_tables_ready(session)

        principalUserIds = self._unique_int_list(Body.principalUserIds)
        # Principals must be members too, otherwise their role is never persisted.
        memberUserIds = self._unique_int_list(
            [*(Body.memberUserIds or []), *principalUserIds, CurrentUserId]
        )
        documentIds = self._unique_int_list(Body.documentIds)
        principalIdSet = set(principalUserIds)

        # NOTE(review): presumably ends any implicit transaction opened by the reads
        # above so that session.begin() can start a fresh one — confirm in the helper.
        await self._reset_transaction_for_write(session)
        async with session.begin():
            taskRow = (
                await session.execute(
                    insertTaskSql,
                    {
                        "task_name": Body.taskName.strip(),
                        "task_type": Body.taskType,
                        "doc_type_id": Body.docTypeId,
                        "doc_type_code": Body.docTypeCode,
                        "assigner_id": CurrentUserId,
                    },
                )
            ).mappings().first()
            if not taskRow:
                raise LeauditException(StatusCodeEnum.HTTP_500_INTERNAL_SERVER_ERROR, "创建交叉评查任务失败")
            taskId = int(taskRow["id"])

            memberRows = [
                {
                    "task_id": taskId,
                    "user_id": userId,
                    "member_role": "principal" if userId in principalIdSet else "participant",
                }
                for userId in memberUserIds
            ]
            # A list of parameter dicts is sent as a single executemany call;
            # an empty list must be skipped because the statement has bind params.
            if memberRows:
                await session.execute(insertMemberSql, memberRows)

            documentRows = [
                {"task_id": taskId, "document_id": documentId}
                for documentId in documentIds
            ]
            if documentRows:
                await session.execute(insertDocumentSql, documentRows)

        return CrossReviewTaskCreateVO(
            taskId=taskId,
            taskName=str(taskRow["task_name"]),
            memberCount=len(memberUserIds),
            documentCount=len(documentIds),
        )
async def GetUserTasks(self, CurrentUserId: int, Body: CrossReviewTaskQueryDTO) -> CrossReviewTaskPageVO:
    """List the cross-review tasks the current user is a member of, newest first.

    Optional filters: task-name keyword (case-insensitive substring), status,
    task type and document type code. Each item carries document totals and a
    completion percentage rounded to two decimals.

    Args:
        CurrentUserId: Only tasks with a non-deleted membership row for this
            user are returned.
        Body: Paging (``page``, ``pageSize``) and the optional filters.

    Returns:
        One page of tasks plus the total number of matching tasks.
    """
    async with GetAsyncSession() as session:
        await self._ensure_tables_ready(session)

        params: dict[str, object] = {
            "current_user_id": CurrentUserId,
            "limit": Body.pageSize,
            "offset": (Body.page - 1) * Body.pageSize,
        }
        whereClauses = [
            "tm.user_id = :current_user_id",
            "tm.delete_time IS NULL",
            "t.delete_time IS NULL",
        ]

        keyword = (Body.keyword or "").strip()
        if keyword:
            # Escape LIKE metacharacters so the keyword is matched literally;
            # backslash is PostgreSQL's default LIKE escape character.
            escapedKeyword = (
                keyword.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")
            )
            whereClauses.append("t.task_name ILIKE :keyword")
            params["keyword"] = f"%{escapedKeyword}%"

        equalityFilters = (
            (Body.status, "t.status = :status", "status"),
            (Body.taskType, "t.task_type = :task_type", "task_type"),
            (Body.docTypeCode, "t.doc_type_code = :doc_type_code", "doc_type_code"),
        )
        for value, clause, paramName in equalityFilters:
            if value:
                whereClauses.append(clause)
                params[paramName] = value

        # Only fixed SQL fragments are interpolated; all user values are bound parameters.
        whereSql = " AND ".join(whereClauses)

        total = int(
            (
                await session.scalar(
                    text(
                        f"""
                        SELECT COUNT(DISTINCT t.id)
                        FROM leaudit_cross_review_tasks t
                        JOIN leaudit_cross_review_task_members tm
                        ON tm.task_id = t.id
                        WHERE {whereSql}
                        """
                    ),
                    params,
                )
            )
            or 0
        )
        rows = (
            await session.execute(
                text(
                    f"""
                    WITH doc_stats AS (
                    SELECT
                    td.task_id,
                    COUNT(*)::int AS total_documents,
                    COUNT(*) FILTER (WHERE td.audit_status = 1)::int AS completed_documents
                    FROM leaudit_cross_review_task_documents td
                    WHERE td.delete_time IS NULL
                    GROUP BY td.task_id
                    )
                    SELECT
                    t.id AS task_id,
                    t.task_name,
                    t.task_type,
                    t.doc_type_id,
                    t.doc_type_code,
                    t.status,
                    t.create_time,
                    COALESCE(ds.total_documents, 0) AS total_documents,
                    COALESCE(ds.completed_documents, 0) AS completed_documents
                    FROM leaudit_cross_review_tasks t
                    JOIN leaudit_cross_review_task_members tm
                    ON tm.task_id = t.id
                    LEFT JOIN doc_stats ds
                    ON ds.task_id = t.id
                    WHERE {whereSql}
                    GROUP BY
                    t.id, t.task_name, t.task_type, t.doc_type_id, t.doc_type_code,
                    t.status, t.create_time, ds.total_documents, ds.completed_documents
                    ORDER BY t.create_time DESC, t.id DESC
                    LIMIT :limit OFFSET :offset
                    """
                ),
                params,
            )
        ).mappings().all()

        items = []
        for row in rows:
            totalDocuments = int(row["total_documents"] or 0)
            completedDocuments = int(row["completed_documents"] or 0)
            # A task without documents reports 0 % instead of dividing by zero.
            progress = round((completedDocuments / totalDocuments * 100) if totalDocuments > 0 else 0, 2)
            items.append(
                CrossReviewTaskItemVO(
                    taskId=int(row["task_id"]),
                    taskName=str(row["task_name"]),
                    taskType=str(row["task_type"]),
                    docTypeId=self._to_int(row.get("doc_type_id")),
                    docTypeCode=row.get("doc_type_code"),
                    status=str(row["status"]),
                    progress=progress,
                    totalDocuments=totalDocuments,
                    completedDocuments=completedDocuments,
                    createdAt=row.get("create_time"),
                )
            )
        return CrossReviewTaskPageVO(total=total, page=Body.page, pageSize=Body.pageSize, items=items)
async def GetTaskProgress(self, CurrentUserId: int, TaskId: int) -> CrossReviewTaskProgressVO:
    """Report how many documents of a task are confirmed (``audit_status = 1``).

    Membership of the caller is checked via ``_ensure_task_member`` before the
    counts are read. The percentage is rounded to two decimals and is 0 for a
    task without documents.
    """
    stats_sql = text(
        """
        SELECT
        COUNT(*)::int AS total_documents,
        COUNT(*) FILTER (WHERE audit_status = 1)::int AS completed_documents
        FROM leaudit_cross_review_task_documents
        WHERE task_id = :task_id
        AND delete_time IS NULL
        """
    )
    async with GetAsyncSession() as session:
        await self._ensure_tables_ready(session)
        await self._ensure_task_member(session, TaskId, CurrentUserId)

        result = await session.execute(stats_sql, {"task_id": TaskId})
        stats = result.mappings().first() or {}

        total_count = int(stats.get("total_documents") or 0)
        done_count = int(stats.get("completed_documents") or 0)
        percent = round((done_count / total_count * 100) if total_count > 0 else 0, 2)

        return CrossReviewTaskProgressVO(
            taskId=TaskId,
            totalDocuments=total_count,
            completedDocuments=done_count,
            progress=percent,
        )
async def GetTaskDocuments(
    self,
    CurrentUserId: int,
    TaskId: int,
    Body: CrossReviewTaskDocumentQueryDTO,
) -> CrossReviewTaskDocumentPageVO:
    """List the documents attached to a task, newest document first.

    The caller's membership is checked via ``_ensure_task_member``. An optional
    keyword is matched case-insensitively as a literal substring against the
    document's normalized name and its business document id.

    Args:
        CurrentUserId: User requesting the list.
        TaskId: Task whose documents are listed.
        Body: Paging (``page``, ``pageSize``) and the optional ``keyword``.

    Returns:
        One page of documents plus the total number of matching documents.
    """
    async with GetAsyncSession() as session:
        await self._ensure_tables_ready(session)
        await self._ensure_task_member(session, TaskId, CurrentUserId)

        params: dict[str, object] = {
            "task_id": TaskId,
            "limit": Body.pageSize,
            "offset": (Body.page - 1) * Body.pageSize,
        }
        whereClauses = [
            "td.task_id = :task_id",
            "td.delete_time IS NULL",
        ]

        keyword = (Body.keyword or "").strip()
        if keyword:
            # Escape LIKE metacharacters so the keyword is matched literally;
            # backslash is PostgreSQL's default LIKE escape character.
            escapedKeyword = (
                keyword.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")
            )
            whereClauses.append("(d.normalized_name ILIKE :keyword OR CAST(d.biz_document_id AS TEXT) ILIKE :keyword)")
            params["keyword"] = f"%{escapedKeyword}%"

        # Only fixed SQL fragments are interpolated; all user values are bound parameters.
        whereSql = " AND ".join(whereClauses)

        total = int(
            (
                await session.scalar(
                    text(
                        f"""
                        SELECT COUNT(*)
                        FROM leaudit_cross_review_task_documents td
                        JOIN leaudit_documents d
                        ON d.id = td.document_id
                        WHERE {whereSql}
                        """
                    ),
                    params,
                )
            )
            or 0
        )
        # processing_status prefers the current audit run's status while that run
        # is still pending/queued/running/retrying, else the document's own status.
        rows = (
            await session.execute(
                text(
                    f"""
                    SELECT
                    d.id AS document_id,
                    COALESCE(d.normalized_name, '') AS name,
                    CAST(d.biz_document_id AS TEXT) AS document_number,
                    d.type_id,
                    COALESCE(
                    CASE
                    WHEN LOWER(COALESCE(ar.status, '')) IN ('pending', 'queued', 'running', 'retrying')
                    THEN ar.status
                    ELSE d.processing_status
                    END,
                    d.processing_status,
                    'waiting'
                    ) AS processing_status,
                    d.version_no,
                    d.is_latest_version,
                    d.created_at,
                    td.audit_status,
                    COALESCE(dt.name, '') AS type_name,
                    COALESCE(df.file_size, 0) AS file_size
                    FROM leaudit_cross_review_task_documents td
                    JOIN leaudit_documents d
                    ON d.id = td.document_id
                    LEFT JOIN leaudit_audit_runs ar
                    ON ar.id = d.current_run_id
                    LEFT JOIN leaudit_document_types dt
                    ON dt.id = d.type_id
                    LEFT JOIN LATERAL (
                    SELECT file_size
                    FROM leaudit_document_files
                    WHERE document_id = d.id
                    ORDER BY id ASC
                    LIMIT 1
                    ) df ON TRUE
                    WHERE {whereSql}
                    ORDER BY d.created_at DESC, d.id DESC
                    LIMIT :limit OFFSET :offset
                    """
                ),
                params,
            )
        ).mappings().all()

        items = [
            CrossReviewTaskDocumentVO(
                documentId=int(row["document_id"]),
                name=str(row["name"] or ""),
                documentNumber=row.get("document_number"),
                typeId=self._to_int(row.get("type_id")),
                typeName=row.get("type_name"),
                processingStatus=row.get("processing_status"),
                versionNo=int(row.get("version_no") or 1),
                isLatestVersion=bool(row.get("is_latest_version")),
                auditStatus=int(row.get("audit_status") or 0),
                createdAt=row.get("created_at"),
                fileSize=int(row.get("file_size") or 0),
            )
            for row in rows
        ]
        return CrossReviewTaskDocumentPageVO(
            taskId=TaskId,
            total=total,
            page=Body.page,
            pageSize=Body.pageSize,
            items=items,
        )
async def CanConfirmTaskDocument(self, CurrentUserId: int, TaskId: int) -> CrossReviewPermissionVO:
    """Tell whether the current user may confirm documents of a task.

    Confirmation is allowed for the task's assigner and for any non-deleted
    member whose role is ``principal``.

    Raises:
        LeauditException: 404 when the task does not exist or is soft-deleted.
    """
    permission_sql = text(
        """
        SELECT
        t.assigner_id,
        COALESCE(
        MAX(CASE WHEN tm.user_id = :current_user_id AND tm.member_role = 'principal' THEN 1 ELSE 0 END),
        0
        ) AS is_principal
        FROM leaudit_cross_review_tasks t
        LEFT JOIN leaudit_cross_review_task_members tm
        ON tm.task_id = t.id
        AND tm.delete_time IS NULL
        WHERE t.id = :task_id
        AND t.delete_time IS NULL
        GROUP BY t.assigner_id
        """
    )
    bind_values = {"task_id": TaskId, "current_user_id": CurrentUserId}

    async with GetAsyncSession() as session:
        await self._ensure_tables_ready(session)
        result = await session.execute(permission_sql, bind_values)
        task_info = result.mappings().first()

        if not task_info:
            raise LeauditException(StatusCodeEnum.HTTP_404_NOT_FOUND, "交叉评查任务不存在")

        is_assigner = int(task_info["assigner_id"]) == CurrentUserId
        if is_assigner:
            return CrossReviewPermissionVO(canConfirm=True, reason="您是任务创建者,可确认完成")

        is_principal = int(task_info["is_principal"] or 0) == 1
        if is_principal:
            return CrossReviewPermissionVO(canConfirm=True, reason="您是任务负责人,可确认完成")

        return CrossReviewPermissionVO(canConfirm=False, reason="您不是任务创建者或负责人")
async def CompleteTaskDocument(self, CurrentUserId: int, TaskId: int, DocumentId: int) -> CrossReviewTaskCompleteVO:
    """Mark one task document as confirmed and refresh the task status.

    The document's ``audit_status`` is set to 1. The task becomes ``completed``
    when it has at least one document and all of them are confirmed; otherwise
    it is set to ``in_progress``.

    Raises:
        LeauditException: 403 when ``CanConfirmTaskDocument`` denies the caller,
            404 when the document is not attached to the task.
    """
    find_link_sql = text(
        """
        SELECT id, audit_status
        FROM leaudit_cross_review_task_documents
        WHERE task_id = :task_id
        AND document_id = :document_id
        AND delete_time IS NULL
        LIMIT 1
        """
    )
    confirm_sql = text(
        """
        UPDATE leaudit_cross_review_task_documents
        SET audit_status = 1,
        update_time = NOW()
        WHERE id = :id
        """
    )
    stats_sql = text(
        """
        SELECT
        COUNT(*)::int AS total_documents,
        COUNT(*) FILTER (WHERE audit_status = 1)::int AS completed_documents
        FROM leaudit_cross_review_task_documents
        WHERE task_id = :task_id
        AND delete_time IS NULL
        """
    )
    set_task_status_sql = text(
        """
        UPDATE leaudit_cross_review_tasks
        SET status = :status,
        update_time = NOW()
        WHERE id = :task_id
        AND delete_time IS NULL
        """
    )

    async with GetAsyncSession() as session:
        await self._ensure_tables_ready(session)

        verdict = await self.CanConfirmTaskDocument(CurrentUserId, TaskId)
        if not verdict.canConfirm:
            raise LeauditException(StatusCodeEnum.HTTP_403_FORBIDDEN, verdict.reason)

        await self._reset_transaction_for_write(session)
        async with session.begin():
            link_result = await session.execute(
                find_link_sql,
                {"task_id": TaskId, "document_id": DocumentId},
            )
            link_row = link_result.mappings().first()
            if not link_row:
                raise LeauditException(StatusCodeEnum.HTTP_404_NOT_FOUND, "任务文档不存在")

            await session.execute(confirm_sql, {"id": int(link_row["id"])})

            # Counts are read after the update so they include this confirmation.
            stats_result = await session.execute(stats_sql, {"task_id": TaskId})
            counts = stats_result.mappings().first() or {}
            total_count = int(counts.get("total_documents") or 0)
            done_count = int(counts.get("completed_documents") or 0)

            all_done = total_count > 0 and total_count == done_count
            if all_done:
                new_status = "completed"
            else:
                new_status = "in_progress"

            await session.execute(
                set_task_status_sql,
                {"task_id": TaskId, "status": new_status},
            )

        return CrossReviewTaskCompleteVO(
            taskId=TaskId,
            documentId=DocumentId,
            auditStatus=1,
            taskStatus=new_status,
            taskCompleted=all_done,
        )
async def CreateProposal(self, CurrentUserId: int, Body: CrossReviewProposalCreateDTO) -> CrossReviewProposalCreateVO:
    """Create a score-adjustment proposal on one rule result of a document.

    The proposer's own ``agree`` vote is recorded in the same transaction and
    the proposal status is then refreshed via ``_refresh_proposal_status``.

    Raises:
        LeauditException: 400 for a zero delta, for an existing pending/approved
            proposal by the same user on the same rule result, for a negative
            delta when the current score is already <= 0, or for a positive
            delta when the current score is already >= the full score; 404 from
            the lookup helpers; 500 when the INSERT returns no row.
    """
    duplicate_sql = text(
        """
        SELECT 1
        FROM leaudit_cross_review_proposals
        WHERE task_id = :task_id
        AND document_id = :document_id
        AND rule_result_id = :rule_result_id
        AND proposer_id = :proposer_id
        AND status IN ('pending', 'approved')
        AND delete_time IS NULL
        LIMIT 1
        """
    )
    insert_proposal_sql = text(
        """
        INSERT INTO leaudit_cross_review_proposals
        (task_id, document_id, rule_result_id, proposer_id, proposed_score_delta, reason, status)
        VALUES
        (:task_id, :document_id, :rule_result_id, :proposer_id, :proposed_score_delta, :reason, 'pending')
        RETURNING id, create_time
        """
    )
    insert_vote_sql = text(
        """
        INSERT INTO leaudit_cross_review_votes
        (proposal_id, voter_id, vote_type)
        VALUES
        (:proposal_id, :voter_id, 'agree')
        """
    )

    async with GetAsyncSession() as session:
        await self._ensure_tables_ready(session)

        delta = Body.deductionScore
        if delta == 0:
            raise LeauditException(StatusCodeEnum.HTTP_400_BAD_REQUEST, "不允许创建 0 分提案")

        document_id = Body.documentId
        rule_result_id = Body.reviewPointResultId

        task_id = await self._resolve_task_id_for_document(session, document_id, CurrentUserId)
        await self._ensure_task_member(session, task_id, CurrentUserId)
        await self._ensure_task_document(session, task_id, document_id)
        # Called only for its 404 check; the returned row is not needed here.
        await self._load_rule_result(session, rule_result_id, document_id)

        identity = {
            "task_id": task_id,
            "document_id": document_id,
            "rule_result_id": rule_result_id,
            "proposer_id": CurrentUserId,
        }
        already_proposed = bool(await session.scalar(duplicate_sql, identity))
        if already_proposed:
            raise LeauditException(StatusCodeEnum.HTTP_400_BAD_REQUEST, "当前评查点已存在您的有效提案")

        current_score, full_score = await self._calculate_current_score(session, rule_result_id, document_id)
        if delta < 0 and current_score <= 0:
            raise LeauditException(StatusCodeEnum.HTTP_400_BAD_REQUEST, "当前分值已为 0,不能继续扣分")
        if delta > 0 and current_score >= full_score:
            raise LeauditException(StatusCodeEnum.HTTP_400_BAD_REQUEST, "当前分值已满分,不能继续加分")

        await self._reset_transaction_for_write(session)
        async with session.begin():
            insert_result = await session.execute(
                insert_proposal_sql,
                {
                    **identity,
                    "proposed_score_delta": delta,
                    "reason": Body.auditOpinion.strip(),
                },
            )
            created = insert_result.mappings().first()
            if not created:
                raise LeauditException(StatusCodeEnum.HTTP_500_INTERNAL_SERVER_ERROR, "创建交叉评查提案失败")

            new_proposal_id = int(created["id"])
            await session.execute(
                insert_vote_sql,
                {"proposal_id": new_proposal_id, "voter_id": CurrentUserId},
            )
            await self._refresh_proposal_status(session, new_proposal_id)

        return CrossReviewProposalCreateVO(proposalId=new_proposal_id, createdAt=created.get("create_time"))
async def VoteProposal(
    self,
    CurrentUserId: int,
    ProposalId: int,
    Body: CrossReviewProposalVoteDTO,
) -> CrossReviewProposalVoteVO:
    """Cast, change or withdraw the current user's vote on a proposal.

    ``Body.voteType`` is trimmed and lower-cased and must be ``agree``,
    ``disagree`` or ``cancel``. Any existing active vote of the user is
    soft-deleted first; for ``agree``/``disagree`` a new vote row is then
    inserted. The proposal status is refreshed afterwards.

    Raises:
        LeauditException: 400 for an unsupported vote type, for a proposal that
            is already approved/rejected/cancelled, or for ``cancel`` when the
            user has no active vote; 404 when the proposal does not exist.
    """
    allowed_types = {"agree", "disagree", "cancel"}
    closed_states = {"approved", "rejected", "cancelled"}

    vote_kind = Body.voteType.strip().lower()
    if vote_kind not in allowed_types:
        raise LeauditException(StatusCodeEnum.HTTP_400_BAD_REQUEST, "投票类型仅支持 agree/disagree/cancel")

    retire_vote_sql = text(
        """
        UPDATE leaudit_cross_review_votes
        SET delete_time = NOW(),
        update_time = NOW()
        WHERE proposal_id = :proposal_id
        AND voter_id = :voter_id
        AND delete_time IS NULL
        """
    )
    insert_vote_sql = text(
        """
        INSERT INTO leaudit_cross_review_votes
        (proposal_id, voter_id, vote_type)
        VALUES
        (:proposal_id, :voter_id, :vote_type)
        """
    )
    voter_key = {"proposal_id": ProposalId, "voter_id": CurrentUserId}

    async with GetAsyncSession() as session:
        await self._ensure_tables_ready(session)

        proposal_row = await self._load_proposal(session, ProposalId)
        await self._ensure_task_member(session, int(proposal_row["task_id"]), CurrentUserId)
        if str(proposal_row["status"]) in closed_states:
            raise LeauditException(StatusCodeEnum.HTTP_400_BAD_REQUEST, "当前提案状态不允许继续投票")

        await self._reset_transaction_for_write(session)
        async with session.begin():
            # Both paths start by soft-deleting the user's active vote, if any.
            retired = await session.execute(retire_vote_sql, voter_key)

            if vote_kind == "cancel":
                if retired.rowcount == 0:
                    raise LeauditException(StatusCodeEnum.HTTP_400_BAD_REQUEST, "当前没有可撤销的投票")
            else:
                await session.execute(
                    insert_vote_sql,
                    {"proposal_id": ProposalId, "voter_id": CurrentUserId, "vote_type": vote_kind},
                )

            refreshed_status = await self._refresh_proposal_status(session, ProposalId)

        return CrossReviewProposalVoteVO(
            proposalId=ProposalId,
            voterId=CurrentUserId,
            voteType=vote_kind,
            proposalStatus=refreshed_status,
        )
async def CancelProposal(self, CurrentUserId: int, ProposalId: int) -> CrossReviewProposalCancelVO:
    """Cancel a pending proposal and soft-delete all of its active votes.

    Raises:
        LeauditException: 404 when the proposal does not exist, 403 when the
            caller is not the proposer, 400 when the proposal is not ``pending``.
    """
    mark_cancelled_sql = text(
        """
        UPDATE leaudit_cross_review_proposals
        SET status = 'cancelled',
        update_time = NOW()
        WHERE id = :proposal_id
        AND delete_time IS NULL
        """
    )
    retire_votes_sql = text(
        """
        UPDATE leaudit_cross_review_votes
        SET delete_time = NOW(),
        update_time = NOW()
        WHERE proposal_id = :proposal_id
        AND delete_time IS NULL
        """
    )
    bind_values = {"proposal_id": ProposalId}

    async with GetAsyncSession() as session:
        await self._ensure_tables_ready(session)

        proposal_row = await self._load_proposal(session, ProposalId)
        owner_id = int(proposal_row["proposer_id"])
        if owner_id != CurrentUserId:
            raise LeauditException(StatusCodeEnum.HTTP_403_FORBIDDEN, "仅提案人可撤销提案")
        if str(proposal_row["status"]) != "pending":
            raise LeauditException(StatusCodeEnum.HTTP_400_BAD_REQUEST, "当前提案状态不允许撤销")

        await self._reset_transaction_for_write(session)
        async with session.begin():
            await session.execute(mark_cancelled_sql, bind_values)
            await session.execute(retire_votes_sql, bind_values)

        return CrossReviewProposalCancelVO(proposalId=ProposalId, status="cancelled")
async def GetDocumentProposals(
    self,
    CurrentUserId: int,
    DocumentId: int,
    Page: int,
    PageSize: int,
) -> CrossReviewProposalPageVO:
    """Return one page of proposals for a document.

    The task is resolved from the document and the current user via
    ``_resolve_task_id_for_document``; membership is then checked before the
    page is built by ``_build_document_proposals_page``.
    """
    async with GetAsyncSession() as session:
        await self._ensure_tables_ready(session)
        resolved_task_id = await self._resolve_task_id_for_document(session, DocumentId, CurrentUserId)
        await self._ensure_task_member(session, resolved_task_id, CurrentUserId)
        proposals_page = await self._build_document_proposals_page(
            session,
            CurrentUserId,
            resolved_task_id,
            DocumentId,
            Page,
            PageSize,
        )
        return proposals_page
async def GetDocumentPendingVotes(self, CurrentUserId: int, DocumentId: int) -> CrossReviewPendingVotesVO:
    """Summarize the pending proposals of a document that still lack votes.

    A proposal is reported when its status is ``pending`` and at least one
    task member has not voted on it. All proposals of the document are
    inspected, fetched in batches of 200 (previously only the newest 200 were
    looked at, so older pending proposals were silently ignored).

    Args:
        CurrentUserId: User making the request; must be a member of the task.
        DocumentId: Document whose proposals are inspected.

    Returns:
        ``hasPendingVotes`` plus, per matching proposal, the evaluation point
        name and the number of members who have not voted.
    """
    batchSize = 200
    async with GetAsyncSession() as session:
        await self._ensure_tables_ready(session)
        taskId = await self._resolve_task_id_for_document(session, DocumentId, CurrentUserId)
        await self._ensure_task_member(session, taskId, CurrentUserId)

        pendingProposals = []
        pageNo = 1
        while True:
            proposalPage = await self._build_document_proposals_page(
                session, CurrentUserId, taskId, DocumentId, pageNo, batchSize
            )
            pendingProposals.extend(
                CrossReviewPendingProposalVO(
                    evaluationPointName=item.evaluationPointName,
                    pendingVotersNum=len(item.pendingVoters),
                )
                for item in proposalPage.items
                if item.status == "pending" and item.pendingVoters
            )
            # Stop on an empty page as well, so a shrinking result set cannot loop forever.
            if not proposalPage.items or pageNo * batchSize >= proposalPage.total:
                break
            pageNo += 1

        return CrossReviewPendingVotesVO(
            hasPendingVotes=len(pendingProposals) > 0,
            pendingProposals=pendingProposals,
        )
async def UploadTaskDocument(
    self,
    CurrentUserId: int,
    TaskId: int,
    FileName: str,
    FileContent: bytes,
    ContentType: str | None,
) -> CrossReviewTaskDocumentUploadVO:
    """Upload an additional document and attach it to a cross-review task.

    The file is uploaded through ``self.DocumentService.Upload`` using the
    task's document type, with ``AutoRun=True``. In a second session the
    document is linked to the task (unless already linked) and the task is set
    back to ``in_progress``.

    Raises:
        LeauditException: 403 when ``CanConfirmTaskDocument`` denies the caller,
            404 when the task does not exist.
    """
    task_meta_sql = text(
        """
        SELECT id, doc_type_id, doc_type_code
        FROM leaudit_cross_review_tasks
        WHERE id = :task_id
        AND delete_time IS NULL
        LIMIT 1
        """
    )
    link_exists_sql = text(
        """
        SELECT 1
        FROM leaudit_cross_review_task_documents
        WHERE task_id = :task_id
        AND document_id = :document_id
        AND delete_time IS NULL
        LIMIT 1
        """
    )
    insert_link_sql = text(
        """
        INSERT INTO leaudit_cross_review_task_documents
        (task_id, document_id, audit_status)
        VALUES
        (:task_id, :document_id, 0)
        """
    )
    reopen_task_sql = text(
        """
        UPDATE leaudit_cross_review_tasks
        SET status = 'in_progress',
        update_time = NOW()
        WHERE id = :task_id
        AND delete_time IS NULL
        """
    )

    # Phase 1: permission check and task metadata lookup.
    async with GetAsyncSession() as session:
        await self._ensure_tables_ready(session)

        verdict = await self.CanConfirmTaskDocument(CurrentUserId, TaskId)
        if not verdict.canConfirm:
            raise LeauditException(StatusCodeEnum.HTTP_403_FORBIDDEN, verdict.reason)

        meta_result = await session.execute(task_meta_sql, {"task_id": TaskId})
        task_meta = meta_result.mappings().first()
        if not task_meta:
            raise LeauditException(StatusCodeEnum.HTTP_404_NOT_FOUND, "交叉评查任务不存在")

    # Phase 2: the upload itself runs after the first session block has exited.
    uploaded = await self.DocumentService.Upload(
        FileName=FileName,
        FileContent=FileContent,
        ContentType=ContentType,
        TypeId=self._to_int(task_meta.get("doc_type_id")),
        TypeCode=task_meta.get("doc_type_code"),
        CreatedBy=CurrentUserId,
        AutoRun=True,
    )

    # Phase 3: attach the uploaded document to the task.
    link_key = {"task_id": TaskId, "document_id": uploaded.documentId}
    async with GetAsyncSession() as session:
        await self._ensure_tables_ready(session)
        await self._reset_transaction_for_write(session)
        async with session.begin():
            already_linked = bool(await session.scalar(link_exists_sql, link_key))
            if not already_linked:
                await session.execute(insert_link_sql, link_key)
            await session.execute(reopen_task_sql, {"task_id": TaskId})

    return CrossReviewTaskDocumentUploadVO(
        taskId=TaskId,
        documentId=uploaded.documentId,
        auditStatus=0,
        processingStatus=uploaded.processingStatus,
    )
async def _build_document_proposals_page(
    self,
    session,
    CurrentUserId: int,
    TaskId: int,
    DocumentId: int,
    Page: int,
    PageSize: int,
) -> CrossReviewProposalPageVO:
    """Build one page of proposals (newest id first) for a task document.

    Each item lists its active votes, the names of members who agreed or
    disagreed, and the names of members who have not voted. ``canVote`` is
    true when the current user is a task member, has no active vote on the
    proposal and the proposal is ``pending``.
    """
    query_params = {
        "task_id": TaskId,
        "document_id": DocumentId,
        "limit": PageSize,
        "offset": (Page - 1) * PageSize,
    }
    count_sql = text(
        """
        SELECT COUNT(*)
        FROM leaudit_cross_review_proposals p
        WHERE p.task_id = :task_id
        AND p.document_id = :document_id
        AND p.delete_time IS NULL
        """
    )
    page_sql = text(
        """
        SELECT
        p.id,
        p.task_id,
        p.document_id,
        p.rule_result_id,
        p.proposer_id,
        p.proposed_score_delta,
        p.reason,
        p.status,
        p.create_time,
        rr.rule_name,
        rr.fail_message,
        COALESCE(u.nick_name, u.username, '') AS proposer_name
        FROM leaudit_cross_review_proposals p
        LEFT JOIN leaudit_rule_results rr
        ON rr.id = p.rule_result_id
        LEFT JOIN sso_users u
        ON u.id = p.proposer_id
        WHERE p.task_id = :task_id
        AND p.document_id = :document_id
        AND p.delete_time IS NULL
        ORDER BY p.id DESC
        LIMIT :limit OFFSET :offset
        """
    )

    total_count = int((await session.scalar(count_sql, query_params)) or 0)
    page_result = await session.execute(page_sql, query_params)
    proposal_records = page_result.mappings().all()

    member_names = await self._load_task_member_names(session, TaskId)
    votes_by_proposal = await self._load_votes_map(
        session,
        [int(record["id"]) for record in proposal_records],
    )

    page_items: list[CrossReviewProposalItemVO] = []
    for record in proposal_records:
        proposal_id = int(record["id"])

        # Split the proposal's active votes in a single pass.
        vote_items = []
        agree_names = []
        disagree_names = []
        voted_ids = set()
        for ballot in votes_by_proposal.get(proposal_id, []):
            voter_name = str(ballot["voter_name"])
            if ballot["vote_type"] == "agree":
                agree_names.append(voter_name)
            if ballot["vote_type"] == "disagree":
                disagree_names.append(voter_name)
            voted_ids.add(int(ballot["voter_id"]))
            vote_items.append(
                CrossReviewProposalVoteItemVO(
                    voter=voter_name,
                    voteType=str(ballot["vote_type"]),
                )
            )

        waiting_names = [
            display_name
            for member_id, display_name in member_names.items()
            if member_id not in voted_ids
        ]
        may_vote = (
            CurrentUserId in member_names
            and CurrentUserId not in voted_ids
            and str(record["status"]) == "pending"
        )

        page_items.append(
            CrossReviewProposalItemVO(
                proposalId=proposal_id,
                evaluationPointName=str(record.get("rule_name") or ""),
                proposedScore=float(record["proposed_score_delta"] or 0),
                reason=str(record.get("reason") or ""),
                proposer=str(record.get("proposer_name") or ""),
                votes=vote_items,
                agreeVoters=agree_names,
                disagreeVoters=disagree_names,
                pendingVoters=waiting_names,
                canVote=may_vote,
                # Falls back to the rule name when no failure message is stored.
                problemMessage=str(record.get("fail_message") or record.get("rule_name") or ""),
                proposerId=int(record["proposer_id"]),
                createdAt=record.get("create_time"),
                status=str(record.get("status") or "pending"),
            )
        )

    return CrossReviewProposalPageVO(total=total_count, page=Page, pageSize=PageSize, items=page_items)
async def _resolve_task_id_for_document(self, session, documentId: int, userId: int) -> int:
    """Find the task that contains the document and has the user as a member.

    When several tasks match, the one with the highest task id is returned.

    Raises:
        LeauditException: 404 when no non-deleted task/member/document
            combination matches.
    """
    lookup_sql = text(
        """
        SELECT td.task_id
        FROM leaudit_cross_review_task_documents td
        JOIN leaudit_cross_review_task_members tm
        ON tm.task_id = td.task_id
        JOIN leaudit_cross_review_tasks t
        ON t.id = td.task_id
        WHERE td.document_id = :document_id
        AND td.delete_time IS NULL
        AND tm.user_id = :user_id
        AND tm.delete_time IS NULL
        AND t.delete_time IS NULL
        ORDER BY td.task_id DESC
        LIMIT 1
        """
    )
    lookup_result = await session.execute(
        lookup_sql,
        {"document_id": documentId, "user_id": userId},
    )
    match = lookup_result.mappings().first()
    if match:
        return int(match["task_id"])
    raise LeauditException(StatusCodeEnum.HTTP_404_NOT_FOUND, "当前文档不在您的交叉评查任务中")
async def _ensure_task_document(self, session, taskId: int, documentId: int) -> None:
    """Raise a 404 ``LeauditException`` unless the document has a non-deleted link to the task."""
    link_sql = text(
        """
        SELECT 1
        FROM leaudit_cross_review_task_documents
        WHERE task_id = :task_id
        AND document_id = :document_id
        AND delete_time IS NULL
        LIMIT 1
        """
    )
    found = await session.scalar(link_sql, {"task_id": taskId, "document_id": documentId})
    if bool(found):
        return
    raise LeauditException(StatusCodeEnum.HTTP_404_NOT_FOUND, "当前文档不在交叉评查任务内")
async def _load_rule_result(self, session, ruleResultId: int, documentId: int):
    """Load the rule result row with the given id that belongs to the document.

    Returns:
        A mapping with ``id``, ``document_id``, ``rule_id``, ``rule_name``,
        ``score`` and ``passed``.

    Raises:
        LeauditException: 404 when no such row exists.
    """
    rule_result_sql = text(
        """
        SELECT id, document_id, rule_id, rule_name, score, passed
        FROM leaudit_rule_results
        WHERE id = :rule_result_id
        AND document_id = :document_id
        LIMIT 1
        """
    )
    query_result = await session.execute(
        rule_result_sql,
        {"rule_result_id": ruleResultId, "document_id": documentId},
    )
    record = query_result.mappings().first()
    if record:
        return record
    raise LeauditException(StatusCodeEnum.HTTP_404_NOT_FOUND, "对应评查点结果不存在")
async def _calculate_current_score(self, session, ruleResultId: int, documentId: int) -> tuple[float, float]:
    """Compute the current and full score of one rule result.

    The base score is the rule's ``score`` when the result counts as passed
    (a manual audit with ``edit_audit_status = 1`` takes precedence via its
    ``override_result``), otherwise 0. The sum of all approved proposal deltas
    for the rule result is added on top.

    NOTE(review): the audits join has no ORDER BY before ``LIMIT 1``; if a
    rule result can have several audit rows the chosen one is unspecified —
    confirm against the schema.

    Returns:
        ``(current_score, full_score)`` as floats.

    Raises:
        LeauditException: 404 when the rule result does not exist for the document.
    """
    score_sql = text(
        """
        WITH approved_delta AS (
        SELECT COALESCE(SUM(proposed_score_delta), 0) AS delta
        FROM leaudit_cross_review_proposals
        WHERE rule_result_id = :rule_result_id
        AND document_id = :document_id
        AND status = 'approved'
        AND delete_time IS NULL
        )
        SELECT
        COALESCE(rr.score, 0) AS full_score,
        CASE
        WHEN a.edit_audit_status = 1 THEN CASE WHEN a.override_result IS TRUE THEN COALESCE(rr.score, 0) ELSE 0 END
        WHEN rr.passed IS TRUE THEN COALESCE(rr.score, 0)
        ELSE 0
        END + COALESCE(ad.delta, 0) AS current_score
        FROM leaudit_rule_results rr
        LEFT JOIN leaudit_review_point_audits a
        ON a.rule_result_id = rr.id
        CROSS JOIN approved_delta ad
        WHERE rr.id = :rule_result_id
        AND rr.document_id = :document_id
        LIMIT 1
        """
    )
    query_result = await session.execute(
        score_sql,
        {"rule_result_id": ruleResultId, "document_id": documentId},
    )
    score_row = query_result.mappings().first()
    if not score_row:
        raise LeauditException(StatusCodeEnum.HTTP_404_NOT_FOUND, "对应评查点结果不存在")

    current_value = float(score_row.get("current_score") or 0)
    full_value = float(score_row.get("full_score") or 0)
    return current_value, full_value
async def _load_proposal(self, session, proposalId: int):
    """Load a non-deleted proposal by id.

    Returns:
        A mapping with ``id``, ``task_id``, ``document_id``, ``proposer_id``
        and ``status``.

    Raises:
        LeauditException: 404 when the proposal does not exist.
    """
    proposal_sql = text(
        """
        SELECT id, task_id, document_id, proposer_id, status
        FROM leaudit_cross_review_proposals
        WHERE id = :proposal_id
        AND delete_time IS NULL
        LIMIT 1
        """
    )
    query_result = await session.execute(proposal_sql, {"proposal_id": proposalId})
    record = query_result.mappings().first()
    if record:
        return record
    raise LeauditException(StatusCodeEnum.HTTP_404_NOT_FOUND, "交叉评查提案不存在")
async def _load_task_member_names(self, session, taskId: int) -> dict[int, str]:
    """Map each non-deleted member's user id to a display name, in membership-row order.

    The name is the user's nick name, else username, else the user id as text.
    """
    members_sql = text(
        """
        SELECT tm.user_id, COALESCE(u.nick_name, u.username, CAST(tm.user_id AS TEXT)) AS display_name
        FROM leaudit_cross_review_task_members tm
        LEFT JOIN sso_users u
        ON u.id = tm.user_id
        WHERE tm.task_id = :task_id
        AND tm.delete_time IS NULL
        ORDER BY tm.id ASC
        """
    )
    query_result = await session.execute(members_sql, {"task_id": taskId})

    names_by_user: dict[int, str] = {}
    for member in query_result.mappings().all():
        # An empty display name falls back to the raw user id.
        label = member.get("display_name") or member["user_id"]
        names_by_user[int(member["user_id"])] = str(label)
    return names_by_user
async def _load_votes_map(self, session, proposalIds: list[int]) -> dict[int, list[dict[str, object]]]:
if not proposalIds:
return {}
rows = (
await session.execute(
text(
"""
SELECT
v.proposal_id,
v.voter_id,
v.vote_type,
COALESCE(u.nick_name, u.username, CAST(v.voter_id AS TEXT)) AS voter_name
FROM leaudit_cross_review_votes v
LEFT JOIN sso_users u
ON u.id = v.voter_id
WHERE v.proposal_id IN :proposal_ids
AND v.delete_time IS NULL
ORDER BY v.id ASC
"""
).bindparams(bindparam("proposal_ids", expanding=True)),
{"proposal_ids": proposalIds},
)
).mappings().all()
result: dict[int, list[dict[str, object]]] = {}
for row in rows:
result.setdefault(int(row["proposal_id"]), []).append(dict(row))
return result
async def _refresh_proposal_status(self, session, proposalId: int) -> str:
    """Recompute a proposal's status from its votes and persist any change.

    The threshold is a strict majority of the task's non-deleted members
    (``floor(n / 2) + 1``, and 1 when there are no members). The proposal is
    ``approved`` once agree votes reach the threshold. It is ``rejected``
    once disagree votes reach it, or when the agree votes plus the members
    still to vote can no longer reach it. Otherwise it is ``pending``.
    A ``cancelled`` proposal is returned unchanged.

    Args:
        session: Database session used for the reads and the update.
        proposalId: Primary key of the proposal to refresh.

    Returns:
        The status after the refresh.

    Raises:
        LeauditException: 404 when the proposal is missing or soft-deleted.
    """
    proposalQuery = text(
        """
        SELECT p.id, p.task_id, p.status
        FROM leaudit_cross_review_proposals p
        WHERE p.id = :proposal_id
          AND p.delete_time IS NULL
        LIMIT 1
        """
    )
    proposalResult = await session.execute(proposalQuery, {"proposal_id": proposalId})
    proposal = proposalResult.mappings().first()
    if not proposal:
        raise LeauditException(StatusCodeEnum.HTTP_404_NOT_FOUND, "交叉评查提案不存在")

    currentStatus = str(proposal["status"])
    if currentStatus == "cancelled":
        # Cancelled proposals are never re-evaluated.
        return currentStatus

    taskId = int(proposal["task_id"])
    memberCountQuery = text(
        """
        SELECT COUNT(*)
        FROM leaudit_cross_review_task_members
        WHERE task_id = :task_id
          AND delete_time IS NULL
        """
    )
    rawMemberCount = await session.scalar(memberCountQuery, {"task_id": taskId})
    memberCount = int(rawMemberCount or 0)

    if memberCount > 0:
        threshold = floor(memberCount / 2) + 1
    else:
        threshold = 1

    voteStatsQuery = text(
        """
        SELECT
            COUNT(*) FILTER (WHERE vote_type = 'agree')::int AS agree_count,
            COUNT(*) FILTER (WHERE vote_type = 'disagree')::int AS disagree_count
        FROM leaudit_cross_review_votes
        WHERE proposal_id = :proposal_id
          AND delete_time IS NULL
        """
    )
    voteStatsResult = await session.execute(voteStatsQuery, {"proposal_id": proposalId})
    voteStats = voteStatsResult.mappings().first() or {}
    agreeCount = int(voteStats.get("agree_count") or 0)
    disagreeCount = int(voteStats.get("disagree_count") or 0)
    # Members still to vote; clamped because votes may outnumber members.
    remaining = max(memberCount - agreeCount - disagreeCount, 0)

    if agreeCount >= threshold:
        nextStatus = "approved"
    elif disagreeCount >= threshold or agreeCount + remaining < threshold:
        # Either a majority disagreed or approval is no longer reachable.
        nextStatus = "rejected"
    else:
        nextStatus = "pending"

    if nextStatus == currentStatus:
        return nextStatus

    updateStatement = text(
        """
        UPDATE leaudit_cross_review_proposals
        SET status = :status,
            update_time = NOW()
        WHERE id = :proposal_id
        """
    )
    await session.execute(updateStatement, {"proposal_id": proposalId, "status": nextStatus})
    return nextStatus
async def _ensure_tables_ready(self, session) -> None:
    """Ensure the tables required by phase one of cross review exist.

    When every required table is already present this returns after a single
    lookup. Otherwise every statement in ``_SCHEMA_BOOTSTRAP_STATEMENTS`` is
    executed, the session is committed, and the tables are checked once more.

    Args:
        session: Database session used for the checks and the bootstrap DDL.
            It is committed only when the bootstrap statements were run.

    Raises:
        LeauditException: 500 naming the first required table that is still
            missing after the bootstrap statements ran.
    """
    required = [
        "leaudit_cross_review_tasks",
        "leaudit_cross_review_task_members",
        "leaudit_cross_review_task_documents",
        "leaudit_cross_review_proposals",
        "leaudit_cross_review_votes",
    ]
    if not await self._find_missing_tables(session, required):
        # Nothing to create, so no re-verification is needed.
        return

    for statement in self._SCHEMA_BOOTSTRAP_STATEMENTS:
        await session.execute(text(statement))
    await session.commit()

    # Verify the bootstrap created everything; report the first gap in
    # declaration order.
    stillMissing = await self._find_missing_tables(session, required)
    if stillMissing:
        raise LeauditException(
            StatusCodeEnum.HTTP_500_INTERNAL_SERVER_ERROR,
            f"交叉评查表未初始化: {stillMissing[0]}",
        )

async def _find_missing_tables(self, session, tableNames: list[str]) -> list[str]:
    """Return the entries of ``tableNames`` absent from the current schema.

    One ``information_schema.tables`` query covers all names. The result
    keeps the order of ``tableNames``.
    """
    if not tableNames:
        return []
    existingQuery = text(
        """
        SELECT table_name
        FROM information_schema.tables
        WHERE table_schema = current_schema()
          AND table_name IN :table_names
        """
    ).bindparams(bindparam("table_names", expanding=True))
    queryResult = await session.execute(existingQuery, {"table_names": list(tableNames)})
    existing = set(queryResult.scalars().all())
    return [name for name in tableNames if name not in existing]
async def _ensure_task_member(self, session, taskId: int, userId: int) -> None:
    """Check that a user is an active member of a non-deleted task.

    Args:
        session: Database session used to run the query.
        taskId: Cross-review task to check against.
        userId: User whose membership is verified.

    Raises:
        LeauditException: 403 when no non-deleted membership row exists for
            the user on a non-deleted task.
    """
    membershipQuery = text(
        """
        SELECT 1
        FROM leaudit_cross_review_task_members tm
        JOIN leaudit_cross_review_tasks t
          ON t.id = tm.task_id
        WHERE tm.task_id = :task_id
          AND tm.user_id = :user_id
          AND tm.delete_time IS NULL
          AND t.delete_time IS NULL
        LIMIT 1
        """
    )
    membershipHit = await session.scalar(membershipQuery, {"task_id": taskId, "user_id": userId})
    if membershipHit:
        return
    raise LeauditException(StatusCodeEnum.HTTP_403_FORBIDDEN, "当前用户不是交叉评查任务成员")
async def _reset_transaction_for_write(self, session) -> None:
"""显式写事务前清理查询阶段开启的隐式事务。"""
if session.in_transaction():
await session.rollback()
def _unique_int_list(self, values: list[int]) -> list[int]:
"""去重并保留原顺序。"""
seen: set[int] = set()
result: list[int] = []
for value in values:
intValue = int(value)
if intValue in seen:
continue
seen.add(intValue)
result.append(intValue)
return result
def _to_int(self, value) -> int | None:
"""安全转 int。"""
if value is None:
return None
return int(value)