Detects anomalies in ICS/OT environments using ML baselines for SCADA polling, Modbus/DNP3/OPC UA traffic deviations, unauthorized devices, and process data correlation.
npx claudepluginhub killvxk/cybersecurity-skills-zh
This skill uses the workspace's default tool permissions.
- 为缺乏入侵检测的OT环境部署持续监控
Deploys ML anomaly detection for OT/ICS networks, profiling SCADA polling in Modbus/DNP3/OPC UA, spotting deviations/rogue devices, and correlating with process historians.
Deploys ML-based anomaly detection for ICS/OT networks profiling SCADA polling, Modbus/DNP3/OPC UA traffic deviations, rogue devices, and process data correlations.
Detects attacks on SCADA/ICS systems via OT IDS, industrial protocol anomaly detection, and process data analysis using Python/Scapy baselines for Modbus/DNP3. Useful for ICS monitoring and attack investigations.
Share bugs, ideas, or general feedback.
不适用于基于特征的已知漏洞利用检测(参见detecting-attacks-on-scada-systems)、不含OT协议的IT网络异常检测,或替代过程安全系统(SIS)。
从多个维度捕获并建模ICS通信的确定性行为:时序、协议行为和网络拓扑。
#!/usr/bin/env python3
"""ICS Anomaly Detection System.
Builds multi-dimensional baselines from OT network traffic and
detects anomalies using statistical and machine learning methods.
Designed for deterministic SCADA communication patterns.
"""
import json
import sys
import time
import warnings
from collections import defaultdict
from datetime import datetime, timedelta
from dataclasses import dataclass, field
import numpy as np
import pandas as pd
from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import StandardScaler
# NOTE: called with no category or module filter, so this silences every
# warning raised anywhere in the process from import time onward (including
# NumPy / scikit-learn numerical and deprecation warnings), not only noise
# produced by this module.
warnings.filterwarnings("ignore")
@dataclass
class CommunicationProfile:
    """Baseline profile for a single master-slave communication pair.

    The four identity fields are required and positional; every statistic
    defaults to zero/empty and is filled in later by the code that builds the
    baseline.
    """

    # Identity of the directed conversation (source -> destination:port).
    src_ip: str
    dst_ip: str
    protocol: str
    port: int
    # Mean and standard deviation of the gap between consecutive packets,
    # in milliseconds (per the field names).
    avg_interval_ms: float = 0.0
    std_interval_ms: float = 0.0
    # Mean payload size; unit is not established by this definition.
    avg_payload_size: float = 0.0
    # Maps each observed function code to the number of times it was seen.
    # default_factory gives every instance its own dict.
    function_codes: dict = field(default_factory=dict)
    # Observed packet rate for the pair.
    packets_per_minute: float = 0.0
    # Timestamps of the first and last observation, stored as strings;
    # the empty string means "not recorded".
    first_seen: str = ""
    last_seen: str = ""
