diff --git a/src/backend/distributed/planner/merge_planner.c b/src/backend/distributed/planner/merge_planner.c
index 09d2d90acec..4d64b8f5625 100644
--- a/src/backend/distributed/planner/merge_planner.c
+++ b/src/backend/distributed/planner/merge_planner.c
@@ -182,6 +182,14 @@ CreateRouterMergePlan(Oid targetRelationId, Query *originalQuery, Query *query,
 		return distributedPlan;
 	}
 
+	Var *insertVar =
+		FetchAndValidateInsertVarIfExists(targetRelationId, originalQuery);
+	if (insertVar &&
+		!IsDistributionColumnInMergeSource((Expr *) insertVar, originalQuery, true))
+	{
+		ereport(ERROR, (errmsg("MERGE INSERT must use the source table "
+							   "distribution column value")));
+	}
 	Job *job = RouterJob(originalQuery, plannerRestrictionContext,
 						 &distributedPlan->planningError);
 
@@ -1116,27 +1124,6 @@ DeferErrorIfRoutableMergeNotSupported(Query *query, List *rangeTableList,
 											  "repartitioning")));
 		return deferredError;
 	}
-
-
-	/*
-	 * If execution has reached this point, it indicates that the query can be delegated to the worker.
-	 * However, before proceeding with this delegation, we need to confirm that the user is utilizing
-	 * the distribution column of the source table in the Insert variable.
-	 * If this is not the case, we should refrain from pushing down the query.
-	 * This is just a deffered error which will be handle by caller.
-	 */
-
-	Var *insertVar =
-		FetchAndValidateInsertVarIfExists(targetRelationId, query);
-	if (insertVar &&
-		!IsDistributionColumnInMergeSource((Expr *) insertVar, query, true))
-	{
-		ereport(DEBUG1, (errmsg(
-					"MERGE INSERT must use the source table distribution column value for push down to workers. Otherwise, repartitioning will be applied")));
-		return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
-							 "MERGE INSERT must use the source table distribution column value for push down to workers. Otherwise, repartitioning will be applied",
-							 NULL, NULL);
-	}
 	return NULL;
 }
 
diff --git a/src/test/regress/expected/merge.out b/src/test/regress/expected/merge.out
index 5056ba5432e..a73467e81a8 100644
--- a/src/test/regress/expected/merge.out
+++ b/src/test/regress/expected/merge.out
@@ -1128,7 +1128,7 @@ DO NOTHING
 WHEN NOT MATCHED THEN
   INSERT VALUES(rs_source.id);
 DEBUG: Creating MERGE router plan
-DEBUG:
+DEBUG:
 RESET client_min_messages;
 SELECT * INTO rs_local FROM rs_target ORDER BY 1 ;
 -- Should be equal
@@ -1259,7 +1259,7 @@ DO NOTHING
 WHEN NOT MATCHED THEN
   INSERT VALUES(fn_source.id, fn_source.source);
 DEBUG: Creating MERGE router plan
-DEBUG:
+DEBUG:
 RESET client_min_messages;
 SELECT * INTO fn_local FROM fn_target ORDER BY 1 ;
 -- Should be equal
@@ -1552,7 +1552,7 @@ BEGIN;
 SET citus.log_remote_commands to true;
 SET client_min_messages TO DEBUG1;
 EXECUTE merge_prepare(2);
-DEBUG:
+DEBUG:
 DEBUG: Creating MERGE router plan
 NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx');
 DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
@@ -1782,13 +1782,13 @@ NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_
 DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
 NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx');
 DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
-NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON (t.id OPERATOR(pg_catalog.=) s.id) WHEN MATCHED THEN DO NOTHING WHEN NOT MATCHED AND (s.id OPERATOR(pg_catalog.<) 100) THEN INSERT (id, val) VALUES (s.id, s.val)
+NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON (t.id OPERATOR(pg_catalog.=) s.id) WHEN MATCHED THEN DO NOTHING WHEN NOT MATCHED AND (s.id OPERATOR(pg_catalog.<) 100) THEN INSERT (id, val) VALUES (s.id, s.val)
 DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
-NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON (t.id OPERATOR(pg_catalog.=) s.id) WHEN MATCHED THEN DO NOTHING WHEN NOT MATCHED AND (s.id OPERATOR(pg_catalog.<) 100) THEN INSERT (id, val) VALUES (s.id, s.val)
+NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON (t.id OPERATOR(pg_catalog.=) s.id) WHEN MATCHED THEN DO NOTHING WHEN NOT MATCHED AND (s.id OPERATOR(pg_catalog.<) 100) THEN INSERT (id, val) VALUES (s.id, s.val)
 DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
-NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON (t.id OPERATOR(pg_catalog.=) s.id) WHEN MATCHED THEN DO NOTHING WHEN NOT MATCHED AND (s.id OPERATOR(pg_catalog.<) 100) THEN INSERT (id, val) VALUES (s.id, s.val)
+NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON (t.id OPERATOR(pg_catalog.=) s.id) WHEN MATCHED THEN DO NOTHING WHEN NOT MATCHED AND (s.id OPERATOR(pg_catalog.<) 100) THEN INSERT (id, val) VALUES (s.id, s.val)
 DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
-NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON (t.id OPERATOR(pg_catalog.=) s.id) WHEN MATCHED THEN DO NOTHING WHEN NOT MATCHED AND (s.id OPERATOR(pg_catalog.<) 100) THEN INSERT (id, val) VALUES (s.id, s.val)
+NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON (t.id OPERATOR(pg_catalog.=) s.id) WHEN MATCHED THEN DO NOTHING WHEN NOT MATCHED AND (s.id OPERATOR(pg_catalog.<) 100) THEN INSERT (id, val) VALUES (s.id, s.val)
 DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
 SET citus.log_remote_commands to false;
 SELECT compare_tables();
@@ -1842,297 +1842,6 @@ SELECT compare_tables();
 (1 row)
 
 ROLLBACK;
