Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Bug Fix] Incorrect Query Results with Distributed Query Plans. citusdata#7698 and citusdata#7697 #7810

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 100 additions & 2 deletions src/backend/distributed/planner/local_distributed_join_planner.c
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@
#include "optimizer/optimizer.h"
#include "optimizer/planner.h"
#include "optimizer/prep.h"
#include "optimizer/restrictinfo.h"
#include "parser/parse_relation.h"
#include "parser/parsetree.h"
#include "utils/builtins.h"
Expand Down Expand Up @@ -135,6 +136,7 @@ typedef struct RangeTableEntryDetails
RangeTblEntry *rangeTableEntry;
List *requiredAttributeNumbers;
bool hasConstantFilterOnUniqueColumn;
bool hasDependencyOnInitPlanParam;
#if PG_VERSION_NUM >= PG_VERSION_16
RTEPermissionInfo *perminfo;
#endif
Expand Down Expand Up @@ -175,6 +177,10 @@ typedef enum ConversionChoice

static bool HasConstantFilterOnUniqueColumn(RangeTblEntry *rangeTableEntry,
RelationRestriction *relationRestriction);

static bool HasDependencyOnInitPlanParam(RangeTblEntry *rangeTableEntry,
RelationRestriction *relationRestriction);

static ConversionCandidates * CreateConversionCandidates(PlannerRestrictionContext *
plannerRestrictionContext,
List *rangeTableList,
Expand Down Expand Up @@ -290,7 +296,13 @@ GetConversionChoice(ConversionCandidates *conversionCandidates,

if (LocalTableJoinPolicy == LOCAL_JOIN_POLICY_PREFER_LOCAL)
{
return localRTECandidate ? CONVERT_LOCAL_TABLES : CONVERT_DISTRIBUTED_TABLES;
/*
* If the local table depends on an InitPlan parameter (i.e. a kind of
* one-time filter), we refrain from converting the local tables and
* convert the distributed tables instead.
*/
return localRTECandidate &&
(!localRTECandidate->hasDependencyOnInitPlanParam) ?
CONVERT_LOCAL_TABLES : CONVERT_DISTRIBUTED_TABLES;
}
else if (LocalTableJoinPolicy == LOCAL_JOIN_POLICY_PREFER_DISTRIBUTED)
{
Expand All @@ -314,7 +326,9 @@ GetConversionChoice(ConversionCandidates *conversionCandidates,
}
else
{
return localRTECandidate ? CONVERT_LOCAL_TABLES : CONVERT_DISTRIBUTED_TABLES;
return localRTECandidate &&
(!localRTECandidate->hasDependencyOnInitPlanParam) ?
CONVERT_LOCAL_TABLES : CONVERT_DISTRIBUTED_TABLES;
}
}
}
Expand Down Expand Up @@ -383,6 +397,87 @@ ShouldConvertLocalTableJoinsToSubqueries(List *rangeTableList)
}


/*
 * HasDependencyOnInitPlanParam returns true when one of the join clauses
 * recorded for the relation is a PARAM_EXEC parameter whose id is set by one
 * of the planner's InitPlans, and false otherwise.
 *
 * The function also returns false when either argument is NULL, when the
 * relation has no joininfo, or when the planner has no InitPlans.
 *
 * NOTE(review): only a clause that is itself a bare Param node is matched; a
 * Param nested inside a larger expression is not inspected. Confirm that this
 * is sufficient for every one-time-filter shape we care about.
 */
