Deploys MISP via Docker Compose to aggregate threat feeds from sources like abuse.ch and AlienVault OTX, correlate IOCs, and integrate with SIEM tools like Splunk.
npx claudepluginhub killvxk/cybersecurity-skills-zhThis skill uses the workspace's default tool permissions.
MISP 是领先的开源威胁情报平台,用于收集、存储、分发和共享网络安全指标和威胁情报。它将来自 OSINT 来源、商业提供商和共享社区的推送聚合到统一平台,具备自动关联、STIX/TAXII 导出,以及与 SIEM 和安全工具的直接集成。本技能涵盖通过 Docker 部署 MISP、配置来自 abuse.ch、AlienVault OTX 和 CIRCL 等来源的推送、设置自动推送同步,以及与 Splunk、Elasticsearch 和 SOAR 平台集成。
Deploys MISP via Docker to aggregate threat feeds from OSINT/commercial sources, correlate IOCs, and integrate with SIEMs like Splunk and Elasticsearch.
Deploys MISP via Docker, configures threat feeds like CIRCL OSINT and abuse.ch, collects/searches IOCs with PyMISP API, exports to STIX/TAXII.
Deploys MISP via Docker to aggregate threat intelligence feeds from OSINT sources like abuse.ch and AlienVault OTX, correlate IOCs, and integrate with SIEMs like Splunk and Elasticsearch.
Share bugs, ideas, or general feedback.
MISP 是领先的开源威胁情报平台,用于收集、存储、分发和共享网络安全指标和威胁情报。它将来自 OSINT 来源、商业提供商和共享社区的推送聚合到统一平台,具备自动关联、STIX/TAXII 导出,以及与 SIEM 和安全工具的直接集成。本技能涵盖通过 Docker 部署 MISP、配置来自 abuse.ch、AlienVault OTX 和 CIRCL 等来源的推送、设置自动推送同步,以及与 Splunk、Elasticsearch 和 SOAR 平台集成。
pymisp 库用于 API 交互MISP 将威胁情报存储为包含属性(IOC)的事件,按类型和类别组织。事件可以有标签(MITRE ATT&CK、TLP 标记、行业标签)、Galaxy(威胁行为者档案、恶意软件家族、攻击模式)和对象(相关属性的结构化分组)。事件在实例间自动关联。
MISP 支持三种推送格式:MISP 格式(原生 JSON 事件)、CSV(逗号分隔的 IOC)和自由文本(非结构化文本,自动提取 IOC)。推送可以是远程的(从 URL 获取)或本地的(上传文件)。MISP 内置了 80 多个默认 OSINT 推送,包括 abuse.ch URLhaus、Botvrij、CIRCL OSINT 和恶意软件流量分析。
MISP 实例可通过推/拉机制与其他 MISP 实例同步。共享组控制分发范围(仅限本组织、本社区、已连接社区、所有社区)。TAXII 服务器模块支持与 STIX/TAXII 消费者集成。
# docker-compose.yml 用于 MISP 部署
version: '3.8'
services:
misp:
image: coolacid/misp-docker:core-latest
container_name: misp
restart: unless-stopped
ports:
- "443:443"
- "80:80"
environment:
- MYSQL_HOST=misp-db
- MYSQL_DATABASE=misp
- MYSQL_USER=misp
- MYSQL_PASSWORD=misp_db_password_change_me
- MISP_ADMIN_EMAIL=admin@organization.com
- MISP_ADMIN_PASSPHRASE=admin_password_change_me
- MISP_BASEURL=https://misp.organization.com
- POSTFIX_RELAY_HOST=smtp.organization.com
- TIMEZONE=UTC
volumes:
- misp-data:/var/www/MISP/app/files
- misp-config:/var/www/MISP/app/Config
depends_on:
- misp-db
- misp-redis
misp-db:
image: mysql:8.0
container_name: misp-db
restart: unless-stopped
environment:
- MYSQL_DATABASE=misp
- MYSQL_USER=misp
- MYSQL_PASSWORD=misp_db_password_change_me
- MYSQL_ROOT_PASSWORD=root_password_change_me
volumes:
- misp-db-data:/var/lib/mysql
misp-redis:
image: redis:7
container_name: misp-redis
restart: unless-stopped
volumes:
misp-data:
misp-config:
misp-db-data:
from pymisp import PyMISP, MISPFeed
import json
class MISPFeedManager:
def __init__(self, misp_url, misp_key, verify_ssl=False):
self.misp = PyMISP(misp_url, misp_key, verify_ssl)
print(f"[+] 已连接到 MISP: {misp_url}")
def list_feeds(self):
"""列出所有已配置的推送。"""
feeds = self.misp.feeds()
enabled = [f for f in feeds if f.get("Feed", {}).get("enabled")]
disabled = [f for f in feeds if not f.get("Feed", {}).get("enabled")]
print(f"[+] 推送: {len(enabled)} 个已启用, {len(disabled)} 个已禁用")
return feeds
def enable_default_feeds(self):
"""启用推荐的默认 OSINT 推送。"""
recommended_feeds = [
"CIRCL OSINT Feed",
"Botvrij.eu - Indicators of Compromise",
"abuse.ch URLhaus Host file",
"abuse.ch Feodo Tracker",
"abuse.ch SSL Blacklist",
"malwaredomainlist",
"CyberCure - IP Feed",
]
feeds = self.misp.feeds()
enabled_count = 0
for feed in feeds:
feed_data = feed.get("Feed", {})
if feed_data.get("name") in recommended_feeds:
if not feed_data.get("enabled"):
self.misp.enable_feed(feed_data["id"])
self.misp.enable_feed_cache(feed_data["id"])
enabled_count += 1
print(f" [+] 已启用: {feed_data['name']}")
print(f"[+] 共启用 {enabled_count} 个推送")
def add_custom_feed(self, name, url, provider, feed_format="csv",
input_source="network", enabled=True):
"""添加自定义威胁情报推送。"""
feed = MISPFeed()
feed.name = name
feed.provider = provider
feed.url = url
feed.source_format = feed_format
feed.input_source = input_source
feed.enabled = enabled
feed.caching_enabled = True
feed.publish = False
feed.distribution = "3" # 所有社区
result = self.misp.add_feed(feed)
if "Feed" in result:
feed_id = result["Feed"]["id"]
print(f"[+] 已添加推送: {name} (ID: {feed_id})")
return feed_id
else:
print(f"[-] 添加推送失败: {result}")
return None
def fetch_all_feeds(self):
"""触发所有已启用推送的获取。"""
feeds = self.misp.feeds()
for feed in feeds:
feed_data = feed.get("Feed", {})
if feed_data.get("enabled"):
self.misp.fetch_feed(feed_data["id"])
print(f" [*] 正在获取: {feed_data['name']}")
print("[+] 已触发所有已启用推送的获取")
manager = MISPFeedManager(
"https://misp.organization.com",
"YOUR_MISP_API_KEY",
)
manager.enable_default_feeds()
manager.add_custom_feed(
name="Abuse.ch MalwareBazaar 近期",
url="https://bazaar.abuse.ch/export/csv/recent/",
provider="abuse.ch",
feed_format="csv",
)
manager.fetch_all_feeds()
def search_indicators(misp, value=None, type_attribute=None, tags=None, last_days=30):
"""在 MISP 中搜索并关联指标。"""
from datetime import datetime, timedelta
date_from = (datetime.now() - timedelta(days=last_days)).strftime("%Y-%m-%d")
search_params = {
"date_from": date_from,
"published": True,
"enforceWarninglist": True,
}
if value:
search_params["value"] = value
if type_attribute:
search_params["type_attribute"] = type_attribute
if tags:
search_params["tags"] = tags
results = misp.search("attributes", **search_params)
attributes = results.get("Attribute", [])
print(f"[+] 搜索返回 {len(attributes)} 个属性")
# 按事件分组以获取上下文
events = {}
for attr in attributes:
event_id = attr.get("event_id", "")
if event_id not in events:
events[event_id] = {"attributes": [], "tags": set()}
events[event_id]["attributes"].append({
"type": attr.get("type", ""),
"value": attr.get("value", ""),
"category": attr.get("category", ""),
"timestamp": attr.get("timestamp", ""),
})
for tag in attr.get("Tag", []):
events[event_id]["tags"].add(tag.get("name", ""))
return {"attributes": attributes, "events": events}
# 搜索特定 IOC
misp = manager.misp
results = search_indicators(misp, value="203.0.113.1")
results_by_type = search_indicators(misp, type_attribute="ip-dst", last_days=7)
results_by_tag = search_indicators(misp, tags=["tlp:white", "type:OSINT"])
import requests
from datetime import datetime, timedelta
class MISPSIEMExporter:
def __init__(self, misp_client):
self.misp = misp_client
def export_to_splunk(self, splunk_url, hec_token, days=7):
"""通过 HEC 将近期 MISP 指标导出到 Splunk。"""
date_from = (datetime.now() - timedelta(days=days)).strftime("%Y-%m-%d")
results = self.misp.search("attributes", date_from=date_from,
published=True, enforceWarninglist=True)
attributes = results.get("Attribute", [])
headers = {"Authorization": f"Splunk {hec_token}"}
exported = 0
for attr in attributes:
event = {
"event": {
"ioc_type": attr.get("type", ""),
"ioc_value": attr.get("value", ""),
"category": attr.get("category", ""),
"event_id": attr.get("event_id", ""),
"timestamp": attr.get("timestamp", ""),
"tags": [t.get("name", "") for t in attr.get("Tag", [])],
},
"sourcetype": "misp:attribute",
"source": "misp",
"index": "threat_intel",
}
resp = requests.post(
f"{splunk_url}/services/collector/event",
headers=headers, json=event, verify=False,
)
if resp.status_code == 200:
exported += 1
print(f"[+] 已导出 {exported}/{len(attributes)} 个指标到 Splunk")
def export_ioc_list(self, output_file, ioc_types=None, days=30):
"""导出防火墙/代理封锁列表的平面 IOC 列表。"""
ioc_types = ioc_types or ["ip-dst", "domain", "hostname", "url"]
date_from = (datetime.now() - timedelta(days=days)).strftime("%Y-%m-%d")
all_iocs = []
for ioc_type in ioc_types:
results = self.misp.search(
"attributes", type_attribute=ioc_type,
date_from=date_from, published=True,
enforceWarninglist=True,
)
for attr in results.get("Attribute", []):
all_iocs.append(attr.get("value", ""))
unique_iocs = sorted(set(all_iocs))
with open(output_file, "w") as f:
for ioc in unique_iocs:
f.write(f"{ioc}\n")
print(f"[+] 已导出 {len(unique_iocs)} 个唯一 IOC 到 {output_file}")
exporter = MISPSIEMExporter(misp)
exporter.export_ioc_list("blocklist_ips.txt", ioc_types=["ip-dst"], days=7)