Monitors Pastebin, GitHub Gists, and other paste sites for leaked credentials, API keys, and sensitive data using Python scraping APIs, regex patterns, and keyword matching for early breach detection.
npx claudepluginhub killvxk/cybersecurity-skills-zh
This skill uses the workspace's default tool permissions.
粘贴站点(Pastebin、GitHub Gists、Ghostbin、Dpaste、Hastebin)经常被用作泄露凭证(Credential)、数据库转储(Database Dump)、API 密钥和敏感数据在暗网论坛及 Telegram 频道广泛传播前的暂存区域。监控这些站点可实现早期泄露检测(Breach Detection),使组织能够在被盗数据被武器化之前及时响应。本技能涵盖使用 Pastebin Scraping API 构建自动化粘贴站点监控器、基于关键词的告警、凭证模式匹配,以及与事件响应(Incident Response)工作流的集成。
Monitors paste sites like Pastebin and GitHub Gists for leaked credentials, API keys, and sensitive data using Python scraping, regex patterns, and keyword matching for early breach detection in security assessments.
Monitors paste sites like Pastebin and GitHub Gists for leaked credentials, API keys, and data dumps via scraping, regex patterns, and keyword matching for early breach detection in security audits.
Monitors dark web forums, markets, paste sites, and ransomware leak sites for organization asset mentions, leaked credentials, threats, and actor communications. Use for OSINT, leak investigations, and threat intel enrichment.
Share bugs, ideas, or general feedback.
粘贴站点(Pastebin、GitHub Gists、Ghostbin、Dpaste、Hastebin)经常被用作泄露凭证(Credential)、数据库转储(Database Dump)、API 密钥和敏感数据在暗网论坛及 Telegram 频道广泛传播前的暂存区域。监控这些站点可实现早期泄露检测(Breach Detection),使组织能够在被盗数据被武器化之前及时响应。本技能涵盖使用 Pastebin Scraping API 构建自动化粘贴站点监控器、基于关键词的告警、凭证模式匹配,以及与事件响应(Incident Response)工作流的集成。
依赖库:requests、beautifulsoup4、regex、pymisp。
每年有超过 300,000 个用户凭证发布在 Pastebin 上,平均每次泄露包含 1,000 个用户名/密码对。粘贴站点服务于三个主要的威胁情报目的:早期泄露检测(凭证出现在粘贴站点上早于暗网)、威胁行为者画像(攻击者使用粘贴站点进行 C2 配置、数据暂存、工具共享)和恶意软件发现(编码的有效载荷、配置文件、C2 地址)。
主动监控(Active Monitoring)定期查询粘贴站点 API 或抓取端点。Pastebin Scraping API 提供对新公开粘贴的实时访问。对于 GitHub,搜索 API 允许监控 Gists 和代码库提交中的暴露密钥。被动监控(Passive Monitoring)使用 IntelX、Dehashed 或 Have I Been Pwned 等聚合粘贴站点数据的服务。
有效的监控使用正则表达式(Regex)模式检测:电子邮件:密码组合、API 密钥(AWS、Azure、GCP、Stripe、Twilio)、数据库连接字符串(Connection String)、私钥(SSH、PGP)、JWT 令牌,以及内部主机名/URL。组织特定关键词(域名、产品名称、员工姓名)可降低误报率。
import requests
import re
import json
import time
from datetime import datetime
class PastebinMonitor:
    """Poll the Pastebin Scraping API and flag pastes that look like leaks.

    Every paste key is processed at most once per instance. The paste body
    is checked against the organisation keywords and a set of credential
    regexes; pastes rated above "low" become alerts, which are written to
    JSON files under ``output_dir``.
    """

    SCRAPING_URL = "https://scrape.pastebin.com/api_scraping.php"
    RAW_URL = "https://scrape.pastebin.com/api_scrape_item.php"

    def __init__(self, keywords, output_dir="paste_alerts"):
        """Store the watch list and compile the credential patterns.

        Args:
            keywords: Iterable of organisation-specific strings (domains,
                project names, ...). Matching is case-insensitive.
            output_dir: Directory that alert JSON files are written to.
        """
        # "" is a substring of every string, so an empty keyword would rate
        # every single paste as "high"; drop such entries up front.
        self.keywords = [k.lower() for k in keywords if k]
        self.output_dir = output_dir
        self.seen_keys = set()
        self.credential_patterns = {
            "email_password": re.compile(
                r'[\w.+-]+@[\w-]+\.[\w.]+[\s:;|,]+[\S]{6,}', re.IGNORECASE),
            "aws_key": re.compile(
                r'AKIA[0-9A-Z]{16}'),
            # The look-arounds require the 40 characters to be a complete
            # token. Without them any 40-character slice of a longer hash or
            # base64 blob is reported as a secret. "=" is allowed directly
            # before the token so that "key=<secret>" is still detected.
            "aws_secret": re.compile(
                r'(?<![0-9a-zA-Z/+])[0-9a-zA-Z/+=]{40}(?![0-9a-zA-Z/+=])'),
            "github_token": re.compile(
                r'ghp_[0-9a-zA-Z]{36}'),
            "slack_token": re.compile(
                r'xox[baprs]-[0-9a-zA-Z-]+'),
            "private_key": re.compile(
                r'-----BEGIN (?:RSA |EC |DSA )?PRIVATE KEY-----'),
            "jwt_token": re.compile(
                r'eyJ[A-Za-z0-9-_]+\.eyJ[A-Za-z0-9-_]+\.[A-Za-z0-9-_]+'),
            "connection_string": re.compile(
                r'(?:mongodb|postgres|mysql|redis)://[^\s]+'),
            "api_key_generic": re.compile(
                r'(?:api[_-]?key|apikey|access[_-]?token)[\s]*[=:]\s*["\']?[\w-]{20,}',
                re.IGNORECASE),
        }

    def fetch_recent_pastes(self, limit=100):
        """Fetch the most recent public pastes from the Pastebin Scraping API.

        Args:
            limit: Value sent as the API's ``limit`` query parameter.

        Returns:
            The decoded JSON list of paste metadata, or ``[]`` on an HTTP
            error status, a network failure, or an undecodable/unexpected
            response body.
        """
        try:
            resp = requests.get(
                self.SCRAPING_URL, params={"limit": limit}, timeout=30)
        except requests.RequestException as e:
            print(f"[-] 获取失败: {e}")
            return []
        if resp.status_code != 200:
            print(f"[-] API 错误: {resp.status_code}")
            return []
        try:
            pastes = resp.json()
        except ValueError as e:
            # A 200 response whose body is not JSON ends up here.
            print(f"[-] 获取失败: {e}")
            return []
        if not isinstance(pastes, list):
            # Callers iterate the result and call .get() on each entry, so
            # anything other than a list is treated as a failed fetch.
            print(f"[-] 获取失败: unexpected response type "
                  f"{type(pastes).__name__}")
            return []
        print(f"[+] 已获取 {len(pastes)} 条最新粘贴")
        return pastes

    def get_paste_content(self, paste_key):
        """Download the raw text of one paste.

        Args:
            paste_key: Paste identifier, sent as the ``i`` query parameter.

        Returns:
            The response text, or ``""`` when the request fails or the
            status is not 200.
        """
        try:
            resp = requests.get(
                self.RAW_URL, params={"i": paste_key}, timeout=15)
        except requests.RequestException as e:
            # Best effort: report the failure instead of hiding it, then
            # let the caller skip this paste.
            print(f"[-] 获取失败: {paste_key}: {e}")
            return ""
        return resp.text if resp.status_code == 200 else ""

    def analyze_paste(self, content, paste_metadata):
        """Look for watched keywords and credential patterns in a paste.

        Args:
            content: Raw paste text.
            paste_metadata: Metadata entry of the paste; accepted for
                interface compatibility and not used by the analysis.

        Returns:
            A dict with ``keyword_matches`` (list of ``{"keyword", "count"}``),
            ``credential_matches`` (pattern name -> ``{"count", "samples"}``
            with at most three samples) and ``severity``:
            "critical" for keyword + credential hits, "high" for keyword
            hits alone or more than ten credential hits, "medium" for any
            other credential hit, otherwise "low".
        """
        content_lower = content.lower()
        keyword_matches = [
            {"keyword": keyword, "count": content_lower.count(keyword)}
            for keyword in self.keywords
            if keyword in content_lower
        ]

        credential_matches = {}
        for pattern_name, pattern in self.credential_patterns.items():
            matches = pattern.findall(content)
            if matches:
                credential_matches[pattern_name] = {
                    "count": len(matches),
                    "samples": matches[:3],
                }

        cred_count = sum(m["count"] for m in credential_matches.values())
        if keyword_matches and cred_count > 0:
            severity = "critical"
        elif keyword_matches or cred_count > 10:
            severity = "high"
        elif cred_count > 0:
            severity = "medium"
        else:
            severity = "low"

        return {
            "keyword_matches": keyword_matches,
            "credential_matches": credential_matches,
            "severity": severity,
        }

    def _scan_once(self):
        """Fetch the latest pastes once and return alerts for the new ones."""
        alerts = []
        for paste in self.fetch_recent_pastes():
            if not isinstance(paste, dict):
                continue
            paste_key = paste.get("key", "")
            # An entry without a key cannot be downloaded or de-duplicated.
            if not paste_key or paste_key in self.seen_keys:
                continue
            self.seen_keys.add(paste_key)
            content = self.get_paste_content(paste_key)
            if not content:
                continue
            findings = self.analyze_paste(content, paste)
            if findings["severity"] == "low":
                continue
            alerts.append({
                "paste_key": paste_key,
                "title": paste.get("title", "Untitled"),
                "user": paste.get("user", "Anonymous"),
                "date": paste.get("date", ""),
                "size": paste.get("size", 0),
                "url": f"https://pastebin.com/{paste_key}",
                "findings": findings,
                "detected_at": datetime.now().isoformat(),
            })
            print(f" [告警-{findings['severity'].upper()}] "
                  f"{paste_key}: {findings['keyword_matches']}")
        return alerts

    def monitor_loop(self, interval=120, iterations=None):
        """Scan repeatedly, saving each non-empty batch of alerts to disk.

        Args:
            interval: Seconds to sleep between scans.
            iterations: Number of scans to run; ``None`` runs forever.

        Returns:
            The alerts raised across all scans of a finite run (``[]`` when
            ``iterations`` is 0). With ``iterations=None`` the loop does not
            terminate, so nothing is accumulated in memory.
        """
        collected = []
        count = 0
        while iterations is None or count < iterations:
            batch = self._scan_once()
            if batch:
                self._save_alerts(batch)
                if iterations is not None:
                    collected.extend(batch)
            count += 1
            # Do not sleep after the final scan of a finite run.
            if iterations is None or count < iterations:
                time.sleep(interval)
        return collected

    def _save_alerts(self, alerts):
        """Write ``alerts`` to a timestamped JSON file in ``output_dir``."""
        import os
        os.makedirs(self.output_dir, exist_ok=True)
        stamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        filename = f"{self.output_dir}/alerts_{stamp}.json"
        # Explicit encoding so the result does not depend on the locale.
        with open(filename, "w", encoding="utf-8") as f:
            json.dump(alerts, f, indent=2)
        print(f"[+] 已保存 {len(alerts)} 条告警至 {filename}")
