Performs sector-specific threat landscape assessments by analyzing threat actors, attack vectors, TTPs, and vulnerabilities using MITRE ATT&CK data and Python tools. Useful for cybersecurity risk prioritization in finance, healthcare, energy.
行业特定威胁态势评估(Threat Landscape Assessment)通过研究哪些威胁行为者针对特定行业、其惯用攻击向量和 TTP(战术、技术和程序,Tactics, Techniques, and Procedures)、常被利用的漏洞、历史事件数据及新兴威胁,分析特定行业垂直领域(医疗、金融服务、能源、政府、制造业)所面临的网络威胁环境。该评估为风险管理、安全投入优先级排序和董事会级汇报提供可落地的情报支持。
行业特定威胁态势评估(Threat Landscape Assessment)通过研究哪些威胁行为者针对特定行业、其惯用攻击向量和 TTP(战术、技术和程序,Tactics, Techniques, and Procedures)、常被利用的漏洞、历史事件数据及新兴威胁,分析特定行业垂直领域(医疗、金融服务、能源、政府、制造业)所面临的网络威胁环境。该评估为风险管理、安全投入优先级排序和董事会级汇报提供可落地的情报支持。
依赖库:attackcti、requests、pandas、matplotlib。
不同行业面临不同的威胁画像。金融服务面临高级国家级威胁行为者(Lazarus Group)和专注于金融欺诈的网络犯罪组织。医疗行业面临利用紧迫性和遗留系统的勒索软件(Ransomware)组织。能源和关键基础设施面临具有破坏能力的国家级组织(TEMP.Veles、Sandworm)。政府部门面临以间谍活动为目的的 APT(高级持续性威胁,Advanced Persistent Threat)组织(APT29、APT28、Turla)。
全面评估包括:威胁行为者画像(针对该行业的组织)、攻击向量分析(观测到的初始访问方法)、TTP 映射(该行业常见技术)、漏洞态势(常被利用的 CVE)、事件趋势分析(泄露频率、影响、恢复时间)及新兴威胁(新组织、演变技术、供应链风险)。
行业特定情报来源包括:ISAC、政府公告(CISA、FBI、NSA)、厂商威胁报告(CrowdStrike 年度威胁报告、Mandiant M-Trends、Verizon DBIR),以及行业特定攻击的学术研究。
import json
from collections import Counter
from datetime import datetime

from attackcti import attack_client
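class SectorThreatAssessment:
    """Profile the threat actors and techniques associated with one industry sector.

    The actors per sector come from the curated, static ``SECTOR_GROUPS`` table;
    actor details and their technique lists are looked up in MITRE ATT&CK through
    ``attackcti``. All results accumulate in ``self.assessment`` so that the
    reporting helpers can consume a single dict.
    """

    # Curated mapping of lower-case sector name -> ATT&CK group names (or aliases)
    # reported as targeting that sector. "Lazarus Group" is intentionally listed
    # under more than one sector.
    SECTOR_GROUPS = {
        "financial": ["FIN7", "FIN8", "FIN11", "Carbanak", "Lazarus Group",
                      "Cobalt Group", "TA505", "GOLD SOUTHFIELD"],
        "healthcare": ["FIN12", "Ryuk", "Conti", "Wizard Spider",
                       "GOLD ULRICK", "Vice Society"],
        "energy": ["TEMP.Veles", "Sandworm Team", "Dragonfly",
                   "XENOTIME", "ERYTHRITE", "Berserk Bear"],
        "government": ["APT29", "APT28", "Turla", "Gamaredon Group",
                       "Mustang Panda", "APT41", "Lazarus Group"],
        "manufacturing": ["APT41", "TEMP.Veles", "Dragonfly",
                          "HEXANE", "MAGNALLIUM"],
        "technology": ["APT41", "Lazarus Group", "APT10",
                       "HAFNIUM", "Winnti Group"],
    }

    # Techniques kept per actor profile. "technique_count" still records the full
    # number, but identify_common_techniques() only ranks this capped list, so the
    # ranking is an approximation for groups with many attributed techniques.
    MAX_TECHNIQUES_PER_ACTOR = 20
    # Number of ranked techniques retained by identify_common_techniques().
    TOP_TECHNIQUES = 20

    def __init__(self, sector):
        """Create an assessment for *sector* and load the ATT&CK group catalogue.

        Args:
            sector: Sector name, matched case-insensitively against SECTOR_GROUPS.
                The value exactly as given is kept under assessment["sector"].

        Note: attack_client() / get_groups() are presumably remote fetches of the
        ATT&CK dataset — construction is therefore slow and needs connectivity.
        """
        self.sector = sector.lower()
        self.lift = attack_client()
        self.groups = self.lift.get_groups()
        self.assessment = {
            "sector": sector,
            "threat_actors": [],
            "common_techniques": {},
            "attack_vectors": {},
            "risk_summary": {},
        }

    @staticmethod
    def _attack_id(stix_obj):
        """Return the ATT&CK id (e.g. "G0046", "T1566") of a STIX object, or "".

        Only the external reference whose source_name is "mitre-attack" carries the
        ATT&CK id; other references are vendor reports or URLs.
        """
        for ref in stix_obj.get("external_references", []) or []:
            if ref.get("source_name") == "mitre-attack":
                return ref.get("external_id", "") or ""
        return ""

    def _find_group(self, group_name):
        """Return the loaded ATT&CK group named *group_name* (or aliased to it), else None.

        Matching is case-insensitive on both the canonical name and every alias.
        """
        wanted = group_name.lower()
        for group in self.groups:
            if (group.get("name", "") or "").lower() == wanted:
                return group
            aliases = [alias.lower() for alias in (group.get("aliases", []) or [])]
            if wanted in aliases:
                return group
        return None

    def _techniques_for(self, group_id):
        """List ``{"id", "name"}`` dicts for every technique ATT&CK attributes to *group_id*.

        Techniques without an ATT&CK id are dropped since they cannot be ranked.
        An empty *group_id* yields an empty list without querying.
        """
        if not group_id:
            return []
        techniques = []
        for tech in self.lift.get_techniques_used_by_group(group_id):
            tech_id = self._attack_id(tech)
            if tech_id:
                techniques.append({"id": tech_id, "name": tech.get("name", "")})
        return techniques

    def analyze_sector_actors(self):
        """分析已知针对该行业的威胁行为者。

        Looks up each SECTOR_GROUPS entry for self.sector in the loaded ATT&CK group
        catalogue and builds one profile per group found; names with no ATT&CK match
        are reported and skipped. Profiles are stored in assessment["threat_actors"].

        Returns:
            list[dict]: profiles with keys name, aliases, description (first 300
            chars), attack_id, technique_count, techniques (capped at
            MAX_TECHNIQUES_PER_ACTOR).
        """
        actor_profiles = []
        for group_name in self.SECTOR_GROUPS.get(self.sector, []):
            group = self._find_group(group_name)
            if group is None:
                print(f" [-] 未在 ATT&CK 中找到组织: {group_name}")
                continue
            group_id = self._attack_id(group)
            techniques = self._techniques_for(group_id)
            actor_profiles.append({
                "name": group.get("name", ""),
                "aliases": group.get("aliases", []),
                "description": (group.get("description", "") or "")[:300],
                "attack_id": group_id,
                "technique_count": len(techniques),
                "techniques": techniques[:self.MAX_TECHNIQUES_PER_ACTOR],
            })
            print(f" [+] {group.get('name')}: {len(techniques)} 个技术")
        self.assessment["threat_actors"] = actor_profiles
        print(f"[+] 已画像 {len(actor_profiles)} 个 {self.sector} 行业威胁行为者")
        return actor_profiles

    def identify_common_techniques(self):
        """找出行业内各威胁行为者最常用的技术。

        Counts how many profiled actors list each technique and stores the top
        TOP_TECHNIQUES entries in assessment["common_techniques"]. Techniques are
        keyed on an (id, name) tuple rather than an "id:name" string, so technique
        names that themselves contain ':' are no longer truncated. Ties keep the
        order in which techniques were first seen.

        Returns:
            list[dict]: entries with keys technique, name, actor_count, actors_using.
        """
        actors = self.assessment["threat_actors"]
        counter = Counter(
            (tech["id"], tech["name"])
            for actor in actors
            for tech in actor.get("techniques", [])
        )
        ranked = []
        for (tech_id, tech_name), count in counter.most_common(self.TOP_TECHNIQUES):
            # actors_using is matched on id only, so an actor is listed even if its
            # copy of the technique carries a different name.
            ranked.append({
                "technique": tech_id,
                "name": tech_name,
                "actor_count": count,
                "actors_using": [
                    actor["name"] for actor in actors
                    if any(t["id"] == tech_id for t in actor.get("techniques", []))
                ],
            })
        self.assessment["common_techniques"] = ranked
        print(f"\n=== {self.sector.upper()} 行业高频技术 ===")
        for entry in ranked[:10]:
            print(f" {entry['technique']} {entry['name']}: "
                  f"{entry['actor_count']} 个组织使用")
        return ranked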
# Module-level driver: constructing the assessment calls attack_client() and
# get_groups(), presumably fetching the ATT&CK dataset, so importing this module
# does that work as a side effect. Later statements rely on the name "assessment".
assessment = SectorThreatAssessment("financial")
assessment.analyze_sector_actors()
assessment.identify_common_techniques()
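# ATT&CK technique ids treated as initial-access vectors below. Matched as
# prefixes so that sub-techniques (e.g. T1566.001) count as well.
_INITIAL_ACCESS_PREFIXES = ("T1566", "T1190", "T1133", "T1078", "T1195")

# Curated sector-specific vectors, supplementing what the ATT&CK data shows.
# Only these three sectors are covered; other SECTOR_GROUPS sectors get an
# empty primary/emerging table until someone adds curated entries for them.
_SECTOR_VECTORS = {
    "financial": {
        "primary": ["鱼叉式钓鱼 Spearphishing (T1566)", "利用公网应用 Exploit Public-Facing App (T1190)",
                    "有效账户 Valid Accounts (T1078)", "供应链攻击 Supply Chain Compromise (T1195)"],
        "emerging": ["MFA 疲劳/推送轰炸", "二维码钓鱼(Quishing)",
                     "商业邮件攻击 BEC", "API 密钥窃取"],
    },
    "healthcare": {
        "primary": ["鱼叉式钓鱼 Spearphishing (T1566)", "利用公网应用 Exploit Public-Facing App (T1190)",
                    "外部远程服务 External Remote Services (T1133)", "有效账户 Valid Accounts (T1078)"],
        "emerging": ["IoMT 设备利用", "远程医疗平台攻击",
                     "医疗设备固件攻击", "通过 EHR 供应商的供应链攻击"],
    },
    "energy": {
        "primary": ["鱼叉式钓鱼 Spearphishing (T1566)", "利用公网应用 Exploit Public-Facing App (T1190)",
                    "外部远程服务 External Remote Services (T1133)", "供应链攻击 Supply Chain Compromise (T1195)"],
        "emerging": ["OT/ICS 协议利用", "远程访问 SCADA",
                     "工程师工作站入侵", "供应商 VPN 利用"],
    },
}


def analyze_attack_vectors(assessment):
    """分析该行业常见的初始访问向量。

    Produces two views of initial access for the sector and stores them in
    assessment.assessment["attack_vectors"]:

    * "observed_initial_access": the entries of the ranked common techniques
      whose id starts with one of _INITIAL_ACCESS_PREFIXES — i.e. what the
      ATT&CK data for the profiled actors actually shows. (The original code
      computed this list and then discarded it.)
    * "primary" / "emerging": the curated _SECTOR_VECTORS entries for the
      sector, when one exists.

    Args:
        assessment: a SectorThreatAssessment on which identify_common_techniques()
            has been run; without it the observed list is simply empty.

    Returns:
        dict: the vectors dict described above (a fresh copy; the module table
        is never handed out).
    """
    ranked = assessment.assessment.get("common_techniques") or []
    observed = [
        entry for entry in ranked
        if entry["technique"].startswith(_INITIAL_ACCESS_PREFIXES)
    ]
    curated = _SECTOR_VECTORS.get(assessment.sector, {})
    vectors = {kind: list(items) for kind, items in curated.items()}
    vectors["observed_initial_access"] = observed
    assessment.assessment["attack_vectors"] = vectors
    return vectors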
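def _table_cell(text):
    """Flatten *text* so it is safe inside a Markdown table cell.

    A newline ends the table row and a bare '|' starts a new column; ATT&CK
    descriptions routinely contain both, which corrupted the actor table.
    """
    return str(text).replace("\n", " ").replace("|", "\\|")


def generate_sector_report(assessment):
    """Write a Markdown threat-landscape report for *assessment* and return its path.

    The report is written as ``threat_landscape_<sector>.md`` in the current
    working directory, UTF-8 encoded (the body is Chinese, so relying on the
    platform default encoding could raise UnicodeEncodeError).

    Args:
        assessment: a SectorThreatAssessment (or any object with an ``.assessment``
            dict) after analyze_sector_actors(), identify_common_techniques() and
            analyze_attack_vectors(); sections whose data is missing render empty.

    Returns:
        str: the file name written. The original returned None, so existing
        callers that ignore the result are unaffected.
    """
    data = assessment.assessment
    sector = data["sector"]
    # The sector string is caller-supplied and becomes part of the file name:
    # keep only word characters so it cannot smuggle in a path separator.
    slug = "".join(ch if ch.isalnum() or ch in "_-" else "_" for ch in sector)
    out_path = f"threat_landscape_{slug}.md"

    actors = data.get("threat_actors", [])
    techniques = data.get("common_techniques", [])[:15]
    vectors = data.get("attack_vectors", {})

    lines = [
        f"# {sector.title()} 行业威胁态势评估",
        f"生成时间: {datetime.now().isoformat()}",
        "",
        "## 执行摘要",
        f"本评估分析了 {sector} 行业的网络威胁态势,",
        f"识别出 {len(actors)} 个活跃威胁组织、其惯用技术",
        "及推荐的防御优先级。",
        "",
        "## 威胁行为者摘要",
        "| 行为者 | ATT&CK ID | 技术数量 | 主要关注点 |",
        "|-------|-----------|------------|-----------|",
    ]
    for actor in actors:
        summary = _table_cell(actor["description"][:60])
        lines.append(
            f"| {actor['name']} | {actor['attack_id']} "
            f"| {actor['technique_count']} | {summary}... |"
        )

    lines += [
        "",
        "## 最常用技术",
        "| 排名 | 技术 | 名称 | 使用的组织 |",
        "|------|-----------|------|-------------|",
    ]
    for rank, tech in enumerate(techniques, 1):
        # Only the first three actors are shown to keep the table readable.
        actors_using = ", ".join(tech["actors_using"][:3])
        lines.append(
            f"| {rank} | {tech['technique']} | {_table_cell(tech['name'])} | {actors_using} |"
        )

    lines += ["", "## 攻击向量", "### 主要向量"]
    lines += [f"- {v}" for v in vectors.get("primary", [])]
    lines += ["", "### 新兴向量"]
    lines += [f"- {v}" for v in vectors.get("emerging", [])]

    lines += [
        "",
        "## 建议",
        "1. 优先为行业定向组织使用的前 10 个技术构建检测能力",
        "2. 以已识别的威胁行为者为蓝本开展威胁驱动型红队演练",
        "3. 加入行业 ISAC 以实现实时威胁共享",
        "4. 针对已识别的初始访问向量实施安全控制",
        "5. 针对行业特定风险审查供应链安全态势",
        "",
    ]

    with open(out_path, "w", encoding="utf-8") as fh:
        fh.write("\n".join(lines))
    print(f"[+] 行业报告已保存: {out_path}")
    return out_path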
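# Populate the "攻击向量" section before rendering: nothing else in the script
# invokes analyze_attack_vectors(), so without this call that section is empty.
analyze_attack_vectors(assessment)
generate_sector_report(assessment)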