Trading Bot Fleet Management: Unified Control for Multi-Bot Operations
v1.3.0
Trading Bot Fleet Management: Unified Control for Multi-Bot Operations. Build a fleet management layer for 10+ trading bots with per-bot identity isolation,...
Version 1.3.0
- Updated guide to reference the GreenHelix sandbox, enabling 500 free credits and no API key required for initial use.
- Clarified that only `AGENT_SIGNING_KEY` is needed in credentials (removed `SSH_DEPLOY_KEY`).
- Added instructions and notices related to the sandbox environment for code examples.
- Minor copy and formatting improvements in the introductory sections.
Skill Documentation
Notice: This is an educational guide with illustrative code examples.
It does not execute code or install dependencies.
All examples use the GreenHelix sandbox (https://sandbox.greenhelix.net), which
provides 500 free credits; no API key is required to get started.
Referenced credentials (you supply these in your own environment):
- `AGENT_SIGNING_KEY`: Cryptographic signing key for agent identity (Ed25519 key pair for request signing)
What You'll Learn
- Chapter 1: Fleet Architecture
- Chapter 2: FleetManager Class
- Chapter 3: BotIdentityManager Class
- Chapter 4: Permission Scoping
- Chapter 5: FleetHealthMonitor Class
- Chapter 6: Coordinated Deployments
- Chapter 7: SLA Tracking
- Chapter 8: Cost Allocation
- What's Next
Full Guide
# Trading Bot Fleet Management: Unified Control for Multi-Bot Operations
In March 2023, the Step Finance exploit drained $45M from Solana DeFi positions. The root cause was not a smart contract vulnerability or a novel cryptographic attack. It was operational: multiple bots shared API keys with no identity isolation. When one bot's credentials leaked, the attacker gained access to every bot, every exchange account, every withdrawal endpoint. No per-bot permissions. No unified health dashboard that would have caught anomalous behavior. No kill switch scoped to the compromised bot without killing the entire fleet. The team had to shut down everything, losing millions in unrealized positions across healthy bots while they figured out which one was compromised.

This pattern repeats across the industry. Teams running 10+ bots accumulate the same structural debt: shared credentials passed through environment variables, no per-bot permission boundaries, health checks that are either absent or scattered across shell scripts nobody maintains, deployments that are "SSH in and restart the process," and cost tracking that lives in a spreadsheet updated monthly if someone remembers. When one bot goes rogue -- whether from a bug, a compromised key, or a strategy that hits an edge case -- the blast radius is the entire fleet.
This guide builds a fleet management layer using GreenHelix's identity, messaging, and metrics tools. Each bot gets its own Ed25519 cryptographic identity, scoped permissions defining exactly what it can and cannot do, real-time health monitoring with automatic dead bot detection, coordinated deployment procedures, SLA tracking against defined performance targets, and per-bot cost allocation. Every concept comes with working Python code and equivalent curl commands against the GreenHelix API.
Table of Contents
- Fleet Architecture
- FleetManager Class
- BotIdentityManager Class
- Permission Scoping
- FleetHealthMonitor Class
- Coordinated Deployments
- SLA Tracking
- Cost Allocation
- What's Next
Chapter 1: Fleet Architecture
Why Fleet Management Matters
The Step Finance breach is instructive because it was not exotic. The attack surface was not a zero-day in a cryptographic library. It was a predictable consequence of how most teams operate trading bots at scale: every bot shares the same API key, the same exchange credentials, the same infrastructure account. This is the "monolith credentials" antipattern, and it shows up in every post-mortem of operational trading failures that did not involve a market event.
Consider what a team with 15 bots typically looks like six months after launch:
- Shared credentials: All bots read the same `.env` file or Kubernetes secret. Rotating one key means touching every bot.
- No permission boundaries: A bot designed to execute $500 momentum trades on Binance Spot has the same access as a bot managing $50,000 Deribit options positions. If the momentum bot is compromised, the attacker can withdraw from the options account.
- Scattered health checks: Bot #3 has a heartbeat endpoint. Bot #7 writes to a log file. Bots #1, #2, #4-6, and #8-15 have no health reporting at all. The team discovers a dead bot when a strategy stops producing PnL.
- Manual deployments: Updating a strategy requires SSH-ing into each server, pulling the latest code, restarting the process, and hoping the bot reconnects to the exchange websocket cleanly. There is no rollback procedure beyond "check out the previous git commit and restart again."
- No cost attribution: The team knows the monthly AWS bill and the total exchange fees. They do not know which bot costs the most to operate, which strategy has a negative ROI after infrastructure costs, or whether a bot that trades 200 times per day is actually profitable after accounting for exchange fees, API rate limit costs, and compute.
Fleet management solves all of these by treating each bot as an independently identified, independently monitored, independently permissioned entity within a unified control plane.
Architecture Overview
The fleet management architecture has four layers:
+-----------------------------------------------------------------------+
|                        Fleet Operator (Human)                         |
|                 Strategic decisions, policy, budgets                  |
+-----------------------------------------------------------------------+
                                    |
                                    v
+-----------------------------------------------------------------------+
|                         Fleet Manager (Agent)                         |
|      register/deregister bots, issue commands, aggregate status       |
|               GreenHelix identity: fleet-manager-{org}                |
+-----------------------------------------------------------------------+
                                    |
               +--------------------+--------------------+
               |                    |                    |
               v                    v                    v
       +---------------+    +---------------+    +---------------+
       |  Bot Group:   |    |  Bot Group:   |    |  Bot Group:   |
       |  Spot Arb     |    | Perp Momentum |    |  Options MM   |
       |  3 bots       |    |  5 bots       |    |  4 bots       |
       +---------------+    +---------------+    +---------------+
           |   |   |          |  |  |  |  |        |   |   |   |
           v   v   v          v  v  v  v  v        v   v   v   v
Individual bots, each with:
- Own Ed25519 identity
- Scoped permissions
- Health heartbeat
- Metrics reporting
- Cost tracking
The fleet operator is a human who sets policy: which strategies to run, on which exchanges, with what risk limits, and how much capital to allocate. The fleet manager is a GreenHelix agent that translates policy into operations: registering bots, issuing commands, monitoring health, orchestrating deployments. Bot groups organize bots by strategy type, and individual bots are the execution units, each with its own cryptographic identity.
GreenHelix Tools Used
This guide uses the following GreenHelix tools:
| Tool | Purpose |
|---|---|
| `register_agent` | Create identity for fleet manager and each bot |
| `get_agent_identity` | Retrieve and verify bot identity |
| `submit_metrics` | Report health, PnL, latency, trade count |
| `get_sla_compliance` | Monitor bots against defined SLA targets |
| `send_message` | Fleet commands, alerts, inter-bot communication |
| `register_webhook` | Real-time event delivery for health alerts |
| `search_agents_by_metrics` | Find underperforming bots across the fleet |
| `get_agent_reputation` | Track bot reliability over time |
| `create_event_schema` | Define fleet event types |
| `publish_event` | Emit fleet events (deploy, failover, alert) |
Fleet Hierarchy
The hierarchy maps to GreenHelix's identity model. The fleet manager is a registered agent. Each bot group is a metadata tag. Each individual bot is a registered agent with metadata linking it to its group and to the fleet manager.
# Hierarchy expressed in GreenHelix metadata
fleet_manager_metadata = {
    "role": "fleet_manager",
    "organization": "your-org-id",
    "fleet_size": 12,
    "groups": ["spot-arb", "perp-momentum", "options-mm"]
}

bot_metadata = {
    "role": "trading_bot",
    "fleet_manager": "fleet-manager-{org}",
    "group": "spot-arb",
    "exchange": "binance",
    "strategy": "cross-exchange-arb",
    "version": "2.4.1",
    "max_position_usd": 10000
}
This metadata is not cosmetic. It is queryable. When the fleet manager needs to find all bots in the "perp-momentum" group, it searches by metadata. When a health alert fires, the alert includes the bot's group and strategy so the operator knows immediately what type of bot is failing and what the potential market impact is.
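To make the "queryable metadata" point concrete, here is a minimal local sketch. It assumes the fleet manager keeps a cached list of bot records shaped like the metadata above; `find_bots` and the sample records are hypothetical names introduced for illustration, and a real deployment would issue the query through the GreenHelix API rather than filter in memory.

```python
from typing import Dict, List


def find_bots(records: List[Dict], **criteria) -> List[Dict]:
    """Return the records whose metadata matches every key/value in criteria."""
    return [
        r for r in records
        if all(r.get("metadata", {}).get(k) == v for k, v in criteria.items())
    ]


# Hypothetical cached records, shaped like the bot metadata in this chapter.
cached_bots = [
    {"name": "spot-arb-01",
     "metadata": {"group": "spot-arb", "exchange": "binance"}},
    {"name": "perp-momentum-01",
     "metadata": {"group": "perp-momentum", "exchange": "bybit"}},
    {"name": "perp-momentum-02",
     "metadata": {"group": "perp-momentum", "exchange": "okx"}},
]

momentum_bots = find_bots(cached_bots, group="perp-momentum")
print([b["name"] for b in momentum_bots])
```

Passing several keyword arguments narrows the match, for example `find_bots(cached_bots, group="perp-momentum", exchange="okx")`.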
Why Not Kubernetes Labels or Consul?
You might already use Kubernetes labels, Consul service tags, or Terraform metadata for infrastructure management. Those are fine for infrastructure concerns -- pod scheduling, service discovery, load balancing. They are not sufficient for trading bot fleet management because they operate at the wrong abstraction level. Kubernetes knows that a pod is running. It does not know that the pod is a trading bot with a $50,000 position limit on Deribit, that its Ed25519 key was last rotated 28 days ago, or that its SLA requires 99.9% uptime with sub-100ms exchange latency.
GreenHelix's identity layer operates at the application level. Each bot's identity carries its trading-specific metadata: strategy type, exchange, position limits, permission tier, key version. The fleet manager queries this metadata through GreenHelix's API, not through infrastructure tooling. This means the fleet management layer works identically whether your bots run on Kubernetes, bare metal, EC2 instances, or a mix of all three. The infrastructure is abstracted away; the fleet identity is portable.
The practical consequence: when you migrate from EC2 to Kubernetes (or vice versa), no fleet management code changes. The bots register the same identities, report the same metrics, and respond to the same commands regardless of the underlying compute platform.
Chapter 2: FleetManager Class
Central Management for Bot Fleet Operations
The FleetManager class is the control plane for all fleet operations. It handles bot registration, inventory management, fleet-wide commands, and status aggregation. Every operation goes through the GreenHelix API so that commands, status, and events are centralized and auditable.
Setup
import requests
import json
import time
import hashlib
from datetime import datetime, timedelta
from typing import Dict, List, Optional
from dataclasses import dataclass, field

# Sandbox host, matching the curl examples in this guide
base_url = "https://sandbox.greenhelix.net"
api_key = "your-api-key"  # From GreenHelix dashboard
session = requests.Session()
session.headers["Authorization"] = f"Bearer {api_key}"
session.headers["Content-Type"] = "application/json"


def execute(tool: str, inputs: dict) -> dict:
    """Execute a GreenHelix tool and return the result."""
    resp = session.post(
        f"{base_url}/v1",
        json={"tool": tool, "input": inputs},
        timeout=10,  # fail fast instead of hanging the control plane
    )
    resp.raise_for_status()
    return resp.json()
Equivalent curl for any execute call throughout this guide:
curl -X POST https://sandbox.greenhelix.net/v1 \
-H "Authorization: Bearer $API_KEY" \
-H "Content-Type: application/json" \
-d '{
"tool": "register_agent",
"input": {
"name": "fleet-manager-acme",
"description": "Fleet manager for Acme Trading bot fleet",
"capabilities": ["fleet_management", "bot_orchestration"],
"metadata": {"role": "fleet_manager", "organization": "acme"}
}
}'
The FleetManager Class
@dataclass
class BotRecord:
    """Local record of a registered bot."""
    agent_id: str
    name: str
    group: str
    exchange: str
    strategy: str
    version: str
    status: str = "active"
    registered_at: str = ""
    last_heartbeat: Optional[str] = None


class FleetManager:
    """Central management class for a trading bot fleet."""

    def __init__(self, org_id: str):
        self.org_id = org_id
        self.manager_agent_id: Optional[str] = None
        self.bots: Dict[str, BotRecord] = {}
        self._register_manager()

    def _register_manager(self):
        """Register the fleet manager agent on GreenHelix."""
        result = execute("register_agent", {
            "name": f"fleet-manager-{self.org_id}",
            "description": f"Fleet manager for {self.org_id} trading bot fleet. "
                           f"Handles registration, commands, health, deployments.",
            "capabilities": [
                "fleet_management",
                "bot_orchestration",
                "health_monitoring",
                "deployment_coordination"
            ],
            "metadata": {
                "role": "fleet_manager",
                "organization": self.org_id
            }
        })
        self.manager_agent_id = result["agent_id"]
        print(f"Fleet manager registered: {self.manager_agent_id}")

    def register_bot(self, name: str, group: str, exchange: str,
                     strategy: str, version: str,
                     capabilities: Optional[List[str]] = None,
                     max_position_usd: float = 10000) -> str:
        """Register a new bot in the fleet."""
        if capabilities is None:
            capabilities = ["trading", "metrics_reporting"]
        result = execute("register_agent", {
            "name": name,
            "description": f"Trading bot: {strategy} on {exchange}. "
                           f"Part of {group} group in {self.org_id} fleet.",
            "capabilities": capabilities,
            "metadata": {
                "role": "trading_bot",
                "fleet_manager": self.manager_agent_id,
                "group": group,
                "exchange": exchange,
                "strategy": strategy,
                "version": version,
                "max_position_usd": max_position_usd,
                "organization": self.org_id
            }
        })
        agent_id = result["agent_id"]
        self.bots[agent_id] = BotRecord(
            agent_id=agent_id,
            name=name,
            group=group,
            exchange=exchange,
            strategy=strategy,
            version=version,
            registered_at=datetime.utcnow().isoformat()
        )
        print(f"Bot registered: {name} ({agent_id})")
        return agent_id

    def deregister_bot(self, agent_id: str):
        """Remove a bot from the fleet."""
        if agent_id not in self.bots:
            raise ValueError(f"Bot {agent_id} not found in fleet")
        # Notify the bot to shut down gracefully
        execute("send_message", {
            "from_agent_id": self.manager_agent_id,
            "to_agent_id": agent_id,
            "message_type": "command",
            "payload": {
                "command": "shutdown",
                "reason": "deregistered_from_fleet",
                "grace_period_seconds": 30
            }
        })
        bot = self.bots.pop(agent_id)
        print(f"Bot deregistered: {bot.name} ({agent_id})")

    def pause_all(self, reason: str = "operator_initiated"):
        """Pause all bots in the fleet. Bots stop opening new positions."""
        for agent_id, bot in self.bots.items():
            execute("send_message", {
                "from_agent_id": self.manager_agent_id,
                "to_agent_id": agent_id,
                "message_type": "command",
                "payload": {
                    "command": "pause",
                    "reason": reason,
                    "timestamp": datetime.utcnow().isoformat()
                }
            })
            bot.status = "paused"
        print(f"Fleet paused: {len(self.bots)} bots ({reason})")

    def resume_all(self):
        """Resume all paused bots."""
        for agent_id, bot in self.bots.items():
            if bot.status == "paused":
                execute("send_message", {
                    "from_agent_id": self.manager_agent_id,
                    "to_agent_id": agent_id,
                    "message_type": "command",
                    "payload": {
                        "command": "resume",
                        "timestamp": datetime.utcnow().isoformat()
                    }
                })
                bot.status = "active"
        print(f"Fleet resumed: {len(self.bots)} bots")

    def emergency_stop(self, reason: str = "emergency"):
        """Emergency stop: close all positions, cancel all orders, halt."""
        for agent_id, bot in self.bots.items():
            execute("send_message", {
                "from_agent_id": self.manager_agent_id,
                "to_agent_id": agent_id,
                "message_type": "command",
                "payload": {
                    "command": "emergency_stop",
                    "reason": reason,
                    "actions": [
                        "cancel_all_open_orders",
                        "close_all_positions_market",
                        "halt_trading_loop"
                    ],
                    "timestamp": datetime.utcnow().isoformat()
                }
            })
            bot.status = "stopped"
        # Log the emergency stop event
        execute("publish_event", {
            "agent_id": self.manager_agent_id,
            "event_type": "fleet.emergency_stop",
            "payload": {
                "reason": reason,
                "bots_affected": len(self.bots),
                "bot_ids": list(self.bots.keys()),
                "timestamp": datetime.utcnow().isoformat()
            }
        })
        print(f"EMERGENCY STOP: {len(self.bots)} bots halted ({reason})")

    def get_fleet_status(self) -> Dict:
        """Aggregate status across all bots."""
        status_counts = {"active": 0, "paused": 0, "stopped": 0, "dead": 0}
        group_counts = {}
        for bot in self.bots.values():
            status_counts[bot.status] = status_counts.get(bot.status, 0) + 1
            group_counts[bot.group] = group_counts.get(bot.group, 0) + 1
        return {
            "fleet_manager": self.manager_agent_id,
            "total_bots": len(self.bots),
            "status": status_counts,
            "groups": group_counts,
            "timestamp": datetime.utcnow().isoformat()
        }

    def send_group_command(self, group: str, command: str,
                           payload: Optional[dict] = None):
        """Send a command to all bots in a specific group."""
        targets = [
            (aid, bot) for aid, bot in self.bots.items()
            if bot.group == group
        ]
        for agent_id, bot in targets:
            execute("send_message", {
                "from_agent_id": self.manager_agent_id,
                "to_agent_id": agent_id,
                "message_type": "command",
                "payload": {
                    "command": command,
                    # Merge caller-supplied fields into the command payload
                    **(payload or {}),
                    "timestamp": datetime.utcnow().isoformat()
                }
            })
        print(f"Command '{command}' sent to {len(targets)} bots in {group}")
Registering a Fleet
fleet = FleetManager(org_id="acme-trading")

# Spot arbitrage group
for i in range(3):
    fleet.register_bot(
        name=f"spot-arb-{i+1:02d}",
        group="spot-arb",
        exchange="binance" if i < 2 else "okx",
        strategy="cross-exchange-arb",
        version="2.4.1",
        max_position_usd=10000
    )

# Perpetual futures momentum group
for i in range(5):
    exchanges = ["binance", "bybit", "okx", "deribit", "binance"]
    fleet.register_bot(
        name=f"perp-momentum-{i+1:02d}",
        group="perp-momentum",
        exchange=exchanges[i],
        strategy="trend-following-perps",
        version="1.8.3",
        max_position_usd=25000
    )

# Options market making group
for i in range(4):
    fleet.register_bot(
        name=f"options-mm-{i+1:02d}",
        group="options-mm",
        exchange="deribit",
        strategy="delta-neutral-mm",
        version="3.1.0",
        capabilities=["trading", "metrics_reporting", "options_greeks"],
        max_position_usd=50000
    )

print(json.dumps(fleet.get_fleet_status(), indent=2))
# curl: Register a single bot
curl -X POST https://sandbox.greenhelix.net/v1 \
-H "Authorization: Bearer $API_KEY" \
-H "Content-Type: application/json" \
-d '{
"tool": "register_agent",
"input": {
"name": "spot-arb-01",
"description": "Trading bot: cross-exchange-arb on binance. Part of spot-arb group.",
"capabilities": ["trading", "metrics_reporting"],
"metadata": {
"role": "trading_bot",
"fleet_manager": "fleet-manager-agent-id",
"group": "spot-arb",
"exchange": "binance",
"strategy": "cross-exchange-arb",
"version": "2.4.1",
"max_position_usd": 10000
}
}
}'

# curl: Send emergency stop to a bot
curl -X POST https://sandbox.greenhelix.net/v1 \
-H "Authorization: Bearer $API_KEY" \
-H "Content-Type: application/json" \
-d '{
"tool": "send_message",
"input": {
"from_agent_id": "fleet-manager-agent-id",
"to_agent_id": "bot-agent-id",
"message_type": "command",
"payload": {
"command": "emergency_stop",
"reason": "anomalous_behavior_detected",
"actions": ["cancel_all_open_orders", "close_all_positions_market", "halt_trading_loop"]
}
}
}'
Fleet Command Patterns
Three command scopes cover all operational scenarios:
Fleet-wide: pause_all, resume_all, emergency_stop. Used for market-wide events (flash crash, exchange outage, security incident). These hit every bot regardless of group.
Group-scoped: send_group_command. Used for strategy-specific actions: updating parameters for all momentum bots, pausing all options bots before an expiry event, or upgrading all arb bots to a new version.
Individual: Direct send_message to a single bot. Used for surgical interventions: adjusting one bot's position limits, forcing one bot to close a specific position, or rotating one bot's exchange API keys.
The command hierarchy means that a compromised spot arb bot can be killed without touching the options market makers. That is the entire point: blast radius containment through identity isolation.
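The three scopes can be read as a single target-resolution step that runs before any message is sent. The sketch below is illustrative only: `resolve_targets` and the sample `fleet_index` are hypothetical and are not part of the `FleetManager` class above; the sketch assumes the manager can map each agent ID to its group.

```python
from typing import Dict, List, Optional


def resolve_targets(bots: Dict[str, str], scope: str,
                    target: Optional[str] = None) -> List[str]:
    """Map a command scope to the agent IDs that should receive the command.

    bots maps agent_id -> group name.
    """
    if scope == "fleet":
        return list(bots)
    if scope == "group":
        return [aid for aid, group in bots.items() if group == target]
    if scope == "individual":
        if target not in bots:
            raise KeyError(f"unknown bot: {target}")
        return [target]
    raise ValueError(f"unknown scope: {scope}")


# Hypothetical fleet index: agent_id -> group
fleet_index = {
    "bot-a": "spot-arb",
    "bot-b": "spot-arb",
    "bot-c": "options-mm",
}

print(resolve_targets(fleet_index, "fleet"))
print(resolve_targets(fleet_index, "group", "spot-arb"))
print(resolve_targets(fleet_index, "individual", "bot-c"))
```

Keeping resolution separate from delivery makes the blast radius of a command easy to inspect (and log) before anything is sent.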
Chapter 3: BotIdentityManager Class
Per-Bot Cryptographic Identity
Every bot in the fleet gets its own Ed25519 keypair. This is not optional. Shared keys are how Step Finance happened. Per-bot keys mean that compromising one bot's key gives the attacker access to exactly one bot. The fleet manager can revoke that single identity without affecting any other bot.
Ed25519 was chosen for three reasons: it produces compact 64-byte signatures, signing is fast enough that it adds negligible latency to trading operations (tens of microseconds), and the key generation is deterministic from a seed, making backup and recovery straightforward.
The BotIdentityManager Class
import nacl.signing
import nacl.encoding
import base64
import secrets
import os
from typing import Dict, List, Tuple


class SecurityError(Exception):
    """Raised when a bot's remote identity does not match the local record."""


class BotIdentityManager:
    """Manages Ed25519 identities for trading bots."""

    def __init__(self, fleet_manager_id: str, key_store_path: str = "/secure/keys"):
        self.fleet_manager_id = fleet_manager_id
        self.key_store_path = key_store_path
        self.identities: Dict[str, dict] = {}  # agent_id -> identity record

    def generate_keypair(self, bot_name: str) -> Tuple[str, str]:
        """Generate an Ed25519 keypair for a bot.

        Returns (public_key_b64, private_key_b64).
        """
        signing_key = nacl.signing.SigningKey.generate()
        verify_key = signing_key.verify_key
        private_b64 = base64.b64encode(
            signing_key.encode()
        ).decode("utf-8")
        public_b64 = base64.b64encode(
            verify_key.encode()
        ).decode("utf-8")
        # Store private key securely -- in production, use a secrets manager
        key_path = os.path.join(self.key_store_path, f"{bot_name}.key")
        os.makedirs(os.path.dirname(key_path), exist_ok=True)
        # Create the file with owner-only permissions before writing the key
        fd = os.open(key_path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
        with os.fdopen(fd, "w") as f:
            f.write(private_b64)
        os.chmod(key_path, 0o600)  # Owner read/write only
        return public_b64, private_b64

    def register_identity(self, bot_name: str, group: str,
                          exchange: str, strategy: str,
                          version: str,
                          permissions: List[str] = None) -> dict:
        """Generate keys and register identity on GreenHelix."""
        public_key, private_key = self.generate_keypair(bot_name)
        if permissions is None:
            permissions = ["trade", "report_metrics"]
        # Register the agent with its public key
        result = execute("register_agent", {
            "name": bot_name,
            "description": f"Trading bot: {strategy} on {exchange}",
            "capabilities": permissions,
            "metadata": {
                "role": "trading_bot",
                "fleet_manager": self.fleet_manager_id,
                "group": group,
                "exchange": exchange,
                "strategy": strategy,
                "version": version,
                "public_key": public_key,
                "key_algorithm": "ed25519",
                "registered_at": datetime.utcnow().isoformat()
            }
        })
        agent_id = result["agent_id"]
        identity_record = {
            "agent_id": agent_id,
            "bot_name": bot_name,
            "public_key": public_key,
            "group": group,
            "exchange": exchange,
            "permissions": permissions,
            "created_at": datetime.utcnow().isoformat(),
            "key_version": 1,
            "status": "active"
        }
        self.identities[agent_id] = identity_record
        return identity_record

    def verify_identity(self, agent_id: str) -> dict:
        """Retrieve and verify a bot's identity from GreenHelix."""
        result = execute("get_agent_identity", {
            "agent_id": agent_id
        })
        local_record = self.identities.get(agent_id)
        if local_record:
            # Verify public key matches what we registered
            remote_key = result.get("metadata", {}).get("public_key")
            if remote_key != local_record["public_key"]:
                raise SecurityError(
                    f"Public key mismatch for {agent_id}. "
                    f"Expected: {local_record['public_key'][:16]}... "
                    f"Got: {remote_key[:16] if remote_key else 'None'}... "
                    f"Possible key tampering."
                )
        return result

    def rotate_key(self, agent_id: str) -> dict:
        """Rotate a bot's Ed25519 keypair without downtime.

        The rotation procedure:
        1. Generate new keypair
        2. Update GreenHelix identity with new public key
        3. Bot continues operating -- it picks up the new key on next heartbeat
        4. Old key is archived (not deleted) for signature verification of historical events
        """
        if agent_id not in self.identities:
            raise ValueError(f"No identity record for {agent_id}")
        record = self.identities[agent_id]
        bot_name = record["bot_name"]
        # Archive old key
        old_key_path = os.path.join(
            self.key_store_path,
            f"{bot_name}.key.v{record['key_version']}"
        )
        current_key_path = os.path.join(
            self.key_store_path, f"{bot_name}.key"
        )
        if os.path.exists(current_key_path):
            os.rename(current_key_path, old_key_path)
        # Generate new keypair
        new_public, new_private = self.generate_keypair(bot_name)
        # Record the rotation on GreenHelix by submitting rotation metrics
        execute("submit_metrics", {
            "agent_id": agent_id,
            "metrics": {
                "key_rotation": 1,
                "key_version": record["key_version"] + 1,
                "rotation_timestamp": datetime.utcnow().isoformat()
            }
        })
        # Notify the bot to pick up the new key
        execute("send_message", {
            "from_agent_id": self.fleet_manager_id,
            "to_agent_id": agent_id,
            "message_type": "command",
            "payload": {
                "command": "rotate_key",
                "new_public_key": new_public,
                "key_version": record["key_version"] + 1,
                "effective_at": datetime.utcnow().isoformat()
            }
        })
        record["public_key"] = new_public
        record["key_version"] += 1
        print(f"Key rotated for {bot_name}: v{record['key_version']}")
        return record

    def revoke_identity(self, agent_id: str, reason: str):
        """Revoke a compromised bot's identity.

        This is the nuclear option: the bot can no longer authenticate.
        Use when a bot is confirmed compromised.
        """
        if agent_id not in self.identities:
            raise ValueError(f"No identity record for {agent_id}")
        record = self.identities[agent_id]
        # Send shutdown command before revocation
        execute("send_message", {
            "from_agent_id": self.fleet_manager_id,
            "to_agent_id": agent_id,
            "message_type": "command",
            "payload": {
                "command": "emergency_stop",
                "reason": f"identity_revoked: {reason}",
                "actions": [
                    "cancel_all_open_orders",
                    "close_all_positions_market",
                    "halt_trading_loop",
                    "destroy_local_keys"
                ]
            }
        })
        # Publish revocation event for audit trail
        execute("publish_event", {
            "agent_id": self.fleet_manager_id,
            "event_type": "fleet.identity_revoked",
            "payload": {
                "revoked_agent_id": agent_id,
                "bot_name": record["bot_name"],
                "reason": reason,
                "revoked_at": datetime.utcnow().isoformat(),
                "key_version_revoked": record["key_version"]
            }
        })
        # Delete the private key
        key_path = os.path.join(
            self.key_store_path, f"{record['bot_name']}.key"
        )
        if os.path.exists(key_path):
            # Overwrite in place with random data before unlinking.
            # "r+b" avoids truncating the file, which would release the
            # original blocks before anything is written over them.
            size = os.path.getsize(key_path)
            with open(key_path, "r+b") as f:
                f.write(secrets.token_bytes(max(size, 64)))
                f.flush()
                os.fsync(f.fileno())
            os.unlink(key_path)
        record["status"] = "revoked"
        print(f"Identity revoked: {record['bot_name']} ({agent_id}): {reason}")

    def list_active_identities(self) -> List[dict]:
        """List all active bot identities."""
        return [
            r for r in self.identities.values()
            if r["status"] == "active"
        ]
# curl: Get a bot's identity
curl -X POST https://sandbox.greenhelix.net/v1 \
-H "Authorization: Bearer $API_KEY" \
-H "Content-Type: application/json" \
-d '{
"tool": "get_agent_identity",
"input": {
"agent_id": "bot-agent-id-here"
}
}'

# curl: Notify bot of key rotation via message
curl -X POST https://sandbox.greenhelix.net/v1 \
-H "Authorization: Bearer $API_KEY" \
-H "Content-Type: application/json" \
-d '{
"tool": "send_message",
"input": {
"from_agent_id": "fleet-manager-agent-id",
"to_agent_id": "bot-agent-id",
"message_type": "command",
"payload": {
"command": "rotate_key",
"new_public_key": "base64-encoded-new-public-key",
"key_version": 2,
"effective_at": "2026-04-07T14:30:00Z"
}
}
}'
Key Rotation Without Downtime
Key rotation is the most operationally sensitive identity operation. The bot must continue trading during rotation -- you cannot afford a gap where the bot has no valid key. The procedure is:
1. Generate a new keypair on the fleet manager side.
2. Archive the old key (do not delete it: the public half of the old keypair is still needed to verify signatures on historical events).
3. Update GreenHelix with the new public key via metrics submission.
4. Notify the bot via `send_message` with the new key material.
5. The bot picks up the new key, switches to it, and acknowledges.
Between steps 4 and 5, the bot is still signing with the old key, and that is fine. The old key is still valid until the bot confirms the switch. There is no window where the bot has no valid signing key.
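The overlap window can be modeled as a small state machine on the verifying side. This is a sketch under stated assumptions, not GreenHelix behavior: `KeyRotationState` is a hypothetical helper, and it assumes whoever verifies a bot's signatures tracks key versions and accepts both the old and the new version until the bot acknowledges the switch.

```python
from dataclasses import dataclass
from typing import Optional, Set


@dataclass
class KeyRotationState:
    """Tracks which key versions a verifier should accept for one bot."""
    active_version: int
    pending_version: Optional[int] = None

    def begin_rotation(self) -> int:
        """A new key version has been issued but not yet confirmed by the bot."""
        self.pending_version = self.active_version + 1
        return self.pending_version

    def accepted_versions(self) -> Set[int]:
        """During the overlap, both the old and the new version are valid."""
        versions = {self.active_version}
        if self.pending_version is not None:
            versions.add(self.pending_version)
        return versions

    def acknowledge(self, version: int) -> None:
        """The bot confirmed the switch, so the old version is retired."""
        if version != self.pending_version:
            raise ValueError(f"unexpected acknowledgement for v{version}")
        self.active_version = version
        self.pending_version = None


state = KeyRotationState(active_version=1)
state.begin_rotation()
print(sorted(state.accepted_versions()))  # old and new both accepted
state.acknowledge(2)
print(sorted(state.accepted_versions()))  # only the new version remains
```

At no point is the accepted set empty, which is the property the procedure relies on.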
A sane rotation schedule is every 30 days for production bots, immediately upon any suspected compromise, and after any personnel change on the team (someone leaves, all keys rotate).
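That schedule reduces to a small predicate. The sketch below is illustrative: `rotation_due` and `ROTATION_INTERVAL` are hypothetical names, and the two boolean flags are assumed to be set by whatever incident or HR process your team uses.

```python
from datetime import datetime, timedelta, timezone

ROTATION_INTERVAL = timedelta(days=30)  # schedule for production bots


def rotation_due(last_rotated: datetime, now: datetime,
                 suspected_compromise: bool = False,
                 personnel_changed: bool = False) -> bool:
    """Return True when a bot's key should be rotated."""
    # Compromise or a team change forces rotation regardless of key age.
    if suspected_compromise or personnel_changed:
        return True
    return now - last_rotated >= ROTATION_INTERVAL


now = datetime(2026, 4, 7, tzinfo=timezone.utc)
print(rotation_due(now - timedelta(days=28), now))
print(rotation_due(now - timedelta(days=30), now))
print(rotation_due(now - timedelta(days=2), now, personnel_changed=True))
```

Running a check like this from a daily job, over the active identities the fleet manager tracks, turns the schedule into something enforced rather than remembered.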
Identity Revocation for Compromised Bots
Revocation is destructive and immediate. When you confirm a bot is compromised, you do not rotate -- you revoke. The difference: rotation preserves continuity (the bot keeps running with a new key), while revocation terminates the bot's ability to operate entirely.
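The revocation procedure overwrites the private key file with random data before deleting it. A simple os.unlink() only removes the directory entry and leaves the key material on disk until the filesystem reuses those blocks. Overwriting first makes casual recovery much harder, but treat it as best effort: on SSDs with wear leveling, and on journaling or copy-on-write filesystems, the overwrite may land on different physical blocks and leave the original bytes behind. This is one more reason to keep production keys in a secrets manager or on an encrypted volume, where destroying the encryption key is what makes the data unrecoverable.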
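A minimal, best-effort sketch of overwrite-then-unlink follows. `best_effort_shred` is a hypothetical helper, not part of the class above. It opens the file without truncating it, overwrites the full length, and calls `os.fsync` so the random bytes are pushed to the device before the file is removed. Whether the original physical blocks are actually overwritten still depends on the storage stack.

```python
import os
import secrets
import tempfile


def best_effort_shred(path: str, passes: int = 1) -> None:
    """Overwrite a file in place with random bytes, flush to disk, then unlink."""
    size = os.path.getsize(path)
    # "r+b" opens for update without truncating, so the write targets the
    # file's existing extent instead of freshly allocated blocks.
    with open(path, "r+b") as f:
        for _ in range(passes):
            f.seek(0)
            f.write(secrets.token_bytes(size))
            f.flush()
            os.fsync(f.fileno())
    os.unlink(path)


# Demonstration against a throwaway file, not a real key.
with tempfile.TemporaryDirectory() as tmp:
    key_path = os.path.join(tmp, "demo-bot.key")
    with open(key_path, "w") as f:
        f.write("not-a-real-key")
    best_effort_shred(key_path)
    print(os.path.exists(key_path))
```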
Chapter 4: Permission Scoping
Principle of Least Privilege for Trading Bots
A spot arb bot that compares prices across exchanges needs read access to two order books and the ability to place limit orders on two exchanges. It does not need withdrawal permissions. It does not need the ability to modify account settings. It does not need access to the options chain on Deribit. Every permission beyond what the bot needs is attack surface.
The principle of least privilege states that each bot should have the minimum set of permissions required to execute its strategy and nothing more. This is straightforward in theory and consistently ignored in practice because it requires upfront work: defining permission tiers, mapping strategies to permissions, and enforcing boundaries.
Permission Tiers
Four tiers cover the range of trading bot operations:
from enum import Enum
from typing import Dict, Set


class PermissionTier(Enum):
    READ_ONLY = "read_only"
    TRADE = "trade"
    WITHDRAW = "withdraw"
    ADMIN = "admin"


# What each tier can do. Each tier is a superset of the one before it.
TIER_PERMISSIONS: Dict[PermissionTier, Set[str]] = {
    PermissionTier.READ_ONLY: {
        "read_orderbook",
        "read_positions",
        "read_balances",
        "read_trade_history",
        "report_metrics",
        "receive_commands",
    },
    PermissionTier.TRADE: {
        "read_orderbook",
        "read_positions",
        "read_balances",
        "read_trade_history",
        "report_metrics",
        "receive_commands",
        "place_order",
        "cancel_order",
        "modify_order",
    },
    PermissionTier.WITHDRAW: {
        "read_orderbook",
        "read_positions",
        "read_balances",
        "read_trade_history",
        "report_metrics",
        "receive_commands",
        "place_order",
        "cancel_order",
        "modify_order",
        "withdraw_funds",
        "transfer_between_accounts",
    },
    PermissionTier.ADMIN: {
        "read_orderbook",
        "read_positions",
        "read_balances",
        "read_trade_history",
        "report_metrics",
        "receive_commands",
        "place_order",
        "cancel_order",
        "modify_order",
        "withdraw_funds",
        "transfer_between_accounts",
        "modify_api_keys",
        "modify_account_settings",
        "register_sub_accounts",
    },
}
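One way to apply least privilege mechanically is to list the actions a strategy needs and pick the lowest tier that covers them. The sketch below is illustrative only: it restates the tier table in condensed form so that it runs on its own, and the names TIER_ORDER and minimal_tier are assumptions introduced here, not part of the guide's classes.

```python
from enum import Enum
from typing import Dict, List, Set


class PermissionTier(Enum):
    READ_ONLY = "read_only"
    TRADE = "trade"
    WITHDRAW = "withdraw"
    ADMIN = "admin"


# Condensed copy of the tier table; each tier extends the previous one.
_READ = {"read_orderbook", "read_positions", "read_balances",
         "read_trade_history", "report_metrics", "receive_commands"}
_TRADE = _READ | {"place_order", "cancel_order", "modify_order"}
_WITHDRAW = _TRADE | {"withdraw_funds", "transfer_between_accounts"}
_ADMIN = _WITHDRAW | {"modify_api_keys", "modify_account_settings",
                      "register_sub_accounts"}

TIER_PERMISSIONS: Dict[PermissionTier, Set[str]] = {
    PermissionTier.READ_ONLY: _READ,
    PermissionTier.TRADE: _TRADE,
    PermissionTier.WITHDRAW: _WITHDRAW,
    PermissionTier.ADMIN: _ADMIN,
}

# Hypothetical helper: tiers ordered from least to most privileged.
TIER_ORDER: List[PermissionTier] = [
    PermissionTier.READ_ONLY,
    PermissionTier.TRADE,
    PermissionTier.WITHDRAW,
    PermissionTier.ADMIN,
]


def minimal_tier(required_actions: Set[str]) -> PermissionTier:
    """Return the least-privileged tier covering every required action."""
    for tier in TIER_ORDER:
        if required_actions <= TIER_PERMISSIONS[tier]:
            return tier
    unknown = required_actions - TIER_PERMISSIONS[PermissionTier.ADMIN]
    raise ValueError(f"Unknown actions: {sorted(unknown)}")


# A spot arb bot reads order books and places/cancels orders.
spot_arb_needs = {"read_orderbook", "place_order", "cancel_order"}
print(minimal_tier(spot_arb_needs).value)  # trade
```

Starting from the required actions, rather than from a tier that "seems right", makes it harder to hand out WITHDRAW or ADMIN by habit.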
Per-Exchange Permission Mapping
Different exchanges have different permission models, but they all support the same core concept: API keys with scoped permissions. The PermissionManager maps abstract tiers to exchange-specific permission sets.
class PermissionManager:
    """Maps permission tiers to exchange-specific settings."""

    EXCHANGE_PERMISSION_MAP = {
        "binance": {
            PermissionTier.READ_ONLY: {
                "enableReading": True,
                "enableSpotAndMarginTrading": False,
                "enableWithdrawals": False,
                "enableFutures": False,
            },
            PermissionTier.TRADE: {
                "enableReading": True,
                "enableSpotAndMarginTrading": True,
                "enableWithdrawals": False,
                "enableFutures": True,
            },
            PermissionTier.WITHDRAW: {
                "enableReading": True,
                "enableSpotAndMarginTrading": True,
                "enableWithdrawals": True,
                "enableFutures": True,
                "withdrawalAddressWhitelist": True,
            },
        },
        "deribit": {
            PermissionTier.READ_ONLY: {
                "scope": "read",
            },
            PermissionTier.TRADE: {
                "scope": "trade:read",
            },
            PermissionTier.WITHDRAW: {
                "scope": "trade:read:withdraw",
            },
        },
        "okx": {
            PermissionTier.READ_ONLY: {
                "perm": "read_only",
                "trade": False,
                "withdraw": False,
            },
            PermissionTier.TRADE: {
                "perm": "trade",
                "trade": True,
                "withdraw": False,
            },
            PermissionTier.WITHDRAW: {
                "perm": "trade",
                "trade": True,
                "withdraw": True,
                "ip_whitelist_required": True,
            },
        },
    }

    def __init__(self, fleet_manager_id: str):
        self.fleet_manager_id = fleet_manager_id
        self.bot_permissions: Dict[str, dict] = {}

    def assign_permissions(self, agent_id: str, bot_name: str,
                           exchange: str,
                           tier: PermissionTier,
                           custom_restrictions: dict = None) -> dict:
        """Assign a permission tier to a bot and record it."""
        base_permissions = TIER_PERMISSIONS[tier]
        exchange_config = self.EXCHANGE_PERMISSION_MAP.get(
            exchange, {}
        ).get(tier, {})
        record = {
            "agent_id": agent_id,
            "bot_name": bot_name,
            "exchange": exchange,
            "tier": tier.value,
            "abstract_permissions": list(base_permissions),
            "exchange_config": exchange_config,
            "custom_restrictions": custom_restrictions or {},
            "assigned_at": datetime.utcnow().isoformat(),
        }
        self.bot_permissions[agent_id] = record

        # Record permission assignment on GreenHelix
        execute("submit_metrics", {
            "agent_id": agent_id,
            "metrics": {
                "permission_tier": tier.value,
                "permission_count": len(base_permissions),
                "exchange": exchange,
                "has_withdraw": "withdraw_funds" in base_permissions,
                "has_admin": tier == PermissionTier.ADMIN,
            }
        })

        # Publish audit event
        execute("publish_event", {
            "agent_id": self.fleet_manager_id,
            "event_type": "fleet.permission_assigned",
            "payload": {
                "target_agent_id": agent_id,
                "bot_name": bot_name,
                "tier": tier.value,
                "exchange": exchange,
                "permissions": list(base_permissions),
            }
        })
        return record

    def check_permission(self, agent_id: str,
                         action: str) -> bool:
        """Check if a bot has permission for an action."""
        record = self.bot_permissions.get(agent_id)
        if not record:
            return False
        return action in record["abstract_permissions"]

    def escalate_permission(self, agent_id: str,
                            new_tier: PermissionTier,
                            reason: str,
                            approved_by: str):
        """Escalate a bot's permissions. Requires explicit approval."""
        record = self.bot_permissions.get(agent_id)
        if not record:
            raise ValueError(f"No permission record for {agent_id}")
        old_tier = record["tier"]

        # Log the escalation for audit
        execute("publish_event", {
            "agent_id": self.fleet_manager_id,
            "event_type": "fleet.permission_escalated",
            "payload": {
                "target_agent_id": agent_id,
                "bot_name": record["bot_name"],
                "old_tier": old_tier,
                "new_tier": new_tier.value,
                "reason": reason,
                "approved_by": approved_by,
                "escalated_at": datetime.utcnow().isoformat(),
            }
        })

        # Update the record
        new_permissions = TIER_PERMISSIONS[new_tier]
        record["tier"] = new_tier.value
        record["abstract_permissions"] = list(new_permissions)
        record["exchange_config"] = self.EXCHANGE_PERMISSION_MAP.get(
            record["exchange"], {}
        ).get(new_tier, {})
        print(f"Permission escalated for {record['bot_name']}: "
              f"{old_tier} -> {new_tier.value} (reason: {reason})")

    def get_fleet_permission_summary(self) -> Dict:
        """Summary of permissions across the fleet."""
        tier_counts = {}
        withdraw_enabled = []
        admin_enabled = []
        for agent_id, record in self.bot_permissions.items():
            tier = record["tier"]
            tier_counts[tier] = tier_counts.get(tier, 0) + 1
            if "withdraw_funds" in record["abstract_permissions"]:
                withdraw_enabled.append(record["bot_name"])
            if record["tier"] == "admin":
                admin_enabled.append(record["bot_name"])
        return {
            "total_bots": len(self.bot_permissions),
            "tiers": tier_counts,
            "withdraw_enabled": withdraw_enabled,
            "admin_enabled": admin_enabled,
        }
Cross-Bot Permission Isolation
Permission isolation is enforced at two levels. First, at the GreenHelix level: each bot has its own agent ID, and GreenHelix tools scope operations to the authenticated agent. Bot A cannot submit metrics as Bot B. Bot A cannot read Bot B's messages. This is identity-level isolation.
Second, at the exchange level: each bot should use its own exchange API key with permissions matching its tier. Do not share exchange API keys across bots. This is the lesson from Step Finance -- even if you have perfect identity isolation at the fleet management layer, shared exchange keys collapse the isolation at the point that matters most.
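The "one exchange key per bot" rule can be checked before the fleet starts. The following is a minimal sketch, assuming you can load each bot's exchange API key into a dictionary keyed by bot name; key_fingerprint and find_shared_keys are hypothetical helpers, not GreenHelix tools. Keys are compared by SHA-256 fingerprint so raw key material never reaches logs.

```python
import hashlib
from collections import defaultdict
from typing import Dict, List


def key_fingerprint(api_key: str) -> str:
    """Short SHA-256 fingerprint of a key, safe to print or log."""
    return hashlib.sha256(api_key.encode("utf-8")).hexdigest()[:12]


def find_shared_keys(bot_keys: Dict[str, str]) -> Dict[str, List[str]]:
    """Return fingerprint -> bot names for keys used by more than one bot."""
    by_fingerprint = defaultdict(list)
    for bot_name, api_key in bot_keys.items():
        by_fingerprint[key_fingerprint(api_key)].append(bot_name)
    return {
        fp: sorted(names)
        for fp, names in by_fingerprint.items()
        if len(names) > 1
    }


# Placeholder keys for illustration only.
bot_keys = {
    "spot-arb-01": "key-AAA",
    "spot-arb-02": "key-AAA",   # same key as spot-arb-01: isolation is broken
    "options-mm-01": "key-BBB",
}
for fingerprint, bots in find_shared_keys(bot_keys).items():
    print(f"Shared exchange key {fingerprint}: {bots}")
```

Running a check like this at startup, and refusing to launch when it reports anything, turns the rule into a gate instead of a convention.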
# Assign permissions to the fleet
perm_mgr = PermissionManager(fleet_manager_id=fleet.manager_agent_id)

# Spot arb bots: TRADE tier (no withdrawals)
for agent_id, bot in fleet.bots.items():
    if bot.group == "spot-arb":
        perm_mgr.assign_permissions(
            agent_id=agent_id,
            bot_name=bot.name,
            exchange=bot.exchange,
            tier=PermissionTier.TRADE,
            custom_restrictions={
                "max_order_size_usd": 5000,
                "allowed_pairs": ["BTC/USDT", "ETH/USDT", "SOL/USDT"],
            }
        )

# Options MM bots: TRADE tier with additional restrictions
for agent_id, bot in fleet.bots.items():
    if bot.group == "options-mm":
        perm_mgr.assign_permissions(
            agent_id=agent_id,
            bot_name=bot.name,
            exchange=bot.exchange,
            tier=PermissionTier.TRADE,
            custom_restrictions={
                "max_order_size_usd": 25000,
                "allowed_instruments": ["BTC--C", "BTC--P", "ETH--C", "ETH--P"],
                "max_delta_exposure": 0.5,
            }
        )

print(json.dumps(perm_mgr.get_fleet_permission_summary(), indent=2))
The key insight: no bot in this fleet has withdrawal permissions. Withdrawals should only be executed by a dedicated withdrawal agent that is not connected to exchange websockets and has its own approval workflow (ideally requiring human confirmation). A compromised trading bot that cannot withdraw can, at worst, make bad trades. A compromised bot with withdrawal access can empty the exchange account.
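Note that the custom_restrictions passed to assign_permissions are only recorded; check_permission looks at the abstract permissions and nothing else. Something still has to compare each order against those limits before it is sent. The function below is one possible pre-trade check, a sketch under the assumption that restrictions use the max_order_size_usd and allowed_pairs keys from the spot arb example; check_order_restrictions is a hypothetical name.

```python
from typing import Any, Dict, List


def check_order_restrictions(restrictions: Dict[str, Any],
                             pair: str,
                             notional_usd: float) -> List[str]:
    """Return the names of violated restrictions (empty list = allowed)."""
    violations = []

    # Size limit applies only when the restriction is present.
    max_size = restrictions.get("max_order_size_usd")
    if max_size is not None and notional_usd > max_size:
        violations.append("max_order_size_usd")

    # Pair allow-list applies only when the restriction is present.
    allowed_pairs = restrictions.get("allowed_pairs")
    if allowed_pairs is not None and pair not in allowed_pairs:
        violations.append("allowed_pairs")

    return violations


spot_arb_restrictions = {
    "max_order_size_usd": 5000,
    "allowed_pairs": ["BTC/USDT", "ETH/USDT", "SOL/USDT"],
}
print(check_order_restrictions(spot_arb_restrictions, "BTC/USDT", 1200.0))
# []
print(check_order_restrictions(spot_arb_restrictions, "DOGE/USDT", 9000.0))
# ['max_order_size_usd', 'allowed_pairs']
```

Returning the list of violations, rather than a bare boolean, gives the audit event something specific to record when an order is rejected.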
Chapter 5: FleetHealthMonitor Class
Real-Time Health Across All Bots
Health monitoring is the difference between discovering a dead bot immediately and discovering it three days later when someone notices a strategy stopped making money. For a fleet of 12 bots, you need centralized health that answers three questions in real time: is each bot alive, is each bot performing within expectations, and are there any bots that need intervention?
Metrics Collected
Each bot reports the following metrics on every heartbeat:
| Metric | Type | Description |
|---|---|---|
| uptime_seconds | gauge | Time since last restart |
| heartbeat_latency_ms | gauge | Round-trip time to exchange websocket |
| pnl_usd | gauge | Unrealized + realized PnL since start of day |
| drawdown_pct | gauge | Current drawdown from peak equity |
| trade_count | counter | Number of trades executed since start of day |
| open_positions | gauge | Number of currently open positions |
| open_orders | gauge | Number of currently open orders |
| memory_mb | gauge | Process memory usage |
| cpu_pct | gauge | Process CPU usage |
| error_count | counter | Number of errors since last heartbeat |
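Most of these metrics are read directly from the process or the exchange, but drawdown_pct has to be derived. A minimal sketch of one common definition, drawdown as the percentage decline from the running equity peak, is shown below. The DrawdownTracker class is an assumption for illustration; the guide does not specify how bots compute this value.

```python
from typing import Optional


class DrawdownTracker:
    """Tracks peak equity and reports the current drawdown in percent."""

    def __init__(self) -> None:
        self.peak: Optional[float] = None

    def update(self, equity: float) -> float:
        """Record a new equity value and return drawdown_pct from the peak."""
        if self.peak is None or equity > self.peak:
            self.peak = equity
        if self.peak <= 0:
            return 0.0  # avoid dividing by a zero or negative peak
        return (self.peak - equity) / self.peak * 100.0


tracker = DrawdownTracker()
for equity in (10_000.0, 10_500.0, 10_080.0):
    drawdown_pct = tracker.update(equity)

# Peak is 10,500; current equity 10,080 is 420 below it: 420 / 10,500 = 4%.
print(round(drawdown_pct, 2))  # 4.0
```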
The FleetHealthMonitor Class
class FleetHealthMonitor:
    """Monitors health across all bots in the fleet."""

    def __init__(self, fleet_manager_id: str, bots: Dict[str, BotRecord],
                 heartbeat_interval_seconds: int = 30,
                 dead_threshold_seconds: int = 120):
        self.fleet_manager_id = fleet_manager_id
        self.bots = bots
        self.heartbeat_interval = heartbeat_interval_seconds
        self.dead_threshold = dead_threshold_seconds
        self.health_records: Dict[str, dict] = {}
        self.alert_callbacks: List[callable] = []
        self._setup_webhooks()

    def _setup_webhooks(self):
        """Register webhooks for real-time health alerts."""
        execute("register_webhook", {
            "agent_id": self.fleet_manager_id,
            "event_types": [
                "fleet.heartbeat",
                "fleet.health_alert",
                "fleet.bot_dead"
            ],
            "url": "https://your-fleet-dashboard.example.com/webhooks/health",
            "secret": "webhook-signing-secret"
        })

    def record_heartbeat(self, agent_id: str, metrics: dict):
        """Record a heartbeat from a bot."""
        now = datetime.utcnow().isoformat()

        # Submit metrics to GreenHelix
        execute("submit_metrics", {
            "agent_id": agent_id,
            "metrics": {
                **metrics,  # unpack the bot's metrics into the payload
                "heartbeat_timestamp": now,
            }
        })
        self.health_records[agent_id] = {
            "agent_id": agent_id,
            "bot_name": self.bots[agent_id].name if agent_id in self.bots else "unknown",
            "last_heartbeat": now,
            "metrics": metrics,
            "status": self._evaluate_health(agent_id, metrics),
        }

    def _evaluate_health(self, agent_id: str, metrics: dict) -> str:
        """Evaluate bot health from metrics. Returns status string."""
        issues = []

        # Check drawdown
        if metrics.get("drawdown_pct", 0) > 10:
            issues.append(f"high_drawdown:{metrics['drawdown_pct']:.1f}%")
        elif metrics.get("drawdown_pct", 0) > 5:
            issues.append(f"elevated_drawdown:{metrics['drawdown_pct']:.1f}%")

        # Check latency
        if metrics.get("heartbeat_latency_ms", 0) > 5000:
            issues.append(f"high_latency:{metrics['heartbeat_latency_ms']}ms")
        elif metrics.get("heartbeat_latency_ms", 0) > 1000:
            issues.append(f"elevated_latency:{metrics['heartbeat_latency_ms']}ms")

        # Check errors
        if metrics.get("error_count", 0) > 10:
            issues.append(f"high_errors:{metrics['error_count']}")
        elif metrics.get("error_count", 0) > 3:
            issues.append(f"elevated_errors:{metrics['error_count']}")

        # Check memory
        if metrics.get("memory_mb", 0) > 2048:
            issues.append(f"high_memory:{metrics['memory_mb']}MB")

        # Any "high_" issue is critical; anything else is a warning
        if any("high_" in i for i in issues):
            self._trigger_alert(agent_id, "critical", issues)
            return "critical"
        elif issues:
            self._trigger_alert(agent_id, "warning", issues)
            return "warning"
        return "healthy"

    def _trigger_alert(self, agent_id: str, severity: str,
                       issues: List[str]):
        """Send alert via GreenHelix messaging."""
        bot_name = self.bots[agent_id].name if agent_id in self.bots else agent_id
        execute("send_message", {
            "from_agent_id": self.fleet_manager_id,
            "to_agent_id": self.fleet_manager_id,  # Self-message for logging
            "message_type": "alert",
            "payload": {
                "severity": severity,
                "bot_name": bot_name,
                "agent_id": agent_id,
                "issues": issues,
                "timestamp": datetime.utcnow().isoformat(),
            }
        })

        # Publish as event for audit trail
        execute("publish_event", {
            "agent_id": self.fleet_manager_id,
            "event_type": "fleet.health_alert",
            "payload": {
                "severity": severity,
                "agent_id": agent_id,
                "bot_name": bot_name,
                "issues": issues,
            }
        })

    def detect_dead_bots(self) -> List[str]:
        """Find bots that have missed heartbeats beyond the threshold."""
        now = datetime.utcnow()
        dead_bots = []
        for agent_id, bot in self.bots.items():
            record = self.health_records.get(agent_id)
            if record is None:
                # Never sent a heartbeat
                dead_bots.append(agent_id)
                continue
            last_hb = datetime.fromisoformat(record["last_heartbeat"])
            seconds_since = (now - last_hb).total_seconds()
            if seconds_since > self.dead_threshold:
                dead_bots.append(agent_id)
                bot.status = "dead"
                execute("publish_event", {
                    "agent_id": self.fleet_manager_id,
                    "event_type": "fleet.bot_dead",
                    "payload": {
                        "agent_id": agent_id,
                        "bot_name": bot.name,
                        "last_heartbeat": record["last_heartbeat"],
                        "seconds_since_heartbeat": seconds_since,
                        "group": bot.group,
                    }
                })
        return dead_bots

    def get_fleet_health_dashboard(self) -> dict:
        """Aggregate health data for dashboard display."""
        status_counts = {"healthy": 0, "warning": 0, "critical": 0, "dead": 0, "unknown": 0}
        group_health = {}
        total_pnl = 0.0
        total_trades = 0
        for agent_id, bot in self.bots.items():
            record = self.health_records.get(agent_id)
            if record:
                status = record["status"]
                metrics = record["metrics"]
                total_pnl += metrics.get("pnl_usd", 0)
                total_trades += metrics.get("trade_count", 0)
            elif bot.status == "dead":
                status = "dead"
            else:
                status = "unknown"
            status_counts[status] = status_counts.get(status, 0) + 1
            if bot.group not in group_health:
                group_health[bot.group] = {
                    "healthy": 0, "warning": 0,
                    "critical": 0, "dead": 0, "unknown": 0,
                    "pnl_usd": 0.0, "trade_count": 0,
                }
            group_health[bot.group][status] += 1
            if record:
                group_health[bot.group]["pnl_usd"] += record["metrics"].get("pnl_usd", 0)
                group_health[bot.group]["trade_count"] += record["metrics"].get("trade_count", 0)
        return {
            "timestamp": datetime.utcnow().isoformat(),
            "total_bots": len(self.bots),
            "status_summary": status_counts,
            "groups": group_health,
            "fleet_pnl_usd": round(total_pnl, 2),
            "fleet_trade_count": total_trades,
        }

    def initiate_failover(self, dead_agent_id: str,
                          replacement_agent_id: str):
        """Failover a dead bot's responsibilities to a replacement."""
        dead_bot = self.bots.get(dead_agent_id)
        if not dead_bot:
            raise ValueError(f"Bot {dead_agent_id} not found")

        # Send failover command to replacement bot
        execute("send_message", {
            "from_agent_id": self.fleet_manager_id,
            "to_agent_id": replacement_agent_id,
            "message_type": "command",
            "payload": {
                "command": "assume_responsibility",
                "from_bot": dead_agent_id,
                "group": dead_bot.group,
                "exchange": dead_bot.exchange,
                "strategy": dead_bot.strategy,
                "reason": "failover_from_dead_bot",
                "timestamp": datetime.utcnow().isoformat(),
            }
        })
        execute("publish_event", {
            "agent_id": self.fleet_manager_id,
            "event_type": "fleet.failover",
            "payload": {
                "dead_bot": dead_agent_id,
                "replacement_bot": replacement_agent_id,
                "group": dead_bot.group,
                "exchange": dead_bot.exchange,
            }
        })
        print(f"Failover: {dead_bot.name} -> "
              f"{self.bots[replacement_agent_id].name}")
Using the Health Monitor
health = FleetHealthMonitor(
    fleet_manager_id=fleet.manager_agent_id,
    bots=fleet.bots,
    heartbeat_interval_seconds=30,
    dead_threshold_seconds=120
)

# Simulate a bot heartbeat (in production, bots call this themselves)
for agent_id, bot in fleet.bots.items():
    health.record_heartbeat(agent_id, {
        "uptime_seconds": 86400,
        "heartbeat_latency_ms": 45,
        "pnl_usd": 234.50,
        "drawdown_pct": 1.2,
        "trade_count": 47,
        "open_positions": 3,
        "open_orders": 6,
        "memory_mb": 512,
        "cpu_pct": 15.3,
        "error_count": 0,
    })

# Check for dead bots
dead = health.detect_dead_bots()
if dead:
    print(f"Dead bots detected: {dead}")

# Get dashboard
dashboard = health.get_fleet_health_dashboard()
print(json.dumps(dashboard, indent=2))
# curl: Submit bot heartbeat metrics
curl -X POST https://sandbox.greenhelix.net/v1 \
  -H "Authorization: Bearer $API_KEY" \
  -H "Content-Type: application/json" \
  -d '{
    "tool": "submit_metrics",
    "input": {
      "agent_id": "bot-agent-id",
      "metrics": {
        "uptime_seconds": 86400,
        "heartbeat_latency_ms": 45,
        "pnl_usd": 234.50,
        "drawdown_pct": 1.2,
        "trade_count": 47,
        "open_positions": 3,
        "open_orders": 6,
        "memory_mb": 512,
        "cpu_pct": 15.3,
        "error_count": 0,
        "heartbeat_timestamp": "2026-04-07T14:00:00Z"
      }
    }
  }'

# curl: Register health webhook
curl -X POST https://sandbox.greenhelix.net/v1 \
  -H "Authorization: Bearer $API_KEY" \
  -H "Content-Type: application/json" \
  -d '{
    "tool": "register_webhook",
    "input": {
      "agent_id": "fleet-manager-agent-id",
      "event_types": ["fleet.heartbeat", "fleet.health_alert", "fleet.bot_dead"],
      "url": "https://your-fleet-dashboard.example.com/webhooks/health",
      "secret": "webhook-signing-secret"
    }
  }'
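The secret registered with the webhook is only useful if the receiving endpoint checks it. This guide does not document how GreenHelix signs deliveries, so the sketch below rests on an assumption: that each delivery carries a hex-encoded HMAC-SHA256 of the raw request body, keyed with the registered secret. Confirm the actual scheme and header name against the GreenHelix documentation before relying on it. The functions sign_payload and verify_signature are hypothetical helpers.

```python
import hashlib
import hmac


def sign_payload(secret: str, body: bytes) -> str:
    """Hex HMAC-SHA256 of the raw body (ASSUMED signing scheme)."""
    return hmac.new(secret.encode("utf-8"), body, hashlib.sha256).hexdigest()


def verify_signature(secret: str, body: bytes, received_sig: str) -> bool:
    """Compare signatures in constant time to avoid timing leaks."""
    expected = sign_payload(secret, body)
    return hmac.compare_digest(expected, received_sig)


# Simulated delivery: in a real handler, body is the raw request bytes and
# received_sig comes from a signature header (header name is an assumption).
secret = "webhook-signing-secret"
body = b'{"event_type": "fleet.bot_dead", "payload": {"agent_id": "bot-agent-id"}}'
received_sig = sign_payload(secret, body)

print(verify_signature(secret, body, received_sig))         # True
print(verify_signature(secret, body + b" ", received_sig))  # False
```

Verify against the raw bytes, before any JSON parsing, because re-serializing the payload can change whitespace and key order and break the comparison.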
Dead Bot Detection and Automatic Failover
Dead bot detection runs on a configurable interval (default: check every 30 seconds, declare dead after 120 seconds without a heartbeat). When a bot is declared dead, the monitor publishes a fleet.bot_dead event and the fleet manager can trigger automatic failover.
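With the default settings, a bot is declared dead once more than 120 seconds pass without a heartbeat, which corresponds to four missed 30-second heartbeats. Because the check itself runs every 30 seconds, detection can lag the threshold by up to one interval, so the worst case is roughly 150 seconds after the last heartbeat. The staleness rule on its own can be sketched as below; find_dead is a hypothetical standalone function that uses timezone-aware timestamps and makes no GreenHelix calls.

```python
from datetime import datetime, timedelta, timezone
from typing import Dict, List, Optional


def find_dead(last_heartbeats: Dict[str, Optional[datetime]],
              now: datetime,
              dead_threshold_seconds: int = 120) -> List[str]:
    """Return agent IDs with no heartbeat, or a heartbeat older than the threshold."""
    dead = []
    for agent_id, last_seen in last_heartbeats.items():
        if last_seen is None:
            dead.append(agent_id)  # never reported
        elif (now - last_seen).total_seconds() > dead_threshold_seconds:
            dead.append(agent_id)  # stale heartbeat
    return dead


now = datetime(2026, 4, 7, 14, 0, 0, tzinfo=timezone.utc)
last_heartbeats = {
    "spot-arb-01": now - timedelta(seconds=45),    # alive
    "spot-arb-02": now - timedelta(seconds=130),   # stale
    "options-mm-01": None,                         # never reported
}
print(find_dead(last_heartbeats, now))  # ['spot-arb-02', 'options-mm-01']
```

Passing now as a parameter keeps the rule deterministic, which makes the threshold easy to unit test.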
Failover strategy depends on the bot type. Stateless bots (spot arb, momentum) can fail over to a standby instance that was pre-registered but dormant. Stateful bots (options market makers with active positions) require careful handoff: the replacement bot needs to query the exchange for open positions before it starts making new decisions.
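For the stateful case, the handoff step can be sketched as a reconciliation between what the dead bot last reported and what the exchange says is open now. The example below is illustrative only: reconcile_positions is a hypothetical helper, fetch_open_positions stands in for whatever exchange client call you use, and the instrument names are placeholders.

```python
from typing import Callable, Dict


def reconcile_positions(
    last_reported: Dict[str, float],
    fetch_open_positions: Callable[[], Dict[str, float]],
    tolerance: float = 1e-9,
) -> Dict[str, Dict[str, float]]:
    """Return instruments where the exchange disagrees with the last report."""
    actual = fetch_open_positions()  # the exchange is the source of truth
    mismatches = {}
    for instrument in sorted(set(last_reported) | set(actual)):
        reported_qty = last_reported.get(instrument, 0.0)
        actual_qty = actual.get(instrument, 0.0)
        if abs(reported_qty - actual_qty) > tolerance:
            mismatches[instrument] = {
                "reported": reported_qty,
                "actual": actual_qty,
            }
    return mismatches


def fake_exchange_positions() -> Dict[str, float]:
    """Stand-in for a real exchange query (placeholder data)."""
    return {"INSTRUMENT-A": 2.0, "INSTRUMENT-B": -1.0}


last_reported = {"INSTRUMENT-A": 2.0, "INSTRUMENT-C": 0.5}
mismatches = reconcile_positions(last_reported, fake_exchange_positions)
if mismatches:
    print(f"Hold new orders until reviewed: {mismatches}")
else:
    print("Positions match; replacement bot may start quoting")
```

A replacement that adopts the exchange's view and starts quoting only after mismatches are understood avoids doubling a position the dead bot had already opened.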
The health monitor does not make the failover decision automatically -- it detects the dead bot and notifies the fleet manager, which then decides whether to fai