diff --git a/docs/api/sql/functions.md b/docs/api/sql/functions.md index 0091712e..8819d174 100644 --- a/docs/api/sql/functions.md +++ b/docs/api/sql/functions.md @@ -22,15 +22,27 @@ RETURNS SETOF bigint | :--- | :---- | :--- | | queue_name | text | The name of the queue | | msg | jsonb | The message to send to the queue | -| delay | integer | Time in seconds before the message becomes visible. Defaults to 0. | +| delay | integer/timestampz | Time in seconds before the message becomes visible, or a timestamp of when it becomes visible. Defaults to 0. | Example: ```sql select * from pgmq.send('my_queue', '{"hello": "world"}'); - send + send ------ 4 + +-- Message with a delay of 5 seconds +select * from pgmq.send('my_queue', '{"hello": "world"}', 5); + send +------ + 5 + +-- Message readable from tomorrow +select * from pgmq.send('my_queue', '{"hello": "world"}', CURRENT_TIMESTAMP + INTERVAL '1 day'); + send +------ + 6 ``` --- @@ -53,17 +65,45 @@ RETURNS SETOF bigint | :--- | :---- | :--- | | queue_name | text | The name of the queue | | msgs | jsonb[] | Array of messages to send to the queue | -| delay | integer | Time in seconds before the messages becomes visible. Defaults to 0. | +| delay | integer/timestampz | Time in seconds before the messages becomes visible, or a timestamp of when it becomes visible. Defaults to 0. | ```sql -select * from pgmq.send_batch('my_queue', ARRAY[ - '{"hello": "world_0"}'::jsonb, - '{"hello": "world_1"}'::jsonb] +select * from pgmq.send_batch('my_queue', + ARRAY[ + '{"hello": "world_0"}', + '{"hello": "world_1"}' + ]::jsonb[] ); - send_batch + send_batch ------------ 1 2 + +-- Message with a delay of 5 seconds +select * from pgmq.send_batch('my_queue', + ARRAY[ + '{"hello": "world_0"}', + '{"hello": "world_1"}' + ]::jsonb[], + 5 +); + send_batch +------------ + 3 + 4 + +-- Message readable from tomorrow +select * from pgmq.send_batch('my_queue', + ARRAY[ + '{"hello": "world_0"}', + '{"hello": "world_1"}' + ]::jsonb[], + 5 +); + send_batch +------------ + 6 + 7 ``` --- @@ -101,7 +141,7 @@ Read messages from a queue ```sql select * from pgmq.read('my_queue', 10, 2); - msg_id | read_ct | enqueued_at | vt | message + msg_id | read_ct | enqueued_at | vt | message --------+---------+-------------------------------+-------------------------------+---------------------- 1 | 1 | 2023-10-28 19:14:47.356595-05 | 2023-10-28 19:17:08.608922-05 | {"hello": "world_0"} 2 | 1 | 2023-10-28 19:14:47.356595-05 | 2023-10-28 19:17:08.608974-05 | {"hello": "world_1"} @@ -112,7 +152,7 @@ Read a message from a queue with message filtering ```sql select * from pgmq.read('my_queue', 10, 2, '{"hello": "world_1"}'); - msg_id | read_ct | enqueued_at | vt | message + msg_id | read_ct | enqueued_at | vt | message --------+---------+-------------------------------+-------------------------------+---------------------- 2 | 1 | 2023-10-28 19:14:47.356595-05 | 2023-10-28 19:17:08.608974-05 | {"hello": "world_1"} (1 row) @@ -155,7 +195,7 @@ Example: ```sql select * from pgmq.read_with_poll('my_queue', 1, 1, 5, 100); - msg_id | read_ct | enqueued_at | vt | message + msg_id | read_ct | enqueued_at | vt | message --------+---------+-------------------------------+-------------------------------+-------------------- 1 | 1 | 2023-10-28 19:09:09.177756-05 | 2023-10-28 19:27:00.337929-05 | {"hello": "world"} ``` @@ -185,7 +225,7 @@ Example: ```sql pgmq=# select * from pgmq.pop('my_queue'); - msg_id | read_ct | enqueued_at | vt | message + msg_id | read_ct | enqueued_at | vt | message --------+---------+-------------------------------+-------------------------------+-------------------- 1 | 2 | 2023-10-28 19:09:09.177756-05 | 2023-10-28 19:27:00.337929-05 | {"hello": "world"} ``` @@ -214,7 +254,7 @@ Example: ```sql select pgmq.delete('my_queue', 5); - delete + delete -------- t ``` @@ -243,7 +283,7 @@ Delete two messages that exist. ```sql select * from pgmq.delete('my_queue', ARRAY[2, 3]); - delete + delete -------- 2 3 @@ -253,7 +293,7 @@ Delete two messages, one that exists and one that does not. Message `999` does n ```sql select * from pgmq.delete('my_queue', ARRAY[6, 999]); - delete + delete -------- 6 ``` @@ -280,8 +320,8 @@ Example: Purge the queue when it contains 8 messages; ```sql -select * from pgmq.purge_queue('my_queue'); - purge_queue +select * from pgmq.purge_queue('my_queue'); + purge_queue ------------- 8 ``` @@ -311,7 +351,7 @@ Example; remove message with ID 1 from queue `my_queue` and archive it: ```sql SELECT * FROM pgmq.archive('my_queue', 1); - archive + archive --------- t ``` @@ -341,7 +381,7 @@ Delete messages with ID 1 and 2 from queue `my_queue` and move to the archive. ```sql SELECT * FROM pgmq.archive('my_queue', ARRAY[1, 2]); - archive + archive --------- 1 2 @@ -351,7 +391,7 @@ Delete messages 4, which exists and 999, which does not exist. ```sql select * from pgmq.archive('my_queue', ARRAY[4, 999]); - archive + archive --------- 4 ``` @@ -379,7 +419,7 @@ Example: ```sql select from pgmq.create('my_queue'); - create + create -------- ``` @@ -416,7 +456,7 @@ select from pgmq.create_partitioned( '100000', '10000000' ); - create_partitioned + create_partitioned -------------------- ``` @@ -424,7 +464,7 @@ select from pgmq.create_partitioned( ### create_unlogged -Creates an unlogged table. This is useful when write throughput is more important that durability. +Creates an unlogged table. This is useful when write throughput is more important that durability. See Postgres documentation for [unlogged tables](https://www.postgresql.org/docs/current/sql-createtable.html#SQL-CREATETABLE-UNLOGGED) for more information. ```text @@ -442,7 +482,7 @@ Example: ```sql select pgmq.create_unlogged('my_unlogged'); - create_unlogged + create_unlogged ----------------- ``` @@ -467,7 +507,7 @@ Example: ```sql select * from pgmq.detach_archive('my_queue'); - detach_archive + detach_archive ---------------- ``` @@ -492,7 +532,7 @@ Example: ```sql select * from pgmq.drop_queue('my_unlogged'); - drop_queue + drop_queue ------------ t ``` @@ -526,7 +566,7 @@ Set the visibility timeout of message 1 to 30 seconds from now. ```sql select * from pgmq.set_vt('my_queue', 11, 30); - msg_id | read_ct | enqueued_at | vt | message + msg_id | read_ct | enqueued_at | vt | message --------+---------+-------------------------------+-------------------------------+---------------------- 1 | 0 | 2023-10-28 19:42:21.778741-05 | 2023-10-28 19:59:34.286462-05 | {"hello": "world_0"} ``` @@ -551,7 +591,7 @@ Example: ```sql select * from pgmq.list_queues(); - queue_name | created_at | is_partitioned | is_unlogged + queue_name | created_at | is_partitioned | is_unlogged ----------------------+-------------------------------+----------------+------------- my_queue | 2023-10-28 14:13:17.092576-05 | f | f my_partitioned_queue | 2023-10-28 19:47:37.098692-05 | t | f @@ -597,7 +637,7 @@ Example: ```sql select * from pgmq.metrics('my_queue'); - queue_name | queue_length | newest_msg_age_sec | oldest_msg_age_sec | total_messages | scrape_time + queue_name | queue_length | newest_msg_age_sec | oldest_msg_age_sec | total_messages | scrape_time ------------+--------------+--------------------+--------------------+----------------+------------------------------- my_queue | 16 | 2445 | 2447 | 35 | 2023-10-28 20:23:08.406259-05 ``` @@ -633,7 +673,7 @@ RETURNS TABLE( ```sql select * from pgmq.metrics_all(); - queue_name | queue_length | newest_msg_age_sec | oldest_msg_age_sec | total_messages | scrape_time + queue_name | queue_length | newest_msg_age_sec | oldest_msg_age_sec | total_messages | scrape_time ----------------------+--------------+--------------------+--------------------+----------------+------------------------------- my_queue | 16 | 2563 | 2565 | 35 | 2023-10-28 20:25:07.016413-05 my_partitioned_queue | 1 | 11 | 11 | 1 | 2023-10-28 20:25:07.016413-05 diff --git a/pgmq-extension/sql/pgmq.sql b/pgmq-extension/sql/pgmq.sql index 2d0f02b5..303b2b76 100644 --- a/pgmq-extension/sql/pgmq.sql +++ b/pgmq-extension/sql/pgmq.sql @@ -274,6 +274,18 @@ CREATE FUNCTION pgmq.send( msg JSONB, delay INTEGER DEFAULT 0 ) RETURNS SETOF BIGINT AS $$ +BEGIN + RETURN QUERY SELECT * FROM pgmq.send(queue_name, msg, clock_timestamp() + make_interval(secs => delay)); +END; +$$ LANGUAGE plpgsql; + +-- send_at +-- sends a message to a queue, with a delay as a timestamp +CREATE FUNCTION pgmq.send( + queue_name TEXT, + msg JSONB, + delay TIMESTAMP WITH TIME ZONE +) RETURNS SETOF BIGINT AS $$ DECLARE sql TEXT; qtable TEXT := pgmq.format_table_name(queue_name, 'q'); @@ -281,12 +293,12 @@ BEGIN sql := FORMAT( $QUERY$ INSERT INTO pgmq.%I (vt, message) - VALUES ((clock_timestamp() + %L), $1) + VALUES ($2, $1) RETURNING msg_id; $QUERY$, - qtable, make_interval(secs => delay) + qtable ); - RETURN QUERY EXECUTE sql USING msg; + RETURN QUERY EXECUTE sql USING msg, delay; END; $$ LANGUAGE plpgsql; @@ -297,6 +309,18 @@ CREATE FUNCTION pgmq.send_batch( msgs JSONB[], delay INTEGER DEFAULT 0 ) RETURNS SETOF BIGINT AS $$ +BEGIN + RETURN QUERY SELECT * FROM pgmq.send_batch(queue_name, msgs, clock_timestamp() + make_interval(secs => delay)); +END; +$$ LANGUAGE plpgsql; + +-- send_batch_at +-- sends an array of list of messages to a queue, with a delay as a timestamp +CREATE FUNCTION pgmq.send_batch( + queue_name TEXT, + msgs JSONB[], + delay TIMESTAMP WITH TIME ZONE +) RETURNS SETOF BIGINT AS $$ DECLARE sql TEXT; qtable TEXT := pgmq.format_table_name(queue_name, 'q'); @@ -304,12 +328,12 @@ BEGIN sql := FORMAT( $QUERY$ INSERT INTO pgmq.%I (vt, message) - SELECT clock_timestamp() + %L, unnest($1) + SELECT $2, unnest($1) RETURNING msg_id; $QUERY$, - qtable, make_interval(secs => delay) + qtable ); - RETURN QUERY EXECUTE sql USING msgs; + RETURN QUERY EXECUTE sql USING msgs, delay; END; $$ LANGUAGE plpgsql; diff --git a/pgmq-extension/test/expected/base.out b/pgmq-extension/test/expected/base.out index 24f21efb..34ae005a 100644 --- a/pgmq-extension/test/expected/base.out +++ b/pgmq-extension/test/expected/base.out @@ -129,6 +129,32 @@ SELECT msg_id = :msg_id FROM pgmq.read('test_default_queue', 2, 1); t (1 row) +SELECT pgmq.create('test_default_queue_vt'); + create +-------- + +(1 row) + +-- send message with timestamp +SELECT * from pgmq.send('test_default_queue_vt', '{"hello": "world"}', CURRENT_TIMESTAMP + '5 seconds'::interval); + send +------ + 1 +(1 row) + +-- read, assert no messages because we set timestamp to the future +SELECT msg_id = :msg_id FROM pgmq.read('test_default_queue_vt', 2, 1); + ?column? +---------- +(0 rows) + +-- read again, now using poll to block until message is ready +SELECT msg_id = :msg_id FROM pgmq.read_with_poll('test_default_queue_vt', 10, 1, 10); + ?column? +---------- + t +(1 row) + -- send a batch of 2 messages SELECT pgmq.create('batch_queue'); create @@ -145,6 +171,23 @@ SELECT ARRAY( SELECT pgmq.send_batch( t (1 row) +-- send a batch of 2 messages with timestamp +SELECT pgmq.create('batch_queue_vt'); + create +-------- + +(1 row) + +SELECT ARRAY( SELECT pgmq.send_batch( + 'batch_queue_vt', + ARRAY['{"hello": "world_0"}', '{"hello": "world_1"}']::jsonb[], + CURRENT_TIMESTAMP + '5 seconds'::interval +)) = ARRAY[1, 2]::BIGINT[]; + ?column? +---------- + t +(1 row) + -- CREATE with 5 seconds per partition, 10 seconds retention SELECT pgmq.create_partitioned('test_duration_queue', '5 seconds', '10 seconds'); create_partitioned @@ -201,7 +244,7 @@ SELECT queue_name, queue_length, newest_msg_age_sec, oldest_msg_age_sec, total_m SELECT COUNT(1) from pgmq.metrics_all(); count ------- - 7 + 9 (1 row) -- delete all the queues @@ -229,6 +272,8 @@ WARNING: drop_queue(queue_name, partitioned) is deprecated and will be removed WARNING: drop_queue(queue_name, partitioned) is deprecated and will be removed in PGMQ v2.0. Use drop_queue(queue_name) instead WARNING: drop_queue(queue_name, partitioned) is deprecated and will be removed in PGMQ v2.0. Use drop_queue(queue_name) instead WARNING: drop_queue(queue_name, partitioned) is deprecated and will be removed in PGMQ v2.0. Use drop_queue(queue_name) instead +WARNING: drop_queue(queue_name, partitioned) is deprecated and will be removed in PGMQ v2.0. Use drop_queue(queue_name) instead +WARNING: drop_queue(queue_name, partitioned) is deprecated and will be removed in PGMQ v2.0. Use drop_queue(queue_name) instead WARNING: drop_queue(queue_name, partitioned) is deprecated and will be removed in PGMQ v2.0. Use drop_queue(queue_name) instead drop_queue ------------ @@ -237,7 +282,9 @@ WARNING: drop_queue(queue_name, partitioned) is deprecated and will be removed t t t -(5 rows) + t + t +(7 rows) SELECT queue_name FROM pgmq.list_queues(); queue_name diff --git a/pgmq-extension/test/sql/base.sql b/pgmq-extension/test/sql/base.sql index 5bf99e52..61e1ae5f 100644 --- a/pgmq-extension/test/sql/base.sql +++ b/pgmq-extension/test/sql/base.sql @@ -55,6 +55,17 @@ SELECT msg_id = :msg_id FROM pgmq.set_vt('test_default_queue', :msg_id, 0); -- read again, should have msg_id 1 again SELECT msg_id = :msg_id FROM pgmq.read('test_default_queue', 2, 1); +SELECT pgmq.create('test_default_queue_vt'); + +-- send message with timestamp +SELECT * from pgmq.send('test_default_queue_vt', '{"hello": "world"}', CURRENT_TIMESTAMP + '5 seconds'::interval); + +-- read, assert no messages because we set timestamp to the future +SELECT msg_id = :msg_id FROM pgmq.read('test_default_queue_vt', 2, 1); + +-- read again, now using poll to block until message is ready +SELECT msg_id = :msg_id FROM pgmq.read_with_poll('test_default_queue_vt', 10, 1, 10); + -- send a batch of 2 messages SELECT pgmq.create('batch_queue'); SELECT ARRAY( SELECT pgmq.send_batch( @@ -62,6 +73,14 @@ SELECT ARRAY( SELECT pgmq.send_batch( ARRAY['{"hello": "world_0"}', '{"hello": "world_1"}']::jsonb[] )) = ARRAY[1, 2]::BIGINT[]; +-- send a batch of 2 messages with timestamp +SELECT pgmq.create('batch_queue_vt'); +SELECT ARRAY( SELECT pgmq.send_batch( + 'batch_queue_vt', + ARRAY['{"hello": "world_0"}', '{"hello": "world_1"}']::jsonb[], + CURRENT_TIMESTAMP + '5 seconds'::interval +)) = ARRAY[1, 2]::BIGINT[]; + -- CREATE with 5 seconds per partition, 10 seconds retention SELECT pgmq.create_partitioned('test_duration_queue', '5 seconds', '10 seconds');