Help us improve
Share bugs, ideas, or general feedback.
From fullstack-agents
Production-ready Celery worker configuration for distributed task processing.
npx claudepluginhub adelabdelgawad/fullstack-agents --plugin fullstack-agentsHow this skill is triggered — by the user, by Claude, or both
Slash command
/fullstack-agents:celeryThe summary Claude sees in its skill listing — used to decide when to auto-load this skill
Production-ready Celery worker configuration for distributed task processing.
Sets up Celery 5.3+ distributed task queues with Beat scheduler, Redis/RabbitMQ brokers, workflow patterns, and FastAPI integration for background jobs, periodic tasks, and async processing.
Celery configuration templates for all frameworks (Django, Flask, FastAPI, standalone). Use when configuring Celery, setting up task queues, creating Celery apps, integrating with frameworks, or when user mentions Celery configuration, task queue setup, broker configuration, or framework integration.
Python background job patterns: task queues, workers, event-driven architecture. Helps decouple long-running work from request/response cycles using Celery or alternatives.
Share bugs, ideas, or general feedback.
Production-ready Celery worker configuration for distributed task processing.
Use this skill when asked to:
┌─────────────────────────────────────────────────────────────┐
│ FastAPI Application │
│ dispatch_to_celery() → task.delay() → Redis Broker │
└─────────────────────────────┬───────────────────────────────┘
│
▼
┌─────────────────┐
│ Redis Broker │
│ (Message Queue)│
└────────┬────────┘
│
┌──────────────┼──────────────┐
│ │ │
▼ ▼ ▼
┌──────────┐ ┌──────────┐ ┌──────────┐
│ Worker 1 │ │ Worker 2 │ │ Worker 3 │
│ Q: celery│ │ Q: files │ │ Q: email │
└──────────┘ └──────────┘ └──────────┘
│
▼
┌─────────────────┐
│ Result Backend │
│ (Redis) │
└─────────────────┘
# celery_app.py
from celery import Celery
from settings import settings
celery_app = Celery(
"app_name",
broker=settings.CELERY_BROKER_URL,
backend=settings.CELERY_RESULT_BACKEND,
include=[
"tasks.email",
"tasks.files",
"tasks.scheduler",
],
)
celery_app.conf.update(
# Serialization
task_serializer="json",
result_serializer="json",
accept_content=["json"],
# Reliability - CRITICAL
task_acks_late=True, # Ack after completion
task_reject_on_worker_lost=True, # Requeue if worker dies
task_track_started=True, # Track task start
# Results
result_expires=86400, # 24 hours
# Worker Performance
worker_prefetch_multiplier=1, # Fair distribution
worker_concurrency=10, # Concurrent tasks
# Time Limits - IMPORTANT
task_soft_time_limit=300, # 5 min soft limit
task_time_limit=360, # 6 min hard limit
# Retries
task_default_retry_delay=60, # 1 min default delay
# Timezone
timezone="UTC",
enable_utc=True,
# Connection
broker_connection_retry_on_startup=True,
)
@shared_task(
bind=True, # Access self
max_retries=3, # Retry 3 times
default_retry_delay=60, # 1 min delay
autoretry_for=(Exception,), # Auto-retry
retry_backoff=True, # Exponential backoff
retry_backoff_max=300, # Max 5 min
retry_jitter=True, # Prevent thundering herd
soft_time_limit=120, # Soft limit
time_limit=180, # Hard limit
)
def my_task(self, **kwargs):
pass
# Basic worker
celery -A celery_app worker --loglevel=info
# With concurrency
celery -A celery_app worker --loglevel=info --concurrency=4
# Specific queues
celery -A celery_app worker -Q celery,file_queue,email_queue
# With gevent pool (high concurrency)
celery -A celery_app worker -P gevent --concurrency=100
# celery_app.py
celery_app.conf.task_routes = {
'tasks.email.*': {'queue': 'email'},
'tasks.files.*': {'queue': 'files'},
'tasks.heavy.*': {'queue': 'heavy'},
}
# Or per-task
@shared_task(queue='high_priority')
def urgent_task():
pass