pgque-postgres-queue

Expert skill for PgQue – a zero-bloat, snapshot-based Postgres queue using pure PL/pgSQL with no C extensions required.

Best use case

pgque-postgres-queue is best used when you need a repeatable AI agent workflow instead of a one-off prompt.

Teams using pgque-postgres-queue should expect more consistent output, faster repeated execution, and less prompt rewriting.

When to use this skill

  • You want a reusable workflow that can be run more than once with consistent structure.

When not to use this skill

  • You only need a quick one-off answer and do not need a reusable workflow.
  • You cannot install or maintain the underlying files, dependencies, or repository context.

Installation

Claude Code / Cursor / Codex

$ curl -o ~/.claude/skills/pgque-postgres-queue/SKILL.md --create-dirs "https://raw.githubusercontent.com/Aradotso/trending-skills/main/skills/pgque-postgres-queue/SKILL.md"

Manual Installation

  1. Download SKILL.md from GitHub
  2. Place it in .claude/skills/pgque-postgres-queue/SKILL.md inside your project
  3. Restart your AI agent — it will auto-discover the skill

How pgque-postgres-queue Compares

| Feature / Agent | pgque-postgres-queue | Standard Approach |
|---|---|---|
| Platform Support | Not specified | Limited / Varies |
| Context Awareness | High | Baseline |
| Installation Complexity | Unknown | N/A |

Frequently Asked Questions

Where can I find the source code?

You can find the source code on GitHub using the link provided at the top of the page.

SKILL.md Source

# PgQue – Zero-Bloat Postgres Queue

> Skill by [ara.so](https://ara.so) — Daily 2026 Skills collection.

PgQue is a pure PL/pgSQL reimplementation of the battle-tested PgQ (Skype/Postgres) architecture. It uses **snapshot-based batching** and **TRUNCATE-based table rotation** instead of row-level locking, delivering zero dead-tuple bloat, predictable performance under sustained load, and native fan-out — all from a single SQL file on any Postgres 14+ instance including managed providers (RDS, Aurora, Cloud SQL, Supabase, Neon).

## Key Concepts

- **Tick**: A periodic snapshot that closes a batch of events. Nothing is delivered until a tick fires.
- **Batch**: A group of events captured in one tick, consumed atomically by a subscriber.
- **Subscriber/Consumer**: A named cursor on the event log. Multiple consumers get independent copies of every batch (fan-out).
- **Zero bloat**: Events are stored in rotating tables and cleared via `TRUNCATE`, never `DELETE`. No dead tuples.
- **Latency trade-off**: End-to-end delivery is ~1–2 s (one tick interval + poll). Per-call function latency is microseconds.
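
The concepts above can be made concrete with a small in-memory model. This is only a sketch for building intuition: `ToyQueue` and its methods are hypothetical names, nothing here touches Postgres, and for simplicity the toy skips ticks when no new events arrived. It shows that nothing is delivered before a tick, and that each consumer advances its own cursor independently.

```python
from dataclasses import dataclass, field


@dataclass
class ToyQueue:
    """In-memory illustration of ticks, batches, and fan-out (not PgQue code)."""
    events: list = field(default_factory=list)          # append-only event log
    ticks: list = field(default_factory=lambda: [0])    # log length recorded at each tick
    cursors: dict = field(default_factory=dict)         # consumer -> index into ticks

    def subscribe(self, consumer: str) -> None:
        # A new consumer starts at the latest tick and sees only later events.
        self.cursors[consumer] = len(self.ticks) - 1

    def send(self, payload: str) -> None:
        self.events.append(payload)

    def tick(self) -> None:
        # Close a batch only if something arrived since the previous tick.
        if len(self.events) > self.ticks[-1]:
            self.ticks.append(len(self.events))

    def receive(self, consumer: str):
        # Return (batch_id, events), or None when no closed batch is waiting.
        pos = self.cursors[consumer]
        if pos + 1 >= len(self.ticks):
            return None
        start, end = self.ticks[pos], self.ticks[pos + 1]
        return pos + 1, self.events[start:end]

    def ack(self, consumer: str, batch_id: int) -> None:
        # Advance this consumer's cursor; other consumers are unaffected.
        self.cursors[consumer] = batch_id


q = ToyQueue()
q.subscribe("analytics")
q.subscribe("audit")
q.send("signup:1")
q.send("signup:2")
before_tick = q.receive("analytics")   # no tick yet, so nothing is delivered
q.tick()
batch_a = q.receive("analytics")
batch_b = q.receive("audit")           # same batch, independent copy
q.ack("analytics", batch_a[0])
after_ack = q.receive("analytics")     # analytics has caught up
still_pending = q.receive("audit")     # audit has not acked yet
```

In this model, `before_tick` and `after_ack` are `None`, while `batch_a`, `batch_b`, and `still_pending` all hold batch 1 with both events.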

## Installation

### Requirements
- Postgres 14+
- `pg_cron` (recommended) **or** an external scheduler calling `pgque.ticker()` every second

### Install from SQL file

```bash
# Clone the repo
git clone https://github.com/NikolayS/pgque.git
cd pgque

# Install in a single transaction
PAGER=cat psql --no-psqlrc --single-transaction -d mydb -f sql/pgque.sql
```

Or inside a `psql` session:

```sql
begin;
\i sql/pgque.sql
commit;
```

### Start the ticker (pg_cron)

```sql
-- Creates pg_cron jobs for ticker (every 1s) and maintenance (every 30s)
select pgque.start();
```

### Start the ticker (without pg_cron)

Run these externally on a schedule:

```bash
# Every 1 second
psql -d mydb -c "select pgque.ticker()"

# Every 30 seconds
psql -d mydb -c "select pgque.maint()"
```
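
If cron-style tooling cannot fire every second, one option is a small long-running loop. The sketch below is an assumption-laden illustration, not part of PgQue: `run_scheduler` and its parameters are hypothetical, and `execute` stands for any callback that runs one SQL statement and commits it. It calls `pgque.ticker()` on every iteration and `pgque.maint()` on every 30th.

```python
import time
from typing import Callable


def run_scheduler(
    execute: Callable[[str], None],
    iterations: int,
    tick_every: float = 1.0,
    maint_every: int = 30,
    sleep: Callable[[float], None] = time.sleep,
) -> None:
    """Call pgque.ticker() each iteration and pgque.maint() every maint_every-th one.

    `execute` is a hypothetical callback that runs one SQL statement and commits.
    """
    for i in range(1, iterations + 1):
        execute("select pgque.ticker()")
        if i % maint_every == 0:
            execute("select pgque.maint()")
        sleep(tick_every)


# Dry run with a fake executor: record the statements instead of hitting a database.
issued = []
run_scheduler(issued.append, iterations=60, sleep=lambda _: None)
```

With psycopg2, `execute` could be a function that opens a cursor on an autocommit connection and runs the statement. The loop does not compensate for the time each call takes, so the real interval drifts slightly above `tick_every`.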

> **Warning**: Without a running ticker, consumers see nothing. Enqueue works, but no batches are created.

### Uninstall

```sql
\i sql/pgque_uninstall.sql
```

## Roles & Grants

| Role | Use |
|---|---|
| `pgque_reader` | Dashboards, metrics, read-only |
| `pgque_writer` | Producers and consumers (most apps) |
| `pgque_admin` | Operators, migrations |

```sql
-- Grant producer/consumer access to app user
CREATE USER app_worker WITH PASSWORD '...';
GRANT pgque_writer TO app_worker;

-- Grant read-only metrics access
CREATE USER metrics_reader WITH PASSWORD '...';
GRANT pgque_reader TO metrics_reader;
```

## Core API (Modern Style)

### Create a Queue

```sql
SELECT pgque.create_queue('orders');
```

### Subscribe a Consumer

```sql
-- Register a named consumer on the queue
SELECT pgque.subscribe('orders', 'order-processor');
```

### Send Events (Enqueue)

```sql
-- Send a single event (type, data)
SELECT pgque.send('orders', 'new_order', '{"order_id": 42, "amount": 99.99}');

-- Send a batch of events
SELECT pgque.send_batch('orders', ARRAY[
  ROW('new_order', '{"order_id": 43}')::pgque.event_data,
  ROW('new_order', '{"order_id": 44}')::pgque.event_data
]);
```

### Receive and Acknowledge Events

```sql
-- Receive next batch for a consumer (returns batch_id + events)
SELECT * FROM pgque.receive('orders', 'order-processor');

-- Acknowledge successful processing (batch_id from receive)
SELECT pgque.ack('orders', 'order-processor', :batch_id);

-- Negative-acknowledge (retry / dead-letter)
SELECT pgque.nack('orders', 'order-processor', :batch_id);
```

### Unsubscribe

```sql
SELECT pgque.unsubscribe('orders', 'order-processor');
```

## Low-Level PgQ API

These map directly to the original PgQ primitives and are also available via `pgque_writer`:

```sql
-- Enqueue a single event
SELECT pgque.insert_event('orders', 'new_order', '{"order_id": 42}');

-- Register a consumer
SELECT pgque.register_consumer('orders', 'order-processor');

-- Get the next available batch ID
SELECT pgque.next_batch('orders', 'order-processor');
-- Returns: batch_id (bigint), or NULL if nothing ready

-- Fetch all events in a batch
SELECT * FROM pgque.get_batch_events(:batch_id);
-- Returns: ev_id, ev_time, ev_txid, ev_retry, ev_type, ev_data, ev_extra1..4

-- Mark batch as successfully processed
SELECT pgque.finish_batch(:batch_id);

-- Schedule an event for retry (with delay in seconds)
SELECT pgque.event_retry(:batch_id, :ev_id, 60);  -- retry in 60s

-- Unregister consumer
SELECT pgque.unregister_consumer('orders', 'order-processor');
```

## Complete Working Example

### Producer (Python with psycopg2)

```python
import psycopg2
import json
import os

conn = psycopg2.connect(os.environ["DATABASE_URL"])
conn.autocommit = False

def enqueue_order(order_id: int, amount: float):
    with conn.cursor() as cur:
        cur.execute(
            "SELECT pgque.send(%s, %s, %s)",
            ("orders", "new_order", json.dumps({"order_id": order_id, "amount": amount}))
        )
    conn.commit()

enqueue_order(42, 99.99)
```

### Consumer (Python with psycopg2)

```python
import psycopg2
import psycopg2.extras
import json
import os
import time

conn = psycopg2.connect(os.environ["DATABASE_URL"])
conn.autocommit = False

QUEUE = "orders"
CONSUMER = "order-processor"

def setup():
    with conn.cursor() as cur:
        cur.execute("SELECT pgque.subscribe(%s, %s)", (QUEUE, CONSUMER))
    conn.commit()

def process_batch():
    with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
        cur.execute("SELECT * FROM pgque.receive(%s, %s)", (QUEUE, CONSUMER))
        rows = cur.fetchall()

    if not rows:
        conn.rollback()
        return False

    batch_id = rows[0]["batch_id"]
    for row in rows:
        event = json.loads(row["ev_data"])
        print(f"Processing order {event['order_id']}")
        # ... your processing logic ...

    with conn.cursor() as cur:
        cur.execute("SELECT pgque.ack(%s, %s, %s)", (QUEUE, CONSUMER, batch_id))
    conn.commit()
    return True

setup()
while True:
    if not process_batch():
        time.sleep(1)  # wait for next tick
```
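
The consumer above only acknowledges. A variant that calls `pgque.nack` when processing fails might look like the sketch below. It relies on several assumptions: `handle_batch`, `handler`, and `FakeConn` are hypothetical names, cursors are assumed to return dict-like rows (for example, a connection opened with `cursor_factory=RealDictCursor`), and `handler` is assumed not to use the same connection, so the transaction is still usable after it raises.

```python
def handle_batch(conn, queue, consumer, handler):
    """Receive one batch, ack it on success, nack it if `handler` raises.

    Returns "empty", "acked", or "nacked".
    """
    with conn.cursor() as cur:
        cur.execute("SELECT * FROM pgque.receive(%s, %s)", (queue, consumer))
        rows = cur.fetchall()
    if not rows:
        conn.rollback()
        return "empty"

    batch_id = rows[0]["batch_id"]
    try:
        for row in rows:
            handler(row)
        verdict, sql = "acked", "SELECT pgque.ack(%s, %s, %s)"
    except Exception:
        # Assumption: handler did not touch `conn`, so the transaction is still valid.
        verdict, sql = "nacked", "SELECT pgque.nack(%s, %s, %s)"

    with conn.cursor() as cur:
        cur.execute(sql, (queue, consumer, batch_id))
    conn.commit()
    return verdict


class FakeConn:
    """Stand-in connection for a dry run; records SQL instead of executing it."""

    def __init__(self, rows):
        self.rows, self.sql = rows, []

    def cursor(self):
        return self

    def __enter__(self):
        return self

    def __exit__(self, *exc):
        return False

    def execute(self, sql, params=None):
        self.sql.append(sql)

    def fetchall(self):
        return self.rows

    def commit(self):
        pass

    def rollback(self):
        pass


def failing_handler(row):
    raise ValueError("simulated processing failure")


rows = [{"batch_id": 7, "ev_data": '{"order_id": 42}'}]
ok, bad = FakeConn(rows), FakeConn(rows)
result_ok = handle_batch(ok, "orders", "order-processor", lambda row: None)
result_bad = handle_batch(bad, "orders", "order-processor", failing_handler)
result_empty = handle_batch(FakeConn([]), "orders", "order-processor", lambda row: None)
```

A production version would log the exception before nacking rather than swallowing it.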

### Fan-Out Example (Multiple Independent Consumers)

```sql
-- One queue, multiple consumers each get ALL events independently
SELECT pgque.create_queue('user-events');

SELECT pgque.subscribe('user-events', 'analytics-service');
SELECT pgque.subscribe('user-events', 'notification-service');
SELECT pgque.subscribe('user-events', 'audit-log');

-- Producer sends once
SELECT pgque.send('user-events', 'user_signup', '{"user_id": 1}');

-- Each consumer independently receives the same event
SELECT * FROM pgque.receive('user-events', 'analytics-service');
SELECT * FROM pgque.receive('user-events', 'notification-service');
SELECT * FROM pgque.receive('user-events', 'audit-log');
```

### Retry / Dead Letter Pattern

```sql
-- Using low-level API with retry logic
DO $$
DECLARE
  v_batch_id bigint;
  v_ev       record;
BEGIN
  -- Get next batch
  SELECT pgque.next_batch('orders', 'order-processor') INTO v_batch_id;

  IF v_batch_id IS NULL THEN
    RAISE NOTICE 'No batch available';
    RETURN;
  END IF;

  -- Process each event
  FOR v_ev IN SELECT * FROM pgque.get_batch_events(v_batch_id) LOOP
    BEGIN
      -- Attempt processing
      RAISE NOTICE 'Processing event % type %', v_ev.ev_id, v_ev.ev_type;

      -- On transient failure, retry after 30 seconds:
      -- PERFORM pgque.event_retry(v_batch_id, v_ev.ev_id, 30);

    EXCEPTION WHEN OTHERS THEN
      -- Schedule retry
      PERFORM pgque.event_retry(v_batch_id, v_ev.ev_id, 60);
      RAISE NOTICE 'Event % queued for retry', v_ev.ev_id;
    END;
  END LOOP;

  -- Finish batch (events not retried are acked)
  PERFORM pgque.finish_batch(v_batch_id);
END;
$$;
```

## Monitoring & Introspection

```sql
-- Queue info (depth, consumer count, last tick)
SELECT * FROM pgque.get_queue_info();
SELECT * FROM pgque.get_queue_info('orders');

-- Consumer lag and position
SELECT * FROM pgque.get_consumer_info();
SELECT * FROM pgque.get_consumer_info('orders');
SELECT * FROM pgque.get_consumer_info('orders', 'order-processor');

-- Batch details
SELECT * FROM pgque.get_batch_info(:batch_id);

-- Version
SELECT pgque.version();
```

## Configuration & Tuning

### Tick Frequency

```sql
-- Default: ticker called every 1 second via pg_cron
-- To change tick interval, update the pg_cron job:
SELECT cron.alter_job(
  job_id := (SELECT jobid FROM cron.job WHERE command LIKE '%pgque.ticker%'),
  schedule := '* * * * *'  -- every minute (coarser)
);
```

### Force Immediate Tick (Testing/Demos)

```sql
-- Force a tick right now without waiting for pg_cron
SELECT pgque.force_tick('orders');
-- or
SELECT pgque.ticker();
```

### pg_cron Log Hygiene

```sql
-- Disable run logging (ticker runs every second = 3600 rows/hour)
ALTER SYSTEM SET cron.log_run = off;
SELECT pg_reload_conf();

-- Or periodically purge:
SELECT cron.schedule('pgque-cron-purge', '0 * * * *',
  $$DELETE FROM cron.job_run_details WHERE end_time < now() - interval '1 hour'$$
);
```

### pg_cron in Different Database

If pg_cron is installed in the `postgres` database but PgQue lives in `mydb`:

```sql
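-- Run from the pg_cron database (postgres).
-- Interval schedules such as '1 second' need pg_cron 1.5 or later;
-- a cron expression like '* * * * *' would fire only once per minute.
SELECT cron.schedule_in_database(
  'pgque-ticker', '1 second',
  'SELECT pgque.ticker()', 'mydb'
);
SELECT cron.schedule_in_database(
  'pgque-maint', '30 seconds',
  'SELECT pgque.maint()', 'mydb'
);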
```

## Common Patterns

### Transactional Enqueue (Send with Business Logic)

```sql
-- Event is only enqueued if the whole transaction commits
BEGIN;
  INSERT INTO orders (id, amount) VALUES (42, 99.99);
  SELECT pgque.send('orders', 'new_order', '{"order_id": 42}');
COMMIT;
```

### Queue Depth Check Before Scaling

```sql
SELECT
  queue_name,
  ev_per_sec,
  consumer_count,
  pending_events
FROM pgque.get_queue_info()
WHERE pending_events > 1000;
```

### List All Consumers with Lag

```sql
SELECT
  queue_name,
  consumer_name,
  pending_events AS lag,
  last_seen
FROM pgque.get_consumer_info()
ORDER BY lag DESC;
```

## Troubleshooting

### Consumers receive nothing

**Cause**: Ticker is not running.

```sql
-- Check if ticker has fired recently
SELECT * FROM pgque.get_queue_info('orders');
-- Look at last_tick timestamp

-- Manually fire a tick
SELECT pgque.ticker();

-- Check pg_cron jobs exist
SELECT * FROM cron.job WHERE command LIKE '%pgque%';
```

### Events not appearing after send

```sql
-- Confirm ticker is running; force one
SELECT pgque.ticker();

-- Check queue exists
SELECT * FROM pgque.get_queue_info('orders');

-- Check consumer is registered
SELECT * FROM pgque.get_consumer_info('orders', 'my-consumer');
```

### Performance / VACUUM pressure

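PgQue's event path is designed to avoid dead-tuple bloat: event tables are rotated and cleared with `TRUNCATE`, never `DELETE`. If you see heavy VACUUM activity, it most likely comes from your own application tables or from pg_cron's `cron.job_run_details` log (see pg_cron Log Hygiene above), not from PgQue's event tables.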

### Retry events not reappearing

```sql
-- Maintenance job handles retry scheduling
-- Make sure pgque.maint() is running every ~30s
SELECT pgque.maint();

-- Check for events in retry state
SELECT * FROM pgque.get_queue_info('orders');
```

### Upgrade / Reinstall

Upgrade paths are still being stabilized. To safely reinstall:

```bash
psql -d mydb -c "\i sql/pgque_uninstall.sql"
psql -d mydb --single-transaction -f sql/pgque.sql
psql -d mydb -c "select pgque.start()"
```

## Architecture Summary

```
Producer → pgque.send() → event tables (rotating)
                                    ↓
                           pgque.ticker()  ←── pg_cron (every 1s)
                                    ↓
                           batch snapshot created
                                    ↓
Consumer A → pgque.receive() → batch events → pgque.ack()
Consumer B → pgque.receive() → same batch  → pgque.ack()
Consumer C → pgque.receive() → same batch  → pgque.ack()
                                    ↓
                           pgque.maint() → TRUNCATE old tables
                                           (zero dead tuples)
```
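
The last step of the diagram, clearing old tables with `TRUNCATE`, can be pictured with a toy model. This is a sketch under stated assumptions rather than PgQue's actual logic: `RotatingLog` is a hypothetical class, the three-table layout is borrowed from the original PgQ design, and the rule "rotate only when every consumer has moved past the target table" is an assumption about how rotation stays safe. Clearing a whole list stands in for `TRUNCATE`, which removes rows without leaving dead tuples behind.

```python
class RotatingLog:
    """Toy model of rotating event tables (illustrative only, not PgQue code)."""

    def __init__(self, n_tables: int = 3):
        self.tables = [[] for _ in range(n_tables)]  # each list stands in for a table
        self.current = 0                             # index of the table taking writes
        self.next_id = 1
        self.acked = {}                              # consumer -> highest acked event id

    def send(self, payload: str) -> int:
        ev_id = self.next_id
        self.tables[self.current].append((ev_id, payload))
        self.next_id += 1
        return ev_id

    def ack_up_to(self, consumer: str, ev_id: int) -> None:
        self.acked[consumer] = ev_id

    def rotate(self) -> bool:
        # The next table in the ring is the oldest one; reuse it only if
        # no consumer still needs any event stored there.
        target = (self.current + 1) % len(self.tables)
        slowest = min(self.acked.values(), default=self.next_id - 1)
        if any(ev_id > slowest for ev_id, _ in self.tables[target]):
            return False
        self.tables[target].clear()   # stands in for TRUNCATE: whole table at once
        self.current = target
        return True


log = RotatingLog()
log.ack_up_to("fast", 0)
log.ack_up_to("slow", 0)
log.send("a")                 # id 1, stored in table 0
log.send("b")                 # id 2, stored in table 0
first = log.rotate()          # table 1 is empty, rotation succeeds
log.send("c")                 # id 3, stored in table 1
second = log.rotate()         # table 2 is empty, rotation succeeds
log.send("d")                 # id 4, stored in table 2
log.ack_up_to("fast", 4)
blocked = log.rotate()        # table 0 still holds ids 1 and 2 that "slow" needs
log.ack_up_to("slow", 2)
freed = log.rotate()          # now table 0 can be cleared and reused
```

The `blocked` case also illustrates an operational point that follows from this model: a consumer that stops acknowledging would hold back rotation, so consumer lag is worth monitoring.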
