Celery 5.3+ distributed task queue with Beat scheduler, Redis/RabbitMQ brokers, workflow patterns, and FastAPI integration. Use for background jobs, periodic tasks, and async processing.
Implements Celery 5.3+ distributed task queues with a Beat scheduler for background job processing. Claude uses this skill when `celery` appears in the dependencies or task decorators are detected, to set up async workflows, periodic tasks, and FastAPI integrations.
/plugin marketplace add FortiumPartners/ensemble
/plugin install ensemble-development@ensemble

This skill inherits all available tools. When active, it can use any tool Claude has access to.
README.md
REFERENCE.md
VALIDATION.md
examples/README.md
examples/fastapi_celery.example.py
examples/task_patterns.example.py
templates/README.md
templates/beat_schedule.template.py
templates/celery_config.template.py
templates/pytest_celery.template.py
templates/task.template.py

Celery 5.3+ distributed task queue with Beat scheduler for Python applications. Background job processing, periodic scheduling, workflow patterns, and FastAPI integration.
This skill is loaded by backend-developer when:
- `celery` or `celery[redis]` in dependencies
- `celeryconfig.py` or `celery.py` present
- Task decorators (`@app.task`) found

Minimum Detection Confidence: 0.8 (80%)
Prerequisite: Python skill should be loaded for core patterns.
my_project/
├── src/my_app/
│ ├── celery_app.py # Celery application
│ ├── config.py # Settings
│ ├── tasks/ # Task modules
│ │ ├── email.py
│ │ ├── reports.py
│ │ └── cleanup.py
│ └── workers/queues.py # Queue definitions
├── tests/
│ ├── conftest.py # Celery fixtures
│ └── tasks/
├── docker-compose.yml # Redis + workers
└── pyproject.toml
from celery import Celery
from kombu import Queue
from .config import settings
app = Celery(
    "my_app",
    broker=settings.celery_broker_url,
    backend=settings.celery_result_backend,
    include=["my_app.tasks.email", "my_app.tasks.reports"],
)
app.conf.update(
    task_serializer="json",
    accept_content=["json"],
    result_serializer="json",
    timezone="UTC",
    enable_utc=True,
    task_track_started=True,
    task_time_limit=300,
    task_soft_time_limit=240,
    worker_prefetch_multiplier=1,
    task_acks_late=True,
    task_reject_on_worker_lost=True,
)
# Queue routing
app.conf.task_queues = (
    Queue("default", routing_key="default"),
    Queue("high_priority", routing_key="high"),
    Queue("low_priority", routing_key="low"),
)
from celery import shared_task
from my_app.celery_app import app
@shared_task(name="tasks.add")
def add(x: int, y: int) -> int:
    return x + y

@app.task(bind=True, name="tasks.send_email")
def send_email(self, to: str, subject: str, body: str) -> dict:
    task_id = self.request.id
    return {"task_id": task_id, "status": "sent"}
import httpx

@shared_task(
    bind=True,
    max_retries=3,
    default_retry_delay=60,
    autoretry_for=(httpx.TimeoutException, httpx.ConnectError),
    retry_backoff=True,
    retry_backoff_max=600,
    retry_jitter=True,
)
def call_external_api(self, endpoint: str, payload: dict) -> dict:
    with httpx.Client(timeout=30) as client:
        response = client.post(endpoint, json=payload)
        response.raise_for_status()
        return response.json()
@shared_task(bind=True, rate_limit="10/m", name="tasks.send_sms")
def send_sms(self, phone: str, message: str) -> dict:
    return sms_service.send(phone, message)
from celery.exceptions import SoftTimeLimitExceeded

@shared_task(bind=True, soft_time_limit=300, time_limit=360)
def generate_report(self, report_id: int) -> dict:
    try:
        return build_report(report_id)
    except SoftTimeLimitExceeded:
        partial_save(report_id)
        raise
See REFERENCE.md for manual retry, progress tracking, and custom retry backoff patterns.
app.conf.task_routes = {
    "tasks.send_email": {"queue": "high_priority"},
    "tasks.generate_report": {"queue": "low_priority"},
    "tasks.process_payment": {"queue": "payments"},
    "tasks.*": {"queue": "default"},
}
process_order.apply_async(args=[123], queue="high_priority")
process_order.apply_async(args=[456], routing_key="payments")
# High priority only
celery -A my_app.celery_app worker -Q high_priority -c 4
# Multiple queues
celery -A my_app.celery_app worker -Q default,low_priority -c 2
from celery.schedules import crontab
app.conf.beat_schedule = {
    "health-check": {
        "task": "tasks.health_check",
        "schedule": 30.0,  # Every 30 seconds
    },
    "daily-report": {
        "task": "tasks.generate_daily_report",
        "schedule": crontab(hour=2, minute=0),  # Daily at 2 AM
    },
    "weekly-summary": {
        "task": "tasks.send_weekly_summary",
        "schedule": crontab(hour=9, minute=0, day_of_week=1),  # Monday 9 AM
    },
}
| Pattern | Expression |
|---|---|
| Every minute | crontab() |
| Every 15 min | crontab(minute="*/15") |
| Daily midnight | crontab(hour=0, minute=0) |
| Weekdays 9 AM | crontab(hour=9, minute=0, day_of_week="1-5") |
| Monthly 1st | crontab(hour=0, minute=0, day_of_month=1) |
# Standalone
celery -A my_app.celery_app beat --loglevel=info
# With worker (dev only)
celery -A my_app.celery_app worker --beat --loglevel=info
See REFERENCE.md for dynamic database schedules and advanced crontab patterns.
from celery import chain
workflow = chain(
    fetch_data.s(url),
    process_data.s(),
    save_results.s(destination),
)
result = workflow.apply_async()
from celery import group
workflow = group(process_image.s(id) for id in image_ids)
result = workflow.apply_async()
all_results = result.get()
from celery import chord
workflow = chord(
    (process_chunk.s(chunk) for chunk in chunks),
    aggregate_results.s(),
)
result = workflow.apply_async()
See REFERENCE.md for complex multi-step workflows and error handling in chains.
from fastapi import APIRouter
from celery.result import AsyncResult
from .celery_app import celery_app
from .tasks.email import send_email
router = APIRouter()
@router.post("/emails/send")
async def queue_email(to: str, subject: str, body: str) -> dict:
    task = send_email.delay(to, subject, body)
    return {"task_id": task.id, "status": "queued"}

@router.get("/tasks/{task_id}/status")
async def get_task_status(task_id: str) -> dict:
    result = AsyncResult(task_id, app=celery_app)
    response = {"task_id": task_id, "status": result.status, "ready": result.ready()}
    if result.ready():
        response["result"] = result.get() if result.successful() else str(result.result)
    return response
@shared_task(bind=True)
def process_large_file(self, file_id: int) -> dict:
    file_data = load_file(file_id)
    for i, chunk in enumerate(file_data):
        process_chunk(chunk)
        self.update_state(state="PROGRESS", meta={"current": i + 1, "total": len(file_data)})
    return {"processed": len(file_data)}
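Clients can read that meta dict from `AsyncResult(task_id).info` while `state == "PROGRESS"`. A small helper to turn it into a percentage (a sketch; the key names match the `update_state` call above):

```python
def progress_percent(meta: dict) -> int:
    """Convert a {'current': n, 'total': m} meta dict into 0-100."""
    total = max(int(meta.get("total", 1)), 1)  # guard against divide-by-zero
    current = int(meta.get("current", 0))
    return min(100, round(100 * current / total))


print(progress_percent({"current": 50, "total": 200}))  # 25
```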
See REFERENCE.md for polling patterns, revocation, and lifespan management.
import pytest

@pytest.fixture(scope="session")
def celery_config():
    return {
        "broker_url": "memory://",
        "result_backend": "cache+memory://",
        "task_always_eager": True,
        "task_eager_propagates": True,
    }
from unittest.mock import patch

from my_app.tasks.email import send_email

def test_send_email_success(celery_app):
    with patch("my_app.tasks.email.email_client") as mock:
        mock.send.return_value = {"id": "msg_123"}
        result = send_email.delay("user@example.com", "Test", "Hello")
        assert result.successful()
        assert result.get()["status"] == "sent"
See REFERENCE.md for integration tests with real workers and Beat schedule testing.
celery -A my_app.celery_app worker --loglevel=info
celery -A my_app.celery_app worker -c 4 -Q high,default
celery -A my_app.celery_app worker --pool=gevent -c 100
celery -A my_app.celery_app worker --autoscale=10,3
celery -A my_app.celery_app inspect active
celery -A my_app.celery_app inspect registered
celery -A my_app.celery_app inspect scheduled
celery -A my_app.celery_app inspect ping
celery -A my_app.celery_app control shutdown
celery -A my_app.celery_app purge
celery -A my_app.celery_app control revoke <task_id>
celery -A my_app.celery_app control rate_limit tasks.send_email 10/m
# Broker & backend
broker_url = "redis://localhost:6379/0"
result_backend = "redis://localhost:6379/1"
result_expires = 3600
# Serialization
task_serializer = "json"
result_serializer = "json"
accept_content = ["json"]
# Execution
task_time_limit = 300
task_soft_time_limit = 240
task_acks_late = True
task_reject_on_worker_lost = True
# Worker
worker_prefetch_multiplier = 1
worker_concurrency = 4
See REFERENCE.md for full configuration reference and environment-based settings.
| Anti-Pattern | Problem | Solution |
|---|---|---|
| Blocking in tasks | time.sleep() blocks worker | Use countdown or async |
| Large arguments | Megabytes through broker | Pass ID, fetch in task |
| Not idempotent | Duplicate charges on retry | Use idempotency keys |
| Ignoring results | Memory leaks in backend | Set ignore_result=True or configure result_expires |
| DB in task module | Import-time connections | Import inside task function |
See REFERENCE.md for detailed examples and solutions.
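For the idempotency-key row above, the usual shape is an atomic set-if-absent check before the side effect. A broker-free sketch, with a dict standing in for Redis (`SET key 1 NX EX ...`) and a hypothetical `payment_gateway` side effect:

```python
_seen: dict[str, bool] = {}  # stands in for Redis in this sketch


def charge_once(idempotency_key: str, amount_cents: int) -> str:
    """Make retries safe: the side effect runs at most once per key."""
    if _seen.get(idempotency_key):  # Redis: SET key 1 NX EX 86400 returned nil
        return "duplicate"
    _seen[idempotency_key] = True
    # payment_gateway.charge(amount_cents)  # hypothetical side effect
    return "charged"


print(charge_once("order-123", 500))  # charged
print(charge_once("order-123", 500))  # duplicate (retry is a no-op)
```

With real Redis the check-and-set must be the single atomic `SET ... NX` call, not a separate GET then SET, or two concurrent retries can both pass the check.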