From 0af41e370c74270b399d38254343469d255cd019 Mon Sep 17 00:00:00 2001
From: wren <“porlong@qq.com”>
Date: Fri, 22 May 2026 12:21:43 +0800
Subject: [PATCH] fix: stabilize backend services and frontend pointer

---
Review notes (hand-edited copy; regenerate with `git format-patch` before applying):
- Line structure and indentation were restored from a whitespace-collapsed
  copy. Hunk line counts were re-checked, but context-line whitespace is
  inferred from syntax and has not been verified against the target files.
- pageQualityServiceImpl.py: a non-dict VLM response now falls back to
  "review" instead of raising AttributeError, and NaN/inf/overflowing scores
  use the per-status default instead of being clamped to 1.0. The hunk
  header, the diffstat and tests/test_page_quality_vlm.py were updated to
  match; the `index` blob ids of those two files are therefore stale.
- The From address is wrapped in typographic quotes, which looks like a
  mis-set user.email -- confirm with the author.

 app.toml                                    |  3 +
 .../impl/contractTemplateServiceImpl.py     |  1 +
 .../services/impl/pageQualityServiceImpl.py | 84 +++++++++++++++++--
 .../services/impl/ragChatServiceImpl.py     | 32 ++++++-
 .../services/impl/ragDatasetServiceImpl.py  | 39 ++++++++-
 legal-platform-frontend                     |  2 +-
 scripts/start_worker.sh                     | 10 ++-
 tests/test_contract_template_search.py      | 68 +++++++++++++++
 tests/test_page_quality_vlm.py              | 82 ++++++++++++++++++
 9 files changed, 305 insertions(+), 16 deletions(-)
 create mode 100644 tests/test_contract_template_search.py
 create mode 100644 tests/test_page_quality_vlm.py

diff --git a/app.toml b/app.toml
index dfb0181..877bfa3 100644
--- a/app.toml
+++ b/app.toml
@@ -74,6 +74,9 @@ SIGNATURE_PROBE_RETRY_BACKOFF_BASE_SECONDS = 0.5
 WORKER_QUEUE_URGENT = "leaudit.urgent"
 WORKER_QUEUE_NORMAL = "leaudit.normal"
 WORKER_CONCURRENCY = 2
+PAGE_QUALITY_ENABLED = true
+PAGE_QUALITY_QUEUE_URGENT = "leaudit.page_quality.urgent"
+PAGE_QUALITY_QUEUE_NORMAL = "leaudit.page_quality.normal"
 RUN_LOCK_SECONDS = 1800
 TASK_SOFT_TIME_LIMIT = 3300
 TASK_TIME_LIMIT = 3600
