Builds automated pipeline to defang IOCs (URLs, IPs, domains, emails) for safe sharing, normalize and deduplicate them, convert to STIX 2.1, and distribute via TAXII, MISP, or email.
npx claudepluginhub killvxk/cybersecurity-skills-zhThis skill uses the workspace's default tool permissions.
IOC 去危化(Defanging)将潜在恶意指标(URL、IP 地址、域名、电子邮件地址)进行修改,防止意外点击或执行,同时保持可读性以便分析和共享。本技能涵盖构建自动化流水线:从多个来源摄取原始 IOC、规范化和去重、对人工查阅进行去危化处理、转换为 STIX 2.1 格式以便机器处理,并通过 TAXII 服务器、MISP 实例和邮件报告进行分发。
Builds automated pipeline to defang IOCs (URLs, IPs, domains, emails) for safe sharing, normalize/deduplicate, convert to STIX 2.1, and distribute via TAXII/MISP/email reports.
Builds Python pipeline to defang IOCs (URLs, IPs, domains, emails), normalize/deduplicate, convert to STIX 2.1, and share via TAXII/MISP/email reports. For threat intel automation.
Collects, categorizes, and distributes Indicators of Compromise (IOCs) during/after security incidents for detection, blocking, and threat intel sharing. Covers network, host, email, behavioral IOCs using STIX/TAXII and MISP.
Share bugs, ideas, or general feedback.
IOC 去危化(Defanging)将潜在恶意指标(URL、IP 地址、域名、电子邮件地址)进行修改,防止意外点击或执行,同时保持可读性以便分析和共享。本技能涵盖构建自动化流水线:从多个来源摄取原始 IOC、规范化和去重、对人工查阅进行去危化处理、转换为 STIX 2.1 格式以便机器处理,并通过 TAXII 服务器、MISP 实例和邮件报告进行分发。
defang、ioc-fanger、stix2、requests、validators 库去危化替换活跃协议和域名组件以防止执行:http:// 变为 hxxp://,https:// 变为 hxxps://,域名/IP 中的点变为 [.],邮件中的 @ 变为 [@]。这对于在报告、邮件、Slack 频道和粘贴站点中共享 IOC 至关重要——在这些场景下自动链接可能触发对恶意基础设施的网络连接。
来自不同来源的原始 IOC 格式不一致。规范化包括:转换为小写、去除尾部斜杠和空白、从 URL 中提取域名、解析 URL 编码、验证格式正确性,以及跨来源去重。
STIX 模式以标准化格式表达 IOC:[ipv4-addr:value = '203.0.113.1']、[domain-name:value = 'malicious.example.com']、[url:value = 'http://evil.com/payload']、[file:hashes.'SHA-256' = 'abc123...']。每个指标包含 valid_from、indicator_types、confidence 和可选的 TLP 标记。
import re
import hashlib
from urllib.parse import urlparse, unquote
from datetime import datetime
class IOCExtractor:
"""从文本中提取并规范化 IOC。"""
PATTERNS = {
"ipv4": r'\b(?:(?:25[0-5]|2[0-4]\d|1\d{2}|[1-9]?\d)\.){3}(?:25[0-5]|2[0-4]\d|1\d{2}|[1-9]?\d)\b',
"domain": r'\b(?:[a-zA-Z0-9](?:[a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?\.)+[a-zA-Z]{2,}\b',
"url": r'https?://[^\s<>"{}|\\^`\[\]]+',
"email": r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b',
"md5": r'\b[a-fA-F0-9]{32}\b',
"sha1": r'\b[a-fA-F0-9]{40}\b',
"sha256": r'\b[a-fA-F0-9]{64}\b',
}
WHITELIST_DOMAINS = {
"google.com", "microsoft.com", "amazon.com", "github.com",
"cloudflare.com", "akamai.com", "example.com",
}
def extract_from_text(self, text):
"""从自由文本中提取所有类型的 IOC。"""
# 先对已去危化的指标进行还原
text = self._refang(text)
iocs = {"ipv4": set(), "domain": set(), "url": set(),
"email": set(), "md5": set(), "sha1": set(), "sha256": set()}
for ioc_type, pattern in self.PATTERNS.items():
matches = re.findall(pattern, text)
for match in matches:
normalized = self._normalize(match, ioc_type)
if normalized and not self._is_whitelisted(normalized, ioc_type):
iocs[ioc_type].add(normalized)
# 移除已包含在 URL 中的域名
url_domains = set()
for url in iocs["url"]:
parsed = urlparse(url)
url_domains.add(parsed.netloc)
iocs["domain"] -= url_domains
total = sum(len(v) for v in iocs.values())
print(f"[+] 从文本中提取到 {total} 个唯一 IOC")
return {k: sorted(v) for k, v in iocs.items()}
def _refang(self, text):
"""将去危化后的指标还原为活跃形式。"""
text = text.replace("hxxp://", "http://").replace("hxxps://", "https://")
text = text.replace("[.]", ".").replace("[@]", "@")
text = text.replace("[://]", "://").replace("(.)", ".")
return text
def _normalize(self, value, ioc_type):
"""规范化 IOC 值。"""
value = value.strip().lower()
if ioc_type == "url":
value = unquote(value).rstrip("/")
elif ioc_type == "domain":
value = value.rstrip(".")
return value
def _is_whitelisted(self, value, ioc_type):
"""检查 IOC 是否在白名单中。"""
if ioc_type == "domain":
return value in self.WHITELIST_DOMAINS
if ioc_type == "url":
parsed = urlparse(value)
return parsed.netloc in self.WHITELIST_DOMAINS
return False
extractor = IOCExtractor()
sample_text = """
恶意软件 C2: hxxps://evil-domain[.]com/beacon
从 192.168.1.100 下载载荷并联系 10[.]0[.]0[.]1
SHA256: 275a021bbfb6489e54d471899f7db9d1663fc695ec2fe2a2c4538aabf651fd0f
钓鱼邮件来自 attacker[@]phishing-domain[.]com
"""
iocs = extractor.extract_from_text(sample_text)
class IOCDefanger:
"""对 IOC 进行去危化处理,用于报告和通信中的安全共享。"""
def defang_url(self, url):
return url.replace("http://", "hxxp://").replace("https://", "hxxps://").replace(".", "[.]")
def defang_domain(self, domain):
return domain.replace(".", "[.]")
def defang_ip(self, ip):
return ip.replace(".", "[.]")
def defang_email(self, email):
return email.replace("@", "[@]").replace(".", "[.]")
def defang_all(self, iocs):
"""对字典中的所有 IOC 进行去危化处理。"""
defanged = {}
for ioc_type, values in iocs.items():
if ioc_type == "url":
defanged[ioc_type] = [self.defang_url(v) for v in values]
elif ioc_type == "domain":
defanged[ioc_type] = [self.defang_domain(v) for v in values]
elif ioc_type == "ipv4":
defanged[ioc_type] = [self.defang_ip(v) for v in values]
elif ioc_type == "email":
defanged[ioc_type] = [self.defang_email(v) for v in values]
else:
defanged[ioc_type] = values # 哈希值无需去危化
return defanged
def generate_sharing_report(self, iocs, defanged, report_name="IOC 报告"):
"""生成人类可读的去危化 IOC 报告。"""
report = f"# {report_name}\n"
report += f"生成时间: {datetime.now().isoformat()}\n\n"
for ioc_type in ["url", "domain", "ipv4", "email", "sha256", "sha1", "md5"]:
values = defanged.get(ioc_type, [])
if values:
report += f"## {ioc_type.upper()} ({len(values)} 个)\n"
for v in values:
report += f"- `{v}`\n"
report += "\n"
return report
defanger = IOCDefanger()
defanged = defanger.defang_all(iocs)
report = defanger.generate_sharing_report(iocs, defanged, "恶意软件攻击活动 IOC")
print(report)
from stix2 import Indicator, Bundle, TLP_WHITE, TLP_GREEN, TLP_AMBER
from datetime import datetime
class STIXConverter:
"""将原始 IOC 转换为 STIX 2.1 指标对象。"""
TLP_MAP = {"white": TLP_WHITE, "green": TLP_GREEN, "amber": TLP_AMBER}
def iocs_to_stix(self, iocs, tlp="green", confidence=75):
"""将 IOC 字典转换为 STIX 2.1 bundle。"""
stix_objects = []
marking = self.TLP_MAP.get(tlp, TLP_GREEN)
for ip in iocs.get("ipv4", []):
stix_objects.append(Indicator(
name=f"恶意 IP: {ip}",
pattern=f"[ipv4-addr:value = '{ip}']",
pattern_type="stix",
valid_from=datetime.now(),
indicator_types=["malicious-activity"],
confidence=confidence,
object_marking_refs=[marking],
))
for domain in iocs.get("domain", []):
stix_objects.append(Indicator(
name=f"恶意域名: {domain}",
pattern=f"[domain-name:value = '{domain}']",
pattern_type="stix",
valid_from=datetime.now(),
indicator_types=["malicious-activity"],
confidence=confidence,
object_marking_refs=[marking],
))
for url in iocs.get("url", []):
escaped = url.replace("'", "\\'")
stix_objects.append(Indicator(
name=f"恶意 URL: {url[:60]}",
pattern=f"[url:value = '{escaped}']",
pattern_type="stix",
valid_from=datetime.now(),
indicator_types=["malicious-activity"],
confidence=confidence,
object_marking_refs=[marking],
))
for sha256 in iocs.get("sha256", []):
stix_objects.append(Indicator(
name=f"恶意文件哈希: {sha256[:16]}...",
pattern=f"[file:hashes.'SHA-256' = '{sha256}']",
pattern_type="stix",
valid_from=datetime.now(),
indicator_types=["malicious-activity"],
confidence=confidence,
object_marking_refs=[marking],
))
bundle = Bundle(objects=stix_objects)
print(f"[+] 已创建包含 {len(stix_objects)} 个指标的 STIX bundle")
return bundle
converter = STIXConverter()
stix_bundle = converter.iocs_to_stix(iocs, tlp="amber", confidence=80)
with open("iocs_stix_bundle.json", "w") as f:
f.write(stix_bundle.serialize(pretty=True))
import requests
import json
class IOCDistributor:
"""通过各种渠道分发 IOC。"""
def push_to_misp(self, iocs, misp_url, misp_key, event_info):
"""将 IOC 作为新事件推送到 MISP。"""
headers = {
"Authorization": misp_key,
"Content-Type": "application/json",
"Accept": "application/json",
}
event = {
"Event": {
"info": event_info,
"distribution": "1", # 仅限本社区
"threat_level_id": "2", # 中等
"analysis": "2", # 已完成
"Attribute": [],
}
}
type_mapping = {
"ipv4": "ip-dst",
"domain": "domain",
"url": "url",
"email": "email-src",
"md5": "md5",
"sha1": "sha1",
"sha256": "sha256",
}
for ioc_type, values in iocs.items():
misp_type = type_mapping.get(ioc_type)
if misp_type:
for value in values:
event["Event"]["Attribute"].append({
"type": misp_type,
"value": value,
"category": "Network activity" if ioc_type in ("ipv4", "domain", "url") else "Payload delivery",
"to_ids": True,
})
resp = requests.post(
f"{misp_url}/events",
headers=headers,
json=event,
verify=False,
)
if resp.status_code == 200:
event_id = resp.json().get("Event", {}).get("id", "")
print(f"[+] MISP 事件已创建: {event_id}")
return event_id
else:
print(f"[-] MISP 错误: {resp.status_code} - {resp.text[:200]}")
return None
def push_to_taxii(self, stix_bundle, taxii_url, collection_id, username, password):
"""将 STIX bundle 推送到 TAXII 2.1 集合。"""
from taxii2client.v21 import Collection
collection = Collection(
f"{taxii_url}/collections/{collection_id}/",
user=username, password=password,
)
response = collection.add_objects(stix_bundle.serialize())
print(f"[+] TAXII: 已发布 bundle,状态: {response.status}")
return response
distributor = IOCDistributor()
distributor.push_to_misp(
iocs,
misp_url="https://misp.organization.com",
misp_key="YOUR_MISP_API_KEY",
event_info="恶意软件攻击活动 IOC - 2025",
)