Monitors dark web threats like leaked credentials, data breaches, and actor discussions by scanning Tor services, forums, and paste sites with Python and Tor proxies.
Install: `npx claudepluginhub killvxk/cybersecurity-skills-zh`. This skill uses the workspace's default tool permissions.
暗网威胁监控涉及系统性扫描 Tor 隐藏服务、地下论坛、粘贴站点和暗网市场,以识别针对组织的威胁,包括泄露凭据、数据泄露、威胁行为者讨论、漏洞利用工具和预谋攻击。本技能涵盖搭建监控基础设施、使用基于 Tor 的收集工具、为品牌提及和凭据泄露实现自动告警,以及分析暗网情报以获取可操作的威胁指标。
Monitors dark web forums, markets, paste sites, and ransomware leak sites for organization asset mentions, leaked credentials, threats, and actor communications. Use for OSINT, leak investigations, and threat intel enrichment.
Guides dark web monitoring for threats by scanning Tor hidden services, forums, paste sites, and marketplaces for leaked credentials, breaches, and attacks targeting organizations.
Guides dark web monitoring for organizational threats using Tor crawlers, Python tools, forums scanning, and APIs like Flare. For threat intelligence, security audits, and incident response.
Share bugs, ideas, or general feedback.
暗网威胁监控涉及系统性扫描 Tor 隐藏服务、地下论坛、粘贴站点和暗网市场,以识别针对组织的威胁,包括泄露凭据、数据泄露、威胁行为者讨论、漏洞利用工具和预谋攻击。本技能涵盖搭建监控基础设施、使用基于 Tor 的收集工具、为品牌提及和凭据泄露实现自动告警,以及分析暗网情报以获取可操作的威胁指标。
# Dependencies: requests, stem, beautifulsoup4, stix2
import requests
from requests.adapters import HTTPAdapter
def create_tor_session():
    """Build a ``requests`` session routed through the local Tor proxy.

    Uses the ``socks5h`` scheme so DNS resolution also happens over Tor
    (plain ``socks5`` would leak lookups to the local resolver), and sets
    a common desktop Firefox User-Agent to blend in with normal traffic.

    Returns:
        requests.Session: session whose HTTP/HTTPS traffic exits via Tor.
    """
    tor_proxy = "socks5h://127.0.0.1:9050"
    session = requests.Session()
    session.proxies = {scheme: tor_proxy for scheme in ("http", "https")}
    session.headers["User-Agent"] = (
        "Mozilla/5.0 (Windows NT 10.0; rv:109.0) Gecko/20100101 Firefox/115.0"
    )
    return session
def verify_tor_connection(session):
    """Confirm that the session's traffic actually exits via Tor.

    Queries the Tor Project's check API, which reports whether the
    observed source IP belongs to the Tor network.

    Args:
        session: a ``requests``-like session with a ``get`` method.

    Returns:
        dict: ``{"is_tor": bool, "ip": str}`` on success, or
        ``{"error": str}`` if the request or parsing fails.
    """
    check_url = "https://check.torproject.org/api/ip"
    try:
        payload = session.get(check_url, timeout=30).json()
        return {
            "is_tor": payload.get("IsTor", False),
            "ip": payload.get("IP", ""),
        }
    except Exception as exc:
        return {"error": str(exc)}
import re
from datetime import datetime
def monitor_paste_sites(session, organization_domains, hibp_api_key="YOUR_HIBP_KEY"):
    """Check breach sources for leaked credentials matching org domains.

    Queries the Have I Been Pwned v3 ``/breaches`` endpoint (clearnet,
    so the Tor ``session`` is deliberately not used) and matches each
    organization domain against the breach catalogue's ``Domain`` field.

    Fix vs. original: the ``/breaches`` endpoint returns the full breach
    catalogue regardless of domain, so it is fetched ONCE instead of once
    per domain (the request was loop-invariant), and the API key is now a
    parameter instead of a hard-coded placeholder (default preserved for
    backward compatibility).

    Args:
        session: Tor session (unused here; kept for interface consistency).
        organization_domains: iterable of domains to match against breaches.
        hibp_api_key: HIBP API key; replace the placeholder default.

    Returns:
        list[dict]: findings with ``source`` set to ``"HIBP"``; empty on
        failure or when nothing matches.
    """
    findings = []
    try:
        resp = requests.get(
            "https://haveibeenpwned.com/api/v3/breaches",
            headers={"hibp-api-key": hibp_api_key},
            timeout=30,
        )
    except Exception as e:
        print(f"[-] HIBP 查询出错:{e}")
        return findings
    if resp.status_code != 200:
        return findings
    breaches = resp.json()
    for domain in organization_domains:
        for breach in breaches:
            if domain.lower() in breach.get("Domain", "").lower():
                findings.append({
                    "source": "HIBP",
                    "breach_name": breach["Name"],
                    "breach_date": breach.get("BreachDate"),
                    "data_classes": breach.get("DataClasses", []),
                    "pwn_count": breach.get("PwnCount", 0),
                    "domain": domain,
                })
    return findings
