Auto-invoked when writing auth code, handling user input, configuring CORS/CSP, managing secrets, or implementing any security-related feature. Enforces OWASP Top 10 protections and Alpha AI security standards.
From cortex
npx claudepluginhub anthropics/claude-plugins-community --plugin cortex
This skill uses the workspace's default tool permissions.
Designs and optimizes AI agent action spaces, tool definitions, observation formats, error recovery, and context for higher task completion rates.
Reorganizes X and LinkedIn networks: review-first pruning of low-value follows, priority-based add/follow recommendations, and drafts warm outreach in user's voice.
Compares coding agents like Claude Code and Aider on custom YAML-defined codebase tasks using git worktrees, measuring pass rate, cost, time, and consistency.
This skill enforces comprehensive security standards across all Alpha AI projects. Every code change touching authentication, authorization, user input, secrets, HTTP headers, or any security-sensitive surface MUST comply with these rules.
- Secure flag on all auth cookies (HTTPS only)
- SameSite=Lax or SameSite=Strict on auth cookies

# REQUIRED: Refresh token rotation implementation
async def rotate_refresh_token(old_refresh_token: str) -> TokenPair:
    """Exchange a refresh token for a new access/refresh token pair.

    Steps:
        1. Validate the old refresh token.
        2. Atomically mark it as used. If it was already marked, treat the
           request as a replay and revoke every token of that user.
        3. Issue a new access token and a new refresh token.

    NOTE(review): earlier documentation for this function also listed
    "store the new refresh token hash in Redis"; no such write happens here.
    Confirm whether ``create_refresh_token`` is expected to persist it.

    Args:
        old_refresh_token: The refresh token presented by the client.

    Returns:
        A ``TokenPair`` with the newly issued access and refresh tokens.

    Raises:
        SecurityException: If the refresh token was already presented before.
    """
    payload = verify_refresh_token(old_refresh_token)
    subject = payload["sub"]
    used_tokens_key = f"used_refresh_tokens:{subject}"

    # SADD reports how many members were newly added, so one call both records
    # the token and tells us whether it was already present. A separate
    # SISMEMBER followed by SADD would let two concurrent requests carrying
    # the same token both pass the membership check.
    newly_marked = await redis.sadd(used_tokens_key, old_refresh_token)
    if not newly_marked:
        # Replay detection: if token already used, revoke ALL user tokens
        await revoke_all_user_tokens(subject)
        raise SecurityException("Refresh token replay detected")

    # Keep the used-token record for seven days (7 * 86400 seconds).
    await redis.expire(used_tokens_key, 7 * 86400)

    # Issue new pair
    new_access = create_access_token(subject=subject, expires_minutes=30)
    new_refresh = create_refresh_token(subject=subject, expires_days=7)
    return TokenPair(access_token=new_access, refresh_token=new_refresh)
X-CSRF-Token)
# REQUIRED: CSRF middleware
import secrets

from fastapi import Request, HTTPException
from fastapi.responses import JSONResponse

# Methods that skip the CSRF check.
SAFE_METHODS = {"GET", "HEAD", "OPTIONS"}


@app.middleware("http")
async def csrf_protection(request: Request, call_next):
    """Reject unsafe-method requests whose CSRF cookie and header disagree.

    For any method outside ``SAFE_METHODS`` the ``csrf_token`` cookie must be
    present and equal to the ``X-CSRF-Token`` header. Otherwise a 403 response
    is returned without calling the downstream handler.

    Args:
        request: The incoming request.
        call_next: Callable that forwards the request down the stack.

    Returns:
        The downstream response, or a 403 JSON response on CSRF failure.
    """
    if request.method not in SAFE_METHODS:
        cookie_token = request.cookies.get("csrf_token")
        header_token = request.headers.get("X-CSRF-Token")
        # Compare as bytes in constant time so the check does not leak how
        # many leading characters matched and also accepts non-ASCII input.
        tokens_match = (
            bool(cookie_token)
            and header_token is not None
            and secrets.compare_digest(cookie_token.encode(), header_token.encode())
        )
        if not tokens_match:
            # Return the response directly: an HTTPException raised inside an
            # HTTP middleware is not routed through the application's
            # exception handlers and would surface as a 500.
            return JSONResponse(
                status_code=403,
                content={"detail": "CSRF validation failed"},
            )
    return await call_next(request)
