Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Aggregate files from the same segment into a single Arena #13570

Merged
merged 32 commits into from
Jul 25, 2024
Merged
Show file tree
Hide file tree
Changes from 23 commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
2d6af43
RefCountedSharedArena
ChrisHegarty Jul 14, 2024
fa0a962
remove optional
ChrisHegarty Jul 15, 2024
fe65dfe
use AtomicInteger
ChrisHegarty Jul 15, 2024
f68a917
improve test
ChrisHegarty Jul 15, 2024
7d2124f
Merge branch 'main' into refCntArena
ChrisHegarty Jul 15, 2024
d68e26e
private final
ChrisHegarty Jul 15, 2024
6b6d7c1
simplify with decrementAndGet
ChrisHegarty Jul 15, 2024
f39c401
rename to onClose
ChrisHegarty Jul 16, 2024
e65c4cb
add override
ChrisHegarty Jul 16, 2024
9357251
Merge branch 'main' into refCntArena
ChrisHegarty Jul 17, 2024
88d80de
update test
ChrisHegarty Jul 17, 2024
316d3f2
changes
ChrisHegarty Jul 17, 2024
a1d2d80
rewording
ChrisHegarty Jul 17, 2024
4591f7f
Add grouping API function
ChrisHegarty Jul 17, 2024
ef13a1e
javadoc and initialization
ChrisHegarty Jul 18, 2024
29fe87f
Add validation for segment id
ChrisHegarty Jul 18, 2024
10721d3
Merge branch 'main' into refCntArena
ChrisHegarty Jul 18, 2024
ba127f4
use CODEC_FILE_PATTERN
ChrisHegarty Jul 18, 2024
4f6eae7
optional generation
ChrisHegarty Jul 18, 2024
d1731b3
suppress unused
ChrisHegarty Jul 18, 2024
c42fbec
Add limit ref counting
ChrisHegarty Jul 19, 2024
ffe4ec6
lifetime ref count & fix compute in map
ChrisHegarty Jul 20, 2024
9aa7b8e
typo
ChrisHegarty Jul 20, 2024
875bcb1
ref counted review comments
ChrisHegarty Jul 23, 2024
77819db
typo
ChrisHegarty Jul 23, 2024
9b859de
Merge branch 'main' into refCntArena
ChrisHegarty Jul 23, 2024
5576c81
add sys prop
ChrisHegarty Jul 23, 2024
80f3d5c
move sys prop and gen grouping
ChrisHegarty Jul 24, 2024
8a6eba0
revert accidental
ChrisHegarty Jul 24, 2024
99262e2
remove optional, public sys prop, and review comments
ChrisHegarty Jul 24, 2024
aeab592
fix typo
ChrisHegarty Jul 24, 2024
e86191c
Merge branch 'main' into refCntArena
ChrisHegarty Jul 25, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions lucene/CHANGES.txt
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,13 @@ Optimizations
* GITHUB#13582: Stop requiring MaxScoreBulkScorer's outer window from having at
least INNER_WINDOW_SIZE docs. (Adrien Grand)

* GITHUB#13570, GITHUB#13574, GITHUB#13535: Avoid performance degradation with closing shared Arenas.
Closing many individual index files can potentially lead to a degradation in execution performance.
Index files are mmapped one-to-one with the JDK's foreign shared Arena. The JVM deoptimizes the top
few frames of all threads when closing a shared Arena (see JDK-8335480). We mitigate this situation
by 1) using a confined Arena where appropriate, and 2) grouping files from the same segment to a
single shared Arena. (Chris Hegarty, Michael Gibney, Uwe Schindler)

Changes in runtime behavior
---------------------

Expand Down
79 changes: 73 additions & 6 deletions lucene/core/src/java/org/apache/lucene/store/MMapDirectory.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,19 @@
*/
package org.apache.lucene.store;

import static org.apache.lucene.index.IndexFileNames.CODEC_FILE_PATTERN;

