运行时依赖
安装命令
点击复制技能文档
工作流编排器 使用DAG执行、分支和错误处理构建和运行多代理工作流。
快速开始 从orchestrator导入Workflow、Step、Branch、Runner wf = Workflow("数据管道") wf.add_step(Step("获取", 代理="scraper", 动作="获取URL", 参数={"url": "https://example.com"})) wf.add_step(Step("提取", 代理="parser", 动作="提取文本", 依赖于=["获取"])) wf.add_step(Step("总结", 代理="writer", 动作="总结", 依赖于=["提取"])) wf.add_step(Step("翻译", 代理="writer", 动作="翻译", 依赖于=["提取"])) wf.add_step(Step("发布", 代理="publisher", 动作="发送", 依赖于=["总结", "翻译"])) runner = Runner() result = runner.execute(wf)
DAG执行模型 获取 → 提取 → 总结 → 发布 → 翻译 ↗ 满足依赖的步骤并行运行。发布步骤等待总结和翻译。
步骤定义 Step( 名称="唯一步骤名称", 代理="代理ID", 动作="工具名称", 参数={}, 依赖于=[], 重试=3, 超时秒=300, 失败时="跳过", 备用步骤="计划B", 条件="$.获取.状态 == 200" )
功能 并行执行:满足依赖的步骤并发运行 条件分支:JSONPath条件确定哪些分支执行 重试和退避:可配置的重试次数和指数退避 超时处理:超时的步骤被杀死并根据失败时处理 备用步骤:当主步骤失败时运行替代步骤 实时状态:在执行过程中的任何时候查询工作流状态 错误传播:配置失败是否冒泡或被包含
监控状态 status = runner.status(工作流ID) # {"运行": 2, "完成": 3, "失败": 0, "等待": 1}