<!-- AUTO-GENERATED by export-plugins.py — DO NOT EDIT -->
npx claudepluginhub frank-luongt/faos-skills-marketplace --plugin faos-security-engineer
This skill uses the workspace's default tool permissions.
Implements structured self-debugging workflow for AI agent failures: capture errors, diagnose patterns like loops or context overflow, apply contained recoveries, and generate introspection reports.
Designs and optimizes AI agent action spaces, tool definitions, observation formats, error recovery, and context for higher task completion rates.
Compares coding agents like Claude Code and Aider on custom YAML-defined codebase tasks using git worktrees, measuring pass rate, cost, time, and consistency.
APIs are the primary attack surface for modern applications. Over 90% of web-enabled applications expose more attack surface through APIs than through traditional UI. This skill provides defense-in-depth patterns for API security, mapping OWASP API Security Top 10 risks to concrete prevention code.
Core principle: Every API endpoint is an entry point for attackers. Authenticate, authorize, validate, and rate-limit every request -- no exceptions.
This skill enhances the existing @owasp-api-top10 skill with implementation-level patterns for Python/FastAPI, OPA policy-as-code, Redis-based rate limiting, and GraphQL-specific defenses.
Before writing security code, enumerate your attack surface:
API Threat Model Checklist:
============================
[ ] List all endpoints (including undocumented/debug endpoints)
[ ] Classify data sensitivity per endpoint (public, internal, confidential, restricted)
[ ] Identify authentication requirements per endpoint
[ ] Map authorization rules (who can access what)
[ ] Document rate limiting requirements per endpoint/consumer
[ ] Identify external dependencies and trust boundaries
[ ] Review OWASP API Top 10 against each endpoint:
API1 - Broken Object Level Authorization (BOLA)
API2 - Broken Authentication
API3 - Broken Object Property Level Authorization
API4 - Unrestricted Resource Consumption
API5 - Broken Function Level Authorization
API6 - Unrestricted Access to Sensitive Business Flows
API7 - Server-Side Request Forgery (SSRF)
API8 - Security Misconfiguration
API9 - Improper Inventory Management
API10 - Unsafe Consumption of APIs
JWT Best Practices:
- Validate the iss, aud, exp, and nbf claims on every request.
OAuth 2.0 + PKCE (for public clients):
- Use the state parameter to prevent CSRF.
- Use code_verifier / code_challenge (S256 method).
mTLS (for service-to-service):
Validate every input at the API boundary. Never trust client data.
Validation layers:
Apply rate limits at multiple layers:
Algorithms:
| Algorithm | Best For | Burst Tolerance |
|---|---|---|
| Fixed Window | Simple, low-memory | Allows 2x burst at window boundary |
| Sliding Window Log | Precise, audit-friendly | None (exact count) |
| Sliding Window Counter | Good balance of precision and memory | Minimal |
| Token Bucket | Steady rate with controlled bursts | Configurable |
| Leaky Bucket | Strict output rate smoothing | None |
Monitor for:
"""
JWT authentication middleware for FastAPI with RBAC.
Validates tokens, extracts claims, and enforces role-based access control.
"""
from datetime import datetime, timezone
from enum import Enum
from functools import wraps
from typing import Optional
import httpx
from fastapi import Depends, HTTPException, Request, status
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from jose import JWTError, jwt
from pydantic import BaseModel
# --- Configuration ---
class AuthConfig(BaseModel):
    """Settings used when verifying incoming JWTs."""

    # Location of the identity provider's JWKS document,
    # e.g. "https://auth.example.com/.well-known/jwks.json".
    jwks_url: str
    # Expected issuer, e.g. "https://auth.example.com/".
    issuer: str
    # Expected audience, e.g. "https://api.example.com".
    audience: str
    # Signature algorithm accepted when decoding a token.
    algorithm: str = "RS256"
    # Clock-skew tolerance, in seconds.
    token_ttl_leeway: int = 30
class Role(str, Enum):
    """Role names a token may carry.

    Members are also ``str`` instances, so they compare equal to their
    plain-string values.
    """

    VIEWER = "viewer"
    EDITOR = "editor"
    ADMIN = "admin"
    SUPER_ADMIN = "super_admin"
