Implements secure API key generation with entropy, hashed storage, rotation, revocation, scoping, rate limiting, and leak monitoring for backend services.
npx claudepluginhub killvxk/cybersecurity-skills-zhThis skill uses the workspace's default tool permissions.
- 设计具有足够熵和可识别前缀的安全API密钥生成机制,用于泄露检测
Implements secure API key generation with entropy, hashed storage using SHA-256/bcrypt, rotation workflows, per-key scoping, rate limiting, and leak monitoring. For API key management and credential protection.
Implements secure API key generation, hashed storage, rotation, revocation, scoping, rate limiting, and leak monitoring for API credential protection.
Provides step-by-step guidance and production-ready code for secure API key management, including storage, best practices, authentication, and configurations for API integrations.
Share bugs, ideas, or general feedback.
不适用将API密钥作为面向用户应用的唯一认证机制。API密钥最适合服务间通信和开发者访问。
import secrets
import hashlib
import hmac
import time
import json
from datetime import datetime, timedelta
class APIKeyManager:
"""管理安全API密钥生命周期:生成、存储、验证、轮换。"""
# 密钥格式:prefix_base64random(例如 sk_live_a1b2c3d4e5f6...)
# 前缀用于标识密钥类型和环境,便于泄露检测
KEY_PREFIXES = {
"live_secret": "sk_live_",
"test_secret": "sk_test_",
"live_public": "pk_live_",
"test_public": "pk_test_",
}
def __init__(self, db_connection, redis_connection):
self.db = db_connection
self.redis = redis_connection
def generate_key(self, key_type="live_secret", owner_id=None, scopes=None,
rate_limit=None, ip_allowlist=None, expires_days=365):
"""生成带元数据的新API密钥。"""
prefix = self.KEY_PREFIXES.get(key_type, "sk_live_")
# 生成32字节(256位)随机数
random_bytes = secrets.token_bytes(32)
key_body = secrets.token_urlsafe(32) # Base64url编码
# 客户端收到的完整API密钥(仅显示一次)
full_key = f"{prefix}{key_body}"
# 对密钥进行哈希处理用于存储(永不存储原始密钥)
key_hash = hashlib.sha256(full_key.encode()).hexdigest()
# 创建用于引用的短密钥ID(前8个字符)
key_id = f"{prefix}{key_body[:8]}..."
# 存储哈希密钥及元数据
key_metadata = {
"key_hash": key_hash,
"key_id": key_id,
"key_type": key_type,
"owner_id": owner_id,
"scopes": scopes or ["read"],
"rate_limit": rate_limit or {"requests": 1000, "window": 3600},
"ip_allowlist": ip_allowlist or [],
"created_at": datetime.utcnow().isoformat(),
"expires_at": (datetime.utcnow() + timedelta(days=expires_days)).isoformat(),
"last_used": None,
"is_active": True,
"usage_count": 0,
}
# 存入数据库
self.db.execute(
"INSERT INTO api_keys (key_hash, key_id, metadata) VALUES (?, ?, ?)",
(key_hash, key_id, json.dumps(key_metadata))
)
# 缓存到Redis用于快速验证
self.redis.setex(
f"apikey:{key_hash}",
86400, # 24小时缓存TTL
json.dumps(key_metadata)
)
return {
"api_key": full_key, # 仅向用户展示一次
"key_id": key_id, # 用于引用/管理
"scopes": key_metadata["scopes"],
"expires_at": key_metadata["expires_at"],
}
def validate_key(self, api_key):
"""验证API密钥并返回其元数据。"""
key_hash = hashlib.sha256(api_key.encode()).hexdigest()
# 优先检查Redis缓存
cached = self.redis.get(f"apikey:{key_hash}")
if cached:
metadata = json.loads(cached)
else:
# 回退到数据库
row = self.db.execute(
"SELECT metadata FROM api_keys WHERE key_hash = ?",
(key_hash,)
).fetchone()
if not row:
return None, "invalid_key"
metadata = json.loads(row[0])
# 刷新缓存
self.redis.setex(f"apikey:{key_hash}", 86400, row[0])
# 验证检查
if not metadata.get("is_active"):
return None, "key_revoked"
if metadata.get("expires_at"):
if datetime.fromisoformat(metadata["expires_at"]) < datetime.utcnow():
return None, "key_expired"
# 更新最后使用时间
metadata["last_used"] = datetime.utcnow().isoformat()
metadata["usage_count"] = metadata.get("usage_count", 0) + 1
self.redis.setex(f"apikey:{key_hash}", 86400, json.dumps(metadata))
return metadata, "valid"
def revoke_key(self, key_id):
"""立即吊销API密钥。"""
row = self.db.execute(
"SELECT key_hash, metadata FROM api_keys WHERE key_id = ?",
(key_id,)
).fetchone()
if row:
key_hash = row[0]
metadata = json.loads(row[1])
metadata["is_active"] = False
metadata["revoked_at"] = datetime.utcnow().isoformat()
self.db.execute(
"UPDATE api_keys SET metadata = ? WHERE key_id = ?",
(json.dumps(metadata), key_id)
)
# 立即使缓存失效
self.redis.delete(f"apikey:{key_hash}")
return True
return False
def rotate_key(self, old_key_id, grace_period_hours=24):
"""轮换API密钥,提供新旧密钥同时生效的宽限期。"""
old_row = self.db.execute(
"SELECT key_hash, metadata FROM api_keys WHERE key_id = ?",
(old_key_id,)
).fetchone()
if not old_row:
return None, "key_not_found"
old_metadata = json.loads(old_row[1])
# 使用相同设置生成新密钥
new_key_data = self.generate_key(
key_type=old_metadata["key_type"],
owner_id=old_metadata["owner_id"],
scopes=old_metadata["scopes"],
rate_limit=old_metadata["rate_limit"],
ip_allowlist=old_metadata["ip_allowlist"],
)
# 在宽限期后安排旧密钥吊销
revoke_at = datetime.utcnow() + timedelta(hours=grace_period_hours)
old_metadata["scheduled_revocation"] = revoke_at.isoformat()
self.db.execute(
"UPDATE api_keys SET metadata = ? WHERE key_id = ?",
(json.dumps(old_metadata), old_key_id)
)
return {
"new_key": new_key_data,
"old_key_id": old_key_id,
"old_key_revokes_at": revoke_at.isoformat(),
"message": f"旧密钥将在 {grace_period_hours} 小时后吊销"
}, "success"
from flask import Flask, request, jsonify, g
from functools import wraps
app = Flask(__name__)
def require_api_key(required_scopes=None):
"""验证API密钥并检查范围的中间件。"""
def decorator(f):
@wraps(f)
def wrapped(*args, **kwargs):
# 从请求头提取API密钥
api_key = request.headers.get("X-API-Key")
if not api_key:
# 也检查 Authorization: Bearer <key>
auth_header = request.headers.get("Authorization", "")
if auth_header.startswith("Bearer "):
api_key = auth_header[7:]
if not api_key:
return jsonify({"error": "missing_api_key"}), 401
# 验证密钥
metadata, status = key_manager.validate_key(api_key)
if status != "valid":
return jsonify({"error": status}), 401
# 检查IP白名单
if metadata.get("ip_allowlist"):
client_ip = request.remote_addr
if client_ip not in metadata["ip_allowlist"]:
return jsonify({"error": "ip_not_allowed"}), 403
# 检查范围
if required_scopes:
key_scopes = set(metadata.get("scopes", []))
if not key_scopes.intersection(required_scopes):
return jsonify({"error": "insufficient_scope"}), 403
# 将元数据附加到请求上下文
g.api_key_metadata = metadata
return f(*args, **kwargs)
return wrapped
return decorator
@app.route('/api/v1/data', methods=['GET'])
@require_api_key(required_scopes=["read", "admin"])
def get_data():
return jsonify({"data": "敏感信息"})
@app.route('/api/v1/data', methods=['POST'])
@require_api_key(required_scopes=["write", "admin"])
def create_data():
return jsonify({"created": True})
# 使用gitleaks扫描GitHub仓库中的泄露API密钥
gitleaks detect --source=/path/to/repo --config=gitleaks.toml --report-path=leaks.json
# 用于API密钥前缀检测的自定义gitleaks配置
# gitleaks.toml
cat <<'EOF'
[[rules]]
id = "company-api-key-live"
description = "Company Live API Key"
regex = '''sk_live_[A-Za-z0-9_-]{32,}'''
tags = ["api-key", "live", "critical"]
[[rules]]
id = "company-api-key-test"
description = "Company Test API Key"
regex = '''sk_test_[A-Za-z0-9_-]{32,}'''
tags = ["api-key", "test"]
[[rules]]
id = "company-public-key"
description = "Company Public API Key"
regex = '''pk_live_[A-Za-z0-9_-]{32,}'''
tags = ["api-key", "public"]
EOF
# 自动化泄露密钥吊销
import json
def process_leaked_keys(leaks_file):
"""自动吊销在公共仓库中检测到的API密钥。"""
with open(leaks_file) as f:
leaks = json.load(f)
for leak in leaks:
key_match = leak.get("match", "")
# 从匹配中提取密钥
for prefix in ["sk_live_", "sk_test_", "pk_live_"]:
if prefix in key_match:
start = key_match.index(prefix)
potential_key = key_match[start:start+50] # 最大密钥长度
# 验证并吊销
metadata, status = key_manager.validate_key(potential_key)
if status == "valid":
key_manager.revoke_key(metadata["key_id"])
print(f"[已吊销] 密钥 {metadata['key_id']} 在 {leak.get('file')} 中泄露")
# 通知密钥所有者
notify_owner(metadata["owner_id"], metadata["key_id"], leak)
| 术语 | 定义 |
|---|---|
| API密钥(API Key) | 用于认证API请求的密钥字符串,通常通过请求头或查询参数传递 |
| 密钥哈希(Key Hashing) | 在数据库中只存储API密钥的哈希值(SHA-256),永不存储明文密钥,类似于密码哈希 |
| 密钥轮换(Key Rotation) | 在维持宽限期(新旧密钥均有效)的情况下将API密钥替换为新密钥,确保零停机过渡 |
| 密钥范围限制(Key Scoping) | 将每个API密钥限制到特定端点、HTTP方法、IP范围和速率限制,以最小化影响范围 |
| 密钥前缀(Key Prefix) | 可识别的前缀(如 sk_live_),使自动化扫描能够在日志、代码和公共仓库中检测泄露密钥 |
| 密钥扫描(Secret Scanning) | 自动监控仓库、日志和公共来源中暴露的API密钥和凭据 |
背景:开发者平台提供以API密钥认证的公共API,平台有10,000+个API消费者,每天产生50M+个请求,密钥频繁在公共GitHub仓库中泄露。
方法:
注意事项:
## API密钥安全实施报告
**平台**: Developer API v3
**总活跃密钥数**: 12,450
**日密钥验证次数**: 5200万
### 安全控制
| 控制措施 | 实施方式 | 状态 |
|---------|---------------|--------|
| 密钥熵 | 256位(secrets.token_urlsafe(32)) | 已实施 |
| 密钥格式 | sk_live_/sk_test_ 前缀 | 已实施 |
| 存储 | SHA-256哈希,Redis缓存 | 已实施 |
| 范围限制 | 按密钥的端点/IP/速率限制 | 已实施 |
| 轮换 | 24小时宽限期API | 已实施 |
| 过期 | 最大TTL 365天 | 已实施 |
| 泄露检测 | GitHub Secret Scanning + gitleaks | 活跃 |
| 自动吊销 | 泄露密钥5分钟内吊销 | 活跃 |
### 密钥泄露统计(过去30天)
- 公共仓库中检测到的密钥: 23个
- 平均吊销时间: 3.2分钟
- CI/CD预提交检测到的密钥: 7个(已阻断)