📦 Autoscaling Policy Designer

v1.0.0

Design autoscaling policies based on traffic patterns, cost constraints, and performance SLOs


Runtime dependencies

No special dependencies

Installation

Official: npx clawhub@latest install autoscaling-policy-designer
Mirror (China): npx clawhub@latest install autoscaling-policy-designer --registry https://cn.longxiaskill.com

Skill Documentation

Autoscaling Policy Designer

Design autoscaling policies that balance performance, cost, and reliability. This skill teaches an AI agent to analyze historical traffic patterns, recommend scaling thresholds, configure Kubernetes HPA/KEDA or cloud-native autoscalers, simulate behavior under load, and model the cost impact of different scaling strategies.

Use when: "design autoscaling", "scaling policy", "HPA configuration", "KEDA setup", "scale to zero", "autoscaling thresholds", "scaling costs", "traffic spike handling", "over-provisioned", "under-provisioned"

Commands

  • analyze -- Study traffic patterns

Before designing a policy, understand the workload. Collect metrics, identify patterns, and classify the traffic shape.

Step 1: Collect historical utilization data

```bash
# Kubernetes: get CPU/memory utilization over 7 days from Prometheus
curl -s "$PROMETHEUS_URL/api/v1/query_range" \
  --data-urlencode 'query=avg(rate(container_cpu_usage_seconds_total{namespace="production",pod=~"api-.*"}[5m])) by (pod)' \
  --data-urlencode "start=$(date -d '7 days ago' +%s)" \
  --data-urlencode "end=$(date +%s)" \
  --data-urlencode 'step=1h' | python3 -c "
import json, sys

data = json.load(sys.stdin)
for series in data['data']['result']:
    pod = series['metric'].get('pod', 'aggregate')
    values = [float(v[1]) for v in series['values']]
    print(f'{pod}:')
    print(f'  min: {min(values):.3f} cores')
    print(f'  avg: {sum(values)/len(values):.3f} cores')
    print(f'  max: {max(values):.3f} cores')
    print(f'  p95: {sorted(values)[int(len(values) * 0.95)]:.3f} cores')
    print(f'  p99: {sorted(values)[int(len(values) * 0.99)]:.3f} cores')
"
```

```bash
# AWS: get CloudWatch CPU utilization for an ASG
aws cloudwatch get-metric-statistics \
  --namespace AWS/EC2 \
  --metric-name CPUUtilization \
  --dimensions Name=AutoScalingGroupName,Value="$ASG_NAME" \
  --start-time "$(date -u -d '7 days ago' +%Y-%m-%dT%H:%M:%S)" \
  --end-time "$(date -u +%Y-%m-%dT%H:%M:%S)" \
  --period 3600 \
  --statistics Average Maximum \
  --output json | python3 -c "
import json, sys

data = json.load(sys.stdin)
points = sorted(data['Datapoints'], key=lambda x: x['Timestamp'])
for p in points:
    print(f'{p[\"Timestamp\"]:>25} avg={p[\"Average\"]:5.1f}% max={p[\"Maximum\"]:5.1f}%')
"
```
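The percentiles from Step 1 feed directly into baseline sizing. A minimal sketch of one common heuristic, assuming a hypothetical `recommend_baseline` helper with an assumed 20% headroom factor and a 50-80% clamp (none of these values are prescribed by this skill): size the CPU request off p95, then pick an HPA target utilization low enough that p99 load triggers scale-out before the request saturates.

```python
def recommend_baseline(p95_cores: float, p99_cores: float) -> dict:
    """Hypothetical helper: turn Step 1 percentiles into a baseline.

    The 1.2 headroom factor and the 50-80% clamp are illustrative
    assumptions, not values defined by this skill.
    """
    request = round(p95_cores * 1.2, 2)                  # CPU request ~= p95 + 20% headroom
    saturation_pct = int(p99_cores / request * 100)      # utilization at which p99 load saturates
    target_pct = max(50, min(saturation_pct - 10, 80))   # back off 10 points, clamp to a sane band
    return {"cpu_request_cores": request, "hpa_target_utilization_pct": target_pct}

# Example with the kind of numbers Step 1 prints:
print(recommend_baseline(p95_cores=0.42, p99_cores=0.55))
# -> {'cpu_request_cores': 0.5, 'hpa_target_utilization_pct': 80}
```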

Step 2: Identify the traffic pattern class

Classify the workload into one of these patterns, because each requires a different scaling strategy:

```python
import json, sys
from collections import defaultdict
from datetime import datetime

def classify_traffic(timestamps_values):
    """Classify traffic into a pattern type based on 7 days of hourly data."""
    by_hour = defaultdict(list)
    by_weekday = defaultdict(list)

    for ts, val in timestamps_values:
        dt = datetime.fromtimestamp(float(ts))
        by_hour[dt.hour].append(float(val))
        by_weekday[dt.weekday()].append(float(val))

    hourly_avgs = {h: sum(v) / len(v) for h, v in by_hour.items()}
    weekday_avgs = {d: sum(v) / len(v) for d, v in by_weekday.items()}

    peak_hour = max(hourly_avgs, key=hourly_avgs.get)
    trough_hour = min(hourly_avgs, key=hourly_avgs.get)
    peak_to_trough = hourly_avgs[peak_hour] / max(hourly_avgs[trough_hour], 0.001)

    weekday_avg = sum(weekday_avgs.get(d, 0) for d in range(5)) / 5
    weekend_avg = sum(weekday_avgs.get(d, 0) for d in range(5, 7)) / 2

    all_values = [v for _, v in timestamps_values]
    max_val = max(float(v) for v in all_values)
    avg_val = sum(float(v) for v in all_values) / len(all_values)
    spike_ratio = max_val / max(avg_val, 0.001)

    pattern = {
        "peak_hour": f"{peak_hour}:00",
        "trough_hour": f"{trough_hour}:00",
        "peak_to_trough_ratio": round(peak_to_trough, 1),
        "weekday_vs_weekend_ratio": round(weekday_avg / max(weekend_avg, 0.001), 1),
        "spike_ratio": round(spike_ratio, 1),
    }

    if peak_to_trough > 3:
        pattern["type"] = "DAILY_CYCLE"
        pattern["strategy"] = "Predictive scaling + reactive HPA. Pre-warm before peak hours."
    elif spike_ratio > 5:
        pattern["type"] = "SPIKE"
        pattern["strategy"] = "Aggressive scale-up (short stabilization window), conservative scale-down."
    elif weekday_avg / max(weekend_avg, 0.001) > 2:
        pattern["type"] = "WEEKLY_CYCLE"
        pattern["strategy"] = "Scheduled scaling for weekday/weekend transitions + HPA for within-day variation."
    else:
        pattern["type"] = "STEADY_STATE"
        pattern["strategy"] = "Simple target-tracking policy. Right-size the baseline."

    return pattern

# Example: parse Prometheus query_range output
# result = classify_traffic(data['data']['result'][0]['values'])
# print(json.dumps(result, indent=2))
```
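The `strategy` strings above can be made concrete as HPA `behavior` stabilization windows. The mapping below is a hypothetical sketch of that translation; the specific window lengths are assumptions chosen to match the strategy descriptions, not values shipped with this skill.

```python
# Hypothetical mapping from classify_traffic()'s pattern["type"] to HPA
# scale-up/scale-down stabilization windows (seconds). Values are
# illustrative assumptions: SPIKE gets an instant scale-up and a long
# cool-down, matching the "aggressive up, conservative down" strategy.
HPA_BEHAVIOR_BY_PATTERN = {
    "SPIKE":        {"scale_up_stabilization": 0,   "scale_down_stabilization": 600},
    "DAILY_CYCLE":  {"scale_up_stabilization": 60,  "scale_down_stabilization": 300},
    "WEEKLY_CYCLE": {"scale_up_stabilization": 60,  "scale_down_stabilization": 300},
    "STEADY_STATE": {"scale_up_stabilization": 120, "scale_down_stabilization": 300},
}

def hpa_behavior_for(pattern: dict) -> dict:
    """Look up stabilization windows for a classified traffic pattern."""
    return HPA_BEHAVIOR_BY_PATTERN[pattern["type"]]

# e.g. hpa_behavior_for({"type": "SPIKE"})
# -> {'scale_up_stabilization': 0, 'scale_down_stabilization': 600}
```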

Step 3: Analyze request-level metrics (for RPS-based scaling)

```bash
# Get requests per second over 7 days
curl -s "$PROMETHEUS_URL/api/v1/query_range" \
  --data-urlencode 'query=sum(rate(http_requests_total{namespace="production",service="api"}[5m]))' \
  --data-urlencode "start=$(date -d '7 days ago' +%s)" \
  --data-urlencode "end=$(date +%s)" \
  --data-urlencode 'step=1h'
```
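Once the peak aggregate RPS is known, an RPS-based target typically comes from dividing it by the sustained RPS a single pod can serve. A minimal sketch, assuming `rps_per_pod` comes from a load test and using an assumed 30% headroom factor (both are assumptions, not skill defaults):

```python
import math

def rps_scaling_targets(peak_rps: float, rps_per_pod: float, headroom: float = 0.3) -> dict:
    """Sketch: derive a per-pod RPS target and replica ceiling for
    RPS-based scaling (e.g. a KEDA/HPA external metric).

    rps_per_pod should come from a load test; the 30% headroom is an
    illustrative assumption, not a value defined by this skill.
    """
    target_rps_per_pod = rps_per_pod * (1 - headroom)      # scale out before pods saturate
    max_replicas = math.ceil(peak_rps / target_rps_per_pod)
    return {"target_rps_per_pod": round(target_rps_per_pod, 1),
            "max_replicas": max_replicas}

# Example: 1,200 RPS peak, each pod load-tested to 90 RPS sustained
print(rps_scaling_targets(peak_rps=1200, rps_per_pod=90))
# -> {'target_rps_per_pod': 63.0, 'max_replicas': 20}
```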

Source: ClawHub ↗ · Chinese localization: 龙虾技能库