def search_for_keywords(session, keywords, onion_paste_urls):
    """Scan dark-web paste pages for occurrences of given keywords.

    Fetches each onion URL through the Tor ``session``; for every keyword
    found (case-insensitively) in a successfully fetched page, records a
    hit with a UTC timestamp and the surrounding text snippet.

    Args:
        session: Tor-routed ``requests``-like session.
        keywords: iterable of keyword strings to look for.
        onion_paste_urls: iterable of paste URLs to fetch.

    Returns:
        list[dict]: one entry per (url, keyword) hit.
    """
    results = []
    for paste_url in onion_paste_urls:
        try:
            response = session.get(paste_url, timeout=60)
            if response.status_code != 200:
                continue
            page = response.text.lower()
            for kw in keywords:
                needle = kw.lower()
                if needle not in page:
                    continue
                results.append({
                    "url": paste_url,
                    "keyword": kw,
                    "timestamp": datetime.utcnow().isoformat(),
                    "snippet": extract_context(page, needle),
                })
        except Exception as e:
            print(f"[-] 获取 {paste_url} 时出错:{e}")
    return results
def extract_context(text, keyword, context_chars=200):
    """Return text surrounding the first occurrence of ``keyword``.

    Args:
        text: haystack string to search.
        keyword: substring to locate.
        context_chars: characters of context kept on each side.

    Returns:
        str: the match plus up to ``context_chars`` characters on either
        side, clamped to the string bounds; ``""`` when not found.
    """
    position = text.find(keyword)
    if position < 0:
        return ""
    lo = position - context_chars
    hi = position + len(keyword) + context_chars
    return text[max(lo, 0):min(hi, len(text))]
def check_ransomware_leak_sites(session, organization_name):
    """Check known ransomware-group leak sites for mentions of the org.

    Uses the Ransomwatch project's ``posts.json`` — a clearnet aggregator
    of ransomware leak-site posts — so the Tor ``session`` is not needed.

    Fix vs. original: when the HTTP status was not 200 the function fell
    through the ``if`` and implicitly returned ``None`` instead of a list,
    which breaks callers that iterate the result; every path now returns
    a list.

    Args:
        session: Tor session (unused here; kept for interface consistency).
        organization_name: organization name matched case-insensitively
            against post titles.

    Returns:
        list[dict]: matching posts (group, title, discovered, url);
        empty list on failure or non-200 response.
    """
    matches = []
    try:
        resp = requests.get(
            "https://raw.githubusercontent.com/joshhighet/ransomwatch/main/posts.json",
            timeout=30,
        )
        if resp.status_code == 200:
            for post in resp.json():
                title = post.get("post_title", "")
                if organization_name.lower() in title.lower():
                    matches.append({
                        "group": post.get("group_name", ""),
                        "title": title,
                        "discovered": post.get("discovered", ""),
                        "url": post.get("post_url", ""),
                    })
    except Exception as e:
        print(f"[-] Ransomwatch 查询出错:{e}")
    return matches
def generate_dark_web_report(findings, organization):
    """Assemble a structured dark-web intelligence report.

    Buckets findings into credential leaks (``source == "HIBP"``),
    ransomware mentions (entries carrying a truthy ``group`` key), and
    generic dark-web mentions, then derives an executive summary plus
    severity-ranked recommendations from the bucket counts.

    Args:
        findings: iterable of finding dicts from the collectors.
        organization: organization name the report is about.

    Returns:
        dict: report with summary, bucketed findings, and recommendations.
    """
    credential_leaks = []
    ransomware_mentions = []
    dark_web_mentions = []
    for item in findings:
        if item.get("source") == "HIBP":
            credential_leaks.append(item)
        elif item.get("group"):
            ransomware_mentions.append(item)
        else:
            dark_web_mentions.append(item)

    cred_count = len(credential_leaks)
    ransom_count = len(ransomware_mentions)
    summary = (
        f"监控发现 {organization} 存在 {cred_count} 个凭据泄露来源"
        f"和 {ransom_count} 个勒索软件组织提及。"
    )

    recommendations = []
    if ransom_count > 0:
        recommendations.append(
            "严重:组织在勒索软件泄露站点被提及。"
            "立即启动事件响应。"
        )
    if cred_count > 0:
        recommendations.append(
            "高危:检测到泄露凭据。强制受影响账户重置密码"
            "并启用 MFA。"
        )

    return {
        "organization": organization,
        "report_date": datetime.utcnow().isoformat(),
        "executive_summary": summary,
        "credential_leaks": credential_leaks,
        "ransomware_mentions": ransomware_mentions,
        "dark_web_mentions": dark_web_mentions,
        "recommendations": recommendations,
    }