Detects DNS tunnel data exfiltration via passive DNS monitoring, analyzing query entropy, subdomain lengths, query volumes, TXT record abuse, and response payload sizes.
npx claudepluginhub killvxk/cybersecurity-skills-zh — This skill uses the workspace's default tool permissions.
DNS 渗出(DNS exfiltration)将域名系统(Domain Name System)作为隐蔽通道,从被入侵的网络中提取数据。攻击者将窃取的数据编码到 DNS 查询名称(子域名)或 DNS 响应记录(TXT、CNAME、NULL)中,绕过通常对 DNS 流量放行的传统安全控制。iodine、dnscat2 和 dns2tcp 等工具可实现通过 DNS 的完整 TCP 隧道。检测需要分析 DNS 查询模式的异常,包括:过长的查询长度、高熵值子域名字符串、对单一域名的异常查询量以及超大 TXT 记录响应。本技能介绍如何使用被动 DNS(passive DNS)分析、统计方法和机器学习方法构建全面的 DNS 渗出检测能力。
Analyzes DNS query logs using entropy analysis, query volume anomalies, and subdomain length detection to identify DNS tunneling exfiltration, DGA domains, and covert C2 channels in SIEM like Splunk. For SOC teams hunting DNS threats.
Detects DNS tunneling data exfiltration by analyzing query entropy, subdomain length, volume, TXT record abuse, and response sizes via passive DNS monitoring. For security incident response and threat hunting.
Analyzes DNS query logs in SIEM like Splunk to detect exfiltration via tunneling, DGA domains, and C2 using entropy, query volume anomalies, and subdomain lengths.
Share bugs, ideas, or general feedback.
DNS 渗出(DNS exfiltration)将域名系统(Domain Name System)作为隐蔽通道,从被入侵的网络中提取数据。攻击者将窃取的数据编码到 DNS 查询名称(子域名)或 DNS 响应记录(TXT、CNAME、NULL)中,绕过通常对 DNS 流量放行的传统安全控制。iodine、dnscat2 和 dns2tcp 等工具可实现通过 DNS 的完整 TCP 隧道。检测需要分析 DNS 查询模式的异常,包括:过长的查询长度、高熵值子域名字符串、对单一域名的异常查询量以及超大 TXT 记录响应。本技能介绍如何使用被动 DNS(passive DNS)分析、统计方法和机器学习方法构建全面的 DNS 渗出检测能力。
DNS 渗出在 DNS 消息的不同部分对数据进行编码:
出站(基于查询的渗出):
将编码数据作为子域名标签:
dGhlIHNlY3JldCBkYXRh.exfil.attacker.com
[base64 编码数据].[隧道域名]
使用的查询类型:A、AAAA、CNAME、MX、TXT、NULL
入站(基于响应的命令通道):
TXT 记录在响应中携带编码的命令/数据
CNAME 记录通过多个标签链式传递编码数据
NULL 记录携带任意二进制数据
| 指标 | 正常 DNS | DNS 隧道 |
|---|---|---|
| 子域名长度 | 5-20 字符 | 40-253 字符 |
| 标签数量 | 2-4 个 | 5-10+ 个 |
| 香农熵(Shannon entropy) | 2.5-3.5 bits | 4.0-5.5 bits |
| 查询量(每域名) | 可变 | 每分钟数百至数千 |
| TXT 响应大小 | < 100 字节 | 200-4000+ 字节 |
| 唯一子域名 | 少 | 极多 |
| 查询类型分布 | 以 A/AAAA 为主 | 大量 TXT、NULL、CNAME |
| 工具 | 协议 | 编码方式 | 检测难度 |
|---|---|---|---|
| iodine | IP-over-DNS | Base32/Base64/Raw | 中 |
| dnscat2 | TCP-over-DNS | 十六进制编码 | 中 |
| dns2tcp | TCP-over-DNS | Base64 | 中 |
| DNSExfiltrator | 自定义 | Base64 | 低 |
| Cobalt Strike DNS | C2 over DNS | 自定义编码 | 高 |
使用 Zeek:
# 实时捕获
zeek -i eth0 -C base/protocols/dns
# 离线 PCAP 分析
zeek -r traffic.pcap base/protocols/dns
# 输出:包含 query、qtype、answers、TTL 的 dns.log
使用 tcpdump:
# 捕获所有 DNS 流量
tcpdump -i eth0 -w dns_capture.pcap port 53
# 按大小过滤(大型 DNS 数据包)
tcpdump -i eth0 -w large_dns.pcap 'port 53 and greater 512'
使用 Suricata:
# 在 suricata.yaml 中启用 DNS 日志
outputs:
- eve-log:
types:
- dns:
query: yes
answer: yes
formats: [detailed]
用于 DNS 渗出检测的 Python 脚本:
#!/usr/bin/env python3
"""DNS 渗出检测器——分析 DNS 日志中的隧道指标。"""
import json
import math
import re
import sys
from collections import defaultdict
from datetime import datetime, timedelta
import pandas as pd
def calculate_entropy(domain: str) -> float:
    """Return the Shannon entropy (in bits) of the character distribution of *domain*.

    An empty string yields 0.0. Higher values indicate more uniformly
    distributed characters (typical of encoded/tunneled data).
    """
    if not domain:
        return 0.0
    total = len(domain)
    # Probability of each distinct character, then the standard -sum(p*log2 p).
    probabilities = (domain.count(symbol) / total for symbol in set(domain))
    return -sum(p * math.log2(p) for p in probabilities)
