白沙黎族自治县网站建设_网站建设公司_关键词排名_seo优化
2026/1/16 0:20:58 网站建设 项目流程

MAF Workflow 核心概念详解

📚 本课概览

Microsoft Agent Framework (MAF)提供了一套强大的Workflow(工作流)框架,用于编排和协调多个智能体(Agent)或处理组件的执行流程。

本课将以通俗易懂的方式,帮助你理解 MAF Workflow 的核心概念:

概念

说明

类比

Workflow

工作流定义

📋 流程图

Executor

执行器/处理节点

🔧 工人

Edge

连接边

➡️ 传送带

SuperStep

超步/批量处理

⏱️ 处理周期

WorkflowContext

工作流上下文

📦 工作台

WorkflowEvent

工作流事件

📣 通知

Run

运行实例

🏃 一次执行

Checkpoint

检查点

💾 存档点

让我们逐一深入了解这些概念!

🎯 为什么需要 Workflow?

在构建 AI 智能体应用时,我们经常需要:

  1. 多步骤处理:用户请求需要经过多个处理环节

  2. 多智能体协作:不同的 Agent 各司其职,协同完成任务

  3. 条件分支:根据处理结果决定下一步走向

  4. 并行处理:某些任务可以同时进行以提高效率

  5. 状态管理:需要在处理过程中保存和恢复状态

传统方式的问题:

// ❌ 硬编码的流程,难以维护和扩展 var result1 = await agent1.ProcessAsync(input); if (result1.Success) { var result2 = await agent2.ProcessAsync(result1.Data); // ... 越来越复杂 }

使用 Workflow 的优势:

// ✅ 声明式定义,清晰可维护 var workflow = new WorkflowBuilder(startExecutor) .AddEdge(executor1, executor2) .AddEdge(executor2, executor3, condition: x => x.Success) .Build();

一、核心概念:Executor(执行器)

🔧 什么是 Executor?

Executor(执行器)是 Workflow 中的最小工作单元,类似于:

类比

说明

🧑‍🏭 工厂里的工人

每个工人负责一道工序

🧱 乐高积木块

每个积木有特定功能,组合成整体

🔌 电路中的元件

接收输入信号,输出处理结果

📌 Executor 的核心特征

  1. 唯一标识(Id):每个 Executor 有一个唯一的 ID,用于在 Workflow 中引用

  2. 消息处理:接收特定类型的输入消息,处理后产生输出消息

  3. 路由配置:通过ConfigureRoutes方法定义能处理哪些类型的消息

  4. 状态感知:可以通过IWorkflowContext访问和修改工作流状态

🏗️ Executor 的类型层次

MAF 提供了多种 Executor 类型,满足不同场景需求:

类型

用途

示例场景

Executor<TInput>

处理消息,无返回值

日志记录、通知发送

Executor<TInput, TOutput>

处理消息,有返回值

数据转换、AI 调用

FunctionExecutor<T>

用委托函数快速创建

简单处理逻辑

StatefulExecutor<TState>

需要维护状态的执行器

会话管理、计数器

💡 Executor 源码解析

从源码中可以看到 Executor 的核心设计:

// 来自 Executor.cs publicabstractclassExecutor : IIdentified { // 唯一标识符 publicstring Id { get; } // 配置消息路由(子类必须实现) protected abstract RouteBuilder ConfigureRoutes(RouteBuilder routeBuilder); // 执行消息处理 publicasync ValueTask<object?> ExecuteAsync( object message, TypeId messageType, IWorkflowContext context, CancellationToken cancellationToken = default) { // 记录调用事件 await context.AddEventAsync(new ExecutorInvokedEvent(this.Id, message)); // 路由消息到正确的处理器 CallResult? result = awaitthis.Router.RouteMessageAsync(message, context, ...); // 记录完成或失败事件 ExecutorEvent executionResult = result?.IsSuccess is not false ? new ExecutorCompletedEvent(this.Id, result?.Result) : new ExecutorFailedEvent(this.Id, result.Exception); await context.AddEventAsync(executionResult); return result.Result; } }

关键点

  • ✅ 每个 Executor 有唯一 ID

  • ✅ 通过ConfigureRoutes声明能处理的消息类型

  • ✅ 执行过程会产生事件(ExecutorInvokedEventExecutorCompletedEvent等)


