Detects anomalies in DNP3 protocol communications for SCADA systems by monitoring unauthorized control commands, firmware updates, protocol violations, and baseline traffic deviations using deep packet inspection and Python analysis.
npx claudepluginhub killvxk/cybersecurity-skills-zh
This skill uses the workspace's default tool permissions.
- 在以DNP3为主要协议的能源行业中监控SCADA系统
Detects anomalies in DNP3 protocol communications for SCADA systems by monitoring unauthorized control commands, firmware update attempts, protocol violations, and baseline traffic deviations using deep packet inspection and Python/Scapy.
Detects anomalies in DNP3 protocol traffic for SCADA/OT systems, spotting unauthorized commands, protocol violations, and baseline deviations using Python/Scapy packet analysis.
Detects attacks on SCADA/ICS systems via OT IDS, industrial protocol anomaly detection, and process data analysis using Python/Scapy baselines for Modbus/DNP3. Useful for ICS monitoring and attack investigations.
Share bugs, ideas, or general feedback.
不适用于非DNP3协议监控(Modbus相关参见detecting-modbus-command-injection-attacks)、DNP3安全认证配置(独立实现),或与协议无关的网络异常检测。
#!/usr/bin/env python3
"""DNP3 Protocol Anomaly Detector.
Monitors DNP3 communications for unauthorized control commands,
protocol violations, and deviations from established baselines.
Supports both TCP and serial DNP3 deployments.
"""
import struct
import sys
import json
from collections import defaultdict
from datetime import datetime
from typing import Dict, List, Optional, Set
try:
from scapy.all import rdpcap, IP, TCP
except ImportError:
print("Install scapy: pip install scapy")
sys.exit(1)
# DNP3 application-layer function codes mapped to human-readable names.
# Requests occupy 0x00-0x21; responses occupy 0x81-0x83. Codes missing from
# this table are reported by the parser as "Unknown (0x..)".
DNP3_FUNCTIONS = {
    # Acknowledgement and basic data transfer
    0x00: "Confirm",
    0x01: "Read",
    0x02: "Write",
    # Control operations
    0x03: "Select",
    0x04: "Operate",
    0x05: "Direct Operate",
    0x06: "Direct Operate No Ack",
    # Counter freeze operations
    0x07: "Immediate Freeze",
    0x08: "Immediate Freeze No Ack",
    0x09: "Freeze and Clear",
    0x0A: "Freeze and Clear No Ack",
    0x0B: "Freeze at Time",
    0x0C: "Freeze at Time No Ack",
    # Restart and application control
    0x0D: "Cold Restart",
    0x0E: "Warm Restart",
    0x0F: "Initialize Data",
    0x10: "Initialize Application",
    0x11: "Start Application",
    0x12: "Stop Application",
    # Configuration and time
    0x13: "Save Configuration",
    0x14: "Enable Unsolicited",
    0x15: "Disable Unsolicited",
    0x16: "Assign Class",
    0x17: "Delay Measurement",
    0x18: "Record Current Time",
    # File transfer
    0x19: "Open File",
    0x1A: "Close File",
    0x1B: "Delete File",
    0x1C: "Get File Info",
    0x1D: "Authenticate File",
    0x1E: "Abort File",
    # Previously missing request codes defined by the standard
    0x1F: "Activate Configuration",
    0x20: "Authentication Request",
    0x21: "Authentication Request No Ack",
    # Responses
    0x81: "Response",
    0x82: "Unsolicited Response",
    0x83: "Authentication Response",
}
# High-risk function codes that should trigger an alert when observed.
# The set is assembled from named groups so each group's purpose is visible.
_WRITE_CODES = (0x02,)                      # Write
_CONTROL_CODES = (0x03, 0x04, 0x05, 0x06)   # Select / Operate / Direct Operate
_RESTART_CODES = (0x0D, 0x0E)               # Cold Restart, Warm Restart
_APPLICATION_CODES = (0x0F, 0x10, 0x12)     # Initialize Data / Initialize Application / Stop Application
_FILE_CODES = (0x19, 0x1A, 0x1B)            # File operations (firmware update)

