Add levels to DocValues skipper index #13563

Merged: 12 commits, Jul 19, 2024
@@ -19,9 +19,13 @@
import static org.apache.lucene.codecs.lucene90.Lucene90DocValuesFormat.DIRECT_MONOTONIC_BLOCK_SHIFT;
import static org.apache.lucene.codecs.lucene90.Lucene90DocValuesFormat.NUMERIC_BLOCK_SHIFT;
import static org.apache.lucene.codecs.lucene90.Lucene90DocValuesFormat.NUMERIC_BLOCK_SIZE;
import static org.apache.lucene.codecs.lucene90.Lucene90DocValuesFormat.SKIP_INDEX_MAX_LEVEL;
import static org.apache.lucene.codecs.lucene90.Lucene90DocValuesFormat.SKIP_INDEX_NUMBER_INTERVALS_PER_LEVEL;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.lucene.codecs.CodecUtil;
import org.apache.lucene.codecs.DocValuesConsumer;
import org.apache.lucene.codecs.DocValuesProducer;
@@ -43,7 +47,6 @@
import org.apache.lucene.store.ByteArrayDataOutput;
import org.apache.lucene.store.ByteBuffersDataOutput;
import org.apache.lucene.store.ByteBuffersIndexOutput;
import org.apache.lucene.store.DataOutput;
import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.util.ArrayUtil;
import org.apache.lucene.util.BytesRef;
@@ -207,65 +210,120 @@ void accumulate(long value) {
maxValue = Math.max(maxValue, value);
}

void accumulate(SkipAccumulator other) {
maxDocID = other.maxDocID;
minValue = Math.min(minValue, other.minValue);
maxValue = Math.max(maxValue, other.maxValue);
docCount += other.docCount;
}

void nextDoc(int docID) {
maxDocID = docID;
++docCount;
}

void writeTo(DataOutput output) throws IOException {
output.writeInt(maxDocID);
output.writeInt(minDocID);
output.writeLong(maxValue);
output.writeLong(minValue);
output.writeInt(docCount);
public static SkipAccumulator merge(List<SkipAccumulator> list, int index, int length) {
SkipAccumulator acc = new SkipAccumulator(list.get(index).minDocID);
for (int i = 0; i < length; i++) {
acc.accumulate(list.get(index + i));
}
return acc;
}
}

private void writeSkipIndex(FieldInfo field, DocValuesProducer valuesProducer)
throws IOException {
assert field.hasDocValuesSkipIndex();
// TODO: This disk compression once we introduce levels
long start = data.getFilePointer();
SortedNumericDocValues values = valuesProducer.getSortedNumeric(field);
final long start = data.getFilePointer();
final SortedNumericDocValues values = valuesProducer.getSortedNumeric(field);
long globalMaxValue = Long.MIN_VALUE;
long globalMinValue = Long.MAX_VALUE;
int globalDocCount = 0;
int maxDocId = -1;
List<SkipAccumulator> accumulators = new ArrayList<>();
SkipAccumulator accumulator = null;
int counter = 0;
for (int doc = values.nextDoc(); doc != DocIdSetIterator.NO_MORE_DOCS; doc = values.nextDoc()) {
if (counter == 0) {
if (accumulator == null) {
accumulator = new SkipAccumulator(doc);
accumulators.add(accumulator);
}
accumulator.nextDoc(doc);
for (int i = 0, end = values.docValueCount(); i < end; ++i) {
accumulator.accumulate(values.nextValue());
}
if (++counter == skipIndexIntervalSize) {
if (accumulator.docCount == skipIndexIntervalSize) {
globalMaxValue = Math.max(globalMaxValue, accumulator.maxValue);
globalMinValue = Math.min(globalMinValue, accumulator.minValue);
globalDocCount += accumulator.docCount;
maxDocId = accumulator.maxDocID;
accumulator.writeTo(data);
counter = 0;
accumulator = null;
if (accumulators.size()
== SKIP_INDEX_NUMBER_INTERVALS_PER_LEVEL[SKIP_INDEX_MAX_LEVEL - 1]) {
writeLevels(accumulators);
accumulators.clear();
}
}
}

if (counter > 0) {
globalMaxValue = Math.max(globalMaxValue, accumulator.maxValue);
globalMinValue = Math.min(globalMinValue, accumulator.minValue);
globalDocCount += accumulator.docCount;
maxDocId = accumulator.maxDocID;
accumulator.writeTo(data);
if (accumulators.isEmpty() == false) {
if (accumulator != null) {
globalMaxValue = Math.max(globalMaxValue, accumulator.maxValue);
globalMinValue = Math.min(globalMinValue, accumulator.minValue);
globalDocCount += accumulator.docCount;
maxDocId = accumulator.maxDocID;
}
writeLevels(accumulators);
}
meta.writeLong(start); // record the start in meta
meta.writeLong(data.getFilePointer() - start); // record the length
assert globalDocCount == 0 || globalMaxValue >= globalMinValue;
meta.writeLong(globalMaxValue);
meta.writeLong(globalMinValue);
assert globalDocCount <= maxDocId + 1;
meta.writeInt(globalDocCount);
meta.writeInt(maxDocId);
}

private void writeLevels(List<SkipAccumulator> accumulators) throws IOException {
for (int index = 0; index < accumulators.size(); index++) {
// compute how many levels we need to write for the current accumulator
final int levels = getLevels(index, accumulators.size());
// build the levels
final SkipAccumulator[] accLevels = new SkipAccumulator[levels];
for (int level = 0; level < levels; level++) {
accLevels[level] =
SkipAccumulator.merge(
accumulators, index, SKIP_INDEX_NUMBER_INTERVALS_PER_LEVEL[level]);
}
// write the number of levels
data.writeByte((byte) levels);
// write the maxDocIDs in reverse order. This is done so we don't
// need to read all of them in case of skipping
for (int level = levels - 1; level >= 0; level--) {
data.writeInt(accLevels[level].maxDocID);
}
// write the rest of the interval in natural order. This is only
// read if the interval is competitive.
for (int level = 0; level < levels; level++) {
data.writeInt(accLevels[level].minDocID);
data.writeLong(accLevels[level].maxValue);
data.writeLong(accLevels[level].minValue);
data.writeInt(accLevels[level].docCount);
}
}
}
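// Illustration only, not part of the patch: the loop above serializes each level-0
// position as 1 byte for the level count, 4 bytes per maxDocID (highest level first),
// and 24 bytes per level for minDocID, maxValue, minValue and docCount. A hypothetical
// helper, under those assumptions, for the size of one such block:
private static long skipBlockSizeInBytes(int levels) {
return 1L // level count
+ levels * 4L // maxDocIDs, written highest level first
+ levels * 24L; // per-level minDocID, maxValue, minValue and docCount
}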

private int getLevels(int index, int size) {
final int left = size - index;
for (int level = SKIP_INDEX_MAX_LEVEL - 1; level > 0; level--) {
final int numberIntervals = SKIP_INDEX_NUMBER_INTERVALS_PER_LEVEL[level];
if (left >= numberIntervals && index % numberIntervals == 0) {
return level + 1;
}
}
return 1;
}
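// Illustration only, not part of the patch: with SKIP_INDEX_LEVEL_SHIFT = 3 and
// SKIP_INDEX_MAX_LEVEL = 4, a full batch of 512 accumulators gets 4 levels at index 0,
// 3 levels at the other multiples of 64, 2 levels at the remaining multiples of 8, and
// 1 level everywhere else. A hypothetical helper that computes that distribution
// ({unused, 448, 56, 7, 1} for a full batch) by reusing getLevels above:
private int[] levelHistogram(int size) {
final int[] histogram = new int[SKIP_INDEX_MAX_LEVEL + 1]; // index = number of levels
for (int index = 0; index < size; index++) {
histogram[getLevels(index, size)]++;
}
return histogram;
}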

private long[] writeValues(FieldInfo field, DocValuesProducer valuesProducer, boolean ords)
throws IOException {
SortedNumericDocValues values = valuesProducer.getSortedNumeric(field);
@@ -194,5 +194,34 @@ public DocValuesProducer fieldsProducer(SegmentReadState state) throws IOException
static final int TERMS_DICT_REVERSE_INDEX_SIZE = 1 << TERMS_DICT_REVERSE_INDEX_SHIFT;
static final int TERMS_DICT_REVERSE_INDEX_MASK = TERMS_DICT_REVERSE_INDEX_SIZE - 1;

// number of documents in an interval
private static final int DEFAULT_SKIP_INDEX_INTERVAL_SIZE = 4096;
// number of intervals, expressed as a shift, that are grouped to create a new level;
// this is 1 << 3 == 8 intervals.
static final int SKIP_INDEX_LEVEL_SHIFT = 3;
// max number of levels
// Increasing this number increases how much heap we need at index time;
// we currently need (1 * 8 * 8 * 8) = 512 accumulators on heap
static final int SKIP_INDEX_MAX_LEVEL = 4;
// how many intervals at level 0 are in each level (1 << (SKIP_INDEX_LEVEL_SHIFT * level)).
static int[] SKIP_INDEX_NUMBER_INTERVALS_PER_LEVEL = new int[SKIP_INDEX_MAX_LEVEL];
// number of bytes to skip when skipping a level. It does not take into account the
// current interval that is being read.
static long[] SKIP_INDEX_JUMP_LENGTH_PER_LEVEL = new long[SKIP_INDEX_MAX_LEVEL];

static {
for (int level = 0; level < SKIP_INDEX_MAX_LEVEL; level++) {
SKIP_INDEX_NUMBER_INTERVALS_PER_LEVEL[level] = 1 << (SKIP_INDEX_LEVEL_SHIFT * level);
if (level > 0) {
SKIP_INDEX_JUMP_LENGTH_PER_LEVEL[level] =
// jump from previous level
SKIP_INDEX_JUMP_LENGTH_PER_LEVEL[level - 1]
// add nodes added by new level minus first one
+ (SKIP_INDEX_NUMBER_INTERVALS_PER_LEVEL[level] - 1) * 29L
// remove the byte levels added in the previous level except the first one
- SKIP_INDEX_NUMBER_INTERVALS_PER_LEVEL[level - 1]
+ 1;
}
}
}
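// Illustration only, not part of the patch: with the constants above, the loop yields
// SKIP_INDEX_NUMBER_INTERVALS_PER_LEVEL = {1, 8, 64, 512} and
// SKIP_INDEX_JUMP_LENGTH_PER_LEVEL = {0, 203, 2023, 16779}, e.g.
// level 1: 0 + (8 - 1) * 29 - 1 + 1 = 203
// level 2: 203 + (64 - 1) * 29 - 8 + 1 = 2023
// level 3: 2023 + (512 - 1) * 29 - 64 + 1 = 16779
// where 29 appears to be the 1 + 4 + 24 bytes of a block that records a single level.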
}
@@ -16,6 +16,8 @@
*/
package org.apache.lucene.codecs.lucene90;

import static org.apache.lucene.codecs.lucene90.Lucene90DocValuesFormat.SKIP_INDEX_JUMP_LENGTH_PER_LEVEL;
import static org.apache.lucene.codecs.lucene90.Lucene90DocValuesFormat.SKIP_INDEX_MAX_LEVEL;
import static org.apache.lucene.codecs.lucene90.Lucene90DocValuesFormat.TERMS_DICT_BLOCK_LZ4_SHIFT;

import java.io.IOException;
@@ -1792,61 +1794,88 @@ public DocValuesSkipper getSkipper(FieldInfo field) throws IOException {
if (input.length() > 0) {
input.prefetch(0, 1);
}
// TODO: should we write to disk the actual max level for this segment?
return new DocValuesSkipper() {
int minDocID = -1;
int maxDocID = -1;
long minValue, maxValue;
int docCount;
final int[] minDocID = new int[SKIP_INDEX_MAX_LEVEL];
final int[] maxDocID = new int[SKIP_INDEX_MAX_LEVEL];

{
for (int i = 0; i < SKIP_INDEX_MAX_LEVEL; i++) {
minDocID[i] = maxDocID[i] = -1;
}
}

final long[] minValue = new long[SKIP_INDEX_MAX_LEVEL];
final long[] maxValue = new long[SKIP_INDEX_MAX_LEVEL];
final int[] docCount = new int[SKIP_INDEX_MAX_LEVEL];
int levels;

@Override
public void advance(int target) throws IOException {
if (target > entry.maxDocId) {
minDocID = DocIdSetIterator.NO_MORE_DOCS;
maxDocID = DocIdSetIterator.NO_MORE_DOCS;
// skipper is exhausted
for (int i = 0; i < SKIP_INDEX_MAX_LEVEL; i++) {
minDocID[i] = maxDocID[i] = DocIdSetIterator.NO_MORE_DOCS;
}
} else {
// find next interval
assert target > maxDocID[0] : "target must be bigger than the current interval";
while (true) {
maxDocID = input.readInt();
if (maxDocID >= target) {
minDocID = input.readInt();
maxValue = input.readLong();
minValue = input.readLong();
docCount = input.readInt();
levels = input.readByte();
assert levels <= SKIP_INDEX_MAX_LEVEL && levels > 0 : "level out of range";
boolean competitive = true;
// check if current interval is competitive or we can jump to the next position
for (int level = levels - 1; level >= 0; level--) {
if ((maxDocID[level] = input.readInt()) < target) {
input.skipBytes(
(level * 4L) // the number of maxDocID left
+ (levels * 24L) // the content of this interval
+ SKIP_INDEX_JUMP_LENGTH_PER_LEVEL[level]); // the jump for the level
competitive = false;
break;
}
}
if (competitive) {
for (int level = 0; level < levels; level++) {
minDocID[level] = input.readInt();
maxValue[level] = input.readLong();
minValue[level] = input.readLong();
docCount[level] = input.readInt();
}
break;
} else {
input.skipBytes(24);
}
}
}
}
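// Illustration only, not part of the patch: the amount skipped above when the maxDocID
// read at `level` is still behind the target. Since levels are read from the highest
// down, `level` maxDocIDs of this block are still unread (4 bytes each), the block body
// holds 24 bytes per level, and SKIP_INDEX_JUMP_LENGTH_PER_LEVEL[level] covers the
// following blocks spanned by this level. Hypothetical helper under those assumptions:
private long bytesToSkipForLevel(int level, int levels) {
return (level * 4L) // maxDocIDs of the lower levels, still unread
+ (levels * 24L) // per-level bodies of this block
+ SKIP_INDEX_JUMP_LENGTH_PER_LEVEL[level]; // following blocks covered by this level
}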

@Override
public int numLevels() {
return 1;
return levels;
}

@Override
public int minDocID(int level) {
return minDocID;
return minDocID[level];
}

@Override
public int maxDocID(int level) {
return maxDocID;
return maxDocID[level];
}

@Override
public long minValue(int level) {
return minValue;
return minValue[level];
}

@Override
public long maxValue(int level) {
return maxValue;
return maxValue[level];
}

@Override
public int docCount(int level) {
return docCount;
return docCount[level];
}

@Override
@@ -1194,24 +1194,27 @@ public int numLevels() {
@Override
public int minDocID(int level) {
assertThread("Doc values skipper", creationThread);
Objects.checkIndex(level, numLevels());
int minDocID = in.minDocID(level);
assert minDocID <= in.maxDocID(level);
if (level > 0) {
assert minDocID <= in.minDocID(level - 1);
if (minDocID != -1 && minDocID != DocIdSetIterator.NO_MORE_DOCS) {
Objects.checkIndex(level, numLevels());
}
for (int i = 0; i < level; i++) {
assert minDocID >= in.minDocID(i);
}
return minDocID;
}

@Override
public int maxDocID(int level) {
assertThread("Doc values skipper", creationThread);
Objects.checkIndex(level, numLevels());
int maxDocID = in.maxDocID(level);

assert maxDocID >= in.minDocID(level);
if (level > 0) {
assert maxDocID >= in.maxDocID(level - 1);
if (maxDocID != -1 && maxDocID != DocIdSetIterator.NO_MORE_DOCS) {
Objects.checkIndex(level, numLevels());
}
for (int i = 0; i < level; i++) {
assert maxDocID >= in.maxDocID(i);
}
return maxDocID;
}