常州市网站建设_网站建设公司_图标设计_seo优化
2026/1/5 21:16:45 网站建设 项目流程

各位编程领域的同仁们,大家好!

今天,我们将深入探讨一个在构建现代智能应用中至关重要的话题:如何高效、可靠地编排大型语言模型(LLM)的推理流程。随着LLM能力的飞速发展,我们不再满足于单次的API调用,而是追求构建能够进行多步推理、工具使用、状态管理乃至人机协作的复杂智能体(Agent)。在这个过程中,编排工具的选择变得尤为关键。

我们面前摆着两类截然不同的解决方案:一类是以LangGraph的StatefulGraph为代表的,专为LLM智能体设计、强调内部状态管理的图式框架;另一类则是以Temporal为代表的,源自传统微服务架构、强调分布式、高可靠工作流的编排引擎。理解它们之间的本质差异,以及各自在处理LLM推理时的优劣,将是我们在构建生产级智能应用时的“必杀技”。

LLM推理的独特挑战

在深入比较之前,我们首先要明确,LLM推理及其相关应用的编排,与传统微服务编排有着显著的不同。这些独特性构成了我们选择编排工具时的核心考量:

  1. 非确定性与迭代性:LLM的输出本质上是概率性的,同一输入可能产生不同输出。这意味着我们需要构建能够处理模糊性、进行多轮迭代、甚至在必要时进行重试和修正的流程。传统的确定性工作流可能需要额外适配。
  2. 复杂的状态管理:智能体在执行多步任务时,需要维护大量的上下文信息,包括对话历史、工具调用结果、中间思考过程、用户偏好等。这些状态需要在不同的步骤之间传递和更新。
  3. 工具使用与外部交互:LLM智能体通常需要调用外部工具(API、数据库、自定义函数)来获取信息或执行操作。这引入了外部系统的延迟、错误和并发问题。
  4. 长链式推理与决策:一个复杂的LLM任务可能涉及多个LLM调用、多个工具调用,形成一个长链式的决策过程,其中每一步的结果都可能影响下一步的方向。
  5. 人机协作(Human-in-the-Loop):在许多关键场景下,我们需要人工介入来审查、批准或纠正LLM的输出,这要求编排系统能够灵活地暂停和恢复流程。
  6. 并发与吞吐量:生产环境中,LLM应用需要处理大量的并发请求,同时保持低延迟。
  7. 错误处理与弹性:LLM服务、外部工具都可能失败。编排系统需要健壮的错误处理、重试和超时机制。

这些挑战使得传统的请求-响应模式或简单的函数链式调用不足以支撑复杂的LLM应用。我们需要更高级的编排抽象。

LangGraph的StatefulGraph:智能体的生命线

LangGraph是LangChain生态系统中的一个高级库,它提供了一种基于图的编程模型,用于构建有状态、多演员(multi-actor)的LLM应用。其核心是StatefulGraph,它将智能体的行为建模为一个状态机,非常适合处理复杂的、迭代式的LLM推理逻辑。

核心理念与架构哲学

LangGraph的设计哲学是“以智能体为中心”。它将一个复杂的LLM任务分解为一系列节点(Node),每个节点执行一个特定的动作(如调用LLM、使用工具、执行自定义逻辑),并通过边(Edge)连接这些节点,定义了状态如何从一个节点流向另一个节点。最重要的是,它显式地管理一个在整个图执行过程中共享和更新的State对象。

这种设计使得开发者能够直观地描绘智能体的决策流程和内部思考路径,特别擅长处理“思考-行动-观察-再思考”的迭代循环,这是LLM智能体行为的典型模式。

主要特性:

  1. 节点(Nodes):图的基本单元,可以是LLM调用、工具调用、自定义Python函数等。
  2. 边(Edges):定义了节点之间的转换路径。边可以是条件性的,根据当前状态或节点输出决定下一个节点。
  3. 状态(State):一个可变的字典或Pydantic模型,作为全局上下文在整个图执行过程中传递和更新。这是LangGraph最核心的特性之一,它让智能体能够“记住”和“学习”。
  4. 检查点(Checkpoints):LangGraph支持将当前状态持久化到后端存储(如SQLite、Redis),这使得智能体可以在崩溃后恢复,或在需要人工干预时暂停。
  5. Python Native:完全基于Python,与Python生态系统无缝集成,调试和开发体验友好。