--- let's create source and target table
-ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 13000;
-CREATE TABLE source_pushdowntest (id integer);
-CREATE TABLE target_pushdowntest (id integer );
--- let's distribute both table on id field
-SELECT create_distributed_table('source_pushdowntest', 'id');
- create_distributed_table
----------------------------------------------------------------------
-
-(1 row)
-
-SELECT create_distributed_table('target_pushdowntest', 'id');
- create_distributed_table
----------------------------------------------------------------------
-
-(1 row)
-
--- we are doing this operation on single node setup let's figure out colocation id of both tables
--- both has same colocation id so both are colocated.
-WITH colocations AS (
-    SELECT colocationid
-    FROM pg_dist_partition
-    WHERE logicalrelid = 'source_pushdowntest'::regclass
-       OR logicalrelid = 'target_pushdowntest'::regclass
-)
-SELECT
-    CASE
-        WHEN COUNT(DISTINCT colocationid) = 1 THEN 'Same'
-        ELSE 'Different'
-    END AS colocation_status
-FROM colocations;
- colocation_status
----------------------------------------------------------------------
- Same
-(1 row)
-
-SET client_min_messages TO DEBUG1;
--- Test 1 : tables are colocated AND query is multisharded AND Join On distributed column : should push down to workers.
-EXPLAIN (costs off, timing off, summary off)
-MERGE INTO target_pushdowntest t
-USING source_pushdowntest s
-ON t.id = s.id
-WHEN NOT MATCHED THEN
-  INSERT (id)
-  VALUES (s.id);
-DEBUG:
-DEBUG:
-DEBUG:
-DEBUG:
-DEBUG: Creating MERGE router plan
-                          QUERY PLAN
----------------------------------------------------------------------
- Custom Scan (Citus Adaptive)
-   Task Count: 4
-   Tasks Shown: All
-   ->  Task
-         Node: host=localhost port=xxxxx dbname=regression
-         ->  Merge on target_pushdowntest_4000068 t
-               ->  Merge Left Join
-                     Merge Cond: (s.id = t.id)
-                     ->  Sort
-                           Sort Key: s.id
-                           ->  Seq Scan on source_pushdowntest_4000064 s
-                     ->  Sort
-                           Sort Key: t.id
-                           ->  Seq Scan on target_pushdowntest_4000068 t
-   ->  Task
-         Node: host=localhost port=xxxxx dbname=regression
-         ->  Merge on target_pushdowntest_4000069 t
-               ->  Merge Left Join
-                     Merge Cond: (s.id = t.id)
-                     ->  Sort
-                           Sort Key: s.id
-                           ->  Seq Scan on source_pushdowntest_4000065 s
-                     ->  Sort
-                           Sort Key: t.id
-                           ->  Seq Scan on target_pushdowntest_4000069 t
-   ->  Task
-         Node: host=localhost port=xxxxx dbname=regression
-         ->  Merge on target_pushdowntest_4000070 t
-               ->  Merge Left Join
-                     Merge Cond: (s.id = t.id)
-                     ->  Sort
-                           Sort Key: s.id
-                           ->  Seq Scan on source_pushdowntest_4000066 s
-                     ->  Sort
-                           Sort Key: t.id
-                           ->  Seq Scan on target_pushdowntest_4000070 t
-   ->  Task
-         Node: host=localhost port=xxxxx dbname=regression
-         ->  Merge on target_pushdowntest_4000071 t
-               ->  Merge Left Join
-                     Merge Cond: (s.id = t.id)
-                     ->  Sort
-                           Sort Key: s.id
-                           ->  Seq Scan on source_pushdowntest_4000067 s
-                     ->  Sort
-                           Sort Key: t.id
-                           ->  Seq Scan on target_pushdowntest_4000071 t
-(47 rows)
-
--- Test 2 : tables are colocated AND source query is not multisharded : should push down to worker.
--- DEBUG LOGS show that query is getting pushed down
-MERGE INTO target_pushdowntest t
-USING (SELECT * from source_pushdowntest where id = 1) s
-on t.id = s.id
-WHEN NOT MATCHED THEN
-  INSERT (id)
-  VALUES (s.id);
-DEBUG:
-DEBUG: Creating MERGE router plan
--- Test 3 : tables are colocated source query is single sharded but not using source distributed column in insertion. let's not pushdown.
-INSERT INTO source_pushdowntest (id) VALUES (3);
-EXPLAIN (costs off, timing off, summary off)
-MERGE INTO target_pushdowntest t
-USING (SELECT 1 as somekey, id from source_pushdowntest where id = 1) s
-on t.id = s.somekey
-WHEN NOT MATCHED THEN
-  INSERT (id)
-  VALUES (s.somekey);
-DEBUG: MERGE INSERT must use the source table distribution column value for push down to workers. Otherwise, repartitioning will be applied
-DEBUG: MERGE INSERT must use the source table distribution column value for push down to workers. Otherwise, repartitioning will be applied
-DEBUG: Creating MERGE repartition plan
-DEBUG: Using column - index:0 from the source list to redistribute
-                          QUERY PLAN
----------------------------------------------------------------------
- Custom Scan (Citus MERGE INTO ...)
-   MERGE INTO target_pushdowntest method: pull to coordinator
-   ->  Custom Scan (Citus Adaptive)
-         Task Count: 1
-         Tasks Shown: All
-         ->  Task
-               Node: host=localhost port=xxxxx dbname=regression
-               ->  Seq Scan on source_pushdowntest_4000064 source_pushdowntest
-                     Filter: (id = 1)
-(9 rows)
-
--- let's verify if we use some other column from source for value of distributed column in target.
--- it should be inserted to correct shard of target.
-CREATE TABLE source_withdata (id integer, some_number integer);
-CREATE TABLE target_table (id integer, name text);
-SELECT create_distributed_table('source_withdata', 'id');
- create_distributed_table
----------------------------------------------------------------------
-
-(1 row)
-
-SELECT create_distributed_table('target_table', 'id');
- create_distributed_table
----------------------------------------------------------------------
-
-(1 row)
-
-INSERT INTO source_withdata (id, some_number) VALUES (1, 3);
--- we will use some_number column from source_withdata to insert into distributed column of target.
--- value of some_number is 3 let's verify what shard it should go to.
-select worker_hash(3);
- worker_hash
----------------------------------------------------------------------
-   -28094569
-(1 row)
-
--- it should go to second shard of target as target has 4 shard and hash "-28094569" comes in range of second shard.
-MERGE INTO target_table t
-USING (SELECT id, some_number from source_withdata where id = 1) s
-on t.id = s.some_number
-WHEN NOT MATCHED THEN
-  INSERT (id, name)
-  VALUES (s.some_number, 'parag');
-DEBUG: Sub-query is not pushable, try repartitioning
-DEBUG: MERGE command is only supported when all distributed tables are co-located and joined on their distribution columns
-DEBUG: Creating MERGE repartition plan
-DEBUG: Using column - index:1 from the source list to redistribute
-DEBUG: Collect source query results on coordinator
-DEBUG: Create a MERGE task list that needs to be routed
-DEBUG:
-DEBUG:
-DEBUG:
-DEBUG:
-DEBUG: Execute MERGE task list
--- let's verify if data inserted to second shard of target.
-EXPLAIN (analyze on, costs off, timing off, summary off) SELECT * FROM target_table;
-                          QUERY PLAN
----------------------------------------------------------------------
- Custom Scan (Citus Adaptive) (actual rows=1 loops=1)
-   Task Count: 4
-   Tuple data received from nodes: 9 bytes
-   Tasks Shown: All
-   ->  Task
-         Tuple data received from node: 0 bytes
-         Node: host=localhost port=xxxxx dbname=regression
-         ->  Seq Scan on target_table_4000076 target_table (actual rows=0 loops=1)
-   ->  Task
-         Tuple data received from node: 9 bytes
-         Node: host=localhost port=xxxxx dbname=regression
-         ->  Seq Scan on target_table_4000077 target_table (actual rows=1 loops=1)
-   ->  Task
-         Tuple data received from node: 0 bytes
-         Node: host=localhost port=xxxxx dbname=regression
-         ->  Seq Scan on target_table_4000078 target_table (actual rows=0 loops=1)
-   ->  Task
-         Tuple data received from node: 0 bytes
-         Node: host=localhost port=xxxxx dbname=regression
-         ->  Seq Scan on target_table_4000079 target_table (actual rows=0 loops=1)
-(20 rows)
-
--- let's verify target data too.
-SELECT * FROM target_table;
- id | name
----------------------------------------------------------------------
-  3 | parag
-(1 row)
-
--- test UPDATE : when source is single sharded and table are colocated
-MERGE INTO target_table t
-USING (SELECT id, some_number from source_withdata where id = 1) s
-on t.id = s.some_number
-WHEN MATCHED THEN
-  UPDATE SET name = 'parag jain';
-DEBUG: Sub-query is not pushable, try repartitioning
-DEBUG: MERGE command is only supported when all distributed tables are co-located and joined on their distribution columns
-DEBUG: Creating MERGE repartition plan
-DEBUG: Using column - index:1 from the source list to redistribute
-DEBUG: Collect source query results on coordinator
-DEBUG: Create a MERGE task list that needs to be routed
-DEBUG:
-DEBUG:
-DEBUG:
-DEBUG:
-DEBUG: Execute MERGE task list
--- let's verify if data updated properly.
-SELECT * FROM target_table;
- id | name
----------------------------------------------------------------------
-  3 | parag jain
-(1 row)
-
--- let's see what happend when we try to update distributed key of target table
-MERGE INTO target_table t
-USING (SELECT id, some_number from source_withdata where id = 1) s
-on t.id = s.some_number
-WHEN MATCHED THEN
-  UPDATE SET id = 1500;
-ERROR: updating the distribution column is not allowed in MERGE actions
-SELECT * FROM target_table;
- id | name
----------------------------------------------------------------------
-  3 | parag jain
-(1 row)
-
--- test DELETE : when source is single sharded and table are colocated
-MERGE INTO target_table t
-USING (SELECT id, some_number from source_withdata where id = 1) s
-on t.id = s.some_number
-WHEN MATCHED THEN
-  DELETE;
-DEBUG: Sub-query is not pushable, try repartitioning
-DEBUG: MERGE command is only supported when all distributed tables are co-located and joined on their distribution columns
-DEBUG: Creating MERGE repartition plan
-DEBUG: Using column - index:1 from the source list to redistribute
-DEBUG: Collect source query results on coordinator
-DEBUG: Create a MERGE task list that needs to be routed
-DEBUG:
-DEBUG:
-DEBUG:
-DEBUG:
-DEBUG: Execute MERGE task list
--- let's verify if data deleted properly.
-SELECT * FROM target_table;
- id | name
----------------------------------------------------------------------
-(0 rows)
-
---
-DELETE FROM source_withdata;
-DELETE FROM target_table;
-INSERT INTO source VALUES (1,1);
-merge into target_table sda
-using source_withdata sdn
-on sda.id = sdn.id AND sda.id = 1
-when not matched then
-  insert (id)
-  values (10000);
-ERROR: MERGE INSERT is using unsupported expression type for distribution column
-DETAIL: Inserting arbitrary values that don't correspond to the joined column values can lead to unpredictable outcomes where rows are incorrectly distributed among different shards
-SELECT * FROM target_table WHERE id = 10000;
- id | name
----------------------------------------------------------------------
-(0 rows)
-
-RESET client_min_messages;
 -- This will prune shards with restriction information as NOT MATCHED is void
 BEGIN;
 SET citus.log_remote_commands to true;
