Add target search concurrency to TieredMergePolicy (#13430)
carlosdelest authored Jul 17, 2024
1 parent 99488b2 commit 22ca695
Showing 4 changed files with 110 additions and 21 deletions.
6 changes: 5 additions & 1 deletion lucene/CHANGES.txt
@@ -254,7 +254,11 @@ API Changes

New Features
---------------------
(No changes)

* GITHUB#13430: Allow configuring the search concurrency via
  TieredMergePolicy#setTargetSearchConcurrency. This in turn instructs the
  merge policy to try to have at least this number of segments on the highest
  tier. (Adrien Grand, Carlos Delgado)
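For readers trying the new setting, a minimal usage sketch follows (editorial, not part of the commit; the index path and the concurrency value of 4 are arbitrary examples):

    import java.nio.file.Paths;
    import org.apache.lucene.index.IndexWriter;
    import org.apache.lucene.index.IndexWriterConfig;
    import org.apache.lucene.index.TieredMergePolicy;
    import org.apache.lucene.store.Directory;
    import org.apache.lucene.store.FSDirectory;

    public class TargetSearchConcurrencyExample {
      public static void main(String[] args) throws Exception {
        TieredMergePolicy mergePolicy = new TieredMergePolicy();
        // Ask the merge policy to keep at least 4 segments on the highest tier,
        // so searches can be parallelized into roughly 4 slices of similar doc counts.
        mergePolicy.setTargetSearchConcurrency(4);

        IndexWriterConfig config = new IndexWriterConfig().setMergePolicy(mergePolicy);
        try (Directory dir = FSDirectory.open(Paths.get("/tmp/example-index"));
            IndexWriter writer = new IndexWriter(dir, config)) {
          // Index documents as usual; natural merges now respect the target.
        }
      }
    }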

Improvements
---------------------
80 changes: 65 additions & 15 deletions lucene/core/src/java/org/apache/lucene/index/TieredMergePolicy.java
@@ -93,6 +93,7 @@ public class TieredMergePolicy extends MergePolicy {
private double segsPerTier = 10.0;
private double forceMergeDeletesPctAllowed = 10.0;
private double deletesPctAllowed = 20.0;
private int targetSearchConcurrency = 1;

/** Sole constructor, setting all settings to their defaults. */
public TieredMergePolicy() {
@@ -257,6 +258,26 @@ public double getSegmentsPerTier() {
return segsPerTier;
}

/**
* Sets the target search concurrency. This prevents creating segments that are bigger than
* maxDoc/targetSearchConcurrency, which in turn makes the work parallelizable into
* targetSearchConcurrency slices of similar doc counts. It also makes merging less aggressive, as
* higher values result in indices that do less merging and have more segments.
*/
public TieredMergePolicy setTargetSearchConcurrency(int targetSearchConcurrency) {
if (targetSearchConcurrency < 1) {
throw new IllegalArgumentException(
"targetSearchConcurrency must be >= 1 (got " + targetSearchConcurrency + ")");
}
this.targetSearchConcurrency = targetSearchConcurrency;
return this;
}

/** Returns the target search concurrency. */
public int getTargetSearchConcurrency() {
return targetSearchConcurrency;
}

private static class SegmentSizeAndDocs {
private final SegmentCommitInfo segInfo;
// Size of the segment in bytes, pro-rated by the number of live documents.
@@ -371,31 +392,40 @@ public MergeSpecification findMerges(
// If we have too-large segments, grace them out of the maximum segment count
// If we're above certain thresholds of deleted docs, we can merge very large segments.
int tooBigCount = 0;
// We relax merging for the bigger segments for concurrency reasons, as we want to have several
// segments on the highest tier without over-merging on the lower tiers.
int concurrencyCount = 0;
iter = sortedInfos.iterator();

double allowedSegCount = 0;

// remove large segments from consideration under two conditions.
// 1> Overall percent deleted docs relatively small and this segment is larger than 50%
// maxSegSize
// 2> overall percent deleted docs large and this segment is large and has few deleted docs

while (iter.hasNext()) {
SegmentSizeAndDocs segSizeDocs = iter.next();
double segDelPct = 100 * (double) segSizeDocs.delCount / (double) segSizeDocs.maxDoc;
if (segSizeDocs.sizeInBytes > maxMergedSegmentBytes / 2
&& (totalDelPct <= deletesPctAllowed || segDelPct <= deletesPctAllowed)) {
iter.remove();
tooBigCount++; // Just for reporting purposes.
tooBigCount++;
totIndexBytes -= segSizeDocs.sizeInBytes;
allowedDelCount -= segSizeDocs.delCount;
} else if (concurrencyCount + tooBigCount < targetSearchConcurrency - 1) {
// Make sure we count a whole segment for the first targetSearchConcurrency-1 segments to
// avoid over-merging on the lower levels.
concurrencyCount++;
allowedSegCount++;
totIndexBytes -= segSizeDocs.sizeInBytes;
}
}
allowedDelCount = Math.max(0, allowedDelCount);

final int mergeFactor = (int) Math.min(maxMergeAtOnce, segsPerTier);
// Compute max allowed segments in the index
// Compute max allowed segments for the remainder of the index
long levelSize = Math.max(minSegmentBytes, floorSegmentBytes);
long bytesLeft = totIndexBytes;
double allowedSegCount = 0;
while (true) {
final double segCountLevel = bytesLeft / (double) levelSize;
if (segCountLevel < segsPerTier || levelSize == maxMergedSegmentBytes) {
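
To make the interaction between the reservation above and this budget loop concrete, here is a simplified, editorial walk-through with hypothetical numbers (the real loop walks multiple levels; this collapses it to the final one):

    double segsPerTier = 10.0;
    int targetSearchConcurrency = 4;
    // Suppose the loop above already counted targetSearchConcurrency - 1 = 3 big
    // segments whole and subtracted their bytes from the index total.
    double allowedSegCount = 3;
    long bytesLeft = 16L << 30; // 16 GB remaining after the reservation
    long levelSize = 2L << 30;  // 2 GB per segment at the current level
    double segCountLevel = bytesLeft / (double) levelSize; // 8.0, below segsPerTier
    allowedSegCount += Math.ceil(segCountLevel); // 3 + 8 = 11
    allowedSegCount = Math.max(allowedSegCount, Math.max(segsPerTier, targetSearchConcurrency));
    // allowedSegCount == 11: natural merging aims to keep about this many segments.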
@@ -408,7 +438,8 @@ public MergeSpecification findMerges(
}
// allowedSegCount may occasionally be less than segsPerTier
// if segment sizes are below the floor size
allowedSegCount = Math.max(allowedSegCount, segsPerTier);
allowedSegCount = Math.max(allowedSegCount, Math.max(segsPerTier, targetSearchConcurrency));
int allowedDocCount = getMaxAllowedDocs(totalMaxDoc, totalDelDocs);

if (verbose(mergeContext) && tooBigCount > 0) {
message(
@@ -419,7 +450,11 @@ public MergeSpecification findMerges(
+ " (eligible count="
+ sortedInfos.size()
+ ") tooBigCount= "
+ tooBigCount,
+ tooBigCount
+ " allowedDocCount="
+ allowedDocCount
+ " vs doc count="
+ infos.totalMaxDoc(),
mergeContext);
}
return doFindMerges(
@@ -428,6 +463,7 @@ public MergeSpecification findMerges(
mergeFactor,
(int) allowedSegCount,
allowedDelCount,
allowedDocCount,
MERGE_TYPE.NATURAL,
mergeContext,
mergingBytes >= maxMergedSegmentBytes);
@@ -439,6 +475,7 @@ private MergeSpecification doFindMerges(
final int mergeFactor,
final int allowedSegCount,
final int allowedDelCount,
final int allowedDocCount,
final MERGE_TYPE mergeType,
MergeContext mergeContext,
boolean maxMergeIsRunning)
@@ -522,16 +559,23 @@ private MergeSpecification doFindMerges(
final List<SegmentCommitInfo> candidate = new ArrayList<>();
boolean hitTooLarge = false;
long bytesThisMerge = 0;
long docCountThisMerge = 0;
for (int idx = startIdx;
idx < sortedEligible.size()
&& candidate.size() < mergeFactor
&& bytesThisMerge < maxMergedSegmentBytes;
&& bytesThisMerge < maxMergedSegmentBytes
&& (bytesThisMerge < floorSegmentBytes || docCountThisMerge <= allowedDocCount);
idx++) {
final SegmentSizeAndDocs segSizeDocs = sortedEligible.get(idx);
final long segBytes = segSizeDocs.sizeInBytes;

if (totAfterMergeBytes + segBytes > maxMergedSegmentBytes) {
hitTooLarge = true;
int segDocCount = segSizeDocs.maxDoc - segSizeDocs.delCount;
if (totAfterMergeBytes + segBytes > maxMergedSegmentBytes
|| (totAfterMergeBytes > floorSegmentBytes
&& docCountThisMerge + segDocCount > allowedDocCount)) {
// Only set hitTooLarge when reaching the maximum byte size, as this will create
// segments of the maximum size which will no longer be eligible for merging for a long
// time (until they accumulate enough deletes).
hitTooLarge |= totAfterMergeBytes + segBytes > maxMergedSegmentBytes;
if (candidate.size() == 0) {
// We should never have something coming in that _cannot_ be merged, so handle
// singleton merges
@@ -548,6 +592,7 @@ private MergeSpecification doFindMerges(
}
candidate.add(segSizeDocs.segInfo);
bytesThisMerge += segBytes;
docCountThisMerge += segDocCount;
totAfterMergeBytes += segBytes;
}

@@ -916,14 +961,13 @@ public MergeSpecification findForcedDeletesMerges(SegmentInfos infos, MergeConte
final Set<SegmentCommitInfo> merging = mergeContext.getMergingSegments();

boolean haveWork = false;
int totalDelCount = 0;
for (SegmentCommitInfo info : infos) {
int delCount = mergeContext.numDeletesToMerge(info);
assert assertDelCount(delCount, info);
totalDelCount += delCount;
double pctDeletes = 100. * ((double) delCount) / info.info.maxDoc();
if (pctDeletes > forceMergeDeletesPctAllowed && !merging.contains(info)) {
haveWork = true;
break;
}
haveWork = haveWork || (pctDeletes > forceMergeDeletesPctAllowed && !merging.contains(info));
}

if (haveWork == false) {
@@ -950,11 +994,16 @@ public MergeSpecification findForcedDeletesMerges(SegmentInfos infos, MergeConte
Integer.MAX_VALUE,
Integer.MAX_VALUE,
0,
getMaxAllowedDocs(infos.totalMaxDoc(), totalDelCount),
MERGE_TYPE.FORCE_MERGE_DELETES,
mergeContext,
false);
}

int getMaxAllowedDocs(int totalMaxDoc, int totalDelDocs) {
return Math.ceilDiv(totalMaxDoc - totalDelDocs, targetSearchConcurrency);
}

private long floorSize(long bytes) {
return Math.max(floorSegmentBytes, bytes);
}
@@ -969,7 +1018,8 @@ public String toString() {
sb.append("segmentsPerTier=").append(segsPerTier).append(", ");
sb.append("maxCFSSegmentSizeMB=").append(getMaxCFSSegmentSizeMB()).append(", ");
sb.append("noCFSRatio=").append(noCFSRatio).append(", ");
sb.append("deletesPctAllowed=").append(deletesPctAllowed);
sb.append("deletesPctAllowed=").append(deletesPctAllowed).append(", ");
sb.append("targetSearchConcurrency=").append(targetSearchConcurrency);
return sb.toString();
}
}
@@ -65,6 +65,7 @@ protected void assertSegmentInfos(MergePolicy policy, SegmentInfos infos) throws
segmentSizes.add(weightedByteSize);
minSegmentBytes = Math.min(minSegmentBytes, weightedByteSize);
}
Collections.sort(segmentSizes);

final double delPercentage = 100.0 * totalDelCount / totalMaxDoc;
assertTrue(
@@ -77,12 +78,26 @@ protected void assertSegmentInfos(MergePolicy policy, SegmentInfos infos) throws
long levelSizeBytes = Math.max(minSegmentBytes, (long) (tmp.getFloorSegmentMB() * 1024 * 1024));
long bytesLeft = totalBytes;
double allowedSegCount = 0;
List<Long> biggestSegments = segmentSizes;
if (biggestSegments.size() > tmp.getTargetSearchConcurrency() - 1) {
biggestSegments =
biggestSegments.subList(
biggestSegments.size() - tmp.getTargetSearchConcurrency() + 1,
biggestSegments.size());
}
// Allow whole segments for the targetSearchConcurrency-1 biggest segments
for (long size : biggestSegments) {
bytesLeft -= size;
allowedSegCount++;
}
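
A quick worked example of this reservation, with hypothetical sizes:

    // Sorted ascending, targetSearchConcurrency == 3 (hypothetical values).
    List<Long> segmentSizes = List.of(10L, 20L, 30L, 40L, 50L);
    List<Long> biggestSegments =
        segmentSizes.subList(segmentSizes.size() - 2, segmentSizes.size()); // [40, 50]
    // The loop subtracts 40 + 50 from bytesLeft and starts allowedSegCount at 2, so
    // the two biggest segments never force extra merging on the lower tiers.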

// below we make the assumption that segments that reached the max segment
// size divided by 2 don't need merging anymore
int mergeFactor = (int) Math.min(tmp.getSegmentsPerTier(), tmp.getMaxMergeAtOnce());
while (true) {
final double segCountLevel = bytesLeft / (double) levelSizeBytes;
if (segCountLevel < tmp.getSegmentsPerTier() || levelSizeBytes >= maxMergedSegmentBytes / 2) {
if (segCountLevel <= tmp.getSegmentsPerTier()
|| levelSizeBytes >= maxMergedSegmentBytes / 2) {
allowedSegCount += Math.ceil(segCountLevel);
break;
}
@@ -94,7 +109,6 @@ protected void assertSegmentInfos(MergePolicy policy, SegmentInfos infos) throws

// It's ok to be over the allowed segment count if none of the most balanced merges are balanced
// enough
Collections.sort(segmentSizes);
boolean hasBalancedMerges = false;
for (int i = 0; i < segmentSizes.size() - mergeFactor; ++i) {
long maxMergeSegmentSize = segmentSizes.get(i + mergeFactor - 1);
@@ -111,11 +125,24 @@ protected void assertSegmentInfos(MergePolicy policy, SegmentInfos infos) throws
}
}

// There can be more segments if we can't merge docs because they are balanced between segments.
// At least the 2 smallest segments should be mergeable.
int maxDocsPerSegment = tmp.getMaxAllowedDocs(infos.totalMaxDoc(), totalDelCount);
List<Integer> segmentDocs =
infos.asList().stream()
.map(info -> info.info.maxDoc() - info.getDelCount())
.sorted()
.toList();
boolean eligibleDocsMerge =
segmentDocs.size() >= 2 && segmentDocs.get(0) + segmentDocs.get(1) < maxDocsPerSegment;
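
For instance, with hypothetical counts: if maxDocsPerSegment is 1,000 and the live doc counts are [300, 400, 900], the two smallest segments sum to 700, so a merge that respects the doc budget is still possible:

    int[] segmentDocs = {300, 400, 900}; // hypothetical, sorted ascending
    int maxDocsPerSegment = 1_000;
    boolean eligibleDocsMerge =
        segmentDocs.length >= 2 && segmentDocs[0] + segmentDocs[1] < maxDocsPerSegment;
    // 300 + 400 = 700 < 1000, so doc balancing cannot excuse an over-long segment list.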

int numSegments = infos.asList().size();
assertTrue(
String.format(
Locale.ROOT,
"mergeFactor=%d minSegmentBytes=%,d maxMergedSegmentBytes=%,d segmentsPerTier=%g maxMergeAtOnce=%d numSegments=%d allowed=%g totalBytes=%,d delPercentage=%g deletesPctAllowed=%g",
"mergeFactor=%d minSegmentBytes=%,d maxMergedSegmentBytes=%,d segmentsPerTier=%g maxMergeAtOnce=%d numSegments=%d allowed=%g totalBytes=%,d delPercentage=%g deletesPctAllowed=%g targetNumSegments=%d",
mergeFactor,
minSegmentBytes,
maxMergedSegmentBytes,
@@ -125,8 +152,9 @@ protected void assertSegmentInfos(MergePolicy policy, SegmentInfos infos) throws
allowedSegCount,
totalBytes,
delPercentage,
tmp.getDeletesPctAllowed()),
numSegments <= allowedSegCount || hasBalancedMerges == false);
tmp.getDeletesPctAllowed(),
tmp.getTargetSearchConcurrency()),
numSegments <= allowedSegCount || hasBalancedMerges == false || eligibleDocsMerge == false);
}

@Override
@@ -208,6 +236,7 @@ public void testPartialMerge() throws Exception {

int segmentCount = w.getSegmentCount();
int targetCount = TestUtil.nextInt(random(), 1, segmentCount);

if (VERBOSE) {
System.out.println(
"TEST: merge to " + targetCount + " segs (current count=" + segmentCount + ")");
@@ -1095,6 +1095,12 @@ public static TieredMergePolicy newTieredMergePolicy(Random r) {
} else {
tmp.setSegmentsPerTier(TestUtil.nextInt(r, 10, 50));
}
if (rarely(r)) {
tmp.setTargetSearchConcurrency(TestUtil.nextInt(r, 10, 50));
} else {
tmp.setTargetSearchConcurrency(TestUtil.nextInt(r, 2, 20));
}

configureRandom(r, tmp);
tmp.setDeletesPctAllowed(20 + random().nextDouble() * 30);
return tmp;