static bool
HasDependencyOnInitPlanParam(RangeTblEntry *rangeTableEntry,
							 RelationRestriction *relationRestriction)
{
	if (rangeTableEntry == NULL || relationRestriction == NULL)
	{
		return false;
	}

	/* without join clauses there is nothing that could reference a parameter */
	List *joinInfoList = relationRestriction->relOptInfo->joininfo;
	if (joinInfoList == NULL)
	{
		return false;
	}

	/* without InitPlans there is no parameter to depend on */
	List *initPlanList = relationRestriction->plannerInfo->init_plans;
	if (initPlanList == NULL)
	{
		return false;
	}

	/* collect, without duplicates, the ids of all parameters set by the InitPlans */
	List *setParamIdList = NIL;
	ListCell *initPlanCell = NULL;
	foreach(initPlanCell, initPlanList)
	{
		Node *initPlanNode = (Node *) lfirst(initPlanCell);
		if (!IsA(initPlanNode, SubPlan))
		{
			continue;
		}

		SubPlan *initPlan = (SubPlan *) initPlanNode;
		if (initPlan->setParam == NIL)
		{
			continue;
		}

		setParamIdList = list_concat_unique_int(setParamIdList,
												initPlan->setParam);
	}

	if (setParamIdList == NIL)
	{
		return false;
	}

	/*
	 * Look for a join clause that is exactly one of the collected parameters.
	 * Passing "true" asks extract_actual_clauses() for the pseudoconstant
	 * clauses (per the PostgreSQL restrictinfo API). The outcome feeds the
	 * decision on whether to convert the local or the distributed tables.
	 */
	List *joinClauseList = extract_actual_clauses(joinInfoList, true);
	ListCell *joinClauseCell = NULL;
	foreach(joinClauseCell, joinClauseList)
	{
		Node *joinClause = (Node *) lfirst(joinClauseCell);
		if (!IsA(joinClause, Param))
		{
			continue;
		}

		Param *joinParam = (Param *) joinClause;
		if (joinParam->paramkind == PARAM_EXEC &&
			list_member_int(setParamIdList, joinParam->paramid))
		{
			return true;
		}
	}

	return false;
}


