pgque-postgres-queue

Compare original and translation side by side

🇺🇸

Original

English
🇨🇳

Translation

Chinese

PgQue – Zero-Bloat Postgres Queue

PgQue——零膨胀Postgres队列

Skill by ara.so — Daily 2026 Skills collection.
PgQue is a pure PL/pgSQL reimplementation of the battle-tested PgQ (Skype/Postgres) architecture. It uses snapshot-based batching and TRUNCATE-based table rotation instead of row-level locking, delivering zero dead-tuple bloat, predictable performance under sustained load, and native fan-out — all from a single SQL file on any Postgres 14+ instance including managed providers (RDS, Aurora, Cloud SQL, Supabase, Neon).
ara.so提供的技能——2026每日技能合集。
PgQue是经过实战检验的PgQ(Skype/Postgres)架构的纯PL/pgSQL重实现。它采用基于快照的批处理基于TRUNCATE的表轮转替代行级锁,实现了零死元组膨胀、持续负载下的可预测性能,以及原生扇出功能——所有这些仅需一个SQL文件,即可在任何Postgres 14+实例上运行,包括托管服务提供商(RDS、Aurora、Cloud SQL、Supabase、Neon)。

Key Concepts

核心概念

  • Tick: A periodic snapshot that closes a batch of events. Nothing is delivered until a tick fires.
  • Batch: A group of events captured in one tick, consumed atomically by a subscriber.
  • Subscriber/Consumer: A named cursor on the event log. Multiple consumers get independent copies of every batch (fan-out).
  • Zero bloat: Events are stored in rotating tables and cleared via
    TRUNCATE
    , never
    DELETE
    . No dead tuples.
  • Latency trade-off: End-to-end delivery is ~1–2 s (one tick interval + poll). Per-call function latency is microseconds.
  • Tick:定期生成的快照,用于结束一批事件。只有当Tick触发时,事件才会被投递。
  • Batch(批处理):一次Tick中捕获的一组事件,由订阅者原子性地消费。
  • Subscriber/Consumer(订阅者/消费者):事件日志上的命名游标。多个消费者会获取每批事件的独立副本(扇出功能)。
  • 零膨胀:事件存储在轮转表中,并通过
    TRUNCATE
    清理,从不使用
    DELETE
    。无死元组产生。
  • 延迟权衡:端到端投递延迟约1-2秒(一个Tick间隔 + 轮询时间)。单次调用函数的延迟为微秒级。

Installation

安装

Requirements

要求

  • Postgres 14+
  • pg_cron
    (recommended) or an external scheduler calling
    pgque.ticker()
    every second
  • Postgres 14+
  • pg_cron
    (推荐)外部调度器每秒调用一次
    pgque.ticker()

Install from SQL file

从SQL文件安装

bash
undefined
bash
undefined

Clone the repo

Clone the repo

Install in a single transaction

Install in a single transaction

PAGER=cat psql --no-psqlrc --single-transaction -d mydb -f sql/pgque.sql

Or inside a `psql` session:

```sql
begin;
\i sql/pgque.sql
commit;
PAGER=cat psql --no-psqlrc --single-transaction -d mydb -f sql/pgque.sql

或者在`psql`会话中执行:

```sql
begin;
\i sql/pgque.sql
commit;

Start the ticker (pg_cron)

启动Ticker(使用pg_cron)

sql
-- Creates pg_cron jobs for ticker (every 1s) and maintenance (every 30s)
select pgque.start();
sql
-- 创建pg_cron任务,用于Ticker(每秒一次)和维护(每30秒一次)
select pgque.start();

Start the ticker (without pg_cron)

启动Ticker(不使用pg_cron)

Run these externally on a schedule:
bash
undefined
在外部按调度执行以下命令:
bash
undefined

Every 1 second

每秒执行一次

psql -d mydb -c "select pgque.ticker()"
psql -d mydb -c "select pgque.ticker()"

Every 30 seconds

每30秒执行一次

psql -d mydb -c "select pgque.maint()"

> **Warning**: Without a running ticker, consumers see nothing. Enqueue works, but no batches are created.
psql -d mydb -c "select pgque.maint()"

> **警告**:如果Ticker未运行,消费者将无法收到任何内容。入队操作可以正常执行,但不会创建批处理。

Uninstall

卸载

