Enriches malware file hashes (MD5/SHA-1/SHA-256) via VirusTotal API v3, fetching detection stats, sandbox behavior, YARA/Sigma matches, and threat intel for SOC triage and IOC validation.
npx claudepluginhub killvxk/cybersecurity-skills-zh — This skill uses the workspace's default tool permissions.
VirusTotal 是全球最大的众包恶意软件语料库,使用 70 多个杀毒引擎扫描文件,并提供行为分析、YARA 规则匹配、网络指标和社区情报。本技能涵盖使用 VirusTotal API v3 富化文件哈希(MD5、SHA-1、SHA-256),获取检测判决、沙箱报告、相关指标和上下文情报,用于 SOC 分类、事件响应和威胁情报富化工作流。
Enrich malware file hashes using VirusTotal API v3 to retrieve detection rates, behavioral analysis, YARA matches, and threat intelligence for SOC triage, incident response, and IOC validation.
Enriches malware hashes with VirusTotal API data: AV detections, sandbox behaviors, YARA matches, IOCs, and threat intel for incident triage and validation.
Analyzes indicators of compromise (IOCs) including IP addresses, domains, file hashes, URLs, and email artifacts to determine malice confidence, threat attribution, and blocking priority. For triaging phishing emails, security alerts, or threat intel.
Share bugs, ideas, or general feedback.
VirusTotal 是全球最大的众包恶意软件语料库,使用 70 多个杀毒引擎扫描文件,并提供行为分析、YARA 规则匹配、网络指标和社区情报。本技能涵盖使用 VirusTotal API v3 富化文件哈希(MD5、SHA-1、SHA-256),获取检测判决、沙箱报告、相关指标和上下文情报,用于 SOC 分类、事件响应和威胁情报富化工作流。
vt-py(官方 VirusTotal Python 客户端)或 requests。该 API 提供用于文件报告(/files/{hash})、URL 扫描、域名报告、IP 地址情报的 RESTful 端点,以及通过 VirusTotal Intelligence(VTI)进行高级猎捕。每份文件报告包含:70 多个杀毒引擎的检测结果、沙箱行为分析、YARA 规则匹配、Sigma 规则匹配、文件元数据(PE 头、导入表、节)、网络指标(联系的 IP、域名、URL),以及社区投票和评论。
典型富化流程:从告警/EDR 接收哈希 -> 查询 VT API -> 解析检测率 -> 提取行为指标 -> 与现有情报关联 -> 做出分类决策。API 返回 last_analysis_stats 对象,包含 malicious(恶意)、suspicious(可疑)、undetected(未检测到)和 harmless(无害)的计数。
VirusTotal 支持从单个哈希关联到相关情报:相似文件(ITW/野外样本)、联系的域名和 IP(C2 基础设施)、释放的文件、嵌入的 URL、YARA 规则匹配,以及通过众包情报的威胁行为者归因。
import vt
import json
import hashlib
from datetime import datetime
class VTEnricher:
    """Enrich file hashes with VirusTotal API v3 threat intelligence.

    Thin wrapper around the official ``vt-py`` client. Callers own the
    lifecycle: call :meth:`close` when done to release the HTTP session.
    """

    def __init__(self, api_key):
        # vt.Client holds a persistent HTTP session; released in close().
        self.client = vt.Client(api_key)

    def enrich_hash(self, file_hash):
        """Enrich a single file hash (MD5/SHA-1/SHA-256) via VirusTotal.

        Args:
            file_hash: hash string in any of the three supported formats.

        Returns:
            A report dict with file metadata, per-verdict detection counts
            and a derived ``threat_level`` (clean/low/medium/high/critical),
            or ``None`` when the VT API rejects the lookup (unknown hash,
            bad key, quota exceeded).
        """
        try:
            file_obj = self.client.get_object(f"/files/{file_hash}")
            stats = file_obj.last_analysis_stats
            total_engines = sum(stats.values())  # hoisted: used twice below
            mal_count = stats.get("malicious", 0)
            report = {
                "hash": file_hash,
                "sha256": file_obj.sha256,
                "sha1": file_obj.sha1,
                "md5": file_obj.md5,
                "file_type": getattr(file_obj, "type_description", "未知"),
                "file_size": getattr(file_obj, "size", 0),
                "first_submission": str(getattr(file_obj, "first_submission_date", "")),
                "last_analysis_date": str(getattr(file_obj, "last_analysis_date", "")),
                "detection_stats": {
                    "malicious": mal_count,
                    "suspicious": stats.get("suspicious", 0),
                    "undetected": stats.get("undetected", 0),
                    "harmless": stats.get("harmless", 0),
                },
                "detection_ratio": f"{mal_count}/{total_engines}",
                "popular_threat_names": getattr(file_obj, "popular_threat_classification", {}),
                "tags": getattr(file_obj, "tags", []),
                "names": getattr(file_obj, "names", []),
            }
            # Severity buckets by the fraction of engines voting "malicious";
            # with total_engines == 0 every comparison is False -> "clean".
            report["threat_level"] = (
                "critical" if mal_count > total_engines * 0.7
                else "high" if mal_count > total_engines * 0.4
                else "medium" if mal_count > total_engines * 0.1
                else "low" if mal_count > 0
                else "clean"
            )
            print(f"[+] {file_hash[:16]}... -> {report['detection_ratio']} "
                  f"({report['threat_level'].upper()})")
            return report
        except vt.error.APIError as e:
            print(f"[-] {file_hash} 的 VT API 错误:{e}")
            return None

    @staticmethod
    def _values(items, key):
        """Normalize a behaviour field to a flat list of strings.

        Entries may be plain strings or dicts keyed by *key* depending on
        the reporting sandbox, so handle both shapes defensively.
        """
        return [i.get(key, "") if isinstance(i, dict) else i for i in items]

    def get_behavior_report(self, file_hash):
        """Aggregate sandbox behaviour indicators across all VT sandboxes.

        Bug fix: ``/files/{id}/behaviours`` is a *collection* endpoint, so
        it must be consumed with ``client.iterator`` — ``get_object``
        expects a single object and fails on list payloads. Each yielded
        item is a ``vt.Object`` whose fields are read as attributes, not
        via a raw ``data``/``attributes`` dict as the original assumed.

        Returns a dict of indicator lists, or ``{}`` on any error.
        ``http_conversations`` / ``mutexes_created`` are kept for schema
        parity but are not populated here (as in the original).
        """
        behavior_data = {
            "processes_created": [],
            "files_written": [],
            "registry_keys_set": [],
            "dns_lookups": [],
            "http_conversations": [],
            "mutexes_created": [],
            "commands_executed": [],
        }
        try:
            for sandbox in self.client.iterator(f"/files/{file_hash}/behaviours"):
                behavior_data["processes_created"].extend(
                    getattr(sandbox, "processes_created", []))
                behavior_data["files_written"].extend(
                    self._values(getattr(sandbox, "files_written", []), "path"))
                behavior_data["registry_keys_set"].extend(
                    self._values(getattr(sandbox, "registry_keys_set", []), "key"))
                behavior_data["dns_lookups"].extend(
                    self._values(getattr(sandbox, "dns_lookups", []), "hostname"))
                behavior_data["commands_executed"].extend(
                    getattr(sandbox, "command_executions", []))
            return behavior_data
        except Exception as e:
            print(f"[-] 行为报告错误:{e}")
            return {}

    def close(self):
        """Release the underlying vt.Client HTTP session."""
        self.client.close()
