Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Aggregate files from the same segment into a single Arena #13570

Merged
merged 32 commits into from
Jul 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
2d6af43
RefCountedSharedArena
ChrisHegarty Jul 14, 2024
fa0a962
remove optional
ChrisHegarty Jul 15, 2024
fe65dfe
use AtomicInteger
ChrisHegarty Jul 15, 2024
f68a917
improve test
ChrisHegarty Jul 15, 2024
7d2124f
Merge branch 'main' into refCntArena
ChrisHegarty Jul 15, 2024
d68e26e
private final
ChrisHegarty Jul 15, 2024
6b6d7c1
simplify with decrementAndGet
ChrisHegarty Jul 15, 2024
f39c401
rename to onClose
ChrisHegarty Jul 16, 2024
e65c4cb
add override
ChrisHegarty Jul 16, 2024
9357251
Merge branch 'main' into refCntArena
ChrisHegarty Jul 17, 2024
88d80de
update test
ChrisHegarty Jul 17, 2024
316d3f2
changes
ChrisHegarty Jul 17, 2024
a1d2d80
rewording
ChrisHegarty Jul 17, 2024
4591f7f
Add grouping API function
ChrisHegarty Jul 17, 2024
ef13a1e
javadoc and initialization
ChrisHegarty Jul 18, 2024
29fe87f
Add validation for segment id
ChrisHegarty Jul 18, 2024
10721d3
Merge branch 'main' into refCntArena
ChrisHegarty Jul 18, 2024
ba127f4
use CODEC_FILE_PATTERN
ChrisHegarty Jul 18, 2024
4f6eae7
optional generation
ChrisHegarty Jul 18, 2024
d1731b3
suppress unused
ChrisHegarty Jul 18, 2024
c42fbec
Add limit ref counting
ChrisHegarty Jul 19, 2024
ffe4ec6
lifetime ref count & fix compute in map
ChrisHegarty Jul 20, 2024
9aa7b8e
typo
ChrisHegarty Jul 20, 2024
875bcb1
ref counted review comments
ChrisHegarty Jul 23, 2024
77819db
typo
ChrisHegarty Jul 23, 2024
9b859de
Merge branch 'main' into refCntArena
ChrisHegarty Jul 23, 2024
5576c81
add sys prop
ChrisHegarty Jul 23, 2024
80f3d5c
move sys prop and gen grouping
ChrisHegarty Jul 24, 2024
8a6eba0
revert accidental
ChrisHegarty Jul 24, 2024
99262e2
remove optional, public sys prop, and review comments
ChrisHegarty Jul 24, 2024
aeab592
fix typo
ChrisHegarty Jul 24, 2024
e86191c
Merge branch 'main' into refCntArena
ChrisHegarty Jul 25, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions lucene/CHANGES.txt
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,13 @@ Optimizations
* GITHUB#13582: Stop requiring MaxScoreBulkScorer's outer window from having at
least INNER_WINDOW_SIZE docs. (Adrien Grand)

* GITHUB#13570, GITHUB#13574, GITHUB#13535: Avoid performance degradation with closing shared Arenas.
Closing many individual index files can potentially lead to a degradation in execution performance.
Index files are mmapped one-to-one with the JDK's foreign shared Arena. The JVM deoptimizes the top
few frames of all threads when closing a shared Arena (see JDK-8335480). We mitigate this situation
by 1) using a confined Arena where appropriate, and 2) grouping files from the same segment to a
single shared Arena. (Chris Hegarty, Michael Gibney, Uwe Schindler)

Changes in runtime behavior
---------------------

Expand Down
115 changes: 108 additions & 7 deletions lucene/core/src/java/org/apache/lucene/store/MMapDirectory.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,20 @@
*/
package org.apache.lucene.store;

import static org.apache.lucene.index.IndexFileNames.CODEC_FILE_PATTERN;

import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;
import java.nio.channels.ClosedChannelException; // javadoc @link
import java.nio.file.Path;
import java.util.Locale;
import java.util.Optional;
import java.util.concurrent.Future;
import java.util.function.BiPredicate;
import java.util.function.Function;
import java.util.logging.Logger;
import org.apache.lucene.index.IndexFileNames;
import org.apache.lucene.util.Constants;

/**
Expand All @@ -42,6 +48,11 @@
* performance of searches on a cold page cache at the expense of slowing down opening an index. See
* {@link #setPreload(BiPredicate)} for more details.
*
* <p>This class supports grouping of files that are part of the same logical group. This is a hint
* that allows for better handling of resources. For example, individual files that are part of the
* same segment can be considered part of the same logical group. See {@link
* #setGroupingFunction(Function)} for more details.
*
* <p>This class will use the modern {@link java.lang.foreign.MemorySegment} API available since
* Java 21 which allows to safely unmap previously mmapped files after closing the {@link
* IndexInput}s. There is no need to enable the "preview feature" of your Java version; it works out
Expand Down Expand Up @@ -83,6 +94,41 @@ public class MMapDirectory extends FSDirectory {
*/
public static final BiPredicate<String, IOContext> NO_FILES = (filename, context) -> false;

