鸡西市网站建设_网站建设公司_原型设计_seo优化
2026/1/7 18:44:41 网站建设 项目流程

流式传输技术详解:从概念到实现的全过程

目录

  1. 什么是流式传输?
  2. 流式传输的实现要求
  3. 流式传输的三个层面
  4. 适配层与包装层的实现
  5. 消息类型分类逻辑
  6. 完整流程示例
  7. 总结与最佳实践

什么是流式传输?

**流式传输(Streaming)**是一种数据传输方式,数据以连续流的形式逐步发送和接收,而不是一次性传输全部数据。

主要特点

  1. 实时性:数据生成后立即发送,无需等待全部完成
  2. 渐进式:接收端可以边接收边处理
  3. 低延迟:用户能更快看到结果
  4. 内存友好:不需要在内存中保存全部数据

应用场景

  • 聊天应用(实时消息)
  • 文件上传/下载进度
  • 实时日志监控
  • AI 模型生成(逐字输出)← 本文重点
  • 数据导入/导出进度

流式传输的实现要求

⚠️ 重要原则:双方都必须实现

流式传输需要发送方和接收方都实现才能正常工作:

组合结果
✅ 后端流式 + 前端流式正常工作,实时显示进度
❌ 后端流式 + 前端非流式无法正常工作或体验差
⚠️ 后端非流式 + 前端流式可以工作,但无实时效果
✅ 后端非流式 + 前端非流式传统方式,等待全部完成

发送方(后端)实现

# Flask 流式响应defgenerate():yieldf"data:{json.dumps({'type':'info','message':'开始处理...'},ensure_ascii=False)}\n\n"# 处理逻辑...yieldf"data:{json.dumps({'type':'token','token':'文本片段'},ensure_ascii=False)}\n\n"returnResponse(stream_with_context(generate()),mimetype='text/event-stream',headers={'Cache-Control':'no-cache','Connection':'keep-alive'})

接收方(前端)实现

// Fetch API 流式读取constreader=response.body.getReader();constdecoder=newTextDecoder();letbuffer='';while(true){const{done,value}=awaitreader.read();if(done)break;buffer+=decoder.decode(value,{stream:true});constlines=buffer.split('\n');buffer=lines.pop();for(constlineoflines){if(line.startsWith('data: ')){constmsg=JSON.parse(line.substring(6));// 处理消息...}}}

流式传输的三个层面

流式传输的实现涉及三个不同的层面,每个层面都有不同的格式要求:

1. 传输协议层(标准统一)

Server-Sent Events (SSE)- Web 标准协议

# 后端:设置 SSE 响应头mimetype='text/event-stream'headers={'Cache-Control':'no-cache','Connection':'keep-alive',...}

SSE 格式规范:

  • 每行以data:开头
  • 每行以\n\n结尾(两个换行符)
  • 这是标准格式,所有浏览器都支持

2. 数据格式层(项目自定义)

自定义 JSON 消息格式

# 后端发送的消息格式{"type":"token",# 消息类型标识"token":"文本片段"# 实际的文本内容}

消息类型:

  • info: 状态信息
  • error: 错误信息
  • results: 搜索结果
  • token: 答案片段(流式生成)
  • answer: 完整答案
  • final: 最终数据

3. 大模型 API 层(模型特定)

不同大模型库有不同的流式格式:

模型库流式格式
Ollama{delta: "文本"}{text: "文本"}
LlamaIndex{delta: "文本"}{response: Object}
OpenAI{delta: "文本"}

适配层与包装层的实现

这是流式传输的核心技术:如何统一不同格式

完整流程

大模型库(Ollama/LlamaIndex) ↓ {delta: "文本"} / {text: "文本"} / {response: Object} ↓ ┌─────────────────────────────────────────┐ │ 🔧 适配层(Adapter Layer) │ │ rag_generate_with_llamaindex_stream() │ │ │ │ 检查格式 → 提取文本 → 统一为字符串 │ └─────────────────────────────────────────┘ ↓ "文本片段" (字符串) ↓ ┌─────────────────────────────────────────┐ │ 📦 包装层(Wrapper Layer) │ │ search_alarm() 路由函数 │ │ │ │ 字符串 → JSON对象 → JSON字符串 → SSE │ └─────────────────────────────────────────┘ ↓ data: {"type":"token","token":"文本片段"}\n\n ↓ 前端接收并显示

适配层详解

位置rag_generate_with_llamaindex_stream()函数

作用:将不同大模型库的格式统一为字符串

