Message broker setup patterns (Redis, RabbitMQ, SQS) for Celery including connection strings, SSL configuration, high availability, and production best practices. Use when configuring message brokers, setting up Redis/RabbitMQ/SQS, troubleshooting broker connections, implementing HA/failover, securing broker communications with SSL, or when user mentions broker setup, connection issues, sentinel, quorum queues, or AWS SQS integration.
/plugin marketplace add vanman2024/ai-dev-marketplace
/plugin install celery@ai-dev-marketplace

This skill is limited to using the following tools:
- README.md
- examples/rabbitmq-ha.md
- examples/redis-sentinel.md
- examples/sqs-setup.md
- scripts/setup-rabbitmq.sh
- scripts/setup-redis.sh
- scripts/test-broker-connection.sh
- templates/connection-strings.env
- templates/rabbitmq-config.py
- templates/redis-config.py
- templates/sqs-config.py
- templates/ssl-config.py

Purpose: Comprehensive message broker configuration for Celery with production-ready patterns for Redis, RabbitMQ, and Amazon SQS.
Activation Triggers:
- Configuring or troubleshooting a Celery message broker (Redis, RabbitMQ, or SQS)
- Setting up high availability or failover (Redis Sentinel, RabbitMQ clustering, quorum queues)
- Securing broker communications with SSL/TLS
- User mentions of broker setup, connection issues, sentinel, quorum queues, or AWS SQS integration
Key Resources:
- templates/redis-config.py - Production Redis configuration
- templates/rabbitmq-config.py - RabbitMQ with quorum queues
- templates/sqs-config.py - AWS SQS with IAM roles
- templates/connection-strings.env - Connection string formats
- templates/ssl-config.py - SSL/TLS configuration
- scripts/test-broker-connection.sh - Connection testing
- scripts/setup-redis.sh - Redis installation and configuration
- scripts/setup-rabbitmq.sh - RabbitMQ cluster setup
- examples/redis-sentinel.md - High availability with Sentinel
- examples/rabbitmq-ha.md - RabbitMQ clustering and quorum queues
- examples/sqs-setup.md - Complete AWS SQS integration

| Feature | Redis | RabbitMQ | SQS |
|---|---|---|---|
| Performance | Excellent (small msgs) | Very Good | Good |
| Reliability | Good (with Sentinel) | Excellent (quorum) | Excellent |
| Monitoring | Yes | Yes | Limited |
| Remote Control | Yes | Yes | No |
| Management | Manual/Cloud | Manual | Fully Managed |
| Cost | Server/Cloud | Server/Cloud | Pay-per-use |
| Best For | Speed, simple setup | Reliability, features | AWS, serverless |
Choose Redis when:
- Speed and a simple setup matter most, and Sentinel provides enough reliability

Choose RabbitMQ when:
- You need the strongest delivery guarantees and broker features (quorum queues, clustering)

Choose SQS when:
- You run on AWS or serverless and want a fully managed, pay-per-use broker
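Whichever broker you choose, the selection can stay a configuration detail. A minimal sketch (the app name and the BROKER_URL variable are assumptions, not part of the templates):

```python
# Minimal sketch: select the broker through an environment variable so the
# same code runs against Redis, RabbitMQ, or SQS.
import os

from celery import Celery

app = Celery(
    "myapp",  # hypothetical app name
    broker=os.environ.get("BROKER_URL", "redis://localhost:6379/0"),
)
```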
# Install dependencies
pip install "celery[redis]"
# Use template
cp templates/redis-config.py celeryconfig.py
# Configure environment
cp templates/connection-strings.env .env
# Edit .env with actual values
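
To keep credentials out of version control, the config module can read the values that .env puts in the environment. A sketch, assuming a CELERY_BROKER_URL-style variable name rather than the exact names in templates/connection-strings.env:

```python
# celeryconfig.py sketch: pull connection details from the environment
# instead of hard-coding them. Variable names are assumptions.
import os

broker_url = os.environ.get("CELERY_BROKER_URL", "redis://localhost:6379/0")
result_backend = os.environ.get("CELERY_RESULT_BACKEND", broker_url)
```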
Key settings:
# templates/redis-config.py
broker_url = 'redis://:password@localhost:6379/0'
broker_transport_options = {
'visibility_timeout': 3600,
'retry_on_timeout': True,
'max_connections': 50,
}
CRITICAL: Set maxmemory-policy noeviction in Redis config:
# Run setup script
./scripts/setup-redis.sh --install --configure
# Or manually
redis-cli CONFIG SET maxmemory-policy noeviction
Why: Prevents Redis from evicting task data, which would cause task loss.
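To catch a misconfigured instance before it silently drops tasks, the policy can also be verified from Python at startup; a sketch, assuming the redis package and a local unauthenticated instance:

```python
# Sketch: fail fast if Redis could evict Celery task messages.
import redis

r = redis.Redis(host="localhost", port=6379)
policy = r.config_get("maxmemory-policy")["maxmemory-policy"]
if policy != "noeviction":
    raise RuntimeError(f"Unsafe Redis eviction policy for Celery: {policy}")
```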
# See comprehensive guide
cat examples/redis-sentinel.md
# Quick setup
docker-compose -f examples/docker-compose.sentinel.yml up -d
# Configure Celery for Sentinel
CELERY_BROKER_URL='sentinel://host1:26379;host2:26379;host3:26379/0'
Connection string format:
# Sentinel URL
broker_url = 'sentinel://sentinel1:26379;sentinel2:26379/0'
broker_transport_options = {
'master_name': 'mymaster',
'sentinel_kwargs': {'password': 'sentinel_password'},
}
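
If results are also stored in Redis behind Sentinel, the result backend needs the master name as well; a short sketch using the same Sentinel hosts (the database number is arbitrary):

```python
# Sketch: Sentinel-aware result backend to pair with the Sentinel broker.
result_backend = 'sentinel://sentinel1:26379;sentinel2:26379/1'
result_backend_transport_options = {
    'master_name': 'mymaster',
}
```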
# Install dependencies
pip install "celery[amqp]"
# Use template
cp templates/rabbitmq-config.py celeryconfig.py
# Run setup script
./scripts/setup-rabbitmq.sh --install --configure
Key settings:
# templates/rabbitmq-config.py
broker_url = 'amqp://user:password@localhost:5672/vhost'
# CRITICAL for quorum queues
broker_transport_options = {
'confirm_publish': True, # Required!
}
Provides:
- Replicated, Raft-based queues with stronger data safety than classic queues
- A delivery limit (x-delivery-limit) so repeatedly failing messages are not redelivered forever
from kombu import Queue
task_queues = (
Queue(
'default',
queue_arguments={
'x-queue-type': 'quorum',
'x-delivery-limit': 3,
}
),
)
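
To make sure work actually lands on the quorum queue, point the default queue (and any explicit routes) at it; a sketch with a hypothetical task name:

```python
# Sketch: send tasks to the quorum-backed queue defined above.
task_default_queue = 'default'
task_routes = {
    'myapp.tasks.send_email': {'queue': 'default'},  # hypothetical task
}
```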
# See comprehensive guide
cat examples/rabbitmq-ha.md
# Docker cluster setup
docker-compose -f examples/docker-compose.rabbitmq-cluster.yml up -d
# Verify cluster
docker exec rabbitmq-1 rabbitmqctl cluster_status
HAProxy load balancing:
# Distribute connections across cluster
broker_url = 'amqp://user:password@haproxy:5670/vhost'
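
As an alternative (or complement) to a load balancer, Celery accepts a list of broker URLs and fails over between cluster nodes on connection loss; a sketch with hypothetical node names:

```python
# Sketch: client-side failover across RabbitMQ cluster nodes.
broker_url = [
    'amqp://user:password@rabbitmq-1:5672/vhost',
    'amqp://user:password@rabbitmq-2:5672/vhost',
    'amqp://user:password@rabbitmq-3:5672/vhost',
]
broker_failover_strategy = 'round-robin'  # default strategy, shown for clarity
```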
# Create IAM policy and user
aws iam create-policy \
--policy-name CelerySQSPolicy \
--policy-document file://examples/celery-sqs-policy.json
# Or use IAM role (recommended for EC2/ECS)
# See examples/sqs-setup.md for complete guide
# Install dependencies
pip install "celery[sqs]"
# Use template
cp templates/sqs-config.py celeryconfig.py
With IAM role (recommended):
# No credentials needed
broker_url = 'sqs://'
broker_transport_options = {
'region': 'us-east-1',
'visibility_timeout': 3600,
'polling_interval': 1,
'wait_time_seconds': 10, # Long polling
}
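
If the worker should use queues that already exist rather than creating them (tighter IAM policies, no CreateQueue permission), recent kombu versions accept a predefined_queues map; a sketch with a placeholder queue URL:

```python
# Sketch: bind Celery to pre-created SQS queues instead of creating them.
broker_transport_options = {
    'region': 'us-east-1',
    'predefined_queues': {
        'celery': {  # queue name as Celery addresses it
            'url': 'https://sqs.us-east-1.amazonaws.com/123456789012/celery',  # placeholder
        },
    },
}
```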
With explicit credentials:
broker_url = 'sqs://access_key:secret_key@'
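
AWS secret keys often contain characters such as / or + that are not URL-safe, so encode both parts before embedding them in the URL; a sketch using the standard library (the key values are placeholders):

```python
# Sketch: URL-encode AWS credentials before building the sqs:// broker URL.
from urllib.parse import quote

aws_access_key = 'AKIA...'           # placeholder
aws_secret_key = 'abc/def+ghi...'    # placeholder
broker_url = 'sqs://{}:{}@'.format(
    quote(aws_access_key, safe=''),
    quote(aws_secret_key, safe=''),
)
```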
For ordered task processing:
from kombu import Queue

task_queues = (
Queue(
'celery-default.fifo',
queue_arguments={
'FifoQueue': 'true',
'ContentBasedDeduplication': 'true',
}
),
)
# Send with message group ID
task.apply_async(
args=[data],
properties={'MessageGroupId': 'user-123'}
)
SQS cannot act as a result backend - store results in S3, DynamoDB, or Redis instead:
# Option 1: S3
result_backend = 's3://my-bucket/celery-results/'
# Option 2: DynamoDB
result_backend = 'dynamodb://'
result_backend_transport_options = {
'table_name': 'celery-results',
}
# Option 3: Redis (hybrid)
result_backend = 'redis://redis.example.com:6379/0'
# Use templates/ssl-config.py
from templates.ssl_config import app
# Or manually
import ssl

broker_url = 'rediss://password@host:6380/0'  # Note: rediss://
broker_use_ssl = {
'ssl_cert_reqs': ssl.CERT_REQUIRED,
'ssl_ca_certs': '/path/to/ca.pem',
'ssl_certfile': '/path/to/client-cert.pem',
'ssl_keyfile': '/path/to/client-key.pem',
}
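
If the result backend is also a rediss:// URL, Celery reads the backend's SSL options from a separate setting; a sketch reusing the same certificate paths:

```python
# Sketch: SSL options for a rediss:// result backend (distinct from broker_use_ssl).
import ssl

redis_backend_use_ssl = {
    'ssl_cert_reqs': ssl.CERT_REQUIRED,
    'ssl_ca_certs': '/path/to/ca.pem',
    'ssl_certfile': '/path/to/client-cert.pem',
    'ssl_keyfile': '/path/to/client-key.pem',
}
```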
import ssl

broker_url = 'amqps://user:password@host:5671/vhost'  # Note: amqps://
broker_use_ssl = {
'ssl_cert_reqs': ssl.CERT_REQUIRED,
'ssl_ca_certs': '/path/to/ca.pem',
'ssl_certfile': '/path/to/client-cert.pem',
'ssl_keyfile': '/path/to/client-key.pem',
}
# .env
BROKER_SSL_ENABLED=true
BROKER_SSL_CERT=/path/to/client-cert.pem
BROKER_SSL_KEY=/path/to/client-key.pem
BROKER_SSL_CA=/path/to/ca-cert.pem
BROKER_SSL_VERIFY_MODE=CERT_REQUIRED
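
One way to turn those variables into the broker_use_ssl dict at import time (a sketch, not part of the bundled templates):

```python
# Sketch: assemble SSL options from the BROKER_SSL_* environment variables.
import os
import ssl

broker_use_ssl = None
if os.environ.get('BROKER_SSL_ENABLED', 'false').lower() == 'true':
    broker_use_ssl = {
        'ssl_cert_reqs': getattr(ssl, os.environ.get('BROKER_SSL_VERIFY_MODE', 'CERT_REQUIRED')),
        'ssl_ca_certs': os.environ['BROKER_SSL_CA'],
        'ssl_certfile': os.environ['BROKER_SSL_CERT'],
        'ssl_keyfile': os.environ['BROKER_SSL_KEY'],
    }
```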
# Test any broker type
./scripts/test-broker-connection.sh redis
./scripts/test-broker-connection.sh rabbitmq
./scripts/test-broker-connection.sh sqs
# Check specific features
./scripts/test-broker-connection.sh redis --ssl
Script checks:
from celery import Celery
app = Celery(broker='redis://localhost:6379/0')
# Test connection
try:
with app.connection() as conn:
conn.ensure_connection(max_retries=3, timeout=5)
print("✅ Connection successful")
except Exception as e:
print(f"❌ Connection failed: {e}")
Problem: Redis evicting task data
Solution:
./scripts/setup-redis.sh --configure
# Or manually:
redis-cli CONFIG SET maxmemory-policy noeviction
Problem: Queue doesn't exist or wrong vhost
Solution:
# Check vhost
rabbitmqctl list_vhosts
# Check permissions
rabbitmqctl list_permissions -p /celery_vhost
Problem: IAM permissions insufficient
Solution:
# Verify IAM policy includes all required actions
# See examples/sqs-setup.md for complete policy
# Test credentials
aws sts get-caller-identity
aws sqs list-queues --queue-name-prefix celery-
Problem: Network/firewall blocking connection
Solution:
# Test network connectivity
telnet broker-host 6379 # Redis
telnet broker-host 5672 # RabbitMQ
# Check firewall rules
# Verify security groups (AWS)
# Check iptables rules
broker_transport_options = {
'visibility_timeout': 3600,
'max_connections': 100, # Increase for high concurrency
'socket_timeout': 5.0,
'socket_keepalive': True,
'health_check_interval': 30,
}
# Worker settings
worker_prefetch_multiplier = 4 # Prefetch 4x concurrency
worker_max_tasks_per_child = 1000
broker_transport_options = {
'confirm_publish': True,
'max_retries': 3,
}
# Use quorum queues for reliability
# Adjust prefetch for throughput
worker_prefetch_multiplier = 4
# Disable QoS for maximum speed (classic queues only)
# Note: Not compatible with worker autoscaling
# Reduce API calls (costs)
broker_transport_options = {
'polling_interval': 5, # Poll less frequently
'wait_time_seconds': 20, # Max long polling
}
# CRITICAL: Prevent visibility timeout
worker_prefetch_multiplier = 1 # Must be 1 for SQS
Redis:
- INFO clients
- INFO memory
- DBSIZE
- INFO stats

RabbitMQ:
- rabbitmqctl list_queues
- rabbitmqctl status

SQS:
- ApproximateNumberOfMessagesVisible: Queue backlog
- NumberOfMessagesSent: Task creation rate
- NumberOfMessagesReceived: Task completion rate
- NumberOfEmptyReceives: Polling efficiency

#!/bin/bash
# health-check.sh
BROKER_TYPE="${1:-redis}"
case "$BROKER_TYPE" in
redis)
redis-cli PING || exit 1
;;
rabbitmq)
rabbitmqctl status || exit 1
;;
sqs)
aws sqs list-queues --queue-name-prefix celery- || exit 1
;;
esac
echo "✅ Broker healthy"
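
Beyond liveness, queue depth is worth alerting on. With the Redis broker, each Celery queue is a plain Redis list, so its length can be read directly; a sketch assuming the redis package and the default queue named celery:

```python
# Sketch: alert when the Redis-backed Celery queue backlog grows too large.
import redis

r = redis.Redis(host='localhost', port=6379, db=0)
backlog = r.llen('celery')  # default queue name; adjust for custom queues
if backlog > 1000:  # arbitrary threshold
    print(f"⚠️  Queue backlog is {backlog} messages")
```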
Require Redis authentication with requirepass.

Templates:
- redis-config.py - Production Redis configuration
- rabbitmq-config.py - RabbitMQ with quorum queues
- sqs-config.py - AWS SQS with IAM
- connection-strings.env - All connection formats
- ssl-config.py - SSL/TLS configuration

Scripts:
- test-broker-connection.sh - Test any broker
- setup-redis.sh - Redis installation and config
- setup-rabbitmq.sh - RabbitMQ cluster setup

Examples:
- redis-sentinel.md - High availability setup
- rabbitmq-ha.md - Clustering and quorum queues
- sqs-setup.md - Complete AWS integration

Documentation:
Version: 1.0.0
Celery Compatibility: 5.0+
Supported Brokers: Redis 6+, RabbitMQ 3.8+, Amazon SQS
This skill follows strict security rules:
- .gitignore protection documented