Implements secure remote access to OT/ICS environments using jump servers, MFA, session recording, PAM, and vendor controls. Complies with IEC 62443 and NERC CIP-005 for operators, engineers, vendors.
npx claudepluginhub killvxk/cybersecurity-skills-zhThis skill uses the workspace's default tool permissions.
- 为OT环境实施或升级远程访问架构时
Implements secure remote access to OT/ICS environments using jump servers, MFA, session recording, PAM, and vendor controls for IEC 62443/NERC CIP-005 compliance.
Implements secure remote access to OT/ICS environments using jump servers, MFA, session recording, and PAM while ensuring IEC 62443/NERC CIP-005 compliance.
Implements IEC 62443-compliant secure conduits for OT remote access with jump servers, MFA gateways, session recording, and approval workflows to control ICS access without OT network exposure.
Share bugs, ideas, or general feedback.
不适用于保护没有OT组件的纯IT远程访问、为企业员工配置VPN(参见通用VPN指南),或OT设施的物理访问控制。
在DMZ中实施具有中间系统的深度防御远程访问架构,防止外部用户与OT系统之间的任何直接网络连接。
# OT远程访问架构
# 关键原则:外部网络到OT系统无直接连接
architecture:
external_access_point:
location: "面向互联网的防火墙"
service: "VPN网关(IKEv2/IPsec或SSL VPN)"
authentication: "证书 + MFA(CIP-005-7 R2.4)"
controls:
- "供应商访问的源IP白名单"
- "基于时间的访问窗口"
- "带宽速率限制"
dmz_intermediate_system:
location: "第3.5层DMZ"
platform: "CyberArk特权访问安全或加固的跳板服务器"
function: "会话代理——终止外部连接,发起新的内部连接"
controls:
- "所有会话在跳板服务器终止(不直通)"
- "带键盘记录的会话录制"
- "剪贴板和文件传输限制"
- "不活跃30分钟后会话超时"
- "每用户并发会话限制"
ot_internal_access:
location: "第3层/第2层OT网络"
method: "跳板服务器向OT系统发起RDP/SSH/VNC连接"
controls:
- "防火墙仅允许跳板服务器IP到达OT"
- "协议受限(RDP 3389、SSH 22、VNC 5900)"
- "按用户角色限定目标"
vendor_access:
description: "第三方供应商远程访问"
additional_controls:
- "供应商账户默认禁用;按请求启用"
- "时间限制的访问窗口(按会话启用/禁用)"
- "OT操作员必须在供应商会话期间陪同"
- "OT安全团队实时监控会话"
- "无持久凭据——使用一次性访问令牌"
# 网络流向:
# 用户 -> VPN -> 防火墙 -> DMZ跳板服务器 -> 内部防火墙 -> OT系统
# (两个独立的已认证连接;无直接路由)
在DMZ中部署和加固跳板服务器,配置会话管理、记录和基于角色的访问控制。
#!/usr/bin/env python3
"""OT远程访问会话管理器。
管理对OT环境的授权远程访问会话,
包括会话创建、监控、录制和终止。
与PAM解决方案集成以进行凭据存管。
"""
import json
import hashlib
import sys
from dataclasses import dataclass, field, asdict
from datetime import datetime, timedelta
from enum import Enum
class SessionState(str, Enum):
PENDING_APPROVAL = "pending_approval"
APPROVED = "approved"
ACTIVE = "active"
TERMINATED = "terminated"
EXPIRED = "expired"
DENIED = "denied"
class UserRole(str, Enum):
OT_OPERATOR = "ot_operator"
OT_ENGINEER = "ot_engineer"
VENDOR = "vendor"
SECURITY_ANALYST = "security_analyst"
@dataclass
class RemoteAccessSession:
session_id: str
user_id: str
user_role: str
source_ip: str
target_system: str
target_ip: str
protocol: str
purpose: str
state: str = SessionState.PENDING_APPROVAL
mfa_verified: bool = False
approved_by: str = ""
created_at: str = ""
started_at: str = ""
ended_at: str = ""
max_duration_minutes: int = 120
recording_path: str = ""
actions_logged: list = field(default_factory=list)
class OTRemoteAccessManager:
"""管理OT环境的远程访问会话。"""
def __init__(self):
self.sessions = {}
self.active_sessions = {}
self.access_policies = {}
self.audit_log = []
def define_access_policy(self, role, allowed_targets, protocols, max_duration):
"""为用户角色定义访问策略。"""
self.access_policies[role] = {
"allowed_targets": allowed_targets,
"allowed_protocols": protocols,
"max_duration_minutes": max_duration,
"requires_co_attendance": role == UserRole.VENDOR,
"requires_approval": role == UserRole.VENDOR,
}
def request_session(self, user_id, user_role, source_ip,
target_system, target_ip, protocol, purpose):
"""请求新的远程访问会话。"""
# 生成唯一会话ID
session_id = hashlib.sha256(
f"{user_id}{target_ip}{datetime.now().isoformat()}".encode()
).hexdigest()[:16]
# 检查访问策略
policy = self.access_policies.get(user_role)
if not policy:
return None, "该角色未定义访问策略"
if target_system not in policy["allowed_targets"]:
self._audit("ACCESS_DENIED", user_id, target_system,
f"目标不在角色 {user_role} 的允许列表中")
return None, f"角色 {user_role} 未授权访问目标 {target_system}"
if protocol not in policy["allowed_protocols"]:
self._audit("ACCESS_DENIED", user_id, target_system,
f"角色 {user_role} 不允许协议 {protocol}")
return None, f"未授权使用协议 {protocol}"
session = RemoteAccessSession(
session_id=session_id,
user_id=user_id,
user_role=user_role,
source_ip=source_ip,
target_system=target_system,
target_ip=target_ip,
protocol=protocol,
purpose=purpose,
max_duration_minutes=policy["max_duration_minutes"],
created_at=datetime.now().isoformat(),
)
# 供应商会话需要审批
if policy.get("requires_approval"):
session.state = SessionState.PENDING_APPROVAL
else:
session.state = SessionState.APPROVED
self.sessions[session_id] = session
self._audit("SESSION_REQUESTED", user_id, target_system, purpose)
return session_id, "会话已创建"
def approve_session(self, session_id, approver_id):
"""审批待审批的供应商会话。"""
session = self.sessions.get(session_id)
if not session:
return False, "未找到会话"
if session.state != SessionState.PENDING_APPROVAL:
return False, "会话不在待审批状态"
session.state = SessionState.APPROVED
session.approved_by = approver_id
self._audit("SESSION_APPROVED", approver_id, session.target_system,
f"已审批 {session.user_id} 的会话 {session_id}")
return True, "会话已审批"
def activate_session(self, session_id, mfa_token):
"""MFA验证后激活已审批的会话。"""
session = self.sessions.get(session_id)
if not session or session.state != SessionState.APPROVED:
return False, "会话未获审批"
# 验证MFA(简化版——实际实现调用MFA提供商)
session.mfa_verified = True
session.state = SessionState.ACTIVE
session.started_at = datetime.now().isoformat()
session.recording_path = f"/recordings/{session_id}_{datetime.now().strftime('%Y%m%d')}.mp4"
self.active_sessions[session_id] = session
self._audit("SESSION_ACTIVATED", session.user_id, session.target_system,
f"MFA已验证,录制到 {session.recording_path}")
return True, "会话已激活"
def terminate_session(self, session_id, reason="manual"):
"""终止活跃会话。"""
session = self.active_sessions.pop(session_id, None)
if not session:
session = self.sessions.get(session_id)
if not session:
return False
session.state = SessionState.TERMINATED
session.ended_at = datetime.now().isoformat()
self._audit("SESSION_TERMINATED", session.user_id, session.target_system, reason)
return True
def check_expired_sessions(self):
"""终止超过最大时长的会话。"""
now = datetime.now()
expired = []
for sid, session in list(self.active_sessions.items()):
started = datetime.fromisoformat(session.started_at)
if (now - started).total_seconds() > session.max_duration_minutes * 60:
self.terminate_session(sid, "超过最大时长")
expired.append(sid)
return expired
def _audit(self, event_type, user_id, target, detail):
"""写入审计日志。"""
self.audit_log.append({
"timestamp": datetime.now().isoformat(),
"event": event_type,
"user": user_id,
"target": target,
"detail": detail,
})
def generate_report(self):
"""生成远程访问会话报告。"""
report = []
report.append("=" * 70)
report.append("OT远程访问会话报告")
report.append(f"日期: {datetime.now().isoformat()}")
report.append("=" * 70)
# 会话摘要
total = len(self.sessions)
active = sum(1 for s in self.sessions.values() if s.state == SessionState.ACTIVE)
report.append(f"\n总会话数: {total}")
report.append(f"活跃会话数: {active}")
# 活跃会话详情
if self.active_sessions:
report.append("\n活跃会话:")
for sid, s in self.active_sessions.items():
report.append(f" [{sid[:8]}] {s.user_id} ({s.user_role})")
report.append(f" 目标: {s.target_system} ({s.target_ip})")
report.append(f" 开始: {s.started_at}")
report.append(f" MFA: {'已验证' if s.mfa_verified else '未验证'}")
return "\n".join(report)
if __name__ == "__main__":
manager = OTRemoteAccessManager()
# 定义策略
manager.define_access_policy(
UserRole.OT_ENGINEER,
["HMI-01", "HMI-02", "EWS-01", "HISTORIAN-01"],
["RDP", "SSH"],
max_duration=240,
)
manager.define_access_policy(
UserRole.VENDOR,
["DCS-EWS-01"],
["RDP"],
max_duration=120,
)
# 请求供应商会话
sid, msg = manager.request_session(
"vendor_honeywell_01", UserRole.VENDOR,
"203.0.113.50", "DCS-EWS-01", "10.30.1.20",
"RDP", "DCS固件更新,变更请求 CR-2026-0045")
print(f"会话请求: {msg} ({sid})")
if sid:
manager.approve_session(sid, "ot_manager_01")
manager.activate_session(sid, "123456")
print(manager.generate_report())
| 术语 | 定义 |
|---|---|
| 中间系统(Intermediate System) | DMZ中终止外部连接并代理到OT的新连接的系统,按CIP-005要求防止直接网络访问 |
| 跳板服务器(Jump Server) | DMZ中用于到OT系统的远程访问会话的加固堡垒主机,具备会话记录和访问控制 |
| 会话记录(Session Recording) | 捕获所有远程访问会话活动(屏幕、键盘操作、命令)用于安全审计和事件调查 |
| 特权访问管理(PAM) | 用于存管凭据、控制访问和审计对关键OT系统特权会话的系统 |
| 陪同(Co-Attendance) | OT操作员实时监控供应商远程访问会话的要求 |
| 时间限制访问(Time-Limited Access) | 供应商账户仅在特定维护窗口期间启用,窗口关闭后自动禁用 |
OT远程访问安全报告
===================================
活跃会话: [N]
待审批: [N]
今日会话: [N]
MFA合规性:
所有会话MFA已验证: [是/否]
无MFA的会话: [N]
供应商访问:
活跃供应商会话: [N]
已陪同: [N]
已录制: [N]