测试dify是否可以支持流式http
- 先写一个fastapi 流式返回的接口
from fastapi import FastAPI
from fastapi.responses import StreamingResponseimport asyncio
import time
from typing import AsyncGenerator, Generatorapp = FastAPI(title="FastAPI 流式接口示例")# ------------------- 流式返回JSON数据(实战常用) -------------------
async def json_stream_generator(data_list: list, delay: float = 0.5):"""异步生成器:逐条返回JSON格式数据"""for item in data_list:await asyncio.sleep(delay)# 每条数据返回JSON字符串(注意:流式JSON无需整体数组,逐行返回)json_str = f'{{"index": {item["index"]}, "content": "{item["content"]}"}}\n'yield json_str.encode("utf-8")@app.get("/stream/json")#==================这个接口进行测试.
async def stream_json():"""流式返回JSON格式数据(模拟大模型分段响应)"""# 模拟大模型返回的分段数据mock_data = [{"index": 0, "content": "FastAPI"},{"index": 1, "content": " 是一个高性能的"},{"index": 2, "content": " Python Web框架"},{"index": 3, "content": " 支持异步和流式输出"}]*2return StreamingResponse(json_stream_generator(mock_data),media_type="application/json" # 媒体类型指定为JSON)if __name__ == "__main__":import uvicorn# 启动服务:默认端口8000,开启自动重载uvicorn.run(app, host="0.0.0.0", port=8000)
效果:
