From litestar-skills
Auto-activate for litestar_saq imports, SAQPlugin, SAQConfig, QueueConfig, TaskQueues. The first-party Litestar plugin around SAQ (Simple Async Queue): background tasks, cron jobs, queue web UI, `litestar workers run` CLI, DI of `TaskQueues`. Produces SAQPlugin configs, QueueConfig definitions, task functions, CronJobs, and DI-injected enqueue patterns. Use when: adding background jobs to a Litestar app, scheduling cron work, exposing the SAQ web UI, or running workers via the Litestar CLI. Not for Celery, RQ, or Dramatiq — Litestar's first-party choice is SAQ. For raw SAQ patterns outside Litestar, see standalone SAQ docs. Or document the custom-PG-native alternative pattern when SAQ is explicitly rejected.
```shell
npx claudepluginhub litestar-org/litestar-skills --plugin litestar-skills
```
`litestar-saq` is the first-party plugin that integrates [SAQ (Simple Async Queue)](https://github.com/tobymao/saq) with Litestar. It provides:
- `SAQPlugin` — registers queues, workers, lifespan management, and DI for `TaskQueues`
- `SAQConfig` / `QueueConfig` — declarative queue + worker configuration
- `litestar workers run` — CLI to start workers in-process or as a separate process
- DI of `TaskQueues` into route handlers for ergonomic enqueueing

Conventions:

- Annotate optionals as `T | None`, never `Optional[T]`
- Use `from __future__ import annotations` — canonical Litestar apps do
- Task functions are `async def`
- The first positional argument is `ctx: dict` (the SAQ context dict)
- Parameters after `*` are keyword-only

The canonical pattern from `litestar-fullstack-spa/src/py/app/server/plugins.py` uses lazy initialization and `use_server_lifespan=True` so workers share the app's lifespan with the web process:
```python
# Branch A — SAQ with Redis as the broker (pick when Redis is already in-stack
# for cache / sessions, or when you want the SAQ web UI + multi-queue fanout).
from litestar_saq import SAQConfig, SAQPlugin, QueueConfig, CronJob

from app.lib.settings import get_settings


def create_saq_plugin() -> SAQPlugin:
    settings = get_settings()
    return SAQPlugin(
        config=SAQConfig(
            dsn=settings.redis.url,  # redis://... — Redis broker
            use_server_lifespan=True,  # workers run inside web process by default
            web_enabled=settings.saq.web_enabled,
            queue_configs=[
                QueueConfig(
                    name="default",
                    tasks=[
                        "app.domain.system.tasks.send_email",
                        "app.domain.system.tasks.cleanup_sessions",  # cron target must be registered too
                    ],
                    scheduled_tasks=[
                        CronJob(
                            function="app.domain.system.tasks.cleanup_sessions",
                            cron="*/15 * * * *",
                            timeout=120,
                        ),
                    ],
                ),
            ],
        ),
    )


saq_plugin = create_saq_plugin()
```
```python
# Branch B — SAQ with PostgreSQL as the broker (pick when the project is
# PG-only, you want one less piece of infra, or throughput is moderate).
def create_saq_plugin_pg() -> SAQPlugin:
    settings = get_settings()
    return SAQPlugin(
        config=SAQConfig(
            dsn=settings.database.url,  # postgresql+asyncpg://... — PG broker
            use_server_lifespan=True,
            web_enabled=settings.saq.web_enabled,
            queue_configs=[
                QueueConfig(
                    name="default",
                    tasks=["app.domain.system.tasks.send_email"],
                ),
            ],
        ),
    )
```
Pick Branch A (SAQ + Redis) when: Redis is already in-stack (cache, sessions, Channels), you need multi-queue fanout across many workers, want the SAQ web UI and dead-letter dashboards, or have high throughput (>1k jobs/s per queue).
Pick Branch B (SAQ + PostgreSQL) when: PG-only deployment (Cloud SQL, AlloyDB, self-hosted single DB), avoiding extra infra, moderate throughput (<1k jobs/s), or you want jobs and business data in the same transactional boundary.
Pick Branch C (custom PG-native, no SAQ) when: you need pg_notify wake-ups with zero polling lag, FOR UPDATE SKIP LOCKED atomic task claim, or multi-target execution routing (local / cloudrun / immediate) — and you're willing to own a thin TaskService + WorkerPlugin pair. See below.
Anti-pattern: hard-coding dsn=settings.redis.url in a PG-only project just because Redis is the "default" example. Match the broker to the stack.
Some projects reject SAQ entirely in favor of a thin TaskService + WorkerPlugin pair directly over PostgreSQL. This wins when you want FOR UPDATE SKIP LOCKED for atomic task claiming, pg_notify wake-ups (no polling lag), and multi-target execution routing (local / cloudrun / immediate) with zero extra dependencies beyond your existing Postgres connection.
The canonical example is the dma/accelerator codebase, which implements this pattern throughout. The WorkerPlugin wires task discovery and the in-process worker into the Litestar app lifecycle; the @task decorator registers callables and optional cron schedules; TaskService.create_task persists tasks to a job table. Canonical references: dma/accelerator/src/py/dma/utils/worker/plugin.py:L57–224 and dma/accelerator/src/py/dma/lib/jobs.py:L792–903.
See references/postgresql-native.md for the full pattern.
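The shape of the atomic claim such a `TaskService` runs can be sketched as a single SQL statement. This is an assumed shape for illustration (table and column names `job`, `status`, `priority` are hypothetical), not the dma codebase's exact query:

```python
# FOR UPDATE SKIP LOCKED lets many workers poll the same table concurrently:
# each worker locks and claims a different row instead of blocking on, or
# double-claiming, a row another worker already holds.
CLAIM_SQL = """
UPDATE job
SET status = 'running', claimed_at = now()
WHERE id = (
    SELECT id
    FROM job
    WHERE status = 'queued'
    ORDER BY priority, created_at
    FOR UPDATE SKIP LOCKED
    LIMIT 1
)
RETURNING id, task_name, payload
"""
```

The inner `SELECT ... LIMIT 1` picks the highest-priority queued row the worker can lock; the outer `UPDATE ... RETURNING` claims it and hands the payload back in one round trip.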
```python
# NOTE: do NOT use `from __future__ import annotations` in modules that define
# @task-decorated functions — the decorator inspects signatures at registration time.
from app.lib.worker.jobs import task


@task(cron="0 2 * * *", timeout=120)
async def nightly_cleanup() -> None:
    """Purge soft-deleted records every night at 02:00 UTC."""
    ...


@task(priority=5, retries=1, timeout=300, execution_target="cloudrun")
async def generate_report(*, report_id: int) -> None:
    """Export report — runs on Cloud Run for isolation."""
    ...


# Enqueue imperatively (from a handler or service):
await generate_report.enqueue(execution_target="cloudrun", report_id=42)
```
```python
from litestar import Litestar

from app.server.plugins import saq_plugin

app = Litestar(
    route_handlers=[...],
    plugins=[saq_plugin],
)
```
```python
# app/domain/system/tasks.py
async def send_email(ctx: dict, *, recipient: str, subject: str, body: str) -> None:
    """Send an email as a background job.

    Args:
        ctx: SAQ context dict (queue, job, app-state).
        recipient: To address.
        subject: Email subject.
        body: Email body.
    """
    email_service = ctx["state"]["email_service"]
    await email_service.send(recipient, subject, body)
```
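Because a task receives everything through `ctx` and keyword arguments, it can be exercised in tests with a stub context. A minimal sketch, with a hypothetical `FakeEmailService` standing in for the real service:

```python
import asyncio


class FakeEmailService:
    """Test double — records sends instead of talking to SMTP."""

    def __init__(self) -> None:
        self.sent: list[tuple[str, str, str]] = []

    async def send(self, recipient: str, subject: str, body: str) -> None:
        self.sent.append((recipient, subject, body))


async def send_email(ctx: dict, *, recipient: str, subject: str, body: str) -> None:
    await ctx["state"]["email_service"].send(recipient, subject, body)


fake = FakeEmailService()
asyncio.run(
    send_email(
        {"state": {"email_service": fake}},  # stub ctx — no worker needed
        recipient="a@example.com",
        subject="hi",
        body="hello",
    )
)
assert fake.sent == [("a@example.com", "hi", "hello")]
```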
```python
from litestar import Controller, post
from litestar_saq import TaskQueues

from app.domain.notifications.schemas import NotificationCreate


class NotificationController(Controller):
    path = "/api/notifications"

    @post("/")
    async def queue_notification(
        self,
        data: NotificationCreate,
        task_queues: TaskQueues,
    ) -> dict[str, str]:
        queue = task_queues.get("default")
        await queue.enqueue(
            "send_email",
            recipient=data.email,
            subject=data.subject,
            body=data.body,
            timeout=30,
            retries=2,
            key=f"notify-{data.email}",
        )
        return {"status": "queued"}
```
```shell
# Run workers (uses the same Litestar app)
litestar --app app:app workers run

# Run workers in a separate process (production)
litestar --app app:app workers run --process

# Inspect queues
litestar --app app:app workers status
```
When web_enabled=True, the SAQ web UI is mounted under the Litestar app for queue introspection and job retry.
| Option | Default | Use |
|---|---|---|
| `timeout` | `None` | Always set — bound how long a job can run |
| `retries` | 0 | Retry count on exception |
| `ttl` | 600 | Seconds to retain result after completion |
| `key` | `None` | Deduplication key — skip if already queued |
| `heartbeat` | 0 | Heartbeat interval for long-running jobs |
| `scheduled` | 0 | Unix timestamp to delay start |
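To make the `heartbeat` row concrete, here is the rule it implies, sketched in plain Python (an illustration of the semantics, not SAQ's internals): a job with `heartbeat=N` is treated as stuck once more than `N` seconds pass without a tick, and `heartbeat=0` disables the check.

```python
def is_stuck(last_tick: float, heartbeat: int, now: float) -> bool:
    """Stuck when the heartbeat check is enabled and the job has gone silent."""
    return heartbeat > 0 and (now - last_tick) > heartbeat


now = 1000.0
assert not is_stuck(last_tick=990.0, heartbeat=30, now=now)  # ticked 10s ago — fine
assert is_stuck(last_tick=960.0, heartbeat=30, now=now)      # 40s silent — stuck
assert not is_stuck(last_tick=0.0, heartbeat=0, now=now)     # heartbeat=0 — check off
```

This is why the best-practices list below says to set `heartbeat` on anything that runs longer than ~30s: without ticks, a healthy long job is indistinguishable from a dead one.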
```shell
pip install litestar-saq
```
Build QueueConfig instances for each logical queue ("default", "emails", "reports"). Reference task functions by dotted path; the plugin imports them at startup.
Wrap QueueConfigs in SAQConfig with the broker DSN. Pick Redis (redis://...) when Redis is already in the stack; pick PostgreSQL (postgresql+asyncpg://...) when the project is PG-only. Both brokers are fully supported — see Plugin Setup above for both patterns. Set use_server_lifespan=True so workers run inside the web process by default. Toggle web_enabled for the introspection UI.
Place task functions in app/domain/<domain>/tasks.py. First arg ctx: dict, rest keyword-only. Pull shared resources (DB, HTTP client, email service) from ctx["state"].
Add CronJob entries to QueueConfig.scheduled_tasks for recurring work. Always set timeout. Do not use external cron tools for work that belongs in the queue.
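As a sanity check on expressions like `*/15 * * * *`, the minute field's step syntax can be illustrated in a few lines of plain Python (this is not SAQ's cron parser, just the standard field semantics):

```python
def minute_matches(field: str, minute: int) -> bool:
    """Does a cron minute field match a given minute? Handles *, */N, and a literal."""
    if field == "*":
        return True
    if field.startswith("*/"):
        return minute % int(field[2:]) == 0
    return minute == int(field)


fires = [m for m in range(60) if minute_matches("*/15", m)]
assert fires == [0, 15, 30, 45]  # */15 → every quarter hour
assert minute_matches("0", 0)    # "0 2 * * *" → minute 0 (of hour 2)
```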
Inject TaskQueues into route handlers. Use task_queues.get("name") then await queue.enqueue("task_name", ...). Use key= for deduplication.
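The deduplication semantics that `key=` buys can be sketched with an in-memory stand-in (not SAQ's implementation): a second enqueue with the same key, while the first is still pending, is a no-op.

```python
class FakeQueue:
    """In-memory illustration of key-based deduplication."""

    def __init__(self) -> None:
        self.pending: dict[str, tuple[str, dict]] = {}

    def enqueue(self, task: str, *, key: str, **kwargs) -> bool:
        if key in self.pending:
            return False  # same logical job already queued — skip
        self.pending[key] = (task, kwargs)
        return True


q = FakeQueue()
assert q.enqueue("send_email", key="notify-a@example.com", recipient="a@example.com")
# Duplicate with the same key is skipped; the queue still holds one job.
assert not q.enqueue("send_email", key="notify-a@example.com", recipient="a@example.com")
assert len(q.pending) == 1
```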
For real-time updates after a job completes, publish to Litestar Channels from inside the task. See ../litestar/references/websockets.md.
For dev: litestar run (workers + web in one process via use_server_lifespan=True).
For production: litestar workers run --process separately from litestar run.
- Use `litestar-saq`, not raw SAQ, in Litestar apps — the plugin handles DI, lifespan, CLI, and the web UI. Raw SAQ misses all of that.
- Set `timeout` on tasks and CronJobs — the default is no timeout; a hung task pins a worker slot forever.
- Set `heartbeat` for jobs that run longer than ~30s, otherwise SAQ may mark them stuck and re-queue.
- Inject `TaskQueues` via DI — don't import a global queue inside handlers. The plugin owns the queue lifecycle.
- Use `CronJob` for scheduled work — not external cron. CronJobs participate in retries, timeouts, and observability.
- Use `key=` for deduplication — the same logical job (per-user sync, per-resource refresh) should not stack.
- Use `use_server_lifespan=True` for dev and small-to-mid apps (workers inside the web process). Switch to `--process` for high-throughput production.
- For real-time updates after a job completes, publish to Litestar Channels from inside the task — see `../litestar/references/websockets.md`.
- Pull shared resources from `ctx["state"]`, not module-level globals — this keeps tests deterministic and supports per-worker init.
- Go custom PG-native only when you need `pg_notify` wake-ups, `FOR UPDATE SKIP LOCKED` atomic claim, or execution-target routing across Cloud Run / local. For every other PG-only case, SAQ+PG is the simpler default. See `references/postgresql-native.md`.

Before delivering Litestar + SAQ code, verify:
- `SAQPlugin` is in `app.plugins`
- `SAQConfig.use_server_lifespan` is set explicitly
- `QueueConfig` lists tasks by dotted path; the imports resolve
- Task functions take `ctx: dict` as the first positional arg, keyword-only params after `*`
- Enqueues have `timeout` set
- Long-running jobs have `heartbeat` set
- CronJobs have `timeout` and a sensible cron expression
- Handlers enqueue via DI-injected `TaskQueues`, not module globals
- `key=` is used for deduplication where applicable
- Production runs workers in a separate process (`litestar workers run --process`)

Task: A Litestar app with a default queue, an email task, a cleanup CronJob, and a handler that enqueues notifications. This example uses Redis as the SAQ broker; swap `dsn=settings.redis.url` for `dsn=settings.database.url` if the project is PG-only — see Quick Reference above for both patterns.
```python
# app/server/plugins.py
from litestar_saq import SAQConfig, SAQPlugin, QueueConfig, CronJob

from app.lib.settings import get_settings


def create_saq_plugin() -> SAQPlugin:
    settings = get_settings()
    return SAQPlugin(
        config=SAQConfig(
            dsn=settings.redis.url,  # Redis broker — swap for settings.database.url in PG-only stacks
            use_server_lifespan=True,
            web_enabled=settings.saq.web_enabled,
            queue_configs=[
                QueueConfig(
                    name="default",
                    tasks=[
                        "app.domain.system.tasks.send_email",
                        "app.domain.system.tasks.cleanup_sessions",
                    ],
                    scheduled_tasks=[
                        CronJob(
                            function="app.domain.system.tasks.cleanup_sessions",
                            cron="*/15 * * * *",
                            timeout=120,
                        ),
                    ],
                ),
            ],
        ),
    )


saq_plugin = create_saq_plugin()
```
```python
# app/domain/system/tasks.py
async def send_email(ctx: dict, *, recipient: str, subject: str, body: str) -> None:
    """Send an email as a background job."""
    email = ctx["state"]["email_service"]
    await email.send(recipient, subject, body)


async def cleanup_sessions(ctx: dict) -> None:
    """Purge expired sessions every 15 minutes."""
    db = ctx["state"]["db"]
    await db.execute("DELETE FROM session WHERE expires_at < now()")
```
```python
# app/domain/notifications/controllers.py
from litestar import Controller, post
from litestar_saq import TaskQueues

from app.domain.notifications.schemas import NotificationCreate


class NotificationController(Controller):
    path = "/api/notifications"
    tags = ["Notifications"]

    @post("/")
    async def queue_notification(
        self,
        data: NotificationCreate,
        task_queues: TaskQueues,
    ) -> dict[str, str]:
        queue = task_queues.get("default")
        await queue.enqueue(
            "send_email",
            recipient=data.email,
            subject=data.subject,
            body=data.body,
            timeout=30,
            retries=2,
            key=f"notify-{data.email}",
        )
        return {"status": "queued"}
```
```python
# app.py
from litestar import Litestar

from app.domain.notifications.controllers import NotificationController
from app.server.plugins import saq_plugin

app = Litestar(
    route_handlers=[NotificationController],
    plugins=[saq_plugin],
)
```
```shell
# Dev: workers + web in one process
litestar --app app:app run

# Prod: separate worker process
litestar --app app:app workers run --process
```
</example>