摘要:传统的 RAG 架构像一条单向流动的流水线,一旦检索失败,整个系统就会崩溃或产生幻觉。本文将带你深入下一代 AI 架构——Agentic RAG。我们将使用LangGraph构建一个具备“自愈能力”的 CRAG(Corrective RAG)系统,它能够通过自我反思、查询重写和网络搜索,主动修正错误,从“复读机”进化为“智能体”。
前言:从“流水线工人”到“决策专家”
在过去的两年里,开发者构建 RAG(检索增强生成)应用的方式大多是线性的:用户提问 -> 向量检索 -> Prompt 组装 -> LLM 生成。
这就像是一个只会按部就班的流水线工人。如果给他的零件(检索到的文档)是坏的(不相关),他依然会强行组装,最终产出“幻觉”严重的次品。他不会思考,不会喊停,更不会主动去找更好的零件。
现实世界的挑战往往充满不确定性:
模糊提问:用户问“苹果最近怎么样?”,是指股价、新品还是农产品?
数据过时:向量库里只有去年的财报,无法回答昨天的新闻。
检索噪音:检索出的 Top-3 文档虽然关键词匹配,但语义完全无关。
为了解决这些问题,我们需要Agentic(智能体化)。我们需要一个懂得反思(Reflection)、懂得循环(Loop)、懂得**使用工具(Tool use)的“专家”。而 LangGraph,正是通过引入图(Graph)和状态(State)**的概念,赋予了 AI 这种核心能力。
第一部分:架构深度解析 (CRAG)
我们将构建一个经典的Corrective RAG (CRAG)模式。其核心思想在于“自愈”:在传统 RAG 失败的地方,CRAG 能通过自我检测和外部检索来修正错误。
1.1 系统逻辑流程图
在写代码之前,让我们先通过图理解整个数据流转逻辑:
1.2 核心角色分工
检索员 (Retriever):系统的入口,负责从向量数据库捞取“可能相关”的切片。
审核官 (Grader):Agent 的灵魂。它负责对检索结果进行二分类评分(Relevant/Not Relevant)。如果不合格,它会触发报警。
策略师 (Router):根据审核官的评分决定下一步是直接生成,还是启动“B计划”(联网搜索)。
研究员 (Web Searcher):当内部知识不足时,利用 Tavily 或 Google Search 补充外部实时信息。
第二部分:环境准备与状态定义
2.1 安装依赖
确保你安装了以下核心库:
pip install langgraph langchain langchain-openai langchain-community tavily-python pydantic2.2 定义图的状态 (GraphState)
LangGraph 的运行依赖于“状态”的流转。你可以把GraphState想象成一个在各个部门(节点)之间流转的公文包。
任何节点打开公文包,都能读取上下文。
节点处理完后,将新的数据(如新检索到的文档)放入公文包,传给下一站。
from typing import List, TypedDict, Annotated import operator class GraphState(TypedDict): """ 这是整个图的“记忆体”。 """ question: str # 用户的原始问题 generation: str # LLM 最终生成的回答 web_search: str # 是否需要搜索的标志位 ("Yes"/"No") documents: List[str] # 文档列表 (不断累积或替换)第三部分:核心节点代码实现
3.1 检索节点:信息的源头
from langchain_community.vectorstores import Chroma from langchain_openai import OpenAIEmbeddings # 假设你已经初始化了一个 vectorstore # vectorstore = Chroma(...) # retriever = vectorstore.as_retriever() def retrieve(state): """ 功能:从向量库检索文档 """ print("---🔍 NODE: RETRIEVE---") question = state["question"] # 模拟检索过程(实际开发中请替换为 retriever.invoke(question)) documents = [ "苹果公司(Apple Inc.)发布了最新的 Vision Pro。", "今天的苹果(水果)价格有点贵,建议去市场买。" # 故意放入一个噪音文档 ] return {"documents": documents, "question": question}3.2 评分节点:基于 Pydantic 的精准质检
LLM 最大的问题是输出不可控。为了让程序能稳定地进行if-else判断,我们使用 Pydantic 强制 LLM 输出 JSON 结构。
from langchain_core.prompts import ChatPromptTemplate from langchain_openai import ChatOpenAI from pydantic import BaseModel, Field # 1. 定义评分标准的数据结构 class GradeDocuments(BaseModel): """用于评分的二进制输出""" binary_score: str = Field(description="文档是否相关?'yes' 或 'no'") # 2. 初始化 LLM llm = ChatOpenAI(model="gpt-4-turbo", temperature=0) structured_llm_grader = llm.with_structured_output(GradeDocuments) # 3. 设计评分 Prompt system_prompt = """你是一个严格的文档审核员。 如果文档包含回答用户问题的关键词或语义信息,评为 'yes',否则评为 'no'。""" grade_prompt = ChatPromptTemplate.from_messages([ ("system", system_prompt), ("human", "文档内容:\n\n {document} \n\n 用户问题:{question}") ]) retrieval_grader = grade_prompt | structured_llm_grader def grade_documents(state): """ 功能:过滤不相关文档,并标记是否需要搜索 """ print("--- NODE: GRADE DOCUMENTS---") question = state["question"] documents = state["documents"] filtered_docs = [] web_search = "No" # 默认为不搜索 for d in documents: score = retrieval_grader.invoke({"question": question, "document": d}) grade = score.binary_score if grade == "yes": print(" - 文档相关") filtered_docs.append(d) else: print(" - 文档不相关 (触发补救机制)") web_search = "Yes" # 只要有一个不相关(或所有都不相关),就触发搜索 return {"documents": filtered_docs, "web_search": web_search}3.3 重写与搜索节点:智能补救
当评分节点亮起红灯(web_search="Yes"),我们需要去互联网寻找答案。但在搜索前,必须把口语化的问题转化为“搜索词”。
from langchain_core.output_parsers import StrOutputParser from langchain_community.tools.tavily_search import TavilySearchResults # --- 查询重写器 --- rewrite_prompt = ChatPromptTemplate.from_messages([ ("system", "你是一个搜索引擎优化专家。请将用户的输入转化为通过 Tavily API 进行搜索的最佳查询字符串。只输出查询词,不要解释。"), ("human", "原始问题:{question}") ]) question_rewriter = rewrite_prompt | llm | StrOutputParser() def transform_query(state): print("--- NODE: TRANSFORM QUERY---") question = state["question"] documents = state["documents"] better_query = question_rewriter.invoke({"question": question}) print(f" - 优化后查询: {better_query}") return {"documents": documents, "question": better_query} # --- 网络搜索工具 (需配置 TAVILY_API_KEY) --- web_search_tool = TavilySearchResults(k=3) def web_search(state): print("--- NODE: WEB SEARCH---") question = state["question"] documents = state["documents"] docs = web_search_tool.invoke({"query": question}) # 将搜索结果格式化为字符串并追加到现有文档中 web_results = "\n".join([d["content"] for d in docs]) documents.append(web_results) return {"documents": documents}第四部分:构建神经系统(图的编排)
这是 LangGraph 最迷人的地方。我们将上述孤立的函数连接成一个有机的整体。
4.1 条件边逻辑 (The Router)
条件边决定了图的“跳转逻辑”。
def decide_to_generate(state): """ 路由函数:决定下一步去哪里 """ print("--- DECISION: ROUTING---") web_search = state["web_search"] if web_search == "Yes": print(" - 决策: 需要补充信息 -> 转至重写查询") return "transform_query" else: print(" - 决策: 信息充足 -> 转至生成") return "generate"4.2 组装图
from langgraph.graph import END, StateGraph # 1. 初始化图 workflow = StateGraph(GraphState) # 2. 添加节点 (注册功能模块) workflow.add_node("retrieve", retrieve) workflow.add_node("grade_documents", grade_documents) workflow.add_node("generate", generate) # 假设 generate 函数已定义 workflow.add_node("transform_query", transform_query) workflow.add_node("web_search_node", web_search) # 3. 添加普通边 (确定性流程) workflow.set_entry_point("retrieve") workflow.add_edge("retrieve", "grade_documents") # 搜索链路:重写 -> 搜索 -> 生成 workflow.add_edge("transform_query", "web_search_node") workflow.add_edge("web_search_node", "generate") workflow.add_edge("generate", END) # 4. 添加条件边 (大脑决策) workflow.add_conditional_edges( "grade_documents", # 上游节点 decide_to_generate, # 路由逻辑函数 { "transform_query": "transform_query", # 如果函数返回 Key,跳转到 Value 节点 "generate": "generate", }, ) # 5. 编译图 app = workflow.compile()第五部分:进阶技巧——生产环境必修课
仅仅跑通代码是不够的,生产环境还需要考虑以下三点:
5.1 可视化调试
使用app.get_graph().print_ascii()或 Mermaid 导出功能,可以在控制台直接看到生成的图结构,确保逻辑连线正确。
5.2 Human-in-the-Loop (人机回环)
对于金融、医疗等高敏感领域,我们不敢让 Agent 全自动生成。LangGraph 支持断点(Interrupts)。
from langgraph.checkpoint.memory import MemorySaver memory = MemorySaver() # 在 'generate' 节点前暂停 app = workflow.compile(checkpointer=memory, interrupt_before=["generate"]) # 运行 thread = {"configurable": {"thread_id": "1"}} app.invoke(inputs, thread) # 此时程序暂停。开发者/管理员可以检查 state # current_values = app.get_state(thread).values # 手动修改文档后,命令 Agent 继续运行 # app.invoke(None, thread)5.3 状态持久化 (Persistence)
如果 Agent 运行了一半(例如搜索完成),服务重启了怎么办?
LangGraph 的 Checkpointer 可以连接 Postgres 或 Redis。每次状态更新都会被保存到数据库。即使服务器宕机,重启后只要传入相同的 thread_id,Agent 就能从断开的地方继续运行,而不是从头开始。
第六部分:总结与思考
从线性 RAG 到 Agentic RAG,我们不仅仅是增加了代码量,而是完成了思维范式的转变:
| 特性 | 线性 RAG (Chain) | Agentic RAG (Graph) |
| 控制流 | 静态,如多米诺骨牌 | 动态,包含循环和分支 |
| 容错性 | 脆弱,一步错步步错 | 强韧,具备自我纠错机制 |
| 主要成本 | 低 Token 消耗,速度快 | 高 Token 消耗,速度较慢 |
| 适用场景 | 简单文档问答 | 复杂研究、多源信息聚合 |
架构师建议:
Agentic RAG 虽然强大,但也带来了成本激增(因为循环可能导致多次 LLM 调用)。在生产落地时,务必设置 recursion_limit(最大递归深度),防止 Agent 陷入死循环,把你的 Token 烧光。