Analyzes PCAP/PCAPNG packet captures using Wireshark, tshark, tcpdump, and Python (scapy/pyshark) for forensics: reconstructs communications, extracts files, detects exfiltration and C2 traffic.
npx claudepluginhub killvxk/cybersecurity-skills-zh — this skill uses the workspace's default tool permissions.
网络数据包捕获(PCAP/PCAPNG 文件)是关于网络活动的终极真相来源,为主机间通信提供无可辩驳的证据。PCAP 文件记录了网络段上传输的每一个数据包,对于涉及数据渗出、命令与控制通信、横向移动、恶意软件投递和未授权访问的取证调查至关重要。Wireshark 是交互式分析的主要工具,而 tshark 提供命令行能力以支持自动化处理和脚本编写。现代 PCAPNG 格式支持附加元数据,包括接口描述、捕获注释、精确时间戳和每数据包注释。
Analyzes PCAP/PCAPNG captures using Wireshark, tshark, tcpdump to reconstruct flows, extract files, detect exfiltration, C2, and malicious traffic in forensics.
Analyzes PCAP files with Wireshark and tshark for network forensics: filters suspicious traffic like DNS/HTTP/C2, reconstructs events, extracts files/credentials, detects beacons.
Performs forensic analysis of PCAP/PCAPNG files using Wireshark, tshark, tcpdump, scapy to reconstruct communications, extract files, identify malicious traffic, and detect data exfiltration or C2 activity.
Share bugs, ideas, or general feedback.
网络数据包捕获(PCAP/PCAPNG 文件)是关于网络活动的终极真相来源,为主机间通信提供无可辩驳的证据。PCAP 文件记录了网络段上传输的每一个数据包,对于涉及数据渗出、命令与控制通信、横向移动、恶意软件投递和未授权访问的取证调查至关重要。Wireshark 是交互式分析的主要工具,而 tshark 提供命令行能力以支持自动化处理和脚本编写。现代 PCAPNG 格式支持附加元数据,包括接口描述、捕获注释、精确时间戳和每数据包注释。
# Capture all traffic on interface eth0
tcpdump -i eth0 -w capture.pcap
# Size-based rotation (100 MB files, keep the 10 most recent).
# NOTE: strftime patterns (%Y%m%d_...) in the filename are only expanded
# with -G (time-based rotation); with -C tcpdump appends a numeric suffix
# instead, so use a plain base filename here.
tcpdump -i eth0 -w capture.pcap -C 100 -W 10
# Capture traffic to/from a specific host
tcpdump -i eth0 host 192.168.1.100 -w host_traffic.pcap
# Capture traffic on a specific port
tcpdump -i eth0 port 443 -w https_traffic.pcap
# Use a BPF filter to capture traffic on suspicious ports
tcpdump -i eth0 'port 4444 or port 8080 or port 1337' -w suspicious.pcap
# HTTP traffic
http
# DNS queries
dns
# SMB file transfers
smb2
# Traffic to/from a specific IP
ip.addr == 192.168.1.100
# TCP connection attempts (initial SYN, no ACK) — correlate with missing
# SYN/ACK replies to find *failed* connections
tcp.flags.syn == 1 && tcp.flags.ack == 0
# Large data segments (potential exfiltration)
tcp.len > 1000
# Filter a specific protocol by port
tcp.port == 4444
# TLS Client Hello (handshake type 1 — carries the SNI)
tls.handshake.type == 1
# HTTP POST requests
http.request.method == "POST"
# DNS queries for suspicious TLDs
dns.qry.name contains ".xyz" or dns.qry.name contains ".top"
# Beacon detection: displayed inter-packet gap near a 60 s period
frame.time_delta_displayed > 55 && frame.time_delta_displayed < 65
# Extract HTTP URLs from a capture file
tshark -r capture.pcap -Y "http.request" -T fields -e http.host -e http.request.uri
# Extract DNS queries
tshark -r capture.pcap -Y "dns.flags.response == 0" -T fields -e dns.qry.name | sort -u
# Extract transferred files (HTTP objects)
tshark -r capture.pcap --export-objects http,exported_files/
# Extract SMB file transfers
tshark -r capture.pcap --export-objects smb,smb_files/
# Protocol hierarchy statistics
tshark -r capture.pcap -z io,phs
# TCP conversation statistics
tshark -r capture.pcap -z conv,tcp
# Extract TLS SNI (Server Name Indication)
tshark -r capture.pcap -Y "tls.handshake.type == 1" -T fields -e tls.handshake.extensions_server_name
# List the most active hosts by byte count
tshark -r capture.pcap -z endpoints,ip -q
# Extract credentials (FTP, HTTP Basic). Display-filter string comparisons
# must use quoted literals (required since Wireshark 4.0), so the whole
# filter is single-quoted for the shell.
tshark -r capture.pcap -Y 'ftp.request.command == "USER" || ftp.request.command == "PASS" || http.authorization' -T fields -e ftp.request.arg -e http.authorization
from scapy.all import rdpcap, IP, TCP, UDP, DNS, DNSQR, Raw
import os
import sys
import json
from collections import defaultdict, Counter
from datetime import datetime
class PCAPForensicAnalyzer:
    """Forensic analysis of a PCAP/PCAPNG file using Scapy.

    The entire capture is loaded into memory on construction; each method
    derives one summary view (conversations, DNS queries, beaconing
    candidates, protocol distribution) and ``generate_report`` writes them
    all to a single JSON report.
    """

    def __init__(self, pcap_path: str, output_dir: str):
        """Load *pcap_path* and ensure *output_dir* exists.

        Args:
            pcap_path: Path to the PCAP/PCAPNG file to analyze.
            output_dir: Directory where the JSON report will be written
                (created if missing).
        """
        self.pcap_path = pcap_path
        self.output_dir = output_dir
        os.makedirs(output_dir, exist_ok=True)
        # rdpcap reads the whole capture into memory; very large captures
        # would need a streaming reader (scapy PcapReader) instead.
        self.packets = rdpcap(pcap_path)

    def get_conversations(self) -> list:
        """Return unique IP conversations with packet and byte counts.

        Endpoint pairs are direction-insensitive (src/dst sorted), and the
        result is ordered by total bytes, largest first.
        """
        convos = defaultdict(lambda: {"packets": 0, "bytes": 0})
        for pkt in self.packets:
            if IP in pkt:
                # Sort the endpoints so A->B and B->A land in one bucket.
                key = tuple(sorted([pkt[IP].src, pkt[IP].dst]))
                convos[key]["packets"] += 1
                convos[key]["bytes"] += len(pkt)
        return [
            {"src": k[0], "dst": k[1], "packets": v["packets"], "bytes": v["bytes"]}
            for k, v in sorted(convos.items(), key=lambda x: x[1]["bytes"], reverse=True)
        ]

    def extract_dns_queries(self) -> list:
        """Extract all DNS queries (qr == 0) from the capture."""
        queries = []
        for pkt in self.packets:
            if DNS in pkt and pkt[DNS].qr == 0 and DNSQR in pkt:
                queries.append({
                    "query": pkt[DNSQR].qname.decode(errors="replace").rstrip("."),
                    "type": pkt[DNSQR].qtype,
                    "src": pkt[IP].src if IP in pkt else "unknown",
                })
        return queries

    def detect_beaconing(self, threshold_seconds: float = 5.0) -> list:
        """Detect potential beaconing from regular connection timing.

        A (src, dst, dport) flow with at least 5 packets is flagged when
        the standard deviation of its inter-packet intervals is below
        *threshold_seconds* and the mean interval exceeds 1 second.

        Fix: the previous revision compared the raw variance (seconds^2)
        against *threshold_seconds* (seconds) — a units mismatch; the
        standard deviation is used now so the threshold is in seconds.

        Returns:
            Flagged flows sorted by variance (most regular first).
        """
        flow_times = defaultdict(list)
        for pkt in self.packets:
            if IP in pkt and TCP in pkt:
                key = (pkt[IP].src, pkt[IP].dst, pkt[TCP].dport)
                flow_times[key].append(float(pkt.time))
        beacons = []
        for key, times in flow_times.items():
            if len(times) < 5:
                continue  # too few samples to judge regularity
            deltas = [later - earlier for earlier, later in zip(times, times[1:])]
            avg_delta = sum(deltas) / len(deltas)
            variance = sum((d - avg_delta) ** 2 for d in deltas) / len(deltas)
            std_dev = variance ** 0.5
            # Low-jitter intervals longer than 1 s look like C2 beacons.
            if std_dev < threshold_seconds and avg_delta > 1:
                beacons.append({
                    "src": key[0], "dst": key[1], "port": key[2],
                    "avg_interval": round(avg_delta, 2),
                    "variance": round(variance, 4),
                    "connection_count": len(times),
                })
        return sorted(beacons, key=lambda x: x["variance"])

    def get_protocol_distribution(self) -> dict:
        """Return packet counts keyed by "TCP/port" or "UDP/port" (top 50)."""
        protocols = Counter()
        for pkt in self.packets:
            if TCP in pkt:
                protocols[f"TCP/{pkt[TCP].dport}"] += 1
            elif UDP in pkt:
                protocols[f"UDP/{pkt[UDP].dport}"] += 1
        return dict(protocols.most_common(50))

    def generate_report(self) -> str:
        """Write a comprehensive analysis report to JSON.

        Returns:
            Path of the written report file.
        """
        report = {
            "analysis_timestamp": datetime.now().isoformat(),
            "pcap_file": self.pcap_path,
            "total_packets": len(self.packets),
            "conversations": self.get_conversations()[:50],
            "dns_queries": self.extract_dns_queries()[:200],
            "potential_beacons": self.detect_beaconing(),
            "protocol_distribution": self.get_protocol_distribution(),
        }
        report_path = os.path.join(self.output_dir, "pcap_forensic_report.json")
        # Explicit UTF-8 so the report encoding does not depend on the
        # platform's locale default.
        with open(report_path, "w", encoding="utf-8") as f:
            json.dump(report, f, indent=2)
        print(f"[*] 数据包总数:{report['total_packets']}")
        print(f"[*] 会话数:{len(report['conversations'])}")
        print(f"[*] DNS 查询数:{len(report['dns_queries'])}")
        print(f"[*] 潜在信标数:{len(report['potential_beacons'])}")
        return report_path
def main():
    """CLI entry point: analyze a PCAP file and emit a JSON report."""
    if len(sys.argv) < 3:
        print("Usage: python process.py <pcap_file> <output_dir>")
        sys.exit(1)
    pcap_file, output_dir = sys.argv[1], sys.argv[2]
    PCAPForensicAnalyzer(pcap_file, output_dir).generate_report()


if __name__ == "__main__":
    main()