Messaging and event-driven architecture. Activate when: (1) Working with NATS pub/sub, (2) Configuring Temporal workflows, (3) Implementing event sourcing, (4) Setting up message queues, or (5) Designing async communication patterns.
Implements NATS pub/sub, Temporal workflows, and event-driven patterns like event sourcing and CQRS.
npx claudepluginhub flexnetos/ripple-env
This skill inherits all available tools. When active, it can use any tool Claude has access to.
This skill covers messaging systems (NATS), workflow orchestration (Temporal), and event-driven architecture patterns.
| Concept | Description |
|---|---|
| Subject | Message address/topic (e.g., orders.created) |
| Publisher | Sends messages to subjects |
| Subscriber | Receives messages from subjects |
| Queue Group | Load-balanced message distribution |
| JetStream | Persistence and streaming layer |
| KV Store | Key-value storage built on JetStream |
# Connect
# Register a context named "local" for the server at localhost:4222, then make it the active one.
nats context add local --server nats://localhost:4222
nats context select local
# Pub/Sub
# Publish a JSON payload on the subject orders.created.
nats pub orders.created '{"id": 123}'
nats sub 'orders.>' # Wildcard subscription
# Request/Reply
# Run the two commands in separate terminals: the first answers requests on
# service.ping with "pong", the second sends an empty request to that subject.
nats reply 'service.ping' 'pong' # In terminal 1
nats request 'service.ping' '' # In terminal 2
# JetStream
# Create a stream named ORDERS that captures the "orders.>" subjects with limits-based retention.
nats stream add ORDERS --subjects "orders.>" --retention limits
nats stream info ORDERS
# Add a pull consumer named "processor" on ORDERS that uses explicit acknowledgements.
nats consumer add ORDERS processor --pull --ack explicit
# KV Store
# Create the CONFIG bucket, write and read the key app.setting, then watch the bucket.
nats kv add CONFIG
nats kv put CONFIG app.setting "value"
nats kv get CONFIG app.setting
nats kv watch CONFIG
# nats.conf
# Port the CLI examples connect to (nats://localhost:4222).
port: 4222
# JetStream settings: on-disk store location plus memory and file limits
# (1G / 10G here; presumably upper bounds for stream storage, confirm against the server docs).
jetstream {
store_dir: /data/jetstream
max_mem: 1G
max_file: 10G
}
# Cluster settings: cluster name, the port used for cluster traffic, and the
# route URLs of the peer servers nats-1 and nats-2.
cluster {
name: my-cluster
port: 6222
routes: [
nats-route://nats-1:6222
nats-route://nats-2:6222
]
}
use async_nats;
#[tokio::main]
async fn main() -> Result<(), async_nats::Error> {
let client = async_nats::connect("nats://localhost:4222").await?;
// Publish
client.publish("events.user.created", "user data".into()).await?;
// Subscribe
let mut subscriber = client.subscribe("events.>").await?;
while let Some(message) = subscriber.next().await {
println!("Received: {:?}", message);
}
Ok(())
}
import asyncio
import nats


async def main():
    """Subscribe to ``events.>``, publish one event, and issue one request.

    The connection is always drained before returning so that messages already
    delivered to the subscription callback are processed and the connection is
    closed cleanly, even if the request times out.
    """
    nc = await nats.connect("nats://localhost:4222")
    try:
        # Subscribe
        async def message_handler(msg):
            print(f"Received: {msg.subject}: {msg.data.decode()}")

        await nc.subscribe("events.>", cb=message_handler)

        # Publish
        await nc.publish("events.user.created", b'{"user_id": 123}')

        # Request/Reply (needs a responder on service.ping, e.g. `nats reply`)
        response = await nc.request("service.ping", b'', timeout=1)
        print(f"Response: {response.data.decode()}")
    finally:
        # Flush pending work and close the connection instead of leaking it.
        await nc.drain()


asyncio.run(main())
| Concept | Description |
|---|---|
| Workflow | Durable, long-running business process |
| Activity | A single unit of work (can fail/retry) |
| Worker | Executes workflows and activities |
| Task Queue | Routes work to workers |
| Signal | External event sent to running workflow |
| Query | Read workflow state without affecting it |
from datetime import timedelta

