Skip to content

Commit

Permalink
Fix root mailbox send node build using null receiver stage
Browse files Browse the repository at this point in the history
  • Loading branch information
gortiz committed Nov 20, 2024
1 parent 05b894e commit 1597f5b
Show file tree
Hide file tree
Showing 4 changed files with 9 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -224,8 +224,6 @@ public Boolean visitMailboxReceive(MailboxReceiveNode node1, PlanNode node2) {
// TODO: Keys should probably be removed from the equivalence check, but would require to verify both
// keys are present in the data schema. We are not doing that for now.
&& Objects.equals(node1.getKeys(), that.getKeys())
// Distribution type is not needed for equivalence. We deal with difference distribution types in the
// spooling logic.
&& node1.getDistributionType() == that.getDistributionType()
// TODO: Sort, sort on sender and collations can probably be removed from the equivalence check, but would
// require some extra checks or transformation on the spooling logic. We are not doing that for now.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ private static PlanFragment planNodeToPlanFragment(
// Sub plan root needs to send final results back to the Broker
// TODO: Should be SINGLETON (currently SINGLETON has to be local, so use BROADCAST_DISTRIBUTED instead)
MailboxSendNode subPlanRootSenderNode =
new MailboxSendNode(node.getStageId(), node.getDataSchema(), List.of(node), null,
new MailboxSendNode(node.getStageId(), node.getDataSchema(), List.of(node), 0,
PinotRelExchangeType.getDefaultExchangeType(), RelDistribution.Type.BROADCAST_DISTRIBUTED, null, false,
null, false);
PlanFragment planFragment1 = new PlanFragment(1, subPlanRootSenderNode, new ArrayList<>());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,7 @@


public class MailboxSendNode extends BasePlanNode {
@Nullable
private BitSet _receiverStages;
private final BitSet _receiverStages;
private final PinotRelExchangeType _exchangeType;
private RelDistribution.Type _distributionType;
private final List<Integer> _keys;
Expand All @@ -41,11 +40,10 @@ public class MailboxSendNode extends BasePlanNode {

// NOTE: null List is converted to empty List because there is no way to differentiate them in proto during ser/de.
public MailboxSendNode(int stageId, DataSchema dataSchema, List<PlanNode> inputs,
@Nullable BitSet receiverStages, PinotRelExchangeType exchangeType,
BitSet receiverStages, PinotRelExchangeType exchangeType,
RelDistribution.Type distributionType, @Nullable List<Integer> keys, boolean prePartitioned,
@Nullable List<RelFieldCollation> collations, boolean sort) {
super(stageId, dataSchema, null, inputs);
// we need a copy of _receivers to make sure it is modifiable
_receiverStages = receiverStages != null ? (BitSet) receiverStages.clone() : new BitSet();
_exchangeType = exchangeType;
_distributionType = distributionType;
Expand Down Expand Up @@ -76,25 +74,17 @@ public MailboxSendNode(int stageId, DataSchema dataSchema, List<PlanNode> inputs
}

public BitSet getReceiverStages() {
Preconditions.checkState(_receiverStages != null && !_receiverStages.isEmpty(), "Receivers not set");
return _receiverStages;
Preconditions.checkState(!_receiverStages.isEmpty(), "Receivers not set");
return (BitSet) _receiverStages.clone();
}

@Deprecated
public int getReceiverStageId() {
Preconditions.checkState(_receiverStages != null && !_receiverStages.isEmpty(), "Receivers not set");
Preconditions.checkState(!_receiverStages.isEmpty(), "Receivers not set");
return _receiverStages.nextSetBit(0);
}

public void setReceiverStages(BitSet receiverStages) {
Preconditions.checkState(_receiverStages == null || _receiverStages.isEmpty(), "Receivers already set");
Preconditions.checkArgument(receiverStages != null && !receiverStages.isEmpty(), "Invalid receivers: %s",
receiverStages);
_receiverStages = receiverStages;
}

public void addReceiver(MailboxReceiveNode node) {
Preconditions.checkState(_receiverStages != null, "Receivers not set");
if (_receiverStages.get(node.getStageId())) {
throw new IllegalStateException("Receiver already added: " + node.getStageId());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public void cleanup() {
* Notice that this method does not offer any way to customize the initial send mailbox.
*/
public MailboxSendNode when(SimpleChildBuilder<? extends PlanNode> builder) {
return sendMailbox(0, 0, builder).build(0);
return sendMailbox(0, builder).build(0);
}

/**
Expand Down Expand Up @@ -187,10 +187,10 @@ public MailboxSendNode stage(int stageId) {
* {@code exchange} creates a pair of send and receive mailboxes and deals with the stageId management.
*/
public SimpleChildBuilder<MailboxSendNode> sendMailbox(
int receiverStageId, int receiverId, SimpleChildBuilder<? extends PlanNode> childBuilder) {
int newStageId, SimpleChildBuilder<? extends PlanNode> childBuilder) {
return (stageId, mySchema, myHints) -> {
PlanNode input = childBuilder.build(stageId);
MailboxSendNode mailboxSendNode = new MailboxSendNode(stageId, mySchema, List.of(input), receiverStageId, null,
MailboxSendNode mailboxSendNode = new MailboxSendNode(newStageId, mySchema, List.of(input), stageId, null,
null, null, false, null, false);
MailboxSendNode old = _stageRoots.put(stageId, mailboxSendNode);
Preconditions.checkState(old == null, "Mailbox already exists for stageId: %s", stageId);
Expand Down

0 comments on commit 1597f5b

Please sign in to comment.