首页龙虾技能列表 › Trading Bot Fleet Management: Unified Control for Multi-Bot Operations — 技能工具

Trading Bot Fleet Management: Unified Control for Multi-Bot Operations — 技能工具

v1.3.0

Trading Bot Fleet Management: Unified Control for Multi-Bot Operations. Build a fleet management layer for 10+ trading bots with per-bot identity isolation,...

0· 68·0 当前·0 累计
by @mirni·MIT-0
下载技能包
License
MIT-0
最后更新
2026/4/14
安全扫描
VirusTotal
无害
查看报告
OpenClaw
安全
high confidence
This is an instruction-only guide for managing trading-bot fleets that is internally consistent with its stated purpose; the only minor inconsistency is that the guide lists an AGENT_SIGNING_KEY credential while the registry metadata does not declare required env vars.
评估建议
This appears to be a legitimate educational guide for fleet management. Before installing or using it: 1) Don't paste production trading API keys or withdrawal-capable credentials into example code; use the GreenHelix sandbox or throwaway keys. 2) Clarify the metadata mismatch: the SKILL.md lists AGENT_SIGNING_KEY but the registry metadata shows no required env vars — confirm whether the agent will try to read an environment variable or prompt for the key. 3) Review all provided code snippets be...
详细分析 ▾
用途与能力
The name and description claim a fleet-management guide for trading bots and the SKILL.md contains architecture explanation, classes, and Python/curl examples against the GreenHelix sandbox — which matches the stated purpose. Requesting a per-bot Ed25519 signing key (AGENT_SIGNING_KEY) is coherent with identity isolation use cases described.
指令范围
This is an instructional guide with code examples and API calls to the named GreenHelix sandbox. The guide warns it is educational and that you supply credentials in your environment. There are no install steps, no directives in the header to read arbitrary system files, and no apparent instructions to collect or exfiltrate unrelated data.
安装机制
No install spec or code files are provided (instruction-only). That minimizes risk because nothing will be written to disk or automatically executed by the installer.
凭证需求
The SKILL.md declares a credential (AGENT_SIGNING_KEY) which is plausible and proportionate for per-bot identity signing. However, the registry metadata lists 'Required env vars: none' while the SKILL.md includes 'credentials: [AGENT_SIGNING_KEY]' and instructs the user to supply it — this mismatch should be clarified before trusting the skill with secrets.
持久化与权限
The skill is not always-enabled and uses default agent invocation settings; it does not request elevated persistence or modify other skills or system-wide settings.
安全有层次,运行前请审查代码。

License

MIT-0

可自由使用、修改和再分发,无需署名。

运行时依赖

无特殊依赖

版本

latestv1.3.02026/4/12

Version 1.3.0 - Updated guide to reference the GreenHelix sandbox, enabling 500 free credits and no API key required for initial use. - Clarified that only `AGENT_SIGNING_KEY` is needed in credentials (removed `SSH_DEPLOY_KEY`). - Added instructions and notices related to the sandbox environment for code examples. - Minor copy and formatting improvements in the introductory sections.

● 无害

安装命令 点击复制

官方npx clawhub@latest install greenhelix-trading-bot-fleet-management
镜像加速npx clawhub@latest install greenhelix-trading-bot-fleet-management --registry https://cn.clawhub-mirror.com

技能文档

