Distributed systems tools including NATS messaging, libp2p networking, and Temporal workflows
Integrates NATS messaging and Temporal workflows for distributed robotics coordination.
npx claudepluginhub flexnetos/ripple-envThis skill inherits all available tools. When active, it can use any tool Claude has access to.
This skill provides expertise in distributed systems infrastructure for multi-robot coordination, microservices communication, and workflow orchestration.
NATS is a high-performance messaging system ideal for ROS2 multi-robot coordination.
# In flake.nix devshells or packages
{ pkgs, ... }:
{
packages = with pkgs; [
nats-server # NATS server
natscli # CLI tools
];
}
pixi add nats-py # Python client
pixi add nats-server # Server (if available)
# Start NATS server
nats-server
# With JetStream (persistence)
nats-server --jetstream
# With config file
nats-server -c /path/to/nats.conf
# nats.conf
port: 4222
http_port: 8222
jetstream {
store_dir: "/var/lib/nats/jetstream"
max_memory_store: 1G
max_file_store: 10G
}
# Cluster configuration
cluster {
name: "ros2-cluster"
port: 6222
routes: [
"nats://robot1:6222"
"nats://robot2:6222"
]
}
import asyncio
import nats
async def main():
# Connect to NATS
nc = await nats.connect("nats://localhost:4222")
# Publish message
await nc.publish("robot.status", b"online")
# Subscribe to topic
async def message_handler(msg):
print(f"Received: {msg.data.decode()}")
sub = await nc.subscribe("robot.*", cb=message_handler)
# Request-reply pattern
response = await nc.request("robot.ping", b"hello", timeout=1.0)
print(f"Response: {response.data.decode()}")
await nc.close()
asyncio.run(main())
import rclpy
from rclpy.node import Node
import nats
import asyncio
import json
class NATSBridgeNode(Node):
"""Bridge ROS2 topics to NATS subjects."""
def __init__(self):
super().__init__('nats_bridge')
self.nc = None
async def connect_nats(self):
self.nc = await nats.connect("nats://localhost:4222")
async def publish_to_nats(self, subject: str, data: dict):
if self.nc:
await self.nc.publish(subject, json.dumps(data).encode())
async def subscribe_from_nats(self, subject: str, callback):
if self.nc:
await self.nc.subscribe(subject, cb=callback)
import nats
from nats.js import JetStreamContext
async def setup_jetstream():
nc = await nats.connect()
js = nc.jetstream()
# Create stream
await js.add_stream(name="ROBOTS", subjects=["robot.*"])
# Publish with acknowledgment
ack = await js.publish("robot.telemetry", b"data")
print(f"Published: seq={ack.seq}")
# Consumer
sub = await js.pull_subscribe("robot.*", "telemetry-consumer")
msgs = await sub.fetch(10)
for msg in msgs:
await msg.ack()
# Server info
nats server info
# Publish message
nats pub robot.command "move forward"
# Subscribe
nats sub "robot.>"
# Request-reply
nats request robot.ping "hello"
# Stream management
nats stream add ROBOTS --subjects "robot.*"
nats stream info ROBOTS
nats consumer add ROBOTS telemetry
Temporal provides durable workflow execution for complex robotics tasks.
{ pkgs, ... }:
{
packages = with pkgs; [
temporal-cli # CLI tools
# temporalite # Local development server
];
}
# Start local server
temporalite start --namespace default
# Or with Docker
docker run -d --name temporal \
-p 7233:7233 -p 8233:8233 \
temporalio/auto-setup:latest
from temporalio import activity, workflow
from temporalio.client import Client
from temporalio.worker import Worker
from datetime import timedelta
@activity.defn
async def navigate_to_waypoint(waypoint: dict) -> bool:
"""Activity: Navigate robot to waypoint."""
# ROS2 navigation logic here
return True
@activity.defn
async def pick_object(object_id: str) -> bool:
"""Activity: Pick up an object."""
return True
@workflow.defn
class DeliveryWorkflow:
"""Durable workflow for robot delivery task."""
@workflow.run
async def run(self, delivery_request: dict) -> str:
# Navigate to pickup
await workflow.execute_activity(
navigate_to_waypoint,
delivery_request["pickup_location"],
start_to_close_timeout=timedelta(minutes=5)
)
# Pick up item
await workflow.execute_activity(
pick_object,
delivery_request["object_id"],
start_to_close_timeout=timedelta(minutes=2)
)
# Navigate to delivery
await workflow.execute_activity(
navigate_to_waypoint,
delivery_request["delivery_location"],
start_to_close_timeout=timedelta(minutes=5)
)
return "delivered"
async def main():
client = await Client.connect("localhost:7233")
# Start workflow
handle = await client.start_workflow(
DeliveryWorkflow.run,
{"pickup_location": {...}, "delivery_location": {...}},
id="delivery-001",
task_queue="robot-tasks"
)
result = await handle.result()
print(f"Delivery result: {result}")
┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ Robot 1 │ │ Robot 2 │ │ Robot 3 │
│ (ROS2) │ │ (ROS2) │ │ (ROS2) │
└──────┬──────┘ └──────┬──────┘ └──────┬──────┘
│ │ │
└───────────────────┼───────────────────┘
│
┌──────┴──────┐
│ NATS Cluster│
│ (JetStream)│
└──────┬──────┘
│
┌────────────┼────────────┐
│ │ │
┌──────┴──────┐ ┌──┴───┐ ┌────┴─────┐
│ Coordinator │ │ Fleet │ │ Telemetry│
│ Service │ │ Mgmt │ │ Collector│
└─────────────┘ └──────┘ └──────────┘
┌─────────────────────────────────────────────────────┐
│ NATS Subjects │
├───────────────┬─────────────────┬───────────────────┤
│ robot.status │ robot.telemetry │ robot.command │
│ task.created │ task.completed │ task.failed │
│ alert.warning │ alert.critical │ alert.resolved │
└───────────────┴─────────────────┴───────────────────┘
robot.{id}.{type})Activates when the user asks about AI prompts, needs prompt templates, wants to search for prompts, or mentions prompts.chat. Use for discovering, retrieving, and improving prompts.
Search, retrieve, and install Agent Skills from the prompts.chat registry using MCP tools. Use when the user asks to find skills, browse skill catalogs, install a skill for Claude, or extend Claude's capabilities with reusable AI agent components.
This skill should be used when the user asks to "create an agent", "add an agent", "write a subagent", "agent frontmatter", "when to use description", "agent examples", "agent tools", "agent colors", "autonomous agent", or needs guidance on agent structure, system prompts, triggering conditions, or agent development best practices for Claude Code plugins.