sql
\i sql/pgque_uninstall.sql
sql
\i sql/pgque_uninstall.sql

Roles & Grants

角色与权限

RoleUse
pgque_reader
Dashboards, metrics, read-only
pgque_writer
Producers and consumers (most apps)
pgque_admin
Operators, migrations
sql
-- Grant producer/consumer access to app user
CREATE USER app_worker WITH PASSWORD '...';
GRANT pgque_writer TO app_worker;

-- Grant read-only metrics access
CREATE USER metrics_reader WITH PASSWORD '...';
GRANT pgque_reader TO metrics_reader;
角色用途
pgque_reader
仪表盘、指标、只读访问
pgque_writer
生产者和消费者(大多数应用)
pgque_admin
运维人员、迁移操作
sql
-- 为应用用户授予生产者/消费者权限
CREATE USER app_worker WITH PASSWORD '...';
GRANT pgque_writer TO app_worker;

-- 授予只读指标访问权限
CREATE USER metrics_reader WITH PASSWORD '...';
GRANT pgque_reader TO metrics_reader;

Core API (Modern Style)

核心API(现代风格)

Create a Queue

创建队列

sql
SELECT pgque.create_queue('orders');
sql
SELECT pgque.create_queue('orders');

Subscribe a Consumer

订阅消费者

sql
-- Register a named consumer on the queue
SELECT pgque.subscribe('orders', 'order-processor');
sql
-- 在队列上注册一个命名消费者
SELECT pgque.subscribe('orders', 'order-processor');

Send Events (Enqueue)

发送事件(入队)

sql
-- Send a single event (type, data)
SELECT pgque.send('orders', 'new_order', '{"order_id": 42, "amount": 99.99}');

-- Send a batch of events
SELECT pgque.send_batch('orders', ARRAY[
  ROW('new_order', '{"order_id": 43}')::pgque.event_data,
  ROW('new_order', '{"order_id": 44}')::pgque.event_data
]);
sql
-- 发送单个事件(类型、数据)
SELECT pgque.send('orders', 'new_order', '{"order_id": 42, "amount": 99.99}');

-- 发送一批事件
SELECT pgque.send_batch('orders', ARRAY[
  ROW('new_order', '{"order_id": 43}')::pgque.event_data,
  ROW('new_order', '{"order_id": 44}')::pgque.event_data
]);

Receive and Acknowledge Events

接收并确认事件

sql
-- Receive next batch for a consumer (returns batch_id + events)
SELECT * FROM pgque.receive('orders', 'order-processor');

-- Acknowledge successful processing (batch_id from receive)
SELECT pgque.ack('orders', 'order-processor', :batch_id);

-- Negative-acknowledge (retry / dead-letter)
SELECT pgque.nack('orders', 'order-processor', :batch_id);
sql
-- 为消费者接收下一批事件(返回batch_id + 事件)
SELECT * FROM pgque.receive('orders', 'order-processor');

-- 确认处理成功(使用receive返回的batch_id)
SELECT pgque.ack('orders', 'order-processor', :batch_id);

-- 否定确认(重试/死信)
SELECT pgque.nack('orders', 'order-processor', :batch_id);

Unsubscribe

取消订阅

sql
SELECT pgque.unsubscribe('orders', 'order-processor');
sql
SELECT pgque.unsubscribe('orders', 'order-processor');

Low-Level PgQ API

底层PgQ API

These map directly to the original PgQ primitives and are also available via
pgque_writer
:
sql
-- Enqueue a single event
SELECT pgque.insert_event('orders', 'new_order', '{"order_id": 42}');

-- Register a consumer
SELECT pgque.register_consumer('orders', 'order-processor');

-- Get the next available batch ID
SELECT pgque.next_batch('orders', 'order-processor');
-- Returns: batch_id (bigint), or NULL if nothing ready

-- Fetch all events in a batch
SELECT * FROM pgque.get_batch_events(:batch_id);
-- Returns: ev_id, ev_time, ev_txid, ev_retry, ev_type, ev_data, ev_extra1..4

-- Mark batch as successfully processed
SELECT pgque.finish_batch(:batch_id);

-- Schedule an event for retry (with delay in seconds)
SELECT pgque.event_retry(:batch_id, :ev_id, 60);  -- retry in 60s

