Production-grade event-driven architecture skill for Kafka, RabbitMQ, event sourcing, CQRS, and message patterns
Designs production-grade event-driven systems with Kafka/RabbitMQ. Triggers when you need pub/sub, work queues, event sourcing, or CQRS patterns with specific throughput/latency requirements.
/plugin marketplace add pluginagentmarketplace/custom-plugin-system-design
/plugin install custom-plugin-system-design@pluginagentmarketplace-system-design

This skill inherits all available tools. When active, it can use any tool Claude has access to.
assets/config.yaml
assets/schema.json
references/GUIDE.md
references/PATTERNS.md
scripts/validate.py

Purpose: Atomic skill for event-driven architecture with comprehensive messaging patterns and delivery guarantees.
| Attribute | Value |
|---|---|
| Scope | Kafka, RabbitMQ, Event Sourcing, CQRS |
| Responsibility | Single: Async messaging and event patterns |
| Invocation | Skill("event-driven") |
parameters:
  event_context:
    type: object
    required: true
    properties:
      use_case:
        type: string
        enum: [pub_sub, work_queue, event_sourcing, cqrs, saga]
        required: true
      requirements:
        type: object
        required: true
        properties:
          throughput:
            type: string
            pattern: "^\\d+[KM]?\\s*msg/s$"
          latency:
            type: string
            pattern: "^\\d+\\s*(ms|s)$"
          ordering:
            type: string
            enum: [none, partition, global]
          delivery:
            type: string
            enum: [at_most_once, at_least_once, exactly_once]
          retention:
            type: string
            pattern: "^\\d+\\s*(hours?|days?|weeks?)$"
      producers:
        type: array
        items: { type: string }
        minItems: 1
      consumers:
        type: array
        items: { type: string }
        minItems: 1
validation_rules:
  - name: "exactly_once_complexity"
    rule: "delivery == 'exactly_once' implies warn_complexity"
    warning: "Exactly-once requires idempotent consumers"
  - name: "global_ordering_throughput"
    rule: "ordering == 'global' implies throughput <= '10K msg/s'"
    warning: "Global ordering limits throughput to a single partition"
output:
  type: object
  properties:
    broker:
      type: object
      properties:
        technology: { type: string }
        rationale: { type: string }
    topology:
      type: object
      properties:
        topics: { type: array }
        partitions: { type: integer }
        replication_factor: { type: integer }
    message_schema:
      type: object
      properties:
        format: { type: string }
        schema: { type: object }
        versioning: { type: string }
    consumer_config:
      type: object
      properties:
        group_strategy: { type: string }
        error_handling: { type: string }
        dlq: { type: object }
Kafka:
├── Throughput: 1M+ msg/s
├── Latency: ~5ms
├── Ordering: Per-partition
├── Retention: Configurable (days/forever)
├── Replay: ✅ Full log replay
├── Use: Event sourcing, streaming
└── Complexity: High
RabbitMQ:
├── Throughput: 50K msg/s
├── Latency: ~1ms
├── Ordering: Per-queue
├── Retention: Until acknowledged
├── Replay: ❌ No native replay
├── Use: Work queues, routing
└── Complexity: Medium
AWS SQS:
├── Throughput: Unlimited (managed)
├── Latency: ~50ms
├── Ordering: FIFO optional
├── Retention: 14 days max
├── Replay: ❌ No replay
├── Use: Serverless, simple queues
└── Complexity: Low
Pulsar:
├── Throughput: 1M+ msg/s
├── Latency: ~5ms
├── Ordering: Per-partition
├── Retention: Tiered storage
├── Replay: ✅ Full replay
├── Use: Multi-tenancy
└── Complexity: High
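For Kafka and Pulsar, partition count drives consumer parallelism. A rough sizing sketch of the `calculate_partitions` helper exercised in the tests below; the 20% headroom and even-count rounding are assumptions, not broker requirements:

```python
import math
from types import SimpleNamespace

def calculate_partitions(throughput_per_second: int,
                         max_throughput_per_partition: int,
                         replication_factor: int = 3):
    """Size a topic so each partition stays under its measured ceiling."""
    base = math.ceil(throughput_per_second / max_throughput_per_partition)
    partitions = math.ceil(base * 1.2)      # ~20% headroom for traffic spikes
    if partitions % 2:                      # round up to an even count
        partitions += 1                     # for balanced consumer assignment
    return SimpleNamespace(
        partitions=partitions,
        total_replicas=partitions * replication_factor,
    )

# 10K msg/s at 1K msg/s per partition -> 12 partitions, 36 replicas
print(calculate_partitions(10_000, 1_000))
```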
Pub/Sub:
├── Many consumers per message
├── Topic-based routing
├── Decoupled producers/consumers
└── Use: Notifications, events
Work Queue:
├── One consumer per message
├── Load balancing
├── Acknowledgments
└── Use: Background jobs
Request/Reply:
├── Synchronous over async
├── Correlation IDs
├── Timeout handling
└── Use: RPC-like patterns
Dead Letter Queue:
├── Failed message storage
├── Retry mechanism
├── Manual review
└── Use: Error handling
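The request/reply pattern above hinges on correlation IDs. A broker-agnostic sketch, assuming a `publish` callable supplied by whatever client is in use:

```python
import queue
import uuid

pending: dict[str, queue.Queue] = {}  # correlation_id -> reply slot

def send_request(publish, payload: dict, timeout: float = 5.0) -> dict:
    """Publish a request and block until the correlated reply arrives."""
    correlation_id = str(uuid.uuid4())
    pending[correlation_id] = queue.Queue(maxsize=1)
    publish({"correlation_id": correlation_id, "reply_to": "replies", **payload})
    try:
        # Raises queue.Empty if no reply arrives within the timeout
        return pending[correlation_id].get(timeout=timeout)
    finally:
        del pending[correlation_id]

def on_reply(message: dict) -> None:
    """Reply consumer: route each reply back to its waiting caller."""
    slot = pending.get(message["correlation_id"])
    if slot:
        slot.put(message)
```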
Event Store:
├── Append-only log
├── Events are immutable
├── State = replay(events)
└── Snapshots for performance
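A minimal illustration of `state = replay(events)` with snapshot support; the event types and state shape are invented for the example:

```python
def apply(state: dict, event: dict) -> dict:
    """Pure transition function: fold one event into the aggregate state."""
    if event["event_type"] == "OrderPlaced":
        return {**state, "status": "placed", "items": event["payload"]["items"]}
    if event["event_type"] == "OrderShipped":
        return {**state, "status": "shipped"}
    return state

def rebuild(events: list[dict], snapshot: dict | None = None) -> dict:
    """Replay the append-only log, starting from a snapshot if one exists."""
    state = dict(snapshot["state"]) if snapshot else {}
    start = snapshot["version"] if snapshot else 0
    for event in events:
        if event["aggregate_version"] > start:
            state = apply(state, event)
    return state
```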
Event Schema:
{
  "event_id": "uuid-v4",
  "event_type": "OrderPlaced",
  "aggregate_id": "order-123",
  "aggregate_version": 5,
  "timestamp": "2025-01-01T00:00:00Z",
  "payload": { ... },
  "metadata": {
    "correlation_id": "uuid",
    "causation_id": "uuid",
    "user_id": "user-456"
  }
}
Projections:
├── Materialize events to read models
├── Async for eventual consistency
├── Rebuild by replaying events
└── Catch-up from position
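A projection catch-up loop might look like this sketch, assuming hypothetical `read_events`, `update_read_model`, and `checkpoint` helpers:

```python
def run_projection(read_events, update_read_model, checkpoint):
    """Catch up from the last committed position, then stop when current."""
    position = checkpoint.load()          # last processed position, or 0
    while True:
        batch = read_events(after=position, limit=500)
        if not batch:
            break                         # caught up; a real loop would keep polling
        for event in batch:
            update_read_model(event)      # idempotent by (aggregate_id, version)
            position = event["position"]
        checkpoint.save(position)         # commit once per batch
```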
Command Side:
├── Validate command
├── Load aggregate state
├── Apply business rules
├── Emit events
└── Persist to event store
Query Side:
├── Subscribe to events
├── Update projections
├── Serve optimized queries
└── Different DBs per query type
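Putting the command side together, a load-decide-append sketch; the store API is an assumption, and `rebuild` is the replay helper sketched earlier:

```python
class ConcurrencyError(Exception):
    """Another writer appended to this aggregate first."""

def handle_place_order(store, command: dict) -> dict:
    # 1. Load aggregate state by replaying its event stream
    events = store.load(command["aggregate_id"])
    state = rebuild(events)
    # 2. Apply business rules
    if state.get("status") == "placed":
        raise ValueError("Order already placed")
    # 3. Emit the new event and persist with optimistic concurrency
    event = {
        "event_type": "OrderPlaced",
        "aggregate_id": command["aggregate_id"],
        "aggregate_version": len(events) + 1,
        "payload": {"items": command["items"]},
    }
    store.append(command["aggregate_id"],
                 expected_version=len(events), events=[event])
    return event
```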
Benefits:
├── Independent scaling
├── Optimized read models
├── Full audit trail
└── Time travel (replay)
Trade-offs:
├── Eventual consistency
├── Increased complexity
├── Event versioning
└── Debugging challenges
retry_config:
  message_processing:
    max_attempts: 5
    initial_delay_ms: 1000
    max_delay_ms: 60000
    multiplier: 2.0
    jitter_factor: 0.2
  dead_letter:
    enabled: true
    topic_suffix: ".dlq"
    retention_days: 7
    alert_threshold: 100
  error_classification:
    retryable:
      - TIMEOUT
      - SERVICE_UNAVAILABLE
      - RATE_LIMITED
      - DATABASE_DEADLOCK
    non_retryable:
      - VALIDATION_ERROR
      - DUPLICATE_MESSAGE
      - SCHEMA_MISMATCH
  circuit_breaker:
    failure_threshold: 10
    reset_timeout_seconds: 60
    half_open_attempts: 1
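The retry schedule this config describes, as a sketch: the delay doubles per attempt, is capped at `max_delay_ms`, and carries ±20% jitter to avoid thundering herds:

```python
import random

def backoff_delay_ms(attempt: int,
                     initial_delay_ms: int = 1000,
                     max_delay_ms: int = 60000,
                     multiplier: float = 2.0,
                     jitter_factor: float = 0.2) -> float:
    """Delay before retry `attempt` (1-based), per retry_config above."""
    delay = min(initial_delay_ms * multiplier ** (attempt - 1), max_delay_ms)
    jitter = delay * jitter_factor
    return delay + random.uniform(-jitter, jitter)

# Attempts 1..5 -> ~1s, 2s, 4s, 8s, 16s (each +/-20%)
print([round(backoff_delay_ms(a)) for a in range(1, 6)])
```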
log_schema:
  level: { type: string }
  timestamp: { type: string, format: ISO8601 }
  skill: { type: string, value: "event-driven" }
  event:
    type: string
    enum:
      - message_produced
      - message_consumed
      - message_acked
      - message_nacked
      - message_dlq
      - consumer_lag
      - rebalance
  context:
    type: object
    properties:
      topic: { type: string }
      partition: { type: integer }
      offset: { type: integer }
      consumer_group: { type: string }
      latency_ms: { type: number }

example:
  level: INFO
  event: message_consumed
  context:
    topic: orders.placed
    partition: 3
    offset: 12345
    consumer_group: order-service
    latency_ms: 45
metrics:
  - name: messages_produced_total
    type: counter
    labels: [topic, partition]
  - name: messages_consumed_total
    type: counter
    labels: [topic, consumer_group, status]
  - name: consumer_lag
    type: gauge
    labels: [topic, consumer_group, partition]
  - name: message_processing_duration_seconds
    type: histogram
    labels: [topic, consumer_group]
    buckets: [0.001, 0.01, 0.1, 1, 10]
  - name: dlq_messages_total
    type: counter
    labels: [topic, error_type]
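These definitions map directly onto `prometheus_client` instruments; a sketch with example label values:

```python
from prometheus_client import Counter, Gauge, Histogram

MESSAGES_PRODUCED = Counter(
    "messages_produced_total", "Messages produced", ["topic", "partition"])
MESSAGES_CONSUMED = Counter(
    "messages_consumed_total", "Messages consumed",
    ["topic", "consumer_group", "status"])
CONSUMER_LAG = Gauge(
    "consumer_lag", "Messages behind the partition head",
    ["topic", "consumer_group", "partition"])
PROCESSING_DURATION = Histogram(
    "message_processing_duration_seconds", "Handler latency",
    ["topic", "consumer_group"], buckets=[0.001, 0.01, 0.1, 1, 10])

# Example instrumentation around a handler
with PROCESSING_DURATION.labels("orders.placed", "order-service").time():
    pass  # handle the message here
MESSAGES_CONSUMED.labels("orders.placed", "order-service", "ok").inc()
```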
| Issue | Cause | Resolution |
|---|---|---|
| Consumer lag | Slow processing | Scale, optimize |
| Message loss | Acks misconfigured | Enable confirms |
| Duplicates | At-least-once | Idempotent consumers |
| Ordering issues | Wrong partition key | Fix key selection |
| DLQ overflow | Processing bugs | Fix root cause |
| Rebalance storms | Unstable consumers | Increase session timeout |
□ Producer acks configured?
□ Consumer offsets committed?
□ Replication factor >= 3?
□ Consumer lag monitored?
□ DLQ alerts configured?
□ Idempotency implemented?
□ Schema registry in use?
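For the idempotency item on this checklist, a minimal Redis SET NX dedup sketch; the key naming and TTL are assumptions:

```python
import redis

r = redis.Redis()

def process_once(message: dict, handler, ttl_seconds: int = 86400) -> bool:
    """Run the handler only if this message id has not been seen before."""
    key = f"dedup:{message['id']}"
    # SET key NX EX ttl -> True if newly created, None if the key already exists
    if not r.set(key, "1", nx=True, ex=ttl_seconds):
        return False          # duplicate: skip processing, but still ack
    try:
        handler(message)
        return True
    except Exception:
        r.delete(key)         # allow a retry after a processing failure
        raise
```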
# test_event_driven.py
# Assumes the validation helpers shipped with this skill (scripts/validate.py)
# plus illustrative consumer/producer test doubles.

def test_valid_event_context():
    params = {
        "event_context": {
            "use_case": "pub_sub",
            "requirements": {
                "throughput": "10K msg/s",
                "latency": "100ms",
                "ordering": "partition",
                "delivery": "at_least_once",
                "retention": "7 days"
            },
            "producers": ["order-service"],
            "consumers": ["notification-service", "analytics-service"]
        }
    }
    result = validate_parameters(params)
    assert result.valid

def test_global_ordering_throughput_warning():
    params = {
        "event_context": {
            "requirements": {
                "throughput": "100K msg/s",
                "ordering": "global"  # incompatible: global ordering caps throughput
            }
        }
    }
    result = validate_parameters(params)
    assert len(result.warnings) > 0
    assert "throughput" in result.warnings[0]

def test_partition_count_calculation():
    result = calculate_partitions(
        throughput_per_second=10000,
        max_throughput_per_partition=1000,
        replication_factor=3
    )
    assert result.partitions >= 10
    assert result.partitions % 2 == 0  # even count for balanced assignment

def test_idempotent_consumer():
    consumer = IdempotentConsumer(dedup_store=MockRedis())
    message = {"id": "msg-123", "data": "test"}

    # First processing succeeds
    result1 = consumer.process(message)
    assert result1.processed
    assert not result1.duplicate

    # Duplicate processing is skipped
    result2 = consumer.process(message)
    assert not result2.processed
    assert result2.duplicate

def test_exactly_once_with_transaction():
    producer = TransactionalProducer()
    with producer.transaction():
        producer.send("topic-a", {"key": "value"})
        producer.send("topic-b", {"key": "value"})
    # Both messages committed atomically
    assert producer.committed_count == 2

def test_dlq_routing():
    consumer = ConsumerWithDLQ(
        dlq_topic="orders.dlq",
        max_retries=3
    )
    # Simulate repeated processing failures
    message = {"id": "msg-123", "data": "invalid"}
    for _ in range(3):
        result = consumer.process(message)
        assert not result.success
    # After max_retries failures, the message lands in the DLQ
    assert consumer.dlq_messages[-1]["id"] == "msg-123"
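For Kafka specifically, the `TransactionalProducer` test double above corresponds to the transactional API in `confluent-kafka`; a sketch, assuming a local broker:

```python
from confluent_kafka import Producer

producer = Producer({
    "bootstrap.servers": "localhost:9092",
    "transactional.id": "order-writer-1",   # stable per producer instance
    "enable.idempotence": True,
})
producer.init_transactions()

producer.begin_transaction()
try:
    producer.produce("topic-a", b'{"key": "value"}')
    producer.produce("topic-b", b'{"key": "value"}')
    producer.commit_transaction()            # both messages visible atomically
except Exception:
    producer.abort_transaction()             # neither message is visible
    raise
```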
| Version | Date | Changes |
|---|---|---|
| 2.0.0 | 2025-01 | Production-grade rewrite with CQRS patterns |
| 1.0.0 | 2024-12 | Initial release |