Producer API¶
The Producer is responsible for creating and enqueueing messages to a queue.
Creating a Producer¶
use pgqrs::{Admin, Producer, Config};
let config = Config::from_dsn("postgresql://localhost/mydb");
let admin = Admin::new(&config).await?;
let queue = admin.get_queue("tasks").await?;
let producer = Producer::new(
admin.pool.clone(), // Database connection pool
&queue, // Queue to produce to
"my-service", // Hostname for worker identification
3000, // Port for worker identification
&config, // Configuration
).await?;
Parameters¶
| Parameter | Type | Description |
|---|---|---|
pool |
PgPool |
SQLx PostgreSQL connection pool |
queue |
&QueueInfo |
Queue to produce messages to |
hostname |
&str |
Hostname for worker identification |
port |
i32 |
Port for worker identification |
config |
&Config |
pgqrs configuration |
The hostname and port are used to identify this producer as a worker in the database.
Methods¶
enqueue¶
Enqueue a single message to the queue.
use serde_json::json;
let payload = json!({
"action": "send_email",
"to": "user@example.com",
"subject": "Welcome!"
});
let message = producer.enqueue(&payload).await?;
println!("Message ID: {}", message.id);
println!("Enqueued at: {}", message.enqueued_at);
Returns: Result<QueueMessage> - The created message with its ID and metadata.
batch_enqueue¶
Enqueue multiple messages in a single transaction.
let payloads = vec![
json!({"user_id": 1, "action": "welcome_email"}),
json!({"user_id": 2, "action": "welcome_email"}),
json!({"user_id": 3, "action": "welcome_email"}),
];
let messages = producer.batch_enqueue(&payloads).await?;
println!("Enqueued {} messages", messages.len());
for msg in &messages {
println!(" ID: {}", msg.id);
}
Returns: Result<Vec<QueueMessage>> - All created messages.
Performance
Batch enqueue is more efficient than multiple single enqueues because it uses a single database transaction.
enqueue_delayed¶
Enqueue a message that won't be visible until after a delay.
let payload = json!({
"reminder": "Follow up with customer",
"customer_id": 12345
});
// Message available after 5 minutes (300 seconds)
let message = producer.enqueue_delayed(&payload, 300).await?;
println!("Message {} will be available at {}", message.id, message.vt);
Parameters:
| Parameter | Type | Description |
|---|---|---|
payload |
&Value |
JSON payload |
delay_seconds |
i64 |
Seconds until message is visible |
Returns: Result<QueueMessage> - The created message with future visibility timeout.
extend_visibility¶
Extend the lock on a message being processed.
// Consumer dequeued message with ID 42
// Processing is taking longer than expected...
// Extend lock by 30 more seconds
let extended = consumer.extend_visibility(42, 30).await?;
if extended {
println!("Lock extended, continue processing...");
} else {
println!("Failed to extend - message may have been released");
}
Parameters:
| Parameter | Type | Description |
|---|---|---|
message_id |
i64 |
ID of the message |
duration_seconds |
i64 |
Additional seconds to lock |
Returns: Result<bool> - true if extended, false if message not found or already released.
Warning
You can only extend visibility on messages that are currently locked (being processed).
Worker Methods¶
Producer implements the Worker trait, giving access to worker lifecycle methods:
use pgqrs::Worker;
// Get worker ID
let id = producer.worker_id();
// Check status
let status = producer.status().await?;
// Send heartbeat
producer.heartbeat().await?;
// Check if healthy (last heartbeat within 5 minutes)
let healthy = producer.is_healthy(chrono::Duration::minutes(5)).await?;
// Lifecycle operations
producer.suspend().await?;
producer.resume().await?;
producer.shutdown().await?;
See Workers for details on worker lifecycle.
Patterns¶
Async Producer Service¶
use std::sync::Arc;
struct EmailService {
producer: Arc<Producer>,
}
impl EmailService {
async fn new(admin: &Admin, queue: &QueueInfo) -> Result<Self> {
let producer = Producer::new(
admin.pool.clone(),
queue,
"email-service",
3000,
&admin.config,
).await?;
Ok(Self {
producer: Arc::new(producer),
})
}
async fn send_welcome(&self, user_id: i64) -> Result<i64> {
let payload = json!({
"type": "welcome",
"user_id": user_id
});
let msg = self.producer.enqueue(&payload).await?;
Ok(msg.id)
}
async fn send_batch(&self, user_ids: Vec<i64>) -> Result<Vec<i64>> {
let payloads: Vec<_> = user_ids
.iter()
.map(|id| json!({"type": "welcome", "user_id": id}))
.collect();
let messages = self.producer.batch_enqueue(&payloads).await?;
Ok(messages.iter().map(|m| m.id).collect())
}
}
Rate-Limited Producer¶
use tokio::time::{sleep, Duration};
async fn enqueue_with_rate_limit(
producer: &Producer,
payloads: Vec<Value>,
rate_per_second: usize,
) -> Result<Vec<i64>> {
let mut ids = Vec::new();
let delay = Duration::from_secs(1) / rate_per_second as u32;
for payload in payloads {
let msg = producer.enqueue(&payload).await?;
ids.push(msg.id);
sleep(delay).await;
}
Ok(ids)
}
Producer with Retry¶
use tokio::time::{sleep, Duration};
async fn enqueue_with_retry(
producer: &Producer,
payload: &Value,
max_retries: u32,
) -> Result<QueueMessage> {
let mut attempts = 0;
loop {
match producer.enqueue(payload).await {
Ok(msg) => return Ok(msg),
Err(e) if attempts < max_retries => {
attempts += 1;
tracing::warn!("Enqueue failed, retry {}/{}: {}", attempts, max_retries, e);
sleep(Duration::from_millis(100 * 2_u64.pow(attempts))).await;
}
Err(e) => return Err(e),
}
}
}
Full Example¶
use pgqrs::{Admin, Producer, Config, Worker};
use serde_json::json;
use tokio::signal;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Setup
let config = Config::from_dsn("postgresql://localhost/mydb");
let admin = Admin::new(&config).await?;
let queue = admin.create_queue("notifications").await?;
// Create producer
let producer = Producer::new(
admin.pool.clone(),
&queue,
"notification-service",
3000,
&config,
).await?;
println!("Producer worker ID: {}", producer.worker_id());
// Enqueue immediate messages
let msg1 = producer.enqueue(&json!({
"type": "email",
"to": "user@example.com"
})).await?;
println!("Sent immediate message: {}", msg1.id);
// Enqueue delayed message
let msg2 = producer.enqueue_delayed(&json!({
"type": "reminder",
"to": "user@example.com"
}), 3600).await?;
println!("Sent delayed message: {} (available in 1 hour)", msg2.id);
// Batch enqueue
let batch = producer.batch_enqueue(&vec![
json!({"type": "sms", "to": "+1234567890"}),
json!({"type": "sms", "to": "+0987654321"}),
]).await?;
println!("Sent batch of {} messages", batch.len());
// Graceful shutdown on Ctrl+C
signal::ctrl_c().await?;
producer.suspend().await?;
producer.shutdown().await?;
println!("Producer shut down gracefully");
Ok(())
}
See Also¶
- Consumer API - Processing messages
- Admin API - Queue management
- Batch Processing Guide - High-throughput patterns
- Delayed Messages Guide - Scheduling messages