-- Unregister consumer
SELECT pgque.unregister_consumer('orders', 'order-processor');
这些API直接映射到原始PgQ原语,
pgque_writer
角色也可访问:
sql
-- 入队单个事件
SELECT pgque.insert_event('orders', 'new_order', '{"order_id": 42}');

-- 注册消费者
SELECT pgque.register_consumer('orders', 'order-processor');

-- 获取下一个可用的批次ID
SELECT pgque.next_batch('orders', 'order-processor');
-- 返回值:batch_id(bigint),若无可用批次则返回NULL

-- 获取批次中的所有事件
SELECT * FROM pgque.get_batch_events(:batch_id);
-- 返回值:ev_id, ev_time, ev_txid, ev_retry, ev_type, ev_data, ev_extra1..4

-- 标记批次处理成功
SELECT pgque.finish_batch(:batch_id);

-- 安排事件重试(延迟时间单位为秒)
SELECT pgque.event_retry(:batch_id, :ev_id, 60);  -- 60秒后重试

-- 注销消费者
SELECT pgque.unregister_consumer('orders', 'order-processor');

Complete Working Example

完整工作示例

Producer (Python with psycopg2)

生产者(使用psycopg2的Python代码)

python
import psycopg2
import json
import os

conn = psycopg2.connect(os.environ["DATABASE_URL"])
conn.autocommit = False

def enqueue_order(order_id: int, amount: float):
    with conn.cursor() as cur:
        cur.execute(
            "SELECT pgque.send(%s, %s, %s)",
            ("orders", "new_order", json.dumps({"order_id": order_id, "amount": amount}))
        )
    conn.commit()

enqueue_order(42, 99.99)
python
import psycopg2
import json
import os

conn = psycopg2.connect(os.environ["DATABASE_URL"])
conn.autocommit = False

def enqueue_order(order_id: int, amount: float):
    with conn.cursor() as cur:
        cur.execute(
            "SELECT pgque.send(%s, %s, %s)",
            ("orders", "new_order", json.dumps({"order_id": order_id, "amount": amount}))
        )
    conn.commit()

enqueue_order(42, 99.99)

Consumer (Python with psycopg2)

消费者(使用psycopg2的Python代码)

python
import psycopg2
import psycopg2.extras
import json
import os
import time

conn = psycopg2.connect(os.environ["DATABASE_URL"])
conn.autocommit = False

QUEUE = "orders"
CONSUMER = "order-processor"

def setup():
    with conn.cursor() as cur:
        cur.execute("SELECT pgque.subscribe(%s, %s)", (QUEUE, CONSUMER))
    conn.commit()

def process_batch():
    with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
        cur.execute("SELECT * FROM pgque.receive(%s, %s)", (QUEUE, CONSUMER))
        rows = cur.fetchall()

    if not rows:
        conn.rollback()
        return False

    batch_id = rows[0]["batch_id"]
    for row in rows:
        event = json.loads(row["ev_data"])
        print(f"Processing order {event['order_id']}")
        # ... your processing logic ...

    with conn.cursor() as cur:
        cur.execute("SELECT pgque.ack(%s, %s, %s)", (QUEUE, CONSUMER, batch_id))
    conn.commit()
    return True

setup()
while True:
    if not process_batch():
        time.sleep(1)  # wait for next tick
python
import psycopg2
import psycopg2.extras
import json
import os
import time

conn = psycopg2.connect(os.environ["DATABASE_URL"])
conn.autocommit = False

QUEUE = "orders"
CONSUMER = "order-processor"

def setup():
    with conn.cursor() as cur:
        cur.execute("SELECT pgque.subscribe(%s, %s)", (QUEUE, CONSUMER))
    conn.commit()

def process_batch():
    with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
        cur.execute("SELECT * FROM pgque.receive(%s, %s)", (QUEUE, CONSUMER))
        rows = cur.fetchall()

    if not rows:
        conn.rollback()
        return False

    batch_id = rows[0]["batch_id"]
    for row in rows:
        event = json.loads(row["ev_data"])
        print(f"Processing order {event['order_id']}")
        # ... 你的处理逻辑 ...

    with conn.cursor() as cur:
        cur.execute("SELECT pgque.ack(%s, %s, %s)", (QUEUE, CONSUMER, batch_id))
    conn.commit()
    return True

setup()
while True:
    if not process_batch():
        time.sleep(1)  # 等待下一个Tick

