玉林市网站建设_网站建设公司_Node.js_seo优化
2025/12/20 16:36:35 网站建设 项目流程

AI 流式响应实战:从同步等待到实时推送

在 IM 系统中集成 AI 时,流式响应能显著提升性能。本文介绍 AQChat 如何实现 AI 流式响应,从同步等待到实时推送。

一、为什么需要流式响应?

同步等待的问题

传统同步方式的问题

// ❌ 同步方式:用户需要等待AI完整响应StringaiResponse=aiService.getAnswer(userMessage);// 如果AI响应需要10秒,用户就要等待10秒sendMessage(aiResponse);

问题:

  1. 等待时间长:AI 生成可能需要 5-10 秒,用户长时间等待
  2. 体验差:无法看到生成过程,感觉卡顿
  3. 资源占用:长时间占用连线和线程

流式响应的优势

  1. 实时反馈:逐字显示,用户可立即看到内容
  2. 体验更好:类似 ChatGPT 的打字机效果
  3. 资源利用:边生成边推送,不阻塞

对比

方式首字延迟完整响应时间用户体验
同步等待10秒10秒
流式响应1-2秒10秒

回调函数模式的设计

统一接口设计

定义统一的 AI 服务接口

publicinterfaceIAiService{/** * 流式调用AI服务 * @param userMsg 用户消息 * @param consumer 回调函数,处理每个数据块 */voidstreamCallWithMessage(StringuserMsg,Consumer<AIResult>consumer);/** * 多轮对话 */defaultvoidchat(Stringmessage,List<MessageRecord>messages,Consumer<AIResult>consumer){}}

关键点

  • 使用Consumer<AIResult>作为回调
  • 每个数据块通过回调处理
  • 支持多轮对话

AIResult 设计

