Skip to content

Commit

Permalink
Enable PGMQ to run without installing an extension
Browse files Browse the repository at this point in the history
  • Loading branch information
axelfontaine committed Feb 9, 2025
1 parent e4d4b84 commit 5b52ebf
Showing 1 changed file with 96 additions and 79 deletions.
175 changes: 96 additions & 79 deletions pgmq-extension/sql/pgmq.sql
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
------------------------------------------------------------
-- Schema, tables, records, privileges, indexes, etc
------------------------------------------------------------
-- We don't need to create the `pgmq` schema because it is automatically
-- created by postgres due to being declared in extension control file
-- When installed as an extension, we don't need to create the `pgmq` schema
-- because it is automatically created by postgres due to being declared in
-- the extension control file
CREATE SCHEMA IF NOT EXISTS pgmq;

-- Table where queues and metadata about them is stored
CREATE TABLE pgmq.meta (
Expand Down Expand Up @@ -268,6 +270,29 @@ BEGIN
END;
$$ LANGUAGE plpgsql;
-- send: actual implementation
CREATE FUNCTION pgmq.send(
queue_name TEXT,
msg JSONB,
headers JSONB,
delay TIMESTAMP WITH TIME ZONE
) RETURNS SETOF BIGINT AS $$
DECLARE
sql TEXT;
qtable TEXT := pgmq.format_table_name(queue_name, 'q');
BEGIN
sql := FORMAT(
$QUERY$
INSERT INTO pgmq.%I (vt, message, headers)
VALUES ($2, $1, $3)
RETURNING msg_id;
$QUERY$,
qtable
);
RETURN QUERY EXECUTE sql USING msg, delay, headers;
END;
$$ LANGUAGE plpgsql;
-- send: 2 args, no delay or headers
CREATE FUNCTION pgmq.send(
queue_name TEXT,
Expand Down Expand Up @@ -313,26 +338,26 @@ CREATE FUNCTION pgmq.send(
SELECT * FROM pgmq.send(queue_name, msg, headers, clock_timestamp() + make_interval(secs => delay));
$$ LANGUAGE sql;
-- send: actual implementation
CREATE FUNCTION pgmq.send(
-- send_batch: actual implementation
CREATE FUNCTION pgmq.send_batch(
queue_name TEXT,
msg JSONB,
headers JSONB,
msgs JSONB[],
headers JSONB[],
delay TIMESTAMP WITH TIME ZONE
) RETURNS SETOF BIGINT AS $$
DECLARE
sql TEXT;
qtable TEXT := pgmq.format_table_name(queue_name, 'q');
BEGIN
sql := FORMAT(
$QUERY$
$QUERY$
INSERT INTO pgmq.%I (vt, message, headers)
VALUES ($2, $1, $3)
SELECT $2, unnest($1), unnest(coalesce($3, ARRAY[]::jsonb[]))
RETURNING msg_id;
$QUERY$,
qtable
);
RETURN QUERY EXECUTE sql USING msg, delay, headers;
qtable
);
RETURN QUERY EXECUTE sql USING msgs, delay, headers;
END;
$$ LANGUAGE plpgsql;
Expand Down Expand Up @@ -381,29 +406,6 @@ CREATE FUNCTION pgmq.send_batch(
SELECT * FROM pgmq.send_batch(queue_name, msgs, headers, clock_timestamp() + make_interval(secs => delay));
$$ LANGUAGE sql;
-- send_batch: actual implementation
CREATE FUNCTION pgmq.send_batch(
queue_name TEXT,
msgs JSONB[],
headers JSONB[],
delay TIMESTAMP WITH TIME ZONE
) RETURNS SETOF BIGINT AS $$
DECLARE
sql TEXT;
qtable TEXT := pgmq.format_table_name(queue_name, 'q');
BEGIN
sql := FORMAT(
$QUERY$
INSERT INTO pgmq.%I (vt, message, headers)
SELECT $2, unnest($1), unnest(coalesce($3, ARRAY[]::jsonb[]))
RETURNING msg_id;
$QUERY$,
qtable
);
RETURN QUERY EXECUTE sql USING msgs, delay, headers;
END;
$$ LANGUAGE plpgsql;
-- returned by pgmq.metrics() and pgmq.metrics_all
CREATE TYPE pgmq.metrics_result AS (
queue_name text,
Expand Down Expand Up @@ -504,7 +506,9 @@ RETURNS VOID AS $$
DECLARE
atable TEXT := pgmq.format_table_name(queue_name, 'a');
BEGIN
EXECUTE format('ALTER EXTENSION pgmq DROP TABLE pgmq.%I', atable);
IF pgmq._extension_exists('pgmq') THEN
EXECUTE format('ALTER EXTENSION pgmq DROP TABLE pgmq.%I', atable);
END IF;
END
$$ LANGUAGE plpgsql;
Expand Down Expand Up @@ -601,26 +605,28 @@ BEGIN
queue_name
) INTO partitioned;
EXECUTE FORMAT(
$QUERY$
ALTER EXTENSION pgmq DROP TABLE pgmq.%I
$QUERY$,
qtable
);
IF pgmq._extension_exists('pgmq') THEN
EXECUTE FORMAT(
$QUERY$
ALTER EXTENSION pgmq DROP TABLE pgmq.%I
$QUERY$,
qtable
);
EXECUTE FORMAT(
$QUERY$
ALTER EXTENSION pgmq DROP SEQUENCE pgmq.%I
$QUERY$,
qtable_seq
);
EXECUTE FORMAT(
$QUERY$
ALTER EXTENSION pgmq DROP SEQUENCE pgmq.%I
$QUERY$,
qtable_seq
);
EXECUTE FORMAT(
$QUERY$
ALTER EXTENSION pgmq DROP TABLE pgmq.%I
$QUERY$,
atable
);
EXECUTE FORMAT(
$QUERY$
ALTER EXTENSION pgmq DROP TABLE pgmq.%I
$QUERY$,
atable
);
END IF;
EXECUTE FORMAT(
$QUERY$
Expand Down Expand Up @@ -729,13 +735,15 @@ BEGIN
atable
);
IF NOT pgmq._belongs_to_pgmq(qtable) THEN
EXECUTE FORMAT('ALTER EXTENSION pgmq ADD TABLE pgmq.%I', qtable);
EXECUTE FORMAT('ALTER EXTENSION pgmq ADD SEQUENCE pgmq.%I', qtable_seq);
END IF;
IF pgmq._extension_exists('pgmq') THEN
IF NOT pgmq._belongs_to_pgmq(qtable) THEN
EXECUTE FORMAT('ALTER EXTENSION pgmq ADD TABLE pgmq.%I', qtable);
EXECUTE FORMAT('ALTER EXTENSION pgmq ADD SEQUENCE pgmq.%I', qtable_seq);
END IF;
IF NOT pgmq._belongs_to_pgmq(atable) THEN
EXECUTE FORMAT('ALTER EXTENSION pgmq ADD TABLE pgmq.%I', atable);
IF NOT pgmq._belongs_to_pgmq(atable) THEN
EXECUTE FORMAT('ALTER EXTENSION pgmq ADD TABLE pgmq.%I', atable);
END IF;
END IF;
EXECUTE FORMAT(
Expand Down Expand Up @@ -801,13 +809,15 @@ BEGIN
atable
);
IF NOT pgmq._belongs_to_pgmq(qtable) THEN
EXECUTE FORMAT('ALTER EXTENSION pgmq ADD TABLE pgmq.%I', qtable);
EXECUTE FORMAT('ALTER EXTENSION pgmq ADD SEQUENCE pgmq.%I', qtable_seq);
END IF;
IF pgmq._extension_exists('pgmq') THEN
IF NOT pgmq._belongs_to_pgmq(qtable) THEN
EXECUTE FORMAT('ALTER EXTENSION pgmq ADD TABLE pgmq.%I', qtable);
EXECUTE FORMAT('ALTER EXTENSION pgmq ADD SEQUENCE pgmq.%I', qtable_seq);
END IF;
IF NOT pgmq._belongs_to_pgmq(atable) THEN
EXECUTE FORMAT('ALTER EXTENSION pgmq ADD TABLE pgmq.%I', atable);
IF NOT pgmq._belongs_to_pgmq(atable) THEN
EXECUTE FORMAT('ALTER EXTENSION pgmq ADD TABLE pgmq.%I', atable);
END IF;
END IF;
EXECUTE FORMAT(
Expand Down Expand Up @@ -851,18 +861,21 @@ BEGIN
END;
$$ LANGUAGE plpgsql;
CREATE FUNCTION pgmq._ensure_pg_partman_installed()
RETURNS void AS $$
DECLARE
extension_exists BOOLEAN;
BEGIN
SELECT EXISTS (
CREATE FUNCTION pgmq._extension_exists(extension_name TEXT)
RETURNS BOOLEAN
LANGUAGE SQL
AS $$
SELECT EXISTS (
SELECT 1
FROM pg_extension
WHERE extname = 'pg_partman'
) INTO extension_exists;
WHERE extname = extension_name
)
$$;
IF NOT extension_exists THEN
CREATE FUNCTION pgmq._ensure_pg_partman_installed()
RETURNS void AS $$
BEGIN
IF NOT pgmq._extension_exists('pg_partman') THEN
RAISE EXCEPTION 'pg_partman is required for partitioned queues';
END IF;
END;
Expand Down Expand Up @@ -910,9 +923,11 @@ BEGIN
qtable, partition_col
);
IF NOT pgmq._belongs_to_pgmq(qtable) THEN
EXECUTE FORMAT('ALTER EXTENSION pgmq ADD TABLE pgmq.%I', qtable);
EXECUTE FORMAT('ALTER EXTENSION pgmq ADD SEQUENCE pgmq.%I', qtable_seq);
IF pgmq._extension_exists('pgmq') THEN
IF NOT pgmq._belongs_to_pgmq(qtable) THEN
EXECUTE FORMAT('ALTER EXTENSION pgmq ADD TABLE pgmq.%I', qtable);
EXECUTE FORMAT('ALTER EXTENSION pgmq ADD SEQUENCE pgmq.%I', qtable_seq);
END IF;
END IF;
-- https://github.com/pgpartman/pg_partman/blob/master/doc/pg_partman.md
Expand Down Expand Up @@ -988,8 +1003,10 @@ BEGIN
atable, a_partition_col
);
IF NOT pgmq._belongs_to_pgmq(atable) THEN
EXECUTE FORMAT('ALTER EXTENSION pgmq ADD TABLE pgmq.%I', atable);
IF pgmq._extension_exists('pgmq') THEN
IF NOT pgmq._belongs_to_pgmq(atable) THEN
EXECUTE FORMAT('ALTER EXTENSION pgmq ADD TABLE pgmq.%I', atable);
END IF;
END IF;
-- https://github.com/pgpartman/pg_partman/blob/master/doc/pg_partman.md
Expand Down

0 comments on commit 5b52ebf

Please sign in to comment.