Builds automated threat intelligence enrichment pipelines in Splunk Enterprise Security using lookup tables, modular inputs, KV Store, and TI frameworks for IOC matching from TAXII, OTX, CSV sources. Useful for SOC analysts.
npx claudepluginhub killvxk/cybersecurity-skills-zhThis skill uses the workspace's default tool permissions.
Splunk Enterprise Security 中的威胁情报(Threat Intelligence)框架使 SOC 团队能够自动将失陷指标(IOC)与安全事件进行关联。该框架摄取威胁情报源,将指标规范化存储到 KV Store 集合中,并通过基于查询表的关联搜索标记匹配事件。Splunk 威胁情报管理集中整合来自多个来源的收集、规范化和富化流程,为分析师提供即时上下文,从而缩短分诊时间。
Build automated threat intelligence enrichment pipelines in Splunk Enterprise Security using lookup tables, modular inputs, and the Threat Intelligence Framework.
Builds automated threat intelligence enrichment pipelines in Splunk Enterprise Security using modular inputs, lookup tables, KV Store, and Threat Intelligence Framework for IOC correlation.
Builds automated threat intelligence pipelines integrating STIX/TAXII sources, open-source feeds like Abuse.ch, and commercial platforms into SIEM for real-time IOC matching and alerting. For SOC teams standardizing and distributing TI.
Share bugs, ideas, or general feedback.
Splunk Enterprise Security 中的威胁情报(Threat Intelligence)框架使 SOC 团队能够自动将失陷指标(IOC)与安全事件进行关联。该框架摄取威胁情报源,将指标规范化存储到 KV Store 集合中,并通过基于查询表的关联搜索标记匹配事件。Splunk 威胁情报管理集中整合来自多个来源的收集、规范化和富化流程,为分析师提供即时上下文,从而缩短分诊时间。
外部 TI 来源(STIX/TAXII、CSV、API)
|
v
模块化输入(下载并解析情报源)
|
v
KV Store 集合(规范化 IOC 存储)
|-- ip_intel
|-- domain_intel
|-- file_intel
|-- url_intel
|-- email_intel
|
v
威胁情报查询表
|
v
关联搜索(将事件与 IOC 匹配)
|
v
Notable 事件(已富化 TI 上下文)
# inputs.conf - TAXII 情报源配置
[threatlist://taxii_feed_example]
description = TAXII 2.1 Threat Feed
type = taxii
url = https://threatfeed.example.com/taxii2/
collection = threat-indicators-v21
polling_interval = 3600
api_key = <encrypted_api_key>
disabled = false
# inputs.conf - CSV 威胁列表
[threatlist://custom_blocklist]
description = 内部威胁封锁列表
type = csv
url = https://internal.company.com/threat-feeds/blocklist.csv
polling_interval = 1800
disabled = false
# bin/threatfeed_otx.py - OTX AlienVault 情报源采集器
import json
import sys
import requests
from splunklib.modularinput import Script, Scheme, Argument, Event
class OTXFeedInput(Script):
def get_scheme(self):
scheme = Scheme("OTX AlienVault 情报源")
scheme.description = "从 AlienVault OTX 采集 IOC"
scheme.use_external_validation = False
scheme.streaming_mode = Scheme.streaming_mode_xml
api_key_arg = Argument("api_key")
api_key_arg.data_type = Argument.data_type_string
api_key_arg.required_on_create = True
scheme.add_argument(api_key_arg)
pulse_days_arg = Argument("pulse_days")
pulse_days_arg.data_type = Argument.data_type_number
pulse_days_arg.required_on_create = False
scheme.add_argument(pulse_days_arg)
return scheme
def stream_events(self, inputs, ew):
for input_name, input_item in inputs.inputs.items():
api_key = input_item["api_key"]
pulse_days = int(input_item.get("pulse_days", 30))
headers = {"X-OTX-API-KEY": api_key}
url = f"https://otx.alienvault.com/api/v1/pulses/subscribed?modified_since={pulse_days}d"
try:
response = requests.get(url, headers=headers, timeout=60)
response.raise_for_status()
data = response.json()
for pulse in data.get("results", []):
for indicator in pulse.get("indicators", []):
event = Event()
event.stanza = input_name
event.data = json.dumps({
"indicator": indicator["indicator"],
"type": indicator["type"],
"pulse_name": pulse["name"],
"pulse_id": pulse["id"],
"description": indicator.get("description", ""),
"created": indicator.get("created", ""),
"threat_source": "OTX",
"confidence": pulse.get("adversary", "unknown"),
})
ew.write_event(event)
except requests.RequestException as e:
ew.log("ERROR", f"OTX 情报源采集失败:{str(e)}")
if __name__ == "__main__":
sys.exit(OTXFeedInput().run(sys.argv))
# collections.conf
[ip_threat_intel]
field.ip = string
field.threat_type = string
field.confidence = number
field.source = string
field.description = string
field.first_seen = time
field.last_seen = time
field.severity = string
[domain_threat_intel]
field.domain = string
field.threat_type = string
field.confidence = number
field.source = string
field.whois_registrar = string
field.whois_created = string
[file_hash_intel]
field.file_hash = string
field.hash_type = string
field.malware_family = string
field.confidence = number
field.source = string
field.detection_names = string
# transforms.conf
[ip_threat_intel_lookup]
external_type = kvstore
collection = ip_threat_intel
fields_list = ip, threat_type, confidence, source, description, severity
[domain_threat_intel_lookup]
external_type = kvstore
collection = domain_threat_intel
fields_list = domain, threat_type, confidence, source
[file_hash_intel_lookup]
external_type = kvstore
collection = file_hash_intel
fields_list = file_hash, hash_type, malware_family, confidence, source
| tstats summariesonly=true count from datamodel=Network_Traffic
where All_Traffic.action=allowed
by All_Traffic.src_ip, All_Traffic.dest_ip, All_Traffic.dest_port, _time span=5m
| rename "All_Traffic.*" as *
| lookup ip_threat_intel_lookup ip as dest_ip OUTPUT threat_type, confidence, source as ti_source, severity as ti_severity
| where isnotnull(threat_type)
| lookup asset_lookup ip as src_ip OUTPUT asset_name, asset_owner, asset_priority
| eval urgency=case(
ti_severity=="critical" AND asset_priority=="critical", "critical",
ti_severity=="high" OR asset_priority=="critical", "high",
ti_severity=="medium", "medium",
true(), "low"
)
| eval description="来自 ".src_ip." (".asset_name.") 向已知恶意 IP ".dest_ip." (".threat_type.") 发起连接 - 来源:".ti_source
index=dns sourcetype=stream:dns query_type=A OR query_type=AAAA
| lookup domain_threat_intel_lookup domain as query OUTPUT threat_type as domain_threat, confidence as domain_confidence, source as ti_source
| where isnotnull(domain_threat) AND domain_confidence > 70
| stats count dc(src_ip) as unique_sources values(src_ip) as source_ips by query, domain_threat, ti_source
| eval severity=case(domain_confidence > 90, "critical", domain_confidence > 70, "high", true(), "medium")
| eval description="来自 ".unique_sources." 台主机的 DNS 查询指向恶意域名 ".query." - 威胁类型:".domain_threat
index=endpoint sourcetype=sysmon EventCode=1
| lookup file_hash_intel_lookup file_hash as Hashes OUTPUT malware_family, confidence as hash_confidence, source as ti_source
| where isnotnull(malware_family)
| stats count values(ParentCommandLine) as parent_commands by Computer, User, Image, malware_family, ti_source
| eval severity="critical"
| eval description="已知恶意软件 ".malware_family." 在 ".Computer." 上由 ".User." 执行 - 二进制:".Image
index=firewall sourcetype=pan:traffic action=allowed
| eval indicators=mvappend(src_ip, dest_ip)
| mvexpand indicators
| lookup ip_threat_intel_lookup ip as indicators OUTPUT threat_type as ip_threat, confidence as ip_confidence, source as ip_ti_source
| lookup geo_ip_lookup ip as indicators OUTPUT country, city, latitude, longitude
| lookup whois_lookup ip as indicators OUTPUT org as ip_org, asn as ip_asn
| where isnotnull(ip_threat)
| stats count
values(ip_threat) as threat_types
values(ip_ti_source) as intel_sources
values(country) as countries
values(ip_org) as organizations
latest(_time) as last_seen
earliest(_time) as first_seen
by src_ip, dest_ip, dest_port
| eval enrichment_context="威胁:".mvjoin(threat_types, ", ")." | 地理位置:".mvjoin(countries, ", ")." | 机构:".mvjoin(organizations, ", ")
| inputlookup ip_threat_intel_lookup
| stats count by source, threat_type
| sort -count
| head 20
| inputlookup ip_threat_intel_lookup
| eval age_days=round((now() - strptime(last_seen, "%Y-%m-%dT%H:%M:%S")) / 86400, 0)
| stats count avg(age_days) as avg_age_days max(age_days) as max_age_days by source
| eval status=case(avg_age_days > 30, "过期", avg_age_days > 7, "老化中", true(), "新鲜")