Toolbelt Stream
v1.0.0
Connect a live Kafka topic (or use built-in simulated data) and run windowed aggregations plus standard-deviation anomaly detection on the stream. Use when an agent needs to analyze real-time or time-series data (IoT sensor readings, event logs, security events, fleet telemetry, transaction feeds) and answer questions about rates, trends, and outliers over time windows. NOT for static tabular files (use toolbelt-analyze) or document content (use toolbelt-find).
Connect a Kafka topic (or simulate one) and run real-time aggregation and anomaly detection using Toolbelt MCP tools. Work through each phase in order without prompting for user input. On an unrecoverable error, emit a structured failure and halt.
When Not To Use
For static batch tabular data: use toolbelt-analyze instead.
When real-time monitoring, windowed aggregation, or anomaly detection is not the goal.
Invocation Parameters
Extract these from the args string or conversation context before starting:
| Parameter | Required | Description |
| --- | --- | --- |
| namespace_id | No | UUID of target namespace. Auto-select if omitted and only one exists; fail if ambiguous. |
| kafka_broker | No | Kafka broker URL (e.g. kafka-broker:9092). Omit to use simulated stream data. |
| kafka_topic | No | Kafka topic name. Required if kafka_broker is provided. |
| kafka_schema | No | SQL column schema (e.g. sensor_id VARCHAR(64), ts TIMESTAMP, value DOUBLE). Defaults to IoT schema below. |
| kafka_group_id | No | Kafka consumer group ID. Omitted if not provided. |
| anomaly_threshold | No | Standard deviation multiplier for anomaly detection. Defaults to 2.0. |

Default Simulated Stream Data
When kafka_broker is not provided, upload this IoT sensor reading dataset as a relational asset to simulate a stream snapshot. It includes planted anomalies (readings more than 2 standard deviations from the mean) for detection verification.
sensor_id,ts,value,unit
sensor-01,2024-03-01 00:00:00,72.3,celsius
sensor-02,2024-03-01 00:00:00,71.8,celsius
sensor-03,2024-03-01 00:00:00,70.5,celsius
sensor-01,2024-03-01 00:01:00,72.6,celsius
sensor-02,2024-03-01 00:01:00,72.1,celsius
sensor-03,2024-03-01 00:01:00,71.0,celsius
sensor-01,2024-03-01 00:02:00,73.0,celsius
sensor-02,2024-03-01 00:02:00,71.5,celsius
sensor-03,2024-03-01 00:02:00,70.8,celsius
sensor-01,2024-03-01 00:03:00,72.8,celsius
sensor-02,2024-03-01 00:03:00,98.7,celsius
sensor-03,2024-03-01 00:03:00,71.2,celsius
sensor-01,2024-03-01 00:04:00,73.1,celsius
sensor-02,2024-03-01 00:04:00,72.4,celsius
sensor-03,2024-03-01 00:04:00,45.1,celsius
sensor-01,2024-03-01 00:05:00,72.5,celsius
sensor-02,2024-03-01 00:05:00,72.0,celsius
sensor-03,2024-03-01 00:05:00,71.5,celsius
sensor-01,2024-03-01 00:06:00,72.9,celsius
sensor-02,2024-03-01 00:06:00,71.7,celsius
sensor-03,2024-03-01 00:06:00,70.9,celsius
sensor-01,2024-03-01 00:07:00,126.4,celsius
sensor-02,2024-03-01 00:07:00,72.3,celsius
sensor-03,2024-03-01 00:07:00,71.8,celsius
sensor-01,2024-03-01 00:08:00,73.2,celsius
sensor-02,2024-03-01 00:08:00,71.9,celsius
sensor-03,2024-03-01 00:08:00,71.3,celsius
The three anomalies in this dataset:
sensor-02 at 00:03:00, value 98.7 (spike high)
sensor-03 at 00:04:00, value 45.1 (drop low)
sensor-01 at 00:07:00, value 126.4 (extreme spike)
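The planted anomalies can be checked locally with a z-score pass over the sample data. This is a minimal sketch, not the Toolbelt implementation: the function name `detect_anomalies` and the `per_sensor` switch are hypothetical, and the source does not say whether the mean and standard deviation are computed per sensor or over all readings. The sketch defaults to per-sensor sample statistics, because with pooled statistics the 98.7 reading scores just under 2.0 on this dataset.

```python
import csv
import io
import statistics
from collections import defaultdict

CSV_TEXT = """sensor_id,ts,value,unit
sensor-01,2024-03-01 00:00:00,72.3,celsius
sensor-02,2024-03-01 00:00:00,71.8,celsius
sensor-03,2024-03-01 00:00:00,70.5,celsius
sensor-01,2024-03-01 00:01:00,72.6,celsius
sensor-02,2024-03-01 00:01:00,72.1,celsius
sensor-03,2024-03-01 00:01:00,71.0,celsius
sensor-01,2024-03-01 00:02:00,73.0,celsius
sensor-02,2024-03-01 00:02:00,71.5,celsius
sensor-03,2024-03-01 00:02:00,70.8,celsius
sensor-01,2024-03-01 00:03:00,72.8,celsius
sensor-02,2024-03-01 00:03:00,98.7,celsius
sensor-03,2024-03-01 00:03:00,71.2,celsius
sensor-01,2024-03-01 00:04:00,73.1,celsius
sensor-02,2024-03-01 00:04:00,72.4,celsius
sensor-03,2024-03-01 00:04:00,45.1,celsius
sensor-01,2024-03-01 00:05:00,72.5,celsius
sensor-02,2024-03-01 00:05:00,72.0,celsius
sensor-03,2024-03-01 00:05:00,71.5,celsius
sensor-01,2024-03-01 00:06:00,72.9,celsius
sensor-02,2024-03-01 00:06:00,71.7,celsius
sensor-03,2024-03-01 00:06:00,70.9,celsius
sensor-01,2024-03-01 00:07:00,126.4,celsius
sensor-02,2024-03-01 00:07:00,72.3,celsius
sensor-03,2024-03-01 00:07:00,71.8,celsius
sensor-01,2024-03-01 00:08:00,73.2,celsius
sensor-02,2024-03-01 00:08:00,71.9,celsius
sensor-03,2024-03-01 00:08:00,71.3,celsius
"""


def detect_anomalies(csv_text, threshold=2.0, per_sensor=True):
    """Return (sensor_id, ts, value) for rows whose |z-score| exceeds threshold.

    Assumption: statistics are grouped per sensor by default; pass
    per_sensor=False to pool all readings into one group instead.
    """
    rows = list(csv.DictReader(io.StringIO(csv_text)))

    def group_key(row):
        return row["sensor_id"] if per_sensor else "*"

    # Collect the readings for each group.
    groups = defaultdict(list)
    for row in rows:
        groups[group_key(row)].append(float(row["value"]))

    # Sample mean and sample standard deviation per group.
    stats = {k: (statistics.mean(v), statistics.stdev(v)) for k, v in groups.items()}

    flagged = []
    for row in rows:
        mean, std = stats[group_key(row)]
        value = float(row["value"])
        if std > 0 and abs(value - mean) / std > threshold:
            flagged.append((row["sensor_id"], row["ts"], value))
    return flagged


if __name__ == "__main__":
    for hit in detect_anomalies(CSV_TEXT):
        print(hit)
```

With the default threshold of 2.0, the per-sensor pass flags exactly the three planted readings. Calling `detect_anomalies(CSV_TEXT, per_sensor=False)` flags only the 45.1 and 126.4 readings, which is why the grouping choice matters when validating detection.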
Default kafka_schema (used if a broker is provided without a schema):
sensor_id VARCHAR(64), ts TIMESTAMP, value DOUBLE, unit VARCHAR(32)
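The invocation parameters and their defaults can be captured in a small validation step. The following is a sketch under assumptions: the `StreamParams` dataclass and `parse_params` function are hypothetical names, and the input is taken as an already-parsed dict because the source does not specify the args string syntax.

```python
from dataclasses import dataclass
from typing import Optional

# Default column schema quoted from the document.
DEFAULT_SCHEMA = "sensor_id VARCHAR(64), ts TIMESTAMP, value DOUBLE, unit VARCHAR(32)"


@dataclass
class StreamParams:
    namespace_id: Optional[str] = None
    kafka_broker: Optional[str] = None
    kafka_topic: Optional[str] = None
    kafka_schema: Optional[str] = None
    kafka_group_id: Optional[str] = None
    anomaly_threshold: float = 2.0


def parse_params(raw: dict) -> StreamParams:
    """Apply the documented defaults and the one documented cross-field rule."""
    params = StreamParams(
        namespace_id=raw.get("namespace_id"),
        kafka_broker=raw.get("kafka_broker"),
        kafka_topic=raw.get("kafka_topic"),
        kafka_schema=raw.get("kafka_schema"),
        kafka_group_id=raw.get("kafka_group_id"),
        anomaly_threshold=float(raw.get("anomaly_threshold", 2.0)),
    )
    if params.kafka_broker:
        # kafka_topic is required whenever a broker is given.
        if not params.kafka_topic:
            raise ValueError("kafka_topic is required when kafka_broker is provided")
        # Fall back to the default IoT schema when none is supplied.
        if not params.kafka_schema:
            params.kafka_schema = DEFAULT_SCHEMA
    return params
```

An empty dict yields simulated mode: no broker, no schema, and a threshold of 2.0.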
Phase 0: Verify Connection
Call toolbelt_list_namespaces (no arguments) immediately.
If it succeeds: proceed to Phase 1 using the returned namespaces.
If it fails: emit structured failure and halt.
FAILURE: Toolbelt MCP connection is not established. The MCP server must be connected before invoking this skill. See: https://toolbelt.ai/docs/mcp for setup instructions.
Phase 1: Resolve Namespace
Use the namespaces returned from Phase 0.
Resolution order:
1. If namespace_id was provided as a parameter, use it directly.
2. If only one namespace exists, use it.
3. If multiple exist and no namespace_id was specified, emit structured failure and halt.
FAILURE: Multiple namespaces found and none specified. Available: [<list namespace display names and IDs>] Re-invoke with namespace_id=.
Store the resolved namespace_id and pass it to every subsequent tool call.
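The resolution order above can be sketched as a small function. The shape of the namespace records (dicts with `id` and `name` keys), the `SkillFailure` exception, and the handling of an empty namespace list are all assumptions for illustration; the source does not describe the tool's response format or the zero-namespace case.

```python
from typing import Optional


class SkillFailure(Exception):
    """Hypothetical carrier for a structured failure message."""


def resolve_namespace(namespaces: list, namespace_id: Optional[str] = None) -> str:
    """Pick the namespace to use, following the documented resolution order."""
    # 1. An explicit parameter always wins.
    if namespace_id:
        return namespace_id
    # 2. A single namespace is selected automatically.
    if len(namespaces) == 1:
        return namespaces[0]["id"]
    # Assumption: no namespaces at all is also treated as a failure.
    if not namespaces:
        raise SkillFailure("FAILURE: No namespaces found.")
    # 3. Several candidates and no explicit choice is ambiguous.
    available = ", ".join(f'{ns["name"]} ({ns["id"]})' for ns in namespaces)
    raise SkillFailure(
        "FAILURE: Multiple namespaces found and none specified. "
        f"Available: [{available}]"
    )
```

Raising an exception keeps the halt behavior in one place: the caller catches `SkillFailure`, emits its message, and stops.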
Phase 2: Connect Stream Source
If kafka_broker is provided:
Call toolbelt_connect:
    {
      "source_type": "kafka",
      "namespace_id": "",
      "location": "KAFKA://",
      "external_table_name": "",
      "asset_name": "",
      "kafka_column_definitions": "",
      "kafka_subscribe": true,
      "extra_options": { "kafka.group.id": "" }
    }
Omit extra_options if kafka_group_id was not provided.
Store the resulting table name as stream_table. Record source_mode: "kafka".
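The rule about omitting extra_options can be handled when the call arguments are assembled. This sketch only covers that one rule: the helper name `with_group_id` is hypothetical, and the remaining argument values are supplied by the caller because the source leaves them blank.

```python
from typing import Optional


def with_group_id(connect_args: dict, kafka_group_id: Optional[str]) -> dict:
    """Return a copy of connect_args with extra_options present only when needed."""
    # Start from a copy that never carries a stale extra_options entry.
    args = {k: v for k, v in connect_args.items() if k != "extra_options"}
    # Attach the consumer group only when one was provided.
    if kafka_group_id:
        args["extra_options"] = {"kafka.group.id": kafka_group_id}
    return args
```

Building a new dict rather than mutating the input keeps the base arguments reusable across retries.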
If kafka_broker is not provided (simulated stream):
Upload the default sample data above as a document using toolbelt_save:
    {
      "asset_type": "document",
      "namespace_id": "",
      "name": "stream-readings",
      "file_name": "stream-readings.csv",
      "content": "",
      "content_encoding": "text",
      "data_format": "csv"
    }
Record source_mode: "simulated".
Poll toolbelt_jobs every 10 seconds until the ingest job reaches completed. Maximum wait: 3 minutes.
If the job reaches failed or the timeout elapses, emit structured failure and halt:
FAILURE: Stream data ingestion did not complete. Job status:
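The polling rule (every 10 seconds, at most 3 minutes, halt on failed or timeout) can be sketched as a loop around a status callback. The `get_status` callable stands in for whatever wraps the toolbelt_jobs call, and the function name, the injectable `sleep` and `clock`, and the use of `RuntimeError` are illustrative assumptions rather than part of the skill.

```python
import time
from typing import Callable


def wait_for_job(
    get_status: Callable[[], str],
    interval_s: float = 10.0,
    timeout_s: float = 180.0,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> str:
    """Poll until the job is completed; raise on failed status or timeout."""
    deadline = clock() + timeout_s
    while True:
        status = get_status()
        if status == "completed":
            return status
        # Stop on an explicit failure or once the maximum wait has elapsed.
        if status == "failed" or clock() >= deadline:
            raise RuntimeError(
                f"FAILURE: Stream data ingestion did not complete. Job status: {status}"
            )
        sleep(interval_s)
```

Passing `sleep` and `clock` as parameters lets the loop be exercised with a fake clock instead of waiting three real minutes.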
Call toolbelt_context to get the table name. St