# REQUIRED: Password hashing
from passlib.context import CryptContext
# Shared hashing context used by hash_password and verify_password:
# bcrypt is the only configured scheme, with its rounds setting fixed at 12.
pwd_context = CryptContext(
schemes=["bcrypt"],
bcrypt__rounds=12, # Minimum cost factor
deprecated="auto",
)
def hash_password(password: str) -> str:
    """Hash *password* with the shared ``pwd_context``.

    Args:
        password: The plaintext password to hash.

    Returns:
        The hash string produced by ``pwd_context.hash``.

    Raises:
        ValueError: If the password has fewer than 10 characters.
    """
    minimum_length = 10
    too_short = len(password) < minimum_length
    if too_short:
        raise ValueError("Password must be at least 10 characters")
    hashed = pwd_context.hash(password)
    return hashed
def verify_password(plain: str, hashed: str) -> bool:
    """Check a plaintext password against a stored hash.

    Args:
        plain: The plaintext password supplied by the user.
        hashed: The stored hash to check against.

    Returns:
        Whatever ``pwd_context.verify`` reports for the pair.
    """
    is_match = pwd_context.verify(plain, hashed)
    return is_match
# REQUIRED: Rate limiter for auth endpoints
from fastapi_limiter import FastAPILimiter
from fastapi_limiter.depends import RateLimiter
# Login endpoint. The route-level dependency attaches a RateLimiter
# configured with times=5 and seconds=60.
@app.post("/auth/login", dependencies=[Depends(RateLimiter(times=5, seconds=60))])
async def login(credentials: LoginRequest):
# Handler body is intentionally elided in this example.
...
# REQUIRED: Token blacklisting
async def logout(access_token: str, refresh_token: str):
    """Blacklist the access token until it expires and drop the refresh token.

    Args:
        access_token: The access token being logged out.
        refresh_token: The refresh token whose ``refresh:`` key is deleted.
    """
    # Expiry verification is switched off so the claims can still be read
    # from a token that has already expired.
    claims = decode_token(access_token, verify_exp=False)
    seconds_left = claims["exp"] - int(time.time())

    # Only a token that is still valid needs a blacklist entry; the entry
    # lives exactly as long as the token would have.
    if seconds_left > 0:
        blacklist_key = f"blacklist:{access_token}"
        await redis.setex(blacklist_key, seconds_left, "1")

    # Invalidate refresh token
    refresh_key = f"refresh:{refresh_token}"
    await redis.delete(refresh_key)
# Check on every request
async def is_token_blacklisted(token: str) -> bool:
    """Report whether *token* currently has a blacklist entry.

    Args:
        token: The access token to look up under the ``blacklist:`` prefix.

    Returns:
        True if the blacklist key exists, False otherwise.
    """
    # The exists call yields a count rather than a boolean; convert it so the
    # declared return type actually holds for callers.
    return bool(await redis.exists(f"blacklist:{token}"))
# REQUIRED: RBAC dependency
from fastapi import Depends, HTTPException, Security
def require_role(*roles: str):
    """Build a dependency that admits only users whose role is in *roles*.

    Args:
        *roles: Role names that are allowed through.

    Returns:
        An async dependency callable. It returns the current user when the
        user's role is accepted and raises a 403 ``HTTPException`` otherwise.
    """

    async def role_checker(current_user: User = Depends(get_current_user)):
        # Guard-clause form: an accepted role returns immediately.
        if current_user.role in roles:
            return current_user
        raise HTTPException(status_code=403, detail="Insufficient permissions")

    return role_checker
# Example route guarded by require_role("admin") as a route-level dependency.
@app.delete("/admin/users/{user_id}", dependencies=[Depends(require_role("admin"))])
async def delete_user(user_id: int):
# Handler body is intentionally elided in this example.
...
# REQUIRED: Pydantic validation pattern
import re

from pydantic import BaseModel, ConfigDict, EmailStr, Field, field_validator, validator


