Implements Splunk SOAR (Phantom) workflows to automate SOC alert triage, IOC enrichment, containment actions, and incident response playbooks. For reducing manual work and integrating tools like VirusTotal, CrowdStrike.
npx claudepluginhub killvxk/cybersecurity-skills-zhThis skill uses the workspace's default tool permissions.
以下情况使用本技能:
Implements SOAR workflows in Splunk SOAR (Phantom) for SOC alert triage, IOC enrichment, containment, and incident response playbooks. Automates repetitive tasks and integrates security tools.
Implements SOAR workflows using Splunk SOAR (Phantom) to automate SOC alert triage, IOC enrichment, containment actions, and incident response playbooks. For reducing manual analyst work and integrating security tools.
Automates phishing incident response using Splunk SOAR REST API: parses emails for IOCs, creates containers and artifacts, triggers investigation playbooks, polls and summarizes results.
Share bugs, ideas, or general feedback.
以下情况使用本技能:
不适用于无人类审批门控的全自动遏制——高影响操作(如禁用账户或主机隔离)必须包含分析师决策点。
通过 SOAR 应用设置与安全工具的集成:
VirusTotal 资产配置:
{
"app": "VirusTotal v3",
"asset_name": "virustotal_prod",
"configuration": {
"api_key": "YOUR_VT_API_KEY",
"rate_limit": true,
"max_requests_per_minute": 4
},
"product_vendor": "VirusTotal",
"product_name": "VirusTotal"
}
CrowdStrike Falcon 资产:
{
"app": "CrowdStrike Falcon",
"asset_name": "crowdstrike_prod",
"configuration": {
"client_id": "CS_CLIENT_ID",
"client_secret": "CS_CLIENT_SECRET",
"base_url": "https://api.crowdstrike.com"
}
}
Active Directory 资产:
{
"app": "Active Directory",
"asset_name": "ad_prod",
"configuration": {
"server": "dc01.company.com",
"username": "soar_service@company.com",
"password": "SERVICE_ACCOUNT_PASSWORD",
"ssl": true
}
}
用 Python(Phantom 剧本格式)创建自动化钓鱼响应剧本:
"""
钓鱼邮件分诊自动化剧本
触发条件:通过 Splunk ES notable 或邮件摄取报告的新钓鱼邮件
"""
import phantom.rules as phantom
import json
def on_start(container):
# 从容器中提取构件(URL、文件哈希、发件人)
artifacts = phantom.get_artifacts(container_id=container["id"])
for artifact in artifacts:
artifact_type = artifact.get("cef", {}).get("type", "")
if artifact_type == "url":
phantom.act("url reputation", targets=artifact,
assets=["virustotal_prod"],
callback=url_reputation_callback,
name="url_reputation")
elif artifact_type == "hash":
phantom.act("file reputation", targets=artifact,
assets=["virustotal_prod"],
callback=hash_reputation_callback,
name="file_reputation")
elif artifact_type == "ip":
phantom.act("ip reputation", targets=artifact,
assets=["virustotal_prod"],
callback=ip_reputation_callback,
name="ip_reputation")
def url_reputation_callback(action, success, container, results, handle):
if not success:
phantom.comment(container, "URL 信誉检查失败")
return
for result in results:
data = result.get("data", [{}])[0]
malicious_count = data.get("summary", {}).get("malicious", 0)
total_engines = data.get("summary", {}).get("total_engines", 0)
if malicious_count > 5:
# 高置信度恶意 — 自动封锁并升级
phantom.act("block url", targets=result,
assets=["palo_alto_prod"],
name="block_malicious_url")
phantom.set_severity(container, "high")
phantom.set_status(container, "open")
phantom.comment(container,
f"URL 被 {malicious_count}/{total_engines} 个引擎标记为恶意。"
f"已在防火墙封锁。正在升级到二级。")
# 创建 ServiceNow 工单
phantom.act("create ticket", targets=container,
assets=["servicenow_prod"],
parameters=[{
"short_description": f"钓鱼攻击 - 检测到恶意 URL",
"urgency": "2",
"impact": "2"
}],
name="create_incident_ticket")
elif malicious_count > 0:
# 中等置信度 — 请求分析师审查
phantom.promote(container, template="钓鱼调查")
phantom.comment(container,
f"URL 被 {malicious_count}/{total_engines} 个引擎标记。"
f"需要分析师审查。")
else:
# 干净 — 关闭并添加注释
phantom.set_status(container, "closed")
phantom.comment(container,
f"URL 无威胁:0/{total_engines} 个引擎标记。已自动关闭。")
def hash_reputation_callback(action, success, container, results, handle):
if not success:
return
for result in results:
data = result.get("data", [{}])[0]
positives = data.get("summary", {}).get("positives", 0)
if positives > 10:
# 已知恶意软件 — 隔离并封锁
phantom.act("quarantine device", targets=result,
assets=["crowdstrike_prod"],
name="isolate_endpoint")
phantom.set_severity(container, "high")
def ip_reputation_callback(action, success, container, results, handle):
if not success:
return
for result in results:
data = result.get("data", [{}])[0]
malicious = data.get("summary", {}).get("malicious", 0)
if malicious > 3:
phantom.act("block ip", targets=result,
assets=["palo_alto_prod"],
name="block_malicious_ip")
自动化所有传入 SIEM 告警的富化:
"""
通用告警富化剧本
对每个新事件运行,在分析师审查前添加上下文
"""
import phantom.rules as phantom
def on_start(container):
# 获取所有构件
success, message, artifacts = phantom.get_artifacts(
container_id=container["id"], full_data=True
)
ip_artifacts = [a for a in artifacts if a.get("cef", {}).get("sourceAddress")]
domain_artifacts = [a for a in artifacts if a.get("cef", {}).get("destinationDnsDomain")]
# 并行富化 IP
for artifact in ip_artifacts:
ip = artifact["cef"]["sourceAddress"]
# VirusTotal 查询
phantom.act("ip reputation",
parameters=[{"ip": ip}],
assets=["virustotal_prod"],
callback=enrich_ip_callback,
name=f"vt_ip_{ip}")
# GeoIP 查询
phantom.act("geolocate ip",
parameters=[{"ip": ip}],
assets=["maxmind_prod"],
callback=geoip_callback,
name=f"geo_{ip}")
# Whois 查询
phantom.act("whois ip",
parameters=[{"ip": ip}],
assets=["whois_prod"],
name=f"whois_{ip}")
# 富化域名
for artifact in domain_artifacts:
domain = artifact["cef"]["destinationDnsDomain"]
phantom.act("domain reputation",
parameters=[{"domain": domain}],
assets=["virustotal_prod"],
name=f"vt_domain_{domain}")
def enrich_ip_callback(action, success, container, results, handle):
"""使用富化数据更新容器"""
if success:
for result in results:
summary = result.get("summary", {})
phantom.add_artifact(container, {
"cef": {
"vt_malicious": summary.get("malicious", 0),
"vt_suspicious": summary.get("suspicious", 0),
"enrichment_source": "VirusTotal"
},
"label": "enrichment",
"name": "VT IP 富化"
})
为关键操作添加人工参与环节:
def containment_decision(action, success, container, results, handle):
"""向分析师展示遏制选项"""
phantom.prompt(
container=container,
user="soc_tier2",
message=(
"已确认恶意活动。\n"
f"主机:{container['artifacts'][0]['cef'].get('sourceAddress')}\n"
f"威胁:{results[0]['summary'].get('threat_name')}\n\n"
"选择遏制动作:"
),
respond_in_mins=15,
options=["隔离主机", "禁用账户", "两者都执行", "仅监控"],
callback=execute_containment
)
def execute_containment(action, success, container, results, handle):
response = results.get("response", "仅监控")
if response in ["隔离主机", "两者都执行"]:
phantom.act("quarantine device",
parameters=[{"hostname": container["artifacts"][0]["cef"]["sourceHostName"]}],
assets=["crowdstrike_prod"],
name="isolate_host")
if response in ["禁用账户", "两者都执行"]:
phantom.act("disable user",
parameters=[{"username": container["artifacts"][0]["cef"]["sourceUserName"]}],
assets=["ad_prod"],
name="disable_account")
phantom.comment(container, f"分析师已批准:{response}")
在 SOAR 中设置事件触发器:
{
"playbook_name": "phishing_triage_automation",
"trigger": {
"type": "event_created",
"conditions": {
"label": ["phishing", "notable"],
"severity": ["high", "medium"]
}
},
"active": true,
"run_as": "automation_user"
}
使用 SOAR 指标追踪自动化效果:
# 查询 SOAR API 获取剧本执行统计
import requests
headers = {"ph-auth-token": "YOUR_SOAR_TOKEN"}
response = requests.get(
"https://soar.company.com/rest/playbook_run",
headers=headers,
params={
"page_size": 100,
"filter": '{"status":"success"}',
"sort": "create_time",
"order": "desc"
}
)
runs = response.json()["data"]
# 计算自动化指标
total_runs = len(runs)
avg_duration = sum(r["end_time"] - r["start_time"] for r in runs) / total_runs
auto_closed = sum(1 for r in runs if r.get("auto_resolved"))
print(f"总执行次数:{total_runs}")
print(f"平均时长:{avg_duration:.1f}s")
print(f"自动解决:{auto_closed}/{total_runs} ({auto_closed/total_runs*100:.0f}%)")
| 术语 | 定义 |
|---|---|
| SOAR | 安全编排、自动化和响应(Security Orchestration, Automation, and Response)——整合安全工具与自动化剧本的平台 |
| 剧本(Playbook) | 定义由安全事件触发的顺序和并行动作的自动化工作流 |
| 资产(Asset) | SOAR 中已连接安全工具的配置(API 端点、凭据、连接参数) |
| 容器(Container) | 包含来自已摄取告警或事件的构件(IOC)的 SOAR 事件对象 |
| 构件(Artifact) | 容器内的单个 IOC 或数据点(IP、哈希、URL、域名、电子邮件) |
| 审批门控(Approval Gate) | 在执行高影响自动化操作前需要分析师决策的人工参与步骤 |
SOAR 剧本执行报告
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
剧本: 钓鱼分诊自动化 v2.3
容器: SOAR-2024-08921
触发条件: 来自 Splunk ES 的 notable 事件(钓鱼)
已执行动作:
[1] URL 信誉(VirusTotal) — 14/90 个引擎标记为恶意 [2.1s]
[2] IP 信誉(AbuseIPDB) — 置信度:85% [1.3s]
[3] 封锁 URL(Palo Alto) — 在 PA-5260 上封锁 [0.8s]
[4] 封锁 IP(Palo Alto) — 在 PA-5260 上封锁 [0.7s]
[5] 创建工单(ServiceNow) — INC0012345 已创建 [1.5s]
[6] 提示分析师(二级) — 响应:"隔离主机" [4m 12s]
[7] 隔离设备(CrowdStrike) — WORKSTATION-042 已隔离 [3.2s]
总时长: 4m 22s(对比手动分诊平均 35 分钟)
节省时间: 约 31 分钟
处置结果: 真阳性 — 已升级至事件响应