// Listing metadata: 361 lines · 9.9 KiB · TypeScript
/**
 * SSE (Server-Sent Events) 流处理模块
 *
 * 负责处理 Dify API 的流式响应,包括:
 * - 解析 SSE 数据流
 * - 处理各种事件类型(消息、思考、文件、工作流等)
 * - 错误处理和状态管理
 *
 * @module api/dify/sse-handler
 */
|
|
|
|
import type {
|
|
OnData,
|
|
OnCompleted,
|
|
OnThought,
|
|
OnFile,
|
|
OnMessageEnd,
|
|
OnMessageReplace,
|
|
OnError,
|
|
OnWorkflowStarted,
|
|
OnWorkflowFinished,
|
|
OnNodeStarted,
|
|
OnNodeFinished,
|
|
SSECallbacks,
|
|
ThoughtItem,
|
|
VisionFile,
|
|
MessageEnd,
|
|
MessageReplace,
|
|
WorkflowStartedResponse,
|
|
WorkflowFinishedResponse,
|
|
NodeStartedResponse,
|
|
NodeFinishedResponse,
|
|
} from './types';
|
|
import { unicodeToChar } from '~/utils/chat-utils';
|
|
|
|
/**
 * Values of the `event` field that the stream handler recognises in a parsed
 * SSE payload. Each member's string value is compared against the payload's
 * `event` property when deciding which callback to invoke.
 */
export enum SSEEventType {
  /** Answer text chunk. */
  Message = 'message',
  /** Answer text chunk; routed through the same path as `Message`. */
  AgentMessage = 'agent_message',
  AgentThought = 'agent_thought',
  MessageFile = 'message_file',
  MessageEnd = 'message_end',
  MessageReplace = 'message_replace',
  WorkflowStarted = 'workflow_started',
  WorkflowFinished = 'workflow_finished',
  NodeStarted = 'node_started',
  NodeFinished = 'node_finished',
  Error = 'error',
}
|
|
|
|
/**
 * Reads a streamed SSE response body, parses each `data: ` line as JSON and
 * dispatches the resulting event to the matching callback.
 *
 * Text is accumulated in an internal buffer and only complete
 * (newline-terminated) lines are parsed; whatever follows the last newline is
 * kept until more data arrives or the stream ends.
 */
export class SSEStreamHandler {
  /** Reader over the response body; `null` until `handleStream` acquires it. */
  private reader: ReadableStreamDefaultReader<Uint8Array> | null = null;
  /** Streaming decoder, so multi-byte characters split across chunks survive. */
  private readonly decoder = new TextDecoder('utf-8');
  /** Text received after the last newline, i.e. a line that is not complete yet. */
  private buffer = '';
  /** True until the first message / agent_message event has been delivered. */
  private isFirstMessage = true;
  private readonly callbacks: SSECallbacks;
  /** Set by `abort()`; stops the read loop and silences read errors. */
  private aborted = false;

  constructor(callbacks: SSECallbacks) {
    this.callbacks = callbacks;
  }

  /**
   * Starts consuming the SSE stream of `response`.
   *
   * @param response - Response object returned by the fetch API
   * @returns a promise that settles once reading has stopped
   * @throws Error when `response.ok` is false or the body cannot be read;
   *   `onError` is invoked with the same message before throwing
   */
  async handleStream(response: Response): Promise<void> {
    if (!response.ok) {
      console.error('❌ [SSEHandler] 响应错误:', response.status, response.statusText);
      this.callbacks.onError?.('网络响应错误');
      throw new Error('网络响应错误');
    }

    this.reader = response.body?.getReader() ?? null;
    if (!this.reader) {
      this.callbacks.onError?.('无法读取响应流');
      throw new Error('无法读取响应流');
    }

    await this.read();
  }

  /**
   * Stops stream processing and cancels the underlying reader.
   */
  abort(): void {
    this.aborted = true;
    // cancel() returns a promise; a rejection here is irrelevant to the caller
    // and must not surface as an unhandled rejection.
    this.reader?.cancel().catch(() => undefined);
  }

