From example-skills
Designs reliable webhook systems with proper delivery guarantees, retry logic, signature verification, and idempotent processing for event-driven integrations.
npx claudepluginhub organvm-iv-taxis/a-i--skills --plugin document-skills
This skill uses the workspace's default tool permissions.
This skill provides guidance for designing and implementing robust webhook systems—both as providers (sending webhooks) and consumers (receiving webhooks).
Compares coding agents like Claude Code and Aider on custom YAML-defined codebase tasks using git worktrees, measuring pass rate, cost, time, and consistency.
Designs and optimizes AI agent action spaces, tool definitions, observation formats, error recovery, and context for higher task completion rates.
Designs, implements, and audits WCAG 2.2 AA accessible UIs for Web (ARIA/HTML5), iOS (SwiftUI traits), and Android (Compose semantics). Audits code for compliance gaps.
This skill provides guidance for designing and implementing robust webhook systems—both as providers (sending webhooks) and consumers (receiving webhooks).
Polling (inefficient):          Webhooks (efficient):

┌────────┐     ┌────────┐       ┌────────┐     ┌────────┐
│ Client │     │ Server │       │ Client │     │ Server │
└───┬────┘     └───┬────┘       └───┬────┘     └───┬────┘
    │  Any news?   │                │              │
    │─────────────▶│                │              │
    │      No      │                │    Event!    │
    │◀─────────────│                │◀─────────────│
    │  Any news?   │                │  POST /hook  │
    │─────────────▶│                │◀─────────────│
    │      No      │                │    200 OK    │
    │◀─────────────│                │─────────────▶│
    │  Any news?   │
    │─────────────▶│                Push-based notification
    │     YES!     │                instead of polling
    │◀─────────────│
POST /webhooks/payment HTTP/1.1
Host: your-app.com
Content-Type: application/json
X-Webhook-Signature: sha256=abc123...
X-Webhook-ID: evt_12345
X-Webhook-Timestamp: 1706616000
{
"id": "evt_12345",
"type": "payment.completed",
"created_at": "2024-01-30T12:00:00Z",
"data": {
"payment_id": "pay_abc",
"amount": 1000,
"currency": "USD"
}
}
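Key headers:
- X-Webhook-Signature: HMAC-SHA256 of "<timestamp>.<body>", prefixed with "sha256="; used to verify authenticity.
- X-Webhook-ID: the unique event ID; used for idempotent (deduplicated) processing.
- X-Webhook-Timestamp: Unix time at which the webhook was signed; used to reject replayed requests.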
class WebhookEvent:
    """Standard webhook event envelope.

    Attributes:
        id: Unique event identifier (``evt_`` followed by random hex).
        type: Event type string, e.g. ``"payment.completed"``.
        created_at: Creation time as an ISO-8601 string (naive UTC).
        api_version: Payload schema version stamped on the event.
        data: Event-specific body.
        idempotency_key: Key for deduplication; defaults to ``id``.
    """

    API_VERSION = "2024-01-30"

    def __init__(self, event_type, data, idempotency_key=None):
        """Create a new event with a fresh ID and the current UTC time."""
        self.id = self._generate_id()
        self.type = event_type
        # NOTE(review): utcnow() yields a naive timestamp (no "Z"/offset
        # suffix); kept so the emitted string format does not change.
        self.created_at = datetime.utcnow().isoformat()
        self.api_version = self.API_VERSION
        self.data = data
        self.idempotency_key = idempotency_key or self.id

    @staticmethod
    def _generate_id():
        """Return a new random event ID of the form ``evt_<32 hex chars>``."""
        import uuid  # local import keeps this snippet self-contained

        return f"evt_{uuid.uuid4().hex}"

    @classmethod
    def from_payload(cls, payload):
        """Rebuild an event from a dict produced by :meth:`to_payload`.

        The original ``id`` is preserved (no new ID is generated) so a
        re-delivered event deduplicates against its first delivery.

        Raises:
            KeyError: If ``id``, ``type`` or ``data`` is missing.
        """
        event = cls.__new__(cls)  # bypass __init__: nothing should be regenerated
        event.id = payload["id"]
        event.type = payload["type"]
        event.data = payload["data"]
        event.created_at = payload.get("created_at") or datetime.utcnow().isoformat()
        event.api_version = payload.get("api_version", cls.API_VERSION)
        event.idempotency_key = payload.get("idempotency_key") or event.id
        return event

    def to_payload(self):
        """Return the JSON-serializable dict that is sent to subscribers."""
        return {
            "id": self.id,
            "type": self.type,
            "created_at": self.created_at,
            "api_version": self.api_version,
            "data": self.data,
        }
# Catalogue of event names; each one is written as "<resource>.<action>".
EVENT_TYPES = [
    # payment lifecycle
    "payment.created",
    "payment.completed",
    "payment.failed",
    # subscription lifecycle
    "subscription.created",
    "subscription.updated",
    "subscription.cancelled",
    # customer lifecycle
    "customer.created",
    "customer.deleted",
]
import hashlib
import hmac
import json
import logging
import time
class WebhookSigner:
    """Sign outgoing webhook payloads with HMAC-SHA256.

    The signed string is ``"<timestamp>.<compact JSON body>"``. A receiver
    that verifies against the raw request body only succeeds if the bytes on
    the wire equal :meth:`serialize` of the payload, so senders should
    transmit exactly that serialization.
    """

    def __init__(self, secret):  # allow-secret
        """Store the shared secret; text secrets are encoded, bytes kept as-is."""
        self.secret = secret if isinstance(secret, bytes) else secret.encode()  # allow-secret

    @staticmethod
    def serialize(payload):
        """Return the canonical (compact-separator) JSON text that gets signed."""
        return json.dumps(payload, separators=(',', ':'))

    def sign(self, payload, timestamp=None):
        """Generate the HMAC signature for ``payload``.

        Args:
            payload: JSON-serializable event body.
            timestamp: Unix time to bind into the signature. Defaults to the
                current time; an explicit ``0`` is honoured.

        Returns:
            Dict with ``'signature'`` (``"sha256=<hex>"``) and ``'timestamp'``.
        """
        # `is None` rather than `or`: a caller-supplied 0 is a real value.
        if timestamp is None:
            timestamp = int(time.time())
        # Include timestamp to prevent replay attacks
        signed_payload = f"{timestamp}.{self.serialize(payload)}"
        digest = hmac.new(
            self.secret,
            signed_payload.encode(),
            hashlib.sha256,
        ).hexdigest()
        return {
            'signature': f"sha256={digest}",
            'timestamp': timestamp,
        }

    def create_headers(self, payload):
        """Generate all webhook headers for ``payload`` (must contain ``'id'``)."""
        sign_data = self.sign(payload)
        return {
            'Content-Type': 'application/json',
            'X-Webhook-Signature': sign_data['signature'],
            'X-Webhook-Timestamp': str(sign_data['timestamp']),
            'X-Webhook-ID': payload['id'],
        }
import asyncio
from datetime import datetime, timedelta
class WebhookDeliverySystem:
    """Reliable webhook delivery with retries.

    One initial attempt is followed by up to ``len(RETRY_SCHEDULE)`` retries,
    waiting the scheduled delay before each. Every attempt is appended to
    ``delivery_log``; events that exhaust the schedule are handed to
    ``_move_to_dead_letter``.
    """

    RETRY_SCHEDULE = [
        timedelta(seconds=10),
        timedelta(minutes=1),
        timedelta(minutes=5),
        timedelta(minutes=30),
        timedelta(hours=1),
        timedelta(hours=6),
        timedelta(hours=24),
    ]

    # Response codes treated as a successful delivery.
    SUCCESS_STATUSES = (200, 201, 202, 204)

    def __init__(self, signer, http_client):
        """
        Args:
            signer: Object exposing ``create_headers(payload)``.
            http_client: Async client exposing ``post(url, json=, headers=,
                timeout=)`` whose response has a ``status_code``.
        """
        self.signer = signer
        self.http = http_client
        # In-memory attempt history; grows without bound, so persist or
        # truncate it in a long-running process.
        self.delivery_log = []

    async def deliver(self, endpoint, event, attempt=0):
        """Attempt webhook delivery, retrying on failure.

        Args:
            endpoint: Target with a ``url`` attribute.
            event: Object exposing ``to_payload()``.
            attempt: Zero-based attempt index (0 = first try).

        Returns:
            ``{'status': 'delivered' | 'failed', 'attempts': <count>}``.
        """
        payload = event.to_payload()
        headers = self.signer.create_headers(payload)
        # NOTE(review): the signature covers the signer's own JSON
        # serialization, while `json=` lets the HTTP client serialize again.
        # Confirm both produce identical bytes, or send the signed body
        # verbatim, otherwise receivers verifying the raw body will reject it.
        try:
            response = await self.http.post(
                endpoint.url,
                json=payload,
                headers=headers,
                timeout=30,
            )
        except Exception as e:
            # Any transport-level failure (timeout, DNS, refused, ...) is retried.
            self._log_attempt(endpoint, event, attempt, error=e)
            return await self._schedule_retry(endpoint, event, attempt)

        # Logged outside the try so a logging bug is not mistaken for a
        # delivery failure and retried.
        self._log_attempt(endpoint, event, attempt, response)
        if response.status_code in self.SUCCESS_STATUSES:
            return {'status': 'delivered', 'attempts': attempt + 1}
        # Non-success status - schedule retry
        return await self._schedule_retry(endpoint, event, attempt)

    def _log_attempt(self, endpoint, event, attempt, response=None, error=None):
        """Append one delivery attempt (success or failure) to ``delivery_log``."""
        self.delivery_log.append({
            'endpoint_url': endpoint.url,
            'event_id': getattr(event, 'id', None),
            'attempt': attempt + 1,  # 1-based, matching the 'attempts' result field
            'status_code': getattr(response, 'status_code', None),
            'error': repr(error) if error is not None else None,
            'logged_at': int(time.time()),
        })

    async def _schedule_retry(self, endpoint, event, attempt):
        """Schedule next retry or give up once the schedule is exhausted."""
        if attempt >= len(self.RETRY_SCHEDULE):
            self._move_to_dead_letter(endpoint, event)
            return {'status': 'failed', 'attempts': attempt + 1}
        delay = self.RETRY_SCHEDULE[attempt]
        # In production: use job queue with delayed execution
        await asyncio.sleep(delay.total_seconds())
        return await self.deliver(endpoint, event, attempt + 1)

    def _move_to_dead_letter(self, endpoint, event):
        """Store failed webhook for manual review.

        Placeholder: persist to a dead letter queue/table here.
        """
class WebhookEndpoint:
    """A subscriber's delivery target together with its health counters."""

    def __init__(self, url, events, secret=None):  # allow-secret
        """Register ``url`` for the given event types.

        ``secret`` falls back to a freshly generated one when omitted/falsy.
        """
        self.id = generate_id()
        self.url = url
        # Subscribed event type names; '*' subscribes to everything.
        self.events = events
        self.secret = secret or generate_secret()  # allow-secret
        self.status = 'active'
        self.created_at = datetime.utcnow()
        # Health bookkeeping, updated by record_success / record_failure.
        self.consecutive_failures = 0
        self.last_success = None
        self.last_failure = None

    def should_receive(self, event_type):
        """Return True when this endpoint is subscribed to ``event_type``."""
        subscribed = self.events
        return '*' in subscribed or event_type in subscribed

    def record_success(self):
        """Reset the failure streak and re-enable a disabled endpoint."""
        self.consecutive_failures = 0
        self.last_success = datetime.utcnow()
        if self.status != 'disabled':
            return
        self.status = 'active'

    def record_failure(self):
        """Count a failed delivery; disable the endpoint at 10 in a row."""
        streak = self.consecutive_failures + 1
        self.consecutive_failures = streak
        self.last_failure = datetime.utcnow()
        if streak < 10:
            return
        # Too many consecutive failures: stop delivering to this endpoint.
        self.status = 'disabled'
class WebhookVerificationError(Exception):
    """Raised when an incoming webhook fails signature or freshness checks."""


class WebhookVerifier:
    """Verify incoming webhook signatures.

    Every rejection — missing header, malformed timestamp, stale timestamp,
    or signature mismatch — is reported as ``WebhookVerificationError`` so
    callers need only one ``except`` clause.
    """

    TIMESTAMP_TOLERANCE = 300  # 5 minutes

    def __init__(self, secret):  # allow-secret
        """Store the shared secret; text secrets are encoded, bytes kept as-is."""
        self.secret = secret if isinstance(secret, bytes) else secret.encode()  # allow-secret

    def verify(self, payload, signature, timestamp):
        """Verify webhook authenticity.

        Args:
            payload: Raw request body exactly as received (``str`` or ``bytes``).
            signature: Value of the signature header (``"sha256=<hex>"``).
            timestamp: Value of the timestamp header (Unix seconds).

        Returns:
            ``True`` when the signature matches and the timestamp is fresh.

        Raises:
            WebhookVerificationError: On any verification failure.
        """
        # Absent headers arrive as None; reject instead of crashing below.
        if not signature or timestamp is None:
            raise WebhookVerificationError("Missing signature or timestamp")
        try:
            sent_at = int(timestamp)
        except (TypeError, ValueError):
            raise WebhookVerificationError("Invalid timestamp") from None

        # Check timestamp freshness (rejects both stale and far-future values)
        current_time = int(time.time())
        if abs(current_time - sent_at) > self.TIMESTAMP_TOLERANCE:
            raise WebhookVerificationError("Timestamp too old")

        # Compute expected signature over "<timestamp>.<raw body>". Bytes are
        # used as-is: formatting them into a str would sign their repr.
        if isinstance(payload, (bytes, bytearray)):
            body = bytes(payload)
        else:
            body = f"{payload}".encode()
        signed_payload = f"{timestamp}.".encode() + body
        expected = hmac.new(
            self.secret,
            signed_payload,
            hashlib.sha256,
        ).hexdigest()
        expected_sig = f"sha256={expected}".encode()

        # Constant-time comparison to prevent timing attacks. Comparing bytes
        # avoids the TypeError compare_digest raises for non-ASCII str input.
        received_sig = signature.encode() if isinstance(signature, str) else signature
        if not hmac.compare_digest(received_sig, expected_sig):
            raise WebhookVerificationError("Invalid signature")
        return True
# Flask endpoint example
@app.route('/webhooks/provider', methods=['POST'])
def handle_webhook():
    """Verify an incoming webhook, then hand the parsed event to processing.

    Returns 401 for any authentication failure (missing headers, malformed
    timestamp, bad signature) and 200 once the event has been processed.
    """
    signature = request.headers.get('X-Webhook-Signature')
    timestamp = request.headers.get('X-Webhook-Timestamp')
    # Absent headers come back as None; reject up front so they yield a 401
    # rather than an unhandled error (500) inside the verifier.
    if not signature or not timestamp:
        return 'Invalid signature', 401

    verifier = WebhookVerifier(WEBHOOK_SECRET)
    try:
        verifier.verify(
            # Raw body as text; undecodable bytes are replaced instead of
            # raising, and then simply fail the signature check.
            request.get_data(as_text=True),
            signature,
            timestamp,
        )
    except (WebhookVerificationError, ValueError):
        # ValueError covers a non-numeric timestamp header.
        return 'Invalid signature', 401

    event = request.json
    process_webhook(event)
    return 'OK', 200
class IdempotentWebhookProcessor:
    """Process each webhook event at most once per deduplication window.

    A "processed" marker (kept 7 days) detects redeliveries, and a short-lived
    lock keeps two workers from handling the same event concurrently.
    """

    def __init__(self, storage):
        """
        Args:
            storage: Async key-value store (Redis, database, etc.) exposing
                ``exists``, ``set_nx``, ``set`` and ``delete``.
        """
        self.storage = storage
        self.lock_ttl = 300  # 5 minute lock

    async def process(self, event):
        """Process webhook idempotently.

        Returns:
            Dict with ``'event_id'`` and a ``'status'`` of ``'processed'``,
            ``'duplicate'`` (already handled) or ``'processing'`` (another
            worker currently holds the lock).
        """
        event_id = event['id']
        processed_key = f"webhook:processed:{event_id}"
        lock_key = f"webhook:lock:{event_id}"

        # Fast path: already processed
        if await self.storage.exists(processed_key):
            return {'status': 'duplicate', 'event_id': event_id}

        # Acquire lock to prevent concurrent processing
        if not await self.storage.set_nx(lock_key, "1", ex=self.lock_ttl):
            return {'status': 'processing', 'event_id': event_id}

        try:
            # Re-check under the lock: another worker may have finished and
            # released its lock between the first check and our acquisition,
            # which would otherwise process the event a second time.
            if await self.storage.exists(processed_key):
                return {'status': 'duplicate', 'event_id': event_id}

            result = await self._handle_event(event)

            # Mark as processed (with long TTL for deduplication); the
            # handler result must therefore be JSON-serializable.
            await self.storage.set(
                processed_key,
                json.dumps(result),
                ex=86400 * 7,  # Keep for 7 days
            )
            return {'status': 'processed', 'event_id': event_id}
        finally:
            await self.storage.delete(lock_key)

    async def _handle_event(self, event):
        """Route event to the handler registered for its type."""
        handlers = {
            'payment.completed': self._handle_payment_completed,
            'subscription.cancelled': self._handle_subscription_cancelled,
            # ... more handlers
        }
        handler = handlers.get(event['type'])
        if handler:
            return await handler(event['data'])
        return {'skipped': True, 'reason': 'unknown_event_type'}

    async def _handle_payment_completed(self, data):
        """Handle ``payment.completed``; override with real business logic."""
        raise NotImplementedError("payment.completed handler is not implemented")

    async def _handle_subscription_cancelled(self, data):
        """Handle ``subscription.cancelled``; override with real business logic."""
        raise NotImplementedError("subscription.cancelled handler is not implemented")
class QueuedWebhookHandler:
    """Decouple receipt from processing.

    ``receive`` validates and enqueues so the HTTP response can be sent
    immediately; ``process_queue`` is the long-running worker loop.
    """

    def __init__(self, queue, idle_delay=1.0):
        """
        Args:
            queue: Async queue client (Redis, SQS, RabbitMQ, etc.) exposing
                ``enqueue``, ``dequeue``, ``ack`` and ``nack``.
            idle_delay: Seconds to wait before polling again after an empty
                dequeue, so the worker does not spin.
        """
        self.queue = queue
        self.idle_delay = idle_delay

    async def receive(self, event):
        """Acknowledge receipt quickly, process async.

        Raises:
            ValueError: If ``event`` is not a dict with non-empty ``id`` and
                ``type``.
        """
        # Validate immediately
        self._validate_event_structure(event)
        # Queue for processing
        await self.queue.enqueue(
            'webhook_processing',
            event,
            deduplication_id=event['id'],
        )
        # Return 200 immediately (don't make sender wait)
        return {'status': 'accepted'}

    def _validate_event_structure(self, event):
        """Reject events lacking the fields the rest of the pipeline relies on."""
        if not isinstance(event, dict):
            raise ValueError("Webhook event must be a JSON object")
        missing = [field for field in ('id', 'type') if not event.get(field)]
        if missing:
            raise ValueError(
                f"Webhook event missing required field(s): {', '.join(missing)}"
            )

    async def process_queue(self):
        """Worker that processes queued webhooks until cancelled."""
        while True:
            event = await self.queue.dequeue('webhook_processing')
            if not event:
                # Empty queue: back off instead of busy-looping.
                await asyncio.sleep(self.idle_delay)
                continue
            try:
                await self._process_event(event)
                await self.queue.ack(event)
            except Exception as e:
                # Log first so the failure is recorded even if nack() raises.
                logging.error(f"Webhook processing failed: {e}")
                # NOTE(review): unconditional requeue retries a poison message
                # forever; consider a max-attempts limit with a dead letter queue.
                await self.queue.nack(event, requeue=True)

    async def _process_event(self, event):
        """Handle one dequeued event; override with real processing."""
        raise NotImplementedError("_process_event must be implemented")
class SecretRotation:
    """Support rolling secret updates.

    After a rotation, the previous secret remains acceptable for
    ``GRACE_PERIOD`` so in-flight webhooks signed with it still verify.
    """

    # How long the previous secret stays valid after rotate().
    GRACE_PERIOD = timedelta(hours=24)

    def __init__(self):
        self.current_secret = None
        self.previous_secret = None
        self.rotation_timestamp = None

    def rotate(self, new_secret):
        """Rotate to new secret while supporting old."""
        self.previous_secret = self.current_secret
        self.current_secret = new_secret
        self.rotation_timestamp = datetime.utcnow()

    def get_verification_secrets(self):
        """Return secrets to try for verification, newest first.

        Returns an empty list when no secret has been configured yet, so
        callers never receive ``None`` as a candidate secret.
        """
        candidates = []
        if self.current_secret is not None:
            candidates.append(self.current_secret)
        # Accept previous secret for grace period
        if self.previous_secret and self.rotation_timestamp:
            if datetime.utcnow() - self.rotation_timestamp < self.GRACE_PERIOD:
                candidates.append(self.previous_secret)
        return candidates
import ipaddress

# Example allowlists only — each list is truncated. Load the complete,
# current ranges from the provider's published IP list.
ALLOWED_IPS = {
    'stripe': ['3.18.12.63', '3.130.192.231'],  # ... more addresses
    'github': ['192.30.252.0/22', '185.199.108.0/22'],  # ... more ranges
    'twilio': ['54.172.60.0/23', '54.244.51.0/24'],  # ... more ranges
}


def verify_source_ip(request, provider):
    """Verify webhook comes from expected IP.

    Args:
        request: Object with a ``remote_addr`` attribute (e.g. a Flask request).
        provider: Key into ``ALLOWED_IPS``.

    Returns:
        ``True`` if the client address falls inside any allowed address or
        CIDR range for ``provider``; ``False`` for unknown providers and for
        missing or unparsable client addresses.

    Raises:
        ValueError: If an ``ALLOWED_IPS`` entry is not a valid address/range
            (a configuration error that should not pass silently).
    """
    # NOTE(review): behind a reverse proxy or load balancer, remote_addr is
    # the proxy's address — confirm how the real client IP is obtained.
    try:
        client_ip = ipaddress.ip_address(request.remote_addr)
    except ValueError:
        return False
    allowed = ALLOWED_IPS.get(provider, [])
    # strict=False lets entries carry host bits; a bare address is a /32 (or /128).
    return any(
        client_ip in ipaddress.ip_network(allowed_range, strict=False)
        for allowed_range in allowed
    )
| Code | Meaning | Provider Action |
|---|---|---|
| 200-204 | Success | Mark delivered |
| 400 | Bad request | Don't retry (your bug) |
| 401/403 | Auth failed | Disable endpoint |
| 404 | Not found | Disable endpoint |
| 429 | Rate limited | Retry with backoff |
| 500+ | Server error | Retry |
| Timeout | No response | Retry |
class DeadLetterQueue:
    """Store and manage failed webhooks."""

    def __init__(self, storage=None, delivery_system=None, endpoint_lookup=None):
        """
        Args:
            storage: Async store exposing ``add``, ``get``, ``remove`` and
                ``delete_before``.
            delivery_system: Object exposing ``deliver(endpoint, event)``,
                used by :meth:`retry`.
            endpoint_lookup: Async callable mapping an endpoint ID to an
                endpoint, used by :meth:`get_endpoint`.

        All arguments are optional so the attributes may also be assigned
        after construction.
        """
        self.storage = storage
        self.delivery_system = delivery_system
        self.endpoint_lookup = endpoint_lookup

    async def get_endpoint(self, endpoint_id):
        """Resolve an endpoint by ID via ``endpoint_lookup`` (or override this)."""
        if self.endpoint_lookup is None:
            raise NotImplementedError(
                "Provide endpoint_lookup or override get_endpoint()"
            )
        return await self.endpoint_lookup(endpoint_id)

    async def add(self, endpoint, event, failure_reason, attempts):
        """Add failed webhook to DLQ."""
        await self.storage.add({
            'id': generate_id(),
            'endpoint_id': endpoint.id,
            'endpoint_url': endpoint.url,
            'event': event.to_payload(),
            'failure_reason': str(failure_reason),
            'attempts': attempts,
            'added_at': datetime.utcnow().isoformat(),
        })

    async def retry(self, dlq_item_id):
        """Manually retry a DLQ item; it is removed only if delivery succeeds.

        Returns:
            The delivery system's result dict.

        Raises:
            LookupError: If no item with ``dlq_item_id`` is stored.
        """
        item = await self.storage.get(dlq_item_id)
        # An already-purged or already-retried item would otherwise fail with
        # an opaque "NoneType is not subscriptable".
        if item is None:
            raise LookupError(f"DLQ item {dlq_item_id!r} not found")
        endpoint = await self.get_endpoint(item['endpoint_id'])
        event = WebhookEvent.from_payload(item['event'])
        result = await self.delivery_system.deliver(endpoint, event)
        if result['status'] == 'delivered':
            await self.storage.remove(dlq_item_id)
        return result

    async def purge_old(self, days=30):
        """Clean up DLQ items added more than ``days`` days ago."""
        cutoff = datetime.utcnow() - timedelta(days=days)
        await self.storage.delete_before(cutoff)
- references/webhook-security.md - Detailed security implementation
- references/provider-examples.md - Stripe, GitHub, Twilio patterns
- references/testing-webhooks.md - Local development and testing