From cybersecurity-skills
Implements Python STIX/TAXII 2.1 feed consumer/producer: discovers servers, polls collections, parses threat intel objects, integrates with SIEM/TIP platforms.
npx claudepluginhub mukul975/anthropic-cybersecurity-skills --plugin cybersecurity-skills
This skill uses the workspace's default tool permissions.
STIX (Structured Threat Information eXpression) and TAXII (Trusted Automated eXchange of Intelligence Information) are OASIS open standards for representing and transporting cyber threat intelligence. This skill covers implementing a STIX/TAXII 2.1 feed consumer and producer using Python, configuring TAXII server discovery, collection management, polling for new intelligence, parsing STIX 2.1 o...
Applies Acme Corporation brand guidelines including colors, fonts, layouts, and messaging to generated PowerPoint, Excel, and PDF documents.
Builds DCF models with sensitivity analysis, Monte Carlo simulations, and scenario planning for investment valuation and risk assessment.
Calculates profitability (ROE, margins), liquidity (current ratio), leverage, efficiency, and valuation (P/E, EV/EBITDA) ratios from financial statements in CSV, JSON, text, or Excel for investment analysis.
STIX (Structured Threat Information eXpression) and TAXII (Trusted Automated eXchange of Intelligence Information) are OASIS open standards for representing and transporting cyber threat intelligence. This skill covers implementing a STIX/TAXII 2.1 feed consumer and producer using Python, configuring TAXII server discovery, collection management, polling for new intelligence, parsing STIX 2.1 objects, and integrating feeds into SIEM and TIP platforms.
taxii2-client, stix2, cti-taxii-client libraries
TAXII defines a RESTful API with three service types:
STIX objects are categorized as:
A Bundle is a collection of STIX objects transmitted together. Bundles have a unique ID and contain an array of objects. TAXII collections serve bundles in response to GET requests.
from taxii2client.v21 import Server, Collection, as_pages
# Discover the TAXII server and print everything it advertises.
# NOTE(review): confirm this endpoint still answers TAXII 2.1 requests before
# relying on it; the client class used here comes from the v21 package.
server = Server(
    "https://cti-taxii.mitre.org/taxii2/",
    user="",
    password="",
)
print(f"Title: {server.title}")
print(f"Description: {server.description}")

# For each API root, print its title and URL, then every collection it hosts
# together with that collection's can_read / can_write flags.
for root in server.api_roots:
    print(f"\nAPI Root: {root.title}")
    print(f" URL: {root.url}")
    for hosted in root.collections:
        print(f" Collection: {hosted.title} (ID: {hosted.id})")
        print(f" Can Read: {hosted.can_read}")
        print(f" Can Write: {hosted.can_write}")
from taxii2client.v21 import Collection, as_pages
import json
# Open the Enterprise ATT&CK collection directly by its collection ID.
# NOTE(review): confirm this URL is served by a TAXII 2.1 endpoint; the
# Collection class used here comes from the v21 client package.
ENTERPRISE_ATTACK_ID = "95ecc380-afe9-11e4-9b6c-751b66dd541e"
collection_url = f"https://cti-taxii.mitre.org/stix/collections/{ENTERPRISE_ATTACK_ID}/"
collection = Collection(collection_url, user="", password="")
print(f"Collection: {collection.title}")

# Page through the collection (per_request=50) and accumulate every object,
# reporting progress after each page.
all_objects = []
for page in as_pages(collection.get_objects, per_request=50):
    batch = page.get("objects", [])
    all_objects += batch
    print(f" Fetched {len(batch)} objects (total: {len(all_objects)})")
print(f"\nTotal objects retrieved: {len(all_objects)}")

# Tally objects per "type" value; objects lacking the key count as "unknown".
type_counts = {}
for stix_obj in all_objects:
    kind = stix_obj.get("type", "unknown")
    if kind in type_counts:
        type_counts[kind] += 1
    else:
        type_counts[kind] = 1

# Keys are unique, so sorting the keys gives the same order as sorting items.
for kind in sorted(type_counts):
    print(f" {kind}: {type_counts[kind]}")
from stix2 import parse, Filter, MemoryStore
# Load `all_objects` into a MemoryStore so it can be queried with filters.
store = MemoryStore(stix_data=all_objects)


