diff --git a/src/backend/distributed/commands/multi_copy.c b/src/backend/distributed/commands/multi_copy.c index b9e9058501a..e792238167e 100644 --- a/src/backend/distributed/commands/multi_copy.c +++ b/src/backend/distributed/commands/multi_copy.c @@ -346,6 +346,9 @@ static LocalCopyStatus GetLocalCopyStatus(void); static bool ShardIntervalListHasLocalPlacements(List *shardIntervalList); static void LogLocalCopyToRelationExecution(uint64 shardId); static void LogLocalCopyToFileExecution(uint64 shardId); +#if PG_VERSION_NUM >= PG_VERSION_15 +static void ErrorIfMergeInCopy(CopyStmt *copyStatement); +#endif /* exports for SQL callable functions */ @@ -2828,6 +2831,25 @@ CopyStatementHasFormat(CopyStmt *copyStatement, char *formatName) } +#if PG_VERSION_NUM >= PG_VERSION_15 + +/* + * ErrorIfMergeInCopy Raises an exception if the MERGE is called in the COPY. + */ +static void +ErrorIfMergeInCopy(CopyStmt *copyStatement) +{ + if (!copyStatement->relation && (IsA(copyStatement->query, MergeStmt))) + { + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("MERGE not supported in COPY"))); + } +} + + +#endif + + /* * ProcessCopyStmt handles Citus specific concerns for COPY like supporting * COPYing from distributed tables and preventing unsupported actions. The @@ -2838,6 +2860,10 @@ Node * ProcessCopyStmt(CopyStmt *copyStatement, QueryCompletion *completionTag, const char *queryString) { + #if PG_VERSION_NUM >= PG_VERSION_15 + ErrorIfMergeInCopy(copyStatement); + #endif + /* * Handle special COPY "resultid" FROM STDIN WITH (format result) commands * for sending intermediate results to workers. diff --git a/src/backend/distributed/planner/merge_planner.c b/src/backend/distributed/planner/merge_planner.c index 8048002e005..32927e92af7 100644 --- a/src/backend/distributed/planner/merge_planner.c +++ b/src/backend/distributed/planner/merge_planner.c @@ -97,6 +97,7 @@ static DistributedPlan * CreateNonPushableMergePlan(Oid targetRelationId, uint64 plannerRestrictionContext, ParamListInfo boundParams); static char * MergeCommandResultIdPrefix(uint64 planId); +static void ErrorIfMergeHasReturningList(Query *query); #endif @@ -949,9 +950,24 @@ ConvertSourceRTEIntoSubquery(Query *mergeQuery, RangeTblEntry *sourceRte, } +/* + * ErrorIfMergeHasReturningList raises an exception if the MERGE + * has a RETURNING clause. + */ +static void +ErrorIfMergeHasReturningList(Query *query) +{ + if (query->returningList) + { + ereport(ERROR, (errmsg("MERGE with RETURNING is not yet supported"))); + } +} + + /* * ErrorIfMergeNotSupported Checks for conditions that are not supported in either * the routable or repartition strategies. It checks for + * - MERGE with a RETURNING clause * - Supported table types and their combinations * - Check the target lists and quals of both the query and merge actions * - Supported CTEs @@ -959,6 +975,7 @@ ConvertSourceRTEIntoSubquery(Query *mergeQuery, RangeTblEntry *sourceRte, static void ErrorIfMergeNotSupported(Query *query, Oid targetRelationId, List *rangeTableList) { + ErrorIfMergeHasReturningList(query); ErrorIfMergeHasUnsupportedTables(targetRelationId, rangeTableList); ErrorIfMergeQueryQualAndTargetListNotSupported(targetRelationId, query); ErrorIfUnsupportedCTEs(query); diff --git a/src/test/regress/expected/pgmerge.out b/src/test/regress/expected/pgmerge.out index a0f5d0c862b..ef96322a781 100644 --- a/src/test/regress/expected/pgmerge.out +++ b/src/test/regress/expected/pgmerge.out @@ -174,9 +174,21 @@ COPY ( WHEN MATCHED THEN DELETE ) TO stdout; ERROR: MERGE not supported in COPY +-- used in a CTE with RETURNING +WITH foo AS ( + MERGE INTO target USING source ON (true) + WHEN MATCHED THEN DELETE RETURNING target.* +) SELECT * FROM foo; +ERROR: syntax error at or near "RETURNING" +-- used in COPY with RETURNING +COPY ( + MERGE INTO target USING source ON (true) + WHEN MATCHED THEN DELETE RETURNING target.* +) TO stdout; +ERROR: syntax error at or near "RETURNING" -- unsupported relation types -- view -CREATE VIEW tv AS SELECT * FROM target; +CREATE VIEW tv AS SELECT count(tid) AS tid FROM target; MERGE INTO tv t USING source s ON t.tid = s.sid diff --git a/src/test/regress/expected/pgmerge_1.out b/src/test/regress/expected/pgmerge_1.out new file mode 100644 index 00000000000..32b3129e0eb --- /dev/null +++ b/src/test/regress/expected/pgmerge_1.out @@ -0,0 +1,2149 @@ +SHOW server_version \gset +SELECT substring(:'server_version', '\d+')::int >= 15 AS server_version_ge_15 +\gset +\if :server_version_ge_15 +\else +\q +\endif +-- +-- MERGE test from PG community (adapted to Citus by converting all tables to Citus local) +-- +DROP SCHEMA IF EXISTS pgmerge_schema CASCADE; +NOTICE: schema "pgmerge_schema" does not exist, skipping +CREATE SCHEMA pgmerge_schema; +SET search_path TO pgmerge_schema; +SET citus.use_citus_managed_tables to true; +\set SHOW_CONTEXT errors +SET citus.next_shard_id TO 4001000; +SET client_min_messages = warning; +SELECT 1 FROM master_add_node('localhost', :master_port, groupid => 0); + ?column? +--------------------------------------------------------------------- + 1 +(1 row) + +RESET client_min_messages; +CREATE USER regress_merge_privs; +CREATE USER regress_merge_no_privs; +DROP TABLE IF EXISTS target; +NOTICE: table "target" does not exist, skipping +DROP TABLE IF EXISTS source; +NOTICE: table "source" does not exist, skipping +CREATE TABLE target (tid integer, balance integer) + WITH (autovacuum_enabled=off); +CREATE TABLE source (sid integer, delta integer) -- no index + WITH (autovacuum_enabled=off); +SELECT citus_add_local_table_to_metadata('target'); + citus_add_local_table_to_metadata +--------------------------------------------------------------------- + +(1 row) + +SELECT citus_add_local_table_to_metadata('source'); + citus_add_local_table_to_metadata +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO target VALUES (1, 10); +INSERT INTO target VALUES (2, 20); +INSERT INTO target VALUES (3, 30); +SELECT t.ctid is not null as matched, t.*, s.* FROM source s FULL OUTER JOIN target t ON s.sid = t.tid ORDER BY t.tid, s.sid; + matched | tid | balance | sid | delta +--------------------------------------------------------------------- + t | 1 | 10 | | + t | 2 | 20 | | + t | 3 | 30 | | +(3 rows) + +ALTER TABLE target OWNER TO regress_merge_privs; +ALTER TABLE source OWNER TO regress_merge_privs; +CREATE TABLE target2 (tid integer, balance integer) + WITH (autovacuum_enabled=off); +CREATE TABLE source2 (sid integer, delta integer) + WITH (autovacuum_enabled=off); +SELECT citus_add_local_table_to_metadata('target2'); + citus_add_local_table_to_metadata +--------------------------------------------------------------------- + +(1 row) + +SELECT citus_add_local_table_to_metadata('source2'); + citus_add_local_table_to_metadata +--------------------------------------------------------------------- + +(1 row) + +ALTER TABLE target2 OWNER TO regress_merge_no_privs; +ALTER TABLE source2 OWNER TO regress_merge_no_privs; +GRANT INSERT ON target TO regress_merge_no_privs; +GRANT USAGE, CREATE ON SCHEMA pgmerge_schema TO regress_merge_privs; +GRANT USAGE ON SCHEMA pgmerge_schema TO regress_merge_no_privs; +SET SESSION AUTHORIZATION regress_merge_privs; +EXPLAIN (COSTS OFF) +MERGE INTO target t +USING source AS s +ON t.tid = s.sid +WHEN MATCHED THEN + DELETE; + QUERY PLAN +--------------------------------------------------------------------- + Custom Scan (Citus Adaptive) + Task Count: 1 + Tasks Shown: All + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> Merge on target_4001000 t + -> Merge Join + Merge Cond: (t.tid = s.sid) + -> Sort + Sort Key: t.tid + -> Seq Scan on target_4001000 t + -> Sort + Sort Key: s.sid + -> Seq Scan on source_4001001 s +(14 rows) + +-- +-- Errors +-- +MERGE INTO target t RANDOMWORD +USING source AS s +ON t.tid = s.sid +WHEN MATCHED THEN + UPDATE SET balance = 0; +ERROR: syntax error at or near "RANDOMWORD" +-- MATCHED/INSERT error +MERGE INTO target t +USING source AS s +ON t.tid = s.sid +WHEN MATCHED THEN + INSERT DEFAULT VALUES; +ERROR: syntax error at or near "INSERT" +-- incorrectly specifying INTO target +MERGE INTO target t +USING source AS s +ON t.tid = s.sid +WHEN NOT MATCHED THEN + INSERT INTO target DEFAULT VALUES; +ERROR: syntax error at or near "INTO" +-- Multiple VALUES clause +MERGE INTO target t +USING source AS s +ON t.tid = s.sid +WHEN NOT MATCHED THEN + INSERT VALUES (1,1), (2,2); +ERROR: syntax error at or near "," +-- SELECT query for INSERT +MERGE INTO target t +USING source AS s +ON t.tid = s.sid +WHEN NOT MATCHED THEN + INSERT SELECT (1, 1); +ERROR: syntax error at or near "SELECT" +-- NOT MATCHED/UPDATE +MERGE INTO target t +USING source AS s +ON t.tid = s.sid +WHEN NOT MATCHED THEN + UPDATE SET balance = 0; +ERROR: syntax error at or near "UPDATE" +-- UPDATE tablename +MERGE INTO target t +USING source AS s +ON t.tid = s.sid +WHEN MATCHED THEN + UPDATE target SET balance = 0; +ERROR: syntax error at or near "target" +-- source and target names the same +MERGE INTO target +USING target +ON tid = tid +WHEN MATCHED THEN DO NOTHING; +ERROR: name "target" specified more than once +DETAIL: The name is used both as MERGE target table and data source. +-- used in a CTE +WITH foo AS ( + MERGE INTO target USING source ON (true) + WHEN MATCHED THEN DELETE +) SELECT * FROM foo; +ERROR: WITH query "foo" does not have a RETURNING clause +-- used in COPY +COPY ( + MERGE INTO target USING source ON (true) + WHEN MATCHED THEN DELETE +) TO stdout; +ERROR: MERGE not supported in COPY +-- used in a CTE with RETURNING +WITH foo AS ( + MERGE INTO target USING source ON (true) + WHEN MATCHED THEN DELETE RETURNING target.* +) SELECT * FROM foo; +ERROR: MERGE with RETURNING is not yet supported +-- used in COPY with RETURNING +COPY ( + MERGE INTO target USING source ON (true) + WHEN MATCHED THEN DELETE RETURNING target.* +) TO stdout; +ERROR: MERGE not supported in COPY +-- unsupported relation types +-- view +CREATE VIEW tv AS SELECT count(tid) AS tid FROM target; +MERGE INTO tv t +USING source s +ON t.tid = s.sid +WHEN NOT MATCHED THEN + INSERT DEFAULT VALUES; +ERROR: cannot insert into view "tv" +DETAIL: Views that return aggregate functions are not automatically updatable. +HINT: To enable inserting into the view using MERGE, provide an INSTEAD OF INSERT trigger. +DROP VIEW tv; +-- materialized view +CREATE MATERIALIZED VIEW mv AS SELECT * FROM target; +MERGE INTO mv t +USING source s +ON t.tid = s.sid +WHEN NOT MATCHED THEN + INSERT DEFAULT VALUES; +ERROR: cannot execute MERGE on relation "mv" +DETAIL: This operation is not supported for materialized views. +DROP MATERIALIZED VIEW mv; +-- permissions +MERGE INTO target +USING source2 +ON target.tid = source2.sid +WHEN MATCHED THEN + UPDATE SET balance = 0; +ERROR: permission denied for table source2 +GRANT INSERT ON target TO regress_merge_no_privs; +SET SESSION AUTHORIZATION regress_merge_no_privs; +MERGE INTO target +USING source2 +ON target.tid = source2.sid +WHEN MATCHED THEN + UPDATE SET balance = 0; +ERROR: permission denied for table target +GRANT UPDATE ON target2 TO regress_merge_privs; +SET SESSION AUTHORIZATION regress_merge_privs; +MERGE INTO target2 +USING source +ON target2.tid = source.sid +WHEN MATCHED THEN + DELETE; +ERROR: permission denied for table target2 +MERGE INTO target2 +USING source +ON target2.tid = source.sid +WHEN NOT MATCHED THEN + INSERT DEFAULT VALUES; +ERROR: permission denied for table target2 +-- check if the target can be accessed from source relation subquery; we should +-- not be able to do so +\set VERBOSITY terse +MERGE INTO target t +USING (SELECT * FROM source WHERE t.tid > sid) s +ON t.tid = s.sid +WHEN NOT MATCHED THEN + INSERT DEFAULT VALUES; +ERROR: invalid reference to FROM-clause entry for table "t" at character 55 +\set VERBOSITY default +-- +-- initial tests +-- +-- zero rows in source has no effect +MERGE INTO target +USING source +ON target.tid = source.sid +WHEN MATCHED THEN + UPDATE SET balance = 0; +MERGE INTO target t +USING source AS s +ON t.tid = s.sid +WHEN MATCHED THEN + UPDATE SET balance = 0; +MERGE INTO target t +USING source AS s +ON t.tid = s.sid +WHEN MATCHED THEN + DELETE; +BEGIN; +MERGE INTO target t +USING source AS s +ON t.tid = s.sid +WHEN NOT MATCHED THEN + INSERT DEFAULT VALUES; +ROLLBACK; +-- insert some non-matching source rows to work from +INSERT INTO source VALUES (4, 40); +SELECT * FROM source ORDER BY sid; + sid | delta +--------------------------------------------------------------------- + 4 | 40 +(1 row) + +SELECT * FROM target ORDER BY tid; + tid | balance +--------------------------------------------------------------------- + 1 | 10 + 2 | 20 + 3 | 30 +(3 rows) + +MERGE INTO target t +USING source AS s +ON t.tid = s.sid +WHEN NOT MATCHED THEN + DO NOTHING; +MERGE INTO target t +USING source AS s +ON t.tid = s.sid +WHEN MATCHED THEN + UPDATE SET balance = 0; +MERGE INTO target t +USING source AS s +ON t.tid = s.sid +WHEN MATCHED THEN + DELETE; +BEGIN; +MERGE INTO target t +USING source AS s +ON t.tid = s.sid +WHEN NOT MATCHED THEN + INSERT DEFAULT VALUES; +SELECT * FROM target ORDER BY tid; + tid | balance +--------------------------------------------------------------------- + 1 | 10 + 2 | 20 + 3 | 30 + | +(4 rows) + +ROLLBACK; +-- index plans +INSERT INTO target SELECT generate_series(1000,2500), 0; +ALTER TABLE target ADD CONSTRAINT targetidx PRIMARY KEY (tid); +ANALYZE target; +EXPLAIN (COSTS OFF) +MERGE INTO target t +USING source AS s +ON t.tid = s.sid +WHEN MATCHED THEN + UPDATE SET balance = 0; + QUERY PLAN +--------------------------------------------------------------------- + Custom Scan (Citus Adaptive) + Task Count: 1 + Tasks Shown: All + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> Merge on target_4001000 t + -> Hash Join + Hash Cond: (s.sid = t.tid) + -> Seq Scan on source_4001001 s + -> Hash + -> Seq Scan on target_4001000 t +(11 rows) + +EXPLAIN (COSTS OFF) +MERGE INTO target t +USING source AS s +ON t.tid = s.sid +WHEN MATCHED THEN + DELETE; + QUERY PLAN +--------------------------------------------------------------------- + Custom Scan (Citus Adaptive) + Task Count: 1 + Tasks Shown: All + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> Merge on target_4001000 t + -> Hash Join + Hash Cond: (s.sid = t.tid) + -> Seq Scan on source_4001001 s + -> Hash + -> Seq Scan on target_4001000 t +(11 rows) + +EXPLAIN (COSTS OFF) +MERGE INTO target t +USING source AS s +ON t.tid = s.sid +WHEN NOT MATCHED THEN + INSERT VALUES (4, NULL); + QUERY PLAN +--------------------------------------------------------------------- + Custom Scan (Citus Adaptive) + Task Count: 1 + Tasks Shown: All + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> Merge on target_4001000 t + -> Hash Left Join + Hash Cond: (s.sid = t.tid) + -> Seq Scan on source_4001001 s + -> Hash + -> Seq Scan on target_4001000 t +(11 rows) + +DELETE FROM target WHERE tid > 100; +ANALYZE target; +-- insert some matching source rows to work from +INSERT INTO source VALUES (2, 5); +INSERT INTO source VALUES (3, 20); +SELECT * FROM source ORDER BY sid; + sid | delta +--------------------------------------------------------------------- + 2 | 5 + 3 | 20 + 4 | 40 +(3 rows) + +SELECT * FROM target ORDER BY tid; + tid | balance +--------------------------------------------------------------------- + 1 | 10 + 2 | 20 + 3 | 30 +(3 rows) + +-- equivalent of an UPDATE join +BEGIN; +MERGE INTO target t +USING source AS s +ON t.tid = s.sid +WHEN MATCHED THEN + UPDATE SET balance = 0; +SELECT * FROM target ORDER BY tid; + tid | balance +--------------------------------------------------------------------- + 1 | 10 + 2 | 0 + 3 | 0 +(3 rows) + +ROLLBACK; +-- equivalent of a DELETE join +BEGIN; +MERGE INTO target t +USING source AS s +ON t.tid = s.sid +WHEN MATCHED THEN + DELETE; +SELECT * FROM target ORDER BY tid; + tid | balance +--------------------------------------------------------------------- + 1 | 10 +(1 row) + +ROLLBACK; +BEGIN; +MERGE INTO target t +USING source AS s +ON t.tid = s.sid +WHEN MATCHED THEN + DO NOTHING; +SELECT * FROM target ORDER BY tid; + tid | balance +--------------------------------------------------------------------- + 1 | 10 + 2 | 20 + 3 | 30 +(3 rows) + +ROLLBACK; +BEGIN; +MERGE INTO target t +USING source AS s +ON t.tid = s.sid +WHEN NOT MATCHED THEN + INSERT VALUES (4, NULL); +SELECT * FROM target ORDER BY tid; + tid | balance +--------------------------------------------------------------------- + 1 | 10 + 2 | 20 + 3 | 30 + 4 | +(4 rows) + +ROLLBACK; +-- duplicate source row causes multiple target row update ERROR +INSERT INTO source VALUES (2, 5); +SELECT * FROM source ORDER BY sid; + sid | delta +--------------------------------------------------------------------- + 2 | 5 + 2 | 5 + 3 | 20 + 4 | 40 +(4 rows) + +SELECT * FROM target ORDER BY tid; + tid | balance +--------------------------------------------------------------------- + 1 | 10 + 2 | 20 + 3 | 30 +(3 rows) + +BEGIN; +MERGE INTO target t +USING source AS s +ON t.tid = s.sid +WHEN MATCHED THEN + UPDATE SET balance = 0; +ERROR: MERGE command cannot affect row a second time +HINT: Ensure that not more than one source row matches any one target row. +ROLLBACK; +BEGIN; +MERGE INTO target t +USING source AS s +ON t.tid = s.sid +WHEN MATCHED THEN + DELETE; +ERROR: MERGE command cannot affect row a second time +HINT: Ensure that not more than one source row matches any one target row. +ROLLBACK; +-- remove duplicate MATCHED data from source data +DELETE FROM source WHERE sid = 2; +INSERT INTO source VALUES (2, 5); +SELECT * FROM source ORDER BY sid; + sid | delta +--------------------------------------------------------------------- + 2 | 5 + 3 | 20 + 4 | 40 +(3 rows) + +SELECT * FROM target ORDER BY tid; + tid | balance +--------------------------------------------------------------------- + 1 | 10 + 2 | 20 + 3 | 30 +(3 rows) + +-- duplicate source row on INSERT should fail because of target_pkey +INSERT INTO source VALUES (4, 40); +BEGIN; +MERGE INTO target t +USING source AS s +ON t.tid = s.sid +WHEN NOT MATCHED THEN + INSERT VALUES (4, NULL); +ERROR: duplicate key value violates unique constraint "targetidx_4001000" +DETAIL: Key (tid)=(4) already exists. +SELECT * FROM target ORDER BY tid; +ERROR: current transaction is aborted, commands ignored until end of transaction block +ROLLBACK; +-- remove duplicate NOT MATCHED data from source data +DELETE FROM source WHERE sid = 4; +INSERT INTO source VALUES (4, 40); +SELECT * FROM source ORDER BY sid; + sid | delta +--------------------------------------------------------------------- + 2 | 5 + 3 | 20 + 4 | 40 +(3 rows) + +SELECT * FROM target ORDER BY tid; + tid | balance +--------------------------------------------------------------------- + 1 | 10 + 2 | 20 + 3 | 30 +(3 rows) + +-- remove constraints +alter table target drop CONSTRAINT targetidx; +alter table target alter column tid drop not null; +-- multiple actions +BEGIN; +MERGE INTO target t +USING source AS s +ON t.tid = s.sid +WHEN NOT MATCHED THEN + INSERT VALUES (4, 4) +WHEN MATCHED THEN + UPDATE SET balance = 0; +SELECT * FROM target ORDER BY tid; + tid | balance +--------------------------------------------------------------------- + 1 | 10 + 2 | 0 + 3 | 0 + 4 | 4 +(4 rows) + +ROLLBACK; +-- should be equivalent +BEGIN; +MERGE INTO target t +USING source AS s +ON t.tid = s.sid +WHEN MATCHED THEN + UPDATE SET balance = 0 +WHEN NOT MATCHED THEN + INSERT VALUES (4, 4); +SELECT * FROM target ORDER BY tid; + tid | balance +--------------------------------------------------------------------- + 1 | 10 + 2 | 0 + 3 | 0 + 4 | 4 +(4 rows) + +ROLLBACK; +-- column references +-- do a simple equivalent of an UPDATE join +BEGIN; +MERGE INTO target t +USING source AS s +ON t.tid = s.sid +WHEN MATCHED THEN + UPDATE SET balance = t.balance + s.delta; +SELECT * FROM target ORDER BY tid; + tid | balance +--------------------------------------------------------------------- + 1 | 10 + 2 | 25 + 3 | 50 +(3 rows) + +ROLLBACK; +-- do a simple equivalent of an INSERT SELECT +BEGIN; +MERGE INTO target t +USING source AS s +ON t.tid = s.sid +WHEN NOT MATCHED THEN + INSERT VALUES (s.sid, s.delta); +SELECT * FROM target ORDER BY tid; + tid | balance +--------------------------------------------------------------------- + 1 | 10 + 2 | 20 + 3 | 30 + 4 | 40 +(4 rows) + +ROLLBACK; +-- and again with duplicate source rows +INSERT INTO source VALUES (5, 50); +INSERT INTO source VALUES (5, 50); +-- do a simple equivalent of an INSERT SELECT +BEGIN; +MERGE INTO target t +USING source AS s +ON t.tid = s.sid +WHEN NOT MATCHED THEN + INSERT VALUES (s.sid, s.delta); +SELECT * FROM target ORDER BY tid; + tid | balance +--------------------------------------------------------------------- + 1 | 10 + 2 | 20 + 3 | 30 + 4 | 40 + 5 | 50 + 5 | 50 +(6 rows) + +ROLLBACK; +-- removing duplicate source rows +DELETE FROM source WHERE sid = 5; +-- and again with explicitly identified column list +BEGIN; +MERGE INTO target t +USING source AS s +ON t.tid = s.sid +WHEN NOT MATCHED THEN + INSERT (tid, balance) VALUES (s.sid, s.delta); +SELECT * FROM target ORDER BY tid; + tid | balance +--------------------------------------------------------------------- + 1 | 10 + 2 | 20 + 3 | 30 + 4 | 40 +(4 rows) + +ROLLBACK; +-- and again with a subtle error: referring to non-existent target row for NOT MATCHED +\set VERBOSITY terse +MERGE INTO target t +USING source AS s +ON t.tid = s.sid +WHEN NOT MATCHED THEN + INSERT (tid, balance) VALUES (t.tid, s.delta); +ERROR: invalid reference to FROM-clause entry for table "t" at character 109 +-- and again with a constant ON clause +BEGIN; +MERGE INTO target t +USING source AS s +ON (SELECT true) +WHEN NOT MATCHED THEN + INSERT (tid, balance) VALUES (t.tid, s.delta); +ERROR: invalid reference to FROM-clause entry for table "t" at character 109 +SELECT * FROM target ORDER BY tid; +ERROR: current transaction is aborted, commands ignored until end of transaction block +ROLLBACK; +-- now the classic UPSERT +BEGIN; +MERGE INTO target t +USING source AS s +ON t.tid = s.sid +WHEN MATCHED THEN + UPDATE SET balance = t.balance + s.delta +WHEN NOT MATCHED THEN + INSERT VALUES (s.sid, s.delta); +SELECT * FROM target ORDER BY tid; + tid | balance +--------------------------------------------------------------------- + 1 | 10 + 2 | 25 + 3 | 50 + 4 | 40 +(4 rows) + +ROLLBACK; +-- unreachable WHEN clause should ERROR +BEGIN; +MERGE INTO target t +USING source AS s +ON t.tid = s.sid +WHEN MATCHED THEN /* Terminal WHEN clause for MATCHED */ + DELETE +WHEN MATCHED AND s.delta > 0 THEN + UPDATE SET balance = t.balance - s.delta; +ERROR: unreachable WHEN clause specified after unconditional WHEN clause +ROLLBACK; +-- conditional WHEN clause +CREATE TABLE wq_target (tid integer not null, balance integer DEFAULT -1) + WITH (autovacuum_enabled=off); +CREATE TABLE wq_source (balance integer, sid integer) + WITH (autovacuum_enabled=off); +SELECT citus_add_local_table_to_metadata('wq_target'); + citus_add_local_table_to_metadata +--------------------------------------------------------------------- + +(1 row) + +SELECT citus_add_local_table_to_metadata('wq_source'); + citus_add_local_table_to_metadata +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO wq_source (sid, balance) VALUES (1, 100); +BEGIN; +-- try a simple INSERT with default values first +MERGE INTO wq_target t +USING wq_source s ON t.tid = s.sid +WHEN NOT MATCHED THEN + INSERT (tid) VALUES (s.sid); +SELECT * FROM wq_target; + tid | balance +--------------------------------------------------------------------- + 1 | -1 +(1 row) + +ROLLBACK; +-- this time with a FALSE condition +MERGE INTO wq_target t +USING wq_source s ON t.tid = s.sid +WHEN NOT MATCHED AND FALSE THEN + INSERT (tid) VALUES (s.sid); +SELECT * FROM wq_target; + tid | balance +--------------------------------------------------------------------- +(0 rows) + +-- this time with an actual condition which returns false +MERGE INTO wq_target t +USING wq_source s ON t.tid = s.sid +WHEN NOT MATCHED AND s.balance <> 100 THEN + INSERT (tid) VALUES (s.sid); +SELECT * FROM wq_target; + tid | balance +--------------------------------------------------------------------- +(0 rows) + +BEGIN; +-- and now with a condition which returns true +MERGE INTO wq_target t +USING wq_source s ON t.tid = s.sid +WHEN NOT MATCHED AND s.balance = 100 THEN + INSERT (tid) VALUES (s.sid); +SELECT * FROM wq_target; + tid | balance +--------------------------------------------------------------------- + 1 | -1 +(1 row) + +ROLLBACK; +-- conditions in the NOT MATCHED clause can only refer to source columns +BEGIN; +MERGE INTO wq_target t +USING wq_source s ON t.tid = s.sid +WHEN NOT MATCHED AND t.balance = 100 THEN + INSERT (tid) VALUES (s.sid); +ERROR: invalid reference to FROM-clause entry for table "t" at character 80 +SELECT * FROM wq_target; +ERROR: current transaction is aborted, commands ignored until end of transaction block +ROLLBACK; +\set VERBOSITY default +MERGE INTO wq_target t +USING wq_source s ON t.tid = s.sid +WHEN NOT MATCHED AND s.balance = 100 THEN + INSERT (tid) VALUES (s.sid); +SELECT * FROM wq_target; + tid | balance +--------------------------------------------------------------------- + 1 | -1 +(1 row) + +-- conditions in MATCHED clause can refer to both source and target +SELECT * FROM wq_source; + balance | sid +--------------------------------------------------------------------- + 100 | 1 +(1 row) + +MERGE INTO wq_target t +USING wq_source s ON t.tid = s.sid +WHEN MATCHED AND s.balance = 100 THEN + UPDATE SET balance = t.balance + s.balance; +SELECT * FROM wq_target; + tid | balance +--------------------------------------------------------------------- + 1 | 99 +(1 row) + +MERGE INTO wq_target t +USING wq_source s ON t.tid = s.sid +WHEN MATCHED AND t.balance = 100 THEN + UPDATE SET balance = t.balance + s.balance; +SELECT * FROM wq_target; + tid | balance +--------------------------------------------------------------------- + 1 | 99 +(1 row) + +-- check if AND works +MERGE INTO wq_target t +USING wq_source s ON t.tid = s.sid +WHEN MATCHED AND t.balance = 99 AND s.balance > 100 THEN + UPDATE SET balance = t.balance + s.balance; +SELECT * FROM wq_target; + tid | balance +--------------------------------------------------------------------- + 1 | 99 +(1 row) + +MERGE INTO wq_target t +USING wq_source s ON t.tid = s.sid +WHEN MATCHED AND t.balance = 99 AND s.balance = 100 THEN + UPDATE SET balance = t.balance + s.balance; +SELECT * FROM wq_target; + tid | balance +--------------------------------------------------------------------- + 1 | 199 +(1 row) + +-- check if OR works +MERGE INTO wq_target t +USING wq_source s ON t.tid = s.sid +WHEN MATCHED AND t.balance = 99 OR s.balance > 100 THEN + UPDATE SET balance = t.balance + s.balance; +SELECT * FROM wq_target; + tid | balance +--------------------------------------------------------------------- + 1 | 199 +(1 row) + +MERGE INTO wq_target t +USING wq_source s ON t.tid = s.sid +WHEN MATCHED AND t.balance = 199 OR s.balance > 100 THEN + UPDATE SET balance = t.balance + s.balance; +SELECT * FROM wq_target; + tid | balance +--------------------------------------------------------------------- + 1 | 299 +(1 row) + +-- check source-side whole-row references +BEGIN; +MERGE INTO wq_target t +USING wq_source s ON (t.tid = s.sid) +WHEN matched and t = s or t.tid = s.sid THEN + UPDATE SET balance = t.balance + s.balance; +SELECT * FROM wq_target; + tid | balance +--------------------------------------------------------------------- + 1 | 399 +(1 row) + +ROLLBACK; +-- check if subqueries work in the conditions? +MERGE INTO wq_target t +USING wq_source s ON t.tid = s.sid +WHEN MATCHED AND t.balance > (SELECT max(balance) FROM target) THEN + UPDATE SET balance = t.balance + s.balance; +-- check if we can access system columns in the conditions +MERGE INTO wq_target t +USING wq_source s ON t.tid = s.sid +WHEN MATCHED AND t.xmin = t.xmax THEN + UPDATE SET balance = t.balance + s.balance; +ERROR: cannot use system column "xmin" in MERGE WHEN condition +MERGE INTO wq_target t +USING wq_source s ON t.tid = s.sid +WHEN MATCHED AND t.tableoid >= 0 THEN + UPDATE SET balance = t.balance + s.balance; +SELECT * FROM wq_target; + tid | balance +--------------------------------------------------------------------- + 1 | 499 +(1 row) + +-- test preventing WHEN conditions from writing to the database +create or replace function merge_when_and_write() returns boolean +language plpgsql as +$$ +BEGIN + INSERT INTO target VALUES (100, 100); + RETURN TRUE; +END; +$$; +BEGIN; +MERGE INTO wq_target t +USING wq_source s ON t.tid = s.sid +WHEN MATCHED AND (merge_when_and_write()) THEN + UPDATE SET balance = t.balance + s.balance; +ERROR: non-IMMUTABLE functions are not yet supported in MERGE sql with distributed tables +ROLLBACK; +-- Test preventing ON condition from writing to the database +BEGIN; +MERGE INTO wq_target t +USING wq_source s ON t.tid = s.sid AND (merge_when_and_write()) +WHEN MATCHED THEN + UPDATE SET balance = t.balance + s.balance; +ERROR: non-IMMUTABLE functions are not yet supported in MERGE sql with distributed tables +ROLLBACK; +drop function merge_when_and_write(); +DROP TABLE wq_target, wq_source; +-- test triggers +create or replace function merge_trigfunc () returns trigger +language plpgsql as +$$ +DECLARE + line text; +BEGIN + SELECT INTO line format('%s %s %s trigger%s', + TG_WHEN, TG_OP, TG_LEVEL, CASE + WHEN TG_OP = 'INSERT' AND TG_LEVEL = 'ROW' + THEN format(' row: %s', NEW) + WHEN TG_OP = 'UPDATE' AND TG_LEVEL = 'ROW' + THEN format(' row: %s -> %s', OLD, NEW) + WHEN TG_OP = 'DELETE' AND TG_LEVEL = 'ROW' + THEN format(' row: %s', OLD) + END); + + RAISE NOTICE '%', line; + IF (TG_WHEN = 'BEFORE' AND TG_LEVEL = 'ROW') THEN + IF (TG_OP = 'DELETE') THEN + RETURN OLD; + ELSE + RETURN NEW; + END IF; + ELSE + RETURN NULL; + END IF; +END; +$$; +CREATE TRIGGER merge_bsi BEFORE INSERT ON target FOR EACH STATEMENT EXECUTE PROCEDURE merge_trigfunc (); +CREATE TRIGGER merge_bsu BEFORE UPDATE ON target FOR EACH STATEMENT EXECUTE PROCEDURE merge_trigfunc (); +CREATE TRIGGER merge_bsd BEFORE DELETE ON target FOR EACH STATEMENT EXECUTE PROCEDURE merge_trigfunc (); +CREATE TRIGGER merge_asi AFTER INSERT ON target FOR EACH STATEMENT EXECUTE PROCEDURE merge_trigfunc (); +CREATE TRIGGER merge_asu AFTER UPDATE ON target FOR EACH STATEMENT EXECUTE PROCEDURE merge_trigfunc (); +CREATE TRIGGER merge_asd AFTER DELETE ON target FOR EACH STATEMENT EXECUTE PROCEDURE merge_trigfunc (); +CREATE TRIGGER merge_bri BEFORE INSERT ON target FOR EACH ROW EXECUTE PROCEDURE merge_trigfunc (); +CREATE TRIGGER merge_bru BEFORE UPDATE ON target FOR EACH ROW EXECUTE PROCEDURE merge_trigfunc (); +CREATE TRIGGER merge_brd BEFORE DELETE ON target FOR EACH ROW EXECUTE PROCEDURE merge_trigfunc (); +CREATE TRIGGER merge_ari AFTER INSERT ON target FOR EACH ROW EXECUTE PROCEDURE merge_trigfunc (); +CREATE TRIGGER merge_aru AFTER UPDATE ON target FOR EACH ROW EXECUTE PROCEDURE merge_trigfunc (); +CREATE TRIGGER merge_ard AFTER DELETE ON target FOR EACH ROW EXECUTE PROCEDURE merge_trigfunc (); +-- now the classic UPSERT, with a DELETE +BEGIN; +UPDATE target SET balance = 0 WHERE tid = 3; +NOTICE: BEFORE UPDATE STATEMENT trigger +NOTICE: BEFORE UPDATE ROW trigger row: (3,30) -> (3,0) +NOTICE: AFTER UPDATE ROW trigger row: (3,30) -> (3,0) +NOTICE: AFTER UPDATE STATEMENT trigger +--EXPLAIN (ANALYZE ON, COSTS OFF, SUMMARY OFF, TIMING OFF) +MERGE INTO target t +USING source AS s +ON t.tid = s.sid +WHEN MATCHED AND t.balance > s.delta THEN + UPDATE SET balance = t.balance - s.delta +WHEN MATCHED THEN + DELETE +WHEN NOT MATCHED THEN + INSERT VALUES (s.sid, s.delta); +NOTICE: BEFORE INSERT STATEMENT trigger +NOTICE: BEFORE UPDATE STATEMENT trigger +NOTICE: BEFORE DELETE STATEMENT trigger +NOTICE: BEFORE DELETE ROW trigger row: (3,0) +NOTICE: BEFORE UPDATE ROW trigger row: (2,20) -> (2,15) +NOTICE: BEFORE INSERT ROW trigger row: (4,40) +NOTICE: AFTER DELETE ROW trigger row: (3,0) +NOTICE: AFTER UPDATE ROW trigger row: (2,20) -> (2,15) +NOTICE: AFTER INSERT ROW trigger row: (4,40) +NOTICE: AFTER DELETE STATEMENT trigger +NOTICE: AFTER UPDATE STATEMENT trigger +NOTICE: AFTER INSERT STATEMENT trigger +SELECT * FROM target ORDER BY tid; + tid | balance +--------------------------------------------------------------------- + 1 | 10 + 2 | 15 + 4 | 40 +(3 rows) + +ROLLBACK; +-- Test behavior of triggers that turn UPDATE/DELETE into no-ops +create or replace function skip_merge_op() returns trigger +language plpgsql as +$$ +BEGIN + RETURN NULL; +END; +$$; +SELECT * FROM target full outer join source on (sid = tid); + tid | balance | sid | delta +--------------------------------------------------------------------- + 3 | 30 | 3 | 20 + 2 | 20 | 2 | 5 + | | 4 | 40 + 1 | 10 | | +(4 rows) + +create trigger merge_skip BEFORE INSERT OR UPDATE or DELETE + ON target FOR EACH ROW EXECUTE FUNCTION skip_merge_op(); +MERGE INTO target t +USING source AS s +ON t.tid = s.sid +WHEN MATCHED AND s.sid = 3 THEN UPDATE SET balance = t.balance + s.delta +WHEN MATCHED THEN DELETE +WHEN NOT MATCHED THEN INSERT VALUES (sid, delta); +NOTICE: BEFORE INSERT STATEMENT trigger +NOTICE: BEFORE UPDATE STATEMENT trigger +NOTICE: BEFORE DELETE STATEMENT trigger +NOTICE: BEFORE UPDATE ROW trigger row: (3,30) -> (3,50) +NOTICE: BEFORE DELETE ROW trigger row: (2,20) +NOTICE: BEFORE INSERT ROW trigger row: (4,40) +NOTICE: AFTER DELETE STATEMENT trigger +NOTICE: AFTER UPDATE STATEMENT trigger +NOTICE: AFTER INSERT STATEMENT trigger +SELECT * FROM target FULL OUTER JOIN source ON (sid = tid); + tid | balance | sid | delta +--------------------------------------------------------------------- + 3 | 30 | 3 | 20 + 2 | 20 | 2 | 5 + | | 4 | 40 + 1 | 10 | | +(4 rows) + +DROP TRIGGER merge_skip ON target; +DROP FUNCTION skip_merge_op(); +-- test from PL/pgSQL +-- make sure MERGE INTO isn't interpreted to mean returning variables like SELECT INTO +BEGIN; +DO LANGUAGE plpgsql $$ +BEGIN +MERGE INTO target t +USING source AS s +ON t.tid = s.sid +WHEN MATCHED AND t.balance > s.delta THEN + UPDATE SET balance = t.balance - s.delta; +END; +$$; +NOTICE: BEFORE UPDATE STATEMENT trigger +NOTICE: BEFORE UPDATE ROW trigger row: (3,30) -> (3,10) +NOTICE: BEFORE UPDATE ROW trigger row: (2,20) -> (2,15) +NOTICE: AFTER UPDATE ROW trigger row: (3,30) -> (3,10) +NOTICE: AFTER UPDATE ROW trigger row: (2,20) -> (2,15) +NOTICE: AFTER UPDATE STATEMENT trigger +ROLLBACK; +--source constants +BEGIN; +MERGE INTO target t +USING (SELECT 9 AS sid, 57 AS delta) AS s +ON t.tid = s.sid +WHEN NOT MATCHED THEN + INSERT (tid, balance) VALUES (s.sid, s.delta); +NOTICE: BEFORE INSERT STATEMENT trigger +NOTICE: BEFORE INSERT ROW trigger row: (9,57) +NOTICE: AFTER INSERT ROW trigger row: (9,57) +NOTICE: AFTER INSERT STATEMENT trigger +SELECT * FROM target ORDER BY tid; + tid | balance +--------------------------------------------------------------------- + 1 | 10 + 2 | 20 + 3 | 30 + 9 | 57 +(4 rows) + +ROLLBACK; +--source query +BEGIN; +MERGE INTO target t +USING (SELECT sid, delta FROM source WHERE delta > 0) AS s +ON t.tid = s.sid +WHEN NOT MATCHED THEN + INSERT (tid, balance) VALUES (s.sid, s.delta); +NOTICE: BEFORE INSERT STATEMENT trigger +NOTICE: BEFORE INSERT ROW trigger row: (4,40) +NOTICE: AFTER INSERT ROW trigger row: (4,40) +NOTICE: AFTER INSERT STATEMENT trigger +SELECT * FROM target ORDER BY tid; + tid | balance +--------------------------------------------------------------------- + 1 | 10 + 2 | 20 + 3 | 30 + 4 | 40 +(4 rows) + +ROLLBACK; +BEGIN; +MERGE INTO target t +USING (SELECT sid, delta as newname FROM source WHERE delta > 0) AS s +ON t.tid = s.sid +WHEN NOT MATCHED THEN + INSERT (tid, balance) VALUES (s.sid, s.newname); +NOTICE: BEFORE INSERT STATEMENT trigger +NOTICE: BEFORE INSERT ROW trigger row: (4,40) +NOTICE: AFTER INSERT ROW trigger row: (4,40) +NOTICE: AFTER INSERT STATEMENT trigger +SELECT * FROM target ORDER BY tid; + tid | balance +--------------------------------------------------------------------- + 1 | 10 + 2 | 20 + 3 | 30 + 4 | 40 +(4 rows) + +ROLLBACK; +--self-merge +BEGIN; +MERGE INTO target t1 +USING target t2 +ON t1.tid = t2.tid +WHEN MATCHED THEN + UPDATE SET balance = t1.balance + t2.balance +WHEN NOT MATCHED THEN + INSERT VALUES (t2.tid, t2.balance); +NOTICE: BEFORE INSERT STATEMENT trigger +NOTICE: BEFORE UPDATE STATEMENT trigger +NOTICE: BEFORE UPDATE ROW trigger row: (1,10) -> (1,20) +NOTICE: BEFORE UPDATE ROW trigger row: (2,20) -> (2,40) +NOTICE: BEFORE UPDATE ROW trigger row: (3,30) -> (3,60) +NOTICE: AFTER UPDATE ROW trigger row: (1,10) -> (1,20) +NOTICE: AFTER UPDATE ROW trigger row: (2,20) -> (2,40) +NOTICE: AFTER UPDATE ROW trigger row: (3,30) -> (3,60) +NOTICE: AFTER UPDATE STATEMENT trigger +NOTICE: AFTER INSERT STATEMENT trigger +SELECT * FROM target ORDER BY tid; + tid | balance +--------------------------------------------------------------------- + 1 | 20 + 2 | 40 + 3 | 60 +(3 rows) + +ROLLBACK; +BEGIN; +MERGE INTO target t +USING (SELECT tid as sid, balance as delta FROM target WHERE balance > 0) AS s +ON t.tid = s.sid +WHEN NOT MATCHED THEN + INSERT (tid, balance) VALUES (s.sid, s.delta); +NOTICE: BEFORE INSERT STATEMENT trigger +NOTICE: AFTER INSERT STATEMENT trigger +SELECT * FROM target ORDER BY tid; + tid | balance +--------------------------------------------------------------------- + 1 | 10 + 2 | 20 + 3 | 30 +(3 rows) + +ROLLBACK; +BEGIN; +MERGE INTO target t +USING +(SELECT sid, max(delta) AS delta + FROM source + GROUP BY sid + HAVING count(*) = 1 + ORDER BY sid ASC) AS s +ON t.tid = s.sid +WHEN NOT MATCHED THEN + INSERT (tid, balance) VALUES (s.sid, s.delta); +NOTICE: BEFORE INSERT STATEMENT trigger +NOTICE: BEFORE INSERT ROW trigger row: (4,40) +NOTICE: AFTER INSERT ROW trigger row: (4,40) +NOTICE: AFTER INSERT STATEMENT trigger +SELECT * FROM target ORDER BY tid; + tid | balance +--------------------------------------------------------------------- + 1 | 10 + 2 | 20 + 3 | 30 + 4 | 40 +(4 rows) + +ROLLBACK; +-- plpgsql parameters and results +BEGIN; +CREATE FUNCTION merge_func (p_id integer, p_bal integer) +RETURNS INTEGER +LANGUAGE plpgsql +AS $$ +DECLARE + result integer; +BEGIN +MERGE INTO target t +USING (SELECT p_id AS sid) AS s +ON t.tid = s.sid +WHEN MATCHED THEN + UPDATE SET balance = t.balance - p_bal; +IF FOUND THEN + GET DIAGNOSTICS result := ROW_COUNT; +END IF; +RETURN result; +END; +$$; +SELECT merge_func(3, 4); +NOTICE: BEFORE UPDATE STATEMENT trigger +NOTICE: BEFORE UPDATE ROW trigger row: (3,30) -> (3,26) +NOTICE: AFTER UPDATE ROW trigger row: (3,30) -> (3,26) +NOTICE: AFTER UPDATE STATEMENT trigger + merge_func +--------------------------------------------------------------------- + 1 +(1 row) + +SELECT * FROM target ORDER BY tid; + tid | balance +--------------------------------------------------------------------- + 1 | 10 + 2 | 20 + 3 | 26 +(3 rows) + +ROLLBACK; +-- PREPARE +BEGIN; +prepare foom as merge into target t using (select 1 as sid) s on (t.tid = s.sid) when matched then update set balance = 1; +execute foom; +NOTICE: BEFORE UPDATE STATEMENT trigger +NOTICE: BEFORE UPDATE ROW trigger row: (1,10) -> (1,1) +NOTICE: AFTER UPDATE ROW trigger row: (1,10) -> (1,1) +NOTICE: AFTER UPDATE STATEMENT trigger +SELECT * FROM target ORDER BY tid; + tid | balance +--------------------------------------------------------------------- + 1 | 1 + 2 | 20 + 3 | 30 +(3 rows) + +ROLLBACK; +BEGIN; +PREPARE foom2 (integer, integer) AS +MERGE INTO target t +USING (SELECT 1) s +ON t.tid = $1 +WHEN MATCHED THEN +UPDATE SET balance = $2; +--EXPLAIN (ANALYZE ON, COSTS OFF, SUMMARY OFF, TIMING OFF) +execute foom2 (1, 1); +NOTICE: BEFORE UPDATE STATEMENT trigger +NOTICE: BEFORE UPDATE ROW trigger row: (1,10) -> (1,1) +NOTICE: AFTER UPDATE ROW trigger row: (1,10) -> (1,1) +NOTICE: AFTER UPDATE STATEMENT trigger +SELECT * FROM target ORDER BY tid; + tid | balance +--------------------------------------------------------------------- + 1 | 1 + 2 | 20 + 3 | 30 +(3 rows) + +ROLLBACK; +-- subqueries in source relation +CREATE TABLE sq_target (tid integer NOT NULL, balance integer) + WITH (autovacuum_enabled=off); +CREATE TABLE sq_source (delta integer, sid integer, balance integer DEFAULT 0) + WITH (autovacuum_enabled=off); +SELECT citus_add_local_table_to_metadata('sq_target'); + citus_add_local_table_to_metadata +--------------------------------------------------------------------- + +(1 row) + +SELECT citus_add_local_table_to_metadata('sq_source'); + citus_add_local_table_to_metadata +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO sq_target(tid, balance) VALUES (1,100), (2,200), (3,300); +INSERT INTO sq_source(sid, delta) VALUES (1,10), (2,20), (4,40); +BEGIN; +MERGE INTO sq_target t +USING (SELECT * FROM sq_source) s +ON tid = sid +WHEN MATCHED AND t.balance > delta THEN + UPDATE SET balance = t.balance + delta; +SELECT * FROM sq_target; + tid | balance +--------------------------------------------------------------------- + 3 | 300 + 1 | 110 + 2 | 220 +(3 rows) + +ROLLBACK; +-- try a view +CREATE VIEW v AS SELECT * FROM sq_source WHERE sid < 2; +BEGIN; +MERGE INTO sq_target +USING v +ON tid = sid +WHEN MATCHED THEN + UPDATE SET balance = v.balance + delta; +SELECT * FROM sq_target; + tid | balance +--------------------------------------------------------------------- + 2 | 200 + 3 | 300 + 1 | 10 +(3 rows) + +ROLLBACK; +-- ambiguous reference to a column +BEGIN; +MERGE INTO sq_target +USING v +ON tid = sid +WHEN MATCHED AND tid > 2 THEN + UPDATE SET balance = balance + delta +WHEN NOT MATCHED THEN + INSERT (balance, tid) VALUES (balance + delta, sid) +WHEN MATCHED AND tid < 2 THEN + DELETE; +ERROR: column reference "balance" is ambiguous +ROLLBACK; +BEGIN; +INSERT INTO sq_source (sid, balance, delta) VALUES (-1, -1, -10); +MERGE INTO sq_target t +USING v +ON tid = sid +WHEN MATCHED AND tid > 2 THEN + UPDATE SET balance = t.balance + delta +WHEN NOT MATCHED THEN + INSERT (balance, tid) VALUES (balance + delta, sid) +WHEN MATCHED AND tid < 2 THEN + DELETE; +SELECT * FROM sq_target; + tid | balance +--------------------------------------------------------------------- + 2 | 200 + 3 | 300 + -1 | -11 +(3 rows) + +ROLLBACK; +-- CTEs +BEGIN; +INSERT INTO sq_source (sid, balance, delta) VALUES (-1, -1, -10); +WITH targq AS ( + SELECT * FROM v +) +MERGE INTO sq_target t +USING v +ON tid = sid +WHEN MATCHED AND tid > 2 THEN + UPDATE SET balance = t.balance + delta +WHEN NOT MATCHED THEN + INSERT (balance, tid) VALUES (balance + delta, sid) +WHEN MATCHED AND tid < 2 THEN + DELETE; +ROLLBACK; +-- RETURNING +BEGIN; +INSERT INTO sq_source (sid, balance, delta) VALUES (-1, -1, -10); +MERGE INTO sq_target t +USING v +ON tid = sid +WHEN MATCHED AND tid > 2 THEN + UPDATE SET balance = t.balance + delta +WHEN NOT MATCHED THEN + INSERT (balance, tid) VALUES (balance + delta, sid) +WHEN MATCHED AND tid < 2 THEN + DELETE +RETURNING *; +ERROR: MERGE with RETURNING is not yet supported +ROLLBACK; +-- EXPLAIN +CREATE TABLE ex_mtarget (a int, b int) + WITH (autovacuum_enabled=off); +CREATE TABLE ex_msource (a int, b int) + WITH (autovacuum_enabled=off); +SELECT citus_add_local_table_to_metadata('ex_mtarget'); + citus_add_local_table_to_metadata +--------------------------------------------------------------------- + +(1 row) + +SELECT citus_add_local_table_to_metadata('ex_msource'); + citus_add_local_table_to_metadata +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO ex_mtarget SELECT i, i*10 FROM generate_series(1,100,2) i; +INSERT INTO ex_msource SELECT i, i*10 FROM generate_series(1,100,1) i; +CREATE FUNCTION explain_merge(query text) RETURNS SETOF text +LANGUAGE plpgsql AS +$$ +DECLARE ln text; +BEGIN + FOR ln IN + EXECUTE 'explain (analyze, timing off, summary off, costs off) ' || + query + LOOP + ln := regexp_replace(ln, '(Memory( Usage)?|Buckets|Batches): \S*', '\1: xxx', 'g'); + RETURN NEXT ln; + END LOOP; +END; +$$; +-- only updates +SELECT explain_merge(' +MERGE INTO ex_mtarget t USING ex_msource s ON t.a = s.a +WHEN MATCHED THEN + UPDATE SET b = t.b + 1'); + explain_merge +--------------------------------------------------------------------- + Custom Scan (Citus Adaptive) (actual rows=0 loops=1) + Task Count: 1 + Tasks Shown: All + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> Merge on ex_mtarget_4001008 t (actual rows=0 loops=1) + Tuples: updated=50 + -> Merge Join (actual rows=50 loops=1) + Merge Cond: (t.a = s.a) + -> Sort (actual rows=50 loops=1) + Sort Key: t.a + Sort Method: quicksort Memory: xxx + -> Seq Scan on ex_mtarget_4001008 t (actual rows=50 loops=1) + -> Sort (actual rows=100 loops=1) + Sort Key: s.a + Sort Method: quicksort Memory: xxx + -> Seq Scan on ex_msource_4001009 s (actual rows=100 loops=1) +(17 rows) + +-- only updates to selected tuples +SELECT explain_merge(' +MERGE INTO ex_mtarget t USING ex_msource s ON t.a = s.a +WHEN MATCHED AND t.a < 10 THEN + UPDATE SET b = t.b + 1'); + explain_merge +--------------------------------------------------------------------- + Custom Scan (Citus Adaptive) (actual rows=0 loops=1) + Task Count: 1 + Tasks Shown: All + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> Merge on ex_mtarget_4001008 t (actual rows=0 loops=1) + Tuples: updated=5 skipped=45 + -> Merge Join (actual rows=50 loops=1) + Merge Cond: (t.a = s.a) + -> Sort (actual rows=50 loops=1) + Sort Key: t.a + Sort Method: quicksort Memory: xxx + -> Seq Scan on ex_mtarget_4001008 t (actual rows=50 loops=1) + -> Sort (actual rows=100 loops=1) + Sort Key: s.a + Sort Method: quicksort Memory: xxx + -> Seq Scan on ex_msource_4001009 s (actual rows=100 loops=1) +(17 rows) + +-- updates + deletes +SELECT explain_merge(' +MERGE INTO ex_mtarget t USING ex_msource s ON t.a = s.a +WHEN MATCHED AND t.a < 10 THEN + UPDATE SET b = t.b + 1 +WHEN MATCHED AND t.a >= 10 AND t.a <= 20 THEN + DELETE'); + explain_merge +--------------------------------------------------------------------- + Custom Scan (Citus Adaptive) (actual rows=0 loops=1) + Task Count: 1 + Tasks Shown: All + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> Merge on ex_mtarget_4001008 t (actual rows=0 loops=1) + Tuples: updated=5 deleted=5 skipped=40 + -> Merge Join (actual rows=50 loops=1) + Merge Cond: (t.a = s.a) + -> Sort (actual rows=50 loops=1) + Sort Key: t.a + Sort Method: quicksort Memory: xxx + -> Seq Scan on ex_mtarget_4001008 t (actual rows=50 loops=1) + -> Sort (actual rows=100 loops=1) + Sort Key: s.a + Sort Method: quicksort Memory: xxx + -> Seq Scan on ex_msource_4001009 s (actual rows=100 loops=1) +(17 rows) + +-- only inserts +SELECT explain_merge(' +MERGE INTO ex_mtarget t USING ex_msource s ON t.a = s.a +WHEN NOT MATCHED AND s.a < 10 THEN + INSERT VALUES (a, b)'); + explain_merge +--------------------------------------------------------------------- + Custom Scan (Citus Adaptive) (actual rows=0 loops=1) + Task Count: 1 + Tasks Shown: All + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> Merge on ex_mtarget_4001008 t (actual rows=0 loops=1) + Tuples: inserted=4 skipped=96 + -> Merge Left Join (actual rows=100 loops=1) + Merge Cond: (s.a = t.a) + -> Sort (actual rows=100 loops=1) + Sort Key: s.a + Sort Method: quicksort Memory: xxx + -> Seq Scan on ex_msource_4001009 s (actual rows=100 loops=1) + -> Sort (actual rows=45 loops=1) + Sort Key: t.a + Sort Method: quicksort Memory: xxx + -> Seq Scan on ex_mtarget_4001008 t (actual rows=45 loops=1) +(17 rows) + +-- all three +SELECT explain_merge(' +MERGE INTO ex_mtarget t USING ex_msource s ON t.a = s.a +WHEN MATCHED AND t.a < 10 THEN + UPDATE SET b = t.b + 1 +WHEN MATCHED AND t.a >= 30 AND t.a <= 40 THEN + DELETE +WHEN NOT MATCHED AND s.a < 20 THEN + INSERT VALUES (a, b)'); + explain_merge +--------------------------------------------------------------------- + Custom Scan (Citus Adaptive) (actual rows=0 loops=1) + Task Count: 1 + Tasks Shown: All + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> Merge on ex_mtarget_4001008 t (actual rows=0 loops=1) + Tuples: inserted=10 updated=9 deleted=5 skipped=76 + -> Merge Left Join (actual rows=100 loops=1) + Merge Cond: (s.a = t.a) + -> Sort (actual rows=100 loops=1) + Sort Key: s.a + Sort Method: quicksort Memory: xxx + -> Seq Scan on ex_msource_4001009 s (actual rows=100 loops=1) + -> Sort (actual rows=49 loops=1) + Sort Key: t.a + Sort Method: quicksort Memory: xxx + -> Seq Scan on ex_mtarget_4001008 t (actual rows=49 loops=1) +(17 rows) + +-- nothing +SELECT explain_merge(' +MERGE INTO ex_mtarget t USING ex_msource s ON t.a = s.a AND t.a < -1000 +WHEN MATCHED AND t.a < 10 THEN + DO NOTHING'); + explain_merge +--------------------------------------------------------------------- + Custom Scan (Citus Adaptive) (actual rows=0 loops=1) + Task Count: 1 + Tasks Shown: All + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> Merge on ex_mtarget_4001008 t (actual rows=0 loops=1) + -> Merge Join (actual rows=0 loops=1) + Merge Cond: (t.a = s.a) + -> Sort (actual rows=0 loops=1) + Sort Key: t.a + Sort Method: quicksort Memory: xxx + -> Seq Scan on ex_mtarget_4001008 t (actual rows=0 loops=1) + Filter: (a < '-1000'::integer) + Rows Removed by Filter: 54 + -> Sort (never executed) + Sort Key: s.a + -> Seq Scan on ex_msource_4001009 s (never executed) +(17 rows) + +DROP TABLE ex_msource, ex_mtarget; +DROP FUNCTION explain_merge(text); +-- Subqueries +BEGIN; +MERGE INTO sq_target t +USING v +ON tid = sid +WHEN MATCHED THEN + UPDATE SET balance = (SELECT count(*) FROM sq_target); +SELECT * FROM sq_target WHERE tid = 1; + tid | balance +--------------------------------------------------------------------- + 1 | 3 +(1 row) + +ROLLBACK; +BEGIN; +MERGE INTO sq_target t +USING v +ON tid = sid +WHEN MATCHED AND (SELECT count(*) > 0 FROM sq_target) THEN + UPDATE SET balance = 42; +SELECT * FROM sq_target WHERE tid = 1; + tid | balance +--------------------------------------------------------------------- + 1 | 42 +(1 row) + +ROLLBACK; +BEGIN; +MERGE INTO sq_target t +USING v +ON tid = sid AND (SELECT count(*) > 0 FROM sq_target) +WHEN MATCHED THEN + UPDATE SET balance = 42; +SELECT * FROM sq_target WHERE tid = 1; + tid | balance +--------------------------------------------------------------------- + 1 | 42 +(1 row) + +ROLLBACK; +DROP TABLE sq_target, sq_source CASCADE; +NOTICE: drop cascades to view v +CREATE TABLE pa_target (tid integer, balance float, val text) + PARTITION BY LIST (tid); +CREATE TABLE part1 PARTITION OF pa_target FOR VALUES IN (1,4) + WITH (autovacuum_enabled=off); +CREATE TABLE part2 PARTITION OF pa_target FOR VALUES IN (2,5,6) + WITH (autovacuum_enabled=off); +CREATE TABLE part3 PARTITION OF pa_target FOR VALUES IN (3,8,9) + WITH (autovacuum_enabled=off); +CREATE TABLE part4 PARTITION OF pa_target DEFAULT + WITH (autovacuum_enabled=off); +CREATE TABLE pa_source (sid integer, delta float); +-- insert many rows to the source table +INSERT INTO pa_source SELECT id, id * 10 FROM generate_series(1,14) AS id; +-- insert a few rows in the target table (odd numbered tid) +INSERT INTO pa_target SELECT id, id * 100, 'initial' FROM generate_series(1,14,2) AS id; +SELECT citus_add_local_table_to_metadata('pa_target'); + citus_add_local_table_to_metadata +--------------------------------------------------------------------- + +(1 row) + +SELECT citus_add_local_table_to_metadata('pa_source'); + citus_add_local_table_to_metadata +--------------------------------------------------------------------- + +(1 row) + +-- try simple MERGE +BEGIN; +MERGE INTO pa_target t + USING pa_source s + ON t.tid = s.sid + WHEN MATCHED THEN + UPDATE SET balance = balance + delta, val = val || ' updated by merge' + WHEN NOT MATCHED THEN + INSERT VALUES (sid, delta, 'inserted by merge'); +SELECT * FROM pa_target ORDER BY tid; + tid | balance | val +--------------------------------------------------------------------- + 1 | 110 | initial updated by merge + 2 | 20 | inserted by merge + 3 | 330 | initial updated by merge + 4 | 40 | inserted by merge + 5 | 550 | initial updated by merge + 6 | 60 | inserted by merge + 7 | 770 | initial updated by merge + 8 | 80 | inserted by merge + 9 | 990 | initial updated by merge + 10 | 100 | inserted by merge + 11 | 1210 | initial updated by merge + 12 | 120 | inserted by merge + 13 | 1430 | initial updated by merge + 14 | 140 | inserted by merge +(14 rows) + +ROLLBACK; +-- same with a constant qual +BEGIN; +MERGE INTO pa_target t + USING pa_source s + ON t.tid = s.sid AND tid = 1 + WHEN MATCHED THEN + UPDATE SET balance = balance + delta, val = val || ' updated by merge' + WHEN NOT MATCHED THEN + INSERT VALUES (sid, delta, 'inserted by merge'); +SELECT * FROM pa_target ORDER BY tid; + tid | balance | val +--------------------------------------------------------------------- + 1 | 110 | initial updated by merge + 2 | 20 | inserted by merge + 3 | 30 | inserted by merge + 3 | 300 | initial + 4 | 40 | inserted by merge + 5 | 500 | initial + 5 | 50 | inserted by merge + 6 | 60 | inserted by merge + 7 | 700 | initial + 7 | 70 | inserted by merge + 8 | 80 | inserted by merge + 9 | 90 | inserted by merge + 9 | 900 | initial + 10 | 100 | inserted by merge + 11 | 1100 | initial + 11 | 110 | inserted by merge + 12 | 120 | inserted by merge + 13 | 1300 | initial + 13 | 130 | inserted by merge + 14 | 140 | inserted by merge +(20 rows) + +ROLLBACK; +-- try updating the partition key column +BEGIN; +MERGE INTO pa_target t + USING pa_source s + ON t.tid = s.sid + WHEN MATCHED THEN + UPDATE SET tid = tid + 1, balance = balance + delta, val = val || ' updated by merge' + WHEN NOT MATCHED THEN + INSERT VALUES (sid, delta, 'inserted by merge'); +SELECT * FROM pa_target ORDER BY tid; + tid | balance | val +--------------------------------------------------------------------- + 2 | 110 | initial updated by merge + 2 | 20 | inserted by merge + 4 | 40 | inserted by merge + 4 | 330 | initial updated by merge + 6 | 550 | initial updated by merge + 6 | 60 | inserted by merge + 8 | 80 | inserted by merge + 8 | 770 | initial updated by merge + 10 | 990 | initial updated by merge + 10 | 100 | inserted by merge + 12 | 1210 | initial updated by merge + 12 | 120 | inserted by merge + 14 | 1430 | initial updated by merge + 14 | 140 | inserted by merge +(14 rows) + +ROLLBACK; +DROP TABLE pa_target CASCADE; +-- The target table is partitioned in the same way, but this time by attaching +-- partitions which have columns in different order, dropped columns etc. +CREATE TABLE pa_target (tid integer, balance float, val text) + PARTITION BY LIST (tid); +CREATE TABLE part1 (tid integer, balance float, val text) + WITH (autovacuum_enabled=off); +CREATE TABLE part2 (balance float, tid integer, val text) + WITH (autovacuum_enabled=off); +CREATE TABLE part3 (tid integer, balance float, val text) + WITH (autovacuum_enabled=off); +CREATE TABLE part4 (extraid text, tid integer, balance float, val text) + WITH (autovacuum_enabled=off); +ALTER TABLE part4 DROP COLUMN extraid; +ALTER TABLE pa_target ATTACH PARTITION part1 FOR VALUES IN (1,4); +ALTER TABLE pa_target ATTACH PARTITION part2 FOR VALUES IN (2,5,6); +ALTER TABLE pa_target ATTACH PARTITION part3 FOR VALUES IN (3,8,9); +ALTER TABLE pa_target ATTACH PARTITION part4 DEFAULT; +-- insert a few rows in the target table (odd numbered tid) +INSERT INTO pa_target SELECT id, id * 100, 'initial' FROM generate_series(1,14,2) AS id; +-- try simple MERGE +BEGIN; +MERGE INTO pa_target t + USING pa_source s + ON t.tid = s.sid + WHEN MATCHED THEN + UPDATE SET balance = balance + delta, val = val || ' updated by merge' + WHEN NOT MATCHED THEN + INSERT VALUES (sid, delta, 'inserted by merge'); +SELECT * FROM pa_target ORDER BY tid; + tid | balance | val +--------------------------------------------------------------------- + 1 | 110 | initial updated by merge + 2 | 20 | inserted by merge + 3 | 330 | initial updated by merge + 4 | 40 | inserted by merge + 5 | 550 | initial updated by merge + 6 | 60 | inserted by merge + 7 | 770 | initial updated by merge + 8 | 80 | inserted by merge + 9 | 990 | initial updated by merge + 10 | 100 | inserted by merge + 11 | 1210 | initial updated by merge + 12 | 120 | inserted by merge + 13 | 1430 | initial updated by merge + 14 | 140 | inserted by merge +(14 rows) + +ROLLBACK; +-- same with a constant qual +BEGIN; +MERGE INTO pa_target t + USING pa_source s + ON t.tid = s.sid AND tid IN (1, 5) + WHEN MATCHED AND tid % 5 = 0 THEN DELETE + WHEN MATCHED THEN + UPDATE SET balance = balance + delta, val = val || ' updated by merge' + WHEN NOT MATCHED THEN + INSERT VALUES (sid, delta, 'inserted by merge'); +SELECT * FROM pa_target ORDER BY tid; + tid | balance | val +--------------------------------------------------------------------- + 1 | 110 | initial updated by merge + 2 | 20 | inserted by merge + 3 | 30 | inserted by merge + 3 | 300 | initial + 4 | 40 | inserted by merge + 6 | 60 | inserted by merge + 7 | 700 | initial + 7 | 70 | inserted by merge + 8 | 80 | inserted by merge + 9 | 900 | initial + 9 | 90 | inserted by merge + 10 | 100 | inserted by merge + 11 | 110 | inserted by merge + 11 | 1100 | initial + 12 | 120 | inserted by merge + 13 | 1300 | initial + 13 | 130 | inserted by merge + 14 | 140 | inserted by merge +(18 rows) + +ROLLBACK; +-- try updating the partition key column +BEGIN; +MERGE INTO pa_target t + USING pa_source s + ON t.tid = s.sid + WHEN MATCHED THEN + UPDATE SET tid = tid + 1, balance = balance + delta, val = val || ' updated by merge' + WHEN NOT MATCHED THEN + INSERT VALUES (sid, delta, 'inserted by merge'); +SELECT * FROM pa_target ORDER BY tid; + tid | balance | val +--------------------------------------------------------------------- + 2 | 110 | initial updated by merge + 2 | 20 | inserted by merge + 4 | 40 | inserted by merge + 4 | 330 | initial updated by merge + 6 | 550 | initial updated by merge + 6 | 60 | inserted by merge + 8 | 80 | inserted by merge + 8 | 770 | initial updated by merge + 10 | 990 | initial updated by merge + 10 | 100 | inserted by merge + 12 | 1210 | initial updated by merge + 12 | 120 | inserted by merge + 14 | 1430 | initial updated by merge + 14 | 140 | inserted by merge +(14 rows) + +ROLLBACK; +DROP TABLE pa_source; +DROP TABLE pa_target CASCADE; +-- Sub-partitioning +SET citus.use_citus_managed_tables to false; +CREATE TABLE pa_target (logts timestamp, tid integer, balance float, val text) + PARTITION BY RANGE (logts); +CREATE TABLE part_m01 PARTITION OF pa_target + FOR VALUES FROM ('2017-01-01') TO ('2017-02-01') + PARTITION BY LIST (tid); +SET citus.use_citus_managed_tables to true; +CREATE TABLE part_m01_odd PARTITION OF part_m01 + FOR VALUES IN (1,3,5,7,9) WITH (autovacuum_enabled=off); +CREATE TABLE part_m01_even PARTITION OF part_m01 + FOR VALUES IN (2,4,6,8) WITH (autovacuum_enabled=off); +CREATE TABLE part_m02 PARTITION OF pa_target + FOR VALUES FROM ('2017-02-01') TO ('2017-03-01') + PARTITION BY LIST (tid); +CREATE TABLE part_m02_odd PARTITION OF part_m02 + FOR VALUES IN (1,3,5,7,9) WITH (autovacuum_enabled=off); +CREATE TABLE part_m02_even PARTITION OF part_m02 + FOR VALUES IN (2,4,6,8) WITH (autovacuum_enabled=off); +CREATE TABLE pa_source (sid integer, delta float) + WITH (autovacuum_enabled=off); +-- insert many rows to the source table +INSERT INTO pa_source SELECT id, id * 10 FROM generate_series(1,14) AS id; +-- insert a few rows in the target table (odd numbered tid) +INSERT INTO pa_target SELECT '2017-01-31', id, id * 100, 'initial' FROM generate_series(1,9,3) AS id; +INSERT INTO pa_target SELECT '2017-02-28', id, id * 100, 'initial' FROM generate_series(2,9,3) AS id; +-- try simple MERGE +SET client_min_messages TO DEBUG1; +BEGIN; +MERGE INTO pa_target t + USING (SELECT * FROM pa_source WHERE sid < 10) s + --USING (SELECT '2017-01-15' AS slogts, * FROM pa_source WHERE sid < 10) s + ON t.tid = s.sid + WHEN MATCHED THEN + UPDATE SET balance = balance + delta, val = val || ' updated by merge' + WHEN NOT MATCHED THEN + INSERT VALUES ('2017-01-15', sid, delta, 'inserted by merge'); +DEBUG: Creating MERGE router plan +DEBUG: + --INSERT VALUES (slogts::timestamp, sid, delta, 'inserted by merge'); +SELECT * FROM pa_target ORDER BY tid; + logts | tid | balance | val +--------------------------------------------------------------------- + Tue Jan 31 00:00:00 2017 | 1 | 110 | initial updated by merge + Tue Feb 28 00:00:00 2017 | 2 | 220 | initial updated by merge + Sun Jan 15 00:00:00 2017 | 3 | 30 | inserted by merge + Tue Jan 31 00:00:00 2017 | 4 | 440 | initial updated by merge + Tue Feb 28 00:00:00 2017 | 5 | 550 | initial updated by merge + Sun Jan 15 00:00:00 2017 | 6 | 60 | inserted by merge + Tue Jan 31 00:00:00 2017 | 7 | 770 | initial updated by merge + Tue Feb 28 00:00:00 2017 | 8 | 880 | initial updated by merge + Sun Jan 15 00:00:00 2017 | 9 | 90 | inserted by merge +(9 rows) + +ROLLBACK; +RESET client_min_messages; +DROP TABLE pa_source; +DROP TABLE pa_target CASCADE; +-- some complex joins on the source side +CREATE TABLE cj_target (tid integer, balance float, val text) + WITH (autovacuum_enabled=off); +CREATE TABLE cj_source1 (sid1 integer, scat integer, delta integer) + WITH (autovacuum_enabled=off); +CREATE TABLE cj_source2 (sid2 integer, sval text) + WITH (autovacuum_enabled=off); +INSERT INTO cj_source1 VALUES (1, 10, 100); +INSERT INTO cj_source1 VALUES (1, 20, 200); +INSERT INTO cj_source1 VALUES (2, 20, 300); +INSERT INTO cj_source1 VALUES (3, 10, 400); +INSERT INTO cj_source2 VALUES (1, 'initial source2'); +INSERT INTO cj_source2 VALUES (2, 'initial source2'); +INSERT INTO cj_source2 VALUES (3, 'initial source2'); +-- source relation is an unaliased join +MERGE INTO cj_target t +USING cj_source1 s1 + INNER JOIN cj_source2 s2 ON sid1 = sid2 +ON t.tid = sid1 +WHEN NOT MATCHED THEN + INSERT VALUES (sid1, delta, sval); +-- try accessing columns from either side of the source join +MERGE INTO cj_target t +USING cj_source2 s2 + INNER JOIN cj_source1 s1 ON sid1 = sid2 AND scat = 20 +ON t.tid = sid1 +WHEN NOT MATCHED THEN + INSERT VALUES (sid2, delta, sval) +WHEN MATCHED THEN + DELETE; +-- some simple expressions in INSERT targetlist +MERGE INTO cj_target t +USING cj_source2 s2 + INNER JOIN cj_source1 s1 ON sid1 = sid2 +ON t.tid = sid1 +WHEN NOT MATCHED THEN + INSERT VALUES (sid2, delta + scat, sval) +WHEN MATCHED THEN + UPDATE SET val = val || ' updated by merge'; +MERGE INTO cj_target t +USING cj_source2 s2 + INNER JOIN cj_source1 s1 ON sid1 = sid2 AND scat = 20 +ON t.tid = sid1 +WHEN MATCHED THEN + UPDATE SET val = val || ' ' || delta::text; +SELECT * FROM cj_target; + tid | balance | val +--------------------------------------------------------------------- + 3 | 400 | initial source2 updated by merge + 1 | 220 | initial source2 200 + 1 | 110 | initial source2 200 + 2 | 320 | initial source2 300 +(4 rows) + +ALTER TABLE cj_source1 RENAME COLUMN sid1 TO sid; +ALTER TABLE cj_source2 RENAME COLUMN sid2 TO sid; +TRUNCATE cj_target; +MERGE INTO cj_target t +USING cj_source1 s1 + INNER JOIN cj_source2 s2 ON s1.sid = s2.sid +ON t.tid = s1.sid +WHEN NOT MATCHED THEN + INSERT VALUES (s2.sid, delta, sval); +DROP TABLE cj_source2, cj_source1, cj_target; +-- Function scans +CREATE TABLE fs_target (a int, b int, c text) + WITH (autovacuum_enabled=off); +MERGE INTO fs_target t +USING generate_series(1,100,1) AS id +ON t.a = id +WHEN MATCHED THEN + UPDATE SET b = b + id +WHEN NOT MATCHED THEN + INSERT VALUES (id, -1); +MERGE INTO fs_target t +USING generate_series(1,100,2) AS id +ON t.a = id +WHEN MATCHED THEN + UPDATE SET b = b + id, c = 'updated '|| id.*::text +WHEN NOT MATCHED THEN + INSERT VALUES (id, -1, 'inserted ' || id.*::text); +SELECT count(*) FROM fs_target; + count +--------------------------------------------------------------------- + 100 +(1 row) + +DROP TABLE fs_target; +-- SERIALIZABLE test +-- handled in isolation tests +-- Inheritance-based partitioning +SET citus.use_citus_managed_tables to false; +CREATE TABLE measurement ( + city_id int not null, + logdate date not null, + peaktemp int, + unitsales int +) WITH (autovacuum_enabled=off); +CREATE TABLE measurement_y2006m02 ( + CHECK ( logdate >= DATE '2006-02-01' AND logdate < DATE '2006-03-01' ) +) INHERITS (measurement) WITH (autovacuum_enabled=off); +CREATE TABLE measurement_y2006m03 ( + CHECK ( logdate >= DATE '2006-03-01' AND logdate < DATE '2006-04-01' ) +) INHERITS (measurement) WITH (autovacuum_enabled=off); +CREATE TABLE measurement_y2007m01 ( + filler text, + peaktemp int, + logdate date not null, + city_id int not null, + unitsales int + CHECK ( logdate >= DATE '2007-01-01' AND logdate < DATE '2007-02-01') +) WITH (autovacuum_enabled=off); +ALTER TABLE measurement_y2007m01 DROP COLUMN filler; +ALTER TABLE measurement_y2007m01 INHERIT measurement; +SET citus.use_citus_managed_tables to true; +CREATE OR REPLACE FUNCTION measurement_insert_trigger() +RETURNS TRIGGER AS $$ +BEGIN + IF ( NEW.logdate >= DATE '2006-02-01' AND + NEW.logdate < DATE '2006-03-01' ) THEN + INSERT INTO measurement_y2006m02 VALUES (NEW.*); + ELSIF ( NEW.logdate >= DATE '2006-03-01' AND + NEW.logdate < DATE '2006-04-01' ) THEN + INSERT INTO measurement_y2006m03 VALUES (NEW.*); + ELSIF ( NEW.logdate >= DATE '2007-01-01' AND + NEW.logdate < DATE '2007-02-01' ) THEN + INSERT INTO measurement_y2007m01 (city_id, logdate, peaktemp, unitsales) + VALUES (NEW.*); + ELSE + RAISE EXCEPTION 'Date out of range. Fix the measurement_insert_trigger() function!'; + END IF; + RETURN NULL; +END; +$$ LANGUAGE plpgsql ; +CREATE TRIGGER insert_measurement_trigger + BEFORE INSERT ON measurement + FOR EACH ROW EXECUTE PROCEDURE measurement_insert_trigger(); +INSERT INTO measurement VALUES (1, '2006-02-10', 35, 10); +INSERT INTO measurement VALUES (1, '2006-02-16', 45, 20); +INSERT INTO measurement VALUES (1, '2006-03-17', 25, 10); +INSERT INTO measurement VALUES (1, '2006-03-27', 15, 40); +INSERT INTO measurement VALUES (1, '2007-01-15', 10, 10); +INSERT INTO measurement VALUES (1, '2007-01-17', 10, 10); +SELECT tableoid::regclass, * FROM measurement ORDER BY city_id, logdate; + tableoid | city_id | logdate | peaktemp | unitsales +--------------------------------------------------------------------- + measurement_y2006m02 | 1 | 02-10-2006 | 35 | 10 + measurement_y2006m02 | 1 | 02-16-2006 | 45 | 20 + measurement_y2006m03 | 1 | 03-17-2006 | 25 | 10 + measurement_y2006m03 | 1 | 03-27-2006 | 15 | 40 + measurement_y2007m01 | 1 | 01-15-2007 | 10 | 10 + measurement_y2007m01 | 1 | 01-17-2007 | 10 | 10 +(6 rows) + +CREATE TABLE new_measurement (LIKE measurement) WITH (autovacuum_enabled=off); +INSERT INTO new_measurement VALUES (1, '2006-03-01', 20, 10); +INSERT INTO new_measurement VALUES (1, '2006-02-16', 50, 10); +INSERT INTO new_measurement VALUES (2, '2006-02-10', 20, 20); +INSERT INTO new_measurement VALUES (1, '2006-03-27', NULL, NULL); +INSERT INTO new_measurement VALUES (1, '2007-01-17', NULL, NULL); +INSERT INTO new_measurement VALUES (1, '2007-01-15', 5, NULL); +INSERT INTO new_measurement VALUES (1, '2007-01-16', 10, 10); +SET client_min_messages TO DEBUG1; +MERGE into measurement m + USING new_measurement nm ON + (m.city_id = nm.city_id and m.logdate=nm.logdate) +WHEN MATCHED AND nm.peaktemp IS NULL THEN DELETE +WHEN MATCHED THEN UPDATE + SET peaktemp = greatest(m.peaktemp, nm.peaktemp), + unitsales = m.unitsales + coalesce(nm.unitsales, 0) +WHEN NOT MATCHED THEN INSERT + (city_id, logdate, peaktemp, unitsales) + VALUES (city_id, logdate, peaktemp, unitsales); +DEBUG: Creating MERGE router plan +DEBUG: +RESET client_min_messages; +SELECT tableoid::regclass, * FROM measurement ORDER BY city_id, logdate; + tableoid | city_id | logdate | peaktemp | unitsales +--------------------------------------------------------------------- + measurement_y2006m02 | 1 | 02-10-2006 | 35 | 10 + measurement_y2006m02 | 1 | 02-16-2006 | 50 | 30 + measurement_y2006m03 | 1 | 03-01-2006 | 20 | 10 + measurement_y2006m03 | 1 | 03-17-2006 | 25 | 10 + measurement_y2007m01 | 1 | 01-15-2007 | 10 | 10 + measurement_y2007m01 | 1 | 01-16-2007 | 10 | 10 + measurement_y2006m02 | 2 | 02-10-2006 | 20 | 20 +(7 rows) + +DROP TABLE measurement, new_measurement CASCADE; +NOTICE: drop cascades to 3 other objects +DETAIL: drop cascades to table measurement_y2006m02 +drop cascades to table measurement_y2006m03 +drop cascades to table measurement_y2007m01 +DROP FUNCTION measurement_insert_trigger(); +-- prepare +RESET SESSION AUTHORIZATION; +SET citus.use_citus_managed_tables to false; +REVOKE ALL ON SCHEMA pgmerge_schema FROM regress_merge_privs; +REVOKE ALL ON SCHEMA pgmerge_schema FROM regress_merge_no_privs; +DROP SCHEMA pgmerge_schema CASCADE; +NOTICE: drop cascades to 9 other objects +DETAIL: drop cascades to table target_4001000 +drop cascades to table target +drop cascades to table source_4001001 +drop cascades to table source +drop cascades to table target2_4001002 +drop cascades to table target2 +drop cascades to table source2_4001003 +drop cascades to table source2 +drop cascades to function merge_trigfunc() +DROP USER regress_merge_privs; +DROP USER regress_merge_no_privs; diff --git a/src/test/regress/sql/pgmerge.sql b/src/test/regress/sql/pgmerge.sql index e1f3c7aabc0..aaa51e8e9b6 100644 --- a/src/test/regress/sql/pgmerge.sql +++ b/src/test/regress/sql/pgmerge.sql @@ -126,10 +126,20 @@ COPY ( MERGE INTO target USING source ON (true) WHEN MATCHED THEN DELETE ) TO stdout; +-- used in a CTE with RETURNING +WITH foo AS ( + MERGE INTO target USING source ON (true) + WHEN MATCHED THEN DELETE RETURNING target.* +) SELECT * FROM foo; +-- used in COPY with RETURNING +COPY ( + MERGE INTO target USING source ON (true) + WHEN MATCHED THEN DELETE RETURNING target.* +) TO stdout; -- unsupported relation types -- view -CREATE VIEW tv AS SELECT * FROM target; +CREATE VIEW tv AS SELECT count(tid) AS tid FROM target; MERGE INTO tv t USING source s ON t.tid = s.sid