From compound-engineering-feat-python
Implement async task queues with Celery and real-time WebSocket support with Django Channels. Use when a Django project includes celery or channels in its dependencies, or when implementing background processing, periodic tasks, WebSocket connections, or real-time features.
npx claudepluginhub weorbitant/compound-engineering-feat-python-plugin --plugin compound-engineering-feat-pythonThis skill uses the workspace's default tool permissions.
Patterns for background task processing with Celery and real-time communication with
Searches, retrieves, and installs Agent Skills from prompts.chat registry using MCP tools like search_skills and get_skill. Activates for finding skills, browsing catalogs, or extending Claude.
Searches prompts.chat for AI prompt templates by keyword or category, retrieves by ID with variable handling, and improves prompts via AI. Use for discovering or enhancing prompts.
Designs and optimizes AI agent action spaces, tool definitions, observation formats, error recovery, and context for higher task completion rates.
Patterns for background task processing with Celery and real-time communication with Django Channels in Django projects.
celery in dependencies: background tasks, periodic scheduling, task chainschannels in dependencies: WebSockets, real-time updates, async consumersDistributed task queue for executing work outside the request/response cycle. Core concepts:
Extends Django beyond HTTP to handle WebSockets, chat protocols, and other async protocols:
# proj/celery.py
import os
from celery import Celery
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "proj.settings")
app = Celery("proj")
app.config_from_object("django.conf:settings", namespace="CELERY")
app.autodiscover_tasks()
# proj/__init__.py
from .celery import app as celery_app
__all__ = ["celery_app"]
# settings.py
CELERY_BROKER_URL = "redis://localhost:6379/0"
CELERY_RESULT_BACKEND = "redis://localhost:6379/0"
# myapp/tasks.py
from celery import shared_task
@shared_task
def send_welcome_email(user_id: int) -> str:
user = User.objects.get(id=user_id)
# send email logic
return f"Email sent to {user.email}"
# settings.py
INSTALLED_APPS = [
"daphne",
"channels",
# ...
]
ASGI_APPLICATION = "proj.asgi.application"
CHANNEL_LAYERS = {
"default": {
"BACKEND": "channels_redis.core.RedisChannelLayer",
"CONFIG": {"hosts": [("127.0.0.1", 6379)]},
},
}
# proj/asgi.py
import os
from channels.routing import ProtocolTypeRouter, URLRouter
from channels.auth import AuthMiddlewareStack
from django.core.asgi import get_asgi_application
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "proj.settings")
django_asgi_app = get_asgi_application()
from myapp.routing import websocket_urlpatterns
application = ProtocolTypeRouter({
"http": django_asgi_app,
"websocket": AuthMiddlewareStack(URLRouter(websocket_urlpatterns)),
})
| Scenario | Solution |
|---|---|
| Offload slow work from HTTP request | Celery task |
| Run something every hour/day | Celery Beat periodic task |
| Chain multiple async steps | Celery canvas (chain/group/chord) |
| Push live updates to browser | Channels WebSocket consumer |
| Notify user when Celery task completes | Celery task + Channels group_send |
| Chat or collaborative editing | Channels with group messaging |
| File processing pipeline | Celery chain with retry |
Send real-time updates to the browser when a background task completes:
# tasks.py
from asgiref.sync import async_to_sync
from channels.layers import get_channel_layer
from celery import shared_task
@shared_task
def process_report(report_id: int, user_id: int) -> dict:
report = generate_report(report_id)
channel_layer = get_channel_layer()
async_to_sync(channel_layer.group_send)(
f"user_{user_id}",
{"type": "report.ready", "report_id": report_id, "url": report.url},
)
return {"status": "complete", "report_id": report_id}
# consumers.py
from channels.generic.websocket import JsonWebsocketConsumer
class NotificationConsumer(JsonWebsocketConsumer):
def connect(self):
self.user_group = f"user_{self.scope['user'].id}"
self.channel_layer.group_add(self.user_group, self.channel_name)
self.accept()
def disconnect(self, close_code):
self.channel_layer.group_discard(self.user_group, self.channel_name)
def report_ready(self, event):
self.send_json({"type": "report_ready", "data": event})