Workflow Engine — 工作流 Engine
v0.1.0DAG-based 工作流 engine supporting sequential, conditional, and parallel tasks with 检查point persistence and event-driven execution.
运行时依赖
安装命令
点击复制技能文档
工作流 Engine - OpenClaw 工作流编排引擎
版本: 0.1.0 功能: DAG 执行器 + 条件分支 + 并行执行
功能特性 DAG 编排: 有向无环图工作流定义 顺序执行: 简单的线性任务链 条件分支: 支持 if/else 条件判断 并行执行: 支持并行任务执行 状态持久化: 检查点机制支持断点续执行 事件驱动: 完整的事件系统 安装 cd 技能s/工作流-engine npm 安装
快速开始 导入 { 工作流Engine } from './src/工作流-engine.js';
// 创建工作流引擎 const engine = new 工作流Engine({ 检查pointDir: './检查points', enable检查point: true, maxConcurrency: 10 });
// 创建顺序工作流 const dag = engine.创建Sequential工作流('My 工作流', [ { name: 'Step 1', 执行: a同步 (上下文) => { console.记录('Executing step 1'); 上下文.data = { value: 42 }; return { 成功: true }; } }, { name: 'Step 2', 执行: a同步 (上下文) => { console.记录('Executing step 2'); console.记录('Data from step 1:', 上下文.data); return { 成功: true }; } } ]);
// 执行工作流 const 结果 = awAIt engine.执行(dag.id, {}); console.记录('工作流 completed:', 结果.状态);
API 参考 工作流Engine 构造函数 new 工作流Engine(config)
参数:
config.检查pointDir - 检查点目录(默认:'./检查points') config.enable检查point - 启用检查点(默认:true) config.maxConcurrency - 最大并发数(默认:10) 创建Sequential工作流(name, tasks)
创建顺序工作流
const dag = engine.创建Sequential工作流('Test', [ { name: 'Task 1', 执行: a同步 () => {} }, { name: 'Task 2', 执行: a同步 () => {} } ]);
执行(工作流Id, 上下文)
执行工作流
const 结果 = awAIt engine.执行('工作流-id', { 输入: 'data' }); // 返回: { executionId, 状态, 上下文, 结果s, duration }
工作流DAG 添加Node(config)
添加节点
dag.添加Node({ id: 'task1', type: 'task', // 'task' | 'condition' | 'parallel' | '启动' | 'end' name: 'Task 1', 执行: a同步 (上下文) => { return { 结果: 'done' }; }, retry: { count: 3, delay: 1000 }, timeout: 30000 });
添加Edge(fromId, toId)
添加边(依赖关系)
dag.添加Edge('启动', 'task1'); dag.添加Edge('task1', 'task2');
topo记录ical排序()
拓扑排序
const order = dag.topo记录ical排序(); // 返回: ['启动', 'task1', 'task2', 'end']
工作流Node 节点类型 启动 - 开始节点 end - 结束节点 task - 任务节点 condition - 条件节点 parallel - 并行节点 节点配置 { id: 'unique-id', type: 'task', name: 'Task Name', description: 'Task description', 执行: a同步 (上下文) => { / 执行逻辑 / }, retry: { count: 3, delay: 1000 }, timeout: 30000, metadata: {} }
示例 条件分支 导入 工作流DAG, { 工作流Node } from './src/dag.js';
const dag = new 工作流DAG({ name: 'Conditional' });
dag.添加Node({ id: '启动', type: '启动' }); dag.添加Node({ id: '检查', type: 'condition', condition: (ctx) => ctx.value > 10, branches: { 'true': 'high', 'false': 'low' } }); dag.添加Node({ id: 'high', type: 'task', 执行: a同步 () => console.记录('High value') }); dag.添加Node({ id: 'low', type: 'task', 执行: a同步 () => console.记录('Low value') });
dag.添加Edge('启动', '检查'); dag.添加Edge('检查', 'high'); dag.添加Edge('检查', 'low');
并行执行 dag.添加Node({ id: '启动', type: '启动' }); dag.添加Node({ id: 'parallel', type: 'parallel', parallelTasks: ['taskA', 'taskB', 'taskC'] }); dag.添加Node({ id: 'taskA', type: 'task', 执行: a同步 () => {} }); dag.添加Node({ id: 'taskB', type: 'task', 执行: a同步 () => {} }); dag.添加Node({ id: 'taskC', type: 'task', 执行: a同步 () => {} }); dag.添加Node({ id: 'merge', type: 'task', 执行: a同步 () => {} });
dag.添加Edge('启动', 'parallel'); dag.添加Edge('parallel', 'taskA'); dag.添加Edge('parallel', 'taskB'); dag.添加Edge('parallel', 'taskC'); dag.添加Edge('taskA', 'merge'); dag.添加Edge('taskB', 'merge'); dag.添加Edge('taskC', 'merge');
事件 engine.on('execution-启动ed', (e) => { console.记录('启动ed:', e.executionId); });
engine.on('execution-completed', (e) => { console.记录('Completed:', e.executionId, e.状态.状态); });
engine.on('node-启动ed', (e) => { console.记录('Node 启动ed:', e.nodeId); });
engine.on('node-completed', (e) => { console.记录('Node completed:', e.nodeId); });
测试 npm test
License
MIT