diff --git a/fastapi_modules/fastapi_leaudit/services/impl/contractTemplateServiceImpl.py b/fastapi_modules/fastapi_leaudit/services/impl/contractTemplateServiceImpl.py
index 5384acd..fe02138 100644
--- a/fastapi_modules/fastapi_leaudit/services/impl/contractTemplateServiceImpl.py
+++ b/fastapi_modules/fastapi_leaudit/services/impl/contractTemplateServiceImpl.py
@@ -508,6 +508,7 @@ class ContractTemplateServiceImpl(IContractTemplateService):
                 ORDER BY c.name ASC
                 """
             )
+            (sql,) = self._bind_expanding(sql, params)
             rows = (await session.execute(sql, params)).mappings().all()
 
             return [
diff --git a/fastapi_modules/fastapi_leaudit/services/impl/pageQualityServiceImpl.py b/fastapi_modules/fastapi_leaudit/services/impl/pageQualityServiceImpl.py
index 5d16679..77e7941 100644
--- a/fastapi_modules/fastapi_leaudit/services/impl/pageQualityServiceImpl.py
+++ b/fastapi_modules/fastapi_leaudit/services/impl/pageQualityServiceImpl.py
@@ -26,6 +26,18 @@ from fastapi_modules.fastapi_leaudit.services.impl.ossServiceImpl import OssServ
 
 logger = logging.getLogger(__name__)
 
+_PAGE_QUALITY_VLM_PROMPT = """
+你是文档扫描图片质量检测员。请判断这 1 页文档图片是否适合继续做 OCR 与合同/公文评查。
+
+判定标准:
+1. pass:文字主体清晰、方向正常、没有明显截断,能稳定阅读。
+2. review:存在轻微模糊、倾斜、阴影、低对比度、局部遮挡、轻微截断,建议人工确认但仍可能可读。
+3. reject:严重模糊、重影、过曝/过暗、页面大面积缺失、关键文字不可辨认、方向严重错误、空白页或非文档页,建议重拍。
+
+只输出 JSON,不要输出 Markdown,不要解释额外文本:
+{"status":"pass|review|reject","score":0.0到1.0,"reason":"20字以内中文原因"}
+""".strip()
+
 
 class PageQualityServiceImpl(IPageQualityService):
     """页级图片质量服务实现。"""
@@ -33,6 +45,7 @@ class PageQualityServiceImpl(IPageQualityService):
     def __init__(self) -> None:
         self.OssService = OssServiceImpl()
         self.DocumentService = None
+        self.VlmClient = None
 
     async def DispatchForDocument(
         self,
@@ -282,7 +295,7 @@ class PageQualityServiceImpl(IPageQualityService):
         reject_pages = 0
         async with GetAsyncSession() as session:
             for page_num, page_image in page_images:
-                status, score, reason = self._classify_page_image(page_image)
+                status, score, reason = await self._classify_page_image_by_vlm(page_image)
                 if status == "review":
                     review_pages += 1
                 elif status == "reject":
@@ -466,13 +479,68 @@ class PageQualityServiceImpl(IPageQualityService):
         finally:
             doc.close()
 
-    def _classify_page_image(self, image_bytes: bytes) -> tuple[str, float, str | None]:
-        size = len(image_bytes)
-        if size < 25_000:
-            return "reject", 0.2, "页面图像内容过少或清晰度较低,建议重拍"
-        if size < 60_000:
-            return "review", 0.45, "页面疑似存在模糊,建议人工确认"
-        return "pass", 0.9, None
+    async def _classify_page_image_by_vlm(self, image_bytes: bytes) -> tuple[str, float, str | None]:
+        """Classify one page image with the VLM.
+
+        Returns ``(status, score, reason)`` where status is ``pass``, ``review``
+        or ``reject``. A missing client, a failed call, or an unusable response
+        all degrade to ``review`` so that a VLM problem never counts as a pass.
+        """
+        client = self._vlm_client()
+        if client is None:
+            return "review", 0.5, "VLM未配置,需人工确认图片质量"
+
+        try:
+            result = await client.extract_multifield(
+                prompt=_PAGE_QUALITY_VLM_PROMPT,
+                images_data_urls=[self._image_data_url(image_bytes)],
+                max_tokens=300,
+            )
+        except Exception as exc:
+            logger.warning("VLM page quality detection failed: %s", exc)
+            return "review", 0.5, "VLM图片质量检测失败,需人工确认"
+
+        # A list/str/None response has no usable fields; treat it like an
+        # unknown status instead of raising AttributeError on .get().
+        payload = result if isinstance(result, dict) else {}
+        status = str(payload.get("status") or "").strip().lower()
+        if status not in {"pass", "review", "reject"}:
+            return "review", 0.5, "VLM返回结果不可用,需人工确认"
+
+        score = self._normalize_quality_score(payload.get("score"), status)
+        reason = str(payload.get("reason") or "").strip() or None
+        if status != "pass" and not reason:
+            reason = "页面图片质量需人工确认"
+        return status, score, reason
+
+    def _vlm_client(self):
+        """Return the VLM client, building it via create_vlm_client() while none is cached."""
+        if self.VlmClient is None:
+            from fastapi_modules.fastapi_leaudit.leaudit_bridge.client_factory import create_vlm_client
+
+            self.VlmClient = create_vlm_client()
+        return self.VlmClient
+
+    def _image_data_url(self, image_bytes: bytes) -> str:
+        """Encode raw image bytes as a base64 ``data:image/png`` URL."""
+        import base64
+
+        encoded = base64.b64encode(image_bytes).decode()
+        return f"data:image/png;base64,{encoded}"
+
+    def _normalize_quality_score(self, raw_score: Any, status: str) -> float:
+        """Clamp the VLM score into [0.0, 1.0], using the per-status default when unusable."""
+        import math
+
+        defaults = {"pass": 0.9, "review": 0.5, "reject": 0.2}
+        try:
+            score = float(raw_score)
+        except (TypeError, ValueError, OverflowError):
+            return defaults[status]
+        if not math.isfinite(score):
+            # min()/max() would silently turn NaN into 1.0.
+            return defaults[status]
+        return max(0.0, min(1.0, score))
 
     def _document_service(self):
         if self.DocumentService is None:
diff --git a/fastapi_modules/fastapi_leaudit/services/impl/ragChatServiceImpl.py b/fastapi_modules/fastapi_leaudit/services/impl/ragChatServiceImpl.py
index 2c3fb9c..ff035fb 100644
--- a/fastapi_modules/fastapi_leaudit/services/impl/ragChatServiceImpl.py
+++ b/fastapi_modules/fastapi_leaudit/services/impl/ragChatServiceImpl.py
@@ -54,6 +54,8 @@ class RagChatServiceImpl(IRagChatService):
     _task_done: dict[str, bool] = {}
     _task_locks: dict[str, asyncio.Lock] = {}
     _title_tasks: dict[str, asyncio.Task] = {}
+    _chat_schema_checked = False
+    _chat_schema_lock = asyncio.Lock()
 
     def __init__(self) -> None:
         self.TenantResolver = TenantResolver()
@@ -731,8 +733,34 @@ class RagChatServiceImpl(IRagChatService):
         )
 
     async def _ensure_rag_chat_schema(self, session) -> None:
-        await session.execute(text("ALTER TABLE rag_chat_app ADD COLUMN IF NOT EXISTS tenant_code VARCHAR(64) NULL"))
-        await session.execute(text("CREATE INDEX IF NOT EXISTS idx_rag_chat_app_tenant_code ON rag_chat_app(tenant_code) WHERE deleted_at IS NULL"))
+        if self.__class__._chat_schema_checked:
+            return
+
+        async with self.__class__._chat_schema_lock:
+            if self.__class__._chat_schema_checked:
+                return
+
+            exists = (
+                await session.execute(
+                    text(
+                        """
+                        SELECT 1
+                        FROM information_schema.columns
+                        WHERE table_schema = current_schema()
+                          AND table_name = 'rag_chat_app'
+                          AND column_name = 'tenant_code'
+                        """
+                    )
+                )
+            ).scalar_one_or_none()
+            if exists:
+                self.__class__._chat_schema_checked = True
+                return
+
+            await session.execute(text("SET LOCAL lock_timeout = '1000ms'"))
+            await session.execute(text("ALTER TABLE rag_chat_app ADD COLUMN tenant_code VARCHAR(64) NULL"))
+            await session.execute(text("CREATE INDEX IF NOT EXISTS idx_rag_chat_app_tenant_code ON rag_chat_app(tenant_code) WHERE deleted_at IS NULL"))
+            self.__class__._chat_schema_checked = True
 
     @staticmethod
     def _tenant_context_is_global(tenant_context: dict[str, str | None]) -> bool:
diff --git a/fastapi_modules/fastapi_leaudit/services/impl/ragDatasetServiceImpl.py b/fastapi_modules/fastapi_leaudit/services/impl/ragDatasetServiceImpl.py
index 243ae42..9210c5d 100644
--- a/fastapi_modules/fastapi_leaudit/services/impl/ragDatasetServiceImpl.py
+++ b/fastapi_modules/fastapi_leaudit/services/impl/ragDatasetServiceImpl.py
@@ -57,6 +57,8 @@ class RagDatasetServiceImpl(IRagDatasetService):
             ORDER BY dataset_id, is_default DESC, sort_order ASC, id ASC
         ) a ON a.dataset_id = d.id
     """
