Implements Diamond Model for cyber intrusion analysis in Python: classifies events by adversary/capability/infrastructure/victim, builds activity threads and attack graphs with NetworkX. For threat intelligence from intrusion data.
菱形入侵分析模型通过审查四个核心特征(对手、能力、基础设施、受害者)提供结构化框架,用于分析网络入侵事件。本技能涵盖以编程方式实现菱形模型,对入侵事件进行分类和关联,构建链接相关事件的活动线程,创建活动攻击图,并从入侵数据中生成可枢纽的情报。
Implements Diamond Model in Python to classify cyber intrusion events, correlate activities, build threads, and generate threat intelligence pivots.
Implements Diamond Model in Python to classify and correlate intrusion events, build activity threads, create graphs, and generate pivot intelligence. For threat intel analysis.
Applies Diamond Model to structure intrusions into adversary, capability, infrastructure, and victim vertices with relationships for investigation, attribution, and event clustering to common threat actors. For post-incident analysis and threat intel products.
菱形入侵分析模型通过审查四个核心特征(对手、能力、基础设施、受害者)提供结构化框架,用于分析网络入侵事件。本技能涵盖以编程方式实现菱形模型,对入侵事件进行分类和关联,构建链接相关事件的活动线程,创建活动攻击图,并从入侵数据中生成可枢纽的情报。
依赖:networkx、stix2、graphviz 库
from dataclasses import dataclass, field
from datetime import datetime
from typing import Optional
import json
import uuid
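@dataclass
class DiamondEvent:
    """One Diamond Model event: a single adversary / capability / infrastructure / victim observation.

    Every vertex and meta-feature is a free-form string defaulting to "" so that
    partially known events can still be recorded; consumers in this module
    (``DiamondAnalysis.find_pivots`` and the report counters) treat "" as
    "unknown" and skip it.  ``timestamp`` is stored as a string and is compared
    lexically by ``DiamondAnalysis.build_activity_thread``, so callers should use
    one consistent, sortable format (ISO 8601 in a single timezone); nothing here
    parses or validates it.  ``confidence`` is an int with no range enforced.
    """

    adversary: str = ""
    capability: str = ""
    infrastructure: str = ""
    victim: str = ""
    timestamp: str = ""
    phase: str = ""          # kill-chain phase label, free text
    result: str = ""
    direction: str = ""
    methodology: str = ""
    confidence: int = 0
    notes: str = ""
    # Random short id used as the graph node key.  8 hex chars is only 32 bits
    # of entropy, so uniqueness is not guaranteed over large event sets; anything
    # keyed on event_id should check for collisions rather than assume them away.
    event_id: str = field(default_factory=lambda: str(uuid.uuid4())[:8])
    mitre_techniques: list[str] = field(default_factory=list)
    iocs: list[str] = field(default_factory=list)

    def to_dict(self) -> dict:
        """Return a JSON-serialisable snapshot of the event.

        The two list fields are copied, so a consumer editing the returned dict
        (e.g. a report post-processor appending IOCs) does not silently mutate
        the event and every graph node / report that was built from it.
        """
        return {
            "event_id": self.event_id,
            "adversary": self.adversary,
            "capability": self.capability,
            "infrastructure": self.infrastructure,
            "victim": self.victim,
            "timestamp": self.timestamp,
            "phase": self.phase,
            "result": self.result,
            "direction": self.direction,
            "methodology": self.methodology,
            "confidence": self.confidence,
            "mitre_techniques": list(self.mitre_techniques),
            "iocs": list(self.iocs),
            "notes": self.notes,
        }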
import networkx as nx
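class DiamondAnalysis:
    """Collect DiamondEvent objects and correlate them.

    Events are kept in insertion order in ``events`` and mirrored as nodes of a
    NetworkX ``DiGraph`` keyed by ``event_id``; each node's attributes are the
    event's ``to_dict()`` snapshot taken at ``add_event`` time.
    """

    def __init__(self) -> None:
        self.events = []
        self.graph = nx.DiGraph()

    def add_event(self, event) -> None:
        """Record *event* and add it to the graph as a node.

        Args:
            event: a ``DiamondEvent`` (anything with ``event_id`` and ``to_dict()``).

        Raises:
            ValueError: if a node with the same ``event_id`` already exists.
                Without this guard the duplicate would be appended to ``events``
                but silently merged into the existing node by ``add_node``, so
                ``total_events`` and the graph would disagree.  Because ids are
                short random strings, a collision between two distinct events is
                possible and must surface rather than overwrite data.
        """
        if self.graph.has_node(event.event_id):
            raise ValueError(f"duplicate event_id {event.event_id!r}")
        self.events.append(event)
        self.graph.add_node(event.event_id, **event.to_dict())

    def build_activity_thread(self) -> None:
        """Link events in timestamp order with ``followed_by`` edges.

        Ordering is a plain string sort on ``timestamp``: it is chronological only
        if every event uses the same sortable format (ISO 8601 in one timezone).
        Events with an empty timestamp sort first; ties keep insertion order
        because the sort is stable.  All events are threaded into a single chain
        regardless of adversary or victim, so per-adversary/victim threads must be
        derived by the caller.  Re-running is safe: ``DiGraph.add_edge`` on an
        existing edge only refreshes its attributes.
        """
        ordered = sorted(self.events, key=lambda e: e.timestamp)
        for earlier, later in zip(ordered, ordered[1:]):
            self.graph.add_edge(
                earlier.event_id,
                later.event_id,
                relationship="followed_by",
            )

    def find_pivots(self) -> dict:
        """Group event ids by shared infrastructure, capability or adversary.

        Returns:
            ``{"infrastructure": {...}, "capability": {...}, "adversary": {...}}``
            where each inner dict maps the exact vertex string to the ids of the
            events carrying it (in insertion order), keeping only values shared
            by two or more events.  Matching is exact string equality: "" is
            skipped, and near-duplicates (e.g. a host with and without a port,
            or differing case) do not pivot unless normalised before ingestion.
        """
        groups = {"infrastructure": {}, "capability": {}, "adversary": {}}
        for event in self.events:
            for vertex in groups:
                value = getattr(event, vertex)
                if value:
                    groups[vertex].setdefault(value, []).append(event.event_id)
        return {
            vertex: {value: ids for value, ids in members.items() if len(ids) > 1}
            for vertex, members in groups.items()
        }

    def generate_report(self) -> dict:
        """Summarise the collected events as a JSON-serialisable dict.

        Counts ignore empty ("unknown") vertex values.  ``events`` holds the full
        ``to_dict()`` snapshot of every event, including free-text ``notes`` and
        IOCs, so review the output for sensitive content before sharing it
        outside the analysis team.
        """
        known = lambda attr: {getattr(e, attr) for e in self.events if getattr(e, attr)}
        return {
            "total_events": len(self.events),
            "unique_adversaries": len(known("adversary")),
            "unique_victims": len(known("victim")),
            "unique_infrastructure": len(known("infrastructure")),
            "pivots": self.find_pivots(),
            "events": [e.to_dict() for e in self.events],
        }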