Eino-Workflow 实战详解

张开发
2026/4/11 7:42:09 15 分钟阅读

分享文章

Eino-Workflow 实战详解
Eino-Workflow 实战详解概述本文详细讲解 Eino 框架中 Workflow 的概念、初始化、编排和编译过程。Workflow 是 Eino 框架中用于构建分支 AI 流程的核心组件提供了比 Chain 更灵活的编排能力代码链接。一、Eino 框架中 Workflow 的概念1.1 什么是 WorkflowWorkflow工作流是 Eino 框架中用于编排 AI 流程的重要组件。与 Chain 的线性执行不同Workflow 支持设置多个输入源和分支结构可以构建更复杂的 AI 应用流程。1.2 Workflow vs Chain vs Graph特性ChainWorkflowGraph结构严格线性分支结构DAG输入源单一起点多个入口多个入口节点连接顺序执行链式分支任意连接执行模式顺序顺序/并行拓扑排序适用场景简单流水线多阶段流程复杂AI应用灵活性低中高1.3 Workflow 的核心特点workflow:compose.NewWorkflow[map[string]any,*schema.Message]()泛型参数[map[string]any, *schema.Message]定义输入输出类型链式调用通过AddXXXNode().AddInput()实现链式编排多入口设置可设置多个AddInput来源明确的终点通过.End()标记工作流终点二、如何初始化 Workflow2.1 创建 Workflow 实例funcinitWorkflow(cfg*Config)error{ctx:context.Background()wf:compose.NewWorkflow[map[string]any,*schema.Message]()// ...}2.2 定义提示词模板systemTpl:你是一名篮球教练与比赛分析师。你需要结合用户的基本信息与训练习惯 使用 player_info API 补全用户画像然后给出位置建议、核心技能树、一周训练计划、以及一套简单战术建议。chatTpl:prompt.FromMessages(schema.FString,schema.SystemMessage(systemTpl),schema.MessagesPlaceholder(histories,true),schema.UserMessage({user_query}),)2.3 创建 ChatModelchatModel,err:openai.NewChatModel(ctx,openai.ChatModelConfig{APIKey:cfg.Model.APIKey,Model:cfg.Model.ModelName,BaseURL:cfg.Model.BaseURL,})2.4 创建工具节点playerInfoTool:utils.NewTool(schema.ToolInfo{Name:player_info,Desc:根据用户的姓名和邮箱查询用户的篮球相关信息,ParamsOneOf:schema.NewParamsOneOfByParams(map[string]*schema.ParameterInfo{name:{Type:string,Desc:用户的姓名},email:{Type:string,Desc:用户的邮箱},}),},func(ctx context.Context,input*playerInfoRequest)(*playerInfoResponse,error){returnplayerInfoResponse{Name:input.Name,Email:input.Email,Role:锋线,HeightCM:182,WeightKG:78,PlayStyle:偏投射无球空切偶尔持球突破,WeeklyHours:4,},nil},)toolsNode,err:compose.NewToolNode(ctx,compose.ToolsNodeConfig{Tools:[]tool.BaseTool{playerInfoTool},})2.5 创建 Lambda 节点Lambda 节点用于数据转换处理节点的输入输出// 把 toolsNode 输出的 []*schema.Message - 提炼成一个普通 user messagetoolToTextOps:func(ctx context.Context,input*schema.StreamReader[[]*schema.Message],)(*schema.StreamReader[*schema.Message],error){returnschema.StreamReaderWithConvert(input,func(msgs[]*schema.Message)(*schema.Message,error){iflen(msgs)0{returnnil,errors.New(no message)}typelitestruct{Contentstringjson:content,omitempty}lites:make([]lite,0,len(msgs))for_,m:rangemsgs{ifmnil{continue}litesappend(lites,lite{Content:m.Content})}b,_:json.MarshalIndent(lites,, )text:工具返回的用户信息汇总\nstring(b)returnschema.UserMessage(text),nil}),nil}lambdaToolToText:compose.TransformableLambda[[]*schema.Message,*schema.Message](toolToTextOps)三、如何编排 Workflow3.1 节点类型Workflow 支持的节点类型与 Graph 类似节点类型方法说明ChatTemplateNodeAddChatTemplateNode提示词模板节点ChatModelNodeAddChatModelNode大模型节点ToolsNodeAddToolsNode工具节点LambdaNodeAddLambdaNode自定义转换节点3.2 链式编排Workflow 的核心特点是链式调用每个节点方法返回 Workflow 自身支持连续调用wf.AddChatTemplateNode(prompt,chatTpl).AddInput(compose.START)wf.AddChatModelNode(chat,chatModel).AddInput(prompt)wf.AddToolsNode(tools,toolsNode).AddInput(chat)wf.AddLambdaNode(tool_to_text,lambdaToolToText).AddInput(tools)wf.AddLambdaNode(prompt_transform,lambdaPrompt).AddInput(tool_to_text)wf.AddChatModelNode(chat_recommend,chatModel).AddInput(prompt_transform)wf.End().AddInput(chat_recommend)3.3 关键方法说明AddChatTemplateNode(name, template)添加提示词模板节点AddChatModelNode(name, model)添加大模型节点AddToolsNode(name, tools)添加工具节点AddLambdaNode(name, lambda)添加 Lambda 转换节点AddInput(source)设置节点的输入源可以是compose.START、其他节点名称或多个节点End()标记工作流终点3.4 Workflow 编排流程图START ──────────────────────────────────────────────────────── │ │ ▼ │ ┌─────────────────┐ │ │ promptNode │ (ChatTemplate: 篮球教练系统提示词) │ └────────┬────────┘ │ │ │ ▼ │ ┌─────────────────┐ │ │ chatNode │ (ChatModel: 第一次模型调用决定是否调用工具) │ └────────┬────────┘ │ │ │ ▼ │ ┌─────────────────┐ │ │ toolsNode │ (ToolsNode: player_info 工具) │ └────────┬────────┘ │ │ │ ▼ │ ┌─────────────────┐ │ │ tool_to_text │ (Lambda: 工具结果转为用户消息) │ └────────┬────────┘ │ │ │ ▼ │ ┌─────────────────┐ │ │ prompt_transform│ (Lambda: 构造第二次模型输入) │ └────────┬────────┘ │ │ │ ▼ │ ┌──────────────────────┐ │ │ chat_recommend │ (ChatModel: 第二次模型调用生成最终推荐) │ └────────┬─────────────┘ │ │ │ ▼ │ END ◀─────────────────────────────────────────────────┘四、如何编译 Workflow4.1 编译方法funccompileWorkflow()error{ctx:context.Background()r,err:workflow.Compile(ctx)iferr!nil{returnfmt.Errorf(编译 Workflow 失败: %w,err)}runnabler log.Printf(Workflow 编译成功)returnnil}4.2 执行 Workflow编译后的 Workflow 是一个Runnable对象output,err:runnable.Invoke(ctx,map[string]any{histories:[]*schema.Message{},user_query:req.UserQuery,})4.3 响应处理resp:WorkflowResponse{Content:output.Content,ReasoningContent:output.ReasoningContent,}ifoutput.ResponseMeta!niloutput.ResponseMeta.Usage!nil{resp.PromptTokensoutput.ResponseMeta.Usage.PromptTokens resp.OutputTokensoutput.ResponseMeta.Usage.CompletionTokens resp.TotalTokensoutput.ResponseMeta.Usage.TotalTokens}五、Workflow 与 Graph 的区别5.1 编排方式不同Workflow 链式编排wf.AddChatTemplateNode(prompt,chatTpl).AddInput(compose.START)wf.AddChatModelNode(chat,chatModel).AddInput(prompt)Graph 声明式编排g.AddChatTemplateNode(prompt,chatTpl)g.AddChatModelNode(chat,chatModel)g.AddEdge(prompt,chat)// 显式添加边5.2 结构特点Workflow更适合线性或简单分支结构代码更简洁Graph更适合复杂网状结构需要显式定义节点间的边5.3 选择建议场景推荐使用简单流水线Chain多阶段分支流程Workflow复杂 DAG 结构Graph六、完整项目结构lab02/ ├── config.yml # 配置文件 ├── workflow/ │ ├── workflow_chat.go # 主程序 │ ├── workflow_chat # 编译产物 │ └── docs/ # Swagger文档 │ ├── docs.go │ ├── swagger.json │ └── swagger.yaml配置文件 config.ymlmodel:base_url:https://api.minimaxi.com/v1api_key:your-api-keymodel_name:MiniMax-M2.7app:host:0.0.0.0port:8080七、运行项目启动服务cdlab02/workflow go run workflow_chat.go-logworkflow.logAPI 调用curl-XPOST http://localhost:8080/workflow\-HContent-Type: application/json\-d{user_query: 我叫morning邮箱是lumworngmail.com帮我制定训练计划}访问 Swagger UIhttp://localhost:8080/swagger/index.html八、总结本文通过workflow_chat.go实战项目详细讲解了Workflow 概念Eino 框架中用于构建分支 AI 流程的核心组件支持链式编排初始化 Workflow创建实例、定义模板、创建工具和 Lambda 节点编排 Workflow通过链式调用AddNode().AddInput()方法串联各节点编译 Workflow调用Compile方法将 Workflow 编译为可执行对象Workflow 相比 Chain 提供了更灵活的分支编排能力同时比 Graph 的语法更简洁适用于多阶段、多分支的 AI 应用场景。

更多文章