import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;
import java.nio.channels.ClosedChannelException; // javadoc @link
import java.nio.file.Path;
import java.util.Locale;
import java.util.Optional;
import java.util.concurrent.Future;
import java.util.function.BiPredicate;
import java.util.function.Function;
import org.apache.lucene.index.IndexFileNames;
import org.apache.lucene.util.Constants;

/**
Expand All @@ -42,6 +47,11 @@
* performance of searches on a cold page cache at the expense of slowing down opening an index. See
* {@link #setPreload(BiPredicate)} for more details.
*
* <p>This class supports grouping of files that are part of the same logical group. This is a hint
* that allows for better handling of resources. For example, individual files that are part of the
* same segment can be considered part of the same logical group. See {@link
* #setGroupingFunction(Function)} for more details.
*
* <p>This class will use the modern {@link java.lang.foreign.MemorySegment} API available since
* Java 21 which allows to safely unmap previously mmapped files after closing the {@link
* IndexInput}s. There is no need to enable the "preview feature" of your Java version; it works out
Expand Down Expand Up @@ -83,6 +93,26 @@ public class MMapDirectory extends FSDirectory {
*/
public static final BiPredicate<String, IOContext> NO_FILES = (filename, context) -> false;

/** Argument for {@link #setGroupingFunction(Function)} that configures no grouping. */
public static final Function<String, Optional<String>> NO_GROUPING = filename -> Optional.empty();

/** Argument for {@link #setGroupingFunction(Function)} that configures grouping by segment. */
public static final Function<String, Optional<String>> GROUP_BY_SEGMENT =
filename -> {
if (!CODEC_FILE_PATTERN.matcher(filename).matches()) {
return Optional.empty();
}
String groupKey = IndexFileNames.parseSegmentName(filename).substring(1);
try {
ChrisHegarty marked this conversation as resolved.
Show resolved Hide resolved
groupKey += "-" + IndexFileNames.parseGeneration(filename);
} catch (
@SuppressWarnings("unused")
NumberFormatException unused) {
// does not confirm to the generation syntax, or trash
ChrisHegarty marked this conversation as resolved.
Show resolved Hide resolved
}
return Optional.of(groupKey);
};

/**
* Argument for {@link #setPreload(BiPredicate)} that configures files to be preloaded upon
* opening them if they use the {@link ReadAdvice#RANDOM_PRELOAD} advice.
Expand All @@ -102,6 +132,11 @@ public class MMapDirectory extends FSDirectory {
*/
public static final long DEFAULT_MAX_CHUNK_SIZE;

/** A provider specific context object or null, that will be passed to openInput. */
final Object attachment = PROVIDER.attachment();

private Function<String, Optional<String>> groupingFunction = GROUP_BY_SEGMENT;

final int chunkSizePower;

/**
Expand Down Expand Up @@ -184,6 +219,21 @@ public void setPreload(BiPredicate<String, IOContext> preload) {
this.preload = preload;
}

/**
ChrisHegarty marked this conversation as resolved.
Show resolved Hide resolved
* Configures a grouping function for files that are part of the same logical group. The gathering
* of files into a logical group is a hint that allows for better handling of resources.
*
* <p>By default, grouping is {@link #GROUP_BY_SEGMENT}. To disable, invoke this method with
* {@link #NO_GROUPING}.
*
* @param groupingFunction a function that accepts a file name and returns an optional group key.
* If the optional is present, then its value is the logical group to which the file belongs.
* Otherwise, the file name if not associated with any logical group.
*/
public void setGroupingFunction(Function<String, Optional<String>> groupingFunction) {
this.groupingFunction = groupingFunction;
}

