Suricata 监控器将 Suricata 的 eve.json 警报转换为可执行的威胁报告。它读取本地 Suricata 日志,根据严重性对发现进行排名,显示攻击者 IP 和顶级签名,并提供结构化的简报以供采取行动或转发到 Telegram。
隐私:仅读取 /var/log/suricata/eve.json,没有数据离开您的机器。
单个 SKILL.md —— 检查每一行。
前提条件:Suricata 必须正在运行并写入 eve.json:
# 验证日志存在并被写入
ls -lh /var/log/suricata/eve.json
tail -5 /var/log/suricata/eve.json | python3 -m json.tool
如果日志被拒绝访问:
sudo chmod 644 /var/log/suricata/eve.json
工作流程
import json
from datetime import datetime, timedelta
LOG = "/var/log/suricata/eve.json"
HOURS = 24
截止时间 = (datetime.now() - timedelta(hours=HOURS)).timestamp()
警报 = []
with open(LOG) as f:
for 行 in f:
try:
事件 = json.loads(行)
if 事件.get("event_type") != "alert":
continue
ts = datetime.fromisoformat(事件["timestamp"][:19]).timestamp()
if ts < 截止时间:
continue
警报.append({
"time": 事件["timestamp"][:19],
"severity": 事件["alert"].get("severity", 99),
"sig": 事件["alert"].get("signature", "unknown"),
"category": 事件["alert"].get("category", ""),
"src_ip": 事件.get("src_ip", "?"),
"dest_ip": 事件.get("dest_ip", "?"),
"dest_port": 事件.get("dest_port", "?"),
"proto": 事件.get("proto", "?"),
})
except (json.JSONDecodeError, KeyError, ValueError):
continue
from collections import Counter
# 顶级攻击者 IP
攻击者 IP = Counter(a["src_ip"] for a in 警报 if a["severity"] <= 2)
# 顶级签名
顶级签名 = Counter(a["sig"] for a in 警报).most_common(5)
# 按严重性
严重 = [a for a in 警报 if a["severity"] == 1]
高 = [a for a in 警报 if a["severity"] == 2]
中 = [a for a in 警报 if a["severity"] == 3]
低 = [a for a in 警报 if a["severity"] >= 4]
Suricata 威胁报告 —— YYYY-MM-DD HH:MM (最后 N 小时)
警报:X 总数 | 严重:X 高:X 中:X 低:X
顶级攻击者 IP
- [IP] —— X 次(阻止:sudo ufw deny from [IP])
- ...
顶级签名
严重警报(严重性 1)
[HH:MM] [src_ip] → [dest_ip]:[port] [签名]
高警报(严重性 2)—— 样本
[HH:MM] [src_ip] → [dest_ip]:[port] [签名]
推荐操作
[ ] 阻止顶级攻击者 IP:sudo ufw deny from [IP1] sudo ufw deny from [IP2]
[ ] 查看 Suricata 规则:[顶级签名]
[ ] 如果红色姿态:运行 eva-security-audit 技能
状态:[绿色/黄色/红色]
绿色 = 0 严重,<10 高在 24 小时内
黄色 = 1-5 严重 OR 10-50 高
红色 = >5 严重 OR >50 高 OR C2/利用签名检测
- 交付到 Telegram(使用 telegram-notifier 技能):
# 在构建报告字符串后:
import os, requests
requests.post(
f"https://api.telegram.org/bot{os.environ['TELEGRAM_BOT_TOKEN']}/sendMessage",
json={"chat_id": os.environ['TELEGRAM_CHAT_ID'], "text": 报告},
timeout=10
)
到内存:echo "[报告]" >> 内存/$(date +%Y-%m-%d).md
安排每日监控
openclaw cron add \
--name "suricata-monitor:daily" \
--cron "0 7
*" \
--prompt "运行 suricata-monitor 技能。回顾 24 小时。发送报告到 Telegram 并追加到今天的内存文件。"
签名类别要监控
类别 严重性 操作
利用工具包活动 1 立即阻止 IP
恶意软件 C2 1 隔离受影响的主机
端口扫描 2 监控,考虑阻止
策略违规 3 查看,记录
信息 4+ 仅记录
快速命令
# 计数今天的警报按严重性
cat /var/log/suricata/eve.json | python3 -c "
导入 sys, json
从 collections 导入 Counter
sev = Counter()
对于 sys.stdin 中的行:
try:
e = json.loads(行)
if e.get('event_type') == 'alert':
sev[e['alert'].get('severity','?')] += 1
except:
pass
对于 sorted(sev.items()) 中的 k,v:
print(f'严重性 {k}: {v}')
"
# 最近 24 小时的顶级攻击者 IP
cat /var/log/suricata/eve.json | python3 -c "
导入 sys, json
从 collections 导入 Counter
从 datetime 导入 datetime, timedelta
截止时间 = (datetime.now()-timedelta(hours=24)).timestamp()
ips = []
对于 sys.stdin 中的行:
try:
e = json.loads(行)
if e.get('event_type')=='alert':
ts = datetime.fromisoformat(e['timestamp'][:19]).timestamp()
if ts > 截止时间:
ips.append(e.get('src_ip','?'))
except:
pass
对于 Counter(ips).most_common(10) 中的 ip,n:
print(f'{n:5} {ip}')