from temporalio import workflow, activity
from temporalio.common import RetryPolicy  # used by OrderWorkflow; was missing


@activity.defn
async def send_email(to: str, subject: str) -> str:
    """Activity stub: returns a confirmation string for the recipient ``to``."""
    # Actual email sending logic
    return f"Email sent to {to}"


@activity.defn
async def process_payment(order_id: str, amount: float) -> bool:
    """Activity stub: always reports the payment as successful."""
    # Payment processing logic
    return True


@workflow.defn
class OrderWorkflow:
    """Charges an order and, if the payment succeeds, sends a confirmation email."""

    @workflow.run
    async def run(self, order_id: str) -> str:
        """Run the order flow for ``order_id`` and return a completion message."""
        # Activities with retry policy: the payment is attempted at most 3 times,
        # each attempt limited to 30 seconds.
        payment_result = await workflow.execute_activity(
            process_payment,
            args=[order_id, 99.99],
            start_to_close_timeout=timedelta(seconds=30),
            retry_policy=RetryPolicy(maximum_attempts=3),
        )
        if payment_result:
            await workflow.execute_activity(
                send_email,
                args=["customer@example.com", "Order Confirmed"],
                start_to_close_timeout=timedelta(seconds=10),
            )
        return f"Order {order_id} completed"
import asyncio  # required by asyncio.run() below; was missing from this snippet

from temporalio.client import Client
from temporalio.worker import Worker


async def main():
    """Connect to Temporal at localhost:7233 and run a worker on ``order-queue``.

    The worker is registered with ``OrderWorkflow`` and the ``send_email`` and
    ``process_payment`` activities.
    """
    client = await Client.connect("localhost:7233")
    worker = Worker(
        client,
        task_queue="order-queue",
        workflows=[OrderWorkflow],
        activities=[send_email, process_payment],
    )
    await worker.run()


asyncio.run(main())
async def start_order():
    """Execute ``OrderWorkflow`` for order ``order-123`` and print what it returns.

    Connects to Temporal at localhost:7233 and runs the workflow under the id
    ``order-workflow-123`` on the ``order-queue`` task queue.
    """
    temporal = await Client.connect("localhost:7233")
    start_options = {"id": "order-workflow-123", "task_queue": "order-queue"}
    result = await temporal.execute_workflow(
        OrderWorkflow.run, "order-123", **start_options
    )
    print(f"Result: {result}")
# Start workflow
# An explicit --workflow-id is set so the describe/signal/query commands below,
# which all target order-workflow-123, refer to this execution.
temporal workflow start \
--workflow-id order-workflow-123 \
--task-queue order-queue \
--type OrderWorkflow \
--input '"order-123"'
# List workflows
temporal workflow list
# Describe workflow
temporal workflow describe --workflow-id order-workflow-123
# Signal workflow
# NOTE: the workflow must define a signal handler named "cancel"; the
# OrderWorkflow example in this document does not define one.
temporal workflow signal \
--workflow-id order-workflow-123 \
--name cancel \
--input '"reason"'
# Query workflow
# NOTE: likewise requires a query handler named "status" on the workflow.
temporal workflow query \
--workflow-id order-workflow-123 \
--name status
from dataclasses import dataclass
from typing import List
from datetime import datetime, timezone
from uuid import uuid4  # used by OrderAggregate.add_item; was missing


@dataclass
class Event:
    """A single recorded change: unique id, timestamp, type name and payload."""

    id: str
    timestamp: datetime
    type: str
    data: dict


class OrderAggregate:
    """Order state that is changed only by applying events.

    ``events`` collects the events produced by this instance; ``status`` and
    ``items`` are derived by ``apply``.
    """

    def __init__(self, order_id: str):
        self.id = order_id
        self.status = "pending"
        self.items = []
        self.events: List[Event] = []

    def apply(self, event: Event):
        """Update ``status`` / ``items`` from ``event``; unknown types are ignored."""
        if event.type == "OrderCreated":
            self.status = "created"
            # Copy so later item additions never mutate the stored event payload.
            self.items = list(event.data["items"])
        elif event.type == "ItemAdded":
            # Previously unhandled, so add_item() recorded an event but the
            # aggregate's items never changed.
            self.items.append(event.data["item"])
        elif event.type == "OrderPaid":
            self.status = "paid"
        elif event.type == "OrderShipped":
            self.status = "shipped"

    def add_item(self, item: dict):
        """Record an ``ItemAdded`` event for ``item`` and apply it to this aggregate."""
        event = Event(
            id=str(uuid4()),
            # Timezone-aware UTC timestamp; datetime.utcnow() is deprecated.
            timestamp=datetime.now(timezone.utc),
            type="ItemAdded",
            data={"item": item},
        )
        self.events.append(event)
        self.apply(event)