Notice: This is an educational guide with illustrative code examples.
It does not execute code or install dependencies.
All examples use the GreenHelix sandbox (https://sandbox.greenhelix.net) which
provides 500 free credits — no API key required to get started.
>
Referenced credentials (you supply these in your own environment):
- AGENT_SIGNING_KEY: Cryptographic signing key for agent identity (Ed25519 key pair for request signing)

In March 2023, the Step Finance exploit drained $45M from Solana DeFi positions. The root cause was not a smart contract vulnerability or a novel cryptographic attack. It was operational: multiple bots shared API keys with no identity isolation. When one bot's credentials leaked, the attacker gained access to every bot, every exchange account, every withdrawal endpoint. No per-bot permissions. No unified health dashboard that would have caught anomalous behavior. No kill switch scoped to the compromised bot without killing the entire fleet. The team had to shut down everything, losing millions in unrealized positions across healthy bots while they figured out which one was compromised. This pattern repeats across the industry. Teams running 10+ bots accumulate the same structural debt: shared credentials passed through environment variables, no per-bot permission boundaries, health checks that are either absent or scattered across shell scripts nobody maintains, deployments that are "SSH in and restart the process," and cost tracking that lives in a spreadsheet updated monthly if someone remembers. When one bot goes rogue -- whether from a bug, a compromised key, or a strategy that hits an edge case -- the blast radius is the entire fleet. This guide builds a fleet management layer using GreenHelix's identity, messaging, and metrics tools. Each bot gets its own Ed25519 cryptographic identity, scoped permissions defining exactly what it can and cannot do, real-time health monitoring with automatic dead bot detection, coordinated deployment procedures, SLA tracking against defined performance targets, and per-bot cost allocation. Every concept comes with working Python code and equivalent curl commands against the GreenHelix API.

What You'll Learn

  • Chapter 1: Fleet Architecture
  • Chapter 2: FleetManager Class
  • Chapter 3: BotIdentityManager Class
  • Chapter 4: Permission Scoping
  • Chapter 5: FleetHealthMonitor Class
  • Chapter 6: Coordinated Deployments
  • Chapter 7: SLA Tracking
  • Chapter 8: Cost Allocation
  • What's Next

Full Guide

# Trading Bot Fleet Management: Unified Control for Multi-Bot Operations

In March 2023, the Step Finance exploit drained $45M from Solana DeFi positions. The root cause was not a smart contract vulnerability or a novel cryptographic attack. It was operational: multiple bots shared API keys with no identity isolation. When one bot's credentials leaked, the attacker gained access to every bot, every exchange account, every withdrawal endpoint. No per-bot permissions. No unified health dashboard that would have caught anomalous behavior. No kill switch scoped to the compromised bot without killing the entire fleet. The team had to shut down everything, losing millions in unrealized positions across healthy bots while they figured out which one was compromised. This pattern repeats across the industry. Teams running 10+ bots accumulate the same structural debt: shared credentials passed through environment variables, no per-bot permission boundaries, health checks that are either absent or scattered across shell scripts nobody maintains, deployments that are "SSH in and restart the process," and cost tracking that lives in a spreadsheet updated monthly if someone remembers. When one bot goes rogue -- whether from a bug, a compromised key, or a strategy that hits an edge case -- the blast radius is the entire fleet.

This guide builds a fleet management layer using GreenHelix's identity, messaging, and metrics tools. Each bot gets its own Ed25519 cryptographic identity, scoped permissions defining exactly what it can and cannot do, real-time health monitoring with automatic dead bot detection, coordinated deployment procedures, SLA tracking against defined performance targets, and per-bot cost allocation. Every concept comes with working Python code and equivalent curl commands against the GreenHelix API.


Table of Contents


Chapter 1: Fleet Architecture

Why Fleet Management Matters

The Step Finance breach is instructive because it was not exotic. The attack surface was not a zero-day in a cryptographic library. It was a predictable consequence of how most teams operate trading bots at scale: every bot shares the same API key, the same exchange credentials, the same infrastructure account. This is the "monolith credentials" antipattern, and it shows up in every post-mortem of operational trading failures that did not involve a market event.

Consider what a team with 15 bots typically looks like six months after launch:

  • Shared credentials: All bots read the same .env file or Kubernetes secret. Rotating one key means touching every bot.
  • No permission boundaries: A bot designed to execute $500 momentum trades on Binance Spot has the same access as a bot managing $50,000 Deribit options positions. If the momentum bot is compromised, the attacker can withdraw from the options account.
  • Scattered health checks: Bot #3 has a heartbeat endpoint. Bot #7 writes to a log file. Bots #1, #2, #4-6, and #8-15 have no health reporting at all. The team discovers a dead bot when a strategy stops producing PnL.
  • Manual deployments: Updating a strategy requires SSH-ing into each server, pulling the latest code, restarting the process, and hoping the bot reconnects to the exchange websocket cleanly. There is no rollback procedure beyond "check out the previous git commit and restart again."
  • No cost attribution: The team knows the monthly AWS bill and the total exchange fees. They do not know which bot costs the most to operate, which strategy has a negative ROI after infrastructure costs, or whether a bot that trades 200 times per day is actually profitable after accounting for exchange fees, API rate limit costs, and compute.

Fleet management solves all of these by treating each bot as an independently identified, independently monitored, independently permissioned entity within a unified control plane.

Architecture Overview

The fleet management architecture has four layers:

+-----------------------------------------------------------------------+
|                        Fleet Operator (Human)                          |
|                  Strategic decisions, policy, budgets                   |
+-----------------------------------------------------------------------+
        |
        v
+-----------------------------------------------------------------------+
|                        Fleet Manager (Agent)                           |
|  register/deregister bots, issue commands, aggregate status            |
|  GreenHelix identity: fleet-manager-{org}                              |
+-----------------------------------------------------------------------+
        |
        +--------------------+--------------------+
        |                    |                    |
        v                    v                    v
+---------------+   +---------------+   +---------------+
|  Bot Group:   |   |  Bot Group:   |   |  Bot Group:   |
|  Spot Arb     |   |  Perp Momentum|   |  Options MM   |
|  3 bots       |   |  5 bots       |   |  4 bots       |
+---------------+   +---------------+   +---------------+
    |   |   |          |  |  |  |  |       |  |  |  |
    v   v   v          v  v  v  v  v       v  v  v  v
  Individual bots, each with:
  - Own Ed25519 identity
  - Scoped permissions
  - Health heartbeat
  - Metrics reporting
  - Cost tracking

The fleet operator is a human who sets policy: which strategies to run, on which exchanges, with what risk limits, and how much capital to allocate. The fleet manager is a GreenHelix agent that translates policy into operations: registering bots, issuing commands, monitoring health, orchestrating deployments. Bot groups organize bots by strategy type, and individual bots are the execution units, each with its own cryptographic identity.

GreenHelix Tools Used

This guide uses the following GreenHelix tools:

ToolPurpose
register_agentCreate identity for fleet manager and each bot
get_agent_identityRetrieve and verify bot identity
submit_metricsReport health, PnL, latency, trade count
get_sla_complianceMonitor bots against defined SLA targets
send_messageFleet commands, alerts, inter-bot communication
register_webhookReal-time event delivery for health alerts
search_agents_by_metricsFind underperforming bots across the fleet
get_agent_reputationTrack bot reliability over time
create_event_schemaDefine fleet event types
publish_eventEmit fleet events (deploy, failover, alert)

Fleet Hierarchy

The hierarchy maps to GreenHelix's identity model. The fleet manager is a registered agent. Each bot group is a metadata tag. Each individual bot is a registered agent with metadata linking it to its group and to the fleet manager.

# Hierarchy expressed in GreenHelix metadata
fleet_manager_metadata = {
    "role": "fleet_manager",
    "organization": "your-org-id",
    "fleet_size": 12,
    "groups": ["spot-arb", "perp-momentum", "options-mm"]
}

bot_metadata = { "role": "trading_bot", "fleet_manager": "fleet-manager-{org}", "group": "spot-arb", "exchange": "binance", "strategy": "cross-exchange-arb", "version": "2.4.1", "max_position_usd": 10000 }

This metadata is not cosmetic. It is queryable. When the fleet manager needs to find all bots in the "perp-momentum" group, it searches by metadata. When a health alert fires, the alert includes the bot's group and strategy so the operator knows immediately what type of bot is failing and what the potential market impact is.

Why Not Kubernetes Labels or Consul?

You might already use Kubernetes labels, Consul service tags, or Terraform metadata for infrastructure management. Those are fine for infrastructure concerns -- pod scheduling, service discovery, load balancing. They are not sufficient for trading bot fleet management because they operate at the wrong abstraction level. Kubernetes knows that a pod is running. It does not know that the pod is a trading bot with a $50,000 position limit on Deribit, that its Ed25519 key was last rotated 28 days ago, or that its SLA requires 99.9% uptime with sub-100ms exchange latency.

GreenHelix's identity layer operates at the application level. Each bot's identity carries its trading-specific metadata: strategy type, exchange, position limits, permission tier, key version. The fleet manager queries this metadata through GreenHelix's API, not through infrastructure tooling. This means the fleet management layer works identically whether your bots run on Kubernetes, bare metal, EC2 instances, or a mix of all three. The infrastructure is abstracted away; the fleet identity is portable.

The practical consequence: when you migrate from EC2 to Kubernetes (or vice versa), no fleet management code changes. The bots register the same identities, report the same metrics, and respond to the same commands regardless of the underlying compute platform.


Chapter 2: FleetManager Class

Central Management for Bot Fleet Operations

The FleetManager class is the control plane for all fleet operations. It handles bot registration, inventory management, fleet-wide commands, and status aggregation. Every operation goes through the GreenHelix API so that commands, status, and events are centralized and auditable.

Setup

import requests
import json
import time
import hashlib
from datetime import datetime, timedelta
from typing import Dict, List, Optional
from dataclasses import dataclass, field

base_url = "https://api.greenhelix.net/v1" api_key = "your-api-key" # From GreenHelix dashboard

session = requests.Session() session.headers["Authorization"] = f"Bearer {api_key}" session.headers["Content-Type"] = "application/json"

def execute(tool: str, inputs: dict) -> dict: """Execute a GreenHelix tool and return the result.""" resp = session.post( f"{base_url}/v1", json={"tool": tool, "input": inputs} ) resp.raise_for_status() return resp.json()

Equivalent curl for any execute call throughout this guide:

curl -X POST https://sandbox.greenhelix.net/v1 \
  -H "Authorization: Bearer $API_KEY" \
  -H "Content-Type: application/json" \
  -d '{
    "tool": "register_agent",
    "input": {
      "name": "fleet-manager-acme",
      "description": "Fleet manager for Acme Trading bot fleet",
      "capabilities": ["fleet_management", "bot_orchestration"],
      "metadata": {"role": "fleet_manager", "organization": "acme"}
    }
  }'