class UserCreate(BaseModel):
    """Request payload for creating a user.

    Uses the Pydantic v2 API throughout (``ConfigDict`` and
    ``field_validator``). The field declarations already rely on v2-only
    names such as ``pattern=`` and ``str_strip_whitespace``.
    """

    # Strip surrounding whitespace from every string field and cap strings
    # that declare no limit of their own.
    model_config = ConfigDict(
        str_strip_whitespace=True,
        str_max_length=500,  # Global max for any unspecified string
    )

    # Validated as an e-mail address by EmailStr.
    email: EmailStr
    # 3-50 characters drawn from ASCII letters, digits, underscore and hyphen.
    username: str = Field(..., min_length=3, max_length=50, pattern=r"^[a-zA-Z0-9_-]+$")
    # 10-128 characters.
    password: str = Field(..., min_length=10, max_length=128)

    @field_validator("password")
    @classmethod
    def password_strength(cls, v: str) -> str:
        """Reject passwords shorter than 10 characters.

        Raises:
            ValueError: If the password has fewer than 10 characters.
        """
        if len(v) < 10:
            raise ValueError("Password must be at least 10 characters")
        return v
// REQUIRED: Zod validation pattern
import { z } from "zod";

// Username characters: ASCII letters, digits, underscore and hyphen.
const USERNAME_PATTERN = /^[a-zA-Z0-9_-]+$/;

// Per-field schemas, declared separately so each rule reads on its own line.
const emailField = z.string().email("Invalid email address");
const usernameField = z.string().min(3).max(50).regex(USERNAME_PATTERN);
const passwordField = z.string().min(10).max(128);

// Schema for the user-creation payload.
const UserCreateSchema = z.object({
  email: emailField,
  username: usernameField,
  password: passwordField,
});

// Static type derived from the schema.
type UserCreate = z.infer<typeof UserCreateSchema>;
# WRONG: Trusting client-provided user_id
# Anti-pattern kept for illustration only: the user record is looked up with
# data.user_id, a value taken from the request payload.
@app.put("/profile")
async def update_profile(data: ProfileUpdate):
user = await db.get(User, data.user_id) # BAD: client controls user_id
# Remainder of the handler is elided in this example.
...
# CORRECT: Use authenticated user identity
# Recommended form: the handler depends on get_current_user and looks the
# record up with current_user.id instead of any id from the payload.
@app.put("/profile")
async def update_profile(data: ProfileUpdate, current_user: User = Depends(get_current_user)):
user = await db.get(User, current_user.id) # GOOD: server-controlled identity
# Remainder of the handler is elided in this example.
...
Do not use dangerouslySetInnerHTML unless sanitized with DOMPurify
// REQUIRED: If raw HTML is unavoidable
import DOMPurify from "dompurify";

/** Renders `html` inside a <div> after passing it through DOMPurify.sanitize. */
function SafeHTML({ html }: { html: string }) {
  const sanitizedMarkup = DOMPurify.sanitize(html);
  return <div dangerouslySetInnerHTML={{ __html: sanitizedMarkup }} />;
}
# WRONG: SQL injection vulnerability
# Anti-pattern kept for illustration only: the value of `email` is
# interpolated straight into the SQL text.
query = f"SELECT * FROM users WHERE email = '{email}'" # NEVER DO THIS
# CORRECT: Parameterized query
from sqlalchemy import text
# The statement uses the :email placeholder; the value travels separately in
# the parameters dict.
result = await db.execute(text("SELECT * FROM users WHERE email = :email"), {"email": email})
# CORRECT: ORM method
# NOTE(review): `user` holds whatever db.execute returns, presumably a result
# object rather than a User row; confirm how callers unpack it.
user = await db.execute(select(User).where(User.email == email))
# REQUIRED: File upload validation
import magic
import uuid
# MIME types accepted for uploads, as detected from file content.
ALLOWED_TYPES = {"image/jpeg", "image/png", "image/webp", "application/pdf"}
MAX_FILE_SIZE = 10 * 1024 * 1024  # 10MB


