Implements threat intelligence lifecycle management covering planning, collection, processing, analysis, dissemination, and feedback stages using Python with pymisp and stix2 for CTI programs.
威胁情报生命周期是将原始数据转化为可操作情报的结构化迭代过程。基于军事和政府机构使用的情报周期,它由六个阶段组成:指导(需求收集)、收集(数据获取)、处理(规范化和去重)、分析(情境化和评估)、传播(向相关方分发)和反馈(评估和优化)。本技能涵盖为成熟 CTI 计划构建每个阶段的工具、指标和集成点。
Implements threat intelligence lifecycle management covering planning, collection, processing, analysis, dissemination, and feedback for cybersecurity CTI programs.
Implements threat intelligence lifecycle management: planning, collection, processing, analysis, dissemination, feedback for building CTI programs with Python tooling.
Manages end-to-end cyber threat intelligence (CTI) lifecycle: planning, collection, processing, analysis, dissemination, and feedback. Use for establishing/maturing CTI programs, defining PIRs, or feedback loops.
威胁情报生命周期是将原始数据转化为可操作情报的结构化迭代过程。基于军事和政府机构使用的情报周期,它由六个阶段组成:指导(需求收集)、收集(数据获取)、处理(规范化和去重)、分析(情境化和评估)、传播(向相关方分发)和反馈(评估和优化)。本技能涵盖为成熟 CTI 计划构建每个阶段的工具、指标和集成点。
所需库:pymisp、stix2、requests、pandas。

优先情报需求(PIR)定义组织需要了解的内容。示例:哪些威胁行者针对我们的行业?哪些漏洞正在被主动利用?我们的品牌或凭据是否在暗网上被交易?PIR 驱动收集计划并确保情报生产具有相关性。
收集管理框架将情报需求映射到收集来源,跟踪收集缺口,确保覆盖整个威胁态势。来源包括 OSINT、商业 Feed、ISAC 共享、内部遥测和行业联系人的人力情报。
战略情报为高层决策提供信息(威胁态势、风险趋势、地缘政治背景)。操作情报支持安全运营(活动跟踪、行为者 TTP、攻击时机)。战术情报实现即时防御(IOC、检测规则、黑名单)。
import hashlib
import json
from datetime import datetime
from enum import Enum
class Priority(Enum):
    """Urgency of an intelligence requirement; a lower value is more urgent."""

    CRITICAL = 1  # most urgent
    HIGH = 2
    MEDIUM = 3
    LOW = 4  # least urgent
class IntelligenceRequirement:
    """One intelligence requirement (IR / PIR) tracked by the CTI program.

    ``priority`` is expected to expose a ``.name`` attribute (an enum
    member), because ``to_dict`` serialises it by name. ``created`` is a
    naive local-time ISO-8601 string captured at construction; ``status``
    starts as ``"active"`` and ``last_answered`` as ``None``.
    """

    def __init__(self, requirement_id, question, priority, stakeholder,
                 intelligence_level, collection_sources=None):
        self.id = requirement_id
        self.question = question
        self.priority = priority
        self.stakeholder = stakeholder
        self.level = intelligence_level
        # An empty list per instance when no sources are given; otherwise the
        # caller's list is kept as-is.
        self.sources = [] if not collection_sources else collection_sources
        self.created = datetime.now().isoformat()
        self.status = "active"
        self.last_answered = None

    def to_dict(self):
        """Return a JSON-serialisable view of the requirement (priority by name)."""
        return dict(
            id=self.id,
            question=self.question,
            priority=self.priority.name,
            stakeholder=self.stakeholder,
            intelligence_level=self.level,
            collection_sources=self.sources,
            created=self.created,
            status=self.status,
            last_answered=self.last_answered,
        )
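class RequirementsManager:
    """Registry of intelligence requirements with filtering and JSON export."""

    def __init__(self):
        self.requirements = []

    def add_requirement(self, requirement):
        """Register *requirement* (needs ``.id`` and ``.question`` attributes)."""
        self.requirements.append(requirement)
        print(f"[+] 已添加 IR-{requirement.id}:{requirement.question[:60]}...")

    def get_active_requirements(self, priority=None, level=None):
        """Return the requirements whose ``status`` is ``"active"``.

        Args:
            priority: when given, keep only requirements with exactly this priority.
            level: when given, keep only requirements with exactly this
                intelligence level.
        """
        # Compare against None rather than relying on truthiness, so that a
        # falsy-but-valid filter value is still applied.
        return [
            r for r in self.requirements
            if r.status == "active"
            and (priority is None or r.priority == priority)
            and (level is None or r.level == level)
        ]

    def export_requirements(self, output_file="intelligence_requirements.json"):
        """Write every registered requirement to *output_file* as a JSON array.

        Each requirement must provide ``to_dict()``. The file is written as
        UTF-8 with non-ASCII text kept readable, since questions and
        stakeholders here contain Chinese.
        """
        data = [r.to_dict() for r in self.requirements]
        with open(output_file, "w", encoding="utf-8") as f:
            json.dump(data, f, indent=2, ensure_ascii=False)
        print(f"[+] 已将 {len(data)} 个需求导出至 {output_file}")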
# Define the organisation's PIRs
mgr = RequirementsManager()

# (id, question, priority, stakeholder, intelligence level, collection sources)
_ORG_PIRS = [
    (
        "PIR-001", "哪些威胁行为者正在积极针对我们的行业?",
        Priority.CRITICAL, "CISO", "strategic",
        ["MITRE ATT&CK", "ISAC feeds", "厂商报告"],
    ),
    (
        "PIR-002", "哪些漏洞正在被野外主动利用?",
        Priority.CRITICAL, "漏洞管理", "operational",
        ["CISA KEV", "Exploit-DB", "VulnCheck", "Shodan"],
    ),
    (
        "PIR-003", "组织凭据或数据是否在暗网上暴露?",
        Priority.HIGH, "SOC 经理", "tactical",
        ["暗网监控", "粘贴站点监控", "泄露数据库"],
    ),
    (
        "PIR-004", "针对云基础设施的新兴攻击技术有哪些?",
        Priority.HIGH, "云安全", "operational",
        ["ATT&CK 云矩阵", "厂商通告", "ISAC 公告"],
    ),
]
for _rid, _question, _prio, _owner, _level, _sources in _ORG_PIRS:
    mgr.add_requirement(
        IntelligenceRequirement(_rid, _question, _prio, _owner, _level, _sources)
    )
mgr.export_requirements()
import requests
from datetime import datetime, timedelta
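class CollectionPipeline:
    """Pulls raw intelligence from public feeds and keeps a per-source log.

    Every successful ``collect_*`` call appends one entry to
    ``collected_data`` with ``source``, ``type``, ``count`` and
    ``collected_at`` keys; ``get_collection_summary`` aggregates them.
    Transport errors propagate from ``requests``; a non-200 response or an
    unparseable JSON body is reported and the call returns ``[]``.
    """

    def __init__(self, config):
        self.config = config  # caller-supplied settings; not read by this class
        self.collected_data = []

    def _record(self, source, item_type, count, extra=None):
        """Append one collection-log entry; *extra* adds keys (e.g. raw data)."""
        entry = {
            "source": source,
            "type": item_type,
            "count": count,
            "collected_at": datetime.now().isoformat(),
        }
        if extra:
            entry.update(extra)
        self.collected_data.append(entry)

    @staticmethod
    def _json_or_none(resp, source):
        """Return the decoded JSON body, or None after reporting the problem.

        None is returned for a non-200 status and for a body that is not
        valid JSON, so callers do not crash on a feed outage or an HTML
        error page.
        """
        if resp.status_code != 200:
            print(f"[!] {source}:HTTP {resp.status_code}")
            return None
        try:
            return resp.json()
        except ValueError:
            print(f"[!] {source}:响应不是有效的 JSON")
            return None

    def collect_cisa_kev(self):
        """Collect the CISA Known Exploited Vulnerabilities catalogue.

        Returns the list under the feed's ``vulnerabilities`` key (``[]`` on
        failure); the raw list is also kept in the log entry under ``data``.
        """
        url = "https://www.cisa.gov/sites/default/files/feeds/known_exploited_vulnerabilities.json"
        resp = requests.get(url, timeout=30)
        data = self._json_or_none(resp, "CISA KEV")
        if data is None:
            return []
        vulns = data.get("vulnerabilities", [])
        self._record("CISA KEV", "vulnerability", len(vulns), {"data": vulns})
        print(f"[+] CISA KEV:{len(vulns)} 个已知被利用漏洞")
        return vulns

    def collect_otx_pulses(self, api_key, days=7):
        """Collect subscribed OTX pulses modified within the last *days* days.

        The API key is sent only in the ``X-OTX-API-KEY`` header; it is never
        put in the URL or printed.
        """
        headers = {"X-OTX-API-KEY": api_key}
        since = (datetime.now() - timedelta(days=days)).isoformat()
        url = f"https://otx.alienvault.com/api/v1/pulses/subscribed?modified_since={since}"
        resp = requests.get(url, headers=headers, timeout=30)
        data = self._json_or_none(resp, "AlienVault OTX")
        if data is None:
            return []
        pulses = data.get("results", [])
        self._record("AlienVault OTX", "threat_intelligence", len(pulses))
        print(f"[+] OTX:过去 {days} 天内 {len(pulses)} 个脉冲")
        return pulses

    def collect_abuse_ch(self):
        """Collect recent malware sample records from MalwareBazaar."""
        url = "https://mb-api.abuse.ch/api/v1/"
        resp = requests.post(url, data={"query": "get_recent", "selector": "time"}, timeout=30)
        data = self._json_or_none(resp, "MalwareBazaar")
        if data is None:
            return []
        samples = data.get("data", [])
        self._record("MalwareBazaar", "malware_samples", len(samples))
        print(f"[+] MalwareBazaar:{len(samples)} 个近期样本")
        return samples

    def get_collection_summary(self):
        """Aggregate the collection log into totals plus a per-source list."""
        return {
            "total_sources": len(self.collected_data),
            "total_items": sum(d.get("count", 0) for d in self.collected_data),
            "sources": [
                {"name": d["source"], "type": d["type"], "count": d["count"]}
                for d in self.collected_data
            ],
        }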
pipeline = CollectionPipeline({})
# Run the unauthenticated collectors, then print the per-source summary.
for _collect in (pipeline.collect_cisa_kev, pipeline.collect_abuse_ch):
    _collect()
print(json.dumps(pipeline.get_collection_summary(), indent=2))
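class IntelligenceProcessor:
    """Normalises raw feed items into a common schema and drops duplicates.

    The dedup key is (type, value, source), so the same indicator seen from
    two different sources is kept once per source.
    """

    def __init__(self):
        self.processed_items = []
        self.dedup_hashes = set()

    def process_collection(self, raw_data, source_name):
        """Normalise and deduplicate *raw_data*, an iterable of dict-like items.

        Returns the new (non-duplicate) normalised items and also appends
        them to ``processed_items``. Items that are not mappings cannot be
        normalised and are skipped; their number is reported separately.
        """
        processed = []
        duplicates = 0
        skipped = 0
        for item in raw_data:
            normalized = self._normalize(item, source_name)
            if normalized is None:
                skipped += 1
                continue
            item_hash = self._compute_hash(normalized)
            if item_hash in self.dedup_hashes:
                duplicates += 1
                continue
            self.dedup_hashes.add(item_hash)
            normalized["processed_at"] = datetime.now().isoformat()
            processed.append(normalized)
        self.processed_items.extend(processed)
        print(f"[+] 已从 {source_name} 处理 {len(processed)} 个条目"
              f"(已删除 {duplicates} 个重复项)")
        if skipped:
            print(f"[!] {source_name}:跳过 {skipped} 个无法规范化的条目")
        return processed

    def _normalize(self, item, source):
        """Map one raw item onto the standard schema; None if it is not a mapping.

        Missing fields get defaults (confidence 50, TLP green, empty tags);
        ``value`` falls back to ``indicator`` and ``first_seen`` to
        ``date_added``. The untouched record is kept under ``raw``.
        """
        if not isinstance(item, dict):
            return None
        return {
            "source": source,
            "type": item.get("type", "unknown"),
            "value": item.get("value", item.get("indicator", "")),
            "confidence": item.get("confidence", 50),
            "tlp": item.get("tlp", "green"),
            "tags": item.get("tags", []),
            "first_seen": item.get("first_seen", item.get("date_added", "")),
            "raw": item,
        }

    def _compute_hash(self, item):
        """Return a stable dedup key (SHA-256 hex) for a normalised item.

        Records that carry no ``value``/``indicator`` would otherwise all
        share the key ``type::source`` and be discarded as duplicates of each
        other; for those the canonical JSON of the raw record stands in for
        the value. ``default=str`` is acceptable here because the text is
        only hashed, never emitted.
        """
        value = item["value"]
        if value is None or value == "":
            value = json.dumps(item["raw"], sort_keys=True, default=str)
        key = f"{item['type']}:{value}:{item['source']}"
        return hashlib.sha256(key.encode()).hexdigest()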
# Shared processor instance; its dedup set persists across feeds, so the same
# (type, value, source) triple is only ever emitted once per run.
processor = IntelligenceProcessor()
class IntelligenceAnalyzer:
    """Builds intelligence products from processed items and requirements."""

    def __init__(self, requirements, processed_data):
        self.requirements = requirements
        self.data = processed_data

    def _lookup(self, requirement_id):
        """Return the requirement whose ``.id`` matches, or None."""
        for req in self.requirements:
            if req.id == requirement_id:
                return req
        return None

    def answer_requirement(self, requirement_id):
        """Produce an analysis skeleton answering one requirement.

        Returns None when no requirement has *requirement_id*. All processed
        data counts as relevant — filtering by the requirement's topic is
        left for a real implementation — and findings, recommendations and
        confidence are placeholders.
        """
        req = self._lookup(requirement_id)
        if not req:
            return None
        relevant = self.data  # in practice, filter by requirement topic
        product = {
            "requirement_id": requirement_id,
            "question": req.question,
            "intelligence_level": req.level,
            "data_points_analyzed": len(relevant),
            "produced_at": datetime.now().isoformat(),
            "key_findings": [],
            "confidence": "medium",
            "recommendations": [],
        }
        return product

    def produce_daily_brief(self):
        """Produce the daily brief: item count plus the status of active requirements."""
        active = [r for r in self.requirements if r.status == "active"]
        status_rows = []
        for r in active:
            status_rows.append({"id": r.id, "question": r.question[:80], "status": r.status})
        return {
            "date": datetime.now().strftime("%Y-%m-%d"),
            "total_items_processed": len(self.data),
            "highlights": [],
            "active_requirements_status": status_rows,
        }
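class IntelligenceDisseminator:
    """Dissemination and feedback stage: logs distributions and stakeholder ratings."""

    def __init__(self):
        self.distribution_log = []
        # Every feedback entry is retained so calculate_metrics can report on it.
        self.feedback_log = []

    def distribute_report(self, report, channels, classification="TLP:GREEN"):
        """Log the distribution of *report* to each channel in *channels*.

        ``report`` is a mapping; its ``requirement_id`` (``"daily"`` when
        absent) becomes the log entry's ``report_id``. ``classification`` is
        recorded as given — nothing here checks that a channel may receive
        the stated TLP level, so callers must enforce sharing boundaries
        before calling.
        """
        for channel in channels:
            self.distribution_log.append({
                "report_id": report.get("requirement_id", "daily"),
                "channel": channel,
                "classification": classification,
                "distributed_at": datetime.now().isoformat(),
                "status": "sent",
            })
            print(f" [+] 已分发至 {channel}")

    def collect_feedback(self, report_id, stakeholder, rating, comments=""):
        """Record a stakeholder's rating (1–5) of an intelligence product.

        Returns the feedback entry, which is also appended to
        ``feedback_log``. Raises ValueError when *rating* is outside 1–5.
        """
        if not 1 <= rating <= 5:
            raise ValueError(f"rating must be between 1 and 5, got {rating!r}")
        feedback = {
            "report_id": report_id,
            "stakeholder": stakeholder,
            "rating": rating,
            "comments": comments,
            "received_at": datetime.now().isoformat(),
        }
        self.feedback_log.append(feedback)
        print(f"[+] 已收到来自 {stakeholder} 的反馈:{rating}/5")
        return feedback

    def calculate_metrics(self):
        """Return CTI programme metrics: distributions per channel and feedback.

        ``average_feedback_rating`` is only present once at least one
        feedback entry exists.
        """
        by_channel = {}
        for entry in self.distribution_log:
            channel = entry["channel"]
            by_channel[channel] = by_channel.get(channel, 0) + 1
        metrics = {
            "total_products_distributed": len(self.distribution_log),
            "distribution_by_channel": by_channel,
            "feedback_count": len(self.feedback_log),
        }
        if self.feedback_log:
            total = sum(f["rating"] for f in self.feedback_log)
            metrics["average_feedback_rating"] = total / len(self.feedback_log)
        return metrics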
# Shared disseminator; distribute_report only logs deliveries — it does not
# itself send anything, nor does it check a channel against the TLP label.
disseminator = IntelligenceDisseminator()