LangGraph工作流示例:一个带有工具使用的RAG智能体

让我们通过一个具体的例子来理解LangGraph。假设我们正在构建一个能够回答用户问题,并在需要时调用搜索引擎来获取最新信息的RAG(Retrieval Augmented Generation)智能体。如果首次回答不满意,智能体可以决定重新搜索或精炼问题。

# 1. 定义状态 from typing import TypedDict, Annotated, List import operator from langchain_core.messages import BaseMessage class AgentState(TypedDict): """ Represent the state of our agent. - messages: A list of messages passing through the agent. - search_query: The query used for search. - search_results: The results from the search tool. - iterations: Number of iterations performed. """ messages: Annotated[List[BaseMessage], operator.add] search_query: str search_results: List[str] iterations: int # 2. 定义工具 (假设我们有一个简单的搜索引擎工具) from langchain_core.tools import tool @tool def search_web(query: str) -> str: """Simulates a web search for the given query.""" print(f"--- Searching web for: {query} ---") # In a real scenario, this would call a search API (e.g., Google Search, Bing, etc.) if "latest AI news" in query.lower(): return "Recent AI news: breakthroughs in multimodal models, faster inference chips, and ethical AI discussions." elif "Python LangGraph" in query.lower(): return "LangGraph is a library for building stateful, multi-actor applications with LLMs, built on LangChain." else: return f"No specific results found for '{query}'. Try a different query." tools = [search_web] # 3. 定义LLM (使用OpenAI模型为例) from langchain_openai import ChatOpenAI from langchain_core.prompts import ChatPromptTemplate from langchain_core.runnables import RunnablePassthrough llm = ChatOpenAI(model="gpt-4o", temperature=0) # 4. 定义节点函数 # 4.1 LLM决策节点:决定是回答、搜索还是精炼 def call_llm_decide(state: AgentState) -> AgentState: print("--- CALL_LLM_DECIDE ---") messages = state['messages'] iterations = state.get('iterations', 0) + 1 prompt = ChatPromptTemplate.from_messages([ ("system", "You are an AI assistant. Given the user's question and previous conversation, " "decide if you need to perform a web search to answer the question, " "or if you can answer directly. " "Also, you can refine the search query if previous search was not good. " "Return 'search' if you need to search, 'answer' if you can answer, " "or 'refine_search' if you need to refine the search query."), ("user", "{question}") ]) # If there are search results, include them in the context for decision if state.get('search_results'): messages.append(("system", f"Previous search results: {state['search_results']}")) response = llm.invoke(prompt.format(question=messages[-1].content)) decision = response.content.lower().strip() print(f"LLM Decision: {decision}") return {"messages": [response], "iterations": iterations, "decision": decision} # Add decision to state temporarily # 4.2 LLM生成答案节点 def call_llm_answer(state: AgentState) -> AgentState: print("--- CALL_LLM_ANSWER ---") messages = state['messages'] search_results = state.get('search_results', []) prompt_template = ChatPromptTemplate.from_messages([ ("system", "You are an AI assistant. Based on the following context and conversation, provide a concise answer. " "If search results are available, use them to formulate the answer."), ("user", "Question: {question}nSearch Results: {search_results}") ]) final_question = messages[-1].content # Find the actual user question from the history if the last message is a decision for msg in reversed(messages): if msg.type == 'human': final_question = msg.content break response = llm.invoke(prompt_template.format(question=final_question, search_results="n".join(search_results))) return {"messages": [response]} # 4.3 LLM生成搜索查询节点 def call_llm_generate_search_query(state: AgentState) -> AgentState: print("--- CALL_LLM_GENERATE_SEARCH_QUERY ---") messages = state['messages'] prompt_template = ChatPromptTemplate.from_messages([ ("system", "You are a helpful assistant. Based on the user's question, generate a concise web search query. " "If previous search results were not satisfactory, refine the query based on the conversation history. " "Only output the search query itself, nothing else."), ("user", "{question}") ]) final_question = messages[-1].content # Find the actual user question from the history if the last message is a decision for msg in reversed(messages): if msg.type == 'human': final_question = msg.content break response = llm.invoke(prompt_template.format(question=final_question)) search_query = response.content.strip() return {"search_query": search_query} # 4.4 工具调用节点 def call_tool_search(state: AgentState) -> AgentState: print("--- CALL_TOOL_SEARCH ---") search_query = state['search_query'] results = search_web.invoke({"query": search_query}) return {"search_results": [results]} # 5. 构建图 from langgraph.graph import StateGraph, END workflow = StateGraph(AgentState) # 5.1 添加节点 workflow.add_node("decide_action", call_llm_decide) workflow.add_node("generate_answer", call_llm_answer) workflow.add_node("generate_search_query", call_llm_generate_search_query) workflow.add_node("call_search_tool", call_tool_search) # 5.2 设置入口 workflow.set_entry_point("decide_action") # 5.3 定义条件边 def route_decision(state: AgentState) -> str: decision = state.get('decision', 'answer') # Default to answer if no decision for some reason if decision == 'search': return "generate_search_query" elif decision == 'refine_search': return "generate_search_query" # Refine search also goes through generate_search_query elif decision == 'answer': return "generate_answer" else: return "generate_answer" # Fallback workflow.add_conditional_edges( "decide_action", route_decision, { "generate_search_query": "generate_search_query", "generate_answer": "generate_answer", }, ) # 5.4 定义普通边 workflow.add_edge("generate_search_query", "call_search_tool") workflow.add_edge("call_search_tool", "decide_action") # After search, re-evaluate what to do next workflow.add_edge("generate_answer", END) # Answer node is the end # 6. 编译图 app = workflow.compile() # 7. 运行智能体 from langchain_core.messages import HumanMessage print("n--- Running Agent for 'What is LangGraph?' ---") inputs = {"messages": [HumanMessage(content="What is LangGraph?")]} for s in app.stream(inputs): if "__end__" not in s: print(s) print("---") print(s) # Final state print("n--- Running Agent for 'Latest AI news?' ---") inputs = {"messages": [HumanMessage(content="Latest AI news?")]} for s in app.stream(inputs): if "__end__" not in s: print(s) print("---") print(s) # Final state print("n--- Running Agent for 'Tell me about the capital of France.' (no search expected) ---") inputs = {"messages": [HumanMessage(content="Tell me about the capital of France.")]} for s in app.stream(inputs): if "__end__" not in s: print(s) print("---") print(s) # Final state