class ICSAnomalyDetector:
    """Multi-dimensional anomaly detection for ICS environments.

    A baseline is learned from a collection of flow records (plain dicts) and
    later flows are compared against it along three dimensions: topology
    (which ``src -> dst:port`` pairs exist), timing (polling interval z-score)
    and protocol behaviour (function codes seen per pair).  An Isolation
    Forest can additionally be trained on a feature DataFrame.

    Flow record keys read by this class:

    * ``src``, ``dst``, ``port`` - required (indexed directly).
    * ``protocol`` - optional, defaults to ``"TCP"``.
    * ``timestamp`` - optional, stored verbatim as first/last seen.
    * ``timestamp_epoch`` - optional number; differences are multiplied by
      1000 to obtain milliseconds, i.e. it is treated as seconds.
    * ``function_code`` - optional; ``None`` means "no function code".
    * ``interval_ms`` - optional, only read by :meth:`analyze_flow`.

    Note: ``CommunicationProfile.avg_payload_size`` is never populated by
    this class, and ``timing_model`` / ``training_data`` are initialised but
    not otherwise used here.
    """

    # Function codes escalated to "critical" when seen outside the baseline.
    # In Modbus these are the write operations (5, 6, 15, 16) and
    # Diagnostics (8).  NOTE(review): the same numbers carry different
    # meanings in DNP3 - confirm before relying on this for DNP3 traffic.
    _CRITICAL_FUNCTION_CODES = frozenset({5, 6, 8, 15, 16})

    # An interval is reported when it lies more than this many baseline
    # standard deviations away from the baseline mean.
    _TIMING_Z_THRESHOLD = 4.0

    def __init__(self):
        """Create an empty detector with no baseline and no trained model."""
        self.profiles = {}
        self.topology_baseline = set()
        self.timing_model = None
        self.isolation_forest = None
        self.scaler = StandardScaler()
        # Columns (in order) the scaler / forest were fitted on; empty until
        # train_isolation_forest() has run.
        self.feature_columns = []
        self.anomalies = []
        self.training_data = []

    @staticmethod
    def _pair_key(src_ip, dst_ip, port):
        """Return the ``profiles`` dictionary key for a communication pair."""
        return f"{src_ip}->{dst_ip}:{port}"

    def build_baseline_from_pcap(self, pcap_data):
        """Build baselines from parsed pcap data (an iterable of flow dicts).

        Creates one ``CommunicationProfile`` per ``src -> dst:port`` pair,
        counts the function codes seen on it, records the pair in the
        topology baseline and finally computes timing statistics.

        Args:
            pcap_data: iterable of flow records; each must contain ``src``,
                ``dst`` and ``port``.

        Raises:
            KeyError: if a record lacks ``src``, ``dst`` or ``port``.
        """
        print("[*] 正在构建ICS通信基线...")
        # The records are walked twice (here and in _calculate_timing_stats),
        # so a one-shot iterator/generator must be materialised first or the
        # second pass would silently see nothing.
        if isinstance(pcap_data, (list, tuple)):
            flows = pcap_data
        else:
            flows = list(pcap_data)

        for flow in flows:
            src, dst, port = flow["src"], flow["dst"], flow["port"]
            key = self._pair_key(src, dst, port)
            profile = self.profiles.get(key)
            if profile is None:
                profile = CommunicationProfile(
                    src_ip=src,
                    dst_ip=dst,
                    protocol=flow.get("protocol", "TCP"),
                    port=port,
                    first_seen=flow.get("timestamp", ""),
                )
                self.profiles[key] = profile
            profile.last_seen = flow.get("timestamp", "")

            # Track the function codes of the industrial protocol.
            fc = flow.get("function_code")
            if fc is not None:
                profile.function_codes[fc] = profile.function_codes.get(fc, 0) + 1

            # Add the pair to the topology baseline.
            self.topology_baseline.add((src, dst, port))

        # Compute timing statistics.
        self._calculate_timing_stats(flows)
        print(f" 通信对数量: {len(self.profiles)}")
        print(f" 拓扑条目数: {len(self.topology_baseline)}")

    def _calculate_timing_stats(self, flows):
        """Calculate packet timing statistics per communication pair.

        For every pair that already has a profile and at least two
        ``timestamp_epoch`` values, stores the mean and (population) standard
        deviation of the inter-packet gap in milliseconds and, when the
        observation window is non-zero, the packets-per-minute rate.
        """
        timestamps = defaultdict(list)
        for flow in flows:
            ts = flow.get("timestamp_epoch")
            # Only a missing value is skipped: 0 / 0.0 is a legitimate
            # timestamp (e.g. capture-relative time) and must be kept.
            if ts is None:
                continue
            key = self._pair_key(flow["src"], flow["dst"], flow["port"])
            timestamps[key].append(ts)

        for key, ts_list in timestamps.items():
            profile = self.profiles.get(key)
            if profile is None or len(ts_list) < 2:
                continue
            ts_sorted = sorted(ts_list)
            intervals = [
                (later - earlier) * 1000
                for earlier, later in zip(ts_sorted, ts_sorted[1:])
            ]
            # Stored as plain floats so profiles stay serialisable.
            profile.avg_interval_ms = float(np.mean(intervals))
            profile.std_interval_ms = float(np.std(intervals))
            duration_min = (ts_sorted[-1] - ts_sorted[0]) / 60
            if duration_min > 0:
                profile.packets_per_minute = len(ts_list) / duration_min

    def train_isolation_forest(self, features_df):
        """Train Isolation Forest model on feature vectors from baseline traffic.

        Uses whichever of the expected feature columns are present in
        ``features_df`` (missing values are filled with 0), fits the scaler
        and the forest on them, and remembers the columns used in
        ``self.feature_columns``.

        Args:
            features_df: pandas DataFrame of baseline feature vectors.

        Raises:
            ValueError: if none of the expected feature columns are present.
        """
        print("[*] 正在训练孤立森林(Isolation Forest)模型...")
        feature_cols = [
            "interval_ms", "payload_size", "packets_per_window",
            "unique_func_codes", "new_connection_flag",
        ]
        available_cols = [c for c in feature_cols if c in features_df.columns]
        if not available_cols:
            raise ValueError(
                "features_df contains none of the expected feature columns: "
                f"{feature_cols}"
            )
        X = features_df[available_cols].fillna(0).values
        X_scaled = self.scaler.fit_transform(X)
        # Remember the column order so later scoring can build matching vectors.
        self.feature_columns = available_cols
        self.isolation_forest = IsolationForest(
            n_estimators=200,
            contamination=0.01,  # expect a 1% anomaly rate in the baseline
            random_state=42,
            n_jobs=-1,
        )
        self.isolation_forest.fit(X_scaled)
        scores = self.isolation_forest.decision_function(X_scaled)
        print(f" 模型训练样本数: {len(X)}")
        print(f" 异常分数范围: [{scores.min():.4f}, {scores.max():.4f}]")
        print(f" 阈值: {np.percentile(scores, 1):.4f}")

    def detect_topology_anomaly(self, src_ip, dst_ip, port):
        """Detect new/unauthorized communication pairs.

        Returns:
            A "high" severity anomaly dict when ``(src_ip, dst_ip, port)`` is
            not in the topology baseline, otherwise ``None``.
        """
        if (src_ip, dst_ip, port) in self.topology_baseline:
            return None
        return {
            "type": "NEW_COMMUNICATION_PAIR",
            "severity": "high",
            "detail": f"新连接: {src_ip} -> {dst_ip}:{port} 不在基线中",
            "recommendation": "验证是否为已授权的新设备或配置变更",
        }

    def detect_timing_anomaly(self, src_ip, dst_ip, port, interval_ms):
        """Detect polling interval deviations.

        Returns:
            A "medium" severity anomaly dict when ``interval_ms`` is more
            than 4 baseline standard deviations from the baseline mean,
            otherwise ``None``.  Nothing is reported when the pair has no
            profile, when ``interval_ms`` is ``None``, or when the baseline
            standard deviation is 0 (no z-score can be formed, e.g. a
            baseline built from a single interval).
        """
        if interval_ms is None:
            return None
        profile = self.profiles.get(self._pair_key(src_ip, dst_ip, port))
        if profile is None or not profile.std_interval_ms > 0:
            return None
        z_score = abs(interval_ms - profile.avg_interval_ms) / profile.std_interval_ms
        if z_score <= self._TIMING_Z_THRESHOLD:
            return None
        return {
            "type": "TIMING_ANOMALY",
            "severity": "medium",
            "detail": (
                f"间隔 {interval_ms:.1f}ms 偏离基线 "
                f"{profile.avg_interval_ms:.1f}ms (z分数: {z_score:.1f})"
            ),
            "recommendation": "检查网络拥塞、设备故障或中间人攻击",
        }

    def detect_function_code_anomaly(self, src_ip, dst_ip, port, func_code):
        """Detect unauthorized Modbus/DNP3 function codes.

        Returns:
            An anomaly dict when the pair has a profile and ``func_code`` was
            never seen on it in the baseline; severity is "critical" for the
            codes in ``_CRITICAL_FUNCTION_CODES`` and "high" otherwise.
            ``None`` when the pair is unknown (that case is covered by the
            topology check), when the code is in the baseline, or when
            ``func_code`` is ``None`` (no function code present).
        """
        if func_code is None:
            return None
        profile = self.profiles.get(self._pair_key(src_ip, dst_ip, port))
        if profile is None or func_code in profile.function_codes:
            return None
        severity = "critical" if func_code in self._CRITICAL_FUNCTION_CODES else "high"
        return {
            "type": "UNAUTHORIZED_FUNCTION_CODE",
            "severity": severity,
            "detail": (
                f"来自 {src_ip} 到 {dst_ip}:{port} 的功能码 {func_code} "
                f"不在基线中。允许的功能码: {list(profile.function_codes)}"
            ),
            "recommendation": "调查来源 - 可能是命令注入攻击",
        }

    def analyze_flow(self, flow):
        """Analyze a single network flow against all detection models.

        Runs the topology check always, and the timing / function-code checks
        when the flow carries a non-``None`` ``interval_ms`` /
        ``function_code``.  Every finding is appended to ``self.anomalies``.

        Returns:
            List of anomaly dicts found for this flow (possibly empty).
        """
        src, dst, port = flow["src"], flow["dst"], flow["port"]
        results = []

        # Topology check.
        topo = self.detect_topology_anomaly(src, dst, port)
        if topo:
            results.append(topo)

        # Timing check.  A key that is present but None is treated as absent,
        # matching how the baseline builder treats missing values.
        interval_ms = flow.get("interval_ms")
        if interval_ms is not None:
            timing = self.detect_timing_anomaly(src, dst, port, interval_ms)
            if timing:
                results.append(timing)

        # Function-code check.
        func_code = flow.get("function_code")
        if func_code is not None:
            fc = self.detect_function_code_anomaly(src, dst, port, func_code)
            if fc:
                results.append(fc)

        self.anomalies.extend(results)
        return results

    def generate_report(self):
        """Print the anomaly report to stdout.

        Shows the number of baseline profiles, the anomaly total, a
        per-severity count (only non-zero severities) and the first 20
        anomalies in detection order.
        """
        print(f"\n{'='*60}")
        print("ICS 异常检测报告")
        print(f"{'='*60}")
        print(f"基线配置文件数: {len(self.profiles)}")
        print(f"检测到的异常数: {len(self.anomalies)}")
        severity_counts = defaultdict(int)
        for a in self.anomalies:
            severity_counts[a["severity"]] += 1
        for sev in ["critical", "high", "medium", "low"]:
            if severity_counts[sev]:
                print(f" {sev.upper()}: {severity_counts[sev]}")
        for a in self.anomalies[:20]:
            print(f"\n [{a['severity'].upper()}] {a['type']}")
            print(f" {a['detail']}")
