diff --git a/Cargo.lock b/Cargo.lock index e2b9f52..d5b602e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -50,6 +50,28 @@ dependencies = [ "libc", ] +[[package]] +name = "async-stream" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b5a71a6f37880a80d1d7f19efd781e4b5de42c88f0722cc13bcb6cc2cfe8476" +dependencies = [ + "async-stream-impl", + "futures-core", + "pin-project-lite", +] + +[[package]] +name = "async-stream-impl" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c7c24de15d275a1ecfd47a380fb4d5ec9bfe0933f309ed5e705b775596a3574d" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "atoi" version = "2.0.0" @@ -782,6 +804,7 @@ dependencies = [ "sqlx", "thiserror", "tokio", + "tokio-test", "uuid", ] @@ -1455,6 +1478,19 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-test" +version = "0.4.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2468baabc3311435b55dd935f702f42cd1b8abb7e754fb7dfb16bd36aa88f9f7" +dependencies = [ + "async-stream", + "bytes", + "futures-core", + "tokio", + "tokio-stream", +] + [[package]] name = "tracing" version = "0.1.40" diff --git a/Cargo.toml b/Cargo.toml index 3177620..38b2b84 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -36,6 +36,7 @@ sqlx = { version = "0.8.0", features = [ "postgres", "tls-rustls", ] } +tokio-test = "0.4.4" [package.metadata.docs.rs] all-features = true diff --git a/README.md b/README.md index a18fca6..f6e1115 100644 --- a/README.md +++ b/README.md @@ -1,8 +1,55 @@ # pgboss-rs +## Queue jobs with Rust and PostgreSQL like a boss. + [![Crates.io](https://img.shields.io/crates/v/pgboss.svg)](https://crates.io/crates/pgboss) -[![Documentation](https://docs.rs/faktory/badge.svg)](https://docs.rs/pgboss/) +[![Documentation](https://docs.rs/pgboss/badge.svg)](https://docs.rs/pgboss/) [![Codecov](https://codecov.io/github/rustworthy/pgboss-rs/coverage.svg?branch=main)](https://codecov.io/gh/rustworthy/pgboss-rs) [![dependency status](https://deps.rs/repo/github/rustworthy/pgboss-rs/status.svg)](https://deps.rs/repo/github/rustworthy/pgboss-rs) -Queueing jobs in Postgres from Rust like a boss. [PgBoss](https://github.com/timgit/pg-boss)-compatible. +Inspired by, compatible with and partially ported from [`pg-boss`](https://github.com/timgit/pg-boss/tree/master) Node.js package. + +Heavily influenced by decisions and approaches in [`faktory-rs`](https://github.com/jonhoo/faktory-rs) crate. + +```rust +use std::time::Duration; +use serde_json::json; +use pgboss::{Client, Job, JobState}; + +// Create a client first. +let c = Client::builder().schema("desired_schema_name").connect().await.unwrap(); + +// Then create a queue. +c.create_standard_queue("qname").await.unwrap(); // NB! queue should be created before pushing jobs +c.create_standard_queue("qname_dlq").await.unwrap(); + +// Build a job and ... +let job = Job::builder() + .queue_name("qname") // which queue this job should be sent to + .data(json!({"key": "value"})) // arbitrary json, your job's payload + .priority(10) // will be consumer prior to those with lower priorities + .retry_limit(1) // only retry this job once + .dead_letter("qname_dlq") // send to this queue when retry limit exceeded + .retry_delay(Duration::from_secs(60 * 5)) // do not retry immediately after failure + .expire_in(Duration::from_secs(60 * 5)) // only give the worker 5 minutes to complete the job + .retain_for(Duration::from_secs(60 * 60 * 24)) // do not archive for at least 1 day + .delay_for(Duration::from_secs(5)) // make it visible to consumers after 5 seconds + .singleton_for(Duration::from_secs(7)) // only allow one job for at least 7 seconds + .singleton_key("buzz") // allow more than one job if their key is different from this + .build(); + +// ... enqueue it. +let _id = c.send_job(&job).await.expect("no error"); + +// Consume from the queue. +let fetched_job = c + .fetch_job("qname") + .await + .expect("no error") + .expect("a job"); + +assert_eq!(fetched_job.data, job.data); +assert_eq!(fetched_job.state, JobState::Active); + +c.complete_job("qname", fetched_job.id, json!({"result": "success!"})).await.expect("no error"); +``` diff --git a/src/client/public/connect_ops.rs b/src/client/public/connect_ops.rs index e76623a..dc6e08c 100644 --- a/src/client/public/connect_ops.rs +++ b/src/client/public/connect_ops.rs @@ -8,7 +8,14 @@ use sqlx::postgres::PgPool; use super::ClientBuilder; impl Client { - /// Connect to the PostgreSQL server. + /// Create new [`Client`] and connect to a PostgreSQL server. + /// + /// If `url` is not given, `POSTGRES_PROVIDER` is read to get the name of the environment variable + /// to get the address from (defaults to `POSTGRES_URL`), and then that environment variable is read + /// to get the server address. If the latter environment variable is not defined, the connection will be + /// made to `postgres://localhost:5432`. + /// + /// You can optionally use [`Client::connect_to`] and pass the `url` as an argument. pub async fn connect() -> Result { let pool = utils::create_pool(None).await?; Client::with_pool(pool).await diff --git a/src/lib.rs b/src/lib.rs index b91b6d4..5a10f46 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,4 +1,54 @@ -//! Crate docs +//! Queue jobs with Rust and PostgreSQL like a boss. +//! +//! Inspired by, compatible with and partially ported from [`pg-boss`](https://github.com/timgit/pg-boss/tree/master) Node.js package. +//! +//! Heavily influenced by decisions and approaches in [`faktory-rs`](https://github.com/jonhoo/faktory-rs) crate. +//! +//! ```no_run +//! # tokio_test::block_on(async { +//! use std::time::Duration; +//! use serde_json::json; +//! use pgboss::{Client, Job, JobState}; +//! +//! // Create a client first. +//! let c = Client::builder().schema("desired_schema_name").connect().await.unwrap(); +//! +//! // Then create a queue. +//! c.create_standard_queue("qname").await.unwrap(); // NB! queue should be created before pushing jobs +//! c.create_standard_queue("qname_dlq").await.unwrap(); +//! +//! // Build a job and ... +//! let job = Job::builder() +//! .queue_name("jobtype") // which queue this job should be sent to +//! .data(json!({"key": "value"})) // arbitrary json, your job's payload +//! .priority(10) // will be consumer prior to those with lower priorities +//! .retry_limit(1) // only retry this job once +//! .dead_letter("jobtype_dlq") // send to this queue when retry limit exceeded +//! .retry_delay(Duration::from_secs(60 * 5)) // do not retry immediately after failure +//! .expire_in(Duration::from_secs(60 * 5)) // only give the worker 5 minutes to complete the job +//! .retain_for(Duration::from_secs(60 * 60 * 24)) // do not archive for at least 1 day +//! .delay_for(Duration::from_secs(5)) // make it visible to consumers after 5 seconds +//! .singleton_for(Duration::from_secs(7)) // only allow one job for at least 7 seconds +//! .singleton_key("buzz") // allow more than one job if their key is different from this +//! .build(); +//! +//! // ... enqueue it. +//! let _id = c.send_job(&job).await.expect("no error"); +//! +//! // Consume from the queue. +//! let fetched_job = c +//! .fetch_job("jobtype") +//! .await +//! .expect("no error") +//! .expect("a job"); +//! +//! assert_eq!(fetched_job.data, job.data); +//! assert_eq!(fetched_job.state, JobState::Active); +//! +//! c.complete_job("qname", fetched_job.id, json!({"result": "success!"})).await.expect("no error"); +//! # }); +//! ``` +//! #![deny(missing_docs)] #![cfg_attr(docsrs, feature(doc_cfg))]