代码解释:

  • AgentState定义了智能体的核心状态,包括消息历史、搜索查询和结果,以及迭代次数。Annotated[List[BaseMessage], operator.add]是LangGraph的特殊语法,表示messages列表会通过operator.add(即列表拼接)来累积。
  • search_web是一个模拟的工具,它接收一个查询并返回结果。
  • call_llm_decide是一个LLM节点,它根据当前状态决定下一步动作(搜索、回答或精炼搜索)。
  • call_llm_answercall_llm_generate_search_query是LLM节点,分别用于生成最终答案和搜索查询。
  • call_tool_search是一个工具节点,调用我们定义的search_web工具。
  • StateGraph用于构建图,add_node添加节点,set_entry_point设置入口。
  • add_conditional_edges是LangGraph的核心,它允许我们基于route_decision函数的输出来动态选择下一个节点。这里,LLM的决策直接控制了图的流程。
  • 图的流程可以是迭代的:call_search_tool之后,会再次回到decide_action节点,让LLM基于新的搜索结果重新评估。
LangGraph的优缺点

优点:

  • 智能体逻辑的直观建模:非常适合复杂的、多步、迭代式的LLM智能体行为。
  • 内置状态管理:State对象和检查点机制极大地简化了智能体内部状态的维护和恢复。
  • Python原生体验:与现有Python工具链和调试器无缝集成,开发效率高。
  • 快速原型开发:能够快速构建和测试复杂的智能体逻辑。
  • 人机协作友好:可以轻松在图中插入节点来等待用户输入或审批。