/**
* This sysprop allows to control the total maximum number of mmapped files that can be associated
* with a single shared {@link java.lang.foreign.Arena foreign Arena}. For example, to set the max
* number of permits to 256, pass the following on the command line pass {@code
* -Dorg.apache.lucene.store.MMapDirectory.sharedArenaMaxPermits=256}. Setting a value of 1
* associates one file to one shared arena.
*
* @lucene.internal
*/
public static final String SHARED_ARENA_MAX_PERMITS_SYSPROP =
"org.apache.lucene.store.MMapDirectory.sharedArenaMaxPermits";

/** Argument for {@link #setGroupingFunction(Function)} that configures no grouping. */
public static final Function<String, Optional<String>> NO_GROUPING = filename -> Optional.empty();

/** Argument for {@link #setGroupingFunction(Function)} that configures grouping by segment. */
public static final Function<String, Optional<String>> GROUP_BY_SEGMENT =
filename -> {
if (!CODEC_FILE_PATTERN.matcher(filename).matches()) {
return Optional.empty();
}
String groupKey = IndexFileNames.parseSegmentName(filename).substring(1);
try {
ChrisHegarty marked this conversation as resolved.
Show resolved Hide resolved
// keep the original generation (=0) in base group, later generations in extra group
if (IndexFileNames.parseGeneration(filename) > 0) {
groupKey += "-g";
}
} catch (
@SuppressWarnings("unused")
NumberFormatException unused) {
// does not confirm to the generation syntax, or trash
ChrisHegarty marked this conversation as resolved.
Show resolved Hide resolved
}
return Optional.of(groupKey);
};

/**
* Argument for {@link #setPreload(BiPredicate)} that configures files to be preloaded upon
* opening them if they use the {@link ReadAdvice#RANDOM_PRELOAD} advice.
Expand All @@ -102,6 +148,11 @@ public class MMapDirectory extends FSDirectory {
*/
public static final long DEFAULT_MAX_CHUNK_SIZE;

/** A provider specific context object or null, that will be passed to openInput. */
final Object attachment = PROVIDER.attachment();

private Function<String, Optional<String>> groupingFunction = GROUP_BY_SEGMENT;

final int chunkSizePower;

/**
Expand Down Expand Up @@ -184,6 +235,21 @@ public void setPreload(BiPredicate<String, IOContext> preload) {
this.preload = preload;
}

/**
ChrisHegarty marked this conversation as resolved.
Show resolved Hide resolved
* Configures a grouping function for files that are part of the same logical group. The gathering
* of files into a logical group is a hint that allows for better handling of resources.
*
* <p>By default, grouping is {@link #GROUP_BY_SEGMENT}. To disable, invoke this method with
* {@link #NO_GROUPING}.
*
* @param groupingFunction a function that accepts a file name and returns an optional group key.
* If the optional is present, then its value is the logical group to which the file belongs.
* Otherwise, the file name if not associated with any logical group.
*/
public void setGroupingFunction(Function<String, Optional<String>> groupingFunction) {
this.groupingFunction = groupingFunction;
}

/**
* Returns the current mmap chunk size.
*
Expand All @@ -199,20 +265,37 @@ public IndexInput openInput(String name, IOContext context) throws IOException {
ensureOpen();
ensureCanRead(name);
Path path = directory.resolve(name);
return PROVIDER.openInput(path, context, chunkSizePower, preload.test(name, context));
return PROVIDER.openInput(
path,
context,
chunkSizePower,
preload.test(name, context),
groupingFunction.apply(name),
attachment);
}

// visible for tests:
static final MMapIndexInputProvider PROVIDER;
static final MMapIndexInputProvider<Object> PROVIDER;

interface MMapIndexInputProvider {
IndexInput openInput(Path path, IOContext context, int chunkSizePower, boolean preload)
interface MMapIndexInputProvider<A> {
IndexInput openInput(
Path path,
IOContext context,
int chunkSizePower,
boolean preload,
Optional<String> group,
A attachment)
throws IOException;

long getDefaultMaxChunkSize();

boolean supportsMadvise();

/** An optional attachment of the provider, that will be passed to openInput. */
ChrisHegarty marked this conversation as resolved.
Show resolved Hide resolved
default A attachment() {
return null;
}

default IOException convertMapFailedIOException(
IOException ioe, String resourceDescription, long bufSize) {
final String originalMessage;
Expand Down Expand Up @@ -256,15 +339,33 @@ default IOException convertMapFailedIOException(
}
}

