Builds Python system using passive DNS, WHOIS, certificate transparency, and IP enrichment to map and monitor threat actor C2 networks. For threat intelligence and hunting.
Install via: `npx claudepluginhub killvxk/cybersecurity-skills-zh`. This skill uses the workspace's default tool permissions.
对手基础设施追踪利用被动 DNS 记录、证书透明度日志、WHOIS 注册数据和 IP 富化来发现、映射和监控威胁行为者的命令与控制(C2)网络。攻击者在不同攻击活动中频繁复用托管服务商、注册商、SSL 证书和命名模式,使分析人员能够从已知指标枢纽到发现新的基础设施。本技能涵盖构建识别基础设施关系的自动化追踪系统、检测与对手模式匹配的新注册域名,以及维护持续更新的威胁行为者网络地图。
Builds automated system to track adversary infrastructure using passive DNS, certificate transparency, WHOIS, and IP enrichment for mapping threat actor C2 networks.
Builds automated system to track adversary infrastructure using passive DNS, certificate transparency, WHOIS data, and IP enrichment to map threat actor C2 networks.
Tracks threat actor infrastructure including C2 servers, phishing domains, and staging servers using passive DNS, CT logs, Shodan/Censys scans, WHOIS analysis, and network fingerprints.
Share bugs, ideas, or general feedback.
对手基础设施追踪利用被动 DNS 记录、证书透明度日志、WHOIS 注册数据和 IP 富化来发现、映射和监控威胁行为者的命令与控制(C2)网络。攻击者在不同攻击活动中频繁复用托管服务商、注册商、SSL 证书和命名模式,使分析人员能够从已知指标枢纽到发现新的基础设施。本技能涵盖构建识别基础设施关系的自动化追踪系统、检测与对手模式匹配的新注册域名,以及维护持续更新的威胁行为者网络地图。
依赖库:requests、dnspython、python-whois、shodan、networkx。被动 DNS 捕获历史 DNS 解析数据,记录哪些域名解析到了哪些 IP 以及发生时间。与主动 DNS 查询不同,即使记录已更改,被动 DNS 也会保留历史关系,使分析人员能够追踪基础设施变化、识别共享托管模式,并发现历史上解析到相同 IP 的相关域名。
枢纽通过跟踪连接来识别相关基础设施:IP 枢纽(查找某 IP 上的所有域名)、域名枢纽(查找某域名解析过的所有 IP)、WHOIS 枢纽(查找同一注册人的域名)、证书枢纽(查找共享 SSL 证书的主机)以及 NS/MX 枢纽(查找使用相同域名服务器或邮件服务器的域名)。
威胁行为者表现出一定模式:偏好的注册商(Namecheap、REG.RU、Tucows)、偏好的托管(防弹托管服务商、云服务)、域名生成算法(DGA)、一致的命名模式,以及跨攻击活动复用证书。
import requests
import json
from collections import defaultdict
from datetime import datetime
class InfrastructureTracker:
    """Discovers and maps adversary infrastructure via the SecurityTrails API.

    Supports passive-DNS history, reverse-IP, and WHOIS lookups, plus a
    recursive pivot routine that expands a seed indicator into a set of
    related domains, IPs, and relationship edges.

    All lookup methods are best-effort: a non-200 response OR a network
    error yields an empty result instead of raising, so a long pivot run
    is never aborted by one failed request.
    """

    def __init__(self, securitytrails_key=None, vt_key=None, shodan_key=None):
        """Store API credentials.

        Args:
            securitytrails_key: SecurityTrails API key (used by all lookups here).
            vt_key: VirusTotal API key (held for enrichment; unused in this class).
            shodan_key: Shodan API key (held for enrichment; unused in this class).
        """
        self.st_key = securitytrails_key
        self.vt_key = vt_key
        self.shodan_key = shodan_key
        # Per-key graph scaffolding: key -> {"nodes": set, "edges": list}.
        self.infrastructure_graph = defaultdict(lambda: {"nodes": set(), "edges": []})

    def passive_dns_lookup(self, domain):
        """Return historical A-record resolutions for *domain*.

        Returns:
            List of dicts with keys domain/ip/first_seen/last_seen/type,
            or [] when the request fails or returns a non-200 status.
        """
        headers = {"apikey": self.st_key}
        url = f"https://api.securitytrails.com/v1/history/{domain}/dns/a"
        try:
            resp = requests.get(url, headers=headers, timeout=30)
        except requests.RequestException:
            # Network failure is treated the same as "no data" (best-effort).
            return []
        if resp.status_code != 200:
            return []
        history = [
            {
                "domain": domain,
                "ip": value.get("ip", ""),
                "first_seen": record.get("first_seen", ""),
                "last_seen": record.get("last_seen", ""),
                "type": record.get("type", "a"),
            }
            for record in resp.json().get("records", [])
            for value in record.get("values", [])
        ]
        print(f"[+] 被动 DNS for {domain}: {len(history)} 条记录")
        return history

    def reverse_ip_lookup(self, ip_address):
        """Return domains hosted on (or near) *ip_address*.

        NOTE(review): the /ips/nearby endpoint returns the surrounding IP
        block, not strictly the single IP — confirm this matches the
        intended reverse-IP semantics.

        Returns:
            List of site hostnames, or [] on any failure.
        """
        headers = {"apikey": self.st_key}
        url = f"https://api.securitytrails.com/v1/ips/nearby/{ip_address}"
        try:
            resp = requests.get(url, headers=headers, timeout=30)
        except requests.RequestException:
            return []
        if resp.status_code != 200:
            return []
        domains = [
            site
            for block in resp.json().get("blocks", [])
            for site in block.get("sites", [])
        ]
        print(f"[+] 反向 IP 查询 {ip_address}: {len(domains)} 个域名")
        return domains

    def whois_lookup(self, domain):
        """Fetch WHOIS registration data used for pivoting.

        Returns:
            Flat dict with registrar/registrant/name-server/date fields,
            or {} on any failure.
        """
        headers = {"apikey": self.st_key}
        url = f"https://api.securitytrails.com/v1/domain/{domain}/whois"
        try:
            resp = requests.get(url, headers=headers, timeout=30)
        except requests.RequestException:
            return {}
        if resp.status_code != 200:
            return {}
        data = resp.json()
        return {
            "domain": domain,
            "registrar": data.get("registrar", ""),
            "registrant_org": data.get("registrant_org", ""),
            "registrant_email": data.get("registrant_email", ""),
            "name_servers": data.get("nameServers", []),
            "created_date": data.get("createdDate", ""),
            "updated_date": data.get("updatedDate", ""),
            "expires_date": data.get("expiresDate", ""),
        }

    def pivot_from_seed(self, seed_indicator, indicator_type="domain", depth=2):
        """Recursively pivot from a seed indicator to discover infrastructure.

        Args:
            seed_indicator: Starting domain or IP address.
            indicator_type: "domain" or "ip".
            depth: When > 1 and the seed is a domain, each discovered IP is
                also reverse-resolved to find co-hosted domains.

        Returns:
            Dict with "domains" (set), "ips" (set) and "relationships"
            (list of {source, target, type, ...} edge dicts).
        """
        discovered = {"domains": set(), "ips": set(), "relationships": []}
        if indicator_type == "domain":
            discovered["domains"].add(seed_indicator)
            # Collect the seed's historical resolutions.
            for record in self.passive_dns_lookup(seed_indicator):
                ip = record["ip"]
                discovered["ips"].add(ip)
                discovered["relationships"].append({
                    "source": seed_indicator, "target": ip,
                    "type": "resolves_to",
                    "first_seen": record["first_seen"],
                    "last_seen": record["last_seen"],
                })
                if depth > 1:
                    # Reverse-resolve each IP; cap at 20 domains to bound
                    # fan-out on shared-hosting IPs.
                    for co_hosted in self.reverse_ip_lookup(ip)[:20]:
                        discovered["domains"].add(co_hosted)
                        discovered["relationships"].append({
                            "source": co_hosted, "target": ip,
                            "type": "hosted_on",
                        })
        elif indicator_type == "ip":
            discovered["ips"].add(seed_indicator)
            for domain in self.reverse_ip_lookup(seed_indicator)[:20]:
                discovered["domains"].add(domain)
                discovered["relationships"].append({
                    "source": domain, "target": seed_indicator,
                    "type": "hosted_on",
                })
        print(f"[+] 从 {seed_indicator} 枢纽: "
              f"{len(discovered['domains'])} 个域名, "
              f"{len(discovered['ips'])} 个 IP, "
              f"{len(discovered['relationships'])} 个关系")
        return discovered
