Tracks threat actor infrastructure including C2 servers, phishing domains, and staging servers using passive DNS, CT logs, Shodan/Censys scans, WHOIS analysis, and network fingerprints.
威胁行为者基础设施追踪涉及监控和映射对手控制的资产,包括命令与控制(C2)服务器、钓鱼(Phishing)域名、漏洞利用工具包宿主、防弹托管(Bulletproof Hosting)和暂存服务器。本技能涵盖使用被动 DNS(Passive DNS)、证书透明度(Certificate Transparency)日志、Shodan/Censys 扫描、WHOIS 分析和网络指纹技术,随时间推移发现、追踪和跨威胁行为者基础设施进行关联分析。
Tracks threat actor infrastructure including C2 servers and phishing domains using Shodan, Censys, passive DNS, WHOIS, and TLS fingerprinting. For cybersecurity threat intelligence workflows.
Tracks threat actor infrastructure including C2 servers, phishing domains via Shodan/Censys, passive DNS, WHOIS, certificate transparency, and network fingerprinting. For security ops and threat intel.
Builds Python system using passive DNS, WHOIS, certificate transparency, and IP enrichment to map and monitor threat actor C2 networks. For threat intelligence and hunting.
威胁行为者基础设施追踪涉及监控和映射对手控制的资产,包括命令与控制(C2)服务器、钓鱼(Phishing)域名、漏洞利用工具包宿主、防弹托管(Bulletproof Hosting)和暂存服务器。本技能涵盖使用被动 DNS(Passive DNS)、证书透明度(Certificate Transparency)日志、Shodan/Censys 扫描、WHOIS 分析和网络指纹技术,随时间推移发现、追踪和跨威胁行为者基础设施进行关联分析。
所需库:shodan、censys、requests、stix2。

关联分析是利用一个已知指标发现相关基础设施的技术。从已知的 C2 IP 地址出发,分析师可通过以下方式进行关联:被动 DNS(发现域名)、反向 WHOIS(发现相关注册信息)、SSL 证书(发现共享证书)、SSH 密钥指纹、HTTP 响应指纹、JARM/JA3S 哈希,以及 WHOIS 注册人数据。
被动 DNS 数据库记录在递归解析器处观测到的 DNS 查询/响应数据。这允许分析师查找历史域名到 IP 的映射、发现托管在已知 C2 IP 上的域名,以及识别快速流量(Fast-Flux)或域名生成算法(DGA)行为。
证书透明度(CT)日志公开记录 CA 机构签发的所有 SSL/TLS 证书。监控 CT 日志可以发现为可疑域名注册的新证书,有助于在 C2 基础设施激活前识别钓鱼站点。
import os

import shodan
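# Read the Shodan key from the environment so it never has to be committed
# with the source; the original placeholder remains as a fallback so existing
# copy-paste usage keeps working.
api = shodan.Shodan(os.environ.get("SHODAN_API_KEY", "YOUR_SHODAN_API_KEY"))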
def discover_infrastructure(ip_address):
    """发现目标 IP 的服务和元数据。

    Looks the address up through the module-level Shodan client ``api`` and
    flattens the host record into ownership/location fields plus one entry
    per observed service (port, transport, product, version, TLS certificate
    subject and JARM hash). Missing fields default to ``""``/``[]`` except
    ``os``, which stays ``None`` when Shodan did not report it.

    Returns:
        The flattened dict, or ``None`` (after printing the error) when
        Shodan raises ``APIError``.
    """
    try:
        host = api.host(ip_address)
    except shodan.APIError as e:
        print(f"[-] Shodan 错误: {e}")
        return None

    # One summary per banner; the TLS sub-document is optional.
    services = []
    for svc in host.get("data", []):
        tls = svc.get("ssl", {})
        services.append({
            "port": svc.get("port"),
            "transport": svc.get("transport"),
            "product": svc.get("product", ""),
            "version": svc.get("version", ""),
            "ssl_cert": tls.get("cert", {}).get("subject", {}),
            "jarm": tls.get("jarm", ""),
        })

    summary = {
        "ip": host["ip_str"],
        "org": host.get("org", ""),
        "asn": host.get("asn", ""),
        "isp": host.get("isp", ""),
        "country": host.get("country_name", ""),
        "city": host.get("city", ""),
        "os": host.get("os"),
        "ports": host.get("ports", []),
        "vulns": host.get("vulns", []),
        "hostnames": host.get("hostnames", []),
        "domains": host.get("domains", []),
        "tags": host.get("tags", []),
        "services": services,
    }
    return summary
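def search_c2_framework(framework_name, limit=100):
    """搜索 Shodan 中已知 C2 框架的特征。

    Args:
        framework_name: A key of the built-in query table (matched
            case-insensitively, e.g. ``"cobalt-strike"``) or a raw Shodan
            query string, which is sent verbatim when the name is not in
            the table.
        limit: Maximum number of Shodan matches to request (default 100,
            the value that was previously hard-coded).

    Returns:
        A list of dicts with ``ip``, ``port``, ``org``, ``country``, ``asn``
        and ``timestamp`` for every match in the Shodan response.
    """
    c2_queries = {
        "cobalt-strike": 'product:"Cobalt Strike Beacon"',
        "metasploit": 'product:"Metasploit"',
        "covenant": 'http.html:"Covenant" http.title:"Covenant"',
        "sliver": 'ssl.cert.subject.cn:"multiplayer" ssl.cert.issuer.cn:"operators"',
        "havoc": 'http.html_hash:-1472705893',
    }
    query = c2_queries.get(framework_name.lower(), framework_name)
    results = api.search(query, limit=limit)
    return [
        {
            "ip": match["ip_str"],
            "port": match["port"],
            "org": match.get("org", ""),
            # Shodan nests the country under the "location" sub-document.
            "country": match.get("location", {}).get("country_name", ""),
            "asn": match.get("asn", ""),
            "timestamp": match.get("timestamp", ""),
        }
        for match in results.get("matches", [])
    ]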
import requests
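def passive_dns_lookup(indicator, api_key, indicator_type="ip"):
    """通过 SecurityTrails 查询被动 DNS 记录。

    Args:
        indicator: An IPv4 address (when ``indicator_type == "ip"``) or a
            domain name whose subdomains should be listed (any other value).
        api_key: SecurityTrails API key, sent in the ``APIKEY`` header.
        indicator_type: ``"ip"`` selects the IP search endpoint; anything
            else is treated as a domain.

    Returns:
        The decoded JSON body on HTTP 200, otherwise ``None``.
    """
    from urllib.parse import quote

    base_url = "https://api.securitytrails.com/v1"
    headers = {"APIKEY": api_key, "Accept": "application/json"}
    if indicator_type == "ip":
        url = f"{base_url}/search/list"
        payload = {"filter": {"ipv4": indicator}}
        resp = requests.post(url, json=payload, headers=headers, timeout=30)
    else:
        # Indicators usually arrive from external feeds; percent-encode the
        # domain so a stray "/" or "?" cannot alter the request path.
        url = f"{base_url}/domain/{quote(indicator, safe='')}/subdomains"
        resp = requests.get(url, headers=headers, timeout=30)
    if resp.status_code == 200:
        return resp.json()
    return None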
def query_passive_total(indicator, user, api_key):
    """通过 PassiveTotal 查询被动 DNS 和 WHOIS 数据。

    Issues two basic-auth GETs against the PassiveTotal v2 API for
    ``indicator`` — passive DNS first, then WHOIS — and returns whichever
    answered HTTP 200: ``passive_dns`` holds the ``results`` list, ``whois``
    the raw JSON body. Endpoints that fail are simply absent from the dict.
    """
    base_url = "https://api.passivetotal.org/v2"
    auth = (user, api_key)
    params = {"query": indicator}

    def fetch(path):
        return requests.get(f"{base_url}/{path}", params=params, auth=auth, timeout=30)

    pdns_resp = fetch("dns/passive")
    whois_resp = fetch("whois")

    results = {}
    if pdns_resp.status_code == 200:
        results["passive_dns"] = pdns_resp.json().get("results", [])
    if whois_resp.status_code == 200:
        results["whois"] = whois_resp.json()
    return results
import requests
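def search_ct_logs(domain, max_certs=50):
    """通过 crt.sh 搜索证书透明度日志。

    Args:
        domain: Registrable domain; crt.sh is queried for ``%.<domain>`` so
            every subdomain appearing in a logged certificate is matched.
        max_certs: Cap on the number of certificate records returned
            (default 50, the previously hard-coded truncation).

    Returns:
        ``None`` unless crt.sh answers HTTP 200; otherwise a dict with the
        queried ``domain``, ``total_certificates`` seen, the sorted
        ``unique_domains`` collected from every SAN, and up to ``max_certs``
        ``certificates`` (id, issuer, CN, SANs, validity window, serial).
    """
    # Hand the query to ``params`` so requests percent-encodes the "%"
    # wildcard and the domain instead of splicing raw text into the URL.
    resp = requests.get(
        "https://crt.sh/",
        params={"q": f"%.{domain}", "output": "json"},
        timeout=30,
    )
    if resp.status_code != 200:
        return None

    certs = resp.json()
    unique_domains = set()
    cert_info = []
    for cert in certs:
        name_value = cert.get("name_value", "")
        # name_value is newline-separated; skip blanks so an empty field
        # does not add "" to the domain set.
        for name in name_value.split("\n"):
            name = name.strip()
            if name:
                unique_domains.add(name)
        cert_info.append({
            "id": cert.get("id"),
            "issuer": cert.get("issuer_name", ""),
            "common_name": cert.get("common_name", ""),
            "name_value": name_value,
            "not_before": cert.get("not_before", ""),
            "not_after": cert.get("not_after", ""),
            "serial_number": cert.get("serial_number", ""),
        })
    return {
        "domain": domain,
        "total_certificates": len(certs),
        "unique_domains": sorted(unique_domains),
        "certificates": cert_info[:max_certs],
    }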
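def monitor_new_certs(domains, interval_hours=1):
    """监控一组域名新签发的证书。

    Args:
        domains: Iterable of domains, each checked through ``search_ct_logs``.
        interval_hours: Look-back window; certificates whose ``not_before``
            is later than ``now - interval_hours`` are reported.

    Returns:
        A list of ``{"domain": ..., "cert": ...}`` dicts, one per certificate
        issued inside the window. Domains whose CT lookup returns ``None``
        are skipped.
    """
    from datetime import datetime, timedelta, timezone

    # The comparison below is lexical on ISO-8601 strings, so the cutoff is
    # built as a naive UTC timestamp to keep the same shape as the
    # ``not_before`` values (assumed "YYYY-MM-DDTHH:MM:SS" — confirm against
    # crt.sh output). datetime.utcnow() is deprecated since 3.12, hence the
    # aware ``now`` with tzinfo stripped.
    now_utc = datetime.now(timezone.utc).replace(tzinfo=None)
    cutoff = (now_utc - timedelta(hours=interval_hours)).isoformat()

    new_certs = []
    for domain in domains:
        result = search_ct_logs(domain)
        if not result:
            continue
        for cert in result.get("certificates", []):
            if cert.get("not_before", "") > cutoff:
                new_certs.append({"domain": domain, "cert": cert})
    return new_certs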
from datetime import datetime
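def build_infrastructure_timeline(indicators):
    """构建基础设施变化时间线。

    Args:
        indicators: Iterable of enrichment dicts. Each may carry a
            ``passive_dns`` list (records with ``firstSeen``/``resolve``/
            ``value``/``recordType``) and/or a ``certificates`` list
            (records with ``not_before``/``common_name``/``issuer``).

    Returns:
        One flat list of ``dns_resolution`` and ``certificate_issued``
        events sorted by ``timestamp`` (plain string order of the ISO
        values the sources provide).
    """
    timeline = []
    for ind in indicators:
        for record in ind.get("passive_dns") or []:
            timeline.append({
                # ``or ""`` also covers an explicit null firstSeen, which
                # would otherwise make the sort compare None with str.
                "timestamp": record.get("firstSeen") or "",
                "event": "dns_resolution",
                "source": record.get("resolve", ""),
                "target": record.get("value", ""),
                "record_type": record.get("recordType", ""),
            })
        for cert in ind.get("certificates") or []:
            timeline.append({
                "timestamp": cert.get("not_before") or "",
                "event": "certificate_issued",
                "domain": cert.get("common_name", ""),
                "issuer": cert.get("issuer", ""),
            })
    timeline.sort(key=lambda event: event["timestamp"])
    return timeline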