Skip to content

Commit

Permalink
Merge branch 'main' into patch-2
Browse files Browse the repository at this point in the history
  • Loading branch information
fossygirl authored Jul 20, 2023
2 parents 4854fc5 + 87dc88f commit aa271ba
Show file tree
Hide file tree
Showing 7 changed files with 379 additions and 305 deletions.
6 changes: 3 additions & 3 deletions src/backend/distributed/planner/merge_planner.c
Original file line number Diff line number Diff line change
Expand Up @@ -1184,9 +1184,9 @@ SourceResultPartitionColumnIndex(Query *mergeQuery, List *sourceTargetList,
{
if (IsCitusTableType(targetRelation->relationId, SINGLE_SHARD_DISTRIBUTED))
{
ereport(ERROR, (errmsg("MERGE operation on non-colocated "
"distributed table(s) without a shard "
"key is not yet supported")));
ereport(ERROR, (errmsg("MERGE operation across distributed schemas "
"or with a row-based distributed table is "
"not yet supported")));
}

/* Get all the Join conditions from the ON clause */
Expand Down
189 changes: 0 additions & 189 deletions src/test/regress/expected/merge.out
Original file line number Diff line number Diff line change
Expand Up @@ -3816,195 +3816,6 @@ UPDATE SET val = dist_source.val
WHEN NOT MATCHED THEN
INSERT VALUES(dist_source.id, dist_source.val);
ERROR: For MERGE command, append/range distribution table is not supported yet
-- test merge with single-shard tables
CREATE SCHEMA query_single_shard_table;
SET search_path TO query_single_shard_table;
CREATE TABLE nullkey_c1_t1(a int, b int);
CREATE TABLE nullkey_c1_t2(a int, b int);
SELECT create_distributed_table('nullkey_c1_t1', null, colocate_with=>'none');
create_distributed_table
---------------------------------------------------------------------

(1 row)

SELECT create_distributed_table('nullkey_c1_t2', null, colocate_with=>'nullkey_c1_t1');
create_distributed_table
---------------------------------------------------------------------

(1 row)

CREATE TABLE nullkey_c2_t1(a int, b int);
CREATE TABLE nullkey_c2_t2(a int, b int);
SELECT create_distributed_table('nullkey_c2_t1', null, colocate_with=>'none');
create_distributed_table
---------------------------------------------------------------------

(1 row)

SELECT create_distributed_table('nullkey_c2_t2', null, colocate_with=>'nullkey_c2_t1', distribution_type=>null);
create_distributed_table
---------------------------------------------------------------------

(1 row)

CREATE TABLE reference_table(a int, b int);
CREATE TABLE distributed_table(a int, b int);
CREATE TABLE citus_local_table(a int, b int);
SELECT create_reference_table('reference_table');
create_reference_table
---------------------------------------------------------------------

(1 row)

SELECT create_distributed_table('distributed_table', 'a');
create_distributed_table
---------------------------------------------------------------------

(1 row)

SELECT citus_add_local_table_to_metadata('citus_local_table');
citus_add_local_table_to_metadata
---------------------------------------------------------------------

(1 row)

SET client_min_messages TO DEBUG2;
INSERT INTO reference_table SELECT i, i FROM generate_series(0, 5) i;
DEBUG: distributed INSERT ... SELECT can only select from distributed tables
DEBUG: Collecting INSERT ... SELECT results on coordinator
INSERT INTO distributed_table SELECT i, i FROM generate_series(3, 8) i;
DEBUG: distributed INSERT ... SELECT can only select from distributed tables
DEBUG: Collecting INSERT ... SELECT results on coordinator
INSERT INTO citus_local_table SELECT i, i FROM generate_series(0, 10) i;
DEBUG: distributed INSERT ... SELECT can only select from distributed tables
DEBUG: Collecting INSERT ... SELECT results on coordinator
CREATE TABLE postgres_local_table(a int, b int);
INSERT INTO postgres_local_table SELECT i, i FROM generate_series(5, 10) i;
-- with a colocated table
MERGE INTO nullkey_c1_t1 USING nullkey_c1_t2 ON (nullkey_c1_t1.a = nullkey_c1_t2.a)
WHEN MATCHED THEN UPDATE SET b = nullkey_c1_t2.b;
DEBUG: <Deparsed MERGE query: MERGE INTO query_single_shard_table.nullkey_c1_t1_4000184 nullkey_c1_t1 USING query_single_shard_table.nullkey_c1_t2_4000185 nullkey_c1_t2 ON (nullkey_c1_t1.a OPERATOR(pg_catalog.=) nullkey_c1_t2.a) WHEN MATCHED THEN UPDATE SET b = nullkey_c1_t2.b>
DEBUG: Creating MERGE router plan
MERGE INTO nullkey_c1_t1 USING nullkey_c1_t2 ON (nullkey_c1_t1.a = nullkey_c1_t2.a)
WHEN MATCHED THEN DELETE;
DEBUG: <Deparsed MERGE query: MERGE INTO query_single_shard_table.nullkey_c1_t1_4000184 nullkey_c1_t1 USING query_single_shard_table.nullkey_c1_t2_4000185 nullkey_c1_t2 ON (nullkey_c1_t1.a OPERATOR(pg_catalog.=) nullkey_c1_t2.a) WHEN MATCHED THEN DELETE>
DEBUG: Creating MERGE router plan
MERGE INTO nullkey_c1_t1 USING nullkey_c1_t2 ON (nullkey_c1_t1.a = nullkey_c1_t2.a)
WHEN MATCHED THEN UPDATE SET b = nullkey_c1_t2.b
WHEN NOT MATCHED THEN INSERT VALUES (nullkey_c1_t2.a, nullkey_c1_t2.b);
DEBUG: <Deparsed MERGE query: MERGE INTO query_single_shard_table.nullkey_c1_t1_4000184 nullkey_c1_t1 USING query_single_shard_table.nullkey_c1_t2_4000185 nullkey_c1_t2 ON (nullkey_c1_t1.a OPERATOR(pg_catalog.=) nullkey_c1_t2.a) WHEN MATCHED THEN UPDATE SET b = nullkey_c1_t2.b WHEN NOT MATCHED THEN INSERT (a, b) VALUES (nullkey_c1_t2.a, nullkey_c1_t2.b)>
DEBUG: Creating MERGE router plan
MERGE INTO nullkey_c1_t1 USING nullkey_c1_t2 ON (nullkey_c1_t1.a = nullkey_c1_t2.a)
WHEN MATCHED THEN DELETE
WHEN NOT MATCHED THEN INSERT VALUES (nullkey_c1_t2.a, nullkey_c1_t2.b);
DEBUG: <Deparsed MERGE query: MERGE INTO query_single_shard_table.nullkey_c1_t1_4000184 nullkey_c1_t1 USING query_single_shard_table.nullkey_c1_t2_4000185 nullkey_c1_t2 ON (nullkey_c1_t1.a OPERATOR(pg_catalog.=) nullkey_c1_t2.a) WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (a, b) VALUES (nullkey_c1_t2.a, nullkey_c1_t2.b)>
DEBUG: Creating MERGE router plan
-- with non-colocated single-shard table
MERGE INTO nullkey_c1_t1 USING nullkey_c2_t1 ON (nullkey_c1_t1.a = nullkey_c2_t1.a)
WHEN MATCHED THEN UPDATE SET b = nullkey_c2_t1.b;
DEBUG: Distributed tables are not co-located, try repartitioning
DEBUG: For MERGE command, all the distributed tables must be colocated
DEBUG: Creating MERGE repartition plan
ERROR: MERGE operation on non-colocated distributed table(s) without a shard key is not yet supported
MERGE INTO nullkey_c1_t1 USING nullkey_c2_t1 ON (nullkey_c1_t1.a = nullkey_c2_t1.a)
WHEN MATCHED THEN UPDATE SET b = nullkey_c2_t1.b
WHEN NOT MATCHED THEN INSERT VALUES (nullkey_c2_t1.a, nullkey_c2_t1.b);
DEBUG: Distributed tables are not co-located, try repartitioning
DEBUG: For MERGE command, all the distributed tables must be colocated
DEBUG: Creating MERGE repartition plan
ERROR: MERGE operation on non-colocated distributed table(s) without a shard key is not yet supported
-- with a distributed table
MERGE INTO nullkey_c1_t1 USING distributed_table ON (nullkey_c1_t1.a = distributed_table.a)
WHEN MATCHED THEN UPDATE SET b = distributed_table.b
WHEN NOT MATCHED THEN INSERT VALUES (distributed_table.a, distributed_table.b);
DEBUG: Distributed tables are not co-located, try repartitioning
DEBUG: For MERGE command, all the distributed tables must be colocated
DEBUG: Creating MERGE repartition plan
ERROR: MERGE operation on non-colocated distributed table(s) without a shard key is not yet supported
MERGE INTO distributed_table USING nullkey_c1_t1 ON (nullkey_c1_t1.a = distributed_table.a)
WHEN MATCHED THEN DELETE
WHEN NOT MATCHED THEN INSERT VALUES (nullkey_c1_t1.a, nullkey_c1_t1.b);
DEBUG: Distributed tables are not co-located, try repartitioning
DEBUG: For MERGE command, all the distributed tables must be colocated
DEBUG: Creating MERGE repartition plan
DEBUG: Using column - index:0 from the source list to redistribute
DEBUG: Distributed planning for a fast-path router query
DEBUG: Creating router plan
DEBUG: Collect source query results on coordinator
DEBUG: Create a MERGE task list that needs to be routed
DEBUG: <Deparsed MERGE query: MERGE INTO query_single_shard_table.distributed_table_4000189 citus_table_alias USING (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('merge_into_XXX_4000189'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) nullkey_c1_t1 ON (nullkey_c1_t1.a OPERATOR(pg_catalog.=) citus_table_alias.a) WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (a, b) VALUES (nullkey_c1_t1.a, nullkey_c1_t1.b)>
DEBUG: distributed statement: MERGE INTO query_single_shard_table.distributed_table_4000189 citus_table_alias USING (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('merge_into_XXX_4000189'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) nullkey_c1_t1 ON (nullkey_c1_t1.a OPERATOR(pg_catalog.=) citus_table_alias.a) WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (a, b) VALUES (nullkey_c1_t1.a, nullkey_c1_t1.b)
DEBUG: <Deparsed MERGE query: MERGE INTO query_single_shard_table.distributed_table_4000190 citus_table_alias USING (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('merge_into_XXX_4000190'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) nullkey_c1_t1 ON (nullkey_c1_t1.a OPERATOR(pg_catalog.=) citus_table_alias.a) WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (a, b) VALUES (nullkey_c1_t1.a, nullkey_c1_t1.b)>
DEBUG: distributed statement: MERGE INTO query_single_shard_table.distributed_table_4000190 citus_table_alias USING (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('merge_into_XXX_4000190'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) nullkey_c1_t1 ON (nullkey_c1_t1.a OPERATOR(pg_catalog.=) citus_table_alias.a) WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (a, b) VALUES (nullkey_c1_t1.a, nullkey_c1_t1.b)
DEBUG: <Deparsed MERGE query: MERGE INTO query_single_shard_table.distributed_table_4000191 citus_table_alias USING (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('merge_into_XXX_4000191'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) nullkey_c1_t1 ON (nullkey_c1_t1.a OPERATOR(pg_catalog.=) citus_table_alias.a) WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (a, b) VALUES (nullkey_c1_t1.a, nullkey_c1_t1.b)>
DEBUG: distributed statement: MERGE INTO query_single_shard_table.distributed_table_4000191 citus_table_alias USING (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('merge_into_XXX_4000191'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) nullkey_c1_t1 ON (nullkey_c1_t1.a OPERATOR(pg_catalog.=) citus_table_alias.a) WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (a, b) VALUES (nullkey_c1_t1.a, nullkey_c1_t1.b)
DEBUG: <Deparsed MERGE query: MERGE INTO query_single_shard_table.distributed_table_4000192 citus_table_alias USING (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('merge_into_XXX_4000192'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) nullkey_c1_t1 ON (nullkey_c1_t1.a OPERATOR(pg_catalog.=) citus_table_alias.a) WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (a, b) VALUES (nullkey_c1_t1.a, nullkey_c1_t1.b)>
DEBUG: distributed statement: MERGE INTO query_single_shard_table.distributed_table_4000192 citus_table_alias USING (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('merge_into_XXX_4000192'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) nullkey_c1_t1 ON (nullkey_c1_t1.a OPERATOR(pg_catalog.=) citus_table_alias.a) WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (a, b) VALUES (nullkey_c1_t1.a, nullkey_c1_t1.b)
-- with a reference table
MERGE INTO nullkey_c1_t1 USING reference_table ON (nullkey_c1_t1.a = reference_table.a)
WHEN MATCHED THEN UPDATE SET b = reference_table.b;
DEBUG: A mix of distributed and reference table, try repartitioning
DEBUG: A mix of distributed and reference table, routable query is not possible
DEBUG: Creating MERGE repartition plan
ERROR: MERGE operation on non-colocated distributed table(s) without a shard key is not yet supported
MERGE INTO reference_table USING nullkey_c1_t1 ON (nullkey_c1_t1.a = reference_table.a)
WHEN MATCHED THEN UPDATE SET b = nullkey_c1_t1.b
WHEN NOT MATCHED THEN INSERT VALUES (nullkey_c1_t1.a, nullkey_c1_t1.b);
ERROR: Reference table as target is not allowed in MERGE command
-- with a citus local table
MERGE INTO nullkey_c1_t1 USING citus_local_table ON (nullkey_c1_t1.a = citus_local_table.a)
WHEN MATCHED THEN UPDATE SET b = citus_local_table.b;
DEBUG: A mix of distributed and local table, try repartitioning
DEBUG: A mix of distributed and citus-local table, routable query is not possible
DEBUG: Creating MERGE repartition plan
ERROR: MERGE operation on non-colocated distributed table(s) without a shard key is not yet supported
MERGE INTO citus_local_table USING nullkey_c1_t1 ON (nullkey_c1_t1.a = citus_local_table.a)
WHEN MATCHED THEN DELETE;
DEBUG: A mix of distributed and local table, try repartitioning
DEBUG: A mix of distributed and citus-local table, routable query is not possible
DEBUG: Creating MERGE repartition plan
ERROR: MERGE involving repartition of rows is supported only if the target is distributed
-- with a postgres table
MERGE INTO nullkey_c1_t1 USING postgres_local_table ON (nullkey_c1_t1.a = postgres_local_table.a)
WHEN MATCHED THEN UPDATE SET b = postgres_local_table.b;
DEBUG: There is only one distributed table, merge is not pushable, try repartitioning
DEBUG: Creating MERGE repartition plan
ERROR: MERGE INTO an distributed table from Postgres table is not yet supported
MERGE INTO postgres_local_table USING nullkey_c1_t1 ON (nullkey_c1_t1.a = postgres_local_table.a)
WHEN MATCHED THEN UPDATE SET b = nullkey_c1_t1.b
WHEN NOT MATCHED THEN INSERT VALUES (nullkey_c1_t1.a, nullkey_c1_t1.b);
DEBUG: There is only one distributed table, merge is not pushable, try repartitioning
DEBUG: Creating MERGE repartition plan
ERROR: MERGE involving repartition of rows is supported only if the target is distributed
-- using ctes
WITH cte AS (
SELECT * FROM nullkey_c1_t1
)
MERGE INTO nullkey_c1_t1 USING cte ON (nullkey_c1_t1.a = cte.a)
WHEN MATCHED THEN UPDATE SET b = cte.b;
DEBUG: <Deparsed MERGE query: WITH cte AS (SELECT nullkey_c1_t1_1.a, nullkey_c1_t1_1.b FROM query_single_shard_table.nullkey_c1_t1_4000184 nullkey_c1_t1_1) MERGE INTO query_single_shard_table.nullkey_c1_t1_4000184 nullkey_c1_t1 USING cte ON (nullkey_c1_t1.a OPERATOR(pg_catalog.=) cte.a) WHEN MATCHED THEN UPDATE SET b = cte.b>
DEBUG: Creating MERGE router plan
WITH cte AS (
SELECT * FROM distributed_table
)
MERGE INTO nullkey_c1_t1 USING cte ON (nullkey_c1_t1.a = cte.a)
WHEN MATCHED THEN UPDATE SET b = cte.b;
DEBUG: Distributed tables are not co-located, try repartitioning
DEBUG: For MERGE command, all the distributed tables must be colocated
DEBUG: Creating MERGE repartition plan
ERROR: MERGE operation on non-colocated distributed table(s) without a shard key is not yet supported
WITH cte AS materialized (
SELECT * FROM distributed_table
)
MERGE INTO nullkey_c1_t1 USING cte ON (nullkey_c1_t1.a = cte.a)
WHEN MATCHED THEN UPDATE SET b = cte.b;
DEBUG: Distributed tables are not co-located, try repartitioning
DEBUG: For MERGE command, all the distributed tables must be colocated
DEBUG: Creating MERGE repartition plan
ERROR: MERGE operation on non-colocated distributed table(s) without a shard key is not yet supported
SET client_min_messages TO WARNING;
DROP SCHEMA query_single_shard_table CASCADE;
SET search_path TO merge_schema;
-- Test Columnar table
CREATE TABLE target_columnar(cid int, name text) USING columnar;
SELECT create_distributed_table('target_columnar', 'cid');
Expand Down
Loading

0 comments on commit aa271ba

Please sign in to comment.