Analyzes ransomware data leak sites (DLS) to track victims, extract threat intelligence on tactics, and assess industry risks using Python from sources like Ransomwatch.
采用双重勒索模式运营的勒索软件(Ransomware)组织在 Tor 隐藏服务上维护数据泄露站点(DLS),在那里发布受害者名称、被盗数据样本和倒计时器以施压付款。2025 年上半年,96 个独特勒索软件组织活跃,每月约发布 535 名受害者。监控这些站点提供了关于活跃威胁组织、目标行业、地理模式和新兴勒索软件家族的情报。本技能涵盖安全收集 DLS 情报、提取结构化数据、追踪组织活动趋势,以及生成行业特定风险评估。
Analyzes ransomware data leak sites (DLS) to track victim postings, extract threat intelligence on group tactics, and assess sector-specific risks for defense.
Monitors ransomware leak sites to track victims, extract threat intelligence on group tactics, and assess sector-specific risks for defense.
Monitors dark web forums, markets, paste sites, and ransomware leak sites for organization asset mentions, leaked credentials, threats, and actor communications. Use for OSINT, leak investigations, and threat intel enrichment.
采用双重勒索模式运营的勒索软件(Ransomware)组织在 Tor 隐藏服务上维护数据泄露站点(DLS),在那里发布受害者名称、被盗数据样本和倒计时器以施压付款。2025 年上半年,96 个独特勒索软件组织活跃,每月约发布 535 名受害者。监控这些站点提供了关于活跃威胁组织、目标行业、地理模式和新兴勒索软件家族的情报。本技能涵盖安全收集 DLS 情报、提取结构化数据、追踪组织活动趋势,以及生成行业特定风险评估。
依赖库:requests、beautifulsoup4、pandas、matplotlib。

现代勒索软件组织在加密受害者数据之前还会将其外泄(Exfiltration)。泄露站点作为公开施压工具:受害者以倒计时器、部分数据样本和文件目录的形式被列出。若未支付赎金,完整数据将被公开。部分组织已转向三重勒索,追加 DDoS 威胁或直接联系受害者客户。
泄露站点提供:受害者识别(公司名称、行业、国家)、攻击时间线(列出时间、截止日期、数据发布时间)、数据量估算、组织能力评估(目标行业、攻击频率、操作节奏),以及趋势分析(新组织出现、组织品牌重塑、执法打击)。
切勿在生产环境中直接访问 DLS 站点。使用专用监控服务(Ransomwatch、DarkFeed、KELA、Flashpoint)、Tor 隔离研究虚拟机、商业威胁情报平台或社区维护的数据集。所有分析应在隔离环境中进行,并获得适当授权。此外,这些数据集本身抓取自攻击者控制的站点:组织名称、受害者名称、标题和时间戳都是未经核实的攻击者声明,应作为不可信输入处理(校验字段类型,不要未经转义写入报告或传给 shell),受害者被列出也不等于已确认的入侵。
import requests
import json
import pandas as pd
from datetime import datetime, timedelta
from collections import Counter
class RansomwareIntelCollector:
"""从公开追踪来源收集勒索软件 DLS 情报。"""
RANSOMWATCH_API = "https://raw.githubusercontent.com/joshhighet/ransomwatch/main/posts.json"
RANSOMWATCH_GROUPS = "https://raw.githubusercontent.com/joshhighet/ransomwatch/main/groups.json"
def __init__(self):
self.posts = []
self.groups = []
def fetch_ransomwatch_data(self):
"""从 ransomwatch 获取勒索软件受害者发布数据。"""
resp = requests.get(self.RANSOMWATCH_API, timeout=30)
if resp.status_code == 200:
self.posts = resp.json()
print(f"[+] 已从 ransomwatch 加载 {len(self.posts)} 条受害者记录")
else:
print(f"[-] 获取记录失败: {resp.status_code}")
resp = requests.get(self.RANSOMWATCH_GROUPS, timeout=30)
if resp.status_code == 200:
self.groups = resp.json()
print(f"[+] 已加载 {len(self.groups)} 个勒索软件组织画像")
return self.posts
def get_recent_victims(self, days=30):
"""获取最近 N 天内发布的受害者。"""
cutoff = datetime.now() - timedelta(days=days)
recent = []
for post in self.posts:
try:
discovered = datetime.fromisoformat(
post.get("discovered", "").replace("Z", "+00:00")
)
if discovered.replace(tzinfo=None) >= cutoff:
recent.append(post)
except (ValueError, TypeError):
continue
print(f"[+] 最近 {days} 天内 {len(recent)} 名受害者")
return recent
def get_group_activity(self, group_name):
"""获取特定勒索软件组织的所有发布记录。"""
group_posts = [
p for p in self.posts
if p.get("group_name", "").lower() == group_name.lower()
]
print(f"[+] {group_name}: 共 {len(group_posts)} 名受害者")
return group_posts
# Module-level side effects: importing this file performs network I/O against
# GitHub and prints progress. Wrap in `if __name__ == "__main__":` if the
# functions above are ever reused as a library.
collector = RansomwareIntelCollector()
collector.fetch_ransomwatch_data()
recent = collector.get_recent_victims(days=30)
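def analyze_group_trends(posts, top_n=15):
    """Summarise ransomware group activity over a list of victim postings.

    Args:
        posts: ransomwatch-style posting dicts; ``group_name`` and
            ``discovered`` (ISO 8601, optional trailing ``Z``) are read.
        top_n: how many of the most active groups to list in ``top_groups``.

    Returns:
        dict with ``total_posts``, ``unique_groups``, ``top_groups``
        (list of ``(group, count)``), ``monthly_totals`` (``YYYY-MM`` -> count)
        and ``monthly_top_groups`` (``YYYY-MM`` -> top five ``(group, count)``).
        Postings with a missing or unparseable ``discovered`` value count
        towards the totals but are left out of the monthly breakdown.
    """
    # Missing, empty or null group names are folded into one "unknown" bucket;
    # a null value would otherwise surface as a None key (and break any later
    # string handling of the group name).
    group_counts = Counter((p.get("group_name") or "unknown") for p in posts)
    monthly_activity: dict[str, Counter] = {}
    for post in posts:
        raw = post.get("discovered")
        if not isinstance(raw, str) or not raw:
            continue
        try:
            date = datetime.fromisoformat(raw.replace("Z", "+00:00"))
        except ValueError:
            continue
        month_key = date.strftime("%Y-%m")
        group = post.get("group_name") or "unknown"
        monthly_activity.setdefault(month_key, Counter())[group] += 1
    months = sorted(monthly_activity.items())
    analysis = {
        "total_posts": len(posts),
        "unique_groups": len(group_counts),
        "top_groups": group_counts.most_common(top_n),
        "monthly_totals": {month: sum(counts.values()) for month, counts in months},
        "monthly_top_groups": {month: counts.most_common(5) for month, counts in months},
    }
    print("\n=== 勒索软件组织活动 ===")
    print(f"追踪受害者总数: {analysis['total_posts']}")
    print(f"活跃组织数量: {analysis['unique_groups']}")
    print(f"\n前 {top_n} 活跃组织:")
    for group, count in analysis["top_groups"]:
        print(f" {group}: {count} 名受害者")
    return analysis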
# Trend summary over the whole ransomwatch history, not just `recent`.
trends = analyze_group_trends(collector.posts)
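def assess_sector_risk(posts, target_sector=None, target_country=None):
    """Score ransomware exposure per sector and per country.

    Args:
        posts: posting dicts; reads ``sector`` (falling back to ``industry``),
            ``country`` and ``group_name``. Missing or null values become
            "unknown".
        target_sector: optional sector name; when given, its entry (matched
            case-insensitively) is also returned under ``target_sector``.
        target_country: same for ``country`` -> ``target_country``.

    Returns:
        dict with ``total_victims``, ``sectors`` and ``countries`` (both ordered
        by descending victim count) and, when requested, ``target_sector`` /
        ``target_country``. A requested target with no postings yields a
        zero-count entry rather than a KeyError.

    Risk levels are a share of *all* postings: critical > 15 %, high > 8 %,
    medium > 3 %, otherwise low. The "unknown" sector bucket is a data-quality
    artefact (not every feed carries a sector field), so it is reported as
    "unrated" instead of inheriting a level from its size.
    """
    sector_data: dict[str, dict] = {}
    country_data: dict[str, dict] = {}
    for post in posts:
        sector = post.get("sector") or post.get("industry") or "unknown"
        country = post.get("country") or "unknown"
        group = post.get("group_name") or "unknown"
        s_entry = sector_data.setdefault(sector, {"count": 0, "groups": Counter()})
        s_entry["count"] += 1
        s_entry["groups"][group] += 1
        c_entry = country_data.setdefault(country, {"count": 0, "groups": Counter()})
        c_entry["count"] += 1
        c_entry["groups"][group] += 1

    total = len(posts)

    def _share(count):
        # Percentage of all postings; 0 when there is nothing to divide by.
        return round(count / total * 100, 1) if total else 0

    def _level(pct):
        if pct > 15:
            return "critical"
        if pct > 8:
            return "high"
        if pct > 3:
            return "medium"
        return "low"

    risk_assessment = {"total_victims": total, "sectors": {}, "countries": {}}
    for sector, data in sorted(sector_data.items(), key=lambda kv: -kv[1]["count"]):
        pct = _share(data["count"])
        risk_assessment["sectors"][sector] = {
            "victim_count": data["count"],
            "percentage": pct,
            "top_groups": data["groups"].most_common(5),
            "risk_level": "unrated" if sector == "unknown" else _level(pct),
        }
    for country, data in sorted(country_data.items(), key=lambda kv: -kv[1]["count"]):
        risk_assessment["countries"][country] = {
            "victim_count": data["count"],
            "percentage": _share(data["count"]),
            "top_groups": data["groups"].most_common(5),
        }

    def _lookup(table, wanted, with_level):
        # Case-insensitive match; an absent target gets an explicit zero entry
        # so callers can index it without a KeyError.
        wanted_key = str(wanted).lower()
        for key, entry in table.items():
            if str(key).lower() == wanted_key:
                return entry
        empty = {"victim_count": 0, "percentage": 0, "top_groups": []}
        if with_level:
            empty["risk_level"] = "low"
        return empty

    if target_sector is not None:
        risk_assessment["target_sector"] = _lookup(risk_assessment["sectors"], target_sector, True)
    if target_country is not None:
        risk_assessment["target_country"] = _lookup(risk_assessment["countries"], target_country, False)
    return risk_assessment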
# NOTE(review): per the comment in assess_sector_risk, not every feed carries a
# sector/country field — if ransomwatch postings lack them, most of this lands
# in the "unknown" bucket; confirm against the feed schema before trusting it.
risk = assess_sector_risk(collector.posts)
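def track_new_groups(posts, lookback_days=90):
    """Identify ransomware groups whose first posting falls within ``lookback_days``.

    Args:
        posts: posting dicts; reads ``group_name``, ``discovered`` and
            ``post_title``. Missing or null group names become "unknown".
        lookback_days: size of the "new" window, counted back from now.

    Returns:
        dict mapping group name -> ``{"first_seen": naive datetime,
        "first_victim": title, "total_victims": int, "avg_per_month": float}``.
        ``total_victims`` counts every posting of the group, including ones
        whose timestamp could not be parsed. ``avg_per_month`` divides that
        count by the months the group has actually existed (floored at one
        month), so a group that appeared two weeks ago is not diluted over
        the whole lookback window.
    """
    now = datetime.now()
    cutoff = now - timedelta(days=lookback_days)
    group_first_seen: dict[str, dict] = {}
    victim_counts: Counter = Counter()
    for post in posts:
        group = post.get("group_name") or "unknown"
        victim_counts[group] += 1
        raw = post.get("discovered")
        if not isinstance(raw, str) or not raw:
            continue
        try:
            date = datetime.fromisoformat(raw.replace("Z", "+00:00"))
        except ValueError:
            continue
        # Normalise to naive: comparing an aware timestamp with a naive one
        # raises TypeError, which would silently drop the posting.
        date = date.replace(tzinfo=None)
        first = group_first_seen.get(group)
        if first is None or date < first["first_seen"]:
            group_first_seen[group] = {
                "first_seen": date,
                "first_victim": post.get("post_title") or "",
            }
    new_groups = {}
    for group, info in group_first_seen.items():
        if info["first_seen"] < cutoff:
            continue
        months_active = max(1.0, (now - info["first_seen"]).days / 30)
        new_groups[group] = {
            **info,
            "total_victims": victim_counts[group],
            "avg_per_month": round(victim_counts[group] / months_active, 1),
        }
    print(f"\n=== 新组织(最近 {lookback_days} 天)===")
    for group, info in sorted(new_groups.items(), key=lambda kv: -kv[1]["total_victims"]):
        print(f" {group}: {info['total_victims']} 名受害者, "
              f"首次发现 {info['first_seen'].strftime('%Y-%m-%d')}")
    return new_groups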
# 90-day window; the report's "最近 90 天" heading is hard-coded to match, so
# change both together.
new_groups = track_new_groups(collector.posts, lookback_days=90)
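def _md_cell(value) -> str:
    """Flatten untrusted text into a single markdown table cell.

    Group names, sector labels and victim titles are scraped from leak sites,
    i.e. they are attacker-supplied; a stray pipe or newline would break the
    table or let the actor inject their own markdown into the report.
    """
    text = "" if value is None else str(value)
    return text.replace("\r", " ").replace("\n", " ").replace("|", "\\|")


def generate_ransomware_intel_report(trends, risk, new_groups, output_path="ransomware_intel_report.md"):
    """Render a markdown threat-intelligence report and write it to ``output_path``.

    Args:
        trends: result of ``analyze_group_trends``.
        risk: result of ``assess_sector_risk``.
        new_groups: result of ``track_new_groups``.
        output_path: file to write; defaults to ``ransomware_intel_report.md``
            in the current directory. Always written as UTF-8 (the report is
            Chinese, and the platform default encoding may not cover it).

    Returns:
        The report text that was written.
    """
    lines = [
        "# 勒索软件威胁情报报告",
        f"生成时间: {datetime.now().isoformat()}",
        "",
        "## 执行摘要",
        f"- **追踪受害者总数**: {trends['total_posts']}",
        f"- **活跃勒索软件组织**: {trends['unique_groups']}",
        # The 90-day label is fixed here; keep it in step with the lookback
        # passed to track_new_groups by the caller.
        f"- **新兴组织(最近 90 天)**: {len(new_groups)}",
        "",
        "## 最活跃组织",
        "| 排名 | 组织 | 受害者数 |",
        "|------|-------|---------|",
    ]
    for rank, (group, count) in enumerate(trends["top_groups"][:10], 1):
        lines.append(f"| {rank} | {_md_cell(group)} | {count} |")

    lines += ["", "## 新兴组织"]
    ranked_new = sorted(new_groups.items(), key=lambda kv: -kv[1]["total_victims"])
    for group, info in ranked_new[:10]:
        lines.append(
            f"- **{_md_cell(group)}**: {info['total_victims']} 名受害者,"
            f"首次出现于 {info['first_seen'].strftime('%Y-%m-%d')}"
        )

    lines += [
        "",
        "## 行业风险评估",
        "| 行业 | 受害者数 | 占比 | 风险级别 |",
        "|--------|---------|---|------------|",
    ]
    for sector, data in list(risk["sectors"].items())[:10]:
        lines.append(
            f"| {_md_cell(sector)} | {data['victim_count']} | {data['percentage']}% "
            f"| {data['risk_level'].upper()} |"
        )

    lines += [
        "",
        "## 建议措施",
        "1. 每日监控 DLS 情报,关注您的组织及供应链合作伙伴",
        "2. 优先修补被最活跃组织利用的漏洞",
        "3. 实施离线备份策略以降低勒索杠杆",
        "4. 针对勒索软件场景开展桌面演练",
        "5. 与行业 ISAC 和威胁共享社区共享指标",
        "",
    ]
    report = "\n".join(lines)
    with open(output_path, "w", encoding="utf-8") as f:
        f.write(report)
    print(f"[+] 报告已保存: {output_path}")
    return report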
# Writes ransomware_intel_report.md into the current working directory.
generate_ransomware_intel_report(trends, risk, new_groups)