阿拉尔市网站建设_网站建设公司_Tailwind CSS_seo优化
2026/1/5 20:56:49 网站建设 项目流程

【大数据 & AI】Flink Agents 源码解读 --- (4) --- AgentPlan

目录
  • 【大数据 & AI】Flink Agents 源码解读 --- (4) --- AgentPlan
    • 0x00 概要
    • 0x01 基本概念
      • 1.1 定义
      • 1.2 在系统中的位置和作用
      • 1.3 流程
    • 0x02 具体功能
      • 2.1 编译
      • 2.2 提供数据
    • 0x03 与 Flink 原生的关系
      • 3.1 核心对应点(AgentPlan ↔ JobGraph)
      • 3.2 非对应点
      • 3.3 总结
    • 0x04 与智能 Agent “规划(Planning)” 的核心相似性
      • 4.1 核心目标
      • 4.2 核心特征
        • 4.2.1 Agent规划
        • 4.2.2 AgentPlan
    • 0xFF 参考

0x00 概要

AgentPlan 是 Flink Agent 框架中的一个核心组件,它起到连接用户定义的 Agent 和实际执行环境之间的桥梁作用。AgentPlan 的核心职责如下:

  • 解析用户定义的 Agent 中的动作(Action)、事件监听规则、资源提供者(ResourceProvider);
  • 提供 AgentPlan 运行时的核心接口(获取事件对应的动作、获取资源、读取动作配置等);
  • 统一管理 Agent 依赖的各类资源(如模型、工具、Prompt、MCP 服务等)的创建与复用。

0x01 基本概念

AgentPlan 本质上是将用户友好的Agent定义转换为运行时可执行的结构化表示,使得执行引擎可以高效地处理Agent逻辑。

1.1 定义

定义如下:

class AgentPlan(BaseModel):"""从用户自定义 Agent 编译得到的智能体执行计划核心作用:封装 Agent 运行所需的动作、事件映射、资源、配置等核心信息"""# 动作名称到动作对象的映射actions: Dict[str, Action]# 事件类型(字符串格式)到监听该事件的动作名称列表的映射actions_by_event: Dict[str, List[str]]# 资源提供者映射:第一层是资源类型,第二层是资源名称,值为对应资源提供者resource_providers: Dict[ResourceType, Dict[str, ResourceProvider]] | None = None# Agent 的全局配置config: AgentConfiguration | None = None# 私有缓存:已创建的资源实例(避免重复初始化),键为 (资源类型, 资源名称)__resources: Dict[ResourceType, Dict[str, Resource]] = {}        

1.2 在系统中的位置和作用

AgentPlan 在系统中的位置和作用如下:

用户定义的Agent↓↓ (AgentPlan.from_agent)   AgentPlan(编译后的执行计划)   ↓    ↓(LocalRunner 使用)
LocalRunner(实际执行)    

AgentPlan 与 Flink 集群的关系如下:

  • 部署阶段:AgentPlan 被序列化并通过 CompileUtils 传递给运行时操作符(如 ActionExecutionOperator)。
  • 运行时行为:操作符使用 AgentPlan 决定如何处理事件和执行动作,但不会修改 AgentPlan 本身。
  • 资源配置:实际资源(如模型连接、工具等)在运行时通过 ResourceProvider 提供,这不改变 AgentPlan 的结构。

并行度调整的影响如下:

  • 状态恢复:故障恢复或手动调整并行度时,Flink 会重新分配键控状态,AgentPlan 结构保持一致。
  • 检查点兼容性:若需更改 AgentPlan(如添加新动作),必须考虑检查点兼容性和状态演化策略。

1.3 流程

AgentPlan 的流程如下:

  • 编译:将用户定义的Agent 转换为可执行的计划
  • 映射管理:维护事件类型和动作之间的映射关系
  • 资源提供:管理和提供各种资源(模型、工具、提示等)
  • 配置存储:存储和提供动作的配置参数
  • 解耦:将用户接口和执行实现解耦,提供统一的执行计划接口

0x02 具体功能

2.1 编译

from_agent 将用户定义的高级 Agent 对象转换为可执行的计划 AgentPlan。流程如下:

AgentPlan_compile