@@ -3189,14 +2898,14 @@ WHEN NOT MATCHED THEN
                     ->  Limit
                           ->  Sort
                                 Sort Key: id2
-                                ->  Seq Scan on demo_source_table_4000151 demo_source_table
+                                ->  Seq Scan on demo_source_table_4000135 demo_source_table
   ->  Distributed Subplan XXX_2
         ->  Custom Scan (Citus Adaptive)
               Task Count: 4
               Tasks Shown: One of 4
              ->  Task
                     Node: host=localhost port=xxxxx dbname=regression
-                    ->  Seq Scan on demo_source_table_4000151 demo_source_table
+                    ->  Seq Scan on demo_source_table_4000135 demo_source_table
   Task Count: 1
   Tasks Shown: All
   ->  Task
@@ -3410,10 +3119,10 @@ DEBUG: Creating MERGE repartition plan
 DEBUG: Using column - index:0 from the source list to redistribute
 DEBUG: Collect source query results on coordinator
 DEBUG: Create a MERGE task list that needs to be routed
-DEBUG:
-DEBUG:
-DEBUG:
-DEBUG:
+DEBUG:
+DEBUG:
+DEBUG:
+DEBUG:
 DEBUG: Execute MERGE task list
 RESET client_min_messages;
 SELECT * FROM target_6785 ORDER BY 1;
