Rust development tools including PyO3 bindings, sqlx database access, AGiXT SDK, and Rust-Python interop
Builds Rust-Python bindings with PyO3, type-safe database access with sqlx, and AGiXT SDK integration.
/plugin marketplace add FlexNetOS/ripple-env/plugin install flexnetos-ripple-env@FlexNetOS/ripple-envThis skill inherits all available tools. When active, it can use any tool Claude has access to.
This skill provides expertise in Rust development tools for building high-performance components, Python bindings, and type-safe database access.
PyO3 enables writing Python extensions in Rust for performance-critical ROS2 components.
# Install maturin (build tool for PyO3)
pixi add maturin
# Create new PyO3 project
maturin new my_rust_module --bindings pyo3
cd my_rust_module
[package]
name = "my_rust_module"
version = "0.1.0"
edition = "2021"
[lib]
name = "my_rust_module"
crate-type = ["cdylib"]
[dependencies]
pyo3 = { version = "0.20", features = ["extension-module"] }
numpy = "0.20" # For NumPy array support
use pyo3::prelude::*;
/// A fast point cloud processor
#[pyclass]
struct PointCloudProcessor {
#[pyo3(get, set)]
voxel_size: f64,
}
#[pymethods]
impl PointCloudProcessor {
#[new]
fn new(voxel_size: f64) -> Self {
PointCloudProcessor { voxel_size }
}
/// Downsample point cloud using voxel grid
fn downsample(&self, points: Vec<[f64; 3]>) -> Vec<[f64; 3]> {
// High-performance Rust implementation
points.into_iter()
.filter(|p| p[0] % self.voxel_size < 0.5)
.collect()
}
}
/// Process sensor data with SIMD optimization
#[pyfunction]
fn fast_transform(data: Vec<f64>, scale: f64) -> Vec<f64> {
data.iter().map(|x| x * scale).collect()
}
/// Python module
#[pymodule]
fn my_rust_module(_py: Python, m: &PyModule) -> PyResult<()> {
m.add_class::<PointCloudProcessor>()?;
m.add_function(wrap_pyfunction!(fast_transform, m)?)?;
Ok(())
}
use numpy::{PyArray1, PyArray2, PyReadonlyArray2};
use pyo3::prelude::*;
#[pyfunction]
fn process_lidar_scan<'py>(
py: Python<'py>,
points: PyReadonlyArray2<'py, f64>,
) -> &'py PyArray1<f64> {
let points = points.as_array();
let distances: Vec<f64> = points
.rows()
.into_iter()
.map(|row| (row[0].powi(2) + row[1].powi(2) + row[2].powi(2)).sqrt())
.collect();
PyArray1::from_vec(py, distances)
}
# Development build
maturin develop
# Release build
maturin build --release
# Install in current environment
pip install target/wheels/*.whl
# Build for multiple Python versions
maturin build --release --interpreter python3.10 python3.11
# In your ROS2 node
import my_rust_module
from sensor_msgs.msg import PointCloud2
import numpy as np
class FastLidarNode(Node):
def __init__(self):
super().__init__('fast_lidar')
self.processor = my_rust_module.PointCloudProcessor(0.1)
def process_callback(self, msg: PointCloud2):
# Convert to numpy array
points = np.array(list(pc2.read_points(msg)))
# Fast Rust processing
result = my_rust_module.process_lidar_scan(points)
sqlx provides compile-time checked SQL queries for robotics data persistence.
# Install sqlx CLI
cargo install sqlx-cli
# Or via Nix
nix profile install nixpkgs#sqlx-cli
[dependencies]
sqlx = { version = "0.7", features = [
"runtime-tokio",
"postgres",
"sqlite",
"chrono",
"uuid"
]}
tokio = { version = "1", features = ["full"] }
# Create database
sqlx database create
# Run migrations
sqlx migrate run
# Prepare offline mode (for CI)
cargo sqlx prepare
-- migrations/001_create_robots.sql
CREATE TABLE robots (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
name VARCHAR(255) NOT NULL,
status VARCHAR(50) NOT NULL DEFAULT 'idle',
position_x DOUBLE PRECISION,
position_y DOUBLE PRECISION,
battery_level DOUBLE PRECISION,
last_seen TIMESTAMPTZ DEFAULT NOW(),
created_at TIMESTAMPTZ DEFAULT NOW()
);
CREATE TABLE telemetry (
id BIGSERIAL PRIMARY KEY,
robot_id UUID REFERENCES robots(id),
metric_name VARCHAR(100) NOT NULL,
metric_value DOUBLE PRECISION NOT NULL,
timestamp TIMESTAMPTZ DEFAULT NOW()
);
CREATE INDEX idx_telemetry_robot_time ON telemetry(robot_id, timestamp);
use sqlx::{postgres::PgPoolOptions, FromRow, Pool, Postgres};
use uuid::Uuid;
use chrono::{DateTime, Utc};
#[derive(Debug, FromRow)]
struct Robot {
id: Uuid,
name: String,
status: String,
position_x: Option<f64>,
position_y: Option<f64>,
battery_level: Option<f64>,
last_seen: DateTime<Utc>,
}
struct RobotRepository {
pool: Pool<Postgres>,
}
impl RobotRepository {
async fn new(database_url: &str) -> Result<Self, sqlx::Error> {
let pool = PgPoolOptions::new()
.max_connections(5)
.connect(database_url)
.await?;
Ok(Self { pool })
}
// Compile-time verified query
async fn get_robot(&self, id: Uuid) -> Result<Robot, sqlx::Error> {
sqlx::query_as!(
Robot,
r#"
SELECT id, name, status, position_x, position_y,
battery_level, last_seen
FROM robots
WHERE id = $1
"#,
id
)
.fetch_one(&self.pool)
.await
}
async fn update_position(
&self,
id: Uuid,
x: f64,
y: f64
) -> Result<(), sqlx::Error> {
sqlx::query!(
r#"
UPDATE robots
SET position_x = $2, position_y = $3, last_seen = NOW()
WHERE id = $1
"#,
id, x, y
)
.execute(&self.pool)
.await?;
Ok(())
}
async fn record_telemetry(
&self,
robot_id: Uuid,
metric: &str,
value: f64,
) -> Result<(), sqlx::Error> {
sqlx::query!(
r#"
INSERT INTO telemetry (robot_id, metric_name, metric_value)
VALUES ($1, $2, $3)
"#,
robot_id, metric, value
)
.execute(&self.pool)
.await?;
Ok(())
}
}
use sqlx::sqlite::{SqlitePool, SqlitePoolOptions};
async fn setup_local_db() -> Result<SqlitePool, sqlx::Error> {
let pool = SqlitePoolOptions::new()
.max_connections(1)
.connect("sqlite:robot_local.db?mode=rwc")
.await?;
// Run embedded migrations
sqlx::migrate!("./migrations")
.run(&pool)
.await?;
Ok(pool)
}
{
devShells.default = pkgs.mkShell {
packages = with pkgs; [
# Rust toolchain
rustc
cargo
rust-analyzer
# PyO3 tools
maturin
# sqlx
sqlx-cli
# Database clients
postgresql
sqlite
];
shellHook = ''
export DATABASE_URL="postgres://user:pass@localhost/robotics"
export SQLX_OFFLINE=true # Use prepared queries in CI
'';
};
}
#[pyclass] for stateful objectsVec<T> over PyList for performancenumpy crate for array operationspy.allow_threads(|| ...)query! macro)The AGiXT Rust SDK enables Rust applications to communicate with AGiXT for AI-powered robotics.
rust/
āāā Cargo.toml # Workspace root
āāā agixt-bridge/
āāā Cargo.toml # AGiXT bridge crate
āāā src/
ā āāā main.rs # CLI entry point
ā āāā lib.rs # Library exports
ā āāā client.rs # AGiXT client wrapper
ā āāā commands.rs # Robot command types
ā āāā config.rs # Configuration
āāā examples/
āāā basic_chat.rs # Basic AGiXT chat
āāā robot_commands.rs # Robot command processing
[dependencies]
agixt_sdk = "0.1"
tokio = { version = "1.0", features = ["full"] }
reqwest = { version = "0.11", features = ["json"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
anyhow = "1.0"
use agixt_sdk::AGiXTSDK;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let client = AGiXTSDK::new(
Some("http://localhost:7437".to_string()),
Some("agixt-dev-key".to_string()),
false,
);
// Get available providers
let providers = client.get_providers().await?;
println!("Providers: {:?}", providers);
// Create conversation
let conv = client.new_conversation("ros2-agent", None, None).await?;
println!("Conversation: {:?}", conv);
Ok(())
}
# Build the bridge
cd rust/agixt-bridge
cargo build
# Run example
cargo run --example basic_chat
# Run with release optimizations
cargo build --release
use agixt_sdk::AGiXTSDK;
use std::sync::Arc;
pub struct AGiXTROS2Bridge {
client: Arc<AGiXTSDK>,
agent_name: String,
}
impl AGiXTROS2Bridge {
pub fn new(url: &str, api_key: &str, agent: &str) -> Self {
let client = AGiXTSDK::new(
Some(url.to_string()),
Some(api_key.to_string()),
false,
);
Self {
client: Arc::new(client),
agent_name: agent.to_string(),
}
}
pub async fn process_command(&self, command: &str) -> anyhow::Result<String> {
// Send command to AGiXT for AI processing
// Return structured response for ROS2 action
todo!("Implement based on your robot's action interface")
}
}
Environment variables:
AGIXT_URL - AGiXT API URL (default: http://localhost:7437)AGIXT_API_KEY - API key (default: agixt-dev-key)LOCALAI_URL - LocalAI URL (default: http://localhost:8080)AGIXT_AGENT - Default agent name (default: ros2-agent)See rust/agixt-bridge/src/config.rs for configuration details.
This skill should be used when the user asks to "create a slash command", "add a command", "write a custom command", "define command arguments", "use command frontmatter", "organize commands", "create command with file references", "interactive command", "use AskUserQuestion in command", or needs guidance on slash command structure, YAML frontmatter fields, dynamic arguments, bash execution in commands, user interaction patterns, or command development best practices for Claude Code.
This skill should be used when the user asks to "create an agent", "add an agent", "write a subagent", "agent frontmatter", "when to use description", "agent examples", "agent tools", "agent colors", "autonomous agent", or needs guidance on agent structure, system prompts, triggering conditions, or agent development best practices for Claude Code plugins.
This skill should be used when the user asks to "create a hook", "add a PreToolUse/PostToolUse/Stop hook", "validate tool use", "implement prompt-based hooks", "use ${CLAUDE_PLUGIN_ROOT}", "set up event-driven automation", "block dangerous commands", or mentions hook events (PreToolUse, PostToolUse, Stop, SubagentStop, SessionStart, SessionEnd, UserPromptSubmit, PreCompact, Notification). Provides comprehensive guidance for creating and implementing Claude Code plugin hooks with focus on advanced prompt-based hooks API.