场景1:Ollama LLM 直接调用
# 调用 Ollama 的流式 APIresponse_stream=Settings.llm.stream_complete(prompt)# 适配层 - 处理 Ollama 的不同返回格式fortokeninresponse_stream:# 情况1:Ollama 返回 {delta: "文本片段"}ifhasattr(token,'delta')andtoken.delta:yieldtoken.delta# ← 提取 delta 属性# 情况2:Ollama 返回 {text: "文本片段"}elifhasattr(token,'text'):yieldtoken.text# ← 提取 text 属性# 情况3:Ollama 返回其他格式(字符串或对象)else:token_str=str(token)# ← 转换为字符串iftoken_str:yieldtoken_str

适配逻辑:

  • ✅ 检查delta属性 → 提取文本
  • ✅ 检查text属性 → 提取文本
  • ✅ 其他情况 → 转换为字符串

统一输出:所有格式最终都 yield 字符串

场景2:LlamaIndex ChatEngine
# 调用 LlamaIndex ChatEngine 的流式 APIresponse_stream=chat_engine.stream_chat(query)# 适配层 - 处理 LlamaIndex 的不同返回格式forresponse_chunkinresponse_stream:# 情况1:LlamaIndex 返回 {delta: "文本片段"}ifhasattr(response_chunk,'delta')andresponse_chunk.delta:yieldresponse_chunk.delta# 情况2:LlamaIndex 返回 {response: ResponseObject}elifhasattr(response_chunk,'response'):response_text=str(response_chunk.response)ifresponse_text:yieldresponse_text# 情况3:直接是字符串或其他格式else:chunk_str=str(response_chunk)ifchunk_str:yieldchunk_str
场景3:LlamaIndex QueryEngine
# 调用 QueryEngine 的流式 APIstreaming_response=query_engine.query(enhanced_query)# 适配层 - 处理 QueryEngine 的不同返回格式ifhasattr(streaming_response,'response_gen'):# 情况1:返回 StreamingResponse 对象,有 response_gen 生成器fortext_chunkinstreaming_response.response_gen:iftext_chunk:yieldtext_chunkelifhasattr(streaming_response,'__iter__')andnotisinstance(streaming_response,str):# 情况2:直接是生成器forchunkinstreaming_response:ifhasattr(chunk,'delta')andchunk.delta:yieldchunk.deltaelse:chunk_str=str(chunk)ifchunk_str:yieldchunk_strelse:# 情况3:非流式,返回完整响应response_text=str(streaming_response)yieldresponse_text

包装层详解

位置search_alarm()路由函数

作用:将适配层输出的字符串包装成自定义 JSON 格式,并封装为 SSE 格式