# Numeric rank of each role; a larger number means more privilege.
ROLE_HIERARCHY = dict(
    zip(
        (Role.SUPER_ADMIN, Role.ADMIN, Role.EDITOR, Role.VIEWER),
        (4, 3, 2, 1),
    )
)
class TokenClaims(BaseModel):
    """Claims extracted from a verified JWT payload."""

    # Subject of the token.
    sub: str
    # Tenant the subject belongs to.
    tenant_id: str
    # Role names granted to the subject.
    roles: list[str]
    # Expiry, as carried in the token's ``exp`` claim.
    exp: int
    # Issuer of the token.
    iss: str
    # Audience of the token.
    # NOTE(review): RFC 7519 also allows ``aud`` to be an array of strings;
    # such tokens would fail validation against ``str`` -- confirm the issuer
    # always emits a single string.
    aud: str
    # Optional unique token identifier.
    jti: Optional[str] = None
# --- JWKS Key Cache ---
class JWKSClient:
    """Cache JWKS signing keys in memory, indexed by ``kid``.

    The key set is re-downloaded when the cache is older than ``cache_ttl``
    seconds, or when an unknown ``kid`` is requested (to pick up key
    rotation). Refreshes triggered by an unknown ``kid`` are throttled to at
    most one per ``min_refresh_interval`` seconds, so that tokens carrying
    arbitrary ``kid`` values cannot force an outbound HTTP request on every
    call.
    """

    def __init__(
        self,
        jwks_url: str,
        cache_ttl: int = 3600,
        min_refresh_interval: int = 30,
    ):
        """Create an empty cache.

        Args:
            jwks_url: URL of the JWKS document.
            cache_ttl: Maximum age of the cached key set, in seconds.
            min_refresh_interval: Minimum number of seconds between refreshes
                that are triggered only by an unknown ``kid``.
        """
        self._jwks_url = jwks_url
        self._cache_ttl = cache_ttl
        self._min_refresh_interval = min_refresh_interval
        self._keys: dict = {}
        # 0 means "never fetched", which makes the first lookup refresh.
        self._last_fetched: float = 0

    async def get_signing_key(self, kid: str) -> dict:
        """Return the JWK whose ``kid`` matches, refreshing the cache if needed.

        Raises:
            HTTPException: 401 when the key is still unknown after any refresh.
        """
        now = datetime.now(timezone.utc).timestamp()
        age = now - self._last_fetched
        cache_expired = age > self._cache_ttl
        # An unknown kid may mean the issuer rotated keys, but only refetch
        # if the last fetch is old enough; otherwise treat it as unknown.
        rotation_check_due = (
            kid not in self._keys and age >= self._min_refresh_interval
        )
        if cache_expired or rotation_check_due:
            await self._refresh_keys()

        if kid not in self._keys:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Unknown signing key",
            )
        return self._keys[kid]

    async def _refresh_keys(self) -> None:
        """Download the JWKS document and replace the cached key set.

        Errors from the HTTP request (including non-2xx responses, via
        ``raise_for_status``) propagate to the caller and leave the cache
        untouched.
        """
        async with httpx.AsyncClient(timeout=10) as client:
            resp = await client.get(self._jwks_url)
            resp.raise_for_status()
            jwks = resp.json()
        # Entries without a "kid" cannot be looked up, so skip them instead
        # of failing the whole refresh with a KeyError.
        self._keys = {
            key["kid"]: key for key in jwks.get("keys", []) if "kid" in key
        }
        self._last_fetched = datetime.now(timezone.utc).timestamp()
# --- Token Validation ---
security_scheme = HTTPBearer()


async def validate_token(
    request: Request,
    credentials: HTTPAuthorizationCredentials = Depends(security_scheme),
) -> TokenClaims:
    """Validate a bearer JWT and return its claims. Use as a FastAPI dependency.

    Reads ``auth_config`` and ``jwks_client`` from ``request.app.state``,
    verifies the token's signature, expiry, not-before, audience and issuer,
    and stores ``tenant_id``, ``user_id`` and ``roles`` on ``request.state``
    for downstream use.

    Raises:
        HTTPException: 401 when the token has no ``kid``, fails verification,
            or its payload does not match ``TokenClaims``.
    """
    config: AuthConfig = request.app.state.auth_config
    jwks_client: JWKSClient = request.app.state.jwks_client
    token = credentials.credentials
    challenge = {"WWW-Authenticate": "Bearer"}

    try:
        # Decode the header without verification, only to learn which key
        # the token claims to be signed with.
        unverified_header = jwt.get_unverified_header(token)
        kid = unverified_header.get("kid")
        if not kid:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Token missing key ID",
                headers=challenge,
            )

        # Fetch the signing key
        signing_key = await jwks_client.get_signing_key(kid)

        # Verify and decode the token
        payload = jwt.decode(
            token,
            signing_key,
            algorithms=[config.algorithm],
            audience=config.audience,
            issuer=config.issuer,
            options={
                "verify_exp": True,
                "verify_aud": True,
                "verify_iss": True,
                "verify_nbf": True,
                "leeway": config.token_ttl_leeway,
            },
        )
    except JWTError as e:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=f"Invalid token: {e}",
            headers=challenge,
        ) from e

    try:
        claims = TokenClaims(**payload)
    except ValueError as e:
        # pydantic's ValidationError derives from ValueError. A correctly
        # signed token with missing or mistyped claims is still a client
        # error, so answer 401 instead of letting it surface as a 500.
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid token claims",
            headers=challenge,
        ) from e

    # Bind tenant_id and sub to request state for downstream use
    request.state.tenant_id = claims.tenant_id
    request.state.user_id = claims.sub
    request.state.roles = claims.roles
    return claims