# Usage example — guarded so importing this module does not fire a live
# API call with the placeholder key.
if __name__ == "__main__":
    enricher = VTEnricher("YOUR_VT_API_KEY")
    report = enricher.enrich_hash("275a021bbfb6489e54d471899f7db9d1663fc695ec2fe2a2c4538aabf651fd0f")
    print(json.dumps(report, indent=2, default=str))
    enricher.close()
import time
import csv
def batch_enrich(api_key, hash_file, output_file, rate_limit=4):
    """Enrich a list of hashes read from *hash_file*, with rate limiting.

    Args:
        api_key: VirusTotal API key.
        hash_file: path to a text file, one hash per line (blanks skipped).
        output_file: destination CSV for the flattened results.
        rate_limit: lookups allowed per minute (public API quota is 4).

    Returns:
        The list of report dicts for hashes VT knew about.
    """
    enricher = VTEnricher(api_key)
    results = []
    try:
        with open(hash_file, "r") as f:
            hashes = [line.strip() for line in f if line.strip()]
        print(f"[*] 正在富化 {len(hashes)} 个哈希(速率:{rate_limit} 次/分钟)")
        for i, file_hash in enumerate(hashes):
            report = enricher.enrich_hash(file_hash)
            if report:
                results.append(report)
            # Pause every `rate_limit` lookups — but not after the final
            # hash (the original slept a pointless 60 s at the end).
            if (i + 1) % rate_limit == 0 and (i + 1) < len(hashes):
                print(f" [{i+1}/{len(hashes)}] 速率限制暂停(60秒)...")
                time.sleep(60)
        # Export to CSV; nested dicts/lists are stringified so the csv
        # module can serialize them.
        with open(output_file, "w", newline="") as f:
            if results:
                writer = csv.DictWriter(f, fieldnames=results[0].keys())
                writer.writeheader()
                for r in results:
                    writer.writerow({k: str(v) for k, v in r.items()})
        print(f"[+] 富化完成:{len(results)}/{len(hashes)} 个哈希")
        print(f"[+] 结果已保存至 {output_file}")
    finally:
        # Fix: always release the HTTP session, even when enrichment or
        # the CSV export raises (the original leaked the client on error).
        enricher.close()
    return results


