Performs OT vulnerability assessments using Claroty xDome: asset discovery via traffic analysis, risk scoring, CVE/ICS-CERT correlation, and remediation prioritization considering operational impact.
- 按照IEC 62443或NERC CIP要求进行定期OT漏洞评估时
Performs OT vulnerability assessments using Claroty xDome for asset discovery, risk scoring, CVE/ICS-CERT correlation, and remediation prioritization in ICS environments.
Performs OT vulnerability assessments using Claroty xDome for asset discovery, risk scoring, vulnerability correlation with CVE/ICS-CERT data, and remediation prioritization. Useful for IEC 62443 compliance and limited maintenance windows.
Performs safe vulnerability scanning in OT/ICS environments using passive monitoring, native protocol queries, and controlled Tenable OT Security active scans without disrupting industrial processes or crashing legacy controllers.
不适用于对PLC和安全系统进行主动漏洞扫描(参见performing-ot-network-security-assessment的被动方法)、仅IT漏洞管理(使用标准漏洞扫描器),或渗透测试(参见performing-ics-penetration-testing)。
配置Claroty执行被动和主动安全发现,建立包含固件版本的完整资产清单用于漏洞关联。
#!/usr/bin/env python3
"""OT漏洞评估管理器。
将OT资产清单与ICS-CERT公告和CVE数据进行关联,
以识别、优先排序和跟踪OT漏洞。设计用于
与Claroty xDome API集成或独立运行。
"""
import json
import sys
from collections import defaultdict
from dataclasses import dataclass, field, asdict
from datetime import datetime
import requests
@dataclass
class OTAsset:
    """One OT asset from a Claroty export or a manual inventory.

    Field names are the keys ``OTVulnerabilityAssessment.load_assets`` expects
    (it builds records with ``OTAsset(**record)``), so every field is required.
    """
    asset_id: str
    name: str
    vendor: str            # matched case-insensitively (substring) against OTVulnerability.affected_vendor
    model: str             # matched case-insensitively (substring) against OTVulnerability.affected_product
    firmware_version: str  # informational only; not compared during correlation in this listing
    asset_type: str        # PLC, HMI, RTU, historian, switch, etc.
    purdue_level: str      # e.g. "Level 0-1" .. "Level 4"; keys of the level table in _calculate_risk_score
    ip_address: str
    protocol: str
    criticality: str       # critical, high, medium, low (weight table in _calculate_risk_score)
    zone: str
@dataclass
class OTVulnerability:
    """A vulnerability record (CVE and/or ICS-CERT advisory) to correlate against assets."""
    vuln_id: str
    cve_id: str
    title: str
    severity: str                # critical, high, medium, low (vendor/advisory severity, not the OT rating)
    cvss_score: float            # base input of the OT-adjusted risk score
    affected_vendor: str         # substring-matched against OTAsset.vendor
    affected_product: str        # substring-matched against OTAsset.model
    affected_versions: str       # free text; not parsed or compared in this listing
    description: str
    ics_cert_advisory: str = ""  # advisory identifier, if one exists
    remediation: str = ""
    patch_available: bool = False    # selects the "patch available" vs "apply compensating controls" report line
    compensating_controls: str = ""  # free text; the scoring code reads RiskAssessment.compensating_controls, not this field
@dataclass
class RiskAssessment:
    """An asset/vulnerability pair with its OT-adjusted risk result.

    Created by ``correlate_vulnerabilities`` and filled in by ``_calculate_risk_score``.
    """
    asset: OTAsset
    vulnerability: OTVulnerability
    risk_score: float = 0.0        # 0.0-10.0: CVSS adjusted for criticality, Purdue level and controls
    risk_rating: str = ""          # critical / high / medium / low, derived from risk_score thresholds
    exploitability: str = ""       # never set in this listing; reserved for analyst input
    operational_impact: str = ""   # never set in this listing; reserved for analyst input
    compensating_controls: list = field(default_factory=list)  # list of str; a non-empty list scales the score by 0.8
    remediation_priority: int = 0  # 1 (critical) .. 4 (low)
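class OTVulnerabilityAssessment:
    """OT vulnerability assessment and prioritisation engine.

    Typical flow: ``load_assets`` -> fill ``vulnerabilities`` with
    ``OTVulnerability`` records -> ``correlate_vulnerabilities`` ->
    ``generate_report`` / ``export_json``.

    ``fetch_ics_advisories`` returns raw CISA KEV records for ICS vendors; it
    does not populate ``vulnerabilities`` itself because a KEV record carries
    no CVSS score, which ``OTVulnerability`` requires.
    """

    # CISA Known Exploited Vulnerabilities (KEV) catalogue feed.
    KEV_URL = "https://www.cisa.gov/sites/default/files/feeds/known_exploited_vulnerabilities.json"

    # Vendors treated as ICS-relevant when filtering KEV records. Compared as
    # whole lowercase words of ``vendorProject``: a plain substring test with a
    # two-letter name such as "ge" would also match unrelated vendors whose
    # name merely contains those letters.
    ICS_VENDORS = frozenset([
        "siemens", "schneider", "rockwell", "honeywell",
        "abb", "ge", "emerson", "yokogawa", "omron",
        "mitsubishi", "phoenix", "moxa", "advantech",
    ])

    # Multiplier by the asset's function in the process.
    CRITICALITY_WEIGHTS = {
        "critical": 1.5,  # SIS / safety systems
        "high": 1.3,      # PLCs, primary control
        "medium": 1.0,    # HMIs, historians
        "low": 0.7,       # non-critical support systems
    }

    # Multiplier by Purdue level: the closer to the physical process, the higher the risk.
    LEVEL_WEIGHTS = {
        "Level 0-1": 1.5,
        "Level 2": 1.3,
        "Level 3": 1.0,
        "Level 3.5": 0.8,
        "Level 4": 0.6,
    }

    # Score factor applied when compensating controls are recorded for a pair.
    COMPENSATING_CONTROL_FACTOR = 0.8

    # (minimum score, rating, remediation priority), highest band first.
    RATING_BANDS = (
        (9.0, "critical", 1),
        (7.0, "high", 2),
        (4.0, "medium", 3),
    )

    def __init__(self):
        self.assets = []             # OTAsset records (see load_assets)
        self.vulnerabilities = []    # OTVulnerability records; filled by the caller
        self.risk_assessments = []   # RiskAssessment results of correlate_vulnerabilities

    def load_assets(self, assets_data):
        """Load assets from a Claroty export or a manual inventory.

        Args:
            assets_data: iterable of dicts whose keys are exactly the OTAsset
                field names.

        Raises:
            TypeError: if a record is missing a field or carries an unknown key
                (dataclass construction is strict on purpose so a malformed
                export is not silently loaded).
        """
        self.assets.extend(OTAsset(**record) for record in assets_data)
        print(f"[*] 已加载 {len(self.assets)} 个OT资产")

    def _is_ics_vendor(self, vendor_project):
        """Return True if any whole word of *vendor_project* is an ICS vendor name.

        Tokenises on whitespace and hyphens only; other separators are not
        handled. An empty string never matches.
        """
        tokens = vendor_project.lower().replace("-", " ").split()
        return any(token in self.ICS_VENDORS for token in tokens)

    def fetch_ics_advisories(self, url=None, timeout=30):
        """Fetch the CISA KEV catalogue and keep entries from ICS vendors.

        Args:
            url: feed URL; defaults to ``KEV_URL``.
            timeout: HTTP timeout in seconds.

        Returns:
            list of raw KEV vulnerability dicts, or ``[]`` when the feed cannot
            be fetched or parsed (best-effort: the failure is printed, not
            raised, so an offline assessment can continue).
        """
        print("[*] 从CISA获取ICS-CERT公告...")
        try:
            resp = requests.get(url or self.KEV_URL, timeout=timeout)
            resp.raise_for_status()
            data = resp.json()
        except (requests.RequestException, ValueError) as exc:
            # Network/HTTP errors and malformed JSON are the expected failures.
            print(f"[警告] 无法获取公告: {exc}")
            return []
        entries = data.get("vulnerabilities", []) if isinstance(data, dict) else []
        ics_vulns = [
            vuln for vuln in entries
            if self._is_ics_vendor(vuln.get("vendorProject", ""))
        ]
        print(f" 发现 {len(ics_vulns)} 个ICS相关已知利用漏洞")
        return ics_vulns

    def correlate_vulnerabilities(self, applied_controls=None):
        """Pair every asset with the vulnerabilities whose vendor/product match it.

        Matching is a case-insensitive substring test: ``affected_vendor`` in
        ``asset.vendor`` and ``affected_product`` in ``asset.model``.
        ``affected_versions`` is free text and is NOT compared with
        ``asset.firmware_version``, so a pair means "potentially affected" and
        still needs firmware confirmation. Vulnerabilities with an empty
        vendor or product are skipped, because an empty substring would pair
        them with every asset. Earlier results are discarded, so calling this
        twice does not duplicate pairs.

        Args:
            applied_controls: optional mapping ``asset_id -> list of str``
                naming compensating controls already in place for that asset;
                they are recorded on each pair before scoring so the control
                reduction actually applies.
        """
        controls_by_asset = applied_controls or {}
        self.risk_assessments = []
        print("[*] 将漏洞与资产关联...")
        for asset in self.assets:
            asset_vendor = asset.vendor.lower()
            asset_model = asset.model.lower()
            for vuln in self.vulnerabilities:
                vendor = vuln.affected_vendor.lower()
                product = vuln.affected_product.lower()
                if not vendor or not product:
                    continue
                if vendor in asset_vendor and product in asset_model:
                    pair = RiskAssessment(
                        asset=asset,
                        vulnerability=vuln,
                        compensating_controls=list(controls_by_asset.get(asset.asset_id, [])),
                    )
                    self._calculate_risk_score(pair)
                    self.risk_assessments.append(pair)
        print(f" 已关联 {len(self.risk_assessments)} 个资产-漏洞对")

    @classmethod
    def _rate(cls, score):
        """Map a risk score to ``(rating, remediation_priority)`` using RATING_BANDS."""
        for minimum, rating, priority in cls.RATING_BANDS:
            if score >= minimum:
                return rating, priority
        return "low", 4

    def _calculate_risk_score(self, ra):
        """Compute the OT-adjusted risk score and rating for one pair, in place.

        score = CVSS x criticality weight x Purdue level weight
                x (COMPENSATING_CONTROL_FACTOR if controls are recorded else 1.0),
        rounded to one decimal and capped at 10.0. Unknown criticality or
        level strings fall back to a weight of 1.0.

        Args:
            ra: RiskAssessment whose ``risk_score``, ``risk_rating`` and
                ``remediation_priority`` are set.
        """
        base = ra.vulnerability.cvss_score
        criticality = self.CRITICALITY_WEIGHTS.get(ra.asset.criticality, 1.0)
        level_factor = self.LEVEL_WEIGHTS.get(ra.asset.purdue_level, 1.0)
        reduction = self.COMPENSATING_CONTROL_FACTOR if ra.compensating_controls else 1.0
        ra.risk_score = min(round(base * criticality * level_factor * reduction, 1), 10.0)
        ra.risk_rating, ra.remediation_priority = self._rate(ra.risk_score)

    def _ranked_assessments(self):
        """Return the risk assessments sorted by descending score (stable for ties)."""
        return sorted(self.risk_assessments, key=lambda ra: ra.risk_score, reverse=True)

    def generate_report(self, top_per_rating=10):
        """Render a plain-text report grouped by rating, highest score first.

        Args:
            top_per_rating: maximum number of findings listed under each rating.

        Returns:
            The report as a single string.
        """
        ranked = self._ranked_assessments()
        lines = [
            "=" * 70,
            "OT漏洞评估报告",
            f"日期: {datetime.now().isoformat()}",  # naive local time, no tz offset
            f"资产: {len(self.assets)} | 漏洞: {len(self.vulnerabilities)}",
            f"风险评估: {len(self.risk_assessments)}",
            "=" * 70,
        ]
        for rating in ("critical", "high", "medium", "low"):
            findings = [ra for ra in ranked if ra.risk_rating == rating]
            if not findings:
                continue
            lines.append(f"\n--- {rating.upper()} 风险 ({len(findings)}) ---")
            for ra in findings[:top_per_rating]:
                lines.append(f"\n 风险评分: {ra.risk_score}/10.0")
                lines.append(f" 资产: {ra.asset.name} ({ra.asset.vendor} {ra.asset.model})")
                lines.append(f" 区域: {ra.asset.zone} ({ra.asset.purdue_level})")
                lines.append(f" CVE: {ra.vulnerability.cve_id} (CVSS: {ra.vulnerability.cvss_score})")
                lines.append(f" 标题: {ra.vulnerability.title}")
                if ra.vulnerability.patch_available:
                    lines.append(" 补丁: 可用 - 安排在下次维护窗口")
                else:
                    lines.append(" 补丁: 不可用 - 应用补偿控制措施")
        return "\n".join(lines)

    def export_json(self, output_file):
        """Write the assessment (pairs sorted by descending score) to *output_file* as JSON.

        Args:
            output_file: path of the file to create or overwrite.
        """
        data = {
            "assessment_date": datetime.now().isoformat(),
            "asset_count": len(self.assets),
            "vulnerability_count": len(self.vulnerabilities),
            "risk_assessments": [
                {
                    "asset_name": ra.asset.name,
                    "asset_ip": ra.asset.ip_address,
                    "cve": ra.vulnerability.cve_id,
                    "risk_score": ra.risk_score,
                    "risk_rating": ra.risk_rating,
                    "priority": ra.remediation_priority,
                }
                for ra in self._ranked_assessments()
            ],
        }
        with open(output_file, "w", encoding="utf-8") as fh:
            json.dump(data, fh, indent=2)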
if __name__ == "__main__":
    # Demo entry point: only pulls the KEV feed and reports how many ICS
    # entries it contains; no assets are loaded and no report is produced.
    engine = OTVulnerabilityAssessment()
    kev_entries = engine.fetch_ics_advisories()
    print(f"从CISA KEV目录获取了 {len(kev_entries)} 条ICS公告")
| 术语 | 定义 |
|---|---|
| Claroty xDome | 为OT/IoT环境提供资产发现、漏洞管理和威胁检测的网络物理系统保护平台 |
| 被动发现(Passive Discovery) | 通过分析网络流量识别OT资产而不发送任何数据包,对生产环境安全 |
| 安全主动查询(Safe Active Query) | 以安全速率使用原生工业协议查询OT设备,收集详细资产信息而不干扰运营 |
| OT风险评分 | 结合CVSS基础评分、资产关键性、Purdue级别和补偿控制措施的OT适用风险评级 |
| ICS-CERT公告 | CISA发布的工业控制系统漏洞安全公告,包含供应商特定修复指导 |
| 虚拟打补丁(Virtual Patching) | 在无法立即应用固件补丁时,部署IPS/防火墙规则阻止已知漏洞利用 |
OT漏洞评估报告
=====================================
工具: Claroty xDome / 手动评估
日期: YYYY-MM-DD
已扫描资产: [N]
风险摘要:
严重风险: [N]个漏洞影响[N]个资产
高风险: [N]个漏洞影响[N]个资产
中风险: [N]个漏洞影响[N]个资产
低风险: [N]个漏洞影响[N]个资产
主要风险:
[风险评分] [CVE-ID] 影响 [资产名称] ([区域])
修复: [补丁/补偿控制措施]
时限: [下次维护窗口/立即]