# --- Role-Based Access Control Decorator ---
def require_role(minimum_role: Role):
    """Build a route decorator that enforces a minimum role level.

    The caller's effective level is the highest rank in ``ROLE_HIERARCHY``
    among the roles in the token. Role strings that are not members of
    ``Role`` rank 0 (no privilege) instead of raising.

    The wrapper forwards ``claims`` to the wrapped function as a keyword
    argument, so the decorated route handler must accept a ``claims``
    parameter.

    Args:
        minimum_role: Lowest role allowed to call the decorated route.

    Raises:
        HTTPException: 403 (from the wrapper) when the caller's highest role
            ranks below ``minimum_role``.
    """

    def _rank(role_name: str) -> int:
        # Role(...) raises ValueError for names outside the enum; a token
        # carrying an unrecognised role must not crash the request.
        try:
            return ROLE_HIERARCHY.get(Role(role_name), 0)
        except ValueError:
            return 0

    def decorator(func):
        @wraps(func)
        async def wrapper(
            *args, claims: TokenClaims = Depends(validate_token), **kwargs
        ):
            user_max_role = max((_rank(r) for r in claims.roles), default=0)
            if user_max_role < ROLE_HIERARCHY[minimum_role]:
                raise HTTPException(
                    status_code=status.HTTP_403_FORBIDDEN,
                    detail=f"Requires role: {minimum_role.value}",
                )
            return await func(*args, claims=claims, **kwargs)

        return wrapper

    return decorator
# --- Usage in Routes ---
# @router.get("/admin/users")
# @require_role(Role.ADMIN)
# async def list_users(claims: TokenClaims = Depends(validate_token)):
# """Only admins and above can list users."""
# return await user_service.list_by_tenant(claims.tenant_id)
#
# @router.get("/documents/{doc_id}")
# async def get_document(doc_id: str, claims: TokenClaims = Depends(validate_token)):
# """BOLA prevention: always filter by tenant_id from token, never from request."""
# doc = await doc_service.get(doc_id, tenant_id=claims.tenant_id)
# if not doc:
# raise HTTPException(status_code=404, detail="Not found")
# return doc
# policy/api_authz.rego
#
# Open Policy Agent (OPA) policy for API authorization.
# Enforces tenant isolation, role-based access, and resource-level permissions.
package api.authz
import future.keywords.if
import future.keywords.in
# Deny by default: `allow` stays false unless the rule below succeeds.
default allow := false

# --- Tenant Isolation (BOLA Prevention) ---
# Every request must include a tenant_id that matches the token's tenant.
# A request is allowed only when the tenant, role, and resource-access
# checks all succeed.
allow if {
    valid_tenant
    valid_role
    valid_resource_access
}

# Succeeds when the resource and the token carry the same tenant_id.
valid_tenant if {
    # The requested resource's tenant must match the token's tenant
    input.resource.tenant_id == input.token.tenant_id
}
# --- Role-Based Access Control ---
# HTTP methods each role is permitted to use.
role_permissions := {
    "viewer": {"GET"},
    "editor": {"GET", "POST", "PUT", "PATCH"},
    "admin": {"GET", "POST", "PUT", "PATCH", "DELETE"},
    "super_admin": {"GET", "POST", "PUT", "PATCH", "DELETE"},
}