if __name__ == "__main__":
    batch_enrich("YOUR_API_KEY", "hashes.txt", "enrichment_results.csv")
def extract_network_iocs(api_key, file_hash):
    """Extract network-based IOCs (C2 candidates) for a file from VT.

    Args:
        api_key: VirusTotal API key.
        file_hash: MD5/SHA-1/SHA-256 of the file to pivot from.

    Returns:
        Dict with ``contacted_ips``, ``contacted_domains``,
        ``contacted_urls`` and ``embedded_urls`` lists (the last is kept
        for schema parity and is not populated here). Partial results are
        returned if the API fails midway.
    """
    client = vt.Client(api_key)
    network_iocs = {
        "contacted_ips": [],
        "contacted_domains": [],
        "contacted_urls": [],
        "embedded_urls": [],
    }
    try:
        # IPs the sample contacted during sandbox execution.
        for ip_obj in client.iterator(f"/files/{file_hash}/contacted_ips"):
            network_iocs["contacted_ips"].append({
                "ip": ip_obj.id,
                "country": getattr(ip_obj, "country", ""),
                "asn": getattr(ip_obj, "asn", 0),
                "as_owner": getattr(ip_obj, "as_owner", ""),
            })
        # Domains the sample resolved or contacted.
        for domain_obj in client.iterator(f"/files/{file_hash}/contacted_domains"):
            network_iocs["contacted_domains"].append({
                "domain": domain_obj.id,
                "registrar": getattr(domain_obj, "registrar", ""),
                "creation_date": str(getattr(domain_obj, "creation_date", "")),
            })
        # URLs the sample requested.
        for url_obj in client.iterator(f"/files/{file_hash}/contacted_urls"):
            network_iocs["contacted_urls"].append({
                "url": url_obj.url,
                # Fix: the original populated this key from
                # last_http_response_content_length; read the actual HTTP
                # status code attribute instead.
                "last_http_response_code": getattr(url_obj, "last_http_response_code", 0),
            })
    except Exception as e:
        print(f"[-] 提取网络 IOC 时出错:{e}")
    finally:
        client.close()
    print(f"[+] 网络 IOC:{len(network_iocs['contacted_ips'])} 个 IP,"
          f"{len(network_iocs['contacted_domains'])} 个域名,"
          f"{len(network_iocs['contacted_urls'])} 个 URL")
    return network_iocs