/*
* HasConstantFilterOnUniqueColumn returns true if the given rangeTableEntry has a constant
* filter on a unique column.
Expand Down Expand Up @@ -581,6 +676,9 @@ CreateConversionCandidates(PlannerRestrictionContext *plannerRestrictionContext,
RequiredAttrNumbersForRelation(rangeTableEntry, plannerRestrictionContext);
rangeTableEntryDetails->hasConstantFilterOnUniqueColumn =
HasConstantFilterOnUniqueColumn(rangeTableEntry, relationRestriction);
rangeTableEntryDetails->hasDependencyOnInitPlanParam =
HasDependencyOnInitPlanParam(rangeTableEntry, relationRestriction);

#if PG_VERSION_NUM >= PG_VERSION_16
rangeTableEntryDetails->perminfo = NULL;
if (rangeTableEntry->perminfoindex)
Expand Down
88 changes: 88 additions & 0 deletions src/test/regress/expected/issue_7698_7697.out
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
-- Issue #7698: An incorrect query result, where the distributed query plan seems wrong
-- https://github.com/citusdata/citus/issues/7698
CREATE TABLE t1 (vkey int4 ,c10 int4);
CREATE TABLE t3 (vkey int4);
INSERT INTO t3 (vkey) values (1);
INSERT INTO t1 (vkey,c10) values (4, -70);
SELECT t3.vkey
FROM (t1 RIGHT OUTER JOIN t3
ON (t1.c10 = t3.vkey ))
WHERE EXISTS (SELECT * FROM t3);
vkey
---------------------------------------------------------------------
1
(1 row)

-- Make t1 a distributed table
SELECT create_distributed_table('t1', 'vkey');
NOTICE: Copying data from local table...
NOTICE: copying the data has completed
DETAIL: The local data in the table is no longer visible, but is still on disk.
HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$public.t1$$)
create_distributed_table
---------------------------------------------------------------------

(1 row)

-- Result should remain the same after making t1 a distributed table
SELECT t3.vkey
FROM (t1 RIGHT OUTER JOIN t3
ON (t1.c10 = t3.vkey ))
WHERE EXISTS (SELECT * FROM t3);
vkey
---------------------------------------------------------------------
1
(1 row)

--- cleanup
DROP TABLE t1;
DROP TABLE t3;
-- Issue #7697: Incorrect result from a distributed table full outer join an undistributed table.
-- https://github.com/citusdata/citus/issues/7697
CREATE TABLE t0 (vkey int4 ,c3 timestamp);
CREATE TABLE t3 (vkey int4 ,c26 timestamp);
CREATE TABLE t4 (vkey int4);
INSERT INTO t0 (vkey, c3) VALUES
(13,make_timestamp(2019, 10, 23, 15, 34, 50));
INSERT INTO t3 (vkey,c26) VALUES
(1, make_timestamp(2024, 3, 26, 17, 36, 53));
INSERT INTO t4 (vkey) VALUES
(1);
SELECT * FROM
(t0 FULL OUTER JOIN t3 ON (t0.c3 = t3.c26 ))
WHERE (
EXISTS (SELECT * FROM t4)
);
vkey | c3 | vkey | c26
---------------------------------------------------------------------
13 | Wed Oct 23 15:34:50 2019 | |
| | 1 | Tue Mar 26 17:36:53 2024
(2 rows)

-- change t0 to distributed table
SELECT create_distributed_table('t0', 'vkey');
NOTICE: Copying data from local table...
NOTICE: copying the data has completed
DETAIL: The local data in the table is no longer visible, but is still on disk.
HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$public.t0$$)
create_distributed_table
---------------------------------------------------------------------

(1 row)

-- Result should remain the same after making t0 a distributed table
SELECT * FROM
(t0 FULL OUTER JOIN t3 ON (t0.c3 = t3.c26 ))
WHERE (
EXISTS (SELECT * FROM t4)
);
vkey | c3 | vkey | c26
---------------------------------------------------------------------
| | 1 | Tue Mar 26 17:36:53 2024
13 | Wed Oct 23 15:34:50 2019 | |
(2 rows)

--- cleanup
DROP TABLE t0;
DROP TABLE t3;
DROP TABLE t4;
2 changes: 1 addition & 1 deletion src/test/regress/multi_schedule
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ test: multi_dropped_column_aliases foreign_key_restriction_enforcement
test: binary_protocol
test: alter_table_set_access_method
test: alter_distributed_table
test: issue_5248 issue_5099 issue_5763 issue_6543 issue_6758 issue_7477 issue_7705
test: issue_5248 issue_5099 issue_5763 issue_6543 issue_6758 issue_7477 issue_7705 issue_7698_7697
test: object_propagation_debug
test: undistribute_table
test: run_command_on_all_nodes
Expand Down
66 changes: 66 additions & 0 deletions src/test/regress/sql/issue_7698_7697.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@

-- Issue #7698: An incorrect query result, where the distributed query plan seems wrong
-- https://github.com/citusdata/citus/issues/7698

CREATE TABLE t1 (vkey int4 ,c10 int4);
CREATE TABLE t3 (vkey int4);
INSERT INTO t3 (vkey) values (1);
INSERT INTO t1 (vkey,c10) values (4, -70);

SELECT t3.vkey
FROM (t1 RIGHT OUTER JOIN t3
ON (t1.c10 = t3.vkey ))
WHERE EXISTS (SELECT * FROM t3);

-- Make t1 a distributed table
SELECT create_distributed_table('t1', 'vkey');

-- Result should remain the same after making t1 a distributed table

SELECT t3.vkey
FROM (t1 RIGHT OUTER JOIN t3
ON (t1.c10 = t3.vkey ))
WHERE EXISTS (SELECT * FROM t3);

--- cleanup
DROP TABLE t1;
DROP TABLE t3;

-- Issue #7697: Incorrect result from a distributed table full outer join an undistributed table.
-- https://github.com/citusdata/citus/issues/7697

CREATE TABLE t0 (vkey int4 ,c3 timestamp);
CREATE TABLE t3 (vkey int4 ,c26 timestamp);
CREATE TABLE t4 (vkey int4);


INSERT INTO t0 (vkey, c3) VALUES
(13,make_timestamp(2019, 10, 23, 15, 34, 50));

INSERT INTO t3 (vkey,c26) VALUES
(1, make_timestamp(2024, 3, 26, 17, 36, 53));

INSERT INTO t4 (vkey) VALUES
(1);

SELECT * FROM
(t0 FULL OUTER JOIN t3 ON (t0.c3 = t3.c26 ))
WHERE (
EXISTS (SELECT * FROM t4)
);

-- change t0 to distributed table
SELECT create_distributed_table('t0', 'vkey');

-- Result should remain the same after making t0 a distributed table

SELECT * FROM
(t0 FULL OUTER JOIN t3 ON (t0.c3 = t3.c26 ))
WHERE (
EXISTS (SELECT * FROM t4)
);

--- cleanup
DROP TABLE t0;
DROP TABLE t3;
DROP TABLE t4;
Loading