feat:重构dify前端组件以及转发逻辑
This commit is contained in:
@@ -0,0 +1,360 @@
|
||||
/**
|
||||
* SSE (Server-Sent Events) 流处理模块
|
||||
*
|
||||
* 负责处理 Dify API 的流式响应,包括:
|
||||
* - 解析 SSE 数据流
|
||||
* - 处理各种事件类型(消息、思考、文件、工作流等)
|
||||
* - 错误处理和状态管理
|
||||
*
|
||||
* @module api/dify/sse-handler
|
||||
*/
|
||||
|
||||
import type {
|
||||
OnData,
|
||||
OnCompleted,
|
||||
OnThought,
|
||||
OnFile,
|
||||
OnMessageEnd,
|
||||
OnMessageReplace,
|
||||
OnError,
|
||||
OnWorkflowStarted,
|
||||
OnWorkflowFinished,
|
||||
OnNodeStarted,
|
||||
OnNodeFinished,
|
||||
SSECallbacks,
|
||||
ThoughtItem,
|
||||
VisionFile,
|
||||
MessageEnd,
|
||||
MessageReplace,
|
||||
WorkflowStartedResponse,
|
||||
WorkflowFinishedResponse,
|
||||
NodeStartedResponse,
|
||||
NodeFinishedResponse,
|
||||
} from './types';
|
||||
import { unicodeToChar } from '~/utils/chat-utils';
|
||||
|
||||
/**
|
||||
* SSE 事件类型枚举
|
||||
*/
|
||||
export enum SSEEventType {
|
||||
Message = 'message',
|
||||
AgentMessage = 'agent_message',
|
||||
AgentThought = 'agent_thought',
|
||||
MessageFile = 'message_file',
|
||||
MessageEnd = 'message_end',
|
||||
MessageReplace = 'message_replace',
|
||||
WorkflowStarted = 'workflow_started',
|
||||
WorkflowFinished = 'workflow_finished',
|
||||
NodeStarted = 'node_started',
|
||||
NodeFinished = 'node_finished',
|
||||
Error = 'error',
|
||||
}
|
||||
|
||||
/**
|
||||
* SSE 流处理器类
|
||||
*
|
||||
* 封装 SSE 流的读取和解析逻辑,提供清晰的事件分发机制
|
||||
*/
|
||||
export class SSEStreamHandler {
|
||||
private reader: ReadableStreamDefaultReader<Uint8Array> | null = null;
|
||||
private decoder = new TextDecoder('utf-8');
|
||||
private buffer = '';
|
||||
private isFirstMessage = true;
|
||||
private callbacks: SSECallbacks;
|
||||
private aborted = false;
|
||||
|
||||
constructor(callbacks: SSECallbacks) {
|
||||
this.callbacks = callbacks;
|
||||
}
|
||||
|
||||
/**
|
||||
* 开始处理 SSE 流
|
||||
* @param response - fetch API 返回的 Response 对象
|
||||
*/
|
||||
async handleStream(response: Response): Promise<void> {
|
||||
if (!response.ok) {
|
||||
console.error('❌ [SSEHandler] 响应错误:', response.status, response.statusText);
|
||||
this.callbacks.onError?.('网络响应错误');
|
||||
throw new Error('网络响应错误');
|
||||
}
|
||||
|
||||
this.reader = response.body?.getReader() ?? null;
|
||||
if (!this.reader) {
|
||||
this.callbacks.onError?.('无法读取响应流');
|
||||
throw new Error('无法读取响应流');
|
||||
}
|
||||
|
||||
await this.read();
|
||||
}
|
||||
|
||||
/**
|
||||
* 中止流处理
|
||||
*/
|
||||
abort(): void {
|
||||
this.aborted = true;
|
||||
this.reader?.cancel();
|
||||
}
|
||||
|
||||
/**
|
||||
* 递归读取流数据
|
||||
*/
|
||||
private async read(): Promise<void> {
|
||||
if (this.aborted || !this.reader) {
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
const result = await this.reader.read();
|
||||
|
||||
if (result.done) {
|
||||
this.callbacks.onCompleted?.();
|
||||
return;
|
||||
}
|
||||
|
||||
const chunk = this.decoder.decode(result.value, { stream: true });
|
||||
this.buffer += chunk;
|
||||
|
||||
const hasError = this.processBuffer();
|
||||
|
||||
if (!hasError && !this.aborted) {
|
||||
await this.read();
|
||||
}
|
||||
} catch (err: any) {
|
||||
if (!this.aborted) {
|
||||
console.error('❌ [SSEHandler] 读取流时出错:', err);
|
||||
this.callbacks.onError?.(err.message);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 处理缓冲区中的数据
|
||||
* @returns 是否发生错误
|
||||
*/
|
||||
private processBuffer(): boolean {
|
||||
const lines = this.buffer.split('\n');
|
||||
let hasError = false;
|
||||
|
||||
try {
|
||||
for (const line of lines) {
|
||||
if (line.startsWith('data: ')) {
|
||||
const jsonStr = line.substring(6);
|
||||
hasError = this.processEventData(jsonStr);
|
||||
if (hasError) break;
|
||||
}
|
||||
}
|
||||
|
||||
// 保留最后一行(可能是不完整的消息)
|
||||
this.buffer = lines[lines.length - 1];
|
||||
} catch (err) {
|
||||
console.error('❌ [SSEHandler] 解析响应时出错:', err);
|
||||
this.callbacks.onData?.('', false, {
|
||||
conversationId: undefined,
|
||||
messageId: '',
|
||||
errorMessage: `${err}`,
|
||||
});
|
||||
hasError = true;
|
||||
this.callbacks.onCompleted?.(true);
|
||||
}
|
||||
|
||||
return hasError;
|
||||
}
|
||||
|
||||
/**
|
||||
* 处理单个事件数据
|
||||
* @param jsonStr - JSON 字符串
|
||||
* @returns 是否发生错误
|
||||
*/
|
||||
private processEventData(jsonStr: string): boolean {
|
||||
let eventData: Record<string, any>;
|
||||
|
||||
try {
|
||||
eventData = JSON.parse(jsonStr);
|
||||
} catch (e) {
|
||||
console.warn('⚠️ [SSEHandler] JSON解析失败:', e);
|
||||
// 处理消息截断情况
|
||||
this.callbacks.onData?.('', this.isFirstMessage, {
|
||||
conversationId: undefined,
|
||||
messageId: '',
|
||||
});
|
||||
return false;
|
||||
}
|
||||
|
||||
// 检查错误响应
|
||||
if (eventData.status === 400 || !eventData.event) {
|
||||
console.error('❌ [SSEHandler] 错误响应:', {
|
||||
status: eventData.status,
|
||||
event: eventData.event,
|
||||
message: eventData.message,
|
||||
code: eventData.code
|
||||
});
|
||||
this.callbacks.onData?.('', false, {
|
||||
conversationId: undefined,
|
||||
messageId: '',
|
||||
errorMessage: eventData.message,
|
||||
errorCode: eventData.code,
|
||||
});
|
||||
this.callbacks.onCompleted?.(true);
|
||||
return true;
|
||||
}
|
||||
|
||||
// 分发事件
|
||||
this.dispatchEvent(eventData);
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* 根据事件类型分发到对应的回调
|
||||
* @param eventData - 事件数据
|
||||
*/
|
||||
private dispatchEvent(eventData: Record<string, any>): void {
|
||||
const { event } = eventData;
|
||||
|
||||
switch (event) {
|
||||
case SSEEventType.Message:
|
||||
case SSEEventType.AgentMessage:
|
||||
this.handleMessageEvent(eventData);
|
||||
break;
|
||||
|
||||
case SSEEventType.AgentThought:
|
||||
this.callbacks.onThought?.(eventData as ThoughtItem);
|
||||
break;
|
||||
|
||||
case SSEEventType.MessageFile:
|
||||
this.callbacks.onFile?.(eventData as VisionFile);
|
||||
break;
|
||||
|
||||
case SSEEventType.MessageEnd:
|
||||
this.callbacks.onMessageEnd?.(eventData as MessageEnd);
|
||||
break;
|
||||
|
||||
case SSEEventType.MessageReplace:
|
||||
this.callbacks.onMessageReplace?.(eventData as MessageReplace);
|
||||
break;
|
||||
|
||||
case SSEEventType.WorkflowStarted:
|
||||
this.callbacks.onWorkflowStarted?.(eventData as WorkflowStartedResponse);
|
||||
break;
|
||||
|
||||
case SSEEventType.WorkflowFinished:
|
||||
this.callbacks.onWorkflowFinished?.(eventData as WorkflowFinishedResponse);
|
||||
break;
|
||||
|
||||
case SSEEventType.NodeStarted:
|
||||
this.callbacks.onNodeStarted?.(eventData as NodeStartedResponse);
|
||||
break;
|
||||
|
||||
case SSEEventType.NodeFinished:
|
||||
this.callbacks.onNodeFinished?.(eventData as NodeFinishedResponse);
|
||||
break;
|
||||
|
||||
default:
|
||||
// 未知事件类型,忽略
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 处理消息事件
|
||||
* @param eventData - 事件数据
|
||||
*/
|
||||
private handleMessageEvent(eventData: Record<string, any>): void {
|
||||
const answer = unicodeToChar(eventData.answer);
|
||||
|
||||
this.callbacks.onData?.(answer, this.isFirstMessage, {
|
||||
conversationId: eventData.conversation_id,
|
||||
messageId: eventData.id || eventData.message_id,
|
||||
taskId: eventData.task_id,
|
||||
});
|
||||
|
||||
this.isFirstMessage = false;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 处理 SSE 流式响应(函数式 API)
|
||||
*
|
||||
* 这是核心的流式响应处理函数,负责:
|
||||
* - 解析 SSE 数据流
|
||||
* - 处理各种事件类型(消息、思考、文件、工作流等)
|
||||
* - 错误处理和状态管理
|
||||
*
|
||||
* @param response - fetch API 返回的 Response 对象
|
||||
* @param callbacks - SSE 回调配置
|
||||
* @returns SSEStreamHandler 实例(可用于中止流)
|
||||
*
|
||||
* @example
|
||||
* ```typescript
|
||||
* const handler = handleSSEStream(response, {
|
||||
* onData: (message, isFirst, info) => console.log('收到消息:', message),
|
||||
* onCompleted: () => console.log('流式响应完成'),
|
||||
* onThought: (thought) => console.log('AI思考:', thought),
|
||||
* onError: (error) => console.error('错误:', error),
|
||||
* });
|
||||
*
|
||||
* // 需要时可以中止流
|
||||
* handler.abort();
|
||||
* ```
|
||||
*/
|
||||
export function handleSSEStream(
|
||||
response: Response,
|
||||
callbacks: SSECallbacks
|
||||
): SSEStreamHandler {
|
||||
const handler = new SSEStreamHandler(callbacks);
|
||||
handler.handleStream(response);
|
||||
return handler;
|
||||
}
|
||||
|
||||
/**
|
||||
* 创建 SSE 回调配置的工厂函数
|
||||
*
|
||||
* 提供类型安全的回调配置创建方式
|
||||
*
|
||||
* @param callbacks - 部分回调配置
|
||||
* @returns 完整的 SSE 回调配置
|
||||
*
|
||||
* @example
|
||||
* ```typescript
|
||||
* const callbacks = createSSECallbacks({
|
||||
* onData: (message) => updateUI(message),
|
||||
* onCompleted: () => setLoading(false),
|
||||
* });
|
||||
* ```
|
||||
*/
|
||||
export function createSSECallbacks(callbacks: Partial<SSECallbacks> & { onData: OnData }): SSECallbacks {
|
||||
return callbacks as SSECallbacks;
|
||||
}
|
||||
|
||||
/**
|
||||
* 兼容旧版 handleStream 函数的适配器
|
||||
*
|
||||
* @deprecated 请使用 handleSSEStream 替代
|
||||
*/
|
||||
export const handleStream = (
|
||||
response: Response,
|
||||
onData: OnData,
|
||||
onCompleted?: OnCompleted,
|
||||
onThought?: OnThought,
|
||||
onMessageEnd?: OnMessageEnd,
|
||||
onMessageReplace?: OnMessageReplace,
|
||||
onFile?: OnFile,
|
||||
onWorkflowStarted?: OnWorkflowStarted,
|
||||
onWorkflowFinished?: OnWorkflowFinished,
|
||||
onNodeStarted?: OnNodeStarted,
|
||||
onNodeFinished?: OnNodeFinished,
|
||||
onError?: OnError,
|
||||
): void => {
|
||||
handleSSEStream(response, {
|
||||
onData,
|
||||
onCompleted,
|
||||
onThought,
|
||||
onFile,
|
||||
onMessageEnd,
|
||||
onMessageReplace,
|
||||
onError,
|
||||
onWorkflowStarted,
|
||||
onWorkflowFinished,
|
||||
onNodeStarted,
|
||||
onNodeFinished,
|
||||
});
|
||||
};
|
||||
Reference in New Issue
Block a user