Auto-activate for saq imports, SAQ task queue configuration. SAQ (Simple Async Queue): async task queues, background jobs, cron scheduling, worker lifecycle. Produces SAQ task definitions, Worker setup, CronJob scheduling, and queue configuration. Use when: defining background tasks, enqueueing jobs, scheduling cron work, or managing worker lifecycle. Not for Celery, RQ, or Dramatiq -- SAQ has its own async-native patterns. For Litestar integration (SAQPlugin, DI, web UI, CLI), see flow:litestar.
From flownpx claudepluginhub cofin/flow --plugin flowThis skill uses the workspace's default tool permissions.
references/patterns.mdSearches, retrieves, and installs Agent Skills from prompts.chat registry using MCP tools like search_skills and get_skill. Activates for finding skills, browsing catalogs, or extending Claude.
Searches prompts.chat for AI prompt templates by keyword or category, retrieves by ID with variable handling, and improves prompts via AI. Use for discovering or enhancing prompts.
Compares coding agents like Claude Code and Aider on custom YAML-defined codebase tasks using git worktrees, measuring pass rate, cost, time, and consistency.
SAQ is a lightweight async task queue built on asyncio. Supports Redis and Postgres backends. Designed for simplicity with async-native patterns — no separate broker process, no class-based tasks, just plain async functions.
T | None (not Optional[T])from __future__ import annotationsasync defctx)from saq import Queue
# Redis backend
queue = Queue.from_url("redis://localhost")
# Postgres backend
queue = Queue.from_url("postgresql+asyncpg://user:pass@localhost/db")
async def send_email(ctx: dict, *, recipient: str, subject: str, body: str) -> None:
"""Send an email as a background task.
Args:
ctx: SAQ context dict (contains queue, job, and custom startup keys).
recipient: Email recipient address.
subject: Email subject line.
body: Email body content.
"""
mailer = ctx["mailer"] # injected via startup hook
await mailer.send(recipient, subject, body)
# Fire and forget
await queue.enqueue("send_email", recipient="user@example.com", subject="Hello", body="World")
# Enqueue and wait for result
result = await queue.apply("send_email", recipient="user@example.com", subject="Hello", body="World")
# With job options
await queue.enqueue(
"send_email",
recipient="user@example.com",
subject="Hello",
body="World",
timeout=30,
retries=3,
ttl=3600,
key="email-user@example.com", # deduplication key
)
from saq import CronJob
# Run at the top of every hour
hourly_report = CronJob(
function=generate_report,
cron="0 * * * *",
timeout=300,
)
# Run every 15 minutes
health_check = CronJob(
function=check_health,
cron="*/15 * * * *",
timeout=60,
retries=1,
)
from saq import Worker
worker = Worker(
queue,
functions=[send_email, process_order, generate_report],
cron_jobs=[hourly_report, health_check],
concurrency=10,
startup=startup_hook,
shutdown=shutdown_hook,
before_process=before_process_hook,
after_process=after_process_hook,
)
# Run the worker (blocks)
import asyncio
asyncio.run(worker.start())
| Option | Type | Default | Description |
|---|---|---|---|
timeout | int | None | Seconds before job times out. Always set this. |
retries | int | 0 | Number of retry attempts on failure |
ttl | int | 600 | Seconds to retain result after completion |
key | str | None | Deduplication key — skip if a job with this key is already queued/active |
heartbeat | int | 0 | Seconds between heartbeat updates (use for long-running jobs) |
scheduled | int | 0 | Unix timestamp to delay job start |
queued → active → complete
→ failed
→ aborted
The ctx dict passed to every task contains:
ctx["queue"] — the Queue instancectx["job"] — the current Job objectstartup hook (e.g., ctx["db"], ctx["mailer"])Write async def functions with ctx: dict as the first positional arg and all task parameters as keyword-only args (after *). Keep task functions focused — each task does one thing.
Create a Queue using Queue.from_url() with your Redis or Postgres DSN. Store the queue instance where it can be shared across your app (module-level, app state, or DI container).
Write startup and shutdown hooks to initialize and clean up shared resources (DB pools, HTTP clients, mailers). Attach resources to ctx in startup so all tasks can access them.
Wrap any recurring work in CronJob instances with explicit cron expressions and timeouts. Do not use external cron tools (crontab, Kubernetes CronJob) for work that belongs in the queue.
Instantiate Worker with the queue, task functions, cron jobs, concurrency limit, and lifecycle hooks. Run with asyncio.run(worker.start()) or integrate into your process manager.
Call queue.enqueue() for fire-and-forget or queue.apply() when you need the result. Use the key parameter for natural deduplication (e.g., per-user jobs that should not stack).
timeout — the default is no timeout. A hung task will block a worker slot forever.heartbeat for long-running jobs — without heartbeat, SAQ may mark a long-active job as stuck and re-queue it. Set heartbeat to roughly 1/3 of expected runtime.CronJob for scheduled work — do not schedule SAQ tasks from external cron tools. CronJobs are managed by the worker and participate in the job lifecycle (retries, timeouts, observability).ctx — SAQ injects the context dict as the first positional argument. Keyword-only task params come after *.await worker.stop() on SIGTERM/SIGINT. Abrupt process kills can leave jobs stranded in active state.key for deduplication — if the same logical job can be enqueued multiple times (e.g., per-user sync), set a stable key to prevent stacking.concurrency — default is 10. Lower for CPU/memory-intensive tasks, higher for I/O-bound tasks. Consider backend connection pool sizes.startup) for shared resources like DB pools and HTTP clients.Before delivering SAQ code, verify:
async def with ctx: dict as the first positional arg*)timeout is set on all long-running jobs and CronJob definitionsheartbeat is set for jobs that run longer than ~30 secondsstartup hook and attached to ctxCronJob is used for scheduled/recurring work (not external cron)key is used where job deduplication is neededTask: Background email sender with startup hook, cron health check, and deduplication.
import asyncio
from saq import CronJob, Queue, Worker
# --- Shared queue (module-level) ---
queue = Queue.from_url("redis://localhost")
# --- Lifecycle hooks ---
async def startup(ctx: dict) -> None:
"""Initialize shared resources and attach to context."""
# Example: async HTTP client for sending email
import httpx
ctx["http"] = httpx.AsyncClient()
async def shutdown(ctx: dict) -> None:
"""Clean up shared resources."""
await ctx["http"].aclose()
# --- Task definitions ---
async def send_welcome_email(ctx: dict, *, user_id: int, email: str) -> None:
"""Send a welcome email to a new user.
Args:
ctx: SAQ context dict.
user_id: ID of the new user.
email: Recipient email address.
"""
http: httpx.AsyncClient = ctx["http"]
await http.post(
"https://api.email-provider.com/send",
json={"to": email, "template": "welcome", "user_id": user_id},
)
async def process_export(ctx: dict, *, export_id: int) -> dict:
"""Process a data export job.
Args:
ctx: SAQ context dict.
export_id: ID of the export record to process.
Returns:
Dict with export result metadata.
"""
# Long-running — heartbeat prevents SAQ from marking it stuck
job = ctx["job"]
# ... processing logic ...
return {"export_id": export_id, "rows": 1000}
async def check_queue_health(ctx: dict) -> None:
"""Scheduled health check — logs queue stats."""
q: Queue = ctx["queue"]
info = await q.info()
print(f"Queue stats: {info}")
# --- CronJob ---
health_check = CronJob(
function=check_queue_health,
cron="*/5 * * * *",
timeout=30,
)
# --- Worker ---
worker = Worker(
queue,
functions=[send_welcome_email, process_export],
cron_jobs=[health_check],
concurrency=10,
startup=startup,
shutdown=shutdown,
)
# --- Enqueueing from application code ---
async def on_user_created(user_id: int, email: str) -> None:
await queue.enqueue(
"send_welcome_email",
user_id=user_id,
email=email,
timeout=30,
retries=2,
key=f"welcome-{user_id}", # deduplicate: only one welcome email per user
)
async def start_export(export_id: int) -> None:
await queue.enqueue(
"process_export",
export_id=export_id,
timeout=600,
heartbeat=120, # update heartbeat every 2 minutes
key=f"export-{export_id}",
)
if __name__ == "__main__":
asyncio.run(worker.start())
</example>
For detailed guides and patterns, refer to the following documents in references/:
flow:litestar → litestar-saq section.