diff --git a/fastapi_modules/fastapi_leaudit/controllers/crossReviewController.py b/fastapi_modules/fastapi_leaudit/controllers/crossReviewController.py index d67c1b0..d64955b 100644 --- a/fastapi_modules/fastapi_leaudit/controllers/crossReviewController.py +++ b/fastapi_modules/fastapi_leaudit/controllers/crossReviewController.py @@ -3,7 +3,7 @@ from typing import Any from fastapi import Depends, File, Form, Query, UploadFile -from fastapi.responses import JSONResponse +from fastapi.responses import JSONResponse, Response from fastapi_common.fastapi_common_security.security import verify_access_token from fastapi_common.fastapi_common_web.controller import BaseController @@ -221,6 +221,26 @@ class CrossReviewController(BaseController): Data = await self.CrossReviewService.GetDocumentPendingVotes(CurrentUserId=int(payload["user_id"]), DocumentId=DocumentId) return Result.success(data=Data) + @self.router.get("/documents/{DocumentId}/proposals/export") + async def ExportDocumentProposals( + DocumentId: int, + payload: dict[str, Any] = Depends(verify_access_token), + ): + """导出文档提案列表 Excel。""" + if not await self._check_permission(int(payload["user_id"]), [self._PERMISSIONS["proposal_read"]]): + return JSONResponse(status_code=403, content={"code": 403, "msg": "当前用户没有导出交叉评查提案权限", "data": None}) + fileContent, fileName = await self.CrossReviewService.ExportDocumentProposals( + CurrentUserId=int(payload["user_id"]), + DocumentId=DocumentId, + ) + return Response( + content=fileContent, + media_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet", + headers={ + "Content-Disposition": f"attachment; filename*=UTF-8''{fileName}", + }, + ) + async def _check_permission(self, user_id: int, permission_keys: list[str]) -> bool: """OR 逻辑权限校验。""" for permission_key in permission_keys: diff --git a/fastapi_modules/fastapi_leaudit/domian/Dto/crossReviewDto.py b/fastapi_modules/fastapi_leaudit/domian/Dto/crossReviewDto.py index 5ff1766..d5c92ea 100644 --- a/fastapi_modules/fastapi_leaudit/domian/Dto/crossReviewDto.py +++ b/fastapi_modules/fastapi_leaudit/domian/Dto/crossReviewDto.py @@ -39,9 +39,12 @@ class CrossReviewTaskDocumentQueryDTO(BaseModel): class CrossReviewProposalCreateDTO(BaseModel): """创建交叉评查提案。""" - reviewPointResultId: int = Field(..., description="规则结果ID") + proposalType: str = Field("review_point", description="提案类型:review_point/supplement") + reviewPointResultId: int | None = Field(None, description="规则结果ID") documentId: int = Field(..., description="文档ID") evaluationPointId: int | None = Field(None, description="评查点ID") + evaluationPointName: str | None = Field(None, description="补充评查点名称") + extractionResultText: str | None = Field(None, description="补充抽取结果文本") auditOpinion: str = Field(..., min_length=1, description="提案理由") deductionScore: float = Field(..., description="分值调整量") diff --git a/fastapi_modules/fastapi_leaudit/domian/vo/crossReviewVo.py b/fastapi_modules/fastapi_leaudit/domian/vo/crossReviewVo.py index ed96061..7cfe4f0 100644 --- a/fastapi_modules/fastapi_leaudit/domian/vo/crossReviewVo.py +++ b/fastapi_modules/fastapi_leaudit/domian/vo/crossReviewVo.py @@ -146,7 +146,10 @@ class CrossReviewProposalItemVO(BaseModel): """提案列表项。""" proposalId: int = Field(..., description="提案ID") + proposalType: str = Field("review_point", description="提案类型") + reviewPointResultId: int | None = Field(None, description="评查点结果ID") evaluationPointName: str = Field("", description="评查点名称") + extractionResultText: str = Field("", description="抽取结果文本") proposedScore: float = Field(..., description="调整分值") reason: str = Field("", description="提案理由") proposer: str = Field("", description="提案人姓名") diff --git a/fastapi_modules/fastapi_leaudit/services/crossReviewService.py b/fastapi_modules/fastapi_leaudit/services/crossReviewService.py index 59a93fa..1db3fba 100644 --- a/fastapi_modules/fastapi_leaudit/services/crossReviewService.py +++ b/fastapi_modules/fastapi_leaudit/services/crossReviewService.py @@ -99,6 +99,11 @@ class ICrossReviewService(ABC): """获取文档待投票信息。""" ... + @abstractmethod + async def ExportDocumentProposals(self, CurrentUserId: int, DocumentId: int) -> tuple[bytes, str]: + """导出文档交叉评查意见 Excel。""" + ... + @abstractmethod async def UploadTaskDocument( self, diff --git a/fastapi_modules/fastapi_leaudit/services/impl/crossReviewServiceImpl.py b/fastapi_modules/fastapi_leaudit/services/impl/crossReviewServiceImpl.py index e2175eb..fb65594 100644 --- a/fastapi_modules/fastapi_leaudit/services/impl/crossReviewServiceImpl.py +++ b/fastapi_modules/fastapi_leaudit/services/impl/crossReviewServiceImpl.py @@ -2,8 +2,13 @@ from __future__ import annotations +import io import logging +from datetime import datetime from math import floor +from urllib.parse import quote +from xml.sax.saxutils import escape +from zipfile import ZIP_DEFLATED, ZipFile from sqlalchemy import bindparam, text @@ -49,6 +54,37 @@ from fastapi_modules.fastapi_leaudit.services.impl.documentServiceImpl import Do class CrossReviewServiceImpl(ICrossReviewService): """交叉评查服务实现。""" + _PROPOSAL_EXPORT_HEADERS: tuple[str, ...] = ( + "任务名称", + "文档名称", + "意见分类", + "评查点名称", + "问题依据/命中内容", + "原问题说明", + "评查意见", + "分数调整", + "提案状态", + "发起人", + "提出时间", + "同意人数", + "同意人员", + "不同意人数", + "不同意人员", + "未投人数", + "未投人员", + ) + + _VOTE_EXPORT_HEADERS: tuple[str, ...] = ( + "任务名称", + "文档名称", + "意见分类", + "评查点名称", + "发起人", + "投票人", + "投票状态", + "投票时间", + ) + _SCHEMA_BOOTSTRAP_STATEMENTS: tuple[str, ...] = ( """ CREATE TABLE IF NOT EXISTS leaudit_cross_review_tasks ( @@ -110,8 +146,11 @@ class CrossReviewServiceImpl(ICrossReviewService): id BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY, task_id BIGINT NOT NULL, document_id BIGINT NOT NULL, - rule_result_id BIGINT NOT NULL, + proposal_type VARCHAR(32) NOT NULL DEFAULT 'review_point', + rule_result_id BIGINT, proposer_id BIGINT NOT NULL, + evaluation_point_name VARCHAR(255), + extraction_result_text TEXT, proposed_score_delta NUMERIC(10, 2) NOT NULL, reason TEXT NOT NULL, status VARCHAR(32) NOT NULL DEFAULT 'pending', @@ -564,6 +603,13 @@ class CrossReviewServiceImpl(ICrossReviewService): WHERE rr.document_id = d.id AND rr.run_id = d.current_run_id ) es ON TRUE + LEFT JOIN LATERAL ( + SELECT COALESCE(SUM(proposed_score_delta), 0) AS approved_delta + FROM leaudit_cross_review_proposals p + WHERE p.document_id = d.id + AND p.status = 'approved' + AND p.delete_time IS NULL + ) pd ON TRUE WHERE {whereSql} ORDER BY d.created_at DESC, d.id DESC LIMIT :limit OFFSET :offset @@ -601,10 +647,16 @@ class CrossReviewServiceImpl(ICrossReviewService): errorMessages=self._parse_text_array(row.get("error_messages")), issueMessages=self._parse_text_array(row.get("issue_messages")), manualMessages=self._parse_text_array(row.get("manual_messages")), - finalScore=float(row.get("final_score") or 0), + finalScore=float(row.get("final_score") or 0) + float(row.get("approved_delta") or 0), fullScore=float(row.get("full_score") or 0), - scoreSummary=str(row.get("score_summary") or ""), - scorePercent=float(row.get("score_percent") or 0), + scoreSummary=self._build_score_summary( + float(row.get("final_score") or 0) + float(row.get("approved_delta") or 0), + float(row.get("full_score") or 0), + ), + scorePercent=self._build_score_percent( + float(row.get("final_score") or 0) + float(row.get("approved_delta") or 0), + float(row.get("full_score") or 0), + ), ) for row in rows ] @@ -738,13 +790,33 @@ class CrossReviewServiceImpl(ICrossReviewService): """创建交叉评查提案。""" async with GetAsyncSession() as session: await self._ensure_tables_ready(session) + proposalType = (Body.proposalType or "review_point").strip().lower() + if proposalType not in {"review_point", "supplement"}: + raise LeauditException(StatusCodeEnum.HTTP_400_BAD_REQUEST, "proposalType 仅支持 review_point、supplement") if Body.deductionScore == 0: raise LeauditException(StatusCodeEnum.HTTP_400_BAD_REQUEST, "不允许创建 0 分提案") taskId = await self._resolve_task_id_for_document(session, Body.documentId, CurrentUserId) await self._ensure_task_member(session, taskId, CurrentUserId) await self._ensure_task_document(session, taskId, Body.documentId) - ruleResult = await self._load_rule_result(session, Body.reviewPointResultId, Body.documentId) + + ruleResult = None + reviewPointResultId: int | None = None + evaluationPointName = (Body.evaluationPointName or "").strip() + extractionResultText = (Body.extractionResultText or "").strip() + + if proposalType == "review_point": + if Body.reviewPointResultId is None: + raise LeauditException(StatusCodeEnum.HTTP_400_BAD_REQUEST, "reviewPointResultId 不能为空") + reviewPointResultId = int(Body.reviewPointResultId) + ruleResult = await self._load_rule_result(session, reviewPointResultId, Body.documentId) + if not evaluationPointName: + evaluationPointName = str(ruleResult.get("rule_name") or ruleResult.get("rule_id") or "") + else: + if not evaluationPointName: + raise LeauditException(StatusCodeEnum.HTTP_400_BAD_REQUEST, "补充意见必须填写评查点名称") + if not extractionResultText: + raise LeauditException(StatusCodeEnum.HTTP_400_BAD_REQUEST, "补充意见必须填写抽取结果") duplicateExists = bool( await session.scalar( @@ -754,29 +826,41 @@ class CrossReviewServiceImpl(ICrossReviewService): FROM leaudit_cross_review_proposals WHERE task_id = :task_id AND document_id = :document_id - AND rule_result_id = :rule_result_id + AND proposal_type = :proposal_type AND proposer_id = :proposer_id AND status IN ('pending', 'approved') AND delete_time IS NULL + AND ( + (:proposal_type = 'review_point' AND rule_result_id = :rule_result_id) + OR ( + :proposal_type = 'supplement' + AND COALESCE(evaluation_point_name, '') = :evaluation_point_name + AND COALESCE(extraction_result_text, '') = :extraction_result_text + ) + ) LIMIT 1 """ ), { "task_id": taskId, "document_id": Body.documentId, - "rule_result_id": Body.reviewPointResultId, + "proposal_type": proposalType, + "rule_result_id": reviewPointResultId, + "evaluation_point_name": evaluationPointName, + "extraction_result_text": extractionResultText, "proposer_id": CurrentUserId, }, ) ) if duplicateExists: - raise LeauditException(StatusCodeEnum.HTTP_400_BAD_REQUEST, "当前评查点已存在您的有效提案") + raise LeauditException(StatusCodeEnum.HTTP_400_BAD_REQUEST, "当前内容已存在您的有效提案") - currentScore, fullScore = await self._calculate_current_score(session, Body.reviewPointResultId, Body.documentId) - if Body.deductionScore < 0 and currentScore <= 0: - raise LeauditException(StatusCodeEnum.HTTP_400_BAD_REQUEST, "当前分值已为 0,不能继续扣分") - if Body.deductionScore > 0 and currentScore >= fullScore: - raise LeauditException(StatusCodeEnum.HTTP_400_BAD_REQUEST, "当前分值已满分,不能继续加分") + if proposalType == "review_point": + currentScore, fullScore = await self._calculate_current_score(session, reviewPointResultId, Body.documentId) + if Body.deductionScore < 0 and currentScore <= 0: + raise LeauditException(StatusCodeEnum.HTTP_400_BAD_REQUEST, "当前分值已为 0,不能继续扣分") + if Body.deductionScore > 0 and currentScore >= fullScore: + raise LeauditException(StatusCodeEnum.HTTP_400_BAD_REQUEST, "当前分值已满分,不能继续加分") await self._reset_transaction_for_write(session) async with session.begin(): @@ -785,17 +869,42 @@ class CrossReviewServiceImpl(ICrossReviewService): text( """ INSERT INTO leaudit_cross_review_proposals - (task_id, document_id, rule_result_id, proposer_id, proposed_score_delta, reason, status) + ( + task_id, + document_id, + proposal_type, + rule_result_id, + proposer_id, + evaluation_point_name, + extraction_result_text, + proposed_score_delta, + reason, + status + ) VALUES - (:task_id, :document_id, :rule_result_id, :proposer_id, :proposed_score_delta, :reason, 'pending') + ( + :task_id, + :document_id, + :proposal_type, + :rule_result_id, + :proposer_id, + :evaluation_point_name, + :extraction_result_text, + :proposed_score_delta, + :reason, + 'pending' + ) RETURNING id, create_time """ ), { "task_id": taskId, "document_id": Body.documentId, - "rule_result_id": Body.reviewPointResultId, + "proposal_type": proposalType, + "rule_result_id": reviewPointResultId, "proposer_id": CurrentUserId, + "evaluation_point_name": evaluationPointName or None, + "extraction_result_text": extractionResultText or None, "proposed_score_delta": Body.deductionScore, "reason": Body.auditOpinion.strip(), }, @@ -965,6 +1074,74 @@ class CrossReviewServiceImpl(ICrossReviewService): pendingProposals=pendingProposals, ) + async def ExportDocumentProposals(self, CurrentUserId: int, DocumentId: int) -> tuple[bytes, str]: + """导出文档交叉评查意见 Excel。""" + async with GetAsyncSession() as session: + await self._ensure_tables_ready(session) + taskId = await self._resolve_task_id_for_document(session, DocumentId, CurrentUserId) + await self._ensure_task_member(session, taskId, CurrentUserId) + + documentMeta = await self._load_document_export_meta(session, taskId, DocumentId) + proposalPage = await self._build_document_proposals_page(session, CurrentUserId, taskId, DocumentId, 1, 1000) + + defaultRows: list[dict[str, object]] = [] + supplementRows: list[dict[str, object]] = [] + voteRows: list[dict[str, object]] = [] + taskMembers = await self._load_task_member_names(session, taskId) + + for item in proposalPage.items: + row = self._build_export_summary_row( + taskId=taskId, + taskName=documentMeta["task_name"], + documentId=DocumentId, + documentName=documentMeta["document_name"], + item=item, + ) + if item.proposalType == "supplement": + supplementRows.append(row) + else: + defaultRows.append(row) + + votedNames = {vote.voter for vote in item.votes} + for vote in item.votes: + voteRows.append( + { + "任务名称": documentMeta["task_name"], + "文档名称": documentMeta["document_name"], + "意见分类": "补充意见" if item.proposalType == "supplement" else "默认规则意见", + "评查点名称": item.evaluationPointName, + "发起人": item.proposer, + "投票人": vote.voter, + "投票结果": self._translate_vote_type(vote.voteType), + "投票时间": "", + } + ) + for _, memberName in taskMembers.items(): + if memberName in votedNames: + continue + voteRows.append( + { + "任务名称": documentMeta["task_name"], + "文档名称": documentMeta["document_name"], + "意见分类": "补充意见" if item.proposalType == "supplement" else "默认规则意见", + "评查点名称": item.evaluationPointName, + "发起人": item.proposer, + "投票人": memberName, + "投票结果": "未投", + "投票时间": "", + } + ) + + workbook = self._build_simple_xlsx( + sheets=[ + ("默认规则意见", list(self._PROPOSAL_EXPORT_HEADERS), defaultRows), + ("补充意见", list(self._PROPOSAL_EXPORT_HEADERS), supplementRows), + ("投票明细", list(self._VOTE_EXPORT_HEADERS), voteRows), + ] + ) + safeName = quote(f"交叉评查意见_{documentMeta['document_name']}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.xlsx") + return workbook, safeName + async def UploadTaskDocument( self, CurrentUserId: int, @@ -1105,8 +1282,11 @@ class CrossReviewServiceImpl(ICrossReviewService): p.id, p.task_id, p.document_id, + p.proposal_type, p.rule_result_id, p.proposer_id, + p.evaluation_point_name, + p.extraction_result_text, p.proposed_score_delta, p.reason, p.status, @@ -1145,7 +1325,10 @@ class CrossReviewServiceImpl(ICrossReviewService): items.append( CrossReviewProposalItemVO( proposalId=proposalId, - evaluationPointName=str(row.get("rule_name") or ""), + proposalType=str(row.get("proposal_type") or "review_point"), + reviewPointResultId=self._to_int(row.get("rule_result_id")), + evaluationPointName=str(row.get("evaluation_point_name") or row.get("rule_name") or ""), + extractionResultText=str(row.get("extraction_result_text") or ""), proposedScore=float(row["proposed_score_delta"] or 0), reason=str(row.get("reason") or ""), proposer=str(row.get("proposer_name") or ""), @@ -1160,7 +1343,7 @@ class CrossReviewServiceImpl(ICrossReviewService): disagreeVoters=disagreeVoters, pendingVoters=pendingVoters, canVote=canVote, - problemMessage=str(row.get("fail_message") or row.get("rule_name") or ""), + problemMessage=str(row.get("fail_message") or row.get("extraction_result_text") or row.get("rule_name") or ""), proposerId=int(row["proposer_id"]), createdAt=row.get("create_time"), status=str(row.get("status") or "pending"), @@ -1288,6 +1471,38 @@ class CrossReviewServiceImpl(ICrossReviewService): raise LeauditException(StatusCodeEnum.HTTP_404_NOT_FOUND, "交叉评查提案不存在") return row + async def _load_document_export_meta(self, session, taskId: int, documentId: int) -> dict[str, str]: + row = ( + await session.execute( + text( + """ + SELECT + t.task_name, + COALESCE(f.file_name, d.normalized_name, CAST(d.id AS TEXT)) AS document_name + FROM leaudit_cross_review_tasks t + JOIN leaudit_documents d + ON d.id = :document_id + LEFT JOIN leaudit_document_files f + ON f.document_id = d.id + AND f.is_active = true + AND f.deleted_at IS NULL + AND f.file_role IN ('original', 'primary') + WHERE t.id = :task_id + AND t.delete_time IS NULL + AND d.deleted_at IS NULL + LIMIT 1 + """ + ), + {"task_id": taskId, "document_id": documentId}, + ) + ).mappings().first() + if not row: + raise LeauditException(StatusCodeEnum.HTTP_404_NOT_FOUND, "导出目标不存在") + return { + "task_name": str(row.get("task_name") or f"任务{taskId}"), + "document_name": str(row.get("document_name") or f"文档{documentId}"), + } + async def _load_task_member_names(self, session, taskId: int) -> dict[int, str]: rows = ( await session.execute( @@ -1469,6 +1684,41 @@ class CrossReviewServiceImpl(ICrossReviewService): if not exists: raise LeauditException(StatusCodeEnum.HTTP_500_INTERNAL_SERVER_ERROR, f"交叉评查表未初始化: {tableName}") + proposalColumns = await self._load_table_columns(session, "leaudit_cross_review_proposals") + if "proposal_type" not in proposalColumns: + await session.execute( + text("ALTER TABLE leaudit_cross_review_proposals ADD COLUMN proposal_type VARCHAR(32) NOT NULL DEFAULT 'review_point'") + ) + if "evaluation_point_name" not in proposalColumns: + await session.execute( + text("ALTER TABLE leaudit_cross_review_proposals ADD COLUMN evaluation_point_name VARCHAR(255)") + ) + if "extraction_result_text" not in proposalColumns: + await session.execute( + text("ALTER TABLE leaudit_cross_review_proposals ADD COLUMN extraction_result_text TEXT") + ) + if proposalColumns: + isRuleResultNotNull = bool( + await session.scalar( + text( + """ + SELECT 1 + FROM information_schema.columns + WHERE table_schema = current_schema() + AND table_name = 'leaudit_cross_review_proposals' + AND column_name = 'rule_result_id' + AND is_nullable = 'NO' + LIMIT 1 + """ + ) + ) + ) + if isRuleResultNotNull: + await session.execute( + text("ALTER TABLE leaudit_cross_review_proposals ALTER COLUMN rule_result_id DROP NOT NULL") + ) + await session.commit() + async def _ensure_task_member(self, session, taskId: int, userId: int) -> None: """校验当前用户是否是任务成员。""" exists = bool( @@ -1522,3 +1772,283 @@ class CrossReviewServiceImpl(ICrossReviewService): if isinstance(value, list): return [str(v) for v in value] return [str(value)] + + def _build_score_summary(self, finalScore: float, fullScore: float) -> str: + if fullScore <= 0: + return "0/0" + return f"{round(finalScore, 1)}/{round(fullScore, 1)}" + + def _build_score_percent(self, finalScore: float, fullScore: float) -> float: + if fullScore <= 0: + return 0.0 + return round(finalScore / fullScore * 100, 1) + + def _build_export_summary_row( + self, + taskId: int, + taskName: str, + documentId: int, + documentName: str, + item: CrossReviewProposalItemVO, + ) -> dict[str, object]: + return { + "任务名称": taskName, + "文档名称": documentName, + "意见分类": "补充意见" if item.proposalType == "supplement" else "默认规则意见", + "评查点名称": item.evaluationPointName, + "问题依据/命中内容": item.extractionResultText or item.problemMessage or "", + "原问题说明": item.problemMessage or "", + "评查意见": item.reason, + "分数调整": item.proposedScore, + "提案状态": self._translate_proposal_status(item.status), + "发起人": item.proposer, + "提出时间": self._format_datetime(item.createdAt), + "同意人数": len(item.agreeVoters), + "同意人员": "、".join(item.agreeVoters), + "不同意人数": len(item.disagreeVoters), + "不同意人员": "、".join(item.disagreeVoters), + "未投人数": len(item.pendingVoters), + "未投人员": "、".join(item.pendingVoters), + } + + def _translate_vote_type(self, voteType: str) -> str: + return { + "agree": "同意", + "disagree": "不同意", + "cancel": "撤销", + }.get(voteType, voteType or "") + + def _translate_proposal_status(self, status: str) -> str: + return { + "pending": "待定", + "approved": "已通过", + "rejected": "已驳回", + "cancelled": "已撤销", + }.get(status, status or "") + + def _format_datetime(self, value) -> str: + if value is None: + return "" + if isinstance(value, datetime): + return value.strftime("%Y-%m-%d %H:%M:%S") + return str(value) + + def _build_simple_xlsx(self, sheets: list[tuple[str, list[str], list[dict[str, object]]]]) -> bytes: + normalized_sheets: list[tuple[str, list[str], list[list[str]]]] = [] + for sheetName, headers, rows in sheets: + tableRows = [[self._excel_cell_text(row.get(header)) for header in headers] for row in rows] + normalized_sheets.append((sheetName, headers, tableRows)) + + sharedStrings: list[str] = [] + stringIndex: dict[str, int] = {} + + def intern(value: str) -> int: + if value in stringIndex: + return stringIndex[value] + idx = len(sharedStrings) + stringIndex[value] = idx + sharedStrings.append(value) + return idx + + for _, headers, rows in normalized_sheets: + for header in headers: + intern(header) + for row in rows: + for cell in row: + intern(cell) + + buffer = io.BytesIO() + with ZipFile(buffer, "w", ZIP_DEFLATED) as zf: + zf.writestr("[Content_Types].xml", self._xlsx_content_types(len(normalized_sheets))) + zf.writestr("_rels/.rels", self._xlsx_root_rels()) + zf.writestr("xl/workbook.xml", self._xlsx_workbook([sheet[0] for sheet in normalized_sheets])) + zf.writestr("xl/_rels/workbook.xml.rels", self._xlsx_workbook_rels(len(normalized_sheets))) + zf.writestr("xl/styles.xml", self._xlsx_styles()) + zf.writestr("xl/sharedStrings.xml", self._xlsx_shared_strings(sharedStrings)) + for index, (_, headers, rows) in enumerate(normalized_sheets, start=1): + zf.writestr(f"xl/worksheets/sheet{index}.xml", self._xlsx_sheet(headers, rows, intern)) + zf.writestr("docProps/core.xml", self._xlsx_core_props()) + zf.writestr("docProps/app.xml", self._xlsx_app_props([sheet[0] for sheet in normalized_sheets])) + return buffer.getvalue() + + def _excel_cell_text(self, value: object) -> str: + if value is None: + return "" + if isinstance(value, float): + return str(round(value, 2)) + return str(value) + + def _xlsx_content_types(self, sheetCount: int) -> str: + overrides = [ + '', + '', + '', + '', + '', + ] + overrides.extend( + f'' + for index in range(1, sheetCount + 1) + ) + return ( + '' + '' + '' + '' + + "".join(overrides) + + "" + ) + + def _xlsx_root_rels(self) -> str: + return ( + '' + '' + '' + '' + '' + "" + ) + + def _xlsx_workbook(self, sheetNames: list[str]) -> str: + sheets = "".join( + f'' + for index, name in enumerate(sheetNames, start=1) + ) + return ( + '' + '' + f"{sheets}" + "" + ) + + def _xlsx_workbook_rels(self, sheetCount: int) -> str: + relationships = [] + for index in range(1, sheetCount + 1): + relationships.append( + f'' + ) + relationships.append( + f'' + ) + relationships.append( + f'' + ) + return ( + '' + '' + + "".join(relationships) + + "" + ) + + def _xlsx_styles(self) -> str: + return ( + '' + '' + '' + '' + '' + '' + '' + '' + '' + '' + '' + '' + '' + '' + '' + '' + '' + '' + '' + '' + '' + '' + '' + '' + '' + '' + '' + '' + '' + '' + ) + + def _xlsx_shared_strings(self, values: list[str]) -> str: + items = "".join(f"{escape(value)}" for value in values) + return ( + '' + f'{items}' + ) + + def _xlsx_sheet(self, headers: list[str], rows: list[list[str]], intern) -> str: + all_rows: list[list[str]] = [] + if headers: + all_rows.append(headers) + all_rows.extend(rows) + sheet_rows = [] + for rowIndex, row in enumerate(all_rows, start=1): + cells = [] + styleIndex = "1" if headers and rowIndex == 1 else "0" + for colIndex, value in enumerate(row, start=1): + colRef = self._xlsx_column_name(colIndex) + cells.append(f'{intern(value)}') + sheet_rows.append(f'{"".join(cells)}') + return ( + '' + '' + f'{"".join(sheet_rows)}' + '' + ) + + def _xlsx_column_name(self, columnIndex: int) -> str: + result = "" + current = columnIndex + while current > 0: + current, remainder = divmod(current - 1, 26) + result = chr(65 + remainder) + result + return result + + def _xlsx_core_props(self) -> str: + now = datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ") + return ( + '' + '' + 'LeAudit' + 'LeAudit' + f'{now}' + f'{now}' + '' + ) + + def _xlsx_app_props(self, sheetNames: list[str]) -> str: + parts = "".join(f"{escape(name)}" for name in sheetNames) + return ( + '' + '' + 'LeAudit' + f'{parts}' + '' + ) + + async def _load_table_columns(self, session, tableName: str) -> set[str]: + rows = ( + await session.execute( + text( + """ + SELECT column_name + FROM information_schema.columns + WHERE table_schema = current_schema() + AND table_name = :table_name + """ + ), + {"table_name": tableName}, + ) + ).mappings().all() + return {str(row["column_name"]) for row in rows if row.get("column_name")}