/**
 * SSE (Server-Sent Events) stream handling module.
 *
 * Handles streaming responses from the Dify API:
 * - parses the SSE data stream
 * - dispatches the individual event types (message, thought, file, workflow, ...)
 * - reports errors and tracks stream state
 *
 * @module api/dify/sse-handler
 */
import type {
  OnData,
  OnCompleted,
  OnThought,
  OnFile,
  OnMessageEnd,
  OnMessageReplace,
  OnError,
  OnWorkflowStarted,
  OnWorkflowFinished,
  OnNodeStarted,
  OnNodeFinished,
  SSECallbacks,
  ThoughtItem,
  VisionFile,
  MessageEnd,
  MessageReplace,
  WorkflowStartedResponse,
  WorkflowFinishedResponse,
  NodeStartedResponse,
  NodeFinishedResponse,
} from './types';
import { unicodeToChar } from '~/utils/chat-utils';

/**
 * Values of the `event` field carried by the SSE payloads this module handles.
 */
export enum SSEEventType {
  Message = 'message',
  AgentMessage = 'agent_message',
  AgentThought = 'agent_thought',
  MessageFile = 'message_file',
  MessageEnd = 'message_end',
  MessageReplace = 'message_replace',
  WorkflowStarted = 'workflow_started',
  WorkflowFinished = 'workflow_finished',
  NodeStarted = 'node_started',
  NodeFinished = 'node_finished',
  Error = 'error',
}

/**
 * SSE stream handler.
 *
 * Wraps reading and parsing of an SSE response body and dispatches each
 * parsed event to the matching callback.
 */
export class SSEStreamHandler {
  private reader: ReadableStreamDefaultReader<Uint8Array> | null = null;
  private readonly decoder = new TextDecoder('utf-8');
  /** Text received so far that has not yet been terminated by a newline. */
  private buffer = '';
  /** True until the first message/agent_message event has been delivered. */
  private isFirstMessage = true;
  private readonly callbacks: SSECallbacks;
  private aborted = false;

  constructor(callbacks: SSECallbacks) {
    this.callbacks = callbacks;
  }

  /**
   * Starts consuming the SSE stream and resolves once reading has stopped.
   *
   * @param response - Response object returned by the fetch API
   * @throws Error when the response is not OK or exposes no readable body;
   *   `onError` is invoked before throwing in both cases.
   */
  async handleStream(response: Response): Promise<void> {
    if (!response.ok) {
      console.error('❌ [SSEHandler] 响应错误:', response.status, response.statusText);
      this.callbacks.onError?.('网络响应错误');
      throw new Error('网络响应错误');
    }

    this.reader = response.body?.getReader() ?? null;
    if (!this.reader) {
      this.callbacks.onError?.('无法读取响应流');
      throw new Error('无法读取响应流');
    }

    await this.read();
  }

  /**
   * Aborts stream processing and cancels the underlying reader.
   */
  abort(): void {
    this.aborted = true;
    // cancel() returns a promise; a rejection here is not actionable for the
    // caller, so swallow it rather than surfacing an unhandled rejection.
    void this.reader?.cancel().catch(() => {});
  }

  /**
   * Reads chunks until the stream ends, an error event is seen, or the
   * handler is aborted.
   *
   * Implemented as a loop instead of `await`-recursion so long-lived streams
   * do not accumulate one pending promise per chunk.
   */
  private async read(): Promise<void> {
    const reader = this.reader;
    if (!reader) {
      return;
    }

    try {
      while (!this.aborted) {
        const result = await reader.read();

        if (result.done) {
          if (!this.aborted) {
            // Flush bytes still held by the decoder and handle a final line
            // that was not terminated by a newline.
            this.buffer += this.decoder.decode();
            if (this.processBuffer(true)) {
              // The error path has already invoked onCompleted(true).
              return;
            }
          }
          this.callbacks.onCompleted?.();
          return;
        }

        this.buffer += this.decoder.decode(result.value, { stream: true });
        if (this.processBuffer()) {
          return;
        }
      }
    } catch (err: unknown) {
      if (!this.aborted) {
        console.error('❌ [SSEHandler] 读取流时出错:', err);
        this.callbacks.onError?.(err instanceof Error ? err.message : String(err));
      }
    }
  }

  /**
   * Processes every complete line currently held in the buffer.
   *
   * The trailing fragment (text after the last newline) is kept in the buffer
   * and NOT parsed, because the rest of that line may arrive with the next
   * chunk; parsing it early would emit a spurious empty message or dispatch
   * the same event twice.
   *
   * @param flush - when true (end of stream) the trailing fragment is
   *   processed as well and the buffer is emptied
   * @returns whether an error occurred
   */
  private processBuffer(flush = false): boolean {
    const lines = this.buffer.split('\n');
    this.buffer = flush ? '' : (lines.pop() ?? '');

    try {
      for (const line of lines) {
        if (!line.startsWith('data: ')) {
          continue;
        }
        if (this.processEventData(line.substring(6))) {
          return true;
        }
      }
      return false;
    } catch (err) {
      console.error('❌ [SSEHandler] 解析响应时出错:', err);
      this.callbacks.onData?.('', false, {
        conversationId: undefined,
        messageId: '',
        errorMessage: `${err}`,
      });
      this.callbacks.onCompleted?.(true);
      return true;
    }
  }

