Deploys and integrates MISP, OpenCTI, TheHive, and Cortex via Docker Compose to build a threat intelligence platform for CTI collection, analysis, enrichment, and sharing.
构建威胁情报平台(TIP)涉及将多个 CTI 工具部署和集成到统一系统中,用于收集、分析、富化和分发威胁情报。本技能涵盖使用开源工具(MISP、OpenCTI、TheHive、Cortex)设计 TIP 架构、配置推送摄取流水线、建立富化工作流、实现 STIX/TAXII 互操作性,以及构建 CTI 运营的分析师仪表板。
Deploys Threat Intelligence Platform integrating MISP, OpenCTI, TheHive, Cortex via Docker Compose, Python libraries, Elasticsearch, Redis for CTI collection, analysis, enrichment.
Deploys and integrates open-source CTI tools like MISP, OpenCTI, TheHive, Cortex into a Threat Intelligence Platform for threat data collection, analysis, enrichment, and STIX/TAXII sharing. Useful for cybersecurity architecture.
Deploys MISP via Docker Compose to aggregate threat feeds from sources like abuse.ch and AlienVault OTX, correlate IOCs, and integrate with SIEM tools like Splunk.
构建威胁情报平台(TIP)涉及将多个 CTI 工具部署和集成到统一系统中,用于收集、分析、富化和分发威胁情报。本技能涵盖使用开源工具(MISP、OpenCTI、TheHive、Cortex)设计 TIP 架构、配置推送摄取流水线、建立富化工作流、实现 STIX/TAXII 互操作性,以及构建 CTI 运营的分析师仪表板。
pymisp、pycti、thehive4py 库
version: '3.8'
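services:
  # --- Storage layer ---
  # The backing services (Elasticsearch, Redis, RabbitMQ, MinIO) are
  # deliberately NOT published on the host. Elasticsearch runs with xpack
  # security disabled and the others run with image defaults, so the Compose
  # network is their only access control; the platform containers reach them
  # by service name. Publishing MinIO's 9000/9001 would also collide with the
  # TheHive (9000) and Cortex (9001) host ports below and `compose up` would
  # fail with "port is already allocated".
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:8.12.0
    environment:
      - discovery.type=single-node
      - xpack.security.enabled=false
      - "ES_JAVA_OPTS=-Xms2g -Xmx2g"
    volumes:
      - es-data:/usr/share/elasticsearch/data

  redis:
    image: redis:7

  rabbitmq:
    image: rabbitmq:3-management

  minio:
    image: minio/minio
    command: server /data --console-address ":9001"
    # NOTE(review): no MinIO root credentials are set here and OpenCTI is not
    # given MINIO__ACCESS_KEY / MINIO__SECRET_KEY; confirm against the OpenCTI
    # reference docker-compose that the two sides agree before deploying.
    volumes:
      # OpenCTI file uploads live here; without a volume they are lost when
      # the container is recreated.
      - minio-data:/data

  # --- MISP ---
  misp:
    # TODO(review): pin to a release tag; `latest` makes the deployment
    # non-reproducible and silently pulls new code on every recreate.
    image: ghcr.io/misp/misp-docker/misp-core:latest
    ports:
      - "8443:443"
    environment:
      # NOTE(review): upstream misp-docker documents BASE_URL / ADMIN_EMAIL;
      # confirm this image honours the MISP_-prefixed names before relying
      # on them.
      - MISP_ADMIN_EMAIL=admin@tip.local
      - MISP_BASEURL=https://localhost:8443
    volumes:
      - misp-data:/var/www/MISP/app/files

  # --- OpenCTI ---
  opencti:
    image: opencti/platform:6.4.4
    environment:
      - APP__PORT=8080
      - APP__ADMIN__EMAIL=admin@tip.local
      # Credentials are read from the shell environment or a .env file at
      # `compose up` instead of being committed in this file. `${VAR:?msg}`
      # makes Compose abort with the message when the variable is unset or
      # empty, rather than starting the platform with a blank password.
      - APP__ADMIN__PASSWORD=${OPENCTI_ADMIN_PASSWORD:?set OPENCTI_ADMIN_PASSWORD in .env}
      # OpenCTI expects the admin token to be a UUID; generate one (e.g.
      # with `uuidgen`) rather than using a placeholder string.
      - APP__ADMIN__TOKEN=${OPENCTI_ADMIN_TOKEN:?set OPENCTI_ADMIN_TOKEN to a UUID in .env}
      - ELASTICSEARCH__URL=http://elasticsearch:9200
      - MINIO__ENDPOINT=minio
      - RABBITMQ__HOSTNAME=rabbitmq
      - REDIS__HOSTNAME=redis
    ports:
      - "8080:8080"
    # depends_on only orders container start; it does not wait for
    # Elasticsearch to be ready, so OpenCTI may restart once on first boot.
    depends_on:
      - elasticsearch
      - redis
      - rabbitmq
      - minio

  # --- TheHive ---
  thehive:
    # NOTE(review): TheHive 5 is documented as requiring a Cassandra database
    # in addition to Elasticsearch; this stanza defines none — confirm the
    # image starts with only the services listed here.
    image: strangebee/thehive:5.3
    environment:
      # Container-to-container URL; Cortex is reachable by service name.
      - TH_CORTEX_URL=http://cortex:9001
    ports:
      - "9000:9000"
    depends_on:
      - elasticsearch

  # --- Cortex ---
  cortex:
    # NOTE(review): Cortex 3.1.x targets Elasticsearch 7.x; verify it accepts
    # the 8.12 cluster above before relying on this pairing.
    image: thehiveproject/cortex:3.1.8
    ports:
      - "9001:9001"
    depends_on:
      - elasticsearch

# Analyst-facing UIs are the only published ports:
# MISP 8443, OpenCTI 8080, TheHive 9000, Cortex 9001.
volumes:
  es-data:
  misp-data:
  minio-data: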
from pymisp import PyMISP
from pycti import OpenCTIApiClient
import json
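class TIPFeedManager:
    """Manage threat-feed ingestion across the platform components.

    Holds a PyMISP client for MISP and a pycti client for OpenCTI. The
    methods are interactive helpers: they report progress on stdout, treat
    per-feed failures as non-fatal, and return what they found so callers
    can also act on the result programmatically.
    """

    # Default MISP OSINT feeds to enable. Feed ids are assigned per MISP
    # instance; these match a stock install (ids 1, 2, 5, 6). Verify them with
    # ``PyMISP.feeds()`` on an instance whose default feed list was changed.
    DEFAULT_OSINT_FEEDS = (
        {"name": "CIRCL OSINT", "id": 1},
        {"name": "Botvrij.eu", "id": 2},
        {"name": "abuse.ch URLhaus", "id": 5},
        {"name": "abuse.ch Feodo Tracker", "id": 6},
    )

    def __init__(self, misp_url, misp_key, opencti_url, opencti_token, misp_ssl=False):
        """Create the MISP and OpenCTI API clients.

        Args:
            misp_url: base URL of the MISP instance.
            misp_key: MISP API key.
            opencti_url: base URL of the OpenCTI platform.
            opencti_token: OpenCTI API token.
            misp_ssl: forwarded as PyMISP's ``ssl`` argument. The default
                ``False`` (unchanged from the original) disables TLS
                certificate verification and is only acceptable for the
                self-signed localhost container; pass ``True`` or a CA
                bundle path for any MISP reached over a real network.
        """
        self.misp = PyMISP(misp_url, misp_key, ssl=misp_ssl)
        self.opencti = OpenCTIApiClient(opencti_url, opencti_token)

    def configure_osint_feeds(self, feeds=None):
        """Enable and fetch a set of MISP feeds.

        Args:
            feeds: iterable of ``{"name": str, "id": int}`` dicts;
                ``DEFAULT_OSINT_FEEDS`` when None.

        Returns:
            list[str]: names of the feeds that were enabled and fetched.
            A feed whose enable/fetch raises is reported and skipped so one
            broken feed does not stop the others.
        """
        enabled = []
        for feed in (self.DEFAULT_OSINT_FEEDS if feeds is None else feeds):
            try:
                self.misp.enable_feed(feed["id"])
                self.misp.fetch_feed(feed["id"])
            except Exception as e:  # best effort: continue with the next feed
                print(f"[-] 失败: {feed['name']}: {e}")
            else:
                print(f"[+] 已启用推送: {feed['name']}")
                enabled.append(feed["name"])
        return enabled

    def configure_opencti_connectors(self):
        """Print every registered OpenCTI connector and return the list.

        Returns:
            list[dict]: connector records as returned by
            ``OpenCTIApiClient.connector.list()``; each is printed with its
            name, ``active`` flag and ``connector_type``.
        """
        connectors = self.opencti.connector.list()
        for conn in connectors:
            print(
                f" 连接器: {conn['name']} - "
                f"活跃: {conn['active']} - "
                f"类型: {conn['connector_type']}"
            )
        return connectors

    def sync_misp_to_opencti(self):
        """Check that the OpenCTI MISP connector is registered.

        The MISP -> OpenCTI synchronisation itself is performed by the
        OpenCTI MISP connector (configured in Docker Compose); this only looks
        the connector up by name and reports its ``active`` flag.

        Returns:
            dict | None: the first connector whose name contains "misp"
            (case-insensitive), or None when no such connector exists.
        """
        connectors = self.opencti.connector.list()
        misp_connector = next(
            (c for c in connectors if "misp" in c["name"].lower()), None
        )
        if misp_connector is None:
            print("[-] 未找到 MISP 连接器 - 在 Docker Compose 中配置")
        else:
            print(f"[+] MISP 连接器活跃: {misp_connector['active']}")
        return misp_connector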
import requests
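class CortexEnrichment:
    """Run Cortex analyzers for automated observable enrichment.

    Talks to the Cortex REST API with a bearer API key. None of the methods
    raise on HTTP errors: a non-200 response yields ``[]`` / ``None``, so
    callers must check the return value.
    """

    def __init__(self, cortex_url, cortex_key, session=None):
        """Store the endpoint, credentials and HTTP client.

        Args:
            cortex_url: base URL of Cortex without a trailing slash, e.g.
                ``http://cortex:9001``.
            cortex_key: Cortex API key, sent as ``Authorization: Bearer``.
            session: object providing ``get``/``post`` with the ``requests``
                signature. Defaults to the ``requests`` module; pass a
                ``requests.Session`` for connection reuse, or a stub in tests.
        """
        self.url = cortex_url
        self.headers = {"Authorization": f"Bearer {cortex_key}"}
        self.session = requests if session is None else session

    def _get_json(self, path, timeout):
        """GET ``path`` under the base URL; decoded body on HTTP 200, else None."""
        resp = self.session.get(
            f"{self.url}{path}", headers=self.headers, timeout=timeout
        )
        return resp.json() if resp.status_code == 200 else None

    def _post_json(self, path, payload, timeout):
        """POST ``payload`` as JSON to ``path``; decoded body on HTTP 200, else None."""
        resp = self.session.post(
            f"{self.url}{path}", json=payload, headers=self.headers, timeout=timeout
        )
        return resp.json() if resp.status_code == 200 else None

    def list_analyzers(self):
        """List the analyzers enabled on this Cortex instance.

        Returns:
            list[dict]: analyzer records from ``GET /api/analyzer``, each
            printed as ``name: description`` (description cut to 60 chars);
            ``[]`` when the request does not return 200.
        """
        analyzers = self._get_json("/api/analyzer", timeout=30)
        if analyzers is None:
            return []
        for a in analyzers:
            print(f" {a['name']}: {a.get('description', '')[:60]}")
        return analyzers

    def analyze_observable(self, observable_type, observable_value, analyzer_id, tlp=2, message="TIP 自动富化"):
        """Submit one observable to an analyzer.

        Args:
            observable_type: Cortex dataType, e.g. ``ip``, ``domain``, ``hash``.
            observable_value: the observable itself.
            analyzer_id: id of the analyzer to run (see ``list_analyzers``).
            tlp: TLP level as Cortex encodes it, 0 (white) .. 3 (red). The
                default 2 (amber) keeps the original behaviour; pass the
                observable's real marking, since Cortex uses it to decide
                which analyzers are allowed to see the value.
            message: free-text note stored with the job.

        Returns:
            dict | None: the created job (its ``id`` feeds ``get_job_report``)
            or None on a non-200 response.
        """
        job = {
            "data": observable_value,
            "dataType": observable_type,
            "tlp": tlp,
            "message": message,
        }
        return self._post_json(f"/api/analyzer/{analyzer_id}/run", job, timeout=30)

    def get_job_report(self, job_id):
        """Fetch the report of an analysis job.

        Args:
            job_id: the ``id`` returned by ``analyze_observable``.

        Returns:
            dict | None: the body of ``GET /api/job/{id}/report`` or None on
            a non-200 response. NOTE(review): a 200 does not imply the job
            has finished — check the job's status field before using the
            report; confirm against the Cortex API docs.
        """
        return self._get_json(f"/api/job/{job_id}/report", timeout=60)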
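class TIPMetrics:
    """Collect platform metrics for the analyst dashboard."""

    def __init__(self, misp, opencti):
        """Keep references to the already-authenticated API clients.

        Args:
            misp: PyMISP client; must provide ``get_server_statistics()``
                and ``feeds()``.
            opencti: pycti ``OpenCTIApiClient``; must provide
                ``indicator.list`` and ``report.list``.
        """
        self.misp = misp
        self.opencti = opencti

    def _opencti_count(self, entity_api):
        """Total number of objects behind a pycti entity API (indicator, report, ...).

        ``withPagination=True`` makes pycti return a dict with a ``pagination``
        block; ``globalCount`` is read from it and defaults to 0 if absent.
        ``first=0`` presumably fetches no entity records, only the count —
        verify against the pycti version in use.
        """
        page = entity_api.list(first=0, withPagination=True)
        return page.get("pagination", {}).get("globalCount", 0)

    def _misp_stats(self):
        """MISP totals: event and attribute counts plus the number of enabled feeds."""
        server_stats = self.misp.get_server_statistics()
        enabled_feeds = [
            f for f in self.misp.feeds() if f.get("Feed", {}).get("enabled")
        ]
        return {
            "total_events": server_stats.get("event_count", 0),
            "total_attributes": server_stats.get("attribute_count", 0),
            "active_feeds": len(enabled_feeds),
        }

    def get_platform_stats(self):
        """Collect statistics from every platform component.

        Returns:
            dict: ``{"misp": {"total_events", "total_attributes",
            "active_feeds"}, "opencti": {"total_indicators",
            "total_reports"}}``; a count whose source field is missing is
            reported as 0.
        """
        return {
            "misp": self._misp_stats(),
            "opencti": {
                "total_indicators": self._opencti_count(self.opencti.indicator),
                "total_reports": self._opencti_count(self.opencti.report),
            },
        }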