Help us improve
Share bugs, ideas, or general feedback.
From ecc
Configures Celery async tasks in Django: broker setup, task design, beat scheduling, retries, canvas workflows, monitoring, and testing.
npx claudepluginhub affaan-m/ecc --plugin eccHow this skill is triggered — by the user, by Claude, or both
Slash command
/ecc:django-celeryThe summary Claude sees in its skill listing — used to decide when to auto-load this skill
Production-grade patterns for background task processing in Django using Celery with Redis or RabbitMQ.
Configures Django + Celery for async task execution, periodic scheduling via Beat, retries, canvas workflows, monitoring, and testing.
Sets up Celery 5.3+ distributed task queues with Beat scheduler, Redis/RabbitMQ brokers, workflow patterns, and FastAPI integration for background jobs, periodic tasks, and async processing.
Production-ready Celery task templates with error handling, retries, rate limiting, time limits, and custom task classes. Use when creating Celery tasks, implementing retry logic, adding rate limiting, setting time limits, building custom task classes, validating task inputs with Pydantic, handling database operations, making API calls, or when user mentions task patterns, retry mechanisms, task templates, error handling, task best practices.
Share bugs, ideas, or general feedback.
Production-grade patterns for background task processing in Django using Celery with Redis or RabbitMQ.
pip install celery[redis] django-celery-results django-celery-beat
celery.py — App Entrypoint# config/celery.py
import os
from celery import Celery
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'config.settings.development')
app = Celery('myproject')
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks() # Discovers tasks.py in each INSTALLED_APP
@app.task(bind=True, ignore_result=True)
def debug_task(self):
print(f'Request: {self.request!r}')
# config/__init__.py
from .celery import app as celery_app
__all__ = ('celery_app',)
# config/settings/base.py
# Broker (Redis recommended for production)
CELERY_BROKER_URL = env('CELERY_BROKER_URL', default='redis://localhost:6379/0')
CELERY_RESULT_BACKEND = env('CELERY_RESULT_BACKEND', default='django-db')
# Serialization
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
# Task behavior
CELERY_TASK_TRACK_STARTED = True
CELERY_TASK_TIME_LIMIT = 30 * 60 # Hard limit: 30 min
CELERY_TASK_SOFT_TIME_LIMIT = 25 * 60 # Soft limit: sends SoftTimeLimitExceeded
CELERY_WORKER_PREFETCH_MULTIPLIER = 1 # Prevent worker hoarding long tasks
CELERY_TASK_ACKS_LATE = True # Re-queue on worker crash
# Result persistence
CELERY_RESULT_EXPIRES = 60 * 60 * 24 # Keep results 24 hours
# Beat scheduler (for periodic tasks)
CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler'
# Installed apps
INSTALLED_APPS += [
'django_celery_results',
'django_celery_beat',
]
# Start worker (development)
celery -A config worker --loglevel=info
# Start beat scheduler (periodic tasks)
celery -A config beat --loglevel=info --scheduler django_celery_beat.schedulers:DatabaseScheduler
# Combined worker + beat (dev only, never production)
celery -A config worker --beat --loglevel=info
# Production: multiple workers with concurrency
celery -A config worker --loglevel=warning --concurrency=4 -Q default,high_priority
# apps/notifications/tasks.py
from celery import shared_task
import logging
logger = logging.getLogger(__name__)
@shared_task(name='notifications.send_welcome_email')
def send_welcome_email(user_id: int) -> None:
"""Send welcome email to newly registered user."""
from apps.users.models import User
from apps.notifications.services import EmailService
try:
user = User.objects.get(pk=user_id)
except User.DoesNotExist:
logger.warning('send_welcome_email: user %s not found', user_id)
return # Idempotent — do not raise, task already impossible to complete
EmailService.send_welcome(user)
logger.info('Welcome email sent to user %s', user_id)
@shared_task(
bind=True,
name='integrations.sync_to_crm',
max_retries=5,
default_retry_delay=60, # seconds before first retry
autoretry_for=(ConnectionError, TimeoutError),
retry_backoff=True, # exponential backoff
retry_backoff_max=600, # cap at 10 minutes
retry_jitter=True, # randomise to avoid thundering herd
)
def sync_contact_to_crm(self, contact_id: int) -> dict:
"""Sync contact to external CRM with retry on transient failures."""
from apps.crm.services import CRMClient
try:
result = CRMClient().sync(contact_id)
return result
except CRMClient.RateLimitError as exc:
# Specific retry delay from response header
raise self.retry(exc=exc, countdown=int(exc.retry_after))
Design tasks so they can safely run multiple times with the same inputs:
@shared_task(name='orders.mark_shipped')
def mark_order_shipped(order_id: int, tracking_number: str) -> None:
"""Mark order as shipped — safe to run multiple times."""
from apps.orders.models import Order
updated = Order.objects.filter(
pk=order_id,
status=Order.Status.PROCESSING, # Guard: only update if not already shipped
).update(
status=Order.Status.SHIPPED,
tracking_number=tracking_number,
)
if not updated:
logger.info('mark_order_shipped: order %s already shipped or not found', order_id)
from celery.exceptions import SoftTimeLimitExceeded
@shared_task(
bind=True,
name='reports.generate_pdf',
soft_time_limit=120,
time_limit=150,
)
def generate_pdf_report(self, report_id: int) -> str:
"""Generate PDF report with graceful timeout handling."""
from apps.reports.services import PDFGenerator
try:
path = PDFGenerator.build(report_id)
return path
except SoftTimeLimitExceeded:
# Clean up partial files before hard kill
PDFGenerator.cleanup(report_id)
raise
from datetime import timedelta
from django.utils import timezone
# Fire and forget (async)
send_welcome_email.delay(user.pk)
# Schedule in the future
send_reminder.apply_async(args=[user.pk], countdown=3600) # 1 hour from now
send_reminder.apply_async(args=[user.pk], eta=timezone.now() + timedelta(days=1))
# Apply with queue routing
sync_contact_to_crm.apply_async(args=[contact.pk], queue='high_priority')
# Run synchronously (tests / debugging only)
result = generate_pdf_report.apply(args=[report.pk])
# config/settings/base.py
from celery.schedules import crontab
CELERY_BEAT_SCHEDULE = {
'cleanup-expired-sessions': {
'task': 'users.cleanup_expired_sessions',
'schedule': crontab(hour=2, minute=0), # 2am daily
},
'sync-inventory': {
'task': 'products.sync_inventory',
'schedule': 60.0, # every 60 seconds
},
'weekly-digest': {
'task': 'notifications.send_weekly_digest',
'schedule': crontab(day_of_week='monday', hour=8, minute=0),
},
}
# Manage periodic tasks from Django admin or code
from django_celery_beat.models import PeriodicTask, CrontabSchedule
import json
schedule, _ = CrontabSchedule.objects.get_or_create(
hour='*/6', minute='0',
timezone='UTC',
)
PeriodicTask.objects.update_or_create(
name='Sync inventory every 6 hours',
defaults={
'crontab': schedule,
'task': 'products.sync_inventory',
'args': json.dumps([]),
'enabled': True,
}
)
from celery import chain, group, chord
# Chain: run tasks sequentially, passing results
pipeline = chain(
fetch_data.s(source_id),
transform_data.s(), # receives fetch_data result as first arg
load_to_warehouse.s(),
)
pipeline.delay()
# Group: run tasks in parallel
parallel = group(
send_welcome_email.s(user_id)
for user_id in new_user_ids
)
parallel.delay()
# Chord: parallel tasks + callback when all complete
result = chord(
group(process_chunk.s(chunk) for chunk in data_chunks),
aggregate_results.s(), # called with list of chunk results
)
result.delay()
# apps/core/tasks.py
from celery.signals import task_failure
@task_failure.connect
def on_task_failure(sender, task_id, exception, args, kwargs, traceback, einfo, **kw):
"""Log all task failures to Sentry / alerting."""
import sentry_sdk
with sentry_sdk.new_scope() as scope:
scope.set_context('celery', {
'task': sender.name,
'task_id': task_id,
'args': args,
'kwargs': kwargs,
})
sentry_sdk.capture_exception(exception)
# Route failed tasks to dead-letter queue after max retries
@shared_task(
bind=True,
max_retries=3,
name='payments.charge_card',
)
def charge_card(self, order_id: int) -> None:
from apps.payments.models import Order, FailedCharge
try:
_do_charge(order_id)
except Exception as exc:
if self.request.retries >= self.max_retries:
# Persist to dead-letter table for manual review
FailedCharge.objects.create(
order_id=order_id,
error=str(exc),
task_id=self.request.id,
)
return # Don't raise — task is permanently failed
raise self.retry(exc=exc)
# tests/test_tasks.py
import pytest
from unittest.mock import patch, MagicMock
from apps.notifications.tasks import send_welcome_email
class TestSendWelcomeEmail:
@pytest.mark.django_db
def test_sends_email_to_existing_user(self, user):
with patch('apps.notifications.services.EmailService') as mock_email:
send_welcome_email(user.pk)
mock_email.send_welcome.assert_called_once_with(user)
@pytest.mark.django_db
def test_skips_missing_user_gracefully(self):
"""Should not raise when user is deleted between enqueue and execute."""
send_welcome_email(99999) # Non-existent user — must not raise
# config/settings/test.py
CELERY_TASK_ALWAYS_EAGER = True # Run tasks synchronously in tests
CELERY_TASK_EAGER_PROPAGATES = True # Re-raise exceptions from tasks
# tests/test_integration.py
@pytest.mark.django_db
def test_registration_triggers_welcome_email(client):
with patch('apps.notifications.services.EmailService') as mock_email:
response = client.post('/api/users/', {
'email': 'new@example.com',
'password': 'strongpass123',
})
assert response.status_code == 201
mock_email.send_welcome.assert_called_once()
@pytest.mark.django_db
def test_task_retries_on_connection_error():
with patch('apps.crm.services.CRMClient.sync') as mock_sync:
mock_sync.side_effect = ConnectionError('timeout')
with pytest.raises(ConnectionError):
sync_contact_to_crm.apply(args=[1], throw=True)
assert mock_sync.call_count == 1 # First attempt only when eager
# Inspect active workers and queues
celery -A config inspect active
celery -A config inspect stats
celery -A config inspect reserved
# Check queue lengths (Redis)
redis-cli llen celery
# Flower: web-based real-time monitor
pip install flower
celery -A config flower --port=5555
# BAD: Passing model instances — they may be stale by execution time
send_welcome_email.delay(user) # Never pass ORM objects
send_welcome_email.delay(user.pk) # Always pass PKs
# BAD: Calling tasks synchronously in production views
result = generate_report.apply() # Blocks the request thread
# BAD: Non-idempotent task without guards
@shared_task
def charge_and_fulfill(order_id):
order.charge() # May charge twice if task retries!
order.fulfill()
# GOOD: Idempotent with status guard
@shared_task
def charge_and_fulfill(order_id):
order = Order.objects.select_for_update().get(pk=order_id)
if order.status != Order.Status.PENDING:
return # Already processed
order.charge()
order.fulfill()
| Check | Setting |
|---|---|
| Worker restarts on crash | supervisord or systemd unit |
CELERY_TASK_ACKS_LATE = True | Re-queue tasks on worker crash |
CELERY_WORKER_PREFETCH_MULTIPLIER = 1 | Fair distribution of long tasks |
| Separate queues per priority | -Q default,high_priority,low_priority |
CELERY_TASK_SOFT_TIME_LIMIT set | Graceful timeout before hard kill |
| Sentry integration | Capture all task_failure signals |
| Flower or other monitor | Visibility into queue depths |
| Beat runs on single node only | Prevents duplicate scheduled task execution |
django-patterns — ORM, service layer, and project structuredjango-tdd — Testing Django models, views, and servicespython-testing — pytest configuration and fixtures