import { CHAT_CONFIG, ContentType, SSE_TIMEOUT } from '../config/chat'; import type { Feedbacktype, ThoughtItem, VisionFile, MessageEnd, MessageReplace } from '../types/dify_chat'; import { unicodeToChar } from '../utils/chat-utils'; // 基础请求选项 const baseOptions = { method: 'GET', mode: 'cors' as RequestMode, credentials: 'include' as RequestCredentials, headers: new Headers({ 'Content-Type': ContentType.json, }), redirect: 'follow' as RequestRedirect, }; // 回调接口定义 export type IOnDataMoreInfo = { conversationId?: string; taskId?: string; messageId: string; errorMessage?: string; errorCode?: string; } export type IOnData = (message: string, isFirstMessage: boolean, moreInfo: IOnDataMoreInfo) => void; export type IOnThought = (thought: ThoughtItem) => void; export type IOnFile = (file: VisionFile) => void; export type IOnMessageEnd = (messageEnd: MessageEnd) => void; export type IOnMessageReplace = (messageReplace: MessageReplace) => void; export type IOnCompleted = (hasError?: boolean) => void; export type IOnError = (msg: string, code?: string) => void; // 工作流相关类型 export type WorkflowStartedResponse = { task_id: string; workflow_run_id: string; event: string; data: { id: string; workflow_id: string; sequence_number: number; created_at: number; }; } export type WorkflowFinishedResponse = { task_id: string; workflow_run_id: string; event: string; data: { id: string; workflow_id: string; status: string; outputs: any; error: string; elapsed_time: number; total_tokens: number; total_steps: number; created_at: number; finished_at: number; }; } export type NodeStartedResponse = { task_id: string; workflow_run_id: string; event: string; data: { id: string; node_id: string; node_type: string; index: number; predecessor_node_id?: string; inputs: any; created_at: number; extras?: any; }; } export type NodeFinishedResponse = { task_id: string; workflow_run_id: string; event: string; data: { id: string; node_id: string; node_type: string; index: number; predecessor_node_id?: string; inputs: any; process_data: any; outputs: any; status: string; error: string; elapsed_time: number; execution_metadata: { total_tokens: number; total_price: number; currency: string; }; created_at: number; }; } export type IOnWorkflowStarted = (workflowStarted: WorkflowStartedResponse) => void; export type IOnWorkflowFinished = (workflowFinished: WorkflowFinishedResponse) => void; export type IOnNodeStarted = (nodeStarted: NodeStartedResponse) => void; export type IOnNodeFinished = (nodeFinished: NodeFinishedResponse) => void; // 处理流式响应 const handleStream = ( response: Response, onData: IOnData, onCompleted?: IOnCompleted, onThought?: IOnThought, onMessageEnd?: IOnMessageEnd, onMessageReplace?: IOnMessageReplace, onFile?: IOnFile, onWorkflowStarted?: IOnWorkflowStarted, onWorkflowFinished?: IOnWorkflowFinished, onNodeStarted?: IOnNodeStarted, onNodeFinished?: IOnNodeFinished, onError?: IOnError, ) => { // console.log('🌊 [handleStream] 开始处理流式响应:', { // status: response.status, // statusText: response.statusText, // headers: Object.fromEntries(response.headers.entries()) // }); if (!response.ok) { console.error('❌ [handleStream] 响应错误:', response.status, response.statusText); onError?.('网络响应错误'); throw new Error('网络响应错误'); } const reader = response.body?.getReader(); const decoder = new TextDecoder('utf-8'); let buffer = ''; let bufferObj: Record; let isFirstMessage = true; let messageCount = 0; // console.log('📖 [handleStream] 获取reader:', !!reader); function read() { let hasError = false; reader?.read().then((result: any) => { // console.log('📨 [handleStream] 读取数据块:', { // done: result.done, // valueLength: result.value?.length, // messageCount: ++messageCount // }); if (result.done) { // console.log('✅ [handleStream] 流式响应完成, 总消息数:', messageCount); onCompleted && onCompleted(); return; } const chunk = decoder.decode(result.value, { stream: true }); buffer += chunk; const lines = buffer.split('\n'); // console.log('🔍 [handleStream] 处理数据块:', { // chunkLength: chunk.length, // bufferLength: buffer.length, // linesCount: lines.length, // chunk: chunk.substring(0, 100) + (chunk.length > 100 ? '...' : '') // }); try { lines.forEach((message, index) => { if (message.startsWith('data: ')) { const jsonStr = message.substring(6); // console.log(`📋 [handleStream] 解析消息 ${index}:`, { // jsonLength: jsonStr.length, // preview: jsonStr.substring(0, 200) + (jsonStr.length > 200 ? '...' : '') // }); try { bufferObj = JSON.parse(jsonStr) as Record; // console.log('✨ [handleStream] JSON解析成功:', { // event: bufferObj.event, // hasAnswer: !!bufferObj.answer, // answerLength: bufferObj.answer?.length || 0, // conversationId: bufferObj.conversation_id, // messageId: bufferObj.id || bufferObj.message_id // }); } catch (e) { console.warn('⚠️ [handleStream] JSON解析失败:', e, 'JSON:', jsonStr); // 处理消息截断 onData('', isFirstMessage, { conversationId: bufferObj?.conversation_id, messageId: bufferObj?.message_id || bufferObj?.id, }); return; } if (bufferObj.status === 400 || !bufferObj.event) { console.error('❌ [handleStream] 错误响应:', { status: bufferObj.status, event: bufferObj.event, message: bufferObj.message, code: bufferObj.code }); onData('', false, { conversationId: undefined, messageId: '', errorMessage: bufferObj?.message, errorCode: bufferObj?.code, }); hasError = true; onCompleted?.(true); return; } if (bufferObj.event === 'message' || bufferObj.event === 'agent_message') { const answer = unicodeToChar(bufferObj.answer); // console.log('💬 [handleStream] 处理消息事件:', { // event: bufferObj.event, // isFirstMessage, // answerLength: answer.length, // answer: answer.substring(0, 50) + (answer.length > 50 ? '...' : ''), // conversationId: bufferObj.conversation_id, // messageId: bufferObj.id || bufferObj.message_id // }); onData(answer, isFirstMessage, { conversationId: bufferObj.conversation_id, messageId: bufferObj.id || bufferObj.message_id, taskId: bufferObj.task_id, }); isFirstMessage = false; } else if (bufferObj.event === 'agent_thought' && onThought) { // console.log('🤔 [handleStream] 处理思考事件:', bufferObj.event); onThought(bufferObj as ThoughtItem); } else if (bufferObj.event === 'message_file' && onFile) { // console.log('📁 [handleStream] 处理文件事件:', bufferObj.event); onFile(bufferObj as VisionFile); } else if (bufferObj.event === 'message_end' && onMessageEnd) { // console.log('🏁 [handleStream] 处理消息结束事件:', bufferObj.event); onMessageEnd(bufferObj as MessageEnd); } else if (bufferObj.event === 'message_replace' && onMessageReplace) { // console.log('🔄 [handleStream] 处理消息替换事件:', bufferObj.event); onMessageReplace(bufferObj as MessageReplace); } else if (bufferObj.event === 'workflow_started' && onWorkflowStarted) { // console.log('🚀 [handleStream] 处理工作流开始事件:', bufferObj.event); onWorkflowStarted(bufferObj as WorkflowStartedResponse); } else if (bufferObj.event === 'workflow_finished' && onWorkflowFinished) { // console.log('🎯 [handleStream] 处理工作流完成事件:', bufferObj.event); onWorkflowFinished(bufferObj as WorkflowFinishedResponse); } else if (bufferObj.event === 'node_started' && onNodeStarted) { // console.log('🔗 [handleStream] 处理节点开始事件:', bufferObj.event); onNodeStarted(bufferObj as NodeStartedResponse); } else if (bufferObj.event === 'node_finished' && onNodeFinished) { // console.log('✅ [handleStream] 处理节点完成事件:', bufferObj.event); onNodeFinished(bufferObj as NodeFinishedResponse); } else { // console.log('❓ [handleStream] 未知事件类型:', bufferObj.event); } } else if (message.trim()) { // console.log('📝 [handleStream] 非data消息:', message.substring(0, 100)); } }); // 保留最后一行(可能是不完整的消息) const lastLine = lines[lines.length - 1]; buffer = lastLine; // console.log('💾 [handleStream] 保留缓冲区:', { // lastLineLength: lastLine.length, // preview: lastLine.substring(0, 50) // }); } catch (err) { console.error('❌ [handleStream] 解析响应时出错:', err); onData('', false, { conversationId: undefined, messageId: '', errorMessage: `${err}`, }); hasError = true; onCompleted?.(true); return; } if (!hasError) { // console.log('🔄 [handleStream] 继续读取下一块...'); read(); } else { // console.log('🛑 [handleStream] 因错误停止读取'); } }).catch(err => { console.error('❌ [handleStream] 读取流时出错:', err); onError?.(err.message); }); } read(); }; // 基础Fetch函数 const baseFetch = (url: string, fetchOptions: any, needAllResponseContent: boolean = false) => { const options = Object.assign({}, baseOptions, fetchOptions); // 构建完整URL - 修复重复的/v1问题 // CHAT_CONFIG.API_URL 已经包含了 /v1,所以不需要再添加 API_PREFIX let urlWithPrefix = `${CHAT_CONFIG.API_URL}/${url.replace(/^\//, '')}`; const { method, params, body } = options; // 处理GET请求的查询参数 if (method === 'GET' && params) { const paramsArray: string[] = []; Object.keys(params).forEach(key => paramsArray.push(`${key}=${encodeURIComponent(params[key])}`), ); if (urlWithPrefix.search(/\?/) === -1) urlWithPrefix += `?${paramsArray.join('&')}`; else urlWithPrefix += `&${paramsArray.join('&')}`; delete options.params; } // 处理请求体 if (body && typeof body === 'object') options.body = JSON.stringify(body); // 添加认证头 if (!options.headers) options.headers = {}; if (CHAT_CONFIG.API_KEY) { options.headers['Authorization'] = `Bearer ${CHAT_CONFIG.API_KEY}`; } return Promise.race([ new Promise((resolve, reject) => { setTimeout(() => { reject(new Error('请求超时')); }, SSE_TIMEOUT); }), new Promise((resolve, reject) => { fetch(urlWithPrefix, options) .then((res: Response) => { const resClone = res.clone(); // console.log('📥 API Response:', { // status: res.status, // statusText: res.statusText, // url: urlWithPrefix // }); // 错误处理 if (!/^(2|3)\d{2}$/.test(res.status.toString())) { try { const bodyJson = res.json(); switch (res.status) { case 401: console.error('❌ Invalid token'); break; default: bodyJson.then((data: any) => { console.error('❌ API Error:', data.message); }); } } catch (e) { console.error('❌ Response Error:', e); } return Promise.reject(resClone); } // 处理删除API(204状态码) if (res.status === 204) { resolve({ result: 'success' }); return; } // 返回数据 const contentType = res.headers.get('Content-Type') || ''; const data = contentType.includes('application/octet-stream') ? res.blob() : res.json(); resolve(needAllResponseContent ? resClone : data); }) .catch((err) => { console.error('❌ Fetch Error:', err); reject(err); }); }), ]); }; // SSE POST 请求 export const ssePost = ( url: string, fetchOptions: any, { onData, onCompleted, onThought, onFile, onMessageEnd, onMessageReplace, onWorkflowStarted, onWorkflowFinished, onNodeStarted, onNodeFinished, onError, getAbortController, }: { onData: IOnData; onCompleted?: IOnCompleted; onThought?: IOnThought; onFile?: IOnFile; onMessageEnd?: IOnMessageEnd; onMessageReplace?: IOnMessageReplace; onError?: IOnError; getAbortController?: (abortController: AbortController) => void; onWorkflowStarted?: IOnWorkflowStarted; onWorkflowFinished?: IOnWorkflowFinished; onNodeStarted?: IOnNodeStarted; onNodeFinished?: IOnNodeFinished; }, ) => { const options = Object.assign({}, baseOptions, { method: 'POST', }, fetchOptions); // 修复URL构建逻辑,与baseFetch保持一致 const urlWithPrefix = `${CHAT_CONFIG.API_URL}/${url.replace(/^\//, '')}`; const controller = new AbortController(); if (getAbortController) getAbortController(controller); options.headers = { ...options.headers, 'Content-Type': 'application/json', 'Accept': ContentType.stream, }; if (CHAT_CONFIG.API_KEY) { options.headers['Authorization'] = `Bearer ${CHAT_CONFIG.API_KEY}`; } options.signal = controller.signal; const { body } = options; if (body && typeof body === 'object') options.body = JSON.stringify(body); return fetch(urlWithPrefix, options) .then((res: Response) => { // console.log('📡 SSE Response:', { // status: res.status, // statusText: res.statusText, // url: urlWithPrefix // }); if (!/^(2|3)\d{2}$/.test(res.status.toString())) { res.json().then((data: any) => { console.error('❌ SSE Error:', data.message || 'Server Error'); onError?.(data.message || 'Server Error'); }); return; } handleStream( res, onData, onCompleted, onThought, onMessageEnd, onMessageReplace, onFile, onWorkflowStarted, onWorkflowFinished, onNodeStarted, onNodeFinished, onError ); }) .catch((err) => { console.error('❌ SSE Request Error:', err); onError?.(err.message); }); }; // 公共请求函数 export const request = (url: string, options = {}, needAllResponseContent = false) => { return baseFetch(url, { ...baseOptions, ...options }, needAllResponseContent); }; // GET 请求 export const get = (url: string, options = {}) => { return request(url, { ...options, method: 'GET' }); }; // POST 请求 export const post = (url: string, options = {}) => { return request(url, { ...options, method: 'POST' }); }; // PUT 请求 export const put = (url: string, options = {}) => { return request(url, { ...options, method: 'PUT' }); }; // DELETE 请求 export const del = (url: string, options = {}) => { return request(url, { ...options, method: 'DELETE' }); }; // 发送聊天消息 export const sendChatMessage = async ( body: Record, { onData, onCompleted, onThought, onFile, onError, getAbortController, onMessageEnd, onMessageReplace, onWorkflowStarted, onNodeStarted, onNodeFinished, onWorkflowFinished, }: { onData: IOnData; onCompleted: IOnCompleted; onFile?: IOnFile; onThought?: IOnThought; onMessageEnd?: IOnMessageEnd; onMessageReplace?: IOnMessageReplace; onError?: IOnError; getAbortController?: (abortController: AbortController) => void; onWorkflowStarted?: IOnWorkflowStarted; onNodeStarted?: IOnNodeStarted; onNodeFinished?: IOnNodeFinished; onWorkflowFinished?: IOnWorkflowFinished; }, ) => { return ssePost('chat-messages', { body: { ...body, response_mode: 'streaming', }, }, { onData, onCompleted, onThought, onFile, onError, getAbortController, onMessageEnd, onMessageReplace, onNodeStarted, onWorkflowStarted, onWorkflowFinished, onNodeFinished }); }; // 获取会话列表 export const fetchConversations = async () => { return get('conversations', { params: { limit: 100, first_id: '' }, }); }; // 获取聊天消息列表 export const fetchChatList = async (conversationId: string) => { return get('messages', { params: { conversation_id: conversationId, limit: 20, last_id: '' }, }); }; // 获取应用参数 export const fetchAppParams = async () => { return get('parameters'); }; // 更新反馈 export const updateFeedback = async ({ url, body }: { url: string; body: Feedbacktype }) => { return post(url, { body }); }; // 生成会话名称 export const generateConversationName = async (id: string) => { return post(`conversations/${id}/name`, { body: { auto_generate: true }, }); }; // 重命名会话 export const renameConversation = async (id: string, name: string, autoGenerate: boolean = false) => { return post(`conversations/${id}/name`, { body: { name: autoGenerate ? undefined : name, auto_generate: autoGenerate }, }); }; // 删除会话 export const deleteConversation = async (id: string) => { return del(`conversations/${id}`); }; // 文件上传 export const upload = (fetchOptions: any): Promise => { const urlPrefix = CHAT_CONFIG.API_PREFIX; const urlWithPrefix = `${CHAT_CONFIG.API_URL}${urlPrefix}/file-upload`; const defaultOptions = { method: 'POST', url: urlWithPrefix, data: {}, }; const options = { ...defaultOptions, ...fetchOptions, }; return new Promise((resolve, reject) => { const xhr = options.xhr; xhr.open(options.method, options.url); for (const key in options.headers) xhr.setRequestHeader(key, options.headers[key]); if (CHAT_CONFIG.API_KEY) { xhr.setRequestHeader('Authorization', `Bearer ${CHAT_CONFIG.API_KEY}`); } xhr.withCredentials = true; xhr.onreadystatechange = function () { if (xhr.readyState === 4) { if (xhr.status === 200) resolve({ id: xhr.response }); else reject(xhr); } }; xhr.upload.onprogress = options.onprogress; xhr.send(options.data); }); };