def _of_type(stix_type):
    """Return every object in ``store`` whose ``type`` equals *stix_type*."""
    return store.query([Filter("type", "=", stix_type)])


# Indicators: print the count, then name and pattern for the first five.
indicators = _of_type("indicator")
print(f"Indicators: {len(indicators)}")
for indicator in indicators[:5]:
    print(f" {indicator.name}: {indicator.pattern}")

# Malware objects.
malware_list = _of_type("malware")
print(f"\nMalware families: {len(malware_list)}")

# Objects of type "intrusion-set" are what this example reports as threat actors.
actors = _of_type("intrusion-set")
print(f"Threat actors: {len(actors)}")
def get_related(store, source_id):
    """Return the relationship objects whose ``source_ref`` is *source_id*.

    Args:
        store: Object exposing ``query(filters)``; the two filters built here
            are passed to it as a single list.
        source_id: Value to match against each relationship's ``source_ref``.

    Returns:
        Whatever ``store.query`` returns for the combined filters.
    """
    is_relationship = Filter("type", "=", "relationship")
    starts_at_source = Filter("source_ref", "=", source_id)
    return store.query([is_relationship, starts_at_source])
# Example: print every object that APT28 points at through a relationship.
apt28_filters = [
    Filter("type", "=", "intrusion-set"),
    Filter("name", "=", "APT28"),
]
apt28 = store.query(apt28_filters)

# Nothing is printed when the store holds no intrusion set named APT28.
if apt28:
    rels = get_related(store, apt28[0].id)
    for link in rels:
        linked = store.get(link.target_ref)
        if not linked:
            # The relationship's target could not be resolved in the store.
            continue
        print(f" {link.relationship_type} -> {linked.name} ({linked.type})")
from taxii2client.v21 import Collection, as_pages
from stix2 import parse, Bundle
from datetime import datetime, timedelta
import json
class TAXIIConsumer:
    """Consume STIX/TAXII 2.1 feeds and extract IOCs.

    Wraps a single TAXII collection, remembers the timestamp of the most
    recent successful poll, and offers helpers that reduce raw STIX object
    dicts to flat records.
    """

    # strftime layout used for the ``added_after`` filter; the millisecond
    # field is always written as 000.
    _TIMESTAMP_FORMAT = "%Y-%m-%dT%H:%M:%S.000Z"

    # Object types that extract_observables() keeps.
    _OBSERVABLE_TYPES = frozenset({
        "ipv4-addr", "ipv6-addr", "domain-name", "url",
        "file", "email-addr", "network-traffic",
    })

    def __init__(self, collection_url, user="", password=""):
        """Bind the consumer to one collection.

        Args:
            collection_url: URL handed to ``Collection``.
            user: User name handed to ``Collection``.
            password: Password handed to ``Collection``.
        """
        self.collection = Collection(collection_url, user=user, password=password)
        # Timestamp string of the last successful poll; None until the first one.
        self.last_poll = None

    @classmethod
    def _utc_timestamp(cls, age=None):
        """Return the current UTC time, minus *age* if given, as a filter string."""
        # Local import: only ``datetime`` and ``timedelta`` are imported at file level.
        from datetime import timezone

        moment = datetime.now(timezone.utc)
        if age is not None:
            moment -= age
        return moment.strftime(cls._TIMESTAMP_FORMAT)

    def poll_new_objects(self, added_after=None):
        """Poll for objects added after a specific timestamp.

        Args:
            added_after: Timestamp string forwarded as the ``added_after``
                filter. When None, the previous poll's timestamp is used, or
                "one day ago" if this consumer has never polled.

        Returns:
            A list with the ``objects`` entries of every page returned.

        ``last_poll`` is only advanced when all pages were fetched without
        an exception.
        """
        # Take the cursor BEFORE the first request. Stamping it after the
        # fetch would skip anything the server added while we were paging;
        # stamping it first can at worst re-deliver an object next time.
        poll_started = self._utc_timestamp()

        if added_after is None:
            added_after = self.last_poll or self._utc_timestamp(timedelta(days=1))

        all_objects = []
        for envelope in as_pages(
            self.collection.get_objects, per_request=100, added_after=added_after
        ):
            all_objects.extend(envelope.get("objects", []))

        self.last_poll = poll_started
        return all_objects

    def extract_indicators(self, objects):
        """Extract actionable indicators from STIX objects.

        Args:
            objects: Iterable of dict-like STIX objects.

        Returns:
            One flat dict per object whose ``type`` is ``"indicator"``.
            Missing text fields default to ``""``, missing lists to ``[]``,
            a missing ``confidence`` to ``0`` and a missing ``id`` to None.
        """
        return [
            {
                "id": obj.get("id"),
                "name": obj.get("name", ""),
                "pattern": obj.get("pattern", ""),
                "pattern_type": obj.get("pattern_type", ""),
                "valid_from": obj.get("valid_from", ""),
                "valid_until": obj.get("valid_until", ""),
                "indicator_types": obj.get("indicator_types", []),
                "confidence": obj.get("confidence", 0),
                "labels": obj.get("labels", []),
            }
            for obj in objects
            if obj.get("type") == "indicator"
        ]

    def extract_observables(self, objects):
        """Extract STIX Cyber Observables.

        Args:
            objects: Iterable of dict-like STIX objects.

        Returns:
            A ``{"type", "value", "id"}`` dict for each object whose type is
            in ``_OBSERVABLE_TYPES``. ``value`` is ``""`` when the object has
            no ``value`` key.
        """
        return [
            {
                "type": obj["type"],
                "value": obj.get("value", ""),
                "id": obj.get("id"),
            }
            for obj in objects
            if obj.get("type") in self._OBSERVABLE_TYPES
        ]
