【大数据 & AI】Flink Agents 源码解读 --- (4) --- AgentPlan
- 【大数据 & AI】Flink Agents 源码解读 --- (4) --- AgentPlan
- 0x00 概要
- 0x01 基本概念
- 1.1 定义
- 1.2 在系统中的位置和作用
- 1.3 流程
- 0x02 具体功能
- 2.1 编译
- 2.2 提供数据
- 0x03 与 Flink 原生的关系
- 3.1 核心对应点(AgentPlan ↔ JobGraph)
- 3.2 非对应点
- 3.3 总结
- 0x04 与智能 Agent “规划(Planning)” 的核心相似性
- 4.1 核心目标
- 4.2 核心特征
- 4.2.1 Agent规划
- 4.2.2 AgentPlan
- 0xFF 参考
0x00 概要
AgentPlan 是 Flink Agent 框架中的一个核心组件,它起到连接用户定义的 Agent 和实际执行环境之间的桥梁作用。AgentPlan 的核心职责如下:
- 解析用户定义的 Agent 中的动作(Action)、事件监听规则、资源提供者(ResourceProvider);
- 提供 AgentPlan 运行时的核心接口(获取事件对应的动作、获取资源、读取动作配置等);
- 统一管理 Agent 依赖的各类资源(如模型、工具、Prompt、MCP 服务等)的创建与复用。
0x01 基本概念
AgentPlan 本质上是将用户友好的Agent定义转换为运行时可执行的结构化表示,使得执行引擎可以高效地处理Agent逻辑。
1.1 定义
定义如下:
class AgentPlan(BaseModel):"""从用户自定义 Agent 编译得到的智能体执行计划核心作用:封装 Agent 运行所需的动作、事件映射、资源、配置等核心信息"""# 动作名称到动作对象的映射actions: Dict[str, Action]# 事件类型(字符串格式)到监听该事件的动作名称列表的映射actions_by_event: Dict[str, List[str]]# 资源提供者映射:第一层是资源类型,第二层是资源名称,值为对应资源提供者resource_providers: Dict[ResourceType, Dict[str, ResourceProvider]] | None = None# Agent 的全局配置config: AgentConfiguration | None = None# 私有缓存:已创建的资源实例(避免重复初始化),键为 (资源类型, 资源名称)__resources: Dict[ResourceType, Dict[str, Resource]] = {}
1.2 在系统中的位置和作用
AgentPlan 在系统中的位置和作用如下:
用户定义的Agent↓↓ (AgentPlan.from_agent) AgentPlan(编译后的执行计划) ↓ ↓(LocalRunner 使用)
LocalRunner(实际执行)
AgentPlan 与 Flink 集群的关系如下:
- 部署阶段:AgentPlan 被序列化并通过 CompileUtils 传递给运行时操作符(如 ActionExecutionOperator)。
- 运行时行为:操作符使用 AgentPlan 决定如何处理事件和执行动作,但不会修改 AgentPlan 本身。
- 资源配置:实际资源(如模型连接、工具等)在运行时通过 ResourceProvider 提供,这不改变 AgentPlan 的结构。
并行度调整的影响如下:
- 状态恢复:故障恢复或手动调整并行度时,Flink 会重新分配键控状态,AgentPlan 结构保持一致。
- 检查点兼容性:若需更改 AgentPlan(如添加新动作),必须考虑检查点兼容性和状态演化策略。
1.3 流程
AgentPlan 的流程如下:
- 编译:将用户定义的Agent 转换为可执行的计划
- 映射管理:维护事件类型和动作之间的映射关系
- 资源提供:管理和提供各种资源(模型、工具、提示等)
- 配置存储:存储和提供动作的配置参数
- 解耦:将用户接口和执行实现解耦,提供统一的执行计划接口
0x02 具体功能
2.1 编译
from_agent 将用户定义的高级 Agent 对象转换为可执行的计划 AgentPlan。流程如下:

