Skip to content

Commit

Permalink
Skip block split for local exchange
Browse files Browse the repository at this point in the history
  • Loading branch information
Jackie-Jiang committed Jan 29, 2025
1 parent 2f4a2d5 commit e5cd4f2
Show file tree
Hide file tree
Showing 6 changed files with 46 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,11 @@ public GrpcSendingMailbox(String id, ChannelManager channelManager, String hostn
_statMap = statMap;
}

// Always reports this mailbox as non-local, so callers that branch on isLocal() will not treat the receiver as one
// that blocks can be passed to directly.
// NOTE(review): presumably because blocks are shipped over gRPC to the receiver - confirm against send().
@Override
public boolean isLocal() {
return false;
}

@Override
public void send(TransferableBlock block)
throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@ public InMemorySendingMailbox(String id, MailboxService mailboxService, long dea
_statMap = statMap;
}

// Always reports this mailbox as local, i.e. blocks can be passed directly to the receiver.
// NOTE(review): presumably because sender and receiver share the same in-memory MailboxService - confirm against send().
@Override
public boolean isLocal() {
return true;
}

@Override
public void send(TransferableBlock block)
throws TimeoutException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,12 @@
*/
public interface SendingMailbox {

/**
* Returns whether the mailbox is sending data to a local receiver, where blocks can be directly passed to the
* receiver.
*
* @return {@code true} if the receiver is local and blocks can be handed to it directly, {@code false} otherwise
*/
boolean isLocal();

/**
* Sends a block to the receiver. Note that SendingMailbox are required to acquire resources lazily in this call, and
* they should <b>not</b> acquire any resources when they are created. This method should throw if there was an error
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

import com.google.common.collect.Iterators;
import java.util.Iterator;
import org.apache.pinot.common.datablock.BaseDataBlock;


/**
Expand All @@ -29,10 +28,10 @@
* underlying transport.
*/
public interface BlockSplitter {
BlockSplitter NO_OP = (block, type, maxBlockSize) -> Iterators.singletonIterator(block);
BlockSplitter NO_OP = (block, maxBlockSize) -> Iterators.singletonIterator(block);

/**
* @return an iterator over the blocks that were split from the original {@code block}
*/
Iterator<TransferableBlock> split(TransferableBlock block, BaseDataBlock.Type type, int maxBlockSize);
Iterator<TransferableBlock> split(TransferableBlock block, int maxBlockSize);
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.pinot.common.datablock.DataBlock;
import org.apache.pinot.common.datablock.DataBlockUtils;
import org.apache.pinot.common.datablock.MetadataBlock;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.query.runtime.plan.MultiStageQueryStats;


Expand Down Expand Up @@ -63,7 +64,6 @@ public static boolean isEndOfStream(TransferableBlock transferableBlock) {
}

/**
*
* Split a block into multiple blocks so that each block size is within maxBlockSize. Currently,
* <ul>
* <li>For row data block, we split for row type dataBlock.</li>
Expand All @@ -72,25 +72,29 @@ public static boolean isEndOfStream(TransferableBlock transferableBlock) {
* </ul>
*
* @param block the data block
* @param type type of block
* @param maxBlockSize Each chunk of data is estimated to be less than maxBlockSize
* @return an iterator over the data block chunks
*/
public static Iterator<TransferableBlock> splitBlock(TransferableBlock block, DataBlock.Type type, int maxBlockSize) {
List<TransferableBlock> blockChunks = new ArrayList<>();
public static Iterator<TransferableBlock> splitBlock(TransferableBlock block, int maxBlockSize) {
DataBlock.Type type = block.getType();
if (type == DataBlock.Type.ROW) {
// Use estimated row size, this estimate is not accurate and is used to estimate numRowsPerChunk only.
int estimatedRowSizeInBytes = block.getDataSchema().getColumnNames().length * MEDIAN_COLUMN_SIZE_BYTES;
DataSchema dataSchema = block.getDataSchema();
assert dataSchema != null;
int estimatedRowSizeInBytes = dataSchema.getColumnNames().length * MEDIAN_COLUMN_SIZE_BYTES;
int numRowsPerChunk = maxBlockSize / estimatedRowSizeInBytes;
Preconditions.checkState(numRowsPerChunk > 0, "row size too large for query engine to handle, abort!");

int totalNumRows = block.getNumRows();
List<Object[]> allRows = block.getContainer();
int currentRow = 0;
while (currentRow < totalNumRows) {
List<Object[]> chunk = allRows.subList(currentRow, Math.min(currentRow + numRowsPerChunk, allRows.size()));
currentRow += numRowsPerChunk;
blockChunks.add(new TransferableBlock(chunk, block.getDataSchema(), block.getType()));
List<Object[]> rows = block.getContainer();
int numRows = rows.size();
int numChunks = (numRows + numRowsPerChunk - 1) / numRowsPerChunk;
if (numChunks == 1) {
return Iterators.singletonIterator(block);
}
List<TransferableBlock> blockChunks = new ArrayList<>(numChunks);
// Advance by a whole chunk per iteration so the chunks are disjoint and together cover every row exactly once.
// Stepping one row at a time would emit numRows overlapping sub-lists (resending most rows many times) instead of
// the numChunks chunks the list was sized for.
for (int fromIndex = 0; fromIndex < numRows; fromIndex += numRowsPerChunk) {
  // The final chunk may hold fewer than numRowsPerChunk rows, so clamp its end to numRows.
  int toIndex = Math.min(fromIndex + numRowsPerChunk, numRows);
  blockChunks.add(new TransferableBlock(rows.subList(fromIndex, toIndex), dataSchema, DataBlock.Type.ROW));
}
return blockChunks.iterator();
} else if (type == DataBlock.Type.METADATA) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeoutException;
import org.apache.calcite.rel.RelDistribution;
import org.apache.pinot.common.datablock.DataBlock;
import org.apache.pinot.query.mailbox.ReceivingMailbox;
import org.apache.pinot.query.mailbox.SendingMailbox;
import org.apache.pinot.query.planner.partitioning.KeySelectorFactory;
Expand Down Expand Up @@ -142,10 +141,13 @@ protected void sendBlock(SendingMailbox sendingMailbox, TransferableBlock block)
return;
}

DataBlock.Type type = block.getType();
Iterator<TransferableBlock> splits = _splitter.split(block, type, MAX_MAILBOX_CONTENT_SIZE_BYTES);
while (splits.hasNext()) {
sendingMailbox.send(splits.next());
if (sendingMailbox.isLocal()) {
sendingMailbox.send(block);
} else {
Iterator<TransferableBlock> splits = _splitter.split(block, MAX_MAILBOX_CONTENT_SIZE_BYTES);
while (splits.hasNext()) {
sendingMailbox.send(splits.next());
}
}
if (LOGGER.isTraceEnabled()) {
LOGGER.trace("Block sent: {} {} to {}", block.getType(), System.identityHashCode(block), sendingMailbox);
Expand Down Expand Up @@ -193,6 +195,11 @@ public BlockExchangeSendingMailbox(String id) {
_id = id;
}

// Always reports this mailbox as local; sendBlock() checks this flag and, when it is true, passes the block to
// send() as-is instead of running it through the block splitter.
@Override
public boolean isLocal() {
return true;
}

@Override
public void send(TransferableBlock block)
throws IOException, TimeoutException {
Expand Down

0 comments on commit e5cd4f2

Please sign in to comment.