Skip to content

Commit

Permalink
Add Client::delete_jobs
Browse files Browse the repository at this point in the history
  • Loading branch information
rustworthy committed Sep 7, 2024
1 parent 6d2b772 commit 19f7060
Show file tree
Hide file tree
Showing 2 changed files with 96 additions and 2 deletions.
22 changes: 20 additions & 2 deletions src/client/public/job_ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,15 +91,33 @@ impl Client {
}

/// Delete a job from a queue.
///
/// In a happy path, returns `false`, if the specified queue or the job with this ID does not exist,
/// otherwise returns `true`.
///
/// To delete numerous jobs from a queue, use [Client::delete_jobs].
pub async fn delete_job<Q>(&self, queue_name: Q, job_id: Uuid) -> Result<bool, Error>
where
Q: AsRef<str>,
{
let deleted_count = self.delete_jobs(queue_name, [job_id]).await?;
Ok(deleted_count == 1)
}

/// Delete numerous jobs from a queue.
///
/// In a happy path, returns the number of deleted records, where `0` means
/// the specified queue if the jobs with these ids do not exist.
pub async fn delete_jobs<Q, J>(&self, queue_name: Q, job_ids: J) -> Result<usize, Error>
where
Q: AsRef<str>,
J: IntoIterator<Item = Uuid>,
{
let deleted_count: (i64,) = sqlx::query_as(&self.stmt.delete_jobs)
.bind(queue_name.as_ref())
.bind([job_id])
.bind(job_ids.into_iter().collect::<Vec<Uuid>>())
.fetch_one(&self.pool)
.await?;
Ok(deleted_count.0 == 1)
Ok(deleted_count.0 as usize)
}
}
76 changes: 76 additions & 0 deletions tests/e2e/job_delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,3 +50,79 @@ async fn delete_job() {

assert!(c.fetch_job(queue_name).await.unwrap().is_none());
}

#[tokio::test]
async fn delete_jobs() {
let local = "delete_jobs";
utils::drop_schema(&local).await.unwrap();

let c = Client::builder().schema(local).connect().await.unwrap();
c.create_standard_queue("jobtype").await.unwrap();

let queue_name = "jobtype";

let job1_id = Uuid::new_v4();
let job1 = Job::builder()
.id(job1_id)
.queue_name(queue_name)
.priority(50)
.build();

let job2_id = Uuid::new_v4();
let job2 = Job::builder()
.id(job2_id)
.queue_name(queue_name)
.priority(51)
.build();

let job3_id = Uuid::new_v4();
let job3 = Job::builder()
.id(job3_id)
.queue_name(queue_name)
.priority(0)
.build();

c.send_job(job1).await.unwrap();
c.send_job(job2).await.unwrap();
c.send_job(job3).await.unwrap();

// deleting the high priority jobs
let deleted_count = c.delete_jobs(queue_name, [job1_id, job2_id]).await.unwrap();

assert_eq!(deleted_count, 2);

let j = c
.fetch_job(queue_name)
.await
.unwrap()
.expect("the only remaining job");
assert_eq!(j.id, job3_id);

assert!(c.fetch_job(queue_name).await.unwrap().is_none());

let deleted = c.delete_job(queue_name, j.id).await.unwrap();
assert!(deleted);
}

#[tokio::test]
async fn delete_jobs_queue_does_not_exist() {
let local = "delete_jobs_queue_does_not_exist";
utils::drop_schema(&local).await.unwrap();

let c = Client::builder().schema(local).connect().await.unwrap();

let deleted = c.delete_jobs("jobtype", [Uuid::new_v4()]).await.unwrap();
assert_eq!(deleted, 0)
}

#[tokio::test]
async fn delete_jobs_do_not_exist() {
let local = "delete_jobs_do_not_exist";
utils::drop_schema(&local).await.unwrap();

let c = Client::builder().schema(local).connect().await.unwrap();
c.create_standard_queue("jobtype").await.unwrap();

let deleted = c.delete_jobs("jobtype", [Uuid::new_v4()]).await.unwrap();
assert_eq!(deleted, 0);
}

0 comments on commit 19f7060

Please sign in to comment.