
Commit

Use Job::queue_name
rustworthy committed Sep 7, 2024
1 parent cbdbc0a commit 559921d
Showing 8 changed files with 78 additions and 37 deletions.
2 changes: 2 additions & 0 deletions src/client/mod.rs
@@ -10,6 +10,7 @@ pub use builder::ClientBuilder;
#[derive(Debug, Clone)]
struct Statements {
fetch_jobs: String,
delete_jobs: String,
create_job: String,
create_queue: String,
get_queue: String,
@@ -21,6 +22,7 @@ impl Statements {
fn for_schema(name: &str) -> Statements {
Statements {
fetch_jobs: sql::dml::fetch_jobs(name),
delete_jobs: sql::dml::delete_jobs(name),
create_job: sql::proc::create_job(name),
create_queue: sql::proc::create_queue(name),
get_queue: sql::dml::get_queue(name),
16 changes: 15 additions & 1 deletion src/client/public/job_ops.rs
@@ -15,7 +15,7 @@ impl Client {
let job = job.borrow();
let id: Option<Uuid> = sqlx::query_scalar(&self.stmt.create_job)
.bind(job.id)
.bind(&job.name)
.bind(&job.queue_name)
.bind(Json(&job.data))
.bind(Json(&job.opts))
.fetch_one(&self.pool)
@@ -89,4 +89,18 @@ impl Client {
.await?;
Ok(maybe_job)
}

/// Delete a job from a queue.
pub async fn delete_job<Q>(&self, queue_name: Q, job_id: Uuid) -> Result<(), Error>
where
Q: AsRef<str>,
{
let deleted_count: (i64,) = sqlx::query_as(&self.stmt.delete_jobs)
.bind(queue_name.as_ref())
.bind([job_id])
.fetch_one(&self.pool)
.await?;
println!("{:?}", deleted_count);
Ok(())
}
}
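
For context, a minimal end-to-end sketch of the renamed builder method together with the new `delete_job` call. This is not part of the commit; the `example` schema and `emails` queue are illustrative, and it assumes a reachable Postgres instance using the builder's default connection settings:

```rust
use pgboss::{Client, Job};
use serde_json::json;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Assumes Postgres is reachable with the builder's default connection settings.
    let client = Client::builder().schema("example").connect().await?;
    client.create_standard_queue("emails").await?;

    // Enqueue a job onto the "emails" queue using the renamed `queue_name` method.
    let job = Job::builder()
        .queue_name("emails")
        .data(json!({"to": "user@example.com"}))
        .build();
    let job_id = client.send_job(&job).await?;

    // Remove it again: the queue name plus the job's UUID identify the row.
    client.delete_job("emails", job_id).await?;
    Ok(())
}
```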
29 changes: 11 additions & 18 deletions src/job.rs
@@ -99,8 +99,8 @@ pub struct Job {
#[serde(skip_serializing_if = "Option::is_none")]
pub id: Option<Uuid>,

/// Job's name.
pub name: String,
/// Name of the queue to put this job onto.
pub queue_name: String,

/// Job's payload.
pub data: serde_json::Value,
@@ -119,8 +119,8 @@ pub struct ActiveJob {
/// ID of this job.
pub id: Uuid,

/// Job's name.
pub name: String,
/// Name of the queue this job was fetched from.
pub queue_name: String,

/// Job's payload.
pub data: serde_json::Value,
@@ -135,7 +135,7 @@ pub struct ActiveJob {
impl FromRow<'_, PgRow> for ActiveJob {
fn from_row(row: &PgRow) -> sqlx::Result<Self> {
let id: Uuid = row.try_get("id")?;
let name: String = row.try_get("name")?;
let queue_name: String = row.try_get("name")?;
let data: serde_json::Value = row.try_get("data")?;
let expire_in: Duration = row.try_get("expire_in").and_then(|v: f64| match v {
v if v >= 0.0 => Ok(Duration::from_secs_f64(v)),
@@ -146,7 +146,7 @@ impl FromRow<'_, PgRow> for ActiveJob {
})?;
Ok(ActiveJob {
id,
name,
queue_name,
data,
expire_in,
})
@@ -163,16 +163,9 @@ impl Job {
/// A builder for a job.
#[derive(Debug, Clone, Default)]
pub struct JobBuilder {
/// ID to assign to this job.
pub(crate) id: Option<Uuid>,

/// Job's name.
pub(crate) name: String,

/// Job's payload.
pub(crate) queue_name: String,
pub(crate) data: serde_json::Value,

/// Options specific to this job.
pub(crate) opts: JobOptions,
}

@@ -183,12 +176,12 @@ impl JobBuilder {
self
}

/// Job's name.
pub fn name<S>(mut self, value: S) -> Self
/// Name of the queue to put this job onto.
pub fn queue_name<S>(mut self, value: S) -> Self
where
S: Into<String>,
{
self.name = value.into();
self.queue_name = value.into();
self
}

@@ -257,7 +250,7 @@ impl JobBuilder {
pub fn build(self) -> Job {
Job {
id: self.id,
name: self.name,
queue_name: self.queue_name,
data: self.data,
opts: self.opts,
}
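
The practical effect of the `src/job.rs` change is a straight rename on the builder and on both job structs; a small sketch of the before/after call sites (assuming `Job` is exported from the crate root, as the tests below suggest):

```rust
use pgboss::Job;

fn main() {
    // Before this commit: Job::builder().name("jobtype").build();
    // After this commit, the method and the public field name say what the string really is:
    let job = Job::builder().queue_name("jobtype").build();
    assert_eq!(job.queue_name, "jobtype");
}
```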
2 changes: 1 addition & 1 deletion src/sql/dml.rs
@@ -84,7 +84,7 @@ pub(crate) fn delete_jobs(schema: &str) -> String {
format!(
r#"
WITH results as (
DELETE FROM ${schema}.job
DELETE FROM {schema}.job
WHERE name = $1 AND id IN (SELECT UNNEST($2::uuid[]))
RETURNING 1
)
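
The one-character fix in `delete_jobs` matters because the statement is built with `format!`: `{schema}` is interpolated either way, but the leading `$` survived into the SQL and produced an invalid table reference. A standalone sketch of the difference (the `example` schema name is illustrative; the tail of the CTE is elided here, as in the hunk):

```rust
fn main() {
    let schema = "example";
    // Old template: the `$` is a literal character, so the rendered SQL is invalid.
    assert_eq!(
        format!("DELETE FROM ${schema}.job"),
        "DELETE FROM $example.job"
    );
    // Fixed template: renders a valid, schema-qualified table name.
    assert_eq!(
        format!("DELETE FROM {schema}.job"),
        "DELETE FROM example.job"
    );
}
```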
31 changes: 31 additions & 0 deletions tests/e2e/job_delete.rs
@@ -0,0 +1,31 @@
use crate::utils;
use pgboss::Client;
use uuid::Uuid;

#[tokio::test]
async fn delete_job_queue_does_not_exist() {
let local = "delete_job_queue_does_not_exist";
utils::drop_schema(&local).await.unwrap();

let c = Client::builder().schema(local).connect().await.unwrap();

let job_id = Uuid::new_v4();
let queue_name = "jobtype";

let _ = c.delete_job(queue_name, job_id).await.unwrap();
}

#[tokio::test]
async fn delete_job() {
let local = "fetch_job";
utils::drop_schema(&local).await.unwrap();

let c = Client::builder().schema(local).connect().await.unwrap();
c.create_standard_queue("jobtype").await.unwrap();

let job_id = Uuid::new_v4();
let queue_name = "jobtype";

// delete a job that was never enqueued
let _ = c.delete_job(queue_name, job_id).await.unwrap();
}
20 changes: 10 additions & 10 deletions tests/e2e/job_fetch.rs
@@ -18,7 +18,7 @@ async fn fetch_one_job() {

// prepare jobs
let job1 = Job::builder()
.name("jobtype")
.queue_name("jobtype")
.data(json!({"key": "value1"}))
.priority(10) // should be fetched THIRD
.dead_letter("jobtype_dead_letter_queue")
@@ -30,7 +30,7 @@ async fn fetch_one_job() {
.build();

let job2 = Job::builder()
.name("jobtype")
.queue_name("jobtype")
.data(json!({"key": "value2"}))
.priority(20) // should be fetched FIRST
.dead_letter("jobtype_dead_letter_queue")
@@ -41,7 +41,7 @@ async fn fetch_one_job() {
.build();

let job3 = Job::builder()
.name("jobtype")
.queue_name("jobtype")
.data(json!({"key": "value3"}))
.priority(15) // should be fetched SECOND
.dead_letter("jobtype_dead_letter_queue")
@@ -63,7 +63,7 @@ async fn fetch_one_job() {
.expect("no error")
.expect("a job");

assert_eq!(job.name, "jobtype");
assert_eq!(job.queue_name, "jobtype");
assert_eq!(job.data, json!({"key": "value2"}));
assert_eq!(job.expire_in, Duration::from_secs(60 * 15)); // default

@@ -74,7 +74,7 @@ async fn fetch_one_job() {
.expect("no error")
.expect("a job");

assert_eq!(job.name, "jobtype");
assert_eq!(job.queue_name, "jobtype");
assert_eq!(job.data, json!({"key": "value3"}));
assert_eq!(job.expire_in, Duration::from_secs(60 * 15)); // default

@@ -85,7 +85,7 @@ async fn fetch_one_job() {
.expect("no error")
.expect("a job");

assert_eq!(job.name, "jobtype");
assert_eq!(job.queue_name, "jobtype");
assert_eq!(job.data, json!({"key": "value1"}));
assert_eq!(job.expire_in, Duration::from_secs(30)); // our override

@@ -105,22 +105,22 @@ async fn fetch_many_jobs() {
let job1_id = Uuid::new_v4();
let job1 = Job::builder()
.id(job1_id)
.name("jobtype")
.queue_name("jobtype")
.data(json!({"key": "value1", "priority": 1}))
.priority(1)
.build();

let job2_id = Uuid::new_v4();
let job2 = Job::builder()
.id(job2_id)
.name("jobtype")
.queue_name("jobtype")
.data(json!({"key": "value2", "priority": 0}))
.build();

let job3_id = Uuid::new_v4();
let job3 = Job::builder()
.id(job3_id)
.name("jobtype")
.queue_name("jobtype")
.data(json!({"key": "value3", "priority": 10}))
.priority(10)
.build();
@@ -142,7 +142,7 @@ async fn fetch_many_jobs() {
.expect("no error")
.expect("a job");

assert_eq!(job.name, "jobtype");
assert_eq!(job.queue_name, "jobtype");
assert_eq!(job.data, json!({"key": "value2", "priority": 0}));

// queue has been drained!
14 changes: 7 additions & 7 deletions tests/e2e/job_send.rs
@@ -11,7 +11,7 @@ async fn send_job() {

let c = Client::builder().schema(local).connect().await.unwrap();
c.create_standard_queue("jobtype").await.unwrap();
let job = Job::builder().name("jobtype").build();
let job = Job::builder().queue_name("jobtype").build();
let _id = c.send_job(&job).await.expect("no error");
}

@@ -24,11 +24,11 @@ async fn send_job_with_id() {
c.create_standard_queue("jobtype").await.unwrap();

let id = uuid::Uuid::new_v4();
let job = Job::builder().name("jobtype").id(id).build();
let job = Job::builder().queue_name("jobtype").id(id).build();
let inserted_id = c.send_job(&job).await.expect("no error");
assert_eq!(inserted_id, id);

let job = Job::builder().name("jobtype").id(id).build();
let job = Job::builder().queue_name("jobtype").id(id).build();
let err = c.send_job(&job).await.unwrap_err();
if let Error::Conflict { msg } = err {
assert_eq!(msg, "job with this id already exists");
@@ -50,7 +50,7 @@ async fn send_job_with_dead_letter() {

let id = uuid::Uuid::new_v4();
let job = Job::builder()
.name("jobtype")
.queue_name("jobtype")
.id(id)
.dead_letter("jobtype_dead_letter_queue")
.build();
@@ -69,7 +69,7 @@ async fn send_job_with_dead_letter_does_not_exist() {

let id = uuid::Uuid::new_v4();
let job = Job::builder()
.name("jobtype")
.queue_name("jobtype")
.id(id)
.dead_letter("jobtype_dead_letter")
.build();
@@ -87,7 +87,7 @@ async fn send_job_queue_does_not_exist() {
utils::drop_schema(&local).await.unwrap();

let c = Client::builder().schema(local).connect().await.unwrap();
let job = Job::builder().name("jobtype").build();
let job = Job::builder().queue_name("jobtype").build();
if let Error::Unprocessable { msg } = c.send_job(&job).await.unwrap_err() {
assert!(msg.contains("queue does not exist"))
} else {
@@ -139,7 +139,7 @@ async fn send_job_fully_customized() {

let job = Job::builder()
.id(id)
.name("jobtype")
.queue_name("jobtype")
.data(json!({"key": "value"}))
.priority(10)
.dead_letter("jobtype_dead_letter_queue")
1 change: 1 addition & 0 deletions tests/e2e/main.rs
@@ -1,3 +1,4 @@
mod job_delete;
mod job_fetch;
mod job_send;
mod queue;