Fan-Out Example (Multiple Independent Consumers)

扇出示例(多个独立消费者)

sql
-- One queue, multiple consumers each get ALL events independently
SELECT pgque.create_queue('user-events');

SELECT pgque.subscribe('user-events', 'analytics-service');
SELECT pgque.subscribe('user-events', 'notification-service');
SELECT pgque.subscribe('user-events', 'audit-log');

-- Producer sends once
SELECT pgque.send('user-events', 'user_signup', '{"user_id": 1}');

-- Each consumer independently receives the same event
SELECT * FROM pgque.receive('user-events', 'analytics-service');
SELECT * FROM pgque.receive('user-events', 'notification-service');
SELECT * FROM pgque.receive('user-events', 'audit-log');
sql
-- 一个队列,多个消费者各自独立获取所有事件
SELECT pgque.create_queue('user-events');

SELECT pgque.subscribe('user-events', 'analytics-service');
SELECT pgque.subscribe('user-events', 'notification-service');
SELECT pgque.subscribe('user-events', 'audit-log');

-- 生产者发送一次事件
SELECT pgque.send('user-events', 'user_signup', '{"user_id": 1}');

-- 每个消费者独立接收同一事件
SELECT * FROM pgque.receive('user-events', 'analytics-service');
SELECT * FROM pgque.receive('user-events', 'notification-service');
SELECT * FROM pgque.receive('user-events', 'audit-log');

Retry / Dead Letter Pattern

重试/死信模式

sql
-- Using low-level API with retry logic
DO $$
DECLARE
  v_batch_id bigint;
  v_ev       record;
BEGIN
  -- Get next batch
  SELECT pgque.next_batch('orders', 'order-processor') INTO v_batch_id;

  IF v_batch_id IS NULL THEN
    RAISE NOTICE 'No batch available';
    RETURN;
  END IF;

  -- Process each event
  FOR v_ev IN SELECT * FROM pgque.get_batch_events(v_batch_id) LOOP
    BEGIN
      -- Attempt processing
      RAISE NOTICE 'Processing event % type %', v_ev.ev_id, v_ev.ev_type;

      -- On transient failure, retry after 30 seconds
      -- SELECT pgque.event_retry(v_batch_id, v_ev.ev_id, 30);

    EXCEPTION WHEN OTHERS THEN
      -- Schedule retry
      PERFORM pgque.event_retry(v_batch_id, v_ev.ev_id, 60);
      RAISE NOTICE 'Event % queued for retry', v_ev.ev_id;
    END;
  END LOOP;

  -- Finish batch (events not retried are acked)
  PERFORM pgque.finish_batch(v_batch_id);
END;
$$;
sql
-- 使用底层API实现重试逻辑
DO $$
DECLARE
  v_batch_id bigint;
  v_ev       record;
BEGIN
  -- 获取下一个批次
  SELECT pgque.next_batch('orders', 'order-processor') INTO v_batch_id;

  IF v_batch_id IS NULL THEN
    RAISE NOTICE 'No batch available';
    RETURN;
  END IF;

  -- 处理每个事件
  FOR v_ev IN SELECT * FROM pgque.get_batch_events(v_batch_id) LOOP
    BEGIN
      -- 尝试处理
      RAISE NOTICE 'Processing event % type %', v_ev.ev_id, v_ev.ev_type;

      -- 如果是临时故障,30秒后重试
      -- SELECT pgque.event_retry(v_batch_id, v_ev.ev_id, 30);

    EXCEPTION WHEN OTHERS THEN
      -- 安排重试
      PERFORM pgque.event_retry(v_batch_id, v_ev.ev_id, 60);
      RAISE NOTICE 'Event % queued for retry', v_ev.ev_id;
    END;
  END LOOP;

  -- 完成批次处理(未重试的事件将被确认)
  PERFORM pgque.finish_batch(v_batch_id);
END;
$$;

Monitoring & Introspection

监控与自省

sql
-- Queue info (depth, consumer count, last tick)
SELECT * FROM pgque.get_queue_info();
SELECT * FROM pgque.get_queue_info('orders');

-- Consumer lag and position
SELECT * FROM pgque.get_consumer_info();
SELECT * FROM pgque.get_consumer_info('orders');
SELECT * FROM pgque.get_consumer_info('orders', 'order-processor');

