From ripple-env
Messaging and event-driven architecture. Activate when: (1) Working with NATS pub/sub, (2) Configuring Temporal workflows, (3) Implementing event sourcing, (4) Setting up message queues, or (5) Designing async communication patterns.
npx claudepluginhub flexnetos/ripple-envThis skill uses the workspace's default tool permissions.
This skill covers messaging systems (NATS), workflow orchestration (Temporal), and event-driven architecture patterns.
Searches, retrieves, and installs Agent Skills from prompts.chat registry using MCP tools like search_skills and get_skill. Activates for finding skills, browsing catalogs, or extending Claude.
Searches prompts.chat for AI prompt templates by keyword or category, retrieves by ID with variable handling, and improves prompts via AI. Use for discovering or enhancing prompts.
Checks Next.js compilation errors using a running Turbopack dev server after code edits. Fixes actionable issues before reporting complete. Replaces `next build`.
This skill covers messaging systems (NATS), workflow orchestration (Temporal), and event-driven architecture patterns.
| Concept | Description |
|---|---|
| Subject | Message address/topic (e.g., orders.created) |
| Publisher | Sends messages to subjects |
| Subscriber | Receives messages from subjects |
| Queue Group | Load-balanced message distribution |
| JetStream | Persistence and streaming layer |
| KV Store | Key-value storage built on JetStream |
# Connect
nats context add local --server nats://localhost:4222
nats context select local
# Pub/Sub
nats pub orders.created '{"id": 123}'
nats sub 'orders.>' # Wildcard subscription
# Request/Reply
nats reply 'service.ping' 'pong' # In terminal 1
nats request 'service.ping' '' # In terminal 2
# JetStream
nats stream add ORDERS --subjects "orders.>" --retention limits
nats stream info ORDERS
nats consumer add ORDERS processor --pull --ack explicit
# KV Store
nats kv add CONFIG
nats kv put CONFIG app.setting "value"
nats kv get CONFIG app.setting
nats kv watch CONFIG
# nats.conf
port: 4222
jetstream {
store_dir: /data/jetstream
max_mem: 1G
max_file: 10G
}
cluster {
name: my-cluster
port: 6222
routes: [
nats-route://nats-1:6222
nats-route://nats-2:6222
]
}
use async_nats;
#[tokio::main]
async fn main() -> Result<(), async_nats::Error> {
let client = async_nats::connect("nats://localhost:4222").await?;
// Publish
client.publish("events.user.created", "user data".into()).await?;
// Subscribe
let mut subscriber = client.subscribe("events.>").await?;
while let Some(message) = subscriber.next().await {
println!("Received: {:?}", message);
}
Ok(())
}
import asyncio
import nats
async def main():
nc = await nats.connect("nats://localhost:4222")
# Subscribe
async def message_handler(msg):
print(f"Received: {msg.subject}: {msg.data.decode()}")
await nc.subscribe("events.>", cb=message_handler)
# Publish
await nc.publish("events.user.created", b'{"user_id": 123}')
# Request/Reply
response = await nc.request("service.ping", b'', timeout=1)
print(f"Response: {response.data.decode()}")
asyncio.run(main())
| Concept | Description |
|---|---|
| Workflow | Durable, long-running business process |
| Activity | A single unit of work (can fail/retry) |
| Worker | Executes workflows and activities |
| Task Queue | Routes work to workers |
| Signal | External event sent to running workflow |
| Query | Read workflow state without affecting it |
from temporalio import workflow, activity
from datetime import timedelta
@activity.defn
async def send_email(to: str, subject: str) -> str:
# Actual email sending logic
return f"Email sent to {to}"
@activity.defn
async def process_payment(order_id: str, amount: float) -> bool:
# Payment processing logic
return True
@workflow.defn
class OrderWorkflow:
@workflow.run
async def run(self, order_id: str) -> str:
# Activities with retry policy
payment_result = await workflow.execute_activity(
process_payment,
args=[order_id, 99.99],
start_to_close_timeout=timedelta(seconds=30),
retry_policy=RetryPolicy(maximum_attempts=3)
)
if payment_result:
await workflow.execute_activity(
send_email,
args=["customer@example.com", "Order Confirmed"],
start_to_close_timeout=timedelta(seconds=10)
)
return f"Order {order_id} completed"
from temporalio.client import Client
from temporalio.worker import Worker
async def main():
client = await Client.connect("localhost:7233")
worker = Worker(
client,
task_queue="order-queue",
workflows=[OrderWorkflow],
activities=[send_email, process_payment]
)
await worker.run()
asyncio.run(main())
async def start_order():
client = await Client.connect("localhost:7233")
result = await client.execute_workflow(
OrderWorkflow.run,
"order-123",
id="order-workflow-123",
task_queue="order-queue"
)
print(f"Result: {result}")
# Start workflow
temporal workflow start \
--task-queue order-queue \
--type OrderWorkflow \
--input '"order-123"'
# List workflows
temporal workflow list
# Describe workflow
temporal workflow describe --workflow-id order-workflow-123
# Signal workflow
temporal workflow signal \
--workflow-id order-workflow-123 \
--name cancel \
--input '"reason"'
# Query workflow
temporal workflow query \
--workflow-id order-workflow-123 \
--name status
from dataclasses import dataclass
from typing import List
from datetime import datetime
@dataclass
class Event:
id: str
timestamp: datetime
type: str
data: dict
class OrderAggregate:
def __init__(self, order_id: str):
self.id = order_id
self.status = "pending"
self.items = []
self.events: List[Event] = []
def apply(self, event: Event):
if event.type == "OrderCreated":
self.status = "created"
self.items = event.data["items"]
elif event.type == "OrderPaid":
self.status = "paid"
elif event.type == "OrderShipped":
self.status = "shipped"
def add_item(self, item: dict):
event = Event(
id=str(uuid4()),
timestamp=datetime.utcnow(),
type="ItemAdded",
data={"item": item}
)
self.events.append(event)
self.apply(event)
# Command Handler
class CreateOrderCommand:
def __init__(self, customer_id: str, items: list):
self.customer_id = customer_id
self.items = items
async def handle_create_order(cmd: CreateOrderCommand):
order = Order.create(cmd.customer_id, cmd.items)
await event_store.append(order.id, order.events)
await nats.publish("orders.created", order.to_json())
# Query Handler (separate read model)
async def get_order_summary(order_id: str) -> dict:
return await read_db.query(
"SELECT * FROM order_summaries WHERE id = $1",
order_id
)
@workflow.defn
class OrderSaga:
@workflow.run
async def run(self, order: dict) -> str:
try:
# Step 1: Reserve inventory
reservation = await workflow.execute_activity(
reserve_inventory, args=[order["items"]]
)
# Step 2: Process payment
payment = await workflow.execute_activity(
process_payment, args=[order["total"]]
)
# Step 3: Ship order
shipping = await workflow.execute_activity(
create_shipment, args=[order["address"]]
)
return "Order completed"
except Exception as e:
# Compensating transactions
if reservation:
await workflow.execute_activity(release_inventory)
if payment:
await workflow.execute_activity(refund_payment)
raise