Basic Queue (Producer + Consumer)
This guide shows the smallest end-to-end setup: enqueue JSON work from a producer, process it with a consumer, and shut down cleanly.
It is intentionally "low level" (queue primitives), and complements the workflow-focused guide.
Prerequisites
- pgqrs installed
- A database backend selected (examples use SQLite for simplicity)
- Schema installed (admin.install())
Setup
The snippets on this page focus on the consumer patterns (polling + interrupt).
They assume you already have:
- store (connected + bootstrapped)
- producer and consumer (or consumer_a / consumer_b)
If you want fully runnable examples end-to-end, use the guide tests directly:
- Rust: crates/pgqrs/tests/guide_tests.rs
- Python: py-pgqrs/tests/test_guides.py
Step 3: Create a Consumer and Poll
The consumer runs a poll loop that:
- dequeues up to batch_size messages
- calls your handler
- archives messages on success
- releases messages back to the queue on handler error
// Assumes `store` and `consumer` already exist.
use std::time::Duration;
use tokio::time::timeout;

let store_task = store.clone();
let consumer_task_handle = consumer.clone();
// Run the poll loop on a background task.
let consumer_task = tokio::spawn(async move {
    pgqrs::dequeue()
        .worker(&consumer_task_handle)
        .batch(1)
        .handle(|_msg| Box::pin(async { Ok(()) }))
        .poll(&store_task)
        .await
});

// Interrupt the consumer and wait (bounded by a timeout) for the poll loop to exit.
consumer.interrupt().await.unwrap();
let res = timeout(Duration::from_secs(15), consumer_task)
    .await
    .unwrap()
    .unwrap();
// The interrupted poll loop returns `Error::Suspended`.
assert!(matches!(res, Err(pgqrs::error::Error::Suspended { .. })));
assert_eq!(
    consumer.status().await.unwrap(),
    pgqrs::types::WorkerStatus::Suspended
);
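The loop semantics listed above (dequeue a batch, call the handler, archive on success, release on error, stop on interrupt) can be sketched without a database. The following is a minimal in-memory illustration only: `ToyQueue`, `Suspended`, `poll`, and `demo` are hypothetical names invented for this sketch and are not part of the pgqrs API, and a real queue would persist these state changes in the backing store.

```python
import asyncio
from collections import deque


class Suspended(Exception):
    """Hypothetical stand-in for the error an interrupted poll loop returns."""


class ToyQueue:
    """In-memory stand-in for a queue; NOT the pgqrs API."""

    def __init__(self, payloads):
        self.ready = deque(payloads)  # messages waiting to be claimed
        self.archived = []            # messages processed successfully
        self.interrupted = False      # set to True to stop the consumer

    def dequeue(self, batch_size):
        batch = []
        while self.ready and len(batch) < batch_size:
            batch.append(self.ready.popleft())
        return batch


async def poll(queue, handler, batch_size=1, idle_sleep=0.01):
    """Dequeue, handle, then archive on success or release on error."""
    while True:
        if queue.interrupted:
            raise Suspended("consumer interrupted")
        batch = queue.dequeue(batch_size)
        if not batch:
            await asyncio.sleep(idle_sleep)  # queue is empty; poll again later
            continue
        for msg in batch:
            try:
                await handler(msg)
            except Exception:
                queue.ready.append(msg)      # release: message can be claimed again
            else:
                queue.archived.append(msg)   # archive: message is done


async def demo():
    queue = ToyQueue([{"job": 1}, {"job": 2}, {"job": 3}])
    failed_once = set()

    async def handler(msg):
        # Fail job 2 on its first attempt to exercise the release path.
        if msg["job"] == 2 and 2 not in failed_once:
            failed_once.add(2)
            raise RuntimeError("transient failure")

    task = asyncio.create_task(poll(queue, handler, batch_size=1))
    while len(queue.archived) < 3:           # wait until every job is archived
        await asyncio.sleep(0.01)
    queue.interrupted = True                 # the sketch's version of interrupt()
    try:
        await asyncio.wait_for(task, timeout=5)
    except Suspended:
        pass
    return [m["job"] for m in queue.archived]
```

Running `asyncio.run(demo())` in this sketch archives job 2 last, because its first attempt fails and the message goes back to the end of the ready queue before being retried.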
More Patterns
Two common variations you can build on top of the basic consumer loop.
Handoff Between Consumers
Start consumer A, process one message, interrupt it, then start consumer B and confirm the next message is claimed by B.
// Assumes `store` and `consumer_a` already exist.
let store_a = store.clone();
let consumer_a_task_handle = consumer_a.clone();
let task_a = tokio::spawn(async move {
    pgqrs::dequeue()
        .worker(&consumer_a_task_handle)
        .batch(5)
        .handle_batch(|_msgs| Box::pin(async { Ok(()) }))
        .poll(&store_a)
        .await
});

// Assumes `consumer_a` and `task_a` exist.
consumer_a.interrupt().await.unwrap();
let res_a = timeout(Duration::from_secs(5), task_a)
    .await
    .unwrap()
    .unwrap();
assert!(matches!(res_a, Err(pgqrs::error::Error::Suspended { .. })));
assert_eq!(
    consumer_a.status().await.unwrap(),
    pgqrs::types::WorkerStatus::Suspended
);

// Assumes `store` and `consumer_b` already exist.
let store_b = store.clone();
let consumer_b_task_handle = consumer_b.clone();
let task_b = tokio::spawn(async move {
    pgqrs::dequeue()
        .worker(&consumer_b_task_handle)
        .batch(5)
        .handle_batch(|_msgs| Box::pin(async { Ok(()) }))
        .poll(&store_b)
        .await
});

// Assumes `consumer_b` and `task_b` exist.
consumer_b.interrupt().await.unwrap();
let res_b = timeout(Duration::from_secs(5), task_b)
    .await
    .unwrap()
    .unwrap();
assert!(matches!(res_b, Err(pgqrs::error::Error::Suspended { .. })));
assert_eq!(
    consumer_b.status().await.unwrap(),
    pgqrs::types::WorkerStatus::Suspended
);

# Python version of the same handoff.
# Assumes `store`, `consumer_a`, `consumer_b`, and a `handle_batch` callable already exist.
task_a = asyncio.create_task(
    pgqrs.dequeue()
    .worker(consumer_a)
    .batch(5)
    .handle_batch(handle_batch)
    .poll(store)
)

await consumer_a.interrupt()
with pytest.raises(Exception):
    await asyncio.wait_for(task_a, timeout=5)

task_b = asyncio.create_task(
    pgqrs.dequeue()
    .worker(consumer_b)
    .batch(5)
    .handle_batch(handle_batch)
    .poll(store)
)

await consumer_b.interrupt()
with pytest.raises(Exception):
    await asyncio.wait_for(task_b, timeout=5)
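The ordering that matters in a handoff (A claims a message, A is interrupted and fully stops, only then does B start and claim the next message) can be illustrated with plain asyncio. This is a sketch under stated assumptions: `consume`, `interrupt_and_wait`, `handoff`, and `Suspended` are hypothetical names for this example, the queue is an in-memory deque, and none of it is the pgqrs API.

```python
import asyncio
from collections import deque


class Suspended(Exception):
    """Hypothetical stand-in for the error an interrupted poll loop raises."""


async def consume(name, ready, claimed, stop, idle_sleep=0.01):
    """Claim messages one at a time until `stop` is set, then raise Suspended."""
    while not stop.is_set():
        if ready:
            claimed.append((name, ready.popleft()))
        await asyncio.sleep(idle_sleep)  # yield so other tasks can run
    raise Suspended(name)


async def interrupt_and_wait(task, stop):
    """Signal the consumer, then wait (with a timeout) for its loop to exit."""
    stop.set()
    try:
        await asyncio.wait_for(task, timeout=5)
    except Suspended:
        return "SUSPENDED"
    return "UNEXPECTED_EXIT"


async def handoff():
    ready, claimed = deque(["m1"]), []

    stop_a = asyncio.Event()
    task_a = asyncio.create_task(consume("a", ready, claimed, stop_a))
    while not claimed:                 # wait until A has processed one message
        await asyncio.sleep(0.01)
    status_a = await interrupt_and_wait(task_a, stop_a)

    ready.append("m2")                 # enqueued only after A has stopped
    stop_b = asyncio.Event()
    task_b = asyncio.create_task(consume("b", ready, claimed, stop_b))
    while len(claimed) < 2:            # wait until B has claimed the next message
        await asyncio.sleep(0.01)
    status_b = await interrupt_and_wait(task_b, stop_b)

    return claimed, status_a, status_b
```

Waiting for A's task to finish before starting B is the design choice that makes the handoff unambiguous: at no point are both consumers polling, so the second message can only be claimed by B.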
Two Consumers Processing Continuously
Run two consumers in parallel and enqueue a small batch; both consumers should drain the queue until interrupted.
// Assumes `store`, `consumer_a`, and `consumer_b` already exist.
let store_a = store.clone();
let consumer_a_task_handle = consumer_a.clone();
let task_a = tokio::spawn(async move {
    pgqrs::dequeue()
        .worker(&consumer_a_task_handle)
        .batch(10)
        .handle_batch(|_msgs| Box::pin(async { Ok(()) }))
        .poll(&store_a)
        .await
});
let store_b = store.clone();
let consumer_b_task_handle = consumer_b.clone();
let task_b = tokio::spawn(async move {
    pgqrs::dequeue()
        .worker(&consumer_b_task_handle)
        .batch(10)
        .handle_batch(|_msgs| Box::pin(async { Ok(()) }))
        .poll(&store_b)
        .await
});

// Assumes `consumer_a`, `consumer_b`, `task_a`, and `task_b` exist.
consumer_a.interrupt().await.unwrap();
consumer_b.interrupt().await.unwrap();
let res_a = timeout(Duration::from_secs(5), task_a)
    .await
    .unwrap()
    .unwrap();
let res_b = timeout(Duration::from_secs(5), task_b)
    .await
    .unwrap()
    .unwrap();
assert!(matches!(res_a, Err(pgqrs::error::Error::Suspended { .. })));
assert!(matches!(res_b, Err(pgqrs::error::Error::Suspended { .. })));
assert_eq!(
    consumer_a.status().await.unwrap(),
    pgqrs::types::WorkerStatus::Suspended
);
assert_eq!(
    consumer_b.status().await.unwrap(),
    pgqrs::types::WorkerStatus::Suspended
);

# Python version of the same pattern.
# Assumes `store`, `consumer_a`, `consumer_b`, and a `handle_batch` callable already exist.
task_a = asyncio.create_task(
    pgqrs.dequeue()
    .worker(consumer_a)
    .batch(10)
    .handle_batch(handle_batch)
    .poll(store)
)
task_b = asyncio.create_task(
    pgqrs.dequeue()
    .worker(consumer_b)
    .batch(10)
    .handle_batch(handle_batch)
    .poll(store)
)

# Interrupt both consumers; a state-transition error here is ignored.
try:
    await consumer_a.interrupt()
except pgqrs.StateTransitionError:
    pass
try:
    await consumer_b.interrupt()
except pgqrs.StateTransitionError:
    pass

with pytest.raises(Exception):
    await asyncio.wait_for(task_a, timeout=5)
with pytest.raises(Exception):
    await asyncio.wait_for(task_b, timeout=5)

assert await consumer_a.status() == "SUSPENDED"
assert await consumer_b.status() == "SUSPENDED"
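To see why two consumers can share one queue safely, it may help to model the pattern in memory: each message is claimed by exactly one consumer, both consumers keep polling until interrupted, and both exit with a "suspended" error. The sketch below uses only asyncio; `consume`, `run_pair`, and `Suspended` are hypothetical names for this example and do not reflect the pgqrs API, where claiming is done by the backing store rather than a shared deque.

```python
import asyncio
from collections import deque


class Suspended(Exception):
    """Hypothetical stand-in for the error raised by an interrupted poll loop."""


async def consume(name, ready, done, stop, batch_size):
    """Claim up to `batch_size` messages per poll until `stop` is set."""
    while not stop.is_set():
        count = min(batch_size, len(ready))
        batch = [ready.popleft() for _ in range(count)]
        done.extend((name, msg) for msg in batch)  # record who processed what
        await asyncio.sleep(0.01)                  # yield to the other consumer
    raise Suspended(name)


async def run_pair(total=20, batch_size=3):
    ready, done = deque(range(total)), []
    stops = {"a": asyncio.Event(), "b": asyncio.Event()}
    tasks = [
        asyncio.create_task(consume(name, ready, done, stop, batch_size))
        for name, stop in stops.items()
    ]
    while len(done) < total:          # let both consumers drain the queue
        await asyncio.sleep(0.01)
    for stop in stops.values():       # interrupt both consumers
        stop.set()
    # return_exceptions=True collects each consumer's Suspended error
    # instead of raising the first one.
    results = await asyncio.wait_for(
        asyncio.gather(*tasks, return_exceptions=True), timeout=5
    )
    return done, results
```

Calling `asyncio.run(run_pair())` returns the list of `(consumer, message)` pairs plus one `Suspended` instance per consumer. Checking that the message IDs contain no duplicates is a quick way to confirm that no message was processed twice.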
Next Steps
- If you need retries/backoff, see docs/user-guide/guides/durable-workflows.md
- If you need visibility timeouts and scheduling, see docs/user-guide/guides/delayed-messages.md
- For production worker patterns, see docs/user-guide/guides/worker-management.md