The FleetManager Class

@dataclass
class BotRecord:
    """Local record of a registered bot."""
    agent_id: str
    name: str
    group: str
    exchange: str
    strategy: str
    version: str
    status: str = "active"
    registered_at: str = ""
    last_heartbeat: Optional[str] = None

class FleetManager: """Central management class for a trading bot fleet."""

def __init__(self, org_id: str): self.org_id = org_id self.manager_agent_id: Optional[str] = None self.bots: Dict[str, BotRecord] = {} self._register_manager()

def _register_manager(self): """Register the fleet manager agent on GreenHelix.""" result = execute("register_agent", { "name": f"fleet-manager-{self.org_id}", "description": f"Fleet manager for {self.org_id} trading bot fleet. " f"Handles registration, commands, health, deployments.", "capabilities": [ "fleet_management", "bot_orchestration", "health_monitoring", "deployment_coordination" ], "metadata": { "role": "fleet_manager", "organization": self.org_id } }) self.manager_agent_id = result["agent_id"] print(f"Fleet manager registered: {self.manager_agent_id}")

def register_bot(self, name: str, group: str, exchange: str, strategy: str, version: str, capabilities: List[str] = None, max_position_usd: float = 10000) -> str: """Register a new bot in the fleet.""" if capabilities is None: capabilities = ["trading", "metrics_reporting"]

result = execute("register_agent", { "name": name, "description": f"Trading bot: {strategy} on {exchange}. " f"Part of {group} group in {self.org_id} fleet.", "capabilities": capabilities, "metadata": { "role": "trading_bot", "fleet_manager": self.manager_agent_id, "group": group, "exchange": exchange, "strategy": strategy, "version": version, "max_position_usd": max_position_usd, "organization": self.org_id } })

agent_id = result["agent_id"] self.bots[agent_id] = BotRecord( agent_id=agent_id, name=name, group=group, exchange=exchange, strategy=strategy, version=version, registered_at=datetime.utcnow().isoformat() ) print(f"Bot registered: {name} ({agent_id})") return agent_id

def deregister_bot(self, agent_id: str): """Remove a bot from the fleet.""" if agent_id not in self.bots: raise ValueError(f"Bot {agent_id} not found in fleet")

# Notify the bot to shut down gracefully execute("send_message", { "from_agent_id": self.manager_agent_id, "to_agent_id": agent_id, "message_type": "command", "payload": { "command": "shutdown", "reason": "deregistered_from_fleet", "grace_period_seconds": 30 } })

bot = self.bots.pop(agent_id) print(f"Bot deregistered: {bot.name} ({agent_id})")

def pause_all(self, reason: str = "operator_initiated"): """Pause all bots in the fleet. Bots stop opening new positions.""" for agent_id, bot in self.bots.items(): execute("send_message", { "from_agent_id": self.manager_agent_id, "to_agent_id": agent_id, "message_type": "command", "payload": { "command": "pause", "reason": reason, "timestamp": datetime.utcnow().isoformat() } }) bot.status = "paused" print(f"Fleet paused: {len(self.bots)} bots ({reason})")

def resume_all(self): """Resume all paused bots.""" for agent_id, bot in self.bots.items(): if bot.status == "paused": execute("send_message", { "from_agent_id": self.manager_agent_id, "to_agent_id": agent_id, "message_type": "command", "payload": { "command": "resume", "timestamp": datetime.utcnow().isoformat() } }) bot.status = "active" print(f"Fleet resumed: {len(self.bots)} bots")

def emergency_stop(self, reason: str = "emergency"): """Emergency stop: close all positions, cancel all orders, halt.""" for agent_id, bot in self.bots.items(): execute("send_message", { "from_agent_id": self.manager_agent_id, "to_agent_id": agent_id, "message_type": "command", "payload": { "command": "emergency_stop", "reason": reason, "actions": [ "cancel_all_open_orders", "close_all_positions_market", "halt_trading_loop" ], "timestamp": datetime.utcnow().isoformat() } }) bot.status = "stopped"

# Log the emergency stop event execute("publish_event", { "agent_id": self.manager_agent_id, "event_type": "fleet.emergency_stop", "payload": { "reason": reason, "bots_affected": len(self.bots), "bot_ids": list(self.bots.keys()), "timestamp": datetime.utcnow().isoformat() } }) print(f"EMERGENCY STOP: {len(self.bots)} bots halted ({reason})")

def get_fleet_status(self) -> Dict: """Aggregate status across all bots.""" status_counts = {"active": 0, "paused": 0, "stopped": 0, "dead": 0} group_counts = {}

for bot in self.bots.values(): status_counts[bot.status] = status_counts.get(bot.status, 0) + 1 group_counts[bot.group] = group_counts.get(bot.group, 0) + 1

