Implements Python STIX/TAXII 2.1 feed consumers/producers: TAXII server discovery, collection polling, STIX object parsing with stix2, SIEM/TIP integration. For threat intel pipelines.
npx claudepluginhub killvxk/cybersecurity-skills-zh
This skill uses the workspace's default tool permissions.
STIX(结构化威胁信息表达式)和 TAXII(可信自动化情报信息交换)是 OASIS 开放标准,用于表示和传输网络威胁情报。本技能涵盖使用 Python 实现 STIX/TAXII 2.1 Feed 消费者和生产者,配置 TAXII 服务器发现,集合管理,轮询新情报,解析 STIX 2.1 对象,以及将 Feed 集成到 SIEM 和 TIP 平台。
Implements Python STIX/TAXII 2.1 feed consumer/producer: TAXII discovery, polling, STIX object parsing, SIEM/TIP integration for threat intelligence.
Implements Python STIX/TAXII 2.1 feed consumer/producer: discovers servers, polls collections, parses threat intel objects, integrates with SIEM/TIP platforms.
Processes STIX 2.1 bundles from TAXII 2.1 servers, validates objects against OASIS spec, normalizes to native formats, and routes to SIEM/TIP like MISP/OpenCTI. For TAXII integrations and CTI pipelines.
Share bugs, ideas, or general feedback.
STIX(结构化威胁信息表达式)和 TAXII(可信自动化情报信息交换)是 OASIS 开放标准,用于表示和传输网络威胁情报。本技能涵盖使用 Python 实现 STIX/TAXII 2.1 Feed 消费者和生产者,配置 TAXII 服务器发现,集合管理,轮询新情报,解析 STIX 2.1 对象,以及将 Feed 集成到 SIEM 和 TIP 平台。
taxii2-client、stix2、cti-taxii-client 库
TAXII 定义了三种服务类型的 RESTful API:
STIX 对象分为以下类别:
Bundle 是一组一起传输的 STIX 对象集合。Bundle 具有唯一 ID,包含对象数组。TAXII 集合响应 GET 请求时提供 Bundle。
from taxii2client.v21 import Server, Collection, as_pages
# Connect to the MITRE ATT&CK TAXII server (empty credentials = anonymous access).
# NOTE(review): the host is hard-coded; confirm it still serves TAXII 2.1.
server = Server("https://cti-taxii.mitre.org/taxii2/", user="", password="")
print(
    f"标题:{server.title}",
    f"描述:{server.description}",
    sep="\n",
)

# Walk every API root advertised by the discovery endpoint.
for api_root in server.api_roots:
    print(
        f"\nAPI 根:{api_root.title}",
        f" URL:{api_root.url}",
        sep="\n",
    )
    # List the collections of this API root together with their access flags.
    for collection in api_root.collections:
        print(
            f" 集合:{collection.title}(ID:{collection.id})",
            f" 可读:{collection.can_read}",
            f" 可写:{collection.can_write}",
            sep="\n",
        )
from taxii2client.v21 import Collection, as_pages
import json
# Local to this snippet: Counter replaces the hand-rolled frequency dict below.
from collections import Counter

# Connect to the Enterprise ATT&CK collection (empty credentials = anonymous).
# NOTE(review): host and collection ID are hard-coded; confirm the endpoint
# still serves TAXII 2.1 before relying on it.
ENTERPRISE_ATTACK_ID = "95ecc380-afe9-11e4-9b6c-751b66dd541e"
collection = Collection(
    f"https://cti-taxii.mitre.org/stix/collections/{ENTERPRISE_ATTACK_ID}/",
    user="",
    password="",
)
print(f"集合:{collection.title}")

# Fetch every object, one page (up to 50 objects per request) at a time.
all_objects = []
for envelope in as_pages(collection.get_objects, per_request=50):
    # An envelope without an "objects" key contributes nothing.
    objects = envelope.get("objects", [])
    all_objects.extend(objects)
    print(f" 已获取 {len(objects)} 个对象(总计:{len(all_objects)})")
print(f"\n总共获取对象:{len(all_objects)}")

# Tally the objects by STIX type; objects without a "type" count as "unknown".
type_counts = Counter(obj.get("type", "unknown") for obj in all_objects)
for obj_type, count in sorted(type_counts.items()):
    print(f" {obj_type}: {count}")