# Example usage: watch Pastebin for the organisation's keywords and run five
# scans, pausing 120 seconds between consecutive scans.
monitor = PastebinMonitor(keywords=["mycompany.com", "internal-project", "employee-name"])
alerts = monitor.monitor_loop(iterations=5, interval=120)
class GitHubSecretMonitor:
    """Search GitHub code and public gists for leaked organisation secrets."""

    # Seconds before an HTTP request is abandoned; without a timeout a
    # stalled connection would block the monitor indefinitely.
    REQUEST_TIMEOUT = 30

    def __init__(self, github_token, org_keywords):
        """Store the token and keywords and build the request headers.

        Args:
            github_token: Token sent in the ``Authorization`` header.
            org_keywords: Organisation keywords, stored on the instance.
        """
        self.token = github_token
        self.keywords = org_keywords
        self.headers = {
            "Authorization": f"token {github_token}",
            "Accept": "application/vnd.github.v3+json",
        }

    def _get_json(self, url, params):
        """GET ``url`` and return the decoded JSON body, or None on failure."""
        try:
            resp = requests.get(
                url,
                headers=self.headers,
                params=params,
                timeout=self.REQUEST_TIMEOUT,
            )
        except requests.RequestException as e:
            print(f"[-] 获取失败: {e}")
            return None
        if resp.status_code != 200:
            print(f"[-] API 错误: {resp.status_code}")
            return None
        try:
            return resp.json()
        except ValueError as e:
            print(f"[-] 获取失败: {e}")
            return None

    def search_code(self, query, per_page=30):
        """Run a GitHub code search.

        Args:
            query: Search expression passed as the ``q`` parameter.
            per_page: Number of results requested.

        Returns:
            The ``items`` list of the response, or ``[]`` when the request
            fails or the response has no usable body.
        """
        data = self._get_json(
            "https://api.github.com/search/code",
            {"q": query, "per_page": per_page},
        )
        if not isinstance(data, dict):
            return []
        results = data.get("items", [])
        print(f"[+] GitHub 代码搜索:'{query}' 找到 {len(results)} 条结果")
        return results

    @staticmethod
    def _filter_gists(gists, keyword):
        """Return one match per file of every gist that mentions ``keyword``.

        A file matches when the keyword occurs, case-insensitively, in the
        gist description or in the file name.
        """
        needle = keyword.lower()
        matches = []
        for gist in gists:
            description = (gist.get("description") or "").lower()
            in_description = needle in description
            # "files" may be absent or null; treat both as no files.
            for filename in (gist.get("files") or {}):
                if in_description or needle in filename.lower():
                    matches.append({
                        "gist_id": gist["id"],
                        "description": gist.get("description", ""),
                        "filename": filename,
                        "url": gist["html_url"],
                        "created_at": gist["created_at"],
                    })
        return matches

    def search_gists(self, keyword):
        """Scan the newest public gists for ``keyword``.

        Only gist descriptions and file names are inspected, not file
        contents.

        Returns:
            A list of match dicts (``gist_id``, ``description``,
            ``filename``, ``url``, ``created_at``); ``[]`` when the request
            fails.
        """
        gists = self._get_json(
            "https://api.github.com/gists/public", {"per_page": 100})
        if not isinstance(gists, list):
            return []
        return self._filter_gists(gists, keyword)

    def monitor_org_secrets(self, org_domain):
        """Search GitHub code for the domain next to secret-related terms.

        Args:
            org_domain: Domain string that is quoted inside every query.

        Returns:
            A list of dicts with ``query``, ``repo``, ``path``, ``url`` and
            ``score`` for every search hit.
        """
        terms = ("password", "api_key", "secret", "token", "credentials")
        all_findings = []
        for term in terms:
            query = f'"{org_domain}" {term}'
            for result in self.search_code(query):
                all_findings.append({
                    "query": query,
                    "repo": result.get("repository", {}).get("full_name", ""),
                    "path": result.get("path", ""),
                    "url": result.get("html_url", ""),
                    "score": result.get("score", 0),
                })
            # Pause after every query to stay under GitHub's rate limit.
            time.sleep(10)
        return all_findings