return { "fleet_manager": self.manager_agent_id, "total_bots": len(self.bots), "status": status_counts, "groups": group_counts, "timestamp": datetime.utcnow().isoformat() }

def send_group_command(self, group: str, command: str, payload: dict = None): """Send a command to all bots in a specific group.""" targets = [ (aid, bot) for aid, bot in self.bots.items() if bot.group == group ] for agent_id, bot in targets: execute("send_message", { "from_agent_id": self.manager_agent_id, "to_agent_id": agent_id, "message_type": "command", "payload": { "command": command, (payload or {}), "timestamp": datetime.utcnow().isoformat() } }) print(f"Command '{command}' sent to {len(targets)} bots in {group}")

Registering a Fleet

fleet = FleetManager(org_id="acme-trading")

# Spot arbitrage group for i in range(3): fleet.register_bot( name=f"spot-arb-{i+1:02d}", group="spot-arb", exchange="binance" if i < 2 else "okx", strategy="cross-exchange-arb", version="2.4.1", max_position_usd=10000 )

# Perpetual futures momentum group for i in range(5): exchanges = ["binance", "bybit", "okx", "deribit", "binance"] fleet.register_bot( name=f"perp-momentum-{i+1:02d}", group="perp-momentum", exchange=exchanges[i], strategy="trend-following-perps", version="1.8.3", max_position_usd=25000 )

# Options market making group for i in range(4): fleet.register_bot( name=f"options-mm-{i+1:02d}", group="options-mm", exchange="deribit", strategy="delta-neutral-mm", version="3.1.0", capabilities=["trading", "metrics_reporting", "options_greeks"], max_position_usd=50000 )

print(json.dumps(fleet.get_fleet_status(), indent=2))

# curl: Register a single bot
curl -X POST https://sandbox.greenhelix.net/v1 \
  -H "Authorization: Bearer $API_KEY" \
  -H "Content-Type: application/json" \
  -d '{
    "tool": "register_agent",
    "input": {
      "name": "spot-arb-01",
      "description": "Trading bot: cross-exchange-arb on binance. Part of spot-arb group.",
      "capabilities": ["trading", "metrics_reporting"],
      "metadata": {
        "role": "trading_bot",
        "fleet_manager": "fleet-manager-agent-id",
        "group": "spot-arb",
        "exchange": "binance",
        "strategy": "cross-exchange-arb",
        "version": "2.4.1",
        "max_position_usd": 10000
      }
    }
  }'

# curl: Send emergency stop to a bot curl -X POST https://sandbox.greenhelix.net/v1 \ -H "Authorization: Bearer $API_KEY" \ -H "Content-Type: application/json" \ -d '{ "tool": "send_message", "input": { "from_agent_id": "fleet-manager-agent-id", "to_agent_id": "bot-agent-id", "message_type": "command", "payload": { "command": "emergency_stop", "reason": "anomalous_behavior_detected", "actions": ["cancel_all_open_orders", "close_all_positions_market", "halt_trading_loop"] } } }'

Fleet Command Patterns

Three command scopes cover all operational scenarios:

Fleet-wide: pause_all, resume_all, emergency_stop. Used for market-wide events (flash crash, exchange outage, security incident). These hit every bot regardless of group.

Group-scoped: send_group_command. Used for strategy-specific actions: updating parameters for all momentum bots, pausing all options bots before an expiry event, or upgrading all arb bots to a new version.

Individual: Direct send_message to a single bot. Used for surgical interventions: adjusting one bot's position limits, forcing one bot to close a specific position, or rotating one bot's exchange API keys.

The command hierarchy means that a compromised spot arb bot can be killed without touching the options market makers. That is the entire point: blast radius containment through identity isolation.


Chapter 3: BotIdentityManager Class

Per-Bot Cryptographic Identity

Every bot in the fleet gets its own Ed25519 keypair. This is not optional. Shared keys are how Step Finance happened. Per-bot keys mean that compromising one bot's key gives the attacker access to exactly one bot. The fleet manager can revoke that single identity without affecting any other bot.

Ed25519 was chosen for three reasons: it produces compact 64-byte signatures, signing is fast enough that it adds negligible latency to trading operations (tens of microseconds), and the key generation is deterministic from a seed, making backup and recovery straightforward.

The BotIdentityManager Class

import nacl.signing
import nacl.encoding
import base64
import secrets
import os
from typing import Tuple

class BotIdentityManager: """Manages Ed25519 identities for trading bots."""

def __init__(self, fleet_manager_id: str, key_store_path: str = "/secure/keys"): self.fleet_manager_id = fleet_manager_id self.key_store_path = key_store_path self.identities: Dict[str, dict] = {} # agent_id -> identity record

def generate_keypair(self, bot_name: str) -> Tuple[str, str]: """Generate an Ed25519 keypair for a bot. Returns (public_key_b64, private_key_b64). """ signing_key = nacl.signing.SigningKey.generate() verify_key = signing_key.verify_key

private_b64 = base64.b64encode( signing_key.encode() ).decode("utf-8") public_b64 = base64.b64encode( verify_key.encode() ).decode("utf-8")

# Store private key securely -- in production, use a secrets manager key_path = os.path.join(self.key_store_path, f"{bot_name}.key") os.makedirs(os.path.dirname(key_path), exist_ok=True) with open(key_path, "w") as f: f.write(private_b64) os.chmod(key_path, 0o600) # Owner read/write only

return public_b64, private_b64

def register_identity(self, bot_name: str, group: str, exchange: str, strategy: str, version: str, permissions: List[str] = None) -> dict: """Generate keys and register identity on GreenHelix.""" public_key, private_key = self.generate_keypair(bot_name)

if permissions is None: permissions = ["trade", "report_metrics"]

# Register the agent with its public key result = execute("register_agent", { "name": bot_name, "description": f"Trading bot: {strategy} on {exchange}", "capabilities": permissions, "metadata": { "role": "trading_bot", "fleet_manager": self.fleet_manager_id, "group": group, "exchange": exchange, "strategy": strategy, "version": version, "public_key": public_key, "key_algorithm": "ed25519", "registered_at": datetime.utcnow().isoformat() } })

agent_id = result["agent_id"] identity_record = { "agent_id": agent_id, "bot_name": bot_name, "public_key": public_key, "group": group, "exchange": exchange, "permissions": permissions, "created_at": datetime.utcnow().isoformat(), "key_version": 1, "status": "active" } self.identities[agent_id] = identity_record return identity_record

