Skip to content

Commit

Permalink
Update to latest PgBoss
Browse files Browse the repository at this point in the history
  • Loading branch information
rustworthy committed Aug 30, 2024
1 parent 413e70e commit 688d0f9
Show file tree
Hide file tree
Showing 9 changed files with 24 additions and 23 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ default = []

[dependencies]
chrono = { version = "0.4.38", features = ["serde"] }
log = "0.4.22"
serde = { version = "1.0.208", features = ["derive"] }
serde_json = "1.0.127"
sqlx = { version = "=0.8.0", features = [
Expand Down
6 changes: 4 additions & 2 deletions src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,11 @@ impl Client {

async fn init(&mut self) -> Result<(), sqlx::Error> {
if let Some(app) = self.maybe_existing_app().await? {
println!(
log::info!(
"App already exists: version={}, maintained_on={:?}, cron_on={:?}",
app.version, app.maintained_on, app.cron_on
app.version,
app.maintained_on,
app.cron_on
);
if app.version < crate::MINIMUM_SUPPORTED_PGBOSS_APP_VERSION as i32 {
panic!("Cannot migrate from the currently installed PgBoss application.")
Expand Down
2 changes: 1 addition & 1 deletion src/client/public.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ impl Client {
}

/// Enqueue a job.
pub async fn send_job<J>(&self, job: J) -> Result<Option<Uuid>, Error>
pub async fn send_job<J>(&self, _job: J) -> Result<Option<Uuid>, Error>
where
J: Into<Job>,
{
Expand Down
5 changes: 2 additions & 3 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,9 @@ use chrono::{DateTime, Utc};
use sqlx::FromRow;
use std::fmt::Debug;

// See the PgBoss v10 package:
// https://github.com/timgit/pg-boss/blob/4b3d9f4628860bb103f4498161e0ec6d17b55b56/src/contractor.js#L491
pub(crate) const MINIMUM_SUPPORTED_PGBOSS_APP_VERSION: u8 = 21;
pub(crate) const CURRENT_PGBOSS_APP_VERSION: u8 = 21;
pub(crate) const MINIMUM_SUPPORTED_PGBOSS_APP_VERSION: u8 = 23;
pub(crate) const CURRENT_PGBOSS_APP_VERSION: u8 = 23;

#[derive(Debug, Clone, Default, FromRow)]
pub(crate) struct App {
Expand Down
3 changes: 2 additions & 1 deletion src/sql/ddl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,8 @@ pub(super) fn create_job_table(schema: &str) -> String {
keep_until timestamptz not null default now() + interval '14 days',
output jsonb,
dead_letter text,
policy text
policy text,
CONSTRAINT job_pkey PRIMARY KEY (name, id)
) PARTITION BY LIST (name);
",
JobState::Created
Expand Down
7 changes: 6 additions & 1 deletion src/sql/proc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ pub(super) fn create_create_queue_function(schema: &str) -> String {
$$
DECLARE
table_name varchar := 'j' || encode(sha224(queue_name::bytea), 'hex');
queue_created_on timestamptz;
BEGIN
INSERT INTO {schema}.queue (
name,
Expand All @@ -31,7 +32,11 @@ pub(super) fn create_create_queue_function(schema: &str) -> String {
(options->>'retentionMinutes')::int,
options->>'deadLetter',
table_name
);
) ON CONFLICT DO NOTHING RETURNING created_on INTO queue_created_on;
IF queue_created_on IS NULL THEN
RETURN;
END IF;
EXECUTE format('CREATE TABLE {schema}.%I (LIKE {schema}.job INCLUDING DEFAULTS)', table_name);
EXECUTE format('ALTER TABLE {schema}.%I ADD PRIMARY KEY (name, id)', table_name);
Expand Down
2 changes: 1 addition & 1 deletion src/utils.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use sqlx::{postgres::PgPoolOptions, PgPool};
use crate::Error;
use sqlx::{postgres::PgPoolOptions, PgPool};

pub(crate) async fn create_pool(url: Option<&str>) -> Result<PgPool, Error> {
let pool = match url {
Expand Down
20 changes: 6 additions & 14 deletions tests/e2e/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::time::Duration;

use crate::utils::{self, POSRGRES_URL};
use chrono::Utc;
use pgboss::{Client, Error, QueueOptions, QueuePolicy};
use pgboss::{Client, QueueOptions, QueuePolicy};
use sqlx::postgres::PgPoolOptions;

#[tokio::test]
Expand Down Expand Up @@ -65,8 +65,9 @@ async fn instantiated_idempotently() {
}

#[tokio::test]
async fn v21_app_already_exists() {
let local = "v21_app_already_exists";
async fn app_latest_version_already_exists() {
let local = "app_latest_version_already_exists";
utils::drop_schema(local).await.unwrap();

let create_schema_stmt = format!("CREATE SCHEMA {local};");
let create_version_table_stmt = format!(
Expand All @@ -80,7 +81,7 @@ async fn v21_app_already_exists() {
);
let insert_app_stmt = format!(
"INSERT INTO {local}.version VALUES ('{}', '{}','{}')",
21,
23,
Utc::now(),
Utc::now()
);
Expand All @@ -94,7 +95,6 @@ async fn v21_app_already_exists() {
.unwrap();

let _c = Client::builder().schema(local).connect().await.unwrap();
utils::drop_schema(local).await.unwrap();
}

#[tokio::test]
Expand Down Expand Up @@ -162,15 +162,7 @@ async fn create_queue_already_exists() {

let client = Client::builder().schema(local).connect().await.unwrap();
client.create_standard_queue("job_type").await.unwrap();

if let Error::Sqlx(e) = client.create_standard_queue("job_type").await.unwrap_err() {
assert_eq!(
e.into_database_error().unwrap().constraint().unwrap(),
"queue_pkey"
);
} else {
unreachable!()
}
client.create_standard_queue("job_type").await.unwrap();
}

#[tokio::test]
Expand Down

0 comments on commit 688d0f9

Please sign in to comment.