Implements network segmentation in OT environments using VLANs, industrial firewalls, data diodes, SDN based on Purdue model. Covers zero-downtime migration from flat networks, DPI firewall config for industrial protocols, and traffic analysis verification.
npx claudepluginhub killvxk/cybersecurity-skills-zhThis skill uses the workspace's default tool permissions.
- OT安全评估发现Purdue层级之间没有分段的扁平网络
Implements Purdue Model-based network segmentation in OT using VLANs, industrial firewalls, data diodes, SDN. Guides flat-to-segmented migrations, OT firewall config with protocol DPI, and traffic validation.
Guides implementing network segmentation in OT/ICS environments using VLANs, industrial firewalls, data diodes, SDN, and Purdue Model. Covers flat-to-segmented migration, OT firewall config, and traffic validation.
Implements Purdue model network segmentation for ICS/OT environments, mapping assets to levels 0-5, assigning VLANs/protocols, and defining firewall rules for IT/OT isolation.
Share bugs, ideas, or general feedback.
不适用于无OT组件的纯IT微分段(参见implementing-zero-trust-in-cloud),或在未进行先期流量分析的情况下进行初始区域设计(先参见performing-ot-network-security-assessment)。
使用流量基线设计VLAN和防火墙架构,在隔离区域的同时保留所有合法通信路径。
#!/usr/bin/env python3
"""OT网络分段设计工具。
分析流量基线数据,生成带VLAN分配、
防火墙规则和迁移计划的分段设计。
"""
import json
import sys
from collections import defaultdict
from dataclasses import dataclass, field, asdict
from ipaddress import ip_address, ip_network
@dataclass
class VLANDesign:
vlan_id: int
name: str
purdue_level: str
subnet: str
gateway: str
description: str
devices: list = field(default_factory=list)
@dataclass
class FirewallRule:
rule_id: int
source_zone: str
source_ip: str
dest_zone: str
dest_ip: str
protocol: str
port: int
action: str
dpi_profile: str = ""
comment: str = ""
class SegmentationDesigner:
"""根据流量基线生成分段设计。"""
def __init__(self, baseline_file):
with open(baseline_file) as f:
self.baseline = json.load(f)
self.vlans = []
self.rules = []
self.rule_counter = 1
def design_vlans(self):
"""根据Purdue层级创建VLAN设计。"""
self.vlans = [
VLANDesign(10, "SIS-SAFETY", "1级(安全)",
"10.10.10.0/24", "10.10.10.1",
"安全仪表系统 - 气隙隔离或硬件隔离"),
VLANDesign(20, "BPCS-FIELD", "0-1级(现场/控制)",
"10.10.20.0/24", "10.10.20.1",
"PLC、RTU、I/O模块、现场仪表"),
VLANDesign(30, "BPCS-SUPERVISORY", "2级(监控)",
"10.10.30.0/24", "10.10.30.1",
"HMI、工程师工作站、本地历史服务器"),
VLANDesign(40, "SITE-OPS", "3级(运营)",
"10.10.40.0/24", "10.10.40.1",
"站点历史服务器、OPC服务器、MES、报警管理"),
VLANDesign(50, "OT-DMZ", "3.5级(DMZ)",
"172.16.50.0/24", "172.16.50.1",
"数据二极管、历史服务器镜像、跳板服务器、补丁服务器"),
VLANDesign(60, "ENTERPRISE", "4级(企业)",
"10.0.60.0/24", "10.0.60.1",
"访问OT数据的企业IT系统"),
VLANDesign(999, "QUARANTINE", "隔离区",
"10.10.99.0/24", "10.10.99.1",
"用于未授权或不受信任设备的隔离VLAN"),
]
return self.vlans
def generate_firewall_rules_from_baseline(self):
"""根据观察到的合法流量生成防火墙规则。"""
self.rules = []
# 各区域边界的默认拒绝规则
zone_pairs = [
("2级", "0-1级"),
("3级", "2级"),
("3.5级", "3级"),
("4级", "3.5级"),
]
# 从基线观察到的流量生成允许规则
for flow in self.baseline.get("cross_zone_flows", []):
self.rules.append(FirewallRule(
rule_id=self.rule_counter,
source_zone=flow["src_level"],
source_ip=flow["src"],
dest_zone=flow["dst_level"],
dest_ip=flow["dst"],
protocol=flow.get("protocol", "TCP"),
port=flow.get("port", 0),
action="ALLOW",
dpi_profile=self._get_dpi_profile(flow.get("port", 0)),
comment=f"基线观察: {flow['src']} -> {flow['dst']}",
))
self.rule_counter += 1
# 在每个区域ACL末尾添加默认拒绝规则
for src_zone, dst_zone in zone_pairs:
self.rules.append(FirewallRule(
rule_id=self.rule_counter,
source_zone=src_zone,
source_ip="any",
dest_zone=dst_zone,
dest_ip="any",
protocol="any",
port=0,
action="DENY",
comment=f"默认拒绝: {src_zone} -> {dst_zone}",
))
self.rule_counter += 1
return self.rules
def _get_dpi_profile(self, port):
"""返回适合OT协议端口的DPI检测配置文件。"""
dpi_profiles = {
502: "modbus-inspect(仅允许来自L3的读取FC)",
44818: "enip-inspect",
4840: "opcua-inspect(需要SignAndEncrypt)",
102: "s7comm-inspect",
20000: "dnp3-inspect",
}
return dpi_profiles.get(port, "none")
def generate_migration_plan(self):
"""生成网络分段的分阶段迁移计划。"""
plan = {
"phase_1": {
"name": "DMZ实施(第1-2周)",
"description": "在企业网络和OT网络之间部署DMZ",
"steps": [
"部署DMZ防火墙对(内外两侧)",
"将历史服务器镜像迁移到DMZ",
"在DMZ中配置带MFA的跳板服务器",
"安装数据二极管用于单向历史服务器复制",
"通过DMZ路由企业到OT的流量",
"验证企业通过DMZ对历史数据的访问",
],
"rollback": "移除DMZ防火墙规则,恢复直接路由",
},
"phase_2": {
"name": "L3/L2分段(第3-4周)",
"description": "将运营区(L3)与控制区(L2)分离",
"steps": [
"在OT交换机上创建VLAN 30和VLAN 40",
"在L2和L3之间部署工业防火墙",
"以监控模式(仅记录,不阻断)配置防火墙",
"分析日志1周以验证规则完整性",
"在维护窗口期间切换到执行模式",
"验证所有HMI到PLC以及历史服务器到PLC的通信",
],
"rollback": "还原VLAN分配,将防火墙设置为允许所有",
},
"phase_3": {
"name": "现场设备隔离(第5-6周)",
"description": "将0-1级现场设备与2级监控系统隔离",
"steps": [
"为PLC和现场仪表创建VLAN 20",
"在PLC端口上配置MAC绑定的端口安全",
"应用Modbus功能码过滤(阻止来自L3的写入)",
"在维护窗口期间测试所有控制回路",
"验证从现场到HMI的报警传播",
],
"rollback": "将VLAN 20合并回VLAN 30",
},
"phase_4": {
"name": "SIS隔离(第7-8周)",
"description": "完全隔离安全仪表系统",
"steps": [
"验证SIS在专用VLAN 10上或气隙隔离",
"移除SIS与BPCS之间的所有网络路径",
"为SIS实施专用工程师工作站",
"在SIS EWS上应用USB和可移动介质控制",
"在隔离状态下测试SIS功能",
],
"rollback": "不适用 - SIS隔离不应被撤销",
},
}
return plan
def export_design(self, output_file):
"""导出完整的分段设计。"""
design = {
"vlans": [asdict(v) for v in self.vlans],
"firewall_rules": [asdict(r) for r in self.rules],
"migration_plan": self.generate_migration_plan(),
}
with open(output_file, "w") as f:
json.dump(design, f, indent=2)
print(f"[*] 分段设计已导出到: {output_file}")
print(f" VLAN数量: {len(self.vlans)}")
print(f" 防火墙规则数量: {len(self.rules)}")
return design
if __name__ == "__main__":
if len(sys.argv) < 2:
print("用法: python segmentation_designer.py <baseline.json> [output.json]")
sys.exit(1)
designer = SegmentationDesigner(sys.argv[1])
designer.design_vlans()
designer.generate_firewall_rules_from_baseline()
output = sys.argv[2] if len(sys.argv) > 2 else "segmentation_design.json"
designer.export_design(output)
对工业以太网交换机应用VLAN配置,包括端口安全和未使用端口加固。
# Cisco工业以太网4000/5000系列配置
# 创建与Purdue层级对齐的VLAN
vlan 10
name SIS-SAFETY-L1
vlan 20
name BPCS-FIELD-L01
vlan 30
name BPCS-SUPERVISORY-L2
vlan 40
name SITE-OPS-L3
vlan 50
name OT-DMZ-L35
vlan 999
name QUARANTINE
# PLC接入端口配置端口安全
interface range GigabitEthernet1/0/1-12
description PLC Connections
switchport mode access
switchport access vlan 20
switchport port-security
switchport port-security maximum 1
switchport port-security mac-address sticky
switchport port-security violation shutdown
storm-control broadcast level 10
storm-control multicast level 10
spanning-tree portfast
spanning-tree bpduguard enable
no cdp enable
no lldp transmit
no lldp receive
# HMI接入端口
interface range GigabitEthernet1/0/13-18
description HMI Stations
switchport mode access
switchport access vlan 30
switchport port-security
switchport port-security maximum 1
switchport port-security mac-address sticky
switchport port-security violation restrict
spanning-tree portfast
# 到区域防火墙的中继
interface TenGigabitEthernet1/0/1
description Trunk to OT Zone Firewall
switchport mode trunk
switchport trunk allowed vlan 20,30,40,50
switchport trunk native vlan 999
switchport nonegotiate
# 禁用并隔离所有未使用端口
interface range GigabitEthernet1/0/19-48
description UNUSED - Shutdown
switchport mode access
switchport access vlan 999
shutdown
实施后,验证分段正确阻断未授权的跨区流量,同时允许所有合法操作。
#!/usr/bin/env python3
"""OT网络分段验证器。
运行自动化测试,在分段部署后验证区域隔离、
防火墙规则和协议执行情况。
"""
import json
import socket
import subprocess
import sys
import time
from dataclasses import dataclass, asdict
@dataclass
class ValidationTest:
test_id: str
description: str
source_zone: str
target_ip: str
target_port: int
expected_result: str # "blocked"(被阻断)或"allowed"(被允许)
actual_result: str = ""
status: str = "" # PASS或FAIL
class SegmentationValidator:
"""验证OT网络分段实施情况。"""
def __init__(self):
self.tests = []
self.results = []
def add_test(self, test):
self.tests.append(test)
def run_connectivity_test(self, target_ip, target_port, timeout=3):
"""测试到目标的TCP连通性。"""
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(timeout)
result = sock.connect_ex((target_ip, target_port))
sock.close()
return "reachable" if result == 0 else "blocked"
except (socket.timeout, ConnectionRefusedError):
return "blocked"
except Exception:
return "error"
def run_all_tests(self):
"""执行所有分段验证测试。"""
print("=" * 60)
print("OT分段验证")
print("=" * 60)
passed = 0
failed = 0
for test in self.tests:
actual = self.run_connectivity_test(test.target_ip, test.target_port)
test.actual_result = actual
if actual == test.expected_result:
test.status = "PASS"
passed += 1
else:
test.status = "FAIL"
failed += 1
icon = "[+]" if test.status == "PASS" else "[-]"
print(f" {icon} {test.test_id}: {test.description}")
print(f" 目标: {test.target_ip}:{test.target_port}")
print(f" 预期: {test.expected_result} | 实际: {actual} -> {test.status}")
print(f"\n 结果: {passed} 通过, {failed} 失败, 共 {len(self.tests)} 个测试")
return {"passed": passed, "failed": failed, "total": len(self.tests)}
if __name__ == "__main__":
validator = SegmentationValidator()
# 来自企业区(4级)的测试 - 应被阻止无法到达OT
validator.add_test(ValidationTest(
"SEG-001", "企业无法通过Modbus到达PLC",
"4级", "10.10.20.10", 502, "blocked"))
validator.add_test(ValidationTest(
"SEG-002", "企业无法通过EtherNet/IP到达PLC",
"4级", "10.10.20.10", 44818, "blocked"))
validator.add_test(ValidationTest(
"SEG-003", "企业可以访问DMZ跳板服务器",
"4级", "172.16.50.10", 3389, "allowed"))
validator.add_test(ValidationTest(
"SEG-004", "企业可以访问DMZ历史服务器镜像",
"4级", "172.16.50.20", 443, "allowed"))
# 来自运营区(3级)的测试 - 对控制区的有限访问
validator.add_test(ValidationTest(
"SEG-005", "运营可以通过Modbus读取PLC",
"3级", "10.10.20.10", 502, "allowed"))
validator.add_test(ValidationTest(
"SEG-006", "运营无法访问SIS控制器",
"3级", "10.10.10.10", 1502, "blocked"))
validator.run_all_tests()
| 术语 | 定义 |
|---|---|
| VLAN | 虚拟局域网(Virtual Local Area Network) - 第2层广播域隔离,用于在共享交换机基础设施上分离OT区域 |
| 工业防火墙(Industrial Firewall) | 具备工业协议(Modbus、DNP3、EtherNet/IP、OPC UA)深度包检测能力的防火墙 |
| 数据二极管(Data Diode) | 硬件强制单向网关,从物理上阻止反向数据流,用于OT运营区和DMZ之间 |
| 端口安全(Port Security) | 交换机功能,限制端口上的MAC地址数量并锁定分配,防止未授权设备连接 |
| 中继端口(Trunk Port) | 使用802.1Q标签承载多个VLAN的交换机端口,用于跨区域边界连接交换机和防火墙 |
| DMZ | 企业IT和OT之间的非军事化区 - 所有跨域流量在此终止并接受检测的缓冲区 |
OT网络分段报告
================================
实施日期: YYYY-MM-DD
VLAN架构:
VLAN [ID] - [名称] ([Purdue层级])
子网: [子网/掩码]
设备数量: [数量]
防火墙规则:
[区域A] -> [区域B]: [允许/拒绝数量]
验证结果:
通过的测试: [N]/[总数]
关键失败: [N]