Detects ARP poisoning (spoofing) attacks in network traffic using ARPWatch, Dynamic ARP Inspection on switches, Wireshark analysis, and custom Python scripts to prevent MITM intercepts.
npx claudepluginhub killvxk/cybersecurity-skills-zhThis skill uses the workspace's default tool permissions.
ARP 投毒(ARP poisoning,又称 ARP 欺骗)是一种二层(Layer 2)攻击,攻击者发送伪造的 ARP 消息,将其 MAC 地址与合法主机的 IP 地址关联,从而实现中间人(MITM)拦截、会话劫持或拒绝服务攻击。由于 ARP 没有内置身份验证机制,广播域上的任何设备都可以伪造 ARP 应答。检测需要监控 ARP 流量中的异常,如免费 ARP(gratuitous ARP)洪水、IP 到 MAC 映射变化和重复 IP 地址。本技能涵盖部署多层检测,包括 ARPWatch、动态 ARP 检测(DAI)、基于 Wireshark 的分析和自定义 Python 监控工具。
Detects ARP poisoning (spoofing) attacks in network traffic using ARPWatch, Dynamic ARP Inspection, Wireshark analysis, and custom Python scripts for MitM protection.
Detects ARP spoofing attacks using ARPWatch, Dynamic ARP Inspection, Wireshark analysis, and Python scripts to prevent man-in-the-middle interception in networks.
Simulates ARP spoofing attacks using arpspoof, Ettercap, and Scapy in authorized lab or pentest environments to demo MITM risks, test detection, and validate countermeasures.
Share bugs, ideas, or general feedback.
ARP 投毒(ARP poisoning,又称 ARP 欺骗)是一种二层(Layer 2)攻击,攻击者发送伪造的 ARP 消息,将其 MAC 地址与合法主机的 IP 地址关联,从而实现中间人(MITM)拦截、会话劫持或拒绝服务攻击。由于 ARP 没有内置身份验证机制,广播域上的任何设备都可以伪造 ARP 应答。检测需要监控 ARP 流量中的异常,如免费 ARP(gratuitous ARP)洪水、IP 到 MAC 映射变化和重复 IP 地址。本技能涵盖部署多层检测,包括 ARPWatch、动态 ARP 检测(DAI)、基于 Wireshark 的分析和自定义 Python 监控工具。
ARP 在本地网段将 IP 地址映射到 MAC 地址。该协议以无状态方式运行,没有身份验证:
正常 ARP 过程:
1. 主机 A 广播:"谁有 10.0.1.1?请告诉 10.0.1.100"
2. 路由器回复:"10.0.1.1 在 AA:BB:CC:DD:EE:01"
3. 主机 A 缓存该映射
ARP 投毒攻击:
1. 攻击者向主机 A 发送未请求的 ARP 应答:
"10.0.1.1 在 EV:IL:MA:CA:DD:RR"(攻击者的 MAC)
2. 主机 A 更新缓存,将流量发送给攻击者
3. 攻击者转发给真实网关(中间人位置)
| 指标 | 描述 | 严重性 |
|---|---|---|
| MAC 地址反复跳变 | 同一 IP 快速映射到不同 MAC | 高 |
| 免费 ARP 洪水 | 针对多台主机的未请求 ARP 应答 | 高 |
| 重复 IP 地址 | 两个不同 MAC 声称相同 IP | 严重 |
| 异常 ARP 数量 | ARP 每秒数据包数峰值 | 中 |
| 来自非 DHCP 源的 ARP | 来自未知设备的静态 IP 声明 | 中 |
| 网关 MAC 变更 | 默认网关 MAC 地址更改 | 严重 |
# 安装 ARPWatch
sudo apt-get install -y arpwatch
# 配置 ARPWatch
sudo vi /etc/default/arpwatch
# INTERFACES="eth0"
# ARGS="-N -p -i eth0 -f /var/lib/arpwatch/arp.dat"
# 启动监控
sudo systemctl enable arpwatch
sudo systemctl start arpwatch
# 查看当前 ARP 数据库
cat /var/lib/arpwatch/arp.dat
# 监控日志中的变化
tail -f /var/log/syslog | grep arpwatch
ARPWatch 告警类型:
Cisco Catalyst 配置:
! 启用 DHCP 嗅探(DAI 的前置条件)
ip dhcp snooping
ip dhcp snooping vlan 10,20,30
! 配置可信端口(上行链路、DHCP 服务器)
interface GigabitEthernet1/0/1
description Uplink to Distribution
ip dhcp snooping trust
interface GigabitEthernet1/0/48
description DHCP Server
ip dhcp snooping trust
! 启用动态 ARP 检测
ip arp inspection vlan 10,20,30
! 配置 DAI 的可信端口
interface GigabitEthernet1/0/1
ip arp inspection trust
! 设置速率限制以防止 ARP 洪水 DoS
interface range GigabitEthernet1/0/2-47
ip arp inspection limit rate 15
! 启用额外的验证检查
ip arp inspection validate src-mac dst-mac ip
! 为静态 IP 设备配置 ARP ACL(服务器、打印机)
arp access-list STATIC-ARP-ENTRIES
permit ip host 10.0.10.100 mac host 0011.2233.4455
permit ip host 10.0.10.101 mac host 0011.2233.4456
ip arp inspection filter STATIC-ARP-ENTRIES vlan 10
! 验证 DAI 状态
show ip arp inspection vlan 10
show ip arp inspection statistics
show ip dhcp snooping binding
# 检测免费 ARP(发送方和目标 IP 相同)
arp.src.proto_ipv4 == arp.dst.proto_ipv4
# 检测 ARP 应答(关注未请求的)
arp.opcode == 2
# 检测重复 IP 地址声明
arp.duplicate-address-detected
# 检测来自特定攻击者 MAC 的 ARP 数据包
eth.src == ev:il:ma:ca:dd:rr
# 检测 ARP 风暴(高流量)
# 使用 Statistics > I/O Graphs > 显示过滤器:arp
# 检测网关冒充
arp.src.proto_ipv4 == 10.0.1.1 && arp.src.hw_mac != aa:bb:cc:dd:ee:01
#!/usr/bin/env python3
"""
使用数据包捕获进行实时 ARP 投毒检测。
监控 ARP 流量中的欺骗指标并在异常时发出告警。
"""
import subprocess
import sys
import json
import time
from collections import defaultdict
from datetime import datetime
try:
from scapy.all import sniff, ARP, Ether, get_if_hwaddr, conf
SCAPY_AVAILABLE = True
except ImportError:
SCAPY_AVAILABLE = False
class ARPPoisonDetector:
def __init__(self, interface: str, gateway_ip: str, gateway_mac: str):
self.interface = interface
self.gateway_ip = gateway_ip
self.gateway_mac = gateway_mac.lower()
self.arp_table = {} # IP -> MAC 映射
self.arp_history = defaultdict(list) # IP -> (MAC, 时间戳) 列表
self.alerts = []
self.arp_count = defaultdict(int) # 源 MAC -> 每间隔计数
self.last_reset = time.time()
self.arp_rate_threshold = 50 # 每 10 秒的 ARP 数据包数
def alert(self, severity: str, message: str, details: dict):
"""为检测到的异常生成告警。"""
alert_data = {
'timestamp': datetime.now().isoformat(),
'severity': severity,
'message': message,
'details': details,
}
self.alerts.append(alert_data)
print(f"\n[{severity}] {datetime.now().strftime('%H:%M:%S')} - {message}")
for key, value in details.items():
print(f" {key}: {value}")
def check_gateway_spoofing(self, src_ip: str, src_mac: str):
"""检查是否有人在伪造网关。"""
if src_ip == self.gateway_ip and src_mac != self.gateway_mac:
self.alert('CRITICAL', '检测到网关 ARP 欺骗', {
'gateway_ip': self.gateway_ip,
'expected_mac': self.gateway_mac,
'spoofed_mac': src_mac,
'action': '默认网关上的潜在中间人攻击',
})
return True
return False
def check_mac_change(self, src_ip: str, src_mac: str):
"""检查 IP 到 MAC 的映射是否已更改。"""
if src_ip in self.arp_table:
known_mac = self.arp_table[src_ip]
if known_mac != src_mac:
self.alert('HIGH', 'ARP 缓存投毒尝试', {
'ip_address': src_ip,
'previous_mac': known_mac,
'new_mac': src_mac,
'action': 'IP 到 MAC 映射意外更改',
})
return True
return False
def check_flip_flop(self, src_ip: str, src_mac: str):
"""检查 MAC 地址反复跳变(主动攻击指标)。"""
self.arp_history[src_ip].append((src_mac, time.time()))
# 仅保留最近 60 秒的历史
cutoff = time.time() - 60
self.arp_history[src_ip] = [
(mac, ts) for mac, ts in self.arp_history[src_ip]
if ts > cutoff
]
unique_macs = set(mac for mac, ts in self.arp_history[src_ip])
if len(unique_macs) > 2:
self.alert('CRITICAL', '检测到 ARP 地址跳变(主动攻击)', {
'ip_address': src_ip,
'mac_addresses': list(unique_macs),
'changes_in_60s': len(self.arp_history[src_ip]),
})
return True
return False
def check_arp_rate(self, src_mac: str):
"""检查 ARP 洪水(DoS 或扫描)。"""
self.arp_count[src_mac] += 1
# 每 10 秒重置计数器
if time.time() - self.last_reset > 10:
for mac, count in self.arp_count.items():
if count > self.arp_rate_threshold:
self.alert('MEDIUM', '检测到 ARP 洪水', {
'source_mac': mac,
'arp_packets_10s': count,
'threshold': self.arp_rate_threshold,
})
self.arp_count.clear()
self.last_reset = time.time()
def process_packet(self, packet):
"""处理捕获的 ARP 数据包。"""
if not packet.haslayer(ARP):
return
arp = packet[ARP]
# 仅处理 ARP 应答(操作码 2)和请求(操作码 1)
if arp.op not in (1, 2):
return
src_ip = arp.psrc
src_mac = arp.hwsrc.lower()
# 运行检测检查
self.check_gateway_spoofing(src_ip, src_mac)
self.check_mac_change(src_ip, src_mac)
self.check_flip_flop(src_ip, src_mac)
self.check_arp_rate(src_mac)
# 更新 ARP 表
self.arp_table[src_ip] = src_mac
def start_monitoring(self):
"""启动实时 ARP 监控。"""
print(f"[*] 在 {self.interface} 上启动 ARP 投毒检测")
print(f"[*] 网关:{self.gateway_ip}({self.gateway_mac})")
print(f"[*] 监控中...(Ctrl+C 停止)\n")
if SCAPY_AVAILABLE:
sniff(
iface=self.interface,
filter="arp",
prn=self.process_packet,
store=False,
)
else:
print("[-] Scapy 不可用。安装:pip install scapy")
print("[*] 回退到基于 tcpdump 的监控...")
self._monitor_with_tcpdump()
def _monitor_with_tcpdump(self):
"""使用 tcpdump 的备用监控方案。"""
cmd = ['tcpdump', '-i', self.interface, '-l', '-n', 'arp']
proc = subprocess.Popen(cmd, stdout=subprocess.PIPE,
stderr=subprocess.DEVNULL, text=True)
try:
for line in proc.stdout:
parts = line.strip().split()
if 'is-at' in parts:
try:
ip_idx = parts.index('is-at') - 1
mac_idx = parts.index('is-at') + 1
src_ip = parts[ip_idx]
src_mac = parts[mac_idx].lower()
self.check_gateway_spoofing(src_ip, src_mac)
self.check_mac_change(src_ip, src_mac)
self.arp_table[src_ip] = src_mac
except (IndexError, ValueError):
continue
except KeyboardInterrupt:
proc.terminate()
def generate_report(self) -> dict:
"""生成检测到的异常摘要报告。"""
return {
'monitoring_interface': self.interface,
'gateway': {'ip': self.gateway_ip, 'mac': self.gateway_mac},
'total_alerts': len(self.alerts),
'arp_table_size': len(self.arp_table),
'alerts': self.alerts,
}
if __name__ == '__main__':
if len(sys.argv) < 4:
print("用法:python process.py <interface> <gateway_ip> <gateway_mac>")
print("示例:python process.py eth0 10.0.1.1 aa:bb:cc:dd:ee:01")
sys.exit(1)
detector = ARPPoisonDetector(
interface=sys.argv[1],
gateway_ip=sys.argv[2],
gateway_mac=sys.argv[3],
)
try:
detector.start_monitoring()
except KeyboardInterrupt:
print("\n\n[*] 监控已停止。")
report = detector.generate_report()
print(f"[*] 生成的告警总数:{report['total_alerts']}")
print(f"[*] ARP 表条目:{report['arp_table_size']}")