pgqrs

pgqrs is a Postgres-native, library-only durable execution engine.

Written in Rust with Python bindings. Built for Postgres. Also supports SQLite, Turso, and S3-backed queues.

What is Durable Execution?

A durable execution engine ensures that workflows resume after an application crash or pause. Workflow state persists in the database, so a restarted process picks up from the last completed step, and steps that already completed are not executed again.
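To make the idea concrete, here is a minimal sketch of step memoization using only Python's standard library. It is not pgqrs's implementation: the `step_results` table, `durable_step`, and `run_workflow` are hypothetical names chosen for illustration. The sketch records each step's output under a `(run_id, step)` key and replays the stored output when the same run is resumed.

```python
import json
import sqlite3


def open_state(path=":memory:"):
    # Hypothetical schema: one row per completed step of a run.
    conn = sqlite3.connect(path)
    conn.execute(
        "CREATE TABLE IF NOT EXISTS step_results ("
        " run_id TEXT NOT NULL, step TEXT NOT NULL, output TEXT NOT NULL,"
        " PRIMARY KEY (run_id, step))"
    )
    return conn


def durable_step(conn, run_id, step, fn):
    """Run fn once per (run_id, step); later calls return the stored output."""
    row = conn.execute(
        "SELECT output FROM step_results WHERE run_id = ? AND step = ?",
        (run_id, step),
    ).fetchone()
    if row is not None:
        return json.loads(row[0])  # completed earlier: replay, do not re-run
    result = fn()
    with conn:  # commit the recorded result atomically
        conn.execute(
            "INSERT INTO step_results (run_id, step, output) VALUES (?, ?, ?)",
            (run_id, step, json.dumps(result)),
        )
    return result


calls = []  # records which step bodies actually executed


def run_workflow(conn, run_id, crash_after_first=False):
    def list_files():
        calls.append("list_files")
        return ["/tmp/report.csv"]

    files = durable_step(conn, run_id, "list_files", list_files)
    if crash_after_first:
        raise RuntimeError("simulated crash")

    def create_archive():
        calls.append("create_archive")
        return files[0] + ".zip"

    return durable_step(conn, run_id, "create_archive", create_archive)


conn = open_state()
try:
    run_workflow(conn, "run-1", crash_after_first=True)
except RuntimeError:
    pass  # the first attempt dies after list_files completed
result = run_workflow(conn, "run-1")  # resume: only create_archive executes
```

In this sketch a step that fails before its result is recorded will run again on resume, so step bodies with external side effects are best kept idempotent.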

Key Properties

  • Postgres-native: Leverages SKIP LOCKED and ACID transactions
  • Library-only: Runs in-process with your application
  • Multi-backend: Postgres, SQLite, Turso, and S3-backed queues
  • Type-safe: Rust core with idiomatic Python bindings
  • Transaction-safe: Exactly-once step execution within database transactions
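As an illustration of why SKIP LOCKED matters for a queue, the sketch below shows a common PostgreSQL claim pattern: each worker locks a batch of available rows and skips rows another worker already holds, so workers do not block one another. The `messages` table, its columns, the 30-second lease, and the `claim_batch` helper are all assumptions made for this example and are not taken from the pgqrs schema. The function expects a DB-API connection that uses `%s` placeholders, such as one from psycopg.

```python
# Hypothetical table and column names; pgqrs's real schema may differ.
CLAIM_SQL = """
UPDATE messages
SET locked_until = now() + interval '30 seconds'
WHERE id IN (
    SELECT id
    FROM messages
    WHERE queue_name = %s
      AND (locked_until IS NULL OR locked_until < now())
    ORDER BY id
    LIMIT %s
    FOR UPDATE SKIP LOCKED
)
RETURNING id, payload
"""


def claim_batch(conn, queue_name, batch_size):
    """Claim up to batch_size messages for one worker.

    Rows locked by a concurrent transaction are skipped rather than
    waited on, so two workers never claim the same message.
    """
    with conn.cursor() as cur:
        cur.execute(CLAIM_SQL, (queue_name, batch_size))
        rows = cur.fetchall()
    conn.commit()  # the claim becomes visible to other workers
    return rows
```

Because the claim and the lease update happen in one statement inside one transaction, a worker that crashes before committing leaves the rows unclaimed for others.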

Choose Your Backend

| Scenario | Recommended Backend | Why |
| --- | --- | --- |
| Production with multiple workers | PostgreSQL | Full concurrency, no writer conflicts |
| CLI tools & scripts | SQLite / Turso | Zero-config, embedded, portable |
| Testing & prototyping | SQLite / Turso | Fast setup, no external dependencies |
| Embedded applications | SQLite / Turso | Single-file database, no server |
| Remote durable queue without a database server | S3 | SQLite queue state persisted as an S3 object |
| High write throughput | PostgreSQL | SQLite/Turso allow only 1 writer at a time |

S3 Queue Model

The S3 backend stores queue state as a SQLite database file in object storage.

  • s3://bucket/key.sqlite selects the S3 backend
  • Durable mode syncs writes to S3 before returning
  • Local mode gives Rust applications explicit snapshot() / sync() control
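The URL-based backend selection can be pictured with a small sketch. This is not how pgqrs parses connection strings; `describe_dsn` is a hypothetical helper that only recognizes the two URL forms shown on this page and splits an S3 URL into its bucket and object key.

```python
from urllib.parse import urlparse


def describe_dsn(dsn):
    """Classify a connection string by URL scheme (illustrative only)."""
    parts = urlparse(dsn)
    if parts.scheme == "s3":
        # s3://bucket/key.sqlite: the host part is the bucket, the path is the key
        return {
            "backend": "s3",
            "bucket": parts.netloc,
            "key": parts.path.lstrip("/"),
        }
    if parts.scheme == "postgresql":
        return {"backend": "postgres", "database": parts.path.lstrip("/")}
    # Other schemes are left out because this page does not show them.
    raise ValueError(f"unrecognized scheme: {parts.scheme!r}")


s3_info = describe_dsn("s3://bucket/key.sqlite")
pg_info = describe_dsn("postgresql://localhost/mydb")
```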

Learn the S3 lifecycle

SQLite/Turso Concurrency Limit

SQLite and Turso use database-level locks. With many concurrent writers, you may hit lock contention. See SkyPilot's findings on SQLite concurrency. pgqrs enables WAL mode and sets a 5s busy timeout to mitigate this, but PostgreSQL is recommended for multi-worker scenarios.
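For readers unfamiliar with these two settings, the sketch below applies them by hand with Python's built-in `sqlite3` module. pgqrs configures them itself, so this is only meant to show what they are; `open_queue_db` and the temporary file path are illustrative names.

```python
import os
import sqlite3
import tempfile


def open_queue_db(path):
    conn = sqlite3.connect(path)
    # WAL lets readers proceed while a single writer appends to the log.
    mode = conn.execute("PRAGMA journal_mode=WAL").fetchone()[0]
    # Wait up to 5000 ms for a competing lock before failing with
    # "database is locked".
    conn.execute("PRAGMA busy_timeout=5000")
    timeout_ms = conn.execute("PRAGMA busy_timeout").fetchone()[0]
    return conn, mode, timeout_ms


# WAL requires a file-backed database, so use a temporary file.
db_path = os.path.join(tempfile.mkdtemp(), "queue.sqlite")
conn, mode, timeout_ms = open_queue_db(db_path)
print(mode, timeout_ms)
```

Neither setting removes the single-writer limit: WAL keeps readers from blocking on the writer, and the busy timeout turns immediate lock errors into bounded waits.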

Benchmark Highlights

Current queue benchmark baselines show:

  • PostgreSQL scales with consumers and batch size in the fixed-backlog drain scenario
  • SQLite benefits from larger batch sizes but does not scale with more consumers in that same scenario
  • Turso benchmark guidance is still a work in progress

See benchmark methodology and scenario writeups

Job Queue

Simple, reliable message queue for background processing:

use pgqrs;
use serde_json::json;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Connect to PostgreSQL
    let store = pgqrs::connect("postgresql://localhost/mydb").await?;

    // Setup (run once)
    pgqrs::admin(&store).install().await?;
    store.queue("tasks").await?;

    // Producer: enqueue a job
    let ids = pgqrs::enqueue()
        .message(&json!({"task": "send_email", "to": "user@example.com"}))
        .to("tasks")
        .execute(&store)
        .await?;

    // Consumer: process jobs
    pgqrs::dequeue()
        .from("tasks")
        .handle(|msg| async move {
            println!("Processing: {:?}", msg.payload);
            Ok(())
        })
        .execute(&store)
        .await?;

    Ok(())
}

import asyncio
import pgqrs

async def main():
    # Connect to PostgreSQL
    store = await pgqrs.connect("postgresql://localhost/mydb")

    # Setup (run once)
    admin = pgqrs.admin(store)
    await admin.install()
    await store.queue("tasks")

    # Producer: enqueue a job
    producer = await store.producer("tasks")
    msg_id = await producer.enqueue({
        "task": "send_email",
        "to": "user@example.com"
    })
    print(f"Enqueued: {msg_id}")

    # Consumer: process jobs
    consumer = await store.consumer("tasks")
    messages = await consumer.dequeue(batch_size=1)
    for msg in messages:
        print(f"Processing: {msg.payload}")
        await consumer.archive(msg.id)

asyncio.run(main())

Learn more about Producer & Consumer

Durable Workflows

Orchestrate multi-step processes that survive crashes and resume from where they left off:

use pgqrs;
use serde_json::json;

// A workflow definition is just async code plus durable steps.
#[pgqrs::pgqrs_workflow(name = "archive_files")]
async fn archive_files(
    run: &pgqrs::Run,
    input: serde_json::Value,
) -> Result<serde_json::Value, pgqrs::Error> {
    // Step results are persisted. If the worker crashes after this step,
    // pgqrs will replay the cached result instead of re-running it.
    let files = pgqrs::workflow_step(run, "list_files", || async {
        Ok::<_, pgqrs::Error>(vec![input["path"].as_str().unwrap().to_string()])
    })
    .await?;

    // The second step sees the output of the first step just like normal async code.
    let archive_path = pgqrs::workflow_step(run, "create_archive", || async {
        Ok::<_, pgqrs::Error>(format!("{}.zip", files[0]))
    })
    .await?;

    Ok(json!({ "archive": archive_path }))
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let store = pgqrs::connect("postgresql://localhost/mydb").await?;

    // Install schema once per database.
    pgqrs::admin(&store).install().await?;

    // Register the workflow definition. This is idempotent.
    pgqrs::workflow()
        .name(archive_files)
        .create()
        .await?;

    // Start a worker for this workflow queue.
    //
    // For a quickstart, spawning the worker in the same process keeps the example
    // self-contained. In production, run this polling loop in a dedicated worker
    // service instead.
    let consumer = pgqrs::consumer("workflow-worker", 8080, archive_files.name())
        .create(&store)
        .await?;
    let store_for_worker = store.clone();
    let consumer_for_worker = consumer.clone();
    let worker_task = tokio::spawn(async move {
        pgqrs::workflow()
            .name(archive_files)
            .consumer(&consumer_for_worker)
            .poll(&store_for_worker)
            .await
    });

    // Trigger a new workflow run. This only enqueues work; it does not execute it inline.
    let message = pgqrs::workflow()
        .name(archive_files)
        .trigger(&json!({"path": "/tmp/report.csv"}))?
        .execute()
        .await?;

    // Wait for the workflow result while the worker polls in the background.
    let result: serde_json::Value = pgqrs::run()
        .message(message)
        .store(&store)
        .result()
        .await?;
    println!("Workflow result: {:?}", result);

    // Stop the background worker before exiting the example.
    consumer.interrupt().await?;
    let _ = worker_task.await;

    Ok(())
}

import asyncio
import pgqrs
from pgqrs.decorators import step as step_def
from pgqrs.decorators import workflow as workflow_def


@workflow_def(name="archive_files")
async def archive_files_wf(ctx, input_data: dict) -> dict:
    # Decorated steps persist their results automatically.
    @step_def
    async def list_files(step_ctx):
        return [input_data["path"]]

    @step_def
    async def create_archive(step_ctx, files):
        return f"{files[0]}.zip"

    files = await list_files(ctx)
    archive_path = await create_archive(ctx, files)
    return {"archive": archive_path}


async def main():
    store = await pgqrs.connect("postgresql://localhost/mydb")
    admin = pgqrs.admin(store)

    # Install schema once per database.
    await admin.install()

    # Register the workflow definition. This is idempotent.
    await pgqrs.workflow().name("archive_files").store(store).create()

    # Start a worker for the workflow queue.
    #
    # For a quickstart, running it as a background task keeps the example small.
    # In production, run the polling loop in a separate worker process.
    consumer = await store.consumer("archive_files")
    worker_task = asyncio.create_task(
        pgqrs.dequeue()
        .worker(consumer)
        .handle_workflow(archive_files_wf)
        .poll(store)
    )

    # Trigger a workflow run. This only enqueues the input payload.
    message = await (
        pgqrs.workflow()
        .name("archive_files")
        .store(store)
        .trigger({"path": "/tmp/report.csv"})
        .execute()
    )

    # Wait for the workflow result while the worker polls in the background.
    result = await pgqrs.run().message(message).store(store).result()
    print(f"Workflow result: {result}")

    # Stop the background worker before exiting the example.
    await consumer.interrupt()
    try:
        await worker_task
    except Exception:
        pass

asyncio.run(main())

Key benefits:

  • Crash recovery: Automatically resumes from the last completed step
  • Exactly-once semantics: Completed steps are never re-executed
  • Persistent state: All progress is stored in the database (PostgreSQL in the examples above)

Learn more about Durable Workflows

Follow the Durable Workflows Guide

Next Steps