Implements API schema validation using OpenAPI and JSON Schema to enforce input/output contracts, preventing injections, data leakage, and mass assignment attacks. Use for API gateways and dev-time security.
npx claudepluginhub killvxk/cybersecurity-skills-zhThis skill uses the workspace's default tool permissions.
API Schema验证(Schema Validation)确保通过API交换的所有数据符合OpenAPI规范(OAS)或JSON Schema文档中预定义的结构。这可以防止注入攻击(SQLi、XSS、XXE),通过拒绝未知属性来阻断批量赋值(Mass Assignment),通过验证响应Schema来防止数据泄露(Data Leakage),并确保所有API交互的类型安全。Schema验证在API网关层面(运行时强制执行)和开发阶段(安全左移)均可运行。
Implements API schema validation using OpenAPI and JSON Schema to enforce input/output contracts and prevent injection, mass assignment, and data exposure attacks.
Implements API schema validation with OpenAPI and JSON Schema to secure inputs/outputs, prevent injections, mass assignment, and data leaks in API gateways.
Implements security controls at API gateway layer (Kong, AWS API Gateway, Azure APIM, Apigee) including auth enforcement, rate limiting, request validation, IP whitelisting, TLS termination, threat protection. Secures microservices APIs before backend.
Share bugs, ideas, or general feedback.
API Schema验证(Schema Validation)确保通过API交换的所有数据符合OpenAPI规范(OAS)或JSON Schema文档中预定义的结构。这可以防止注入攻击(SQLi、XSS、XXE),通过拒绝未知属性来阻断批量赋值(Mass Assignment),通过验证响应Schema来防止数据泄露(Data Leakage),并确保所有API交互的类型安全。Schema验证在API网关层面(运行时强制执行)和开发阶段(安全左移)均可运行。
openapi: 3.1.0
info:
title: Secure E-Commerce API
version: 2.0.0
servers:
- url: https://api.example.com/v2
description: Production (HTTPS enforced)
security:
- OAuth2:
- read:products
- write:orders
paths:
/products:
post:
operationId: createProduct
security:
- OAuth2: [write:products]
requestBody:
required: true
content:
application/json:
schema:
$ref: '#/components/schemas/ProductCreate'
responses:
'201':
description: Product created
content:
application/json:
schema:
$ref: '#/components/schemas/Product'
'400':
$ref: '#/components/responses/ValidationError'
'401':
$ref: '#/components/responses/Unauthorized'
/products/{productId}:
get:
operationId: getProduct
parameters:
- name: productId
in: path
required: true
schema:
type: string
format: uuid
pattern: '^[0-9a-f]{8}-[0-9a-f]{4}-4[0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}$'
responses:
'200':
content:
application/json:
schema:
$ref: '#/components/schemas/Product'
components:
schemas:
ProductCreate:
type: object
required: [name, price, category]
properties:
name:
type: string
minLength: 1
maxLength: 200
pattern: '^[a-zA-Z0-9\s\-\.]+$' # 无特殊字符,防注入
description:
type: string
maxLength: 2000
# 净化HTML实体
price:
type: number
format: float
minimum: 0.01
maximum: 999999.99
exclusiveMinimum: 0
category:
type: string
enum: [electronics, clothing, food, furniture, other]
tags:
type: array
items:
type: string
maxLength: 50
pattern: '^[a-zA-Z0-9\-]+$'
maxItems: 10
uniqueItems: true
additionalProperties: false # 关键:防止批量赋值
Product:
type: object
required: [id, name, price]
properties:
id:
type: string
format: uuid
readOnly: true
name:
type: string
price:
type: number
category:
type: string
tags:
type: array
items:
type: string
createdAt:
type: string
format: date-time
readOnly: true
additionalProperties: false # 防止内部字段数据泄露
ValidationErrorResponse:
type: object
required: [code, message]
properties:
code:
type: string
enum: [VALIDATION_ERROR]
message:
type: string
maxLength: 500
details:
type: array
items:
type: object
properties:
field:
type: string
error:
type: string
additionalProperties: false
maxItems: 50
additionalProperties: false
responses:
ValidationError:
description: Request validation failed
content:
application/json:
schema:
$ref: '#/components/schemas/ValidationErrorResponse'
Unauthorized:
description: Authentication required
securitySchemes:
OAuth2:
type: oauth2
flows:
authorizationCode:
authorizationUrl: https://auth.example.com/authorize
tokenUrl: https://auth.example.com/token
scopes:
read:products: Read product data
write:products: Create and update products
write:orders: Create orders
"""FastAPI API Schema验证中间件
对所有请求和响应负载强制执行严格的Schema验证,
防止注入、批量赋值和数据泄露攻击。
"""
from fastapi import FastAPI, Request, Response, HTTPException
from fastapi.middleware import Middleware
from pydantic import BaseModel, Field, field_validator, ConfigDict
from typing import List, Optional
import re
import json
from starlette.middleware.base import BaseHTTPMiddleware
app = FastAPI()
# 带安全约束的严格Pydantic模型
class ProductCreate(BaseModel):
model_config = ConfigDict(extra='forbid') # 拒绝未知字段(防批量赋值)
name: str = Field(min_length=1, max_length=200, pattern=r'^[a-zA-Z0-9\s\-\.]+$')
description: Optional[str] = Field(default=None, max_length=2000)
price: float = Field(gt=0, le=999999.99)
category: str = Field(pattern=r'^(electronics|clothing|food|furniture|other)$')
tags: Optional[List[str]] = Field(default=None, max_length=10)
@field_validator('name')
@classmethod
def sanitize_name(cls, v):
# 通过HTML实体防止XSS
dangerous_patterns = ['<script', 'javascript:', 'onerror=', 'onload=']
lower_v = v.lower()
for pattern in dangerous_patterns:
if pattern in lower_v:
raise ValueError(f'名称中含有无效字符')
return v
@field_validator('description')
@classmethod
def sanitize_description(cls, v):
if v is None:
return v
# 过滤潜在的SQL注入模式
sql_patterns = [
r"('|--|;|/\*|\*/|xp_|exec\s|union\s+select|drop\s+table)",
]
for pattern in sql_patterns:
if re.search(pattern, v, re.IGNORECASE):
raise ValueError('描述中含有无效内容')
return v
@field_validator('tags')
@classmethod
def validate_tags(cls, v):
if v is None:
return v
if len(v) > 10:
raise ValueError('最多允许10个标签')
for tag in v:
if not re.match(r'^[a-zA-Z0-9\-]+$', tag) or len(tag) > 50:
raise ValueError(f'标签格式无效: {tag}')
return v
class ProductResponse(BaseModel):
"""显式定义允许输出字段的响应模型。
防止内部字段(如internal_notes、cost_price等)泄露。"""
model_config = ConfigDict(extra='forbid')
id: str
name: str
price: float
category: str
tags: List[str] = []
created_at: str
class ResponseValidationMiddleware(BaseHTTPMiddleware):
"""用于验证响应负载是否符合Schema的中间件。
通过检查响应内容防止意外数据泄露。"""
SCHEMA_MAP = {
'/api/v2/products': {
'POST': {'response_model': ProductResponse},
'GET': {'response_model': ProductResponse},
}
}
async def dispatch(self, request: Request, call_next):
response = await call_next(request)
# 仅验证JSON响应
content_type = response.headers.get('content-type', '')
if 'application/json' not in content_type:
return response
# 检查端点是否有注册的响应Schema
path = request.url.path
method = request.method
route_config = self.SCHEMA_MAP.get(path, {}).get(method)
if not route_config:
return response
# 读取并验证响应体
body = b""
async for chunk in response.body_iterator:
body += chunk
try:
data = json.loads(body)
model = route_config['response_model']
if isinstance(data, list):
for item in data:
model.model_validate(item)
else:
model.model_validate(data)
except Exception as e:
# 记录安全监控的验证失败
print(f"安全告警: 响应Schema违规 {method} {path}: {e}")
# 返回安全错误而非可能泄露的数据
return Response(
content=json.dumps({"error": "Internal server error"}),
status_code=500,
media_type="application/json"
)
return Response(
content=body,
status_code=response.status_code,
headers=dict(response.headers),
media_type=response.media_type
)
app.add_middleware(ResponseValidationMiddleware)
@app.post("/api/v2/products", response_model=ProductResponse, status_code=201)
async def create_product(product: ProductCreate):
# 带extra='forbid'的ProductCreate模型自动拒绝
# 任何未知字段,防止批量赋值攻击
# (如攻击者尝试设置is_admin=true或price=0)
pass
# 上传OpenAPI Schema到Cloudflare API Shield
curl -X POST "https://api.cloudflare.com/client/v4/zones/{zone_id}/api_gateway/user_schemas" \
-H "Authorization: Bearer ${CF_API_TOKEN}" \
-H "Content-Type: multipart/form-data" \
-F "file=@openapi.yaml" \
-F "kind=openapi_v3"
# 以拦截模式启用Schema验证
curl -X PATCH "https://api.cloudflare.com/client/v4/zones/{zone_id}/api_gateway/settings/schema_validation" \
-H "Authorization: Bearer ${CF_API_TOKEN}" \
-H "Content-Type: application/json" \
-d '{
"validation_default_mitigation_action": "block",
"validation_override_mitigation_action": null
}'
# GitHub Actions工作流 - CI中的Schema验证
name: API Schema Security Check
on:
pull_request:
paths: ['api/**', 'openapi/**']
jobs:
schema-security:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: 验证OpenAPI Schema
run: |
npm install -g @stoplight/spectral-cli
spectral lint openapi.yaml --ruleset .spectral-security.yaml
- name: 检查安全反模式
run: |
python3 scripts/schema_security_check.py openapi.yaml
- name: 运行契约测试
run: |
npm install -g dredd
dredd openapi.yaml http://localhost:3000 --hookfiles=./test/hooks.js
| 反模式 | 风险 | 修复方案 |
|---|---|---|
additionalProperties: true 或缺失 | 批量赋值 | 设置 additionalProperties: false |
字符串字段无 maxLength | 缓冲区溢出、DoS | 添加适当的 maxLength 约束 |
字符串字段无 pattern | 注入攻击 | 添加正则模式限制输入 |
固定值字段无 enum | 处理意外输入 | 对已知值字段使用 enum |
format: password 无TLS | 凭据暴露 | 强制仅使用HTTPS服务器URL |
| 缺少错误响应Schema | 信息泄露 | 定义所有4xx/5xx响应Schema |
请求体中包含 readOnly 字段 | 数据篡改 | 服务端强制执行 readOnly |