Skip to content

Commit

Permalink
Add crate Error enum
Browse files Browse the repository at this point in the history
  • Loading branch information
rustworthy committed Aug 29, 2024
1 parent d0d1363 commit 413e70e
Show file tree
Hide file tree
Showing 8 changed files with 46 additions and 27 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ sqlx = { version = "=0.8.0", features = [
"chrono",
"uuid",
] }
thiserror = "1.0.63"
uuid = { version = "1.10.0", features = ["v4"] }

[dev-dependencies]
Expand Down
10 changes: 5 additions & 5 deletions src/client/builder.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use sqlx::postgres::PgPool;

use super::{opts, Client};
use crate::utils;
use crate::{utils, Error};

/// Builder for [`Client`].
#[derive(Debug, Clone)]
Expand All @@ -28,7 +28,7 @@ impl ClientBuilder {
}

/// Connect to the PostgreSQL server.
pub async fn connect(self) -> Result<Client, sqlx::Error> {
pub async fn connect(self) -> Result<Client, Error> {
let pool = utils::create_pool(None).await?;
self.with_pool(pool).await
}
Expand All @@ -37,7 +37,7 @@ impl ClientBuilder {
///
/// To configure `ssl` (e.g. `sslmode=require`), you will need to build
/// your own `Pool` and use [`ClientBuilder::with_pool`] method instead.
pub async fn connect_to<S>(self, url: S) -> Result<Client, sqlx::Error>
pub async fn connect_to<S>(self, url: S) -> Result<Client, Error>
where
S: AsRef<str>,
{
Expand All @@ -46,10 +46,10 @@ impl ClientBuilder {
}

/// Bring your own pool.
pub async fn with_pool(self, pool: PgPool) -> Result<Client, sqlx::Error> {
pub async fn with_pool(self, pool: PgPool) -> Result<Client, Error> {
let opts = opts::ClientOptions {
schema: self.schema,
};
Client::new(pool, opts).await
Ok(Client::new(pool, opts).await?)
}
}
31 changes: 16 additions & 15 deletions src/client/public.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use sqlx::types::Json;
use uuid::Uuid;

use super::{builder::ClientBuilder, opts, Client};
use crate::Error;

impl Client {
/// Create an instance of [`ClientBuilder`]
Expand All @@ -18,7 +19,7 @@ impl Client {
}

/// Connect to the PostgreSQL server.
pub async fn connect() -> Result<Client, sqlx::Error> {
pub async fn connect() -> Result<Client, Error> {
let pool = utils::create_pool(None).await?;
Client::with_pool(pool).await
}
Expand All @@ -27,7 +28,7 @@ impl Client {
///
/// To configure `ssl` (e.g. `sslmode=require`), you will need to build
/// your own `Pool` and use [`ClientBuilder::with_pool`] method instead.
pub async fn connect_to<U>(url: U) -> Result<Client, sqlx::Error>
pub async fn connect_to<U>(url: U) -> Result<Client, Error>
where
U: AsRef<str>,
{
Expand All @@ -36,28 +37,28 @@ impl Client {
}

/// Bring your own pool.
pub async fn with_pool(pool: PgPool) -> Result<Self, sqlx::Error> {
pub async fn with_pool(pool: PgPool) -> Result<Self, Error> {
let opts = opts::ClientOptions::default();
Client::new(pool, opts).await
Ok(Client::new(pool, opts).await?)
}

/// Registers a customized queue in the database.
pub async fn create_queue<'a, Q>(&self, opts: Q) -> Result<(), sqlx::Error>
pub async fn create_queue<'a, Q>(&self, opts: Q) -> Result<(), Error>
where
Q: Borrow<QueueOptions<'a>>,
{
let stmt = sql::proc::create_queue(&self.opts.schema);
let q_opts = opts.borrow();
sqlx::query(&stmt)
Ok(sqlx::query(&stmt)
.bind(q_opts.name)
.bind(Json(q_opts))
.execute(&self.pool)
.await
.map(|_| ())
.map(|_| ())?)
}

/// Registers a standard queue in the database.
pub async fn create_standard_queue<Q>(&self, name: Q) -> Result<(), sqlx::Error>
pub async fn create_standard_queue<Q>(&self, name: Q) -> Result<(), Error>
where
Q: AsRef<str>,
{
Expand All @@ -69,7 +70,7 @@ impl Client {
}

/// Returns [`QueueInfo`] on the queue with this name, if any.
pub async fn get_queue<Q>(&self, queue_name: Q) -> Result<Option<QueueInfo>, sqlx::Error>
pub async fn get_queue<Q>(&self, queue_name: Q) -> Result<Option<QueueInfo>, Error>
where
Q: AsRef<str>,
{
Expand All @@ -82,7 +83,7 @@ impl Client {
}

/// Return info on all the queues in the system.
pub async fn get_queues(&self) -> Result<Vec<QueueInfo>, sqlx::Error> {
pub async fn get_queues(&self) -> Result<Vec<QueueInfo>, Error> {
let stmt = sql::dml::get_queues(&self.opts.schema);
let queues: Vec<QueueInfo> = sqlx::query_as(&stmt).fetch_all(&self.pool).await?;
Ok(queues)
Expand All @@ -92,20 +93,20 @@ impl Client {
///
/// Deletes a queue and all jobs from the active job table.
/// Any jobs in the archive table are retained.
pub async fn delete_queue<Q>(&self, queue_name: Q) -> Result<(), sqlx::Error>
pub async fn delete_queue<Q>(&self, queue_name: Q) -> Result<(), Error>
where
Q: AsRef<str>,
{
let stmt = sql::proc::delete_queue(&self.opts.schema);
sqlx::query(&stmt)
Ok(sqlx::query(&stmt)
.bind(queue_name.as_ref())
.execute(&self.pool)
.await
.map(|_| ())
.map(|_| ())?)
}

/// Enqueue a job.
pub async fn send_job<J>(&self, job: J) -> Result<Option<Uuid>, sqlx::Error>
pub async fn send_job<J>(&self, job: J) -> Result<Option<Uuid>, Error>
where
J: Into<Job>,
{
Expand All @@ -122,7 +123,7 @@ impl Client {
}

/// Create and enqueue a job.
pub async fn send<Q, D>(&self, name: Q, data: D) -> Result<Option<Uuid>, sqlx::Error>
pub async fn send<Q, D>(&self, name: Q, data: D) -> Result<Option<Uuid>, Error>
where
Q: AsRef<str>,
D: Into<serde_json::Value>,
Expand Down
10 changes: 10 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
use thiserror::Error;

/// Enumerates all errors that this crate may return.
#[derive(Debug, Error)]
#[non_exhaustive]
pub enum Error {
/// Sqlx PostgreSQL driver error.
#[error("db driver error")]
Sqlx(#[from] sqlx::Error),
}
2 changes: 2 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@
#![cfg_attr(docsrs, feature(doc_cfg))]

mod client;
mod error;
mod job;
mod queue;
mod sql;
mod utils;

pub use client::{Client, ClientBuilder};
pub use error::Error;
pub use job::Job;
pub use queue::{QueueInfo, QueueOptions, QueuePolicy};

Expand Down
3 changes: 2 additions & 1 deletion src/utils.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use sqlx::{postgres::PgPoolOptions, PgPool};
use crate::Error;

pub(crate) async fn create_pool(url: Option<&str>) -> Result<PgPool, sqlx::Error> {
pub(crate) async fn create_pool(url: Option<&str>) -> Result<PgPool, Error> {
let pool = match url {
Some(url) => {
PgPoolOptions::new()
Expand Down
15 changes: 9 additions & 6 deletions tests/e2e/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::time::Duration;

use crate::utils::{self, POSRGRES_URL};
use chrono::Utc;
use pgboss::{Client, QueueOptions, QueuePolicy};
use pgboss::{Client, Error, QueueOptions, QueuePolicy};
use sqlx::postgres::PgPoolOptions;

#[tokio::test]
Expand Down Expand Up @@ -163,11 +163,14 @@ async fn create_queue_already_exists() {
let client = Client::builder().schema(local).connect().await.unwrap();
client.create_standard_queue("job_type").await.unwrap();

let err = client.create_standard_queue("job_type").await.unwrap_err();
assert_eq!(
err.into_database_error().unwrap().constraint().unwrap(),
"queue_pkey"
);
if let Error::Sqlx(e) = client.create_standard_queue("job_type").await.unwrap_err() {
assert_eq!(
e.into_database_error().unwrap().constraint().unwrap(),
"queue_pkey"
);
} else {
unreachable!()
}
}

#[tokio::test]
Expand Down

0 comments on commit 413e70e

Please sign in to comment.