Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.x] star tree file formats refactoring and fixing offset bug (#15975) #16302

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.opensearch.index.compositeindex.datacube.startree.node.InMemoryTreeNode;
import org.opensearch.index.compositeindex.datacube.startree.node.StarTreeNodeType;
import org.opensearch.index.compositeindex.datacube.startree.utils.SequentialDocValuesIterator;
import org.opensearch.index.compositeindex.datacube.startree.utils.iterator.SortedNumericStarTreeValuesIterator;
import org.opensearch.index.mapper.DocCountFieldMapper;
import org.opensearch.index.mapper.FieldMapper;
import org.opensearch.index.mapper.FieldValueConverter;
Expand Down Expand Up @@ -193,7 +194,9 @@ public List<SequentialDocValuesIterator> getMetricReaders(SegmentWriteState stat
metricFieldInfo = getFieldInfo(metric.getField(), DocValuesType.SORTED_NUMERIC);
}
metricReader = new SequentialDocValuesIterator(
fieldProducerMap.get(metricFieldInfo.name).getSortedNumeric(metricFieldInfo)
new SortedNumericStarTreeValuesIterator(
fieldProducerMap.get(metricFieldInfo.name).getSortedNumeric(metricFieldInfo)
)
);
}
metricReaders.add(metricReader);
Expand Down Expand Up @@ -228,7 +231,7 @@ public void build(
dimensionFieldInfo = getFieldInfo(dimension, DocValuesType.SORTED_NUMERIC);
}
dimensionReaders[i] = new SequentialDocValuesIterator(
fieldProducerMap.get(dimensionFieldInfo.name).getSortedNumeric(dimensionFieldInfo)
new SortedNumericStarTreeValuesIterator(fieldProducerMap.get(dimensionFieldInfo.name).getSortedNumeric(dimensionFieldInfo))
);
}
Iterator<StarTreeDocument> starTreeDocumentIterator = sortAndAggregateSegmentDocuments(dimensionReaders, metricReaders);
Expand Down Expand Up @@ -287,7 +290,7 @@ void appendDocumentsToStarTree(Iterator<StarTreeDocument> starTreeDocumentIterat
}
}