缺点:

  • 分布式/高可用性:LangGraph本身是一个Python库,其执行是单进程的。虽然可以通过部署多个LangGraph实例并结合共享检查点后端(如Redis)来达到一定程度的并发和恢复,但它并未内置分布式工作流的调度、容错和监控机制。
  • 跨服务编排:主要侧重于单个智能体内部的流程,对于跨越多个独立微服务的复杂业务流程编排,其能力有限。
  • 持久化与恢复:虽然有检查点,但其粒度和可靠性不如专门的分布式工作流引擎。如果整个服务宕机,恢复到精确的执行点可能需要额外的基础设施和管理。

传统微服务编排:以Temporal为例

与LangGraph的内部状态机模型不同,传统微服务编排工具,如Temporal、Cadence、AWS Step Functions等,关注的是如何在分布式环境中可靠地协调多个独立的服务和任务。它们的核心思想是构建“耐久工作流”(Durable Workflows),确保即使在系统故障、网络中断或服务重启的情况下,工作流也能从中断处恢复并继续执行。

我们将以Temporal为例进行深入探讨。

核心理念与架构哲学

Temporal的核心哲学是“代码即工作流”(Code as Workflow)。开发者使用标准编程语言(如Python、Java、Go等)编写工作流代码,这些代码定义了任务的顺序、并行执行、重试策略、超时以及与外部系统的交互。Temporal服务器则负责执行、调度、持久化和监控这些工作流,提供强一致性和容错保证。

主要特性:

  1. 耐久性(Durability):工作流的状态和执行历史被持久化,即使工作流执行器(Worker)崩溃或网络中断,工作流也能在新的Worker上从中断处恢复。
  2. 确定性(Determinism):工作流代码必须是确定性的,即给定相同的输入,每次执行都产生相同的逻辑路径和外部调用序列。这是实现耐久性和可重放性的关键。
  3. 工作流(Workflows):定义业务逻辑的函数,它们是协调者,负责调用活动。
  4. 活动(Activities):执行实际业务逻辑的函数,它们是非确定性的,可以与外部系统交互(如调用LLM API、数据库、第三方服务)。活动是工作流与外部世界交互的“桥梁”。
  5. 重试与超时:内置强大且可配置的重试策略和超时机制,用于处理活动失败或长时间无响应的情况。
  6. 计时器(Timers):允许工作流暂停并在未来某个时间点继续执行。
  7. 信号与查询(Signals & Queries):外部系统可以向正在运行的工作流发送信号(异步通知)或查询其状态(同步获取信息),实现人机协作或外部事件驱动。
  8. 版本化(Versioning):支持工作流代码的平滑升级,无需停止正在运行的旧版本工作流。
  9. 分布式:天生为分布式环境设计,可以水平扩展以处理大量并发工作流。
Temporal工作流示例:一个耐久的LLM批处理与审批工作流

现在,我们来看一个使用Temporal处理LLM任务的例子。假设我们需要一个工作流,它接收批量输入,对每个输入调用LLM进行内容审核,然后将审核结果提交给人工审批,最后更新数据库。