publicinterfaceAIResult{StringgetContent();// 当前数据块的内容intgetStatus();// 状态:WAIT(0-进行中)、END(1-结束)、FAIL(2-失败)}

状态枚举

publicenumAIMessageStatusEnum{WAIT(0,"wait"),// 流式响应进行中END(1,"end"),// 流式响应结束FAIL(2,"fail");// 流式响应失败}

三、WebSocket 实时推送的实现

整体流程

用户发送消息 ↓RocketMQ异步处理 ↓ AI服务流式调用 ↓ 回调函数处理每个数据块 ↓ 封装为 STREAM_MSG_NOTIFY ↓WebSocket实时推送

代码实现

  1. RocketMQ 消费者接收消息
@ComponentpublicclassAIHelperReceiverimplementsInitializingBean{@ResourceprivateIAiServiceaiService;@ResourceprivateGlobalChannelHolderglobalChannelHolder;publicvoidinitConsumer(){defaultMQPushConsumer.setMessageListener((MessageListenerConcurrently)(messageExtList,context)->{for(MessageExtmessageExt:messageExtList){MessageDtomessageDto=JSONObject.parseObject(msgStr,MessageDto.class);// 提交到独立线程池,不阻塞MQ消费线程threadPoolUtil.submitTask(()->{StringBuilderfullContent=newStringBuilder();try{// 流式调用AI服务aiService.streamCallWithMessage(messageDto.getMessageContent(),aiResult->{// 回调函数:处理每个数据块AIMessageDtoaiMessageDto=newAIMessageDto();aiMessageDto.setMessageId(messageDto.getMessageId());aiMessageDto.setRoomId(messageDto.getRoomId());aiMessageDto.setContent(aiResult.getContent());aiMessageDto.setStatus(aiResult.getStatus());// 实时推送globalChannelHolder.sendBroadcastAIMessage(aiMessageDto,AQBusinessConstant.AI_HELPER_ID);// 累积完整内容fullContent.append(aiResult.getContent());});}catch(Exceptione){// 错误处理LOGGER.error("AI助手处理消息失败",e);AIMessageDtofailMessage=newAIMessageDto();failMessage.setStatus(AIMessageStatusEnum.FAIL.getCode());globalChannelHolder.sendBroadcastAIMessage(failMessage,AQBusinessConstant.AI_HELPER_ID);}finally{// 流式响应结束后,持久化完整消息MessageDtostoreMessage=buildStoreMessage(messageDto,fullContent);messageService.saveMessage(storeMessage);}});}returnConsumeConcurrentlyStatus.CONSUME_SUCCESS;});}}
  1. 封装流式消息并推送
@ComponentpublicclassGlobalChannelHolder{publicvoidsendBroadcastAIMessage(AIMessageDtoaiMessageDto,StringaiId){// 1. 获取AI助手信息UserGlobalInfoDtouserInfo=userHolder.getUserInfo(aiId);// 2. 构建流式消息AQChatMsgProtocol.StreamMsgNotifystreamMsgNotify=AQChatMsgProtocol.StreamMsgNotify.newBuilder().setUser(userBuilder).setMsgId(aiMessageDto.getMessageId()).setRoomId(aiMessageDto.getRoomId()).setContent(aiMessageDto.getContent()==null?"":aiMessageDto.getContent()).setStreamType(aiMessageDto.getStatus())// 0-进行中,1-结束,2-失败.build();// 3. 广播到房间内所有用户messageBroadcaster.broadcast(aiMessageDto.getRoomId(),streamMsgNotify);}}
  1. 消息广播
@ComponentpublicclassMessageBroadcaster{privatefinalMap<String,ChannelGroup>channelGroupMap=newConcurrentHashMap<>();public<TextendsGeneratedMessageV3>voidbroadcast(StringroomId,Tmsg){ChannelGroupchannelGroup=channelGroupMap.get(roomId);if(channelGroup!=null){// 批量发送,高效channelGroup.writeAndFlush(msg);}}}

四、流式消息的封装(STREM_MSG_NOTIFY)

Protobuf 消息定义

// 流式消息通知messageStreamMsgNotify{string roomId=1;// 房间IDstring msgId=2;// 消息IDUseruser=3;// AI助手信息int32 streamType=4;// 流类型:0-进行中,1-结束,2-失败string content=5;// 当前数据块内容}

消息类型

enumMsgCommand{// ...STREAM_MSG_NOTIFY=32;// 流式消息通知// ...}

消息状态流转

用户发送消息 ↓ STREAM_MSG_NOTIFY(streamType=0,content="你")← 第一个数据块 ↓ STREAM_MSG_NOTIFY(streamType=0,content="好")← 第二个数据块 ↓ STREAM_MSG_NOTIFY(streamType=0,content=",")← 第三个数据块 ↓...↓ STREAM_MSG_NOTIFY(streamType=1,content="")← 结束标志

前端处理示例(伪代码)

websocket.onmessage=(event)=>{constmessage=JSON.parse(event.data);if(message.command==='STREAM_MSG_NOTIFY'){if(message.streamType===0){// 进行中:追加内容appendContent(message.content);}elseif(message.streamType===1){// 结束:显示完整消息showCompleteMessage();}elseif(message.streamType===2){// 失败:显示错误提示showErrorMessage();}}};

五、多 AI 平台集成的统一接口设计

问题:不同 AI 平台的 API 不同

  • 阿里百炼:使用Flowable<GenerationResult>
  • Gitee AI:使用MessageHandler<String>
  • 其他平台:可能有不同的流式接口

解决方案:统一接口 + 适配器模式

  1. 统一接口定义
publicinterfaceIAiService{/** * 流式调用,统一使用 Consumer<AIResult> 回调 */voidstreamCallWithMessage(StringuserMsg,Consumer<AIResult>consumer);}
  1. 阿里百炼实现
@Service@PrimarypublicclassQWAiServiceimplementsIAiService{@OverridepublicvoidstreamCallWithMessage(StringuserMsg,Consumer<AIResult>consumer){Generationgen=newGeneration();Messagemessage=Message.builder().role(Role.USER.getValue()).content(userMsg).build();// 调用阿里百炼流式APIFlowable<GenerationResult>result=gen.streamCall(generationParam);// 转换为统一格式result.blockingForEach(r->{Stringcontent=r.getOutput().getChoices().get(0).getMessage().getContent();StringfinishReason=r.getOutput().getChoices().get(0).getFinishReason();QWResultqwResult=newQWResult();qwResult.setContent(content);// 判断是否结束qwResult.setStatus("stop".equals(finishReason)?AIMessageStatusEnum.END.getCode():AIMessageStatusEnum.WAIT.getCode());// 调用统一回调consumer.accept(qwResult);});}}
  1. Gitee AI 实现
@ServicepublicclassGiteeAIServiceimplementsIAiService{@ResourceprivateGiteeAIClientgiteeAIClient;@OverridepublicvoidstreamCallWithMessage(StringuserMsg,Consumer<AIResult>consumer){// 调用Gitee AI流式APIgiteeAIClient.streamChat(message,messageList,data->{JSONObjectparse=JSONObject.parseObject(data);JSONArraychoices=parse.getJSONArray("choices");JSONObjectchoicesIn=choices.getJSONObject(0);StringfinishReason=choicesIn.getString("finish_reason");if(finishReason!=null&&finishReason.equals("stop")){// 结束GiteeResultgiteeResult=newGiteeResult();giteeResult.setStatus(AIMessageStatusEnum.END.getCode());consumer.accept(giteeResult);return;}// 进行中JSONObjectdelta=choicesIn.getJSONObject("delta");Stringcontent=delta.getString("content");if(content!=null&&!content.isEmpty()){GiteeResultgiteeResult=newGiteeResult();giteeResult.setContent(content);giteeResult.setStatus(AIMessageStatusEnum.WAIT.getCode());consumer.accept(giteeResult);}});}}

统一接口的优势

  1. 业务代码无需关心具体平台
  2. 易于扩展新平台
  3. 便于切换平台(通过@Prime注解)

使用示例

// 业务代码只需要调用统一接口@ResourceprivateIAiServiceaiService;// Spring会自动注入@Primary的实现aiService.streamCallWithMessage(userMsg,aiResult->{// 处理流式响应,不关心是哪个AI平台sendBroadcastAIMessage(aiResult);});

六、性能优化

  1. 独立线程池
// AI处理在独立线程池中执行,不阻塞MQ消费线程threadPoolUtil.submitTask(()->{aiService.streamCallWithMessage(userMsg,consumer);});

优势

  • 不阻塞 RocketMQ 消费线程
  • AI 处理失败不影响其他消息
  • 可控制并发数
  1. 异步处理
// 消息发送到RocketMQ,异步处理mqSendingAgent.aiHelper(messageDto);// 立即返回,不等待AI响应

优势

  • 用户发送消息后立即返回
  • AI 响应通过 WebSocket 实时推送
  • 提升响应速度

七、总结

关键点

  1. 流式响应:使用回调函数模式,实时推送每个数据块
  2. 统一接口:IAiservice统一不同 AI 平台的接口
  3. WebSocket 推送:通过STREAM_MSG_NOTIFY实时推送
  4. 异步处理:使用 RocketMQ + 独立线程池,不阻塞主流程

优化效果

指标同步流式响应提升
首字延迟10秒1-2秒5-10倍
用户体验显著提升
资源占用降低

经验总结

  1. 流式响应能显著提升性能
  2. 统一接口便于多平台集成
  3. 异步处理避免阻塞
  4. 回调函数模式适合流式场景

通过以上实现,AQChat 实现了类似 ChatGPT 的流式响应效果,提升了用户体验。

需要专业的网站建设服务?

联系我们获取免费的网站建设咨询和方案报价,让我们帮助您实现业务目标

立即咨询