Admin API¶
The Admin provides queue management, schema administration, and monitoring capabilities.
Creating an Admin¶
use pgqrs::{Admin, Config};
let config = Config::from_dsn("postgresql://localhost/mydb");
let admin = Admin::new(&config).await?;
The Admin maintains a connection pool that can be shared with Producers and Consumers.
Properties¶
| Property | Type | Description |
|---|---|---|
pool |
PgPool |
Database connection pool |
config |
Config |
Configuration settings |
queues |
Queues |
Queue table operations |
workers |
Workers |
Worker table operations |
messages |
Messages |
Message table operations |
archive |
Archive |
Archive table operations |
Schema Management¶
install¶
Install the pgqrs schema (tables, indexes, constraints).
Run this once when setting up pgqrs for the first time. It's idempotent—safe to call multiple times.
verify¶
Verify that the pgqrs schema is correctly installed.
Returns an error if the schema is missing or corrupted.
uninstall¶
Remove the pgqrs schema (tables and data).
Danger
This permanently deletes all queues, messages, workers, and archives!
Queue Management¶
create_queue¶
Create a new queue.
let queue = admin.create_queue("email-notifications").await?;
println!("Created queue: {} (ID: {})", queue.queue_name, queue.id);
Returns: Result<QueueInfo> with:
| Field | Type | Description |
|---|---|---|
id |
i64 |
Unique queue ID |
queue_name |
String |
Queue name |
created_at |
DateTime<Utc> |
Creation timestamp |
get_queue¶
Get a queue by name.
Returns: Result<QueueInfo> - Error if queue doesn't exist.
delete_queue¶
Delete a queue and all its messages.
Warning
This permanently deletes the queue, all pending messages, and all archived messages.
queue_metrics¶
Get metrics for a specific queue.
let metrics = admin.queue_metrics("email-notifications").await?;
println!("Queue: {}", metrics.name);
println!(" Total messages: {}", metrics.total_messages);
println!(" Pending: {}", metrics.pending_messages);
println!(" Locked: {}", metrics.locked_messages);
println!(" Archived: {}", metrics.archived_messages);
if let Some(oldest) = metrics.oldest_pending_message {
println!(" Oldest pending: {}", oldest);
}
Returns: Result<QueueMetrics> with:
| Field | Type | Description |
|---|---|---|
name |
String |
Queue name |
total_messages |
i64 |
Total in messages table |
pending_messages |
i64 |
Available for processing |
locked_messages |
i64 |
Currently being processed |
archived_messages |
i64 |
In archive table |
oldest_pending_message |
Option<DateTime> |
Oldest pending timestamp |
newest_message |
Option<DateTime> |
Newest message timestamp |
all_queues_metrics¶
Get metrics for all queues.
let all_metrics = admin.all_queues_metrics().await?;
for metrics in all_metrics {
println!("{}: {} pending, {} locked, {} archived",
metrics.name,
metrics.pending_messages,
metrics.locked_messages,
metrics.archived_messages
);
}
Archive Management¶
purge_archive¶
Delete old archived messages.
Table APIs¶
Admin provides direct access to table operations through its fields:
Queues Table¶
// List all queues
let queues = admin.queues.list().await?;
// Get queue by ID
let queue = admin.queues.get(queue_id).await?;
// Get queue by name
let queue = admin.queues.get_by_name("tasks").await?;
// Count queues
let count = admin.queues.count().await?;
Workers Table¶
// List all workers
let workers = admin.workers.list().await?;
// List workers for a queue
let tx = &mut admin.pool.begin().await?;
let queue_workers = admin.workers.filter_by_fk(queue_id, tx).await?;
// Count workers
let count = admin.workers.count().await?;
Messages Table¶
use pgqrs::tables::Messages;
let messages = Messages::new(admin.pool.clone());
// Count pending messages for a queue
let pending = messages.count_pending(queue_id).await?;
// Count total messages
let total = messages.count().await?;
Archive Table¶
use pgqrs::Archive;
let archive = Archive::new(admin.pool.clone());
// Count archived messages for a queue
let tx = &mut admin.pool.begin().await?;
let count = archive.count_for_fk(queue_id, tx).await?;
// List archived messages
let archived = archive.filter_by_fk(queue_id, tx).await?;
Patterns¶
Queue Setup Script¶
async fn setup_queues(admin: &Admin) -> Result<()> {
// Install schema if needed
admin.install().await?;
// Create application queues
let queues = ["emails", "notifications", "reports", "dlq"];
for name in queues {
match admin.create_queue(name).await {
Ok(q) => println!("Created queue: {}", q.queue_name),
Err(e) if e.to_string().contains("already exists") => {
println!("Queue {} already exists", name);
}
Err(e) => return Err(e.into()),
}
}
Ok(())
}
Monitoring Dashboard¶
async fn print_dashboard(admin: &Admin) -> Result<()> {
let metrics = admin.all_queues_metrics().await?;
println!("\n{:=^60}", " Queue Dashboard ");
println!("{:<20} {:>8} {:>8} {:>8} {:>8}",
"Queue", "Total", "Pending", "Locked", "Archived");
println!("{:-^60}", "");
for m in metrics {
println!("{:<20} {:>8} {:>8} {:>8} {:>8}",
m.name,
m.total_messages,
m.pending_messages,
m.locked_messages,
m.archived_messages
);
}
Ok(())
}
Health Check Endpoint¶
use serde::Serialize;
#[derive(Serialize)]
struct HealthStatus {
healthy: bool,
database: bool,
queues: Vec<QueueHealth>,
}
#[derive(Serialize)]
struct QueueHealth {
name: String,
pending: i64,
oldest_pending_age_seconds: Option<i64>,
}
async fn health_check(admin: &Admin) -> Result<HealthStatus> {
// Check database connectivity
let db_healthy = admin.verify().await.is_ok();
// Check queue health
let metrics = admin.all_queues_metrics().await?;
let now = chrono::Utc::now();
let queues: Vec<QueueHealth> = metrics
.iter()
.map(|m| QueueHealth {
name: m.name.clone(),
pending: m.pending_messages,
oldest_pending_age_seconds: m.oldest_pending_message
.map(|t| (now - t).num_seconds()),
})
.collect();
Ok(HealthStatus {
healthy: db_healthy,
database: db_healthy,
queues,
})
}
Cleanup Job¶
async fn cleanup_old_data(admin: &Admin) -> Result<()> {
// Purge archives older than 30 days
let queues = admin.queues.list().await?;
for queue in queues {
admin.purge_archive(&queue.queue_name).await?;
tracing::info!("Purged archive for queue: {}", queue.queue_name);
}
// Purge old stopped workers
// (This would require additional CLI command or direct SQL)
Ok(())
}
Full Example¶
use pgqrs::{Admin, Config};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let config = Config::from_dsn("postgresql://localhost/mydb");
let admin = Admin::new(&config).await?;
// Initial setup
println!("Installing schema...");
admin.install().await?;
admin.verify().await?;
println!("Schema verified");
// Create queues
println!("\nCreating queues...");
let email_queue = admin.create_queue("emails").await?;
let task_queue = admin.create_queue("tasks").await?;
println!("Created: emails ({}), tasks ({})", email_queue.id, task_queue.id);
// List queues
println!("\nAll queues:");
let queues = admin.queues.list().await?;
for q in &queues {
println!(" - {} (ID: {}, created: {})",
q.queue_name, q.id, q.created_at);
}
// Show metrics
println!("\nQueue metrics:");
let all_metrics = admin.all_queues_metrics().await?;
for m in &all_metrics {
println!(" {}: {} pending, {} archived",
m.name, m.pending_messages, m.archived_messages);
}
// The pool can be shared with producers/consumers
println!("\nConnection pool can be shared via: admin.pool.clone()");
Ok(())
}
See Also¶
- Producer API - Creating messages
- Consumer API - Processing messages
- Configuration - Configuration options
- CLI Reference - Command-line administration