def verify_identity(self, agent_id: str) -> dict: """Retrieve and verify a bot's identity from GreenHelix.""" result = execute("get_agent_identity", { "agent_id": agent_id })

local_record = self.identities.get(agent_id) if local_record: # Verify public key matches what we registered remote_key = result.get("metadata", {}).get("public_key") if remote_key != local_record["public_key"]: raise SecurityError( f"Public key mismatch for {agent_id}. " f"Expected: {local_record['public_key'][:16]}... " f"Got: {remote_key[:16] if remote_key else 'None'}... " f"Possible key tampering." ) return result

def rotate_key(self, agent_id: str) -> dict: """Rotate a bot's Ed25519 keypair without downtime.

The rotation procedure: 1. Generate new keypair 2. Update GreenHelix identity with new public key 3. Bot continues operating -- it picks up the new key on next heartbeat 4. Old key is archived (not deleted) for signature verification of historical events """ if agent_id not in self.identities: raise ValueError(f"No identity record for {agent_id}")

record = self.identities[agent_id] bot_name = record["bot_name"]

# Archive old key old_key_path = os.path.join( self.key_store_path, f"{bot_name}.key.v{record['key_version']}" ) current_key_path = os.path.join( self.key_store_path, f"{bot_name}.key" ) if os.path.exists(current_key_path): os.rename(current_key_path, old_key_path)

# Generate new keypair new_public, new_private = self.generate_keypair(bot_name)

# Update the identity on GreenHelix -- submit metrics indicating rotation execute("submit_metrics", { "agent_id": agent_id, "metrics": { "key_rotation": 1, "key_version": record["key_version"] + 1, "rotation_timestamp": datetime.utcnow().isoformat() } })

# Notify the bot to pick up the new key execute("send_message", { "from_agent_id": self.fleet_manager_id, "to_agent_id": agent_id, "message_type": "command", "payload": { "command": "rotate_key", "new_public_key": new_public, "key_version": record["key_version"] + 1, "effective_at": datetime.utcnow().isoformat() } })

record["public_key"] = new_public record["key_version"] += 1 print(f"Key rotated for {bot_name}: v{record['key_version']}") return record

def revoke_identity(self, agent_id: str, reason: str): """Revoke a compromised bot's identity.

This is the nuclear option: the bot can no longer authenticate. Use when a bot is confirmed compromised. """ if agent_id not in self.identities: raise ValueError(f"No identity record for {agent_id}")

record = self.identities[agent_id]

# Send shutdown command before revocation execute("send_message", { "from_agent_id": self.fleet_manager_id, "to_agent_id": agent_id, "message_type": "command", "payload": { "command": "emergency_stop", "reason": f"identity_revoked: {reason}", "actions": [ "cancel_all_open_orders", "close_all_positions_market", "halt_trading_loop", "destroy_local_keys" ] } })

# Publish revocation event for audit trail execute("publish_event", { "agent_id": self.fleet_manager_id, "event_type": "fleet.identity_revoked", "payload": { "revoked_agent_id": agent_id, "bot_name": record["bot_name"], "reason": reason, "revoked_at": datetime.utcnow().isoformat(), "key_version_revoked": record["key_version"] } })

# Delete the private key key_path = os.path.join( self.key_store_path, f"{record['bot_name']}.key" ) if os.path.exists(key_path): # Overwrite with random data before unlinking with open(key_path, "wb") as f: f.write(secrets.token_bytes(64)) os.unlink(key_path)

record["status"] = "revoked" print(f"Identity revoked: {record['bot_name']} ({agent_id}): {reason}")

def list_active_identities(self) -> List[dict]: """List all active bot identities.""" return [ r for r in self.identities.values() if r["status"] == "active" ]

# curl: Get a bot's identity
curl -X POST https://sandbox.greenhelix.net/v1 \
  -H "Authorization: Bearer $API_KEY" \
  -H "Content-Type: application/json" \
  -d '{
    "tool": "get_agent_identity",
    "input": {
      "agent_id": "bot-agent-id-here"
    }
  }'

# curl: Notify bot of key rotation via message curl -X POST https://sandbox.greenhelix.net/v1 \ -H "Authorization: Bearer $API_KEY" \ -H "Content-Type: application/json" \ -d '{ "tool": "send_message", "input": { "from_agent_id": "fleet-manager-agent-id", "to_agent_id": "bot-agent-id", "message_type": "command", "payload": { "command": "rotate_key", "new_public_key": "base64-encoded-new-public-key", "key_version": 2, "effective_at": "2026-04-07T14:30:00Z" } } }'

Key Rotation Without Downtime

Key rotation is the most operationally sensitive identity operation. The bot must continue trading during rotation -- you cannot afford a gap where the bot has no valid key. The procedure is:

  • Generate new keypair on the fleet manager side.
  • Archive the old private key (do not delete -- you need it to verify signatures on historical events).
  • Update GreenHelix with the new public key via metrics submission.
  • Notify the bot* via send_message with the new key material.
  • The bot picks up the new key, switches to it, and acknowledges.

Between steps 4 and 5, the bot is still signing with the old key, and that is fine. The old key is still valid until the bot confirms the switch. There is no window where the bot has no valid signing key.

A sane rotation schedule is every 30 days for production bots, immediately upon any suspected compromise, and after any personnel change on the team (someone leaves, all keys rotate).

Identity Revocation for Compromised Bots

Revocation is destructive and immediate. When you confirm a bot is compromised, you do not rotate -- you revoke. The difference: rotation preserves continuity (the bot keeps running with a new key), while revocation terminates the bot's ability to operate entirely.

The revocation procedure overwrites the private key file with random data before deleting it. A simple os.unlink() leaves the key material on disk until the filesystem overwrites those blocks. Writing random data first ensures the key is unrecoverable without forensic disk analysis, and even that becomes unreliable on SSDs with wear leveling.


Chapter 4: Permission Scoping

Principle of Least Privilege for Trading Bots

A spot arb bot that compares prices across exchanges needs read access to two order books and the ability to place limit orders on two exchanges. It does not need withdrawal permissions. It does not need the ability to modify account settings. It does not need access to the options chain on Deribit. Every permission beyond what the bot needs is attack surface.