@@ -3531,7 +3240,7 @@ USING s1 s
 ON t.id = s.id
 WHEN NOT MATCHED THEN
   INSERT (id) VALUES(s.val);
-ERROR: MERGE INSERT must use the source's joining column for target's distribution column
+ERROR: MERGE INSERT must use the source table distribution column value
 MERGE INTO t1 t
 USING s1 s
 ON t.id = s.id
@@ -4257,7 +3966,7 @@ CONTEXT: SQL statement "SELECT citus_drop_all_shards(v_obj.objid, v_obj.schema_
 PL/pgSQL function citus_drop_trigger() line XX at PERFORM
 DROP FUNCTION merge_when_and_write();
 DROP SCHEMA merge_schema CASCADE;
-NOTICE: drop cascades to 107 other objects
+NOTICE: drop cascades to 103 other objects
 DETAIL: drop cascades to function insert_data()
 drop cascades to table local_local
 drop cascades to table target
@@ -4317,10 +4026,6 @@ drop cascades to table pg_source
 drop cascades to table citus_target
 drop cascades to table citus_source
 drop cascades to function compare_tables()
-drop cascades to table source_pushdowntest
-drop cascades to table target_pushdowntest
-drop cascades to table source_withdata
-drop cascades to table target_table
 drop cascades to view pg_source_view
 drop cascades to view citus_source_view
 drop cascades to table pg_pa_target
