Automates IOC enrichment from VirusTotal, AbuseIPDB, Shodan, MISP for IPs, domains, URLs, hashes. Provides risk scores and disposition suggestions for SOC alert triage and incident investigations.
npx claudepluginhub killvxk/cybersecurity-skills-zhThis skill uses the workspace's default tool permissions.
以下情况使用本技能:
Automates IOC enrichment across VirusTotal, AbuseIPDB, Shodan, MISP for IPs, domains, URLs, hashes with risk scoring for SOC triage and investigations.
Automates IOC enrichment across VirusTotal, AbuseIPDB, Shodan, MISP for IPs, domains, URLs, hashes with contextual scoring. Use for SOC alert triage and investigations.
Automates IOC enrichment from threat intel sources like VirusTotal, Shodan, and MISP using Python pipelines or SOAR playbooks. For SIEM alert triage, phishing IOC batching, reducing analyst time.
Share bugs, ideas, or general feedback.
以下情况使用本技能:
不适用于未经分析师审查的批量封锁决策——丰富化提供上下文,而非确定性的恶意/良性判断。
requests、vt-py、shodan 库创建多源丰富化管道:
import requests
import vt
import shodan
import time
from dataclasses import dataclass, field
from typing import Optional
@dataclass
class EnrichmentResult:
ioc_value: str
ioc_type: str
virustotal: dict = field(default_factory=dict)
abuseipdb: dict = field(default_factory=dict)
shodan_data: dict = field(default_factory=dict)
greynoise: dict = field(default_factory=dict)
urlscan: dict = field(default_factory=dict)
misp_matches: list = field(default_factory=list)
risk_score: float = 0.0
disposition: str = "未知"
class IOCEnrichmentEngine:
def __init__(self, config):
self.vt_client = vt.Client(config["virustotal_key"])
self.shodan_api = shodan.Shodan(config["shodan_key"])
self.abuseipdb_key = config["abuseipdb_key"]
self.greynoise_key = config["greynoise_key"]
self.urlscan_key = config["urlscan_key"]
def enrich_ip(self, ip_address):
result = EnrichmentResult(ioc_value=ip_address, ioc_type="ip")
# VirusTotal
try:
vt_obj = self.vt_client.get_object(f"/ip_addresses/{ip_address}")
result.virustotal = {
"malicious": vt_obj.last_analysis_stats.get("malicious", 0),
"suspicious": vt_obj.last_analysis_stats.get("suspicious", 0),
"total_engines": sum(vt_obj.last_analysis_stats.values()),
"reputation": vt_obj.reputation,
"country": getattr(vt_obj, "country", "未知"),
"as_owner": getattr(vt_obj, "as_owner", "未知")
}
except Exception as e:
result.virustotal = {"error": str(e)}
# AbuseIPDB
try:
response = requests.get(
"https://api.abuseipdb.com/api/v2/check",
headers={"Key": self.abuseipdb_key, "Accept": "application/json"},
params={"ipAddress": ip_address, "maxAgeInDays": 90}
)
data = response.json()["data"]
result.abuseipdb = {
"confidence_score": data["abuseConfidenceScore"],
"total_reports": data["totalReports"],
"is_tor": data.get("isTor", False),
"usage_type": data.get("usageType", "未知"),
"isp": data.get("isp", "未知"),
"domain": data.get("domain", "未知")
}
except Exception as e:
result.abuseipdb = {"error": str(e)}
# Shodan
try:
host = self.shodan_api.host(ip_address)
result.shodan_data = {
"ports": host.get("ports", []),
"os": host.get("os", "未知"),
"organization": host.get("org", "未知"),
"isp": host.get("isp", "未知"),
"vulns": host.get("vulns", []),
"last_update": host.get("last_update", "未知")
}
except shodan.APIError:
result.shodan_data = {"status": "未在 Shodan 中找到"}
# GreyNoise
try:
response = requests.get(
f"https://api.greynoise.io/v3/community/{ip_address}",
headers={"key": self.greynoise_key}
)
gn_data = response.json()
result.greynoise = {
"classification": gn_data.get("classification", "unknown"),
"noise": gn_data.get("noise", False),
"riot": gn_data.get("riot", False),
"name": gn_data.get("name", "未知")
}
except Exception as e:
result.greynoise = {"error": str(e)}
# 计算综合风险评分
result.risk_score = self._calculate_ip_risk(result)
result.disposition = self._determine_disposition(result.risk_score)
return result
def enrich_domain(self, domain):
result = EnrichmentResult(ioc_value=domain, ioc_type="domain")
# VirusTotal
try:
vt_obj = self.vt_client.get_object(f"/domains/{domain}")
result.virustotal = {
"malicious": vt_obj.last_analysis_stats.get("malicious", 0),
"suspicious": vt_obj.last_analysis_stats.get("suspicious", 0),
"reputation": vt_obj.reputation,
"creation_date": getattr(vt_obj, "creation_date", "未知"),
"registrar": getattr(vt_obj, "registrar", "未知"),
"categories": getattr(vt_obj, "categories", {})
}
except Exception as e:
result.virustotal = {"error": str(e)}
# URLScan.io
try:
response = requests.get(
f"https://urlscan.io/api/v1/search/?q=domain:{domain}",
headers={"API-Key": self.urlscan_key}
)
scans = response.json().get("results", [])
result.urlscan = {
"total_scans": len(scans),
"verdicts": [s.get("verdicts", {}).get("overall", {}).get("malicious", False)
for s in scans[:5]],
"last_scan": scans[0]["task"]["time"] if scans else "从未扫描"
}
except Exception as e:
result.urlscan = {"error": str(e)}
result.risk_score = self._calculate_domain_risk(result)
result.disposition = self._determine_disposition(result.risk_score)
return result
def enrich_hash(self, file_hash):
result = EnrichmentResult(ioc_value=file_hash, ioc_type="hash")
# VirusTotal
try:
vt_obj = self.vt_client.get_object(f"/files/{file_hash}")
result.virustotal = {
"malicious": vt_obj.last_analysis_stats.get("malicious", 0),
"suspicious": vt_obj.last_analysis_stats.get("suspicious", 0),
"undetected": vt_obj.last_analysis_stats.get("undetected", 0),
"total_engines": sum(vt_obj.last_analysis_stats.values()),
"type_description": getattr(vt_obj, "type_description", "未知"),
"popular_threat_name": getattr(vt_obj, "popular_threat_classification", {}).get(
"suggested_threat_label", "未知"
),
"sandbox_verdicts": getattr(vt_obj, "sandbox_verdicts", {}),
"first_seen": getattr(vt_obj, "first_submission_date", "未知")
}
except vt.APIError:
result.virustotal = {"status": "未在 VirusTotal 中找到"}
# MalwareBazaar
try:
response = requests.post(
"https://mb-api.abuse.ch/api/v1/",
data={"query": "get_info", "hash": file_hash}
)
mb_data = response.json()
if mb_data["query_status"] == "ok":
entry = mb_data["data"][0]
result.abuseipdb = { # 复用字段存储 MalwareBazaar 数据
"malware_family": entry.get("signature", "未知"),
"tags": entry.get("tags", []),
"file_type": entry.get("file_type", "未知"),
"delivery_method": entry.get("delivery_method", "未知"),
"first_seen": entry.get("first_seen", "未知")
}
except Exception:
pass
result.risk_score = self._calculate_hash_risk(result)
result.disposition = self._determine_disposition(result.risk_score)
return result
def _calculate_ip_risk(self, result):
score = 0
vt = result.virustotal
abuse = result.abuseipdb
gn = result.greynoise
if isinstance(vt, dict) and "malicious" in vt:
score += min(vt["malicious"] * 3, 30)
if isinstance(abuse, dict) and "confidence_score" in abuse:
score += abuse["confidence_score"] * 0.3
if isinstance(gn, dict):
if gn.get("classification") == "malicious":
score += 20
elif gn.get("riot"):
score -= 20 # 已知良性服务
return min(max(score, 0), 100)
def _calculate_domain_risk(self, result):
score = 0
vt = result.virustotal
if isinstance(vt, dict) and "malicious" in vt:
score += min(vt["malicious"] * 4, 40)
if vt.get("reputation", 0) < -5:
score += 20
return min(max(score, 0), 100)
def _calculate_hash_risk(self, result):
score = 0
vt = result.virustotal
if isinstance(vt, dict) and "malicious" in vt:
total = vt.get("total_engines", 1)
detection_rate = vt["malicious"] / total if total > 0 else 0
score = detection_rate * 100
return min(max(score, 0), 100)
def _determine_disposition(self, risk_score):
if risk_score >= 70:
return "恶意——建议封锁"
elif risk_score >= 40:
return "可疑——监控并调查"
elif risk_score >= 10:
return "低风险——可能良性,验证上下文"
else:
return "干净——无恶意活动指标"
def close(self):
self.vt_client.close()
# 处理事件中的多个 IOC
iocs = [
{"type": "ip", "value": "185.234.218.50"},
{"type": "domain", "value": "evil-c2-server.com"},
{"type": "hash", "value": "a1b2c3d4e5f6..."},
{"type": "ip", "value": "45.33.32.156"},
]
config = {
"virustotal_key": "YOUR_VT_KEY",
"shodan_key": "YOUR_SHODAN_KEY",
"abuseipdb_key": "YOUR_ABUSEIPDB_KEY",
"greynoise_key": "YOUR_GREYNOISE_KEY",
"urlscan_key": "YOUR_URLSCAN_KEY"
}
engine = IOCEnrichmentEngine(config)
results = []
for ioc in iocs:
if ioc["type"] == "ip":
result = engine.enrich_ip(ioc["value"])
elif ioc["type"] == "domain":
result = engine.enrich_domain(ioc["value"])
elif ioc["type"] == "hash":
result = engine.enrich_hash(ioc["value"])
results.append(result)
time.sleep(15) # VT 免费 API 速率限制
engine.close()
# 打印摘要
for r in results:
print(f"{r.ioc_type}: {r.ioc_value}")
print(f" 风险评分:{r.risk_score}")
print(f" 处置:{r.disposition}")
print()
创建 Splunk 自定义搜索命令用于内联丰富化:
index=notable sourcetype="stash"
| table src_ip, dest_ip, file_hash, url
| lookup threat_intel_ip_lookup ip AS src_ip OUTPUT vt_score, abuse_score, disposition
| lookup threat_intel_hash_lookup hash AS file_hash OUTPUT vt_detections, malware_family
| eval combined_risk = coalesce(vt_score, 0) + coalesce(abuse_score, 0)
| where combined_risk > 50
| sort - combined_risk
def generate_enrichment_report(results):
report = []
report.append("IOC 丰富化报告")
report.append("=" * 60)
for r in sorted(results, key=lambda x: x.risk_score, reverse=True):
report.append(f"\n{r.ioc_type.upper()}:{r.ioc_value}")
report.append(f" 风险评分:{r.risk_score}/100")
report.append(f" 处置:{r.disposition}")
if r.virustotal and "malicious" in r.virustotal:
report.append(f" VirusTotal:{r.virustotal['malicious']}/{r.virustotal.get('total_engines', 'N/A')} 恶意")
if r.abuseipdb and "confidence_score" in r.abuseipdb:
report.append(f" AbuseIPDB:{r.abuseipdb['confidence_score']}% 置信度,{r.abuseipdb['total_reports']} 条报告")
if r.greynoise and "classification" in r.greynoise:
report.append(f" GreyNoise:{r.greynoise['classification']}")
if r.shodan_data and "ports" in r.shodan_data:
report.append(f" Shodan:端口 {r.shodan_data['ports']},组织:{r.shodan_data.get('organization', 'N/A')}")
return "\n".join(report)
| 术语 | 定义 |
|---|---|
| IOC 丰富化(IOC Enrichment) | 从多个外部来源向原始指标添加上下文情报的过程 |
| 综合风险评分(Composite Risk Score) | 结合多个情报来源的加权聚合评分,用于处置决策 |
| 速率限制(Rate Limiting) | API 请求限制,需要节流(VT 免费版:每分钟 4 次,AbuseIPDB:每天 1000 次) |
| GreyNoise RIOT | Rule It Out——GreyNoise 已知良性服务数据集,用于减少误报 |
| 被动 DNS(Passive DNS) | 显示域名到 IP 映射历史的 DNS 解析历史数据 |
| 去武装化(Defanging) | 修改 IOC 以在报告中安全处理(evil.com 变为 evil[.]com) |
IOC 丰富化报告 — IR-2024-0450
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
丰富化时间:2024-03-15 14:30 UTC
处理 IOC 数:4
IP:185.234.218[.]50
风险评分: 87/100——恶意
VirusTotal:14/90 个引擎标记为恶意
AbuseIPDB: 92% 置信度,347 条报告
Shodan: 端口 [22, 80, 443, 4444],组织:防弹主机
GreyNoise: 恶意——已知 C2 基础设施
操作: 立即封锁
域名:evil-c2-server[.]com
风险评分: 73/100——恶意
VirusTotal:8/90 个引擎标记
URLScan: 5 次扫描,4 个恶意判定
WHOIS: 3 天前通过 Namecheap 注册
操作: 封锁并添加到 DNS 黑洞
哈希:a1b2c3d4e5f6...
风险评分: 91/100——恶意
VirusTotal:52/72 个引擎(Cobalt Strike Beacon)
MalwareBazaar:标签:cobalt-strike、beacon、c2
操作: 封锁哈希,隔离受影响终端
IP:45.33.32[.]156
风险评分: 5/100——干净
VirusTotal:0/90 个引擎
GreyNoise: 良性——Shodan 扫描器
操作: 无需操作(已知扫描器)