DNP3_CRITICAL_FUNCTIONS = set(
    _WRITE_CODES
    + _CONTROL_CODES
    + _RESTART_CODES
    + _APPLICATION_CODES
    + _FILE_CODES
)
class DNP3AnomalyDetector:
    """Detect anomalies in DNP3 traffic carried over TCP port 20000.

    Packets are fed one at a time to :meth:`analyze_packet`. Each DNP3 frame
    updates per-session counters (keyed by ``"src_ip->dst_ip"``) and may
    append alert dictionaries to ``self.alerts``. :meth:`generate_report`
    prints a summary of sessions and alerts to stdout.

    An optional JSON baseline lists authorized master/outstation pairs and
    the function codes expected on each pair; without a baseline only the
    baseline-independent checks (restart, file operation) raise alerts.
    """

    # TCP port on which DNP3 frames are looked for.
    _DNP3_PORT = 20000
    # Link header: start(2) + length(1) + control(1) + dest(2) + source(2) + crc(2).
    _LINK_HEADER_LEN = 10
    _START_BYTES = b"\x05\x64"
    # Offsets of the first user-data octets, which follow the header CRC.
    _TRANSPORT_OFFSET = 10
    _FUNCTION_CODE_OFFSET = 12
    # The link LENGTH field counts control + dest + source (5 octets) plus user
    # data; transport header + application control + function code add 3 more.
    _MIN_LENGTH_WITH_APP_HEADER = 8
    # DIR bit of the link control octet: set when the frame comes from a master.
    _LINK_DIR_BIT = 0x80
    # FIR bit of the transport header: set on the first segment of a fragment,
    # which is the only segment that starts with the application header.
    _TRANSPORT_FIR_BIT = 0x40

    _RESTART_CODES = (0x0D, 0x0E)
    _FILE_CODES = (0x19, 0x1A, 0x1B, 0x1C, 0x1D, 0x1E)
    _CONTROL_CODES = (0x03, 0x04, 0x05, 0x06)
    # Confirm / Response / Unsolicited Response are never flagged as anomalous.
    _COMMON_RESPONSE_CODES = (0x00, 0x81, 0x82)

    def __init__(self, baseline_file: Optional[str] = None):
        """Create a detector, optionally loading a baseline JSON file.

        Args:
            baseline_file: Path to a baseline file understood by
                :meth:`load_baseline`, or ``None`` to run without a baseline.
        """
        self.alerts: List[dict] = []
        self.sessions = defaultdict(lambda: {
            "packet_count": 0,
            "function_codes": defaultdict(int),
            "control_commands": 0,
            "file_operations": 0,
            "restarts": 0,
        })
        self.packet_count = 0
        self.dnp3_count = 0
        self.authorized_masters: Set[str] = set()
        # NOTE(review): authorized_pairs is populated by load_baseline but no
        # detection consults it yet; only authorized_masters is enforced.
        self.authorized_pairs: Dict[str, Set[str]] = defaultdict(set)
        self.baseline_functions: Dict[str, Set[int]] = defaultdict(set)
        if baseline_file:
            self.load_baseline(baseline_file)

    def load_baseline(self, filepath: str):
        """Load the DNP3 communication baseline from a JSON file.

        The file is expected to hold an ``authorized_communications`` list
        whose entries carry ``master_ip``, ``outstation_ip`` and, optionally,
        ``expected_function_codes`` (defaulting to Confirm and Read).

        Args:
            filepath: Path of the JSON baseline file.

        Raises:
            OSError: If the file cannot be opened.
            json.JSONDecodeError: If the file is not valid JSON.
            KeyError: If an entry lacks ``master_ip`` or ``outstation_ip``.
        """
        # JSON is UTF-8 by specification; do not depend on the locale default.
        with open(filepath, "r", encoding="utf-8") as f:
            baseline = json.load(f)
        for entry in baseline.get("authorized_communications", []):
            master = entry["master_ip"]
            outstation = entry["outstation_ip"]
            self.authorized_masters.add(master)
            self.authorized_pairs[master].add(outstation)
            self.baseline_functions[f"{master}->{outstation}"] = set(
                entry.get("expected_function_codes", [0x00, 0x01])
            )

    def parse_dnp3_header(self, payload: bytes) -> Optional[dict]:
        """Parse the DNP3 link header and, when present, the function code.

        Only the first link-layer frame in ``payload`` is examined.

        Args:
            payload: Raw TCP payload bytes.

        Returns:
            ``None`` when the payload is shorter than a link header or does
            not begin with the 0x0564 start bytes. Otherwise a dict with
            ``length``, ``control``, ``direction``, ``dest_addr``,
            ``source_addr`` and ``is_master``; ``function_code`` and
            ``function_name`` are added only when the frame carries the
            first transport segment of an application fragment.
        """
        if len(payload) < self._LINK_HEADER_LEN:
            return None
        if payload[0:2] != self._START_BYTES:
            return None

        length = payload[2]
        control = payload[3]
        # Link addresses are little-endian 16-bit values.
        dest_addr, source_addr = struct.unpack_from("<HH", payload, 4)
        from_master = bool(control & self._LINK_DIR_BIT)

        result = {
            "length": length,
            "control": control,
            "direction": "Master->Outstation" if from_master else "Outstation->Master",
            "dest_addr": dest_addr,
            "source_addr": source_addr,
            "is_master": from_master,
        }

        # Read the application function code only when it is really there:
        # enough bytes captured, enough user data declared by the LENGTH
        # field, and the transport FIR bit set. Otherwise the octet at the
        # function-code offset belongs to a continuation segment or to a
        # following link frame and must not be reported as a function code.
        carries_app_header = (
            len(payload) > self._FUNCTION_CODE_OFFSET
            and length >= self._MIN_LENGTH_WITH_APP_HEADER
            and bool(payload[self._TRANSPORT_OFFSET] & self._TRANSPORT_FIR_BIT)
        )
        if carries_app_header:
            func_code = payload[self._FUNCTION_CODE_OFFSET]
            result["function_code"] = func_code
            result["function_name"] = DNP3_FUNCTIONS.get(
                func_code, f"Unknown (0x{func_code:02x})"
            )
        return result

    def analyze_packet(self, pkt):
        """Analyze one captured packet for DNP3 anomalies.

        Every packet increments ``packet_count``. Packets that are not
        IP/TCP, do not use port 20000, have no payload, or do not parse as
        DNP3 are otherwise ignored.

        Args:
            pkt: A scapy packet.
        """
        self.packet_count += 1
        if not pkt.haslayer(IP) or not pkt.haslayer(TCP):
            return
        tcp = pkt[TCP]
        if tcp.dport != self._DNP3_PORT and tcp.sport != self._DNP3_PORT:
            return
        payload = bytes(tcp.payload)
        if not payload:
            return
        dnp3 = self.parse_dnp3_header(payload)
        if not dnp3:
            return
        self.dnp3_count += 1
        self._inspect_frame(pkt[IP].src, pkt[IP].dst, dnp3)

    def _inspect_frame(self, src_ip: str, dst_ip: str, dnp3: dict):
        """Update session counters and run every detection on a parsed frame."""
        session_key = f"{src_ip}->{dst_ip}"
        session = self.sessions[session_key]
        session["packet_count"] += 1

        func_code = dnp3.get("function_code")
        func_name = dnp3.get("function_name")
        if func_code is not None:
            session["function_codes"][func_code] += 1

        # Detection 1: unauthorized DNP3 master (needs a loaded baseline).
        if dnp3.get("is_master") and self.authorized_masters:
            if src_ip not in self.authorized_masters:
                self._record_alert(
                    "CRITICAL", "UNAUTHORIZED_DNP3_MASTER", src_ip, dst_ip,
                    func_name,
                    f"未授权DNP3主站 {src_ip} 与外站 {dst_ip} 通信",
                    "T0869 - Standard Application Layer Protocol",
                )

        # Frames without an application header have no function code, so the
        # remaining checks do not apply. Returning here also avoids formatting
        # None with ":02x", which used to raise TypeError.
        if func_code is None:
            return

        # Detection 2: cold / warm restart commands.
        if func_code in self._RESTART_CODES:
            session["restarts"] += 1
            restart_type = "冷" if func_code == 0x0D else "热"
            self._record_alert(
                "CRITICAL", "DNP3_RESTART_COMMAND", src_ip, dst_ip,
                f"{restart_type}重启",
                f"{restart_type}重启命令发送至外站 {dst_ip}(地址 {dnp3['dest_addr']})",
                "T0816 - Device Restart/Shutdown",
            )

        # Detection 3: file operations (potential firmware update).
        if func_code in self._FILE_CODES:
            session["file_operations"] += 1
            self._record_alert(
                "CRITICAL", "DNP3_FILE_OPERATION", src_ip, dst_ip,
                func_name,
                f"外站 {dst_ip} 上的文件操作 - 潜在固件更新或PIPEDREAM指标",
                "T0839 - Module Firmware",
            )

        # Membership test first: baseline_functions is a defaultdict, and
        # indexing an unknown key would silently create an empty baseline.
        outside_baseline = (
            session_key in self.baseline_functions
            and func_code not in self.baseline_functions[session_key]
        )

        # Detection 4: control commands (select / operate) outside the baseline.
        if func_code in self._CONTROL_CODES:
            session["control_commands"] += 1
            if outside_baseline:
                self._record_alert(
                    "HIGH", "UNEXPECTED_CONTROL_COMMAND", src_ip, dst_ip,
                    func_name,
                    f"控制命令 {func_name} 不在 {session_key} 的基线中",
                    "T0855 - Unauthorized Command Message",
                )

        # Detection 5: any function code unusual for this pair. Control codes
        # outside the baseline therefore raise this alert as well as the one
        # above, matching the original behaviour.
        if outside_baseline and func_code not in self._COMMON_RESPONSE_CODES:
            self._record_alert(
                "MEDIUM", "ANOMALOUS_FUNCTION_CODE", src_ip, dst_ip,
                func_name,
                f"功能码 0x{func_code:02x} 不在基线中",
                "T0855 - Unauthorized Command Message",
            )

    def _record_alert(self, severity, alert_type, src_ip, dst_ip,
                      function, description, mitre):
        """Append one alert dictionary to ``self.alerts``."""
        self.alerts.append({
            "severity": severity,
            "type": alert_type,
            "src": src_ip, "dst": dst_ip,
            "function": function,
            "description": description,
            "mitre": mitre,
        })

    def generate_report(self):
        """Print the session summary and all recorded alerts to stdout."""
        print(f"\n{'='*70}")
        print("DNP3协议异常检测报告")
        print(f"{'='*70}")
        print(f"分析时间: {datetime.now().isoformat()}")
        print(f"总数据包数: {self.packet_count}")
        print(f"DNP3数据包数: {self.dnp3_count}")
        print(f"告警数: {len(self.alerts)}")

        print(f"\n--- DNP3会话摘要 ---")
        for key, session in self.sessions.items():
            print(f"\n {key}")
            print(f" 数据包数: {session['packet_count']}")
            funcs = [DNP3_FUNCTIONS.get(f, f"0x{f:02x}") for f in session["function_codes"]]
            print(f" 功能码: {', '.join(funcs)}")
            print(f" 控制命令数: {session['control_commands']}")
            print(f" 文件操作数: {session['file_operations']}")
            print(f" 重启命令数: {session['restarts']}")

        if self.alerts:
            print(f"\n--- 告警 ---")
            for alert in self.alerts:
                print(f"\n [{alert['severity']}] {alert['type']}")
                print(f" {alert['src']} -> {alert['dst']}")
                print(f" 功能码: {alert['function']}")
                print(f" 详情: {alert['description']}")
                print(f" MITRE ICS: {alert.get('mitre', 'N/A')}")