  /**
   * Parses and handles the payload of a single `data:` line.
   *
   * @param jsonStr - JSON text following the `data: ` prefix
   * @returns whether an error occurred
   */
  private processEventData(jsonStr: string): boolean {
    // The payload is unvalidated JSON from the server, hence the loose type.
    let eventData: Record<string, any>;

    try {
      eventData = JSON.parse(jsonStr);
    } catch (e) {
      console.warn('⚠️ [SSEHandler] JSON解析失败:', e);
      // Truncated message: notify with an empty chunk and keep reading.
      this.callbacks.onData?.('', this.isFirstMessage, {
        conversationId: undefined,
        messageId: '',
      });
      return false;
    }

    // Error responses: HTTP-400 style payloads, payloads without an event
    // name, and explicit `error` events (which may carry any status code).
    const isErrorResponse =
      eventData.status === 400 || !eventData.event || eventData.event === SSEEventType.Error;
    if (isErrorResponse) {
      console.error('❌ [SSEHandler] 错误响应:', {
        status: eventData.status,
        event: eventData.event,
        message: eventData.message,
        code: eventData.code,
      });
      this.callbacks.onData?.('', false, {
        conversationId: undefined,
        messageId: '',
        errorMessage: eventData.message,
        errorCode: eventData.code,
      });
      this.callbacks.onCompleted?.(true);
      return true;
    }

    this.dispatchEvent(eventData);
    return false;
  }

  /**
   * Routes an event to the callback registered for its type.
   *
   * @param eventData - parsed event payload
   */
  private dispatchEvent(eventData: Record<string, any>): void {
    const { event } = eventData;

    switch (event) {
      case SSEEventType.Message:
      case SSEEventType.AgentMessage:
        this.handleMessageEvent(eventData);
        break;

      case SSEEventType.AgentThought:
        this.callbacks.onThought?.(eventData as ThoughtItem);
        break;

      case SSEEventType.MessageFile:
        this.callbacks.onFile?.(eventData as VisionFile);
        break;

      case SSEEventType.MessageEnd:
        this.callbacks.onMessageEnd?.(eventData as MessageEnd);
        break;

      case SSEEventType.MessageReplace:
        this.callbacks.onMessageReplace?.(eventData as MessageReplace);
        break;

      case SSEEventType.WorkflowStarted:
        this.callbacks.onWorkflowStarted?.(eventData as WorkflowStartedResponse);
        break;

      case SSEEventType.WorkflowFinished:
        this.callbacks.onWorkflowFinished?.(eventData as WorkflowFinishedResponse);
        break;

      case SSEEventType.NodeStarted:
        this.callbacks.onNodeStarted?.(eventData as NodeStartedResponse);
        break;

      case SSEEventType.NodeFinished:
        this.callbacks.onNodeFinished?.(eventData as NodeFinishedResponse);
        break;

      default:
        // Unknown event type: ignore.
        break;
    }
  }

  /**
   * Handles a message / agent_message event by forwarding the answer text.
   *
   * @param eventData - parsed event payload
   */
  private handleMessageEvent(eventData: Record<string, any>): void {
    const answer = unicodeToChar(eventData.answer);

    this.callbacks.onData?.(answer, this.isFirstMessage, {
      conversationId: eventData.conversation_id,
      messageId: eventData.id || eventData.message_id,
      taskId: eventData.task_id,
    });

    this.isFirstMessage = false;
  }
}

/**
 * Handles an SSE streaming response (functional API).
 *
 * Creates an {@link SSEStreamHandler}, starts consuming the response in the
 * background and returns the handler immediately. Failures are delivered
 * through the `onError` callback, not through the return value.
 *
 * @param response - Response object returned by the fetch API
 * @param callbacks - SSE callback configuration
 * @returns the SSEStreamHandler instance (can be used to abort the stream)
 *
 * @example
 * ```typescript
 * const handler = handleSSEStream(response, {
 *   onData: (message, isFirst, info) => console.log('message received:', message),
 *   onCompleted: () => console.log('stream completed'),
 *   onThought: (thought) => console.log('AI thought:', thought),
 *   onError: (error) => console.error('error:', error),
 * });
 *
 * // Abort the stream when needed
 * handler.abort();
 * ```
 */
export function handleSSEStream(
  response: Response,
  callbacks: SSECallbacks
): SSEStreamHandler {
  const handler = new SSEStreamHandler(callbacks);
  // Fire-and-forget: handleStream() invokes onError before each of its own
  // throws, so the rejection is swallowed here only to avoid an unhandled
  // promise rejection.
  void handler.handleStream(response).catch(() => {});
  return handler;
}

/**
 * Factory for SSE callback configurations.
 *
 * Offers a type-checked way to build a callback set in which only `onData`
 * is mandatory.
 *
 * @param callbacks - partial callback configuration
 * @returns the same object typed as a full SSE callback configuration
 *
 * @example
 * ```typescript
 * const callbacks = createSSECallbacks({
 *   onData: (message) => updateUI(message),
 *   onCompleted: () => setLoading(false),
 * });
 * ```
 */
export function createSSECallbacks(
  callbacks: Partial<SSECallbacks> & { onData: OnData }
): SSECallbacks {
  return callbacks as SSECallbacks;
}

/**
 * Adapter that keeps the legacy positional-argument handleStream API working.
 *
 * @deprecated Use handleSSEStream instead.
 */
export const handleStream = (
  response: Response,
  onData: OnData,
  onCompleted?: OnCompleted,
  onThought?: OnThought,
  onMessageEnd?: OnMessageEnd,
  onMessageReplace?: OnMessageReplace,
  onFile?: OnFile,
  onWorkflowStarted?: OnWorkflowStarted,
  onWorkflowFinished?: OnWorkflowFinished,
  onNodeStarted?: OnNodeStarted,
  onNodeFinished?: OnNodeFinished,
  onError?: OnError,
): void => {
  handleSSEStream(response, {
    onData,
    onCompleted,
    onThought,
    onFile,
    onMessageEnd,
    onMessageReplace,
    onError,
    onWorkflowStarted,
    onWorkflowFinished,
    onNodeStarted,
    onNodeFinished,
  });
};