Detects anomalies in Modbus/TCP and RTU traffic in ICS/OT environments via Zeek, Suricata IDS, and Python with scapy/pymodbus for function codes, registers, timing, and unauthorized clients.
npx claudepluginhub killvxk/cybersecurity-skills-zh
This skill uses the workspace's default tool permissions.
- 在OT环境中部署Modbus专用入侵检测
Detects anomalies in Modbus/TCP and RTU traffic in ICS/OT using Zeek, Suricata, and Python for function code monitoring, register validation, timing analysis, and unauthorized clients.
Detects anomalies in Modbus/TCP and RTU traffic for ICS including invalid function codes, register ranges, timing deviations, unauthorized clients, and malformed frames using Zeek, Suricata, and Python.
Detects Modbus TCP/RTU command injection attacks in ICS environments by monitoring unauthorized writes, anomalous function codes, malformed frames, and baseline deviations using Scapy, Suricata, or Zeek.
Share bugs, ideas, or general feedback.
不适用于端到端保护Modbus通信(Modbus没有原生安全性;防火墙控制参见implementing-network-segmentation-for-ot)、非Modbus协议监控(多协议参见detecting-anomalies-in-industrial-control-systems),或对Modbus实现进行主动模糊测试(参见performing-plc-firmware-security-analysis)。
部署被动监控以捕获所有Modbus/TCP流量,并将其解析为结构化记录用于分析。
#!/usr/bin/env python3
"""Modbus Protocol Anomaly Detection System.
Monitors Modbus/TCP traffic for anomalies including unauthorized
function codes, unusual register access, timing deviations,
and rogue client devices.
"""
import json
import struct
import sys
import time
from collections import defaultdict, deque
from dataclasses import dataclass, field
from datetime import datetime
from statistics import mean, stdev
# scapy is the only third-party dependency; without it the script cannot
# parse packets, so print an install hint and exit with a failure status.
try:
    from scapy.all import sniff, rdpcap, IP, TCP
except ImportError:
    print("Install scapy: pip install scapy", file=sys.stderr)
    sys.exit(1)
# Lookup table: Modbus function code -> (human-readable name, category).
# The category is one of "read", "write", "read_write" or "diagnostic" and is
# what the detector uses to grade severity of an unauthorized function code.
MODBUS_FUNCTION_CODES = {
    # Bit / register reads
    1: ("Read Coils", "read"),
    2: ("Read Discrete Inputs", "read"),
    3: ("Read Holding Registers", "read"),
    4: ("Read Input Registers", "read"),
    # Single-point writes
    5: ("Write Single Coil", "write"),
    6: ("Write Single Register", "write"),
    # Diagnostic and status queries
    7: ("Read Exception Status", "diagnostic"),
    8: ("Diagnostics", "diagnostic"),
    11: ("Get Comm Event Counter", "diagnostic"),
    12: ("Get Comm Event Log", "diagnostic"),
    # Multi-point writes
    15: ("Write Multiple Coils", "write"),
    16: ("Write Multiple Registers", "write"),
    17: ("Report Slave ID", "diagnostic"),
    22: ("Mask Write Register", "write"),
    # Combined read and write in a single request
    23: ("Read/Write Multiple Registers", "read_write"),
    43: ("Encapsulated Interface Transport", "diagnostic"),
}
@dataclass
class ModbusAnomaly:
    """One anomaly record raised by the detector for a single Modbus request."""

    timestamp: str  # ISO-8601 string derived from the packet capture time
    anomaly_type: str  # rule identifier, e.g. "UNAUTHORIZED_CLIENT"
    severity: str  # "critical", "high", "medium" or "low"
    src_ip: str  # client (request originator) address
    dst_ip: str  # server (port 502 side) address
    unit_id: int  # unit identifier byte from the MBAP header
    func_code: int  # Modbus function code of the offending request
    detail: str  # human-readable description of what was observed
    mitre_technique: str = ""  # optional MITRE ATT&CK for ICS technique label
