Help us improve
Share bugs, ideas, or general feedback.
From python-development
Python background job patterns including task queues, workers, and event-driven architecture. Use when implementing async task processing, job queues, long-running operations, or decoupling work from request/response cycles.
npx claudepluginhub bachsh/supermarket --plugin python-development
How this skill is triggered — by the user, by Claude, or both
Slash command
/python-development:python-background-jobs
The summary Claude sees in its skill listing — used to decide when to auto-load this skill
Decouple long-running or unreliable work from request/response cycles. Return immediately to the user while background workers handle the heavy lifting asynchronously.
Searches, retrieves, and installs Agent Skills from prompts.chat registry using MCP tools like search_skills and get_skill. Activates for finding skills, browsing catalogs, or extending Claude.
Provides behavioral guidelines to reduce common LLM coding mistakes, focusing on simplicity, surgical changes, assumption surfacing, and verifiable success criteria.
Create and present web-based slidedecks using Slidev with Markdown, Vue components, code highlighting, animations, interactive demos, LaTeX, and Mermaid for technical presentations, conference talks, code walkthroughs, and teaching materials.
Share bugs, ideas, or general feedback.
Decouple long-running or unreliable work from request/response cycles. Return immediately to the user while background workers handle the heavy lifting asynchronously.
API accepts request, enqueues a job, returns immediately with a job ID. Workers process jobs asynchronously.
Tasks may be retried on failure. Design for safe re-execution.
Jobs transition through states: pending → running → succeeded/failed.
Most queues guarantee at-least-once delivery. Your code must handle duplicates.
The examples in this skill use Celery, a widely adopted task queue. Alternatives such as RQ, Dramatiq, and cloud-native solutions (AWS SQS, GCP Tasks) are equally valid choices.
from celery import Celery
# Celery application; the Redis instance on localhost is the message broker.
app = Celery("tasks", broker="redis://localhost:6379")


@app.task
def send_email(to: str, subject: str, body: str) -> None:
    """Send one email through ``email_client``.

    Registered as a Celery task, so the body executes in a background
    worker rather than in the process that enqueues it.
    """
    email_client.send(to, subject, body)


# In your API handler: enqueue the task instead of calling it inline.
send_email.delay("user@example.com", "Welcome!", "Thanks for signing up")
For operations exceeding a few seconds, return a job ID and process asynchronously.
from uuid import uuid4
from dataclasses import dataclass
from enum import Enum
from datetime import datetime
class JobStatus(Enum):
    """Lifecycle states of a background job.

    Jobs move pending -> running -> succeeded or failed.
    """

    # Non-terminal states.
    PENDING = "pending"
    RUNNING = "running"

    # Terminal states.
    SUCCEEDED = "succeeded"
    FAILED = "failed"
@dataclass
class Job:
    """Persisted record describing one background job.

    Only ``id``, ``status`` and ``created_at`` are required; every other
    field defaults to ``None``.
    """

    # Required at creation time.
    id: str
    status: JobStatus
    created_at: datetime

    # Lifecycle timestamps; None until set.
    started_at: datetime | None = None
    completed_at: datetime | None = None

    # Outcome of the job; None until set.
    result: dict | None = None
    error: str | None = None
# API endpoint
async def start_export(request: ExportRequest) -> JobResponse:
    """Start an export job and return its job ID without waiting for it.

    Writes a PENDING job record, enqueues the ``export_data`` task with the
    request payload, and returns a response the client can poll.

    Args:
        request: Export parameters; serialized with ``model_dump()`` and
            handed to the task as ``params``.

    Returns:
        A ``JobResponse`` with the new job ID, the initial status and the
        URL to poll.

    Raises:
        Exception: Anything raised by ``task_queue.enqueue`` is re-raised
            after the job record has been marked FAILED.
    """
    job_id = str(uuid4())

    # Persist job record (written before the task is enqueued).
    await jobs_repo.create(Job(
        id=job_id,
        status=JobStatus.PENDING,
        created_at=datetime.utcnow(),
    ))

    # Enqueue task for background processing.
    try:
        await task_queue.enqueue(
            "export_data",
            job_id=job_id,
            params=request.model_dump(),
        )
    except Exception as exc:
        # The record already exists; without this it would stay PENDING
        # forever even though no task was ever queued for it.
        # NOTE(review): assumes jobs_repo exposes
        # update_status(job_id, status, **fields) like JobRepository — confirm.
        await jobs_repo.update_status(job_id, JobStatus.FAILED, error=str(exc))
        raise

    # Return immediately with job ID; the status string comes from the enum
    # so it cannot drift from the persisted value.
    return JobResponse(
        job_id=job_id,
        status=JobStatus.PENDING.value,
        poll_url=f"/jobs/{job_id}",
    )
