Builds automated threat intelligence pipelines integrating STIX/TAXII sources, open-source feeds like Abuse.ch, and commercial platforms into SIEM for real-time IOC matching and alerting. For SOC teams standardizing and distributing TI.
npx claudepluginhub killvxk/cybersecurity-skills-zhThis skill uses the workspace's default tool permissions.
以下情况使用本技能:
Builds pipelines integrating STIX/TAXII threat intel feeds with SIEM tools for automated ingestion, normalization, IOC matching, and alerting in SOC operations.
Builds pipelines integrating STIX/TAXII feeds, open-source and commercial threat intel into SIEM/security tools for real-time IOC matching and alerting. For SOC teams automating TI ingestion, normalization, scoring, and distribution.
Analyzes structured and unstructured CTI feeds to extract actionable IOCs, adversary tactics, and attack context. For importing feeds, STIX 2.1 normalization, quality assessment, and IOC enrichment.
Share bugs, ideas, or general feedback.
以下情况使用本技能:
不适用于手动 IOC 查询——对于临时查询,请使用专用富化工具(VirusTotal、AbuseIPDB)。
taxii2-client、stix2 Python 包)按类型、格式和更新频率映射可用源:
| 情报源 | 格式 | IOC 类型 | 更新频率 | 费用 |
|---|---|---|---|---|
| AlienVault OTX | STIX/JSON | IP、域名、哈希、URL | 实时 | 免费 |
| Abuse.ch URLhaus | CSV/JSON | URL、域名 | 每 5 分钟 | 免费 |
| Abuse.ch MalwareBazaar | JSON API | 文件哈希 | 实时 | 免费 |
| CISA AIS | STIX/TAXII 2.1 | 全类型 | 每日 | 免费(美国政府) |
| CrowdStrike Intel | STIX/JSON | 全类型 + 威胁行为者 TTP | 实时 | 商业 |
| Mandiant Advantage | STIX 2.1 | 全类型 + 报告 | 实时 | 商业 |
连接到 TAXII 2.1 服务器并下载指标:
from taxii2client.v21 import Server, Collection
from stix2 import parse
# 连接到 TAXII 服务器(示例:CISA AIS)
server = Server(
"https://taxii.cisa.gov/taxii2/",
user="your_username",
password="your_password"
)
# 列出可用集合
for api_root in server.api_roots:
print(f"API Root: {api_root.title}")
for collection in api_root.collections:
print(f" Collection: {collection.title} (ID: {collection.id})")
# 从集合获取指标
collection = Collection(
"https://taxii.cisa.gov/taxii2/collections/COLLECTION_ID/",
user="your_username",
password="your_password"
)
# 获取过去 24 小时添加的指标
from datetime import datetime, timedelta
added_after = (datetime.utcnow() - timedelta(days=1)).strftime("%Y-%m-%dT%H:%M:%S.000Z")
response = collection.get_objects(added_after=added_after, type=["indicator"])
for obj in response.get("objects", []):
indicator = parse(obj)
print(f"Type: {indicator.type}")
print(f"Pattern: {indicator.pattern}")
print(f"Valid Until: {indicator.valid_until}")
print(f"Confidence: {indicator.confidence}")
print("---")
Abuse.ch URLhaus 源:
import requests
import csv
from io import StringIO
# 下载 URLhaus 近期 URL
response = requests.get("https://urlhaus.abuse.ch/downloads/csv_recent/")
reader = csv.reader(StringIO(response.text), delimiter=',')
indicators = []
for row in reader:
if row[0].startswith("#"):
continue
indicators.append({
"id": row[0],
"dateadded": row[1],
"url": row[2],
"url_status": row[3],
"threat": row[5],
"tags": row[6]
})
print(f"从 URLhaus 接入了 {len(indicators)} 条 URL")
# 仅过滤活跃威胁
active = [i for i in indicators if i["url_status"] == "online"]
print(f"活跃威胁:{len(active)} 条")
AlienVault OTX Pulse 源:
from OTXv2 import OTXv2, IndicatorTypes
otx = OTXv2("YOUR_OTX_API_KEY")
# 获取订阅的 pulse(过去 24 小时)
pulses = otx.getall(modified_since="2024-03-14T00:00:00")
for pulse in pulses:
print(f"Pulse: {pulse['name']}")
print(f"Tags: {pulse['tags']}")
for indicator in pulse["indicators"]:
print(f" IOC: {indicator['indicator']} ({indicator['type']})")
Abuse.ch Feodo Tracker(C2 IP):
response = requests.get("https://feodotracker.abuse.ch/downloads/ipblocklist_recommended.json")
c2_data = response.json()
for entry in c2_data:
print(f"IP: {entry['ip_address']}:{entry['port']}")
print(f"Malware: {entry['malware']}")
print(f"First Seen: {entry['first_seen']}")
print(f"Last Online: {entry['last_online']}")
将所有源转换为 STIX 2.1 格式以实现标准化:
from stix2 import Indicator, Bundle
import hashlib
def create_stix_indicator(ioc_value, ioc_type, source, confidence=50):
"""将原始 IOC 转换为 STIX 2.1 指标"""
pattern_map = {
"ipv4": f"[ipv4-addr:value = '{ioc_value}']",
"domain": f"[domain-name:value = '{ioc_value}']",
"url": f"[url:value = '{ioc_value}']",
"sha256": f"[file:hashes.'SHA-256' = '{ioc_value}']",
"md5": f"[file:hashes.MD5 = '{ioc_value}']",
}
return Indicator(
name=f"{ioc_type}: {ioc_value}",
pattern=pattern_map[ioc_type],
pattern_type="stix",
valid_from="2024-03-15T00:00:00Z",
confidence=confidence,
labels=[source],
custom_properties={"x_source_feed": source}
)
# 跨源去重
seen_iocs = set()
unique_indicators = []
for ioc in all_collected_iocs:
ioc_hash = hashlib.sha256(f"{ioc['type']}:{ioc['value']}".encode()).hexdigest()
if ioc_hash not in seen_iocs:
seen_iocs.add(ioc_hash)
unique_indicators.append(
create_stix_indicator(ioc["value"], ioc["type"], ioc["source"])
)
bundle = Bundle(objects=unique_indicators)
print(f"唯一指标数:{len(unique_indicators)}")
推送至 Splunk ES 威胁情报:
import requests
splunk_url = "https://splunk.company.com:8089"
headers = {"Authorization": f"Bearer {splunk_token}"}
for indicator in unique_indicators:
# 从 STIX 模式中提取 IOC 值
ioc_value = indicator.pattern.split("'")[1]
# 上传至 Splunk ES 威胁情报集合
data = {
"ip": ioc_value,
"description": indicator.name,
"weight": indicator.confidence // 10,
"threat_key": indicator.id,
"source_feed": indicator.get("x_source_feed", "unknown")
}
requests.post(
f"{splunk_url}/services/data/threat_intel/item/ip_intel",
headers=headers, data=data, verify=False
)
推送至 MISP 进行集中管理:
from pymisp import PyMISP, MISPEvent, MISPAttribute
misp = PyMISP("https://misp.company.com", "YOUR_MISP_API_KEY")
# 为源批次创建事件
event = MISPEvent()
event.info = f"TI Feed Import - {datetime.now().strftime('%Y-%m-%d')}"
event.threat_level_id = 2 # 中等
event.analysis = 2 # 已完成
# 将指标作为属性添加
for ioc in unique_indicators:
attr = MISPAttribute()
attr.type = "ip-dst" if "ipv4" in ioc.pattern else "domain"
attr.value = ioc.pattern.split("'")[1]
attr.to_ids = True
attr.comment = f"Source: {ioc.get('x_source_feed', 'mixed')}"
event.add_attribute(**attr)
result = misp.add_event(event)
print(f"MISP 事件已创建:{result['Event']['id']}")
跟踪源有效性指标:
index=threat_intel sourcetype="threat_intel_manager"
| stats count AS total_iocs,
dc(threat_key) AS unique_iocs,
dc(source_feed) AS feed_count
by source_feed
| join source_feed [
search index=notable source="Threat Intelligence"
| stats count AS matches by source_feed
]
| eval match_rate = round(matches / unique_iocs * 100, 2)
| sort - match_rate
| table source_feed, unique_iocs, matches, match_rate
| 术语 | 定义 |
|---|---|
| STIX 2.1 | 结构化威胁信息表达——用于共享威胁情报对象的标准化 JSON 格式 |
| TAXII | 指标信息可信自动化交换——通过 REST API 共享 STIX 数据的传输协议 |
| TIP | 威胁情报平台——用于聚合、评分和分发威胁情报的集中系统 |
| IOC 评分(IOC Scoring) | 根据源可靠性和相互印证为指标分配可信度值的过程 |
| 源去重(Feed Deduplication) | 跨多个源删除重复 IOC,同时保留多源归因信息 |
| IOC 过期(IOC Expiration) | 删除过时指标的生存时间策略(IP:30 天,域名:90 天,哈希:1 年) |
威胁情报源状态 — 每日报告
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
日期: 2024-03-15
IOC 总数: 45,892 个活跃指标
源健康状态:
源名称 IOC 数 匹配数 匹配率 状态
Abuse.ch URLhaus 12,340 47 0.38% 健康
AlienVault OTX 18,567 23 0.12% 健康
Abuse.ch Feodo 1,203 12 1.00% 健康
CISA AIS 8,945 8 0.09% 健康
CrowdStrike Intel 4,837 31 0.64% 健康
今日操作:
新增 IOC: 1,247 条
过期 IOC: 892 条
删除重复: 156 条
SIEM 匹配: 121 个显著事件生成
误报: 3 个(CDN IP 已从源移除)