From 19f70601f89eef6929911350cd4f9fbc1cdbf397 Mon Sep 17 00:00:00 2001 From: Pavel Mikhalkevich Date: Sat, 7 Sep 2024 15:05:33 +0400 Subject: [PATCH] Add Client::delete_jobs --- src/client/public/job_ops.rs | 22 ++++++++++- tests/e2e/job_delete.rs | 76 ++++++++++++++++++++++++++++++++++++ 2 files changed, 96 insertions(+), 2 deletions(-) diff --git a/src/client/public/job_ops.rs b/src/client/public/job_ops.rs index 1b13bc1..d3168c4 100644 --- a/src/client/public/job_ops.rs +++ b/src/client/public/job_ops.rs @@ -91,15 +91,33 @@ impl Client { } /// Delete a job from a queue. + /// + /// In a happy path, returns `false`, if the specified queue or the job with this ID does not exist, + /// otherwise returns `true`. + /// + /// To delete numerous jobs from a queue, use [Client::delete_jobs]. pub async fn delete_job(&self, queue_name: Q, job_id: Uuid) -> Result where Q: AsRef, + { + let deleted_count = self.delete_jobs(queue_name, [job_id]).await?; + Ok(deleted_count == 1) + } + + /// Delete numerous jobs from a queue. + /// + /// In a happy path, returns the number of deleted records, where `0` means + /// the specified queue if the jobs with these ids do not exist. + pub async fn delete_jobs(&self, queue_name: Q, job_ids: J) -> Result + where + Q: AsRef, + J: IntoIterator, { let deleted_count: (i64,) = sqlx::query_as(&self.stmt.delete_jobs) .bind(queue_name.as_ref()) - .bind([job_id]) + .bind(job_ids.into_iter().collect::>()) .fetch_one(&self.pool) .await?; - Ok(deleted_count.0 == 1) + Ok(deleted_count.0 as usize) } } diff --git a/tests/e2e/job_delete.rs b/tests/e2e/job_delete.rs index e175e40..0623ce2 100644 --- a/tests/e2e/job_delete.rs +++ b/tests/e2e/job_delete.rs @@ -50,3 +50,79 @@ async fn delete_job() { assert!(c.fetch_job(queue_name).await.unwrap().is_none()); } + +#[tokio::test] +async fn delete_jobs() { + let local = "delete_jobs"; + utils::drop_schema(&local).await.unwrap(); + + let c = Client::builder().schema(local).connect().await.unwrap(); + c.create_standard_queue("jobtype").await.unwrap(); + + let queue_name = "jobtype"; + + let job1_id = Uuid::new_v4(); + let job1 = Job::builder() + .id(job1_id) + .queue_name(queue_name) + .priority(50) + .build(); + + let job2_id = Uuid::new_v4(); + let job2 = Job::builder() + .id(job2_id) + .queue_name(queue_name) + .priority(51) + .build(); + + let job3_id = Uuid::new_v4(); + let job3 = Job::builder() + .id(job3_id) + .queue_name(queue_name) + .priority(0) + .build(); + + c.send_job(job1).await.unwrap(); + c.send_job(job2).await.unwrap(); + c.send_job(job3).await.unwrap(); + + // deleting the high priority jobs + let deleted_count = c.delete_jobs(queue_name, [job1_id, job2_id]).await.unwrap(); + + assert_eq!(deleted_count, 2); + + let j = c + .fetch_job(queue_name) + .await + .unwrap() + .expect("the only remaining job"); + assert_eq!(j.id, job3_id); + + assert!(c.fetch_job(queue_name).await.unwrap().is_none()); + + let deleted = c.delete_job(queue_name, j.id).await.unwrap(); + assert!(deleted); +} + +#[tokio::test] +async fn delete_jobs_queue_does_not_exist() { + let local = "delete_jobs_queue_does_not_exist"; + utils::drop_schema(&local).await.unwrap(); + + let c = Client::builder().schema(local).connect().await.unwrap(); + + let deleted = c.delete_jobs("jobtype", [Uuid::new_v4()]).await.unwrap(); + assert_eq!(deleted, 0) +} + +#[tokio::test] +async fn delete_jobs_do_not_exist() { + let local = "delete_jobs_do_not_exist"; + utils::drop_schema(&local).await.unwrap(); + + let c = Client::builder().schema(local).connect().await.unwrap(); + c.create_standard_queue("jobtype").await.unwrap(); + + let deleted = c.delete_jobs("jobtype", [Uuid::new_v4()]).await.unwrap(); + assert_eq!(deleted, 0); +}