Configure Celery tasks with proper retry and timeout settings.
from celery import Celery
app = Celery("tasks", broker="redis://localhost:6379")

# Global configuration, applied to every task of this app.
app.conf.task_time_limit = 3600            # Hard limit: 1 hour
app.conf.task_soft_time_limit = 3000       # Soft limit: 50 minutes
app.conf.task_acks_late = True             # Acknowledge after completion
app.conf.task_reject_on_worker_lost = True
app.conf.worker_prefetch_multiplier = 1    # Don't prefetch too many tasks
@app.task(
    bind=True,
    max_retries=3,
    default_retry_delay=60,
    autoretry_for=(ConnectionError, TimeoutError),
)
def process_payment(self, payment_id: str) -> dict:
    """Charge a payment, retrying only failures that may be transient.

    Args:
        payment_id: Identifier passed to ``payment_gateway.charge``.

    Returns:
        ``{"status": "success", "transaction_id": ...}`` when the charge
        goes through, or ``{"status": "declined", "reason": ...}`` when the
        gateway raises ``PaymentDeclinedError``.

    Raises:
        The exception produced by ``self.retry`` when a ``TransientError``
        occurs, which reschedules the task.
    """
    try:
        charge = payment_gateway.charge(payment_id)
        outcome = {"status": "success", "transaction_id": charge.id}
    except PaymentDeclinedError as declined:
        # A decline is a permanent answer: report it instead of retrying.
        return {"status": "declined", "reason": str(declined)}
    except TransientError as transient:
        # Exponential backoff: 60s, 120s, 240s, ... by retry count.
        backoff_seconds = 60 * 2 ** self.request.retries
        raise self.retry(exc=transient, countdown=backoff_seconds)
    return outcome
Workers may retry on crash or timeout. Design for safe re-execution.
@app.task(bind=True)
def process_order(self, order_id: str) -> None:
    """Charge and complete an order in a way that is safe to re-run.

    A completed order is skipped. Otherwise the charge is issued with an
    idempotency key derived from the order ID, and the order is then marked
    COMPLETED.

    Args:
        order_id: Identifier used to load and update the order.
    """
    order = orders_repo.get(order_id)
    current_status = order.status

    if current_status == OrderStatus.COMPLETED:
        # An earlier run already finished this order: nothing left to do.
        logger.info("Order already processed", order_id=order_id)
        return

    if current_status == OrderStatus.PROCESSING:
        # An earlier run may have stopped part-way. Fall through: the
        # idempotency key below is what guards against double-charging.
        pass

    # Critical: the same key is sent on every attempt for this order.
    charge_key = f"order-{order_id}"
    payment_provider.charge(
        amount=order.total,
        idempotency_key=charge_key,
    )

    orders_repo.update(order_id, status=OrderStatus.COMPLETED)
Idempotency Strategies:
INSERT ... ON CONFLICT DO UPDATE
Persist job state transitions for visibility and debugging.
class JobRepository:
    """Repository for managing job state.

    Relies on ``self._db`` providing an awaitable ``execute(sql, *args)``
    that accepts ``$n`` positional placeholders; this class does not set
    ``_db`` itself.
    """

    # Columns update_status may write. Column names end up in the SQL text
    # (only values can be bound as parameters), so they are restricted to
    # this fixed list rather than taken from caller input.
    # NOTE(review): assumes the jobs table mirrors the Job fields — confirm.
    _UPDATABLE_COLUMNS = ("status", "started_at", "completed_at", "result", "error")

    async def create(self, job: Job) -> Job:
        """Insert a new job record and return the same ``job`` object.

        Only ``id``, ``status`` (as its string value) and ``created_at``
        are written.
        """
        await self._db.execute(
            """INSERT INTO jobs (id, status, created_at)
            VALUES ($1, $2, $3)""",
            job.id, job.status.value, job.created_at,
        )
        return job

    async def update_status(
        self,
        job_id: str,
        status: JobStatus,
        **fields,
    ) -> None:
        """Update a job's status, its lifecycle timestamp and extra columns.

        Args:
            job_id: ID of the job row to update.
            status: New status. RUNNING also sets ``started_at``;
                SUCCEEDED/FAILED also set ``completed_at``.
            **fields: Additional columns to write, e.g. ``result`` or
                ``error``. Values are passed to the driver unchanged.

        Raises:
            ValueError: If ``fields`` names a column that is not updatable.
        """
        updates = {"status": status.value, **fields}
        if status == JobStatus.RUNNING:
            updates["started_at"] = datetime.utcnow()
        elif status in (JobStatus.SUCCEEDED, JobStatus.FAILED):
            updates["completed_at"] = datetime.utcnow()

        unknown = sorted(set(updates) - set(self._UPDATABLE_COLUMNS))
        if unknown:
            raise ValueError(f"Unknown job column(s): {', '.join(unknown)}")

        # One "$n" placeholder per column, in dict order; the job ID takes
        # the placeholder after the last column.
        columns = list(updates)
        set_clause = ", ".join(
            f"{column} = ${position}"
            for position, column in enumerate(columns, start=1)
        )
        await self._db.execute(
            f"UPDATE jobs SET {set_clause} WHERE id = ${len(columns) + 1}",
            *(updates[column] for column in columns),
            job_id,
        )

        logger.info(
            "Job status updated",
            job_id=job_id,
            status=status.value,
        )
