流式传输技术详解:从概念到实现的全过程
目录
- 什么是流式传输?
- 流式传输的实现要求
- 流式传输的三个层面
- 适配层与包装层的实现
- 消息类型分类逻辑
- 完整流程示例
- 总结与最佳实践
什么是流式传输?
**流式传输(Streaming)**是一种数据传输方式,数据以连续流的形式逐步发送和接收,而不是一次性传输全部数据。
主要特点
- 实时性:数据生成后立即发送,无需等待全部完成
- 渐进式:接收端可以边接收边处理
- 低延迟:用户能更快看到结果
- 内存友好:不需要在内存中保存全部数据
应用场景
- 聊天应用(实时消息)
- 文件上传/下载进度
- 实时日志监控
- AI 模型生成(逐字输出)← 本文重点
- 数据导入/导出进度
流式传输的实现要求
⚠️ 重要原则:双方都必须实现
流式传输需要发送方和接收方都实现才能正常工作:
| 组合 | 结果 |
|---|---|
| ✅ 后端流式 + 前端流式 | 正常工作,实时显示进度 |
| ❌ 后端流式 + 前端非流式 | 无法正常工作或体验差 |
| ⚠️ 后端非流式 + 前端流式 | 可以工作,但无实时效果 |
| ✅ 后端非流式 + 前端非流式 | 传统方式,等待全部完成 |
发送方(后端)实现
# Flask 流式响应defgenerate():yieldf"data:{json.dumps({'type':'info','message':'开始处理...'},ensure_ascii=False)}\n\n"# 处理逻辑...yieldf"data:{json.dumps({'type':'token','token':'文本片段'},ensure_ascii=False)}\n\n"returnResponse(stream_with_context(generate()),mimetype='text/event-stream',headers={'Cache-Control':'no-cache','Connection':'keep-alive'})接收方(前端)实现
// Fetch API 流式读取constreader=response.body.getReader();constdecoder=newTextDecoder();letbuffer='';while(true){const{done,value}=awaitreader.read();if(done)break;buffer+=decoder.decode(value,{stream:true});constlines=buffer.split('\n');buffer=lines.pop();for(constlineoflines){if(line.startsWith('data: ')){constmsg=JSON.parse(line.substring(6));// 处理消息...}}}流式传输的三个层面
流式传输的实现涉及三个不同的层面,每个层面都有不同的格式要求:
1. 传输协议层(标准统一)
Server-Sent Events (SSE)- Web 标准协议
# 后端:设置 SSE 响应头mimetype='text/event-stream'headers={'Cache-Control':'no-cache','Connection':'keep-alive',...}SSE 格式规范:
- 每行以
data:开头 - 每行以
\n\n结尾(两个换行符) - 这是标准格式,所有浏览器都支持
2. 数据格式层(项目自定义)
自定义 JSON 消息格式
# 后端发送的消息格式{"type":"token",# 消息类型标识"token":"文本片段"# 实际的文本内容}消息类型:
info: 状态信息error: 错误信息results: 搜索结果token: 答案片段(流式生成)answer: 完整答案final: 最终数据
3. 大模型 API 层(模型特定)
不同大模型库有不同的流式格式:
| 模型库 | 流式格式 |
|---|---|
| Ollama | {delta: "文本"}或{text: "文本"} |
| LlamaIndex | {delta: "文本"}或{response: Object} |
| OpenAI | {delta: "文本"} |
适配层与包装层的实现
这是流式传输的核心技术:如何统一不同格式。
完整流程
大模型库(Ollama/LlamaIndex) ↓ {delta: "文本"} / {text: "文本"} / {response: Object} ↓ ┌─────────────────────────────────────────┐ │ 🔧 适配层(Adapter Layer) │ │ rag_generate_with_llamaindex_stream() │ │ │ │ 检查格式 → 提取文本 → 统一为字符串 │ └─────────────────────────────────────────┘ ↓ "文本片段" (字符串) ↓ ┌─────────────────────────────────────────┐ │ 📦 包装层(Wrapper Layer) │ │ search_alarm() 路由函数 │ │ │ │ 字符串 → JSON对象 → JSON字符串 → SSE │ └─────────────────────────────────────────┘ ↓ data: {"type":"token","token":"文本片段"}\n\n ↓ 前端接收并显示适配层详解
位置:rag_generate_with_llamaindex_stream()函数
作用:将不同大模型库的格式统一为字符串
场景1:Ollama LLM 直接调用
# 调用 Ollama 的流式 APIresponse_stream=Settings.llm.stream_complete(prompt)# 适配层 - 处理 Ollama 的不同返回格式fortokeninresponse_stream:# 情况1:Ollama 返回 {delta: "文本片段"}ifhasattr(token,'delta')andtoken.delta:yieldtoken.delta# ← 提取 delta 属性# 情况2:Ollama 返回 {text: "文本片段"}elifhasattr(token,'text'):yieldtoken.text# ← 提取 text 属性# 情况3:Ollama 返回其他格式(字符串或对象)else:token_str=str(token)# ← 转换为字符串iftoken_str:yieldtoken_str适配逻辑:
- ✅ 检查
delta属性 → 提取文本 - ✅ 检查
text属性 → 提取文本 - ✅ 其他情况 → 转换为字符串
统一输出:所有格式最终都 yield 字符串
场景2:LlamaIndex ChatEngine
# 调用 LlamaIndex ChatEngine 的流式 APIresponse_stream=chat_engine.stream_chat(query)# 适配层 - 处理 LlamaIndex 的不同返回格式forresponse_chunkinresponse_stream:# 情况1:LlamaIndex 返回 {delta: "文本片段"}ifhasattr(response_chunk,'delta')andresponse_chunk.delta:yieldresponse_chunk.delta# 情况2:LlamaIndex 返回 {response: ResponseObject}elifhasattr(response_chunk,'response'):response_text=str(response_chunk.response)ifresponse_text:yieldresponse_text# 情况3:直接是字符串或其他格式else:chunk_str=str(response_chunk)ifchunk_str:yieldchunk_str场景3:LlamaIndex QueryEngine
# 调用 QueryEngine 的流式 APIstreaming_response=query_engine.query(enhanced_query)# 适配层 - 处理 QueryEngine 的不同返回格式ifhasattr(streaming_response,'response_gen'):# 情况1:返回 StreamingResponse 对象,有 response_gen 生成器fortext_chunkinstreaming_response.response_gen:iftext_chunk:yieldtext_chunkelifhasattr(streaming_response,'__iter__')andnotisinstance(streaming_response,str):# 情况2:直接是生成器forchunkinstreaming_response:ifhasattr(chunk,'delta')andchunk.delta:yieldchunk.deltaelse:chunk_str=str(chunk)ifchunk_str:yieldchunk_strelse:# 情况3:非流式,返回完整响应response_text=str(streaming_response)yieldresponse_text包装层详解
位置:search_alarm()路由函数
作用:将适配层输出的字符串包装成自定义 JSON 格式,并封装为 SSE 格式
包装过程
# 调用适配层,获取字符串流answer_stream=rag_generate_with_llamaindex_stream(query,results,chat_history_list=current_history)# 包装层 - 将字符串包装成 JSON + SSE 格式fortokeninanswer_stream:# ← token 是字符串(来自适配层)iftokenisNone:continue# 确保是字符串类型token_str=str(token)ifnotisinstance(token,str)elsetokeniftoken_str:# 包装成自定义 JSON 格式json_data={'type':'token',# ← 消息类型'token':token_str# ← 文本片段}# 转换为 JSON 字符串json_str=json.dumps(json_data,ensure_ascii=False)# 包装成 SSE 格式:data: {...}\n\nyieldf"data:{json_str}\n\n"包装格式说明
- 自定义 JSON 格式:
{"type":"token",// 消息类型标识"token":"文本片段"// 实际的文本内容}- SSE 格式:
data: {"type":"token","token":"文本片段"}\n\n设计优势
- 适配层:屏蔽不同库的格式差异,统一输出字符串
- 包装层:将字符串包装为应用层统一的消息格式
- 解耦:更换大模型库时只需修改适配层
- 扩展:新增消息类型只需在包装层添加
这就是适配器模式 + 包装器模式的组合应用。
消息类型分类逻辑
后端根据流程阶段和状态决定发送哪种类型的消息:
消息类型定义
| 消息类型 | 发送时机 | 发送条件 | 数据内容 |
|---|---|---|---|
| info | 流程关键节点 | 搜索开始/完成、RAG开始 | message字符串 |
| error | 出现错误 | 验证失败、配置错误、执行异常 | message错误信息 |
| results | 搜索完成 | 搜索完成后(无论是否有结果) | data.results结果列表 |
| token | RAG流式生成 | 每个文本片段生成时 | token文本片段 |
| answer | RAG生成完成 | 生成成功且有答案 | answer完整答案 |
| final | 流程结束 | 所有情况(成功/失败) | data包含所有最终数据 |
分类逻辑详解
1.info- 状态信息
发送时机:流程中的关键节点
# 搜索开始时yieldf"data:{json.dumps({'type':'info','message':'开始搜索知识库...'},ensure_ascii=False)}\n\n"# 找到精确匹配结果时yieldf"data:{json.dumps({'type':'info','message':f'找到{len(results)}个精确匹配结果'},ensure_ascii=False)}\n\n"# 开始向量搜索时yieldf"data:{json.dumps({'type':'info','message':'正在进行向量搜索...'},ensure_ascii=False)}\n\n"# 搜索完成时yieldf"data:{json.dumps({'type':'info','message':f'找到{len(results)}个相关结果'},ensure_ascii=False)}\n\n"# 开始生成RAG回答时yieldf"data:{json.dumps({'type':'info','message':'开始生成智能回答...'},ensure_ascii=False)}\n\n"分类逻辑:
- ✅ 流程开始:搜索开始
- ✅ 搜索阶段:精确匹配完成、向量搜索开始、搜索完成
- ✅ RAG阶段:开始生成回答
2.error- 错误信息
发送时机:出现错误时
# 查询为空ifnotquery:yieldf"data:{json.dumps({'type':'error','message':'查询内容不能为空'},ensure_ascii=False)}\n\n"# LLM未初始化ifSettings.llmisNone:error_msg="LlamaIndex LLM 未初始化,无法生成回答。请检查 Ollama 服务是否运行。"yieldf"data:{json.dumps({'type':'error','message':error_msg},ensure_ascii=False)}\n\n"# 流式生成失败ifnotfull_answer:error_msg="流式生成失败,未收到有效回答"yieldf"data:{json.dumps({'type':'error','message':error_msg},ensure_ascii=False)}\n\n"# 异常捕获exceptExceptionase:error_msg=f"流式生成过程出错:{str(e)}"yieldf"data:{json.dumps({'type':'error','message':error_msg},ensure_ascii=False)}\n\n"分类逻辑:
- ✅ 输入验证错误:查询为空
- ✅ 配置错误:LLM未初始化、RAG未启用
- ✅ 执行错误:生成失败、异常
3.results- 搜索结果
发送时机:搜索完成后
# 搜索完成后发送结果yieldf"data:{json.dumps({'type':'results','data':{'results':results,# 搜索结果列表(包含 images 字段)'count':len(results),# 结果数量'search_mode':search_mode# 搜索模式}},ensure_ascii=False)}\n\n"分类逻辑:
- ✅ 条件:搜索完成(无论是否有结果)
- ✅ 数据:结果列表、数量、搜索模式
- ✅图片数据:每个结果对象的
images字段包含图片路径数组
4.token- 答案片段(流式生成)
发送时机:RAG流式生成过程中
# 流式生成答案片段fortokeninanswer_stream:iftokenisNone:continuetoken_str=str(token)ifnotisinstance(token,str)elsetokeniftoken_str:full_answer+=token_str# 每个文本片段都发送一次 token 消息yieldf"data:{json.dumps({'type':'token','token':token_str},ensure_ascii=False)}\n\n"分类逻辑:
- ✅ 条件:启用RAG且流式生成中
- ✅ 频率:每个文本片段发送一次
- ✅ 数据:单个文本片段
5.answer- 完整答案
发送时机:RAG生成完成后
# 生成完成后发送完整答案iffull_answer:yieldf"data:{json.dumps({'type':'answer','answer':full_answer,# 完整答案'references':results[:5]ifresultselse[]# 参考来源(包含 images)},ensure_ascii=False)}\n\n"分类逻辑:
- ✅ 条件:RAG生成成功且有答案
- ✅ 数据:完整答案、参考来源
- ✅图片数据:
references中每个对象的images字段
6.final- 最终数据
发送时机:整个流程结束时
# RAG成功时(有答案)yieldf"data:{json.dumps({'type':'final','data':{'success':True,'answer':full_answer,'results':results,# 所有结果(包含 images)'references':results[:5],# 参考来源(包含 images)'session_id':session_id}},ensure_ascii=False)}\n\n"# 不使用RAG时(只有搜索结果)yieldf"data:{json.dumps({'type':'final','data':{'success':True,'results':results,# 所有结果(包含 images)'session_id':session_id}},ensure_ascii=False)}\n\n"分类逻辑:
- ✅ 条件:流程结束(成功或失败)
- ✅ 数据:包含所有最终数据(答案、结果、错误、会话ID等)
- ✅图片数据:
results和references中都包含images字段
完整流程示例
场景1:传统搜索模式(不使用RAG)
1. info: "开始搜索知识库..." ↓ 2. info: "找到 5 个相关结果" 或 "正在进行向量搜索..." ↓ 3. results: { results: [ {id: 1, images: ["img1.jpg"], ...}, {id: 2, images: ["img2.jpg"], ...}, ... ], count: 5 } ↓ 4. final: { success: true, results: [{images: [...]}, ...], session_id: "..." }图片返回时机:
- ✅
results消息:所有搜索结果的图片 - ✅
final消息:所有搜索结果的图片(再次返回)
场景2:RAG模式(成功)
1. info: "开始搜索知识库..." ↓ 2. info: "找到 5 个相关结果" ↓ 3. results: { results: [{images: [...]}, ...], count: 5 } ↓ 4. info: "开始生成智能回答..." ↓ 5. token: "根据" ↓ 6. token: "知识库" ↓ 7. token: "中的信息" ... (多个token,逐字显示) ↓ 8. answer: { answer: "完整答案...", references: [{images: [...]}, ...] // 前5条结果的图片 } ↓ 9. final: { success: true, answer: "完整答案...", results: [{images: [...]}, ...], // 所有结果的图片 references: [{images: [...]}, ...], // 参考来源的图片 session_id: "..." }图片返回时机:
- ✅
results消息:所有搜索结果的图片(第一次) - ✅
answer消息:前5条参考结果的图片(第二次) - ✅
final消息:所有结果和参考来源的图片(第三次和第四次)
场景3:RAG模式(失败)
1. info: "开始搜索知识库..." ↓ 2. results: {results: [{images: [...]}, ...], count: 5} ↓ 3. info: "开始生成智能回答..." ↓ 4. error: "LlamaIndex LLM 未初始化..." ↓ 5. final: { success: true, results: [{images: [...]}, ...], rag_error: "...", session_id: "..." }图片数据的返回时机
重要发现:图片不是单独返回的
图片数据不是单独返回,而是作为搜索结果的一部分,每个结果对象都有一个images字段(数组)。
图片返回的多个时机
| 返回时机 | 消息类型 | 包含字段 | 说明 |
|---|---|---|---|
| 搜索完成后 | results | data.results[].images | 所有搜索结果的图片 |
| RAG生成完成后 | answer | references[].images | 前5条参考结果的图片 |
| 流程结束时 | final | data.results[].imagesdata.references[].images | 所有结果和参考来源的图片 |
图片数据结构
{"id":1,"alarm_code":"E001","alarm_message":"温度传感器故障","solution":"检查传感器连接...","images":["uploads/images/sensor1.jpg","uploads/images/sensor2.jpg"],...}总结与最佳实践
核心技术要点
三个层面分离
- 传输协议层(SSE):标准统一
- 数据格式层(JSON):项目自定义
- 大模型API层:模型特定
适配器模式
- 适配层:统一不同库的格式差异
- 包装层:统一应用层的消息格式
消息类型设计
- 按流程阶段分类
- 实时反馈 + 最终数据
- 错误处理完善
实现建议
统一定义消息类型
# 使用常量定义classMessageType:INFO='info'ERROR='error'RESULTS='results'TOKEN='token'ANSWER='answer'FINAL='final'错误处理
- 所有错误都发送
error消息 - 最终都发送
final消息(包含错误信息)
- 所有错误都发送
性能优化
- 快速搜索直接返回完整结果
- RAG模式流式生成,实时显示
用户体验
- 实时状态反馈(info)
- 逐字显示答案(token)
- 完整数据保证(final)
设计模式应用
- 适配器模式:适配不同大模型库的格式
- 包装器模式:统一应用层的消息格式
- 观察者模式:前端监听流式消息并更新UI
结语
流式传输是一个涉及多个层面的复杂技术,需要:
- ✅理解三个层面:传输协议、数据格式、大模型API
- ✅实现适配层:统一不同库的格式差异
- ✅实现包装层:统一应用层的消息格式
- ✅设计消息类型:按流程阶段分类
- ✅前后端配合:双方都必须实现流式处理
通过这种设计,我们实现了:
- 🚀实时反馈:用户可以看到搜索和生成进度
- ✍️逐字显示:答案逐字显示,类似 ChatGPT
- 🔄向后兼容:传统搜索模式不受影响
- 🛡️错误处理:完善的错误处理和提示
希望这篇文章能帮助你理解流式传输的完整实现过程!