/**
* Returns the current mmap chunk size.
*
Expand All @@ -199,20 +249,37 @@ public IndexInput openInput(String name, IOContext context) throws IOException {
ensureOpen();
ensureCanRead(name);
Path path = directory.resolve(name);
return PROVIDER.openInput(path, context, chunkSizePower, preload.test(name, context));
return PROVIDER.openInput(
path,
context,
chunkSizePower,
preload.test(name, context),
groupingFunction.apply(name),
attachment);
}

// visible for tests:
static final MMapIndexInputProvider PROVIDER;
static final MMapIndexInputProvider<Object> PROVIDER;

interface MMapIndexInputProvider {
IndexInput openInput(Path path, IOContext context, int chunkSizePower, boolean preload)
interface MMapIndexInputProvider<A> {
IndexInput openInput(
Path path,
IOContext context,
int chunkSizePower,
boolean preload,
Optional<String> group,
A attachment)
throws IOException;

long getDefaultMaxChunkSize();

boolean supportsMadvise();

/** An optional attachment of the provider, that will be passed to openInput. */
ChrisHegarty marked this conversation as resolved.
Show resolved Hide resolved
default A attachment() {
return null;
}

default IOException convertMapFailedIOException(
IOException ioe, String resourceDescription, long bufSize) {
final String originalMessage;
Expand Down Expand Up @@ -256,15 +323,15 @@ default IOException convertMapFailedIOException(
}
}

