Skip to content

Commit

Permalink
Merge pull request #1 from gistia/malvim/fix/use-atomic-operations
Browse files Browse the repository at this point in the history
fix: updated to use atomic operations
  • Loading branch information
fcoury authored Mar 26, 2024
2 parents 14b4d1c + 2abd0aa commit 2b09685
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 33 deletions.
4 changes: 2 additions & 2 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,14 +180,14 @@ mod test {

// Now poll_next should return this job to us
let job1 = queue.poll_next(&[TestJob1::name()]).await.unwrap().unwrap();
assert_eq!(job1.retries(), 0);
assert_eq!(job1.retries(), 1);
// Fail the job
job1.fail().await.unwrap();

// We should be able to get the same job again, but it should have increased retry count

let job1 = queue.poll_next(&[TestJob1::name()]).await.unwrap().unwrap();
assert_eq!(job1.retries(), 1);
assert_eq!(job1.retries(), 2);
}

#[tokio::test]
Expand Down
38 changes: 16 additions & 22 deletions src/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@ use bincode::Decode;
use bson::{doc, Binary};
use chrono::Utc;
use mongodb::{
options::{ClientOptions, ConnectionString, FindOneOptions, Tls, TlsOptions, UpdateOptions},
options::{
ClientOptions, ConnectionString, FindOneAndUpdateOptions, ReturnDocument, Tls, TlsOptions,
},
Client, Collection, Database,
};
use tracing::instrument;
Expand Down Expand Up @@ -128,29 +130,27 @@ impl Queue for MongoDbQueue {
"job_type": job_types_doc
};

let update_doc = doc! {
"$set": { "started_at": bson::DateTime::from_millis(Utc::now().timestamp_millis()) },
"$inc": { "retries": 1 }
};

let sort_doc = doc! {
"priority": -1
};

let find_options = FindOneOptions::builder().sort(sort_doc).build();
let options = FindOneAndUpdateOptions::builder()
.sort(sort_doc)
.return_document(ReturnDocument::After)
.build();

let row = self
.collection()
.find_one(filter_doc, find_options)
.find_one_and_update(filter_doc, update_doc, options)
.await
.context("Failed to check out a job from the queue")?;

if let Some(row) = row {
let update_doc = doc! {
"$set": { "started_at": bson::DateTime::from_millis(Utc::now().timestamp_millis()) },
"$inc": { "retries": 1 }
};
let update_options = UpdateOptions::builder().build();

self.collection()
.update_one(doc! { "jid": &row.jid }, update_doc, update_options)
.await
.context("Failed to update job")?;

Ok(Some(MongoDbJobHandle::new(row, self.database.clone())))
} else {
Ok(None)
Expand Down Expand Up @@ -194,18 +194,12 @@ impl Queue for MongoDbQueue {

let row = self
.collection()
.find_one(filter_doc.clone(), None)
.find_one_and_delete(filter_doc, None)
.await
.context("Failed to find job in the queue")?;
.context("Failed to remove job from the queue")?;

match row {
Some(row) => {
let _ = self
.collection()
.delete_one(filter_doc, None)
.await
.context("Failed to remove job from the queue")?;

let payload: Vec<u8> = row.payload.bytes;
let (decoded, _) = bincode::decode_from_slice(&payload, self.bincode_config)?;
Ok(decoded)
Expand Down
9 changes: 0 additions & 9 deletions terraform.tfstate

This file was deleted.

0 comments on commit 2b09685

Please sign in to comment.