if __name__ == "__main__":
    # Running the module directly only prints a usage banner; no baseline is
    # loaded and no detection is performed here.
    for banner_line in (
        "ICS 异常检测系统",
        "加载基线数据并调用 analyze_flow() 进行实时检测",
    ):
        print(banner_line)
| 术语 | 定义 |
|---|---|
| 确定性流量(Deterministic Traffic) | ICS网络表现出高度可预测的通信模式,相同的主站以固定时间间隔、相同的功能码轮询相同的从站 |
| 孤立森林(Isolation Forest) | 通过随机划分特征空间来隔离异常的无监督机器学习算法,适用于低异常率的OT流量 |
| 轮询间隔(Polling Interval) | SCADA主站连续向从站设备发送请求之间的时间间隔,通常固定且可配置(100ms至10s) |
| 功能码白名单(Function Code Allowlist) | 每个通信对允许使用的工业协议操作集合,由异常检测规则强制执行 |
| 拓扑基线(Topology Baseline) | OT网络中所有已授权设备间通信路径的完整映射 |
| 基于物理的检测(Physics-Based Detection) | 使用物理过程模型(热力学、流体动力学)检测在欺骗传感器数据的同时操控过程的攻击 |
ICS 异常检测报告
==============================
检测周期: YYYY-MM-DD 至 YYYY-MM-DD
基线规模: [N] 个通信配置文件
检测到的异常数: [N]
严重: [N] 高: [N] 中: [N] 低: [N]
[严重级别] 异常类型
来源: [IP] -> 目标: [IP]:[端口]
详情: [偏离基线的描述]
基线: [预期行为]
观测: [实际行为]