Deploys Nozomi Guardian sensors for passive OT/ICS network traffic analysis, providing asset visibility, real-time threat detection, anomaly detection, and vulnerability assessment via Python API client.
npx claudepluginhub killvxk/cybersecurity-skills-zhThis skill uses the workspace's default tool permissions.
- 使用Nozomi Networks Guardian传感器部署被动OT网络监控
Deploys Nozomi Guardian sensors for passive OT network monitoring, enabling asset visibility, anomaly detection, and threat analysis in ICS via Python API scripts.
Deploys and manages Nozomi Networks Guardian sensors for passive OT network traffic analysis, providing asset visibility, real-time threat detection, vulnerability assessment, and anomaly detection in ICS environments.
Deploys and configures Dragos platform for OT/ICS network monitoring using 600+ protocol parsers, threat intelligence, and API for sensors, assets, vulnerabilities, and notifications. For industrial threat protection.
Share bugs, ideas, or general feedback.
不适用于OT设备的主动漏洞扫描(参见performing-ot-vulnerability-scanning-safely)、已标准化使用Dragos的环境(参见implementing-dragos-platform-for-ot-monitoring),或仅IT网络监控。
#!/usr/bin/env python3
"""Nozomi Guardian部署管理器和告警分析器。
管理Nozomi Guardian传感器部署验证、资产清单
提取,以及OT环境的威胁告警分析。
"""
import json
import sys
from collections import defaultdict
from datetime import datetime
from typing import Dict, List, Optional
try:
import requests
except ImportError:
print("安装requests: pip install requests")
sys.exit(1)
class NozomiGuardianManager:
"""管理Nozomi Networks Guardian进行OT监控。"""
def __init__(self, guardian_url: str, api_token: str, verify_ssl: bool = False):
self.guardian_url = guardian_url.rstrip("/")
self.session = requests.Session()
self.session.headers.update({
"Authorization": f"Bearer {api_token}",
"Content-Type": "application/json",
})
self.session.verify = verify_ssl
def get_nodes(self, node_type: Optional[str] = None) -> List[Dict]:
"""获取已发现的网络节点(资产)。"""
params = {}
if node_type:
params["type"] = node_type
resp = self.session.get(f"{self.guardian_url}/api/v1/nodes", params=params)
resp.raise_for_status()
return resp.json().get("result", [])
def get_alerts(self, severity: str = "high", limit: int = 100) -> List[Dict]:
"""获取安全告警。"""
params = {"severity": severity, "limit": limit, "status": "open"}
resp = self.session.get(f"{self.guardian_url}/api/v1/alerts", params=params)
resp.raise_for_status()
return resp.json().get("result", [])
def get_links(self) -> List[Dict]:
"""获取节点之间的通信链接。"""
resp = self.session.get(f"{self.guardian_url}/api/v1/links")
resp.raise_for_status()
return resp.json().get("result", [])
def get_vulnerabilities(self) -> List[Dict]:
"""获取检测到的漏洞。"""
resp = self.session.get(f"{self.guardian_url}/api/v1/vulnerabilities")
resp.raise_for_status()
return resp.json().get("result", [])
def validate_deployment(self):
"""验证Guardian传感器部署和覆盖范围。"""
print(f"\n{'='*65}")
print("NOZOMI GUARDIAN部署验证")
print(f"{'='*65}")
print(f"Guardian URL: {self.guardian_url}")
print(f"验证时间: {datetime.now().isoformat()}")
# 检查系统状态
try:
resp = self.session.get(f"{self.guardian_url}/api/v1/system/status")
if resp.status_code == 200:
status = resp.json()
print(f"\n--- 系统状态 ---")
print(f" 版本: {status.get('version', 'N/A')}")
print(f" 运行时间: {status.get('uptime', 'N/A')}")
print(f" 已处理数据包: {status.get('packets_processed', 'N/A')}")
print(f" 威胁情报: {status.get('threat_intelligence_version', 'N/A')}")
except requests.RequestException as e:
print(f" [!] 系统状态不可用: {e}")
# 资产发现摘要
nodes = self.get_nodes()
print(f"\n--- 资产发现 ---")
print(f" 发现的节点总数: {len(nodes)}")
type_counts = defaultdict(int)
vendor_counts = defaultdict(int)
protocol_set = set()
for node in nodes:
type_counts[node.get("type", "unknown")] += 1
vendor_counts[node.get("vendor", "Unknown")] += 1
for proto in node.get("protocols", []):
protocol_set.add(proto)
print(f"\n 按类型:")
for ntype, count in sorted(type_counts.items(), key=lambda x: -x[1]):
print(f" {ntype}: {count}")
print(f"\n 按供应商:")
for vendor, count in sorted(vendor_counts.items(), key=lambda x: -x[1])[:10]:
print(f" {vendor}: {count}")
print(f"\n 已观察到的协议: {', '.join(sorted(protocol_set))}")
# 告警摘要
alerts = self.get_alerts(severity="high")
print(f"\n--- 告警摘要 ---")
print(f" 高/严重告警: {len(alerts)}")
alert_types = defaultdict(int)
for alert in alerts:
alert_types[alert.get("type_id", "unknown")] += 1
for atype, count in sorted(alert_types.items(), key=lambda x: -x[1])[:10]:
print(f" {atype}: {count}")
# 漏洞摘要
vulns = self.get_vulnerabilities()
print(f"\n--- 漏洞摘要 ---")
print(f" 漏洞总数: {len(vulns)}")
sev_counts = defaultdict(int)
for vuln in vulns:
sev_counts[vuln.get("severity", "unknown")] += 1
for sev in ["critical", "high", "medium", "low"]:
if sev in sev_counts:
print(f" {sev.capitalize()}: {sev_counts[sev]}")
def analyze_communication_patterns(self):
"""分析OT通信模式中的异常情况。"""
links = self.get_links()
nodes = {n.get("id"): n for n in self.get_nodes()}
print(f"\n--- 通信分析 ---")
print(f" 通信链接总数: {len(links)}")
# 识别跨区域通信
cross_zone = []
for link in links:
src_node = nodes.get(link.get("source_id"), {})
dst_node = nodes.get(link.get("destination_id"), {})
src_zone = src_node.get("zone", "unknown")
dst_zone = dst_node.get("zone", "unknown")
if src_zone != dst_zone and src_zone != "unknown" and dst_zone != "unknown":
cross_zone.append({
"source": src_node.get("label", "Unknown"),
"source_zone": src_zone,
"destination": dst_node.get("label", "Unknown"),
"dest_zone": dst_zone,
"protocols": link.get("protocols", []),
})
if cross_zone:
print(f"\n 跨区域通信: {len(cross_zone)}")
for comm in cross_zone[:10]:
print(f" {comm['source']} ({comm['source_zone']}) -> "
f"{comm['destination']} ({comm['dest_zone']}) "
f"通过 {', '.join(comm['protocols'])}")
if __name__ == "__main__":
manager = NozomiGuardianManager(
guardian_url="https://nozomi-guardian.plant.local",
api_token="your-api-token",
)
manager.validate_deployment()
manager.analyze_communication_patterns()
| 术语 | 定义 |
|---|---|
| Guardian | Nozomi Networks被动传感器,通过SPAN/TAP监控OT网络流量,不产生额外流量 |
| Vantage | Nozomi基于云的中央管理平台,用于汇聚多个Guardian传感器的数据 |
| 行为异常检测(BAD) | Nozomi基于AI的方法,检测与学习到的正常OT网络行为的偏差 |
| 智能轮询(Smart Polling) | Nozomi使用原生协议安全提取额外设备详情的主动查询功能 |
| 资产情报(Asset Intelligence) | Nozomi从网络流量自动识别和分类OT/IoT资产 |
| 威胁情报订阅源 | Nozomi Labs维护的OT特定威胁指标订阅源,基于全球蜜罐数据更新 |
NOZOMI GUARDIAN OT监控报告
=======================================
站点: [站点名称]
日期: YYYY-MM-DD
资产可见性:
总资产数: [数量]
PLC: [数量] | HMI: [数量] | 交换机: [数量]
协议: [列表]
供应商: [前5名]
威胁检测:
严重告警: [数量]
高告警: [数量]
主要告警类别: [列表]
漏洞:
严重: [数量]
高: [数量]
网络分析:
通信链接: [数量]
跨区域流: [数量]