Implements API rate limiting and throttling with token bucket, sliding/fixed window algorithms using Redis. Prevents brute force, abuse; sets per-user/IP/endpoint limits with 429 responses.
npx claudepluginhub killvxk/cybersecurity-skills-zhThis skill uses the workspace's default tool permissions.
- 保护认证端点免受暴力破解和凭据填充攻击
Implements API rate limiting and throttling using token bucket, sliding window, fixed window algorithms with Redis counters or middleware. Protects against brute force, abuse, DoS attacks.
Implements API rate limiting and throttling via token bucket, sliding window, fixed window using Redis counters, middleware, or gateways for per-user/IP/endpoint limits and 429 responses.
Implements API abuse detection with token bucket, sliding window, and adaptive rate limiting algorithms to prevent DDoS, brute force, and credential stuffing attacks. Uses Redis backend with Python example.
Share bugs, ideas, or general feedback.
不适用将速率限制作为抵御攻击的唯一防御。需与认证、授权和WAF规则结合使用。
按端点类别和用户层级定义速率限制:
# 速率限制配置
RATE_LIMITS = {
# 认证端点(最严格)
"auth": {
"login": {"requests": 5, "window_seconds": 60, "by": "ip"},
"register": {"requests": 3, "window_seconds": 300, "by": "ip"},
"forgot_password": {"requests": 3, "window_seconds": 3600, "by": "ip"},
"verify_mfa": {"requests": 5, "window_seconds": 300, "by": "user"},
},
# 标准API端点
"api": {
"free": {"requests": 60, "window_seconds": 60, "by": "user"},
"premium": {"requests": 300, "window_seconds": 60, "by": "user"},
"enterprise": {"requests": 1000, "window_seconds": 60, "by": "user"},
},
# 资源密集型端点
"expensive": {
"search": {"requests": 10, "window_seconds": 60, "by": "user"},
"export": {"requests": 5, "window_seconds": 3600, "by": "user"},
"bulk_import": {"requests": 2, "window_seconds": 3600, "by": "user"},
},
# 全局限制
"global": {
"per_ip": {"requests": 1000, "window_seconds": 60, "by": "ip"},
"per_user": {"requests": 5000, "window_seconds": 3600, "by": "user"},
},
}
import redis
import time
import hashlib
from functools import wraps
from flask import Flask, request, jsonify, g
app = Flask(__name__)
redis_client = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)
class SlidingWindowRateLimiter:
"""使用Redis有序集合的滑动窗口速率限制器。"""
def __init__(self, redis_conn):
self.redis = redis_conn
def is_allowed(self, key, max_requests, window_seconds):
"""检查请求是否被允许并记录它。"""
now = time.time()
window_start = now - window_seconds
pipe = self.redis.pipeline()
# 删除过期条目
pipe.zremrangebyscore(key, 0, window_start)
# 计算当前窗口内的请求数
pipe.zcard(key)
# 添加当前请求
pipe.zadd(key, {f"{now}:{hashlib.md5(str(now).encode()).hexdigest()[:8]}": now})
# 设置键的TTL
pipe.expire(key, window_seconds + 1)
results = pipe.execute()
current_count = results[1]
if current_count >= max_requests:
# 计算重试等待时间
oldest = self.redis.zrange(key, 0, 0, withscores=True)
if oldest:
retry_after = int(oldest[0][1] + window_seconds - now) + 1
else:
retry_after = window_seconds
return False, current_count, max_requests, retry_after
return True, current_count + 1, max_requests, 0
rate_limiter = SlidingWindowRateLimiter(redis_client)
def rate_limit(max_requests, window_seconds, key_func=None):
"""API端点速率限制装饰器。"""
def decorator(f):
@wraps(f)
def wrapped(*args, **kwargs):
# 确定速率限制键
if key_func:
identifier = key_func()
elif hasattr(g, 'user_id'):
identifier = f"user:{g.user_id}"
else:
identifier = f"ip:{request.remote_addr}"
key = f"ratelimit:{request.endpoint}:{identifier}"
allowed, current, limit, retry_after = rate_limiter.is_allowed(
key, max_requests, window_seconds)
# 始终设置速率限制头
headers = {
"X-RateLimit-Limit": str(limit),
"X-RateLimit-Remaining": str(max(0, limit - current)),
"X-RateLimit-Reset": str(int(time.time()) + window_seconds),
}
if not allowed:
headers["Retry-After"] = str(retry_after)
response = jsonify({
"error": "rate_limit_exceeded",
"message": "请求过多,请稍后再试。",
"retry_after": retry_after
})
response.status_code = 429
for h, v in headers.items():
response.headers[h] = v
return response
response = f(*args, **kwargs)
for h, v in headers.items():
response.headers[h] = v
return response
return wrapped
return decorator
# 对端点应用速率限制
@app.route('/api/v1/auth/login', methods=['POST'])
@rate_limit(max_requests=5, window_seconds=60,
key_func=lambda: f"ip:{request.remote_addr}")
def login():
# 登录逻辑
return jsonify({"message": "登录成功"})
@app.route('/api/v1/users/me', methods=['GET'])
@rate_limit(max_requests=60, window_seconds=60)
def get_profile():
# 个人信息逻辑
return jsonify({"user": "data"})
@app.route('/api/v1/search', methods=['GET'])
@rate_limit(max_requests=10, window_seconds=60)
def search():
# 搜索逻辑
return jsonify({"results": []})
import redis
import time
class TokenBucketRateLimiter:
"""允许在限制内突发流量的令牌桶速率限制器。"""
def __init__(self, redis_conn):
self.redis = redis_conn
def is_allowed(self, key, max_tokens, refill_rate, refill_interval=1):
"""
令牌桶算法:
- max_tokens: 最大突发容量
- refill_rate: 每refill_interval添加的令牌数
- refill_interval: 补充之间的秒数
"""
now = time.time()
bucket_key = f"tb:{key}"
# 用于原子令牌桶操作的Lua脚本
lua_script = """
local key = KEYS[1]
local max_tokens = tonumber(ARGV[1])
local refill_rate = tonumber(ARGV[2])
local refill_interval = tonumber(ARGV[3])
local now = tonumber(ARGV[4])
local bucket = redis.call('hmget', key, 'tokens', 'last_refill')
local tokens = tonumber(bucket[1])
local last_refill = tonumber(bucket[2])
if tokens == nil then
tokens = max_tokens
last_refill = now
end
-- 补充令牌
local elapsed = now - last_refill
local refills = math.floor(elapsed / refill_interval)
if refills > 0 then
tokens = math.min(max_tokens, tokens + (refills * refill_rate))
last_refill = last_refill + (refills * refill_interval)
end
local allowed = 0
if tokens >= 1 then
tokens = tokens - 1
allowed = 1
end
redis.call('hmset', key, 'tokens', tokens, 'last_refill', last_refill)
redis.call('expire', key, math.ceil(max_tokens / refill_rate * refill_interval) + 10)
return {allowed, tokens, max_tokens}
"""
result = self.redis.eval(lua_script, 1, bucket_key,
max_tokens, refill_rate, refill_interval, now)
allowed = bool(result[0])
remaining = int(result[1])
limit = int(result[2])
return allowed, remaining, limit
from enum import Enum
class UserTier(Enum):
FREE = "free"
PREMIUM = "premium"
ENTERPRISE = "enterprise"
TIER_LIMITS = {
UserTier.FREE: {
"default": (60, 60), # 60 req/min
"search": (10, 60), # 10 req/min
"export": (5, 3600), # 5 req/hour
"daily_total": (1000, 86400), # 1000 req/day
},
UserTier.PREMIUM: {
"default": (300, 60),
"search": (50, 60),
"export": (20, 3600),
"daily_total": (10000, 86400),
},
UserTier.ENTERPRISE: {
"default": (1000, 60),
"search": (200, 60),
"export": (100, 3600),
"daily_total": (100000, 86400),
},
}
def get_rate_limit_for_request(user_tier, endpoint_category="default"):
"""根据用户层级和端点获取速率限制配置。"""
tier_config = TIER_LIMITS.get(user_tier, TIER_LIMITS[UserTier.FREE])
limit_config = tier_config.get(endpoint_category, tier_config["default"])
return limit_config # (max_requests, window_seconds)
class TieredRateLimitMiddleware:
"""基于用户订阅层级应用速率限制的中间件。"""
def __init__(self, app, redis_conn):
self.app = app
self.limiter = SlidingWindowRateLimiter(redis_conn)
def __call__(self, environ, start_response):
# 从请求提取用户信息
user_id = environ.get("HTTP_X_USER_ID")
user_tier = UserTier(environ.get("HTTP_X_USER_TIER", "free"))
endpoint = environ.get("PATH_INFO", "/")
# 确定端点类别
category = "default"
if "/search" in endpoint:
category = "search"
elif "/export" in endpoint:
category = "export"
max_requests, window = get_rate_limit_for_request(user_tier, category)
key = f"tiered:{user_id or environ.get('REMOTE_ADDR')}:{category}"
allowed, current, limit, retry_after = self.limiter.is_allowed(
key, max_requests, window)
if not allowed:
status = "429 Too Many Requests"
headers = [
("Content-Type", "application/json"),
("Retry-After", str(retry_after)),
("X-RateLimit-Limit", str(limit)),
("X-RateLimit-Remaining", "0"),
]
start_response(status, headers)
body = f'{{"error":"rate_limit_exceeded","retry_after":{retry_after},"tier":"{user_tier.value}"}}'
return [body.encode()]
return self.app(environ, start_response)
# 使用Redis Cluster的集中式速率限制服务
import redis
from redis.cluster import RedisCluster
class DistributedRateLimiter:
"""使用Redis Cluster的微服务架构速率限制器。"""
def __init__(self):
self.redis = RedisCluster(
startup_nodes=[
{"host": "redis-node-1", "port": 6379},
{"host": "redis-node-2", "port": 6379},
{"host": "redis-node-3", "port": 6379},
],
decode_responses=True
)
def check_and_increment(self, service_name, user_id, endpoint,
max_requests, window_seconds):
"""使用Redis Lua脚本进行原子检查和递增。"""
key = f"rl:{{{service_name}}}:{user_id}:{endpoint}"
# Lua脚本确保检查和递增的原子性
lua_script = """
local key = KEYS[1]
local max_requests = tonumber(ARGV[1])
local window = tonumber(ARGV[2])
local now = tonumber(ARGV[3])
local window_start = now - window
-- 删除旧条目
redis.call('zremrangebyscore', key, '-inf', window_start)
-- 计算当前条目数
local count = redis.call('zcard', key)
if count >= max_requests then
-- 获取最旧条目用于重试等待时间计算
local oldest = redis.call('zrange', key, 0, 0, 'WITHSCORES')
local retry_after = 0
if #oldest > 0 then
retry_after = math.ceil(tonumber(oldest[2]) + window - now)
end
return {0, count, retry_after}
end
-- 添加新条目
redis.call('zadd', key, now, now .. ':' .. math.random(100000))
redis.call('expire', key, window + 1)
return {1, count + 1, 0}
"""
result = self.redis.eval(lua_script, 1, key,
max_requests, window_seconds, time.time())
return {
"allowed": bool(result[0]),
"current": int(result[1]),
"retry_after": int(result[2]),
}
| 术语 | 定义 |
|---|---|
| 滑动窗口(Sliding Window) | 在滚动时间窗口内跟踪请求的速率限制算法,比固定窗口提供更平滑的速率强制 |
| 令牌桶(Token Bucket) | 令牌以固定速率添加并按请求消耗的算法,允许在桶容量范围内的受控突发 |
| 固定窗口(Fixed Window) | 最简单的速率限制,按固定时间窗口(如每分钟)计数请求,在窗口边界处容易发生突发 |
| 429 Too Many Requests | 表示客户端已超过速率限制的HTTP状态码,伴随Retry-After头 |
| Retry-After头(Retry-After Header) | 告知客户端重试前需等待多少秒的HTTP响应头,对行为良好的API客户端至关重要 |
| 分布式速率限制(Distributed Rate Limiting) | 使用共享状态(Redis、Memcached)跨多个服务器实例进行速率限制,维护准确的全局计数器 |
背景:公司推出包含免费、高级和企业层级的公共API,需要在防止滥用的同时为付费客户提供公平访问。API在AWS ALB后面运行在6个实例上。
方法:
注意事项:
## 速率限制实施报告
**API**: Public API v2
**算法**: 滑动窗口(Redis有序集合)
**后端**: Redis Cluster(3个节点)
**部署**: AWS ALB后面的6个API实例
### 速率限制配置
| 层级 | 默认 | 搜索 | 导出 | 认证(每IP) |
|------|---------|--------|--------|---------------|
| 免费 | 60/min | 10/min | 5/hour | 5/min |
| 高级 | 300/min | 50/min | 20/hour | 5/min |
| 企业 | 1000/min | 200/min | 100/hour | 10/min |
### 验证结果(k6负载测试)
- 免费层:第61个请求时速率限制(正确)
- 高级层:第301个请求时速率限制(正确)
- 跨实例:所有6个实例速率限制一致
- Redis故障转移:Redis不可用时速率限制优雅降级(允许流量通过)
- Retry-After头:实际重置时间1秒内准确
- 响应开销:每个速率限制检查增加<2ms延迟