if __name__ == "__main__":
    # Command line: <capture.pcap> [baseline.json]
    cli_args = sys.argv[1:]
    baseline_path = cli_args[1] if len(cli_args) > 1 else None
    detector = DNP3AnomalyDetector(baseline_file=baseline_path)

    if not cli_args:
        # No capture file given: show usage only.
        print("用法: python dnp3_detector.py <capture.pcap> [baseline.json]")
    else:
        capture_path = cli_args[0]
        print(f"[*] 正在分析: {capture_path}")
        # Feed every packet of the capture to the detector, then report.
        for captured in rdpcap(capture_path):
            detector.analyze_packet(captured)
        detector.generate_report()
| 术语 | 定义 |
|---|---|
| DNP3 | 分布式网络协议版本3,能源行业SCADA系统中主站与外站通信的主要协议 |
| 外站(Outstation) | DNP3从站设备(通常为RTU或IED),响应主站的轮询和命令 |
| 先选后操(Select-Before-Operate) | DNP3安全机制,要求在操作前先发送选择命令,防止意外控制操作 |
| 冷重启(Cold Restart,FC 0x0D) | 完全重启外站的DNP3命令,重置所有配置——高风险的拒绝服务操作 |
| DNP3安全认证(DNP3 Secure Authentication) | 可选的DNP3扩展(SA v5),添加基于HMAC的认证以防止命令欺骗 |
| PIPEDREAM | 具有DNP3功能的ICS攻击框架,用于操控外站和执行固件更新 |
DNP3异常检测报告
================================
分析周期: [开始] 至 [结束]
监控点: [变电站/网段]
流量摘要:
DNP3数据包: [数量]
唯一主站-外站对: [数量]
控制命令: [数量]
文件操作: [数量]
告警:
[严重] 未授权DNP3主站 [IP]
[严重] 向外站 [地址] 发送冷重启命令
[高] 来自 [IP] 的意外控制命令
建议:
1. 部署DNP3安全认证(SA v5)
2. 在防火墙阻断未授权来源
3. 在工业防火墙启用DNP3深度包检测