From 559921d55721b074ed720002e25ad33fdb10c5c4 Mon Sep 17 00:00:00 2001 From: Pavel Mikhalkevich Date: Sat, 7 Sep 2024 13:51:44 +0400 Subject: [PATCH] Use Job::queue_name --- src/client/mod.rs | 2 ++ src/client/public/job_ops.rs | 16 +++++++++++++++- src/job.rs | 29 +++++++++++------------------ src/sql/dml.rs | 2 +- tests/e2e/job_delete.rs | 31 +++++++++++++++++++++++++++++++ tests/e2e/job_fetch.rs | 20 ++++++++++---------- tests/e2e/job_send.rs | 14 +++++++------- tests/e2e/main.rs | 1 + 8 files changed, 78 insertions(+), 37 deletions(-) create mode 100644 tests/e2e/job_delete.rs diff --git a/src/client/mod.rs b/src/client/mod.rs index 389cc6e..c18e506 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -10,6 +10,7 @@ pub use builder::ClientBuilder; #[derive(Debug, Clone)] struct Statements { fetch_jobs: String, + delete_jobs: String, create_job: String, create_queue: String, get_queue: String, @@ -21,6 +22,7 @@ impl Statements { fn for_schema(name: &str) -> Statements { Statements { fetch_jobs: sql::dml::fetch_jobs(name), + delete_jobs: sql::dml::delete_jobs(name), create_job: sql::proc::create_job(name), create_queue: sql::proc::create_queue(name), get_queue: sql::dml::get_queue(name), diff --git a/src/client/public/job_ops.rs b/src/client/public/job_ops.rs index 2ff262d..a6cda9b 100644 --- a/src/client/public/job_ops.rs +++ b/src/client/public/job_ops.rs @@ -15,7 +15,7 @@ impl Client { let job = job.borrow(); let id: Option = sqlx::query_scalar(&self.stmt.create_job) .bind(job.id) - .bind(&job.name) + .bind(&job.queue_name) .bind(Json(&job.data)) .bind(Json(&job.opts)) .fetch_one(&self.pool) @@ -89,4 +89,18 @@ impl Client { .await?; Ok(maybe_job) } + + /// Delete a job from a queue. + pub async fn delete_job(&self, queue_name: Q, job_id: Uuid) -> Result<(), Error> + where + Q: AsRef, + { + let deleted_count: (i64,) = sqlx::query_as(&self.stmt.delete_jobs) + .bind(queue_name.as_ref()) + .bind([job_id]) + .fetch_one(&self.pool) + .await?; + println!("{:?}", deleted_count); + Ok(()) + } } diff --git a/src/job.rs b/src/job.rs index 9e106ac..9f02bf7 100644 --- a/src/job.rs +++ b/src/job.rs @@ -99,8 +99,8 @@ pub struct Job { #[serde(skip_serializing_if = "Option::is_none")] pub id: Option, - /// Job's name. - pub name: String, + /// Name of the queue to put this job onto. + pub queue_name: String, /// Job's payload. pub data: serde_json::Value, @@ -119,8 +119,8 @@ pub struct ActiveJob { /// ID of this job. pub id: Uuid, - /// Job's name. - pub name: String, + /// Name of the queue this job was fetched from. + pub queue_name: String, /// Job's payload. pub data: serde_json::Value, @@ -135,7 +135,7 @@ pub struct ActiveJob { impl FromRow<'_, PgRow> for ActiveJob { fn from_row(row: &PgRow) -> sqlx::Result { let id: Uuid = row.try_get("id")?; - let name: String = row.try_get("name")?; + let queue_name: String = row.try_get("name")?; let data: serde_json::Value = row.try_get("data")?; let expire_in: Duration = row.try_get("expire_in").and_then(|v: f64| match v { v if v >= 0.0 => Ok(Duration::from_secs_f64(v)), @@ -146,7 +146,7 @@ impl FromRow<'_, PgRow> for ActiveJob { })?; Ok(ActiveJob { id, - name, + queue_name, data, expire_in, }) @@ -163,16 +163,9 @@ impl Job { /// A builder for a job. #[derive(Debug, Clone, Default)] pub struct JobBuilder { - /// ID to assign to this job. pub(crate) id: Option, - - /// Job's name. - pub(crate) name: String, - - /// Job's payload. + pub(crate) queue_name: String, pub(crate) data: serde_json::Value, - - /// Options specific to this job. pub(crate) opts: JobOptions, } @@ -183,12 +176,12 @@ impl JobBuilder { self } - /// Job's name. - pub fn name(mut self, value: S) -> Self + /// Name of the queue to put this job onto. + pub fn queue_name(mut self, value: S) -> Self where S: Into, { - self.name = value.into(); + self.queue_name = value.into(); self } @@ -257,7 +250,7 @@ impl JobBuilder { pub fn build(self) -> Job { Job { id: self.id, - name: self.name, + queue_name: self.queue_name, data: self.data, opts: self.opts, } diff --git a/src/sql/dml.rs b/src/sql/dml.rs index 03726a1..0bf48e8 100644 --- a/src/sql/dml.rs +++ b/src/sql/dml.rs @@ -84,7 +84,7 @@ pub(crate) fn delete_jobs(schema: &str) -> String { format!( r#" WITH results as ( - DELETE FROM ${schema}.job + DELETE FROM {schema}.job WHERE name = $1 AND id IN (SELECT UNNEST($2::uuid[])) RETURNING 1 ) diff --git a/tests/e2e/job_delete.rs b/tests/e2e/job_delete.rs new file mode 100644 index 0000000..f9aa76c --- /dev/null +++ b/tests/e2e/job_delete.rs @@ -0,0 +1,31 @@ +use crate::utils; +use pgboss::Client; +use uuid::Uuid; + +#[tokio::test] +async fn delete_job_queue_does_not_exist() { + let local = "delete_job_queue_does_not_exist"; + utils::drop_schema(&local).await.unwrap(); + + let c = Client::builder().schema(local).connect().await.unwrap(); + + let job_id = Uuid::new_v4(); + let queue_name = "jobtype"; + + let _ = c.delete_job(queue_name, job_id).await.unwrap(); +} + +#[tokio::test] +async fn delete_job() { + let local = "fetch_job"; + utils::drop_schema(&local).await.unwrap(); + + let c = Client::builder().schema(local).connect().await.unwrap(); + c.create_standard_queue("jobtype").await.unwrap(); + + let job_id = Uuid::new_v4(); + let queue_name = "jobtype"; + + // fetch one + let _ = c.delete_job(queue_name, job_id).await.unwrap(); +} diff --git a/tests/e2e/job_fetch.rs b/tests/e2e/job_fetch.rs index 5723812..eb14c6c 100644 --- a/tests/e2e/job_fetch.rs +++ b/tests/e2e/job_fetch.rs @@ -18,7 +18,7 @@ async fn fetch_one_job() { // prepare jobs let job1 = Job::builder() - .name("jobtype") + .queue_name("jobtype") .data(json!({"key": "value1"})) .priority(10) // should be fetched THIRD .dead_letter("jobtype_dead_letter_queue") @@ -30,7 +30,7 @@ async fn fetch_one_job() { .build(); let job2 = Job::builder() - .name("jobtype") + .queue_name("jobtype") .data(json!({"key": "value2"})) .priority(20) // should be fetched FIRST .dead_letter("jobtype_dead_letter_queue") @@ -41,7 +41,7 @@ async fn fetch_one_job() { .build(); let job3 = Job::builder() - .name("jobtype") + .queue_name("jobtype") .data(json!({"key": "value3"})) .priority(15) // should be fetched SECOND .dead_letter("jobtype_dead_letter_queue") @@ -63,7 +63,7 @@ async fn fetch_one_job() { .expect("no error") .expect("a job"); - assert_eq!(job.name, "jobtype"); + assert_eq!(job.queue_name, "jobtype"); assert_eq!(job.data, json!({"key": "value2"})); assert_eq!(job.expire_in, Duration::from_secs(60 * 15)); // default @@ -74,7 +74,7 @@ async fn fetch_one_job() { .expect("no error") .expect("a job"); - assert_eq!(job.name, "jobtype"); + assert_eq!(job.queue_name, "jobtype"); assert_eq!(job.data, json!({"key": "value3"})); assert_eq!(job.expire_in, Duration::from_secs(60 * 15)); // default @@ -85,7 +85,7 @@ async fn fetch_one_job() { .expect("no error") .expect("a job"); - assert_eq!(job.name, "jobtype"); + assert_eq!(job.queue_name, "jobtype"); assert_eq!(job.data, json!({"key": "value1"})); assert_eq!(job.expire_in, Duration::from_secs(30)); // our override @@ -105,7 +105,7 @@ async fn fetch_many_jobs() { let job1_id = Uuid::new_v4(); let job1 = Job::builder() .id(job1_id) - .name("jobtype") + .queue_name("jobtype") .data(json!({"key": "value1", "priority": 1})) .priority(1) .build(); @@ -113,14 +113,14 @@ async fn fetch_many_jobs() { let job2_id = Uuid::new_v4(); let job2 = Job::builder() .id(job2_id) - .name("jobtype") + .queue_name("jobtype") .data(json!({"key": "value2", "priority": 0})) .build(); let job3_id = Uuid::new_v4(); let job3 = Job::builder() .id(job3_id) - .name("jobtype") + .queue_name("jobtype") .data(json!({"key": "value3", "priority": 10})) .priority(10) .build(); @@ -142,7 +142,7 @@ async fn fetch_many_jobs() { .expect("no error") .expect("a job"); - assert_eq!(job.name, "jobtype"); + assert_eq!(job.queue_name, "jobtype"); assert_eq!(job.data, json!({"key": "value2", "priority": 0})); // queue has been drained! diff --git a/tests/e2e/job_send.rs b/tests/e2e/job_send.rs index 244a098..56e6968 100644 --- a/tests/e2e/job_send.rs +++ b/tests/e2e/job_send.rs @@ -11,7 +11,7 @@ async fn send_job() { let c = Client::builder().schema(local).connect().await.unwrap(); c.create_standard_queue("jobtype").await.unwrap(); - let job = Job::builder().name("jobtype").build(); + let job = Job::builder().queue_name("jobtype").build(); let _id = c.send_job(&job).await.expect("no error"); } @@ -24,11 +24,11 @@ async fn send_job_with_id() { c.create_standard_queue("jobtype").await.unwrap(); let id = uuid::Uuid::new_v4(); - let job = Job::builder().name("jobtype").id(id).build(); + let job = Job::builder().queue_name("jobtype").id(id).build(); let inserted_id = c.send_job(&job).await.expect("no error"); assert_eq!(inserted_id, id); - let job = Job::builder().name("jobtype").id(id).build(); + let job = Job::builder().queue_name("jobtype").id(id).build(); let err = c.send_job(&job).await.unwrap_err(); if let Error::Conflict { msg } = err { assert_eq!(msg, "job with this id already exists"); @@ -50,7 +50,7 @@ async fn send_job_with_dead_letter() { let id = uuid::Uuid::new_v4(); let job = Job::builder() - .name("jobtype") + .queue_name("jobtype") .id(id) .dead_letter("jobtype_dead_letter_queue") .build(); @@ -69,7 +69,7 @@ async fn send_job_with_dead_letter_does_not_exist() { let id = uuid::Uuid::new_v4(); let job = Job::builder() - .name("jobtype") + .queue_name("jobtype") .id(id) .dead_letter("jobtype_dead_letter") .build(); @@ -87,7 +87,7 @@ async fn send_job_queue_does_not_exist() { utils::drop_schema(&local).await.unwrap(); let c = Client::builder().schema(local).connect().await.unwrap(); - let job = Job::builder().name("jobtype").build(); + let job = Job::builder().queue_name("jobtype").build(); if let Error::Unprocessable { msg } = c.send_job(&job).await.unwrap_err() { assert!(msg.contains("queue does not exist")) } else { @@ -139,7 +139,7 @@ async fn send_job_fully_customized() { let job = Job::builder() .id(id) - .name("jobtype") + .queue_name("jobtype") .data(json!({"key": "value"})) .priority(10) .dead_letter("jobtype_dead_letter_queue") diff --git a/tests/e2e/main.rs b/tests/e2e/main.rs index b5db55e..82d3902 100644 --- a/tests/e2e/main.rs +++ b/tests/e2e/main.rs @@ -1,3 +1,4 @@ +mod job_delete; mod job_fetch; mod job_send; mod queue;