from stix2 import parse, Filter, MemoryStore
# Load the fetched objects into an in-memory store so they can be queried.
store = MemoryStore(stix_data=all_objects)

# Indicators: report how many there are and preview the first five.
# NOTE(review): assumes every previewed indicator carries a `name` — verify.
indicator_filter = Filter("type", "=", "indicator")
indicators = store.query([indicator_filter])
print(f"指标:{len(indicators)}")
for ind in indicators[:5]:
    print(f" {ind.name}: {ind.pattern}")

# Malware objects.
malware_filter = Filter("type", "=", "malware")
malware_list = store.query([malware_filter])
print(f"\n恶意软件家族:{len(malware_list)}")

# Threat actors, queried here as objects of type "intrusion-set".
actor_filter = Filter("type", "=", "intrusion-set")
actors = store.query([actor_filter])
print(f"威胁行为者:{len(actors)}")
# Find the relationships that originate from a specific object.
def get_related(store, source_id):
    """Return the relationship objects in *store* whose ``source_ref`` is *source_id*.

    Args:
        store: Object exposing ``query(filters)``.
        source_id: Identifier matched against each relationship's ``source_ref``.

    Returns:
        Whatever ``store.query`` returns for the two combined filters.
    """
    outgoing_filters = [
        Filter("type", "=", "relationship"),
        Filter("source_ref", "=", source_id),
    ]
    return store.query(outgoing_filters)
# Example: print every outgoing relationship of the APT28 intrusion set
# (the original note describes this as "all techniques used by APT28"; the
# code itself does not restrict the relationship or target type).
apt28 = store.query([
    Filter("type", "=", "intrusion-set"),
    Filter("name", "=", "APT28"),
])
if apt28:
    # Only the first match is used.
    rels = get_related(store, apt28[0].id)
    for rel in rels:
        target = store.get(rel.target_ref)
        if not target:
            # Skip relationships whose target is not present in the store.
            continue
        print(f" {rel.relationship_type} -> {target.name}({target.type})")
import json
from datetime import datetime, timedelta, timezone

from stix2 import parse, Bundle
from taxii2client.v21 import Collection, as_pages
class TAXIIConsumer:
    """Consume a STIX/TAXII 2.1 feed and extract IOCs from it."""

    # Timestamp layout sent as ``added_after``; fractional seconds are always
    # rendered as ".000", i.e. the value is truncated to whole seconds.
    _TIMESTAMP_FORMAT = "%Y-%m-%dT%H:%M:%S.000Z"

    # How far back the very first poll looks when no timestamp is supplied.
    _DEFAULT_LOOKBACK = timedelta(days=1)

    # STIX object types that ``extract_observables`` reports.
    _OBSERVABLE_TYPES = frozenset({
        "ipv4-addr", "ipv6-addr", "domain-name", "url",
        "file", "email-addr", "network-traffic",
    })

    def __init__(self, collection_url, user="", password=""):
        """Bind the consumer to one TAXII collection.

        Args:
            collection_url: URL of the collection to poll.
            user: User name forwarded to ``Collection``.
            password: Password forwarded to ``Collection``.
        """
        self.collection = Collection(collection_url, user=user, password=password)
        # Timestamp string of the previous successful poll; None until then.
        self.last_poll = None

    @classmethod
    def _format_timestamp(cls, moment):
        """Render *moment* (expected to be a UTC datetime) as a TAXII timestamp."""
        return moment.strftime(cls._TIMESTAMP_FORMAT)

    def poll_new_objects(self, added_after=None):
        """Poll for objects added after a given timestamp.

        Args:
            added_after: Timestamp string passed to the collection as
                ``added_after``. When None, the previous poll's timestamp is
                used, or "24 hours ago" on the first poll.

        Returns:
            A list with the objects of every page returned by the collection.

        Note:
            ``last_poll`` is taken from the local clock; skew between this
            machine and the server is not compensated for.
        """
        if added_after is None:
            added_after = self.last_poll or self._format_timestamp(
                datetime.now(timezone.utc) - self._DEFAULT_LOOKBACK
            )

        # Take the watermark BEFORE fetching: anything the server adds while
        # the pages are being downloaded must still be newer than last_poll,
        # otherwise the next poll would skip it.
        poll_started = self._format_timestamp(datetime.now(timezone.utc))

        all_objects = []
        for envelope in as_pages(
            self.collection.get_objects, per_request=100, added_after=added_after
        ):
            all_objects.extend(envelope.get("objects", []))

        # Only advance the watermark once every page was fetched without error.
        self.last_poll = poll_started
        return all_objects

    def extract_indicators(self, objects):
        """Extract actionable indicators from STIX objects.

        Args:
            objects: Iterable of STIX objects as dicts.

        Returns:
            One flat dict per object whose ``type`` is ``"indicator"``;
            missing fields fall back to ``""``, ``[]`` or ``0``.
        """
        return [
            {
                "id": obj.get("id"),
                "name": obj.get("name", ""),
                "pattern": obj.get("pattern", ""),
                "pattern_type": obj.get("pattern_type", ""),
                "valid_from": obj.get("valid_from", ""),
                "valid_until": obj.get("valid_until", ""),
                "indicator_types": obj.get("indicator_types", []),
                "confidence": obj.get("confidence", 0),
                "labels": obj.get("labels", []),
            }
            for obj in objects
            if obj.get("type") == "indicator"
        ]

    def extract_observables(self, objects):
        """Extract STIX cyber observable objects.

        Args:
            objects: Iterable of STIX objects as dicts.

        Returns:
            One ``{"type", "value", "id"}`` dict per object whose type is in
            ``_OBSERVABLE_TYPES``. Objects without a ``value`` key yield
            ``""`` for it.
        """
        return [
            {
                "type": obj["type"],
                "value": obj.get("value", ""),
                "id": obj.get("id"),
            }
            for obj in objects
            if obj.get("type") in self._OBSERVABLE_TYPES
        ]