async def validate_upload(file: UploadFile) -> str:
    """Validate an upload's size and detected type and pick a storage name.

    Args:
        file: The uploaded file to inspect.

    Returns:
        A generated filename of the form ``<uuid4>.<mime subtype>``. The
        client-supplied filename is never used.

    Raises:
        HTTPException: 413 if the content exceeds ``MAX_FILE_SIZE``; 415 if
            the detected MIME type is not in ``ALLOWED_TYPES``.
    """
    # Read at most one byte past the limit: enough to detect an oversized
    # upload without loading an arbitrarily large body into memory.
    content = await file.read(MAX_FILE_SIZE + 1)
    if len(content) > MAX_FILE_SIZE:
        raise HTTPException(413, "File too large")

    # Detect the type from the bytes themselves, not from client metadata.
    mime = magic.from_buffer(content, mime=True)
    if mime not in ALLOWED_TYPES:
        raise HTTPException(415, f"Unsupported file type: {mime}")

    # Rewind so the caller can read the same upload again when storing it.
    await file.seek(0)

    safe_filename = f"{uuid.uuid4()}.{mime.split('/')[-1]}"
    return safe_filename
# REQUIRED: URL validation for SSRF prevention
import ipaddress
from urllib.parse import urlparse
import socket
# Networks a user-supplied URL must never resolve to.
BLOCKED_RANGES = [
    ipaddress.ip_network("10.0.0.0/8"),
    ipaddress.ip_network("172.16.0.0/12"),
    ipaddress.ip_network("192.168.0.0/16"),
    ipaddress.ip_network("127.0.0.0/8"),
    ipaddress.ip_network("169.254.0.0/16"),
    ipaddress.ip_network("0.0.0.0/8"),
    # IPv6 counterparts of the ranges above.
    ipaddress.ip_network("::/128"),  # unspecified address
    ipaddress.ip_network("::1/128"),  # loopback
    ipaddress.ip_network("fc00::/7"),  # unique local
    ipaddress.ip_network("fe80::/10"),  # link-local
]


def _is_blocked_address(ip: "ipaddress.IPv4Address | ipaddress.IPv6Address") -> bool:
    """Return True when *ip*, or the IPv4 address it maps, is in BLOCKED_RANGES."""
    # ::ffff:a.b.c.d reaches the IPv4 host a.b.c.d, so test the embedded address.
    mapped = getattr(ip, "ipv4_mapped", None)
    if mapped is not None:
        ip = mapped
    return any(ip in network for network in BLOCKED_RANGES)


def validate_url(url: str) -> bool:
    """Check that *url* is http(s) and resolves only to non-blocked addresses.

    Every address returned for the host (IPv4 and IPv6) is checked, so a host
    with one public and one internal record is rejected.

    Caution: the host is resolved again when the request is actually made, so
    this check alone does not stop a DNS answer that changes in between.

    Args:
        url: The URL to validate.

    Returns:
        True if the URL passed every check. False if it is malformed, uses
        another scheme, has no host, cannot be resolved, or resolves to an
        address in ``BLOCKED_RANGES``.
    """
    try:
        # urlparse itself raises ValueError on some malformed URLs
        # (for example an unterminated IPv6 bracket), so it belongs in the try.
        parsed = urlparse(url)
        if parsed.scheme not in ("http", "https"):
            return False
        host = parsed.hostname
        if not host:
            # e.g. "http://" has no host to resolve.
            return False
        addr_infos = socket.getaddrinfo(host, None)
        for info in addr_infos:
            # sockaddr[0] is the address text; drop any "%scope" suffix.
            literal = info[4][0].split("%", 1)[0]
            if _is_blocked_address(ipaddress.ip_address(literal)):
                return False
    except (socket.gaierror, ValueError):
        return False
    # No addresses at all means nothing was validated.
    return bool(addr_infos)
.env files for local development ONLY
.env MUST be in .gitignore ALWAYS
# REQUIRED: Settings management with Pydantic
from functools import lru_cache

from pydantic_settings import BaseSettings, SettingsConfigDict


