Worker Management Guide¶
This guide covers running and managing multiple workers in production environments.
Understanding Workers¶
Every Producer and Consumer in pgqrs registers as a worker. Workers have:
- Unique ID
- Associated queue
- Status (Ready, Suspended, Stopped)
- Heartbeat timestamp
- Hostname and port for identification
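A minimal sketch of inspecting these attributes, assuming the admin.workers.list() accessor and the worker field names used by the monitoring examples later in this guide (both are assumptions that may vary between pgqrs versions):
use pgqrs::{Admin, Config};
// List registered workers and print the attributes above. The accessor
// and field names (id, queue_id, status, last_heartbeat) mirror the
// monitoring examples below and are assumptions, not a fixed API.
async fn print_workers(config: &Config) -> Result<(), Box<dyn std::error::Error>> {
    let admin = Admin::new(config).await?;
    for worker in admin.workers.list().await? {
        println!(
            "worker {} (queue {}): status {:?}, last heartbeat {}",
            worker.id, worker.queue_id, worker.status, worker.last_heartbeat
        );
    }
    Ok(())
}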
Running Multiple Workers¶
Multiple Consumers per Queue¶
Run several consumers to increase throughput:
use pgqrs::{Admin, Consumer, Config};
use std::time::Duration;
use tokio::task::JoinSet;
async fn run_workers(num_workers: usize) -> Result<(), Box<dyn std::error::Error>> {
let config = Config::from_dsn("postgresql://localhost/mydb");
let admin = Admin::new(&config).await?;
let queue = admin.get_queue("tasks").await?;
let mut workers = JoinSet::new();
for i in 0..num_workers {
let pool = admin.pool.clone();
let queue = queue.clone();
let config = config.clone();
workers.spawn(async move {
let consumer = Consumer::new(
pool,
&queue,
&format!("worker-{}", i),
3000 + i as i32,
&config,
).await?;
consumer_loop(&consumer, i).await
});
}
println!("Started {} workers", num_workers);
// Wait for all workers
while let Some(result) = workers.join_next().await {
    match result {
        Ok(Err(e)) => eprintln!("Worker error: {:?}", e),
        Err(e) => eprintln!("Worker task panicked: {:?}", e),
        Ok(Ok(())) => {}
    }
}
Ok(())
}
async fn consumer_loop(consumer: &Consumer, id: usize) -> Result<(), pgqrs::Error> {
    loop {
        let messages = consumer.dequeue().await?;
        if messages.is_empty() {
            tokio::time::sleep(Duration::from_secs(1)).await;
            continue;
        }
        for message in messages {
            println!("[Worker {}] Processing message {}", id, message.id);
            // Process...
            consumer.archive(message.id).await?;
        }
    }
}
import asyncio
from pgqrs import Consumer
async def run_workers(num_workers: int):
tasks = []
for i in range(num_workers):
consumer = Consumer(
"postgresql://localhost/mydb",
"tasks",
f"worker-{i}",
3000 + i,
)
task = asyncio.create_task(consumer_loop(consumer, i))
tasks.append(task)
print(f"Started {num_workers} workers")
await asyncio.gather(*tasks)
async def consumer_loop(consumer, worker_id: int):
while True:
messages = await consumer.dequeue()
for message in messages:
print(f"[Worker {worker_id}] Processing {message.id}")
# Process...
await consumer.archive(message.id)
if not messages:
await asyncio.sleep(1)
asyncio.run(run_workers(4))
Workers Across Multiple Processes¶
For production, run workers as separate processes:
# Start multiple worker processes
for i in {1..4}; do
WORKER_ID=$i cargo run --release --bin worker &
done
// worker binary
use pgqrs::{Admin, Config, Consumer};
use std::env;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let worker_id: i32 = env::var("WORKER_ID")
.unwrap_or_else(|_| "0".into())
.parse()?;
let hostname = hostname::get()?.to_string_lossy().to_string();
let config = Config::from_env()?;
let admin = Admin::new(&config).await?;
let queue = admin.get_queue("tasks").await?;
let consumer = Consumer::new(
admin.pool.clone(),
&queue,
&hostname,
3000 + worker_id,
&config,
).await?;
println!("Worker {} started on {}:{}", worker_id, hostname, 3000 + worker_id);
run_consumer_loop(&consumer).await
}
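The run_consumer_loop helper is not shown above; a minimal version, mirroring the consumer_loop sketch from the previous section:
// Processing loop for the worker binary; same pattern as `consumer_loop`
// above, without the in-process worker index.
async fn run_consumer_loop(consumer: &Consumer) -> Result<(), Box<dyn std::error::Error>> {
    loop {
        let messages = consumer.dequeue().await?;
        if messages.is_empty() {
            tokio::time::sleep(std::time::Duration::from_secs(1)).await;
            continue;
        }
        for message in messages {
            // Process...
            consumer.archive(message.id).await?;
        }
    }
}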
Worker Lifecycle¶
Heartbeats¶
Send periodic heartbeats to indicate worker health:
use pgqrs::{Consumer, Error, Worker};
use tokio::time::{interval, Duration};
async fn worker_with_heartbeat(consumer: Consumer) -> Result<(), Error> {
let mut heartbeat_interval = interval(Duration::from_secs(30));
loop {
tokio::select! {
_ = heartbeat_interval.tick() => {
if let Err(e) = consumer.heartbeat().await {
tracing::warn!("Heartbeat failed: {}", e);
}
}
result = consumer.dequeue() => {
let messages = result?;
for message in messages {
process_message(&message).await?;
consumer.archive(message.id).await?;
}
}
}
}
}
The Python bindings also expose batch producer and consumer operations:
import asyncio
import pgqrs
async def batch_example():
    admin = pgqrs.Admin("postgresql://localhost/mydb")
    # Batch producer
    producer = pgqrs.Producer(admin, "tasks", "batch-producer", 8080)
    # Enqueue multiple messages at once
    payloads = [{"task": "process_user", "user_id": i} for i in range(100, 200)]
    messages = await producer.enqueue_batch(payloads)
    print(f"Enqueued {len(messages)} messages")
    # Batch consumer
    consumer = pgqrs.Consumer(admin, "tasks", "batch-consumer", 8081)
    # Process messages in batches
    batch = await consumer.dequeue_batch(limit=50)
    if batch:
        # Process all messages in the batch
        results = await asyncio.gather(*[
            process_message(msg.payload) for msg in batch
        ])
        # Archive the whole batch
        message_ids = [msg.id for msg in batch]
        await consumer.archive_batch(message_ids)
        print(f"Processed {len(batch)} messages")
asyncio.run(batch_example())
Graceful Shutdown¶
Handle shutdown signals properly:
use pgqrs::{Consumer, Error, Worker};
use std::time::Duration;
use tokio::signal;
use tokio::sync::watch;
async fn run_with_graceful_shutdown(consumer: Consumer) -> Result<(), Error> {
let (shutdown_tx, mut shutdown_rx) = watch::channel(false);
// Spawn shutdown handler
tokio::spawn(async move {
signal::ctrl_c().await.ok();
println!("\nShutdown signal received");
shutdown_tx.send(true).ok();
});
// Processing loop
loop {
    if *shutdown_rx.borrow() {
        break;
    }
    let messages = consumer.dequeue().await?;
    if messages.is_empty() {
        tokio::time::sleep(Duration::from_millis(100)).await;
        continue;
    }
    for message in messages {
        if *shutdown_rx.borrow() {
            // Don't start new work during shutdown
            break;
        }
        process_message(&message).await?;
        consumer.archive(message.id).await?;
    }
}
// Graceful shutdown
println!("Suspending worker...");
consumer.suspend().await?;
consumer.shutdown().await?;
println!("Worker shut down gracefully");
Ok(())
}
import asyncio
import signal
from pgqrs import Consumer
async def run_with_graceful_shutdown(consumer):
stop = asyncio.Event()
def handle_signal():
print("\nShutdown signal received")
stop.set()
loop = asyncio.get_running_loop()
for sig in (signal.SIGINT, signal.SIGTERM):
loop.add_signal_handler(sig, handle_signal)
while not stop.is_set():
messages = await consumer.dequeue()
for message in messages:
if stop.is_set():
break
await process_message(message)
await consumer.archive(message.id)
if not messages:
await asyncio.sleep(0.1)
print("Consumer stopped gracefully")
# Use signal handlers for graceful shutdown as shown above
Monitoring Workers¶
Via CLI¶
# List all workers
pgqrs worker list
# List workers for a queue
pgqrs worker list --queue tasks
# Check worker health
pgqrs worker health --queue tasks --max-age 300
# Get worker stats
pgqrs worker stats --queue tasks
Via Code¶
Worker Health Monitoring¶
Build a health monitoring system:
use pgqrs::{Admin, WorkerStatus};
// Alert record produced by the monitor below; the field types are assumed
// to mirror the worker rows returned by `admin.workers.list()`.
struct UnhealthyWorker {
    id: i64,
    last_seen: chrono::DateTime<chrono::Utc>,
    queue_id: i64,
}
struct WorkerMonitor {
    admin: Admin,
    max_heartbeat_age: chrono::Duration,
}
impl WorkerMonitor {
async fn check_health(&self) -> Result<Vec<UnhealthyWorker>> {
let workers = self.admin.workers.list().await?;
let now = chrono::Utc::now();
let unhealthy: Vec<_> = workers
.into_iter()
.filter(|w| {
w.status == WorkerStatus::Ready &&
(now - w.last_heartbeat) > self.max_heartbeat_age
})
.map(|w| UnhealthyWorker {
id: w.id,
last_seen: w.last_heartbeat,
queue_id: w.queue_id,
})
.collect();
Ok(unhealthy)
}
async fn alert_unhealthy(&self) -> Result<()> {
let unhealthy = self.check_health().await?;
for worker in unhealthy {
tracing::error!(
worker_id = worker.id,
last_seen = %worker.last_seen,
"Worker appears unhealthy"
);
// Send alert via your preferred method
// send_alert(&worker).await?;
}
Ok(())
}
}
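A usage sketch for the monitor, assuming it runs as a periodic background task (the one-minute interval is an arbitrary choice):
// Run the health check every minute and log failures.
async fn run_monitor(monitor: WorkerMonitor) {
    let mut tick = tokio::time::interval(std::time::Duration::from_secs(60));
    loop {
        tick.tick().await;
        if let Err(e) = monitor.alert_unhealthy().await {
            tracing::error!("Worker health check failed: {}", e);
        }
    }
}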
For Python, use the CLI for health monitoring until full API support is available:
import subprocess
import json
def check_worker_health(queue: str, max_age: int = 300) -> dict:
"""Check worker health using CLI."""
result = subprocess.run(
["pgqrs", "worker", "health", "--queue", queue,
"--max-age", str(max_age), "--format", "json"],
capture_output=True,
text=True,
)
if result.returncode == 0:
return json.loads(result.stdout)
return {"error": result.stderr}
# Or implement simple monitoring with available Python APIs
async def monitor_workers(admin):
workers = await admin.get_workers()
count = await workers.count()
print(f"Active workers: {count}")
# Additional monitoring via CLI or database queries
Scaling Strategies¶
Auto-Scaling Based on Queue Depth¶
async fn auto_scale_workers(
admin: &Admin,
queue_name: &str,
min_workers: usize,
max_workers: usize,
messages_per_worker: i64,
) -> Result<usize> {
let metrics = admin.queue_metrics(queue_name).await?;
let pending = metrics.pending_messages;
// Calculate desired workers
let desired = ((pending / messages_per_worker) as usize)
.clamp(min_workers, max_workers);
tracing::info!(
pending = pending,
desired_workers = desired,
"Auto-scale calculation"
);
Ok(desired)
}
import subprocess
import json
def get_queue_metrics(queue_name: str) -> dict:
"""Get queue metrics via CLI."""
result = subprocess.run(
["pgqrs", "queue", "metrics", queue_name, "--format", "json"],
capture_output=True,
text=True,
)
return json.loads(result.stdout) if result.returncode == 0 else {}
def calculate_desired_workers(
queue_name: str,
min_workers: int = 1,
max_workers: int = 10,
messages_per_worker: int = 100,
) -> int:
"""Calculate desired number of workers based on queue depth."""
metrics = get_queue_metrics(queue_name)
pending = metrics.get("pending_messages", 0)
desired = max(min_workers, min(max_workers, pending // messages_per_worker))
print(f"Queue depth: {pending}, Desired workers: {desired}")
return desired
Worker Pool Manager¶
use std::time::Duration;
use tokio::sync::watch;
use tokio::task::JoinHandle;
struct WorkerPool {
    workers: Vec<JoinHandle<Result<(), Error>>>,
    shutdown_tx: watch::Sender<bool>,
    config: Config,
    queue: QueueInfo,
}
impl WorkerPool {
async fn new(config: Config, queue: QueueInfo, initial_size: usize) -> Self {
let (shutdown_tx, _) = watch::channel(false);
let mut pool = Self {
workers: Vec::new(),
shutdown_tx,
config,
queue,
};
pool.scale_to(initial_size).await;
pool
}
async fn scale_to(&mut self, target: usize) {
let current = self.workers.len();
if target > current {
// Add workers
for i in current..target {
let worker = self.spawn_worker(i).await;
self.workers.push(worker);
}
} else if target < current {
// Remove workers (send shutdown signal)
// In practice, you'd mark specific workers for shutdown
}
}
async fn spawn_worker(&self, id: usize) -> JoinHandle<Result<(), Error>> {
let config = self.config.clone();
let queue = self.queue.clone();
let mut shutdown_rx = self.shutdown_tx.subscribe();
tokio::spawn(async move {
let admin = Admin::new(&config).await?;
let consumer = Consumer::new(
admin.pool,
&queue,
&format!("pool-worker-{}", id),
3000 + id as i32,
&config,
).await?;
loop {
if *shutdown_rx.borrow() {
consumer.suspend().await?;
consumer.shutdown().await?;
break;
}
let messages = consumer.dequeue().await?;
if messages.is_empty() {
    tokio::time::sleep(Duration::from_millis(100)).await;
    continue;
}
for msg in messages {
    process_message(&msg).await?;
    consumer.archive(msg.id).await?;
}
}
Ok(())
})
}
async fn shutdown(&mut self) {
self.shutdown_tx.send(true).ok();
for worker in self.workers.drain(..) {
worker.await.ok();
}
}
}
import asyncio
from pgqrs import Consumer
class WorkerPool:
def __init__(self, dsn: str, queue_name: str, initial_size: int = 1):
self.dsn = dsn
self.queue_name = queue_name
self.workers: list[asyncio.Task] = []
self.shutdown_event = asyncio.Event()
# Spawn initial workers
for i in range(initial_size):
self._spawn_worker(i)
def _spawn_worker(self, worker_id: int):
task = asyncio.create_task(
self._worker_loop(worker_id),
name=f"worker-{worker_id}"
)
self.workers.append(task)
async def _worker_loop(self, worker_id: int):
consumer = Consumer(
self.dsn,
self.queue_name,
f"pool-worker-{worker_id}",
3000 + worker_id,
)
while not self.shutdown_event.is_set():
try:
messages = await consumer.dequeue()
for msg in messages:
if self.shutdown_event.is_set():
break
await self._process_message(msg)
await consumer.archive(msg.id)
if not messages:
await asyncio.sleep(0.1)
except Exception as e:
print(f"Worker {worker_id} error: {e}")
await asyncio.sleep(1)
print(f"Worker {worker_id} stopped")
async def _process_message(self, message):
# Override this in subclass
print(f"Processing message {message.id}")
def scale_to(self, target: int):
current = len(self.workers)
if target > current:
for i in range(current, target):
self._spawn_worker(i)
# Note: scaling down requires more complex logic
async def shutdown(self):
self.shutdown_event.set()
await asyncio.gather(*self.workers, return_exceptions=True)
print("Worker pool shut down")
# Usage
async def main():
pool = WorkerPool("postgresql://localhost/mydb", "tasks", initial_size=4)
try:
# Run until interrupted
await asyncio.Event().wait()
except asyncio.CancelledError:
await pool.shutdown()
asyncio.run(main())
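Back in Rust, the auto-scaling calculation and the pool manager can be combined. A minimal sketch of a periodic resize loop, assuming the auto_scale_workers helper and WorkerPool type from this section (the one-minute interval, 1..10 bounds, and 100 messages-per-worker ratio are placeholder values):
// Recompute the desired worker count once a minute and resize the pool.
async fn autoscale_loop(
    admin: &Admin,
    pool: &mut WorkerPool,
    queue_name: &str,
) -> Result<()> {
    let mut tick = tokio::time::interval(std::time::Duration::from_secs(60));
    loop {
        tick.tick().await;
        let desired = auto_scale_workers(admin, queue_name, 1, 10, 100).await?;
        pool.scale_to(desired).await;
    }
}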
Cleanup¶
Purge Stopped Workers¶
# Remove workers stopped more than 7 days ago
pgqrs worker purge --older-than 7d
# Remove workers stopped more than 30 days ago
pgqrs worker purge --older-than 30d
Delete Specific Worker¶
To remove a single worker record rather than purging by age, see the pgqrs worker subcommands in the CLI Reference.
Best Practices¶
- Use meaningful identifiers - Include hostname and worker ID
- Send regular heartbeats - Every 30-60 seconds
- Implement graceful shutdown - Suspend before stopping
- Monitor worker health - Alert on stale heartbeats
- Clean up stopped workers - Purge periodically
- Scale based on queue depth - Not just time of day
- Use connection pooling - Share pools across workers when possible
Worker Health Monitoring¶
Monitor workers proactively to detect issues early:
use pgqrs::{Admin, WorkerHandle};
use chrono::Duration;
async fn check_worker_health(admin: &Admin) -> Result<(), Box<dyn std::error::Error>> {
let workers = admin.get_workers().await?;
let worker_list = workers.list().await?;
for worker in worker_list {
let handle = WorkerHandle::new(admin.pool.clone(), worker.id);
// Check if worker responded to heartbeat in last 5 minutes
if !handle.is_healthy(Duration::minutes(5)).await? {
// Alert your monitoring system
alert_stale_worker(&worker).await?;
}
}
Ok(())
}
from datetime import datetime, timedelta
import pgqrs
async def monitor_workers(admin: pgqrs.Admin):
workers = await admin.get_workers()
worker_list = await workers.list()
stale_threshold = datetime.utcnow() - timedelta(minutes=5)
for worker in worker_list:
if worker.updated_at < stale_threshold:
# Send alert to monitoring system
await send_alert(f"Worker {worker.id} is stale")
Automated Cleanup¶
Set up automated cleanup of old workers:
// Run daily cleanup
use pgqrs::Admin;
use std::time::Duration;
async fn daily_worker_cleanup(admin: Admin) {
let mut interval = tokio::time::interval(Duration::from_secs(24 * 60 * 60));
loop {
interval.tick().await;
// Clean up workers stopped more than 7 days ago
if let Err(e) = cleanup_old_workers(&admin, 7).await {
log::error!("Worker cleanup failed: {}", e);
}
}
}
async def scheduled_cleanup(admin: pgqrs.Admin):
while True:
try:
# Clean up workers stopped more than 7 days ago
count = await cleanup_stopped_workers(admin, older_than_days=7)
print(f"Cleaned up {count} old workers")
# Wait 24 hours
await asyncio.sleep(24 * 60 * 60)
except Exception as e:
print(f"Cleanup error: {e}")
await asyncio.sleep(3600) # Retry in 1 hour
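The cleanup_old_workers helper referenced above is left undefined. One option is a sketch that shells out to the pgqrs worker purge command from the Cleanup section rather than assuming a purge API in the Rust crate; the helper name and signature are hypothetical:
use std::process::Command;
// Hypothetical helper for the daily cleanup loop above: it invokes the
// `pgqrs worker purge` CLI command instead of assuming a crate-level purge API.
async fn cleanup_old_workers(
    _admin: &Admin,
    older_than_days: u32,
) -> Result<(), Box<dyn std::error::Error>> {
    let output = tokio::task::spawn_blocking(move || {
        Command::new("pgqrs")
            .arg("worker")
            .arg("purge")
            .arg("--older-than")
            .arg(format!("{}d", older_than_days))
            .output()
    })
    .await??;
    if !output.status.success() {
        return Err(String::from_utf8_lossy(&output.stderr).into_owned().into());
    }
    Ok(())
}
Shelling out keeps the retention logic in one place (the CLI) at the cost of requiring the pgqrs binary on the host running the cleanup task.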
What's Next?¶
- Batch Processing - High-throughput patterns
- CLI Reference - Worker management commands