From 8b0b69572a634c0b00a38c086b69c8d98a210aa3 Mon Sep 17 00:00:00 2001 From: Pavel Mikhalkevich Date: Thu, 3 Oct 2024 00:11:28 +0400 Subject: [PATCH] Add Job::dead_letter --- src/job.rs | 31 ++++++++++++++------ src/sql/dml.rs | 8 ++++-- tests/e2e/job_send.rs | 66 ++++++++++++++++++++++++++++++++++++++++++- 3 files changed, 92 insertions(+), 13 deletions(-) diff --git a/src/job.rs b/src/job.rs index 1591603..8a32fdc 100644 --- a/src/job.rs +++ b/src/job.rs @@ -211,7 +211,10 @@ pub struct JobDetails { pub state: JobState, /// [Policy](QueuePolicy) applied to this job. - pub policy: QueuePolicy, + /// + /// This will be `None` for jobs consumed from dead + /// letter queues. + pub policy: Option, /// Job's priority. pub priority: usize, @@ -252,12 +255,16 @@ pub struct JobDetails { /// When this job was completed. pub completed_at: Option>, + + /// Name of the dead letter queue for this job, if any. + pub dead_letter: Option, } impl FromRow<'_, PgRow> for JobDetails { fn from_row(row: &PgRow) -> sqlx::Result { let id: Uuid = row.try_get("id")?; let queue_name: String = row.try_get("name")?; + let dead_letter: Option = row.try_get("dead_letter")?; let data: serde_json::Value = row.try_get("data")?; let expire_in = row.try_get("expire_in").and_then(|v: f64| match v { v if v >= 0.0 => Ok(Duration::from_secs_f64(v)), @@ -266,12 +273,18 @@ impl FromRow<'_, PgRow> for JobDetails { source: "'expire_in' should be non-negative".into(), }), })?; - let policy = row.try_get("policy").and_then(|v: String| { - QueuePolicy::try_from(v).map_err(|e| sqlx::Error::ColumnDecode { - index: "policy".to_string(), - source: e.into(), - }) - })?; + let policy = row + .try_get("policy") + .and_then(|v: Option| match v { + None => Ok(None), + Some(v) => match QueuePolicy::try_from(v) { + Err(e) => Err(sqlx::Error::ColumnDecode { + index: "policy".to_string(), + source: e.into(), + }), + Ok(v) => Ok(Some(v)), + }, + })?; let priority = row.try_get("priority").and_then(|v: i32| match v { v if v >= 0 => Ok(v as usize), v => Err(sqlx::Error::ColumnDecode { @@ -323,6 +336,7 @@ impl FromRow<'_, PgRow> for JobDetails { Ok(JobDetails { id, queue_name, + dead_letter, data, expire_in, policy, @@ -414,8 +428,7 @@ impl<'a> JobBuilder<'a> { self } - /// Number of retry attempts. - + /// Maximum number of retry attempts. pub fn retry_limit(mut self, value: usize) -> Self { self.retry_limit = Some(value); self diff --git a/src/sql/dml.rs b/src/sql/dml.rs index 80664fb..6192991 100644 --- a/src/sql/dml.rs +++ b/src/sql/dml.rs @@ -94,7 +94,8 @@ pub(crate) fn fetch_jobs(schema: &str) -> String { started_on as started_at, singleton_on as singleton_at, completed_on as completed_at, - singleton_key; + singleton_key, + dead_letter; "# ) } @@ -262,7 +263,7 @@ pub(crate) fn complete_jobs(schema: &str) -> String { JobState::Completed, // 1 ) } -// + + + + + + + + + + + + + + + + + +// + + + + + + + + + + + + + + + + + + // id | name | priority | data | state | retry_limit | retry_count | retry_delay | retry_backoff | start_after | started_on | singleton_key | singleton_on | expire_in | created_on | completed_on | keep_until | output | dead_letter | policy // --------------------------------------+---------+----------+------+-----------+-------------+-------------+-------------+---------------+-------------------------------+-------------------------------+---------------+--------------+-----------+-------------------------------+-------------------------------+-------------------------------+------------------------+-------------+---------- // 71c7e215-0528-417c-951b-fc01b3fac4b3 | jobtype | 0 | null | completed | 0 | 0 | 0 | f | 2024-09-29 09:23:09.502695+00 | 2024-09-29 09:23:09.514796+00 | | | 00:15:00 | 2024-09-29 09:23:09.502695+00 | 2024-09-29 09:23:09.526609+00 | 2024-10-13 09:23:09.502695+00 | {"result": "success!"} | | standard @@ -286,7 +287,8 @@ pub(crate) fn get_job_info(schema: &str) -> String { started_on as started_at, singleton_on as singleton_at, completed_on as completed_at, - singleton_key + singleton_key, + dead_letter FROM {schema}.job WHERE name = $1 and id = $2; "#, diff --git a/tests/e2e/job_send.rs b/tests/e2e/job_send.rs index 46d7a74..e5ca32b 100644 --- a/tests/e2e/job_send.rs +++ b/tests/e2e/job_send.rs @@ -166,7 +166,7 @@ async fn send_job_fully_customized() { assert_eq!(job_info.data, job.data); assert_eq!(job_info.id, job.id.unwrap()); - assert_eq!(job_info.policy, QueuePolicy::Standard); + assert_eq!(job_info.policy.unwrap(), QueuePolicy::Standard); // Important bit, we have not _consumed_, rather just got it's details, // so the state is not 'active' rather still 'created'. assert_eq!(job_info.state, JobState::Created); @@ -228,3 +228,67 @@ async fn send_jobs_throttled() { .expect("queued this time and ID issued"); assert_ne!(id1, id2); } + +#[tokio::test] +async fn send_job_dlq_named_as_main_queue() { + let local = "send_job_dlq_named_as_main_queue"; + utils::drop_schema(&local).await.unwrap(); + + let c = Client::builder().schema(local).connect().await.unwrap(); + c.create_standard_queue("jobtype").await.unwrap(); + c.create_standard_queue("jobtype_dlq").await.unwrap(); + + let job1 = Job::builder() + .retry_limit(0) + .queue_name("jobtype") + .dead_letter("jobtype") + .build(); + + // but failed job where queue name == dlq name will not get into dlq + let id1 = c.send_job(&job1).await.expect("no error"); + + let fetched_job_1 = c.fetch_job("jobtype").await.unwrap().unwrap(); + assert_eq!(fetched_job_1.id, id1); + + // let's fail job1 + let ok = c + .fail_job( + "jobtype", + fetched_job_1.id, + json!({"details": "testing..."}), + ) + .await + .unwrap(); + assert!(ok); + + assert!(c.fetch_job("jobtype_dlq").await.unwrap().is_none()); + + let job2 = Job::builder() + .retry_limit(0) + .queue_name("jobtype") + .dead_letter("jobtype_dlq") + .build(); + let id2 = c.send_job(&job2).await.expect("no error"); + + let fetched_job_2 = c.fetch_job("jobtype").await.unwrap().unwrap(); + assert_ne!(fetched_job_2.id, id1); + assert_eq!(fetched_job_2.id, id2); + + // let's fail job2 + let ok = c + .fail_job( + "jobtype", + fetched_job_2.id, + json!({"details": "testing again..."}), + ) + .await + .unwrap(); + assert!(ok); + + let job2_from_dlq = c.fetch_job("jobtype_dlq").await.unwrap().unwrap(); + assert_eq!(fetched_job_2.dead_letter.unwrap(), job2_from_dlq.queue_name); + assert_eq!(fetched_job_2.data, job2_from_dlq.data); + assert_eq!(fetched_job_2.retry_limit, job2_from_dlq.retry_limit); + // assert_eq!(job2.output, job2_from_dlq.output); + // assert_eq!(job2.keep_until, job2_from_dlq.keep_until); +}