"""Unit tests for ``RagChatAttachmentServiceImpl``.

The tests exercise the service's helpers for chat attachments: default expiry,
collection naming, scope validation (user / tenant / conversation / expiry /
indexing state), chunk metadata, and the SQL used to resolve the active
attachments of a conversation. No real database or vector store is used; the
async session factory is replaced with an in-memory fake.
"""

from __future__ import annotations

import asyncio
from datetime import datetime, timedelta, timezone

import pytest

from fastapi_common.fastapi_common_web.domain.responses import StatusCodeEnum
from fastapi_common.fastapi_common_web.exception.LeauditException import LeauditException
from fastapi_modules.fastapi_leaudit.services.impl.ragChatAttachmentServiceImpl import (
    RagChatAttachmentServiceImpl,
)

# Dotted path of the session factory looked up by the service module; patched
# in the SQL-capturing tests so no database connection is attempted.
_GET_ASYNC_SESSION_PATH = (
    "fastapi_modules.fastapi_leaudit.services.impl.ragChatAttachmentServiceImpl.GetAsyncSession"
)


def _service() -> RagChatAttachmentServiceImpl:
    """Build a service with no Chroma client and a constant stub embedder."""
    return RagChatAttachmentServiceImpl(
        chroma_client=None,
        embed_texts=lambda texts, model_name="": [[0.1] for _ in texts],
    )


def _attachment_record(**overrides):
    """Return an attachment row dict that is completed, unexpired and not deleted.

    The baseline row belongs to tenant ``tenant-a``, user ``100`` and
    conversation ``conv-a``. Keyword arguments replace individual fields.
    """
    record = {
        "attachment_id": "att-1",
        "tenant_code": "tenant-a",
        "user_id": 100,
        "conversation_id": "conv-a",
        "indexing_status": "completed",
        "expires_at": datetime.now(timezone.utc) + timedelta(days=1),
        "deleted_at": None,
    }
    record.update(overrides)
    return record


def _install_fake_session(monkeypatch, service, result, captured_sql, captured_params):
    """Patch the service module so queries hit an in-memory fake session.

    Args:
        monkeypatch: The pytest ``monkeypatch`` fixture of the calling test.
        service: Service instance whose class-level schema flag is overridden.
        result: Object returned from every ``execute`` call.
        captured_sql: Dict that receives the last statement text under ``"value"``.
        captured_params: Dict that accumulates the bound parameters.
    """

    class FakeSession:
        async def __aenter__(self):
            return self

        async def __aexit__(self, exc_type, exc, tb):
            return None

        async def execute(self, statement, params=None):
            captured_sql["value"] = str(statement)
            captured_params.update(params or {})
            return result

    # Set the class-level flag through monkeypatch so it is restored at
    # teardown; a plain assignment would leak into every later test.
    # NOTE(review): presumably this flag skips a schema check - confirm.
    monkeypatch.setattr(type(service), "_attachment_schema_checked", True, raising=False)
    # Calling the class yields a fresh session, which is all the factory did.
    monkeypatch.setattr(_GET_ASYNC_SESSION_PATH, FakeSession)


def test_default_expiry_is_seven_days_from_now():
    """The default expiry is exactly seven days after the supplied instant."""
    service = _service()
    now = datetime(2026, 5, 25, 12, 0, 0, tzinfo=timezone.utc)

    expires_at = service._default_expires_at(now)

    assert expires_at == now + timedelta(days=7)


def test_collection_name_contains_isolation_components():
    """Collection names embed tenant, user and attachment but not the raw conversation id."""
    service = _service()

    collection_name = service.BuildCollectionName(
        TenantCode="gd-tobacco",
        UserId=42,
        ConversationId="conversation-abc-123",
        AttachmentId="attach-xyz",
    )

    assert collection_name.startswith("chat_attachment_gd_tobacco_42_")
    assert collection_name.endswith("_attach_xyz")
    assert "conversation-abc-123" not in collection_name
    assert len(collection_name) <= 120


