Implements IEC 62443-compliant secure conduits for OT remote access with jump servers, MFA gateways, session recording, and approval workflows to control ICS access without OT network exposure.
npx claudepluginhub killvxk/cybersecurity-skills-zhThis skill uses the workspace's default tool permissions.
- 将来自IT或供应商直接进入OT控制网络的VPN连接替换为安全管道
Implements IEC 62443-compliant secure conduits for OT remote access using jump servers, MFA gateways, session recording, and approval workflows to secure vendor access to ICS without direct OT exposure.
Implements IEC 62443-compliant secure conduits for OT remote access with jump servers, MFA gateways, session recording, and approval workflows to secure ICS without direct network exposure.
Implements secure remote access to OT/ICS environments using jump servers, MFA, session recording, PAM, and vendor controls. Complies with IEC 62443 and NERC CIP-005 for operators, engineers, vendors.
Share bugs, ideas, or general feedback.
不适用于设计整体Purdue Model分段(参见implementing-purdue-model-network-segmentation)、部署仅IT远程访问解决方案,或配置对PLC的本地控制台访问。
# IEC 62443 OT远程访问管道安全架构
# 所有远程访问终止于DMZ - 永不直接通达OT
conduit_architecture:
remote_access_conduit:
conduit_id: "RA-001"
source_zone: "企业IT(4级)"
destination_zone: "OT DMZ(3.5级)"
security_level_target: "SL3"
components:
external_gateway:
type: "VPN集中器"
location: "企业DMZ"
ip: "10.200.1.10"
protocols: ["IPsec", "SSL-VPN"]
authentication: "需要MFA(证书 + OTP)"
dmz_jump_server:
type: "特权访问工作站"
location: "OT DMZ(3.5级)"
ip: "10.10.150.20"
os: "Windows Server 2022(加固版)"
controls:
- "禁用本地管理员账户"
- "禁用USB端口"
- "默认禁用剪贴板传输"
- "启用会话录制(所有击键和屏幕)"
- "最长会话时长: 4小时"
- "空闲15分钟后自动注销"
- "强制执行应用程序白名单"
ot_access_gateway:
type: "工业远程访问网关"
location: "OT DMZ / 3级边界"
ip: "10.10.150.25"
controls:
- "仅可到达预先批准的目标IP"
- "协议级过滤(仅RDP、SSH、VNC)"
- "有时间限制的会话,自动过期"
- "基于审批的访问(工厂经理必须授权)"
data_flow:
- step: 1
description: "用户通过企业VPN进行MFA认证"
source: "远程用户"
destination: "VPN集中器"
protocol: "IPsec/SSL-VPN"
- step: 2
description: "用户连接到OT DMZ中的跳板服务器"
source: "VPN隧道"
destination: "DMZ跳板服务器"
protocol: "RDP with NLA"
- step: 3
description: "从跳板服务器,用户访问特定OT系统"
source: "DMZ跳板服务器"
destination: "批准的OT目标"
protocol: "RDP/SSH/VNC"
condition: "仅在经理批准且在时间窗口内"
prohibited_flows:
- "从外部到1/2级OT设备的直接VPN隧道"
- "OT会话期间允许互联网访问的分割隧道"
- "未扫描从跳板服务器到OT的文件传输"
- "持久性VPN连接(禁用自动重连)"
vendor_access_conduit:
conduit_id: "VA-001"
source_zone: "供应商外部网络"
destination_zone: "OT DMZ(3.5级)"
security_level_target: "SL3"
workflow:
request:
- "供应商通过门户提交访问请求(提前24小时)"
- "请求包括: 目的、目标系统、持续时间、人员"
- "工厂运营经理审查并批准/拒绝"
- "批准后生成有时间限制的凭据"
session:
- "供应商通过专用供应商VPN网关连接"
- "MFA认证(供应商通过短信/应用接收OTP)"
- "会话落在供应商专用跳板服务器(与内部跳板隔离)"
- "所有操作录制(视频 + 击键日志)"
- "OT工程师实时监控供应商会话"
termination:
- "会话在批准的结束时间自动终止"
- "凭据自动撤销"
- "会话录制归档90天"
- "访问日志转发至SIEM"
#!/usr/bin/env python3
"""OT Remote Access Conduit Manager.
Manages approval-based remote access to OT systems through
secure conduit architecture with session recording, MFA
enforcement, and time-limited access windows.
"""
import json
import sys
from datetime import datetime, timedelta
from typing import Dict, List, Optional
from enum import Enum
class AccessRequestStatus(Enum):
PENDING = "pending"
APPROVED = "approved"
REJECTED = "rejected"
ACTIVE = "active"
EXPIRED = "expired"
REVOKED = "revoked"
class RemoteAccessRequest:
"""Represents an OT remote access request."""
def __init__(self, requestor: str, requestor_type: str, purpose: str,
target_systems: List[str], duration_hours: int,
requested_start: str):
self.id = f"OT-RA-{datetime.now().strftime('%Y%m%d-%H%M%S')}"
self.requestor = requestor
self.requestor_type = requestor_type # "internal_engineer" 或 "vendor"
self.purpose = purpose
self.target_systems = target_systems
self.duration_hours = duration_hours
self.requested_start = requested_start
self.status = AccessRequestStatus.PENDING
self.created = datetime.now().isoformat()
self.approved_by = None
self.session_id = None
self.audit_trail = []
def approve(self, approver: str, conditions: str = ""):
"""Approve the access request."""
self.status = AccessRequestStatus.APPROVED
self.approved_by = approver
self.audit_trail.append({
"timestamp": datetime.now().isoformat(),
"action": "APPROVED",
"actor": approver,
"conditions": conditions,
})
def reject(self, rejector: str, reason: str):
"""Reject the access request."""
self.status = AccessRequestStatus.REJECTED
self.audit_trail.append({
"timestamp": datetime.now().isoformat(),
"action": "REJECTED",
"actor": rejector,
"reason": reason,
})
class OTConduitManager:
"""Manages OT remote access conduit security."""
def __init__(self):
self.access_requests: Dict[str, RemoteAccessRequest] = {}
self.active_sessions: Dict[str, dict] = {}
self.policy = self._load_policy()
def _load_policy(self) -> dict:
"""Load remote access policy."""
return {
"max_session_hours": 4,
"idle_timeout_minutes": 15,
"mfa_required": True,
"session_recording": True,
"clipboard_transfer": False,
"file_transfer": "scan_required",
"advance_notice_hours": 24,
"vendor_escort_required": True,
"prohibited_targets": ["SIS-*", "SAFETY-*"],
"allowed_protocols": ["RDP", "SSH", "VNC"],
"blocked_protocols": ["Telnet", "FTP", "SMB"],
}
def submit_request(self, request: RemoteAccessRequest) -> str:
"""Submit a new remote access request."""
# 对策略进行验证
violations = []
if request.duration_hours > self.policy["max_session_hours"]:
violations.append(
f"时长 {request.duration_hours}h 超过最大值 {self.policy['max_session_hours']}h"
)
for target in request.target_systems:
for prohibited in self.policy["prohibited_targets"]:
pattern = prohibited.replace("*", "")
if target.startswith(pattern):
violations.append(f"目标 {target} 在禁止列表中(安全系统)")
if violations:
print(f"[!] 发现策略违规:")
for v in violations:
print(f" - {v}")
return ""
self.access_requests[request.id] = request
print(f"[+] 访问请求 {request.id} 已提交待审批")
return request.id
def list_pending_requests(self):
"""List all pending access requests for approval."""
pending = [r for r in self.access_requests.values()
if r.status == AccessRequestStatus.PENDING]
print(f"\n{'='*65}")
print("待处理OT远程访问请求")
print(f"{'='*65}")
if not pending:
print(" 无待处理请求")
return
for req in pending:
print(f"\n 请求: {req.id}")
print(f" 申请人: {req.requestor} ({req.requestor_type})")
print(f" 目的: {req.purpose}")
print(f" 目标: {', '.join(req.target_systems)}")
print(f" 时长: {req.duration_hours} 小时")
print(f" 开始: {req.requested_start}")
print(f" 提交: {req.created}")
def generate_audit_report(self):
"""Generate audit report of all remote access activity."""
print(f"\n{'='*65}")
print("OT远程访问审计报告")
print(f"{'='*65}")
print(f"报告日期: {datetime.now().isoformat()}")
print(f"请求总数: {len(self.access_requests)}")
status_counts = {}
for req in self.access_requests.values():
status = req.status.value
status_counts[status] = status_counts.get(status, 0) + 1
print(f"\n请求状态:")
for status, count in status_counts.items():
print(f" {status}: {count}")
print(f"\n详细审计跟踪:")
for req in self.access_requests.values():
print(f"\n {req.id} - {req.requestor} ({req.requestor_type})")
print(f" 状态: {req.status.value}")
print(f" 目标: {', '.join(req.target_systems)}")
for entry in req.audit_trail:
print(f" [{entry['timestamp']}] {entry['action']} 由 {entry['actor']}")
if __name__ == "__main__":
manager = OTConduitManager()
# 供应商访问请求
vendor_req = RemoteAccessRequest(
requestor="John Smith - Siemens Field Service",
requestor_type="vendor",
purpose="S7-1500控制器的年度PLC固件更新",
target_systems=["PLC-REACTOR-01", "PLC-REACTOR-02"],
duration_hours=3,
requested_start="2025-03-15T08:00:00",
)
req_id = manager.submit_request(vendor_req)
if req_id:
manager.list_pending_requests()
# 模拟审批
manager.access_requests[req_id].approve(
approver="Plant Manager - Jane Doe",
conditions="OT工程师必须监督供应商会话",
)
manager.generate_audit_report()
| 术语 | 定义 |
|---|---|
| 管道(Conduit) | IEC 62443中定义了安全策略的安全区域之间的受控通信路径 |
| 跳板服务器(Jump Server) | 位于DMZ中的加固中间服务器,所有OT远程访问必须经过此服务器 |
| 会话录制(Session Recording) | 捕获远程访问会话期间所有屏幕活动、击键和命令,用于审计 |
| 基于审批的访问(Approval-Based Access) | 要求工厂运营经理授权才能激活远程访问凭据的工作流 |
| 供应商陪同(Vendor Escort) | 内部OT工程师实时监控供应商远程会话的做法 |
| 紧急访问(Break-Glass Access) | 关键情况下绕过正常审批工作流的紧急访问程序 |
OT远程访问管道报告
==================================
日期: YYYY-MM-DD
管道状态:
内部访问管道: [活跃/非活跃]
供应商访问管道: [活跃/非活跃]
访问请求(过去30天):
已提交: [数量]
已批准: [数量]
已拒绝: [数量]
平均会话时长: [小时]
策略合规性:
MFA执行率: [100%]
会话录制率: [100%]
有时间限制的会话: [合规%]
阻止的禁止目标尝试: [数量]