+    _tenant_schema_checked = False
+    _tenant_schema_lock = asyncio.Lock()
 
     def __init__(self) -> None:
         self.TenantResolver = TenantResolver()
@@ -1038,10 +1040,39 @@ class RagDatasetServiceImpl(IRagDatasetService):
             raise LeauditException(StatusCodeEnum.HTTP_403_FORBIDDEN, "当前用户只能管理本地区知识库")
 
     async def _ensure_rag_tenant_schema(self, session) -> None:
-        await session.execute(text("ALTER TABLE rag_dataset ADD COLUMN IF NOT EXISTS tenant_code VARCHAR(64) NULL"))
-        await session.execute(text("ALTER TABLE rag_chat_app ADD COLUMN IF NOT EXISTS tenant_code VARCHAR(64) NULL"))
-        await session.execute(text("CREATE INDEX IF NOT EXISTS idx_rag_dataset_tenant_code ON rag_dataset(tenant_code) WHERE deleted_at IS NULL"))
-        await session.execute(text("CREATE INDEX IF NOT EXISTS idx_rag_chat_app_tenant_code ON rag_chat_app(tenant_code) WHERE deleted_at IS NULL"))
+        if self.__class__._tenant_schema_checked:
+            return
+
+        async with self.__class__._tenant_schema_lock:
+            if self.__class__._tenant_schema_checked:
+                return
+
+            columns = (
+                await session.execute(
+                    text(
+                        """
+                        SELECT table_name
+                        FROM information_schema.columns
+                        WHERE table_schema = current_schema()
+                          AND table_name IN ('rag_dataset', 'rag_chat_app')
+                          AND column_name = 'tenant_code'
+                        """
+                    )
+                )
+            ).scalars().all()
+            existing = set(columns)
+            if existing == {"rag_dataset", "rag_chat_app"}:
+                self.__class__._tenant_schema_checked = True
+                return
+
+            await session.execute(text("SET LOCAL lock_timeout = '1000ms'"))
+            if "rag_dataset" not in existing:
+                await session.execute(text("ALTER TABLE rag_dataset ADD COLUMN tenant_code VARCHAR(64) NULL"))
+            if "rag_chat_app" not in existing:
+                await session.execute(text("ALTER TABLE rag_chat_app ADD COLUMN tenant_code VARCHAR(64) NULL"))
+            await session.execute(text("CREATE INDEX IF NOT EXISTS idx_rag_dataset_tenant_code ON rag_dataset(tenant_code) WHERE deleted_at IS NULL"))
+            await session.execute(text("CREATE INDEX IF NOT EXISTS idx_rag_chat_app_tenant_code ON rag_chat_app(tenant_code) WHERE deleted_at IS NULL"))
+            self.__class__._tenant_schema_checked = True
 
     def _dataset_tenant_filter_sql(
         self,
diff --git a/legal-platform-frontend b/legal-platform-frontend
index f219811..df04238 160000
--- a/legal-platform-frontend
+++ b/legal-platform-frontend
@@ -1 +1 @@
-Subproject commit f219811a6e7e13b38813ccf5a3538d968a6e3e9e
+Subproject commit df04238bbb869e6cbe4ff3976de28fcaac4a3cd2
diff --git a/scripts/start_worker.sh b/scripts/start_worker.sh
index 30bf957..fe52567 100755
--- a/scripts/start_worker.sh
+++ b/scripts/start_worker.sh
@@ -9,13 +9,21 @@ source .venv/bin/activate
 eval "$(
   .venv/bin/python - <<'PY'
 from fastapi_admin.config import (
+    LEAUDIT_PAGE_QUALITY_QUEUE_NORMAL,
+    LEAUDIT_PAGE_QUALITY_QUEUE_URGENT,
     LEAUDIT_WORKER_CONCURRENCY,
     LEAUDIT_WORKER_QUEUE_NORMAL,
     LEAUDIT_WORKER_QUEUE_URGENT,
 )
 
 print(f'WORKER_CONCURRENCY={LEAUDIT_WORKER_CONCURRENCY}')
-print(f'WORKER_QUEUES={LEAUDIT_WORKER_QUEUE_URGENT},{LEAUDIT_WORKER_QUEUE_NORMAL}')
+print(
+    'WORKER_QUEUES='
+    f'{LEAUDIT_WORKER_QUEUE_URGENT},'
+    f'{LEAUDIT_WORKER_QUEUE_NORMAL},'
+    f'{LEAUDIT_PAGE_QUALITY_QUEUE_URGENT},'
+    f'{LEAUDIT_PAGE_QUALITY_QUEUE_NORMAL}'
+)
 PY
 )"
 