# Demo: build a tracker with placeholder API keys (Shodan key omitted;
# it defaults to None).
tracker = InfrastructureTracker(
    securitytrails_key="YOUR_ST_KEY",
    vt_key="YOUR_VT_KEY",
)
import networkx as nx
class InfrastructureGraph:
    """Undirected graph of discovered infrastructure.

    Nodes are domains or IPs (distinguished by a "type" attribute); edges
    carry the relationship kind plus optional first/last-seen timestamps.
    """

    def __init__(self):
        self.graph = nx.Graph()

    def add_discovery(self, discovery_data):
        """Merge one pivot result (domains, ips, relationships) into the graph."""
        g = self.graph
        for hostname in discovery_data["domains"]:
            g.add_node(hostname, type="domain")
        for address in discovery_data["ips"]:
            g.add_node(address, type="ip")
        for link in discovery_data["relationships"]:
            g.add_edge(
                link["source"],
                link["target"],
                relationship=link["type"],
                first_seen=link.get("first_seen", ""),
                last_seen=link.get("last_seen", ""),
            )

    def find_clusters(self):
        """Summarize each connected component as a cluster, largest first."""
        clusters = []
        for component in nx.connected_components(self.graph):
            buckets = {"domain": [], "ip": []}
            for node in component:
                node_type = self.graph.nodes[node].get("type")
                if node_type in buckets:
                    buckets[node_type].append(node)
            clusters.append({
                "size": len(component),
                "domains": sorted(buckets["domain"]),
                "ips": sorted(buckets["ip"]),
                "domain_count": len(buckets["domain"]),
                "ip_count": len(buckets["ip"]),
            })
        clusters.sort(key=lambda cluster: cluster["size"], reverse=True)
        print(f"[+] 基础设施集群: {len(clusters)} 个")
        return clusters

    def find_hub_nodes(self, top_n=10):
        """Return the top_n nodes by degree centrality (shared infrastructure)."""
        ranked = sorted(
            nx.degree_centrality(self.graph).items(),
            key=lambda item: item[1],
            reverse=True,
        )
        return [
            {
                "node": node,
                "type": self.graph.nodes[node].get("type", "unknown"),
                "centrality": round(score, 4),
                "connections": self.graph.degree(node),
            }
            for node, score in ranked[:top_n]
        ]

    def export_graph(self, output_file="infrastructure_graph.json"):
        """Serialize the graph to node-link JSON for downstream tooling."""
        payload = nx.node_link_data(self.graph)
        with open(output_file, "w") as fh:
            json.dump(payload, fh, indent=2)
        print(f"[+] 图已导出: {self.graph.number_of_nodes()} 个节点, "
              f"{self.graph.number_of_edges()} 条边")
