diff --git a/src/backend/distributed/planner/local_distributed_join_planner.c b/src/backend/distributed/planner/local_distributed_join_planner.c index a6502bf43c4..13fc4465b76 100644 --- a/src/backend/distributed/planner/local_distributed_join_planner.c +++ b/src/backend/distributed/planner/local_distributed_join_planner.c @@ -87,6 +87,7 @@ #include "optimizer/optimizer.h" #include "optimizer/planner.h" #include "optimizer/prep.h" +#include "optimizer/restrictinfo.h" #include "parser/parse_relation.h" #include "parser/parsetree.h" #include "utils/builtins.h" @@ -135,6 +136,7 @@ typedef struct RangeTableEntryDetails RangeTblEntry *rangeTableEntry; List *requiredAttributeNumbers; bool hasConstantFilterOnUniqueColumn; + bool hasDependencyOnInitPlanParam; #if PG_VERSION_NUM >= PG_VERSION_16 RTEPermissionInfo *perminfo; #endif @@ -175,6 +177,10 @@ typedef enum ConversionChoice static bool HasConstantFilterOnUniqueColumn(RangeTblEntry *rangeTableEntry, RelationRestriction *relationRestriction); + +static bool HasDependencyOnInitPlanParam(RangeTblEntry *rangeTableEntry, + RelationRestriction *relationRestriction); + static ConversionCandidates * CreateConversionCandidates(PlannerRestrictionContext * plannerRestrictionContext, List *rangeTableList, @@ -290,7 +296,13 @@ GetConversionChoice(ConversionCandidates *conversionCandidates, if (LocalTableJoinPolicy == LOCAL_JOIN_POLICY_PREFER_LOCAL) { - return localRTECandidate ? CONVERT_LOCAL_TABLES : CONVERT_DISTRIBUTED_TABLES; + /* + * If Local table is referenced by the InitPlan that is kind of a One time filter, + * In that case we should refrain from converting the local tables. + */ + return localRTECandidate && + (!localRTECandidate->hasDependencyOnInitPlanParam) ? + CONVERT_LOCAL_TABLES : CONVERT_DISTRIBUTED_TABLES; } else if (LocalTableJoinPolicy == LOCAL_JOIN_POLICY_PREFER_DISTRIBUTED) { @@ -314,7 +326,9 @@ GetConversionChoice(ConversionCandidates *conversionCandidates, } else { - return localRTECandidate ? CONVERT_LOCAL_TABLES : CONVERT_DISTRIBUTED_TABLES; + return localRTECandidate && + (!localRTECandidate->hasDependencyOnInitPlanParam) ? + CONVERT_LOCAL_TABLES : CONVERT_DISTRIBUTED_TABLES; } } } @@ -383,6 +397,87 @@ ShouldConvertLocalTableJoinsToSubqueries(List *rangeTableList) } +/* + * HasDependencyOnInitPlanParam + * + * This function returns true if the given rangeTableEntry has a dependency + * on an InitPlan parameter. + */ +static bool +HasDependencyOnInitPlanParam(RangeTblEntry *rangeTableEntry, + RelationRestriction *relationRestriction) +{ + List *initPlanParamIDs = NIL; + ListCell *lc = NULL; + + /* + * Exit early if the plan does not include the initPlan or if relationRestriction + * does not contain joininfo. + */ + if (rangeTableEntry == NULL || relationRestriction == NULL) + { + return false; + } + if (relationRestriction->relOptInfo->joininfo == NULL) + { + return false; + } + if (relationRestriction->plannerInfo->init_plans == NULL) + { + return false; + } + + /* + * Gather all parameter IDs referenced by the InitPlan + */ + foreach(lc, relationRestriction->plannerInfo->init_plans) + { + Node *plan = (Node *) lfirst(lc); + + if (IsA(plan, SubPlan)) + { + SubPlan *subplan = (SubPlan *) plan; + if (subplan->setParam != NIL) + { + initPlanParamIDs = list_concat_unique_int(initPlanParamIDs, + subplan->setParam); + } + } + } + if (initPlanParamIDs == NIL) + { + return false; + } + + /* + * Check if any parameter in the join conditions (join info) for this relation + * is referenced by the initPlan. This is important to ensure that we can + * decide whether we want to convert local or remote tables. + */ + List *whereClauseList = extract_actual_clauses( + relationRestriction->relOptInfo->joininfo, + true); + + foreach(lc, whereClauseList) + { + Node *clause = (Node *) lfirst(lc); + + if (IsA(clause, Param)) + { + Param *param = (Param *) clause; + if (param->paramkind == PARAM_EXEC) + { + if (list_member_int(initPlanParamIDs, param->paramid)) + { + return true; + } + } + } + } + return false; +} + + /* * HasConstantFilterOnUniqueColumn returns true if the given rangeTableEntry has a constant * filter on a unique column. @@ -581,6 +676,9 @@ CreateConversionCandidates(PlannerRestrictionContext *plannerRestrictionContext, RequiredAttrNumbersForRelation(rangeTableEntry, plannerRestrictionContext); rangeTableEntryDetails->hasConstantFilterOnUniqueColumn = HasConstantFilterOnUniqueColumn(rangeTableEntry, relationRestriction); + rangeTableEntryDetails->hasDependencyOnInitPlanParam = + HasDependencyOnInitPlanParam(rangeTableEntry, relationRestriction); + #if PG_VERSION_NUM >= PG_VERSION_16 rangeTableEntryDetails->perminfo = NULL; if (rangeTableEntry->perminfoindex) diff --git a/src/test/regress/expected/issue_7698_7697.out b/src/test/regress/expected/issue_7698_7697.out new file mode 100644 index 00000000000..d3c6f23eebe --- /dev/null +++ b/src/test/regress/expected/issue_7698_7697.out @@ -0,0 +1,88 @@ +-- Issue #7698: An incorrect query result, where the distributed query plan seems wrong +-- https://github.com/citusdata/citus/issues/7698 +CREATE TABLE t1 (vkey int4 ,c10 int4); +CREATE TABLE t3 (vkey int4); +INSERT INTO t3 (vkey) values (1); +INSERT INTO t1 (vkey,c10) values (4, -70); +SELECT t3.vkey + FROM (t1 RIGHT OUTER JOIN t3 + ON (t1.c10 = t3.vkey )) + WHERE EXISTS (SELECT * FROM t3); + vkey +--------------------------------------------------------------------- + 1 +(1 row) + +-- Make t1 a distributed table +SELECT create_distributed_table('t1', 'vkey'); +NOTICE: Copying data from local table... +NOTICE: copying the data has completed +DETAIL: The local data in the table is no longer visible, but is still on disk. +HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$public.t1$$) + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +-- Result should remain the same after making t1 a distributed table +SELECT t3.vkey + FROM (t1 RIGHT OUTER JOIN t3 + ON (t1.c10 = t3.vkey )) + WHERE EXISTS (SELECT * FROM t3); + vkey +--------------------------------------------------------------------- + 1 +(1 row) + +--- cleanup +DROP TABLE t1; +DROP TABLE t3; +-- Issue #7697: Incorrect result from a distributed table full outer join an undistributed table. +-- https://github.com/citusdata/citus/issues/7697 +CREATE TABLE t0 (vkey int4 ,c3 timestamp); +CREATE TABLE t3 (vkey int4 ,c26 timestamp); +CREATE TABLE t4 (vkey int4); +INSERT INTO t0 (vkey, c3) VALUES + (13,make_timestamp(2019, 10, 23, 15, 34, 50)); +INSERT INTO t3 (vkey,c26) VALUES + (1, make_timestamp(2024, 3, 26, 17, 36, 53)); +INSERT INTO t4 (vkey) VALUES + (1); +SELECT * FROM + (t0 FULL OUTER JOIN t3 ON (t0.c3 = t3.c26 )) + WHERE ( + EXISTS (SELECT * FROM t4) + ); + vkey | c3 | vkey | c26 +--------------------------------------------------------------------- + 13 | Wed Oct 23 15:34:50 2019 | | + | | 1 | Tue Mar 26 17:36:53 2024 +(2 rows) + +-- change t0 to distributed table +SELECT create_distributed_table('t0', 'vkey'); +NOTICE: Copying data from local table... +NOTICE: copying the data has completed +DETAIL: The local data in the table is no longer visible, but is still on disk. +HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$public.t0$$) + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +-- Result should remain the same after making t0 a distributed table +SELECT * FROM + (t0 FULL OUTER JOIN t3 ON (t0.c3 = t3.c26 )) + WHERE ( + EXISTS (SELECT * FROM t4) + ); + vkey | c3 | vkey | c26 +--------------------------------------------------------------------- + | | 1 | Tue Mar 26 17:36:53 2024 + 13 | Wed Oct 23 15:34:50 2019 | | +(2 rows) + +--- cleanup +DROP TABLE t0; +DROP TABLE t3; +DROP TABLE t4; diff --git a/src/test/regress/multi_schedule b/src/test/regress/multi_schedule index bbb4047a950..322f836fd90 100644 --- a/src/test/regress/multi_schedule +++ b/src/test/regress/multi_schedule @@ -103,7 +103,7 @@ test: multi_dropped_column_aliases foreign_key_restriction_enforcement test: binary_protocol test: alter_table_set_access_method test: alter_distributed_table -test: issue_5248 issue_5099 issue_5763 issue_6543 issue_6758 issue_7477 issue_7705 +test: issue_5248 issue_5099 issue_5763 issue_6543 issue_6758 issue_7477 issue_7705 issue_7698_7697 test: object_propagation_debug test: undistribute_table test: run_command_on_all_nodes diff --git a/src/test/regress/sql/issue_7698_7697.sql b/src/test/regress/sql/issue_7698_7697.sql new file mode 100644 index 00000000000..2b09376252d --- /dev/null +++ b/src/test/regress/sql/issue_7698_7697.sql @@ -0,0 +1,66 @@ + +-- Issue #7698: An incorrect query result, where the distributed query plan seems wrong +-- https://github.com/citusdata/citus/issues/7698 + +CREATE TABLE t1 (vkey int4 ,c10 int4); +CREATE TABLE t3 (vkey int4); +INSERT INTO t3 (vkey) values (1); +INSERT INTO t1 (vkey,c10) values (4, -70); + +SELECT t3.vkey + FROM (t1 RIGHT OUTER JOIN t3 + ON (t1.c10 = t3.vkey )) + WHERE EXISTS (SELECT * FROM t3); + +-- Make t1 a distributed table +SELECT create_distributed_table('t1', 'vkey'); + +-- Result should remain the same after making t1 a distributed table + +SELECT t3.vkey + FROM (t1 RIGHT OUTER JOIN t3 + ON (t1.c10 = t3.vkey )) + WHERE EXISTS (SELECT * FROM t3); + +--- cleanup +DROP TABLE t1; +DROP TABLE t3; + +-- Issue #7697: Incorrect result from a distributed table full outer join an undistributed table. +-- https://github.com/citusdata/citus/issues/7697 + +CREATE TABLE t0 (vkey int4 ,c3 timestamp); +CREATE TABLE t3 (vkey int4 ,c26 timestamp); +CREATE TABLE t4 (vkey int4); + + +INSERT INTO t0 (vkey, c3) VALUES + (13,make_timestamp(2019, 10, 23, 15, 34, 50)); + +INSERT INTO t3 (vkey,c26) VALUES + (1, make_timestamp(2024, 3, 26, 17, 36, 53)); + +INSERT INTO t4 (vkey) VALUES + (1); + +SELECT * FROM + (t0 FULL OUTER JOIN t3 ON (t0.c3 = t3.c26 )) + WHERE ( + EXISTS (SELECT * FROM t4) + ); + +-- change t0 to distributed table +SELECT create_distributed_table('t0', 'vkey'); + +-- Result should remain the same after making t0 a distributed table + +SELECT * FROM + (t0 FULL OUTER JOIN t3 ON (t0.c3 = t3.c26 )) + WHERE ( + EXISTS (SELECT * FROM t4) + ); + +--- cleanup +DROP TABLE t0; +DROP TABLE t3; +DROP TABLE t4;