Deploys and configures TAXII 2.1 servers using Medallion or OpenTAXII to share STIX 2.1 bundles for automated threat intelligence and IOC exchange.
npx claudepluginhub killvxk/cybersecurity-skills-zhThis skill uses the workspace's default tool permissions.
TAXII(可信自动化情报信息交换)是 OASIS 标准协议,用于通过 HTTPS 交换网络威胁情报。OpenTAXII 是 EclecticIQ 开发的开源 TAXII 服务器实现,支持 TAXII 1.x;OASIS cti-taxii-server 提供 TAXII 2.1 参考实现。本技能涵盖部署 TAXII 服务器,配置威胁情报 Feed 集合,发布 STIX 2.1 Bundle,以及与 SIEM/SOAR 平台集成实现自动化指标摄入。
Deploys and configures OpenTAXII or Medallion TAXII 2.1 servers to share and consume STIX-formatted cyber threat intelligence for automated indicator exchange between organizations.
Deploy and configure OpenTAXII or Medallion TAXII servers to share STIX 2.1 cyber threat intelligence via TAXII 2.1 protocol with Docker and Python.
Implements Python STIX/TAXII 2.1 feed consumers/producers: TAXII server discovery, collection polling, STIX object parsing with stix2, SIEM/TIP integration. For threat intel pipelines.
Share bugs, ideas, or general feedback.
TAXII(可信自动化情报信息交换)是 OASIS 标准协议,用于通过 HTTPS 交换网络威胁情报。OpenTAXII 是 EclecticIQ 开发的开源 TAXII 服务器实现,支持 TAXII 1.x;OASIS cti-taxii-server 提供 TAXII 2.1 参考实现。本技能涵盖部署 TAXII 服务器,配置威胁情报 Feed 集合,发布 STIX 2.1 Bundle,以及与 SIEM/SOAR 平台集成实现自动化指标摄入。
medallion、stix2、taxii2-client、opentaxii、cabby 库TAXII 2.1 定义三种服务:发现(查找可用 API 根)、API 根(集合的入口点)和集合(CTI 对象的存储库)。集合支持两种访问模式:集合端点允许消费者轮询对象,状态端点跟踪添加操作的结果。TAXII 使用带 application/taxii+json;version=2.1 的 HTTP 内容协商。
TAXII 支持集线器和辐射(中心服务器向消费者分发)、点对点(合作伙伴之间双向共享)和源-订阅(生产者发布,消费者订阅)模型。每个集合可以有只读、只写或读写访问控制。
TAXII 传输包含结构化威胁信息对象的 STIX 2.1 Bundle:指标(检测模式)、观测数据、恶意软件、攻击模式、威胁行为者、入侵集合、活动、关系和目击记录。每个对象都有唯一的 STIX ID、创建/修改时间戳和可选的 TLP 标记定义。
# 安装 medallion(OASIS 参考实现)
# pip install medallion
# medallion_config.json
import json
config = {
"backend": {
"module_class": "MemoryBackend",
"filename": "taxii_data.json"
},
"users": {
"admin": "admin_password_change_me",
"analyst": "analyst_password_change_me",
"readonly": "readonly_password_change_me"
},
"taxii": {
"max_content_length": 10485760
}
}
# 创建初始数据存储
taxii_data = {
"discovery": {
"title": "威胁情报 TAXII 服务器",
"description": "用于共享 CTI 指标的 TAXII 2.1 服务器",
"contact": "soc@organization.com",
"default": "https://taxii.organization.com/api/",
"api_roots": ["https://taxii.organization.com/api/"]
},
"api_roots": {
"api": {
"title": "威胁情报 API 根",
"description": "威胁情报共享的主要 API 根",
"versions": ["application/taxii+json;version=2.1"],
"max_content_length": 10485760,
"collections": {
"malware-iocs": {
"id": "91a7b528-80eb-42ed-a74d-c6fbd5a26116",
"title": "恶意软件 IOC",
"description": "来自恶意软件分析的失陷指标",
"can_read": True,
"can_write": True,
"media_types": ["application/stix+json;version=2.1"]
},
"apt-intelligence": {
"id": "52892447-4d7e-4f70-b94a-5460e242dd23",
"title": "APT 情报",
"description": "高级持续性威胁组织情报",
"can_read": True,
"can_write": True,
"media_types": ["application/stix+json;version=2.1"]
},
"phishing-indicators": {
"id": "64993447-4d7e-4f70-b94a-5460e242ee34",
"title": "网络钓鱼指标",
"description": "网络钓鱼 URL、域名和电子邮件指标",
"can_read": True,
"can_write": True,
"media_types": ["application/stix+json;version=2.1"]
}
}
}
}
}
with open("medallion_config.json", "w") as f:
json.dump(config, f, indent=2)
with open("taxii_data.json", "w") as f:
json.dump(taxii_data, f, indent=2)
print("[+] TAXII 服务器配置已创建")
# docker-compose.yml
version: '3.8'
services:
taxii-server:
image: python:3.11-slim
container_name: taxii-server
working_dir: /app
volumes:
- ./medallion_config.json:/app/medallion_config.json
- ./taxii_data.json:/app/taxii_data.json
- ./certs:/app/certs
ports:
- "6100:6100"
command: >
bash -c "pip install medallion &&
medallion --host 0.0.0.0 --port 6100
--config /app/medallion_config.json"
restart: unless-stopped
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:6100/taxii2/"]
interval: 30s
timeout: 10s
retries: 3
from stix2 import Indicator, Malware, Relationship, Bundle, TLP_WHITE
from taxii2client.v21 import Server, Collection, as_pages
import json
from datetime import datetime
class TAXIIPublisher:
def __init__(self, server_url, username, password):
self.server = Server(
server_url,
user=username,
password=password,
)
def list_collections(self):
"""列出所有可用集合。"""
api_root = self.server.api_roots[0]
for collection in api_root.collections:
print(f" [{collection.id}] {collection.title} "
f"(读={collection.can_read}, 写={collection.can_write})")
return api_root.collections
def publish_indicators(self, collection_id, indicators):
"""将 STIX 指标发布到 TAXII 集合。"""
api_root = self.server.api_roots[0]
collection = Collection(
f"{api_root.url}collections/{collection_id}/",
user=self.server._user,
password=self.server._password,
)
bundle = Bundle(objects=indicators)
response = collection.add_objects(bundle.serialize())
print(f"[+] 已向 {collection_id} 发布 {len(indicators)} 个对象")
print(f" 状态:{response.status}")
return response
def create_malware_indicators(self):
"""创建示例 STIX 恶意软件指标。"""
malware = Malware(
name="SUNBURST",
description="SolarWinds 供应链攻击(2020)中使用的后门。"
"被木马化的 SolarWinds.Orion.Core.BusinessLayer.dll 模块。",
malware_types=["backdoor", "trojan"],
is_family=True,
object_marking_refs=[TLP_WHITE],
)
indicator_hash = Indicator(
name="SUNBURST SHA-256 哈希",
description="被木马化的 SolarWinds Orion DLL 的 SHA-256 哈希",
pattern="[file:hashes.'SHA-256' = "
"'32519b85c0b422e4656de6e6c41878e95fd95026267daab4215ee59c107d6c77']",
pattern_type="stix",
valid_from=datetime(2020, 12, 13),
indicator_types=["malicious-activity"],
object_marking_refs=[TLP_WHITE],
)
indicator_domain = Indicator(
name="SUNBURST C2 域名模式",
description="SUNBURST 用于 C2 的 DGA 域名模式",
pattern="[domain-name:value MATCHES "
"'^[a-z0-9]{4,}\\.appsync-api\\..*\\.avsvmcloud\\.com$']",
pattern_type="stix",
valid_from=datetime(2020, 12, 13),
indicator_types=["malicious-activity"],
object_marking_refs=[TLP_WHITE],
)
rel = Relationship(
relationship_type="indicates",
source_ref=indicator_hash.id,
target_ref=malware.id,
)
return [malware, indicator_hash, indicator_domain, rel]
publisher = TAXIIPublisher(
"https://taxii.organization.com/taxii2/",
"admin", "admin_password_change_me"
)
collections = publisher.list_collections()
indicators = publisher.create_malware_indicators()
publisher.publish_indicators("91a7b528-80eb-42ed-a74d-c6fbd5a26116", indicators)
from taxii2client.v21 import Server, Collection, as_pages
import json
class TAXIIConsumer:
def __init__(self, server_url, username, password):
self.server = Server(server_url, user=username, password=password)
def poll_collection(self, collection_id, added_after=None):
"""轮询集合获取新 STIX 对象。"""
api_root = self.server.api_roots[0]
collection = Collection(
f"{api_root.url}collections/{collection_id}/",
user=self.server._user,
password=self.server._password,
)
kwargs = {}
if added_after:
kwargs["added_after"] = added_after
all_objects = []
for bundle in as_pages(collection.get_objects, per_request=50, **kwargs):
objects = json.loads(bundle).get("objects", [])
all_objects.extend(objects)
indicators = [o for o in all_objects if o.get("type") == "indicator"]
malware = [o for o in all_objects if o.get("type") == "malware"]
relationships = [o for o in all_objects if o.get("type") == "relationship"]
print(f"[+] 已轮询 {len(all_objects)} 个对象:"
f"{len(indicators)} 个指标,{len(malware)} 个恶意软件,"
f"{len(relationships)} 个关系")
return all_objects
def extract_iocs_for_siem(self, stix_objects):
"""从 STIX 对象中提取 IOC 用于 SIEM 摄入。"""
iocs = []
for obj in stix_objects:
if obj.get("type") == "indicator":
pattern = obj.get("pattern", "")
iocs.append({
"id": obj.get("id"),
"name": obj.get("name", ""),
"pattern": pattern,
"valid_from": obj.get("valid_from", ""),
"indicator_types": obj.get("indicator_types", []),
"confidence": obj.get("confidence", 0),
})
return iocs
consumer = TAXIIConsumer(
"https://taxii.organization.com/taxii2/",
"analyst", "analyst_password_change_me"
)
objects = consumer.poll_collection("91a7b528-80eb-42ed-a74d-c6fbd5a26116")
iocs = consumer.extract_iocs_for_siem(objects)
import requests
def push_to_splunk(iocs, splunk_url, hec_token):
"""通过 HEC 将提取的 IOC 推送到 Splunk。"""
headers = {"Authorization": f"Splunk {hec_token}"}
for ioc in iocs:
event = {
"event": ioc,
"sourcetype": "stix:indicator",
"source": "taxii-server",
"index": "threat_intel",
}
resp = requests.post(
f"{splunk_url}/services/collector/event",
headers=headers,
json=event,
verify=False,
)
if resp.status_code != 200:
print(f"[-] Splunk HEC 错误:{resp.text}")
print(f"[+] 已向 Splunk 推送 {len(iocs)} 个 IOC")
def push_to_elasticsearch(iocs, es_url, index="threat-intel"):
"""将 IOC 推送到 Elasticsearch。"""
for ioc in iocs:
resp = requests.post(
f"{es_url}/{index}/_doc",
json=ioc,
headers={"Content-Type": "application/json"},
)
if resp.status_code not in (200, 201):
print(f"[-] ES 错误:{resp.text}")
print(f"[+] 已在 Elasticsearch 中索引 {len(iocs)} 个 IOC")