Files
2025-11-30 19:27:01 +08:00

361 lines
9.9 KiB
TypeScript

/**
* SSE (Server-Sent Events) 流处理模块
*
* 负责处理 Dify API 的流式响应,包括:
* - 解析 SSE 数据流
* - 处理各种事件类型(消息、思考、文件、工作流等)
* - 错误处理和状态管理
*
* @module api/dify/sse-handler
*/
import type {
OnData,
OnCompleted,
OnThought,
OnFile,
OnMessageEnd,
OnMessageReplace,
OnError,
OnWorkflowStarted,
OnWorkflowFinished,
OnNodeStarted,
OnNodeFinished,
SSECallbacks,
ThoughtItem,
VisionFile,
MessageEnd,
MessageReplace,
WorkflowStartedResponse,
WorkflowFinishedResponse,
NodeStartedResponse,
NodeFinishedResponse,
} from './types';
import { unicodeToChar } from '~/utils/chat-utils';
/**
* SSE 事件类型枚举
*/
export enum SSEEventType {
Message = 'message',
AgentMessage = 'agent_message',
AgentThought = 'agent_thought',
MessageFile = 'message_file',
MessageEnd = 'message_end',
MessageReplace = 'message_replace',
WorkflowStarted = 'workflow_started',
WorkflowFinished = 'workflow_finished',
NodeStarted = 'node_started',
NodeFinished = 'node_finished',
Error = 'error',
}
/**
* SSE 流处理器类
*
* 封装 SSE 流的读取和解析逻辑,提供清晰的事件分发机制
*/
export class SSEStreamHandler {
private reader: ReadableStreamDefaultReader<Uint8Array> | null = null;
private decoder = new TextDecoder('utf-8');
private buffer = '';
private isFirstMessage = true;
private callbacks: SSECallbacks;
private aborted = false;
constructor(callbacks: SSECallbacks) {
this.callbacks = callbacks;
}
/**
* 开始处理 SSE 流
* @param response - fetch API 返回的 Response 对象
*/
async handleStream(response: Response): Promise<void> {
if (!response.ok) {
console.error('❌ [SSEHandler] 响应错误:', response.status, response.statusText);
this.callbacks.onError?.('网络响应错误');
throw new Error('网络响应错误');
}
this.reader = response.body?.getReader() ?? null;
if (!this.reader) {
this.callbacks.onError?.('无法读取响应流');
throw new Error('无法读取响应流');
}
await this.read();
}
/**
* 中止流处理
*/
abort(): void {
this.aborted = true;
this.reader?.cancel();
}
/**
* 递归读取流数据
*/
private async read(): Promise<void> {
if (this.aborted || !this.reader) {
return;
}
try {
const result = await this.reader.read();
if (result.done) {
this.callbacks.onCompleted?.();
return;
}
const chunk = this.decoder.decode(result.value, { stream: true });
this.buffer += chunk;
const hasError = this.processBuffer();
if (!hasError && !this.aborted) {
await this.read();
}
} catch (err: any) {
if (!this.aborted) {
console.error('❌ [SSEHandler] 读取流时出错:', err);
this.callbacks.onError?.(err.message);
}
}
}
/**
* 处理缓冲区中的数据
* @returns 是否发生错误
*/
private processBuffer(): boolean {
const lines = this.buffer.split('\n');
let hasError = false;
try {
for (const line of lines) {
if (line.startsWith('data: ')) {
const jsonStr = line.substring(6);
hasError = this.processEventData(jsonStr);
if (hasError) break;
}
}
// 保留最后一行(可能是不完整的消息)
this.buffer = lines[lines.length - 1];
} catch (err) {
console.error('❌ [SSEHandler] 解析响应时出错:', err);
this.callbacks.onData?.('', false, {
conversationId: undefined,
messageId: '',
errorMessage: `${err}`,
});
hasError = true;
this.callbacks.onCompleted?.(true);
}
return hasError;
}
/**
* 处理单个事件数据
* @param jsonStr - JSON 字符串
* @returns 是否发生错误
*/
private processEventData(jsonStr: string): boolean {
let eventData: Record<string, any>;
try {
eventData = JSON.parse(jsonStr);
} catch (e) {
console.warn('⚠️ [SSEHandler] JSON解析失败:', e);
// 处理消息截断情况
this.callbacks.onData?.('', this.isFirstMessage, {
conversationId: undefined,
messageId: '',
});
return false;
}
// 检查错误响应
if (eventData.status === 400 || !eventData.event) {
console.error('❌ [SSEHandler] 错误响应:', {
status: eventData.status,
event: eventData.event,
message: eventData.message,
code: eventData.code
});
this.callbacks.onData?.('', false, {
conversationId: undefined,
messageId: '',
errorMessage: eventData.message,
errorCode: eventData.code,
});
this.callbacks.onCompleted?.(true);
return true;
}
// 分发事件
this.dispatchEvent(eventData);
return false;
}
/**
* 根据事件类型分发到对应的回调
* @param eventData - 事件数据
*/
private dispatchEvent(eventData: Record<string, any>): void {
const { event } = eventData;
switch (event) {
case SSEEventType.Message:
case SSEEventType.AgentMessage:
this.handleMessageEvent(eventData);
break;
case SSEEventType.AgentThought:
this.callbacks.onThought?.(eventData as ThoughtItem);
break;
case SSEEventType.MessageFile:
this.callbacks.onFile?.(eventData as VisionFile);
break;
case SSEEventType.MessageEnd:
this.callbacks.onMessageEnd?.(eventData as MessageEnd);
break;
case SSEEventType.MessageReplace:
this.callbacks.onMessageReplace?.(eventData as MessageReplace);
break;
case SSEEventType.WorkflowStarted:
this.callbacks.onWorkflowStarted?.(eventData as WorkflowStartedResponse);
break;
case SSEEventType.WorkflowFinished:
this.callbacks.onWorkflowFinished?.(eventData as WorkflowFinishedResponse);
break;
case SSEEventType.NodeStarted:
this.callbacks.onNodeStarted?.(eventData as NodeStartedResponse);
break;
case SSEEventType.NodeFinished:
this.callbacks.onNodeFinished?.(eventData as NodeFinishedResponse);
break;
default:
// 未知事件类型,忽略
break;
}
}
/**
* 处理消息事件
* @param eventData - 事件数据
*/
private handleMessageEvent(eventData: Record<string, any>): void {
const answer = unicodeToChar(eventData.answer);
this.callbacks.onData?.(answer, this.isFirstMessage, {
conversationId: eventData.conversation_id,
messageId: eventData.id || eventData.message_id,
taskId: eventData.task_id,
});
this.isFirstMessage = false;
}
}
/**
* 处理 SSE 流式响应(函数式 API)
*
* 这是核心的流式响应处理函数,负责:
* - 解析 SSE 数据流
* - 处理各种事件类型(消息、思考、文件、工作流等)
* - 错误处理和状态管理
*
* @param response - fetch API 返回的 Response 对象
* @param callbacks - SSE 回调配置
* @returns SSEStreamHandler 实例(可用于中止流)
*
* @example
* ```typescript
* const handler = handleSSEStream(response, {
* onData: (message, isFirst, info) => console.log('收到消息:', message),
* onCompleted: () => console.log('流式响应完成'),
* onThought: (thought) => console.log('AI思考:', thought),
* onError: (error) => console.error('错误:', error),
* });
*
* // 需要时可以中止流
* handler.abort();
* ```
*/
export function handleSSEStream(
response: Response,
callbacks: SSECallbacks
): SSEStreamHandler {
const handler = new SSEStreamHandler(callbacks);
handler.handleStream(response);
return handler;
}
/**
* 创建 SSE 回调配置的工厂函数
*
* 提供类型安全的回调配置创建方式
*
* @param callbacks - 部分回调配置
* @returns 完整的 SSE 回调配置
*
* @example
* ```typescript
* const callbacks = createSSECallbacks({
* onData: (message) => updateUI(message),
* onCompleted: () => setLoading(false),
* });
* ```
*/
export function createSSECallbacks(callbacks: Partial<SSECallbacks> & { onData: OnData }): SSECallbacks {
return callbacks as SSECallbacks;
}
/**
* 兼容旧版 handleStream 函数的适配器
*
* @deprecated 请使用 handleSSEStream 替代
*/
export const handleStream = (
response: Response,
onData: OnData,
onCompleted?: OnCompleted,
onThought?: OnThought,
onMessageEnd?: OnMessageEnd,
onMessageReplace?: OnMessageReplace,
onFile?: OnFile,
onWorkflowStarted?: OnWorkflowStarted,
onWorkflowFinished?: OnWorkflowFinished,
onNodeStarted?: OnNodeStarted,
onNodeFinished?: OnNodeFinished,
onError?: OnError,
): void => {
handleSSEStream(response, {
onData,
onCompleted,
onThought,
onFile,
onMessageEnd,
onMessageReplace,
onError,
onWorkflowStarted,
onWorkflowFinished,
onNodeStarted,
onNodeFinished,
});
};