-- Batch details
SELECT * FROM pgque.get_batch_info(:batch_id);

-- Version
SELECT pgque.version();
sql
-- 队列信息(深度、消费者数量、最后一次Tick时间)
SELECT * FROM pgque.get_queue_info();
SELECT * FROM pgque.get_queue_info('orders');

-- 消费者延迟与位置
SELECT * FROM pgque.get_consumer_info();
SELECT * FROM pgque.get_consumer_info('orders');
SELECT * FROM pgque.get_consumer_info('orders', 'order-processor');

-- 批次详情
SELECT * FROM pgque.get_batch_info(:batch_id);

-- 版本信息
SELECT pgque.version();

Configuration & Tuning

配置与调优

Tick Frequency

Tick频率

sql
-- Default: ticker called every 1 second via pg_cron
-- To change tick interval, update the pg_cron job:
SELECT cron.alter_job(
  job_id := (SELECT jobid FROM cron.job WHERE command LIKE '%pgque.ticker%'),
  schedule := '* * * * *'  -- every minute (coarser)
);
sql
-- 默认:通过pg_cron每秒调用一次ticker
-- 如需修改Tick间隔,更新pg_cron任务:
SELECT cron.alter_job(
  job_id := (SELECT jobid FROM cron.job WHERE command LIKE '%pgque.ticker%'),
  schedule := '* * * * *'  -- 每分钟一次(更低频率)
);

Force Immediate Tick (Testing/Demos)

强制立即触发Tick(测试/演示)

sql
-- Force a tick right now without waiting for pg_cron
SELECT pgque.force_tick('orders');
-- or
SELECT pgque.ticker();
sql
-- 无需等待pg_cron,立即触发一次Tick
SELECT pgque.force_tick('orders');
-- 或
SELECT pgque.ticker();

pg_cron Log Hygiene

pg_cron日志清理

sql
-- Disable run logging (ticker runs every second = 3600 rows/hour)
ALTER SYSTEM SET cron.log_run = off;
SELECT pg_reload_conf();

-- Or periodically purge:
SELECT cron.schedule('pgque-cron-purge', '0 * * * *',
  $$DELETE FROM cron.job_run_details WHERE end_time < now() - interval '1 hour'$$
);
sql
-- 禁用运行日志(ticker每秒运行一次 = 每小时3600条记录)
ALTER SYSTEM SET cron.log_run = off;
SELECT pg_reload_conf();

-- 或定期清理:
SELECT cron.schedule('pgque-cron-purge', '0 * * * *',
  $$DELETE FROM cron.job_run_details WHERE end_time < now() - interval '1 hour'$$
);

pg_cron in Different Database

在不同数据库中使用pg_cron

If pg_cron is in
postgres
DB but PgQue is in
mydb
:
sql
-- Run from the pg_cron database (postgres)
SELECT cron.schedule_in_database(
  'pgque-ticker', '* * * * *',
  'SELECT pgque.ticker()', 'mydb'
);
SELECT cron.schedule_in_database(
  'pgque-maint', '* * * * *',
  'SELECT pgque.maint()', 'mydb'
);
如果pg_cron安装在
postgres
数据库,但PgQue在
mydb
数据库:
sql
-- 在pg_cron所在数据库(postgres)中执行
SELECT cron.schedule_in_database(
  'pgque-ticker', '* * * * *',
  'SELECT pgque.ticker()', 'mydb'
);
SELECT cron.schedule_in_database(
  'pgque-maint', '* * * * *',
  'SELECT pgque.maint()', 'mydb'
);

Common Patterns

常见模式

Transactional Enqueue (Send with Business Logic)

事务性入队(与业务逻辑一起发送事件)

sql
-- Event is only enqueued if the whole transaction commits
BEGIN;
  INSERT INTO orders (id, amount) VALUES (42, 99.99);
  SELECT pgque.send('orders', 'new_order', '{"order_id": 42}');
COMMIT;
sql
-- 只有当整个事务提交时,事件才会被入队
BEGIN;
  INSERT INTO orders (id, amount) VALUES (42, 99.99);
  SELECT pgque.send('orders', 'new_order', '{"order_id": 42}');
COMMIT;

Queue Depth Check Before Scaling

扩缩容前检查队列深度

sql
SELECT
  queue_name,
  ev_per_sec,
  consumer_count,
  pending_events
