Detects Modbus TCP/RTU command injection attacks in ICS environments by monitoring unauthorized writes, anomalous function codes, malformed frames, and baseline deviations using Scapy, Suricata, or Zeek.
npx claudepluginhub killvxk/cybersecurity-skills-zh
This skill uses the workspace's default tool permissions.
- 为使用Modbus TCP(端口502)或Modbus RTU的环境部署入侵检测
Detects Modbus TCP/RTU command injection attacks in ICS by monitoring unauthorized writes, anomalous function codes, malformed frames, and baseline deviations with Python/Scapy and IDS tools.
Detects Modbus command injection attacks in ICS environments by monitoring TCP/RTU traffic for unauthorized writes, anomalous function codes, malformed frames, and baseline deviations using Python/Scapy and IDS like Suricata/Zeek.
Detects anomalies in Modbus/TCP and RTU traffic in ICS/OT environments via Zeek, Suricata IDS, and Python with scapy/pymodbus for function codes, registers, timing, and unauthorized clients.
Share bugs, ideas, or general feedback.
不适用于检测非Modbus协议的攻击(DNP3相关参见detecting-dnp3-protocol-anomalies)、一般IT网络入侵检测,或Modbus设备配置(参见performing-ot-vulnerability-scanning-safely)。
捕获并分析正常Modbus流量,确定合法通信模式的构成。
#!/usr/bin/env python3
"""Modbus Command Injection Detector.
Monitors Modbus TCP traffic for unauthorized write operations, anomalous
function codes, and deviations from established communication baselines.
Detects attacks like FrostyGoop that use Modbus TCP for operational impact.
"""
import json
import struct
import sys
import time
from collections import defaultdict
from datetime import datetime
from typing import Dict, List, Optional, Set, Tuple
try:
from scapy.all import sniff, IP, TCP
except ImportError:
print("Install scapy: pip install scapy")
sys.exit(1)
# Modbus function code definitions.
# Function codes that only read coils / inputs / registers.
MODBUS_READ_FUNCTIONS = {1, 2, 3, 4}
# Function codes that modify coils or holding registers. 22 (Mask Write
# Register) and 23 (Read/Write Multiple Registers) change register contents
# just like 5/6/15/16, so they must be subject to the same write rules.
MODBUS_WRITE_FUNCTIONS = {5, 6, 15, 16, 22, 23}
# Diagnostic / device-identification function codes.
MODBUS_DIAGNOSTIC_FUNCTIONS = {8, 17, 43}
# Human-readable names used when rendering alerts.
MODBUS_FUNC_NAMES = {
    1: "Read Coils", 2: "Read Discrete Inputs",
    3: "Read Holding Registers", 4: "Read Input Registers",
    5: "Write Single Coil", 6: "Write Single Register",
    8: "Diagnostics", 15: "Write Multiple Coils",
    16: "Write Multiple Registers", 17: "Report Slave ID",
    22: "Mask Write Register", 23: "Read/Write Multiple Registers",
    43: "Encapsulated Interface Transport",
}
class ModbusAlert:
    """A single anomaly record produced while inspecting Modbus traffic.

    Stores the creation time (local wall clock, ISO-8601), the severity and
    alert type labels, both endpoint addresses, the Modbus unit id, the
    function code together with its display name, a free-text description
    and an optional MITRE technique label.
    """

    def __init__(self, severity: str, alert_type: str, src_ip: str,
                 dst_ip: str, unit_id: int, func_code: int,
                 description: str, mitre_technique: str = ""):
        # Stamped when the alert object is built, not when the packet was seen.
        self.timestamp = datetime.now().isoformat()
        self.severity, self.alert_type = severity, alert_type
        self.src_ip, self.dst_ip = src_ip, dst_ip
        self.unit_id, self.func_code = unit_id, func_code
        # Fall back to a generic label for function codes missing from the table.
        fallback_name = f"Unknown FC {func_code}"
        self.func_name = MODBUS_FUNC_NAMES.get(func_code, fallback_name)
        self.description, self.mitre_technique = description, mitre_technique

    def __str__(self):
        """Return a one-line, pipe-separated summary of the alert."""
        summary = (
            f"[{self.severity}] {self.alert_type} | {self.src_ip} -> {self.dst_ip} "
            f"| 单元 {self.unit_id} | {self.func_name} | {self.description}"
        )
        return summary
