Sets up Celery 5.3+ distributed task queues with Beat scheduler, Redis/RabbitMQ brokers, workflow patterns, and FastAPI integration for background jobs, periodic tasks, and async processing.
npx claudepluginhub fortiumpartners/ensemble --plugin ensemble-development
Slash command: /ensemble-development:using-celery
Celery 5.3+ distributed task queue with Beat scheduler for Python applications. Background job processing, periodic scheduling, workflow patterns, and FastAPI integration.
This skill is loaded by backend-developer when:
- celery or celery[redis] is listed in the dependencies
- celeryconfig.py or celery.py is present
- Celery task decorators (e.g. @app.task) are found
Minimum Detection Confidence: 0.8 (80%)
Prerequisite: Python skill should be loaded for core patterns.
my_project/
├── src/my_app/
│ ├── celery_app.py # Celery application
│ ├── config.py # Settings
│ ├── tasks/ # Task modules
│ │ ├── email.py
│ │ ├── reports.py
│ │ └── cleanup.py
│ └── workers/queues.py # Queue definitions
├── tests/
│ ├── conftest.py # Celery fixtures
│ └── tasks/
├── docker-compose.yml # Redis + workers
└── pyproject.toml
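The Celery application module imports a `settings` object from config.py, which is listed in the tree but not shown. A minimal sketch of what that module might contain, assuming only the two attribute names the application reads (`celery_broker_url`, `celery_result_backend`). The environment variable names are hypothetical, and the real project may use a different settings library:

```python
# src/my_app/config.py (hypothetical contents)
import os
from dataclasses import dataclass, field


def _env(name: str, default: str) -> str:
    """Read an environment variable, falling back to a local default."""
    return os.environ.get(name, default)


@dataclass(frozen=True)
class Settings:
    # Environment variable names are assumptions, not taken from the project.
    celery_broker_url: str = field(
        default_factory=lambda: _env("CELERY_BROKER_URL", "redis://localhost:6379/0")
    )
    celery_result_backend: str = field(
        default_factory=lambda: _env("CELERY_RESULT_BACKEND", "redis://localhost:6379/1")
    )


# Module-level instance, imported elsewhere as `from .config import settings`.
settings = Settings()
```

Reading the values at instantiation time keeps the broker URL out of source control and lets tests construct a fresh `Settings()` after changing the environment.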
# src/my_app/celery_app.py
from celery import Celery
from kombu import Queue

from .config import settings

app = Celery(
    "my_app",
    broker=settings.celery_broker_url,
    backend=settings.celery_result_backend,
    include=["my_app.tasks.email", "my_app.tasks.reports"],
)

app.conf.update(
    task_serializer="json",
    accept_content=["json"],
    result_serializer="json",
    timezone="UTC",
    enable_utc=True,
    task_track_started=True,
    task_time_limit=300,
    task_soft_time_limit=240,
    worker_prefetch_multiplier=1,
    task_acks_late=True,
    task_reject_on_worker_lost=True,
)

# Queue routing
app.conf.task_queues = (
    Queue("default", routing_key="default"),
    Queue("high_priority", routing_key="high"),
    Queue("low_priority", routing_key="low"),
)
from celery import shared_task

from my_app.celery_app import app


# Plain task with an explicit name
@shared_task(name="tasks.add")
def add(x: int, y: int) -> int:
    return x + y


# Bound task: `self` gives access to the request context
@app.task(bind=True, name="tasks.send_email")
def send_email(self, to: str, subject: str, body: str) -> dict:
    task_id = self.request.id
    return {"task_id": task_id, "status": "sent"}
import httpx


# Automatic retry on transient network errors, with exponential backoff and jitter
@shared_task(
    bind=True,
    max_retries=3,
    default_retry_delay=60,
    autoretry_for=(httpx.TimeoutException, httpx.ConnectError),
    retry_backoff=True,
    retry_backoff_max=600,
    retry_jitter=True,
)
def call_external_api(self, endpoint: str, payload: dict) -> dict:
    with httpx.Client(timeout=30) as client:
        response = client.post(endpoint, json=payload)
        response.raise_for_status()
        return response.json()
# Rate-limited task: at most 10 executions per minute on each worker
@shared_task(bind=True, rate_limit="10/m", name="tasks.send_sms")
def send_sms(self, phone: str, message: str) -> dict:
    return sms_service.send(phone, message)


from celery.exceptions import SoftTimeLimitExceeded


# The soft limit raises inside the task so it can clean up before the hard limit
@shared_task(bind=True, soft_time_limit=300, time_limit=360)
def generate_report(self, report_id: int) -> dict:
    try:
        return build_report(report_id)
    except SoftTimeLimitExceeded:
        partial_save(report_id)
        raise
See REFERENCE.md for manual retry, progress tracking, and custom retry backoff patterns.
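The `retry_backoff`, `retry_backoff_max`, and `retry_jitter` options on `call_external_api` decide how long Celery waits between automatic retries. The sketch below is a plain-Python approximation of that calculation as described in the Celery documentation; it is not Celery's own code, and `backoff_delay` and its parameter names are made up for illustration.

```python
import random


def backoff_delay(retries: int, factor: int = 1, maximum: int = 600,
                  jitter: bool = True) -> int:
    """Approximate seconds to wait before retry number `retries` (0-based)."""
    # Exponential growth: factor, 2*factor, 4*factor, ... capped at `maximum`.
    delay = min(maximum, factor * (2 ** retries))
    if jitter:
        # Full jitter: pick a random whole number of seconds in [0, delay].
        delay = random.randrange(delay + 1)
    return delay


# retry_backoff=True corresponds to factor=1; retry_backoff_max=600 to maximum=600.
print([backoff_delay(n, jitter=False) for n in range(4)])  # prints [1, 2, 4, 8]
```

With jitter enabled, each delay becomes a random value up to the figure shown, which spreads out retries from many tasks that failed at the same moment.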
app.conf.task_routes = {
    "tasks.send_email": {"queue": "high_priority"},
    "tasks.generate_report": {"queue": "low_priority"},
    "tasks.process_payment": {"queue": "payments"},
    "tasks.*": {"queue": "default"},
}
process_order.apply_async(args=[123], queue="high_priority")
process_order.apply_async(args=[456], routing_key="payments")
# High priority only
celery -A my_app.celery_app worker -Q high_priority -c 4
# Multiple queues
celery -A my_app.celery_app worker -Q default,low_priority -c 2
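The `task_routes` mapping above mixes exact task names with a glob (`tasks.*`), so it helps to know which entry wins for a given task. The following is a plain-Python approximation of the lookup order (exact names first, then glob patterns in declaration order). `resolve_queue` is a hypothetical helper for reasoning about routes, not a Celery API:

```python
from fnmatch import fnmatchcase

TASK_ROUTES = {
    "tasks.send_email": {"queue": "high_priority"},
    "tasks.generate_report": {"queue": "low_priority"},
    "tasks.process_payment": {"queue": "payments"},
    "tasks.*": {"queue": "default"},
}


def resolve_queue(task_name: str, routes: dict, fallback: str = "celery") -> str:
    """Return the queue a task name would land in under these routes."""
    # Exact task names take precedence over glob patterns.
    if task_name in routes:
        return routes[task_name]["queue"]
    # Otherwise the first matching glob pattern wins, in declaration order.
    for pattern, route in routes.items():
        if "*" in pattern and fnmatchcase(task_name, pattern):
            return route["queue"]
    # No route matched: Celery uses task_default_queue ("celery" unless changed).
    return fallback


print(resolve_queue("tasks.send_email", TASK_ROUTES))  # prints high_priority
print(resolve_queue("tasks.add", TASK_ROUTES))         # prints default
```

A worker must be listening on whichever queue the lookup returns, otherwise the message sits in the broker unconsumed.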
from celery.schedules import crontab

app.conf.beat_schedule = {
    "health-check": {
        "task": "tasks.health_check",
        "schedule": 30.0,  # Every 30 seconds
    },
    "daily-report": {
        "task": "tasks.generate_daily_report",
        "schedule": crontab(hour=2, minute=0),  # Daily at 2 AM
    },
    "weekly-summary": {
        "task": "tasks.send_weekly_summary",
        "schedule": crontab(hour=9, minute=0, day_of_week=1),  # Monday 9 AM
    },
}
| Pattern | Expression |
|---|---|
| Every minute | crontab() |
| Every 15 min | crontab(minute="*/15") |
| Daily midnight | crontab(hour=0, minute=0) |
| Weekdays 9 AM | crontab(hour=9, minute=0, day_of_week="1-5") |
| Monthly 1st | crontab(hour=0, minute=0, day_of_month=1) |
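
Celery's crontab counts days of the week from 0 = Sunday to 6 = Saturday, which is why `day_of_week=1` means Monday and `"1-5"` means weekdays. Python's `date.weekday()` uses a different numbering (Monday = 0), so a small conversion helps when checking a schedule by hand. A minimal sketch, where `crontab_day_of_week` and `is_weekday` are hypothetical helpers and not part of Celery:

```python
from datetime import date


def crontab_day_of_week(d: date) -> int:
    """Map a date to crontab numbering: 0 = Sunday ... 6 = Saturday."""
    # isoweekday() is 1 = Monday ... 7 = Sunday; modulo 7 folds Sunday to 0.
    return d.isoweekday() % 7


def is_weekday(d: date) -> bool:
    """True when the date falls in the day_of_week="1-5" range (Mon-Fri)."""
    return 1 <= crontab_day_of_week(d) <= 5


print(crontab_day_of_week(date(2024, 1, 1)))  # a Monday, prints 1
```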
# Standalone
celery -A my_app.celery_app beat --loglevel=info
# With worker (dev only)
celery -A my_app.celery_app worker --beat --loglevel=info
See REFERENCE.md for dynamic database schedules and advanced crontab patterns.
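from celery import chain

# Chain: each task's result becomes the first argument of the next task
workflow = chain(
    fetch_data.s(url),
    process_data.s(),
    save_results.s(destination),
)
result = workflow.apply_async()

from celery import group

# Group: run the same task in parallel over many inputs
workflow = group(process_image.s(image_id) for image_id in image_ids)
result = workflow.apply_async()
all_results = result.get()  # Blocks until every task in the group has finished

from celery import chord

# Chord: a group plus a callback that receives the list of group results
workflow = chord(
    (process_chunk.s(chunk) for chunk in chunks),
    aggregate_results.s(),
)
result = workflow.apply_async()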
See REFERENCE.md for complex multi-step workflows and error handling in chains.
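How arguments flow through a chain or chord is easy to misread from the signatures alone. The sketch below is a plain-Python analogy of the chain and chord examples above, with no Celery involved. `run_chain` and `run_chord` are illustrative helpers, and the three task functions are stand-ins with made-up return values. It models only the argument passing, not asynchronous or distributed execution.

```python
from typing import Any, Callable, Iterable, Tuple

Step = Tuple[Callable[..., Any], tuple]


def run_chain(*steps: Step) -> Any:
    """Call each step in order, prepending the previous result to its args."""
    func, args = steps[0]
    result = func(*args)
    for func, args in steps[1:]:
        result = func(result, *args)
    return result


def run_chord(header: Iterable[Step], callback: Callable[[list], Any]) -> Any:
    """Run every header step, then hand the list of results to the callback."""
    return callback([func(*args) for func, args in header])


# Stand-in task bodies (hypothetical).
def fetch_data(url: str) -> list:
    return [1, 2, 3]


def process_data(rows: list) -> list:
    return [row * 2 for row in rows]


def save_results(rows: list, destination: str) -> dict:
    return {"destination": destination, "rows": rows}


saved = run_chain(
    (fetch_data, ("https://example.com/data",)),
    (process_data, ()),
    (save_results, ("reports",)),
)
print(saved)  # prints {'destination': 'reports', 'rows': [2, 4, 6]}
```

Note that `save_results` receives the processed rows first and `destination` second, which mirrors how `save_results.s(destination)` leaves the first positional slot open for the upstream result.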
from fastapi import APIRouter
from celery.result import AsyncResult

from .celery_app import app as celery_app  # celery_app.py exposes the instance as `app`
from .tasks.email import send_email

router = APIRouter()


@router.post("/emails/send")
async def queue_email(to: str, subject: str, body: str) -> dict:
    task = send_email.delay(to, subject, body)
    return {"task_id": task.id, "status": "queued"}


@router.get("/tasks/{task_id}/status")
async def get_task_status(task_id: str) -> dict:
    result = AsyncResult(task_id, app=celery_app)
    response = {"task_id": task_id, "status": result.status, "ready": result.ready()}
    if result.ready():
        response["result"] = result.get() if result.successful() else str(result.result)
    return response
@shared_task(bind=True)
def process_large_file(self, file_id: int) -> dict:
    file_data = load_file(file_id)
    for i, chunk in enumerate(file_data):
        process_chunk(chunk)
        self.update_state(state="PROGRESS", meta={"current": i + 1, "total": len(file_data)})
    return {"processed": len(file_data)}
See REFERENCE.md for polling patterns, revocation, and lifespan management.
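A client of the status endpoint above typically polls until the `ready` field turns true. A minimal client-side sketch, assuming the response shape returned by `get_task_status`. `poll_until_ready` is a hypothetical helper, and `fetch_status` stands for whatever function performs the HTTP GET and returns the decoded JSON:

```python
import time
from typing import Callable


def poll_until_ready(fetch_status: Callable[[], dict], interval: float = 1.0,
                     timeout: float = 30.0) -> dict:
    """Call fetch_status() until it reports ready, or raise TimeoutError."""
    deadline = time.monotonic() + timeout
    while True:
        status = fetch_status()
        if status.get("ready"):
            return status
        if time.monotonic() >= deadline:
            raise TimeoutError(f"task not ready after {timeout} seconds")
        # Wait before asking again so the API is not hammered.
        time.sleep(interval)


# Usage with canned responses standing in for real HTTP calls.
responses = iter([
    {"status": "PENDING", "ready": False},
    {"status": "PROGRESS", "ready": False},
    {"status": "SUCCESS", "ready": True, "result": {"processed": 3}},
])
final = poll_until_ready(lambda: next(responses), interval=0)
print(final["status"])  # prints SUCCESS
```

Injecting `fetch_status` keeps the loop independent of any particular HTTP client and makes it straightforward to test.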
from unittest.mock import patch

import pytest

from my_app.tasks.email import send_email


@pytest.fixture(scope="session")
def celery_config():
    return {
        "broker_url": "memory://",
        "result_backend": "cache+memory://",
        "task_always_eager": True,
        "task_eager_propagates": True,
    }


def test_send_email_success(celery_app):
    with patch("my_app.tasks.email.email_client") as mock:
        mock.send.return_value = {"id": "msg_123"}
        result = send_email.delay("user@example.com", "Test", "Hello")
        assert result.successful()
        assert result.get()["status"] == "sent"
See REFERENCE.md for integration tests with real workers and Beat schedule testing.
celery -A my_app.celery_app worker --loglevel=info
celery -A my_app.celery_app worker -c 4 -Q high_priority,default
celery -A my_app.celery_app worker --pool=gevent -c 100
celery -A my_app.celery_app worker --autoscale=10,3
celery -A my_app.celery_app inspect active
celery -A my_app.celery_app inspect registered
celery -A my_app.celery_app inspect scheduled
celery -A my_app.celery_app inspect ping
celery -A my_app.celery_app control shutdown
celery -A my_app.celery_app purge
celery -A my_app.celery_app control revoke <task_id>
celery -A my_app.celery_app control rate_limit tasks.send_email 10/m
# Broker & backend
broker_url = "redis://localhost:6379/0"
result_backend = "redis://localhost:6379/1"
result_expires = 3600
# Serialization
task_serializer = "json"
result_serializer = "json"
accept_content = ["json"]
# Execution
task_time_limit = 300
task_soft_time_limit = 240
task_acks_late = True
task_reject_on_worker_lost = True
# Worker
worker_prefetch_multiplier = 1
worker_concurrency = 4
See REFERENCE.md for full configuration reference and environment-based settings.
| Anti-Pattern | Problem | Solution |
|---|---|---|
| Blocking in tasks | time.sleep() ties up a worker slot | Schedule with countdown (apply_async(countdown=...)) or use async I/O |
| Large arguments | Megabytes through broker | Pass ID, fetch in task |
| Not idempotent | Duplicate charges on retry | Use idempotency keys |
| Never consuming results | Unread results accumulate in the backend | Set ignore_result=True or configure result_expires |
| DB in task module | Import-time connections | Import inside task function |
See REFERENCE.md for detailed examples and solutions.
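The idempotency-key row in the table can be sketched in a few lines. This is a minimal illustration under stated assumptions: the in-memory dictionary stands in for a shared store such as Redis or a database table, and `run_once` and `charge_order` are hypothetical names. It also follows the "pass ID, fetch in task" advice by deriving the key from an ID.

```python
from typing import Callable, Dict

# In-memory stand-in for a shared store such as Redis or a database table.
_completed: Dict[str, dict] = {}


def run_once(idempotency_key: str, action: Callable[[], dict]) -> dict:
    """Run `action` only if this key has not completed before."""
    if idempotency_key in _completed:
        # A retry or duplicate delivery: return the stored outcome, skip the work.
        return _completed[idempotency_key]
    result = action()
    _completed[idempotency_key] = result
    return result


def charge_order(order_id: int) -> dict:
    """Hypothetical task body: receive an ID and derive the key from it."""
    return run_once(
        f"charge:{order_id}",
        lambda: {"order_id": order_id, "charged": True},
    )
```

With several workers the check-then-store step would need to be atomic in the shared store (for example a set-if-absent operation); the dictionary here is safe only within a single process.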