Deploys and configures Dragos platform for OT/ICS network monitoring using 600+ protocol parsers, threat intelligence, and API for sensors, assets, vulnerabilities, and notifications. For industrial threat protection.
- 为工业环境部署OT专用网络检测与响应(NDR)解决方案
Deploys and configures Dragos Platform for OT/ICS network monitoring with 600+ protocol parsers, threat detection against VOLTZITE/GRAPHITE, and asset visibility via sensors and API.
Deploys Dragos Platform sensors for OT/ICS monitoring with 600+ protocol parsers, threat detection for groups like VOLTZITE, and Python API tools for sensor management and SIEM integration.
Deploys Nozomi Guardian sensors for passive OT/ICS network traffic analysis, providing asset visibility, real-time threat detection, anomaly detection, and vulnerability assessment via Python API client.
不适用于无ICS组件的纯IT网络监控、OT工作站的端点检测与响应(EDR),或已标准化使用Claroty或Nozomi的环境(参见相应技能)。
#!/usr/bin/env python3
"""Dragos平台部署验证和集成工具。
验证Dragos传感器部署、检查连通性,并
配置与企业SIEM的集成以进行OT告警转发。
"""
import csv
import json
import os
import sys
from collections import Counter
from datetime import datetime
from typing import Optional, List, Dict
try:
    import requests
except ImportError:
    # Third-party HTTP client; every platform call below goes through it,
    # so the script cannot run at all without it and exits early.
    print("安装requests: pip install requests")
    sys.exit(1)
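class DragosPlatformManager:
    """Thin client for the Dragos Platform REST API used by OT monitoring checks.

    All requests share one ``requests.Session`` that carries the API key/secret
    headers, so callers never handle the credentials directly. Every call is
    bounded by ``timeout`` so an unresponsive SiteStore cannot stall a
    deployment check indefinitely.
    """

    def __init__(self, base_url: str, api_key: str, api_secret: str, verify_ssl: bool = True,
                 timeout: float = 30.0):
        """Create an authenticated session against the SiteStore at ``base_url``.

        Args:
            base_url: SiteStore URL; a trailing slash is stripped.
            api_key: Value sent in the ``API-Key`` header on every request.
            api_secret: Value sent in the ``API-Secret`` header on every request.
            verify_ssl: Verify the SiteStore TLS certificate. Keep ``True`` outside
                isolated lab setups: with ``False`` the API credentials are sent to
                a peer whose identity has not been checked.
            timeout: Per-request timeout in seconds applied to every API call.
        """
        self.base_url = base_url.rstrip("/")
        self.timeout = timeout
        self.session = requests.Session()
        self.session.headers.update({
            "API-Key": api_key,
            "API-Secret": api_secret,
            "Content-Type": "application/json",
        })
        self.session.verify = verify_ssl

    def _get(self, path: str, params: Optional[Dict] = None, key: str = "") -> List[Dict]:
        """GET ``/api/v1/<path>`` and return the list stored under ``key``.

        Args:
            path: Endpoint path relative to ``/api/v1/``.
            params: Query parameters; ``None`` sends none.
            key: Top-level JSON key holding the result list.

        Returns:
            The list under ``key``, or ``[]`` when the response lacks it.

        Raises:
            requests.HTTPError: For any non-2xx response (via ``raise_for_status``).
        """
        resp = self.session.get(
            f"{self.base_url}/api/v1/{path}",
            params=params or {},
            timeout=self.timeout,
        )
        resp.raise_for_status()
        return resp.json().get(key, [])

    def get_sensors(self) -> List[Dict]:
        """Return every deployed Dragos sensor together with its status fields."""
        return self._get("sensors", key="sensors")

    def get_assets(self, asset_type: Optional[str] = None) -> List[Dict]:
        """Return OT assets discovered by Dragos, optionally filtered by ``asset_type``."""
        params = {"type": asset_type} if asset_type else {}
        return self._get("assets", params, key="assets")

    def get_notifications(self, severity: str = "high", limit: int = 50) -> List[Dict]:
        """Return threat-detection notifications at or above ``severity``, at most ``limit``."""
        return self._get(
            "notifications", {"min_severity": severity, "limit": limit}, key="notifications"
        )

    def get_vulnerabilities(self, severity: str = "critical") -> List[Dict]:
        """Return OT vulnerabilities at or above ``severity`` with Dragos-specific context."""
        return self._get("vulnerabilities", {"min_severity": severity}, key="vulnerabilities")

    def get_threat_groups(self) -> List[Dict]:
        """Return tracked ICS threat-group activity relevant to this environment."""
        return self._get("threat-groups", key="threat_groups")

    def validate_deployment(self) -> None:
        """Print a sensor-health, asset-visibility and threat-intel summary to stdout.

        Makes three API calls (sensors, assets, threat groups); any HTTP error
        propagates from ``_get``. Only sensors reporting ``status == "connected"``
        are counted as healthy.
        """
        sensors = self.get_sensors()
        assets = self.get_assets()
        bar = "=" * 65
        print(f"\n{bar}")
        print("DRAGOS平台部署验证")
        print(bar)
        print(f"验证时间: {datetime.now().isoformat()}")
        print("\n--- 传感器状态 ---")
        healthy_sensors = 0
        for sensor in sensors:
            status = sensor.get("status", "unknown")
            # Anything other than "connected" is flagged so it stands out in the report.
            icon = "[OK]" if status == "connected" else "[!!]"
            print(f" {icon} {sensor.get('name', 'Unknown')} | 状态: {status}")
            print(f" IP: {sensor.get('ip_address')} | 网段: {sensor.get('monitored_segment')}")
            print(f" 最后在线: {sensor.get('last_seen')} | 数据包/秒: {sensor.get('pps', 0)}")
            print(f" 知识包: {sensor.get('knowledge_pack_version', 'N/A')}")
            if status == "connected":
                healthy_sensors += 1
        print(f"\n 传感器健康: {healthy_sensors}/{len(sensors)} 正常运行")
        print("\n--- 资产可见性 ---")
        print(f" 发现的资产总数: {len(assets)}")
        # most_common() orders by count descending, stable for ties.
        asset_types = Counter(asset.get("type", "Unknown") for asset in assets)
        for atype, count in asset_types.most_common():
            print(f" {atype}: {count}")
        protocols = set()
        for asset in assets:
            protocols.update(asset.get("protocols") or [])
        print(f" 已观察到的协议: {', '.join(sorted(protocols))}")
        print("\n--- 威胁情报 ---")
        groups = self.get_threat_groups()
        print(f" 相关威胁组织: {len(groups)}")
        for group in groups:
            # The API may return an explicit null; treat it like a missing description.
            description = group.get("description") or ""
            print(f" - {group.get('name')}: {description[:80]}")
            print(f" 目标行业: {', '.join(group.get('target_sectors') or [])}")
            print(f" 活跃程度: {group.get('activity_level', 'Unknown')}")

    def generate_siem_integration_config(self, siem_type: str = "splunk") -> Dict:
        """Print and return a SIEM forwarding template for Dragos notifications.

        Args:
            siem_type: ``"splunk"`` or ``"sentinel"``. Any other value falls back
                to the Splunk template while the printed heading still shows the
                requested name, so check the heading against the body.

        Returns:
            A fresh dict (safe for the caller to modify). The Sentinel
            ``workspace_id`` is a placeholder that must be filled in.
        """
        splunk = {
            "syslog_format": "CEF",
            "syslog_port": 514,
            "severity_mapping": {
                "critical": 10,
                "high": 7,
                "medium": 5,
                "low": 3,
                "info": 1,
            },
            "index": "ot_security",
            "sourcetype": "dragos:notification",
            "fields": [
                "notification_id", "severity", "category", "source_ip",
                "destination_ip", "asset_name", "protocol", "description",
                "mitre_ics_technique", "threat_group",
            ],
        }
        sentinel = {
            "connector_type": "Syslog-CEF",
            "workspace_id": "<workspace-id>",
            "log_analytics_table": "DragosOTAlerts_CL",
            "severity_mapping": {
                "critical": "High",
                "high": "High",
                "medium": "Medium",
                "low": "Low",
                "info": "Informational",
            },
        }
        configs = {"splunk": splunk, "sentinel": sentinel}
        config = configs.get(siem_type, splunk)
        print(f"\n--- {siem_type.upper()} 集成配置 ---")
        print(json.dumps(config, indent=2))
        return config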
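if __name__ == "__main__":
    # Credentials and the SiteStore URL are taken from the environment so they
    # are not committed alongside the script; the literals are the original
    # placeholders, kept as defaults. The DRAGOS_* variable names are introduced
    # here by this script, not a Dragos platform convention.
    manager = DragosPlatformManager(
        base_url=os.environ.get("DRAGOS_BASE_URL", "https://dragos-sitestore.plant.local"),
        api_key=os.environ.get("DRAGOS_API_KEY", "your-api-key"),
        api_secret=os.environ.get("DRAGOS_API_SECRET", "your-api-secret"),
        verify_ssl=True,
    )
    manager.validate_deployment()
    manager.generate_siem_integration_config("splunk")
    print("\n--- 近期高严重性通知 ---")
    notifications = manager.get_notifications(severity="high", limit=10)
    for n in notifications:
        # severity may be absent or null in the payload; normalise before upper-casing
        print(f" [{(n.get('severity') or '').upper()}] {n.get('title', '无标题')}")
        print(f" 类别: {n.get('category')} | 时间: {n.get('timestamp')}")
        print(f" 资产: {', '.join(n.get('affected_assets') or [])}")
        print(f" MITRE ICS: {n.get('mitre_technique', 'N/A')}")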
# Dragos platform detection configuration
# Tuned for manufacturing/energy environments
detection_configuration:
  knowledge_pack:
    auto_update: true
    update_schedule: "weekly"
    # Threat groups whose analytics are loaded from the knowledge pack.
    include_threat_groups:
      - "VOLTZITE"     # targets the energy sector; steals OT diagrams
      - "GRAPHITE"     # new threat group in 2025, targets ICS
      - "BAUXITE"      # new threat group in 2025, targets ICS
      - "CHERNOVITE"   # developed the PIPEDREAM/INCONTROLLER framework
      - "ELECTRUM"     # associated with Industroyer/CrashOverride
      - "KAMACITE"     # initial access against the energy sector
  detection_categories:
    network_baseline:
      enabled: true
      # NOTE(review): 30-day learning window; confirm how baseline alerts are
      # handled during this period before treating them as actionable.
      learning_period_days: 30
      alert_on:
        - "new_communication_pair"
        - "new_protocol_detected"
        - "new_device_on_network"
        - "protocol_anomaly"
    threat_detection:
      enabled: true
      alert_on:
        - "known_malware_ioc"
        - "threat_group_ttp"
        - "lateral_movement"
        - "command_and_control"
        - "data_exfiltration"
    vulnerability_correlation:
      enabled: true
      alert_on:
        - "active_exploitation_attempt"
        - "vulnerability_with_public_exploit"
  # Per-protocol parser options; each flag enables one class of detection.
  protocol_monitoring:
    modbus:
      monitor_writes: true
      baseline_function_codes: true
      baseline_register_ranges: true
    dnp3:
      monitor_control_commands: true
      detect_firmware_updates: true
    s7comm:
      detect_cpu_stop: true
      detect_program_download: true
    opc_ua:
      monitor_method_calls: true
      detect_browsing: true
    ethernet_ip:
      monitor_cip_services: true
      detect_firmware_flash: true
  # Who is paged and what is forwarded per severity. Medium and low are only
  # forwarded to the SIEM: no notification list and no automatic ticket.
  alert_routing:
    critical:
      notify: ["ot_soc_team", "plant_manager"]
      siem_forward: true
      auto_ticket: true
    high:
      notify: ["ot_soc_team"]
      siem_forward: true
      auto_ticket: true
    medium:
      siem_forward: true
    low:
      siem_forward: true
| 术语 | 定义 |
|---|---|
| Dragos平台 | 专为OT网络安全打造的平台,具备ICS环境的资产可见性、威胁检测和漏洞管理能力 |
| 知识包(Knowledge Pack) | Dragos威胁情报更新包,包含针对ICS特有新威胁、恶意软件和漏洞利用的检测分析 |
| SiteStore | Dragos中央管理服务器,汇聚站点内所有已部署传感器的数据 |
| VOLTZITE | Dragos追踪的威胁组织,针对能源行业OT环境,窃取GIS数据和ICS网络图 |
| PIPEDREAM/INCONTROLLER | 由CHERNOVITE开发的模块化ICS攻击框架,针对Schneider/OMRON PLC和OPC UA服务器 |
| 邻域守护者(Neighborhood Keeper) | Dragos社区防御计划,在参与的OT环境之间共享匿名化威胁数据 |
背景:部署在电力公司的Dragos传感器检测到异常的OPC UA浏览活动以及从工程师工作站窃取设备配置数据。
处理方法:
注意事项:不要将OPC UA浏览告警视为误报——VOLTZITE专门使用这一技术进行预置。确保Dragos知识包是最新的以检测最新的VOLTZITE指标。在收集取证证据之前不要重新镜像被攻陷的工作站。
DRAGOS OT监控部署报告
==========================================
站点: [站点名称]
日期: YYYY-MM-DD
传感器部署:
传感器总数: [数量]
正常运行: [数量]
覆盖率: [已监控OT网段的百分比]
资产可见性:
OT资产总数: [数量]
PLC: [数量] | HMI: [数量] | 网络设备: [数量]
协议: [列表]
威胁检测:
相关活跃威胁组织: [数量]
已加载检测分析: [数量]
告警(过去30天): [按严重程度分类的数量]
SIEM集成:
状态: [已连接/未连接]
已转发事件(过去24小时): [数量]