The principle of least privilege states that each bot should have the minimum set of permissions required to execute its strategy and nothing more. This is straightforward in theory and consistently ignored in practice because it requires upfront work: defining permission tiers, mapping strategies to permissions, and enforcing boundaries.

Permission Tiers

Four tiers cover the range of trading bot operations:

from enum import Enum
from typing import Set

class PermissionTier(Enum): READ_ONLY = "read_only" TRADE = "trade" WITHDRAW = "withdraw" ADMIN = "admin"

# What each tier can do TIER_PERMISSIONS: Dict[PermissionTier, Set[str]] = { PermissionTier.READ_ONLY: { "read_orderbook", "read_positions", "read_balances", "read_trade_history", "report_metrics", "receive_commands", }, PermissionTier.TRADE: { "read_orderbook", "read_positions", "read_balances", "read_trade_history", "report_metrics", "receive_commands", "place_order", "cancel_order", "modify_order", }, PermissionTier.WITHDRAW: { "read_orderbook", "read_positions", "read_balances", "read_trade_history", "report_metrics", "receive_commands", "place_order", "cancel_order", "modify_order", "withdraw_funds", "transfer_between_accounts", }, PermissionTier.ADMIN: { "read_orderbook", "read_positions", "read_balances", "read_trade_history", "report_metrics", "receive_commands", "place_order", "cancel_order", "modify_order", "withdraw_funds", "transfer_between_accounts", "modify_api_keys", "modify_account_settings", "register_sub_accounts", }, }

Per-Exchange Permission Mapping

Different exchanges have different permission models, but they all support the same core concept: API keys with scoped permissions. The PermissionManager maps abstract tiers to exchange-specific permission sets.

class PermissionManager:
    """Maps permission tiers to exchange-specific settings."""

EXCHANGE_PERMISSION_MAP = { "binance": { PermissionTier.READ_ONLY: { "enableReading": True, "enableSpotAndMarginTrading": False, "enableWithdrawals": False, "enableFutures": False, }, PermissionTier.TRADE: { "enableReading": True, "enableSpotAndMarginTrading": True, "enableWithdrawals": False, "enableFutures": True, }, PermissionTier.WITHDRAW: { "enableReading": True, "enableSpotAndMarginTrading": True, "enableWithdrawals": True, "enableFutures": True, "withdrawalAddressWhitelist": True, }, }, "deribit": { PermissionTier.READ_ONLY: { "scope": "read", }, PermissionTier.TRADE: { "scope": "trade:read", }, PermissionTier.WITHDRAW: { "scope": "trade:read:withdraw", }, }, "okx": { PermissionTier.READ_ONLY: { "perm": "read_only", "trade": False, "withdraw": False, }, PermissionTier.TRADE: { "perm": "trade", "trade": True, "withdraw": False, }, PermissionTier.WITHDRAW: { "perm": "trade", "trade": True, "withdraw": True, "ip_whitelist_required": True, }, }, }

def __init__(self, fleet_manager_id: str): self.fleet_manager_id = fleet_manager_id self.bot_permissions: Dict[str, dict] = {}

def assign_permissions(self, agent_id: str, bot_name: str, exchange: str, tier: PermissionTier, custom_restrictions: dict = None) -> dict: """Assign a permission tier to a bot and record it.""" base_permissions = TIER_PERMISSIONS[tier] exchange_config = self.EXCHANGE_PERMISSION_MAP.get( exchange, {} ).get(tier, {})

record = { "agent_id": agent_id, "bot_name": bot_name, "exchange": exchange, "tier": tier.value, "abstract_permissions": list(base_permissions), "exchange_config": exchange_config, "custom_restrictions": custom_restrictions or {}, "assigned_at": datetime.utcnow().isoformat(), } self.bot_permissions[agent_id] = record

# Record permission assignment on GreenHelix execute("submit_metrics", { "agent_id": agent_id, "metrics": { "permission_tier": tier.value, "permission_count": len(base_permissions), "exchange": exchange, "has_withdraw": "withdraw_funds" in base_permissions, "has_admin": tier == PermissionTier.ADMIN, } })

# Publish audit event execute("publish_event", { "agent_id": self.fleet_manager_id, "event_type": "fleet.permission_assigned", "payload": { "target_agent_id": agent_id, "bot_name": bot_name, "tier": tier.value, "exchange": exchange, "permissions": list(base_permissions), } })

return record

def check_permission(self, agent_id: str, action: str) -> bool: """Check if a bot has permission for an action.""" record = self.bot_permissions.get(agent_id) if not record: return False return action in record["abstract_permissions"]

def escalate_permission(self, agent_id: str, new_tier: PermissionTier, reason: str, approved_by: str): """Escalate a bot's permissions. Requires explicit approval.""" record = self.bot_permissions.get(agent_id) if not record: raise ValueError(f"No permission record for {agent_id}")

old_tier = record["tier"]

# Log the escalation for audit execute("publish_event", { "agent_id": self.fleet_manager_id, "event_type": "fleet.permission_escalated", "payload": { "target_agent_id": agent_id, "bot_name": record["bot_name"], "old_tier": old_tier, "new_tier": new_tier.value, "reason": reason, "approved_by": approved_by, "escalated_at": datetime.utcnow().isoformat(), } })

# Update the record new_permissions = TIER_PERMISSIONS[new_tier] record["tier"] = new_tier.value record["abstract_permissions"] = list(new_permissions) record["exchange_config"] = self.EXCHANGE_PERMISSION_MAP.get( record["exchange"], {} ).get(new_tier, {})

print(f"Permission escalated for {record['bot_name']}: " f"{old_tier} -> {new_tier.value} (reason: {reason})")

def get_fleet_permission_summary(self) -> Dict: """Summary of permissions across the fleet.""" tier_counts = {} withdraw_enabled = [] admin_enabled = []

for agent_id, record in self.bot_permissions.items(): tier = record["tier"] tier_counts[tier] = tier_counts.get(tier, 0) + 1 if "withdraw_funds" in record["abstract_permissions"]: withdraw_enabled.append(record["bot_name"]) if record["tier"] == "admin": admin_enabled.append(record["bot_name"])

return { "total_bots": len(self.bot_permissions), "tiers": tier_counts, "withdraw_enabled": withdraw_enabled, "admin_enabled": admin_enabled, }

Cross-Bot Permission Isolation

