Detects Stuxnet-style cyber-physical attacks in OT/ICS via PLC logic integrity monitoring, process anomaly detection, engineer workstation compromise indicators, USB propagation, and IT-to-OT lateral movement.
npx claudepluginhub killvxk/cybersecurity-skills-zhThis skill uses the workspace's default tool permissions.
- 为高价值OT目标(核能、化工、关键基础设施)实施高级威胁检测
Detects Stuxnet-style attacks on PLCs and OT systems through logic integrity monitoring, physics-based anomaly detection, workstation indicators, USB vectors, and IT-to-OT lateral movement analysis.
Detects Stuxnet-style cyber-physical attacks on ICS/OT systems through PLC logic integrity monitoring, physics-based process anomaly detection, engineering workstation compromises, USB-borne vectors, and IT-to-OT lateral movement.
Detects attacks on SCADA/ICS systems via OT IDS, industrial protocol anomaly detection, and process data analysis using Python/Scapy baselines for Modbus/DNP3. Useful for ICS monitoring and attack investigations.
Share bugs, ideas, or general feedback.
不适用于基本OT入侵检测(参见detecting-attacks-on-scada-systems)、Stuxnet样本的恶意软件分析(参见恶意软件逆向工程技能),或PLC编程和逻辑开发。
在多阶段Stuxnet式攻击链中映射检测机会。
# Stuxnet式攻击链与检测点
attack_chain:
stage_1_initial_access:
technique: "针对气隙网络的USB传播恶意软件"
mitre_ics: "T0847 - Replication Through Removable Media"
detection:
- "工程师工作站的USB设备连接日志"
- "使用OT批准的AV扫描可移动媒体"
- "应用程序白名单阻止未授权可执行文件"
- "通过组策略禁用Windows自动运行"
indicators:
- "工程师工作站的新USB设备连接"
- "从可移动媒体执行未签名二进制文件"
- "LNK文件漏洞利用模式"
stage_2_lateral_movement:
technique: "利用Windows漏洞进行网络传播"
mitre_ics: "T0866 - Exploitation of Remote Services"
detection:
- "网络IDS检测漏洞利用流量(MS08-067、MS10-061)"
- "工程师工作站之间的异常SMB流量"
- "Windows事件日志显示权限提升"
- "新建计划任务或服务"
indicators:
- "3-4级Windows系统间的横向移动"
- "来自意外来源的WMI/PsExec执行"
- "哈希传递认证模式"
stage_3_ews_compromise:
technique: "入侵具有PLC编程软件的工程师工作站"
mitre_ics: "T0862 - Supply Chain Compromise (Step-7 hooking)"
detection:
- "Step-7/TIA Portal目录的文件完整性监控"
- "PLC编程软件中的DLL注入检测"
- "监控s7otbxdx.dll的Stuxnet特定钩子"
- "PLC项目文件的意外修改"
indicators:
- "西门子STEP 7安装目录中的修改DLL"
- "工程师工作站上的Rootkit隐藏文件"
- "PLC编程软件行为异常"
stage_4_plc_logic_modification:
technique: "向PLC程序注入恶意OB/FC块"
mitre_ics: "T0839 - Module Firmware / T0833 - Modify Control Logic"
detection:
- "对比已知良好基线的PLC逻辑完整性比较"
- "来自未授权来源的S7comm上传/下载流量"
- "PLC程序中出现新的OB/FC/FB块"
- "OB1(主扫描)或OB35(循环中断)的修改"
indicators:
- "PLC程序块数量变化"
- "PLC程序大小变化"
- "上传未知程序块"
stage_5_process_manipulation:
technique: "在欺骗传感器读数的同时操控物理过程"
mitre_ics: "T0836 - Modify Parameter / T0856 - Spoof Reporting Message"
detection:
- "基于物理的异常检测(过程模型偏差)"
- "独立传感器的交叉验证"
- "振动分析和机械特征监控"
- "PLC报告值与独立测量值的比较"
indicators:
- "电机/泵在正常参数范围之外运行"
- "传感器读数偏离物理模型预测"
- "过程效率指标出现意外偏差"
通过将运行逻辑与已知良好基线比较,持续监控PLC程序完整性。
#!/usr/bin/env python3
"""PLC Logic Integrity Monitor.
Periodically retrieves PLC program block information and compares
against known-good baselines to detect unauthorized modifications
(Stuxnet-style logic injection).
"""
import hashlib
import json
import sys
import time
from dataclasses import dataclass, field, asdict
from datetime import datetime
@dataclass
class PLCBlock:
"""Represents a PLC program block."""
block_type: str # OB, FC, FB, DB
block_number: int
name: str
size_bytes: int
checksum: str
last_modified: str
author: str = ""
@dataclass
class IntegrityAlert:
alert_id: str
timestamp: str
severity: str
plc_name: str
plc_ip: str
alert_type: str
description: str
baseline_value: str
current_value: str
mitre_technique: str
class PLCIntegrityMonitor:
"""Monitors PLC program integrity against baselines."""
def __init__(self):
self.baselines = {} # plc_name -> PLCBlock列表
self.alerts = []
self.alert_counter = 1
def load_baseline(self, plc_name, baseline_file):
"""Load known-good PLC program baseline."""
with open(baseline_file) as f:
data = json.load(f)
blocks = [PLCBlock(**b) for b in data.get("blocks", [])]
self.baselines[plc_name] = {
"blocks": {f"{b.block_type}{b.block_number}": b for b in blocks},
"total_blocks": len(blocks),
"loaded_at": datetime.now().isoformat(),
}
print(f"[*] 已为 {plc_name} 加载基线: {len(blocks)} 个块")
def check_integrity(self, plc_name, plc_ip, current_blocks):
"""Compare current PLC program against baseline."""
baseline = self.baselines.get(plc_name)
if not baseline:
print(f"[WARN] {plc_name} 无基线")
return
baseline_blocks = baseline["blocks"]
current_block_map = {f"{b.block_type}{b.block_number}": b for b in current_blocks}
# 检查1:新增块(潜在逻辑注入)
for key, block in current_block_map.items():
if key not in baseline_blocks:
self.alerts.append(IntegrityAlert(
alert_id=f"INT-{self.alert_counter:04d}",
timestamp=datetime.now().isoformat(),
severity="critical",
plc_name=plc_name,
plc_ip=plc_ip,
alert_type="NEW_BLOCK_DETECTED",
description=(
f"PLC中发现基线中不存在的新程序块 {key} ({block.name})。"
f"大小: {block.size_bytes} 字节。"
),
baseline_value="基线中不存在该块",
current_value=f"{key}: {block.size_bytes} 字节, 校验和 {block.checksum}",
mitre_technique="T0839 - Module Firmware / T0833 - Modify Control Logic",
))
self.alert_counter += 1
# 检查2:删除的块
for key in baseline_blocks:
if key not in current_block_map:
self.alerts.append(IntegrityAlert(
alert_id=f"INT-{self.alert_counter:04d}",
timestamp=datetime.now().isoformat(),
severity="high",
plc_name=plc_name,
plc_ip=plc_ip,
alert_type="BLOCK_REMOVED",
description=f"程序块 {key} 已从PLC中删除",
baseline_value=f"{key}: {baseline_blocks[key].size_bytes} 字节",
current_value="未找到该块",
mitre_technique="T0833 - Modify Control Logic",
))
self.alert_counter += 1
# 检查3:块内容被修改(校验和不匹配)
for key in baseline_blocks:
if key in current_block_map:
baseline_block = baseline_blocks[key]
current_block = current_block_map[key]
if baseline_block.checksum != current_block.checksum:
self.alerts.append(IntegrityAlert(
alert_id=f"INT-{self.alert_counter:04d}",
timestamp=datetime.now().isoformat(),
severity="critical",
plc_name=plc_name,
plc_ip=plc_ip,
alert_type="BLOCK_MODIFIED",
description=(
f"程序块 {key} 校验和不匹配。"
f"自建立基线以来逻辑已被修改。"
),
baseline_value=f"校验和: {baseline_block.checksum}, 大小: {baseline_block.size_bytes}",
current_value=f"校验和: {current_block.checksum}, 大小: {current_block.size_bytes}",
mitre_technique="T0833 - Modify Control Logic",
))
self.alert_counter += 1
# 检查4:块数量变化
if len(current_blocks) != baseline["total_blocks"]:
self.alerts.append(IntegrityAlert(
alert_id=f"INT-{self.alert_counter:04d}",
timestamp=datetime.now().isoformat(),
severity="high",
plc_name=plc_name,
plc_ip=plc_ip,
alert_type="BLOCK_COUNT_CHANGE",
description=f"总块数从 {baseline['total_blocks']} 变为 {len(current_blocks)}",
baseline_value=str(baseline["total_blocks"]),
current_value=str(len(current_blocks)),
mitre_technique="T0833 - Modify Control Logic",
))
self.alert_counter += 1
def generate_report(self):
"""Generate integrity monitoring report."""
print(f"\n{'='*70}")
print("PLC逻辑完整性监控报告")
print(f"{'='*70}")
print(f"已加载基线数: {len(self.baselines)}")
print(f"告警数: {len(self.alerts)}")
for a in self.alerts:
print(f"\n [{a.severity.upper()}] {a.alert_type}")
print(f" PLC: {a.plc_name} ({a.plc_ip})")
print(f" {a.description}")
print(f" 基线: {a.baseline_value}")
print(f" 当前: {a.current_value}")
print(f" MITRE: {a.mitre_technique}")
if __name__ == "__main__":
monitor = PLCIntegrityMonitor()
print("PLC逻辑完整性监控器")
print("加载基线并定期调用 check_integrity()")
使用基于物理定律预测预期传感器值的模型监控物理过程行为。偏差表明设备故障或网络物理攻击。
#!/usr/bin/env python3
"""Physics-Based Cyber-Physical Attack Detector.
Uses simplified physics models to detect process manipulation
attacks where the attacker modifies the physical process while
spoofing sensor readings (the core Stuxnet attack pattern).
"""
import math
from dataclasses import dataclass
from datetime import datetime
@dataclass
class PhysicsAlert:
timestamp: str
severity: str
alert_type: str
sensor_tag: str
reported_value: float
predicted_value: float
deviation_percent: float
description: str
class CentrifugePhysicsModel:
"""离心机系统物理模型(Stuxnet目标类比)。
通过交叉关联检测操控:
- 电机频率(Hz)vs 报告RPM
- RPM vs 振动特征
- 功耗 vs 转速
"""
def __init__(self, rated_rpm=1200, rated_frequency=50, rated_power_kw=75):
self.rated_rpm = rated_rpm
self.rated_frequency = rated_frequency
self.rated_power_kw = rated_power_kw
self.alerts = []
def check_frequency_rpm_correlation(self, frequency_hz, reported_rpm):
"""验证电机频率与报告RPM是否匹配。
对于感应电机: RPM = 120 * 频率 / 极数
如果RPM被欺骗,它将不匹配实际频率。
"""
# 假设4极电机,典型3%转差率
expected_rpm = (120 * frequency_hz / 4) * 0.97
deviation = abs(reported_rpm - expected_rpm) / expected_rpm * 100
if deviation > 5.0:
self.alerts.append(PhysicsAlert(
timestamp=datetime.now().isoformat(),
severity="critical",
alert_type="FREQUENCY_RPM_MISMATCH",
sensor_tag="MOTOR.RPM vs VFD.FREQ",
reported_value=reported_rpm,
predicted_value=round(expected_rpm, 1),
deviation_percent=round(deviation, 1),
description=(
f"电机RPM({reported_rpm})与VFD频率({frequency_hz} Hz)不匹配。"
f"预期约 {expected_rpm:.0f} RPM。"
f"可能在操控频率的同时欺骗RPM传感器。"
),
))
def check_power_speed_correlation(self, rpm, power_kw):
"""验证功耗是否与转速匹配。
对于离心负载,功率约与RPM的三次方成比例。
"""
speed_ratio = rpm / self.rated_rpm
expected_power = self.rated_power_kw * (speed_ratio ** 3)
deviation = abs(power_kw - expected_power) / max(expected_power, 0.1) * 100
if deviation > 15.0:
self.alerts.append(PhysicsAlert(
timestamp=datetime.now().isoformat(),
severity="high",
alert_type="POWER_SPEED_MISMATCH",
sensor_tag="MOTOR.POWER vs MOTOR.RPM",
reported_value=power_kw,
predicted_value=round(expected_power, 1),
deviation_percent=round(deviation, 1),
description=(
f"功耗({power_kw} kW)与RPM({rpm})不一致。"
f"预期约 {expected_power:.1f} kW。可能表明存在隐藏的转速变化。"
),
))
def check_vibration_anomaly(self, rpm, vibration_mm_s):
"""检查振动特征是否与运行速度一致。
在报告"正常"速度时的异常振动可能表明实际
速度与传感器报告的不同。
"""
# 平衡转子的正常振动随速度线性增加
speed_ratio = rpm / self.rated_rpm
expected_vibration = 2.0 * speed_ratio # mm/s基线
deviation = abs(vibration_mm_s - expected_vibration) / max(expected_vibration, 0.1) * 100
if vibration_mm_s > 7.0: # ISO 10816告警阈值
self.alerts.append(PhysicsAlert(
timestamp=datetime.now().isoformat(),
severity="critical",
alert_type="ABNORMAL_VIBRATION",
sensor_tag="MOTOR.VIBRATION",
reported_value=vibration_mm_s,
predicted_value=round(expected_vibration, 1),
deviation_percent=round(deviation, 1),
description=(
f"RPM报告正常({rpm})时振动({vibration_mm_s} mm/s)达到ISO告警级别。"
f"实际速度可能与报告不同。"
),
))
def report(self):
if self.alerts:
print(f"\n{'='*60}")
print("基于物理的异常检测告警")
print(f"{'='*60}")
for a in self.alerts:
print(f"\n [{a.severity.upper()}] {a.alert_type}")
print(f" {a.description}")
print(f" 报告值: {a.reported_value} | 预测值: {a.predicted_value}")
print(f" 偏差: {a.deviation_percent}%")
if __name__ == "__main__":
model = CentrifugePhysicsModel(rated_rpm=1200, rated_frequency=50, rated_power_kw=75)
# 正常运行 - 预期无告警
model.check_frequency_rpm_correlation(50.0, 1164)
model.check_power_speed_correlation(1164, 72.0)
# Stuxnet式攻击:频率增加但RPM被欺骗为正常
model.check_frequency_rpm_correlation(84.0, 1164) # 频率增加,RPM被欺骗
model.check_power_speed_correlation(1164, 180.0) # 功耗揭示真实速度
model.report()
| 术语 | 定义 |
|---|---|
| 网络物理攻击(Cyber-Physical Attack) | 同时操控网络系统(PLC逻辑、传感器读数)和物理过程的攻击 |
| 逻辑注入(Logic Injection) | 将恶意代码块插入PLC程序以改变物理过程行为 |
| 传感器欺骗(Sensor Spoofing) | 回放或伪造传感器读数以向操作员隐藏过程操控 |
| 基于物理的检测(Physics-Based Detection) | 使用物理过程数学模型检测报告传感器值与实际物理的不一致 |
| PLC逻辑基线(PLC Logic Baseline) | 用于完整性比较的PLC程序块(OB、FC、FB、DB)的已知良好副本 |
| 气隙桥接(Air-Gap Bridging) | 通过USB驱动器跨越气隙网络的技术,如Stuxnet的初始访问方式 |
Stuxnet式攻击检测报告
========================================
监控的PLC数: [N]
监控周期: YYYY-MM-DD 至 YYYY-MM-DD
PLC完整性:
已验证基线: [N]/[N]
检测到逻辑修改: [N]
检测到新块: [N]
物理异常:
传感器关联违规: [N]
过程模型偏差: [N]
工程师工作站:
未授权修改: [N]
USB连接: [N]