LangGraph是基于Pregel模型的Python框架,用于构建状态化、多步骤AI应用。其核心特点包括状态管理、多步骤处理、记忆功能、可中断和可恢复机制。通过StateGraph定义节点和边,结合Channels实现节点通信,Checkpoints提供状态持久化。适用于AI代理系统、复杂工作流和数据处理流水线,支持条件分支、并行处理和流式输出等高级特性,与LangChain生态系统无缝集成,是构建复杂AI应用的强大工具。
在当今的 AI 应用开发中,我们经常面临多步骤、有状态、需要记忆的复杂场景。传统的线性处理方式已无法满足这些需求,这就催生了 LangGraph 框架的诞生。LangGraph 是一个专门用于构建状态化、多步骤 AI 应用的 Python 库,它基于 Pregel 计算模型,提供了构建复杂 AI 工作流的高级抽象。
什么是 LangGraph?
LangGraph 是一个用于构建状态化、多步骤 AI 应用的框架。与传统的单次调用 AI 模型不同,LangGraph 允许我们创建可以记住历史、做出决策、执行多个步骤并根据结果调整行为的复杂 AI 系统。
LangGraph 的核心特点包括:
- 状态化: 保持执行过程中的状态信息
- 多步骤: 支持复杂的多步骤工作流
- 可记忆: 通过检查点机制实现"短期记忆"
- 可中断: 支持人工干预和决策
- 可恢复: 支持暂停和恢复执行
为什么需要 LangGraph?
传统 AI 应用的局限性
传统的 AI 应用通常采用单次调用模式:输入 → AI 模型 → 输出。这种模式在以下场景中存在局限性:
- 多步骤推理: 需要多次调用 AI 模型进行推理
- 状态管理: 需要记住历史交互信息
- 循环处理: 需要根据结果决定是否继续
- 人工干预: 需要在某些节点等待人工确认
LangGraph 解决的问题
LangGraph 通过图结构解决了上述问题:
- 图结构: 使用节点和边定义复杂的执行流程
- 状态管理: 通过状态对象维护执行过程中的数据
- 条件分支: 根据条件动态选择执行路径
- 检查点: 持久化状态,支持恢复和重放
LangGraph 的核心概念
1. Pregel 模型
LangGraph 基于 Pregel 计算模型,这是一种用于大规模图计算的编程模型。Pregel 模型采用 Bulk Synchronous Parallel (BSP) 范式,将计算过程分为多个超步(Superstep):
每个超步包含三个阶段:
- 计划阶段: 确定此步骤中要执行的节点
- 执行阶段: 并行执行选定的节点
- 更新阶段: 更新通道中的值
2. StateGraph
StateGraph 是 LangGraph 中用于创建状态化工作流的主要抽象。它由以下几个核心组件构成:
- 状态(State): 定义图中共享的数据结构
- 节点(Node): 执行特定功能的函数或可运行对象
- 边(Edge): 定义节点间的执行顺序
- 条件边(Conditional Edge): 基于条件动态选择下一个节点
from typing import TypedDictfrom langgraph.graph import StateGraph, START, ENDclass State(TypedDict): value: intdef increment(state):return {"value": state["value"] + 1}def multiply_by_two(state):return {"value": state["value"] * 2}# 创建图builder = StateGraph(State)builder.add_node("increment", increment)builder.add_node("multiply", multiply_by_two)builder.add_edge(START, "increment")builder.add_edge("increment", "multiply")builder.add_edge("multiply", END)graph = builder.compile()# 执行图result = graph.invoke({"value": 1})print(result) # 输出: {'value': 4}3. Channels(通道)
Channels 是节点间通信的主要机制,定义了数据如何在图中流动。主要类型包括:
- LastValue: 存储通道的最后值,适用于需要单一值的状态字段
- Topic: 发布-订阅通道,可累积多个值,适用于消息列表等场景
- BinaryOperatorAggregate: 使用二元操作符聚合值,适用于数值累加等场景
from typing import TypedDictfrom typing_extensions import Annotatedimport operatorclass State(TypedDict):# 使用 reducer 累积消息 messages: Annotated[list, operator.add]# 使用 reducer 累积总数 total: Annotated[int, operator.add]4. Checkpoints(检查点)
检查点系统提供持久化状态的能力,支持暂停、恢复和重放:
from langgraph.checkpoint.memory import MemorySaver# 创建检查点保存器memory = MemorySaver()graph = builder.compile(checkpointer=memory)# 执行图并持久化状态config = {"configurable": {"thread_id": "1"}}result = graph.invoke({"value": 0}, config)5. Interrupts(中断)
中断机制允许在图执行过程中暂停,以便进行人工干预:
from langgraph.types import interruptdef node_with_interrupt(state):# 中断并等待用户输入 user_input = interrupt("请输入一个数字:")return {"user_input": str(user_input)}LangGraph 的应用场景
1. AI 代理系统
AI 代理通常需要多步骤推理、工具调用和状态管理:
from langgraph.prebuilt import create_react_agentfrom langchain_openai import ChatOpenAIfrom langchain.tools import tool@tooldef get_weather(location: str) -> str:"""获取指定位置的天气"""return f"{location}的天气是晴朗的,温度22°C"tools = [get_weather]model = ChatOpenAI(model="gpt-3.5-turbo")agent = create_react_agent(model, tools)result = agent.invoke({"messages": [("user", "北京的天气如何?")]})2. 多步骤工作流
复杂的业务逻辑需要多个步骤协同完成:
from typing import TypedDict, Literalfrom typing_extensions import Annotatedfrom langgraph.graph import StateGraph, START, ENDimport operatorclass Task(TypedDict): id: str description: str status: Literal["pending", "in_progress", "completed", "failed"] created_at: str completed_at: str | Noneclass TaskManagerState(TypedDict): tasks: Annotated[list[Task], operator.add] current_task_id: str | None completed_tasks: Annotated[list[Task], operator.add]def add_task(state): new_task = {"id": f"task_{len(state['tasks']) + 1}","description": f"Sample task {len(state['tasks']) + 1}","status": "pending","created_at": datetime.now().isoformat(),"completed_at": None }return {"tasks": [new_task]}def process_task(state): pending_tasks = [t for t in state["tasks"] if t["status"] == "pending"]ifnot pending_tasks:return {"current_task_id": None} task = pending_tasks[0]# 模拟任务处理 updated_task = {**task, "status": "completed", "completed_at": datetime.now().isoformat()}return {"current_task_id": task["id"],"completed_tasks": [updated_task] }def should_continue(state): pending_tasks = [t for t in state["tasks"] if t["status"] == "pending"]return"process"if pending_tasks else ENDbuilder = StateGraph(TaskManagerState)builder.add_node("add", add_task)builder.add_node("process", process_task)builder.add_conditional_edges(START, lambda s: "add")builder.add_conditional_edges("add", should_continue, ["process", END])builder.add_edge("process", "add")graph = builder.compile()3. 数据处理流水线
构建复杂的数据处理流水线:
class DataPipelineState(TypedDict): raw_data: list processed_data: Annotated[list, operator.add] validation_errors: Annotated[list, operator.add] pipeline_status: strdef extract_data(state):"""从源提取数据""" raw_data = [ {"id": 1, "name": "Alice", "email": "alice@example.com"}, {"id": 2, "name": "Bob", "email": "invalid-email"}, {"id": 3, "name": "Charlie", "email": "charlie@example.com"} ]return {"raw_data": raw_data, "pipeline_status": "extracted"}def validate_data(state):"""验证数据""" errors = [] valid_data = []for item in state["raw_data"]:if"@"in item["email"] and"."in item["email"]: valid_data.append(item)else: errors.append(f"Invalid email for {item['name']}: {item['email']}") result = {"processed_data": valid_data}if errors: result["validation_errors"] = errorsreturn resultdef transform_data(state):"""转换数据格式""" transformed = []for item in state["processed_data"]: transformed.append({"user_id": item["id"],"full_name": item["name"].upper(),"contact": item["email"] })return {"processed_data": transformed, "pipeline_status": "completed"}builder = StateGraph(DataPipelineState)builder.add_node("extract", extract_data)builder.add_node("validate", validate_data)builder.add_node("transform", transform_data)builder.add_edge(START, "extract")builder.add_edge("extract", "validate")builder.add_edge("validate", "transform")builder.add_conditional_edges("transform", lambda s: END if s["pipeline_status"] == "completed"else"transform")graph = builder.compile()高级特性
1. 条件分支
根据条件动态选择执行路径:
def should_continue(state):if state["value"] < 5:return"increment"else:return ENDdef increment(state):return {"value": state["value"] + 1}builder = StateGraph(State)builder.add_node("increment", increment)builder.add_conditional_edges(START, should_continue, ["increment", END])builder.add_edge("increment", START)graph = builder.compile()Result = graph.invoke({"value": 1})print(result) # 输出: {'value': 5}2. 并行处理
使用 Send API 实现并行处理:
from langgraph.types import Senddef distribute_work(state):# 为每个项目创建任务return [Send("process_item", {"item": item}) for item in state["items"]]def should_continue(state):return"square"if len(state["results"]) < len(state["numbers"]) else END3. 错误处理和重试
内置的错误处理和重试机制:
from langgraph.types import RetryPolicy# 定义重试策略retry_policy = RetryPolicy( initial_interval=0.5, backoff_factor=2.0, max_attempts=5, retry_on=Exception)builder.add_node("operation", unreliable_operation, retry_policy=retry_policy)4. 流式处理
支持多种流式输出模式:
# 流式输出每个步骤的更新for chunk in graph.stream({"value": 1}, stream_mode="updates"): print(chunk)# 流式输出最终值for chunk in graph.stream({"value": 1}, stream_mode="values"): print(chunk)架构模式
1. ReAct 模式
ReAct (Reasoning + Acting) 模式是 LangGraph 中常用的 AI 代理模式:
2. Map-Reduce 模式
使用 Send API 实现 Map-Reduce 模式:
def map_tasks(state):# 为每个项目创建任务return [Send("process", {"item": item}) for item in state["items"]]def reduce_results(state):# 合并结果return {"result": sum(state["partial_results"])}最佳实践
1. 状态设计
- 使用 TypedDict 或 Pydantic 模型明确定义状态
- 为需要累积的字段使用适当的 reducer
- 避免状态过于复杂,考虑分解为多个图
2. 错误处理
使用重试策略处理临时性错误
实现适当的错误恢复机制
记录和监控错误情况
3. 性能优化
使用检查点避免重复计算
合理设置递归限制
使用异步 API 提高并发性能
4. 调试和监控
使用流式输出监控执行过程
启用调试模式获取详细信息
使用检查点进行状态回溯
与其他组件的集成
1. LangChain 集成
LangGraph 与 LangChain 生态系统无缝集成:
与 LangChain 的 LLM、工具、记忆等组件无缝集成
使用 Runnable 接口实现节点
利用 LangChain 的回调系统
2. 外部系统集成
通过自定义节点集成数据库
与外部 API 交互
集成消息队列和事件系统
总结
LangGraph 提供了一个强大而灵活的框架来构建复杂的 AI 工作流。通过理解其核心概念(Pregel 模型、StateGraph、Channels、Checkpoints 等),开发者可以构建从简单到复杂的各种 AI 应用,从基本的数值处理到复杂的多步骤推理系统。
LangGraph 的主要优势包括:
状态化管理
提供了完整的状态管理机制
图结构
支持复杂的执行流程定义
可扩展性
支持子图、并行处理等高级特性
容错性
内置错误处理和重试机制
可观察性
提供丰富的监控和调试功能
随着 AI 应用复杂度的不断增加,LangGraph 为开发者提供了一个强大的工具,能够构建更加智能、灵活和可靠的 AI 系统。无论是构建 AI 代理、多步骤工作流,还是复杂的决策系统,LangGraph 都能提供相应的解决方案。
大模型未来如何发展?普通人能从中受益吗?
在科技日新月异的今天,大模型已经展现出了令人瞩目的能力,从编写代码到医疗诊断,再到自动驾驶,它们的应用领域日益广泛。那么,未来大模型将如何发展?普通人又能从中获得哪些益处呢?
通用人工智能(AGI)的曙光:未来,我们可能会见证通用人工智能(AGI)的出现,这是一种能够像人类一样思考的超级模型。它们有可能帮助人类解决气候变化、癌症等全球性难题。这样的发展将极大地推动科技进步,改善人类生活。
个人专属大模型的崛起:想象一下,未来的某一天,每个人的手机里都可能拥有一个私人AI助手。这个助手了解你的喜好,记得你的日程,甚至能模仿你的语气写邮件、回微信。这样的个性化服务将使我们的生活变得更加便捷。
脑机接口与大模型的融合:脑机接口技术的发展,使得大模型与人类的思维直接连接成为可能。未来,你可能只需戴上头盔,心中想到写一篇工作总结”,大模型就能将文字直接投影到屏幕上,实现真正的心想事成。
大模型的多领域应用:大模型就像一个超级智能的多面手,在各个领域都展现出了巨大的潜力和价值。随着技术的不断发展,相信未来大模型还会给我们带来更多的惊喜。赶紧把这篇文章分享给身边的朋友,一起感受大模型的魅力吧!
那么,如何学习AI大模型?
在一线互联网企业工作十余年里,我指导过不少同行后辈,帮助他们得到了学习和成长。我意识到有很多经验和知识值得分享给大家,也可以通过我们的能力和经验解答大家在人工智能学习中的很多困惑。因此,我坚持整理和分享各种AI大模型资料,包括AI大模型入门学习思维导图、精品AI大模型学习书籍手册、视频教程、实战学习等录播视频。
学习阶段包括:
1.大模型系统设计
从大模型系统设计入手,讲解大模型的主要方法。包括模型架构、训练过程、优化策略等,让读者对大模型有一个全面的认识。
2.大模型提示词工程
通过大模型提示词工程,从Prompts角度入手,更好发挥模型的作用。包括提示词的构造、优化、应用等,让读者学会如何更好地利用大模型。
3.大模型平台应用开发
借助阿里云PAI平台,构建电商领域虚拟试衣系统。从需求分析、方案设计、到具体实现,详细讲解如何利用大模型构建实际应用。
4.大模型知识库应用开发
以LangChain框架为例,构建物流行业咨询智能问答系统。包括知识库的构建、问答系统的设计、到实际应用,让读者了解如何利用大模型构建智能问答系统。
5.大模型微调开发
借助以大健康、新零售、新媒体领域,构建适合当前领域的大模型。包括微调的方法、技巧、到实际应用,让读者学会如何针对特定领域进行大模型的微调。
6.SD多模态大模型
以SD多模态大模型为主,搭建文生图小程序案例。从模型选择、到小程序的设计、到实际应用,让读者了解如何利用大模型构建多模态应用。
7.大模型平台应用与开发
通过星火大模型、文心大模型等成熟大模型,构建大模型行业应用。包括行业需求分析、方案设计、到实际应用,让读者了解如何利用大模型构建行业应用。
学成之后的收获👈
•全栈工程实现能力:通过学习,你将掌握从前端到后端,从产品经理到设计,再到数据分析等一系列技能,实现全方位的技术提升。
•解决实际项目需求:在大数据时代,企业和机构面临海量数据处理的需求。掌握大模型应用开发技能,将使你能够更准确地分析数据,更有效地做出决策,更好地应对各种实际项目挑战。
•AI应用开发实战技能:你将学习如何基于大模型和企业数据开发AI应用,包括理论掌握、GPU算力运用、硬件知识、LangChain开发框架应用,以及项目实战经验。此外,你还将学会如何进行Fine-tuning垂直训练大模型,包括数据准备、数据蒸馏和大模型部署等一站式技能。
•提升编码能力:大模型应用开发需要掌握机器学习算法、深度学习框架等技术,这些技术的掌握将提升你的编码能力和分析能力,使你能够编写更高质量的代码。
学习资源📚
- AI大模型学习路线图:为你提供清晰的学习路径,助你系统地掌握AI大模型知识。
- 100套AI大模型商业化落地方案:学习如何将AI大模型技术应用于实际商业场景,实现技术的商业化价值。
- 100集大模型视频教程:通过视频教程,你将更直观地学习大模型的技术细节和应用方法。
- 200本大模型PDF书籍:丰富的书籍资源,供你深入阅读和研究,拓宽你的知识视野。
- LLM面试题合集:准备面试,了解大模型领域的常见问题,提升你的面试通过率。
- AI产品经理资源合集:为你提供AI产品经理的实用资源,帮助你更好地管理和推广AI产品。
👉获取方式: 😝有需要的小伙伴,可以保存图片到wx扫描二v码免费领取【保证100%免费】