Detects and analyzes malware covert channels like DNS tunneling, ICMP exfiltration, HTTP steganography, and protocol abuse in PCAPs using Python/Scapy. For network forensics and C2 detection.
npx claudepluginhub killvxk/cybersecurity-skills-zhThis skill uses the workspace's default tool permissions.
恶意软件使用隐蔽信道将 C2 通信和数据泄露伪装成看似合法的网络流量。DNS 隧道将数据编码在 DNS 查询和响应中(iodine、dnscat2 等工具和 FrameworkPOS 等恶意软件家族均使用此技术)。ICMP 隧道将数据隐藏在回显请求/响应载荷中(icmpsh、ptunnel)。HTTP 隐蔽信道将 C2 数据嵌入头部、Cookie 或隐写图像中。协议滥用利用允许的协议绕过防火墙。现代基于机器学习的方法对 DNS 隧道检测可达 99% 以上的召回率,但低吞吐量泄露仍然具有挑战性。Palo Alto Unit42 在 2024 年跟踪了三个主要的 DNS 隧道活动(TrkCdn、SecShow、Savvy Seahorse),显示了该技术的持续普遍性。
Detects and analyzes malware covert channels including DNS tunneling, ICMP exfiltration, steganographic HTTP, and protocol abuse for C2 and data exfiltration. For incident response and threat hunting.
Detects and analyzes malware covert channels including DNS tunneling, ICMP exfiltration, steganographic HTTP, and protocol abuse in PCAPs using Python/Scapy for C2 and exfiltration.
Analyzes malware PCAPs from sandbox or incident response using Wireshark, tshark, Zeek, Suricata to detect C2 protocols, data exfiltration, DNS tunneling, payload downloads, and lateral movement.
Share bugs, ideas, or general feedback.
恶意软件使用隐蔽信道将 C2 通信和数据泄露伪装成看似合法的网络流量。DNS 隧道将数据编码在 DNS 查询和响应中(iodine、dnscat2 等工具和 FrameworkPOS 等恶意软件家族均使用此技术)。ICMP 隧道将数据隐藏在回显请求/响应载荷中(icmpsh、ptunnel)。HTTP 隐蔽信道将 C2 数据嵌入头部、Cookie 或隐写图像中。协议滥用利用允许的协议绕过防火墙。现代基于机器学习的方法对 DNS 隧道检测可达 99% 以上的召回率,但低吞吐量泄露仍然具有挑战性。Palo Alto Unit42 在 2024 年跟踪了三个主要的 DNS 隧道活动(TrkCdn、SecShow、Savvy Seahorse),显示了该技术的持续普遍性。
scapy、dpkt、dnslib#!/usr/bin/env python3
"""检测网络流量中的 DNS 隧道和隐蔽信道。"""
import sys
import json
import math
from collections import Counter, defaultdict
try:
from scapy.all import rdpcap, DNS, DNSQR, DNSRR, IP, ICMP
except ImportError:
print("pip install scapy")
sys.exit(1)
def entropy(data):
if not data:
return 0
freq = Counter(data)
length = len(data)
return -sum((c/length) * math.log2(c/length) for c in freq.values())
def analyze_dns_tunneling(pcap_path):
"""检测 PCAP 中的 DNS 隧道指标。"""
packets = rdpcap(pcap_path)
domain_stats = defaultdict(lambda: {
"queries": 0, "total_qname_len": 0, "subdomain_lengths": [],
"query_types": Counter(), "unique_subdomains": set(),
})
for pkt in packets:
if pkt.haslayer(DNS) and pkt.haslayer(DNSQR):
qname = pkt[DNSQR].qname.decode('utf-8', errors='replace').rstrip('.')
qtype = pkt[DNSQR].qtype
parts = qname.split('.')
if len(parts) >= 3:
base_domain = '.'.join(parts[-2:])
subdomain = '.'.join(parts[:-2])
stats = domain_stats[base_domain]
stats["queries"] += 1
stats["total_qname_len"] += len(qname)
stats["subdomain_lengths"].append(len(subdomain))
stats["query_types"][qtype] += 1
stats["unique_subdomains"].add(subdomain)
# 对域名进行隧道指标评分
suspicious = []
for domain, stats in domain_stats.items():
if stats["queries"] < 5:
continue
avg_subdomain_len = (sum(stats["subdomain_lengths"]) /
len(stats["subdomain_lengths"]))
unique_ratio = len(stats["unique_subdomains"]) / stats["queries"]
# 计算子域名熵
all_subdomains = ''.join(stats["unique_subdomains"])
sub_entropy = entropy(all_subdomains)
score = 0
reasons = []
if avg_subdomain_len > 30:
score += 30
reasons.append(f"子域名过长(平均 {avg_subdomain_len:.0f} 字符)")
if unique_ratio > 0.9:
score += 25
reasons.append(f"唯一性高({unique_ratio:.2%})")
if sub_entropy > 4.0:
score += 25
reasons.append(f"熵值高({sub_entropy:.2f})")
if stats["query_types"].get(16, 0) > 10: # TXT 记录
score += 20
reasons.append(f"大量 TXT 查询({stats['query_types'][16]} 次)")
if score >= 50:
suspicious.append({
"domain": domain,
"score": score,
"queries": stats["queries"],
"avg_subdomain_length": round(avg_subdomain_len, 1),
"unique_subdomains": len(stats["unique_subdomains"]),
"subdomain_entropy": round(sub_entropy, 2),
"reasons": reasons,
})
return sorted(suspicious, key=lambda x: -x["score"])
def analyze_icmp_tunneling(pcap_path):
"""检测 PCAP 中的 ICMP 隧道。"""
packets = rdpcap(pcap_path)
icmp_stats = defaultdict(lambda: {"count": 0, "payload_sizes": [], "payloads": []})
for pkt in packets:
if pkt.haslayer(ICMP) and pkt.haslayer(IP):
src = pkt[IP].src
dst = pkt[IP].dst
key = f"{src}->{dst}"
payload = bytes(pkt[ICMP].payload)
icmp_stats[key]["count"] += 1
icmp_stats[key]["payload_sizes"].append(len(payload))
if len(payload) > 64:
icmp_stats[key]["payloads"].append(payload[:100])
suspicious = []
for flow, stats in icmp_stats.items():
if stats["count"] < 5:
continue
avg_size = sum(stats["payload_sizes"]) / len(stats["payload_sizes"])
if avg_size > 64 or stats["count"] > 100:
suspicious.append({
"flow": flow,
"packets": stats["count"],
"avg_payload_size": round(avg_size, 1),
"reason": "大型/频繁的 ICMP 载荷表明存在隧道",
})
return suspicious
if __name__ == "__main__":
if len(sys.argv) < 2:
print(f"用法:{sys.argv[0]} <pcap_file>")
sys.exit(1)
print("[+] DNS 隧道分析")
dns_results = analyze_dns_tunneling(sys.argv[1])
for r in dns_results:
print(f" {r['domain']}(评分:{r['score']})")
for reason in r['reasons']:
print(f" - {reason}")
print("\n[+] ICMP 隧道分析")
icmp_results = analyze_icmp_tunneling(sys.argv[1])
for r in icmp_results:
print(f" {r['flow']}:{r['reason']}")