From 688d0f931411223166940204adff5232fe2a8b8a Mon Sep 17 00:00:00 2001 From: Pavel Mikhalkevich Date: Fri, 30 Aug 2024 08:10:06 +0400 Subject: [PATCH] Update to latest PgBoss --- Cargo.lock | 1 + Cargo.toml | 1 + src/client/mod.rs | 6 ++++-- src/client/public.rs | 2 +- src/lib.rs | 5 ++--- src/sql/ddl.rs | 3 ++- src/sql/proc.rs | 7 ++++++- src/utils.rs | 2 +- tests/e2e/queue.rs | 20 ++++++-------------- 9 files changed, 24 insertions(+), 23 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 0679a2d..2806fa9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -788,6 +788,7 @@ version = "0.1.0" dependencies = [ "chrono", "lazy_static", + "log", "serde", "serde_json", "sqlx", diff --git a/Cargo.toml b/Cargo.toml index a2efb2e..3f4fe91 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,6 +16,7 @@ default = [] [dependencies] chrono = { version = "0.4.38", features = ["serde"] } +log = "0.4.22" serde = { version = "1.0.208", features = ["derive"] } serde_json = "1.0.127" sqlx = { version = "=0.8.0", features = [ diff --git a/src/client/mod.rs b/src/client/mod.rs index f239636..9e40b4b 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -23,9 +23,11 @@ impl Client { async fn init(&mut self) -> Result<(), sqlx::Error> { if let Some(app) = self.maybe_existing_app().await? { - println!( + log::info!( "App already exists: version={}, maintained_on={:?}, cron_on={:?}", - app.version, app.maintained_on, app.cron_on + app.version, + app.maintained_on, + app.cron_on ); if app.version < crate::MINIMUM_SUPPORTED_PGBOSS_APP_VERSION as i32 { panic!("Cannot migrate from the currently installed PgBoss application.") diff --git a/src/client/public.rs b/src/client/public.rs index 12b7c80..e433d4a 100644 --- a/src/client/public.rs +++ b/src/client/public.rs @@ -106,7 +106,7 @@ impl Client { } /// Enqueue a job. - pub async fn send_job(&self, job: J) -> Result, Error> + pub async fn send_job(&self, _job: J) -> Result, Error> where J: Into, { diff --git a/src/lib.rs b/src/lib.rs index 7e61f8d..78821b0 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -18,10 +18,9 @@ use chrono::{DateTime, Utc}; use sqlx::FromRow; use std::fmt::Debug; -// See the PgBoss v10 package: // https://github.com/timgit/pg-boss/blob/4b3d9f4628860bb103f4498161e0ec6d17b55b56/src/contractor.js#L491 -pub(crate) const MINIMUM_SUPPORTED_PGBOSS_APP_VERSION: u8 = 21; -pub(crate) const CURRENT_PGBOSS_APP_VERSION: u8 = 21; +pub(crate) const MINIMUM_SUPPORTED_PGBOSS_APP_VERSION: u8 = 23; +pub(crate) const CURRENT_PGBOSS_APP_VERSION: u8 = 23; #[derive(Debug, Clone, Default, FromRow)] pub(crate) struct App { diff --git a/src/sql/ddl.rs b/src/sql/ddl.rs index 51636c3..1c1dc7e 100644 --- a/src/sql/ddl.rs +++ b/src/sql/ddl.rs @@ -94,7 +94,8 @@ pub(super) fn create_job_table(schema: &str) -> String { keep_until timestamptz not null default now() + interval '14 days', output jsonb, dead_letter text, - policy text + policy text, + CONSTRAINT job_pkey PRIMARY KEY (name, id) ) PARTITION BY LIST (name); ", JobState::Created diff --git a/src/sql/proc.rs b/src/sql/proc.rs index 22fd869..c5d400e 100644 --- a/src/sql/proc.rs +++ b/src/sql/proc.rs @@ -9,6 +9,7 @@ pub(super) fn create_create_queue_function(schema: &str) -> String { $$ DECLARE table_name varchar := 'j' || encode(sha224(queue_name::bytea), 'hex'); + queue_created_on timestamptz; BEGIN INSERT INTO {schema}.queue ( name, @@ -31,7 +32,11 @@ pub(super) fn create_create_queue_function(schema: &str) -> String { (options->>'retentionMinutes')::int, options->>'deadLetter', table_name - ); + ) ON CONFLICT DO NOTHING RETURNING created_on INTO queue_created_on; + + IF queue_created_on IS NULL THEN + RETURN; + END IF; EXECUTE format('CREATE TABLE {schema}.%I (LIKE {schema}.job INCLUDING DEFAULTS)', table_name); EXECUTE format('ALTER TABLE {schema}.%I ADD PRIMARY KEY (name, id)', table_name); diff --git a/src/utils.rs b/src/utils.rs index df770e2..18db50f 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -1,5 +1,5 @@ -use sqlx::{postgres::PgPoolOptions, PgPool}; use crate::Error; +use sqlx::{postgres::PgPoolOptions, PgPool}; pub(crate) async fn create_pool(url: Option<&str>) -> Result { let pool = match url { diff --git a/tests/e2e/queue.rs b/tests/e2e/queue.rs index 2b96b30..e338758 100644 --- a/tests/e2e/queue.rs +++ b/tests/e2e/queue.rs @@ -2,7 +2,7 @@ use std::time::Duration; use crate::utils::{self, POSRGRES_URL}; use chrono::Utc; -use pgboss::{Client, Error, QueueOptions, QueuePolicy}; +use pgboss::{Client, QueueOptions, QueuePolicy}; use sqlx::postgres::PgPoolOptions; #[tokio::test] @@ -65,8 +65,9 @@ async fn instantiated_idempotently() { } #[tokio::test] -async fn v21_app_already_exists() { - let local = "v21_app_already_exists"; +async fn app_latest_version_already_exists() { + let local = "app_latest_version_already_exists"; + utils::drop_schema(local).await.unwrap(); let create_schema_stmt = format!("CREATE SCHEMA {local};"); let create_version_table_stmt = format!( @@ -80,7 +81,7 @@ async fn v21_app_already_exists() { ); let insert_app_stmt = format!( "INSERT INTO {local}.version VALUES ('{}', '{}','{}')", - 21, + 23, Utc::now(), Utc::now() ); @@ -94,7 +95,6 @@ async fn v21_app_already_exists() { .unwrap(); let _c = Client::builder().schema(local).connect().await.unwrap(); - utils::drop_schema(local).await.unwrap(); } #[tokio::test] @@ -162,15 +162,7 @@ async fn create_queue_already_exists() { let client = Client::builder().schema(local).connect().await.unwrap(); client.create_standard_queue("job_type").await.unwrap(); - - if let Error::Sqlx(e) = client.create_standard_queue("job_type").await.unwrap_err() { - assert_eq!( - e.into_database_error().unwrap().constraint().unwrap(), - "queue_pkey" - ); - } else { - unreachable!() - } + client.create_standard_queue("job_type").await.unwrap(); } #[tokio::test]