private static MMapIndexInputProvider lookupProvider() {
private static int getSharedArenaMaxPermitsSysprop() {
int ret = 1024; // default value
try {
String str = System.getProperty(SHARED_ARENA_MAX_PERMITS_SYSPROP);
if (str != null) {
ret = Integer.parseInt(str);
}
} catch (@SuppressWarnings("unused") NumberFormatException | SecurityException ignored) {
Logger.getLogger(MMapDirectory.class.getName())
.warning(
"Cannot read sysprop "
+ SHARED_ARENA_MAX_PERMITS_SYSPROP
+ ", so the default value will be used.");
}
return ret;
}

private static <A> MMapIndexInputProvider<A> lookupProvider() {
uschindler marked this conversation as resolved.
Show resolved Hide resolved
ChrisHegarty marked this conversation as resolved.
Show resolved Hide resolved
final var maxPermits = getSharedArenaMaxPermitsSysprop();
final var lookup = MethodHandles.lookup();
try {
final var cls = lookup.findClass("org.apache.lucene.store.MemorySegmentIndexInputProvider");
// we use method handles, so we do not need to deal with setAccessible as we have private
// access through the lookup:
final var constr = lookup.findConstructor(cls, MethodType.methodType(void.class));
final var constr = lookup.findConstructor(cls, MethodType.methodType(void.class, int.class));
try {
return (MMapIndexInputProvider) constr.invoke();
return (MMapIndexInputProvider<A>) constr.invoke(maxPermits);
} catch (RuntimeException | Error e) {
throw e;
} catch (Throwable th) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,20 +24,32 @@
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.logging.Logger;
import org.apache.lucene.util.Constants;
import org.apache.lucene.util.Unwrappable;

@SuppressWarnings("preview")
final class MemorySegmentIndexInputProvider implements MMapDirectory.MMapIndexInputProvider {
final class MemorySegmentIndexInputProvider
implements MMapDirectory.MMapIndexInputProvider<
ConcurrentHashMap<String, RefCountedSharedArena>> {

private final Optional<NativeAccess> nativeAccess;
private final int sharedArenaMaxPermits;

MemorySegmentIndexInputProvider() {
MemorySegmentIndexInputProvider(int maxPermits) {
this.nativeAccess = NativeAccess.getImplementation();
this.sharedArenaMaxPermits = checkMaxPermits(maxPermits);
}

@Override
public IndexInput openInput(Path path, IOContext context, int chunkSizePower, boolean preload)
public IndexInput openInput(
Path path,
IOContext context,
int chunkSizePower,
boolean preload,
Optional<String> group,
ConcurrentHashMap<String, RefCountedSharedArena> arenas)
throws IOException {
final String resourceDescription = "MemorySegmentIndexInput(path=\"" + path.toString() + "\")";

Expand All @@ -46,7 +58,7 @@ public IndexInput openInput(Path path, IOContext context, int chunkSizePower, bo

boolean success = false;
final boolean confined = context == IOContext.READONCE;
final Arena arena = confined ? Arena.ofConfined() : Arena.ofShared();
final Arena arena = confined ? Arena.ofConfined() : getSharedArena(group, arenas);
try (var fc = FileChannel.open(path, StandardOpenOption.READ)) {
final long fileSize = fc.size();
final IndexInput in =
Expand Down Expand Up @@ -125,4 +137,53 @@ private final MemorySegment[] map(
}
return segments;
}

@Override
public ConcurrentHashMap<String, RefCountedSharedArena> attachment() {
ChrisHegarty marked this conversation as resolved.
Show resolved Hide resolved
return new ConcurrentHashMap<>();
}

private static int checkMaxPermits(int maxPermits) {
if (RefCountedSharedArena.validMaxPermits(maxPermits)) {
return maxPermits;
}
Logger.getLogger(MemorySegmentIndexInputProvider.class.getName())
.warning(
"Invalid value for sysprop "
+ MMapDirectory.SHARED_ARENA_MAX_PERMITS_SYSPROP
+ ", must be positive and <= 0x07FF. The default value will be used.");
return RefCountedSharedArena.DEFAULT_MAX_PERMITS;
}

/**
* Gets an arena for the given group, potentially aggregating files from the same segment into a
* single ref counted shared arena. A ref counted shared arena, if created will be added to the
* given arenas map.
*/
private Arena getSharedArena(
Optional<String> group, ConcurrentHashMap<String, RefCountedSharedArena> arenas) {
if (group.isEmpty()) {
return Arena.ofShared();
ChrisHegarty marked this conversation as resolved.
Show resolved Hide resolved
}

String key = group.get();
var refCountedArena =
arenas.computeIfAbsent(
key, s -> new RefCountedSharedArena(s, () -> arenas.remove(s), sharedArenaMaxPermits));
if (refCountedArena.acquire()) {
return refCountedArena;
} else {
return arenas.compute(
key,
(s, v) -> {
if (v != null && v.acquire()) {
return v;
} else {
v = new RefCountedSharedArena(s, () -> arenas.remove(s), sharedArenaMaxPermits);
v.acquire(); // guaranteed to succeed
return v;
}
});
}
}
}
Loading