  /**
   * Pulls chunks from the reader until the stream ends, an error event is
   * seen, or `abort()` is called.
   *
   * Implemented as a loop rather than recursion so the promise chain does not
   * grow with the number of chunks.
   */
  private async read(): Promise<void> {
    const reader = this.reader;
    if (!reader) {
      return;
    }

    try {
      while (!this.aborted) {
        const result = await reader.read();

        if (result.done) {
          if (!this.aborted) {
            // Flush bytes held back by the streaming decoder, then parse any
            // final line that was not newline-terminated.
            this.buffer += this.decoder.decode();
            if (this.processBuffer(true)) {
              // The error path has already called onCompleted(true).
              return;
            }
          }
          this.callbacks.onCompleted?.();
          return;
        }

        this.buffer += this.decoder.decode(result.value, { stream: true });
        if (this.processBuffer()) {
          return;
        }
      }
    } catch (err: unknown) {
      if (!this.aborted) {
        console.error('❌ [SSEHandler] 读取流时出错:', err);
        this.callbacks.onError?.(err instanceof Error ? err.message : String(err));
      }
    }
  }

  /**
   * Parses the complete lines currently held in the buffer.
   *
   * @param flush - when true (end of stream) the trailing unterminated line is
   *   parsed as well instead of being kept for the next chunk
   * @returns whether an error occurred (reading must stop)
   */
  private processBuffer(flush = false): boolean {
    const lines = this.buffer.split('\n');
    // The last element is whatever follows the final newline. Unless we are
    // flushing it may be a partial line, so hold it back: parsing it now would
    // either fail (partial JSON) or dispatch the same event twice once its
    // newline arrives.
    this.buffer = flush ? '' : lines.pop() ?? '';

    try {
      for (const line of lines) {
        if (!line.startsWith('data: ')) {
          continue;
        }
        if (this.processEventData(line.substring(6))) {
          return true;
        }
      }
      return false;
    } catch (err) {
      // Reached when event handling (including a callback) throws.
      console.error('❌ [SSEHandler] 解析响应时出错:', err);
      this.callbacks.onData?.('', false, {
        conversationId: undefined,
        messageId: '',
        errorMessage: `${err}`,
      });
      this.callbacks.onCompleted?.(true);
      return true;
    }
  }

  /**
   * Parses one event payload and dispatches it.
   *
   * @param jsonStr - JSON text that followed the `data: ` prefix
   * @returns whether the payload was an error response (reading must stop)
   */
  private processEventData(jsonStr: string): boolean {
    let eventData: Record<string, any>;

    try {
      eventData = JSON.parse(jsonStr);
    } catch (e) {
      console.warn('⚠️ [SSEHandler] JSON解析失败:', e);
      // A line that is not valid JSON is reported as an empty data chunk and
      // reading continues.
      this.callbacks.onData?.('', this.isFirstMessage, {
        conversationId: undefined,
        messageId: '',
      });
      return false;
    }

    // A payload with status 400 or without an `event` field is an error response.
    if (eventData.status === 400 || !eventData.event) {
      console.error('❌ [SSEHandler] 错误响应:', {
        status: eventData.status,
        event: eventData.event,
        message: eventData.message,
        code: eventData.code
      });
      this.callbacks.onData?.('', false, {
        conversationId: undefined,
        messageId: '',
        errorMessage: eventData.message,
        errorCode: eventData.code,
      });
      this.callbacks.onCompleted?.(true);
      return true;
    }

    this.dispatchEvent(eventData);
    return false;
  }

  /**
   * Routes an event to the callback registered for its `event` type.
   *
   * @param eventData - parsed event payload
   */
  private dispatchEvent(eventData: Record<string, any>): void {
    const { event } = eventData;

    switch (event) {
      case SSEEventType.Message:
      case SSEEventType.AgentMessage:
        this.handleMessageEvent(eventData);
        break;

      case SSEEventType.AgentThought:
        this.callbacks.onThought?.(eventData as ThoughtItem);
        break;

      case SSEEventType.MessageFile:
        this.callbacks.onFile?.(eventData as VisionFile);
        break;

      case SSEEventType.MessageEnd:
        this.callbacks.onMessageEnd?.(eventData as MessageEnd);
        break;

      case SSEEventType.MessageReplace:
        this.callbacks.onMessageReplace?.(eventData as MessageReplace);
        break;

      case SSEEventType.WorkflowStarted:
        this.callbacks.onWorkflowStarted?.(eventData as WorkflowStartedResponse);
        break;

      case SSEEventType.WorkflowFinished:
        this.callbacks.onWorkflowFinished?.(eventData as WorkflowFinishedResponse);
        break;

      case SSEEventType.NodeStarted:
        this.callbacks.onNodeStarted?.(eventData as NodeStartedResponse);
        break;

      case SSEEventType.NodeFinished:
        this.callbacks.onNodeFinished?.(eventData as NodeFinishedResponse);
        break;

      default:
        // Unknown event types are ignored.
        break;
    }
  }

