Detects network attacks on OT historian servers (OSIsoft PI, Ignition, Wonderware) including data tampering, unauthorized queries, lateral movement, and specific vulnerabilities at IT/OT boundaries.
npx claudepluginhub killvxk/cybersecurity-skills-zhThis skill uses the workspace's default tool permissions.
- 监控连接IT与OT网络的历史数据服务器,检测入侵指标
Detects attacks on OT historian servers (OSIsoft PI, Ignition, Wonderware) including data manipulation, unauthorized queries, lateral movement, and historian vulnerabilities.
Detects cyber attacks on OT historian servers (OSIsoft PI, Ignition, Wonderware) including data manipulation, unauthorized queries, and lateral movement at IT/OT boundaries. Use for monitoring compromise indicators.
Secures process historian servers (OSIsoft PI, Honeywell PHD, GE Proficy, AVEVA Historian) in OT environments via Purdue model network deployment, access controls, DMZ data replication, SQL injection protection, and data integrity safeguards. For hardening deployments and audits.
Share bugs, ideas, or general feedback.
不适用于一般数据库安全监控(参见数据库安全技能)、历史数据服务器部署与配置,或仅限IT的数据仓库安全。
#!/usr/bin/env python3
"""OT Historian Attack Detector.
Monitors historian servers for unauthorized access, data manipulation,
lateral movement indicators, and exploitation of historian-specific
vulnerabilities. Supports OSIsoft PI and Ignition platforms.
"""
import json
import sys
from collections import defaultdict
from datetime import datetime, timedelta
from typing import Dict, List, Optional
try:
import requests
except ImportError:
print("Install requests: pip install requests")
sys.exit(1)
class HistorianAttackDetector:
"""Detects attacks targeting OT historian servers."""
def __init__(self, historian_type: str, historian_url: str,
api_credentials: dict, verify_ssl: bool = False):
self.historian_type = historian_type
self.historian_url = historian_url.rstrip("/")
self.credentials = api_credentials
self.verify_ssl = verify_ssl
self.alerts = []
self.authorized_clients = set()
self.authorized_queries = {}
def set_baseline(self, authorized_clients: List[str],
authorized_query_patterns: Dict[str, List[str]]):
"""Set baseline of authorized historian clients and query patterns."""
self.authorized_clients = set(authorized_clients)
self.authorized_queries = authorized_query_patterns
def check_active_connections(self) -> List[dict]:
"""Check for unauthorized connections to historian."""
connections = []
if self.historian_type == "osisoft_pi":
try:
resp = requests.get(
f"{self.historian_url}/piwebapi/system/status",
auth=(self.credentials.get("username"), self.credentials.get("password")),
verify=self.verify_ssl,
timeout=10,
)
if resp.status_code == 200:
data = resp.json()
connections = data.get("ConnectedClients", [])
except requests.RequestException as e:
print(f"[!] PI Web API 错误: {e}")
elif self.historian_type == "ignition":
try:
resp = requests.get(
f"{self.historian_url}/data/status/connections",
headers={"Authorization": f"Bearer {self.credentials.get('token')}"},
verify=self.verify_ssl,
timeout=10,
)
if resp.status_code == 200:
connections = resp.json().get("connections", [])
except requests.RequestException as e:
print(f"[!] Ignition API 错误: {e}")
# 检查未授权客户端
for conn in connections:
client_ip = conn.get("client_ip", conn.get("address", ""))
if self.authorized_clients and client_ip not in self.authorized_clients:
self.alerts.append({
"severity": "HIGH",
"type": "UNAUTHORIZED_HISTORIAN_CLIENT",
"timestamp": datetime.now().isoformat(),
"source_ip": client_ip,
"details": f"未授权客户端 {client_ip} 连接到 {self.historian_type} 历史数据服务器",
"mitre": "T0802 - Automated Collection",
})
return connections
def check_data_integrity(self, tags: List[str], hours_back: int = 24):
"""Check historian data for manipulation indicators."""
print(f"[*] 正在检查 {len(tags)} 个标签点过去 {hours_back} 小时的数据完整性")
integrity_issues = []
for tag in tags:
try:
if self.historian_type == "osisoft_pi":
resp = requests.get(
f"{self.historian_url}/piwebapi/streams/{tag}/recorded",
params={"startTime": f"*-{hours_back}h", "endTime": "*"},
auth=(self.credentials.get("username"), self.credentials.get("password")),
verify=self.verify_ssl,
timeout=15,
)
if resp.status_code == 200:
items = resp.json().get("Items", [])
# 检查可疑模式
if len(items) == 0:
integrity_issues.append({
"tag": tag, "issue": "NO_DATA",
"detail": "预期时间段内无数据点 - 可能已被删除",
})
else:
values = [i.get("Value", 0) for i in items if isinstance(i.get("Value"), (int, float))]
if values and len(set(values)) == 1 and len(values) > 100:
integrity_issues.append({
"tag": tag, "issue": "FLATLINE",
"detail": f"恒定值 {values[0]} 持续 {len(values)} 个数据点 - 可能是回放/欺骗攻击",
})
except requests.RequestException:
pass
for issue in integrity_issues:
self.alerts.append({
"severity": "HIGH",
"type": f"DATA_INTEGRITY_{issue['issue']}",
"timestamp": datetime.now().isoformat(),
"tag": issue["tag"],
"details": issue["detail"],
"mitre": "T0809 - Data Destruction" if issue["issue"] == "NO_DATA" else "T0832 - Manipulation of View",
})
return integrity_issues
def check_lateral_movement_indicators(self):
"""Check for indicators of historian being used as pivot point."""
indicators = []
# 检查1:历史数据服务器向一级设备发起出站连接
# (历史数据服务器应接收数据,而不是主动发起到PLC的连接)
indicators.append({
"check": "向PLC子网的出站连接",
"description": "历史数据服务器向一级设备发起连接可能表明已被入侵",
"detection": "监控防火墙日志,检查历史数据服务器IP是否连接PLC端口(502、102、44818)",
})
# 检查2:历史数据服务器上的新进程或服务
indicators.append({
"check": "历史数据服务器上的未授权进程",
"description": "攻击者可能在历史数据服务器上安装工具以进行横向移动",
"detection": "监控历史数据服务器上的进程创建事件(Sysmon EventID 1)",
})
# 检查3:历史数据服务器的异常身份验证
indicators.append({
"check": "来自意外来源的身份验证",
"description": "已入侵的IT系统向历史数据服务器进行身份验证以进行跳板攻击",
"detection": "监控Windows安全事件4624,检查非基线来源的登录",
})
return indicators
def generate_report(self):
"""Generate historian attack detection report."""
print(f"\n{'='*70}")
print("历史数据服务器攻击检测报告")
print(f"{'='*70}")
print(f"历史数据服务器类型: {self.historian_type}")
print(f"历史数据服务器URL: {self.historian_url}")
print(f"报告时间: {datetime.now().isoformat()}")
print(f"告警总数: {len(self.alerts)}")
if self.alerts:
print(f"\n--- 告警 ---")
for alert in self.alerts:
print(f"\n [{alert['severity']}] {alert['type']}")
print(f" 时间: {alert['timestamp']}")
print(f" 详情: {alert['details']}")
print(f" MITRE ICS: {alert.get('mitre', 'N/A')}")
print(f"\n--- 横向移动检查 ---")
for indicator in self.check_lateral_movement_indicators():
print(f"\n 检查: {indicator['check']}")
print(f" 风险: {indicator['description']}")
print(f" 检测: {indicator['detection']}")
if __name__ == "__main__":
detector = HistorianAttackDetector(
historian_type="osisoft_pi",
historian_url="https://pi-server.plant.local",
api_credentials={"username": "pi_reader", "password": "api_key_here"},
)
detector.set_baseline(
authorized_clients=["10.10.2.10", "10.10.2.20", "10.10.3.50", "10.10.150.10"],
authorized_query_patterns={},
)
detector.check_active_connections()
detector.check_data_integrity(tags=["REACTOR_01.TEMP", "PUMP_03.FLOW"], hours_back=24)
detector.generate_report()
| 术语 | 定义 |
|---|---|
| OT历史数据服务器(OT Historian) | 存储SCADA/DCS系统时间序列过程数据的数据库服务器(OSIsoft PI、Ignition、Wonderware) |
| 跳板点(Pivot Point) | 历史数据服务器位于IT和OT网络之间,是攻击者在区域间移动的主要目标 |
| 数据回放攻击(Data Replay Attack) | 向HMI馈送历史数据以掩盖实时过程操控(Stuxnet技术) |
| OSIsoft PI | 最广泛部署的OT历史数据服务器,被全球500强过程工业企业中65%使用 |
| Ignition | Inductive Automation的SCADA平台,带有历史数据模块,因Python脚本功能而日益成为攻击目标 |
| CVE-2025-0921 | Ignition SCADA特权文件系统漏洞,允许通过恶意项目文件进行权限提升 |
历史数据服务器攻击检测报告
====================================
历史数据服务器: [类型和主机名]
日期: YYYY-MM-DD
连接分析:
已授权客户端: [数量]
检测到未授权客户端: [数量及IP]
数据完整性:
检查的标签点: [数量]
完整性问题: [数量]
平线检测: [数量]
数据缺口: [数量]
横向移动指标:
出站PLC连接: [发现/未发现]
未授权进程: [发现/未发现]
异常身份验证: [发现/未发现]