包装过程
# 调用适配层,获取字符串流answer_stream=rag_generate_with_llamaindex_stream(query,results,chat_history_list=current_history)# 包装层 - 将字符串包装成 JSON + SSE 格式fortokeninanswer_stream:# ← token 是字符串(来自适配层)iftokenisNone:continue# 确保是字符串类型token_str=str(token)ifnotisinstance(token,str)elsetokeniftoken_str:# 包装成自定义 JSON 格式json_data={'type':'token',# ← 消息类型'token':token_str# ← 文本片段}# 转换为 JSON 字符串json_str=json.dumps(json_data,ensure_ascii=False)# 包装成 SSE 格式:data: {...}\n\nyieldf"data:{json_str}\n\n"
包装格式说明
  1. 自定义 JSON 格式:
{"type":"token",// 消息类型标识"token":"文本片段"// 实际的文本内容}
  1. SSE 格式:
data: {"type":"token","token":"文本片段"}\n\n

设计优势

  1. 适配层:屏蔽不同库的格式差异,统一输出字符串
  2. 包装层:将字符串包装为应用层统一的消息格式
  3. 解耦:更换大模型库时只需修改适配层
  4. 扩展:新增消息类型只需在包装层添加

这就是适配器模式 + 包装器模式的组合应用。


消息类型分类逻辑

后端根据流程阶段和状态决定发送哪种类型的消息:

消息类型定义

消息类型发送时机发送条件数据内容
info流程关键节点搜索开始/完成、RAG开始message字符串
error出现错误验证失败、配置错误、执行异常message错误信息
results搜索完成搜索完成后(无论是否有结果)data.results结果列表
tokenRAG流式生成每个文本片段生成时token文本片段
answerRAG生成完成生成成功且有答案answer完整答案
final流程结束所有情况(成功/失败)data包含所有最终数据

分类逻辑详解

1.info- 状态信息

发送时机:流程中的关键节点

# 搜索开始时yieldf"data:{json.dumps({'type':'info','message':'开始搜索知识库...'},ensure_ascii=False)}\n\n"# 找到精确匹配结果时yieldf"data:{json.dumps({'type':'info','message':f'找到{len(results)}个精确匹配结果'},ensure_ascii=False)}\n\n"# 开始向量搜索时yieldf"data:{json.dumps({'type':'info','message':'正在进行向量搜索...'},ensure_ascii=False)}\n\n"# 搜索完成时yieldf"data:{json.dumps({'type':'info','message':f'找到{len(results)}个相关结果'},ensure_ascii=False)}\n\n"# 开始生成RAG回答时yieldf"data:{json.dumps({'type':'info','message':'开始生成智能回答...'},ensure_ascii=False)}\n\n"

分类逻辑:

  • ✅ 流程开始:搜索开始
  • ✅ 搜索阶段:精确匹配完成、向量搜索开始、搜索完成
  • ✅ RAG阶段:开始生成回答
2.error- 错误信息

发送时机:出现错误时

# 查询为空ifnotquery:yieldf"data:{json.dumps({'type':'error','message':'查询内容不能为空'},ensure_ascii=False)}\n\n"# LLM未初始化ifSettings.llmisNone:error_msg="LlamaIndex LLM 未初始化,无法生成回答。请检查 Ollama 服务是否运行。"yieldf"data:{json.dumps({'type':'error','message':error_msg},ensure_ascii=False)}\n\n"# 流式生成失败ifnotfull_answer:error_msg="流式生成失败,未收到有效回答"yieldf"data:{json.dumps({'type':'error','message':error_msg},ensure_ascii=False)}\n\n"# 异常捕获exceptExceptionase:error_msg=f"流式生成过程出错:{str(e)}"yieldf"data:{json.dumps({'type':'error','message':error_msg},ensure_ascii=False)}\n\n"

分类逻辑:

  • ✅ 输入验证错误:查询为空
  • ✅ 配置错误:LLM未初始化、RAG未启用
  • ✅ 执行错误:生成失败、异常
3.results- 搜索结果

发送时机:搜索完成后

# 搜索完成后发送结果yieldf"data:{json.dumps({'type':'results','data':{'results':results,# 搜索结果列表(包含 images 字段)'count':len(results),# 结果数量'search_mode':search_mode# 搜索模式}},ensure_ascii=False)}\n\n"

分类逻辑:

  • ✅ 条件:搜索完成(无论是否有结果)
  • ✅ 数据:结果列表、数量、搜索模式
  • 图片数据:每个结果对象的images字段包含图片路径数组
4.token- 答案片段(流式生成)

发送时机:RAG流式生成过程中

# 流式生成答案片段fortokeninanswer_stream:iftokenisNone:continuetoken_str=str(token)ifnotisinstance(token,str)elsetokeniftoken_str:full_answer+=token_str# 每个文本片段都发送一次 token 消息yieldf"data:{json.dumps({'type':'token','token':token_str},ensure_ascii=False)}\n\n"

分类逻辑:

  • ✅ 条件:启用RAG且流式生成中
  • ✅ 频率:每个文本片段发送一次
  • ✅ 数据:单个文本片段
5.answer- 完整答案

发送时机:RAG生成完成后

# 生成完成后发送完整答案iffull_answer:yieldf"data:{json.dumps({'type':'answer','answer':full_answer,# 完整答案'references':results[:5]ifresultselse[]# 参考来源(包含 images)},ensure_ascii=False)}\n\n"

分类逻辑:

  • ✅ 条件:RAG生成成功且有答案
  • ✅ 数据:完整答案、参考来源
  • 图片数据references中每个对象的images字段
6.final- 最终数据

发送时机:整个流程结束时

# RAG成功时(有答案)yieldf"data:{json.dumps({'type':'final','data':{'success':True,'answer':full_answer,'results':results,# 所有结果(包含 images)'references':results[:5],# 参考来源(包含 images)'session_id':session_id}},ensure_ascii=False)}\n\n"# 不使用RAG时(只有搜索结果)yieldf"data:{json.dumps({'type':'final','data':{'success':True,'results':results,# 所有结果(包含 images)'session_id':session_id}},ensure_ascii=False)}\n\n"

分类逻辑:

  • ✅ 条件:流程结束(成功或失败)
  • ✅ 数据:包含所有最终数据(答案、结果、错误、会话ID等)
  • 图片数据resultsreferences中都包含images字段

完整流程示例

场景1:传统搜索模式(不使用RAG)

1. info: "开始搜索知识库..." ↓ 2. info: "找到 5 个相关结果" 或 "正在进行向量搜索..." ↓ 3. results: { results: [ {id: 1, images: ["img1.jpg"], ...}, {id: 2, images: ["img2.jpg"], ...}, ... ], count: 5 } ↓ 4. final: { success: true, results: [{images: [...]}, ...], session_id: "..." }

图片返回时机:

  • results消息:所有搜索结果的图片
  • final消息:所有搜索结果的图片(再次返回)

场景2:RAG模式(成功)

1. info: "开始搜索知识库..." ↓ 2. info: "找到 5 个相关结果" ↓ 3. results: { results: [{images: [...]}, ...], count: 5 } ↓ 4. info: "开始生成智能回答..." ↓ 5. token: "根据" ↓ 6. token: "知识库" ↓ 7. token: "中的信息" ... (多个token,逐字显示) ↓ 8. answer: { answer: "完整答案...", references: [{images: [...]}, ...] // 前5条结果的图片 } ↓ 9. final: { success: true, answer: "完整答案...", results: [{images: [...]}, ...], // 所有结果的图片 references: [{images: [...]}, ...], // 参考来源的图片 session_id: "..." }

图片返回时机:

  • results消息:所有搜索结果的图片(第一次)
  • answer消息:前5条参考结果的图片(第二次)
  • final消息:所有结果和参考来源的图片(第三次和第四次)

场景3:RAG模式(失败)

1. info: "开始搜索知识库..." ↓ 2. results: {results: [{images: [...]}, ...], count: 5} ↓ 3. info: "开始生成智能回答..." ↓ 4. error: "LlamaIndex LLM 未初始化..." ↓ 5. final: { success: true, results: [{images: [...]}, ...], rag_error: "...", session_id: "..." }

图片数据的返回时机

重要发现:图片不是单独返回的

图片数据不是单独返回,而是作为搜索结果的一部分,每个结果对象都有一个images字段(数组)。

图片返回的多个时机

返回时机消息类型包含字段说明
搜索完成后resultsdata.results[].images所有搜索结果的图片
RAG生成完成后answerreferences[].images前5条参考结果的图片
流程结束时finaldata.results[].images
data.references[].images
所有结果和参考来源的图片

图片数据结构

{"id":1,"alarm_code":"E001","alarm_message":"温度传感器故障","solution":"检查传感器连接...","images":["uploads/images/sensor1.jpg","uploads/images/sensor2.jpg"],...}

总结与最佳实践

核心技术要点

  1. 三个层面分离

    • 传输协议层(SSE):标准统一
    • 数据格式层(JSON):项目自定义
    • 大模型API层:模型特定
  2. 适配器模式

    • 适配层:统一不同库的格式差异
    • 包装层:统一应用层的消息格式
  3. 消息类型设计

    • 按流程阶段分类
    • 实时反馈 + 最终数据
    • 错误处理完善

实现建议

  1. 统一定义消息类型

    # 使用常量定义classMessageType:INFO='info'ERROR='error'RESULTS='results'TOKEN='token'ANSWER='answer'FINAL='final'
  2. 错误处理

    • 所有错误都发送error消息
    • 最终都发送final消息(包含错误信息)
  3. 性能优化

    • 快速搜索直接返回完整结果
    • RAG模式流式生成,实时显示
  4. 用户体验

    • 实时状态反馈(info)
    • 逐字显示答案(token)
    • 完整数据保证(final)

设计模式应用

  • 适配器模式:适配不同大模型库的格式
  • 包装器模式:统一应用层的消息格式
  • 观察者模式:前端监听流式消息并更新UI

结语

流式传输是一个涉及多个层面的复杂技术,需要:

  1. 理解三个层面:传输协议、数据格式、大模型API
  2. 实现适配层:统一不同库的格式差异
  3. 实现包装层:统一应用层的消息格式
  4. 设计消息类型:按流程阶段分类
  5. 前后端配合:双方都必须实现流式处理

通过这种设计,我们实现了:

  • 🚀实时反馈:用户可以看到搜索和生成进度
  • ✍️逐字显示:答案逐字显示,类似 ChatGPT
  • 🔄向后兼容:传统搜索模式不受影响
  • 🛡️错误处理:完善的错误处理和提示

希望这篇文章能帮助你理解流式传输的完整实现过程!


需要专业的网站建设服务?

联系我们获取免费的网站建设咨询和方案报价,让我们帮助您实现业务目标

立即咨询