# Succeeds when at least one of the token's roles permits the request's
# HTTP method.
valid_role if {
    some role in input.token.roles
    input.request.method in role_permissions[role]
}
# --- Resource-Level Permissions ---
# Admin-only endpoints
# Matched as path prefixes (see startswith_any), so any path that begins
# with one of these strings is treated as admin-only.
admin_paths := {"/api/v1/admin", "/api/v1/users", "/api/v1/audit-logs"}

# Paths outside the admin prefixes are open to every role.
valid_resource_access if {
    not startswith_any(input.request.path, admin_paths)
}

# Admin-prefixed paths require the "admin" role ...
valid_resource_access if {
    startswith_any(input.request.path, admin_paths)
    "admin" in input.token.roles
}

# ... or the "super_admin" role.
valid_resource_access if {
    startswith_any(input.request.path, admin_paths)
    "super_admin" in input.token.roles
}
# --- Rate Limit Tier (advisory, enforced by gateway) ---
# Branches are tried in order: "unlimited" for super_admin, "high" for
# admin, and "standard" for everyone else.
rate_limit_tier := tier if {
    "super_admin" in input.token.roles
    tier := "unlimited"
} else := tier if {
    "admin" in input.token.roles
    tier := "high"
} else := tier if {
    tier := "standard"
}
# --- Helper Functions ---
# Succeeds when `str` starts with at least one member of `prefixes`.
startswith_any(str, prefixes) if {
    some prefix in prefixes
    startswith(str, prefix)
}
# --- Deny Reasons (for debugging and audit logging) ---
# One entry is produced for each failed check.
# NOTE(review): these heads use `reasons[reason] if` without `contains`;
# confirm whether the OPA version in use evaluates that form as a set of
# reasons or as an object keyed by reason.

# Tenant check failed.
reasons[reason] if {
    not valid_tenant
    reason := "tenant_id mismatch: cross-tenant access denied"
}

# Role check failed; the message includes the token's full roles list.
reasons[reason] if {
    not valid_role
    reason := sprintf("role '%v' not authorized for method '%v'", [input.token.roles, input.request.method])
}

# Resource-access check failed.
reasons[reason] if {
    not valid_resource_access
    reason := sprintf("path '%v' requires admin role", [input.request.path])
}
"""
Sliding window rate limiter using Redis sorted sets.
Suitable for distributed API deployments behind a load balancer.
"""
import time
import uuid
from dataclasses import dataclass
from typing import Optional

import redis.asyncio as redis
from fastapi import HTTPException, Request, status
from fastapi.responses import JSONResponse
@dataclass
class RateLimitConfig:
    """Rate limit settings for one endpoint group or consumer tier."""

    # Maximum number of requests allowed per window.
    requests: int
    # Length of the window, in seconds.
    window_seconds: int
    # Allow bursts of up to N * requests in any sub-window.
    burst_multiplier: float = 1.0
@dataclass
class RateLimitResult:
    """Outcome of a single rate-limit check."""

    # Whether the request may proceed.
    allowed: bool
    # Configured maximum number of requests per window.
    limit: int
    # Requests left in the current window.
    remaining: int
    # Seconds until the next request would be allowed; None when allowed.
    retry_after: Optional[int]
    # Unix timestamp at which the window resets.
    reset_at: float