二、核心概念:Edge(边)

➡️ 什么是 Edge?

Edge(边)是连接两个 Executor 的消息通道,类似于:

类比

说明

🏭 工厂传送带

把上一道工序的产品传送到下一道工序

🔗 水管

把水从一个容器引导到另一个容器

📨 邮路

把信件从发件人送到收件人

📌 Edge 的三种类型

MAF 支持三种类型的 Edge,适用于不同的流程模式:

类型

说明

使用场景

Direct(直连)

一对一连接

顺序处理流程

FanOut(扇出)

一对多连接

并行分发任务

FanIn(扇入)

多对一连接

汇聚多个结果

💡 Edge 源码解析

从源码可以看到 Edge 的核心结构:

// 来自 Edge.cs publicenum EdgeKind { Direct, // 直连:一对一 FanOut, // 扇出:一对多 FanIn // 扇入:多对一 } publicsealedclassEdge { public EdgeKind Kind { get; init; } // 边的类型 public EdgeData Data { get; init; } // 边的具体数据 } // 来自 DirectEdgeData.cs - 直连边的数据 publicsealedclassDirectEdgeData : EdgeData { publicstring SourceId { get; } // 源 Executor ID publicstring SinkId { get; } // 目标 Executor ID public Func<object?, bool>? Condition; // 可选的条件判断 }

Direct Edge 示意图

关键点

  • ✅ Edge 定义了消息从哪里来、到哪里去

  • ✅ Direct Edge 支持条件路由(只有满足条件的消息才传递)

  • ✅ FanOut Edge 可以实现广播分区逻辑


三、核心概念:Workflow(工作流)

📋 什么是 Workflow?

Workflow(工作流)是将多个 Executor 通过 Edge 连接起来的完整流程定义,类似于:

类比

说明

📋 流程图

定义了从开始到结束的完整流程

🏭 生产线

多个工位通过传送带连接成完整生产线

🎼 乐谱

规定了演奏的顺序和节奏

📌 Workflow 的核心属性

从源码中可以看到 Workflow 的核心结构:

// 来自 Workflow.cs publicclassWorkflow { // 起始 Executor 的 ID publicstring StartExecutorId { get; } // 工作流名称(可选) publicstring? Name { get; internal init; } // 工作流描述(可选) publicstring? Description { get; internal init; } // Executor 绑定字典 internal Dictionary<string, ExecutorBinding> ExecutorBindings { get; init; } // 边的集合(按源节点分组) internal Dictionary<string, HashSet<Edge>> Edges { get; init; } // 输出 Executor 集合 internal HashSet<string> OutputExecutors { get; init; } }

🔨 使用 WorkflowBuilder 构建工作流

MAF 采用建造者模式(Builder Pattern)来构建 Workflow,这使得工作流的定义更加直观:

// 创建工作流示例 var workflow = new WorkflowBuilder(startExecutor) // 指定起点 .WithName("订单处理工作流") // 设置名称 .WithDescription("处理电商订单的完整流程") // 设置描述 .AddEdge(receiveOrder, validateOrder) // 添加边 .AddEdge(validateOrder, processPayment, condition: x => x.IsValid) // 条件边 .AddEdge(processPayment, sendNotification) .WithOutputFrom(sendNotification) // 指定输出节点 .Build(); // 构建工作流

关键方法

方法

说明

AddEdge(source, sink)

添加直连边

AddEdge(..., condition)

添加条件边

AddFanOut(source, sinks)

添加扇出边

AddFanIn(sources, sink)

添加扇入边

WithOutputFrom(executor)

指定输出节点

BindExecutor(executor)

绑定占位符执行器

Build()

构建最终的 Workflow


四、核心概念:SuperStep(超步)

⏱️ 什么是 SuperStep?

SuperStep(超步)是 Workflow 执行的基本处理周期。可以类比为:

类比

说明

🎮 游戏中的"回合"

每个回合内所有玩家同时行动

⏰ 工厂的"班次"

每个班次内完成一批任务

🌊 海浪的"一波"

一波消息被处理,然后产生下一波

📌 SuperStep 的执行流程

每个 SuperStep 内部执行的步骤:

关键事件

  • SuperStepStartedEvent:超步开始

  • SuperStepCompletedEvent:超步完成