def get_yara_matches(api_key, file_hash):
    """Fetch crowdsourced YARA matches and derive coarse classifications.

    Keyword heuristics over the matched rule names bucket the sample into
    apt / ransomware / trojan / loader categories for quick triage.

    Returns:
        ``{"matches": [...], "classifications": [...]}``. On API errors an
        empty structure is returned — consistency fix: the other helpers in
        this module print the error and degrade instead of raising, while
        the original let ``vt.error.APIError`` propagate from here.
    """
    client = vt.Client(api_key)
    try:
        file_obj = client.get_object(f"/files/{file_hash}")
        crowdsourced_yara = getattr(file_obj, "crowdsourced_yara_results", [])
        matches = [
            {
                "rule_name": rule.get("rule_name", ""),
                "ruleset_name": rule.get("ruleset_name", ""),
                "author": rule.get("author", ""),
                "description": rule.get("description", ""),
                "source": rule.get("source", ""),
            }
            for rule in crowdsourced_yara
        ]
        # Classify by substrings in the matched rule names.
        keyword_map = {
            "apt": ("apt", "nation", "state"),
            "ransomware": ("ransom", "crypto"),
            "trojan": ("trojan", "rat", "backdoor"),
            "loader": ("loader", "dropper"),
        }
        classifications = set()
        for m in matches:
            rule_lower = m["rule_name"].lower()
            for label, keywords in keyword_map.items():
                if any(k in rule_lower for k in keywords):
                    classifications.add(label)
        print(f"[+] YARA:{len(matches)} 条规则匹配")
        print(f"[+] 分类:{classifications or {'未分类'}}")
        return {"matches": matches, "classifications": list(classifications)}
    except Exception as e:
        print(f"[-] YARA 匹配错误:{e}")
        return {"matches": [], "classifications": []}
    finally:
        client.close()
def generate_enrichment_report(hash_report, behavior, network, yara_data):
    """Assemble the individual enrichment results into one summary report.

    Combines the hash verdict, sandbox behaviour, network IOCs and YARA
    data (capping each indicator list at ten entries), writes the report
    to ``enrichment_<sha256 prefix>.json`` in the working directory, and
    returns the report dict.
    """
    level = hash_report.get("threat_level", "unknown")
    if level in ("critical", "high"):
        action = "拦截并调查"
    elif level == "medium":
        action = "监控并分析"
    else:
        action = "低风险 - 继续监控"

    report = {
        "metadata": {
            "generated": datetime.now().isoformat(),
            "hash": hash_report.get("sha256", ""),
        },
        "verdict": {
            "threat_level": level,
            "detection_ratio": hash_report.get("detection_ratio", "0/0"),
            "classifications": yara_data.get("classifications", []),
            "threat_names": hash_report.get("popular_threat_names", {}),
        },
        "behavioral_indicators": {
            "processes": behavior.get("processes_created", [])[:10],
            "dns_queries": behavior.get("dns_lookups", [])[:10],
            "commands": behavior.get("commands_executed", [])[:10],
        },
        "network_indicators": {
            "c2_candidates": network.get("contacted_ips", [])[:10],
            "domains": network.get("contacted_domains", [])[:10],
        },
        "yara_matches": yara_data.get("matches", [])[:10],
        "recommendation": action,
    }

    out_path = f"enrichment_{hash_report.get('sha256', 'unknown')[:16]}.json"
    with open(out_path, "w") as fh:
        json.dump(report, fh, indent=2, default=str)
    return report