class SlidingWindowRateLimiter:
    """Sliding window rate limiter using Redis sorted sets.

    Algorithm:
        1. Use a sorted set per (identifier, endpoint) pair.
        2. Score = request timestamp; member = timestamp plus a random
           suffix so that every request is a distinct entry.
        3. In one MULTI/EXEC pipeline: remove entries older than the window,
           count the remaining entries, optimistically add the new entry,
           and refresh the key's TTL.
        4. If the count taken before the add already reached the limit,
           remove exactly the entry that was just added and reject.

    Only step 3 runs as a single transaction; the rollback in step 4 is a
    separate command.
    """

    def __init__(self, redis_client: redis.Redis, key_prefix: str = "faos:ratelimit"):
        """Store the Redis client and the prefix used for every key."""
        self._redis = redis_client
        self._prefix = key_prefix

    async def check(
        self,
        identifier: str,
        endpoint: str,
        config: RateLimitConfig,
    ) -> RateLimitResult:
        """Check and consume a rate limit token.

        Args:
            identifier: Caller the limit applies to.
            endpoint: Endpoint group the limit applies to.
            config: Limit and window to enforce.

        Returns:
            A ``RateLimitResult`` carrying the values needed for the
            rate-limit response headers.
        """
        now = time.time()
        window_start = now - config.window_seconds
        key = f"{self._prefix}:{identifier}:{endpoint}"
        # A random suffix keeps members distinct even when two requests
        # (possibly in different processes) share the same timestamp;
        # identical members would overwrite each other and be undercounted.
        member = f"{now}:{uuid.uuid4().hex}"

        async with self._redis.pipeline(transaction=True) as pipe:
            # Remove expired entries
            pipe.zremrangebyscore(key, 0, window_start)
            # Count current entries in window
            pipe.zcard(key)
            # Add current request (optimistically)
            pipe.zadd(key, {member: now})
            # Set TTL to auto-cleanup
            pipe.expire(key, config.window_seconds + 1)
            results = await pipe.execute()

        current_count = results[1]  # zcard result (before adding new entry)

        if current_count >= config.requests:
            # Over limit: remove only the entry added above. Removing by
            # score range would also delete other callers' requests that
            # landed within the same millisecond.
            await self._redis.zrem(key, member)
            # Calculate retry-after from oldest entry in window
            oldest = await self._redis.zrange(key, 0, 0, withscores=True)
            if oldest:
                oldest_score = oldest[0][1]
                retry_after = int(oldest_score + config.window_seconds - now) + 1
            else:
                retry_after = config.window_seconds
            return RateLimitResult(
                allowed=False,
                limit=config.requests,
                remaining=0,
                retry_after=retry_after,
                reset_at=now + retry_after,
            )

        return RateLimitResult(
            allowed=True,
            limit=config.requests,
            remaining=config.requests - current_count - 1,
            retry_after=None,
            reset_at=now + config.window_seconds,
        )
# --- Rate Limit Configurations by Tier ---
# Rate limit per endpoint group / consumer tier, built from
# (name, max requests, window in seconds) rows.
RATE_LIMITS = {
    tier: RateLimitConfig(requests=max_requests, window_seconds=window)
    for tier, max_requests, window in (
        ("default", 100, 60),
        ("auth_endpoints", 10, 60),
        ("search", 30, 60),
        ("export", 5, 300),
        ("premium_tier", 500, 60),
    )
}
# --- FastAPI Middleware ---
async def rate_limit_middleware(request: Request, call_next):
    """FastAPI middleware that enforces rate limits and sets response headers.

    Picks a caller identifier and an endpoint group, consumes one token from
    the limiter stored on ``request.app.state.rate_limiter``, and either
    returns a 429 response or forwards the request and annotates the
    response with ``X-RateLimit-*`` headers.

    The 429 is returned as a response rather than raised: an
    ``HTTPException`` raised inside an HTTP middleware is not processed by
    FastAPI's exception handlers and would reach the client as a 500.
    """
    limiter: SlidingWindowRateLimiter = request.app.state.rate_limiter

    # Determine identifier: authenticated user > API key > IP
    # NOTE(review): user_id is only present if something set it on
    # request.state before this middleware runs -- confirm ordering.
    # NOTE(review): X-API-Key is used as sent, without validation; a client
    # could vary it to obtain fresh buckets -- confirm it is verified upstream.
    identifier = getattr(request.state, "user_id", None)
    if not identifier:
        identifier = request.headers.get("X-API-Key")
    if not identifier:
        # request.client can be None, so fall back to a shared bucket
        # instead of failing on the attribute access.
        identifier = request.client.host if request.client else "unknown"

    # Determine rate limit config based on endpoint
    path = request.url.path
    endpoint_key = "default"
    if path.startswith("/api/v1/auth"):
        endpoint_key = "auth_endpoints"
    elif path.startswith("/api/v1/search"):
        endpoint_key = "search"
    elif path.startswith("/api/v1/export"):
        endpoint_key = "export"

    config = RATE_LIMITS[endpoint_key]
    result = await limiter.check(identifier, endpoint_key, config)

    if not result.allowed:
        return JSONResponse(
            status_code=status.HTTP_429_TOO_MANY_REQUESTS,
            content={"detail": "Rate limit exceeded"},
            headers={
                "X-RateLimit-Limit": str(result.limit),
                "X-RateLimit-Remaining": "0",
                "X-RateLimit-Reset": str(int(result.reset_at)),
                "Retry-After": str(result.retry_after),
            },
        )

    response = await call_next(request)
    response.headers["X-RateLimit-Limit"] = str(result.limit)
    response.headers["X-RateLimit-Remaining"] = str(result.remaining)
    response.headers["X-RateLimit-Reset"] = str(int(result.reset_at))
    return response