class Settings(BaseSettings):
    """Application settings declared as a pydantic-settings model.

    Fields without a default (``DATABASE_URL``, ``JWT_SECRET_KEY``) must be
    supplied. The rest fall back to the defaults declared below.
    """

    # pydantic-settings configuration: read values from ".env" and match
    # variable names case-sensitively.
    model_config = SettingsConfigDict(env_file=".env", case_sensitive=True)

    # Database
    DATABASE_URL: str
    DB_POOL_SIZE: int = 20

    # Auth
    JWT_SECRET_KEY: str
    JWT_ALGORITHM: str = "HS256"
    ACCESS_TOKEN_EXPIRE_MINUTES: int = 30
    REFRESH_TOKEN_EXPIRE_DAYS: int = 7

    # Redis
    REDIS_URL: str = "redis://localhost:6379"

    # CORS
    ALLOWED_ORIGINS: list[str] = ["http://localhost:3000"]

    # External APIs
    OPENAI_API_KEY: str = ""
@lru_cache
def get_settings() -> Settings:
    """Return the ``Settings`` instance, built on the first call and cached after.

    Returns:
        The cached ``Settings`` object.
    """
    settings_instance = Settings()
    return settings_instance
# REQUIRED: .pre-commit-config.yaml entry
# Hook repositories run by pre-commit.
repos:
  # detect-secrets, checked against the committed baseline file.
  - repo: https://github.com/Yelp/detect-secrets
    rev: v1.4.0
    hooks:
      - id: detect-secrets
        args: ["--baseline", ".secrets.baseline"]
# REQUIRED: Security headers middleware for every FastAPI application
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from starlette.middleware.base import BaseHTTPMiddleware
# Application instance that the middleware below is attached to.
app = FastAPI()
# CORS Configuration - NEVER use ["*"] in production
# NOTE(review): `settings` is not defined in this snippet; presumably it is
# the object returned by get_settings() - confirm.
app.add_middleware(
CORSMiddleware,
allow_origins=settings.ALLOWED_ORIGINS, # Explicit origins only
allow_credentials=True,
# Methods and headers are listed explicitly rather than wildcarded.
allow_methods=["GET", "POST", "PUT", "DELETE"],
allow_headers=["Content-Type", "X-CSRF-Token", "Authorization"],
)
# Header name -> value pairs written onto every response by security_headers.
# NOTE(review): "X-XSS-Protection: 1; mode=block" targets a legacy browser
# feature; check current OWASP/MDN guidance on whether "0" is preferred.
_SECURITY_HEADERS = {
    "X-Content-Type-Options": "nosniff",
    "X-Frame-Options": "DENY",
    "X-XSS-Protection": "1; mode=block",
    "Strict-Transport-Security": "max-age=31536000; includeSubDomains",
    "Content-Security-Policy": "default-src 'self'; script-src 'self'; style-src 'self' 'unsafe-inline'",
    "Referrer-Policy": "strict-origin-when-cross-origin",
    "Permissions-Policy": "camera=(), microphone=(), geolocation=()",
    "Cache-Control": "no-store, no-cache, must-revalidate",
    "Pragma": "no-cache",
}


@app.middleware("http")
async def security_headers(request, call_next):
    """Attach the fixed set of security headers to every response.

    Each header is assigned unconditionally, so a value set earlier by a
    route handler under the same name is overwritten.

    Args:
        request: The incoming request.
        call_next: Callable that forwards the request down the stack.

    Returns:
        The downstream response with the security headers applied.
    """
    response = await call_next(request)
    for header_name, header_value in _SECURITY_HEADERS.items():
        response.headers[header_name] = header_value
    return response
- Never use allow_origins=["*"] in production
- Restrict allow_methods to only what is needed
- Restrict allow_headers to only what is needed
- Run pip-audit on every CI pipeline run
- Run npm audit on every CI pipeline run
- pip install --require-hashes)

# REQUIRED: .github/dependabot.yml
version: 2
updates:
  # Python dependencies at the repository root.
  - package-ecosystem: "pip"
    directory: "/"
    schedule:
      interval: "weekly"
    open-pull-requests-limit: 10

  # JavaScript dependencies under /frontend.
  - package-ecosystem: "npm"
    directory: "/frontend"
    schedule:
      interval: "weekly"
    open-pull-requests-limit: 10

  # Docker base images at the repository root.
  - package-ecosystem: "docker"
    directory: "/"
    schedule:
      interval: "weekly"