def test_validate_attachment_scope_rejects_other_user_and_conversation():
    """A different user or a different conversation is rejected with 403."""
    service = _service()
    record = _attachment_record()

    with pytest.raises(LeauditException) as user_exc:
        service._assert_attachment_scope(
            record,
            CurrentUserId=101,
            TenantCode="tenant-a",
            ConversationId="conv-a",
            RequireCompleted=True,
            Now=datetime.now(timezone.utc),
        )
    assert user_exc.value.status == StatusCodeEnum.HTTP_403_FORBIDDEN

    with pytest.raises(LeauditException) as conversation_exc:
        service._assert_attachment_scope(
            record,
            CurrentUserId=100,
            TenantCode="tenant-a",
            ConversationId="conv-b",
            RequireCompleted=True,
            Now=datetime.now(timezone.utc),
        )
    assert conversation_exc.value.status == StatusCodeEnum.HTTP_403_FORBIDDEN


def test_get_attachment_requires_request_conversation_when_provided():
    """A conversation mismatch is rejected even when completion is not required."""
    service = _service()
    record = _attachment_record()

    with pytest.raises(LeauditException) as exc:
        service._assert_attachment_scope(
            record,
            CurrentUserId=100,
            TenantCode="tenant-a",
            ConversationId="conv-b",
            RequireCompleted=False,
            Now=datetime.now(timezone.utc),
        )

    assert exc.value.status == StatusCodeEnum.HTTP_403_FORBIDDEN
    assert "当前会话" in exc.value.message


def test_get_attachment_rejects_same_user_attachment_from_another_conversation(monkeypatch):
    """``GetAttachment`` returns 403 for the owner's attachment in another conversation."""
    service = _service()
    record = _attachment_record()

    async def fake_get_attachment_record(_attachment_id):
        return record

    monkeypatch.setattr(service, "_get_attachment_record", fake_get_attachment_record)

    with pytest.raises(LeauditException) as exc:
        asyncio.run(
            service.GetAttachment(
                CurrentUserId=100,
                UserArea=None,
                UserRole=None,
                TenantCode="tenant-a",
                TenantName=None,
                ConversationId="conv-b",
                AttachmentId="att-1",
            )
        )

    assert exc.value.status == StatusCodeEnum.HTTP_403_FORBIDDEN


def test_delete_attachment_rejects_same_user_attachment_from_another_conversation(monkeypatch):
    """``DeleteAttachment`` returns 403 for the owner's attachment in another conversation."""
    service = _service()
    record = _attachment_record()

    async def fake_get_attachment_record(_attachment_id):
        return record

    monkeypatch.setattr(service, "_get_attachment_record", fake_get_attachment_record)

    with pytest.raises(LeauditException) as exc:
        asyncio.run(
            service.DeleteAttachment(
                CurrentUserId=100,
                UserArea=None,
                UserRole=None,
                TenantCode="tenant-a",
                TenantName=None,
                ConversationId="conv-b",
                AttachmentId="att-1",
            )
        )

    assert exc.value.status == StatusCodeEnum.HTTP_403_FORBIDDEN


def test_validate_attachment_scope_rejects_other_tenant():
    """A request from a different tenant is rejected with 403."""
    service = _service()
    record = _attachment_record()

    with pytest.raises(LeauditException) as exc:
        service._assert_attachment_scope(
            record,
            CurrentUserId=100,
            TenantCode="tenant-b",
            ConversationId="conv-a",
            RequireCompleted=True,
            Now=datetime.now(timezone.utc),
        )

    assert exc.value.status == StatusCodeEnum.HTTP_403_FORBIDDEN


