目标:带你用 LangChain(Python)把第 2 课的五层架构落地成可运行的 Agent Demo。完成后你能理解 LangChain 的核心抽象(Agent / Tools / Memory / Chain / Prompt),并能搭出一个最小可用的“自动写周报” Agent。
本课结构(便于快速查阅):
课程前提与环境准备
设计回顾:五层如何映射到 LangChain
最小可用 Agent 设计(行为规范)
代码实现(逐步解释)
测试与调试技巧
安全、审计与生产化建议
拓展练习
一、前提与环境准备
语言:Python 3.11+
依赖(建议虚拟环境):
langchain>=0.3.0
langgraph>=0.2.0
langchain-openai>=0.2.0
langchain-experimental>=0.3.0
DeepSeek 或其他 LLM SDK(取决于你用哪个模型)
python-dotenv、tiktoken、faiss-cpu(按需)
环境变量(示例):
DEEPSEEK_API_KEY=...
LLM_PROVIDER=deepseek
TESTING_MODE=true
安装(示例):
pip install -r requirements.txt
说明:LangChain 定位为"把 LLM 接入工具与记忆的框架",它并非唯一方案,但能把大部分工程实践抽象掉,适合教学与快速落地。在最新版本中,ReAct agent 已经迁移到 LangGraph 框架。
二、把五层映射到 LangChain
LLM 层:LangChain 中的 LLM wrappers(OpenAI、DeepSeek 等,通过 ChatOpenAI 接口)
工具层(Tools):langchain_core.tools.BaseTool(自定义函数封装为工具)
记忆层(Memory):langchain_community.chat_message_histories.ChatMessageHistory(ConversationBufferMemory 已废弃)
控制层(Controller / Orchestrator):langgraph.prebuilt.create_react_agent(替代旧版 AgentExecutor)
接入层:FastAPI / CLI / 企业 IM webhook(LangChain 不强制,但可与之集成)
要点:真正控制 Agent 行为的部分在于你如何定义 Tools、Prompt 与 Agent Type(tool-use agent、planner agent 等)。在 LangChain 1.2.0+ 中,ReAct agent 实现已迁移到 LangGraph。
三、最小可用 Agent 设计
功能需求(最小可用)
拉取本周任务清单(模拟:从本地 JSON 或 mock API)
汇总完成项 / 进行中 / 风险项
生成周报草稿(指定模板)
将草稿写入本地文件(dry-run 模式,人工确认后写入)
把结果交给用户确认(人工确认后可发送)
安全与工程约束:
所有写操作默认 dry-run,除非用户确认
所有操作记录审计日志(谁、何时、调用了哪个工具、输出是什么)
使用 LangGraph 替代旧版 AgentExecutor 实现控制逻辑
Agent 的核心能力在于把"自然语言需求"转为"结构化动作",比如 JSON:
{"action":"fetch_tasks","args": {...}}。四、代码实现(重点,分步解释)
下面给出简化但可运行的示例。注:示例已更新为 LangChain 1.2.0+ 版本,使用 LangGraph 实现 ReAct agent。
依赖导入与 LLM 初始化(支持 DeepSeek 举例)
""" 周报生成Agent实现 - 基于LangChain的五层架构 (更新版 - 使用LangGraph) """ import os from typing import Dict, Any, List from langchain_openai import ChatOpenAI from langchain_core.tools import BaseTool from langgraph.prebuilt import create_react_agent from memory.memory_manager import MemoryManager def create_weekly_report_agent(): """创建周报生成 Agent""" # LLM 初始化 - 使用环境变量检测是否为测试模式或特定LLM提供者 if os.getenv("TESTING_MODE", "false").lower() == "true": # 在测试模式下使用模拟LLM # 使用一个模拟的ChatOpenAI实例来 ensure compatibility with bind_tools from langchain_core.messages import HumanMessage, AIMessage from langchain_core.outputs import ChatGeneration, ChatResult from langchain_core.language_models import BaseChatModel from typing import List, Optional import json class MockChatModel(BaseChatModel): """Mock Chat Model for testing""" def _generate(self, messages, stop=None, run_manager=None, **kwargs): # Return a mock response based on the input content = "Mock response for testing" return ChatResult(generations=[ChatGeneration(message=AIMessage(content=content))]) @property def _llm_type(self): return "mock-chat" def bind_tools(self, tools, **kwargs): # Return self to allow chaining return self llm = MockChatModel() else: # 根据环境变量选择LLM提供者 llm_provider = os.getenv("LLM_PROVIDER", "openai").lower() if llm_provider == "deepseek": # 使用DeepSeek作为LLM提供者 (兼容OpenAI格式) llm = ChatOpenAI( model=os.getenv("DEEPSEEK_MODEL", "deepseek-chat"), base_url=os.getenv("DEEPSEEK_API_BASE", "https://api.deepseek.com"), api_key=os.getenv("DEEPSEEK_API_KEY"), temperature=float(os.getenv("LLM_TEMPERATURE", "0")) ) else: # 默认使用OpenAI llm = ChatOpenAI( temperature=float(os.getenv("LLM_TEMPERATURE", "0")), api_key=os.getenv("OPENAI_API_KEY") ) # 导入工具 from tools.task_fetcher import tool_fetch_tasks from tools.metrics_summarizer import tool_summarize from tools.report_writer import tool_write # 创建工具列表 tools = [tool_fetch_tasks, tool_summarize, tool_write] # 创建记忆管理器 memory_manager = MemoryManager() # 使用记忆管理器的短期记忆 memory = memory_manager.get_short_term_memory() # 创建 LangGraph ReAct agent agent = create_react_agent(llm, tools) # 返回一个包含 agent 和 memory_manager 的字典,便于后续访问 return { 'agent': agent, 'memory_manager': memory_manager }说明:在 LangChain 1.2.0+ 中,ReAct agent 创建方式已更新,使用langgraph.prebuilt.create_react_agent替代旧版的initialize_agent。
定义工具(Tools)
工具要保持"窄而稳"。我们实现三个工具:fetch_tasks、summarize_metrics、write_report(dry-run 写入本地真实写入需确认)。
# 模拟数据源 def fetch_tasks_from_mock(team: str, week: str) -> Dict[str, Any]: """模拟从任务系统获取任务数据""" # 实际应调用 Jira / Notion / API try: with open("sample6/mock_tasks.json", "r", encoding="utf-8") as f: data = json.load(f) # 这里简单返回过滤结果 tasks = [t for t in data if t["team"] == team and t["week"] == week] return {"tasks": tasks} except FileNotFoundError: # 如果没有mock文件,返回模拟数据 return { "tasks": [ { "id": "TASK-001", "title": "修复登录页面样式问题", "status": "completed", "assignee": "张三", "priority": "high" }, { "id": "TASK-002", "title": "优化数据库查询性能", "status": "in_progress", "assignee": "李四", "priority": "medium" }, { "id": "TASK-003", "title": "添加用户反馈功能", "status": "todo", "assignee": "王五", "priority": "low" } ] } # 把函数包装为 LangChain Tools class TaskFetchTool(BaseTool): name: str = "fetch_tasks" description: str = "按 team 和 week 获取任务列表,输入格式: {\"team\": \"团队名\", \"week\": \"周\"},返回 JSON 格式的任务列表" def _run(self, input_str: str) -> str: try: # 尝试解析为JSON,如果失败则尝试直接使用作为输入 try: input_data = json.loads(input_str) except json.JSONDecodeError: # 如果输入不是JSON格式,尝试解析为简单的键值对 # 这种情况可能发生在LangChain直接传递字符串而非JSON时 input_data = {"input": input_str} # 从输入数据中提取参数 team = input_data.get("team") week = input_data.get("week") # 如果直接获取不到,尝试从input字段中解析 if not team or not week: input_text = input_data.get("input", "") if isinstance(input_text, str): # 尝试从字符串中提取参数 import re # 匹配 team: value 或 "team": "value" 等格式 if not team: team_match = re.search(r'"?team"?\s*[:=]\s*"([^"]+)"', input_text) if not team_match: team_match = re.search(r'"?team"?\s*[:=]\s*(\w+)', input_text) if team_match: team = team_match.group(1) if not week: week_match = re.search(r'"?week"?\s*[:=]\s*"([^"]+)"', input_text) if not week_match: week_match = re.search(r'"?week"?\s*[:=]\s*(\w+)', input_text) if week_match: week = week_match.group(1) if not team or not week: # 如果仍然无法提取,返回错误信息 return json.dumps({ "error": f"无法从输入中提取team和week参数。期望格式: {{\"team\": \"团队名\", \"week\": \"周\"}},实际输入: {input_str}", "received_input": input_str }, ensure_ascii=False) result = fetch_tasks_from_mock(team, week) return json.dumps(result, ensure_ascii=False) except Exception as e: # 记录错误到审计日志 audit_logger.log_action( user_id="system", action="tool_execution", tool_name=self.name, inputs={"input_str": input_str}, outputs={"error": str(e)}, status="error" ) return json.dumps({"error": str(e)}) async def _arun(self, input_str: str): raise NotImplementedError("fetch_tasks does not support async") # 创建工具实例 tool_fetch_tasks = TaskFetchTool()说明:在 LangChain 1.2.0+ 中,工具定义需要继承BaseTool并实现_run方法,同时需要添加类型注解以避免 Pydantic 验证错误。
记忆层:ChatMessageHistory + 可选向量记忆(RAG)
""" 记忆层实现:包括短期记忆、长期记忆和记忆管理功能 """ import os from langchain_community.chat_message_histories import ChatMessageHistory from langchain_core.chat_history import BaseChatMessageHistory from langchain_core.runnables.history import RunnableWithMessageHistory from langchain_community.vectorstores import FAISS from langchain_openai import OpenAIEmbeddings from langchain_community.embeddings import FakeEmbeddings, HuggingFaceEmbeddings from langchain_openai import OpenAI import json from typing import Dict, List, Any class MemoryManager: """记忆管理器,整合短期记忆和长期记忆""" def __init__(self): # 短期记忆:聊天消息历史 self.short_term_memory = ChatMessageHistory() # 长期记忆:向量存储(这里使用简单的实现,实际应用中需要更复杂的管理) self.long_term_memory = None # 根据环境变量决定使用真实或模拟嵌入 if os.getenv("TESTING_MODE", "false").lower() == "true": # 在测试模式下使用模拟嵌入 from langchain_community.embeddings import FakeEmbeddings # 创建一个维度为10的模拟嵌入器 self.embeddings = FakeEmbeddings(size=10) elif os.getenv("EMBEDDING_PROVIDER", "openai").lower() == "huggingface": # 使用HuggingFace嵌入提供者(免费替代方案) self.embeddings = HuggingFaceEmbeddings() else: # 默认使用OpenAI嵌入 # Check if OPENAI_API_KEY is available, otherwise use a default api_key = os.getenv("OPENAI_API_KEY") if not api_key: # Use a fake embedding as fallback when no API key is provided from langchain_community.embeddings import FakeEmbeddings self.embeddings = FakeEmbeddings(size=10) else: self.embeddings = OpenAIEmbeddings(openai_api_key=api_key) # 审计日志 self.audit_log = [] def add_to_short_term(self, input_text: str, output_text: str): """添加对话到短期记忆""" self.short_term_memory.add_user_message(input_text) self.short_term_memory.add_ai_message(output_text) def get_short_term_memory(self): """获取短期记忆""" return self.short_term_memory def add_to_long_term(self, text: str, metadata: Dict[str, Any] = None): """添加信息到长期记忆""" if self.long_term_memory is None: # 初始化向量存储 self.long_term_memory = FAISS.from_texts([text], self.embeddings, metadatas=[metadata or {}]) else: # 添加到现有向量存储 self.long_term_memory.add_texts([text], metadatas=[metadata or {}]) def search_long_term(self, query: str, k: int = 3) -> List[str]: """从长期记忆中搜索相关信息""" if self.long_term_memory is None: return [] docs = self.long_term_memory.similarity_search(query, k=k) return [doc.page_content for doc in docs] def log_audit(self, user_id: str, action: str, tool_name: str, inputs: Dict, outputs: Dict): """记录审计日志""" log_entry = { "timestamp": str(__import__('datetime').datetime.now()), "user_id": user_id, "action": action, "tool_name": tool_name, "inputs": inputs, "outputs": outputs } self.audit_log.append(log_entry) # 保存到文件 with open("audit_log.json", "a", encoding="utf-8") as f: f.write(json.dumps(log_entry, ensure_ascii=False) + "\n") def get_audit_log(self) -> List[Dict]: """获取审计日志""" return self.audit_log说明:在 LangChain 1.2.0+ 中,ConversationBufferMemory已被废弃,使用ChatMessageHistory作为短期记忆的替代方案。
控制层:使用 LangGraph 实现
""" 控制层和接入层实现 包含工作流控制、安全检查和用户确认流程 """ from typing import Dict, Any import json from langchain_core.messages import HumanMessage class WorkflowController: """工作流控制器,负责控制Agent执行流程""" def __init__(self, agent): self.agent = agent self.max_steps = 8 # 最大步数限制 def execute_with_control(self, user_input: str) -> Dict[str, Any]: """执行带控制的Agent流程""" try: # 执行Agent - LangGraph expects input in the format {"messages": [...]} # Create initial messages list with the user input inputs = {"messages": [HumanMessage(content=user_input)]} # Invoke the LangGraph agent result = self.agent.invoke(inputs) # Extract the output from the result if isinstance(result, dict) and "messages" in result: # Get the last message as the result messages = result["messages"] if messages: last_message = messages[-1] # Get the last message result_text = last_message.content else: result_text = str(result) elif isinstance(result, dict): result_text = result.get("output", str(result)) else: result_text = str(result) # Record to memory manager (if exists) if hasattr(self.agent, 'memory_manager'): self.agent.memory_manager.add_to_short_term(user_input, result_text) return { "status": "success", "result": result_text, "steps_used": len(self._get_steps_from_agent()) if hasattr(self.agent, '_get_steps_from_agent') else "unknown" } except Exception as e: return { "status": "error", "error_message": str(e), "error_type": type(e).__name__ } def _get_steps_from_agent(self): """从Agent获取执行步骤(如果支持)""" # 这里是一个简化实现,实际可能需要根据具体的Agent类型获取步骤信息 return [] class UserConfirmationHandler: """用户确认处理器,处理需要人工确认的操作""" @staticmethod def prepare_confirmation_payload(agent_result: str) -> Dict[str, Any]: """准备确认载荷""" # 解析Agent结果,提取需要确认的信息 return { "draft_content": agent_result, "actions": ["write_report"], # 假设Agent建议执行写报告操作 } @staticmethod def confirm_and_execute(confirm_payload: Dict[str, Any], tools): """处理用户确认并执行操作""" if confirm_payload.get("confirmed", False): # 执行实际写入操作 if "write_report" in confirm_payload.get("actions", []): write_tool = tools.get("write_report") if write_tool: # 从载荷中获取写入参数 write_params = { "report_md": confirm_payload.get("draft_content", ""), "path": confirm_payload.get("output_path", "weekly_report.md"), "dry_run": False # 实际写入,非dry run } result = write_tool._run(json.dumps(write_params)) return json.loads(result) return {"status": "cancelled", "message": "用户未确认执行"}说明:在 LangChain 1.2.0+ 中,控制逻辑通过 LangGraph 实现,输入格式为{"messages": [HumanMessage(content=user_input)]}。
使用 Agent:一个典型交互流程
""" 自动写周报 Agent - 基于 LangChain 的五层架构实现 """ import os from agents.weekly_report_agent import create_weekly_report_agent from utils.controller import WorkflowController, UserConfirmationHandler import json from dotenv import load_dotenv # 加载环境变量 load_dotenv() def main(): print("自动写周报 Agent 启动中...") # 显示当前使用的LLM提供者 llm_provider = os.getenv("LLM_PROVIDER", "openai").lower() print(f"使用 LLM 提供者: {llm_provider}") # 创建 Agent agent_components = create_weekly_report_agent() agent = agent_components['agent'] # 创建工作流控制器 controller = WorkflowController(agent) # 示例输入 user_input = """ 请帮我生成公司后端组 2025-W50 的周报草稿: 包含完成项、进行中项、风险与下周计划 调用任务系统获取任务(team=backend, week=2025-W50) 拉取关键指标(DAU、错误率) 先做 dry-run,生成草稿并给出建议,等待我确认是否写入 """ print("用户输入:", user_input) print("\n正在生成周报...") # 执行带控制的流程 result = controller.execute_with_control(user_input) if result["status"] == "success": print("周报生成成功!") print("结果:", result["result"]) # 准备用户确认 confirmation_payload = UserConfirmationHandler.prepare_confirmation_payload(result["result"]) print("\n需要您的确认:") print(f"草稿内容预览: {confirmation_payload['draft_content'][:200]}...") # 模拟用户确认 print("\n用户已确认,执行实际写入...") final_result = UserConfirmationHandler.confirm_and_execute( {**confirmation_payload, "confirmed": True}, {"write_report": __import__('tools.report_writer').tool_write} ) print("最终结果:", final_result) else: print("周报生成失败:", result["error_message"]) if __name__ == "__main__": main()说明:在实际运行中,Agent 会调用工具(fetch_tasks、summarize_metrics、write_report),并把输出记录到 memory。LangGraph agent 的输入格式与旧版有所不同,需要使用正确的格式。
人工确认与实际写入
Agent 生成草稿后,我们在外部(UI/CLI)提示用户确认。如果确认,则调用 write_report 的 dry_run=False:
confirm_payload = { "report_md": "<agent 生成的 markdown 字符串>", "path": "weekly_backend_2025-W50.md", "dry_run": False } # 直接调用工具(或通过 agent 执行特定 action) write_result = tool_write._run(json.dumps(confirm_payload)) print(write_result)这种做法可以强制区分"建议"与"执行",是工程上常见的安全模式。
五、测试与调试技巧
本地 mock 是关键:先用 mock_tasks.json、mock BI 函数调通流程,再接真正 API。
打开 LLM 输出日志:温度设为 0 或 0.2 提高稳定性;在开发调高温度可看更多候选结果。
增量构建 Tools:先只实现 fetch_tasks + generate_report(不写文件),保证逻辑,再加入写工具、确认流程。
加入观测(Observability):每次工具调用记录到日志(时间、输入、输出、调用者)。
防死循环:max_iterations + 检测"没有新操作"或"重复操作超过阈值"。
测试模式:使用 TESTING_MODE=true 环境变量启用模拟 LLM,避免消耗真实 API 调用。
六、安全、审计与生产化建议(必读)
权限边界:所有写权限需基于用户身份与审批策略。写操作默认 dry-run。
审计日志:记录每一步的 LLM 响应、工具输入输出、执行者(系统/用户)、时间戳。
白名单工具:在生产中只暴露有限工具集合,禁止 arbitrary code execution(除非有严格审计)。
速率与并发限制:防止自动化任务造成系统冲击或滥用。
回滚策略:写操作应能回滚或保留版本(如写入 Notion 之前先创建草稿副本)。
人机协同界面:在关键点提示用户(例如:发送给主管前再确认),并展示差异/摘要。
API 密钥管理:使用环境变量存储敏感信息,避免硬编码。
七、常见进阶与拓展(建议练习)
使用 VectorDB(Chroma/FAISS)将历史周报与模板作为长期记忆,做 RAG 提升准确性。
把 Agent 拆成两个角色:PlanAgent(做任务分解)和 ExecAgent(调用工具执行),实现多 Agent 协作。
为 fetch_tasks 添加增量查询(只查询自上次周报以来的新任务),并保留任务状态到短期记忆。
在写入前做自动合规检查 Agent(检查是否包含敏感信息或承诺)。
将 Agent 托管为后端服务(FastAPI),并对接企业 IM(飞书/Slack)作为接入层,支持异步执行与通知。
练习题(强烈建议动手):
用 mock 数据把上面 Demo 跑通,生成一个 markdown 周报文件并人工确认写入。
改造工具,使 fetch_tasks 真实调用你常用的 issue tracker(Jira/Notion/GitHub)。
为 Agent 增加"任务失败重试 + 降级策略"(比如 metrics 查询失败时用上次缓存的数据)。
八、小结
LangChain 把 LLM、Tools、Memory、LangGraph Agent 抽象成工程化构件,便于把"说话的模型"变成"会做事的智能体"。
工程核心不在于模型本身,而在于:如何设计窄而稳的工具、如何保存并检索记忆、如何在控制层避免失控、如何通过接入层安全地进入业务流程。
在 LangChain 1.2.0+ 中,ReAct agent 已迁移到 LangGraph 框架,API 有所变化但核心理念不变。
实践建议:先做 mock + dry-run,再逐步放开真实读写权限;保持日志与人工确认点。