Skip to content

Commit

Permalink
Add Job::dead_letter
Browse files Browse the repository at this point in the history
  • Loading branch information
rustworthy committed Oct 2, 2024
1 parent 8e368db commit 8b0b695
Show file tree
Hide file tree
Showing 3 changed files with 92 additions and 13 deletions.
31 changes: 22 additions & 9 deletions src/job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,10 @@ pub struct JobDetails {
pub state: JobState,

/// [Policy](QueuePolicy) applied to this job.
pub policy: QueuePolicy,
///
/// This will be `None` for jobs consumed from dead
/// letter queues.
pub policy: Option<QueuePolicy>,

/// Job's priority.
pub priority: usize,
Expand Down Expand Up @@ -252,12 +255,16 @@ pub struct JobDetails {

/// When this job was completed.
pub completed_at: Option<DateTime<Utc>>,

/// Name of the dead letter queue for this job, if any.
pub dead_letter: Option<String>,
}

impl FromRow<'_, PgRow> for JobDetails {
fn from_row(row: &PgRow) -> sqlx::Result<Self> {
let id: Uuid = row.try_get("id")?;
let queue_name: String = row.try_get("name")?;
let dead_letter: Option<String> = row.try_get("dead_letter")?;
let data: serde_json::Value = row.try_get("data")?;
let expire_in = row.try_get("expire_in").and_then(|v: f64| match v {
v if v >= 0.0 => Ok(Duration::from_secs_f64(v)),
Expand All @@ -266,12 +273,18 @@ impl FromRow<'_, PgRow> for JobDetails {
source: "'expire_in' should be non-negative".into(),
}),
})?;
let policy = row.try_get("policy").and_then(|v: String| {
QueuePolicy::try_from(v).map_err(|e| sqlx::Error::ColumnDecode {
index: "policy".to_string(),
source: e.into(),
})
})?;
let policy = row
.try_get("policy")
.and_then(|v: Option<String>| match v {
None => Ok(None),
Some(v) => match QueuePolicy::try_from(v) {
Err(e) => Err(sqlx::Error::ColumnDecode {
index: "policy".to_string(),
source: e.into(),
}),
Ok(v) => Ok(Some(v)),
},
})?;
let priority = row.try_get("priority").and_then(|v: i32| match v {
v if v >= 0 => Ok(v as usize),
v => Err(sqlx::Error::ColumnDecode {
Expand Down Expand Up @@ -323,6 +336,7 @@ impl FromRow<'_, PgRow> for JobDetails {
Ok(JobDetails {
id,
queue_name,
dead_letter,
data,
expire_in,
policy,
Expand Down Expand Up @@ -414,8 +428,7 @@ impl<'a> JobBuilder<'a> {
self
}

/// Number of retry attempts.

/// Maximum number of retry attempts.
pub fn retry_limit(mut self, value: usize) -> Self {
self.retry_limit = Some(value);
self
Expand Down
8 changes: 5 additions & 3 deletions src/sql/dml.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,8 @@ pub(crate) fn fetch_jobs(schema: &str) -> String {
started_on as started_at,
singleton_on as singleton_at,
completed_on as completed_at,
singleton_key;
singleton_key,
dead_letter;
"#
)
}
Expand Down Expand Up @@ -262,7 +263,7 @@ pub(crate) fn complete_jobs(schema: &str) -> String {
JobState::Completed, // 1
)
}
// + + + + + + + + + + + + + + + + +
// + + + + + + + + + + + + + + + + + +
// id | name | priority | data | state | retry_limit | retry_count | retry_delay | retry_backoff | start_after | started_on | singleton_key | singleton_on | expire_in | created_on | completed_on | keep_until | output | dead_letter | policy
// --------------------------------------+---------+----------+------+-----------+-------------+-------------+-------------+---------------+-------------------------------+-------------------------------+---------------+--------------+-----------+-------------------------------+-------------------------------+-------------------------------+------------------------+-------------+----------
// 71c7e215-0528-417c-951b-fc01b3fac4b3 | jobtype | 0 | null | completed | 0 | 0 | 0 | f | 2024-09-29 09:23:09.502695+00 | 2024-09-29 09:23:09.514796+00 | | | 00:15:00 | 2024-09-29 09:23:09.502695+00 | 2024-09-29 09:23:09.526609+00 | 2024-10-13 09:23:09.502695+00 | {"result": "success!"} | | standard
Expand All @@ -286,7 +287,8 @@ pub(crate) fn get_job_info(schema: &str) -> String {
started_on as started_at,
singleton_on as singleton_at,
completed_on as completed_at,
singleton_key
singleton_key,
dead_letter
FROM {schema}.job
WHERE name = $1 and id = $2;
"#,
Expand Down
66 changes: 65 additions & 1 deletion tests/e2e/job_send.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ async fn send_job_fully_customized() {

assert_eq!(job_info.data, job.data);
assert_eq!(job_info.id, job.id.unwrap());
assert_eq!(job_info.policy, QueuePolicy::Standard);
assert_eq!(job_info.policy.unwrap(), QueuePolicy::Standard);
// Important bit, we have not _consumed_, rather just got it's details,
// so the state is not 'active' rather still 'created'.
assert_eq!(job_info.state, JobState::Created);
Expand Down Expand Up @@ -228,3 +228,67 @@ async fn send_jobs_throttled() {
.expect("queued this time and ID issued");
assert_ne!(id1, id2);
}

#[tokio::test]
async fn send_job_dlq_named_as_main_queue() {
let local = "send_job_dlq_named_as_main_queue";
utils::drop_schema(&local).await.unwrap();

let c = Client::builder().schema(local).connect().await.unwrap();
c.create_standard_queue("jobtype").await.unwrap();
c.create_standard_queue("jobtype_dlq").await.unwrap();

let job1 = Job::builder()
.retry_limit(0)
.queue_name("jobtype")
.dead_letter("jobtype")
.build();

// but failed job where queue name == dlq name will not get into dlq
let id1 = c.send_job(&job1).await.expect("no error");

let fetched_job_1 = c.fetch_job("jobtype").await.unwrap().unwrap();
assert_eq!(fetched_job_1.id, id1);

// let's fail job1
let ok = c
.fail_job(
"jobtype",
fetched_job_1.id,
json!({"details": "testing..."}),
)
.await
.unwrap();
assert!(ok);

assert!(c.fetch_job("jobtype_dlq").await.unwrap().is_none());

let job2 = Job::builder()
.retry_limit(0)
.queue_name("jobtype")
.dead_letter("jobtype_dlq")
.build();
let id2 = c.send_job(&job2).await.expect("no error");

let fetched_job_2 = c.fetch_job("jobtype").await.unwrap().unwrap();
assert_ne!(fetched_job_2.id, id1);
assert_eq!(fetched_job_2.id, id2);

// let's fail job2
let ok = c
.fail_job(
"jobtype",
fetched_job_2.id,
json!({"details": "testing again..."}),
)
.await
.unwrap();
assert!(ok);

let job2_from_dlq = c.fetch_job("jobtype_dlq").await.unwrap().unwrap();
assert_eq!(fetched_job_2.dead_letter.unwrap(), job2_from_dlq.queue_name);
assert_eq!(fetched_job_2.data, job2_from_dlq.data);
assert_eq!(fetched_job_2.retry_limit, job2_from_dlq.retry_limit);
// assert_eq!(job2.output, job2_from_dlq.output);
// assert_eq!(job2.keep_until, job2_from_dlq.keep_until);
}

0 comments on commit 8b0b695

Please sign in to comment.