Deploys PgQue: a zero-bloat Postgres queue built on PL/pgSQL snapshot batching and TRUNCATE rotation, with native fan-out. Covers install, ticker setup, the API, and roles on Postgres 14+.
> Skill by [ara.so](https://ara.so) — Daily 2026 Skills collection.
PgQue is a pure PL/pgSQL reimplementation of the battle-tested PgQ (Skype/Postgres) architecture. It uses snapshot-based batching and TRUNCATE-based table rotation instead of row-level locking, delivering zero dead-tuple bloat, predictable performance under sustained load, and native fan-out — all from a single SQL file on any Postgres 14+ instance including managed providers (RDS, Aurora, Cloud SQL, Supabase, Neon).
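The key trick is table rotation. Here is a toy sketch of the idea, using hypothetical table names rather than PgQue's actual internals: events accumulate in an active table, and once every consumer has moved past it, the whole table is truncated, which is effectively instant and leaves no dead tuples behind.
-- Hypothetical illustration of TRUNCATE-based rotation (not PgQue internals)
CREATE TABLE demo_events_0 (ev_id bigserial PRIMARY KEY, ev_data jsonb);
CREATE TABLE demo_events_1 (ev_id bigserial PRIMARY KEY, ev_data jsonb);
-- writers insert into the active table while consumers drain it...
-- once all consumers have moved past demo_events_0, rotate:
TRUNCATE demo_events_0;  -- instant, no per-row DELETE, zero dead tuples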
- TRUNCATE, never DELETE. No dead tuples.
- Ticker: pg_cron (recommended) or an external scheduler calling pgque.ticker() every second.

# Clone the repo
git clone https://github.com/NikolayS/pgque.git
cd pgque
# Install in a single transaction
PAGER=cat psql --no-psqlrc --single-transaction -d mydb -f sql/pgque.sql
Or inside a psql session:
begin;
\i sql/pgque.sql
commit;
-- Creates pg_cron jobs for ticker (every 1s) and maintenance (every 30s)
select pgque.start();
Without pg_cron, run these externally on a schedule:
# Every 1 second
psql -d mydb -c "select pgque.ticker()"
# Every 30 seconds
psql -d mydb -c "select pgque.maint()"
Warning: Without a running ticker, consumers see nothing. Enqueue works, but no batches are created.
To uninstall:
\i sql/pgque_uninstall.sql
| Role | Use |
|---|---|
| pgque_reader | Dashboards, metrics, read-only |
| pgque_writer | Producers and consumers (most apps) |
| pgque_admin | Operators, migrations |
-- Grant producer/consumer access to app user
CREATE USER app_worker WITH PASSWORD '...';
GRANT pgque_writer TO app_worker;
-- Grant read-only metrics access
CREATE USER metrics_reader WITH PASSWORD '...';
GRANT pgque_reader TO metrics_reader;
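To audit which users hold the PgQue roles afterwards, a standard catalog query (plain Postgres, not part of the PgQue API) does the job:
-- List members of each pgque_* role via the system catalogs
SELECT r.rolname AS pgque_role, m.rolname AS member
FROM pg_auth_members am
JOIN pg_roles r ON r.oid = am.roleid
JOIN pg_roles m ON m.oid = am.member
WHERE r.rolname LIKE 'pgque\_%';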
-- Create a queue
SELECT pgque.create_queue('orders');
-- Register a named consumer on the queue
SELECT pgque.subscribe('orders', 'order-processor');
-- Send a single event (type, data)
SELECT pgque.send('orders', 'new_order', '{"order_id": 42, "amount": 99.99}');
-- Send a batch of events
SELECT pgque.send_batch('orders', ARRAY[
ROW('new_order', '{"order_id": 43}')::pgque.event_data,
ROW('new_order', '{"order_id": 44}')::pgque.event_data
]);
-- Receive next batch for a consumer (returns batch_id + events)
SELECT * FROM pgque.receive('orders', 'order-processor');
-- Acknowledge successful processing (batch_id from receive)
SELECT pgque.ack('orders', 'order-processor', :batch_id);
-- Negative-acknowledge (retry / dead-letter)
SELECT pgque.nack('orders', 'order-processor', :batch_id);
SELECT pgque.unsubscribe('orders', 'order-processor');
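Putting the high-level calls together, here is a minimal round trip in a single psql session. This is a sketch: it assumes the consumer above is still subscribed and that a batch is ready, since \gset leaves :batch_id unset when receive returns no rows.
-- enqueue, force a tick so a batch snapshot exists, then consume and ack
SELECT pgque.send('orders', 'new_order', '{"order_id": 45}');
SELECT pgque.ticker();
SELECT batch_id FROM pgque.receive('orders', 'order-processor') LIMIT 1 \gset
SELECT pgque.ack('orders', 'order-processor', :batch_id);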
These map directly to the original PgQ primitives and are likewise available to the pgque_writer role:
-- Enqueue a single event
SELECT pgque.insert_event('orders', 'new_order', '{"order_id": 42}');
-- Register a consumer
SELECT pgque.register_consumer('orders', 'order-processor');
-- Get the next available batch ID
SELECT pgque.next_batch('orders', 'order-processor');
-- Returns: batch_id (bigint), or NULL if nothing ready
-- Fetch all events in a batch
SELECT * FROM pgque.get_batch_events(:batch_id);
-- Returns: ev_id, ev_time, ev_txid, ev_retry, ev_type, ev_data, ev_extra1..4
-- Mark batch as successfully processed
SELECT pgque.finish_batch(:batch_id);
-- Schedule an event for retry (with delay in seconds)
SELECT pgque.event_retry(:batch_id, :ev_id, 60); -- retry in 60s
-- Unregister consumer
SELECT pgque.unregister_consumer('orders', 'order-processor');
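The same flow with the low-level primitives, again as a psql sketch: if next_batch returns NULL, \gset leaves :batch_id unset and the later statements will fail to interpolate.
-- fetch a batch id, read its events, then mark the batch done
SELECT pgque.next_batch('orders', 'order-processor') AS batch_id \gset
SELECT ev_id, ev_type, ev_data FROM pgque.get_batch_events(:batch_id);
SELECT pgque.finish_batch(:batch_id);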
import psycopg2
import json
import os

conn = psycopg2.connect(os.environ["DATABASE_URL"])
conn.autocommit = False

def enqueue_order(order_id: int, amount: float):
    with conn.cursor() as cur:
        cur.execute(
            "SELECT pgque.send(%s, %s, %s)",
            ("orders", "new_order", json.dumps({"order_id": order_id, "amount": amount})),
        )
    conn.commit()

enqueue_order(42, 99.99)
import psycopg2
import psycopg2.extras
import json
import os
import time

conn = psycopg2.connect(os.environ["DATABASE_URL"])
conn.autocommit = False

QUEUE = "orders"
CONSUMER = "order-processor"

def setup():
    with conn.cursor() as cur:
        cur.execute("SELECT pgque.subscribe(%s, %s)", (QUEUE, CONSUMER))
    conn.commit()

def process_batch():
    with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
        cur.execute("SELECT * FROM pgque.receive(%s, %s)", (QUEUE, CONSUMER))
        rows = cur.fetchall()
    if not rows:
        conn.rollback()
        return False
    batch_id = rows[0]["batch_id"]
    for row in rows:
        event = json.loads(row["ev_data"])
        print(f"Processing order {event['order_id']}")
        # ... your processing logic ...
    with conn.cursor() as cur:
        cur.execute("SELECT pgque.ack(%s, %s, %s)", (QUEUE, CONSUMER, batch_id))
    conn.commit()
    return True

setup()
while True:
    if not process_batch():
        time.sleep(1)  # wait for next tick
-- One queue, multiple consumers: each receives ALL events independently
SELECT pgque.create_queue('user-events');
SELECT pgque.subscribe('user-events', 'analytics-service');
SELECT pgque.subscribe('user-events', 'notification-service');
SELECT pgque.subscribe('user-events', 'audit-log');
-- Producer sends once
SELECT pgque.send('user-events', 'user_signup', '{"user_id": 1}');
-- Each consumer independently receives the same event
SELECT * FROM pgque.receive('user-events', 'analytics-service');
SELECT * FROM pgque.receive('user-events', 'notification-service');
SELECT * FROM pgque.receive('user-events', 'audit-log');
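Each subscriber keeps its own cursor, so acknowledging one does not affect the others. Using the batch_id returned by that consumer's own receive call:
-- analytics-service acks its batch; notification-service and audit-log
-- continue to see the event until they ack their own batches
SELECT pgque.ack('user-events', 'analytics-service', :batch_id);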
-- Using the low-level API with retry logic
DO $$
DECLARE
  v_batch_id bigint;
  v_ev record;
BEGIN
  -- Get next batch
  SELECT pgque.next_batch('orders', 'order-processor') INTO v_batch_id;
  IF v_batch_id IS NULL THEN
    RAISE NOTICE 'No batch available';
    RETURN;
  END IF;

  -- Process each event
  FOR v_ev IN SELECT * FROM pgque.get_batch_events(v_batch_id) LOOP
    BEGIN
      -- Attempt processing
      RAISE NOTICE 'Processing event % type %', v_ev.ev_id, v_ev.ev_type;
      -- On transient failure, retry after 30 seconds:
      -- PERFORM pgque.event_retry(v_batch_id, v_ev.ev_id, 30);
    EXCEPTION WHEN OTHERS THEN
      -- Schedule retry
      PERFORM pgque.event_retry(v_batch_id, v_ev.ev_id, 60);
      RAISE NOTICE 'Event % queued for retry', v_ev.ev_id;
    END;
  END LOOP;

  -- Finish batch (events not retried are acked)
  PERFORM pgque.finish_batch(v_batch_id);
END;
$$;
-- Queue info (depth, consumer count, last tick)
SELECT * FROM pgque.get_queue_info();
SELECT * FROM pgque.get_queue_info('orders');
-- Consumer lag and position
SELECT * FROM pgque.get_consumer_info();
SELECT * FROM pgque.get_consumer_info('orders');
SELECT * FROM pgque.get_consumer_info('orders', 'order-processor');
-- Batch details
SELECT * FROM pgque.get_batch_info(:batch_id);
-- Version
SELECT pgque.version();
-- Default: ticker called every 1 second via pg_cron
-- To change tick interval, update the pg_cron job:
SELECT cron.alter_job(
job_id := (SELECT jobid FROM cron.job WHERE command LIKE '%pgque.ticker%'),
schedule := '* * * * *' -- every minute (coarser)
);
-- Force a tick right now without waiting for pg_cron
SELECT pgque.force_tick('orders');
-- or
SELECT pgque.ticker();
-- Disable run logging (ticker runs every second = 3600 rows/hour)
ALTER SYSTEM SET cron.log_run = off;
SELECT pg_reload_conf();
-- Or periodically purge:
SELECT cron.schedule('pgque-cron-purge', '0 * * * *',
$$DELETE FROM cron.job_run_details WHERE end_time < now() - interval '1 hour'$$
);
If pg_cron is installed in the postgres database but PgQue lives in mydb:
-- Run from the pg_cron database (postgres)
SELECT cron.schedule_in_database(
'pgque-ticker', '* * * * *',
'SELECT pgque.ticker()', 'mydb'
);
SELECT cron.schedule_in_database(
'pgque-maint', '* * * * *',
'SELECT pgque.maint()', 'mydb'
);
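Note that five-field cron schedules fire at most once per minute, so the jobs above add up to a minute of delivery latency. If your pg_cron version supports second-granularity schedules (an assumption worth verifying; 1.5+ accepts interval strings like '1 seconds'), the one-second tick can be kept across databases:
-- assumes pg_cron with second-level schedule support
SELECT cron.schedule_in_database(
  'pgque-ticker', '1 seconds',
  'SELECT pgque.ticker()', 'mydb'
);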
-- Event is only enqueued if the whole transaction commits
BEGIN;
INSERT INTO orders (id, amount) VALUES (42, 99.99);
SELECT pgque.send('orders', 'new_order', '{"order_id": 42}');
COMMIT;
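The flip side follows from ordinary MVCC: if the transaction aborts, the event disappears with it, so consumers never see work for rows that were never committed.
BEGIN;
INSERT INTO orders (id, amount) VALUES (43, 10.00);
SELECT pgque.send('orders', 'new_order', '{"order_id": 43}');
ROLLBACK;  -- both the row and the event are discarded; nothing is delivered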
SELECT
  queue_name,
  ev_per_sec,
  consumer_count,
  pending_events
FROM pgque.get_queue_info()
WHERE pending_events > 1000;
SELECT
  queue_name,
  consumer_name,
  pending_events AS lag,
  last_seen
FROM pgque.get_consumer_info()
ORDER BY lag DESC;
If consumers receive no events even though producers are enqueuing, the usual cause is that the ticker is not running.
-- Check if ticker has fired recently
SELECT * FROM pgque.get_queue_info('orders');
-- Look at last_tick timestamp
-- Manually fire a tick
SELECT pgque.ticker();
-- Check pg_cron jobs exist
SELECT * FROM cron.job WHERE command LIKE '%pgque%';
-- Confirm ticker is running; force one
SELECT pgque.ticker();
-- Check queue exists
SELECT * FROM pgque.get_queue_info('orders');
-- Check consumer is registered
SELECT * FROM pgque.get_consumer_info('orders', 'my-consumer');
PgQue is immune to dead-tuple bloat in the event path by design. If you see VACUUM activity, it's from your own application tables, not from PgQue's queue tables.
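To verify this yourself, a standard statistics query (assuming PgQue's queue tables live in the pgque schema) shows dead-tuple counts per table:
-- Plain Postgres statistics view, not part of the PgQue API
SELECT relname, n_live_tup, n_dead_tup
FROM pg_stat_user_tables
WHERE schemaname = 'pgque'
ORDER BY n_dead_tup DESC;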
-- Maintenance job handles retry scheduling
-- Make sure pgque.maint() is running every ~30s
SELECT pgque.maint();
-- Check for events in retry state
SELECT * FROM pgque.get_queue_info('orders');
Upgrade paths are still being stabilized. To safely reinstall:
psql -d mydb -c "\i sql/pgque_uninstall.sql"
psql -d mydb --single-transaction -f sql/pgque.sql
psql -d mydb -c "select pgque.start()"
Producer → pgque.send() → event tables (rotating)
                                ↓
        pgque.ticker() ←── pg_cron (every 1s)
                                ↓
                     batch snapshot created
                                ↓
Consumer A → pgque.receive() → batch events → pgque.ack()
Consumer B → pgque.receive() → same batch → pgque.ack()
Consumer C → pgque.receive() → same batch → pgque.ack()
                                ↓
        pgque.maint() → TRUNCATE old tables
                        (zero dead tuples)