# temporal_app/workflows.py from datetime import timedelta from temporalio.workflow import workflow_method, Workflow, activity_method, info, ExternalTask from temporalio.common import RetryPolicy # 1. 定义活动接口 # Activities can be defined directly or in a separate file. # For simplicity, let's define them here. class LLMActivities: @activity_method async def call_llm_for_moderation(self, content: str) -> str: """ Simulates calling an LLM for content moderation. This would be a call to an external LLM API (e.g., OpenAI, Anthropic). """ print(f"Activity: Moderating content: '{content[:50]}...'") # Simulate LLM call latency import asyncio await asyncio.sleep(2) if "bad word" in content.lower(): return "REJECTED: Contains inappropriate language." elif "sensitive topic" in content.lower(): return "PENDING_REVIEW: Potentially sensitive content." else: return "APPROVED" @activity_method async def update_database_with_result(self, record_id: str, result: str) -> str: """ Simulates updating a database with the moderation result. """ print(f"Activity: Updating DB for {record_id} with result: {result}") import asyncio await asyncio.sleep(1) return f"Database updated for {record_id}." # 2. 定义工作流 class LLMModerationWorkflow(Workflow): def __init__(self): self._result = "" self._approved_by_human = False # Define the activities that this workflow can call @workflow_method async def run_moderation_workflow(self, record_id: str, content: str) -> str: print(f"Workflow: Starting moderation for record {record_id}") # Call LLM for initial moderation (Activity) moderation_result = await self.execute_activity( LLMActivities.call_llm_for_moderation, content, start_to_close_timeout=timedelta(seconds=10), retry_policy=RetryPolicy(maximum_attempts=3) # Retry LLM call up to 3 times ) self._result = moderation_result print(f"Workflow: LLM moderation result for {record_id}: {moderation_result}") # Conditional human approval if "PENDING_REVIEW" in moderation_result: print(f"Workflow: Content {record_id} needs human review. Waiting for signal...") # Wait for an external signal from a human reviewer await self.wait_for_signal("approve_content_signal") self._approved_by_human = True print(f"Workflow: Content {record_id} approved by human.") final_result = "APPROVED_AFTER_REVIEW" elif "REJECTED" in moderation_result: final_result = "REJECTED" else: # APPROVED final_result = "APPROVED" # Update database with final result (Activity) db_update_status = await self.execute_activity( LLMActivities.update_database_with_result, record_id, final_result, start_to_close_timeout=timedelta(seconds=5) ) print(f"Workflow: Final status for {record_id}: {final_result}. DB update: {db_update_status}") return final_result # Define a query method to get current status @workflow_method(query=True) def get_status(self) -> str: return f"Current LLM result: {self._result}, Human approved: {self._approved_by_human}" # Define a signal method for human approval @workflow_method(signal=True) async def approve_content_signal(self): self._approved_by_human = True print("Workflow: Received human approval signal.")
# temporal_app/worker.py from temporalio.worker import Worker from temporalio.client import Client from .workflows import LLMModerationWorkflow, LLMActivities async def main(): client = await Client.connect("localhost:7233") # Connect to Temporal server worker = Worker( client, task_queue="llm-moderation-task-queue", workflows=[LLMModerationWorkflow], activities=[LLMActivities().call_llm_for_moderation, LLMActivities().update_database_with_result], ) print("Worker started. Listening for tasks...") await worker.run() if __name__ == "__main__": import asyncio asyncio.run(main())
# temporal_app/client.py (to start workflows and send signals) from temporalio.client import Client from .workflows import LLMModerationWorkflow import asyncio import uuid async def start_and_signal_workflow(): client = await Client.connect("localhost:7233") # Example 1: Approved content record_id_1 = str(uuid.uuid4()) print(f"n--- Starting Workflow for record {record_id_1} (Approved) ---") handle_1 = await client.start_workflow( LLMModerationWorkflow.run_moderation_workflow, record_id_1, "This is a normal, positive review.", id=f"moderation-workflow-{record_id_1}", task_queue="llm-moderation-task-queue", ) print(f"Workflow {handle_1.id} started. Result: {await handle_1.result()}") # Example 2: Content needing human review record_id_2 = str(uuid.uuid4()) print(f"n--- Starting Workflow for record {record_id_2} (Pending Review) ---") handle_2 = await client.start_workflow( LLMModerationWorkflow.run_moderation_workflow, record_id_2, "This content discusses a sensitive topic that requires review.", id=f"moderation-workflow-{record_id_2}", task_queue="llm-moderation-task-queue", ) # The workflow will now be suspended, waiting for a signal. # We can query its status print(f"Workflow {handle_2.id} is running. Status: {await handle_2.query(LLMModerationWorkflow.get_status)}") print(f"--- Sending approval signal for {record_id_2} after 5 seconds... ---") await asyncio.sleep(5) await handle_2.signal(LLMModerationWorkflow.approve_content_signal) print(f"Signal sent. Result: {await handle_2.result()}") # Example 3: Rejected content record_id_3 = str(uuid.uuid4()) print(f"n--- Starting Workflow for record {record_id_3} (Rejected) ---") handle_3 = await client.start_workflow( LLMModerationWorkflow.run_moderation_workflow, record_id_3, "This content contains a bad word, and is inappropriate.", id=f"moderation-workflow-{record_id_3}", task_queue="llm-moderation-task-queue", ) print(f"Workflow {handle_3.id} started. Result: {await handle_3.result()}") if __name__ == "__main__": asyncio.run(start_and_signal_workflow())

代码解释:

  • LLMActivities:定义了两个活动。call_llm_for_moderation模拟调用LLM进行内容审核,update_database_with_result模拟更新数据库。这些活动可以是任意的Python函数,它们与外部系统交互。
  • LLMModerationWorkflow:这是核心工作流定义。
    • __init__用于初始化工作流的内部状态。
    • run_moderation_workflow是工作流的入口方法。它首先调用call_llm_for_moderation活动。
    • execute_activity是调用活动的关键方法,它包含了重试策略 (RetryPolicy) 和超时设置 (start_to_close_timeout),确保活动的可靠执行。
    • 如果LLM结果需要人工审核(PENDING_REVIEW),工作流会调用wait_for_signal("approve_content_signal")。这是一个阻塞操作,工作流会暂停执行,直到外部系统发送名为approve_content_signal的信号。
    • get_status是一个查询方法,允许外部系统在工作流运行时获取其当前状态。
    • approve_content_signal是一个信号方法,当外部系统发送该信号时,工作流会被唤醒。
  • Worker:负责监听特定任务队列,执行分配给它的工作流和活动。
  • Client:用于启动工作流实例、发送信号和查询工作流状态。

要运行此示例,你需要先启动Temporal Server(通常通过Docker),然后运行worker.py,最后运行client.py。你会看到工作流如何暂停等待人工信号,以及如何通过信号恢复执行。

Temporal的优缺点

优点:

  • 无与伦比的耐久性和容错性:工作流状态被持久化,即使整个应用程序或服务器宕机,工作流也能从中断处恢复,确保业务流程的完成。
  • 高可伸缩性:天生为分布式环境设计,可以处理大规模的并发工作流。
  • 强大的错误处理:内置了重试、超时、心跳等机制,简化了对瞬时故障和外部服务不可用的处理。
  • 长运行工作流:适合跨越数小时、数天甚至数月的业务流程。
  • 可视化与可观测性:Temporal Web UI提供了详细的工作流历史和状态,便于调试和监控。
  • 人机协作与外部事件驱动:通过信号和查询机制,可以方便地集成人工审批、外部事件触发等复杂场景。

缺点:

  • 运维复杂性:需要部署和管理Temporal Server集群,这引入了一定的运维负担(尽管有Temporal Cloud服务可以简化)。
  • 学习曲线:“确定性工作流”的概念对初学者来说可能比较陌生,需要时间理解其约束和最佳实践。
  • 资源开销:相比简单的Python库,Temporal引入了更多的基础设施和网络通信开销。
  • 细粒度LLM智能体:对于LangGraph擅长的非常细粒度的、内部迭代的LLM智能体逻辑,直接使用Temporal可能会显得有些“重量级”。工作流中的每个步骤通常被抽象为活动,而一个复杂的LLM智能体的内部决策可能包含数十个微小步骤。

核心差异:对比分析

现在,让我们通过一个表格来清晰地对比LangGraph的StatefulGraph和以Temporal为代表的传统微服务编排在处理LLM推理时的本质差异。

特性/方面LangGraph的StatefulGraph传统微服务编排(如Temporal)
核心关注点复杂、迭代式的LLM智能体行为,智能体内部状态管理。分布式、高可靠、长运行的业务流程,服务间协调。
执行模型Python进程内的状态机,图结构定义流程。进程外、分布式的工作流引擎,工作流代码定义流程,Worker执行活动。
状态管理显式、可变的State对象在节点间传递和更新,支持检查点。工作流的局部变量作为状态,通过事件历史持久化。通常需要活动来管理外部共享状态。
容错与耐久性通过检查点实现崩溃恢复(需配置后端)。节点内可实现重试。内置强大的耐久性和容错机制,工作流即使在Worker/服务器故障后也能恢复并继续。自动重试、超时。
可伸缩性单实例性能取决于Python进程。水平扩展需外部负载均衡和共享检查点。天生为分布式设计,可水平扩展Worker和Server以支持高吞吐量和并发。
部署模型作为Python应用程序的一部分部署(如Web服务)。需要部署Temporal Server集群和Worker应用程序。
开发体验Pythonic,快速迭代LLM智能体逻辑,对AI/ML工程师友好。学习曲线较高,需理解分布式系统概念,但一旦掌握,可构建非常健壮的系统。
步骤粒度非常细粒度,一个LLM调用、一个工具调用都可以是一个节点。活动通常是粗粒度的,代表对外部服务的调用。工作流是协调者。
人机协作易于在图中插入暂停节点,等待外部输入或审批。通过信号和查询机制实现,外部系统可与运行中的工作流交互。
LLM推理应用场景复杂的LLM智能体、多轮对话、RAG循环、基于LLM输出的决策树、自主代理。企业级、关键业务流程中包含LLM步骤的场景,如内容审核、文档处理、客户服务自动化、订单履行。
架构角色智能体的“大脑”和“决策引擎”。跨服务的“业务流程协调器”。

何时选择哪种方案来处理LLM推理

理解了这些差异后,我们就能更好地判断何时选择哪种工具:

选择LangGraph的StatefulGraph,当:

  • 您的核心任务是构建一个复杂的、迭代式的LLM智能体。您需要一个框架来清晰地定义智能体的决策流、工具使用和内部状态管理。
  • 您关注的是智能体“思考”和“行动”的内在逻辑。LangGraph的图模型非常适合模拟这种思维过程。
  • 您追求快速原型开发和迭代。作为Python库,LangGraph在本地开发和调试方面具有优势。
  • 您的部署策略允许将智能体作为一个相对独立的(或多个实例的)服务来运行,并且可以接受通过外部机制(如Redis)管理检查点来提供一定程度的恢复能力。
  • LLM推理是您应用的核心,而不是一个大型分布式业务流程中的一个步骤。

选择传统微服务编排(如Temporal),当:

  • 您的LLM推理是更大、更关键、更长运行的业务流程的一部分。例如,LLM用于处理客户投诉、自动化审批流程、数据清洗等,这些流程可能涉及多个微服务、数据库、人工干预等。
  • 您需要企业级的耐久性、容错性和可伸缩性。您的业务不能承受因系统故障而丢失正在进行的工作流实例。
  • 您需要强大的内置重试、超时和错误处理机制。外部LLM服务或工具可能不稳定,您需要可靠地处理这些外部依赖的失败。
  • 您需要在分布式微服务架构中协调LLM与其他服务。例如,LLM输出结果后,需要触发另一个服务更新数据库,再触发另一个服务发送通知。
  • 您需要高级的工作流管理功能,如版本控制、调度、详细的审计日志以及外部事件与工作流的交互(信号)。
  • 您已经在使用或计划使用微服务架构,并且愿意承担部署和管理工作流引擎的开销。

混合方法:兼顾智能与可靠

在很多实际场景中,我们并非要在两者之间做非此即彼的选择,而是可以采取一种混合方法来获得两者的最佳优势。

一个强大的策略是:将LangGraph智能体作为Temporal工作流中的一个“活动”(Activity)。

这意味着:

  1. Temporal工作流负责外部的耐久性和协调性:整个业务流程由Temporal工作流定义。它处理重试、超时、外部服务调用、持久化、人机审批等“宏观”流程。
  2. LangGraph智能体负责内部的智能和复杂逻辑:当Temporal工作流需要进行复杂的LLM推理或智能体决策时,它会调用一个包装了LangGraph智能体的Activity。这个Activity内部运行LangGraph图,处理其复杂的、迭代式的状态管理和决策逻辑。

示例构想:

