npx claudepluginhub plurigrid/asi --plugin asiThis skill uses the workspace's default tool permissions.
The Diamond Model of Intrusion Analysis provides a structured framework for analyzing cyber intrusions by examining four core features: Adversary, Capability, Infrastructure, and Victim. This skill covers implementing the Diamond Model programmatically to classify and correlate intrusion events, build activity threads linking related events, create activity-attack graphs, and generate pivot-rea...
Implements Diamond Model in Python to classify cyber intrusion events, correlate activities, build threads, and generate threat intelligence pivots.
Implements Diamond Model for cyber intrusion analysis in Python: classifies events by adversary/capability/infrastructure/victim, builds activity threads and attack graphs with NetworkX. For threat intelligence from intrusion data.
Uses PyMISP to create, enrich, and share threat intelligence events on MISP, managing IOCs, feeds, STIX exports, and community sharing. For security assessments and incident response.
Share bugs, ideas, or general feedback.
The Diamond Model of Intrusion Analysis provides a structured framework for analyzing cyber intrusions by examining four core features: Adversary, Capability, Infrastructure, and Victim. This skill covers implementing the Diamond Model programmatically to classify and correlate intrusion events, build activity threads linking related events, create activity-attack graphs, and generate pivot-ready intelligence from intrusion data.
networkx, stix2, graphviz librariesfrom dataclasses import dataclass, field
from datetime import datetime
from typing import Optional
import json
import uuid
@dataclass
class DiamondEvent:
adversary: str = ""
capability: str = ""
infrastructure: str = ""
victim: str = ""
timestamp: str = ""
phase: str = ""
result: str = ""
direction: str = ""
methodology: str = ""
confidence: int = 0
notes: str = ""
event_id: str = field(default_factory=lambda: str(uuid.uuid4())[:8])
mitre_techniques: list = field(default_factory=list)
iocs: list = field(default_factory=list)
def to_dict(self):
return {
"event_id": self.event_id,
"adversary": self.adversary,
"capability": self.capability,
"infrastructure": self.infrastructure,
"victim": self.victim,
"timestamp": self.timestamp,
"phase": self.phase,
"result": self.result,
"direction": self.direction,
"methodology": self.methodology,
"confidence": self.confidence,
"mitre_techniques": self.mitre_techniques,
"iocs": self.iocs,
"notes": self.notes,
}
import networkx as nx
class DiamondAnalysis:
def __init__(self):
self.events = []
self.graph = nx.DiGraph()
def add_event(self, event: DiamondEvent):
self.events.append(event)
self.graph.add_node(event.event_id, **event.to_dict())
def build_activity_thread(self):
"""Link events chronologically into activity threads."""
sorted_events = sorted(self.events, key=lambda e: e.timestamp)
for i in range(len(sorted_events) - 1):
self.graph.add_edge(
sorted_events[i].event_id,
sorted_events[i + 1].event_id,
relationship="followed_by",
)
def find_pivots(self):
"""Find pivot points where events share infrastructure or capabilities."""
pivots = {"infrastructure": {}, "capability": {}, "adversary": {}}
for event in self.events:
if event.infrastructure:
pivots["infrastructure"].setdefault(event.infrastructure, []).append(event.event_id)
if event.capability:
pivots["capability"].setdefault(event.capability, []).append(event.event_id)
if event.adversary:
pivots["adversary"].setdefault(event.adversary, []).append(event.event_id)
return {
k: {pk: pv for pk, pv in v.items() if len(pv) > 1}
for k, v in pivots.items()
}
def generate_report(self):
return {
"total_events": len(self.events),
"unique_adversaries": len(set(e.adversary for e in self.events if e.adversary)),
"unique_victims": len(set(e.victim for e in self.events if e.victim)),
"unique_infrastructure": len(set(e.infrastructure for e in self.events if e.infrastructure)),
"pivots": self.find_pivots(),
"events": [e.to_dict() for e in self.events],
}