private static MMapIndexInputProvider lookupProvider() {
private static <A> MMapIndexInputProvider<A> lookupProvider() {
uschindler marked this conversation as resolved.
Show resolved Hide resolved
ChrisHegarty marked this conversation as resolved.
Show resolved Hide resolved
final var lookup = MethodHandles.lookup();
try {
final var cls = lookup.findClass("org.apache.lucene.store.MemorySegmentIndexInputProvider");
// we use method handles, so we do not need to deal with setAccessible as we have private
// access through the lookup:
final var constr = lookup.findConstructor(cls, MethodType.methodType(void.class));
try {
return (MMapIndexInputProvider) constr.invoke();
return (MMapIndexInputProvider<A>) constr.invoke();
} catch (RuntimeException | Error e) {
throw e;
} catch (Throwable th) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,14 @@
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.lucene.util.Constants;
import org.apache.lucene.util.Unwrappable;

@SuppressWarnings("preview")
final class MemorySegmentIndexInputProvider implements MMapDirectory.MMapIndexInputProvider {
final class MemorySegmentIndexInputProvider
implements MMapDirectory.MMapIndexInputProvider<
ConcurrentHashMap<String, RefCountedSharedArena>> {

private final Optional<NativeAccess> nativeAccess;

Expand All @@ -37,7 +40,13 @@ final class MemorySegmentIndexInputProvider implements MMapDirectory.MMapIndexIn
}

@Override
public IndexInput openInput(Path path, IOContext context, int chunkSizePower, boolean preload)
public IndexInput openInput(
Path path,
IOContext context,
int chunkSizePower,
boolean preload,
Optional<String> group,
ConcurrentHashMap<String, RefCountedSharedArena> arenas)
throws IOException {
final String resourceDescription = "MemorySegmentIndexInput(path=\"" + path.toString() + "\")";

Expand All @@ -46,7 +55,7 @@ public IndexInput openInput(Path path, IOContext context, int chunkSizePower, bo

boolean success = false;
final boolean confined = context == IOContext.READONCE;
final Arena arena = confined ? Arena.ofConfined() : Arena.ofShared();
final Arena arena = confined ? Arena.ofConfined() : getSharedArena(group, arenas);
try (var fc = FileChannel.open(path, StandardOpenOption.READ)) {
final long fileSize = fc.size();
final IndexInput in =
Expand Down Expand Up @@ -125,4 +134,40 @@ private final MemorySegment[] map(
}
return segments;
}

@Override
public ConcurrentHashMap<String, RefCountedSharedArena> attachment() {
ChrisHegarty marked this conversation as resolved.
Show resolved Hide resolved
return new ConcurrentHashMap<>();
}

/**
* Gets an arena for the given group, potentially aggregating files from the same segment into a
* single ref counted shared arena. A ref counted shared arena, if created will be added to the
* given arenas map.
*/
static Arena getSharedArena(
Optional<String> group, ConcurrentHashMap<String, RefCountedSharedArena> arenas) {
if (group.isEmpty()) {
return Arena.ofShared();
ChrisHegarty marked this conversation as resolved.
Show resolved Hide resolved
}

String key = group.get();
var refCountedArena =
arenas.computeIfAbsent(key, s -> new RefCountedSharedArena(s, () -> arenas.remove(s)));
if (refCountedArena.acquire()) {
return refCountedArena;
} else {
return arenas.compute(
key,
(s, v) -> {
if (v != null && v.acquire()) {
return v;
} else {
v = new RefCountedSharedArena(s, () -> arenas.remove(s));
v.acquire(); // guaranteed to succeed
return v;
}
});
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.lucene.store;

import java.lang.foreign.Arena;
import java.lang.foreign.MemorySegment;
import java.util.concurrent.atomic.AtomicInteger;

/**
* A reference counted share Arena.
ChrisHegarty marked this conversation as resolved.
Show resolved Hide resolved
*
* <p>The purpose of this class is to allow a number of mmapped memory segments to be associated
* with a single underlying arena in order to avoid closing the underlying arena until all segments
* are closed. Typically, these memory segments belong to the same logical group, e.g. individual
* files of the same index segment. We do this to avoid the expensive cost of closing a shared
* Arena.
*
* <p>The reference count is increased by {@link #acquire()}, and decreased by {@link #release()}.
* When the reference count reaches 0, then the underlying arena is closed and the given {@code
* onClose} runnable is executed. No more references can be acquired.
*
* <p>The total number of acquires that can be obtained for the lifetime of an instance of this
* class is 1024. When the total number of acquires is exhausted, then not more acquires are
* permitted and {@link #acquire()} returns false. This is independent of the actual number of the
* ref count.
*/
@SuppressWarnings("preview")
final class RefCountedSharedArena implements Arena {

static final int CLOSED = 0;
// initial state of 0x400 (1024) maximum permits, and a ref count of 0
static final int INITIAL = 0x04000000;
ChrisHegarty marked this conversation as resolved.
Show resolved Hide resolved

private final String segmentName;
private final Runnable onClose;
private final Arena arena;

// high 16 bits contain the total remaining acquires; monotonically decreasing
// low 16 bit contain the current ref count
private final AtomicInteger state;

RefCountedSharedArena(String segmentName, Runnable onClose) {
this.segmentName = segmentName;
this.onClose = onClose;
this.arena = Arena.ofShared();
this.state = new AtomicInteger(INITIAL);
}

// for debugging
String getSegmentName() {
return segmentName;
}

/**
* Returns true if the ref count has been increased. Otherwise, false if there are no remaining
* acquires.
*/
boolean acquire() {
int value;
while (true) {
value = state.get();
if (value == CLOSED) {
throw new IllegalStateException("closed");
}
final int remaining = value >>> 16;
if (remaining == 0) {
return false;
}
ChrisHegarty marked this conversation as resolved.
Show resolved Hide resolved
int newValue = ((remaining - 1) << 16) | ((value & 0xFFFF) + 1);
ChrisHegarty marked this conversation as resolved.
Show resolved Hide resolved
if (this.state.compareAndSet(value, newValue)) {
ChrisHegarty marked this conversation as resolved.
Show resolved Hide resolved
return true;
}
}
}

/** Decrements the ref count. */
void release() {
int value;
while (true) {
value = state.get();
if (value == CLOSED) {
throw new IllegalStateException("closed");
}
final int count = value & 0xFFFF;
if (count == 0) {
throw new IllegalStateException("nothing to release");
}
ChrisHegarty marked this conversation as resolved.
Show resolved Hide resolved
final int newValue = count == 1 ? CLOSED : value - 1;
if (this.state.compareAndSet(value, newValue)) {
if (newValue == CLOSED) {
onClose.run();
arena.close();
}
return;
}
}
}

@Override
public void close() {
release();
}

@Override
public MemorySegment allocate(long byteSize, long byteAlignment) {
throw new UnsupportedOperationException();
}

@Override
public MemorySegment.Scope scope() {
return arena.scope();
}

@Override
public String toString() {
return "RefCountedArena[segmentName="
+ segmentName
+ ", value="
+ state.get()
+ ", arena="
+ arena
+ "]";
}
}
Loading