# In Temporal Activities (llm_activities.py) from temporalio.activity import activity_method from typing import Dict, Any # Assume 'run_langgraph_agent' is a function that takes input, # initializes/resumes a LangGraph agent, runs it, and returns the final state. # You would implement this function using your LangGraph app.compile().invoke() or .stream() def run_langgraph_agent(input_data: Dict[str, Any], agent_id: str = None) -> Dict[str, Any]: """ Temporal Activity to run a LangGraph agent. If agent_id is provided, it can be used to load/save LangGraph checkpoints. """ print(f"Activity: Running LangGraph agent for input: {input_data}") # Here, you would instantiate/load your LangGraph app # and run it with the given input. # You might use agent_id to load a specific checkpoint for continuity. # Example: # from your_langgraph_module import my_langgraph_app # app = my_langgraph_app # if agent_id: # # Logic to load checkpoint if LangGraph supports it via config # pass # final_state = app.invoke(input_data) # return final_state # For demonstration: import time time.sleep(5) # Simulate complex agent execution if "urgent" in input_data.get("question", "").lower(): return {"status": "urgent_handled_by_agent", "llm_output": "Agent prioritized and escalated."} else: return {"status": "agent_completed", "llm_output": "Agent processed normally."} class MyLLMHybridActivities: @activity_method async def execute_langgraph_agent(self, input_data: Dict[str, Any], agent_id: str = None) -> Dict[str, Any]: return run_langgraph_agent(input_data, agent_id) # In Temporal Workflow (my_hybrid_workflow.py) from temporalio.workflow import workflow_method, Workflow from datetime import timedelta from temporalio.common import RetryPolicy from .llm_activities import MyLLMHybridActivities class HybridLLMWorkflow(Workflow): @workflow_method async def run_hybrid_llm_process(self, user_request: Dict[str, Any]) -> str: print(f"Workflow: Starting hybrid LLM process for request: {user_request}") # Step 1: Execute a LangGraph agent to handle the core LLM reasoning and tool use # The agent_id could be the workflow_id for checkpointing consistency agent_result = await self.execute_activity( MyLLMHybridActivities.execute_langgraph_agent, user_request, info().workflow_id, # Pass workflow ID as agent_id for LangGraph checkpointing start_to_close_timeout=timedelta(minutes=5), # Give agent enough time retry_policy=RetryPolicy(maximum_attempts=2) # Retry running the entire agent ) print(f"Workflow: LangGraph agent finished. Result: {agent_result}") # Step 2: Based on agent's output, decide next steps in the durable workflow if agent_result.get("status") == "urgent_handled_by_agent": # Potentially call another activity for human notification or high-priority action print("Workflow: Agent identified urgent case. Escalating...") return "URGENT_HANDLED_BY_AGENT" else: # Continue with normal database updates or other downstream processes print("Workflow: Agent processed normally. Continuing with standard flow.") # ... call other activities like updating DB etc. return "NORMAL_PROCESSING_COMPLETED"

这种混合方式使得LangGraph能够专注于其擅长的领域——构建智能体的内部逻辑,而将分布式系统的复杂性、耐久性和可伸缩性交给Temporal等专业工作流引擎处理。这无疑是构建下一代复杂、可靠的智能应用的一个强有力组合。

编排智能:选择与融合的艺术

今天我们深入探讨了LangGraph的StatefulGraph与传统微服务编排(以Temporal为例)在处理LLM推理时的本质差异。我们看到,LangGraph提供了一个直观、强大的模型来构建有状态、迭代式的LLM智能体,其核心在于显式的内部状态管理和基于图的决策流。而Temporal则提供了一个业界领先的平台,用于构建耐久、容错、可伸缩的分布式工作流,其核心在于将业务流程代码化并提供强大的执行保证。

两者并非相互排斥,而是各自在LLM应用的不同层面上发挥着关键作用。LangGraph是智能体的大脑,负责复杂的推理和内部协调;Temporal则是整个业务系统的骨架,确保智能体的运行被可靠地嵌入到更广泛的、高可用的企业流程中。理解它们的定位,并在适当的场景下选择,甚至巧妙地融合它们,将是我们在构建未来智能应用时不可或缺的技能。

需要专业的网站建设服务?

联系我们获取免费的网站建设咨询和方案报价,让我们帮助您实现业务目标

立即咨询