diff --git a/pinot-common/src/main/java/org/apache/pinot/common/protocols/SegmentCompletionProtocol.java b/pinot-common/src/main/java/org/apache/pinot/common/protocols/SegmentCompletionProtocol.java index 86d880a9357b..f5aed34154b7 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/protocols/SegmentCompletionProtocol.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/protocols/SegmentCompletionProtocol.java @@ -148,6 +148,9 @@ public enum ControllerResponseStatus { public static final String REASON_END_OF_PARTITION_GROUP = "endOfPartitionGroup"; // Stop reason sent by server as force commit message received public static final String REASON_FORCE_COMMIT_MESSAGE_RECEIVED = "forceCommitMessageReceived"; + // Stop reason sent by server as mutable index cannot consume more rows + // (like size reaching close to its limit or number of col values for a col is about to overflow int max) + public static final String REASON_INDEX_CAPACITY_THRESHOLD_BREACHED = "indexCapacityThresholdBreached"; // Canned responses public static final Response RESP_NOT_LEADER = diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java index 35e8aa3c466b..06f46495337b 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java @@ -363,6 +363,13 @@ private boolean endCriteriaReached() { _numRowsConsumed, _numRowsIndexed); _stopReason = SegmentCompletionProtocol.REASON_FORCE_COMMIT_MESSAGE_RECEIVED; return true; + } else if (!canAddMore()) { + _segmentLogger.info( + "Stopping consumption as mutable index cannot consume more rows - numRowsConsumed={} " + + "numRowsIndexed={}", + _numRowsConsumed, _numRowsIndexed); + _stopReason = SegmentCompletionProtocol.REASON_INDEX_CAPACITY_THRESHOLD_BREACHED; + return true; } return false; @@ -697,6 +704,11 @@ private boolean processStreamEvents(MessageBatch messageBatch, long idlePipeSlee return prematureExit; } + @VisibleForTesting + boolean canAddMore() { + return _realtimeSegment.canAddMore(); + } + public class PartitionConsumer implements Runnable { public void run() { long initialConsumptionEnd = 0L; diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManagerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManagerTest.java index f7ca4530bd35..60a2f1923364 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManagerTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManagerTest.java @@ -637,6 +637,19 @@ public void testEndCriteriaChecking() segmentDataManager._timeSupplier.set(endTime); Assert.assertTrue(segmentDataManager.invokeEndCriteriaReached()); } + + // test end criteria reached if any of the index cannot take more rows + try (FakeRealtimeSegmentDataManager segmentDataManager = createFakeSegmentManager(false, new TimeSupplier(), null, + null, null)) { + segmentDataManager._state.set(segmentDataManager, RealtimeSegmentDataManager.State.INITIAL_CONSUMING); + Assert.assertFalse(segmentDataManager.invokeEndCriteriaReached()); + + segmentDataManager.setIndexCapacityThresholdBreached(true); + + Assert.assertTrue(segmentDataManager.invokeEndCriteriaReached()); + Assert.assertEquals(segmentDataManager.getStopReason(), + SegmentCompletionProtocol.REASON_INDEX_CAPACITY_THRESHOLD_BREACHED); + } } private void setHasMessagesFetched(FakeRealtimeSegmentDataManager segmentDataManager, boolean hasMessagesFetched) @@ -907,6 +920,7 @@ public static class FakeRealtimeSegmentDataManager extends RealtimeSegmentDataMa public Map _semaphoreMap; public boolean _stubConsumeLoop = true; private TimeSupplier _timeSupplier; + private boolean _indexCapacityThresholdBreached; private static InstanceDataManagerConfig makeInstanceDataManagerConfig() { InstanceDataManagerConfig dataManagerConfig = mock(InstanceDataManagerConfig.class); @@ -1087,6 +1101,15 @@ public void setFinalOffset(long offset) { setOffset(offset, "_finalOffset"); } + @Override + protected boolean canAddMore() { + return !_indexCapacityThresholdBreached; + } + + public void setIndexCapacityThresholdBreached(boolean indexCapacityThresholdBreached) { + _indexCapacityThresholdBreached = indexCapacityThresholdBreached; + } + public boolean invokeEndCriteriaReached() { Method endCriteriaReached = null; try { diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java index d7008665fdcd..1812334f67b9 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java @@ -150,6 +150,7 @@ public class MutableSegmentImpl implements MutableSegment { private final File _consumerDir; private final Map _indexContainerMap = new HashMap<>(); + private boolean _indexCapacityThresholdBreached; private final IdMap _recordIdMap; @@ -828,7 +829,20 @@ private void addNewRow(int docId, GenericRow row) { Object[] values = (Object[]) value; for (Map.Entry indexEntry : indexContainer._mutableIndexes.entrySet()) { try { - indexEntry.getValue().add(values, dictIds, docId); + MutableIndex mutableIndex = indexEntry.getValue(); + mutableIndex.add(values, dictIds, docId); + // Few of the Immutable version of the mutable index are bounded by size like FixedBitMVForwardIndex. + // If num of values overflows or size is above limit, A mutable index is unable to convert to + // an immutable index and segment build fails causing the realtime consumption to stop. + // Hence, The below check is a temporary measure to avoid such scenarios until immutable index + // implementations are changed. + if (!_indexCapacityThresholdBreached && !mutableIndex.canAddMore()) { + _logger.info( + "Index: {} for column: {} cannot consume more rows, marking _indexCapacityThresholdBreached as true", + indexEntry.getKey(), column + ); + _indexCapacityThresholdBreached = true; + } } catch (Exception e) { recordIndexingError(indexEntry.getKey(), e); } @@ -1265,6 +1279,10 @@ private boolean isAggregateMetricsEnabled() { return _recordIdMap != null; } + public boolean canAddMore() { + return !_indexCapacityThresholdBreached; + } + // NOTE: Okay for single-writer @SuppressWarnings("NonAtomicOperationOnVolatileField") private static class ValuesInfo { diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/forward/FixedByteMVMutableForwardIndex.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/forward/FixedByteMVMutableForwardIndex.java index e30f15f81d4c..b1826e08a82e 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/forward/FixedByteMVMutableForwardIndex.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/forward/FixedByteMVMutableForwardIndex.java @@ -104,6 +104,9 @@ public class FixedByteMVMutableForwardIndex implements MutableForwardIndex { private static final int INCREMENT_PERCENTAGE = 100; //Increments the Initial size by 100% of initial capacity every time we runs out of capacity + // Conservative figure to not breach 2GB size limit for immutable index + private final static int DEFAULT_THRESHOLD_FOR_NUM_OF_VALUES_PER_COLUMN = 450_000_000; + // For single writer multiple readers setup, use ArrayList for writer and CopyOnWriteArrayList for reader private final List _headerWriters = new ArrayList<>(); private final List _headerReaders = new CopyOnWriteArrayList<>(); @@ -124,6 +127,7 @@ public class FixedByteMVMutableForwardIndex implements MutableForwardIndex { private int _currentCapacity = 0; private int _prevRowStartIndex = 0; // Offset in the data-buffer for the last row added. private int _prevRowLength = 0; // Number of values in the column for the last row added. + private int _numValues = 0; public FixedByteMVMutableForwardIndex(int maxNumberOfMultiValuesPerRow, int avgMultiValueCount, int rowCountPerChunk, int columnSizeInBytes, PinotDataBufferMemoryManager memoryManager, String context, boolean isDictionaryEncoded, @@ -200,6 +204,7 @@ private int getRowInCurrentHeader(int row) { private int updateHeader(int row, int numValues) { assert (numValues <= _maxNumberOfMultiValuesPerRow); + _numValues += numValues; int newStartIndex = _prevRowStartIndex + _prevRowLength; if (newStartIndex + numValues > _currentCapacity) { addDataBuffer(_incrementalCapacity); @@ -414,6 +419,11 @@ public void setDoubleMV(int docId, double[] values) { } } + @Override + public boolean canAddMore() { + return _numValues < DEFAULT_THRESHOLD_FOR_NUM_OF_VALUES_PER_COLUMN; + } + @Override public void close() throws IOException { diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentEntriesAboveThresholdTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentEntriesAboveThresholdTest.java new file mode 100644 index 000000000000..1eaaab657d21 --- /dev/null +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentEntriesAboveThresholdTest.java @@ -0,0 +1,194 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.segment.local.indexsegment.mutable; + +import java.io.File; +import java.io.IOException; +import java.lang.reflect.Field; +import java.net.URL; +import java.util.Collections; +import java.util.Map; +import org.apache.commons.io.FileUtils; +import org.apache.pinot.segment.local.segment.creator.SegmentTestUtils; +import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl; +import org.apache.pinot.segment.local.segment.index.forward.ForwardIndexPlugin; +import org.apache.pinot.segment.local.segment.virtualcolumn.VirtualColumnProviderFactory; +import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig; +import org.apache.pinot.segment.spi.creator.SegmentIndexCreationDriver; +import org.apache.pinot.segment.spi.index.IndexType; +import org.apache.pinot.segment.spi.index.StandardIndexes; +import org.apache.pinot.segment.spi.index.mutable.MutableForwardIndex; +import org.apache.pinot.segment.spi.index.mutable.MutableIndex; +import org.apache.pinot.spi.data.FieldSpec; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.data.readers.FileFormat; +import org.apache.pinot.spi.data.readers.GenericRow; +import org.apache.pinot.spi.data.readers.RecordReader; +import org.apache.pinot.spi.data.readers.RecordReaderFactory; +import org.apache.pinot.spi.stream.StreamMessageMetadata; +import org.testng.Assert; +import org.testng.annotations.Test; + + +public class MutableSegmentEntriesAboveThresholdTest { + private static final File TEMP_DIR = + new File(FileUtils.getTempDirectory(), MutableSegmentEntriesAboveThresholdTest.class.getSimpleName()); + private static final String AVRO_FILE = "data/test_data-mv.avro"; + private Schema _schema; + + private static class FakeMutableForwardIndex implements MutableForwardIndex { + + private final MutableForwardIndex _mutableForwardIndex; + private static final int THRESHOLD = 2; + private int _numValues; + + FakeMutableForwardIndex(MutableForwardIndex mutableForwardIndex) { + _mutableForwardIndex = mutableForwardIndex; + _numValues = 0; + } + + @Override + public boolean canAddMore() { + return _numValues < THRESHOLD; + } + + @Override + public void setDictIdMV(int docId, int[] dictIds) { + _numValues += dictIds.length; + _mutableForwardIndex.setDictIdMV(docId, dictIds); + } + + @Override + public int getLengthOfShortestElement() { + return _mutableForwardIndex.getLengthOfShortestElement(); + } + + @Override + public int getLengthOfLongestElement() { + return _mutableForwardIndex.getLengthOfLongestElement(); + } + + @Override + public void setDictId(int docId, int dictId) { + _mutableForwardIndex.setDictId(docId, dictId); + } + + @Override + public boolean isDictionaryEncoded() { + return _mutableForwardIndex.isDictionaryEncoded(); + } + + @Override + public boolean isSingleValue() { + return _mutableForwardIndex.isSingleValue(); + } + + @Override + public FieldSpec.DataType getStoredType() { + return _mutableForwardIndex.getStoredType(); + } + + @Override + public void close() + throws IOException { + _mutableForwardIndex.close(); + } + } + + private File getAvroFile() { + URL resourceUrl = MutableSegmentImplTest.class.getClassLoader().getResource(AVRO_FILE); + Assert.assertNotNull(resourceUrl); + return new File(resourceUrl.getFile()); + } + + private MutableSegmentImpl getMutableSegment(File avroFile) + throws Exception { + FileUtils.deleteQuietly(TEMP_DIR); + + SegmentGeneratorConfig config = + SegmentTestUtils.getSegmentGeneratorConfigWithoutTimeColumn(avroFile, TEMP_DIR, "testTable"); + SegmentIndexCreationDriver driver = new SegmentIndexCreationDriverImpl(); + driver.init(config); + driver.build(); + + _schema = config.getSchema(); + VirtualColumnProviderFactory.addBuiltInVirtualColumnsToSegmentSchema(_schema, "testSegment"); + return MutableSegmentImplTestUtils + .createMutableSegmentImpl(_schema, Collections.emptySet(), Collections.emptySet(), Collections.emptySet(), + Collections.emptyMap(), + false, false, null, null, null, null, null, null, Collections.emptyList()); + } + + @Test + public void testNoLimitBreached() + throws Exception { + File avroFile = getAvroFile(); + MutableSegmentImpl mutableSegment = getMutableSegment(avroFile); + StreamMessageMetadata defaultMetadata = new StreamMessageMetadata(System.currentTimeMillis(), new GenericRow()); + try (RecordReader recordReader = RecordReaderFactory + .getRecordReader(FileFormat.AVRO, avroFile, _schema.getColumnNames(), null)) { + GenericRow reuse = new GenericRow(); + while (recordReader.hasNext()) { + mutableSegment.index(recordReader.next(reuse), defaultMetadata); + } + } + assert mutableSegment.canAddMore(); + } + + @Test + public void testLimitBreached() + throws Exception { + File avroFile = getAvroFile(); + MutableSegmentImpl mutableSegment = getMutableSegment(avroFile); + + Field indexContainerMapField = MutableSegmentImpl.class.getDeclaredField("_indexContainerMap"); + indexContainerMapField.setAccessible(true); + Map colVsIndexContainer = (Map) indexContainerMapField.get(mutableSegment); + + for (Map.Entry entry : colVsIndexContainer.entrySet()) { + Object indexContainer = entry.getValue(); + Field mutableIndexesField = indexContainer.getClass().getDeclaredField("_mutableIndexes"); + mutableIndexesField.setAccessible(true); + Map indexTypeVsMutableIndex = + (Map) mutableIndexesField.get(indexContainer); + + MutableForwardIndex mutableForwardIndex = null; + for (IndexType indexType : indexTypeVsMutableIndex.keySet()) { + if (indexType.getId().equals(StandardIndexes.FORWARD_ID)) { + mutableForwardIndex = (MutableForwardIndex) indexTypeVsMutableIndex.get(indexType); + } + } + + assert mutableForwardIndex != null; + + indexTypeVsMutableIndex.put(new ForwardIndexPlugin().getIndexType(), + new FakeMutableForwardIndex(mutableForwardIndex)); + } + StreamMessageMetadata defaultMetadata = new StreamMessageMetadata(System.currentTimeMillis(), new GenericRow()); + try (RecordReader recordReader = RecordReaderFactory + .getRecordReader(FileFormat.AVRO, avroFile, _schema.getColumnNames(), null)) { + GenericRow reuse = new GenericRow(); + while (recordReader.hasNext()) { + mutableSegment.index(recordReader.next(reuse), defaultMetadata); + } + } + + assert !mutableSegment.canAddMore(); + } +} diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/mutable/FixedByteMVMutableForwardIndexTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/mutable/FixedByteMVMutableForwardIndexTest.java index 326827756257..77fd706f3c64 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/mutable/FixedByteMVMutableForwardIndexTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/mutable/FixedByteMVMutableForwardIndexTest.java @@ -19,6 +19,7 @@ package org.apache.pinot.segment.local.segment.index.forward.mutable; import java.io.IOException; +import java.lang.reflect.Field; import java.util.Arrays; import java.util.Random; import org.apache.pinot.segment.local.io.writer.impl.DirectMemoryManager; @@ -78,6 +79,7 @@ public void testIntArray(final long seed, boolean isDictionaryEncoded) readerWriter = new FixedByteMVMutableForwardIndex(maxNumberOfMultiValuesPerRow, 2, rows / 2, columnSizeInBytes, _memoryManager, "IntArray", isDictionaryEncoded, FieldSpec.DataType.INT); + int valuesAdded = 0; Random r = new Random(seed); int[][] data = new int[rows][]; @@ -87,6 +89,7 @@ public void testIntArray(final long seed, boolean isDictionaryEncoded) data[i][j] = r.nextInt(); } readerWriter.setIntMV(i, data[i]); + valuesAdded += data[i].length; } int[] ret = new int[maxNumberOfMultiValuesPerRow]; for (int i = 0; i < rows; i++) { @@ -94,6 +97,7 @@ public void testIntArray(final long seed, boolean isDictionaryEncoded) Assert.assertEquals(data[i].length, length, "Failed with seed=" + seed); Assert.assertTrue(Arrays.equals(data[i], Arrays.copyOf(ret, length)), "Failed with seed=" + seed); } + validateNumOfValues(readerWriter, valuesAdded); readerWriter.close(); } @@ -106,6 +110,7 @@ public void testIntArrayFixedSize(int multiValuesPerRow, long seed, boolean isDi // transition to new ones readerWriter = new FixedByteMVMutableForwardIndex(multiValuesPerRow, multiValuesPerRow, multiValuesPerRow * 2, columnSizeInBytes, _memoryManager, "IntArrayFixedSize", isDictionaryEncoded, FieldSpec.DataType.INT); + int valuesAdded = 0; Random r = new Random(seed); int[][] data = new int[rows][]; @@ -115,6 +120,7 @@ public void testIntArrayFixedSize(int multiValuesPerRow, long seed, boolean isDi data[i][j] = r.nextInt(); } readerWriter.setIntMV(i, data[i]); + valuesAdded += data[i].length; } int[] ret = new int[multiValuesPerRow]; for (int i = 0; i < rows; i++) { @@ -122,6 +128,7 @@ public void testIntArrayFixedSize(int multiValuesPerRow, long seed, boolean isDi Assert.assertEquals(data[i].length, length, "Failed with seed=" + seed); Assert.assertTrue(Arrays.equals(data[i], Arrays.copyOf(ret, length)), "Failed with seed=" + seed); } + validateNumOfValues(readerWriter, valuesAdded); readerWriter.close(); } @@ -135,6 +142,7 @@ public void testWithZeroSize(long seed, boolean isDictionaryEncoded) readerWriter = new FixedByteMVMutableForwardIndex(maxNumberOfMultiValuesPerRow, 3, r.nextInt(rows) + 1, columnSizeInBytes, _memoryManager, "ZeroSize", isDictionaryEncoded, FieldSpec.DataType.INT); + int valuesAdded = 0; int[][] data = new int[rows][]; for (int i = 0; i < rows; i++) { @@ -144,9 +152,11 @@ public void testWithZeroSize(long seed, boolean isDictionaryEncoded) data[i][j] = r.nextInt(); } readerWriter.setIntMV(i, data[i]); + valuesAdded += data[i].length; } else { data[i] = new int[0]; readerWriter.setIntMV(i, data[i]); + valuesAdded += data[i].length; } } int[] ret = new int[maxNumberOfMultiValuesPerRow]; @@ -155,6 +165,7 @@ public void testWithZeroSize(long seed, boolean isDictionaryEncoded) Assert.assertEquals(data[i].length, length, "Failed with seed=" + seed); Assert.assertTrue(Arrays.equals(data[i], Arrays.copyOf(ret, length)), "Failed with seed=" + seed); } + validateNumOfValues(readerWriter, valuesAdded); readerWriter.close(); } @@ -187,6 +198,7 @@ private void testLongArray(boolean isDictionaryEncoded) final int maxNumberOfMultiValuesPerRow = r.nextInt(100) + 1; FixedByteMVMutableForwardIndex readerWriter = createReaderWriter(FieldSpec.DataType.LONG, r, rows, maxNumberOfMultiValuesPerRow, isDictionaryEncoded); + int valuesAdded = 0; long[][] data = new long[rows][]; for (int i = 0; i < rows; i++) { @@ -196,9 +208,11 @@ private void testLongArray(boolean isDictionaryEncoded) data[i][j] = r.nextLong(); } readerWriter.setLongMV(i, data[i]); + valuesAdded += data[i].length; } else { data[i] = new long[0]; readerWriter.setLongMV(i, data[i]); + valuesAdded += data[i].length; } } long[] ret = new long[maxNumberOfMultiValuesPerRow]; @@ -207,6 +221,7 @@ private void testLongArray(boolean isDictionaryEncoded) Assert.assertEquals(data[i].length, length, "Failed with seed=" + seed); Assert.assertTrue(Arrays.equals(data[i], Arrays.copyOf(ret, length)), "Failed with seed=" + seed); } + validateNumOfValues(readerWriter, valuesAdded); readerWriter.close(); } @@ -225,6 +240,7 @@ private void testFloatArray(boolean isDictoinaryEncoded) final int maxNumberOfMultiValuesPerRow = r.nextInt(100) + 1; FixedByteMVMutableForwardIndex readerWriter = createReaderWriter(FieldSpec.DataType.FLOAT, r, rows, maxNumberOfMultiValuesPerRow, isDictoinaryEncoded); + int valuesAdded = 0; float[][] data = new float[rows][]; for (int i = 0; i < rows; i++) { @@ -234,9 +250,11 @@ private void testFloatArray(boolean isDictoinaryEncoded) data[i][j] = r.nextFloat(); } readerWriter.setFloatMV(i, data[i]); + valuesAdded += data[i].length; } else { data[i] = new float[0]; readerWriter.setFloatMV(i, data[i]); + valuesAdded += data[i].length; } } float[] ret = new float[maxNumberOfMultiValuesPerRow]; @@ -245,6 +263,7 @@ private void testFloatArray(boolean isDictoinaryEncoded) Assert.assertEquals(data[i].length, length, "Failed with seed=" + seed); Assert.assertTrue(Arrays.equals(data[i], Arrays.copyOf(ret, length)), "Failed with seed=" + seed); } + validateNumOfValues(readerWriter, valuesAdded); readerWriter.close(); } @@ -263,6 +282,7 @@ private void testDoubleArray(boolean isDictonaryEncoded) final int maxNumberOfMultiValuesPerRow = r.nextInt(100) + 1; FixedByteMVMutableForwardIndex readerWriter = createReaderWriter(FieldSpec.DataType.DOUBLE, r, rows, maxNumberOfMultiValuesPerRow, isDictonaryEncoded); + int valuesAdded = 0; double[][] data = new double[rows][]; for (int i = 0; i < rows; i++) { @@ -272,9 +292,11 @@ private void testDoubleArray(boolean isDictonaryEncoded) data[i][j] = r.nextDouble(); } readerWriter.setDoubleMV(i, data[i]); + valuesAdded += data[i].length; } else { data[i] = new double[0]; readerWriter.setDoubleMV(i, data[i]); + valuesAdded += data[i].length; } } double[] ret = new double[maxNumberOfMultiValuesPerRow]; @@ -283,6 +305,25 @@ private void testDoubleArray(boolean isDictonaryEncoded) Assert.assertEquals(data[i].length, length, "Failed with seed=" + seed); Assert.assertTrue(Arrays.equals(data[i], Arrays.copyOf(ret, length)), "Failed with seed=" + seed); } + validateNumOfValues(readerWriter, valuesAdded); readerWriter.close(); } + + private int getNumValues(FixedByteMVMutableForwardIndex readerWriter) + throws NoSuchFieldException, IllegalAccessException { + Field numValuesField = FixedByteMVMutableForwardIndex.class.getDeclaredField("_numValues"); + numValuesField.setAccessible(true); + return (int) numValuesField.get(readerWriter); + } + + private void validateNumOfValues(FixedByteMVMutableForwardIndex readerWriter, int valuesAdded) { + int numValuesPresentInIndex; + try { + numValuesPresentInIndex = getNumValues(readerWriter); + } catch (Exception e) { + throw new AssertionError("failed to validate the num of values added in the index"); + } + Assert.assertEquals(numValuesPresentInIndex, valuesAdded); + Assert.assertTrue(readerWriter.canAddMore()); + } } diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/mutable/FixedByteSVMutableForwardIndexTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/mutable/FixedByteSVMutableForwardIndexTest.java index 1e0d82779367..cadea9a1fca7 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/mutable/FixedByteSVMutableForwardIndexTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/mutable/FixedByteSVMutableForwardIndexTest.java @@ -112,6 +112,7 @@ private void testDictId(final Random random, final int rows, final int div) for (int i = 0; i < 2 * rows; i++) { Assert.assertEquals(readerWriter.getDictId(start + i), 0); } + Assert.assertTrue(readerWriter.canAddMore()); readerWriter.close(); } diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/mutable/MutableIndex.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/mutable/MutableIndex.java index dc3bdc98691d..494361947b25 100644 --- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/mutable/MutableIndex.java +++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/mutable/MutableIndex.java @@ -68,4 +68,11 @@ public interface MutableIndex extends IndexReader { */ default void commit() { } + + /** + * Returns a boolean denoting whether the mutable index can consume any more rows or not. + */ + default boolean canAddMore() { + return true; + } }