延安市网站建设_网站建设公司_Oracle_seo优化
2025/12/21 7:33:22 网站建设 项目流程

协调的艺术:Pipeline 与消息系统核心解析

请关注公众号【碳硅化合物AI】

摘要

多智能体系统的核心是协调。AgentScope 通过 Pipeline 和消息系统实现了优雅的多智能体编排。本文将深入分析 MsgHub、Pipeline 模式以及消息系统的设计。你会发现,消息(Msg)不仅是数据载体,更是整个框架的统一接口;MsgHub 通过订阅机制实现了自动消息广播;Pipeline 提供了多种编排模式,让多智能体协作变得简单而强大。通过阅读本文,你会理解多智能体如何通过消息进行通信,如何通过 Pipeline 进行编排,以及这些机制背后的设计考量。

入口类与类关系

消息系统的类层次

消息系统非常简单,但设计精妙:

Pipeline 系统的类层次

Pipeline 提供了多种编排模式:

关键代码:Msg 数据结构

Msg 是框架的核心数据结构:

class Msg: """The message class in agentscope.""" def __init__( self, name: str, content: str | Sequence[ContentBlock], role: Literal["user", "assistant", "system"], metadata: dict[str, JSONSerializableObject] | None = None, timestamp: str | None = None, invocation_id: str | None = None, ) -> None: self.name = name self.content = content self.role = role self.metadata = metadata self.id = shortuuid.uuid() self.timestamp = timestamp or datetime.now().strftime(...)

这个设计非常巧妙:

  • 统一接口:所有智能体间通信都使用 Msg
  • 多模态支持:content 可以是字符串或 ContentBlock 列表
  • 元数据支持:metadata 字段可以存储结构化输出等额外信息
  • 自动标识:每个消息都有唯一的 id 和 timestamp

关键流程分析

消息创建和传递流程

消息在智能体间的传递非常直接:

多智能体对话流程

使用 MsgHub 的多智能体对话流程:

Pipeline 执行流程

顺序 Pipeline 的执行流程:

关键技术点

1. 消息作为统一数据结构的设计

Msg 类在 AgentScope 中扮演着核心角色,它是:

  • 智能体间通信的媒介:所有智能体都通过 Msg 交换信息
  • 与 LLM API 的桥梁:Formatter 将 Msg 转换为 API 所需格式
  • 记忆存储的单元:Memory 存储的是 Msg 对象列表
  • UI 显示的数据源:前端可以直接显示 Msg 对象

这种统一设计避免了数据格式转换的复杂性。无论消息来自哪里、要去哪里,都是同一个数据结构。

2. 多模态消息块系统

Msg 支持多模态内容,通过 ContentBlock 系统实现:

class TextBlock(TypedDict, total=False): type: Required[Literal["text"]] text: str class ToolUseBlock(TypedDict, total=False): type: Required[Literal["tool_use"]] id: Required[str] name: Required[str] input: Required[dict[str, object]] class ToolResultBlock(TypedDict, total=False): type: Required[Literal["tool_result"]] id: Required[str] name: Required[str] output: Required[str | List[ContentBlock]] class ImageBlock(TypedDict, total=False): type: Required[Literal["image"]] source: Required[Base64Source | URLSource] class AudioBlock(TypedDict, total=False): type: Required[Literal["audio"]] source: Required[Base64Source | URLSource] ContentBlock = ( ToolUseBlock | ToolResultBlock | TextBlock | ThinkingBlock | ImageBlock | AudioBlock | VideoBlock )

这种设计让消息可以包含:

  • 文本内容
  • 工具调用和结果
  • 图像、音频、视频等多媒体内容
  • 思考过程(thinking block)

所有内容都统一在一个消息对象中,非常灵活。

3. 消息中心(MsgHub)的设计

MsgHub 通过订阅机制实现自动消息广播。关键代码:

def _reset_subscriber(self) -> None: """Reset the subscriber for agent in `self.participant`""" if self.enable_auto_broadcast: for agent in self.participants: agent.reset_subscribers(self.name, self.participants)

当智能体在 MsgHub 中回复时,消息会自动广播给其他参与者。这个机制通过 AgentBase 的订阅系统实现:

# 在 AgentBase 中defreset_subscribers(self,hub_name:str,subscribers:list[AgentBase])->None:"""设置订阅者,当智能体回复时,消息会自动发送给订阅者"""self._subscribers[hub_name]=subscribers

这种设计让多智能体对话变得非常简单:

asyncwithMsgHub([agent1,agent2,agent3]):awaitagent1()# agent2 和 agent3 自动收到消息awaitagent2()# agent1 和 agent3 自动收到消息awaitagent3()# agent1 和 agent2 自动收到消息

4. 不同 Pipeline 模式的实现

AgentScope 提供了多种 Pipeline 模式:

顺序 Pipeline(SequentialPipeline)

async def sequential_pipeline( agents: list[AgentBase], msg: Msg | list[Msg] | None = None, ) -> Msg | list[Msg] | None: """执行智能体序列,前一个的输出作为下一个的输入""" for agent in agents: msg = await agent(msg) return msg

扇出 Pipeline(FanoutPipeline)

async def fanout_pipeline( agents: list[AgentBase], msg: Msg | list[Msg] | None = None, enable_gather: bool = True, ) -> list[Msg]: """将同一个输入分发给多个智能体""" if enable_gather: # 并发执行 tasks = [asyncio.create_task(agent(deepcopy(msg))) for agent in agents] return await asyncio.gather(*tasks) else: # 顺序执行 return [await agent(deepcopy(msg)) for agent in agents]

这些 Pipeline 模式让多智能体编排变得非常灵活。你可以:

  • 顺序执行:前一个智能体的输出作为下一个的输入(适合流水线场景)
  • 并发执行:多个智能体同时处理同一个输入(适合并行分析场景)
  • 动态编排:在 MsgHub 中动态添加或删除参与者

5. 消息的序列化和反序列化

Msg 支持完整的序列化:

def to_dict(self) -> dict: """Convert the message into JSON dict data.""" return { "id": self.id, "name": self.name, "role": self.role, "content": self.content, "metadata": self.metadata, "timestamp": self.timestamp, } @classmethod def from_dict(cls, json_data: dict) -> "Msg": """Load a message object from the given JSON data.""" new_obj = cls( name=json_data["name"], content=json_data["content"], role=json_data["role"], metadata=json_data.get("metadata", None), timestamp=json_data.get("timestamp", None), invocation_id=json_data.get("invocation_id", None), ) new_obj.id = json_data.get("id", new_obj.id) return new_obj

这种设计让消息可以:

  • 保存到文件或数据库
  • 通过网络传输
  • 在不同进程间共享
  • 用于调试和日志记录

总结

Pipeline 和消息系统是 AgentScope 框架中实现多智能体协调的核心:

  1. 消息系统:通过统一的 Msg 数据结构,实现了智能体间通信、API 交互、记忆存储的统一接口
  2. MsgHub:通过订阅机制实现了自动消息广播,让多智能体对话变得简单优雅
  3. Pipeline:提供了多种编排模式,支持顺序、并发、动态等多种场景

这些系统的设计都体现了 AgentScope 的核心理念:透明、模块化、易用。在下一篇文章中,我们会分析扩展机制的实现,了解如何为框架添加新功能。


需要专业的网站建设服务?

联系我们获取免费的网站建设咨询和方案报价,让我们帮助您实现业务目标

立即咨询