Permission isolation is enforced at two levels. First, at the GreenHelix level: each bot has its own agent ID, and GreenHelix tools scope operations to the authenticated agent. Bot A cannot submit metrics as Bot B. Bot A cannot read Bot B's messages. This is identity-level isolation.

Second, at the exchange level: each bot should use its own exchange API key with permissions matching its tier. Do not share exchange API keys across bots. This is the lesson from Step Finance -- even if you have perfect identity isolation at the fleet management layer, shared exchange keys collapse the isolation at the point that matters most.

# Assign permissions to the fleet
perm_mgr = PermissionManager(fleet_manager_id=fleet.manager_agent_id)

# Spot arb bots: TRADE tier (no withdrawals) for agent_id, bot in fleet.bots.items(): if bot.group == "spot-arb": perm_mgr.assign_permissions( agent_id=agent_id, bot_name=bot.name, exchange=bot.exchange, tier=PermissionTier.TRADE, custom_restrictions={ "max_order_size_usd": 5000, "allowed_pairs": ["BTC/USDT", "ETH/USDT", "SOL/USDT"], } )

# Options MM bots: TRADE tier with additional restrictions for agent_id, bot in fleet.bots.items(): if bot.group == "options-mm": perm_mgr.assign_permissions( agent_id=agent_id, bot_name=bot.name, exchange=bot.exchange, tier=PermissionTier.TRADE, custom_restrictions={ "max_order_size_usd": 25000, "allowed_instruments": ["BTC--C", "BTC--P", "ETH--C", "ETH--P"], "max_delta_exposure": 0.5, } )

print(json.dumps(perm_mgr.get_fleet_permission_summary(), indent=2))

The key insight: no bot in this fleet has withdrawal permissions. Withdrawals should only be executed by a dedicated withdrawal agent that is not connected to exchange websockets and has its own approval workflow (ideally requiring human confirmation). A compromised trading bot that cannot withdraw can, at worst, make bad trades. A compromised bot with withdrawal access can empty the exchange account.


Chapter 5: FleetHealthMonitor Class

Real-Time Health Across All Bots

Health monitoring is the difference between discovering a dead bot immediately and discovering it three days later when someone notices a strategy stopped making money. For a fleet of 12 bots, you need centralized health that answers three questions in real time: is each bot alive, is each bot performing within expectations, and are there any bots that need intervention?

Metrics Collected

Each bot reports the following metrics on every heartbeat:

MetricTypeDescription
uptime_secondsgaugeTime since last restart
heartbeat_latency_msgaugeRound-trip time to exchange websocket
pnl_usdgaugeUnrealized + realized PnL since start of day
drawdown_pctgaugeCurrent drawdown from peak equity
trade_countcounterNumber of trades executed since start of day
open_positionsgaugeNumber of currently open positions
open_ordersgaugeNumber of currently open orders
memory_mbgaugeProcess memory usage
cpu_pctgaugeProcess CPU usage
error_countcounterNumber of errors since last heartbeat

The FleetHealthMonitor Class

class FleetHealthMonitor:
    """Monitors health across all bots in the fleet."""

def __init__(self, fleet_manager_id: str, bots: Dict[str, BotRecord], heartbeat_interval_seconds: int = 30, dead_threshold_seconds: int = 120): self.fleet_manager_id = fleet_manager_id self.bots = bots self.heartbeat_interval = heartbeat_interval_seconds self.dead_threshold = dead_threshold_seconds self.health_records: Dict[str, dict] = {} self.alert_callbacks: List[callable] = [] self._setup_webhooks()

def _setup_webhooks(self): """Register webhooks for real-time health alerts.""" execute("register_webhook", { "agent_id": self.fleet_manager_id, "event_types": [ "fleet.heartbeat", "fleet.health_alert", "fleet.bot_dead" ], "url": "https://your-fleet-dashboard.example.com/webhooks/health", "secret": "webhook-signing-secret" })

def record_heartbeat(self, agent_id: str, metrics: dict): """Record a heartbeat from a bot.""" now = datetime.utcnow().isoformat()

# Submit metrics to GreenHelix execute("submit_metrics", { "agent_id": agent_id, "metrics": { *metrics, "heartbeat_timestamp": now, } })

self.health_records[agent_id] = { "agent_id": agent_id, "bot_name": self.bots[agent_id].name if agent_id in self.bots else "unknown", "last_heartbeat": now, "metrics": metrics, "status": self._evaluate_health(agent_id, metrics), }

def _evaluate_health(self, agent_id: str, metrics: dict) -> str: """Evaluate bot health from metrics. Returns status string.""" issues = []

# Check drawdown if metrics.get("drawdown_pct", 0) > 10: issues.append(f"high_drawdown:{metrics['drawdown_pct']:.1f}%") elif metrics.get("drawdown_pct", 0) > 5: issues.append(f"elevated_drawdown:{metrics['drawdown_pct']:.1f}%")

# Check latency if metrics.get("heartbeat_latency_ms", 0) > 5000: issues.append(f"high_latency:{metrics['heartbeat_latency_ms']}ms") elif metrics.get("heartbeat_latency_ms", 0) > 1000: issues.append(f"elevated_latency:{metrics['heartbeat_latency_ms']}ms")

# Check errors if metrics.get("error_count", 0) > 10: issues.append(f"high_errors:{metrics['error_count']}") elif metrics.get("error_count", 0) > 3: issues.append(f"elevated_errors:{metrics['error_count']}")

# Check memory if metrics.get("memory_mb", 0) > 2048: issues.append(f"high_memory:{metrics['memory_mb']}MB")

if any("high_" in i for i in issues): self._trigger_alert(agent_id, "critical", issues) return "critical" elif issues: self._trigger_alert(agent_id, "warning", issues) return "warning" return "healthy"

def _trigger_alert(self, agent_id: str, severity: str, issues: List[str]): """Send alert via GreenHelix messaging.""" bot_name = self.bots[agent_id].name if agent_id in self.bots else agent_id

