From e5cd4f2e41a89270a547988667b40110ff8aad61 Mon Sep 17 00:00:00 2001 From: "Xiaotian (Jackie) Jiang" Date: Wed, 29 Jan 2025 11:27:35 -0800 Subject: [PATCH] Skip block split for local exchange --- .../query/mailbox/GrpcSendingMailbox.java | 5 ++++ .../query/mailbox/InMemorySendingMailbox.java | 5 ++++ .../pinot/query/mailbox/SendingMailbox.java | 6 ++++ .../query/runtime/blocks/BlockSplitter.java | 5 ++-- .../blocks/TransferableBlockUtils.java | 28 +++++++++++-------- .../operator/exchange/BlockExchange.java | 17 +++++++---- 6 files changed, 46 insertions(+), 20 deletions(-) diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java index 3926d7cdbbdf..9ba870a19bce 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java @@ -62,6 +62,11 @@ public GrpcSendingMailbox(String id, ChannelManager channelManager, String hostn _statMap = statMap; } + @Override + public boolean isLocal() { + return false; + } + @Override public void send(TransferableBlock block) throws IOException { diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemorySendingMailbox.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemorySendingMailbox.java index 5fb21c96c4a0..493e298b5ce0 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemorySendingMailbox.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemorySendingMailbox.java @@ -48,6 +48,11 @@ public InMemorySendingMailbox(String id, MailboxService mailboxService, long dea _statMap = statMap; } + @Override + public boolean isLocal() { + return true; + } + @Override public void send(TransferableBlock block) throws TimeoutException { diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/SendingMailbox.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/SendingMailbox.java index 576a4703dacc..964a65d1e6f9 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/SendingMailbox.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/SendingMailbox.java @@ -29,6 +29,12 @@ */ public interface SendingMailbox { + /** + * Returns whether the mailbox is sending data to a local receiver, where blocks can be directly passed to the + * receiver. + */ + boolean isLocal(); + /** * Sends a block to the receiver. Note that SendingMailbox are required to acquire resources lazily in this call, and * they should not acquire any resources when they are created. This method should throw if there was an error diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/BlockSplitter.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/BlockSplitter.java index 096003d444f8..ac5940f549c4 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/BlockSplitter.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/BlockSplitter.java @@ -20,7 +20,6 @@ import com.google.common.collect.Iterators; import java.util.Iterator; -import org.apache.pinot.common.datablock.BaseDataBlock; /** @@ -29,10 +28,10 @@ * underlying transport. 
*/ public interface BlockSplitter { - BlockSplitter NO_OP = (block, type, maxBlockSize) -> Iterators.singletonIterator(block); + BlockSplitter NO_OP = (block, maxBlockSize) -> Iterators.singletonIterator(block); /** * @return a list of blocks that was split from the original {@code block} */ - Iterator split(TransferableBlock block, BaseDataBlock.Type type, int maxBlockSize); + Iterator split(TransferableBlock block, int maxBlockSize); } diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlockUtils.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlockUtils.java index 2f58711033b1..0d5a56d3445a 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlockUtils.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlockUtils.java @@ -27,6 +27,7 @@ import org.apache.pinot.common.datablock.DataBlock; import org.apache.pinot.common.datablock.DataBlockUtils; import org.apache.pinot.common.datablock.MetadataBlock; +import org.apache.pinot.common.utils.DataSchema; import org.apache.pinot.query.runtime.plan.MultiStageQueryStats; @@ -63,7 +64,6 @@ public static boolean isEndOfStream(TransferableBlock transferableBlock) { } /** - * * Split a block into multiple block so that each block size is within maxBlockSize. Currently, *
    *
  • For row data block, we split for row type dataBlock.
  • @@ -72,25 +72,29 @@ public static boolean isEndOfStream(TransferableBlock transferableBlock) { *
* * @param block the data block - * @param type type of block * @param maxBlockSize Each chunk of data is estimated to be less than maxBlockSize * @return a list of data block chunks */ - public static Iterator splitBlock(TransferableBlock block, DataBlock.Type type, int maxBlockSize) { - List blockChunks = new ArrayList<>(); + public static Iterator splitBlock(TransferableBlock block, int maxBlockSize) { + DataBlock.Type type = block.getType(); if (type == DataBlock.Type.ROW) { // Use estimated row size, this estimate is not accurate and is used to estimate numRowsPerChunk only. - int estimatedRowSizeInBytes = block.getDataSchema().getColumnNames().length * MEDIAN_COLUMN_SIZE_BYTES; + DataSchema dataSchema = block.getDataSchema(); + assert dataSchema != null; + int estimatedRowSizeInBytes = dataSchema.getColumnNames().length * MEDIAN_COLUMN_SIZE_BYTES; int numRowsPerChunk = maxBlockSize / estimatedRowSizeInBytes; Preconditions.checkState(numRowsPerChunk > 0, "row size too large for query engine to handle, abort!"); - int totalNumRows = block.getNumRows(); - List allRows = block.getContainer(); - int currentRow = 0; - while (currentRow < totalNumRows) { - List chunk = allRows.subList(currentRow, Math.min(currentRow + numRowsPerChunk, allRows.size())); - currentRow += numRowsPerChunk; - blockChunks.add(new TransferableBlock(chunk, block.getDataSchema(), block.getType())); + List rows = block.getContainer(); + int numRows = rows.size(); + int numChunks = (numRows + numRowsPerChunk - 1) / numRowsPerChunk; + if (numChunks == 1) { + return Iterators.singletonIterator(block); + } + List blockChunks = new ArrayList<>(numChunks); + for (int fromIndex = 0; fromIndex < numRows; fromIndex += numRowsPerChunk) { + int toIndex = Math.min(fromIndex + numRowsPerChunk, numRows); + blockChunks.add(new TransferableBlock(rows.subList(fromIndex, toIndex), dataSchema, DataBlock.Type.ROW)); } return blockChunks.iterator(); } else if (type == DataBlock.Type.METADATA) { diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchange.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchange.java index f10699e820c0..418c4bbbfbcd 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchange.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchange.java @@ -24,7 +24,6 @@ import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeoutException; import org.apache.calcite.rel.RelDistribution; -import org.apache.pinot.common.datablock.DataBlock; import org.apache.pinot.query.mailbox.ReceivingMailbox; import org.apache.pinot.query.mailbox.SendingMailbox; import org.apache.pinot.query.planner.partitioning.KeySelectorFactory; @@ -142,10 +141,13 @@ protected void sendBlock(SendingMailbox sendingMailbox, TransferableBlock block) return; } - DataBlock.Type type = block.getType(); - Iterator splits = _splitter.split(block, type, MAX_MAILBOX_CONTENT_SIZE_BYTES); - while (splits.hasNext()) { - sendingMailbox.send(splits.next()); + if (sendingMailbox.isLocal()) { + sendingMailbox.send(block); + } else { + Iterator splits = _splitter.split(block, MAX_MAILBOX_CONTENT_SIZE_BYTES); + while (splits.hasNext()) { + sendingMailbox.send(splits.next()); + } } if (LOGGER.isTraceEnabled()) { LOGGER.trace("Block sent: {} {} to {}", block.getType(), System.identityHashCode(block), sendingMailbox); @@ -193,6 +195,11 @@ public BlockExchangeSendingMailbox(String id) { _id = id; } + @Override + public boolean isLocal() { + return true; + } + @Override public void send(TransferableBlock block) throws IOException, TimeoutException {