前端 + AI 进阶学习路线|Week 11-12:智能工作流前端
Day 13:工作流执行监控
学习时间:2026年1月6日(星期二)
关键词:工作流执行、DAG 遍历、实时状态、进度条、日志面板、执行引擎
📁 项目文件结构
day13-workflow-execution/
├── src/
│ ├── components/
│ │ ├── WorkflowCanvas.jsx # 复用 Day 12 画布(带状态)
│ │ ├── ExecutionPanel.jsx # 执行控制 + 日志面板
│ │ └── NodeTypes/ # 复用 Day 12 节点(带状态样式)
│ │ ├── AINode.jsx
│ │ ├── InputNode.jsx
│ │ └── OutputNode.jsx
│ ├── lib/
│ │ └── workflowEngine.js # 工作流执行引擎
│ └── App.jsx # 主应用集成
└── public/
✅ 本日核心:让可视化工作流“动起来”——实时执行 + 状态监控
🎯 今日学习目标
- 实现 工作流执行引擎(基于 DAG 拓扑排序)
- 显示 节点实时状态(待运行 / 运行中 / 成功 / 失败)
- 构建 执行日志面板 与 整体进度条
- 支持 手动触发执行 与 结果查看
💡 为什么需要执行监控?
可视化只是第一步,用户更关心:
- “我的工作流跑起来了吗?”
- “哪一步卡住了?为什么失败?”
- “需要多久才能完成?”
✅ 执行可视化 是工作流系统的核心价值,让自动化过程透明可控
📚 核心设计思路
| 功能 | 实现方式 |
|---|---|
| DAG 遍历 | 拓扑排序 + 并行/串行执行策略 |
| 节点状态 | 扩展节点数据:{ status: 'idle' | 'running' | 'success' | 'error' } |
| 日志收集 | 执行过程中收集 console.log + 自定义日志 |
| 进度计算 | (已完成节点数 / 总节点数) * 100 |
⚠️ 注意:前端执行适合轻量任务(如 API 调用),重计算应交给后端
🔧 动手实践:构建工作流执行监控系统
步骤 1:创建项目并复用 Day 12 组件
npx create-react-app day13-workflow-execution
cd day13-workflow-execution
npm install reactflow
# 复制 Day 12 的 components/ 和 lib/
步骤 2:扩展现有节点以支持状态
// src/components/NodeTypes/AINode.jsx(更新版)
import { Handle, Position } from 'reactflow';const statusColors = {idle: '#52c41a',running: '#13c2c2',success: '#52c41a',error: '#ff4d4f'
};const statusLabels = {idle: '● 待运行',running: '● 运行中',success: '✅ 成功',error: '❌ 失败'
};export default function AINode({ data }) {const status = data.status || 'idle';const color = statusColors[status];return (<div style={{background: status === 'error' ? '#fff2f0' : '#f6ffed',border: `2px solid ${color}`,borderRadius: '8px',padding: '12px 16px',minWidth: '160px',boxShadow: '0 2px 8px rgba(0,0,0,0.1)',position: 'relative'}}><div style={{ fontWeight: '600', color: color,marginBottom: '4px',display: 'flex',justifyContent: 'space-between',alignItems: 'center'}}>🤖 AI 模型<span style={{ fontSize: '12px', fontWeight: 'normal' }}>{statusLabels[status]}</span></div><div style={{ fontSize: '12px', color: '#666', wordBreak: 'break-word' }}>{data.label || 'LLM 分析'}</div><Handle type="target" position={Position.Left} /><Handle type="source" position={Position.Right} /></div>);
}
💡
InputNode.jsx和OutputNode.jsx按相同逻辑更新status样式
步骤 3:创建工作流执行引擎
// src/lib/workflowEngine.js
/*** 模拟工作流执行(真实场景替换为 API 调用)* @param {Object} node - 节点对象* @param {Object} input - 输入数据* @returns {Promise<Object>} - 输出数据*/
const executeNode = async (node, input) => {await new Promise(resolve => setTimeout(resolve, 1000 + Math.random() * 2000)); // 模拟耗时// 模拟不同节点行为if (node.type === 'input') {return { data: input || '用户输入内容' };}if (node.type === 'ai') {// 模拟 AI 调用const responses = ['✅ AI 分析完成:检测到 3 个关键实体','✅ AI 生成摘要:文本长度 120 字','❌ AI 调用失败:模型超时','✅ AI 翻译完成:中 → 英'];const response = responses[Math.floor(Math.random() * responses.length)];if (response.includes('❌')) {throw new Error('模型调用失败');}return { data: response };}if (node.type === 'output') {return { data: `最终输出: ${input}` };}return { data: '处理完成' };
};/*** 工作流执行器* @param {Array} nodes - 节点列表* @param {Array} edges - 边列表* @param {function} onUpdateNode - (nodeId, status, output) => void* @param {function} onLog - (message) => void*/
export const executeWorkflow = async ({ nodes, edges, onUpdateNode, onLog }) => {// 构建邻接表和入度表(用于拓扑排序)const graph = new Map();const inDegree = new Map();nodes.forEach(node => {graph.set(node.id, []);inDegree.set(node.id, 0);});edges.forEach(edge => {graph.get(edge.source).push(edge.target);inDegree.set(edge.target, inDegree.get(edge.target) + 1);});// 拓扑排序(Kahn 算法)const queue = [];const results = new Map(); // nodeId -> output// 初始化:入度为 0 的节点入队nodes.forEach(node => {if (inDegree.get(node.id) === 0) {queue.push(node.id);}});// 执行队列while (queue.length > 0) {// 并行执行当前层级所有节点const currentLevel = [...queue];queue.length = 0;const promises = currentLevel.map(async (nodeId) => {const node = nodes.find(n => n.id === nodeId);onUpdateNode(nodeId, 'running');onLog(`▶ 开始执行节点: ${node.data?.label || nodeId}`);try {// 获取输入(来自前驱节点)const inputEdges = edges.filter(e => e.target === nodeId);let input = null;if (inputEdges.length > 0) {// 简化:取第一个前驱的输出const sourceId = inputEdges[0].source;input = results.get(sourceId)?.data;}const output = await executeNode(node, input);results.set(nodeId, output);onUpdateNode(nodeId, 'success');onLog(`✅ 节点 ${nodeId} 执行成功`);} catch (error) {onUpdateNode(nodeId, 'error');onLog(`❌ 节点 ${nodeId} 执行失败: ${error.message}`);throw error; // 中止整个工作流}// 更新后继节点入度const neighbors = graph.get(nodeId);neighbors?.forEach(neighbor => {const newDegree = inDegree.get(neighbor) - 1;inDegree.set(neighbor, newDegree);if (newDegree === 0) {queue.push(neighbor);}});});// 等待当前层级全部完成await Promise.all(promises);}return results;
};
步骤 4:创建执行控制面板
// src/components/ExecutionPanel.jsx
import { useState, useRef } from 'react';
import { executeWorkflow } from '../lib/workflowEngine';export default function ExecutionPanel({ nodes, edges, onUpdateNode,onResetNodes
}) {const [isExecuting, setIsExecuting] = useState(false);const [logs, setLogs] = useState([]);const [progress, setProgress] = useState(0);const logContainerRef = useRef(null);const handleExecute = async () => {if (isExecuting) return;setIsExecuting(true);setLogs([]);setProgress(0);onResetNodes(); // 重置所有节点状态try {await executeWorkflow({nodes,edges,onUpdateNode: (nodeId, status, output) => {onUpdateNode(nodeId, status, output);// 更新进度const completedNodes = nodes.filter(n => ['success', 'error'].includes(n.data?.status)).length;setProgress(Math.round((completedNodes / nodes.length) * 100));},onLog: (message) => {setLogs(prev => [...prev, `${new Date().toLocaleTimeString()} ${message}`]);// 自动滚动到底部setTimeout(() => {if (logContainerRef.current) {logContainerRef.current.scrollTop = logContainerRef.current.scrollHeight;}}, 10);}});setLogs(prev => [...prev, '🎉 工作流执行完成!']);} catch (error) {setLogs(prev => [...prev, `💥 工作流执行失败: ${error.message}`]);} finally {setIsExecuting(false);}};const clearLogs = () => {setLogs([]);};return (<div style={{padding: '16px',borderTop: '1px solid #e8e8e8',backgroundColor: '#fff',width: '100%',display: 'flex',flexDirection: 'column',gap: '12px'}}>{/* 执行控制 */}<div style={{ display: 'flex', gap: '12px' }}><buttononClick={handleExecute}disabled={isExecuting}style={{padding: '8px 16px',backgroundColor: isExecuting ? '#b0b0b0' : '#52c41a',color: 'white',border: 'none',borderRadius: '4px',cursor: isExecuting ? 'not-allowed' : 'pointer',fontSize: '14px',fontWeight: '500'}}>{isExecuting ? '🔄 执行中...' : '▶ 执行工作流'}</button><buttononClick={onResetNodes}disabled={isExecuting}style={{padding: '8px 16px',backgroundColor: '#f0f0f0',border: '1px solid #d9d9d9',borderRadius: '4px',cursor: 'pointer',fontSize: '14px'}}>🔄 重置</button></div>{/* 进度条 */}<div><div style={{ display: 'flex', justifyContent: 'space-between', fontSize: '12px', color: '#666',marginBottom: '4px'}}><span>执行进度</span><span>{progress}%</span></div><div style={{height: '6px',backgroundColor: '#f0f0f0',borderRadius: '3px',overflow: 'hidden'}}><divstyle={{height: '100%',width: `${progress}%`,backgroundColor: progress === 100 ? '#52c41a' : '#1890ff',transition: 'width 0.3s ease'}}/></div></div>{/* 日志面板 */}<div><div style={{ display: 'flex', justifyContent: 'space-between', alignItems: 'center',marginBottom: '8px'}}><h4 style={{ margin: 0, fontSize: '14px' }}>📝 执行日志</h4><buttononClick={clearLogs}style={{background: 'none',border: 'none',color: '#1890ff',fontSize: '12px',cursor: 'pointer'}}>清空</button></div><divref={logContainerRef}style={{height: '120px',padding: '8px',backgroundColor: '#f9f9f9',borderRadius: '4px',border: '1px solid #e8e8e8',overflowY: 'auto',fontSize: '12px',fontFamily: 'monospace'}}>{logs.map((log, idx) => (<div key={idx} style={{ marginBottom: '4px',whiteSpace: 'pre-wrap'}}>{log}</div>))}{logs.length === 0 && (<div style={{ color: '#888', fontStyle: 'italic' }}>点击“执行工作流”查看日志</div>)}</div></div></div>);
}
步骤 5:在 App.jsx 中集成执行逻辑
// src/App.jsx
import React, { useState } from 'react';
import WorkflowCanvas from './components/WorkflowCanvas';
import ControlsPanel from './components/ControlsPanel';
import ExecutionPanel from './components/ExecutionPanel';function App() {const [nodes, setNodes] = useState([]);const [edges, setEdges] = useState([]);// 更新单个节点状态const updateNodeStatus = (nodeId, status, output) => {setNodes(prev => prev.map(node => node.id === nodeId ? { ...node, data: { ...node.data, status, output } } : node));};// 重置所有节点状态const resetNodes = () => {setNodes(prev => prev.map(node => ({...node,data: {...node.data,status: 'idle',output: null}})));};return (<div style={{ fontFamily: 'Inter, -apple-system, sans-serif',width: '100vw',height: '100vh',display: 'flex',flexDirection: 'column'}}><div style={{ display: 'flex', flex: 1, overflow: 'hidden' }}><ControlsPanel /><WorkflowCanvas nodes={nodes}edges={edges}onNodesChange={setNodes}onEdgesChange={setEdges}/></div><ExecutionPanel nodes={nodes}edges={edges}onUpdateNode={updateNodeStatus}onResetNodes={resetNodes}/></div>);
}export default App;
💡 WorkflowCanvas.jsx 需更新以接收
nodes/edges作为 props(而非内部状态)
✅ 效果验证
- ✅ 点击“▶ 执行工作流” → 节点状态从“待运行” → “运行中” → “成功/失败”
- ✅ 进度条实时更新(0% → 100%)
- ✅ 日志面板显示执行时间戳 + 详细信息
- ✅ 失败节点显示红色边框 + “❌ 失败”标签
- ✅ 点击“🔄 重置” → 所有节点恢复“待运行”状态
🤔 思考与延伸
-
真实 AI 集成:如何替换
executeNode为真实 Ollama/OpenAI 调用?
→ 在executeNode中调用streamAIResponse(参考 Day 4) -
并行优化:如何最大化并行度?
→ 拓扑排序天然支持同层级并行,可限制并发数 -
结果导出:如何保存最终输出?
→ 在执行完成后,将results存入 localStorage 或下载 JSON
💡 为 Day 14 做准备:执行结果可作为“工作流模板”的输入示例
📅 明日预告
Day 14:工作流模板市场
- 实现工作流模板保存/导入
- 构建模板分享与发现界面
- 支持一键应用模板到画布
✍️ 小结
今天,我们让静态的工作流“活”了起来!通过执行引擎与实时监控,用户可清晰看到每一步的状态与日志,真正掌握自动化流程。可观测性,是复杂系统可用性的基石。
💬 实践提示:真实项目中,建议将执行日志按节点分组,并支持筛选/搜索。欢迎分享你的工作流执行监控界面!