From ripple-env
Distributed systems tools including NATS messaging, libp2p networking, and Temporal workflows
npx claudepluginhub flexnetos/ripple-envThis skill uses the workspace's default tool permissions.
This skill provides expertise in distributed systems infrastructure for multi-robot coordination, microservices communication, and workflow orchestration.
Searches, retrieves, and installs Agent Skills from prompts.chat registry using MCP tools like search_skills and get_skill. Activates for finding skills, browsing catalogs, or extending Claude.
Searches prompts.chat for AI prompt templates by keyword or category, retrieves by ID with variable handling, and improves prompts via AI. Use for discovering or enhancing prompts.
Checks Next.js compilation errors using a running Turbopack dev server after code edits. Fixes actionable issues before reporting complete. Replaces `next build`.
This skill provides expertise in distributed systems infrastructure for multi-robot coordination, microservices communication, and workflow orchestration.
NATS is a high-performance messaging system ideal for ROS2 multi-robot coordination.
# In flake.nix devshells or packages
{ pkgs, ... }:
{
packages = with pkgs; [
nats-server # NATS server
natscli # CLI tools
];
}
pixi add nats-py # Python client
pixi add nats-server # Server (if available)
# Start NATS server
nats-server
# With JetStream (persistence)
nats-server --jetstream
# With config file
nats-server -c /path/to/nats.conf
# nats.conf
port: 4222
http_port: 8222
jetstream {
store_dir: "/var/lib/nats/jetstream"
max_memory_store: 1G
max_file_store: 10G
}
# Cluster configuration
cluster {
name: "ros2-cluster"
port: 6222
routes: [
"nats://robot1:6222"
"nats://robot2:6222"
]
}
import asyncio
import nats
async def main():
# Connect to NATS
nc = await nats.connect("nats://localhost:4222")
# Publish message
await nc.publish("robot.status", b"online")
# Subscribe to topic
async def message_handler(msg):
print(f"Received: {msg.data.decode()}")
sub = await nc.subscribe("robot.*", cb=message_handler)
# Request-reply pattern
response = await nc.request("robot.ping", b"hello", timeout=1.0)
print(f"Response: {response.data.decode()}")
await nc.close()
asyncio.run(main())
import rclpy
from rclpy.node import Node
import nats
import asyncio
import json
class NATSBridgeNode(Node):
"""Bridge ROS2 topics to NATS subjects."""
def __init__(self):
super().__init__('nats_bridge')
self.nc = None
async def connect_nats(self):
self.nc = await nats.connect("nats://localhost:4222")
async def publish_to_nats(self, subject: str, data: dict):
if self.nc:
await self.nc.publish(subject, json.dumps(data).encode())
async def subscribe_from_nats(self, subject: str, callback):
if self.nc:
await self.nc.subscribe(subject, cb=callback)
import nats
from nats.js import JetStreamContext
async def setup_jetstream():
nc = await nats.connect()
js = nc.jetstream()
# Create stream
await js.add_stream(name="ROBOTS", subjects=["robot.*"])
# Publish with acknowledgment
ack = await js.publish("robot.telemetry", b"data")
print(f"Published: seq={ack.seq}")
# Consumer
sub = await js.pull_subscribe("robot.*", "telemetry-consumer")
msgs = await sub.fetch(10)
for msg in msgs:
await msg.ack()
# Server info
nats server info
# Publish message
nats pub robot.command "move forward"
# Subscribe
nats sub "robot.>"
# Request-reply
nats request robot.ping "hello"
# Stream management
nats stream add ROBOTS --subjects "robot.*"
nats stream info ROBOTS
nats consumer add ROBOTS telemetry
Temporal provides durable workflow execution for complex robotics tasks.
{ pkgs, ... }:
{
packages = with pkgs; [
temporal-cli # CLI tools
# temporalite # Local development server
];
}
# Start local server
temporalite start --namespace default
# Or with Docker
docker run -d --name temporal \
-p 7233:7233 -p 8233:8233 \
temporalio/auto-setup:latest
from temporalio import activity, workflow
from temporalio.client import Client
from temporalio.worker import Worker
from datetime import timedelta
@activity.defn
async def navigate_to_waypoint(waypoint: dict) -> bool:
"""Activity: Navigate robot to waypoint."""
# ROS2 navigation logic here
return True
@activity.defn
async def pick_object(object_id: str) -> bool:
"""Activity: Pick up an object."""
return True
@workflow.defn
class DeliveryWorkflow:
"""Durable workflow for robot delivery task."""
@workflow.run
async def run(self, delivery_request: dict) -> str:
# Navigate to pickup
await workflow.execute_activity(
navigate_to_waypoint,
delivery_request["pickup_location"],
start_to_close_timeout=timedelta(minutes=5)
)
# Pick up item
await workflow.execute_activity(
pick_object,
delivery_request["object_id"],
start_to_close_timeout=timedelta(minutes=2)
)
# Navigate to delivery
await workflow.execute_activity(
navigate_to_waypoint,
delivery_request["delivery_location"],
start_to_close_timeout=timedelta(minutes=5)
)
return "delivered"
async def main():
client = await Client.connect("localhost:7233")
# Start workflow
handle = await client.start_workflow(
DeliveryWorkflow.run,
{"pickup_location": {...}, "delivery_location": {...}},
id="delivery-001",
task_queue="robot-tasks"
)
result = await handle.result()
print(f"Delivery result: {result}")
┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ Robot 1 │ │ Robot 2 │ │ Robot 3 │
│ (ROS2) │ │ (ROS2) │ │ (ROS2) │
└──────┬──────┘ └──────┬──────┘ └──────┬──────┘
│ │ │
└───────────────────┼───────────────────┘
│
┌──────┴──────┐
│ NATS Cluster│
│ (JetStream)│
└──────┬──────┘
│
┌────────────┼────────────┐
│ │ │
┌──────┴──────┐ ┌──┴───┐ ┌────┴─────┐
│ Coordinator │ │ Fleet │ │ Telemetry│
│ Service │ │ Mgmt │ │ Collector│
└─────────────┘ └──────┘ └──────────┘
┌─────────────────────────────────────────────────────┐
│ NATS Subjects │
├───────────────┬─────────────────┬───────────────────┤
│ robot.status │ robot.telemetry │ robot.command │
│ task.created │ task.completed │ task.failed │
│ alert.warning │ alert.critical │ alert.resolved │
└───────────────┴─────────────────┴───────────────────┘
robot.{id}.{type})