Extracts attack patterns from CTI reports, maps behaviors to MITRE ATT&CK techniques, builds STIX 2.1 library, generates detection rules for Sigma/YARA.
npx claudepluginhub killvxk/cybersecurity-skills-zhThis skill uses the workspace's default tool permissions.
来自 Mandiant、CrowdStrike、Talos 和 Microsoft 等厂商的网络威胁情报(CTI)报告包含对手行为的详细描述,这些描述可以被提取、规范化并归入结构化攻击模式库。本技能涵盖解析 CTI 报告以提取对手技术、将行为映射到 MITRE ATT&CK 技术 ID、创建 STIX 2.1 攻击模式对象、构建按战术/技术和威胁行为者索引的可搜索库,以及从已记录模式生成检测规则模板。
Extracts attack patterns from CTI reports, catalogs them into STIX 2.1 library mapped to MITRE ATT&CK, and generates detection rule templates for threat-informed defense.
Extracts attack patterns from CTI reports, maps to MITRE ATT&CK, generates STIX objects and detection rules for threat-informed defense.
Maps threat actor TTPs to MITRE ATT&CK framework using Python libraries, generates ATT&CK Navigator heatmaps, identifies detection gaps, and creates reports linking IOCs to Enterprise/Mobile/ICS techniques.
Share bugs, ideas, or general feedback.
来自 Mandiant、CrowdStrike、Talos 和 Microsoft 等厂商的网络威胁情报(CTI)报告包含对手行为的详细描述,这些描述可以被提取、规范化并归入结构化攻击模式库。本技能涵盖解析 CTI 报告以提取对手技术、将行为映射到 MITRE ATT&CK 技术 ID、创建 STIX 2.1 攻击模式对象、构建按战术/技术和威胁行为者索引的可搜索库,以及从已记录模式生成检测规则模板。
stix2、mitreattack-python、spacy、requests 库CTI 报告以自然语言描述对手行为。提取过程包括:识别映射到 ATT&CK 技术的动词和技术术语、识别工具名称和恶意软件家族、识别基础设施指标,以及将行为序列映射到攻击链(Kill Chain 阶段)。
STIX 将攻击模式定义为结构化域对象(SDO),描述威胁行为者尝试攻陷目标的方式。每个模式通过外部引用链接到 ATT&CK,包含 Kill Chain 阶段(战术),并可关联到入侵集合、恶意软件和工具对象。
提取的攻击模式为检测工程提供依据:为 Sigma 规则创建提供具体过程示例、为关联规则提供行为序列、为 YARA 和 Snort 规则提供 IOC 模式,以及识别遥测数据缺口所需的数据源要求。
import re
import json
from collections import defaultdict
class CTIReportParser:
"""解析 CTI 报告以提取对手行为。"""
BEHAVIOR_INDICATORS = [
"used", "executed", "deployed", "leveraged", "exploited",
"established", "created", "modified", "downloaded", "uploaded",
"exfiltrated", "injected", "enumerated", "spawned", "dropped",
"persisted", "escalated", "moved laterally", "collected",
"encrypted", "compressed", "encoded", "obfuscated",
]
TOOL_PATTERNS = [
r'\b(Cobalt Strike|Mimikatz|PsExec|BloodHound|Rubeus|Impacket)\b',
r'\b(PowerShell|cmd\.exe|WMI|WMIC|certutil|bitsadmin)\b',
r'\b(Metasploit|Empire|Covenant|Sliver|Brute Ratel)\b',
r'\b(Lazagne|SharpHound|ADFind|Sharphound|Invoke-Obfuscation)\b',
]
TECHNIQUE_KEYWORDS = {
"spearphishing": "T1566",
"phishing attachment": "T1566.001",
"phishing link": "T1566.002",
"powershell": "T1059.001",
"command line": "T1059.003",
"scheduled task": "T1053.005",
"registry run key": "T1547.001",
"process injection": "T1055",
"dll side-loading": "T1574.002",
"credential dumping": "T1003",
"lsass": "T1003.001",
"kerberoasting": "T1558.003",
"pass the hash": "T1550.002",
"remote desktop": "T1021.001",
"smb": "T1021.002",
"winrm": "T1021.006",
"data staging": "T1074",
"exfiltration over c2": "T1041",
"dns tunneling": "T1071.004",
"web shell": "T1505.003",
}
def parse_report(self, text, report_metadata=None):
"""解析 CTI 报告并提取行为。"""
sentences = re.split(r'[.!?]\s+', text)
behaviors = []
for sentence in sentences:
sentence_lower = sentence.lower()
# 检查行为指示词
for indicator in self.BEHAVIOR_INDICATORS:
if indicator in sentence_lower:
behavior = {
"sentence": sentence.strip(),
"action": indicator,
"tools": self._extract_tools(sentence),
"technique_hints": self._match_techniques(sentence_lower),
}
if behavior["technique_hints"]:
behaviors.append(behavior)
break
print(f"[+] 从报告中提取到 {len(behaviors)} 个行为指标")
return behaviors
def _extract_tools(self, text):
"""从文本中提取工具/恶意软件名称。"""
tools = set()
for pattern in self.TOOL_PATTERNS:
matches = re.findall(pattern, text, re.IGNORECASE)
tools.update(matches)
return list(tools)
def _match_techniques(self, text):
"""将文本匹配到 ATT&CK 技术提示。"""
matches = []
for keyword, tech_id in self.TECHNIQUE_KEYWORDS.items():
if keyword in text:
matches.append({"keyword": keyword, "technique_id": tech_id})
return matches
parser = CTIReportParser()
sample_report = """
The threat actor used spearphishing attachments with macro-enabled documents to
gain initial access. Once inside, they executed PowerShell scripts to download
additional tooling. The actor leveraged Mimikatz to dump credentials from LSASS
memory. They then used pass the hash techniques for lateral movement via SMB
to multiple systems. Data was staged in a compressed archive and exfiltrated
over the existing C2 channel. The actor established persistence through
scheduled tasks and registry run keys.
"""
behaviors = parser.parse_report(sample_report)
from attackcti import attack_client
class ATTACKMapper:
def __init__(self):
self.lift = attack_client()
self.techniques = {}
self._load_techniques()
def _load_techniques(self):
"""加载所有 ATT&CK 技术用于映射。"""
all_techs = self.lift.get_enterprise_techniques()
for tech in all_techs:
tech_id = ""
for ref in tech.get("external_references", []):
if ref.get("source_name") == "mitre-attack":
tech_id = ref.get("external_id", "")
break
if tech_id:
self.techniques[tech_id] = {
"name": tech.get("name", ""),
"description": tech.get("description", "")[:500],
"tactics": [p.get("phase_name") for p in tech.get("kill_chain_phases", [])],
"platforms": tech.get("x_mitre_platforms", []),
"data_sources": tech.get("x_mitre_data_sources", []),
}
print(f"[+] 已加载 {len(self.techniques)} 个 ATT&CK 技术")
def map_behaviors(self, behaviors):
"""将提取的行为映射到 ATT&CK 技术。"""
mapped = []
for behavior in behaviors:
for hint in behavior.get("technique_hints", []):
tech_id = hint["technique_id"]
if tech_id in self.techniques:
tech_info = self.techniques[tech_id]
mapped.append({
"technique_id": tech_id,
"technique_name": tech_info["name"],
"tactics": tech_info["tactics"],
"source_sentence": behavior["sentence"],
"tools_observed": behavior["tools"],
"keyword_matched": hint["keyword"],
"data_sources": tech_info["data_sources"],
})
print(f"[+] 已将 {len(mapped)} 个行为映射到 ATT&CK 技术")
return mapped
mapper = ATTACKMapper()
mapped_behaviors = mapper.map_behaviors(behaviors)
from stix2 import AttackPattern, Relationship, Bundle, TLP_GREEN
from datetime import datetime
class AttackPatternLibrary:
def __init__(self):
self.patterns = []
self.relationships = []
def add_pattern_from_mapping(self, mapping, report_source="CTI Report"):
"""从映射的行为创建 STIX 攻击模式。"""
pattern = AttackPattern(
name=mapping["technique_name"],
description=f"观察到的行为: {mapping['source_sentence']}\n\n"
f"工具: {', '.join(mapping['tools_observed']) or '未识别'}\n"
f"来源: {report_source}",
external_references=[{
"source_name": "mitre-attack",
"external_id": mapping["technique_id"],
"url": f"https://attack.mitre.org/techniques/{mapping['technique_id'].replace('.', '/')}/",
}],
kill_chain_phases=[{
"kill_chain_name": "mitre-attack",
"phase_name": tactic,
} for tactic in mapping["tactics"]],
object_marking_refs=[TLP_GREEN],
)
self.patterns.append(pattern)
return pattern
def build_library(self, mapped_behaviors, report_source="CTI Report"):
"""从映射构建完整攻击模式库。"""
seen_techniques = set()
for mapping in mapped_behaviors:
tech_id = mapping["technique_id"]
if tech_id not in seen_techniques:
self.add_pattern_from_mapping(mapping, report_source)
seen_techniques.add(tech_id)
bundle = Bundle(objects=self.patterns + self.relationships)
print(f"[+] 库构建完成: {len(self.patterns)} 个攻击模式")
return bundle
def export_library(self, output_file="attack_pattern_library.json"):
bundle = Bundle(objects=self.patterns + self.relationships)
with open(output_file, "w") as f:
f.write(bundle.serialize(pretty=True))
print(f"[+] 库已导出到 {output_file}")
def generate_detection_templates(self, mapped_behaviors):
"""从攻击模式生成 Sigma 规则模板。"""
templates = []
for mapping in mapped_behaviors:
template = {
"title": f"检测: {mapping['technique_name']} ({mapping['technique_id']})",
"status": "experimental",
"description": f"基于 CTI 报告观察结果检测 {mapping['technique_name']}",
"references": [
f"https://attack.mitre.org/techniques/{mapping['technique_id'].replace('.', '/')}/",
],
"tags": [
f"attack.{mapping['tactics'][0]}" if mapping['tactics'] else "attack.unknown",
f"attack.{mapping['technique_id'].lower()}",
],
"data_sources": mapping.get("data_sources", []),
"observed_tools": mapping.get("tools_observed", []),
"source_context": mapping["source_sentence"],
}
templates.append(template)
with open("detection_templates.json", "w") as f:
json.dump(templates, f, indent=2)
print(f"[+] 已生成 {len(templates)} 个检测模板")
return templates
library = AttackPatternLibrary()
bundle = library.build_library(mapped_behaviors, "Sample CTI Report")
library.export_library()
templates = library.generate_detection_templates(mapped_behaviors)