@dataclass
class ModbusSession:
    """Track the running state of one Modbus client -> server session."""

    src_ip: str  # client address
    dst_ip: str  # server address
    # Per-function-code request counter; missing codes read as 0.
    func_codes_seen: dict = field(default_factory=lambda: defaultdict(int))
    # Set of (func_code, register_addr) pairs touched by write requests.
    register_ranges: set = field(default_factory=set)
    # Most recent inter-request intervals in milliseconds; bounded to the
    # last 500 samples so long captures do not grow memory without limit.
    intervals: deque = field(default_factory=lambda: deque(maxlen=500))
    last_timestamp: float = 0.0  # capture time of the previous request (0 = none yet)
    request_count: int = 0
    write_count: int = 0
class ModbusAnomalyDetector:
    """Detect anomalies in Modbus/TCP traffic.

    Requests (packets addressed to TCP port 502) are grouped into sessions
    keyed by ``"<client_ip>-><server_ip>"`` and checked against six rules:
    unauthorized client, unauthorized function code, write operation,
    polling-interval deviation from a baseline, non-zero MBAP protocol ID,
    and broadcast write (unit ID 0). Findings accumulate in ``anomalies``.
    """

    _MODBUS_PORT = 502
    # 7-byte MBAP header plus the 1-byte function code.
    _MIN_ADU_LENGTH = 8
    # Function codes that modify device state.
    _WRITE_FUNCTION_CODES = frozenset({5, 6, 15, 16, 22, 23})
    # Write codes checked by the broadcast rule.
    _BROADCAST_WRITE_CODES = frozenset({5, 6, 15, 16})
    # FC 23 carries a read address first; its write address comes later.
    _READ_WRITE_MULTIPLE = 23
    # A timing alert needs more than this many intervals in the session...
    _TIMING_MIN_SAMPLES = 10
    # ...and a z-score against the baseline above this threshold.
    _TIMING_Z_THRESHOLD = 5.0

    def __init__(self):
        """Create a detector with no sessions, baselines or allowlists."""
        self.sessions = {}
        self.baseline_sessions = {}
        self.anomalies = []
        self.authorized_clients = set()
        self.authorized_func_codes = {}  # allowed function codes per session key
        self.packet_count = 0

    def set_authorized_clients(self, clients):
        """Set the authorized Modbus client IPs.

        Args:
            clients: iterable of client IP strings. While the resulting set
                is empty, the unauthorized-client rule is not evaluated.
        """
        self.authorized_clients = set(clients)

    def set_authorized_func_codes(self, session_key, func_codes):
        """Set the allowed function codes for one session.

        Args:
            session_key: ``"<client_ip>-><server_ip>"`` string.
            func_codes: iterable of integer function codes.
        """
        self.authorized_func_codes[session_key] = set(func_codes)

    def load_baseline(self, baseline_file):
        """Load baseline profiles produced by a previous capture analysis.

        The JSON document is expected to hold a ``"modbus_baselines"`` object
        mapping session keys to profile objects. Each profile's
        ``"allowed_function_codes"`` list (empty when absent) also becomes the
        function-code allowlist for that session.

        Args:
            baseline_file: path to the baseline JSON file.
        """
        # JSON is UTF-8 by specification; do not depend on the locale default.
        with open(baseline_file, encoding="utf-8") as f:
            baseline = json.load(f)
        for key, data in baseline.get("modbus_baselines", {}).items():
            self.baseline_sessions[key] = data
            self.authorized_func_codes[key] = set(data.get("allowed_function_codes", []))
        print(f"[*] 已加载 {len(self.baseline_sessions)} 个Modbus基线")

    def process_packet(self, pkt):
        """Run every detection rule against a single captured packet.

        Packets that are not IP/TCP, do not involve port 502, or carry fewer
        than 8 payload bytes are ignored. Responses (source port 502) are
        counted in ``packet_count`` but not analysed further.

        Args:
            pkt: a scapy packet.
        """
        if not pkt.haslayer(TCP) or not pkt.haslayer(IP):
            return
        tcp = pkt[TCP]
        # Modbus/TCP uses port 502 on the server side.
        if tcp.dport != self._MODBUS_PORT and tcp.sport != self._MODBUS_PORT:
            return
        payload = bytes(tcp.payload)
        if len(payload) < self._MIN_ADU_LENGTH:
            return
        self.packet_count += 1

        # Only requests (client -> port 502) are analysed.
        if tcp.dport != self._MODBUS_PORT:
            return
        src_ip = pkt[IP].src
        dst_ip = pkt[IP].dst

        timestamp = float(pkt.time)
        ts_str = datetime.fromtimestamp(timestamp).isoformat()

        # MBAP header: transaction ID, protocol ID, length, unit ID; then the
        # function code. The length check above guarantees these 8 bytes.
        _, proto_id, _, unit_id, func_code = struct.unpack_from(">HHHBB", payload)

        session_key = f"{src_ip}->{dst_ip}"
        session = self.sessions.get(session_key)
        if session is None:
            session = ModbusSession(src_ip=src_ip, dst_ip=dst_ip)
            self.sessions[session_key] = session
        session.request_count += 1
        session.func_codes_seen[func_code] += 1

        # Fields shared by every anomaly raised for this request.
        context = {
            "timestamp": ts_str,
            "src_ip": src_ip,
            "dst_ip": dst_ip,
            "unit_id": unit_id,
            "func_code": func_code,
        }

        # Rule 1: unauthorized Modbus client.
        if self.authorized_clients and src_ip not in self.authorized_clients:
            self.anomalies.append(ModbusAnomaly(
                anomaly_type="UNAUTHORIZED_CLIENT",
                severity="critical",
                detail=f"来自未授权客户端 {src_ip} 的Modbus请求",
                mitre_technique="T0886 - Remote Services",
                **context,
            ))

        # Rule 2: function code outside the session allowlist. An empty or
        # missing allowlist means no policy is enforced for the session.
        allowed_fcs = self.authorized_func_codes.get(session_key)
        if allowed_fcs and func_code not in allowed_fcs:
            fc_info = MODBUS_FUNCTION_CODES.get(
                func_code, (f"Unknown FC{func_code}", "unknown")
            )
            self.anomalies.append(ModbusAnomaly(
                anomaly_type="UNAUTHORIZED_FUNCTION_CODE",
                severity="critical" if fc_info[1] == "write" else "high",
                detail=f"FC {func_code} ({fc_info[0]}) 不在白名单 {sorted(allowed_fcs)} 中",
                mitre_technique="T0855 - Unauthorized Command Message",
                **context,
            ))

        # Rule 3: write operation detection.
        if func_code in self._WRITE_FUNCTION_CODES:
            session.write_count += 1
            fc_name = MODBUS_FUNCTION_CODES.get(func_code, ("Unknown", ""))[0]
            # The target address directly follows the function code, except
            # for FC 23 where the read address and read quantity come first
            # and the write address sits four bytes later.
            addr_offset = 12 if func_code == self._READ_WRITE_MULTIPLE else 8
            # A request too short to carry the address is still counted in
            # write_count, but no record is emitted because there is no
            # register to report.
            if len(payload) >= addr_offset + 2:
                register_addr = struct.unpack_from(">H", payload, addr_offset)[0]
                session.register_ranges.add((func_code, register_addr))
                self.anomalies.append(ModbusAnomaly(
                    anomaly_type="WRITE_OPERATION",
                    severity="high",
                    detail=f"写入: {fc_name} 到寄存器 {register_addr} 来自 {src_ip}",
                    mitre_technique="T0836 - Modify Parameter",
                    **context,
                ))

        # Rule 4: timing anomaly against the baseline polling interval.
        if session.last_timestamp > 0:
            interval = (timestamp - session.last_timestamp) * 1000  # ms
            session.intervals.append(interval)
            baseline = self.baseline_sessions.get(session_key)
            if baseline and len(session.intervals) > self._TIMING_MIN_SAMPLES:
                expected_interval = baseline.get("polling_interval_avg_sec", 0) * 1000
                expected_std = baseline.get("polling_interval_stddev", 0) * 1000
                # Without a positive spread a z-score is undefined; skip.
                if expected_std > 0:
                    z_score = abs(interval - expected_interval) / expected_std
                    if z_score > self._TIMING_Z_THRESHOLD:
                        self.anomalies.append(ModbusAnomaly(
                            anomaly_type="TIMING_ANOMALY",
                            severity="medium",
                            detail=(
                                f"间隔 {interval:.0f}ms 对比基线 "
                                f"{expected_interval:.0f}ms (z={z_score:.1f})"
                            ),
                            mitre_technique="T0831 - Manipulation of Control",
                            **context,
                        ))

        # Rule 5: protocol violation - the MBAP protocol ID must be 0.
        if proto_id != 0:
            self.anomalies.append(ModbusAnomaly(
                anomaly_type="PROTOCOL_VIOLATION",
                severity="high",
                detail=f"非标准协议ID {proto_id}(预期为0)",
                mitre_technique="T0830 - Man in the Middle",
                **context,
            ))

        # Rule 6: broadcast write (unit ID 0).
        if unit_id == 0 and func_code in self._BROADCAST_WRITE_CODES:
            self.anomalies.append(ModbusAnomaly(
                anomaly_type="BROADCAST_WRITE",
                severity="critical",
                detail="广播写入命令(单元ID 0)影响所有从站",
                mitre_technique="T0855 - Unauthorized Command Message",
                **context,
            ))

        session.last_timestamp = timestamp

    def analyze_pcap(self, pcap_file):
        """Read a pcap file and feed every packet through ``process_packet``.

        Args:
            pcap_file: path to the capture file.
        """
        print(f"[*] 正在分析 {pcap_file}...")
        for pkt in rdpcap(pcap_file):
            self.process_packet(pkt)
        print(f"[*] 已处理 {self.packet_count} 个Modbus数据包")

    def generate_report(self):
        """Print a summary of the run to stdout.

        The report has packet, session and anomaly totals, counts by severity
        and by anomaly type (most frequent first), then the first 15
        anomalies in detection order.
        """
        banner = "=" * 70
        print(f"\n{banner}")
        print("MODBUS协议异常检测报告")
        print(banner)
        print(f"分析的数据包数: {self.packet_count}")
        print(f"跟踪的会话数: {len(self.sessions)}")
        print(f"检测到的异常数: {len(self.anomalies)}")

        severity_counts = defaultdict(int)
        type_counts = defaultdict(int)
        for anomaly in self.anomalies:
            severity_counts[anomaly.severity] += 1
            type_counts[anomaly.anomaly_type] += 1

        print("\n按严重级别:")
        for sev in ("critical", "high", "medium", "low"):
            if severity_counts[sev]:
                print(f"  {sev.upper()}: {severity_counts[sev]}")

        print("\n按类型:")
        for atype, count in sorted(type_counts.items(), key=lambda item: -item[1]):
            print(f"  {atype}: {count}")

        print("\n主要异常:")
        for anomaly in self.anomalies[:15]:
            print(f"  [{anomaly.severity.upper()}] {anomaly.anomaly_type}: {anomaly.detail}")
