Implements ticketing system integrating SIEM alerts with ServiceNow, Jira, or TheHive for SOC incident tracking, SLA management, escalation workflows, and compliance records. For teams automating incident lifecycle.
以下情况使用本技能:
Implements ticketing system connecting SIEM alerts to ServiceNow, Jira, or TheHive for incident lifecycle management, SLA tracking, escalations, and compliance in SOC operations.
Implements incident ticketing system integrating SIEM alerts with ServiceNow, Jira, or TheHive for SOC tracking, SLA management, escalations, and compliance. For formalizing incident lifecycles.
Triages security incidents using IR playbooks: classifies alerts from SIEM/EDR, enriches IOCs with threat intel APIs, scores severity via matrix, assigns teams, starts responses. For SOC handling new alerts.
以下情况使用本技能:
不适用于单个告警分诊——工单系统用于需要多步骤调查和修复的已确认事件,而非每一条 SIEM 告警。
建立标准化事件类别和严重程度:
# Shared severity taxonomy. The response_sla minutes (15/30/120/480) are repeated literally
# in the Splunk sla_minutes case() and in check_sla_breaches, and the resolution_sla hours
# (4/8/24/72) in the SLA-compliance search below — change all copies together.
incident_taxonomy:
  categories:
    - malware_infection
    - phishing_campaign
    - unauthorized_access
    - data_exfiltration
    - denial_of_service
    - ransomware
    - insider_threat
    - vulnerability_exploitation
    - account_compromise
    - policy_violation
  severity_levels:
    critical:  # ServiceNow urgency "1" / TheHive severity 4
      definition: "活跃数据泄露、勒索软件或业务关键系统被入侵"
      response_sla: 15 minutes
      resolution_sla: 4 hours
      escalation: 立即升级至 Tier 3 并通知 CISO
      examples: ["活跃勒索软件", "域管理员被入侵", "客户数据泄露"]
    high:  # ServiceNow urgency "2" / TheHive severity 3
      definition: "已确认的业务系统或多个用户账号被入侵"
      response_sla: 30 minutes
      resolution_sla: 8 hours
      escalation: 立即升级 Tier 2,2 小时内未解决升级 Tier 3
      examples: ["带 C2 的恶意软件", "检测到横向移动", "带凭据窃取的钓鱼攻击"]
    medium:  # ServiceNow urgency "3" / TheHive severity 2 (also the default for unknown severities)
      definition: "已确认的需要调查和修复的安全事件"
      response_sla: 2 hours
      resolution_sla: 24 hours
      escalation: 4 小时内升级 Tier 2
      examples: ["单次钓鱼点击", "未授权软件", "策略违规"]
    low:  # ServiceNow urgency "4" / TheHive severity 1
      definition: "影响有限的轻微安全事件"
      response_sla: 8 hours
      resolution_sla: 72 hours
      escalation: Tier 1 标准队列
      examples: ["扫描尝试", "暴力破解失败(未被入侵)", "信息披露"]
ServiceNow 通过 REST API 集成:
import json
from datetime import datetime, timezone

import requests
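class IncidentTicketManager:
    """ServiceNow Table API client for opening and driving security incidents from SIEM alerts.

    Every call uses HTTP basic auth with the supplied service-account credentials, exchanges
    JSON, and is bounded by ``timeout`` seconds so a stalled instance cannot wedge the SOC
    automation loop. A non-2xx reply raises ``requests.HTTPError``; a 2xx reply without the
    ``result`` envelope raises ``RuntimeError``. Ticket numbers are validated before they are
    placed into a ``sysparm_query`` so encoded-query operators cannot alter the lookup.
    """

    # ServiceNow urgency/impact codes: "1" is the highest, "4" the lowest.
    _SEVERITY_TO_URGENCY = {"critical": "1", "high": "2", "medium": "3", "low": "4"}
    _DEFAULT_URGENCY = "3"
    # Incident state code written by resolve_incident ("6" = Resolved).
    _STATE_RESOLVED = "6"
    _ESCALATION_GROUP = "SOC Tier 3"

    def __init__(self, snow_url, snow_user, snow_password, timeout=30):
        """
        Args:
            snow_url: Instance base URL without a trailing path, e.g.
                ``https://<instance>.service-now.com``.
            snow_user: Basic-auth user of the automation account.
            snow_password: Basic-auth password of the automation account.
            timeout: Seconds applied to each HTTP request (``requests`` ``timeout=``).
        """
        self.snow_url = snow_url
        self.auth = (snow_user, snow_password)
        self.headers = {
            "Content-Type": "application/json",
            "Accept": "application/json",
        }
        self.timeout = timeout

    def _request(self, method, path, **kwargs):
        """Issue one Table API call and return the ``result`` payload.

        Args:
            method: HTTP verb ("GET", "POST", "PATCH", ...).
            path: Path under ``snow_url``, e.g. ``/api/now/table/incident``.
            **kwargs: Passed to ``requests.request`` (``json=``, ``params=``).

        Raises:
            requests.HTTPError: on a non-2xx status.
            RuntimeError: when the body carries no ``result`` key (ServiceNow error
                envelopes carry ``error`` instead).
        """
        response = requests.request(
            method,
            f"{self.snow_url}{path}",
            auth=self.auth,
            headers=self.headers,
            timeout=self.timeout,
            **kwargs,
        )
        response.raise_for_status()
        body = response.json()
        if "result" not in body:
            raise RuntimeError(
                f"ServiceNow {method} {path} returned no 'result': {body.get('error', body)}"
            )
        return body["result"]

    def create_incident(self, alert_data):
        """Create a ServiceNow incident from a SIEM alert.

        Args:
            alert_data: Alert dict; ``rule_name``, ``src``, ``severity`` and
                ``detection_time`` are required, the remaining fields (``category``,
                ``notable_id``, ``mitre_technique``, ``affected_hosts``, ``iocs``, ``dest``,
                ``user``, ``description``) are optional.

        Returns:
            dict with ``ticket_number``, ``sys_id`` and ``state`` of the new incident.
        """
        severity = alert_data["severity"]
        # Unknown severities fall back to medium urgency so the ticket is still created.
        urgency = self._SEVERITY_TO_URGENCY.get(severity, self._DEFAULT_URGENCY)
        payload = {
            "short_description": f"[SEC] {alert_data['rule_name']} — {alert_data['src']}",
            "description": self._build_description(alert_data),
            "category": "Security",
            "subcategory": alert_data.get("category", "Investigation"),
            "urgency": urgency,
            "impact": urgency,
            "assignment_group": self._get_assignment_group(severity),
            "caller_id": "soc_automation",
            "u_siem_event_id": alert_data.get("notable_id", ""),
            "u_mitre_technique": alert_data.get("mitre_technique", ""),
            "u_affected_hosts": ", ".join(map(str, alert_data.get("affected_hosts", []))),
            "u_iocs": json.dumps(alert_data.get("iocs", {})),
        }
        result = self._request("POST", "/api/now/table/incident", json=payload)
        return {
            "ticket_number": result["number"],
            "sys_id": result["sys_id"],
            "state": result["state"],
        }

    @staticmethod
    def _build_description(alert_data):
        """Plain-text incident description assembled from the alert fields."""
        return f"""
安全事件 — 由 SIEM 自动生成
================================================
告警规则: {alert_data['rule_name']}
SIEM 事件 ID: {alert_data.get('notable_id', 'N/A')}
检测时间: {alert_data['detection_time']}
严重程度: {alert_data['severity'].upper()}
MITRE ATT&CK: {alert_data.get('mitre_technique', 'N/A')}
源地址: {alert_data.get('src', 'N/A')}
目标地址: {alert_data.get('dest', 'N/A')}
用户: {alert_data.get('user', 'N/A')}
初始上下文:
{alert_data.get('description', '详见 SIEM。')}
IOC:
{json.dumps(alert_data.get('iocs', {}), indent=2)}
"""

    @staticmethod
    def _get_assignment_group(severity):
        """Initial queue: critical/high go straight to Tier 2, everything else to Tier 1."""
        return "SOC Tier 2" if severity in ("critical", "high") else "SOC Tier 1"

    def _lookup_sys_id(self, ticket_number):
        """Resolve a ticket number (e.g. INC0012567) to the record's sys_id.

        Raises:
            ValueError: if ``ticket_number`` is not a plain ASCII alphanumeric string —
                anything else is refused before it is interpolated into ``sysparm_query``,
                where ``^``/``^OR``/``=`` would change the query.
            LookupError: if no incident carries that number.
        """
        if not (
            isinstance(ticket_number, str)
            and ticket_number
            and ticket_number.isascii()
            and ticket_number.isalnum()
        ):
            raise ValueError(f"invalid ServiceNow ticket number: {ticket_number!r}")
        result = self._request(
            "GET",
            "/api/now/table/incident",
            params={
                "sysparm_query": f"number={ticket_number}",
                "sysparm_limit": 1,
                "sysparm_fields": "sys_id",
            },
        )
        if not result:
            raise LookupError(f"no ServiceNow incident with number {ticket_number}")
        return result[0]["sys_id"]

    def update_incident(self, ticket_number, updates):
        """PATCH ``updates`` (field -> value) onto the incident and return the updated record.

        Raises ValueError for a malformed ticket number and LookupError when it does not
        exist (see ``_lookup_sys_id``).
        """
        sys_id = self._lookup_sys_id(ticket_number)
        return self._request("PATCH", f"/api/now/table/incident/{sys_id}", json=updates)

    def add_work_note(self, ticket_number, note):
        """Append an investigation work note to the incident."""
        self.update_incident(ticket_number, {"work_notes": note})

    def escalate_incident(self, ticket_number, reason):
        """Reassign the incident to Tier 3 at the highest urgency and record ``reason``."""
        self.update_incident(ticket_number, {
            "assignment_group": self._ESCALATION_GROUP,
            "urgency": "1",
            "work_notes": f"已升级:{reason}",
        })

    @staticmethod
    def _disposition_from(resolution):
        """Disposition stored in ``u_incident_disposition``.

        The text before the first ":" of ``resolution`` (e.g. "True Positive: host reimaged"
        -> "True Positive"); "Resolved" when the resolution has no ":" prefix.
        """
        disposition, separator, _ = resolution.partition(":")
        return disposition if separator else "Resolved"

    def resolve_incident(self, ticket_number, resolution):
        """Set the incident to Resolved with ``resolution`` as close notes."""
        self.update_incident(ticket_number, {
            "state": self._STATE_RESOLVED,
            "close_code": "Resolved",
            "close_notes": resolution,
            "u_incident_disposition": self._disposition_from(resolution),
        })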
TheHive 案例创建(ServiceNow 的替代方案):
import requests
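class TheHiveCaseManager:
    """TheHive client: opens a case from a SIEM alert and attaches IOC observables.

    Every call carries the API key as a bearer token, sends JSON, is bounded by
    ``timeout`` seconds, and raises ``requests.HTTPError`` on a non-2xx reply instead of
    handing the error body back to the caller as if it were a case.
    """

    # TheHive case severity scale: 1 (low) .. 4 (critical).
    _SEVERITY_LEVELS = {"critical": 4, "high": 3, "medium": 2, "low": 1}
    _DEFAULT_SEVERITY = 2
    # TLP/PAP level 2 == AMBER.
    _TLP_AMBER = 2
    _PAP_AMBER = 2

    def __init__(self, thehive_url, api_key, timeout=30, detection_source="Splunk ES"):
        """
        Args:
            thehive_url: Base URL of the TheHive instance.
            api_key: API key sent as ``Authorization: Bearer``.
            timeout: Seconds applied to each HTTP request.
            detection_source: Value of the ``detection-source`` custom field on new cases.
        """
        self.url = thehive_url
        self.headers = {
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json",
        }
        self.timeout = timeout
        self.detection_source = detection_source

    def _post(self, path, body):
        """POST ``body`` as JSON to ``path`` under ``url``; raise on a non-2xx status."""
        response = requests.post(
            f"{self.url}{path}",
            headers=self.headers,
            json=body,
            timeout=self.timeout,
        )
        response.raise_for_status()
        return response.json()

    def create_case(self, alert_data):
        """Create a TheHive case from a SIEM alert.

        Args:
            alert_data: Alert dict; ``rule_name`` and ``severity`` are required, the other
                fields (``notable_id``, ``mitre_technique``, ``category``, ``src``, ``dest``,
                ``user``, ``detection_time``, ``description``, ``iocs``) are optional.

        Returns:
            The case object returned by TheHive.
        """
        severity = alert_data["severity"]
        case = {
            "title": f"[{severity.upper()}] {alert_data['rule_name']}",
            "description": self._build_markdown_description(alert_data),
            "severity": self._SEVERITY_LEVELS.get(severity, self._DEFAULT_SEVERITY),
            "tlp": self._TLP_AMBER,
            "pap": self._PAP_AMBER,
            "tags": self._build_tags(alert_data),
            "tasks": self._generate_tasks(severity),
            "customFields": {
                "siem-event-id": {"string": alert_data.get("notable_id", "")},
                "mitre-technique": {"string": alert_data.get("mitre_technique", "")},
                "detection-source": {"string": self.detection_source},
            },
        }
        return self._post("/api/case", case)

    @staticmethod
    def _build_tags(alert_data):
        """Case tags; alert fields that are absent are skipped instead of becoming "" tags."""
        candidates = [
            alert_data.get("mitre_technique", ""),
            alert_data.get("category", ""),
            f"source:{alert_data.get('src', 'unknown')}",
        ]
        return [tag for tag in candidates if tag]

    @staticmethod
    def _build_markdown_description(alert_data):
        """Markdown case description (defined here because ``create_case`` calls it)."""
        lines = [
            "## 安全事件 — 由 SIEM 自动生成",
            "",
            f"- **告警规则:** {alert_data['rule_name']}",
            f"- **SIEM 事件 ID:** {alert_data.get('notable_id', 'N/A')}",
            f"- **检测时间:** {alert_data.get('detection_time', 'N/A')}",
            f"- **严重程度:** {alert_data['severity'].upper()}",
            f"- **MITRE ATT&CK:** {alert_data.get('mitre_technique', 'N/A')}",
            f"- **源地址:** {alert_data.get('src', 'N/A')}",
            f"- **目标地址:** {alert_data.get('dest', 'N/A')}",
            f"- **用户:** {alert_data.get('user', 'N/A')}",
            "",
            "### 初始上下文",
            alert_data.get("description", "详见 SIEM。"),
            "",
            "### IOC",
            json.dumps(alert_data.get("iocs", {}), indent=2, ensure_ascii=False),
        ]
        return "\n".join(lines)

    @staticmethod
    def _generate_tasks(severity):
        """Investigation task list for the case.

        All severities get the Phase 1/2 triage, enrichment and scoping tasks; critical and
        high add containment through post-incident review, everything else a single
        resolve-and-document task.
        """
        tasks = [
            {"title": "初始分诊", "group": "Phase 1", "description": "审查 SIEM 告警并验证发现"},
            {"title": "IOC 丰富化", "group": "Phase 1", "description": "使用 VT、AbuseIPDB 丰富所有 IOC"},
            {"title": "范围评估", "group": "Phase 2", "description": "确定受影响的系统和用户"},
        ]
        if severity in ("critical", "high"):
            tasks.extend([
                {"title": "遏制行动", "group": "Phase 2", "description": "隔离受影响系统"},
                {"title": "证据收集", "group": "Phase 3", "description": "保存取证构件"},
                {"title": "清除", "group": "Phase 3", "description": "从环境中清除威胁"},
                {"title": "恢复", "group": "Phase 4", "description": "将系统恢复正常运行"},
                {"title": "事后复盘", "group": "Phase 4", "description": "记录经验教训"},
            ])
        else:
            tasks.append(
                {"title": "解决和文档记录", "group": "Phase 2", "description": "记录发现并关闭"}
            )
        return tasks

    def add_observable(self, case_id, ioc_type, ioc_value, description=""):
        """Attach one IOC observable (TheHive artifact) to the case ``case_id``.

        Args:
            case_id: TheHive case id.
            ioc_type: TheHive dataType (e.g. ``ip``, ``domain``, ``hash``).
            ioc_value: The observable value.
            description: Optional message stored with the observable.
        """
        observable = {
            "dataType": ioc_type,
            "data": ioc_value,
            "message": description,
            "tlp": self._TLP_AMBER,
            "ioc": True,
            "tags": ["auto-extracted"],
        }
        return self._post(f"/api/case/{case_id}/artifact", observable)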
Splunk SLA 监控仪表板:
--- 即将违反 SLA 的活跃事件
```Active security incidents about to breach (or past) their response SLA. The sla_minutes table below must match incident_taxonomy.response_sla and the map in check_sla_breaches.```
index=servicenow sourcetype="snow:incident" category="Security" state IN ("New", "In Progress")
| eval sla_minutes = case(
    urgency="1", 15,
    urgency="2", 30,
    urgency="3", 120,
    urgency="4", 480
)
```opened_at is indexed as a "%Y-%m-%d %H:%M:%S" string, so it is parsed to epoch seconds before the age is computed.```
| eval age_minutes = round((now() - strptime(opened_at, "%Y-%m-%d %H:%M:%S")) / 60, 0)
| eval sla_remaining = sla_minutes - age_minutes
```AT RISK once less than a quarter of the response SLA remains.```
| eval sla_status = case(
    sla_remaining < 0, "BREACHED",
    sla_remaining < sla_minutes * 0.25, "AT RISK",
    1=1, "ON TRACK"
)
| where sla_status IN ("BREACHED", "AT RISK")
| sort sla_remaining
| table number, short_description, urgency, assignment_group, assigned_to,
    age_minutes, sla_minutes, sla_remaining, sla_status
自动升级逻辑:
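# Response SLA in minutes per ServiceNow urgency code. Mirrors incident_taxonomy.response_sla
# and the sla_minutes case() in the Splunk dashboard — keep the three copies in sync.
_RESPONSE_SLA_MINUTES = {"1": 15, "2": 30, "3": 120, "4": 480}
# Timestamp layout of opened_at as returned by the ServiceNow Table API / indexed by Splunk.
_OPENED_AT_FORMAT = "%Y-%m-%d %H:%M:%S"


def _minutes_since(opened_at, now):
    """Age in minutes of an incident opened at ``opened_at``, measured against ``now`` (aware UTC).

    ``opened_at`` may be a datetime or an ``_OPENED_AT_FORMAT`` string. Naive values are
    treated as UTC, which is what the previous ``datetime.utcnow()`` comparison assumed.
    """
    if isinstance(opened_at, str):
        opened_at = datetime.strptime(opened_at, _OPENED_AT_FORMAT)
    reference = now if opened_at.tzinfo is not None else now.replace(tzinfo=None)
    return (reference - opened_at).total_seconds() / 60


def check_sla_breaches(ticket_manager):
    """Escalate every open incident still in state "New" that has exceeded its response SLA.

    Args:
        ticket_manager: Object providing ``get_open_incidents()`` -> list of dicts with keys
            ``number``, ``opened_at``, ``urgency`` and ``state``, and
            ``escalate_incident(number, reason)``. NOTE: the ``IncidentTicketManager`` shown
            on this page defines ``escalate_incident`` but not ``get_open_incidents``; the
            caller must supply that query.

    Returns:
        List of ticket numbers escalated in this sweep (empty when nothing breached).

    Incidents whose urgency code has no SLA entry are skipped — there is nothing to measure
    them against — so one malformed record does not abort the sweep for the others.
    """
    now = datetime.now(timezone.utc)
    escalated = []
    for incident in ticket_manager.get_open_incidents():
        sla_minutes = _RESPONSE_SLA_MINUTES.get(incident.get("urgency"))
        if sla_minutes is None or incident.get("state") != "New":
            continue
        age_minutes = _minutes_since(incident["opened_at"], now)
        if age_minutes <= sla_minutes:
            continue
        ticket_manager.escalate_incident(
            incident["number"],
            f"SLA 违反:已过 {int(age_minutes)} 分钟,SLA 为 {sla_minutes} 分钟。自动升级。",
        )
        escalated.append(incident["number"])
    return escalated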
--- 月度事件指标
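```Monthly incident metrics. opened_at/resolved_at are "%Y-%m-%d %H:%M:%S" strings in snow:incident (the SLA dashboard already strptime()s opened_at); subtracting the raw strings yields null, so both are parsed to epoch seconds before the difference is taken. Incidents without resolved_at are excluded from the average by avg() but still counted in total.```
index=servicenow sourcetype="snow:incident" category="Security"
    opened_at > "2024-03-01" opened_at < "2024-04-01"
| eval opened_epoch = strptime(opened_at, "%Y-%m-%d %H:%M:%S"),
       resolved_epoch = strptime(resolved_at, "%Y-%m-%d %H:%M:%S")
| stats count AS total,
    avg(eval((resolved_epoch - opened_epoch) / 3600)) AS avg_resolution_hours,
    sum(eval(if(urgency="1", 1, 0))) AS critical,
    sum(eval(if(urgency="2", 1, 0))) AS high,
    sum(eval(if(urgency="3", 1, 0))) AS medium,
    sum(eval(if(urgency="4", 1, 0))) AS low
| eval avg_resolution = round(avg_resolution_hours, 1)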
--- SLA 合规率
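```SLA compliance rate. sla_target hours must match incident_taxonomy.resolution_sla. Timestamps are parsed before subtraction (raw string subtraction yields null); a resolved incident with no parsable resolved_at gets sla_met=0, i.e. counts as a miss rather than silently dropping out of the rate. state="Resolved" is the display value indexed by the ServiceNow add-on, whereas resolve_incident writes the code "6" — confirm which form your add-on indexes.```
index=servicenow sourcetype="snow:incident" category="Security" state="Resolved"
| eval opened_epoch = strptime(opened_at, "%Y-%m-%d %H:%M:%S"),
       resolved_epoch = strptime(resolved_at, "%Y-%m-%d %H:%M:%S")
| eval sla_target = case(urgency="1", 4, urgency="2", 8, urgency="3", 24, urgency="4", 72)
| eval resolution_hours = (resolved_epoch - opened_epoch) / 3600
| eval sla_met = if(resolution_hours <= sla_target, 1, 0)
| stats sum(sla_met) AS met, count AS total
| eval compliance_pct = round(met / total * 100, 1)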
| 术语 | 定义 |
|---|---|
| 事件工单(Incident Ticket) | 已确认安全事件的正式跟踪记录,具有完整生命周期管理 |
| SLA | 服务级别协议,按严重程度定义最大响应和解决时间 |
| 升级路径(Escalation Path) | 基于严重程度、耗时或分析师请求,从 Tier 1 到 Tier 2/3 的路由定义 |
| 处置(Disposition) | 已关闭事件的最终分类(真阳性、误报、重复、策略违规) |
| MTTR | 平均解决时间(Mean Time to Resolve)——从工单创建到解决的跨所有事件平均时间 |
| 案例管理(Case Management) | 通过任务、可观测指标和审计跟踪管理复杂事件的结构化方法 |
事件工单 — INC0012567
━━━━━━━━━━━━━━━━━━━━━━━━━━━
标题: [SEC] 检测到 Cobalt Strike C2 信标 — WORKSTATION-042
类别: 安全 > 恶意软件感染
严重程度: 严重(P1)
SLA: 响应:15 分钟 | 解决:4 小时
时间线:
14:23 工单创建(自 Splunk ES NE-2024-08921 自动创建)
14:25 分配给 analyst_jdoe(Tier 2)
14:28 工作备注:"VT 确认 Cobalt Strike 信标,哈希值 a1b2c3..."
14:35 工作备注:"主机已通过 CrowdStrike 隔离,C2 域名已封锁"
15:00 工作备注:"企业 IOC 扫描——发现另外 2 台受影响主机"
15:30 升级至 Tier 3 进行取证分析
16:00 工作备注:"所有受影响主机已遏制并清除"
18:00 已解决:"恶意软件已清除,系统已恢复,监控 72 小时"
指标:
确认时间: 2 分钟
遏制时间: 12 分钟
解决时间: 3 小时 37 分钟
SLA 状态: 已达成(在 4 小时解决目标内)