# Example usage: replace the placeholder token with a real GitHub token, then
# search GitHub code for secrets associated with the organisation's domain.
gh_monitor = GitHubSecretMonitor(
    github_token="YOUR_GITHUB_TOKEN",
    org_keywords=["mycompany.com"],
)
findings = gh_monitor.monitor_org_secrets(org_domain="mycompany.com")
def generate_credential_leak_alert(alert_data):
    """Build an incident alert for a detected credential leak.

    Args:
        alert_data: Alert dict. ``url`` and ``detected_at`` are read from the
            top level; ``keyword_matches`` and ``credential_matches`` are
            read from the nested ``findings`` dict. Severity is taken from
            the top-level ``severity`` key when present, otherwise from
            ``findings["severity"]``.

    Returns:
        A dict with ``title``, ``source``, ``detected_at``, ``severity``,
        ``summary``, ``keyword_matches``, ``credential_types`` and
        ``recommended_actions``.
    """
    findings = alert_data.get("findings") or {}
    # Look in both places: a severity nested under "findings" would otherwise
    # be ignored and every alert reported as UNKNOWN / "medium".
    severity = alert_data.get("severity") or findings.get("severity")
    return {
        "title": f"凭证泄露检测 - {(severity or 'unknown').upper()}",
        "source": alert_data.get("url", ""),
        "detected_at": alert_data.get("detected_at", ""),
        "severity": severity or "medium",
        "summary": "发现包含组织关键词和凭证的粘贴内容",
        "keyword_matches": findings.get("keyword_matches", []),
        "credential_types": list(findings.get("credential_matches", {})),
        "recommended_actions": [
            "验证泄露凭证是否有效",
            "强制受影响账户重置密码",
            "轮换暴露的 API 密钥和令牌",
            "检查访问日志是否有未授权使用记录",
            "举报粘贴内容请求下线处理",
            "如发现新模式,更新监控关键词",
        ],
    }