# Command-line entry point: modbus_detector.py <pcap_file> [baseline.json]
if __name__ == "__main__":
    detector = ModbusAnomalyDetector()
    cli_args = sys.argv[1:]
    if not cli_args:
        print("用法: python modbus_detector.py <pcap_file> [baseline.json]")
    else:
        # The optional second argument is a baseline file; load it before
        # analysis so the allowlist and timing rules can use it.
        if len(cli_args) > 1:
            detector.load_baseline(cli_args[1])
        detector.analyze_pcap(cli_args[0])
        detector.generate_report()
| 术语 | 定义 |
|---|---|
| Modbus/TCP | 运行在TCP端口502上的工业协议,由MBAP报头和包含功能码和数据的PDU组成 |
| 功能码(Function Code) | Modbus命令标识符(FC1-4:读取,FC5-6/15-16:写入,FC8:诊断),决定操作类型 |
| MBAP报头(MBAP Header) | Modbus应用协议报头,包含事务ID、协议ID(0x0000)、长度和单元ID |
| 单元ID(Unit ID) | 标识目标从站设备的Modbus地址(0-247);单元ID 0是对所有从站的广播 |
| 寄存器映射(Register Map) | Modbus寄存器地址到过程变量的供应商特定映射(如寄存器40001=反应器温度) |
| 功能码白名单(Function Code Allowlist) | 定义每个源IP到每个目标设备允许使用哪些Modbus功能码的安全策略 |
Modbus协议异常检测报告
==========================================
捕获周期: YYYY-MM-DD 至 YYYY-MM-DD
分析的数据包数: [N]
会话数: [N]
异常数: [N]
UNAUTHORIZED_CLIENT: [N]
UNAUTHORIZED_FUNCTION_CODE: [N]
WRITE_OPERATION: [N]
TIMING_ANOMALY: [N]
BROADCAST_WRITE: [N]