# Demo pipeline: pivot from a seed C2 domain, merge the discoveries into a
# graph, then summarize clusters/hub nodes and export node-link JSON.
infra_graph = InfrastructureGraph()
discovery = tracker.pivot_from_seed("evil-domain.com", depth=2)
infra_graph.add_discovery(discovery)
clusters = infra_graph.find_clusters()
hubs = infra_graph.find_hub_nodes()
infra_graph.export_graph()
import time
class InfrastructureMonitor:
    """Watches for newly registered domains matching adversary naming
    patterns and renders a markdown report of tracked infrastructure."""

    def __init__(self, tracker, known_indicators):
        """
        Args:
            tracker: InfrastructureTracker supplying the SecurityTrails API key.
            known_indicators: iterable of already-known domains (deduplicated
                into a set so repeat sightings do not re-alert).
        """
        self.tracker = tracker
        self.known = set(known_indicators)
        self.alerts = []  # accumulated new-domain alert dicts

    def check_new_registrations(self, patterns):
        """Query SecurityTrails for domains matching each keyword pattern.

        Every hostname not already in self.known is recorded as an alert
        and added to the known set.

        Args:
            patterns: iterable of keyword strings to match against hostnames.

        Returns:
            List of {"domain", "pattern_matched", "first_seen"} dicts for
            newly seen domains (empty when nothing new or on API failure).
        """
        new_domains = []
        for pattern in patterns:
            headers = {"apikey": self.tracker.st_key}
            url = "https://api.securitytrails.com/v1/domains/list"
            params = {"include_ips": "true", "page": 1}
            body = {"filter": {"keyword": pattern}}
            try:
                # BUG FIX: `params` was previously built but never sent;
                # it must be passed so include_ips/page take effect.
                resp = requests.post(url, headers=headers, params=params,
                                     json=body, timeout=30)
            except requests.RequestException:
                continue  # best-effort: skip this pattern on network failure
            if resp.status_code != 200:
                continue
            for record in resp.json().get("records", []):
                domain = record.get("hostname", "")
                # Skip empty hostnames as well as already-known domains.
                if domain and domain not in self.known:
                    new_domains.append({
                        "domain": domain,
                        "pattern_matched": pattern,
                        "first_seen": datetime.now().isoformat(),
                    })
                    self.known.add(domain)
        if new_domains:
            print(f"[ALERT] {len(new_domains)} 个新域名匹配模式")
            self.alerts.extend(new_domains)
        return new_domains

    def generate_infrastructure_report(self, clusters, hubs):
        """Write a markdown summary of clusters, hubs, and alerts.

        Args:
            clusters: cluster dicts from InfrastructureGraph.find_clusters().
            hubs: hub dicts from InfrastructureGraph.find_hub_nodes().

        Side effect: writes "infrastructure_report.md" (UTF-8).
        """
        report = f"""# 对手基础设施追踪报告
生成时间: {datetime.now().isoformat()}
## 摘要
- 识别基础设施集群: {len(clusters)} 个
- 追踪域名总数: {sum(c['domain_count'] for c in clusters)}
- 追踪 IP 总数: {sum(c['ip_count'] for c in clusters)}
- 检测新域名: {len(self.alerts)} 个
## 顶级基础设施枢纽节点
| 节点 | 类型 | 连接数 | 中心性 |
|------|------|-------------|------------|
"""
        for hub in hubs[:10]:
            report += (f"| {hub['node']} | {hub['type']} "
                       f"| {hub['connections']} | {hub['centrality']} |\n")
        report += "\n## 基础设施集群\n"
        for i, cluster in enumerate(clusters[:5], 1):
            report += f"\n### 集群 {i}({cluster['size']} 个节点)\n"
            report += f"- 域名: {', '.join(cluster['domains'][:5])}\n"
            report += f"- IP: {', '.join(cluster['ips'][:5])}\n"
        # Explicit UTF-8 so the Chinese headings survive on platforms whose
        # default encoding cannot represent them (e.g. Windows cp1252).
        with open("infrastructure_report.md", "w", encoding="utf-8") as f:
            f.write(report)
        print("[+] 基础设施报告已保存")
# Start monitoring with an empty baseline of known indicators.
monitor = InfrastructureMonitor(tracker, known_indicators=set())