  /**
   * Delivers a message / agent_message event through `onData`.
   *
   * @param eventData - parsed event payload
   */
  private handleMessageEvent(eventData: Record<string, any>): void {
    const answer = unicodeToChar(eventData.answer);

    this.callbacks.onData?.(answer, this.isFirstMessage, {
      conversationId: eventData.conversation_id,
      messageId: eventData.id || eventData.message_id,
      taskId: eventData.task_id,
    });

    this.isFirstMessage = false;
  }
}
|
|
|
|
/**
 * Starts processing an SSE streaming response (functional API).
 *
 * Creates an {@link SSEStreamHandler} for `callbacks`, starts reading
 * `response` in the background and returns the handler immediately so the
 * caller can abort the stream.
 *
 * Failures are delivered through the callbacks; this function never throws
 * and the background read never produces an unhandled promise rejection.
 *
 * @param response - Response object returned by the fetch API
 * @param callbacks - SSE callback configuration
 * @returns the SSEStreamHandler instance (can be used to abort the stream)
 *
 * @example
 * ```typescript
 * const handler = handleSSEStream(response, {
 *   onData: (message, isFirst, info) => console.log('received:', message),
 *   onCompleted: () => console.log('stream completed'),
 *   onThought: (thought) => console.log('thought:', thought),
 *   onError: (error) => console.error('error:', error),
 * });
 *
 * // Abort the stream when needed
 * handler.abort();
 * ```
 */
export function handleSSEStream(
  response: Response,
  callbacks: SSECallbacks
): SSEStreamHandler {
  const handler = new SSEStreamHandler(callbacks);
  // Fire-and-forget: handleStream() invokes onError before it rejects, so the
  // rejection carries no new information. Catch it here so it does not become
  // an unhandled promise rejection.
  void handler.handleStream(response).catch(() => undefined);
  return handler;
}
|
|
|
|
/**
 * Factory for an SSE callback configuration.
 *
 * Gives call sites a typed way to build the callbacks object: `onData` is
 * required, every other callback is optional. The object passed in is
 * returned as-is (no copy is made).
 *
 * @param callbacks - partial callback configuration that includes `onData`
 * @returns the same object, typed as SSECallbacks
 *
 * @example
 * ```typescript
 * const callbacks = createSSECallbacks({
 *   onData: (message) => updateUI(message),
 *   onCompleted: () => setLoading(false),
 * });
 * ```
 */
export function createSSECallbacks(
  callbacks: Partial<SSECallbacks> & { onData: OnData }
): SSECallbacks {
  return callbacks as SSECallbacks;
}
|
|
|
|
/**
 * Adapter that keeps the legacy positional `handleStream` signature working.
 *
 * Packs the positional callbacks into a callbacks object and forwards to
 * `handleSSEStream`. Note the positional order (`onMessageEnd` and
 * `onMessageReplace` come before `onFile`, `onError` is last); each argument is
 * mapped to the property of the same name.
 *
 * @deprecated Use handleSSEStream instead.
 */
export const handleStream = (
  response: Response,
  onData: OnData,
  onCompleted?: OnCompleted,
  onThought?: OnThought,
  onMessageEnd?: OnMessageEnd,
  onMessageReplace?: OnMessageReplace,
  onFile?: OnFile,
  onWorkflowStarted?: OnWorkflowStarted,
  onWorkflowFinished?: OnWorkflowFinished,
  onNodeStarted?: OnNodeStarted,
  onNodeFinished?: OnNodeFinished,
  onError?: OnError,
): void => {
  handleSSEStream(response, {
    onData,
    onCompleted,
    onThought,
    onFile,
    onMessageEnd,
    onMessageReplace,
    onError,
    onWorkflowStarted,
    onWorkflowFinished,
    onNodeStarted,
    onNodeFinished,
  });
};
|