代码如下:

    @staticmethoddef from_agent(agent: Agent, config: AgentConfiguration) -> "AgentPlan":"""核心工厂方法:将用户自定义的 Agent 编译为 AgentPlan:param agent: 用户定义的 Agent 实例:param config: Agent 的全局配置:return: 编译后的 AgentPlan 实例"""# 1. 收集 Agent 中的所有动作(自定义动作 + 内置动作)actions = {}actions_by_event = {}# 合并用户自定义动作和框架内置动作for action in _get_actions(agent) + BUILT_IN_ACTIONS:# 校验动作名称唯一性,避免重复assert action.name not in actions, f"Duplicate action name: {action.name}"actions[action.name] = action# 构建事件类型到动作名称的映射for event_type in action.listen_event_types:if event_type not in actions_by_event:actions_by_event[event_type] = []actions_by_event[event_type].append(action.name)# 2. 收集 Agent 中的所有资源提供者resource_providers = {}for provider in _get_resource_providers(agent):type = provider.typeif type not in resource_providers:resource_providers[type] = {}name = provider.name# 校验资源名称在同类型下的唯一性assert name not in resource_providers[type], f"Duplicate resource name: {name}"resource_providers[type][name] = provider# 3. 创建并返回 AgentPlan 实例return AgentPlan(actions=actions,actions_by_event=actions_by_event,resource_providers=resource_providers,config=config,)

2.2 提供数据

AgentPlan 维护了事件类型到动作的映射关系,用于驱动执行流程。

    # 存储结构actions_by_event: Dict[str, List[str]] # 事件类型 --> 动作名称列表# 查询方法    def get_actions(self, event_type: str) -> List[Action]:"""获取监听指定事件类型的所有动作:param event_type: 事件类型(字符串格式):return: 动作实例列表"""return [self.actions[name] for name in self.actions_by_event[event_type]]    

AgentPlan 管理所有资源的提供者,并在需要时实例化资源。具体代码如下:

    def get_resource(self, name: str, type: ResourceType) -> Resource:"""获取指定类型+名称的资源实例(懒加载 + 缓存复用):param name: 资源名称:param type: 资源类型:return: 资源实例"""# 初始化该类型的资源缓存if type not in self.__resources:self.__resources[type] = {}# 若资源未创建,则通过资源提供者创建并缓存if name not in self.__resources[type]:resource_provider = self.resource_providers[type][name]# 调用资源提供者的 provide 方法创建资源(支持递归获取依赖资源)resource = resource_provider.provide(get_resource=self.get_resource, config=self.config)self.__resources[type][name] = resource# 返回缓存的资源实例return self.__resources[type][name]

AgentPlan 存储和提供动作的配置信息。

    def get_action_config(self, action_name: str) -> Dict[str, Any]:"""获取指定动作的配置:param action_name: 动作名称:return: 动作配置字典"""return self.actions[action_name].configdef get_action_config_value(self, action_name: str, key: str) -> Any:"""获取指定动作配置中的某个键值:param action_name: 动作名称:param key: 配置键名:return: 配置值(不存在则返回 None)"""return self.actions[action_name].config.get(key, None)

Flink 原生的 “Plan 体系”(StreamGraph → JobGraph → ExecutionGraph)是 AgentPlan 的底层支撑,其中 AgentPlan 最直接对应原生 Flink 的 JobGraph,而非笼统的 “Flink Plan”。

这是因为,Flink 中 “Plan” 不是单一组件,而是从 “逻辑定义” 到 “物理执行” 的三层编译产物,这是理解对应关系的基础。AgentPlan 是 Flink Agents 对 Agent 逻辑编译后的 “可执行计划”,其核心定位和原生 Flink 的 JobGraph 完全对齐,而非笼统的 “Plan”。

原生 Plan 层级 核心作用 关键特征
StreamGraph 逻辑拓扑(用户视角) 由 DataStream API 代码生成,描述 “数据处理的逻辑步骤”,不涉及并行度、资源分配
JobGraph 可执行计划(集群视角) StreamGraph 编译后的产物,补充并行度、算子链、中间结果传递规则,是 “系统能识别的执行计划”
ExecutionGraph 物理执行图(运行时视角) JobGraph 提交到集群后生成,绑定 TaskManager、Slot、物理资源,是 “正在运行的拓扑”

3.1 核心对应点(AgentPlan ↔ JobGraph)

