Celery 5.3+ distributed task queue with Beat scheduler, Redis/RabbitMQ brokers, workflow patterns, and FastAPI integration. Use for background jobs, periodic tasks, and async processing.
From ensemble-developmentnpx claudepluginhub fortiumpartners/ensemble --plugin ensemble-developmentThis skill uses the workspace's default tool permissions.
README.mdREFERENCE.mdVALIDATION.mdexamples/README.mdexamples/fastapi_celery.example.pyexamples/task_patterns.example.pytemplates/README.mdtemplates/beat_schedule.template.pytemplates/celery_config.template.pytemplates/pytest_celery.template.pytemplates/task.template.pySearches, retrieves, and installs Agent Skills from prompts.chat registry using MCP tools like search_skills and get_skill. Activates for finding skills, browsing catalogs, or extending Claude.
Searches prompts.chat for AI prompt templates by keyword or category, retrieves by ID with variable handling, and improves prompts via AI. Use for discovering or enhancing prompts.
Guides agent creation for Claude Code plugins with file templates, frontmatter specs (name, description, model), triggering examples, system prompts, and best practices.
Celery 5.3+ distributed task queue with Beat scheduler for Python applications. Background job processing, periodic scheduling, workflow patterns, and FastAPI integration.
This skill is loaded by backend-developer when:
celery or celery[redis] in dependenciesceleryconfig.py or celery.py present@app.task) foundMinimum Detection Confidence: 0.8 (80%)
Prerequisite: Python skill should be loaded for core patterns.
my_project/
├── src/my_app/
│ ├── celery_app.py # Celery application
│ ├── config.py # Settings
│ ├── tasks/ # Task modules
│ │ ├── email.py
│ │ ├── reports.py
│ │ └── cleanup.py
│ └── workers/queues.py # Queue definitions
├── tests/
│ ├── conftest.py # Celery fixtures
│ └── tasks/
├── docker-compose.yml # Redis + workers
└── pyproject.toml
from celery import Celery
from kombu import Queue
from .config import settings
app = Celery(
"my_app",
broker=settings.celery_broker_url,
backend=settings.celery_result_backend,
include=["my_app.tasks.email", "my_app.tasks.reports"],
)
app.conf.update(
task_serializer="json",
accept_content=["json"],
result_serializer="json",
timezone="UTC",
enable_utc=True,
task_track_started=True,
task_time_limit=300,
task_soft_time_limit=240,
worker_prefetch_multiplier=1,
task_acks_late=True,
task_reject_on_worker_lost=True,
)
# Queue routing
app.conf.task_queues = (
Queue("default", routing_key="default"),
Queue("high_priority", routing_key="high"),
Queue("low_priority", routing_key="low"),
)
from celery import shared_task
from my_app.celery_app import app
@shared_task(name="tasks.add")
def add(x: int, y: int) -> int:
return x + y
@app.task(bind=True, name="tasks.send_email")
def send_email(self, to: str, subject: str, body: str) -> dict:
task_id = self.request.id
return {"task_id": task_id, "status": "sent"}
@shared_task(
bind=True,
max_retries=3,
default_retry_delay=60,
autoretry_for=(httpx.TimeoutException, httpx.ConnectError),
retry_backoff=True,
retry_backoff_max=600,
retry_jitter=True,
)
def call_external_api(self, endpoint: str, payload: dict) -> dict:
with httpx.Client(timeout=30) as client:
response = client.post(endpoint, json=payload)
response.raise_for_status()
return response.json()
@shared_task(bind=True, rate_limit="10/m", name="tasks.send_sms")
def send_sms(self, phone: str, message: str) -> dict:
return sms_service.send(phone, message)
from celery.exceptions import SoftTimeLimitExceeded
@shared_task(bind=True, soft_time_limit=300, time_limit=360)
def generate_report(self, report_id: int) -> dict:
try:
return build_report(report_id)
except SoftTimeLimitExceeded:
partial_save(report_id)
raise
See REFERENCE.md for manual retry, progress tracking, and custom retry backoff patterns.
app.conf.task_routes = {
"tasks.send_email": {"queue": "high_priority"},
"tasks.generate_report": {"queue": "low_priority"},
"tasks.process_payment": {"queue": "payments"},
"tasks.*": {"queue": "default"},
}
process_order.apply_async(args=[123], queue="high_priority")
process_order.apply_async(args=[456], routing_key="payments")
# High priority only
celery -A my_app.celery_app worker -Q high_priority -c 4
# Multiple queues
celery -A my_app.celery_app worker -Q default,low_priority -c 2
from celery.schedules import crontab
app.conf.beat_schedule = {
"health-check": {
"task": "tasks.health_check",
"schedule": 30.0, # Every 30 seconds
},
"daily-report": {
"task": "tasks.generate_daily_report",
"schedule": crontab(hour=2, minute=0), # Daily at 2 AM
},
"weekly-summary": {
"task": "tasks.send_weekly_summary",
"schedule": crontab(hour=9, minute=0, day_of_week=1), # Monday 9 AM
},
}
| Pattern | Expression |
|---|---|
| Every minute | crontab() |
| Every 15 min | crontab(minute="*/15") |
| Daily midnight | crontab(hour=0, minute=0) |
| Weekdays 9 AM | crontab(hour=9, minute=0, day_of_week="1-5") |
| Monthly 1st | crontab(hour=0, minute=0, day_of_month=1) |
# Standalone
celery -A my_app.celery_app beat --loglevel=info
# With worker (dev only)
celery -A my_app.celery_app worker --beat --loglevel=info
See REFERENCE.md for dynamic database schedules and advanced crontab patterns.
from celery import chain
workflow = chain(
fetch_data.s(url),
process_data.s(),
save_results.s(destination),
)
result = workflow.apply_async()
from celery import group
workflow = group(process_image.s(id) for id in image_ids)
result = workflow.apply_async()
all_results = result.get()
from celery import chord
workflow = chord(
(process_chunk.s(chunk) for chunk in chunks),
aggregate_results.s()
)
result = workflow.apply_async()
See REFERENCE.md for complex multi-step workflows and error handling in chains.
from fastapi import APIRouter
from celery.result import AsyncResult
from .celery_app import celery_app
from .tasks.email import send_email
router = APIRouter()
@router.post("/emails/send")
async def queue_email(to: str, subject: str, body: str) -> dict:
task = send_email.delay(to, subject, body)
return {"task_id": task.id, "status": "queued"}
@router.get("/tasks/{task_id}/status")
async def get_task_status(task_id: str) -> dict:
result = AsyncResult(task_id, app=celery_app)
response = {"task_id": task_id, "status": result.status, "ready": result.ready()}
if result.ready():
response["result"] = result.get() if result.successful() else str(result.result)
return response
@shared_task(bind=True)
def process_large_file(self, file_id: int) -> dict:
file_data = load_file(file_id)
for i, chunk in enumerate(file_data):
process_chunk(chunk)
self.update_state(state="PROGRESS", meta={"current": i + 1, "total": len(file_data)})
return {"processed": len(file_data)}
See REFERENCE.md for polling patterns, revocation, and lifespan management.
import pytest
@pytest.fixture(scope="session")
def celery_config():
return {
"broker_url": "memory://",
"result_backend": "cache+memory://",
"task_always_eager": True,
"task_eager_propagates": True,
}
def test_send_email_success(celery_app):
with patch("my_app.tasks.email.email_client") as mock:
mock.send.return_value = {"id": "msg_123"}
result = send_email.delay("user@example.com", "Test", "Hello")
assert result.successful()
assert result.get()["status"] == "sent"
See REFERENCE.md for integration tests with real workers and Beat schedule testing.
celery -A my_app.celery_app worker --loglevel=info
celery -A my_app.celery_app worker -c 4 -Q high,default
celery -A my_app.celery_app worker --pool=gevent -c 100
celery -A my_app.celery_app worker --autoscale=10,3
celery -A my_app.celery_app inspect active
celery -A my_app.celery_app inspect registered
celery -A my_app.celery_app inspect scheduled
celery -A my_app.celery_app inspect ping
celery -A my_app.celery_app control shutdown
celery -A my_app.celery_app purge
celery -A my_app.celery_app control revoke <task_id>
celery -A my_app.celery_app control rate_limit tasks.send_email 10/m
# Broker & backend
broker_url = "redis://localhost:6379/0"
result_backend = "redis://localhost:6379/1"
result_expires = 3600
# Serialization
task_serializer = "json"
result_serializer = "json"
accept_content = ["json"]
# Execution
task_time_limit = 300
task_soft_time_limit = 240
task_acks_late = True
task_reject_on_worker_lost = True
# Worker
worker_prefetch_multiplier = 1
worker_concurrency = 4
See REFERENCE.md for full configuration reference and environment-based settings.
| Anti-Pattern | Problem | Solution |
|---|---|---|
| Blocking in tasks | time.sleep() blocks worker | Use countdown or async |
| Large arguments | Megabytes through broker | Pass ID, fetch in task |
| Not idempotent | Duplicate charges on retry | Use idempotency keys |
| Ignoring results | Memory leaks in backend | Set ignore_result=True or configure result_expires |
| DB in task module | Import-time connections | Import inside task function |
See REFERENCE.md for detailed examples and solutions.