class ModbusInjectionDetector:
    """Detect Modbus TCP command injection by checking requests against a baseline.

    Packets passed to ``analyze_packet`` are decoded as Modbus TCP frames.
    Requests (TCP destination port 502) are evaluated against six rules:
    unauthorized master, unauthorized write / write flood, function code not
    in the session baseline, broadcast write, register access outside the
    baseline ranges, and diagnostic function codes. Resulting ``ModbusAlert``
    objects accumulate in ``self.alerts``; ``print_report`` prints them.
    """

    def __init__(self, baseline_file: Optional[str] = None):
        """Create an empty detector and optionally load a baseline JSON file.

        Args:
            baseline_file: Path handed to ``load_baseline``; skipped when falsy.
        """
        self.alerts: List[ModbusAlert] = []
        self.packet_count = 0
        self.modbus_count = 0
        # Baseline data, keyed by "src->dst" session key where applicable.
        self.authorized_masters: Set[str] = set()
        # NOTE(review): authorized_pairs, polling_intervals and last_seen are
        # populated/declared here but no detection rule in this class reads them.
        self.authorized_pairs: Set[Tuple[str, str]] = set()
        self.allowed_write_sources: Set[str] = set()
        self.allowed_function_codes: Dict[str, Set[int]] = defaultdict(set)
        self.allowed_register_ranges: Dict[str, List[Tuple[int, int]]] = defaultdict(list)
        self.polling_intervals: Dict[str, float] = {}
        self.last_seen: Dict[str, float] = {}
        # Per-source timestamps of recent write requests (rate detection).
        self.write_counts: Dict[str, List[float]] = defaultdict(list)
        if baseline_file:
            self.load_baseline(baseline_file)

    def load_baseline(self, filepath: str):
        """Load the established Modbus communication baseline from JSON.

        The file is expected to hold a ``modbus_baselines`` object whose keys
        are ``"src->dst"`` strings and whose values may carry
        ``allowed_function_codes``, ``register_ranges`` (objects with
        ``start``/``end``) and ``polling_interval_avg_sec``.

        Args:
            filepath: Path of the baseline JSON file.

        Raises:
            ValueError: If a session key is not of the form ``src->dst``.
        """
        with open(filepath, "r", encoding="utf-8") as f:
            baseline = json.load(f)
        for raw_key, data in baseline.get("modbus_baselines", {}).items():
            parts = raw_key.split("->")
            if len(parts) != 2:
                raise ValueError(
                    f"Invalid baseline session key {raw_key!r}; expected 'src->dst'"
                )
            src, dst = parts[0].strip(), parts[1].strip()
            # Normalise the key so it matches the "src->dst" key built in
            # analyze_packet even when the baseline file contains spaces.
            session_key = f"{src}->{dst}"
            self.authorized_pairs.add((src, dst))
            self.authorized_masters.add(src)
            fc_set = set(data.get("allowed_function_codes", []))
            self.allowed_function_codes[session_key] = fc_set
            if fc_set & MODBUS_WRITE_FUNCTIONS:
                self.allowed_write_sources.add(src)
            # Only touch the defaultdict when a range exists, so sessions
            # without ranges stay absent and the range rule is skipped for them.
            for reg_range in data.get("register_ranges", []):
                self.allowed_register_ranges[session_key].append(
                    (reg_range["start"], reg_range["end"])
                )
            if data.get("polling_interval_avg_sec"):
                self.polling_intervals[session_key] = data["polling_interval_avg_sec"]
        print(f"[*] 基线已加载: {len(self.authorized_pairs)} 个授权对, "
              f"{len(self.allowed_write_sources)} 个授权写入来源")

    def parse_modbus_mbap(self, payload: bytes) -> Optional[dict]:
        """Parse the Modbus TCP MBAP header and the start of the PDU.

        Args:
            payload: Raw TCP payload bytes.

        Returns:
            ``None`` when the payload is shorter than 8 bytes or the protocol
            identifier is non-zero. Otherwise a dict with ``transaction_id``,
            ``protocol_id``, ``length``, ``unit_id`` and ``func_code``. When at
            least 12 bytes are present, request fields are added:
            ``start_address`` and ``quantity`` for function codes
            1/2/3/4/15/16; ``start_address``, ``quantity`` (always 1) and
            ``value`` for the single-write codes 5/6.
        """
        if len(payload) < 8:
            return None
        transaction_id, protocol_id, length, unit_id, func_code = struct.unpack_from(
            ">HHHBB", payload
        )
        if protocol_id != 0:  # Not Modbus
            return None
        result = {
            "transaction_id": transaction_id,
            "protocol_id": protocol_id,
            "length": length,
            "unit_id": unit_id,
            "func_code": func_code,
        }
        if len(payload) >= 12:
            first_word, second_word = struct.unpack_from(">HH", payload, 8)
            if func_code in (1, 2, 3, 4, 15, 16):
                result["start_address"] = first_word
                result["quantity"] = second_word
            elif func_code in (5, 6):
                # Single-write requests carry address + value, not a quantity;
                # treating the value as a count produced bogus range alerts.
                result["start_address"] = first_word
                result["quantity"] = 1
                result["value"] = second_word
        return result

    def _add_alert(self, severity: str, alert_type: str, src_ip: str,
                   dst_ip: str, modbus: dict, description: str,
                   mitre_technique: str):
        """Append a ModbusAlert built from a parsed frame to ``self.alerts``."""
        self.alerts.append(ModbusAlert(
            severity=severity,
            alert_type=alert_type,
            src_ip=src_ip, dst_ip=dst_ip,
            unit_id=modbus["unit_id"],
            func_code=modbus["func_code"],
            description=description,
            mitre_technique=mitre_technique,
        ))

    def analyze_packet(self, pkt):
        """Analyze one captured packet for Modbus command injection.

        Counts every packet, counts every frame that parses as Modbus on TCP
        port 502, and runs the detection rules on requests only.

        Args:
            pkt: A scapy packet (must support ``haslayer`` / item access).
        """
        self.packet_count += 1
        if not pkt.haslayer(IP) or not pkt.haslayer(TCP):
            return
        tcp = pkt[TCP]
        if tcp.dport != 502 and tcp.sport != 502:
            return
        payload = bytes(tcp.payload)
        if not payload:
            return
        modbus = self.parse_modbus_mbap(payload)
        if not modbus:
            return
        self.modbus_count += 1
        # Responses (server -> client) echo the request's function code. Running
        # the rules on them blames the PLC for writes it merely acknowledged,
        # so only requests addressed to port 502 are evaluated.
        if tcp.dport != 502:
            return
        src_ip = pkt[IP].src
        dst_ip = pkt[IP].dst
        session_key = f"{src_ip}->{dst_ip}"
        func_code = modbus["func_code"]
        is_write = func_code in MODBUS_WRITE_FUNCTIONS
        # Prefer the capture timestamp so rate detection is meaningful when
        # replaying a pcap; fall back to wall clock if the packet has none.
        pkt_time = getattr(pkt, "time", None)
        now = float(pkt_time) if pkt_time is not None else time.time()

        # Rule 1: unauthorized Modbus master.
        if self.authorized_masters and src_ip not in self.authorized_masters:
            self._add_alert(
                "CRITICAL", "UNAUTHORIZED_MASTER", src_ip, dst_ip, modbus,
                f"未授权设备 {src_ip} 向 {dst_ip} 发送Modbus命令",
                "T0848 - Rogue Master",
            )

        # Rule 2: unauthorized write operations and write-rate anomalies.
        if is_write:
            if self.allowed_write_sources and src_ip not in self.allowed_write_sources:
                self._add_alert(
                    "CRITICAL", "UNAUTHORIZED_WRITE", src_ip, dst_ip, modbus,
                    f"来自非授权来源 {src_ip} 的写入命令",
                    "T0855 - Unauthorized Command Message",
                )
            # Keep only writes from the last 60 seconds for this source.
            self.write_counts[src_ip].append(now)
            recent_writes = [t for t in self.write_counts[src_ip] if now - t < 60]
            self.write_counts[src_ip] = recent_writes
            if len(recent_writes) > 20:
                self._add_alert(
                    "HIGH", "WRITE_FLOOD", src_ip, dst_ip, modbus,
                    f"过高的写入速率: {src_ip} 在60秒内 {len(recent_writes)} 次写入",
                    "T0836 - Modify Parameter",
                )

        # Rule 3: function code not in the session's baseline.
        if session_key in self.allowed_function_codes:
            if func_code not in self.allowed_function_codes[session_key]:
                self._add_alert(
                    "HIGH", "ANOMALOUS_FUNCTION_CODE", src_ip, dst_ip, modbus,
                    (
                        f"功能码 {modbus['func_code']} ({MODBUS_FUNC_NAMES.get(modbus['func_code'], 'Unknown')}) "
                        f"不在 {session_key} 的基线中"
                    ),
                    "T0855 - Unauthorized Command Message",
                )

        # Rule 4: broadcast write (unit id 0).
        if modbus["unit_id"] == 0 and is_write:
            self._add_alert(
                "CRITICAL", "BROADCAST_WRITE", src_ip, dst_ip, modbus,
                "广播写入命令(单元ID 0)影响网段上的所有Modbus设备",
                "T0855 - Unauthorized Command Message",
            )

        # Rule 5: register access outside the baseline ranges.
        if "start_address" in modbus and session_key in self.allowed_register_ranges:
            addr = modbus["start_address"]
            qty = modbus.get("quantity", 1)
            # NOTE(review): this treats ``end`` as an exclusive bound
            # (addr + qty <= end); confirm against the baseline file format.
            in_range = any(
                start <= addr and addr + qty <= end
                for start, end in self.allowed_register_ranges[session_key]
            )
            if not in_range:
                self._add_alert(
                    "HIGH", "OUT_OF_RANGE_REGISTER", src_ip, dst_ip, modbus,
                    f"寄存器访问 {addr}-{addr+qty} 超出基线范围",
                    "T0836 - Modify Parameter",
                )

        # Rule 6: diagnostic / restart commands.
        if func_code in MODBUS_DIAGNOSTIC_FUNCTIONS:
            self._add_alert(
                "HIGH", "DIAGNOSTIC_COMMAND", src_ip, dst_ip, modbus,
                f"检测到诊断功能码 {modbus['func_code']} - 潜在的拒绝服务或侦察",
                "T0814 - Denial of Service",
            )

    def print_report(self):
        """Print packet counters, a severity breakdown and every alert to stdout."""
        print(f"\n{'='*70}")
        print("MODBUS命令注入检测报告")
        print(f"{'='*70}")
        print(f"分析时间: {datetime.now().isoformat()}")
        print(f"分析的数据包总数: {self.packet_count}")
        print(f"Modbus数据包数: {self.modbus_count}")
        print(f"生成的告警数: {len(self.alerts)}")
        if not self.alerts:
            return
        severity_counts = defaultdict(int)
        for alert in self.alerts:
            severity_counts[alert.severity] += 1
        print("\n严重级别分布:")
        # Fixed order, most severe first; levels with no alerts are omitted.
        for sev in ["CRITICAL", "HIGH", "MEDIUM", "LOW"]:
            if sev in severity_counts:
                print(f" {sev}: {severity_counts[sev]}")
        print("\n详细告警:")
        for alert in self.alerts:
            print(f"\n [{alert.severity}] {alert.alert_type}")
            print(f" 时间: {alert.timestamp}")
            print(f" 来源: {alert.src_ip} -> {alert.dst_ip}")
            print(f" 单元ID: {alert.unit_id}")
            print(f" 功能码: {alert.func_name} (FC {alert.func_code})")
            print(f" 详情: {alert.description}")
            if alert.mitre_technique:
                print(f" MITRE ATT&CK ICS: {alert.mitre_technique}")

    def start_live_monitoring(self, interface: str, duration: int = 0):
        """Sniff Modbus TCP traffic on an interface, then print the report.

        Args:
            interface: Network interface name passed to scapy's ``sniff``.
            duration: Capture time in seconds; 0 or less means run until
                interrupted with Ctrl+C.
        """
        print(f"[*] 正在 {interface} 上启动Modbus监控...")
        print("[*] 按Ctrl+C停止")
        try:
            sniff(
                iface=interface,
                filter="tcp port 502",
                prn=self.analyze_packet,
                timeout=duration if duration > 0 else None,
                # Packets are handled in the callback; do not let scapy keep
                # every sniffed packet in memory during long-running captures.
                store=False,
            )
        except KeyboardInterrupt:
            pass
        self.print_report()
if __name__ == "__main__":
    # argv[1] is either an interface name or a capture file; argv[2] is an
    # optional baseline JSON path.
    cli_args = sys.argv[1:]
    baseline_path = cli_args[1] if len(cli_args) > 1 else None
    detector = ModbusInjectionDetector(baseline_file=baseline_path)

    if not cli_args:
        print("用法:")
        print(" 实时: python modbus_detector.py <interface> [baseline.json]")
        print(" 离线: python modbus_detector.py <capture.pcap> [baseline.json]")
    else:
        target = cli_args[0]
        if target.endswith((".pcap", ".pcapng")):
            # Offline mode: replay every packet of the capture through the detector.
            from scapy.all import rdpcap
            print(f"[*] 正在分析捕获文件: {target}")
            for pkt in rdpcap(target):
                detector.analyze_packet(pkt)
            detector.print_report()
        else:
            # Anything else is treated as an interface name for live sniffing.
            detector.start_live_monitoring(target)