Performs ICS/OT asset discovery using Claroty xDome via passive monitoring, Claroty Edge queries, and API for visibility into PLCs, RTUs, HMIs, and network infrastructure across Purdue model levels.
npx claudepluginhub killvxk/cybersecurity-skills-zhThis skill uses the workspace's default tool permissions.
- 对资产未知或记录不完整的OT环境进行初始可见性建立时
Performs ICS/OT asset discovery using Claroty xDome via passive monitoring, Edge active queries, and API for visibility into PLCs, RTUs, HMIs across Purdue Model levels. Useful for OT inventory and risk assessments.
Performs ICS/OT asset discovery using Claroty xDome with passive monitoring, Edge active queries, and API for visibility into PLCs, RTUs, HMIs across Purdue levels.
Performs OT vulnerability assessments using Claroty xDome: asset discovery via traffic analysis, risk scoring, CVE/ICS-CERT correlation, and remediation prioritization considering operational impact.
Share bugs, ideas, or general feedback.
不适用于仅IT资产发现(使用Nessus或Qualys等工具)、未经供应商批准对敏感PLC网络进行主动扫描,或未部署Claroty的环境(参见implementing-ot-network-traffic-analysis-with-nozomi)。
在SPAN端口上部署Claroty传感器,被动观察所有OT网络流量而不影响运营。
#!/usr/bin/env python3
"""Claroty xDome资产发现配置和报告工具。
自动化被动监控传感器的配置,并从
Claroty xDome API生成资产清单报告。
"""
import json
import sys
import csv
from datetime import datetime
from typing import Optional
try:
import requests
except ImportError:
print("安装requests: pip install requests")
sys.exit(1)
class ClarotyAssetDiscovery:
"""通过Claroty xDome API进行ICS资产发现的接口。"""
def __init__(self, base_url: str, api_token: str, verify_ssl: bool = True):
self.base_url = base_url.rstrip("/")
self.session = requests.Session()
self.session.headers.update({
"Authorization": f"Bearer {api_token}",
"Content-Type": "application/json",
"Accept": "application/json",
})
self.session.verify = verify_ssl
def get_sites(self):
"""获取所有被监控的站点。"""
resp = self.session.get(f"{self.base_url}/api/v1/sites")
resp.raise_for_status()
return resp.json().get("sites", [])
def get_assets(self, site_id: Optional[str] = None, asset_type: Optional[str] = None):
"""获取已发现的资产(可选过滤)。
asset_type: PLC, RTU, HMI, DCS, Engineering_Workstation,
Historian, Network_Device, IO_Module, Safety_Controller
"""
params = {}
if site_id:
params["site_id"] = site_id
if asset_type:
params["type"] = asset_type
resp = self.session.get(f"{self.base_url}/api/v1/assets", params=params)
resp.raise_for_status()
return resp.json().get("assets", [])
def get_asset_detail(self, asset_id: str):
"""获取详细资产信息,包括固件、模块和CVE。"""
resp = self.session.get(f"{self.base_url}/api/v1/assets/{asset_id}")
resp.raise_for_status()
return resp.json()
def get_communication_map(self, site_id: str):
"""获取资产间的通信关系。"""
resp = self.session.get(
f"{self.base_url}/api/v1/sites/{site_id}/communications"
)
resp.raise_for_status()
return resp.json().get("communications", [])
def get_vulnerabilities(self, site_id: Optional[str] = None, severity: str = "critical"):
"""获取已发现资产的漏洞。"""
params = {"min_severity": severity}
if site_id:
params["site_id"] = site_id
resp = self.session.get(f"{self.base_url}/api/v1/vulnerabilities", params=params)
resp.raise_for_status()
return resp.json().get("vulnerabilities", [])
def export_asset_inventory(self, output_file: str, site_id: Optional[str] = None):
"""将完整资产清单导出为CSV用于合规报告。"""
assets = self.get_assets(site_id=site_id)
if not assets:
print("[!] 未找到资产")
return
fieldnames = [
"asset_id", "name", "type", "vendor", "model", "firmware_version",
"ip_address", "mac_address", "serial_number", "purdue_level",
"zone", "protocol", "first_seen", "last_seen", "risk_score",
"cve_count", "site_name",
]
with open(output_file, "w", newline="") as f:
writer = csv.DictWriter(f, fieldnames=fieldnames)
writer.writeheader()
for asset in assets:
writer.writerow({
"asset_id": asset.get("id", ""),
"name": asset.get("name", "未知"),
"type": asset.get("type", ""),
"vendor": asset.get("vendor", ""),
"model": asset.get("model", ""),
"firmware_version": asset.get("firmware_version", ""),
"ip_address": asset.get("ip_address", ""),
"mac_address": asset.get("mac_address", ""),
"serial_number": asset.get("serial_number", ""),
"purdue_level": asset.get("purdue_level", ""),
"zone": asset.get("zone", ""),
"protocol": ", ".join(asset.get("protocols", [])),
"first_seen": asset.get("first_seen", ""),
"last_seen": asset.get("last_seen", ""),
"risk_score": asset.get("risk_score", 0),
"cve_count": asset.get("cve_count", 0),
"site_name": asset.get("site_name", ""),
})
print(f"[+] 已导出 {len(assets)} 个资产到 {output_file}")
def generate_purdue_level_report(self, site_id: str):
"""按Purdue模型级别生成资产分布报告。"""
assets = self.get_assets(site_id=site_id)
levels = {0: [], 1: [], 2: [], 3: [], 3.5: [], 4: [], 5: []}
for asset in assets:
level = asset.get("purdue_level", -1)
if level in levels:
levels[level].append(asset)
print(f"\n{'='*65}")
print("PURDUE模型资产分布报告")
print(f"{'='*65}")
print(f"站点: {site_id}")
print(f"已发现资产总数: {len(assets)}")
print(f"报告生成时间: {datetime.now().isoformat()}")
print(f"{'-'*65}")
level_names = {
0: "0级 - 物理过程(传感器/执行器)",
1: "1级 - 基本控制(PLC/RTU)",
2: "2级 - 监控控制(HMI/SCADA)",
3: "3级 - 站点运营(历史服务器/MES)",
3.5: "3.5级 - IT/OT DMZ",
4: "4级 - 企业IT",
5: "5级 - 企业网络/互联网",
}
for level, name in level_names.items():
device_list = levels.get(level, [])
print(f"\n {name}")
print(f" 数量: {len(device_list)}")
if device_list:
vendors = set(a.get("vendor", "未知") for a in device_list)
types = set(a.get("type", "未知") for a in device_list)
print(f" 供应商: {', '.join(vendors)}")
print(f" 类型: {', '.join(types)}")
high_risk = [a for a in device_list if a.get("risk_score", 0) >= 7]
if high_risk:
print(f" 高风险资产: {len(high_risk)}")
for a in high_risk[:5]:
print(f" - {a['name']} (风险: {a.get('risk_score')})")
if __name__ == "__main__":
discovery = ClarotyAssetDiscovery(
base_url="https://your-claroty-instance.claroty.cloud",
api_token="your-api-token-here",
verify_ssl=True,
)
print("[*] 获取站点...")
sites = discovery.get_sites()
for site in sites:
print(f" 站点: {site['name']} (ID: {site['id']})")
if sites:
site_id = sites[0]["id"]
print(f"\n[*] 为 {sites[0]['name']} 生成Purdue级别报告...")
discovery.generate_purdue_level_report(site_id)
print(f"\n[*] 导出资产清单...")
discovery.export_asset_inventory(
f"asset_inventory_{datetime.now().strftime('%Y%m%d')}.csv",
site_id=site_id,
)
print(f"\n[*] 检查严重漏洞...")
vulns = discovery.get_vulnerabilities(site_id=site_id, severity="critical")
print(f" 严重漏洞: {len(vulns)}")
for v in vulns[:10]:
print(f" - {v.get('cve_id')}: {v.get('description', '')[:80]}")
Claroty Edge使用原生工业协议(而非IT扫描)对OT设备执行安全、有针对性的查询,从被动监控单独无法完全识别的设备中提取详细资产信息。
# Claroty Edge主动发现配置
# 使用原生工业协议进行安全主动查询
edge_configuration:
deployment_mode: "on-premises"
collection_schedule:
frequency: "weekly"
maintenance_window: "周日 02:00-06:00"
max_concurrent_queries: 5
protocol_queries:
siemens_s7:
enabled: true
target_subnets: ["10.10.1.0/24", "10.10.2.0/24"]
ports: [102]
query_type: "SZL_read"
information_collected:
- "模块标识"
- "固件版本"
- "硬件配置"
- "保护级别"
rockwell_cip:
enabled: true
target_subnets: ["10.10.3.0/24"]
ports: [44818]
query_type: "CIP_identity"
information_collected:
- "产品名称和版本"
- "序列号"
- "设备类型"
- "供应商ID"
modbus:
enabled: true
target_subnets: ["10.10.4.0/24"]
ports: [502]
query_type: "read_device_identification"
function_code: 43
information_collected:
- "供应商名称"
- "产品代码"
- "固件版本"
bacnet:
enabled: true
target_subnets: ["10.10.5.0/24"]
ports: [47808]
query_type: "who_is"
information_collected:
- "设备名称"
- "供应商标识符"
- "型号名称"
- "应用软件版本"
safety_controls:
excluded_subnets: ["10.10.100.0/24"] # SIS网络 - 永不主动扫描
rate_limiting: true
max_packets_per_second: 10
timeout_seconds: 5
retry_count: 1
abort_on_device_error: true
将已发现的资产与已知清单进行交叉比对,并用漏洞数据进行丰富。
#!/usr/bin/env python3
"""资产验证和丰富工具。
将Claroty发现结果与现有CMDB进行交叉比对,
并用NVD漏洞数据进行丰富。
"""
import json
import csv
import sys
from datetime import datetime
try:
import requests
except ImportError:
print("安装requests: pip install requests")
sys.exit(1)
class AssetValidator:
"""验证和丰富OT资产清单。"""
def __init__(self, inventory_file: str):
self.discovered_assets = []
self.load_inventory(inventory_file)
self.discrepancies = []
def load_inventory(self, filepath: str):
"""加载Claroty发现的资产清单。"""
with open(filepath, "r") as f:
reader = csv.DictReader(f)
self.discovered_assets = list(reader)
print(f"[*] 已加载 {len(self.discovered_assets)} 个已发现资产")
def compare_with_cmdb(self, cmdb_file: str):
"""将已发现资产与CMDB记录进行比较。"""
with open(cmdb_file, "r") as f:
cmdb_assets = {row["ip_address"]: row for row in csv.DictReader(f)}
discovered_ips = {a["ip_address"] for a in self.discovered_assets if a["ip_address"]}
cmdb_ips = set(cmdb_assets.keys())
shadow_devices = discovered_ips - cmdb_ips
missing_devices = cmdb_ips - discovered_ips
print(f"\n{'='*60}")
print("资产清单验证报告")
print(f"{'='*60}")
print(f"已发现资产: {len(discovered_ips)}")
print(f"CMDB记录: {len(cmdb_ips)}")
print(f"影子OT设备(不在CMDB中): {len(shadow_devices)}")
print(f"缺失设备(在CMDB中但未发现): {len(missing_devices)}")
if shadow_devices:
print(f"\n 影子设备(未授权/未记录):")
for ip in sorted(shadow_devices):
asset = next((a for a in self.discovered_assets if a["ip_address"] == ip), {})
print(f" - {ip} | {asset.get('vendor', '未知')} {asset.get('model', '')} | 类型: {asset.get('type', '未知')}")
self.discrepancies.append({
"type": "SHADOW_DEVICE",
"severity": "HIGH",
"ip": ip,
"detail": f"来自{asset.get('vendor', '未知供应商')}的未记录{asset.get('type', '设备')}",
})
if missing_devices:
print(f"\n 缺失设备(预期存在但未发现):")
for ip in sorted(missing_devices):
cmdb = cmdb_assets[ip]
print(f" - {ip} | {cmdb.get('name', '未知')} | CMDB最后更新: {cmdb.get('last_updated', 'N/A')}")
self.discrepancies.append({
"type": "MISSING_DEVICE",
"severity": "MEDIUM",
"ip": ip,
"detail": f"CMDB资产 {cmdb.get('name', ip)} 在网络上未发现",
})
def check_firmware_vulnerabilities(self, asset):
"""检查NVD中匹配资产固件的已知漏洞。"""
vendor = asset.get("vendor", "").lower()
model = asset.get("model", "").lower()
firmware = asset.get("firmware_version", "")
if not vendor or not model:
return []
search_term = f"{vendor} {model}"
try:
resp = requests.get(
"https://services.nvd.nist.gov/rest/json/cves/2.0",
params={"keywordSearch": search_term, "resultsPerPage": 10},
timeout=15,
)
if resp.status_code == 200:
data = resp.json()
return data.get("vulnerabilities", [])
except requests.RequestException:
pass
return []
def generate_risk_summary(self):
"""生成按风险优先级排序的发现摘要。"""
print(f"\n{'='*60}")
print("风险摘要")
print(f"{'='*60}")
high_risk = [a for a in self.discovered_assets if float(a.get("risk_score", 0)) >= 7]
end_of_life = [a for a in self.discovered_assets if a.get("firmware_version", "").startswith("v1.")]
no_encryption = [a for a in self.discovered_assets if "modbus" in a.get("protocol", "").lower()]
print(f" 高风险资产(评分 >= 7): {len(high_risk)}")
print(f" 可能达到生命周期终止的固件: {len(end_of_life)}")
print(f" 使用未加密协议的资产: {len(no_encryption)}")
print(f" 清单差异数量: {len(self.discrepancies)}")
if __name__ == "__main__":
if len(sys.argv) < 2:
print("用法: python validate_assets.py <claroty_export.csv> [cmdb_export.csv]")
sys.exit(1)
validator = AssetValidator(sys.argv[1])
if len(sys.argv) >= 3:
validator.compare_with_cmdb(sys.argv[2])
validator.generate_risk_summary()
| 术语 | 定义 |
|---|---|
| 被动监控(Passive Monitoring) | 通过SPAN/TAP观察镜像网络流量而不注入数据包,对所有OT设备安全 |
| 主动查询(Active Querying) | 发送原生协议请求以提取详细设备信息;需要仔细规划 |
| Claroty Edge | Claroty的安全主动发现采集器,使用原生工业协议而非IT扫描 |
| Purdue级别(Purdue Level) | 工业网络资产的层次化分类,从0级(物理过程)到5级(企业) |
| 影子OT设备(Shadow OT Device) | 连接到OT网络但未在资产管理系统中记录的资产 |
| xDome | Claroty的基于SaaS的网络物理系统保护平台,提供可见性、风险管理和威胁检测 |
背景:一家拥有20年设备添加历史的制造工厂需要为IEC 62443风险评估建立完整的OT资产清单。没有准确的资产记录。
方法:
注意事项:不要在被动监控捕获基线流量模式之前急于进行主动发现。切勿直接对PLC或RTU使用IT漏洞扫描器(Nessus主动扫描)——这可能导致旧版控制器崩溃。始终将安全仪表系统(SIS)排除在主动查询之外。
ICS资产发现报告
============================
日期: YYYY-MM-DD
平台: Claroty xDome
站点: [站点名称]
发现摘要:
已发现资产总数: [数量]
新资产(不在CMDB中): [数量]
高风险资产: [数量]
PURDUE级别分布:
0级(过程): [数量] 个资产
1级(控制): [数量] 个资产
2级(监控): [数量] 个资产
3级(运营): [数量] 个资产
3.5级(DMZ): [数量] 个资产
4-5级(企业): [数量] 个资产
主要供应商:
1. [供应商] - [数量] 台设备
2. [供应商] - [数量] 台设备
关键发现:
- [影子设备描述]
- [生命周期终止固件发现]
- [未加密协议问题]