@@ -4337,7 +4042,7 @@ drop cascades to table target_set
 drop cascades to table source_set
 drop cascades to table refsource_ref
 drop cascades to table pg_result
-drop cascades to table refsource_ref_4000128
+drop cascades to table refsource_ref_4000112
 drop cascades to table pg_ref
 drop cascades to table local_ref
 drop cascades to table reftarget_local
@@ -4355,7 +4060,11 @@ drop cascades to table source_6785
 drop cascades to table target_6785
 drop cascades to function add_s(integer,integer)
 drop cascades to table pg
-drop cascades to table t1_4000190
-drop cascades to table s1_4000191
+drop cascades to table t1_4000174
+drop cascades to table s1_4000175
 drop cascades to table t1
-and 7 other objects (see server log for list)
+drop cascades to table s1
+drop cascades to table dist_target
+drop cascades to table dist_source
+drop cascades to view show_tables
+and 3 other objects (see server log for list)
diff --git a/src/test/regress/multi_schedule b/src/test/regress/multi_schedule
index 7f0c7ca57f6..fca36f5ab06 100644
--- a/src/test/regress/multi_schedule
+++ b/src/test/regress/multi_schedule
@@ -116,8 +116,7 @@ test: function_with_case_when
 test: clock
 
 # MERGE tests
-test: merge pgmerge
-test: merge_repartition2
+test: merge pgmerge merge_repartition2
 test: merge_repartition1 merge_schema_sharding
 test: merge_partition_tables
 
diff --git a/src/test/regress/sql/merge.sql b/src/test/regress/sql/merge.sql
index 5316b5233ae..a41e8084145 100644
--- a/src/test/regress/sql/merge.sql
+++ b/src/test/regress/sql/merge.sql
@@ -1206,139 +1206,6 @@ SET citus.log_remote_commands to false;
 SELECT compare_tables();
 ROLLBACK;
 