Handle permanently failed tasks for manual inspection.
@app.task(bind=True, max_retries=3)
def process_webhook(self, webhook_id: str, payload: dict) -> None:
    """Deliver a webhook, retrying with backoff and dead-lettering at the end.

    Args:
        webhook_id: Identifier recorded in logs and in the dead-letter entry.
        payload: Body passed to ``send_webhook``.

    Raises:
        The exception produced by ``self.retry`` while retries remain.
    """
    try:
        delivery = send_webhook(payload)
        if not delivery.success:
            raise WebhookFailedError(delivery.error)
    except Exception as exc:
        retries_so_far = self.request.retries

        if retries_so_far < self.max_retries:
            # Exponential backoff retry: 60s, 120s, 240s, ...
            raise self.retry(exc=exc, countdown=2 ** retries_so_far * 60)

        # Retries exhausted: park the message for manual inspection
        # instead of raising again.
        reason = str(exc)
        dead_letter_queue.send({
            "task": "process_webhook",
            "webhook_id": webhook_id,
            "payload": payload,
            "error": reason,
            "attempts": retries_so_far + 1,
            "failed_at": datetime.utcnow().isoformat(),
        })
        logger.error(
            "Webhook moved to DLQ after max retries",
            webhook_id=webhook_id,
            error=reason,
        )
Provide an endpoint for clients to check job status.
from fastapi import FastAPI, HTTPException
app = FastAPI()


@app.get("/jobs/{job_id}")
async def get_job_status(job_id: str) -> JobStatusResponse:
    """Report the current state of a background job.

    Args:
        job_id: ID from the URL path, looked up through ``jobs_repo``.

    Returns:
        A ``JobStatusResponse``; ``result`` is only populated for succeeded
        jobs and ``error`` only for failed ones.

    Raises:
        HTTPException: 404 when no job with this ID exists.
    """
    job = await jobs_repo.get(job_id)
    if job is None:
        raise HTTPException(404, f"Job {job_id} not found")

    succeeded = job.status == JobStatus.SUCCEEDED
    failed = job.status == JobStatus.FAILED

    return JobStatusResponse(
        job_id=job.id,
        status=job.status.value,
        created_at=job.created_at,
        started_at=job.started_at,
        completed_at=job.completed_at,
        result=job.result if succeeded else None,
        error=job.error if failed else None,
        # Helpful for clients: tells them when to stop polling.
        is_terminal=succeeded or failed,
    )
Compose complex workflows from simple tasks.
from celery import chain, group, chord
# Simple chain: A → B → C
workflow = chain(
    extract_data.s(source_id),
    transform_data.s(),
    load_data.s(destination_id),
)

# Parallel execution: A, B, C all at once
parallel = group(
    send_email.s(user_email),
    send_sms.s(user_phone),
    update_analytics.s(event_data),
)

# Chord: run the header tasks in parallel, then a single callback.
# Here: process all items, then send the completion notification.
header = [process_item.s(item_id) for item_id in item_ids]
callback = send_completion_notification.s(batch_id)
workflow = chord(header, callback)

# Only the chord is submitted here; `workflow` was rebound above.
workflow.apply_async()
Choose the right tool for your needs.
RQ (Redis Queue): Simple, Redis-based
from rq import Queue
from redis import Redis
# Queue bound to a Redis connection created with default settings.
redis_connection = Redis()
queue = Queue(connection=redis_connection)

# Enqueue send_email with its three positional arguments.
job = queue.enqueue(send_email, "user@example.com", "Subject", "Body")
Dramatiq: Modern Celery alternative
import dramatiq
from dramatiq.brokers.redis import RedisBroker
# Register a Redis broker (default settings) as Dramatiq's global broker.
redis_broker = RedisBroker()
dramatiq.set_broker(redis_broker)


@dramatiq.actor
def send_email(to: str, subject: str, body: str) -> None:
    """Send one email through ``email_client``; registered as a Dramatiq actor."""
    email_client.send(to, subject, body)
Cloud-native options: