📦 Autoscaling Policy Designer
v1.0.0
Design autoscaling policies based on traffic patterns, cost constraints, and performance SLOs
Autoscaling Policy Designer
Design autoscaling policies that balance performance, cost, and reliability. This skill teaches an AI agent to analyze historical traffic patterns, recommend scaling thresholds, configure Kubernetes HPA/KEDA or cloud-native autoscalers, simulate behavior under load, and model the cost impact of different scaling strategies.
Use when: "design autoscaling", "scaling policy", "HPA configuration", "KEDA setup", "scale to zero", "autoscaling thresholds", "scaling costs", "traffic spike handling", "over-provisioned", "under-provisioned"
Commands
- analyze -- Study traffic patterns
Before designing a policy, understand the workload. Collect metrics, identify patterns, and classify the traffic shape.
Step 1: Collect historical utilization data

```bash
# Kubernetes: fetch CPU/memory utilization over 7 days from Prometheus
curl -s "$PROMETHEUS_URL/api/v1/query_range" \
  --data-urlencode 'query=avg(rate(container_cpu_usage_seconds_total{namespace="production",pod=~"api-.*"}[5m])) by (pod)' \
  --data-urlencode "start=$(date -d '7 days ago' +%s)" \
  --data-urlencode "end=$(date +%s)" \
  --data-urlencode 'step=1h' | python3 -c "
import json, sys

data = json.load(sys.stdin)
for series in data['data']['result']:
    pod = series['metric'].get('pod', 'aggregate')
    values = [float(v[1]) for v in series['values']]
    print(f'{pod}:')
    print(f'  min: {min(values):.3f} cores')
    print(f'  avg: {sum(values)/len(values):.3f} cores')
    print(f'  max: {max(values):.3f} cores')
    print(f'  p95: {sorted(values)[int(len(values)*0.95)]:.3f} cores')
    print(f'  p99: {sorted(values)[int(len(values)*0.99)]:.3f} cores')
"
```
```bash
# AWS: fetch CloudWatch CPU utilization for an ASG
aws cloudwatch get-metric-statistics \
  --namespace AWS/EC2 \
  --metric-name CPUUtilization \
  --dimensions Name=AutoScalingGroupName,Value="$ASG_NAME" \
  --start-time "$(date -u -d '7 days ago' +%Y-%m-%dT%H:%M:%S)" \
  --end-time "$(date -u +%Y-%m-%dT%H:%M:%S)" \
  --period 3600 \
  --statistics Average Maximum \
  --output json | python3 -c "
import json, sys

data = json.load(sys.stdin)
points = sorted(data['Datapoints'], key=lambda x: x['Timestamp'])
for p in points:
    print(f'{p[\"Timestamp\"]:>25} avg={p[\"Average\"]:5.1f}% max={p[\"Maximum\"]:5.1f}%')
"
```
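The percentile stats above feed directly into right-sizing. The sketch below is a heuristic of my own framing, not part of this skill's tooling: set the container CPU request near p95 with some headroom, and derive an HPA utilization target whose burst headroom is proportional to the observed p99/p95 ratio. The `headroom` factor of 1.2 is an assumed placeholder.

```python
import math

def recommend_sizing(values, headroom=1.2):
    """Derive a CPU request and HPA utilization target from observed core usage.

    Heuristic (an assumption, not an official formula): request ~= p95 * headroom;
    the utilization target shrinks as the p99/p95 burst ratio grows, capped at 90%.
    """
    s = sorted(values)
    p95 = s[int(len(s) * 0.95)]
    p99 = s[int(len(s) * 0.99)]
    request_cores = p95 * headroom
    target_utilization = min(90, math.floor(100 * p95 / max(p99 * headroom, 1e-9)))
    return {"request_cores": round(request_cores, 2), "hpa_target_pct": target_utilization}

# Example with synthetic hourly samples (cores): mostly quiet, occasional bursts
samples = [0.2] * 150 + [0.5] * 15 + [0.9] * 3
print(recommend_sizing(samples))
```

The output values are only as good as the window they are computed over; a 7-day window misses monthly or seasonal peaks.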
Step 2: Identify the traffic pattern class
Classify the workload into one of these patterns, because each requires a different scaling strategy:
```python
import json, sys
from collections import defaultdict
from datetime import datetime

def classify_traffic(timestamps_values):
    """Classify traffic into a pattern type based on 7 days of hourly data."""
    by_hour = defaultdict(list)
    by_weekday = defaultdict(list)

    for ts, val in timestamps_values:
        dt = datetime.fromtimestamp(float(ts))
        by_hour[dt.hour].append(float(val))
        by_weekday[dt.weekday()].append(float(val))

    hourly_avgs = {h: sum(v)/len(v) for h, v in by_hour.items()}
    weekday_avgs = {d: sum(v)/len(v) for d, v in by_weekday.items()}

    peak_hour = max(hourly_avgs, key=hourly_avgs.get)
    trough_hour = min(hourly_avgs, key=hourly_avgs.get)
    peak_to_trough = hourly_avgs[peak_hour] / max(hourly_avgs[trough_hour], 0.001)

    weekday_avg = sum(weekday_avgs.get(d, 0) for d in range(5)) / 5
    weekend_avg = sum(weekday_avgs.get(d, 0) for d in range(5, 7)) / 2

    all_values = [v for _, v in timestamps_values]
    max_val = max(float(v) for v in all_values)
    avg_val = sum(float(v) for v in all_values) / len(all_values)
    spike_ratio = max_val / max(avg_val, 0.001)

    pattern = {
        "peak_hour": f"{peak_hour}:00",
        "trough_hour": f"{trough_hour}:00",
        "peak_to_trough_ratio": round(peak_to_trough, 1),
        "weekday_vs_weekend_ratio": round(weekday_avg / max(weekend_avg, 0.001), 1),
        "spike_ratio": round(spike_ratio, 1),
    }

    if peak_to_trough > 3:
        pattern["type"] = "DAILY_CYCLE"
        pattern["strategy"] = "Predictive scaling + reactive HPA. Pre-warm before peak hours."
    elif spike_ratio > 5:
        pattern["type"] = "SPIKE"
        pattern["strategy"] = "Aggressive scale-up (short stabilization window), conservative scale-down."
    elif weekday_avg / max(weekend_avg, 0.001) > 2:
        pattern["type"] = "WEEKLY_CYCLE"
        pattern["strategy"] = "Scheduled scaling for weekday/weekend transitions + HPA for within-day variation."
    else:
        pattern["type"] = "STEADY_STATE"
        pattern["strategy"] = "Simple target-tracking policy. Right-size the baseline."

    return pattern

# Example: parse Prometheus query_range output
# result = classify_traffic(data['data']['result'][0]['values'])
# print(json.dumps(result, indent=2))
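As a sanity check on the thresholds, the DAILY_CYCLE branch (`peak_to_trough > 3`) can be exercised with synthetic data. The sketch below inlines just the peak-to-trough computation from `classify_traffic` above; the load shape (quiet nights, busy afternoons) is invented for illustration:

```python
from collections import defaultdict
from datetime import datetime, timedelta

# Synthetic 7 days of hourly CPU samples: ~0.2 cores overnight,
# ~1.0 cores from 12:00-18:00 -> peak-to-trough well above the 3x threshold.
start = datetime(2024, 1, 1)
timestamps_values = []
for h in range(7 * 24):
    dt = start + timedelta(hours=h)
    load = 1.0 if 12 <= dt.hour < 18 else 0.2
    timestamps_values.append((dt.timestamp(), load))

# Same aggregation as classify_traffic: average load per hour-of-day.
by_hour = defaultdict(list)
for ts, val in timestamps_values:
    by_hour[datetime.fromtimestamp(ts).hour].append(val)

hourly_avgs = {h: sum(v) / len(v) for h, v in by_hour.items()}
peak_to_trough = max(hourly_avgs.values()) / max(min(hourly_avgs.values()), 0.001)

print(f"peak_to_trough={peak_to_trough:.1f}")
print("DAILY_CYCLE" if peak_to_trough > 3 else "other")
```

A 5x peak-to-trough ratio lands firmly in DAILY_CYCLE; the same harness can be fed real `query_range` values to check which branch a production workload hits.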
Step 3: Analyze request-level metrics (for RPS-based scaling)

```bash
# Fetch requests per second over 7 days
curl -s "$PROMETHEUS_URL/api/v1/query_range" \
  --data-urlencode 'query=sum(rate(http_requests_total{namespace="production",service="api"}[5m]))' \
  -
```
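Once an RPS series is available, the standard Kubernetes HPA formula, desiredReplicas = ceil(currentReplicas × currentMetricValue / targetMetricValue), converts a per-pod RPS target into replica counts. A minimal sketch; the per-pod numbers below are assumed placeholders, not measurements from this skill:

```python
import math

def hpa_replicas(current_replicas, current_rps_per_pod, target_rps_per_pod):
    """Kubernetes HPA core formula: desired = ceil(current * metric / target)."""
    return math.ceil(current_replicas * current_rps_per_pod / target_rps_per_pod)

# Assumed numbers for illustration: 10 pods each serving 120 RPS,
# with a target of 150 RPS per pod -> the HPA would scale down to 8.
print(hpa_replicas(10, 120, 150))  # → 8
```

Choosing `target_rps_per_pod` is the real design decision: load-test a single pod to find the RPS at which latency SLOs still hold, then set the target below that with headroom for the scale-up lag.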