Expert skill for multi-sensor fusion and state estimation using Kalman filtering. Implement EKF/UKF, configure robot_localization, fuse IMU, GPS, odometry, and visual sensors for robust localization.
Implements Kalman filters to fuse IMU, GPS, and odometry data for robust robot localization.
npx claudepluginhub a5c-ai/babysitterThis skill is limited to using the following tools:
README.mdYou are sensor-fusion - a specialized skill for multi-sensor fusion and state estimation using Kalman filtering and factor graph optimization.
This skill enables AI-powered sensor fusion including:
Configure the ROS2 robot_localization package for EKF/UKF:
# ekf_localization.yaml
ekf_filter_node:
ros__parameters:
# Coordinate frames
map_frame: map
odom_frame: odom
base_link_frame: base_link
world_frame: odom
# Frequencies
frequency: 50.0
sensor_timeout: 0.1
two_d_mode: false
# Transform settings
transform_time_offset: 0.0
transform_timeout: 0.0
print_diagnostics: true
debug: false
publish_tf: true
publish_acceleration: false
# IMU input
imu0: /imu/data
imu0_config: [false, false, false, # x, y, z position
true, true, true, # roll, pitch, yaw
false, false, false, # vx, vy, vz
true, true, true, # vroll, vpitch, vyaw
true, true, true] # ax, ay, az
imu0_differential: false
imu0_relative: false
imu0_queue_size: 10
imu0_remove_gravitational_acceleration: true
# Wheel odometry input
odom0: /wheel_odom
odom0_config: [true, true, false, # x, y, z position
false, false, true, # roll, pitch, yaw
true, true, false, # vx, vy, vz
false, false, true, # vroll, vpitch, vyaw
false, false, false] # ax, ay, az
odom0_differential: false
odom0_relative: false
odom0_queue_size: 10
# GPS input (for map frame)
# odom1: /gps/odom
# odom1_config: [true, true, true,
# false, false, false,
# false, false, false,
# false, false, false,
# false, false, false]
# odom1_differential: false
# Process noise covariance (Q matrix diagonal)
process_noise_covariance: [
0.05, # x
0.05, # y
0.06, # z
0.03, # roll
0.03, # pitch
0.06, # yaw
0.025, # vx
0.025, # vy
0.04, # vz
0.01, # vroll
0.01, # vpitch
0.02, # vyaw
0.01, # ax
0.01, # ay
0.015 # az
]
# Initial estimate covariance (P0 matrix diagonal)
initial_estimate_covariance: [
1e-9, 1e-9, 1e-9, # position
1e-9, 1e-9, 1e-9, # orientation
1e-9, 1e-9, 1e-9, # velocity
1e-9, 1e-9, 1e-9, # angular velocity
1e-9, 1e-9, 1e-9 # acceleration
]
Configure two EKF instances for continuous odometry and global localization:
# dual_ekf_navsat.yaml
# EKF for continuous odometry (odom frame)
ekf_filter_node_odom:
ros__parameters:
frequency: 50.0
two_d_mode: false
map_frame: map
odom_frame: odom
base_link_frame: base_link
world_frame: odom
# Fuse IMU and wheel odometry only
imu0: /imu/data
imu0_config: [false, false, false,
true, true, true,
false, false, false,
true, true, true,
true, true, true]
imu0_remove_gravitational_acceleration: true
odom0: /wheel_odom
odom0_config: [true, true, false,
false, false, true,
true, true, false,
false, false, true,
false, false, false]
# EKF for global localization (map frame)
ekf_filter_node_map:
ros__parameters:
frequency: 50.0
two_d_mode: false
map_frame: map
odom_frame: odom
base_link_frame: base_link
world_frame: map
# Fuse odometry output and GPS
odom0: /odometry/filtered
odom0_config: [true, true, true,
true, true, true,
true, true, true,
true, true, true,
true, true, true]
odom1: /gps/odom
odom1_config: [true, true, true,
false, false, false,
false, false, false,
false, false, false,
false, false, false]
odom1_differential: false
# NavSat transform node
navsat_transform_node:
ros__parameters:
frequency: 50.0
delay: 0.0
magnetic_declination_radians: 0.0
yaw_offset: 0.0
zero_altitude: true
broadcast_utm_transform: true
publish_filtered_gps: true
use_odometry_yaw: false
wait_for_datum: false
Implement a custom EKF for state estimation:
import numpy as np
from scipy.linalg import block_diag
class RobotEKF:
"""Extended Kalman Filter for robot localization."""
def __init__(self, dt=0.02):
self.dt = dt
# State: [x, y, z, roll, pitch, yaw, vx, vy, vz, wx, wy, wz]
self.n_states = 12
# State vector
self.x = np.zeros(self.n_states)
# State covariance
self.P = np.eye(self.n_states) * 0.1
# Process noise
self.Q = np.diag([
0.01, 0.01, 0.01, # position noise
0.001, 0.001, 0.001, # orientation noise
0.1, 0.1, 0.1, # velocity noise
0.01, 0.01, 0.01 # angular velocity noise
])
def predict(self, u=None):
"""Predict step using motion model."""
dt = self.dt
x, y, z = self.x[0:3]
roll, pitch, yaw = self.x[3:6]
vx, vy, vz = self.x[6:9]
wx, wy, wz = self.x[9:12]
# State transition (constant velocity model)
# Position update
self.x[0] += vx * np.cos(yaw) * dt - vy * np.sin(yaw) * dt
self.x[1] += vx * np.sin(yaw) * dt + vy * np.cos(yaw) * dt
self.x[2] += vz * dt
# Orientation update
self.x[3] += wx * dt
self.x[4] += wy * dt
self.x[5] += wz * dt
# Jacobian of state transition
F = self._compute_jacobian()
# Covariance prediction
self.P = F @ self.P @ F.T + self.Q
def _compute_jacobian(self):
"""Compute Jacobian of state transition."""
dt = self.dt
yaw = self.x[5]
vx, vy = self.x[6:8]
F = np.eye(self.n_states)
# Position derivatives w.r.t. yaw
F[0, 5] = -vx * np.sin(yaw) * dt - vy * np.cos(yaw) * dt
F[1, 5] = vx * np.cos(yaw) * dt - vy * np.sin(yaw) * dt
# Position derivatives w.r.t. velocity
F[0, 6] = np.cos(yaw) * dt
F[0, 7] = -np.sin(yaw) * dt
F[1, 6] = np.sin(yaw) * dt
F[1, 7] = np.cos(yaw) * dt
F[2, 8] = dt
# Orientation derivatives w.r.t. angular velocity
F[3, 9] = dt
F[4, 10] = dt
F[5, 11] = dt
return F
def update_imu(self, imu_data):
"""Update with IMU measurement (orientation, angular velocity, acceleration)."""
# Measurement: [roll, pitch, yaw, wx, wy, wz, ax, ay, az]
z = np.array([
imu_data['roll'], imu_data['pitch'], imu_data['yaw'],
imu_data['wx'], imu_data['wy'], imu_data['wz']
])
# Measurement matrix
H = np.zeros((6, self.n_states))
H[0:3, 3:6] = np.eye(3) # Orientation
H[3:6, 9:12] = np.eye(3) # Angular velocity
# Measurement noise
R = np.diag([0.01, 0.01, 0.02, 0.001, 0.001, 0.001])
self._ekf_update(z, H, R)
def update_odom(self, odom_data):
"""Update with odometry measurement (velocity)."""
z = np.array([odom_data['vx'], odom_data['vy'], odom_data['wz']])
# Measurement matrix
H = np.zeros((3, self.n_states))
H[0, 6] = 1 # vx
H[1, 7] = 1 # vy
H[2, 11] = 1 # wz
# Measurement noise
R = np.diag([0.05, 0.05, 0.02])
self._ekf_update(z, H, R)
def update_gps(self, gps_data):
"""Update with GPS measurement (position)."""
z = np.array([gps_data['x'], gps_data['y'], gps_data['z']])
# Measurement matrix
H = np.zeros((3, self.n_states))
H[0:3, 0:3] = np.eye(3)
# Measurement noise (GPS typically 1-5m accuracy)
R = np.diag([2.0, 2.0, 5.0])
# Outlier rejection using Mahalanobis distance
y = z - H @ self.x
S = H @ self.P @ H.T + R
mahal_dist = np.sqrt(y.T @ np.linalg.inv(S) @ y)
if mahal_dist < 5.0: # Chi-squared threshold
self._ekf_update(z, H, R)
else:
print(f"GPS outlier rejected: Mahalanobis distance = {mahal_dist:.2f}")
def _ekf_update(self, z, H, R):
"""Standard EKF update step."""
# Innovation
y = z - H @ self.x
# Innovation covariance
S = H @ self.P @ H.T + R
# Kalman gain
K = self.P @ H.T @ np.linalg.inv(S)
# State update
self.x = self.x + K @ y
# Covariance update (Joseph form for numerical stability)
I_KH = np.eye(self.n_states) - K @ H
self.P = I_KH @ self.P @ I_KH.T + K @ R @ K.T
def get_state(self):
"""Return current state estimate."""
return {
'position': self.x[0:3],
'orientation': self.x[3:6],
'velocity': self.x[6:9],
'angular_velocity': self.x[9:12],
'covariance': np.diag(self.P)
}
Implement IMU preintegration for efficient optimization:
import numpy as np
from scipy.spatial.transform import Rotation
class IMUPreintegration:
"""IMU preintegration for factor graph optimization."""
def __init__(self, acc_noise=0.01, gyro_noise=0.001,
acc_bias_noise=0.0001, gyro_bias_noise=0.00001):
self.acc_noise = acc_noise
self.gyro_noise = gyro_noise
self.acc_bias_noise = acc_bias_noise
self.gyro_bias_noise = gyro_bias_noise
self.reset()
def reset(self):
"""Reset preintegration."""
self.delta_R = np.eye(3)
self.delta_v = np.zeros(3)
self.delta_p = np.zeros(3)
self.delta_t = 0.0
# Jacobians for bias correction
self.dR_dbg = np.zeros((3, 3))
self.dv_dba = np.zeros((3, 3))
self.dv_dbg = np.zeros((3, 3))
self.dp_dba = np.zeros((3, 3))
self.dp_dbg = np.zeros((3, 3))
# Covariance
self.cov = np.zeros((9, 9))
def integrate(self, acc, gyro, dt, acc_bias=None, gyro_bias=None):
"""Integrate IMU measurement."""
if acc_bias is None:
acc_bias = np.zeros(3)
if gyro_bias is None:
gyro_bias = np.zeros(3)
# Remove bias
acc_unbiased = acc - acc_bias
gyro_unbiased = gyro - gyro_bias
# Rotation increment
theta = gyro_unbiased * dt
dR = Rotation.from_rotvec(theta).as_matrix()
# Update preintegrated measurements
self.delta_p += self.delta_v * dt + 0.5 * self.delta_R @ acc_unbiased * dt**2
self.delta_v += self.delta_R @ acc_unbiased * dt
self.delta_R = self.delta_R @ dR
self.delta_t += dt
# Update Jacobians
self._update_jacobians(acc_unbiased, gyro_unbiased, dt)
# Update covariance
self._update_covariance(dt)
def _update_jacobians(self, acc, gyro, dt):
"""Update bias Jacobians."""
# Skew-symmetric matrix
def skew(v):
return np.array([
[0, -v[2], v[1]],
[v[2], 0, -v[0]],
[-v[1], v[0], 0]
])
theta = gyro * dt
Jr = self._right_jacobian(theta)
# Update rotation Jacobian
self.dR_dbg = self.delta_R.T @ self.dR_dbg - Jr * dt
# Update velocity Jacobians
self.dv_dba = self.dv_dba - self.delta_R * dt
self.dv_dbg = self.dv_dbg - self.delta_R @ skew(acc) @ self.dR_dbg * dt
# Update position Jacobians
self.dp_dba = self.dp_dba + self.dv_dba * dt - 0.5 * self.delta_R * dt**2
self.dp_dbg = self.dp_dbg + self.dv_dbg * dt - 0.5 * self.delta_R @ skew(acc) @ self.dR_dbg * dt**2
def _right_jacobian(self, theta):
"""Compute right Jacobian of SO(3)."""
angle = np.linalg.norm(theta)
if angle < 1e-8:
return np.eye(3)
axis = theta / angle
s = np.sin(angle)
c = np.cos(angle)
return (s / angle) * np.eye(3) + \
(1 - s / angle) * np.outer(axis, axis) + \
((1 - c) / angle) * self._skew(axis)
def _skew(self, v):
"""Skew-symmetric matrix."""
return np.array([
[0, -v[2], v[1]],
[v[2], 0, -v[0]],
[-v[1], v[0], 0]
])
def _update_covariance(self, dt):
"""Update preintegration covariance."""
# Simplified covariance propagation
A = np.eye(9)
B = np.eye(9) * dt
noise_cov = np.diag([
self.gyro_noise**2, self.gyro_noise**2, self.gyro_noise**2,
self.acc_noise**2, self.acc_noise**2, self.acc_noise**2,
self.gyro_bias_noise**2, self.gyro_bias_noise**2, self.gyro_bias_noise**2
])
self.cov = A @ self.cov @ A.T + B @ noise_cov @ B.T
def get_preintegrated(self):
"""Return preintegrated measurements."""
return {
'delta_R': self.delta_R,
'delta_v': self.delta_v,
'delta_p': self.delta_p,
'delta_t': self.delta_t,
'covariance': self.cov,
'jacobians': {
'dR_dbg': self.dR_dbg,
'dv_dba': self.dv_dba,
'dv_dbg': self.dv_dbg,
'dp_dba': self.dp_dba,
'dp_dbg': self.dp_dbg
}
}
Guidelines for tuning process and measurement noise:
def tune_noise_covariances(sensor_data_log, initial_Q, initial_R):
"""
Autotuning for noise covariances using innovation analysis.
Parameters:
- sensor_data_log: List of sensor measurements
- initial_Q: Initial process noise covariance
- initial_R: Initial measurement noise covariance
Returns:
- Tuned Q and R matrices
"""
from scipy.optimize import minimize
def compute_nees(Q_diag, R_diag, data):
"""Compute Normalized Estimation Error Squared."""
ekf = RobotEKF()
ekf.Q = np.diag(Q_diag)
nees_values = []
for measurement in data:
ekf.predict()
# Compute innovation
z = measurement['z']
H = measurement['H']
R = np.diag(R_diag[:len(z)])
y = z - H @ ekf.x
S = H @ ekf.P @ H.T + R
# NEES
nees = y.T @ np.linalg.inv(S) @ y / len(z)
nees_values.append(nees)
ekf._ekf_update(z, H, R)
return np.mean(nees_values)
def objective(params):
n_Q = len(initial_Q)
Q_diag = params[:n_Q]
R_diag = params[n_Q:]
nees = compute_nees(Q_diag, R_diag, sensor_data_log)
# NEES should be close to 1 for consistent estimation
return (nees - 1.0)**2
initial_params = np.concatenate([np.diag(initial_Q), np.diag(initial_R)])
result = minimize(objective, initial_params,
method='L-BFGS-B',
bounds=[(1e-6, 10)] * len(initial_params))
n_Q = len(initial_Q)
Q_tuned = np.diag(result.x[:n_Q])
R_tuned = np.diag(result.x[n_Q:])
return Q_tuned, R_tuned
Launch robot_localization with sensor fusion:
from launch import LaunchDescription
from launch_ros.actions import Node
from launch.substitutions import PathJoinSubstitution
from launch_ros.substitutions import FindPackageShare
def generate_launch_description():
pkg_share = FindPackageShare('my_robot_localization')
ekf_config = PathJoinSubstitution([pkg_share, 'config', 'ekf.yaml'])
return LaunchDescription([
# EKF for odometry frame
Node(
package='robot_localization',
executable='ekf_node',
name='ekf_filter_node_odom',
output='screen',
parameters=[ekf_config],
remappings=[
('odometry/filtered', 'odometry/local'),
('accel/filtered', 'accel/local')
]
),
# EKF for map frame (with GPS)
Node(
package='robot_localization',
executable='ekf_node',
name='ekf_filter_node_map',
output='screen',
parameters=[ekf_config],
remappings=[
('odometry/filtered', 'odometry/global'),
('accel/filtered', 'accel/global')
]
),
# NavSat transform for GPS
Node(
package='robot_localization',
executable='navsat_transform_node',
name='navsat_transform_node',
output='screen',
parameters=[ekf_config],
remappings=[
('odometry/filtered', 'odometry/global'),
('gps/fix', 'gps/data'),
('imu/data', 'imu/data')
]
)
])
This skill can leverage the following MCP servers for enhanced capabilities:
| Server | Description | Reference |
|---|---|---|
| ros-mcp-server | ROS topic access | GitHub |
This skill integrates with the following processes:
sensor-fusion-framework.js - Primary fusion frameworkvisual-slam-implementation.js - VIO fusionlidar-mapping-localization.js - LiDAR-inertial fusionrobot-calibration.js - Sensor calibrationWhen executing operations, provide structured output:
{
"operation": "configure-fusion",
"filterType": "EKF",
"status": "success",
"sensors": {
"imu": {"fused": true, "rate": 200},
"odom": {"fused": true, "rate": 50},
"gps": {"fused": true, "rate": 5}
},
"artifacts": [
"config/ekf_localization.yaml",
"launch/localization.launch.py"
],
"tuningRecommendations": [
"Consider increasing IMU trust (lower R values)",
"GPS noise may need adjustment for urban environments"
]
}
Activates when the user asks about AI prompts, needs prompt templates, wants to search for prompts, or mentions prompts.chat. Use for discovering, retrieving, and improving prompts.
Search, retrieve, and install Agent Skills from the prompts.chat registry using MCP tools. Use when the user asks to find skills, browse skill catalogs, install a skill for Claude, or extend Claude's capabilities with reusable AI agent components.
This skill should be used when the user asks to "create an agent", "add an agent", "write a subagent", "agent frontmatter", "when to use description", "agent examples", "agent tools", "agent colors", "autonomous agent", or needs guidance on agent structure, system prompts, triggering conditions, or agent development best practices for Claude Code plugins.