我们来看看AgentPlan和JobGraph之间的关联。

  • 角色一致:都是 “编译层产物”,连接 “高层业务定义” 和 “底层物理执行”:

    • 原生:StreamGraph(用户写的 DataStream 逻辑)→ JobGraph(集群可执行计划);
    • Flink Agents:Agent(用户定义的 Agent 行为逻辑)→ AgentPlan(系统可调度的动作执行计划)。
  • 内容一致:都包含 “执行规则” 而非 “纯逻辑”:

    • JobGraph 包含算子并行度、输入输出流、状态存储规则;
    • AgentPlan 包含动作触发规则、资源映射关系、Action 与 Event 的绑定规则。
  • 作用一致:都是 “中间层”,不直接运行,而是给底层执行组件(Operator/TaskManager)提供执行依据。

3.2 非对应点

我们再来看看AgentPlan 不对应哪些。

  • AgentPlan ≠ StreamGraph:StreamGraph 是 “用户逻辑的直接映射”,无编译优化;AgentPlan 是 Agent 逻辑的 “编译优化后产物”,已包含系统可执行的规则(如动作调度优先级)。
  • AgentPlan ≠ ExecutionGraph:ExecutionGraph 是 “运行时物理拓扑”,绑定具体资源;AgentPlan 是 “预执行计划”,不涉及物理资源分配(由 Flink 集群自动处理)。

3.3 总结

  1. Flink 原生的 “Plan” 是三层体系(StreamGraph/JobGraph/ExecutionGraph),AgentPlan 并非对应 “所有 Plan”,而是精准对应 JobGraph
  2. 核心逻辑:AgentPlan 是 Flink Agents 框架对 Agent 高层逻辑的 “编译产物”,和 JobGraph 一样承担 “承上启下” 的作用 —— 把用户的业务语义转换为系统可执行的规则;
  3. 边界:AgentPlan 是 “逻辑可执行计划”,不涉及物理资源分配(这部分仍由原生 Flink 的 ExecutionGraph 处理);

0x04 与智能 Agent “规划(Planning)” 的核心相似性

我们再来看看 Flink Agents 的 AgentPlan 与智能 Agent “规划” 的核心相似性

Flink Agents 中的 AgentPlan 是面向流式计算场景的任务执行规划,而智能 Agent 领域的 “规划(Planning)” 是 Agent 为达成目标制定行动序列的核心能力,二者虽应用场景(流式计算 vs 通用智能)不同,但底层逻辑、核心特征高度契合,具体相似性可从以下维度拆解:

4.1 核心目标

核心目标都是:为 “达成既定目标” 制定可执行路径。

  1. 智能 Agent 的规划:核心是基于自身目标(如 “导航到目的地”“完成订单处理”)、环境状态(如 “当前位置”“订单队列长度”)和可用动作(如 “左转”“分配订单”),生成一套有序的行动序列,本质是 “从目标反推可落地的步骤”,确保 Agent 能通过执行该序列完成任务。
  2. Flink Agents 的 AgentPlan:核心是基于流式计算任务目标(如 “实时统计订单量”“清洗异常日志”)、Flink 集群的资源状态(如 “算子并行度”“节点负载”)和可用计算动作(如 “数据分片”“窗口聚合”“状态存储”),生成一套结构化的流式任务执行方案,本质是 “将抽象的计算目标转化为 Flink 可执行的算子调度、资源分配、数据流转步骤”。

相似性:二者均以 “目标导向” 为核心,规划的最终目的是将抽象目标拆解为可落地、可验证的执行路径,而非单纯的 “任务罗列”。

4.2 核心特征

4.2.1 Agent规划

Agent规划是动态逻辑。

智能 Agent 的规划的执行逻辑为:遵循 “规划生成→执行动作→感知环境反馈→调整规划” 的闭环(如机器人 “规划抓取动作→执行抓取→感知未抓稳→重新规划抓取角度”)。

智能 Agent 的规划不是 “一次性规划”,而是与执行过程形成闭环,通过反馈持续优化规划,确保目标最终达成。

4.2.2 AgentPlan

AgentPlan 是静态执行蓝图,在作业部署时确定,运行期间不因 Flink 集群配置变化而动态调整。任何代理逻辑更改都需重新编译并部署新的 AgentPlan。

AgentPlan 在编译时生成:AgentPlan 在作业提交前从用户定义的 Agent 类编译而来,是静态计划。它包含所有动作(actions)、资源配置(resource providers)以及事件监听关系(actions_by_event)。
静态结构:一旦创建,AgentPlan 的结构在整个作业生命周期内保持不变。

0xFF 参考

需要专业的网站建设服务?

联系我们获取免费的网站建设咨询和方案报价,让我们帮助您实现业务目标

立即咨询