From fullstack-agents
Production-ready Celery worker configuration for distributed task processing.
npx claudepluginhub adelabdelgawad/fullstack-agents --plugin fullstack-agents

This skill uses the workspace's default tool permissions.
Use this skill when asked to configure Celery workers, route tasks to dedicated queues, or tune retry, time-limit, and reliability settings for distributed task processing.
┌─────────────────────────────────────────────────────────────┐
│                     FastAPI Application                     │
│     dispatch_to_celery() → task.delay() → Redis Broker      │
└─────────────────────────────┬───────────────────────────────┘
                              │
                              ▼
                     ┌─────────────────┐
                     │  Redis Broker   │
                     │ (Message Queue) │
                     └────────┬────────┘
                              │
               ┌──────────────┼──────────────┐
               │              │              │
               ▼              ▼              ▼
          ┌──────────┐   ┌──────────┐   ┌──────────┐
          │ Worker 1 │   │ Worker 2 │   │ Worker 3 │
          │ Q: celery│   │ Q: files │   │ Q: email │
          └──────────┘   └──────────┘   └──────────┘
               │              │              │
               └──────────────┼──────────────┘
                              ▼
                     ┌─────────────────┐
                     │ Result Backend  │
                     │     (Redis)     │
                     └─────────────────┘
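The flow in the diagram can be sketched as a toy model in plain Python, with a stdlib queue standing in for the Redis broker and a thread standing in for a worker process (`dispatch` and `worker_loop` are illustrative names, not Celery APIs):

```python
import queue
import threading

broker = queue.Queue()  # stands in for the Redis broker
results = {}            # stands in for the Redis result backend

def dispatch(task_name, payload):
    """Roughly task.delay(): enqueue a message and return a task id."""
    task_id = f"{task_name}:{broker.qsize()}"
    broker.put((task_id, payload))
    return task_id

def worker_loop():
    """Roughly a Celery worker: pull messages, run them, store results."""
    while True:
        task_id, payload = broker.get()
        if task_id is None:                 # shutdown sentinel
            break
        results[task_id] = payload.upper()  # "the work"

worker = threading.Thread(target=worker_loop)
worker.start()

tid = dispatch("tasks.email.send", "hello")
broker.put((None, None))  # stop the worker
worker.join()
print(results[tid])  # HELLO
```

In real Celery the broker and backend live in Redis, so dispatching processes and worker processes can run on different machines.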
# celery_app.py
from celery import Celery

from settings import settings

celery_app = Celery(
    "app_name",
    broker=settings.CELERY_BROKER_URL,
    backend=settings.CELERY_RESULT_BACKEND,
    include=[
        "tasks.email",
        "tasks.files",
        "tasks.scheduler",
    ],
)
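The `settings` module imported above is assumed to expose the two URLs; a minimal stdlib sketch (the env-var names and Redis defaults here are assumptions — a real project might use pydantic-settings instead):

```python
# settings.py (minimal sketch, not a prescribed implementation)
import os

class Settings:
    # Assumed env-var names mirroring the attribute names
    CELERY_BROKER_URL = os.environ.get(
        "CELERY_BROKER_URL", "redis://localhost:6379/0"
    )
    CELERY_RESULT_BACKEND = os.environ.get(
        "CELERY_RESULT_BACKEND", "redis://localhost:6379/1"
    )

settings = Settings()
```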
celery_app.conf.update(
    # Serialization
    task_serializer="json",
    result_serializer="json",
    accept_content=["json"],
    # Reliability - CRITICAL
    task_acks_late=True,              # Ack after completion
    task_reject_on_worker_lost=True,  # Requeue if worker dies
    task_track_started=True,          # Track task start
    # Results
    result_expires=86400,             # 24 hours
    # Worker Performance
    worker_prefetch_multiplier=1,     # Fair distribution
    worker_concurrency=10,            # Concurrent tasks
    # Time Limits - IMPORTANT
    task_soft_time_limit=300,         # 5 min soft limit
    task_time_limit=360,              # 6 min hard limit
    # Retries
    task_default_retry_delay=60,      # 1 min default delay
    # Timezone
    timezone="UTC",
    enable_utc=True,
    # Connection
    broker_connection_retry_on_startup=True,
)
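One invariant worth checking in this config: the soft limit must stay below the hard limit so a task has a window to catch `SoftTimeLimitExceeded` and clean up before the worker process is killed. A quick sketch of that relationship, using the values above:

```python
# Sketch: soft vs hard time limits (values copied from the config above).
conf = {
    "task_soft_time_limit": 300,  # raises SoftTimeLimitExceeded inside the task
    "task_time_limit": 360,       # kills the worker process outright
}

# The gap is the cleanup window the task gets after the soft limit fires.
grace = conf["task_time_limit"] - conf["task_soft_time_limit"]
assert conf["task_soft_time_limit"] < conf["task_time_limit"]
print(f"cleanup window after soft limit: {grace}s")
```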
# tasks/email.py (example module path; adjust to your task layout)
from celery import shared_task

@shared_task(
    bind=True,                   # Access self
    max_retries=3,               # Retry 3 times
    default_retry_delay=60,      # 1 min delay
    autoretry_for=(Exception,),  # Auto-retry
    retry_backoff=True,          # Exponential backoff
    retry_backoff_max=300,       # Max 5 min
    retry_jitter=True,           # Prevent thundering herd
    soft_time_limit=120,         # Soft limit
    time_limit=180,              # Hard limit
)
def my_task(self, **kwargs):
    pass
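The backoff options above combine roughly as follows; this is a simplified model of Celery's exponential backoff (base delay, cap, and full-jitter behavior approximated, not Celery's exact implementation):

```python
import random

def retry_delay(retries, base=1, backoff_max=300, jitter=True):
    """Approximate retry_backoff semantics: base * 2**retries,
    capped at retry_backoff_max, optionally randomized by jitter."""
    delay = min(base * (2 ** retries), backoff_max)
    if jitter:
        # Full jitter spreads retries out to avoid a thundering herd
        delay = random.uniform(0, delay)
    return delay

for n in range(4):
    print(f"retry {n}: up to {min(1 * 2 ** n, 300)}s")
```

Without jitter the delays grow 1s, 2s, 4s, 8s, ... until the 300s cap; with jitter each retry lands somewhere below that ceiling.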
# Basic worker
celery -A celery_app worker --loglevel=info
# With concurrency
celery -A celery_app worker --loglevel=info --concurrency=4
# Specific queues (names must match the routing config below)
celery -A celery_app worker -Q celery,files,email
# With gevent pool (high concurrency)
celery -A celery_app worker -P gevent --concurrency=100
# celery_app.py
celery_app.conf.task_routes = {
    'tasks.email.*': {'queue': 'email'},
    'tasks.files.*': {'queue': 'files'},
    'tasks.heavy.*': {'queue': 'heavy'},
}

# Or per-task
@shared_task(queue='high_priority')
def urgent_task():
    pass
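String patterns in `task_routes` are glob-matched against task names, with `celery` as the default queue when nothing matches. The lookup can be sketched with `fnmatch` (`resolve_queue` is an illustrative helper, not a Celery API):

```python
from fnmatch import fnmatch

task_routes = {
    'tasks.email.*': {'queue': 'email'},
    'tasks.files.*': {'queue': 'files'},
    'tasks.heavy.*': {'queue': 'heavy'},
}

def resolve_queue(task_name, routes=task_routes, default='celery'):
    """Return the queue of the first route whose glob pattern matches."""
    for pattern, options in routes.items():
        if fnmatch(task_name, pattern):
            return options['queue']
    return default  # Celery's default queue name

print(resolve_queue('tasks.email.send_welcome'))  # email
print(resolve_queue('tasks.cleanup.purge'))       # celery
```

Workers only consume queues they were started with (`-Q`), so a task routed to a queue no worker listens on will sit in the broker unprocessed.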