From 1597f5b2ec91687e20eecd547c0303fd9ee176f5 Mon Sep 17 00:00:00 2001 From: Gonzalo Ortiz Date: Wed, 20 Nov 2024 09:07:13 +0100 Subject: [PATCH] Fix root mailbox send node build using null receiver stage --- .../logical/EquivalentStagesFinder.java | 2 -- .../logical/PinotLogicalQueryPlanner.java | 2 +- .../planner/plannode/MailboxSendNode.java | 20 +++++-------------- .../query/planner/logical/StagesTestBase.java | 6 +++--- 4 files changed, 9 insertions(+), 21 deletions(-) diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/EquivalentStagesFinder.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/EquivalentStagesFinder.java index abf1949fecf9..28bca306cd5c 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/EquivalentStagesFinder.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/EquivalentStagesFinder.java @@ -224,8 +224,6 @@ public Boolean visitMailboxReceive(MailboxReceiveNode node1, PlanNode node2) { // TODO: Keys should probably be removed from the equivalence check, but would require to verify both // keys are present in the data schema. We are not doing that for now. && Objects.equals(node1.getKeys(), that.getKeys()) - // Distribution type is not needed for equivalence. We deal with difference distribution types in the - // spooling logic. && node1.getDistributionType() == that.getDistributionType() // TODO: Sort, sort on sender and collations can probably be removed from the equivalence check, but would // require some extra checks or transformation on the spooling logic. We are not doing that for now. diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/PinotLogicalQueryPlanner.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/PinotLogicalQueryPlanner.java index 7609961ca15b..8282ea787b31 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/PinotLogicalQueryPlanner.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/PinotLogicalQueryPlanner.java @@ -99,7 +99,7 @@ private static PlanFragment planNodeToPlanFragment( // Sub plan root needs to send final results back to the Broker // TODO: Should be SINGLETON (currently SINGLETON has to be local, so use BROADCAST_DISTRIBUTED instead) MailboxSendNode subPlanRootSenderNode = - new MailboxSendNode(node.getStageId(), node.getDataSchema(), List.of(node), null, + new MailboxSendNode(node.getStageId(), node.getDataSchema(), List.of(node), 0, PinotRelExchangeType.getDefaultExchangeType(), RelDistribution.Type.BROADCAST_DISTRIBUTED, null, false, null, false); PlanFragment planFragment1 = new PlanFragment(1, subPlanRootSenderNode, new ArrayList<>()); diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/MailboxSendNode.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/MailboxSendNode.java index 433238810113..d8eb0b84ef4f 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/MailboxSendNode.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/MailboxSendNode.java @@ -30,8 +30,7 @@ public class MailboxSendNode extends BasePlanNode { - @Nullable - private BitSet _receiverStages; + private final BitSet _receiverStages; private final PinotRelExchangeType _exchangeType; private RelDistribution.Type _distributionType; private final List _keys; @@ -41,11 +40,10 @@ public class MailboxSendNode extends BasePlanNode { // NOTE: null List is converted to empty List because there is no way to differentiate them in proto during ser/de. public MailboxSendNode(int stageId, DataSchema dataSchema, List inputs, - @Nullable BitSet receiverStages, PinotRelExchangeType exchangeType, + BitSet receiverStages, PinotRelExchangeType exchangeType, RelDistribution.Type distributionType, @Nullable List keys, boolean prePartitioned, @Nullable List collations, boolean sort) { super(stageId, dataSchema, null, inputs); - // we need a copy of _receivers to make sure it is modifiable _receiverStages = receiverStages != null ? (BitSet) receiverStages.clone() : new BitSet(); _exchangeType = exchangeType; _distributionType = distributionType; @@ -76,25 +74,17 @@ public MailboxSendNode(int stageId, DataSchema dataSchema, List inputs } public BitSet getReceiverStages() { - Preconditions.checkState(_receiverStages != null && !_receiverStages.isEmpty(), "Receivers not set"); - return _receiverStages; + Preconditions.checkState(!_receiverStages.isEmpty(), "Receivers not set"); + return (BitSet) _receiverStages.clone(); } @Deprecated public int getReceiverStageId() { - Preconditions.checkState(_receiverStages != null && !_receiverStages.isEmpty(), "Receivers not set"); + Preconditions.checkState(!_receiverStages.isEmpty(), "Receivers not set"); return _receiverStages.nextSetBit(0); } - public void setReceiverStages(BitSet receiverStages) { - Preconditions.checkState(_receiverStages == null || _receiverStages.isEmpty(), "Receivers already set"); - Preconditions.checkArgument(receiverStages != null && !receiverStages.isEmpty(), "Invalid receivers: %s", - receiverStages); - _receiverStages = receiverStages; - } - public void addReceiver(MailboxReceiveNode node) { - Preconditions.checkState(_receiverStages != null, "Receivers not set"); if (_receiverStages.get(node.getStageId())) { throw new IllegalStateException("Receiver already added: " + node.getStageId()); } diff --git a/pinot-query-planner/src/test/java/org/apache/pinot/query/planner/logical/StagesTestBase.java b/pinot-query-planner/src/test/java/org/apache/pinot/query/planner/logical/StagesTestBase.java index 6e32ca4c3514..a67c236a50cb 100644 --- a/pinot-query-planner/src/test/java/org/apache/pinot/query/planner/logical/StagesTestBase.java +++ b/pinot-query-planner/src/test/java/org/apache/pinot/query/planner/logical/StagesTestBase.java @@ -81,7 +81,7 @@ public void cleanup() { * Notice that this method does not offer any way to customize the initial send mailbox. */ public MailboxSendNode when(SimpleChildBuilder builder) { - return sendMailbox(0, 0, builder).build(0); + return sendMailbox(0, builder).build(0); } /** @@ -187,10 +187,10 @@ public MailboxSendNode stage(int stageId) { * {@code exchange} creates a pair of send and receive mailboxes and deals with the stageId management. */ public SimpleChildBuilder sendMailbox( - int receiverStageId, int receiverId, SimpleChildBuilder childBuilder) { + int newStageId, SimpleChildBuilder childBuilder) { return (stageId, mySchema, myHints) -> { PlanNode input = childBuilder.build(stageId); - MailboxSendNode mailboxSendNode = new MailboxSendNode(stageId, mySchema, List.of(input), receiverStageId, null, + MailboxSendNode mailboxSendNode = new MailboxSendNode(newStageId, mySchema, List.of(input), stageId, null, null, null, false, null, false); MailboxSendNode old = _stageRoots.put(stageId, mailboxSendNode); Preconditions.checkState(old == null, "Mailbox already exists for stageId: %s", stageId);