-
--- let's create source and target table
-ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 13000;
-CREATE TABLE source_pushdowntest (id integer);
-CREATE TABLE target_pushdowntest (id integer );
-
--- let's distribute both table on id field
-SELECT create_distributed_table('source_pushdowntest', 'id');
-SELECT create_distributed_table('target_pushdowntest', 'id');
-
--- we are doing this operation on single node setup let's figure out colocation id of both tables
--- both has same colocation id so both are colocated.
-WITH colocations AS (
-    SELECT colocationid
-    FROM pg_dist_partition
-    WHERE logicalrelid = 'source_pushdowntest'::regclass
-       OR logicalrelid = 'target_pushdowntest'::regclass
-)
-SELECT
-    CASE
-        WHEN COUNT(DISTINCT colocationid) = 1 THEN 'Same'
-        ELSE 'Different'
-    END AS colocation_status
-FROM colocations;
-
-SET client_min_messages TO DEBUG1;
--- Test 1 : tables are colocated AND query is multisharded AND Join On distributed column : should push down to workers.
-
-EXPLAIN (costs off, timing off, summary off)
-MERGE INTO target_pushdowntest t
-USING source_pushdowntest s
-ON t.id = s.id
-WHEN NOT MATCHED THEN
-  INSERT (id)
-  VALUES (s.id);
-
--- Test 2 : tables are colocated AND source query is not multisharded : should push down to worker.
--- DEBUG LOGS show that query is getting pushed down
-MERGE INTO target_pushdowntest t
-USING (SELECT * from source_pushdowntest where id = 1) s
-on t.id = s.id
-WHEN NOT MATCHED THEN
-  INSERT (id)
-  VALUES (s.id);
-
-
--- Test 3 : tables are colocated source query is single sharded but not using source distributed column in insertion. let's not pushdown.
-INSERT INTO source_pushdowntest (id) VALUES (3);
-
-EXPLAIN (costs off, timing off, summary off)
-MERGE INTO target_pushdowntest t
-USING (SELECT 1 as somekey, id from source_pushdowntest where id = 1) s
-on t.id = s.somekey
-WHEN NOT MATCHED THEN
-  INSERT (id)
-  VALUES (s.somekey);
-
-
--- let's verify if we use some other column from source for value of distributed column in target.
--- it should be inserted to correct shard of target.
-CREATE TABLE source_withdata (id integer, some_number integer);
-CREATE TABLE target_table (id integer, name text);
-SELECT create_distributed_table('source_withdata', 'id');
-SELECT create_distributed_table('target_table', 'id');
-
-INSERT INTO source_withdata (id, some_number) VALUES (1, 3);
-
--- we will use some_number column from source_withdata to insert into distributed column of target.
--- value of some_number is 3 let's verify what shard it should go to.
-select worker_hash(3);
-
--- it should go to second shard of target as target has 4 shard and hash "-28094569" comes in range of second shard.
-MERGE INTO target_table t
-USING (SELECT id, some_number from source_withdata where id = 1) s
-on t.id = s.some_number
-WHEN NOT MATCHED THEN
-  INSERT (id, name)
-  VALUES (s.some_number, 'parag');
-
--- let's verify if data inserted to second shard of target.
-EXPLAIN (analyze on, costs off, timing off, summary off) SELECT * FROM target_table;
-
--- let's verify target data too.
-SELECT * FROM target_table;
-
-
--- test UPDATE : when source is single sharded and table are colocated
-MERGE INTO target_table t
-USING (SELECT id, some_number from source_withdata where id = 1) s
-on t.id = s.some_number
-WHEN MATCHED THEN
-  UPDATE SET name = 'parag jain';
-
--- let's verify if data updated properly.
-SELECT * FROM target_table;
-
--- let's see what happend when we try to update distributed key of target table
-MERGE INTO target_table t
-USING (SELECT id, some_number from source_withdata where id = 1) s
-on t.id = s.some_number
-WHEN MATCHED THEN
-  UPDATE SET id = 1500;
-
-SELECT * FROM target_table;
-
--- test DELETE : when source is single sharded and table are colocated
-MERGE INTO target_table t
-USING (SELECT id, some_number from source_withdata where id = 1) s
-on t.id = s.some_number
-WHEN MATCHED THEN
-  DELETE;
-
--- let's verify if data deleted properly.
-SELECT * FROM target_table;
-
---
-DELETE FROM source_withdata;
-DELETE FROM target_table;
-INSERT INTO source VALUES (1,1);
-
-merge into target_table sda
-using source_withdata sdn
-on sda.id = sdn.id AND sda.id = 1
-when not matched then
-  insert (id)
-  values (10000);
-
-SELECT * FROM target_table WHERE id = 10000;
-
-RESET client_min_messages;
-
-
-
 -- This will prune shards with restriction information as NOT MATCHED is void
 BEGIN;
 SET citus.log_remote_commands to true;