Implements API abuse detection with token bucket, sliding window, and adaptive rate limiting algorithms to prevent DDoS, brute force, and credential stuffing attacks. Uses Redis backend with Python example.
npx claudepluginhub killvxk/cybersecurity-skills-zhThis skill uses the workspace's default tool permissions.
API速率限制(Rate Limiting)是一种关键的安全控制措施,用于限制客户端在规定时间段内可以发出的请求数量。它可以防御拒绝服务攻击(DDoS)、暴力破解登录尝试、凭据填充(Credential Stuffing)、API数据爬取和资源耗尽攻击。现代实现使用令牌桶(Token Bucket)、滑动窗口(Sliding Window)和固定窗口计数器等算法,通常以Redis等分布式存储为后端。自适应速率限制(Adaptive Rate Limiting)在检测到攻击时动态收紧限制,并在正常运营期间放宽限制,与基于静态IP的方法相比,成功DDoS攻击减少了94%。
Implements API rate limiting with token bucket, sliding window, and adaptive algorithms using Redis to prevent DDoS, brute force, credential stuffing, and abuse attacks.
Implements API abuse detection with token bucket, sliding window, and adaptive rate limiting using Redis to prevent DDoS, brute force, and credential stuffing attacks.
Implements API rate limiting and throttling with token bucket, sliding/fixed window algorithms using Redis. Prevents brute force, abuse; sets per-user/IP/endpoint limits with 429 responses.
Share bugs, ideas, or general feedback.
API速率限制(Rate Limiting)是一种关键的安全控制措施,用于限制客户端在规定时间段内可以发出的请求数量。它可以防御拒绝服务攻击(DDoS)、暴力破解登录尝试、凭据填充(Credential Stuffing)、API数据爬取和资源耗尽攻击。现代实现使用令牌桶(Token Bucket)、滑动窗口(Sliding Window)和固定窗口计数器等算法,通常以Redis等分布式存储为后端。自适应速率限制(Adaptive Rate Limiting)在检测到攻击时动态收紧限制,并在正常运营期间放宽限制,与基于静态IP的方法相比,成功DDoS攻击减少了94%。
令牌桶(Token Bucket)为每个客户端分配一个具有固定容量令牌的桶。令牌以恒定速率补充,每个请求消耗一个令牌,当桶为空时请求被拒绝。这允许受控的突发流量同时保持平均限制。
"""基于Redis后端的令牌桶速率限制器
实现用于API速率限制的分布式令牌桶算法,
支持突发允许和自动补充。
"""
import time
import redis
import json
from typing import Tuple
class TokenBucketRateLimiter:
def __init__(self, redis_client: redis.Redis,
max_tokens: int = 100,
refill_rate: float = 10.0,
key_prefix: str = "ratelimit:tb"):
self.redis = redis_client
self.max_tokens = max_tokens
self.refill_rate = refill_rate # 每秒令牌数
self.key_prefix = key_prefix
def _get_key(self, client_id: str) -> str:
return f"{self.key_prefix}:{client_id}"
def allow_request(self, client_id: str, tokens_required: int = 1) -> Tuple[bool, dict]:
"""检查请求是否应在速率限制下被允许。
返回(allowed, info),info包含剩余令牌数和重试等待秒数。
"""
key = self._get_key(client_id)
now = time.time()
# 使用Lua脚本执行原子令牌桶操作
lua_script = """
local key = KEYS[1]
local max_tokens = tonumber(ARGV[1])
local refill_rate = tonumber(ARGV[2])
local now = tonumber(ARGV[3])
local requested = tonumber(ARGV[4])
local bucket = redis.call('HMGET', key, 'tokens', 'last_refill')
local tokens = tonumber(bucket[1])
local last_refill = tonumber(bucket[2])
-- 如果桶不存在则初始化
if tokens == nil then
tokens = max_tokens
last_refill = now
end
-- 计算已补充的令牌数
local elapsed = now - last_refill
local refilled = elapsed * refill_rate
tokens = math.min(max_tokens, tokens + refilled)
-- 检查是否有足够令牌
local allowed = 0
if tokens >= requested then
tokens = tokens - requested
allowed = 1
end
-- 更新桶状态
redis.call('HMSET', key, 'tokens', tokens, 'last_refill', now)
redis.call('EXPIRE', key, 3600) -- TTL用于清理
-- 如果被拒绝则计算重试等待时间
local retry_after = 0
if allowed == 0 then
retry_after = math.ceil((requested - tokens) / refill_rate)
end
return {allowed, math.floor(tokens), retry_after}
"""
result = self.redis.eval(
lua_script, 1, key,
self.max_tokens, self.refill_rate, now, tokens_required
)
allowed = bool(result[0])
remaining = int(result[1])
retry_after = int(result[2])
return allowed, {
"remaining": remaining,
"limit": self.max_tokens,
"retry_after": retry_after,
"reset": int(now + (self.max_tokens - remaining) / self.refill_rate)
}
"""滑动窗口速率限制器
在持续移动的时间窗口内跟踪请求,
相比固定窗口提供更平滑的速率限制,
误报率仅2.3%。
"""
class SlidingWindowRateLimiter:
def __init__(self, redis_client: redis.Redis,
window_seconds: int = 60,
max_requests: int = 100,
key_prefix: str = "ratelimit:sw"):
self.redis = redis_client
self.window = window_seconds
self.max_requests = max_requests
self.key_prefix = key_prefix
def allow_request(self, client_id: str) -> Tuple[bool, dict]:
key = f"{self.key_prefix}:{client_id}"
now = time.time()
window_start = now - self.window
# 使用有序集合执行原子滑动窗口操作
pipe = self.redis.pipeline()
# 删除过期条目
pipe.zremrangebyscore(key, 0, window_start)
# 添加当前请求
pipe.zadd(key, {f"{now}:{id(now)}": now})
# 计算窗口内的请求数
pipe.zcard(key)
# 设置TTL
pipe.expire(key, self.window + 1)
results = pipe.execute()
current_count = results[2]
allowed = current_count <= self.max_requests
if not allowed:
# 删除刚添加的请求(已被拒绝)
self.redis.zremrangebyscore(key, now, now)
return allowed, {
"remaining": max(0, self.max_requests - current_count),
"limit": self.max_requests,
"window": self.window,
"current_count": current_count
}
"""自适应速率限制器
根据检测到的攻击模式动态调整速率限制。
检测到攻击时收紧限制,正常运行期间放宽限制。
"""
from enum import Enum
from dataclasses import dataclass
class ThreatLevel(Enum):
NORMAL = "normal"
ELEVATED = "elevated"
HIGH = "high"
CRITICAL = "critical"
@dataclass
class AdaptiveLimits:
requests_per_minute: int
burst_size: int
block_duration_seconds: int
THREAT_LIMITS = {
ThreatLevel.NORMAL: AdaptiveLimits(100, 20, 0),
ThreatLevel.ELEVATED: AdaptiveLimits(50, 10, 60),
ThreatLevel.HIGH: AdaptiveLimits(20, 5, 300),
ThreatLevel.CRITICAL: AdaptiveLimits(5, 2, 3600),
}
class AdaptiveRateLimiter:
def __init__(self, redis_client: redis.Redis):
self.redis = redis_client
self.token_bucket = TokenBucketRateLimiter(redis_client)
self.sliding_window = SlidingWindowRateLimiter(redis_client)
def assess_threat_level(self, client_id: str) -> ThreatLevel:
"""根据客户端行为评估当前威胁级别。"""
metrics_key = f"metrics:{client_id}"
metrics = self.redis.hgetall(metrics_key)
if not metrics:
return ThreatLevel.NORMAL
error_rate = float(metrics.get(b'error_rate', 0))
auth_failures = int(metrics.get(b'auth_failures_5m', 0))
unique_endpoints = int(metrics.get(b'unique_endpoints_5m', 0))
request_rate = float(metrics.get(b'requests_per_second', 0))
# 基于评分的威胁评估
score = 0
if auth_failures > 10:
score += 3
elif auth_failures > 5:
score += 2
elif auth_failures > 2:
score += 1
if error_rate > 0.8:
score += 3
elif error_rate > 0.5:
score += 2
if request_rate > 50:
score += 2
elif request_rate > 20:
score += 1
if unique_endpoints > 50:
score += 2 # 可能存在枚举行为
if score >= 7:
return ThreatLevel.CRITICAL
elif score >= 5:
return ThreatLevel.HIGH
elif score >= 3:
return ThreatLevel.ELEVATED
return ThreatLevel.NORMAL
def allow_request(self, client_id: str, endpoint: str) -> Tuple[bool, dict]:
"""基于威胁级别的自适应阈值速率限制。"""
threat_level = self.assess_threat_level(client_id)
limits = THREAT_LIMITS[threat_level]
# 检查客户端是否当前被封锁
block_key = f"blocked:{client_id}"
if self.redis.exists(block_key):
ttl = self.redis.ttl(block_key)
return False, {
"blocked": True,
"threat_level": threat_level.value,
"retry_after": ttl,
"reason": "因可疑活动被临时封锁"
}
# 应用威胁级别调整后的速率限制参数
self.token_bucket.max_tokens = limits.burst_size
self.token_bucket.refill_rate = limits.requests_per_minute / 60.0
allowed, info = self.token_bucket.allow_request(client_id)
if not allowed and limits.block_duration_seconds > 0:
# 按威胁级别对应时长封锁客户端
self.redis.setex(block_key, limits.block_duration_seconds, threat_level.value)
info["threat_level"] = threat_level.value
return allowed, info
def record_request_outcome(self, client_id: str, status_code: int, endpoint: str):
"""记录请求结果用于威胁评估。"""
metrics_key = f"metrics:{client_id}"
pipe = self.redis.pipeline()
pipe.hincrby(metrics_key, 'total_requests', 1)
if status_code in (401, 403):
pipe.hincrby(metrics_key, 'auth_failures_5m', 1)
if status_code >= 400:
pipe.hincrby(metrics_key, 'errors_5m', 1)
# 追踪唯一端点用于枚举检测
pipe.sadd(f"endpoints:{client_id}", endpoint)
pipe.expire(metrics_key, 300) # 5分钟窗口
pipe.expire(f"endpoints:{client_id}", 300)
pipe.execute()
# 定义速率限制区域
limit_req_zone $binary_remote_addr zone=api_general:10m rate=10r/s;
limit_req_zone $binary_remote_addr zone=api_auth:10m rate=3r/s;
limit_req_zone $binary_remote_addr zone=api_sensitive:10m rate=1r/s;
# 对API路由应用速率限制
server {
listen 443 ssl;
# 通用API端点 - 10 req/s,突发20
location /api/v1/ {
limit_req zone=api_general burst=20 nodelay;
limit_req_status 429;
proxy_pass http://api_backend;
}
# 认证端点 - 严格3 req/s
location /api/v1/auth/ {
limit_req zone=api_auth burst=5;
limit_req_status 429;
proxy_pass http://api_backend;
}
# 敏感数据端点 - 1 req/s
location /api/v1/admin/ {
limit_req zone=api_sensitive burst=3;
limit_req_status 429;
proxy_pass http://api_backend;
}
# 带Retry-After头的自定义429响应
error_page 429 = @rate_limited;
location @rate_limited {
add_header Retry-After 30;
add_header X-RateLimit-Limit $limit_req_status;
return 429 '{"error": "rate_limit_exceeded", "retry_after": 30}';
}
}
始终包含标准速率限制头:
HTTP/1.1 429 Too Many Requests
X-RateLimit-Limit: 100
X-RateLimit-Remaining: 0
X-RateLimit-Reset: 1672531200
Retry-After: 30
Content-Type: application/json
{"error": "rate_limit_exceeded", "retry_after": 30}