def test_validate_attachment_scope_rejects_expired_or_incomplete_attachment():
    """Expired attachments and attachments still indexing are rejected with 400."""
    service = _service()
    expired = _attachment_record(
        expires_at=datetime.now(timezone.utc) - timedelta(seconds=1),
    )
    waiting = {
        **expired,
        "indexing_status": "indexing",
        "expires_at": datetime.now(timezone.utc) + timedelta(days=1),
    }

    with pytest.raises(LeauditException) as expired_exc:
        service._assert_attachment_scope(
            expired,
            CurrentUserId=100,
            TenantCode="tenant-a",
            ConversationId="conv-a",
            RequireCompleted=True,
            Now=datetime.now(timezone.utc),
        )
    assert expired_exc.value.status == StatusCodeEnum.HTTP_400_BAD_REQUEST
    assert "已过期" in expired_exc.value.message

    with pytest.raises(LeauditException) as waiting_exc:
        service._assert_attachment_scope(
            waiting,
            CurrentUserId=100,
            TenantCode="tenant-a",
            ConversationId="conv-a",
            RequireCompleted=True,
            Now=datetime.now(timezone.utc),
        )
    assert waiting_exc.value.status == StatusCodeEnum.HTTP_400_BAD_REQUEST
    assert "尚未完成" in waiting_exc.value.message


def test_build_chunks_includes_isolation_metadata():
    """Every chunk carries the tenant / user / conversation / attachment metadata."""
    service = _service()

    chunks = service.BuildChunks(
        AttachmentId="att-1",
        TenantCode="tenant-a",
        UserId=100,
        ConversationId="conv-a",
        FileName="处罚材料.docx",
        PageTexts=[(1, "第一段违法事实。\n\n第二段处罚线索。")],
        ChunkMaxSize=20,
        ChunkOverlap=0,
    )

    assert chunks
    metadata = chunks[0]["metadata"]
    assert metadata["tenant_code"] == "tenant-a"
    assert metadata["user_id"] == "100"
    assert metadata["conversation_id"] == "conv-a"
    assert metadata["attachment_id"] == "att-1"
    assert metadata["source_scope"] == "chat_attachment"
    assert metadata["document_name"] == "处罚材料.docx"
    assert metadata["page"] == 1


def test_resolve_active_attachment_id_uses_user_conversation_tenant_and_completed_state(monkeypatch):
    """The single-id lookup filters by user, conversation, completion, expiry and deletion."""
    service = _service()
    captured_sql = {}
    captured_params = {}

    class FakeResult:
        def mappings(self):
            return self

        def first(self):
            return {"attachment_id": "att-active"}

        def all(self):
            return [{"attachment_id": "att-active"}]

    _install_fake_session(monkeypatch, service, FakeResult(), captured_sql, captured_params)

    attachment_id = asyncio.run(
        service.ResolveActiveAttachmentIdForConversation(
            CurrentUserId=100,
            TenantCode="tenant-a",
            UserArea="云浮",
            ConversationId="conv-a",
        )
    )

    assert attachment_id == "att-active"
    assert "user_id = :user_id" in captured_sql["value"]
    assert "conversation_id = :conversation_id" in captured_sql["value"]
    assert "indexing_status = 'completed'" in captured_sql["value"]
    assert "expires_at > NOW()" in captured_sql["value"]
    assert "deleted_at IS NULL" in captured_sql["value"]
    assert captured_params == {
        "user_id": 100,
        "conversation_id": "conv-a",
        "tenant_code": "tenant-a",
        "user_area": "云浮",
    }


def test_resolve_active_attachment_ids_returns_all_completed_conversation_attachments(monkeypatch):
    """The multi-id lookup returns every row in order and applies no ``LIMIT 1``."""
    service = _service()
    captured_sql = {}
    captured_params = {}

    class FakeResult:
        def mappings(self):
            return self

        def all(self):
            return [{"attachment_id": "att-1"}, {"attachment_id": "att-2"}]

    _install_fake_session(monkeypatch, service, FakeResult(), captured_sql, captured_params)

    attachment_ids = asyncio.run(
        service.ResolveActiveAttachmentIdsForConversation(
            CurrentUserId=100,
            TenantCode="tenant-a",
            UserArea="云浮",
            ConversationId="conv-a",
        )
    )

    assert attachment_ids == ["att-1", "att-2"]
    assert "ORDER BY indexing_completed_at DESC NULLS LAST, created_at DESC" in captured_sql["value"]
    assert "LIMIT 1" not in captured_sql["value"]
    assert captured_params == {
        "user_id": 100,
        "conversation_id": "conv-a",
        "tenant_code": "tenant-a",
        "user_area": "云浮",
    }