# Usage example: poll the Enterprise ATT&CK collection and count new indicators.
# NOTE(review): the host is hard-coded; confirm it still serves TAXII 2.1.
feed_url = f"https://cti-taxii.mitre.org/stix/collections/{ENTERPRISE_ATTACK_ID}/"
consumer = TAXIIConsumer(feed_url)
new_objects = consumer.poll_new_objects()
indicators = consumer.extract_indicators(new_objects)
print(f"新指标:{len(indicators)}")
# medallion configuration (contents of medallion.conf).
# NOTE(review): the credentials below are plaintext placeholders; replace them
# before exposing the server.
TAXII_CONFIG = {
    "backend": {
        "module_class": "MemoryBackend",
    },
    "users": {
        "admin": "admin_password",
        "readonly": "readonly_password",
    },
    "taxii": {
        # 10 MiB, spelled out so the unit is visible.
        "max_content_length": 10 * 1024 * 1024,
    },
}
# 运行 medallion 服务器:
# pip install medallion
# python -m medallion --config medallion.conf --port 5000
# 向本地 TAXII 服务器添加对象
import requests
def push_to_taxii(server_url, collection_id, stix_bundle, user, password):
    """Push the objects of a STIX bundle to a TAXII 2.1 collection.

    Args:
        server_url: Base URL under which ``collections/<id>/objects/`` lives;
            a trailing slash is tolerated.
        collection_id: ID of the target collection.
        stix_bundle: JSON-serializable dict with an ``"objects"`` list, either
            a STIX bundle or an existing TAXII envelope.
        user: User name for HTTP basic authentication.
        password: Password for HTTP basic authentication.

    Returns:
        The decoded JSON body of the server's response.

    Raises:
        ValueError: If *stix_bundle* has no ``"objects"`` entry.
        requests.HTTPError: If the server answers with a 4xx/5xx status.
    """
    if "objects" not in stix_bundle:
        raise ValueError("stix_bundle must contain an 'objects' list")

    # The TAXII 2.1 "Add Objects" endpoint takes a TAXII envelope rather than
    # a STIX bundle, so only the objects are forwarded.
    envelope = {"objects": stix_bundle["objects"]}

    # Strip a trailing slash so the path never contains "//collections".
    base_url = server_url.rstrip("/")
    url = f"{base_url}/collections/{collection_id}/objects/"

    # Request and response both use the TAXII media type in TAXII 2.1.
    taxii_media_type = "application/taxii+json;version=2.1"
    headers = {
        "Content-Type": taxii_media_type,
        "Accept": taxii_media_type,
    }
    response = requests.post(
        url,
        json=envelope,
        headers=headers,
        auth=(user, password),
        timeout=30,
    )
    # Surface HTTP failures instead of returning an error body as if it were
    # a successful result.
    response.raise_for_status()
    return response.json()