代码如下:
@staticmethoddef from_agent(agent: Agent, config: AgentConfiguration) -> "AgentPlan":"""核心工厂方法:将用户自定义的 Agent 编译为 AgentPlan:param agent: 用户定义的 Agent 实例:param config: Agent 的全局配置:return: 编译后的 AgentPlan 实例"""# 1. 收集 Agent 中的所有动作(自定义动作 + 内置动作)actions = {}actions_by_event = {}# 合并用户自定义动作和框架内置动作for action in _get_actions(agent) + BUILT_IN_ACTIONS:# 校验动作名称唯一性,避免重复assert action.name not in actions, f"Duplicate action name: {action.name}"actions[action.name] = action# 构建事件类型到动作名称的映射for event_type in action.listen_event_types:if event_type not in actions_by_event:actions_by_event[event_type] = []actions_by_event[event_type].append(action.name)# 2. 收集 Agent 中的所有资源提供者resource_providers = {}for provider in _get_resource_providers(agent):type = provider.typeif type not in resource_providers:resource_providers[type] = {}name = provider.name# 校验资源名称在同类型下的唯一性assert name not in resource_providers[type], f"Duplicate resource name: {name}"resource_providers[type][name] = provider# 3. 创建并返回 AgentPlan 实例return AgentPlan(actions=actions,actions_by_event=actions_by_event,resource_providers=resource_providers,config=config,)
2.2 提供数据
AgentPlan 维护了事件类型到动作的映射关系,用于驱动执行流程。
# 存储结构actions_by_event: Dict[str, List[str]] # 事件类型 --> 动作名称列表# 查询方法 def get_actions(self, event_type: str) -> List[Action]:"""获取监听指定事件类型的所有动作:param event_type: 事件类型(字符串格式):return: 动作实例列表"""return [self.actions[name] for name in self.actions_by_event[event_type]]
AgentPlan 管理所有资源的提供者,并在需要时实例化资源。具体代码如下:
def get_resource(self, name: str, type: ResourceType) -> Resource:"""获取指定类型+名称的资源实例(懒加载 + 缓存复用):param name: 资源名称:param type: 资源类型:return: 资源实例"""# 初始化该类型的资源缓存if type not in self.__resources:self.__resources[type] = {}# 若资源未创建,则通过资源提供者创建并缓存if name not in self.__resources[type]:resource_provider = self.resource_providers[type][name]# 调用资源提供者的 provide 方法创建资源(支持递归获取依赖资源)resource = resource_provider.provide(get_resource=self.get_resource, config=self.config)self.__resources[type][name] = resource# 返回缓存的资源实例return self.__resources[type][name]
AgentPlan 存储和提供动作的配置信息。
def get_action_config(self, action_name: str) -> Dict[str, Any]:"""获取指定动作的配置:param action_name: 动作名称:return: 动作配置字典"""return self.actions[action_name].configdef get_action_config_value(self, action_name: str, key: str) -> Any:"""获取指定动作配置中的某个键值:param action_name: 动作名称:param key: 配置键名:return: 配置值(不存在则返回 None)"""return self.actions[action_name].config.get(key, None)
0x03 与 Flink 原生的关系
Flink 原生的 “Plan 体系”(StreamGraph → JobGraph → ExecutionGraph)是 AgentPlan 的底层支撑,其中 AgentPlan 最直接对应原生 Flink 的 JobGraph,而非笼统的 “Flink Plan”。
这是因为,Flink 中 “Plan” 不是单一组件,而是从 “逻辑定义” 到 “物理执行” 的三层编译产物,这是理解对应关系的基础。AgentPlan 是 Flink Agents 对 Agent 逻辑编译后的 “可执行计划”,其核心定位和原生 Flink 的 JobGraph 完全对齐,而非笼统的 “Plan”。
| 原生 Plan 层级 | 核心作用 | 关键特征 |
|---|---|---|
| StreamGraph | 逻辑拓扑(用户视角) | 由 DataStream API 代码生成,描述 “数据处理的逻辑步骤”,不涉及并行度、资源分配 |
| JobGraph | 可执行计划(集群视角) | StreamGraph 编译后的产物,补充并行度、算子链、中间结果传递规则,是 “系统能识别的执行计划” |
| ExecutionGraph | 物理执行图(运行时视角) | JobGraph 提交到集群后生成,绑定 TaskManager、Slot、物理资源,是 “正在运行的拓扑” |
3.1 核心对应点(AgentPlan ↔ JobGraph)
我们来看看AgentPlan和JobGraph之间的关联。
-
角色一致:都是 “编译层产物”,连接 “高层业务定义” 和 “底层物理执行”:
- 原生:StreamGraph(用户写的 DataStream 逻辑)→ JobGraph(集群可执行计划);
- Flink Agents:Agent(用户定义的 Agent 行为逻辑)→ AgentPlan(系统可调度的动作执行计划)。
-
内容一致:都包含 “执行规则” 而非 “纯逻辑”:
- JobGraph 包含算子并行度、输入输出流、状态存储规则;
- AgentPlan 包含动作触发规则、资源映射关系、Action 与 Event 的绑定规则。
-
作用一致:都是 “中间层”,不直接运行,而是给底层执行组件(Operator/TaskManager)提供执行依据。
3.2 非对应点
我们再来看看AgentPlan 不对应哪些。
- AgentPlan ≠ StreamGraph:StreamGraph 是 “用户逻辑的直接映射”,无编译优化;AgentPlan 是 Agent 逻辑的 “编译优化后产物”,已包含系统可执行的规则(如动作调度优先级)。
- AgentPlan ≠ ExecutionGraph:ExecutionGraph 是 “运行时物理拓扑”,绑定具体资源;AgentPlan 是 “预执行计划”,不涉及物理资源分配(由 Flink 集群自动处理)。
3.3 总结
- Flink 原生的 “Plan” 是三层体系(StreamGraph/JobGraph/ExecutionGraph),AgentPlan 并非对应 “所有 Plan”,而是精准对应 JobGraph;
- 核心逻辑:AgentPlan 是 Flink Agents 框架对 Agent 高层逻辑的 “编译产物”,和 JobGraph 一样承担 “承上启下” 的作用 —— 把用户的业务语义转换为系统可执行的规则;
- 边界:AgentPlan 是 “逻辑可执行计划”,不涉及物理资源分配(这部分仍由原生 Flink 的 ExecutionGraph 处理);
0x04 与智能 Agent “规划(Planning)” 的核心相似性
我们再来看看 Flink Agents 的 AgentPlan 与智能 Agent “规划” 的核心相似性
Flink Agents 中的 AgentPlan 是面向流式计算场景的任务执行规划,而智能 Agent 领域的 “规划(Planning)” 是 Agent 为达成目标制定行动序列的核心能力,二者虽应用场景(流式计算 vs 通用智能)不同,但底层逻辑、核心特征高度契合,具体相似性可从以下维度拆解:
4.1 核心目标
核心目标都是:为 “达成既定目标” 制定可执行路径。
- 智能 Agent 的规划:核心是基于自身目标(如 “导航到目的地”“完成订单处理”)、环境状态(如 “当前位置”“订单队列长度”)和可用动作(如 “左转”“分配订单”),生成一套有序的行动序列,本质是 “从目标反推可落地的步骤”,确保 Agent 能通过执行该序列完成任务。
- Flink Agents 的 AgentPlan:核心是基于流式计算任务目标(如 “实时统计订单量”“清洗异常日志”)、Flink 集群的资源状态(如 “算子并行度”“节点负载”)和可用计算动作(如 “数据分片”“窗口聚合”“状态存储”),生成一套结构化的流式任务执行方案,本质是 “将抽象的计算目标转化为 Flink 可执行的算子调度、资源分配、数据流转步骤”。
相似性:二者均以 “目标导向” 为核心,规划的最终目的是将抽象目标拆解为可落地、可验证的执行路径,而非单纯的 “任务罗列”。
4.2 核心特征
4.2.1 Agent规划
Agent规划是动态逻辑。
智能 Agent 的规划的执行逻辑为:遵循 “规划生成→执行动作→感知环境反馈→调整规划” 的闭环(如机器人 “规划抓取动作→执行抓取→感知未抓稳→重新规划抓取角度”)。
智能 Agent 的规划不是 “一次性规划”,而是与执行过程形成闭环,通过反馈持续优化规划,确保目标最终达成。
4.2.2 AgentPlan
AgentPlan 是静态执行蓝图,在作业部署时确定,运行期间不因 Flink 集群配置变化而动态调整。任何代理逻辑更改都需重新编译并部署新的 AgentPlan。
AgentPlan 在编译时生成:AgentPlan 在作业提交前从用户定义的 Agent 类编译而来,是静态计划。它包含所有动作(actions)、资源配置(resource providers)以及事件监听关系(actions_by_event)。
静态结构:一旦创建,AgentPlan 的结构在整个作业生命周期内保持不变。