read_only: true) where possible
# REQUIRED: Non-root user pattern
# Base image.
FROM python:3.11-slim
# Create a system group and user "appuser" with home /app and no login shell.
RUN groupadd -r appuser && useradd -r -g appuser -d /app -s /sbin/nologin appuser
WORKDIR /app
# Copy the build context into /app, owned by appuser.
COPY --chown=appuser:appuser . .
# Everything from here on, including the container process, runs as appuser.
USER appuser
# Serve app.main:app with uvicorn on all interfaces, port 8000.
CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"]
# REQUIRED: Trivy scan in CI
- name: Scan Docker image
  # Pinned to a release tag instead of the mutable `master` branch so a
  # change upstream cannot silently alter what runs in CI. Pinning to the
  # release's full commit SHA is stricter still.
  uses: aquasecurity/trivy-action@0.28.0
  with:
    image-ref: ${{ env.IMAGE_NAME }}
    format: 'sarif'
    severity: 'CRITICAL,HIGH'
    # With SARIF output the action reports every severity unless this is
    # set, which would make the `severity` filter above ineffective.
    limit-severities-for-sarif: true
    # A non-zero exit code fails the job when findings remain.
    exit-code: '1'
# REQUIRED: Structured security event logging
import structlog
logger = structlog.get_logger()
async def log_auth_event(event_type: str, user_id: str, ip: str, success: bool, **kwargs):
    """Emit one structured ``auth_event`` log entry at info level.

    Args:
        event_type: Kind of authentication event, e.g. ``"login"``.
        user_id: Identifier recorded under ``user_id``.
        ip: Client address recorded under ``ip_address``.
        success: Whether the attempt succeeded.
        **kwargs: Extra fields added to the log entry.
    """
    # dict(...) with explicit keywords rejects a key in kwargs that repeats
    # one of the fixed fields, just as passing them directly would.
    event_fields = dict(
        event_type=event_type,
        user_id=user_id,
        ip_address=ip,
        success=success,
        **kwargs,
    )
    logger.info("auth_event", **event_fields)
# Usage
# Successful login: the user's id is recorded.
await log_auth_event("login", user.id, request.client.host, success=True)
# Failed login: the submitted email is passed in the user_id position and the
# failure reason goes in as an extra field.
# NOTE(review): this puts an email address into the logs - confirm that is intended.
await log_auth_event("login", email, request.client.host, success=False, reason="invalid_password")
/api/v1/, /api/v2/)
Sunset header on deprecated endpoints
# REQUIRED: Secure error responses
from fastapi import HTTPException
# WRONG: Information leakage
# Anti-pattern kept for illustration only: the message repeats the submitted
# email and states that no such user exists.
raise HTTPException(400, "User with email john@example.com not found")
# CORRECT: Generic error
# The message does not say which part of the credentials was wrong.
raise HTTPException(401, "Invalid credentials")
# REQUIRED: Request size limits
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.responses import JSONResponse


class RequestSizeLimitMiddleware(BaseHTTPMiddleware):
    """Reject requests whose declared Content-Length exceeds a limit.

    Only the ``Content-Length`` header is inspected. A request sent without
    that header is passed through unchecked.
    """

    def __init__(self, app, max_body_size: int = 1_048_576):  # 1MB
        """Store the limit.

        Args:
            app: The wrapped ASGI application.
            max_body_size: Largest accepted Content-Length, in bytes.
        """
        super().__init__(app)
        self.max_body_size = max_body_size

    async def dispatch(self, request, call_next):
        """Return 413 for oversized requests and 400 for an unparsable length.

        Responses are returned directly rather than raised: an HTTPException
        raised from middleware is not routed through the application's
        exception handlers and would surface as a 500.

        Args:
            request: The incoming request.
            call_next: Callable that forwards the request down the stack.

        Returns:
            The downstream response, or a 400/413 JSON error response.
        """
        content_length = request.headers.get("content-length")
        if content_length:
            try:
                declared_size = int(content_length)
            except ValueError:
                # A non-numeric header is a client error, not a server crash.
                return JSONResponse(
                    status_code=400,
                    content={"detail": "Invalid Content-Length header"},
                )
            if declared_size > self.max_body_size:
                return JSONResponse(
                    status_code=413,
                    content={"detail": "Request body too large"},
                )
        return await call_next(request)


app.add_middleware(RequestSizeLimitMiddleware, max_body_size=1_048_576)
Every PR touching security-sensitive code MUST verify: