Async task processing with Celery, ARQ, and Redis for Python backends. Use when implementing background tasks, job queues, workers, scheduled jobs, or periodic task processing.
```
/plugin marketplace add yonatangross/skillforge-claude-plugin
/plugin install skillforge-complete@skillforge
```

This skill inherits all available tools. When active, it can use any tool Claude has access to.
- checklists/background-jobs-checklist.md
- examples/arq-fastapi.md
- references/task-queue-patterns.md
- templates/arq-worker-template.py

Offload long-running tasks with async job queues.
| Tool | Language | Best For | Complexity |
|---|---|---|---|
| ARQ | Python (async) | FastAPI, simple jobs | Low |
| Celery | Python | Complex workflows, enterprise | High |
| RQ | Python | Simple Redis queues | Low |
| Dramatiq | Python | Reliable messaging | Medium |
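ARQ and Celery are covered in depth below; for comparison, RQ (listed in the table above) stays this small. A minimal sketch, assuming a synchronous `send_email` function and a Redis host named `redis`:

```python
from redis import Redis
from rq import Queue

queue = Queue("default", connection=Redis(host="redis", port=6379))
job = queue.enqueue(send_email, "user@example.com", "Welcome", "<p>Hi!</p>")
print(job.id, job.get_status())

# Start a worker with the rq CLI, e.g.:
#   rq worker default --url redis://redis:6379
```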
```python
# backend/app/workers/arq_worker.py
import httpx

from arq.connections import RedisSettings

# create_db_pool and the task functions (send_email, generate_report,
# process_webhook) are assumed to be defined elsewhere in the app.


async def startup(ctx: dict):
    """Initialize worker resources."""
    ctx["db"] = await create_db_pool()
    ctx["http"] = httpx.AsyncClient()


async def shutdown(ctx: dict):
    """Clean up worker resources."""
    await ctx["db"].close()
    await ctx["http"].aclose()


class WorkerSettings:
    redis_settings = RedisSettings(host="redis", port=6379)
    functions = [
        send_email,
        generate_report,
        process_webhook,
    ]
    on_startup = startup
    on_shutdown = shutdown
    max_jobs = 10
    job_timeout = 300  # 5 minutes
```
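The worker is typically started with the arq CLI pointing at this settings class (for example `arq app.workers.arq_worker.WorkerSettings`). arq also exposes a programmatic entry point; a minimal sketch, assuming `arq.worker.run_worker` is available in the installed version:

```python
# Run the worker in-process; equivalent to invoking the arq CLI with
# the dotted path to WorkerSettings.
from arq.worker import run_worker

if __name__ == "__main__":
    run_worker(WorkerSettings)
```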
```python
from arq import func  # used to attach per-task options when registering functions

# SENDGRID_KEY and render_pdf are assumed to be provided elsewhere
# (configuration/env and a PDF rendering helper).


async def send_email(
    ctx: dict,
    to: str,
    subject: str,
    body: str,
) -> dict:
    """Send email task."""
    http = ctx["http"]
    response = await http.post(
        "https://api.sendgrid.com/v3/mail/send",
        json={"to": to, "subject": subject, "html": body},
        headers={"Authorization": f"Bearer {SENDGRID_KEY}"},
    )
    return {"status": response.status_code, "to": to}


async def generate_report(
    ctx: dict,
    report_id: str,
    format: str = "pdf",
) -> dict:
    """Generate report asynchronously."""
    db = ctx["db"]
    data = await db.fetch_report_data(report_id)
    pdf_bytes = await render_pdf(data)
    await db.save_report_file(report_id, pdf_bytes)
    return {"report_id": report_id, "size": len(pdf_bytes)}
```
```python
from arq import create_pool
from arq.connections import ArqRedis, RedisSettings
from arq.jobs import Job, JobStatus
from fastapi import Depends

# router, service, and ReportRequest are assumed to be defined elsewhere
# in the FastAPI app.


# Dependency
async def get_arq_pool() -> ArqRedis:
    return await create_pool(RedisSettings(host="redis"))


@router.post("/api/v1/reports")
async def create_report(
    data: ReportRequest,
    arq: ArqRedis = Depends(get_arq_pool),
):
    report = await service.create_report(data)
    # Enqueue background job
    job = await arq.enqueue_job(
        "generate_report",
        report.id,
        format=data.format,
    )
    return {"report_id": report.id, "job_id": job.job_id}


@router.get("/api/v1/jobs/{job_id}")
async def get_job_status(
    job_id: str,
    arq: ArqRedis = Depends(get_arq_pool),
):
    job = Job(job_id, arq)
    status = await job.status()
    result = await job.result() if status == JobStatus.complete else None
    return {"job_id": job_id, "status": status, "result": result}
```
```python
# backend/app/workers/celery_app.py
from celery import Celery

celery_app = Celery(
    "skillforge",
    broker="redis://redis:6379/0",
    backend="redis://redis:6379/1",
)

celery_app.conf.update(
    task_serializer="json",
    accept_content=["json"],
    result_serializer="json",
    timezone="UTC",
    task_track_started=True,
    task_time_limit=600,  # 10 minutes hard limit
    task_soft_time_limit=540,  # 9 minutes soft limit
    worker_prefetch_multiplier=1,  # Fair distribution
    task_acks_late=True,  # Acknowledge after completion
    task_reject_on_worker_lost=True,
)
```
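To keep slow report jobs from starving quick emails, tasks can be routed to dedicated queues with Celery's `task_routes` setting; a minimal sketch (queue names and task paths are illustrative):

```python
celery_app.conf.task_routes = {
    "app.workers.tasks.generate_report": {"queue": "reports"},
    "app.workers.tasks.send_email": {"queue": "emails"},
}

# Workers then consume specific queues, e.g.:
#   celery -A app.workers.celery_app worker -Q emails --concurrency=4
```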
```python
import requests

from celery import shared_task
from celery.utils.log import get_task_logger

logger = get_task_logger(__name__)

# SENDGRID_KEY and the report helpers (fetch_report_data, render_pdf,
# save_report) are assumed to be defined elsewhere in the app.


@shared_task(
    bind=True,
    max_retries=3,
    default_retry_delay=60,
    autoretry_for=(ConnectionError, TimeoutError),
)
def send_email(self, to: str, subject: str, body: str) -> dict:
    """Send email with automatic retry."""
    try:
        response = requests.post(
            "https://api.sendgrid.com/v3/mail/send",
            json={"to": to, "subject": subject, "html": body},
            headers={"Authorization": f"Bearer {SENDGRID_KEY}"},
            timeout=30,
        )
        response.raise_for_status()
        return {"status": "sent", "to": to}
    except Exception as exc:
        logger.error(f"Email failed: {exc}")
        raise self.retry(exc=exc)


@shared_task(bind=True)
def generate_report(self, report_id: str) -> dict:
    """Long-running report generation."""
    self.update_state(state="PROGRESS", meta={"step": "fetching"})
    data = fetch_report_data(report_id)

    self.update_state(state="PROGRESS", meta={"step": "rendering"})
    pdf = render_pdf(data)

    self.update_state(state="PROGRESS", meta={"step": "saving"})
    save_report(report_id, pdf)

    return {"report_id": report_id, "size": len(pdf)}
```
```python
from celery import chain, chord, group

# extract_data, transform_data, load_data, process_chunk, and
# aggregate_results are assumed to be @shared_task functions.

# Sequential execution
workflow = chain(
    extract_data.s(source_id),
    transform_data.s(),
    load_data.s(destination_id),
)
result = workflow.apply_async()

# Parallel execution
parallel = group(
    process_chunk.s(chunk) for chunk in chunks
)
result = parallel.apply_async()

# Parallel with callback
chord_workflow = chord(
    [process_chunk.s(chunk) for chunk in chunks],
    aggregate_results.s(),
)
result = chord_workflow.apply_async()
```
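The chord callback receives the list of results from the header group once every member finishes; a sketch of what the hypothetical `process_chunk`/`aggregate_results` tasks might look like (`handle_row` is also hypothetical):

```python
@shared_task
def process_chunk(chunk: list[dict]) -> int:
    """Map step: process one chunk and return a partial count."""
    return sum(1 for row in chunk if handle_row(row))


@shared_task
def aggregate_results(partial_counts: list[int]) -> dict:
    """Chord callback: receives the results of every process_chunk call."""
    return {"processed": sum(partial_counts)}
```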
```python
from celery.schedules import crontab

celery_app.conf.beat_schedule = {
    "cleanup-expired-sessions": {
        "task": "app.workers.tasks.cleanup_sessions",
        "schedule": crontab(minute=0, hour="*/6"),  # Every 6 hours
    },
    "generate-daily-report": {
        "task": "app.workers.tasks.daily_report",
        "schedule": crontab(minute=0, hour=2),  # 2 AM daily
    },
    "sync-external-data": {
        "task": "app.workers.tasks.sync_data",
        "schedule": 300.0,  # Every 5 minutes
    },
}
```
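Each entry references a registered task by its dotted path, and the schedule only fires when a beat process runs alongside the workers (`celery -A app.workers.celery_app beat`). A sketch of the hypothetical cleanup task (the `session_store` helper is an assumption):

```python
@shared_task
def cleanup_sessions() -> int:
    """Delete expired sessions; triggered every 6 hours by beat."""
    deleted = session_store.delete_expired()  # hypothetical helper
    logger.info(f"Removed {deleted} expired sessions")
    return deleted
```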
```python
from fastapi import BackgroundTasks

# UserCreate, ExportRequest, service, and send_welcome_email are assumed
# to be defined elsewhere in the app.


@router.post("/api/v1/users")
async def create_user(
    data: UserCreate,
    background_tasks: BackgroundTasks,
):
    user = await service.create_user(data)
    # Simple background task (in-process)
    background_tasks.add_task(send_welcome_email, user.email)
    return user


# For distributed tasks, use ARQ/Celery
@router.post("/api/v1/exports")
async def create_export(
    data: ExportRequest,
    arq: ArqRedis = Depends(get_arq_pool),
):
    # data.dict() is Pydantic v1; use data.model_dump() on Pydantic v2
    job = await arq.enqueue_job("export_data", data.dict())
    return {"job_id": job.job_id}
```
```python
from enum import Enum

from celery.result import AsyncResult


class JobStatus(Enum):
    """App-level job states exposed to API clients."""

    PENDING = "pending"
    STARTED = "started"
    PROGRESS = "progress"
    SUCCESS = "success"
    FAILURE = "failure"
    REVOKED = "revoked"


@router.get("/api/v1/jobs/{job_id}")
async def get_job(job_id: str):
    # Celery
    result = AsyncResult(job_id, app=celery_app)
    return {
        "job_id": job_id,
        "status": result.status,
        "result": result.result if result.ready() else None,
        "progress": result.info if result.status == "PROGRESS" else None,
    }
```
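Celery reports upper-case states (PENDING, STARTED, PROGRESS, SUCCESS, FAILURE, REVOKED); mapping them onto the app-level enum keeps the API response stable even if the backend changes. A small sketch:

```python
CELERY_TO_APP_STATUS = {
    "PENDING": JobStatus.PENDING,
    "STARTED": JobStatus.STARTED,
    "PROGRESS": JobStatus.PROGRESS,
    "SUCCESS": JobStatus.SUCCESS,
    "FAILURE": JobStatus.FAILURE,
    "REVOKED": JobStatus.REVOKED,
}

status = CELERY_TO_APP_STATUS.get(result.status, JobStatus.PENDING)
```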
```python
# NEVER run long tasks synchronously
@router.post("/api/v1/reports")
async def create_report(data: ReportRequest):
    pdf = await generate_pdf(data)  # Blocks for minutes!
    return pdf


# NEVER lose jobs on failure
@shared_task
def risky_task():
    do_work()  # No retry, no error handling


# NEVER store large results in Redis
@shared_task
def process_file(file_id: str) -> bytes:
    return large_file_bytes  # Store in S3/DB instead!


# NEVER use BackgroundTasks for distributed work
background_tasks.add_task(long_running_job)  # Lost if server restarts
```
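For the large-result case, the usual fix is to upload the artifact to object storage and keep only a small reference in the result backend; a sketch using boto3 (bucket name and the `build_file` helper are illustrative):

```python
import boto3

s3 = boto3.client("s3")


@shared_task
def process_file(file_id: str) -> dict:
    large_file_bytes = build_file(file_id)  # hypothetical helper
    key = f"exports/{file_id}.bin"
    s3.put_object(Bucket="my-exports-bucket", Key=key, Body=large_file_bytes)
    # Return a small, JSON-serializable reference instead of the bytes.
    return {"file_id": file_id, "s3_key": key, "size": len(large_file_bytes)}
```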
| Decision | Recommendation |
|---|---|
| Simple async | ARQ (native async) |
| Complex workflows | Celery (chains, chords) |
| In-process quick | FastAPI BackgroundTasks |
| LLM workflows | LangGraph (not Celery) |
| Result storage | Redis for status, S3/DB for data |
| Retry strategy | Exponential backoff with jitter |
Related skills:
- langgraph-checkpoints - LLM workflow persistence
- resilience-patterns - Retry and fallback
- observability-monitoring - Job metrics

Keywords: arq, async queue, redis queue, background task
Keywords: celery, task queue, distributed tasks, worker
Keywords: chain, group, chord, celery workflow
Keywords: periodic, scheduled, cron, celery beat
This skill should be used when the user asks to "create an agent", "add an agent", "write a subagent", "agent frontmatter", "when to use description", "agent examples", "agent tools", "agent colors", "autonomous agent", or needs guidance on agent structure, system prompts, triggering conditions, or agent development best practices for Claude Code plugins.
This skill should be used when the user asks to "create a slash command", "add a command", "write a custom command", "define command arguments", "use command frontmatter", "organize commands", "create command with file references", "interactive command", "use AskUserQuestion in command", or needs guidance on slash command structure, YAML frontmatter fields, dynamic arguments, bash execution in commands, user interaction patterns, or command development best practices for Claude Code.
This skill should be used when the user asks to "create a hook", "add a PreToolUse/PostToolUse/Stop hook", "validate tool use", "implement prompt-based hooks", "use ${CLAUDE_PLUGIN_ROOT}", "set up event-driven automation", "block dangerous commands", or mentions hook events (PreToolUse, PostToolUse, Stop, SubagentStop, SessionStart, SessionEnd, UserPromptSubmit, PreCompact, Notification). Provides comprehensive guidance for creating and implementing Claude Code plugin hooks with focus on advanced prompt-based hooks API.