Analyzes S7comm and S7CommPlus protocol traffic in Siemens SIMATIC S7 PLCs to identify vulnerabilities like replay attacks, integrity bypasses, unauthorized CPU stops, and program downloads for S7-300/400/1200/1500 controllers.
npx claudepluginhub killvxk/cybersecurity-skills-zhThis skill uses the workspace's default tool permissions.
- 评估西门子SIMATIC S7 PLC环境的安全态势时
Analyzes S7comm/S7CommPlus traffic for Siemens SIMATIC S7 PLC vulnerabilities including replay attacks, integrity bypass, unauthorized CPU stops, and program manipulation.
Analyzes Siemens S7comm and S7CommPlus protocols in SIMATIC S7 PLCs for vulnerabilities like replay attacks, integrity bypass, unauthorized CPU stops, and program download manipulation.
Analyzes PLC firmware for security vulnerabilities like hard-coded credentials, backdoors, insecure updates, and debug interfaces from Siemens S7, Allen-Bradley, Schneider Modicon via extraction, static/dynamic analysis, and baseline comparison.
Share bugs, ideas, or general feedback.
不适用于未经授权和测试计划扫描生产西门子PLC(可能导致控制器崩溃)、非西门子协议分析(Modbus请参见detecting-modbus-command-injection-attacks),或在生产环境中修改PLC程序。
#!/usr/bin/env python3
"""S7comm协议安全分析器。
分析西门子S7comm协议流量,识别安全
漏洞、未授权访问模式以及针对SIMATIC S7 PLC的
潜在攻击指示符。
"""
import struct
import sys
import json
from collections import defaultdict
from datetime import datetime
from typing import Dict, List, Optional
try:
from scapy.all import rdpcap, IP, TCP
except ImportError:
print("请安装scapy: pip install scapy")
sys.exit(1)
# S7comm ROSCTR(PDU类型)定义
S7_ROSCTR = {
0x01: "Job(请求)",
0x02: "Ack",
0x03: "Ack_Data(响应)",
0x07: "Userdata",
}
# S7comm功能码
S7_FUNCTIONS = {
0x00: "CPU服务",
0x04: "读取变量",
0x05: "写入变量",
0x1A: "请求下载(程序)",
0x1B: "下载块",
0x1C: "下载结束",
0x1D: "开始上传(读取程序)",
0x1E: "上传块",
0x1F: "上传结束",
0x28: "PI服务(启动/停止CPU)",
0x29: "PLC停止",
0xF0: "建立通信",
}
# 安全相关的关键操作
CRITICAL_FUNCTIONS = {0x1A, 0x1B, 0x1C, 0x28, 0x29, 0x05}
PROGRAM_FUNCTIONS = {0x1A, 0x1B, 0x1C, 0x1D, 0x1E, 0x1F}
class S7commSecurityFinding:
"""表示S7comm流量中的安全发现。"""
def __init__(self, severity: str, finding_type: str, src_ip: str,
dst_ip: str, function: str, description: str,
cve: str = "", recommendation: str = ""):
self.timestamp = datetime.now().isoformat()
self.severity = severity
self.finding_type = finding_type
self.src_ip = src_ip
self.dst_ip = dst_ip
self.function = function
self.description = description
self.cve = cve
self.recommendation = recommendation
class S7commAnalyzer:
"""分析S7comm协议流量中的安全漏洞。"""
def __init__(self):
self.findings: List[S7commSecurityFinding] = []
self.sessions: Dict[str, dict] = defaultdict(lambda: {
"packets": 0,
"functions_seen": set(),
"writes": 0,
"program_downloads": 0,
"cpu_commands": 0,
"first_seen": None,
"last_seen": None,
})
self.authorized_engineering: set = set()
self.packet_count = 0
def set_authorized_stations(self, ips: List[str]):
"""设置授权工程师工作站IP列表。"""
self.authorized_engineering = set(ips)
def parse_s7comm(self, payload: bytes) -> Optional[dict]:
"""从TCP有效载荷解析S7comm协议数据。"""
# TPKT报头: version(1) + reserved(1) + length(2)
if len(payload) < 4:
return None
tpkt_version = payload[0]
if tpkt_version != 3:
return None
tpkt_length = struct.unpack(">H", payload[2:4])[0]
# COTP报头紧跟TPKT之后
if len(payload) < 7:
return None
cotp_length = payload[4]
cotp_type = payload[5]
# S7comm在COTP之后开始
s7_offset = 4 + 1 + cotp_length
if len(payload) < s7_offset + 10:
return None
# S7comm报头
protocol_id = payload[s7_offset]
if protocol_id != 0x32: # S7comm魔术字节
return None
rosctr = payload[s7_offset + 1]
redundancy = struct.unpack(">H", payload[s7_offset + 2:s7_offset + 4])[0]
pdu_ref = struct.unpack(">H", payload[s7_offset + 4:s7_offset + 6])[0]
param_length = struct.unpack(">H", payload[s7_offset + 6:s7_offset + 8])[0]
data_length = struct.unpack(">H", payload[s7_offset + 8:s7_offset + 10])[0]
result = {
"rosctr": rosctr,
"rosctr_name": S7_ROSCTR.get(rosctr, f"未知 (0x{rosctr:02x})"),
"pdu_ref": pdu_ref,
"param_length": param_length,
"data_length": data_length,
}
# 从参数中解析功能码
param_offset = s7_offset + 10
if rosctr in (0x01, 0x03) and param_length > 0 and len(payload) > param_offset:
func_code = payload[param_offset]
result["function_code"] = func_code
result["function_name"] = S7_FUNCTIONS.get(func_code, f"未知 (0x{func_code:02x})")
return result
def analyze_packet(self, pkt):
"""分析数据包中的S7comm安全问题。"""
self.packet_count += 1
if not pkt.haslayer(IP) or not pkt.haslayer(TCP):
return
tcp = pkt[TCP]
if tcp.dport != 102 and tcp.sport != 102:
return
payload = bytes(tcp.payload)
if not payload:
return
s7 = self.parse_s7comm(payload)
if not s7:
return
src_ip = pkt[IP].src
dst_ip = pkt[IP].dst
session_key = f"{src_ip}->{dst_ip}"
session = self.sessions[session_key]
session["packets"] += 1
if session["first_seen"] is None:
session["first_seen"] = float(pkt.time)
session["last_seen"] = float(pkt.time)
func_code = s7.get("function_code")
if func_code is not None:
session["functions_seen"].add(func_code)
# 检查1:未授权工程师工作站
if tcp.dport == 102 and func_code in CRITICAL_FUNCTIONS:
if self.authorized_engineering and src_ip not in self.authorized_engineering:
self.findings.append(S7commSecurityFinding(
severity="CRITICAL",
finding_type="UNAUTHORIZED_ENGINEERING_ACCESS",
src_ip=src_ip, dst_ip=dst_ip,
function=s7.get("function_name", "未知"),
description=(
f"来自未授权源 {src_ip} 的关键S7comm操作。"
f"功能: {s7.get('function_name')}。只有授权的TIA Portal "
f"工作站才能发出这些命令。"
),
recommendation="在工业防火墙处阻止未授权源。调查源主机是否被攻陷。",
))
# 检查2:CPU停止命令
if func_code == 0x29:
session["cpu_commands"] += 1
self.findings.append(S7commSecurityFinding(
severity="CRITICAL",
finding_type="CPU_STOP_COMMAND",
src_ip=src_ip, dst_ip=dst_ip,
function="PLC CPU停止 (0x29)",
description=f"已向 {dst_ip} 处的PLC发送CPU停止命令。这会停止PLC程序执行。",
cve="MITRE T0881 — 服务停止",
recommendation="验证这是否是授权的维护操作。如果不是,立即隔离源。",
))
# 检查3:程序下载
if func_code in (0x1A, 0x1B, 0x1C):
session["program_downloads"] += 1
self.findings.append(S7commSecurityFinding(
severity="CRITICAL",
finding_type="PROGRAM_DOWNLOAD",
src_ip=src_ip, dst_ip=dst_ip,
function=s7.get("function_name", "下载"),
description=(
f"向 {dst_ip} 执行PLC程序下载操作。"
f"这修改了PLC上运行的控制逻辑。"
),
cve="MITRE T0843 — 程序下载",
recommendation="对照变更管理记录验证。与已知良好的程序备份进行比对。",
))
# 检查4:写入变量操作
if func_code == 0x05:
session["writes"] += 1
# 检查5:程序上传(PLC代码泄露)
if func_code in (0x1D, 0x1E, 0x1F):
self.findings.append(S7commSecurityFinding(
severity="HIGH",
finding_type="PROGRAM_UPLOAD_EXFILTRATION",
src_ip=src_ip, dst_ip=dst_ip,
function=s7.get("function_name", "上传"),
description=f"从 {dst_ip} 进行PLC程序上传(读取)。源 {src_ip} 正在提取PLC控制逻辑。",
recommendation="验证这是否是授权的维护操作。未授权的上传表示侦察行为。",
))
def check_known_vulnerabilities(self):
"""基于观察到的行为检查已知的西门子S7漏洞。"""
vuln_checks = [
{
"name": "S7-300/400重放攻击漏洞",
"cve": "CVE-2019-13945",
"description": "S7-300/400 PLC缺乏对S7comm会话的完整性检查,允许重放攻击",
"affected": "S7-300、S7-400(所有固件版本)",
"severity": "HIGH",
},
{
"name": "S7CommPlus完整性绕过",
"cve": "研究发现(Biham等人)",
"description": "S7CommPlusV3完整性机制可被能够观察到一个合法会话的攻击者绕过",
"affected": "S7-1200(< V4.5)、S7-1500(< V2.9)",
"severity": "HIGH",
},
{
"name": "不可修补的硬件信任根",
"cve": "CVE-2022-38773",
"description": "硬件漏洞允许绕过受保护的引导程序并持久修改固件",
"affected": "S7-1500(特定硬件版本)",
"severity": "CRITICAL",
},
{
"name": "通过端口102的远程DoS",
"cve": "CVE-2019-10929",
"description": "TCP端口102上的特制数据包可以远程崩溃S7 PLC",
"affected": "S7-300、S7-400、S7-1200、S7-1500(特定固件)",
"severity": "HIGH",
},
]
return vuln_checks
def generate_report(self):
"""生成综合S7comm安全分析报告。"""
print(f"\n{'='*70}")
print("S7COMM协议安全分析报告")
print(f"{'='*70}")
print(f"分析时间: {datetime.now().isoformat()}")
print(f"已分析数据包: {self.packet_count}")
print(f"S7comm会话: {len(self.sessions)}")
print(f"安全发现: {len(self.findings)}")
print(f"\n--- 会话摘要 ---")
for key, session in self.sessions.items():
funcs = [S7_FUNCTIONS.get(f, f"0x{f:02x}") for f in session["functions_seen"]]
print(f"\n {key}")
print(f" 数据包: {session['packets']}")
print(f" 功能: {', '.join(funcs)}")
print(f" 写入次数: {session['writes']}")
print(f" 程序下载次数: {session['program_downloads']}")
print(f" CPU命令次数: {session['cpu_commands']}")
if self.findings:
print(f"\n--- 安全发现 ---")
for f in self.findings:
print(f"\n [{f.severity}] {f.finding_type}")
print(f" 源: {f.src_ip} -> {f.dst_ip}")
print(f" 功能: {f.function}")
print(f" 详情: {f.description}")
if f.cve:
print(f" 参考: {f.cve}")
if f.recommendation:
print(f" 措施: {f.recommendation}")
print(f"\n--- 已知漏洞评估 ---")
for vuln in self.check_known_vulnerabilities():
print(f"\n [{vuln['severity']}] {vuln['name']}")
print(f" CVE: {vuln['cve']}")
print(f" 受影响: {vuln['affected']}")
print(f" 详情: {vuln['description']}")
if __name__ == "__main__":
analyzer = S7commAnalyzer()
analyzer.set_authorized_stations(["10.10.2.50", "10.10.2.51"])
if len(sys.argv) >= 2:
print(f"[*] 正在分析捕获文件: {sys.argv[1]}")
packets = rdpcap(sys.argv[1])
for pkt in packets:
analyzer.analyze_packet(pkt)
analyzer.generate_report()
else:
print("用法: python s7comm_analyzer.py <capture.pcap>")
print(" 分析S7comm流量中的安全漏洞")
| 术语 | 定义 |
|---|---|
| S7comm | 西门子专有协议,用于通过TCP端口102与SIMATIC S7 PLC通信,基于COTP/TPKT协议栈 |
| S7CommPlus | S7comm的增强版本,用于S7-1200/1500,具有完整性保护机制 |
| ROSCTR | S7comm报头中的远程操作服务控制字段,指示PDU类型(Job、Ack、Ack_Data、Userdata) |
| TIA Portal | 全集成自动化门户(Totally Integrated Automation Portal)——西门子用于编程S7 PLC的工程软件 |
| CPU停止(0x29) | 停止PLC程序执行的S7comm功能,是一种关键的拒绝服务操作 |
| 程序下载(0x1A) | 启动向PLC传输新控制逻辑的S7comm功能,代表最高风险操作 |
背景:Dragos传感器对来自非授权TIA Portal工程师工作站IP地址的S7comm程序下载流量发出警报。
方法:
注意事项:S7-300/400 PLC没有密码学完整性保护——任何能够到达TCP端口102的设备都可以发送命令。不要仅依赖PLC密码,因为它们在S7comm中以明文传输(非S7CommPlus)。网络分段是主要防御手段。
S7COMM安全分析报告
===================================
日期: YYYY-MM-DD
范围: [已分析的网段]
会话清单:
工程师工作站: [数量和IP]
通信中的PLC: [数量和IP]
未授权源: [数量]
关键发现:
CPU停止命令: [数量]
程序下载: [来自未授权源的数量]
重放攻击可能性: [评估]
漏洞评估:
S7-300/400(无完整性保护): [受影响PLC数量]
S7-1200/1500(S7CommPlus): [固件评估]
适用的已知CVE: [列表]
建议:
1. [最高优先级修复措施]
2. [网络分段改进]
3. [监控增强]