Detects attacks on SCADA/ICS systems via OT IDS, industrial protocol anomaly detection, and process data analysis using Python/Scapy baselines for Modbus/DNP3. Useful for ICS monitoring and attack investigations.
npx claudepluginhub killvxk/cybersecurity-skills-zhThis skill uses the workspace's default tool permissions.
- 首次在SCADA环境中部署入侵检测能力
Detects SCADA attacks like protocol MITM, PLC injections, HMI compromise, and DoS using OT IDS, industrial protocol anomaly detection, and process analytics.
Detects cyber attacks on SCADA systems including MITM on industrial protocols, PLC command injection, HMI compromise, and DoS using OT IDS, anomaly detection, and baselines. For OT monitoring deployment and investigations.
Detects anomalies in ICS/OT environments using ML baselines for SCADA polling, Modbus/DNP3/OPC UA traffic deviations, unauthorized devices, and process data correlation.
Share bugs, ideas, or general feedback.
不适用于检测不含SCADA/ICS组件的纯IT网络攻击、构建通用网络IDS规则(参见building-detection-rules-with-sigma),或确认攻击后的事件响应流程(参见performing-ot-incident-response)。
在检测异常之前,先确定正常SCADA流量的形态。工业协议具有高度确定性——同一主站以相同的时间间隔轮询同一从站,读取相同的寄存器。
#!/usr/bin/env python3
"""SCADA Communication Baseline Builder.
Analyzes OT network traffic to establish deterministic baselines for
Modbus/TCP, DNP3, EtherNet/IP, and S7comm communications.
"""
import json
import sys
from collections import defaultdict
from datetime import datetime
from statistics import mean, stdev
try:
from scapy.all import rdpcap, IP, TCP, UDP
except ImportError:
print("Install scapy: pip install scapy")
sys.exit(1)
MODBUS_FUNC_NAMES = {
1: "Read Coils", 2: "Read Discrete Inputs",
3: "Read Holding Registers", 4: "Read Input Registers",
5: "Write Single Coil", 6: "Write Single Register",
8: "Diagnostics", 15: "Write Multiple Coils",
16: "Write Multiple Registers", 17: "Report Slave ID",
22: "Mask Write Register", 23: "Read/Write Multiple Registers",
43: "Encapsulated Interface Transport",
}
class SCADABaselineBuilder:
"""Builds deterministic baselines from SCADA traffic captures."""
def __init__(self):
self.modbus_sessions = defaultdict(lambda: {
"func_codes": defaultdict(int),
"register_ranges": set(),
"intervals": [],
"last_seen": None,
"request_count": 0,
})
self.communication_pairs = defaultdict(lambda: {
"protocols": set(),
"packet_count": 0,
"first_seen": None,
"last_seen": None,
})
def process_pcap(self, pcap_file):
"""Process pcap file to build SCADA baselines."""
packets = rdpcap(pcap_file)
print(f"[*] 正在处理 {len(packets)} 个数据包以构建基线...")
for pkt in packets:
if not pkt.haslayer(IP):
continue
src = pkt[IP].src
dst = pkt[IP].dst
ts = float(pkt.time)
# 跟踪通信对
pair_key = f"{src}->{dst}"
pair = self.communication_pairs[pair_key]
pair["packet_count"] += 1
if pair["first_seen"] is None:
pair["first_seen"] = ts
pair["last_seen"] = ts
# 分析Modbus/TCP
if pkt.haslayer(TCP) and pkt[TCP].dport == 502:
self._analyze_modbus(pkt, src, dst, ts)
def _analyze_modbus(self, pkt, src, dst, timestamp):
"""Extract Modbus function codes and register ranges."""
payload = bytes(pkt[TCP].payload)
if len(payload) < 8:
return
# MBAP报头: transaction_id(2) + protocol_id(2) + length(2) + unit_id(1) + func_code(1)
func_code = payload[7]
session_key = f"{src}->{dst}"
session = self.modbus_sessions[session_key]
session["func_codes"][func_code] += 1
session["request_count"] += 1
session["protocols"] = {"Modbus/TCP"}
# 跟踪轮询间隔
if session["last_seen"] is not None:
interval = timestamp - session["last_seen"]
if 0.01 < interval < 60: # 合理的轮询间隔
session["intervals"].append(interval)
session["last_seen"] = timestamp
# 提取读写操作的寄存器范围
if len(payload) >= 12 and func_code in (1, 2, 3, 4, 5, 6, 15, 16):
start_register = (payload[8] << 8) | payload[9]
if func_code in (1, 2, 3, 4, 15, 16) and len(payload) >= 12:
count = (payload[10] << 8) | payload[11]
session["register_ranges"].add((func_code, start_register, start_register + count))
def generate_baseline(self):
"""Generate the baseline profile from collected data."""
baseline = {
"generated": datetime.now().isoformat(),
"modbus_baselines": {},
"communication_pairs": {},
}
for session_key, session in self.modbus_sessions.items():
avg_interval = mean(session["intervals"]) if session["intervals"] else 0
interval_std = stdev(session["intervals"]) if len(session["intervals"]) > 1 else 0
baseline["modbus_baselines"][session_key] = {
"allowed_function_codes": list(session["func_codes"].keys()),
"function_code_distribution": {
MODBUS_FUNC_NAMES.get(k, f"FC{k}"): v
for k, v in session["func_codes"].items()
},
"polling_interval_avg_sec": round(avg_interval, 3),
"polling_interval_stddev": round(interval_std, 3),
"register_ranges": [
{"func_code": r[0], "start": r[1], "end": r[2]}
for r in session["register_ranges"]
],
"total_requests": session["request_count"],
}
return baseline
def export_baseline(self, output_file):
"""Export baseline to JSON file."""
baseline = self.generate_baseline()
with open(output_file, "w") as f:
json.dump(baseline, f, indent=2)
print(f"[*] 基线已保存至: {output_file}")
# 打印摘要
print(f"\n{'='*60}")
print("SCADA 通信基线摘要")
print(f"{'='*60}")
for session, data in baseline["modbus_baselines"].items():
print(f"\n 会话: {session}")
print(f" 功能码: {data['allowed_function_codes']}")
print(f" 轮询间隔: {data['polling_interval_avg_sec']}s (+/- {data['polling_interval_stddev']}s)")
print(f" 寄存器范围数: {len(data['register_ranges'])}")
print(f" 请求总数: {data['total_requests']}")
if __name__ == "__main__":
if len(sys.argv) < 2:
print("用法: python scada_baseline.py <pcap_file> [output.json]")
sys.exit(1)
builder = SCADABaselineBuilder()
builder.process_pcap(sys.argv[1])
output = sys.argv[2] if len(sys.argv) > 2 else "scada_baseline.json"
builder.export_baseline(output)
为已知SCADA攻击模式创建检测规则,包括TRITON、Industroyer/CrashOverride和PIPEDREAM/INCONTROLLER使用的攻击模式。
# Suricata SCADA攻击检测规则
# 部署在监控OT网络SPAN端口的IDS传感器上
# --- Modbus攻击检测 ---
# 来自非工程师工作站的未授权Modbus写入
alert modbus any any -> $OT_PLC_SUBNET 502 (
msg:"OT-DETECT Modbus write from unauthorized source";
modbus_func:!read_coils; modbus_func:!read_discrete_inputs;
modbus_func:!read_holding_registers; modbus_func:!read_input_registers;
flow:to_server,established;
threshold:type both, track by_src, count 1, seconds 60;
classtype:attempted-admin;
sid:3000001; rev:1;
)
# Modbus诊断/重启命令(FC 8)— 潜在的PLC拒绝服务攻击
alert modbus any any -> $OT_PLC_SUBNET 502 (
msg:"OT-DETECT Modbus diagnostics command to PLC";
modbus_func:diagnostics;
flow:to_server,established;
classtype:attempted-dos;
sid:3000002; rev:1;
)
# Modbus广播写入(单元ID 0)— 影响所有从站
alert modbus any any -> $OT_PLC_SUBNET 502 (
msg:"OT-CRITICAL Modbus broadcast write command";
modbus_unit_id:0;
flow:to_server,established;
classtype:attempted-admin;
sid:3000003; rev:1;
priority:1;
)
# --- S7comm攻击检测(西门子)---
# S7comm CPU STOP命令 — 关闭PLC执行
alert tcp any any -> $SIEMENS_PLC_SUBNET 102 (
msg:"OT-CRITICAL S7comm CPU STOP command detected";
content:"|03 00|"; offset:0; depth:2;
content:"|29|"; offset:17; depth:1;
flow:to_server,established;
classtype:attempted-dos;
sid:3000010; rev:1;
priority:1;
)
# S7comm PLC程序上传(潜在逻辑修改)
alert tcp any any -> $SIEMENS_PLC_SUBNET 102 (
msg:"OT-CRITICAL S7comm program download to PLC";
content:"|03 00|"; offset:0; depth:2;
content:"|1a|"; offset:17; depth:1;
flow:to_server,established;
classtype:attempted-admin;
sid:3000011; rev:1;
priority:1;
)
# --- DNP3攻击检测 ---
# DNP3冷重启命令
alert tcp any any -> $OT_RTU_SUBNET 20000 (
msg:"OT-CRITICAL DNP3 cold restart command";
content:"|05 64|"; offset:0; depth:2;
content:"|0d|"; offset:12; depth:1;
flow:to_server,established;
classtype:attempted-dos;
sid:3000020; rev:1;
priority:1;
)
# DNP3固件更新命令 — 潜在的PIPEDREAM指标
alert tcp any any -> $OT_RTU_SUBNET 20000 (
msg:"OT-CRITICAL DNP3 file transfer / firmware update";
content:"|05 64|"; offset:0; depth:2;
content:"|19|"; offset:12; depth:1;
flow:to_server,established;
classtype:attempted-admin;
sid:3000021; rev:1;
priority:1;
)
# --- 网络异常检测 ---
# 与PLC通信的新设备(不在基线中)
alert ip !$AUTHORIZED_OT_HOSTS any -> $OT_PLC_SUBNET any (
msg:"OT-DETECT Unauthorized device communicating with PLC subnet";
flow:to_server;
threshold:type limit, track by_src, count 1, seconds 3600;
classtype:network-scan;
sid:3000030; rev:1;
)
# 针对OT协议的端口扫描
alert tcp any any -> $OT_NETWORK any (
msg:"OT-DETECT Port scan targeting industrial protocols";
flags:S;
threshold:type threshold, track by_src, count 10, seconds 60;
classtype:network-scan;
sid:3000031; rev:1;
)
监控历史数据服务器的物理过程数据,检测在向操作员隐藏影响的同时操控过程的攻击(Stuxnet攻击模式)。
#!/usr/bin/env python3
"""SCADA Process Data Anomaly Detector.
Monitors historian data to detect physical process anomalies
that may indicate cyber attacks manipulating control logic
while spoofing sensor readings (Stuxnet-style attacks).
"""
import json
import sys
import time
from collections import deque
from dataclasses import dataclass
from datetime import datetime
from statistics import mean, stdev
from typing import Optional
try:
import requests
except ImportError:
print("Install requests: pip install requests")
sys.exit(1)
@dataclass
class ProcessVariable:
"""Represents a monitored process variable."""
tag_name: str
description: str
unit: str
low_limit: float
high_limit: float
rate_of_change_limit: float # 每秒最大变化量
engineering_low: float
engineering_high: float
@dataclass
class Anomaly:
"""Represents a detected process anomaly."""
timestamp: str
tag_name: str
anomaly_type: str
severity: str
current_value: float
expected_range: str
description: str
attack_pattern: str = ""
class ProcessAnomalyDetector:
"""Detects anomalies in SCADA process data from historian."""
def __init__(self, historian_url, api_key=None):
self.historian_url = historian_url
self.api_key = api_key
self.variables = {}
self.history = defaultdict(lambda: deque(maxlen=1000))
self.anomalies = []
def add_variable(self, var: ProcessVariable):
"""Register a process variable to monitor."""
self.variables[var.tag_name] = var
def fetch_current_values(self):
"""Fetch current values from historian API."""
headers = {}
if self.api_key:
headers["Authorization"] = f"Bearer {self.api_key}"
tag_list = list(self.variables.keys())
params = {"tags": ",".join(tag_list), "count": 1}
try:
resp = requests.get(
f"{self.historian_url}/api/v1/streams/values/current",
params=params,
headers=headers,
timeout=10,
verify=False, # 许多OT历史数据服务器使用自签名证书
)
resp.raise_for_status()
return resp.json()
except requests.RequestException as e:
print(f"[ERROR] 历史数据服务器API错误: {e}")
return {}
def check_value(self, tag_name, value, timestamp):
"""Check a process variable value against all detection rules."""
var = self.variables.get(tag_name)
if not var:
return
self.history[tag_name].append((timestamp, value))
# 规则1:值超出工程限制
if value < var.engineering_low or value > var.engineering_high:
self.anomalies.append(Anomaly(
timestamp=timestamp,
tag_name=tag_name,
anomaly_type="OUT_OF_RANGE",
severity="critical",
current_value=value,
expected_range=f"{var.engineering_low}-{var.engineering_high} {var.unit}",
description=f"{tag_name} ({var.description}) 值为 {value} {var.unit} - 超出工程限制",
attack_pattern="过程操控 - 值被推至安全操作范围之外",
))
# 规则2:变化率超过物理限制
history = list(self.history[tag_name])
if len(history) >= 2:
prev_ts, prev_val = history[-2]
try:
dt = (datetime.fromisoformat(timestamp) - datetime.fromisoformat(prev_ts)).total_seconds()
if dt > 0:
rate = abs(value - prev_val) / dt
if rate > var.rate_of_change_limit:
self.anomalies.append(Anomaly(
timestamp=timestamp,
tag_name=tag_name,
anomaly_type="RATE_OF_CHANGE_VIOLATION",
severity="high",
current_value=value,
expected_range=f"最大变化率: {var.rate_of_change_limit} {var.unit}/s",
description=(
f"{tag_name} 变化率为 {rate:.2f} {var.unit}/s "
f"(限制: {var.rate_of_change_limit} {var.unit}/s)"
),
attack_pattern="可能的传感器欺骗或执行器操控",
))
except (ValueError, TypeError):
pass
# 规则3:平线检测(过程活动时传感器读数未变化)
if len(history) >= 20:
recent_values = [v for _, v in list(history)[-20:]]
if len(set(recent_values)) == 1:
self.anomalies.append(Anomaly(
timestamp=timestamp,
tag_name=tag_name,
anomaly_type="FLATLINE_DETECTED",
severity="high",
current_value=value,
expected_range="活跃过程期间预期有变化",
description=f"{tag_name} 在 {value} 处平线持续20+个连续读数",
attack_pattern="Stuxnet式回放攻击 - 过程被操控时传感器值被冻结",
))
# 规则4:统计异常(基于z分数)
if len(history) >= 50:
values = [v for _, v in list(history)[-50:]]
avg = mean(values)
std = stdev(values) if len(values) > 1 else 0
if std > 0:
z_score = abs(value - avg) / std
if z_score > 3.5:
self.anomalies.append(Anomaly(
timestamp=timestamp,
tag_name=tag_name,
anomaly_type="STATISTICAL_ANOMALY",
severity="medium",
current_value=value,
expected_range=f"均值: {avg:.2f}, 标准差: {std:.2f} (z={z_score:.1f})",
description=f"{tag_name} 值 {value} 偏离均值 {z_score:.1f} 个标准差",
attack_pattern="可能的渐进式过程操控",
))
def report_anomalies(self):
"""Print detected anomalies."""
if not self.anomalies:
print("[*] 未检测到异常")
return
print(f"\n{'='*70}")
print(f"过程异常检测报告 - {len(self.anomalies)} 个异常")
print(f"{'='*70}")
for a in self.anomalies:
print(f"\n [{a.severity.upper()}] {a.anomaly_type}")
print(f" 时间: {a.timestamp}")
print(f" 标签: {a.tag_name}")
print(f" 值: {a.current_value}")
print(f" 预期: {a.expected_range}")
print(f" 详情: {a.description}")
if a.attack_pattern:
print(f" 攻击模式: {a.attack_pattern}")
if __name__ == "__main__":
from collections import defaultdict
detector = ProcessAnomalyDetector(
historian_url="https://10.30.1.50:5450",
)
# 为化学反应堆定义被监控的过程变量
detector.add_variable(ProcessVariable(
tag_name="REACTOR_01.TEMP",
description="Reactor 1 Temperature",
unit="C",
low_limit=150, high_limit=280,
rate_of_change_limit=5.0,
engineering_low=100, engineering_high=350,
))
detector.add_variable(ProcessVariable(
tag_name="REACTOR_01.PRESSURE",
description="Reactor 1 Pressure",
unit="bar",
low_limit=2.0, high_limit=8.0,
rate_of_change_limit=0.5,
engineering_low=0, engineering_high=12.0,
))
detector.add_variable(ProcessVariable(
tag_name="PUMP_03.FLOW",
description="Feed Pump 3 Flow Rate",
unit="m3/h",
low_limit=5.0, high_limit=25.0,
rate_of_change_limit=2.0,
engineering_low=0, engineering_high=30.0,
))
print("[*] 正在启动过程异常监控...")
print("[*] 按Ctrl+C停止并生成报告")
try:
while True:
data = detector.fetch_current_values()
for item in data.get("items", []):
detector.check_value(
item.get("tag"),
item.get("value"),
item.get("timestamp", datetime.now().isoformat()),
)
time.sleep(5)
except KeyboardInterrupt:
detector.report_anomalies()
监控与已知ICS定向恶意软件家族相关的失陷指标(IOC)。
# 已知ICS恶意软件检测特征
# 参考: MITRE ATT&CK for ICS, CISA ICS-CERT安全公告
malware_families:
TRITON_TRISIS:
description: "针对施耐德电气Triconex安全仪表系统(SIS)"
target: "安全控制器(SIS)"
network_indicators:
- protocol: "TriStation"
port: 1502
pattern: "来自非工程工作站的异常TriStation命令"
- protocol: "TCP"
pattern: "未授权IP连接到Triconex控制器"
host_indicators:
- "工程工作站上存在trilog.exe"
- "System32目录中有inject.bin"
- "针对Triconex固件的imain.bin载荷"
detection_rule: |
alert tcp !$SIS_ENGINEERING_WS any -> $SIS_CONTROLLERS 1502 (
msg:"OT-CRITICAL Unauthorized TriStation connection to SIS";
flow:to_server; sid:3000100; rev:1; priority:1;)
INDUSTROYER_CRASHOVERRIDE:
description: "通过IEC 60870-5-101/104、IEC 61850、OPC DA针对电网SCADA"
target: "电网变电站和SCADA"
network_indicators:
- protocol: "IEC 60870-5-104"
port: 2404
pattern: "超出正常轮询的快速控制命令序列"
- protocol: "OPC DA"
pattern: "OPC服务器枚举后跟写入命令"
host_indicators:
- "haslo.exe(后门启动器)"
- "61850.dll(IEC 61850攻击模块)"
- "OPC.dll(OPC DA攻击模块)"
- "104.dll(IEC 104攻击模块)"
detection_rule: |
alert tcp any any -> $SUBSTATION_RTU 2404 (
msg:"OT-CRITICAL Rapid IEC 104 control commands - Industroyer pattern";
flow:to_server,established;
threshold:type threshold, track by_src, count 50, seconds 10;
sid:3000110; rev:1; priority:1;)
PIPEDREAM_INCONTROLLER:
description: "针对施耐德/欧姆龙PLC和OPC UA的模块化ICS攻击框架"
target: "多PLC厂商(施耐德、欧姆龙)和OPC UA服务器"
network_indicators:
- protocol: "CODESYS"
port: 1217
pattern: "CODESYS运行时漏洞利用尝试"
- protocol: "OPC UA"
port: 4840
pattern: "OPC UA服务器枚举和未授权方法调用"
- protocol: "Modbus"
port: 502
pattern: "向多个单元ID快速发送Modbus写入命令"
host_indicators:
- "用于OPC UA扫描的TAGRUN工具"
- "用于CODESYS利用的CODECALL工具"
- "用于欧姆龙PLC交互的OMSHELL工具"
detection_rule: |
alert tcp any any -> $OT_NETWORK 1217 (
msg:"OT-CRITICAL CODESYS runtime connection - PIPEDREAM indicator";
flow:to_server,established;
sid:3000120; rev:1; priority:1;)
| 术语 | 定义 |
|---|---|
| SCADA | 数据采集与监控(Supervisory Control and Data Acquisition)——通过RTU和通信基础设施远程监控和控制工业过程的架构 |
| OT专用IDS/IPS | 针对工业协议设计的入侵检测/防御系统,同时使用基于特征和基于异常的检测方法 |
| 过程异常(Process Anomaly) | 物理过程行为(温度、压力、流量)的偏差,可能表明控制系统遭受网络操控 |
| 中间人攻击(Man-in-the-Middle) | 拦截SCADA主站与现场设备之间通信以修改命令或欺骗传感器读数的攻击 |
| 回放攻击(Replay Attack) | 捕获合法SCADA流量并重放以掩盖对过程恶意更改的攻击(Stuxnet使用的技术) |
| 协议异常(Protocol Anomaly) | 与预期工业协议行为的偏差,包括未授权功能码、异常轮询模式或命令序列 |
场景背景:OT安全监控系统对一个不是已授权SIS工程工作站的IP地址发往Triconex安全控制器的异常TriStation协议流量发出告警。
方法:
常见陷阱:永远不要假设SIS流量异常是误报——TRITON证明了老练的攻击者会专门针对安全系统。在验证固件和逻辑完整性之前不要重启SIS控制器。避免仅向IT SOC告警;任何SIS相关事件都必须立即让过程安全团队参与。
SCADA攻击检测报告
===============================
检测时间: YYYY-MM-DD HH:MM:SS UTC
检测来源: [IDS/异常检测器/过程监控]
告警详情:
告警ID: [唯一标识符]
严重级别: 严重/高/中/低
攻击类别: [协议异常/过程操控/未授权访问]
MITRE ATT&CK for ICS: [技术ID和名称]
来源: [IP/主机名]
目标: [IP/主机名 - 设备类型]
协议: [Modbus/DNP3/S7comm/等]
详情: [具体发现描述]
基线对比:
正常: [预期行为]
观测: [触发告警的实际行为]
偏差: [观测值与基线的差异]
推荐响应:
1. [立即遏制行动]
2. [验证步骤]
3. [上报路径]