private void serializeStarTree(int numSegmentStarTreeDocument, int numStarTreeDocs) throws IOException {
private void serializeStarTree(int numSegmentStarTreeDocuments, int numStarTreeDocs) throws IOException {
// serialize the star tree data
long dataFilePointer = dataOut.getFilePointer();
StarTreeWriter starTreeWriter = new StarTreeWriter();
Expand All @@ -299,7 +302,7 @@ private void serializeStarTree(int numSegmentStarTreeDocument, int numStarTreeDo
starTreeField,
metricAggregatorInfos,
numStarTreeNodes,
numSegmentStarTreeDocument,
numSegmentStarTreeDocuments,
numStarTreeDocs,
dataFilePointer,
totalStarTreeDataLength
Expand Down Expand Up @@ -400,22 +403,20 @@ protected StarTreeDocument getStarTreeDocument(
) throws IOException {
Long[] dims = new Long[numDimensions];
int i = 0;
for (SequentialDocValuesIterator dimensionDocValueIterator : dimensionReaders) {
dimensionDocValueIterator.nextDoc(currentDocId);
Long val = dimensionDocValueIterator.value(currentDocId);
for (SequentialDocValuesIterator dimensionValueIterator : dimensionReaders) {
dimensionValueIterator.nextEntry(currentDocId);
Long val = dimensionValueIterator.value(currentDocId);
dims[i] = val;
i++;
}
i = 0;
Object[] metrics = new Object[metricReaders.size()];
for (SequentialDocValuesIterator metricDocValuesIterator : metricReaders) {
metricDocValuesIterator.nextDoc(currentDocId);
for (SequentialDocValuesIterator metricValuesIterator : metricReaders) {
metricValuesIterator.nextEntry(currentDocId);
// As part of merge, we traverse the star tree doc values
// The type of data stored in metric fields is different from the
// actual indexing field they're based on
metrics[i] = metricAggregatorInfos.get(i)
.getValueAggregators()
.toAggregatedValueType(metricDocValuesIterator.value(currentDocId));
metrics[i] = metricAggregatorInfos.get(i).getValueAggregators().toAggregatedValueType(metricValuesIterator.value(currentDocId));
i++;
}
return new StarTreeDocument(dims, metrics);
Expand Down Expand Up @@ -502,7 +503,7 @@ Long[] getStarTreeDimensionsFromSegment(int currentDocId, SequentialDocValuesIte
for (int i = 0; i < numDimensions; i++) {
if (dimensionReaders[i] != null) {
try {
dimensionReaders[i].nextDoc(currentDocId);
dimensionReaders[i].nextEntry(currentDocId);
} catch (IOException e) {
logger.error("unable to iterate to next doc", e);
throw new RuntimeException("unable to iterate to next doc", e);
Expand Down Expand Up @@ -530,7 +531,7 @@ private Object[] getStarTreeMetricsFromSegment(int currentDocId, List<Sequential
SequentialDocValuesIterator metricStatReader = metricsReaders.get(i);
if (metricStatReader != null) {
try {
metricStatReader.nextDoc(currentDocId);
metricStatReader.nextEntry(currentDocId);
} catch (IOException e) {
logger.error("unable to iterate to next doc", e);
throw new RuntimeException("unable to iterate to next doc", e);
Expand Down Expand Up @@ -672,7 +673,7 @@ private SequentialDocValuesIterator getIteratorForNumericField(
SequentialDocValuesIterator sequentialDocValuesIterator;
assert fieldProducerMap.containsKey(fieldInfo.name);
sequentialDocValuesIterator = new SequentialDocValuesIterator(
DocValues.singleton(fieldProducerMap.get(fieldInfo.name).getNumeric(fieldInfo))
new SortedNumericStarTreeValuesIterator(DocValues.singleton(fieldProducerMap.get(fieldInfo.name).getNumeric(fieldInfo)))
);
return sequentialDocValuesIterator;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ Iterator<StarTreeDocument> mergeStarTrees(List<StarTreeValues> starTreeValuesSub
.size()];
for (int i = 0; i < dimensionsSplitOrder.size(); i++) {
String dimension = dimensionsSplitOrder.get(i).getField();
dimensionReaders[i] = new SequentialDocValuesIterator(starTreeValues.getDimensionDocIdSetIterator(dimension));
dimensionReaders[i] = new SequentialDocValuesIterator(starTreeValues.getDimensionValuesIterator(dimension));
}
List<SequentialDocValuesIterator> metricReaders = new ArrayList<>();
// get doc id set iterators for metrics
Expand All @@ -164,7 +164,7 @@ Iterator<StarTreeDocument> mergeStarTrees(List<StarTreeValues> starTreeValuesSub
metric.getField(),
metricStat.getTypeName()
);
metricReaders.add(new SequentialDocValuesIterator(starTreeValues.getMetricDocIdSetIterator(metricFullName)));
metricReaders.add(new SequentialDocValuesIterator(starTreeValues.getMetricValuesIterator(metricFullName)));
}
}
int currentDocId = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ StarTreeDocument[] getSegmentsStarTreeDocuments(List<StarTreeValues> starTreeVal

for (int i = 0; i < dimensionsSplitOrder.size(); i++) {
String dimension = dimensionsSplitOrder.get(i).getField();
dimensionReaders[i] = new SequentialDocValuesIterator(starTreeValues.getDimensionDocIdSetIterator(dimension));
dimensionReaders[i] = new SequentialDocValuesIterator(starTreeValues.getDimensionValuesIterator(dimension));
}

List<SequentialDocValuesIterator> metricReaders = new ArrayList<>();
Expand All @@ -150,7 +150,7 @@ StarTreeDocument[] getSegmentsStarTreeDocuments(List<StarTreeValues> starTreeVal
metric.getField(),
metricStat.getTypeName()
);
metricReaders.add(new SequentialDocValuesIterator(starTreeValues.getMetricDocIdSetIterator(metricFullName)));
metricReaders.add(new SequentialDocValuesIterator(starTreeValues.getMetricValuesIterator(metricFullName)));

}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -55,11 +54,9 @@
private RandomAccessInput starTreeDocsFileRandomInput;
private IndexOutput starTreeDocsFileOutput;
private final Map<String, Integer> fileToEndDocIdMap;
private final List<Integer> starTreeDocumentOffsets = new ArrayList<>();
private int currentFileStartDocId;
private int numReadableStarTreeDocuments;
private int starTreeFileCount = -1;
private int currBytes = 0;
private final int fileCountMergeThreshold;
private int numStarTreeDocs = 0;

Expand Down Expand Up @@ -98,15 +95,26 @@
public void writeStarTreeDocument(StarTreeDocument starTreeDocument, boolean isAggregatedDoc) throws IOException {
assert isAggregatedDoc == true;
int numBytes = writeStarTreeDocument(starTreeDocument, starTreeDocsFileOutput, true);
addStarTreeDocumentOffset(numBytes);
if (docSizeInBytes == -1) {
docSizeInBytes = numBytes;

Check warning on line 99 in server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/builder/StarTreeDocsFileManager.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/builder/StarTreeDocsFileManager.java#L99

Added line #L99 was not covered by tests
} else {
assert docSizeInBytes == numBytes;
}
numStarTreeDocs++;
}

@Override
public StarTreeDocument readStarTreeDocument(int docId, boolean isAggregatedDoc) throws IOException {
assert isAggregatedDoc == true;
ensureDocumentReadable(docId);
return readStarTreeDocument(starTreeDocsFileRandomInput, starTreeDocumentOffsets.get(docId), true);
return readStarTreeDocument(starTreeDocsFileRandomInput, getOffset(docId), true);
}

/**
* Returns offset for the docId based on the current file start id
*/
private long getOffset(int docId) {
return (long) (docId - currentFileStartDocId) * docSizeInBytes;
}

@Override
Expand All @@ -119,19 +127,10 @@
public Long[] readDimensions(int docId) throws IOException {
ensureDocumentReadable(docId);
Long[] dims = new Long[starTreeField.getDimensionsOrder().size()];
readDimensions(dims, starTreeDocsFileRandomInput, starTreeDocumentOffsets.get(docId));
readDimensions(dims, starTreeDocsFileRandomInput, getOffset(docId));
return dims;
}

private void addStarTreeDocumentOffset(int bytes) {
starTreeDocumentOffsets.add(currBytes);
currBytes += bytes;
if (docSizeInBytes == -1) {
docSizeInBytes = bytes;
}
assert docSizeInBytes == bytes;
}

/**
* Load the correct StarTreeDocuments file based on the docId
*/
Expand Down Expand Up @@ -199,7 +198,6 @@
* If the operation is only for reading existing documents, a new file is not created.
*/
private void closeAndMaybeCreateNewFile(boolean shouldCreateFileForAppend, int numStarTreeDocs) throws IOException {
currBytes = 0;
if (starTreeDocsFileOutput != null) {
fileToEndDocIdMap.put(starTreeDocsFileOutput.getName(), numStarTreeDocs);
IOUtils.close(starTreeDocsFileOutput);
Expand Down Expand Up @@ -232,7 +230,6 @@
deleteOldFiles();
fileToEndDocIdMap.clear();
fileToEndDocIdMap.put(mergedOutput.getName(), numStarTreeDocs);
resetStarTreeDocumentOffsets();
}
}

Expand All @@ -259,17 +256,6 @@
}
}

/**
* Reset the star tree document offsets based on the merged file
*/
private void resetStarTreeDocumentOffsets() {
int curr = 0;
for (int i = 0; i < starTreeDocumentOffsets.size(); i++) {
starTreeDocumentOffsets.set(i, curr);
curr += docSizeInBytes;
}
}

@Override
public void close() {
try {
Expand All @@ -288,7 +274,6 @@
tmpDirectory.deleteFile(file);
} catch (IOException ignored) {} // similar to IOUtils.deleteFilesIgnoringExceptions
}
starTreeDocumentOffsets.clear();
fileToEndDocIdMap.clear();
}
}
Loading
Loading