# Command Handler
class CreateOrderCommand:
    """Write-side request to create an order for a customer with the given items."""

    def __init__(self, customer_id: str, items: list):
        # Both values are stored as passed in, without copying.
        self.customer_id, self.items = customer_id, items
async def handle_create_order(cmd: CreateOrderCommand):
    """Create an order from ``cmd``, persist its events, then publish ``orders.created``.

    ``Order``, ``event_store`` and ``nats`` are not defined in this snippet and
    are expected to be provided by the surrounding application.
    """
    new_order = Order.create(cmd.customer_id, cmd.items)
    stream_id, pending_events = new_order.id, new_order.events
    await event_store.append(stream_id, pending_events)
    await nats.publish("orders.created", new_order.to_json())
# Query Handler (separate read model)
async def get_order_summary(order_id: str) -> dict:
    """Return the result of querying ``order_summaries`` for ``order_id``.

    ``read_db`` is not defined in this snippet; the id is passed as a bound
    parameter rather than interpolated into the SQL text.
    """
    summary_sql = "SELECT * FROM order_summaries WHERE id = $1"
    summary = await read_db.query(summary_sql, order_id)
    return summary
from datetime import timedelta

# Illustrative per-activity limit; the SDK rejects activity calls that set
# neither start_to_close_timeout nor schedule_to_close_timeout.
_STEP_TIMEOUT = timedelta(seconds=30)


@workflow.defn
class OrderSaga:
    """Reserve inventory, take payment and ship; undo completed steps on failure."""

    @workflow.run
    async def run(self, order: dict) -> str:
        """Run the saga for ``order`` (reads its "items", "total" and "address" keys).

        Returns "Order completed" on success. On any failure the compensating
        activities for the steps that had already succeeded are run and the
        original exception is re-raised.
        """
        # Initialised up front so the except block never reads an unbound name
        # when an early step fails (which would mask the real error).
        reservation = None
        payment = None
        try:
            # Step 1: Reserve inventory
            reservation = await workflow.execute_activity(
                reserve_inventory,
                args=[order["items"]],
                start_to_close_timeout=_STEP_TIMEOUT,
            )
            # Step 2: Process payment
            payment = await workflow.execute_activity(
                process_payment,
                args=[order["total"]],
                start_to_close_timeout=_STEP_TIMEOUT,
            )
            # Step 3: Ship order
            await workflow.execute_activity(
                create_shipment,
                args=[order["address"]],
                start_to_close_timeout=_STEP_TIMEOUT,
            )
            return "Order completed"
        except Exception:
            # Compensating transactions, undone in reverse order of the steps.
            if payment:
                await workflow.execute_activity(
                    refund_payment,
                    start_to_close_timeout=_STEP_TIMEOUT,
                )
            if reservation:
                await workflow.execute_activity(
                    release_inventory,
                    start_to_close_timeout=_STEP_TIMEOUT,
                )
            raise
Activates when the user asks about AI prompts, needs prompt templates, wants to search for prompts, or mentions prompts.chat. Use for discovering, retrieving, and improving prompts.
Search, retrieve, and install Agent Skills from the prompts.chat registry using MCP tools. Use when the user asks to find skills, browse skill catalogs, install a skill for Claude, or extend Claude's capabilities with reusable AI agent components.
This skill should be used when the user asks to "create an agent", "add an agent", "write a subagent", "agent frontmatter", "when to use description", "agent examples", "agent tools", "agent colors", "autonomous agent", or needs guidance on agent structure, system prompts, triggering conditions, or agent development best practices for Claude Code plugins.