Queries Malpedia API to research malware family relationships, track variant evolution, associate families with threat actors, and integrate YARA rules for lineage detection.
Malpedia 是由弗劳恩霍夫 FKIE 维护的协作平台,收录了恶意软件家族的别名、YARA 规则、威胁行为者关联和参考报告。收录超过 2,600 个恶意软件家族,是了解恶意软件谱系、追踪变体演化以及将恶意软件关联到特定威胁组织的权威资源。本技能涵盖查询 Malpedia API、映射恶意软件家族关系、提取 YARA 规则用于检测,以及构建对手所用恶意软件生态系统的情报。
Queries Malpedia API to map malware family relationships, track variant evolution, link families to threat actors, and extract YARA rules for detection.
Queries Malpedia API to research malware family relationships, track variant evolution, link families to threat actors, and integrate YARA rules for detection.
Triages and classifies malware samples using YARA rules to match strings, byte sequences, file patterns, and structures. Guides rule creation, scanning, and workflow integration for signature-based detection.
Malpedia 是由弗劳恩霍夫 FKIE 维护的协作平台,收录了恶意软件家族的别名、YARA 规则、威胁行为者关联和参考报告。收录超过 2,600 个恶意软件家族,是了解恶意软件谱系、追踪变体演化以及将恶意软件关联到特定威胁组织的权威资源。本技能涵盖查询 Malpedia API、映射恶意软件家族关系、提取 YARA 规则用于检测,以及构建对手所用恶意软件生态系统的情报。
requests、yara-python、stix2 库。Malpedia 将恶意软件组织为家族(如"win.cobalt_strike"),每个家族包含:别名(厂商特定名称,如"Beacon"、"CobaltStrike")、YARA 规则(社区和厂商贡献)、行为者关联(使用该家族的威胁组织)、参考报告(记录该家族的 CTI 报告)和样本哈希(每个变体的代表性样本)。
Malpedia 使用 平台.家族名称 格式(如 win.emotet、elf.mirai、apk.flubot)。平台包括 win(Windows)、elf(Linux)、apk(Android)、osx(macOS)和 py(Python)。这种标准化命名解决了不同厂商对同一恶意软件使用不同名称的"多名问题"。
恶意软件家族之间存在以下关系:父子关系(代码复用、分叉)、加载器-载荷关系(Emotet 加载 TrickBot 加载 Ryuk)、共同作者关系(同一威胁行为者开发多种工具)以及基础设施共享(共同 C2 框架)。
import json
import os
from collections import defaultdict
from urllib.parse import quote

import requests
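class MalpediaClient:
    """Minimal client for the Malpedia REST API.

    Each public method issues one authenticated GET against ``BASE_URL`` and
    returns the decoded JSON body. A non-200 response is reported on stdout
    and yields an empty result (``{}``, or ``None`` from ``get_family_info``)
    instead of raising.
    """

    BASE_URL = "https://malpedia.caad.fkie.fraunhofer.de/api"

    def __init__(self, api_key):
        """Build the ``apitoken`` Authorization header from *api_key*."""
        # ``self.headers`` carries the secret; never print or log it.
        self.headers = {"Authorization": f"apitoken {api_key}"}

    def _get(self, *segments):
        """GET ``BASE_URL/seg1/seg2/...`` and return its JSON, or ``None`` on a non-200.

        Every segment is percent-encoded with ``/`` not exempted, so a family
        or actor name that originates from user input or another feed cannot
        splice extra path components or a query string into the request.
        """
        path = "/".join(quote(str(segment), safe="") for segment in segments)
        resp = requests.get(f"{self.BASE_URL}/{path}",
                            headers=self.headers, timeout=30)
        if resp.status_code != 200:
            return None
        return resp.json()

    @staticmethod
    def _count_rules(rules):
        """Return the number of rules in a ``get/yara`` payload.

        The payload maps a rule source to either a list of rule texts or a
        single rule text (``compile_yara_ruleset`` handles both shapes), so a
        lone string counts as one rule rather than as its character length.
        Anything that is not a dict counts as zero.
        """
        if not isinstance(rules, dict):
            return 0
        return sum(len(v) if isinstance(v, list) else 1 for v in rules.values())

    def get_family_list(self):
        """Return the full family catalogue keyed by family id (``{}`` on an API error)."""
        families = self._get("list", "families")
        if families is None:
            return {}
        print(f"[+] Malpedia: {len(families)} malware families")
        return families

    def get_family_info(self, family_name):
        """Return the record for *family_name*, or ``None`` when the API does not return 200."""
        info = self._get("get", "family", family_name)
        if info is None:
            print(f"[-] Family not found: {family_name}")
            return None
        actors = [a.get("value", "") for a in info.get("attribution", [])]
        print(f"[+] Family: {family_name}")
        print(f" Aliases: {info.get('alt_names', [])}")
        print(f" Actors: {actors}")
        print(f" URLs: {len(info.get('urls', []))} references")
        return info

    def get_family_yara(self, family_name):
        """Return the YARA rules published for *family_name*, grouped by source (``{}`` on error)."""
        rules = self._get("get", "yara", family_name)
        if rules is None:
            return {}
        print(f"[+] YARA rules for {family_name}: {self._count_rules(rules)} rules")
        return rules

    def get_actor_families(self, actor_name):
        """Return the actor record for *actor_name* (``{}`` on error).

        The record's ``families`` key maps family ids to their details, which
        is what the counter below and ``MalwareFamilyMapper`` rely on.
        """
        data = self._get("get", "actor", actor_name)
        if data is None:
            return {}
        families = data.get("families", {})
        print(f"[+] {actor_name}: {len(families)} malware families")
        return data

    def search_families(self, keyword):
        """Return catalogue entries whose id or aliases contain *keyword* (case-insensitive).

        The whole catalogue is fetched on every call; cache the result of
        ``get_family_list`` yourself when searching repeatedly.
        """
        needle = keyword.lower()
        matches = {
            name: info
            for name, info in self.get_family_list().items()
            if needle in name.lower()
            or needle in str(info.get("alt_names", [])).lower()
        }
        print(f"[+] Search '{keyword}': {len(matches)} matches")
        return matches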
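# Take the key from the environment so it is never committed with the script;
# the literal placeholder remains only as a fallback for the walkthrough.
client = MalpediaClient(os.environ.get("MALPEDIA_API_KEY", "YOUR_MALPEDIA_API_KEY"))
families = client.get_family_list()
emotet_info = client.get_family_info("win.emotet")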
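class MalwareFamilyMapper:
    """Build relationship views on top of a ``MalpediaClient``.

    ``map_actor_ecosystem`` and ``find_shared_tooling`` are derived from live
    API data. ``build_loader_payload_chain`` overlays ``KNOWN_CHAINS``, a
    small hand-maintained table that does not come from Malpedia.
    """

    # Seed table of delivery chains. Keys are Malpedia family ids; values
    # name the loaders that drop the family and the payloads it drops.
    # Labels such as "email/macro" and "ransomware" are descriptive, not
    # family ids. Curated by hand, so it is illustrative rather than current.
    KNOWN_CHAINS = {
        "win.emotet": {"loaders": ["email/macro"], "payloads": ["win.trickbot", "win.qakbot", "win.cobalt_strike"]},
        "win.trickbot": {"loaders": ["win.emotet"], "payloads": ["win.ryuk", "win.conti", "win.cobalt_strike"]},
        "win.qakbot": {"loaders": ["email/macro", "win.emotet"], "payloads": ["win.cobalt_strike", "win.blackbasta"]},
        "win.cobalt_strike": {"loaders": ["win.emotet", "win.trickbot", "win.qakbot"], "payloads": ["ransomware"]},
    }

    def __init__(self, malpedia_client):
        """Wrap *malpedia_client*.

        ``relationship_graph`` is initialised here but not populated by any
        method of this class.
        """
        self.client = malpedia_client
        self.relationship_graph = defaultdict(list)

    @staticmethod
    def _summarise_family(family_name, info):
        """Reduce one family record to the fields the ecosystem report uses."""
        return {
            "name": family_name,
            "aliases": info.get("alt_names", []),
            # ``or ""`` keeps the slice from raising if the description is null.
            "description": (info.get("description") or "")[:200],
            "shared_actors": [a.get("value", "") for a in info.get("attribution", [])],
            "reference_count": len(info.get("urls", [])),
        }

    def map_actor_ecosystem(self, actor_name):
        """Collect the families attributed to *actor_name* with their aliases and co-users.

        Returns a dict with ``actor``, ``family_count`` (taken from the actor
        record) and ``families``. Families whose record cannot be fetched are
        skipped, so ``len(families)`` may be smaller than ``family_count``.
        """
        actor_data = self.client.get_actor_families(actor_name)
        families = actor_data.get("families", {})
        ecosystem = {
            "actor": actor_name,
            "families": [],
            "family_count": len(families),
        }
        for family_name in families:
            info = self.client.get_family_info(family_name)
            if not info:
                continue
            ecosystem["families"].append(self._summarise_family(family_name, info))
        print(f"\n=== {actor_name} 恶意软件生态系统 ===")
        for fam in ecosystem["families"]:
            other_actors = [a for a in fam["shared_actors"] if a != actor_name]
            print(f" {fam['name']}")
            print(f" 别名: {fam['aliases'][:5]}")
            if other_actors:
                print(f" 同时被以下使用: {other_actors}")
        return ecosystem

    def find_shared_tooling(self, actor_names):
        """Return the families shared by each pair of *actor_names*.

        Keys are ``"A <-> B"`` strings in input order, values the sorted
        family ids; pairs with nothing in common are omitted.
        """
        families_by_actor = {
            actor: set(self.client.get_actor_families(actor).get("families", {}).keys())
            for actor in actor_names
        }
        shared = {}
        for i, first in enumerate(actor_names):
            for second in actor_names[i + 1:]:
                common = families_by_actor[first] & families_by_actor[second]
                if common:
                    shared[f"{first} <-> {second}"] = sorted(common)
        print("\n=== 共享工具分析 ===")
        for pair, fams in shared.items():
            print(f" {pair}: {len(fams)} 个共享家族")
            for fam in fams[:5]:
                print(f" - {fam}")
        return shared

    def build_loader_payload_chain(self, family_name):
        """Return *family_name*'s description plus its ``KNOWN_CHAINS`` loaders and payloads.

        Returns ``{}`` when the family record cannot be fetched. Families
        absent from ``KNOWN_CHAINS`` get empty ``known_loaders`` and
        ``known_payloads``. The lists are copies, so callers may mutate the
        result without altering the shared table.
        """
        info = self.client.get_family_info(family_name)
        if not info:
            return {}
        entry = self.KNOWN_CHAINS.get(family_name, {})
        return {
            "family": family_name,
            "description": info.get("description", ""),
            "known_loaders": list(entry.get("loaders", [])),
            "known_payloads": list(entry.get("payloads", [])),
        }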
# Walk-through: every call below performs live requests through ``client``.
mapper = MalwareFamilyMapper(client)
ecosystem = mapper.map_actor_ecosystem("Wizard Spider")
compared_actors = ["Wizard Spider", "FIN7", "Lazarus Group"]
shared = mapper.find_shared_tooling(compared_actors)
chain = mapper.build_loader_payload_chain("win.emotet")
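def compile_yara_ruleset(client, family_names, output_file="malware_yara_rules.yar"):
    """Fetch the Malpedia YARA rules for *family_names* and concatenate them into *output_file*.

    Each rule is preceded by a ``// Source: <source> - Family: <family>``
    banner. The file is written as UTF-8 explicitly, because rule text may
    contain non-ASCII strings or comments and the platform default encoding
    is not dependable. Rule text is copied from the API verbatim, so the
    output is only as trustworthy as the upstream source.

    Args:
        client: object exposing ``get_family_yara(family)`` like ``MalpediaClient``.
        family_names: list of Malpedia family ids such as ``"win.emotet"``.
        output_file: path of the ``.yar`` file to (over)write.

    Returns:
        The banner-prefixed rule texts in the order they were written.
    """
    all_rules = []
    for family in family_names:
        yara_data = client.get_family_yara(family)
        if not isinstance(yara_data, dict):
            continue
        for source, rules in yara_data.items():
            banner = f"// Source: {source} - Family: {family}\n"
            # A source holds either a list of rule texts or one rule text.
            if isinstance(rules, list):
                all_rules.extend(f"{banner}{rule}" for rule in rules)
            elif isinstance(rules, str):
                all_rules.append(f"{banner}{rules}")
    with open(output_file, "w", encoding="utf-8") as f:
        f.write(f"// Malpedia YARA Rules - {len(all_rules)} rules\n")
        f.write(f"// Families: {', '.join(family_names)}\n\n")
        f.writelines(f"{rule}\n\n" for rule in all_rules)
    print(f"[+] 已编译 {len(all_rules)} 条 YARA 规则到 {output_file}")
    return all_rules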
# Pull the current rules for the three demo families into one .yar file.
demo_families = ["win.emotet", "win.trickbot", "win.cobalt_strike"]
compile_yara_ruleset(client, demo_families)