FROM pgque.get_queue_info()
WHERE pending_events > 1000;
sql
SELECT
  queue_name,
  ev_per_sec,
  consumer_count,
  pending_events
FROM pgque.get_queue_info()
WHERE pending_events > 1000;

List All Consumers with Lag

列出所有存在延迟的消费者

sql
SELECT
  queue_name,
  consumer_name,
  pending_events AS lag,
  last_seen
FROM pgque.get_consumer_info()
ORDER BY lag DESC;
sql
SELECT
  queue_name,
  consumer_name,
  pending_events AS lag,
  last_seen
FROM pgque.get_consumer_info()
ORDER BY lag DESC;

Troubleshooting

故障排查

Consumers receive nothing

消费者接收不到任何内容

Cause: Ticker is not running.
sql
-- Check if ticker has fired recently
SELECT * FROM pgque.get_queue_info('orders');
-- Look at last_tick timestamp

-- Manually fire a tick
SELECT pgque.ticker();

-- Check pg_cron jobs exist
SELECT * FROM cron.job WHERE command LIKE '%pgque%';
原因:Ticker未运行。
sql
-- 检查Ticker最近是否触发过
SELECT * FROM pgque.get_queue_info('orders');
-- 查看last_tick时间戳

-- 手动触发一次Tick
SELECT pgque.ticker();

-- 检查pg_cron任务是否存在
SELECT * FROM cron.job WHERE command LIKE '%pgque%';

Events not appearing after send

发送事件后未显示

sql
-- Confirm ticker is running; force one
SELECT pgque.ticker();

-- Check queue exists
SELECT * FROM pgque.get_queue_info('orders');

-- Check consumer is registered
SELECT * FROM pgque.get_consumer_info('orders', 'my-consumer');
sql
-- 确认Ticker正在运行;强制触发一次
SELECT pgque.ticker();

-- 检查队列是否存在
SELECT * FROM pgque.get_queue_info('orders');

-- 检查消费者是否已注册
SELECT * FROM pgque.get_consumer_info('orders', 'my-consumer');

Performance / VACUUM pressure

性能/VACUUM压力

PgQue is immune to dead-tuple bloat in the event path by design. If you see VACUUM activity, it's from your own application tables, not from PgQue's queue tables.
PgQue设计上在事件处理路径中不受死元组膨胀影响。如果发现VACUUM活动,来自您自己的应用表,而非PgQue的队列表。

Retry events not reappearing

重试事件未重新出现

sql
-- Maintenance job handles retry scheduling
-- Make sure pgque.maint() is running every ~30s
SELECT pgque.maint();

-- Check for events in retry state
SELECT * FROM pgque.get_queue_info('orders');
sql
-- 维护任务负责调度重试
-- 确保pgque.maint()每30秒左右运行一次
SELECT pgque.maint();

-- 检查处于重试状态的事件
SELECT * FROM pgque.get_queue_info('orders');

Upgrade / Reinstall

升级/重新安装

Upgrade paths are still being stabilized. To safely reinstall:
bash
psql -d mydb -c "\i sql/pgque_uninstall.sql"
psql -d mydb --single-transaction -f sql/pgque.sql
psql -d mydb -c "select pgque.start()"
升级路径仍在稳定中。如需安全重新安装:
bash
psql -d mydb -c "\i sql/pgque_uninstall.sql"
psql -d mydb --single-transaction -f sql/pgque.sql
psql -d mydb -c "select pgque.start()"

Architecture Summary

架构概述

Producer → pgque.send() → event tables (rotating)
                           pgque.ticker()  ←── pg_cron (every 1s)
                           batch snapshot created
Consumer A → pgque.receive() → batch events → pgque.ack()
Consumer B → pgque.receive() → same batch  → pgque.ack()
Consumer C → pgque.receive() → same batch  → pgque.ack()
                           pgque.maint() → TRUNCATE old tables
                                           (zero dead tuples)
生产者 → pgque.send() → 事件表(轮转)
                           pgque.ticker()  ←── pg_cron(每秒一次)
                           创建批次快照
消费者A → pgque.receive() → 批次事件 → pgque.ack()
消费者B → pgque.receive() → 同一批次  → pgque.ack()
消费者C → pgque.receive() → 同一批次  → pgque.ack()
                           pgque.maint() → TRUNCATE旧表
                                           (无死元组)