// SuperStep 事件定义 public class SuperStepEvent(int stepNumber, object? data = null) : WorkflowEvent(data) { // 超步的序号(从 0 开始) public int StepNumber => stepNumber; }

五、核心概念:WorkflowContext(工作流上下文)

📦 什么是 WorkflowContext?

WorkflowContext(工作流上下文)是 Executor 执行时的运行环境,类似于:

类比

说明

🛠️ 工人的工作台

提供工具、材料和通信渠道

📞 通信枢纽

允许各个工位之间传递信息

💾 共享内存

存储和读取状态数据

📌 IWorkflowContext 核心接口

// 来自 IWorkflowContext.cs publicinterfaceIWorkflowContext { // 添加工作流事件(在当前 SuperStep 结束时触发) ValueTask AddEventAsync(WorkflowEvent workflowEvent, CancellationToken cancellationToken = default); // 发送消息给下游 Executor(在下一个 SuperStep 处理) ValueTask SendMessageAsync(object message, string? targetId, CancellationToken cancellationToken = default); // 输出工作流结果 ValueTask YieldOutputAsync(object output, CancellationToken cancellationToken = default); // 请求在当前 SuperStep 结束时停止工作流 ValueTask RequestHaltAsync(); // 读取状态 ValueTask<T?> ReadStateAsync<T>(string key, string? scopeName = null, CancellationToken cancellationToken = default); // 读取或初始化状态 ValueTask<T> ReadOrInitStateAsync<T>(string key, Func<T> initialStateFactory, string? scopeName = null, CancellationToken cancellationToken = default); // 更新状态(排队更新,在 SuperStep 结束时应用) ValueTask QueueStateUpdateAsync<T>(string key, T value, string? scopeName = null, CancellationToken cancellationToken = default); }

关键点

  • 消息传递:通过SendMessageAsync在 Executor 之间传递消息

  • 状态管理:支持读取、初始化和更新状态

  • 事件通知:通过AddEventAsync发出事件

  • 流程控制:通过RequestHaltAsync停止工作流


六、核心概念:WorkflowEvent(工作流事件)

📣 什么是 WorkflowEvent?

WorkflowEvent(工作流事件)是工作流执行过程中产生的通知消息,类似于:

类比

说明

📢 广播通知

向所有人广播系统状态变化

📝 日志记录

记录系统执行过程中的关键节点

🔔 事件订阅

允许外部监听并响应特定事件

📌 事件分类

事件层级

事件类型

说明

工作流级别WorkflowStartedEvent

工作流开始执行

WorkflowOutputEvent

工作流产生输出

WorkflowErrorEvent

工作流发生错误

WorkflowWarningEvent

工作流产生警告

超步级别SuperStepStartedEvent

超步开始

SuperStepCompletedEvent

超步完成

执行器级别ExecutorInvokedEvent

Executor 被调用

ExecutorCompletedEvent

Executor 完成处理

ExecutorFailedEvent

Executor 处理失败


七、核心概念:Run(运行实例)

🏃 什么是 Run?

Run(运行实例)是 Workflow 的一次具体执行,类似于:

类比

说明

🎬 电影的一场放映

同一部电影可以放映多场

🏭 生产线的一个批次

同一条生产线可以生产多个批次

🎮 游戏的一局

同一个游戏可以玩多局

📌 Run 的核心特性

// 来自 Run.cs publicsealedclassRun : IAsyncDisposable { // 运行实例的唯一标识符 publicstring RunId => this._runHandle.RunId; // 获取当前执行状态 public ValueTask<RunStatus> GetStatusAsync(CancellationToken cancellationToken = default); // 获取所有产生的事件 public IEnumerable<WorkflowEvent> OutgoingEvents => this._eventSink; // 获取自上次访问后的新事件 public IEnumerable<WorkflowEvent> NewEvents { get; } // 恢复执行(带外部响应) public async ValueTask<bool> ResumeAsync(IEnumerable<ExternalResponse> responses, CancellationToken cancellationToken = default); }

📌 RunStatus(运行状态)

