Batch Processing Guide¶
This guide covers efficient batch processing patterns for high-throughput scenarios.
When to Use Batch Processing¶
- Processing thousands of messages per minute
- Reducing database round-trips
- Improving throughput with bulk operations
- ETL and data pipeline workloads
Batch Enqueueing¶
Send multiple messages in a single transaction.
use pgqrs::{Admin, Producer, Config};
use serde_json::json;

async fn batch_enqueue_example() -> Result<(), Box<dyn std::error::Error>> {
    let config = Config::from_dsn("postgresql://localhost/mydb");
    let admin = Admin::new(&config).await?;
    let queue = admin.get_queue("tasks").await?;

    let producer = Producer::new(
        admin.pool.clone(), &queue, "producer", 3000, &config
    ).await?;

    // Prepare batch of messages
    let payloads: Vec<_> = (0..1000)
        .map(|i| json!({"task_id": i, "data": "process me"}))
        .collect();

    // Send all at once
    let messages = producer.batch_enqueue(&payloads).await?;
    println!("Enqueued {} messages in one transaction", messages.len());

    Ok(())
}
import asyncio
from pgqrs import Producer

async def batch_enqueue_example():
    producer = Producer(
        "postgresql://localhost/mydb",
        "tasks",
        "producer",
        3000,
    )

    # Note: the Python API currently sends one message at a time.
    # Approximate batching by sending concurrently, using a semaphore
    # to cap in-flight sends (and therefore connection usage).
    semaphore = asyncio.Semaphore(100)

    async def send(payload):
        async with semaphore:
            return await producer.enqueue(payload)

    tasks = [
        send({"task_id": i, "data": "process me"})
        for i in range(1000)
    ]
    results = await asyncio.gather(*tasks)
    print(f"Enqueued {len(results)} messages")

asyncio.run(batch_enqueue_example())
Batch Dequeueing¶
Fetch multiple messages in one operation.
use pgqrs::{Admin, Consumer, Config};

async fn batch_dequeue_example() -> Result<(), Box<dyn std::error::Error>> {
    let config = Config::from_dsn("postgresql://localhost/mydb");
    let admin = Admin::new(&config).await?;
    let queue = admin.get_queue("tasks").await?;

    let consumer = Consumer::new(
        admin.pool.clone(), &queue, "consumer", 3001, &config
    ).await?;

    // Fetch up to 100 messages with 30-second lock
    let messages = consumer.dequeue_many_with_delay(100, 30).await?;
    println!("Fetched {} messages", messages.len());

    for message in &messages {
        // Process each message
        println!("Processing: {}", message.id);
    }

    // Batch archive
    let ids: Vec<i64> = messages.iter().map(|m| m.id).collect();
    let results = consumer.archive_many(ids).await?;
    let archived = results.iter().filter(|&&r| r).count();
    println!("Archived {} messages", archived);

    Ok(())
}
Processing Patterns¶
Sequential Batch Processing¶
Process batches one after another with controlled throughput.
async fn sequential_batch_processing(consumer: &Consumer) -> Result<(), Error> {
    loop {
        // Fetch batch
        let messages = consumer.dequeue_many_with_delay(100, 60).await?;

        if messages.is_empty() {
            tokio::time::sleep(Duration::from_secs(1)).await;
            continue;
        }

        // Process sequentially
        let mut successful_ids = Vec::new();
        for message in &messages {
            match process_message(message).await {
                Ok(_) => successful_ids.push(message.id),
                Err(e) => {
                    tracing::warn!("Failed to process {}: {}", message.id, e);
                    // Message will become available again after timeout
                }
            }
        }

        // Batch archive successful ones
        if !successful_ids.is_empty() {
            consumer.archive_many(successful_ids).await?;
        }
    }
}
Parallel Batch Processing¶
Process all messages in a batch concurrently.
use futures::future::join_all;

async fn parallel_batch_processing(consumer: &Consumer) -> Result<(), Error> {
    let messages = consumer.dequeue_many_with_delay(100, 60).await?;

    // Process all in parallel (async move so each future owns its message reference)
    let futures: Vec<_> = messages.iter().map(|m| async move {
        let result = process_message(m).await;
        (m.id, result)
    }).collect();

    let results = join_all(futures).await;

    // Separate successes and failures
    let successful: Vec<i64> = results
        .iter()
        .filter(|(_, r)| r.is_ok())
        .map(|(id, _)| *id)
        .collect();
    let failed: Vec<i64> = results
        .iter()
        .filter(|(_, r)| r.is_err())
        .map(|(id, _)| *id)
        .collect();

    // Batch archive successful
    consumer.archive_many(successful).await?;

    // Log failures (will retry after timeout)
    for id in failed {
        tracing::warn!("Message {} failed, will retry", id);
    }

    Ok(())
}
Chunked Processing¶
Process large batches in smaller chunks.
async fn chunked_processing(consumer: &Consumer, chunk_size: usize) -> Result<(), Error> {
    // Fetch large batch
    let messages = consumer.dequeue_many_with_delay(1000, 300).await?;

    // Process in chunks
    for chunk in messages.chunks(chunk_size) {
        let futures: Vec<_> = chunk.iter().map(process_message).collect();
        let results = join_all(futures).await;

        // Archive this chunk
        let ids: Vec<i64> = chunk.iter()
            .zip(results.iter())
            .filter(|(_, r)| r.is_ok())
            .map(|(m, _)| m.id)
            .collect();
        consumer.archive_many(ids).await?;

        // Progress logging
        tracing::info!("Processed chunk of {} messages", chunk.len());
    }

    Ok(())
}
Throughput Optimization¶
Tuning Batch Size¶
| Scenario | Recommended Batch Size |
|---|---|
| Quick tasks (< 10ms) | 100-500 |
| Medium tasks (10-100ms) | 50-100 |
| Slow tasks (> 100ms) | 10-50 |
| I/O-bound tasks | 100-200 (with parallel processing) |
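If you prefer a fixed setting, a small helper can translate a measured average task duration into the ranges above. This is an illustrative sketch only; the function name and thresholds are not part of pgqrs, so tune them against your own workload.

use std::time::Duration;

// Map a measured average task duration to the batch-size ranges in the table.
// Thresholds and return values are illustrative; adjust for your workload.
fn suggested_batch_size(avg_task: Duration) -> usize {
    if avg_task < Duration::from_millis(10) {
        300   // quick tasks: 100-500
    } else if avg_task < Duration::from_millis(100) {
        75    // medium tasks: 50-100
    } else {
        25    // slow tasks: 10-50
    }
}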
// Adaptive batch sizing
async fn adaptive_batch_consumer(consumer: &Consumer) -> Result<(), Error> {
    let mut batch_size = 50;
    let target_batch_time = Duration::from_secs(5);

    loop {
        let start = Instant::now();
        let messages = consumer.dequeue_many_with_delay(batch_size, 60).await?;

        if messages.is_empty() {
            tokio::time::sleep(Duration::from_millis(100)).await;
            continue;
        }

        // Process batch
        for message in &messages {
            process_message(message).await?;
        }
        consumer.archive_many(messages.iter().map(|m| m.id).collect()).await?;

        let elapsed = start.elapsed();

        // Adjust batch size
        if elapsed < target_batch_time / 2 && batch_size < 500 {
            batch_size = (batch_size * 3 / 2).min(500);
        } else if elapsed > target_batch_time * 2 && batch_size > 10 {
            batch_size = (batch_size * 2 / 3).max(10);
        }

        tracing::debug!("Batch size: {}, time: {:?}", batch_size, elapsed);
    }
}
Connection Pool Sizing¶
For batch processing, ensure adequate connection pool size:
// In your config
let config = Config {
    dsn: "postgresql://localhost/mydb".into(),
    max_connections: 20, // Increase for parallel batch processing
    ..Default::default()
};
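How large "adequate" is depends on how many workers share the pool and how much parallelism each one uses. As a rough heuristic (an assumption for illustration, not a pgqrs rule), budget one connection per worker for dequeue/archive calls plus one per concurrent in-flight operation, with a little headroom for admin queries:

// Rough sizing heuristic (illustrative assumption, not a pgqrs requirement):
// one connection per worker, plus one per concurrent operation per worker,
// plus a small headroom for admin/metrics queries.
fn suggested_max_connections(num_workers: u32, parallel_ops_per_worker: u32) -> u32 {
    num_workers * (1 + parallel_ops_per_worker) + 2
}

// Example: 4 workers, each processing up to 4 messages in parallel
// -> 4 * (1 + 4) + 2 = 22 connections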
Batch Processing with Multiple Workers¶
Scale horizontally with multiple consumers.
use tokio::task::JoinSet;

async fn run_batch_workers(num_workers: usize) -> Result<(), Error> {
    let config = Config::from_dsn("postgresql://localhost/mydb");
    let admin = Admin::new(&config).await?;
    let queue = admin.get_queue("tasks").await?;

    let mut workers = JoinSet::new();

    for i in 0..num_workers {
        let pool = admin.pool.clone();
        let queue = queue.clone();
        let config = config.clone();

        workers.spawn(async move {
            let consumer = Consumer::new(
                pool,
                &queue,
                &format!("worker-{}", i),
                3000 + i as i32,
                &config,
            ).await?;
            batch_consumer_loop(&consumer).await
        });
    }

    // Wait for all workers, reporting both worker errors and panics
    while let Some(result) = workers.join_next().await {
        match result {
            Ok(Err(e)) => tracing::error!("Worker error: {:?}", e),
            Err(e) => tracing::error!("Worker panicked: {:?}", e),
            Ok(Ok(())) => {}
        }
    }

    Ok(())
}
async fn batch_consumer_loop(consumer: &Consumer) -> Result<(), Error> {
    loop {
        let messages = consumer.dequeue_many_with_delay(100, 60).await?;

        if messages.is_empty() {
            tokio::time::sleep(Duration::from_millis(100)).await;
            continue;
        }

        // Process and archive
        for message in &messages {
            process_message(message).await?;
        }
        let ids: Vec<i64> = messages.iter().map(|m| m.id).collect();
        consumer.archive_many(ids).await?;
    }
}
Monitoring Batch Processing¶
Track throughput and processing times:
use std::time::Instant;

async fn monitored_batch_processing(consumer: &Consumer) -> Result<(), Error> {
    let mut total_processed: u64 = 0;
    let start = Instant::now();

    loop {
        let batch_start = Instant::now();
        let messages = consumer.dequeue_many_with_delay(100, 60).await?;

        if messages.is_empty() {
            tokio::time::sleep(Duration::from_millis(100)).await;
            continue;
        }

        let batch_size = messages.len();

        // Process batch
        for message in &messages {
            process_message(message).await?;
        }
        consumer.archive_many(messages.iter().map(|m| m.id).collect()).await?;

        // Update metrics
        total_processed += batch_size as u64;
        let batch_time = batch_start.elapsed();
        let total_time = start.elapsed();
        let throughput = total_processed as f64 / total_time.as_secs_f64();

        tracing::info!(
            batch_size = batch_size,
            batch_time_ms = batch_time.as_millis() as u64,
            total_processed = total_processed,
            throughput_per_sec = throughput,
            "Batch completed"
        );
    }
}
Best Practices¶
- Match batch size to processing time - Larger batches for quick tasks
- Use appropriate lock times - Lock time should cover the entire batch (see the sketch after this list)
- Handle partial failures - Archive successful, let failed retry
- Monitor throughput - Track messages per second
- Scale with workers - Add consumers for more throughput
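As a sketch of the lock-time rule of thumb, assuming the lock argument is in seconds as in the examples above (the helper name, safety factor, and floor are illustrative, not part of pgqrs):

use std::time::Duration;

// Pick a lock time that covers the whole batch with headroom.
// `per_message` is the estimated worst-case processing time per message.
fn lock_time_for_batch(batch_size: usize, per_message: Duration) -> u64 {
    // 2x safety factor so a slow batch does not lose its lock mid-flight
    let estimate = per_message * batch_size as u32 * 2;
    estimate.as_secs().max(30) // keep a small floor
}

// e.g. 100 messages at ~50 ms each -> ~10 s estimate, floored to 30 s:
// let lock_secs = lock_time_for_batch(100, Duration::from_millis(50));
// let messages = consumer.dequeue_many_with_delay(100, lock_secs).await?;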
What's Next?¶
- Delayed Messages - Schedule future tasks
- Worker Management - Scale your workers