Add target search concurrency to TieredMergePolicy #13430

Merged
Changes from 28 commits

29 commits
ab0a13a
Add targetSearchConcurrency
carlosdelest May 27, 2024
ecf2035
Some renaming and fixes
carlosdelest May 27, 2024
6bf94d2
Some renaming and fixes
carlosdelest May 27, 2024
36ad550
minNumSegments can be overridden by force merge
carlosdelest May 27, 2024
585c927
Add minNumSegments to tests
carlosdelest May 27, 2024
c922f47
Merge remote-tracking branch 'origin/main'
carlosdelest May 27, 2024
f74616c
Add forceMerge semantics
carlosdelest May 28, 2024
31e132f
WIP - test
carlosdelest May 28, 2024
2d6173d
Take forced merges into account
carlosdelest May 28, 2024
22436c0
Minor fixes, adding tests
carlosdelest May 28, 2024
7c4397e
Rename minNumSegments -> targetSearchConcurrency
carlosdelest May 29, 2024
289d07d
Implement target search concurrency as number of segments for the las…
carlosdelest May 29, 2024
d4d89f1
PR feedback
carlosdelest May 30, 2024
bfdb58b
Removed some changes for not making doc merges so aggressive compared…
carlosdelest May 31, 2024
f240644
Focus testing on append only merges for target search concurrency
carlosdelest May 31, 2024
500fe32
Removing some useless changes from main
carlosdelest May 31, 2024
1b91298
Better handling of num docs overflow
carlosdelest Jun 4, 2024
299ac18
Simplify testing
carlosdelest Jun 4, 2024
e058a9f
Refactor singleton merges
carlosdelest Jun 4, 2024
9ab6fcd
Merge branch 'main' into carlosdelest/tiered-merge-policy-min-num-seg…
jpountz Jun 24, 2024
d44de4c
Tune merging logic for targetSearchConcurrency.
jpountz Jun 24, 2024
66d62e9
Add CHANGES entry.
jpountz Jun 24, 2024
ca4c19c
Add proper credit to Adrien on this :)
carlosdelest Jul 2, 2024
2c18e5f
Small refactoring for getMaxAllowedDocs
carlosdelest Jul 2, 2024
1b9b60b
Add target search concurrency checks in testSimulateUpdates()
carlosdelest Jul 2, 2024
3db835a
Minor improvement to corner case.
jpountz Jul 16, 2024
2d80315
Add target search concurrency to all tests randomly
carlosdelest Jul 16, 2024
8de0212
Remove check for target search concurrency, which already takes place…
carlosdelest Jul 16, 2024
898e2e8
Change target search concurrency for rare test occurrences
carlosdelest Jul 17, 2024
6 changes: 5 additions & 1 deletion lucene/CHANGES.txt
@@ -254,7 +254,11 @@ API Changes

New Features
---------------------
(No changes)

* GITHUB#13430: Allow configuring the search concurrency via
TieredMergePolicy#setTargetSearchConcurrency. This in turn instructs the
merge policy to try to have at least this number of segments on the highest
tier. (Adrien Grand, Carlos Delgado)

Improvements
---------------------
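As a quick orientation for this CHANGES entry, here is a minimal usage sketch of the new setting (illustrative only, not part of this diff; the IndexWriterConfig wiring is standard Lucene API and the value 4 is arbitrary):

```java
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.TieredMergePolicy;

public class TargetConcurrencyConfig {
  static IndexWriterConfig newConfig() {
    TieredMergePolicy tmp = new TieredMergePolicy();
    // Ask the merge policy to keep at least ~4 segments on the highest tier,
    // so searches can be split into ~4 slices of similar doc counts.
    tmp.setTargetSearchConcurrency(4);
    return new IndexWriterConfig().setMergePolicy(tmp);
  }
}
```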
80 changes: 65 additions & 15 deletions lucene/core/src/java/org/apache/lucene/index/TieredMergePolicy.java
@@ -93,6 +93,7 @@ public class TieredMergePolicy extends MergePolicy {
private double segsPerTier = 10.0;
private double forceMergeDeletesPctAllowed = 10.0;
private double deletesPctAllowed = 20.0;
private int targetSearchConcurrency = 1;

/** Sole constructor, setting all settings to their defaults. */
public TieredMergePolicy() {
@@ -257,6 +258,26 @@ public double getSegmentsPerTier() {
return segsPerTier;
}

/**
* Sets the target search concurrency. This prevents creating segments that are bigger than
* maxDoc/targetSearchConcurrency, which in turn makes the work parallelizable into
* targetSearchConcurrency slices of similar doc counts. It also makes merging less aggressive, as
higher values result in indices that do less merging and have more segments.
*/
public TieredMergePolicy setTargetSearchConcurrency(int targetSearchConcurrency) {
if (targetSearchConcurrency < 1) {
throw new IllegalArgumentException(
"targetSearchConcurrency must be >= 1 (got " + targetSearchConcurrency + ")");
}
this.targetSearchConcurrency = targetSearchConcurrency;
return this;
}

/** Returns the target search concurrency. */
public int getTargetSearchConcurrency() {
return targetSearchConcurrency;
}

private static class SegmentSizeAndDocs {
private final SegmentCommitInfo segInfo;
// Size of the segment in bytes, pro-rated by the number of live documents.
@@ -371,31 +392,40 @@ public MergeSpecification findMerges(
// If we have too-large segments, grace them out of the maximum segment count
// If we're above certain thresholds of deleted docs, we can merge very large segments.
int tooBigCount = 0;
// We relax merging for the bigger segments for concurrency reasons, as we want to have several
// segments on the highest tier without over-merging on the lower tiers.
int concurrencyCount = 0;
iter = sortedInfos.iterator();

double allowedSegCount = 0;

// Remove large segments from consideration under two conditions:
// 1> the overall percentage of deleted docs is relatively small and this segment is larger
// than 50% of maxSegSize
// 2> the overall percentage of deleted docs is large and this segment is large and has few
// deleted docs

while (iter.hasNext()) {
SegmentSizeAndDocs segSizeDocs = iter.next();
double segDelPct = 100 * (double) segSizeDocs.delCount / (double) segSizeDocs.maxDoc;
if (segSizeDocs.sizeInBytes > maxMergedSegmentBytes / 2
&& (totalDelPct <= deletesPctAllowed || segDelPct <= deletesPctAllowed)) {
iter.remove();
tooBigCount++; // Just for reporting purposes.
tooBigCount++;
totIndexBytes -= segSizeDocs.sizeInBytes;
allowedDelCount -= segSizeDocs.delCount;
} else if (concurrencyCount + tooBigCount < targetSearchConcurrency - 1) {
// Make sure we count a whole segment for the first targetSearchConcurrency-1 segments to
// avoid over merging on the lower levels.
concurrencyCount++;
allowedSegCount++;
totIndexBytes -= segSizeDocs.sizeInBytes;
}
}
allowedDelCount = Math.max(0, allowedDelCount);

final int mergeFactor = (int) Math.min(maxMergeAtOnce, segsPerTier);
// Compute max allowed segments in the index
// Compute max allowed segments for the remainder of the index
long levelSize = Math.max(minSegmentBytes, floorSegmentBytes);
long bytesLeft = totIndexBytes;
double allowedSegCount = 0;
while (true) {
final double segCountLevel = bytesLeft / (double) levelSize;
if (segCountLevel < segsPerTier || levelSize == maxMergedSegmentBytes) {
@@ -408,7 +438,8 @@ public MergeSpecification findMerges(
}
// allowedSegCount may occasionally be less than segsPerTier
// if segment sizes are below the floor size
allowedSegCount = Math.max(allowedSegCount, segsPerTier);
allowedSegCount = Math.max(allowedSegCount, Math.max(segsPerTier, targetSearchConcurrency));
int allowedDocCount = getMaxAllowedDocs(totalMaxDoc, totalDelDocs);

if (verbose(mergeContext) && tooBigCount > 0) {
message(
@@ -419,7 +450,11 @@ public MergeSpecification findMerges(
+ " (eligible count="
+ sortedInfos.size()
+ ") tooBigCount= "
+ tooBigCount,
+ tooBigCount
+ " allowedDocCount="
+ allowedDocCount
+ " vs doc count="
+ infos.totalMaxDoc(),
mergeContext);
}
return doFindMerges(
@@ -428,6 +463,7 @@
mergeFactor,
(int) allowedSegCount,
allowedDelCount,
allowedDocCount,
MERGE_TYPE.NATURAL,
mergeContext,
mergingBytes >= maxMergedSegmentBytes);
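To make the adjusted segment budget concrete, the following toy computation mirrors the two loops above with invented sizes and simplified assumptions (no deletes, no too-big segments, mergeFactor fixed at 10):

```java
public class TierBudgetToy {
  public static void main(String[] args) {
    long floorSegmentBytes = 2L << 20;      // 2 MB floor
    double segsPerTier = 10.0;
    long maxMergedSegmentBytes = 5L << 30;  // 5 GB
    int targetSearchConcurrency = 3;

    // Segment sizes, sorted by descending size: 500, 400, 50, 40, 30 MB.
    long[] sortedDesc = {500L << 20, 400L << 20, 50L << 20, 40L << 20, 30L << 20};
    double allowedSegCount = 0;
    long bytesLeft = 0;
    for (long size : sortedDesc) {
      bytesLeft += size;
    }
    // The targetSearchConcurrency-1 biggest segments each count as one whole
    // allowed segment and leave the tier computation entirely.
    for (int i = 0; i < targetSearchConcurrency - 1 && i < sortedDesc.length; i++) {
      allowedSegCount++;
      bytesLeft -= sortedDesc[i];
    }
    // Remaining bytes go through the usual tier loop.
    long levelSize = floorSegmentBytes;
    while (true) {
      double segCountLevel = bytesLeft / (double) levelSize;
      if (segCountLevel < segsPerTier || levelSize == maxMergedSegmentBytes) {
        allowedSegCount += Math.ceil(segCountLevel);
        break;
      }
      allowedSegCount += segsPerTier;
      bytesLeft -= (long) (segsPerTier * levelSize);
      levelSize = Math.min(maxMergedSegmentBytes, levelSize * 10); // mergeFactor = 10
    }
    // Finally floored by max(segsPerTier, targetSearchConcurrency), as in the patch.
    allowedSegCount = Math.max(allowedSegCount, Math.max(segsPerTier, targetSearchConcurrency));
    System.out.println("allowedSegCount=" + allowedSegCount); // 17.0 for these inputs
  }
}
```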
@@ -439,6 +475,7 @@
final int mergeFactor,
final int allowedSegCount,
final int allowedDelCount,
final int allowedDocCount,
final MERGE_TYPE mergeType,
MergeContext mergeContext,
boolean maxMergeIsRunning)
@@ -522,16 +559,23 @@ private MergeSpecification doFindMerges(
final List<SegmentCommitInfo> candidate = new ArrayList<>();
boolean hitTooLarge = false;
long bytesThisMerge = 0;
long docCountThisMerge = 0;
for (int idx = startIdx;
idx < sortedEligible.size()
&& candidate.size() < mergeFactor
&& bytesThisMerge < maxMergedSegmentBytes;
&& bytesThisMerge < maxMergedSegmentBytes
&& (bytesThisMerge < floorSegmentBytes || docCountThisMerge <= allowedDocCount);
idx++) {
final SegmentSizeAndDocs segSizeDocs = sortedEligible.get(idx);
final long segBytes = segSizeDocs.sizeInBytes;

if (totAfterMergeBytes + segBytes > maxMergedSegmentBytes) {
hitTooLarge = true;
int segDocCount = segSizeDocs.maxDoc - segSizeDocs.delCount;
if (totAfterMergeBytes + segBytes > maxMergedSegmentBytes
|| (totAfterMergeBytes > floorSegmentBytes
&& docCountThisMerge + segDocCount > allowedDocCount)) {
// Only set hitTooLarge when reaching the maximum byte size, as this will create
// segments of the maximum size which will no longer be eligible for merging for a long
// time (until they accumulate enough deletes).
hitTooLarge |= totAfterMergeBytes + segBytes > maxMergedSegmentBytes;
if (candidate.size() == 0) {
// We should never have something coming in that _cannot_ be merged, so handle
// singleton merges
@@ -548,6 +592,7 @@
}
candidate.add(segSizeDocs.segInfo);
bytesThisMerge += segBytes;
docCountThisMerge += segDocCount;
totAfterMergeBytes += segBytes;
}
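In effect (numbers invented for illustration): with allowedDocCount = 250,000, a candidate that is already past the floor size and holds 200,000 live docs will not take a 100,000-doc segment; because only byte overflow sets hitTooLarge, the segment is simply skipped and the loop can still consider smaller segments, in keeping with the pre-existing LUCENE-2898 behavior.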

@@ -916,14 +961,13 @@ public MergeSpecification findForcedDeletesMerges(SegmentInfos infos, MergeConte
final Set<SegmentCommitInfo> merging = mergeContext.getMergingSegments();

boolean haveWork = false;
int totalDelCount = 0;
for (SegmentCommitInfo info : infos) {
int delCount = mergeContext.numDeletesToMerge(info);
assert assertDelCount(delCount, info);
totalDelCount += delCount;
double pctDeletes = 100. * ((double) delCount) / info.info.maxDoc();
if (pctDeletes > forceMergeDeletesPctAllowed && !merging.contains(info)) {
haveWork = true;
break;
}
haveWork = haveWork || (pctDeletes > forceMergeDeletesPctAllowed && !merging.contains(info));
}

if (haveWork == false) {
@@ -950,11 +994,16 @@
Integer.MAX_VALUE,
Integer.MAX_VALUE,
0,
getMaxAllowedDocs(infos.totalMaxDoc(), totalDelCount),
MERGE_TYPE.FORCE_MERGE_DELETES,
mergeContext,
false);
}

int getMaxAllowedDocs(int totalMaxDoc, int totalDelDocs) {
return Math.ceilDiv(totalMaxDoc - totalDelDocs, targetSearchConcurrency);
}
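A quick worked example of this helper with invented numbers (Math.ceilDiv requires Java 18+, which the Lucene build satisfies):

```java
int totalMaxDoc = 1_050_000;
int totalDelDocs = 50_000;
int targetSearchConcurrency = 4;
// ceilDiv rounds up so the cap is never understated:
// ceil((1_050_000 - 50_000) / 4) = 250_000 docs allowed per merged segment.
int maxAllowedDocs = Math.ceilDiv(totalMaxDoc - totalDelDocs, targetSearchConcurrency); // 250_000
```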

private long floorSize(long bytes) {
return Math.max(floorSegmentBytes, bytes);
}
@@ -969,7 +1018,8 @@ public String toString() {
sb.append("segmentsPerTier=").append(segsPerTier).append(", ");
sb.append("maxCFSSegmentSizeMB=").append(getMaxCFSSegmentSizeMB()).append(", ");
sb.append("noCFSRatio=").append(noCFSRatio).append(", ");
sb.append("deletesPctAllowed=").append(deletesPctAllowed);
sb.append("deletesPctAllowed=").append(deletesPctAllowed).append(", ");
sb.append("targetSearchConcurrency=").append(targetSearchConcurrency);
return sb.toString();
}
}
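For background on why the number of top-tier segments matters to this feature (a sketch under assumptions, not part of this PR: slice construction details vary across Lucene versions, but slices are built from index segments, so a single huge segment caps attainable search parallelism):

```java
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.store.Directory;

class SliceCountProbe {
  // Prints how many parallel search slices a searcher builds over the index;
  // an index merged down to one segment yields one slice regardless of threads.
  static void printSliceCount(Directory dir, ExecutorService executor) throws IOException {
    try (DirectoryReader reader = DirectoryReader.open(dir)) {
      IndexSearcher searcher = new IndexSearcher(reader, executor);
      System.out.println("slices=" + searcher.getSlices().length);
    }
  }
}
```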
@@ -65,6 +65,7 @@ protected void assertSegmentInfos(MergePolicy policy, SegmentInfos infos) throws
segmentSizes.add(weightedByteSize);
minSegmentBytes = Math.min(minSegmentBytes, weightedByteSize);
}
Collections.sort(segmentSizes);

final double delPercentage = 100.0 * totalDelCount / totalMaxDoc;
assertTrue(
@@ -77,12 +78,26 @@ protected void assertSegmentInfos(MergePolicy policy, SegmentInfos infos) throws
long levelSizeBytes = Math.max(minSegmentBytes, (long) (tmp.getFloorSegmentMB() * 1024 * 1024));
long bytesLeft = totalBytes;
double allowedSegCount = 0;
List<Long> biggestSegments = segmentSizes;
if (biggestSegments.size() > tmp.getTargetSearchConcurrency() - 1) {
biggestSegments =
biggestSegments.subList(
biggestSegments.size() - tmp.getTargetSearchConcurrency() + 1,
biggestSegments.size());
}
// Allow whole segments for the targetSearchConcurrency-1 biggest segments
for (long size : biggestSegments) {
bytesLeft -= size;
allowedSegCount++;
}

// below we make the assumption that segments that reached the max segment
// size divided by 2 don't need merging anymore
int mergeFactor = (int) Math.min(tmp.getSegmentsPerTier(), tmp.getMaxMergeAtOnce());
while (true) {
final double segCountLevel = bytesLeft / (double) levelSizeBytes;
if (segCountLevel < tmp.getSegmentsPerTier() || levelSizeBytes >= maxMergedSegmentBytes / 2) {
if (segCountLevel <= tmp.getSegmentsPerTier()
|| levelSizeBytes >= maxMergedSegmentBytes / 2) {
allowedSegCount += Math.ceil(segCountLevel);
break;
}
@@ -94,7 +109,6 @@ protected void assertSegmentInfos(MergePolicy policy, SegmentInfos infos) throws

// It's ok to be over the allowed segment count if none of the most balanced merges are balanced
// enough
Collections.sort(segmentSizes);
boolean hasBalancedMerges = false;
for (int i = 0; i < segmentSizes.size() - mergeFactor; ++i) {
long maxMergeSegmentSize = segmentSizes.get(i + mergeFactor - 1);
@@ -111,11 +125,24 @@ protected void assertSegmentInfos(MergePolicy policy, SegmentInfos infos) throws
}
}

// There can be more segments than allowed if we can't merge further because doc counts
// are already balanced between segments. In that case, exceeding the budget is only
// legitimate if even the 2 smallest segments cannot be merged under the doc-count cap.
int maxDocsPerSegment = tmp.getMaxAllowedDocs(infos.totalMaxDoc(), totalDelCount);
List<Integer> segmentDocs =
infos.asList().stream()
.map(info -> info.info.maxDoc() - info.getDelCount())
.sorted()
.toList();
boolean eligibleDocsMerge =
segmentDocs.size() >= 2 && segmentDocs.get(0) + segmentDocs.get(1) < maxDocsPerSegment;

int numSegments = infos.asList().size();
assertTrue(
String.format(
Locale.ROOT,
"mergeFactor=%d minSegmentBytes=%,d maxMergedSegmentBytes=%,d segmentsPerTier=%g maxMergeAtOnce=%d numSegments=%d allowed=%g totalBytes=%,d delPercentage=%g deletesPctAllowed=%g",
"mergeFactor=%d minSegmentBytes=%,d maxMergedSegmentBytes=%,d segmentsPerTier=%g maxMergeAtOnce=%d numSegments=%d allowed=%g totalBytes=%,d delPercentage=%g deletesPctAllowed=%g targetNumSegments=%d",
mergeFactor,
minSegmentBytes,
maxMergedSegmentBytes,
@@ -125,8 +152,9 @@ protected void assertSegmentInfos(MergePolicy policy, SegmentInfos infos) throws
allowedSegCount,
totalBytes,
delPercentage,
tmp.getDeletesPctAllowed()),
numSegments <= allowedSegCount || hasBalancedMerges == false);
tmp.getDeletesPctAllowed(),
tmp.getTargetSearchConcurrency()),
numSegments <= allowedSegCount || hasBalancedMerges == false || eligibleDocsMerge == false);
}
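Put differently, the relaxed assertion lets numSegments exceed the byte-based budget only when the policy has an excuse: either none of the candidate merges are balanced enough, or even the two smallest segments together would exceed the per-segment doc cap, so the policy legitimately cannot reduce the segment count further.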

@Override
@@ -208,6 +236,7 @@ public void testPartialMerge() throws Exception {

int segmentCount = w.getSegmentCount();
int targetCount = TestUtil.nextInt(random(), 1, segmentCount);

if (VERBOSE) {
System.out.println(
"TEST: merge to " + targetCount + " segs (current count=" + segmentCount + ")");
Expand Up @@ -1095,6 +1095,12 @@ public static TieredMergePolicy newTieredMergePolicy(Random r) {
} else {
tmp.setSegmentsPerTier(TestUtil.nextInt(r, 10, 50));
}
if (rarely(r)) {
tmp.setTargetSearchConcurrency(TestUtil.nextInt(r, 2, 20));
} else {
tmp.setTargetSearchConcurrency(TestUtil.nextInt(r, 10, 50));
Contributor:

I would have expected the opposite: that we commonly set a reasonable target search concurrency, and rarely set a high value?

Contributor Author:

I see - I was mimicking the segments per tier approach here, but I guess that makes sense. Changing it.

}
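Following up on the thread above, the agreed change would presumably look something like this sketch (an assumption based on the conversation; the actual follow-up commit may pick different bounds):

```java
if (rarely(r)) {
  // Rarely exercise unusually high concurrency targets.
  tmp.setTargetSearchConcurrency(TestUtil.nextInt(r, 10, 50));
} else {
  // Commonly pick a modest, realistic target.
  tmp.setTargetSearchConcurrency(TestUtil.nextInt(r, 2, 8));
}
```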

configureRandom(r, tmp);
tmp.setDeletesPctAllowed(20 + random().nextDouble() * 30);
return tmp;