public enum RunStatus { NotStarted, // 尚未开始 Idle, // 空闲(已暂停,无待处理请求) PendingRequests, // 等待外部响应 Ended, // 已结束 Running // 正在运行 }

八、核心概念:Checkpoint(检查点)

💾 什么是 Checkpoint?

Checkpoint(检查点)是工作流在某个时刻的完整状态快照,类似于:

类比

说明

🎮 游戏存档

保存游戏进度,随时可以读档继续

📸 照片

记录某一时刻的完整状态

🔖 书签

标记阅读进度,下次从这里继续

📌 Checkpoint 的核心信息

// 来自 CheckpointInfo.cs publicsealedclassCheckpointInfo { // 运行实例的唯一标识符 publicstring RunId { get; } // 检查点的唯一标识符 publicstring CheckpointId { get; } } // 检查点的完整数据(来自 Checkpoint.cs) internalsealedclassCheckpoint { publicint StepNumber { get; } // 超步编号 public WorkflowInfo Workflow { get; } // 工作流信息 public RunnerStateData RunnerData { get; } // 运行器状态 public Dictionary<ScopeKey, PortableValue> StateData { get; } // 状态数据 public Dictionary<EdgeId, PortableValue> EdgeStateData { get; } // 边状态数据 public CheckpointInfo? Parent { get; } // 父检查点 }

📌 Checkpoint 的使用场景

场景

说明

故障恢复

系统崩溃后从最近的检查点恢复

长时间运行

分段执行,每段结束保存进度

人机交互

等待用户输入时保存状态

调试回放

从任意检查点重新执行

版本分支

从同一个检查点创建多个分支执行


九、核心概念关系图

🔗 概念之间的关系

让我们把所有核心概念联系起来,看看它们是如何协作的:

📌 生命周期视角

从工作流的完整生命周期来看:

📌 消息流视角

从消息在工作流中的流动来看:

关键理解

  1. 消息驱动:Executor 之间通过消息传递数据

  2. 异步批处理:同一 SuperStep 内的 Executor 可以并行执行

  3. 边控制流向:Edge 决定消息从哪里到哪里

  4. 状态隔离:每个 SuperStep 结束时应用状态更新


十、实际应用示例

🛒 场景:电商订单处理工作流

让我们通过一个实际场景来理解这些概念的应用:

📌 概念对应关系

概念

在此场景中的体现

Workflow

整个订单处理流程

Executor

每个处理步骤(接收、验证、支付等)

Edge

步骤之间的连接(含条件判断)

SuperStep

每一轮处理(如:超步1处理接收,超步2处理验证...)

WorkflowContext

提供订单状态读写、消息发送能力

WorkflowEvent

每个步骤的开始/完成/失败事件

Run

一个具体订单的处理过程

Checkpoint

处理中途保存的状态(如支付完成后保存)

🤖 场景:多智能体协作工作流

在 AI Agent 场景中,Workflow 可以用来编排多个 Agent 的协作:

Workflow 的优势

优势

说明

🔄灵活编排

可以轻松调整 Agent 之间的协作关系

💾状态管理

自动管理各 Agent 的状态和上下文

⏸️可中断/恢复

支持人机交互,随时暂停和恢复

🔍可观测性

通过事件追踪整个执行过程

🛡️容错能力

通过检查点支持故障恢复


十一、概念总结

📚 核心概念速查表

概念

定义

关键类

核心职责

Executor

执行器/处理节点

Executor

,Executor<T>,FunctionExecutor<T>

处理消息,产生输出

Edge

连接边

Edge

,EdgeData,DirectEdgeData

定义消息流向和条件

Workflow

工作流定义

Workflow

,WorkflowBuilder

组织 Executor 和 Edge

SuperStep

超步/批量处理周期

SuperStepEvent

,SuperStepStartedEvent

批量处理消息

WorkflowContext

工作流上下文

IWorkflowContext

提供运行时服务

WorkflowEvent

工作流事件

WorkflowEvent

,ExecutorEvent

通知执行状态

Run

运行实例

Run

,RunStatus

管理一次执行

Checkpoint

检查点

CheckpointInfo

,ICheckpointStore

保存和恢复状态

📌 记忆口诀

"工作流程边连接,执行器来做处理"
"超步批量往前走,上下文中读写态"
"事件通知全过程,运行实例跑一次"
"检查点来存进度,随时恢复不怕挂"

需要专业的网站建设服务?

联系我们获取免费的网站建设咨询和方案报价,让我们帮助您实现业务目标

立即咨询