This skill should be used when the user asks to "add Keycloak authentication", "implement OIDC", "configure SSO", "validate JWT token", "add role-based access", "protect API endpoint", or mentions Keycloak, OAuth2, OpenID Connect, identity provider, or authentication in FastAPI. Provides Keycloak/OIDC integration patterns.
/plugin marketplace add Lobbi-Docs/claude/plugin install fastapi-backend@claude-orchestrationThis skill inherits all available tools. When active, it can use any tool Claude has access to.
This skill provides patterns for integrating Keycloak as an identity provider with FastAPI applications using OIDC/OAuth2.
from pydantic_settings import BaseSettings
class KeycloakSettings(BaseSettings):
keycloak_url: str = "https://auth.example.com"
keycloak_realm: str = "my-realm"
keycloak_client_id: str = "my-api"
keycloak_client_secret: str = ""
@property
def openid_config_url(self) -> str:
return f"{self.keycloak_url}/realms/{self.keycloak_realm}/.well-known/openid-configuration"
@property
def jwks_url(self) -> str:
return f"{self.keycloak_url}/realms/{self.keycloak_realm}/protocol/openid-connect/certs"
@property
def token_url(self) -> str:
return f"{self.keycloak_url}/realms/{self.keycloak_realm}/protocol/openid-connect/token"
class Config:
env_file = ".env"
import httpx
from jose import jwt, JWTError
from jose.jwk import construct
from functools import lru_cache
from typing import Optional, Dict, Any
class KeycloakTokenValidator:
def __init__(self, settings: KeycloakSettings):
self.settings = settings
self._jwks: Optional[Dict] = None
async def get_jwks(self) -> Dict:
if self._jwks is None:
async with httpx.AsyncClient() as client:
response = await client.get(self.settings.jwks_url)
response.raise_for_status()
self._jwks = response.json()
return self._jwks
async def validate_token(self, token: str) -> Dict[str, Any]:
try:
jwks = await self.get_jwks()
# Get key ID from token header
unverified_header = jwt.get_unverified_header(token)
kid = unverified_header.get("kid")
# Find matching key
key = None
for jwk in jwks.get("keys", []):
if jwk.get("kid") == kid:
key = jwk
break
if not key:
raise JWTError("Key not found")
# Decode and validate
payload = jwt.decode(
token,
key,
algorithms=["RS256"],
audience=self.settings.keycloak_client_id,
issuer=f"{self.settings.keycloak_url}/realms/{self.settings.keycloak_realm}"
)
return payload
except JWTError as e:
raise ValueError(f"Invalid token: {str(e)}")
from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from typing import Optional
security = HTTPBearer(auto_error=False)
class TokenUser:
def __init__(self, payload: Dict[str, Any]):
self.sub: str = payload.get("sub", "")
self.email: str = payload.get("email", "")
self.name: str = payload.get("name", "")
self.preferred_username: str = payload.get("preferred_username", "")
self.roles: list = self._extract_roles(payload)
self.raw_payload = payload
def _extract_roles(self, payload: Dict) -> list:
# Realm roles
realm_roles = payload.get("realm_access", {}).get("roles", [])
# Client roles
resource_access = payload.get("resource_access", {})
client_roles = resource_access.get(
settings.keycloak_client_id, {}
).get("roles", [])
return list(set(realm_roles + client_roles))
async def get_token_validator() -> KeycloakTokenValidator:
return KeycloakTokenValidator(get_settings())
async def get_current_user(
credentials: Optional[HTTPAuthorizationCredentials] = Depends(security),
validator: KeycloakTokenValidator = Depends(get_token_validator)
) -> TokenUser:
if not credentials:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Not authenticated",
headers={"WWW-Authenticate": "Bearer"}
)
try:
payload = await validator.validate_token(credentials.credentials)
return TokenUser(payload)
except ValueError as e:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail=str(e),
headers={"WWW-Authenticate": "Bearer"}
)
async def get_current_user_optional(
credentials: Optional[HTTPAuthorizationCredentials] = Depends(security),
validator: KeycloakTokenValidator = Depends(get_token_validator)
) -> Optional[TokenUser]:
if not credentials:
return None
try:
payload = await validator.validate_token(credentials.credentials)
return TokenUser(payload)
except ValueError:
return None
from functools import wraps
from typing import List
def require_roles(*required_roles: str):
"""Dependency that checks for required roles."""
async def role_checker(
user: TokenUser = Depends(get_current_user)
) -> TokenUser:
if not any(role in user.roles for role in required_roles):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=f"Required roles: {', '.join(required_roles)}"
)
return user
return role_checker
def require_all_roles(*required_roles: str):
"""Dependency that checks user has ALL required roles."""
async def role_checker(
user: TokenUser = Depends(get_current_user)
) -> TokenUser:
missing = [r for r in required_roles if r not in user.roles]
if missing:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=f"Missing roles: {', '.join(missing)}"
)
return user
return role_checker
# Usage in routes
@router.get("/admin/users")
async def list_users(user: TokenUser = Depends(require_roles("admin", "user-manager"))):
"""Only admins or user-managers can access."""
return {"users": []}
@router.delete("/admin/system")
async def system_action(user: TokenUser = Depends(require_all_roles("admin", "super-admin"))):
"""Requires BOTH admin AND super-admin roles."""
return {"status": "ok"}
from fastapi import APIRouter, Depends
router = APIRouter(prefix="/api/v1", tags=["Protected"])
@router.get("/profile")
async def get_profile(user: TokenUser = Depends(get_current_user)):
"""Get current user's profile."""
return {
"sub": user.sub,
"email": user.email,
"name": user.name,
"roles": user.roles
}
@router.get("/public")
async def public_endpoint():
"""Public endpoint - no auth required."""
return {"message": "Public data"}
@router.get("/optional-auth")
async def optional_auth(user: Optional[TokenUser] = Depends(get_current_user_optional)):
"""Returns different data based on auth status."""
if user:
return {"message": f"Hello, {user.name}!", "authenticated": True}
return {"message": "Hello, guest!", "authenticated": False}
import httpx
from typing import Tuple
class KeycloakAuthService:
def __init__(self, settings: KeycloakSettings):
self.settings = settings
async def refresh_token(self, refresh_token: str) -> Tuple[str, str]:
"""Exchange refresh token for new access token."""
async with httpx.AsyncClient() as client:
response = await client.post(
self.settings.token_url,
data={
"grant_type": "refresh_token",
"client_id": self.settings.keycloak_client_id,
"client_secret": self.settings.keycloak_client_secret,
"refresh_token": refresh_token
}
)
if response.status_code != 200:
raise ValueError("Failed to refresh token")
data = response.json()
return data["access_token"], data["refresh_token"]
async def exchange_code(self, code: str, redirect_uri: str) -> dict:
"""Exchange authorization code for tokens."""
async with httpx.AsyncClient() as client:
response = await client.post(
self.settings.token_url,
data={
"grant_type": "authorization_code",
"client_id": self.settings.keycloak_client_id,
"client_secret": self.settings.keycloak_client_secret,
"code": code,
"redirect_uri": redirect_uri
}
)
if response.status_code != 200:
raise ValueError("Failed to exchange code")
return response.json()
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.requests import Request
from starlette.responses import Response
class TokenRefreshMiddleware(BaseHTTPMiddleware):
async def dispatch(self, request: Request, call_next):
response = await call_next(request)
# Check if token is about to expire (from custom header)
token_exp = request.state.get("token_exp")
if token_exp and token_exp - time.time() < 300: # 5 min
# Token expires soon - add header to signal frontend
response.headers["X-Token-Expiring"] = "true"
return response
For detailed configuration and advanced patterns:
references/keycloak-setup.md - Keycloak realm/client configurationreferences/multi-tenant.md - Multi-tenant authentication patternsreferences/testing.md - Testing authenticated endpointsWorking examples in examples/:
examples/auth_dependencies.py - Complete auth dependenciesexamples/protected_router.py - Protected route examplesexamples/keycloak_service.py - Full Keycloak serviceThis skill should be used when the user asks to "create a slash command", "add a command", "write a custom command", "define command arguments", "use command frontmatter", "organize commands", "create command with file references", "interactive command", "use AskUserQuestion in command", or needs guidance on slash command structure, YAML frontmatter fields, dynamic arguments, bash execution in commands, user interaction patterns, or command development best practices for Claude Code.
This skill should be used when the user asks to "create an agent", "add an agent", "write a subagent", "agent frontmatter", "when to use description", "agent examples", "agent tools", "agent colors", "autonomous agent", or needs guidance on agent structure, system prompts, triggering conditions, or agent development best practices for Claude Code plugins.
This skill should be used when the user asks to "create a hook", "add a PreToolUse/PostToolUse/Stop hook", "validate tool use", "implement prompt-based hooks", "use ${CLAUDE_PLUGIN_ROOT}", "set up event-driven automation", "block dangerous commands", or mentions hook events (PreToolUse, PostToolUse, Stop, SubagentStop, SessionStart, SessionEnd, UserPromptSubmit, PreCompact, Notification). Provides comprehensive guidance for creating and implementing Claude Code plugin hooks with focus on advanced prompt-based hooks API.