def extract_subdomain(query: str) -> str:
    """Return the subdomain labels of *query* (everything left of the last two labels).

    Returns '' when the FQDN has no subdomain part. Note this is a naive
    split that does not account for multi-part public suffixes.
    """
    labels = query.rstrip('.').split('.')
    return '.'.join(labels[:-2]) if len(labels) > 2 else ''
def get_base_domain(query: str) -> str:
    """Return the registered domain of *query*, approximated as its last two labels.

    Falls back to returning *query* unchanged when it has fewer than two
    labels (e.g. "localhost").
    """
    labels = query.rstrip('.').split('.')
    if len(labels) < 2:
        return query
    return '.'.join(labels[-2:])
def is_base64_like(s: str) -> bool:
    """Heuristic: True when *s* looks like base64-encoded payload data.

    Requires length >= 10, more than 90% of characters from the base64
    alphabet, and Shannon entropy above 4.0 bits.
    """
    if len(s) < 10:
        return False
    alphabet = set('ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/=')
    # Booleans sum as 0/1, giving the count of alphabet members.
    ratio = sum(ch in alphabet for ch in s) / len(s)
    return ratio > 0.9 and calculate_entropy(s) > 4.0
def is_hex_encoded(s: str) -> bool:
    """Heuristic: True when *s* (ignoring '.' and '-') is an even-length hex string.

    Requires the raw input to be at least 16 characters long.
    """
    if len(s) < 16:
        return False
    stripped = s.replace('.', '').replace('-', '')
    if len(stripped) % 2 != 0:
        return False
    return all(ch in '0123456789abcdefABCDEF' for ch in stripped)
class DNSExfiltrationDetector:
    """Scores base domains for DNS tunneling / exfiltration indicators.

    Feed queries in via process_query() (or one of the log parsers
    process_zeek_dns_log / process_eve_json), then call analyze() to get
    alert dicts sorted by descending score.
    """

    def __init__(self):
        # Per-base-domain running statistics, created lazily on first query.
        self.domain_stats = defaultdict(lambda: {
            'query_count': 0,
            'unique_subdomains': set(),
            'total_subdomain_length': 0,
            'entropy_sum': 0.0,
            'query_types': defaultdict(int),
            'source_ips': set(),
            'first_seen': None,
            'last_seen': None,
            'txt_response_sizes': [],
        })
        # Detection thresholds; tuned to separate normal DNS from tunneling
        # traffic (long, high-entropy, mostly-unique subdomains, TXT/NULL heavy).
        self.thresholds = {
            'min_query_count': 50,
            'min_unique_subdomains': 30,
            'avg_subdomain_length': 30,
            'avg_entropy': 3.8,
            'unique_ratio': 0.7,
            'txt_query_ratio': 0.3,
            'max_label_length': 63,
            'max_subdomain_labels': 5,
        }

    def process_query(self, timestamp, src_ip, query, qtype, response_size=0):
        """Update statistics with a single DNS query.

        timestamp: datetime of the query; src_ip: querying client address;
        query: FQDN queried; qtype: record type name (e.g. 'A', 'TXT');
        response_size: answer payload size in bytes (0 when unknown).
        """
        base_domain = get_base_domain(query)
        subdomain = extract_subdomain(query)
        stats = self.domain_stats[base_domain]
        stats['query_count'] += 1
        stats['unique_subdomains'].add(subdomain)
        stats['total_subdomain_length'] += len(subdomain)
        stats['entropy_sum'] += calculate_entropy(subdomain)
        stats['query_types'][qtype] += 1
        stats['source_ips'].add(src_ip)
        if stats['first_seen'] is None:
            stats['first_seen'] = timestamp
        stats['last_seen'] = timestamp
        # TXT/NULL answers are the usual inbound (command) channel; keep sizes.
        if qtype in ('TXT', 'NULL') and response_size > 0:
            stats['txt_response_sizes'].append(response_size)

    def analyze(self):
        """Score accumulated statistics; return alert dicts sorted by score desc.

        Domains scoring >= 50 (out of a capped 100) are reported with the
        individual indicators that contributed.
        """
        alerts = []
        for domain, stats in self.domain_stats.items():
            # Skip low-volume domains: averages over few queries are noisy.
            if stats['query_count'] < self.thresholds['min_query_count']:
                continue
            unique_count = len(stats['unique_subdomains'])
            avg_length = stats['total_subdomain_length'] / stats['query_count']
            avg_entropy = stats['entropy_sum'] / stats['query_count']
            unique_ratio = unique_count / stats['query_count']
            txt_queries = stats['query_types'].get('TXT', 0) + stats['query_types'].get('NULL', 0)
            txt_ratio = txt_queries / stats['query_count']
            score = 0
            indicators = []
            if avg_length > self.thresholds['avg_subdomain_length']:
                score += 25
                indicators.append(f"high_avg_subdomain_length={avg_length:.1f}")
            if avg_entropy > self.thresholds['avg_entropy']:
                score += 25
                indicators.append(f"high_entropy={avg_entropy:.2f}")
            if unique_ratio > self.thresholds['unique_ratio']:
                score += 20
                indicators.append(f"high_unique_ratio={unique_ratio:.2f}")
            if txt_ratio > self.thresholds['txt_query_ratio']:
                score += 15
                indicators.append(f"high_txt_ratio={txt_ratio:.2f}")
            if unique_count > self.thresholds['min_unique_subdomains']:
                score += 15
                indicators.append(f"unique_subdomains={unique_count}")
            # Sample up to 100 subdomains for encoding checks. sorted() makes
            # the sample deterministic (set iteration order is not), so repeated
            # runs over the same data produce identical scores.
            encoded_count = sum(
                1 for sd in sorted(stats['unique_subdomains'])[:100]
                if is_base64_like(sd) or is_hex_encoded(sd)
            )
            if encoded_count > 20:
                score += 20
                indicators.append(f"encoded_subdomains={encoded_count}")
            if score >= 50:
                duration = (stats['last_seen'] - stats['first_seen']).total_seconds() if stats['first_seen'] and stats['last_seen'] else 0
                alerts.append({
                    'domain': domain,
                    'score': min(score, 100),
                    'query_count': stats['query_count'],
                    'unique_subdomains': unique_count,
                    'avg_subdomain_length': round(avg_length, 1),
                    'avg_entropy': round(avg_entropy, 2),
                    'unique_ratio': round(unique_ratio, 2),
                    'txt_ratio': round(txt_ratio, 2),
                    'source_ips': list(stats['source_ips']),
                    'duration_seconds': duration,
                    'indicators': indicators,
                })
        return sorted(alerts, key=lambda x: x['score'], reverse=True)

    def process_zeek_dns_log(self, log_path):
        """Parse a Zeek dns.log (default TSV layout) and feed each query in.

        Assumes the stock field order: ts uid id.orig_h id.orig_p id.resp_h
        id.resp_p proto trans_id rtt query qclass qclass_name qtype qtype_name
        rcode rcode_name AA TC RD RA Z answers TTLs rejected.
        NOTE(review): verify against the log's '#fields' header if the
        deployment customizes the column set.
        """
        with open(log_path, 'r') as f:
            for line in f:
                if line.startswith('#'):
                    continue
                fields = line.strip().split('\t')
                if len(fields) < 22:
                    continue
                try:
                    ts = datetime.fromtimestamp(float(fields[0]))
                    src_ip = fields[2]
                    query = fields[9]
                    # Bug fix: index 11 is qclass_name ("C_INTERNET"); the
                    # readable query type ("A", "TXT", ...) is qtype_name at 13.
                    # Using 11 made the TXT/NULL ratio permanently zero.
                    qtype = fields[13]
                    self.process_query(ts, src_ip, query, qtype)
                except (ValueError, IndexError):
                    continue

    def process_eve_json(self, log_path):
        """Parse a Suricata EVE JSON log and feed each 'dns' event in.

        Malformed lines and non-DNS events are skipped silently (best-effort
        ingestion, matching the Zeek parser's behavior).
        """
        with open(log_path, 'r') as f:
            for line in f:
                try:
                    event = json.loads(line)
                    if event.get('event_type') != 'dns':
                        continue
                    dns = event.get('dns', {})
                    raw_ts = event['timestamp'].replace('Z', '+00:00')
                    # Suricata writes offsets like "+0000"; insert the colon so
                    # datetime.fromisoformat() accepts it on Python < 3.11.
                    if re.search(r'[+-]\d{4}$', raw_ts):
                        raw_ts = raw_ts[:-2] + ':' + raw_ts[-2:]
                    ts = datetime.fromisoformat(raw_ts)
                    src_ip = event.get('src_ip', '')
                    query = dns.get('rrname', '')
                    qtype = dns.get('rrtype', '')
                    self.process_query(ts, src_ip, query, qtype)
                except (json.JSONDecodeError, KeyError, ValueError):
                    continue
def main():
    """CLI entry point: parse a DNS log and print scored exfiltration alerts.

    Takes an optional log path as argv[1] (default: the live Zeek dns.log);
    '.json' files are treated as Suricata EVE JSON, anything else as Zeek TSV.
    """
    if len(sys.argv) > 1:
        log_file = sys.argv[1]
    else:
        log_file = '/opt/zeek/logs/current/dns.log'
    detector = DNSExfiltrationDetector()
    # Pick the parser by file extension, then run it.
    parse = detector.process_eve_json if log_file.endswith('.json') else detector.process_zeek_dns_log
    parse(log_file)
    alerts = detector.analyze()
    if not alerts:
        print("未检测到 DNS 渗出指标。")
        return
    banner = '=' * 80
    print(f"\n{banner}")
    print(f"DNS 渗出检测结果——发现 {len(alerts)} 个可疑域名")
    print(f"{banner}\n")
    for alert in alerts:
        if alert['score'] >= 80:
            severity = "严重"
        elif alert['score'] >= 60:
            severity = "高"
        else:
            severity = "中"
        print(f"[{severity}] 域名:{alert['domain']}")
        print(f" 得分:{alert['score']}/100")
        print(f" 查询数:{alert['query_count']},唯一子域名:{alert['unique_subdomains']}")
        print(f" 平均子域名长度:{alert['avg_subdomain_length']},平均熵值:{alert['avg_entropy']}")
        print(f" 源 IP:{', '.join(alert['source_ips'][:5])}")
        print(f" 指标:{', '.join(alert['indicators'])}")
        print()


if __name__ == '__main__':
    main()
# Detect overly long DNS queries (potential tunneling)
alert dns $HOME_NET any -> any 53 (msg:"DNS Exfiltration - Excessive query length"; dns.query; content:"."; pcre:"/^.{60,}/"; threshold:type both,track by_src,count 20,seconds 60; classtype:bad-unknown; sid:3000001; rev:1;)
# Detect high-entropy (base64-charset) DNS subdomains
alert dns $HOME_NET any -> any 53 (msg:"DNS Exfiltration - High entropy subdomain"; dns.query; pcre:"/^[a-zA-Z0-9+\/=]{30,}\./"; threshold:type both,track by_src,count 10,seconds 60; classtype:bad-unknown; sid:3000002; rev:1;)
# Detect large TXT record responses
alert dns any 53 -> $HOME_NET any (msg:"DNS Exfiltration - Large TXT response"; content:"|00 10|"; byte_test:2,>,400,0,relative; classtype:bad-unknown; sid:3000003; rev:1;)
# Detect NULL record queries (iodine indicator)
alert dns $HOME_NET any -> any 53 (msg:"DNS Exfiltration - NULL record query (iodine indicator)"; content:"|00 0a|"; classtype:bad-unknown; sid:3000004; rev:1;)
# Detect dnscat2 traffic patterns
alert dns $HOME_NET any -> any 53 (msg:"DNS Exfiltration - dnscat2 indicator"; dns.query; content:"dnscat"; nocase; classtype:trojan-activity; sid:3000005; rev:1;)
Splunk SPL 查询用于 DNS 渗出检测:
index=dns sourcetype=zeek:dns
| eval query_domain=mvjoin(mvindex(split(query,"."),-2,-1),".")
| eval subdomain=mvindex(split(query,"."),0)
| eval subdomain_len=len(subdomain)
| eval label_count=mvcount(split(query,"."))
| stats count as query_count,
    dc(subdomain) as unique_subdomains,
    avg(subdomain_len) as avg_sub_len,
    values(src_ip) as source_ips
    by query_domain
| where query_count > 100 AND avg_sub_len > 30 AND unique_subdomains > 50
| eval risk_score = case(
    avg_sub_len > 50 AND unique_subdomains > 200, "Critical",
    avg_sub_len > 40 AND unique_subdomains > 100, "High",
    avg_sub_len > 30 AND unique_subdomains > 50, "Medium",
    true(), "Low")
| sort -query_count
| table query_domain risk_score query_count unique_subdomains avg_sub_len source_ips