# Usage: poll the Enterprise ATT&CK collection with the default time window
# and report how many of the returned objects are indicators.
feed_url = f"https://cti-taxii.mitre.org/stix/collections/{ENTERPRISE_ATTACK_ID}/"
consumer = TAXIIConsumer(feed_url)
new_objects = consumer.poll_new_objects()
indicators = consumer.extract_indicators(new_objects)
print(f"New indicators: {len(indicators)}")
# medallion configuration (medallion.conf)
# NOTE(review): the user entries are plaintext placeholder credentials;
# replace them before running a server anyone else can reach.
_BACKEND_SECTION = {"module_class": "MemoryBackend"}
_USERS_SECTION = {
    "admin": "admin_password",
    "readonly": "readonly_password",
}
_TAXII_SECTION = {"max_content_length": 10 * 1024 * 1024}  # 10485760

TAXII_CONFIG = {
    "backend": _BACKEND_SECTION,
    "users": _USERS_SECTION,
    "taxii": _TAXII_SECTION,
}
# Run medallion server:
# pip install medallion
# python -m medallion --config medallion.conf --port 5000
# Add objects to local TAXII server
import requests
def push_to_taxii(server_url, collection_id, stix_bundle, user, password):
    """Push STIX objects to a TAXII 2.1 collection.

    Args:
        server_url: URL under which ``collections/`` lives. A trailing
            slash is tolerated.
        collection_id: ID of the target collection.
        stix_bundle: Dict carrying the objects to add under its ``objects``
            key (a STIX bundle dict or an already-built TAXII envelope).
        user: User name for HTTP basic authentication.
        password: Password for HTTP basic authentication.

    Returns:
        The decoded JSON body of the server's response.

    Raises:
        requests.HTTPError: The server answered with an error status.
        requests.RequestException: The request could not be completed
            (connection failure, timeout, ...).
    """
    # Avoid "//collections" when the caller's URL already ends with a slash.
    base_url = server_url.rstrip("/")
    url = f"{base_url}/collections/{collection_id}/objects/"

    # TAXII 2.1 "Add Objects" takes an envelope ({"objects": [...]}) sent with
    # the TAXII media type. The bundle's own wrapper fields are not part of it.
    envelope = {"objects": list(stix_bundle.get("objects", []))}
    headers = {
        "Content-Type": "application/taxii+json;version=2.1",
        "Accept": "application/taxii+json;version=2.1",
    }

    response = requests.post(
        url,
        json=envelope,
        headers=headers,
        auth=(user, password),
        timeout=30,
    )
    # Surface rejected pushes instead of returning an error body as if it
    # were a successful status resource.
    response.raise_for_status()
    return response.json()