diff --git a/tests/test_contract_template_search.py b/tests/test_contract_template_search.py
new file mode 100644
index 0000000..484f0c6
--- /dev/null
+++ b/tests/test_contract_template_search.py
@@ -0,0 +1,68 @@
+import asyncio
+from unittest.mock import patch
+
+from fastapi_modules.fastapi_leaudit.services.impl.contractTemplateServiceImpl import ContractTemplateServiceImpl
+
+
+class _EmptyMappingResult:
+    def mappings(self):
+        return self
+
+    def all(self):
+        return []
+
+
+class _FakeSession:
+    def __init__(self):
+        self.executed_sql = None
+        self.executed_params = None
+
+    async def execute(self, sql, params=None):
+        self.executed_sql = sql
+        self.executed_params = params
+        return _EmptyMappingResult()
+
+
+class _FakeSessionContext:
+    def __init__(self, session):
+        self.session = session
+
+    async def __aenter__(self):
+        return self.session
+
+    async def __aexit__(self, exc_type, exc, tb):
+        return None
+
+
+def test_contract_template_search_category_stats_binds_expanding_scope_params():
+    service = ContractTemplateServiceImpl()
+    fake_session = _FakeSession()
+
+    async def noop_ensure_schema(session):
+        return None
+
+    async def fake_user_context(current_user_id, session):
+        return {
+            "id": current_user_id,
+            "area": "梅州",
+            "tenant_code": "MZ",
+            "tenant_name": "梅州",
+            "tenant_scope_value": "梅州",
+            "is_global": False,
+            "can_manage": True,
+            "is_area_admin": True,
+        }
+
+    service._ensureContractTemplateSchema = noop_ensure_schema
+    service._getCurrentUserContext = fake_user_context
+
+    with patch(
+        "fastapi_modules.fastapi_leaudit.services.impl.contractTemplateServiceImpl.GetAsyncSession",
+        return_value=_FakeSessionContext(fake_session),
+    ):
+        asyncio.run(service._load_search_category_stats("买卖", None, None, 5))
+
+    assert fake_session.executed_sql._bindparams["visible_tenant_codes"].expanding is True
+    assert fake_session.executed_sql._bindparams["visible_regions"].expanding is True
+    assert fake_session.executed_params["visible_tenant_codes"] == ["PROVINCIAL", "PUBLIC", "MZ"]
+    assert fake_session.executed_params["visible_regions"] == ["省级", "公共", "梅州"]
diff --git a/tests/test_page_quality_vlm.py b/tests/test_page_quality_vlm.py
new file mode 100644
index 0000000..7e8db82
--- /dev/null
+++ b/tests/test_page_quality_vlm.py
@@ -0,0 +1,82 @@
+import pytest
+
+from fastapi_modules.fastapi_leaudit.services.impl.pageQualityServiceImpl import PageQualityServiceImpl
+
+
+class _FakeVlmClient:
+    def __init__(self, response):
+        self.response = response
+        self.prompts = []
+
+    async def extract_multifield(self, *, prompt, images_data_urls, max_tokens=800):
+        self.prompts.append((prompt, images_data_urls, max_tokens))
+        if isinstance(self.response, Exception):
+            raise self.response
+        return self.response
+
+
+@pytest.mark.asyncio
+async def test_vlm_page_quality_reject_result_is_used():
+    service = PageQualityServiceImpl()
+    service.VlmClient = _FakeVlmClient(
+        {
+            "status": "reject",
+            "score": 0.18,
+            "reason": "页面文字严重模糊,无法稳定辨认关键内容",
+        }
+    )
+
+    status, score, reason = await service._classify_page_image_by_vlm(b"image-bytes")
+
+    assert status == "reject"
+    assert score == 0.18
+    assert "严重模糊" in reason
+    assert "只输出 JSON" in service.VlmClient.prompts[0][0]
+
+
+@pytest.mark.asyncio
+async def test_vlm_page_quality_invalid_result_falls_back_to_review_not_pass():
+    service = PageQualityServiceImpl()
+    service.VlmClient = _FakeVlmClient({"status": "unknown", "reason": ""})
+
+    status, score, reason = await service._classify_page_image_by_vlm(b"image-bytes")
+
+    assert status == "review"
+    assert score == 0.5
+    assert "VLM返回结果不可用" in reason
+
+
+@pytest.mark.asyncio
+async def test_vlm_page_quality_error_falls_back_to_review_not_pass():
+    service = PageQualityServiceImpl()
+    service.VlmClient = _FakeVlmClient(RuntimeError("vlm down"))
+
+    status, score, reason = await service._classify_page_image_by_vlm(b"image-bytes")
+
+    assert status == "review"
+    assert score == 0.5
+    assert "VLM图片质量检测失败" in reason
+
+
+@pytest.mark.asyncio
+async def test_vlm_page_quality_non_mapping_result_falls_back_to_review():
+    service = PageQualityServiceImpl()
+    service.VlmClient = _FakeVlmClient(["not", "a", "mapping"])
+
+    status, score, reason = await service._classify_page_image_by_vlm(b"image-bytes")
+
+    assert status == "review"
+    assert score == 0.5
+    assert "VLM返回结果不可用" in reason
+
+
+@pytest.mark.asyncio
+async def test_vlm_page_quality_nan_score_uses_status_default():
+    service = PageQualityServiceImpl()
+    service.VlmClient = _FakeVlmClient({"status": "reject", "score": float("nan"), "reason": "模糊"})
+
+    status, score, reason = await service._classify_page_image_by_vlm(b"image-bytes")
+
+    assert status == "reject"
+    assert score == 0.2
+    assert reason == "模糊"
-- 
2.52.0