Performs OT network security assessments for SCADA/DCS/ICS using Purdue model, passive discovery of Modbus/DNP3/OPC UA/EtherNet/IP traffic to detect misconfigurations, unauthorized connections, and attack surfaces.
npx claudepluginhub killvxk/cybersecurity-skills-zhThis skill uses the workspace's default tool permissions.
- 对新客户OT/ICS环境进行初始安全基线评估时
Conducts security assessments of OT networks including SCADA, DCS, Purdue Model layers, IT/OT risks, firewall rules, and industrial protocols like Modbus, DNP3, OPC UA to detect misconfigurations and attack surfaces.
Conducts OT/ICS network security assessments using Purdue Model, evaluating firewalls, IT/OT convergence risks, and protocols (Modbus, DNP3, OPC UA, EtherNet/IP) for misconfigurations.
Conducts authorized OT/ICS security assessments: discovers devices/protocols (Modbus, S7, IEC104, DNP3, BACnet, EtherNet/IP) via Nmap and assesses vulnerabilities using Metasploit.
Share bugs, ideas, or general feedback.
不适用于无OT组件的纯IT网络评估、IT Web应用程序的应用层漏洞扫描(参见performing-web-app-penetration-test),或未经明确授权和安全控制措施对生产OT系统进行主动漏洞利用。
根据Purdue参考模型级别定义评估范围,并识别不得主动扫描的安全关键系统。OT评估与IT评估有根本区别,因为主动扫描可能导致PLC崩溃、破坏安全仪表系统(SIS),并造成物理伤害。
# OT评估范围定义
assessment:
facility: "化工处理厂 - Alpha站"
purdue_levels_in_scope:
- level_0: "物理过程传感器和执行器(仅被动观察)"
- level_1: "PLC、RTU、安全控制器(仅被动,不主动扫描)"
- level_2: "HMI站、工程师工作站、历史服务器(经批准可有限主动测试)"
- level_3: "站点运营 - OPC服务器、应用服务器(允许主动扫描)"
- level_3_5: "DMZ - 数据二极管、跳板服务器(允许主动扫描)"
- level_4: "连接到OT的企业IT(允许主动扫描)"
safety_exclusions:
- "安全仪表系统(SIS)- Triconex控制器"
- "紧急停车(ESD)系统"
- "火气检测系统"
- "生产期间的任何0/1级设备"
authorized_activities:
passive:
- "通过SPAN端口进行网络流量捕获和分析"
- "工业协议深度包检测"
- "无线频谱分析"
- "物理巡检和目视检查"
active_with_approval:
- "维护窗口期间对2-4级系统进行针对性Nmap扫描"
- "HMI和工程师工作站的认证测试"
- "区域间防火墙规则验证"
prohibited:
- "主动扫描PLC、RTU或SIS控制器"
- "对生产系统进行工业协议模糊测试"
- "修改PLC逻辑或固件"
部署被动监控以映射OT网络上的所有设备、通信流和协议,而不发送任何可能干扰运营的流量。
#!/usr/bin/env python3
"""OT网络被动发现和资产清单构建器。
使用来自SPAN端口的pcap捕获来识别OT资产、协议
和通信模式,无需主动扫描。
"""
import json
import sys
from collections import defaultdict
from datetime import datetime
try:
from scapy.all import rdpcap, IP, TCP, UDP
from scapy.contrib.modbus import ModbusADURequest, ModbusADUResponse
except ImportError:
print("安装scapy: pip install scapy")
sys.exit(1)
# 工业协议端口映射
OT_PROTOCOL_PORTS = {
502: "Modbus/TCP",
102: "S7comm(西门子)",
44818: "EtherNet/IP(CIP)",
2222: "EtherNet/IP(隐式)",
4840: "OPC UA",
20000: "DNP3",
47808: "BACnet",
1911: "Niagara Fox",
789: "Crimson v3(Red Lion)",
2404: "IEC 60870-5-104",
18245: "GE SRTP",
5094: "HART-IP",
}
PURDUE_LEVEL_RANGES = {
"0-1级(现场设备)": ["10.10.0.0/16", "192.168.10.0/24"],
"2级(控制系统)": ["10.20.0.0/16", "192.168.20.0/24"],
"3级(站点运营)": ["10.30.0.0/16", "192.168.30.0/24"],
"3.5级(DMZ)": ["172.16.0.0/16"],
"4级(企业)": ["10.0.0.0/16"],
}
def classify_purdue_level(ip_addr):
"""将IP地址分类到其Purdue参考模型级别。"""
from ipaddress import ip_address, ip_network
addr = ip_address(ip_addr)
for level, subnets in PURDUE_LEVEL_RANGES.items():
for subnet in subnets:
if addr in ip_network(subnet):
return level
return "未知"
def analyze_ot_pcap(pcap_file):
"""分析pcap文件以发现OT资产和通信模式。"""
packets = rdpcap(pcap_file)
assets = {}
connections = defaultdict(lambda: {"count": 0, "protocols": set(), "ports": set()})
protocol_stats = defaultdict(int)
cross_zone_flows = []
for pkt in packets:
if not pkt.haslayer(IP):
continue
src_ip = pkt[IP].src
dst_ip = pkt[IP].dst
# 跟踪资产
for ip in (src_ip, dst_ip):
if ip not in assets:
assets[ip] = {
"ip": ip,
"purdue_level": classify_purdue_level(ip),
"protocols_observed": set(),
"roles": set(),
"first_seen": str(pkt.time),
"last_seen": str(pkt.time),
"mac": None,
}
assets[ip]["last_seen"] = str(pkt.time)
# 通过端口识别OT协议
dst_port = None
if pkt.haslayer(TCP):
dst_port = pkt[TCP].dport
elif pkt.haslayer(UDP):
dst_port = pkt[UDP].dport
if dst_port and dst_port in OT_PROTOCOL_PORTS:
protocol_name = OT_PROTOCOL_PORTS[dst_port]
protocol_stats[protocol_name] += 1
assets[src_ip]["protocols_observed"].add(protocol_name)
assets[dst_ip]["protocols_observed"].add(protocol_name)
# 确定角色
assets[src_ip]["roles"].add("客户端/主站")
assets[dst_ip]["roles"].add("服务器/从站")
# 跟踪连接
conn_key = (src_ip, dst_ip)
connections[conn_key]["count"] += 1
if dst_port:
connections[conn_key]["ports"].add(dst_port)
if dst_port in OT_PROTOCOL_PORTS:
connections[conn_key]["protocols"].add(OT_PROTOCOL_PORTS[dst_port])
# 检测跨区域通信
src_level = classify_purdue_level(src_ip)
dst_level = classify_purdue_level(dst_ip)
if src_level != dst_level and src_level != "未知" and dst_level != "未知":
cross_zone_flows.append({
"src": src_ip,
"src_level": src_level,
"dst": dst_ip,
"dst_level": dst_level,
"protocol": OT_PROTOCOL_PORTS.get(dst_port, f"port/{dst_port}"),
})
return {
"asset_count": len(assets),
"assets": {ip: {k: list(v) if isinstance(v, set) else v for k, v in info.items()} for ip, info in assets.items()},
"protocol_distribution": dict(protocol_stats),
"total_connections": len(connections),
"cross_zone_flows": cross_zone_flows[:50],
}
def generate_assessment_report(results):
"""生成OT网络评估发现报告。"""
report = []
report.append("=" * 70)
report.append("OT网络被动发现报告")
report.append(f"生成时间: {datetime.now().isoformat()}")
report.append("=" * 70)
report.append(f"\n已发现资产总数: {results['asset_count']}")
report.append(f"唯一连接总数: {results['total_connections']}")
report.append("\n--- 工业协议分布 ---")
for proto, count in sorted(results["protocol_distribution"].items(), key=lambda x: -x[1]):
report.append(f" {proto}: {count} 个数据包")
report.append("\n--- 跨区域通信流 ---")
if results["cross_zone_flows"]:
for flow in results["cross_zone_flows"][:20]:
report.append(
f" {flow['src']} ({flow['src_level']}) -> "
f"{flow['dst']} ({flow['dst_level']}) 通过 {flow['protocol']}"
)
else:
report.append(" 未检测到跨区域流(请检查子网分类)")
report.append("\n--- 发现 ---")
# 检查4级到0-1级的直接连接(严重发现)
for flow in results["cross_zone_flows"]:
if "4级" in flow["src_level"] and "0-1级" in flow["dst_level"]:
report.append(
f" [严重] 企业到现场直接流量: "
f"{flow['src']} -> {flow['dst']} 通过 {flow['protocol']}"
)
elif "4级" in flow["src_level"] and "2级" in flow["dst_level"]:
report.append(
f" [高危] 绕过DMZ的企业到控制流量: "
f"{flow['src']} -> {flow['dst']} 通过 {flow['protocol']}"
)
return "\n".join(report)
if __name__ == "__main__":
if len(sys.argv) < 2:
print("用法: python ot_network_discovery.py <pcap文件>")
sys.exit(1)
results = analyze_ot_pcap(sys.argv[1])
print(generate_assessment_report(results))
# 保存详细JSON
output_file = sys.argv[1].replace(".pcap", "_inventory.json")
with open(output_file, "w") as f:
json.dump(results, f, indent=2, default=str)
print(f"\n详细清单已保存到: {output_file}")
分析OT区域间的防火墙配置,识别过于宽松的规则、缺失的默认拒绝,以及违反IEC 62443区域模型的未授权通道。
#!/usr/bin/env python3
"""OT区域防火墙规则分析器。
解析防火墙规则导出(CSV格式)并根据
IEC 62443区域/通道模型要求进行评估。
"""
import csv
import json
import sys
from dataclasses import dataclass, field
from typing import Optional
@dataclass
class FirewallRule:
rule_id: str
source_zone: str
source_ip: str
dest_zone: str
dest_ip: str
service: str
port: str
action: str
enabled: bool
comment: str = ""
# IEC 62443区域通信策略
# 定义哪些区域对可以通信以及通过哪些通道
ALLOWED_CONDUITS = {
("Level 4", "Level 3.5"): {
"allowed_ports": [443, 3389, 22],
"description": "企业到DMZ - Web服务、跳板主机",
"requires_inspection": True,
},
("Level 3.5", "Level 3"): {
"allowed_ports": [443, 1433, 5432, 8080],
"description": "DMZ到站点运营 - 历史服务器镜像、OPC中继",
"requires_inspection": True,
},
("Level 3", "Level 2"): {
"allowed_ports": [502, 44818, 4840, 102],
"description": "站点运营到控制 - OPC UA、Modbus、S7comm",
"requires_inspection": True,
},
("Level 2", "Level 1"): {
"allowed_ports": [502, 44818, 102, 2222],
"description": "控制到现场 - 直接工业协议",
"requires_inspection": False,
},
}
# 禁止直接连接
PROHIBITED_CONDUITS = [
("Level 4", "Level 3"),
("Level 4", "Level 2"),
("Level 4", "Level 1"),
("Level 4", "Level 0"),
("Level 3", "Level 1"),
("Level 3", "Level 0"),
("Internet", "Level 3.5"),
("Internet", "Level 3"),
("Internet", "Level 2"),
("Internet", "Level 1"),
]
def parse_firewall_rules(csv_file):
"""从CSV导出解析防火墙规则。"""
rules = []
with open(csv_file, "r") as f:
reader = csv.DictReader(f)
for row in reader:
rules.append(FirewallRule(
rule_id=row.get("rule_id", ""),
source_zone=row.get("source_zone", ""),
source_ip=row.get("source_ip", ""),
dest_zone=row.get("dest_zone", ""),
dest_ip=row.get("dest_ip", ""),
service=row.get("service", ""),
port=row.get("port", ""),
action=row.get("action", ""),
enabled=row.get("enabled", "true").lower() == "true",
comment=row.get("comment", ""),
))
return rules
def analyze_rules(rules):
"""根据IEC 62443区域模型分析防火墙规则。"""
findings = {"critical": [], "high": [], "medium": [], "low": [], "info": []}
for rule in rules:
if not rule.enabled or rule.action.lower() != "allow":
continue
zone_pair = (rule.source_zone, rule.dest_zone)
port = int(rule.port) if rule.port.isdigit() else 0
# 检查禁止的通道
if zone_pair in PROHIBITED_CONDUITS:
findings["critical"].append({
"rule_id": rule.rule_id,
"finding": f"禁止直接连接: {rule.source_zone} -> {rule.dest_zone}",
"detail": f"规则允许 {rule.source_ip} 访问 {rule.dest_ip}:{rule.port} ({rule.service})",
"remediation": "删除规则。通过带应用层检查的DMZ(3.5级)路由流量。",
})
# 检查过于宽松的规则(any/any)
elif rule.source_ip in ("any", "0.0.0.0/0") or rule.dest_ip in ("any", "0.0.0.0/0"):
findings["high"].append({
"rule_id": rule.rule_id,
"finding": "包含'any'地址的过于宽松规则",
"detail": f"{rule.source_ip} -> {rule.dest_ip}:{rule.port} 在 {zone_pair} 中",
"remediation": "按照IEC 62443最小权限通道策略,限制为特定主机IP。",
})
# 检查允许通道中的端口违规
elif zone_pair in ALLOWED_CONDUITS:
conduit = ALLOWED_CONDUITS[zone_pair]
if port and port not in conduit["allowed_ports"]:
findings["medium"].append({
"rule_id": rule.rule_id,
"finding": f"通道 {zone_pair} 中的未授权端口",
"detail": f"端口 {port} ({rule.service}) 不在允许列表 {conduit['allowed_ports']} 中",
"remediation": f"从通道中删除端口 {port} 或在风险评估中说明理由。",
})
return findings
if __name__ == "__main__":
if len(sys.argv) < 2:
print("用法: python ot_firewall_analyzer.py <rules.csv>")
sys.exit(1)
rules = parse_firewall_rules(sys.argv[1])
findings = analyze_rules(rules)
print("=" * 70)
print("OT区域防火墙规则分析")
print("=" * 70)
for severity in ["critical", "high", "medium", "low"]:
if findings[severity]:
print(f"\n--- {severity.upper()} 发现 ({len(findings[severity])}) ---")
for f in findings[severity]:
print(f" [{f['rule_id']}] {f['finding']}")
print(f" 详情: {f['detail']}")
print(f" 修复: {f['remediation']}")
评估使用中工业协议的安全配置,检查认证、加密和访问控制。
# 捕获Modbus/TCP流量进行分析
tcpdump -i eth0 -w ot_capture.pcap 'port 502 or port 44818 or port 4840 or port 102 or port 20000' -c 100000
# 使用带OT协议解析器的Wireshark进行深度检查
tshark -r ot_capture.pcap -Y "modbus" -T fields \
-e ip.src -e ip.dst -e modbus.func_code -e modbus.reference_num \
-e modbus.word_cnt > modbus_analysis.csv
# 检查未认证的Modbus写操作(功能码5,6,15,16)
tshark -r ot_capture.pcap -Y "modbus.func_code >= 5 && modbus.func_code <= 16" \
-T fields -e ip.src -e ip.dst -e modbus.func_code -e frame.time
# 扫描OPC UA服务器并检查安全策略
# 仅在获得明确授权的3级以上系统上运行
python3 -c "
from opcua import Client
server_url = 'opc.tcp://10.30.1.50:4840'
client = Client(server_url)
endpoints = client.connect_and_get_server_endpoints()
for ep in endpoints:
print(f'端点: {ep.EndpointUrl}')
print(f' 安全模式: {ep.SecurityMode}')
print(f' 安全策略: {ep.SecurityPolicyUri}')
print(f' 认证令牌: {[t.TokenType for t in ep.UserIdentityTokens]}')
"
按IEC 62443和NIST SP 800-82 Rev.3组织发现,生成结构化报告。
OT网络安全评估报告
=======================================
设施: 化工处理厂 - Alpha站
评估日期: 2026-02-23
标准: IEC 62443-3-3 / NIST SP 800-82r3
评估员: [评估员姓名]
执行摘要:
OT网络评估发现跨Purdue 0-4级共47个资产。
识别出12个严重和23个高危发现,主要涉及
网络隔离不足、未认证工业协议,以及
未授权跨区域通信路径。
资产清单摘要:
0-1级(现场): 18台设备(PLC、RTU、I/O模块)
2级(控制): 9台设备(HMI、工程师工作站)
3级(运营): 12台设备(历史服务器、OPC服务器、应用服务器)
3.5级(DMZ): 3台设备(数据二极管、跳板服务器、补丁服务器)
4级(企业): 5台设备(域控制器、文件服务器)
严重发现:
[OT-001] 检测到企业到PLC的直接通信
来源: 10.0.5.22(4级 - IT工作站)
目标: 10.10.1.15(1级 - Allen-Bradley PLC)
协议: EtherNet/IP(端口44818)
影响: 企业网络上的攻击者可直接修改PLC逻辑
修复: 阻断直接L4-L1流量;通过DMZ代理路由
[OT-002] Modbus/TCP写命令无认证
受影响: 8台PLC接受未认证FC6(写单个寄存器)
影响: OT网络上的任何设备均可修改过程设定点
修复: 部署支持Modbus的防火墙;限制具有写权限的来源
[OT-003] 扁平网络 - Purdue级别间无隔离
详情: 所有OT设备共享VLAN 100(10.10.0.0/16)
影响: 被入侵的HMI可直接访问所有PLC和SIS
修复: 按IEC 62443-3-2实施基于区域的隔离
风险矩阵:
严重: 12个发现(需立即修复)
高危: 23个发现(30天内修复)
中危: 15个发现(90天内修复)
低危: 8个发现(在下次维护周期修复)
| 术语 | 定义 |
|---|---|
| Purdue参考模型 | 工业控制系统的层次架构模型(0-5级),从物理过程到企业IT定义安全区域 |
| IEC 62443 | 工业自动化和控制系统(IACS)安全的国际标准系列,定义安全级别、区域、通道和安全要求 |
| 区域(Zone) | 共享相同安全要求的逻辑或物理资产分组,由IEC 62443-3-2定义 |
| 通道(Conduit) | 连接两个或多个区域的通信信道逻辑分组,受共同安全策略约束 |
| SCADA | 监控控制与数据采集系统——工业过程高级过程监控管理的系统架构 |
| DCS | 分布式控制系统——控制元素分布在整个系统中的控制系统架构 |
| 气隙(Air Gap) | OT网络与IT/互联网的物理隔离,现已越来越多地被带防火墙和数据二极管的受管通道替代 |
| 安全仪表系统(SIS) | 在检测到危险条件时将过程带入安全状态的独立系统 |
背景:一家水务公司将所有OT设备置于同一VLAN。被动网络监控显示HMI、PLC、历史服务器和域控制器共享同一2层广播域。企业网络和OT环境之间没有DMZ。
方法:
注意事项:生产期间主动扫描PLC可能导致通信超时和过程中断。在没有完整流量基线的情况下实施隔离将破坏合法的控制系统通信。仅依赖无工业协议检查的网络层防火墙会使Modbus/TCP和EtherNet/IP写命令无法得到检查。
OT网络安全评估报告
=======================================
设施: [设施名称]
评估日期: YYYY-MM-DD
标准: IEC 62443-3-3 / NIST SP 800-82r3
执行摘要:
[2-3句话概述发现和风险级别]
资产清单:
0-1级: [数量] 台现场设备
2级: [数量] 个控制系统
3级: [数量] 个运营系统
3.5级: [数量] 个DMZ系统
4级: [数量] 个企业系统
按严重程度的发现:
严重: [数量](需立即行动)
高危: [数量](30天修复)
中危: [数量](90天修复)
低危: [数量](下次维护窗口)
详细发现:
[OT-NNN] 发现标题
严重程度: 严重|高|中|低
受影响资产: [列表]
IEC 62443参考: [章节]
NIST 800-82r3参考: [章节]
描述: [技术细节]
影响: [运营和安全影响]
修复: [具体技术修复步骤]