execute("send_message", { "from_agent_id": self.fleet_manager_id, "to_agent_id": self.fleet_manager_id, # Self-message for logging "message_type": "alert", "payload": { "severity": severity, "bot_name": bot_name, "agent_id": agent_id, "issues": issues, "timestamp": datetime.utcnow().isoformat(), } })

# Publish as event for audit trail execute("publish_event", { "agent_id": self.fleet_manager_id, "event_type": "fleet.health_alert", "payload": { "severity": severity, "agent_id": agent_id, "bot_name": bot_name, "issues": issues, } })

def detect_dead_bots(self) -> List[str]: """Find bots that have missed heartbeats beyond the threshold.""" now = datetime.utcnow() dead_bots = []

for agent_id, bot in self.bots.items(): record = self.health_records.get(agent_id) if record is None: # Never sent a heartbeat dead_bots.append(agent_id) continue

last_hb = datetime.fromisoformat(record["last_heartbeat"]) seconds_since = (now - last_hb).total_seconds()

if seconds_since > self.dead_threshold: dead_bots.append(agent_id) bot.status = "dead"

execute("publish_event", { "agent_id": self.fleet_manager_id, "event_type": "fleet.bot_dead", "payload": { "agent_id": agent_id, "bot_name": bot.name, "last_heartbeat": record["last_heartbeat"], "seconds_since_heartbeat": seconds_since, "group": bot.group, } })

return dead_bots

def get_fleet_health_dashboard(self) -> dict: """Aggregate health data for dashboard display.""" status_counts = {"healthy": 0, "warning": 0, "critical": 0, "dead": 0, "unknown": 0} group_health = {} total_pnl = 0.0 total_trades = 0

for agent_id, bot in self.bots.items(): record = self.health_records.get(agent_id) if record: status = record["status"] metrics = record["metrics"] total_pnl += metrics.get("pnl_usd", 0) total_trades += metrics.get("trade_count", 0) elif bot.status == "dead": status = "dead" else: status = "unknown"

status_counts[status] = status_counts.get(status, 0) + 1

if bot.group not in group_health: group_health[bot.group] = { "healthy": 0, "warning": 0, "critical": 0, "dead": 0, "unknown": 0, "pnl_usd": 0.0, "trade_count": 0, } group_health[bot.group][status] += 1 if record: group_health[bot.group]["pnl_usd"] += record["metrics"].get("pnl_usd", 0) group_health[bot.group]["trade_count"] += record["metrics"].get("trade_count", 0)

return { "timestamp": datetime.utcnow().isoformat(), "total_bots": len(self.bots), "status_summary": status_counts, "groups": group_health, "fleet_pnl_usd": round(total_pnl, 2), "fleet_trade_count": total_trades, }

def initiate_failover(self, dead_agent_id: str, replacement_agent_id: str): """Failover a dead bot's responsibilities to a replacement.""" dead_bot = self.bots.get(dead_agent_id) if not dead_bot: raise ValueError(f"Bot {dead_agent_id} not found")

# Send failover command to replacement bot execute("send_message", { "from_agent_id": self.fleet_manager_id, "to_agent_id": replacement_agent_id, "message_type": "command", "payload": { "command": "assume_responsibility", "from_bot": dead_agent_id, "group": dead_bot.group, "exchange": dead_bot.exchange, "strategy": dead_bot.strategy, "reason": "failover_from_dead_bot", "timestamp": datetime.utcnow().isoformat(), } })

execute("publish_event", { "agent_id": self.fleet_manager_id, "event_type": "fleet.failover", "payload": { "dead_bot": dead_agent_id, "replacement_bot": replacement_agent_id, "group": dead_bot.group, "exchange": dead_bot.exchange, } })

print(f"Failover: {dead_bot.name} -> " f"{self.bots[replacement_agent_id].name}")

Using the Health Monitor

health = FleetHealthMonitor(
    fleet_manager_id=fleet.manager_agent_id,
    bots=fleet.bots,
    heartbeat_interval_seconds=30,
    dead_threshold_seconds=120
)

# Simulate a bot heartbeat (in production, bots call this themselves) for agent_id, bot in fleet.bots.items(): health.record_heartbeat(agent_id, { "uptime_seconds": 86400, "heartbeat_latency_ms": 45, "pnl_usd": 234.50, "drawdown_pct": 1.2, "trade_count": 47, "open_positions": 3, "open_orders": 6, "memory_mb": 512, "cpu_pct": 15.3, "error_count": 0, })

# Check for dead bots dead = health.detect_dead_bots() if dead: print(f"Dead bots detected: {dead}")

# Get dashboard dashboard = health.get_fleet_health_dashboard() print(json.dumps(dashboard, indent=2))

# curl: Submit bot heartbeat metrics
curl -X POST https://sandbox.greenhelix.net/v1 \
  -H "Authorization: Bearer $API_KEY" \
  -H "Content-Type: application/json" \
  -d '{
    "tool": "submit_metrics",
    "input": {
      "agent_id": "bot-agent-id",
      "metrics": {
        "uptime_seconds": 86400,
        "heartbeat_latency_ms": 45,
        "pnl_usd": 234.50,
        "drawdown_pct": 1.2,
        "trade_count": 47,
        "open_positions": 3,
        "open_orders": 6,
        "memory_mb": 512,
        "cpu_pct": 15.3,
        "error_count": 0,
        "heartbeat_timestamp": "2026-04-07T14:00:00Z"
      }
    }
  }'

# curl: Register health webhook curl -X POST https://sandbox.greenhelix.net/v1 \ -H "Authorization: Bearer $API_KEY" \ -H "Content-Type: application/json" \ -d '{ "tool": "register_webhook", "input": { "agent_id": "fleet-manager-agent-id", "event_types": ["fleet.heartbeat", "fleet.health_alert", "fleet.bot_dead"], "url": "https://your-fleet-dashboard.example.com/webhooks/health", "secret": "webhook-signing-secret" } }'

Dead Bot Detection and Automatic Failover

Dead bot detection runs on a configurable interval (default: check every 30 seconds, declare dead after 120 seconds without a heartbeat). When a bot is declared dead, the monitor publishes a fleet.bot_dead event and the fleet manager can trigger automatic failover.

Failover strategy depends on the bot type. Stateless bots (spot arb, momentum) can fail over to a standby instance that was pre-registered but dormant. Stateful bots (options market makers with active positions) require careful handoff: the replacement bot needs to query the exchange for open positions before it starts making new decisions.

The health monitor does not make the failover decision automatically -- it detects the dead bot and notifies the fleet manager, which then decides whether to fai

数据来源:ClawHub ↗ · 中文优化:龙虾技能库
OpenClaw 技能定制 / 插件定制 / 私有工作流定制

免费技能或插件可能存在安全风险,如需更匹配、更安全的方案,建议联系付费定制

了解定制服务