Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Aggregate files from the same segment into a single Arena #13570

Merged
merged 32 commits into from
Jul 25, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
2d6af43
RefCountedSharedArena
ChrisHegarty Jul 14, 2024
fa0a962
remove optional
ChrisHegarty Jul 15, 2024
fe65dfe
use AtomicInteger
ChrisHegarty Jul 15, 2024
f68a917
improve test
ChrisHegarty Jul 15, 2024
7d2124f
Merge branch 'main' into refCntArena
ChrisHegarty Jul 15, 2024
d68e26e
private final
ChrisHegarty Jul 15, 2024
6b6d7c1
simplify with decrementAndGet
ChrisHegarty Jul 15, 2024
f39c401
rename to onClose
ChrisHegarty Jul 16, 2024
e65c4cb
add override
ChrisHegarty Jul 16, 2024
9357251
Merge branch 'main' into refCntArena
ChrisHegarty Jul 17, 2024
88d80de
update test
ChrisHegarty Jul 17, 2024
316d3f2
changes
ChrisHegarty Jul 17, 2024
a1d2d80
rewording
ChrisHegarty Jul 17, 2024
4591f7f
Add grouping API function
ChrisHegarty Jul 17, 2024
ef13a1e
javadoc and initialization
ChrisHegarty Jul 18, 2024
29fe87f
Add validation for segment id
ChrisHegarty Jul 18, 2024
10721d3
Merge branch 'main' into refCntArena
ChrisHegarty Jul 18, 2024
ba127f4
use CODEC_FILE_PATTERN
ChrisHegarty Jul 18, 2024
4f6eae7
optional generation
ChrisHegarty Jul 18, 2024
d1731b3
suppress unused
ChrisHegarty Jul 18, 2024
c42fbec
Add limit ref counting
ChrisHegarty Jul 19, 2024
ffe4ec6
lifetime ref count & fix compute in map
ChrisHegarty Jul 20, 2024
9aa7b8e
typo
ChrisHegarty Jul 20, 2024
875bcb1
ref counted review comments
ChrisHegarty Jul 23, 2024
77819db
typo
ChrisHegarty Jul 23, 2024
9b859de
Merge branch 'main' into refCntArena
ChrisHegarty Jul 23, 2024
5576c81
add sys prop
ChrisHegarty Jul 23, 2024
80f3d5c
move sys prop and gen grouping
ChrisHegarty Jul 24, 2024
8a6eba0
revert accidental
ChrisHegarty Jul 24, 2024
99262e2
remove optional, public sys prop, and review comments
ChrisHegarty Jul 24, 2024
aeab592
fix typo
ChrisHegarty Jul 24, 2024
e86191c
Merge branch 'main' into refCntArena
ChrisHegarty Jul 25, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 16 additions & 6 deletions lucene/core/src/java/org/apache/lucene/store/MMapDirectory.java
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,9 @@ public class MMapDirectory extends FSDirectory {
*/
public static final long DEFAULT_MAX_CHUNK_SIZE;

/** A provider specific context object or null, that will be passed to openInput. */
final Object attachment = PROVIDER.attachment();

final int chunkSizePower;

/**
Expand Down Expand Up @@ -199,20 +202,27 @@ public IndexInput openInput(String name, IOContext context) throws IOException {
ensureOpen();
ensureCanRead(name);
Path path = directory.resolve(name);
return PROVIDER.openInput(path, context, chunkSizePower, preload.test(name, context));
return PROVIDER.openInput(
path, context, chunkSizePower, preload.test(name, context), attachment);
}

// visible for tests:
static final MMapIndexInputProvider PROVIDER;
static final MMapIndexInputProvider<Object> PROVIDER;

interface MMapIndexInputProvider {
IndexInput openInput(Path path, IOContext context, int chunkSizePower, boolean preload)
interface MMapIndexInputProvider<A> {
IndexInput openInput(
Path path, IOContext context, int chunkSizePower, boolean preload, A attachment)
throws IOException;

long getDefaultMaxChunkSize();

boolean supportsMadvise();

/** An optional attachment of the provider, that will be passed to openInput. */
ChrisHegarty marked this conversation as resolved.
Show resolved Hide resolved
default A attachment() {
return null;
}

default IOException convertMapFailedIOException(
IOException ioe, String resourceDescription, long bufSize) {
final String originalMessage;
Expand Down Expand Up @@ -256,15 +266,15 @@ default IOException convertMapFailedIOException(
}
}

private static MMapIndexInputProvider lookupProvider() {
private static <A> MMapIndexInputProvider<A> lookupProvider() {
uschindler marked this conversation as resolved.
Show resolved Hide resolved
ChrisHegarty marked this conversation as resolved.
Show resolved Hide resolved
final var lookup = MethodHandles.lookup();
try {
final var cls = lookup.findClass("org.apache.lucene.store.MemorySegmentIndexInputProvider");
// we use method handles, so we do not need to deal with setAccessible as we have private
// access through the lookup:
final var constr = lookup.findConstructor(cls, MethodType.methodType(void.class));
try {
return (MMapIndexInputProvider) constr.invoke();
return (MMapIndexInputProvider<A>) constr.invoke();
} catch (RuntimeException | Error e) {
throw e;
} catch (Throwable th) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,15 @@
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.lucene.index.IndexFileNames;
import org.apache.lucene.util.Constants;
import org.apache.lucene.util.Unwrappable;

@SuppressWarnings("preview")
final class MemorySegmentIndexInputProvider implements MMapDirectory.MMapIndexInputProvider {
final class MemorySegmentIndexInputProvider
implements MMapDirectory.MMapIndexInputProvider<
ConcurrentHashMap<String, RefCountedSharedArena>> {

private final Optional<NativeAccess> nativeAccess;

Expand All @@ -37,7 +41,12 @@ final class MemorySegmentIndexInputProvider implements MMapDirectory.MMapIndexIn
}

@Override
public IndexInput openInput(Path path, IOContext context, int chunkSizePower, boolean preload)
public IndexInput openInput(
Path path,
IOContext context,
int chunkSizePower,
boolean preload,
ConcurrentHashMap<String, RefCountedSharedArena> arenas)
throws IOException {
final String resourceDescription = "MemorySegmentIndexInput(path=\"" + path.toString() + "\")";

Expand All @@ -46,7 +55,7 @@ public IndexInput openInput(Path path, IOContext context, int chunkSizePower, bo

boolean success = false;
final boolean confined = context == IOContext.READONCE;
final Arena arena = confined ? Arena.ofConfined() : Arena.ofShared();
final Arena arena = confined ? Arena.ofConfined() : getSharedArena(path, arenas);
try (var fc = FileChannel.open(path, StandardOpenOption.READ)) {
final long fileSize = fc.size();
final IndexInput in =
Expand Down Expand Up @@ -125,4 +134,31 @@ private final MemorySegment[] map(
}
return segments;
}

public ConcurrentHashMap<String, RefCountedSharedArena> attachment() {
ChrisHegarty marked this conversation as resolved.
Show resolved Hide resolved
return new ConcurrentHashMap<>();
}

/**
* Gets an arena for the give path, potentially aggregating files from the same segment into a
* single ref counted shared arena. A ref counted shared arena, if created will be added to the
* given arenas map.
*/
static Arena getSharedArena(Path p, ConcurrentHashMap<String, RefCountedSharedArena> arenas) {
String filename = p.getFileName().toString();
String segmentName = IndexFileNames.parseSegmentName(filename);
if (filename.length() == segmentName.length()) {
// no segment found; just use a shared segment
return Arena.ofShared();
ChrisHegarty marked this conversation as resolved.
Show resolved Hide resolved
}

while (true) {
var refCountedArena =
arenas.computeIfAbsent(
segmentName, s -> new RefCountedSharedArena(s, () -> arenas.remove(s)));
if (refCountedArena.acquire()) {
return refCountedArena;
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.lucene.store;

import java.lang.foreign.Arena;
import java.lang.foreign.MemorySegment;
import java.util.concurrent.atomic.AtomicInteger;

@SuppressWarnings("preview")
final class RefCountedSharedArena implements Arena {

static final int OPEN = 0;
static final int CLOSED = -1;

private final String segmentName;
private final Runnable removeFromMap;
ChrisHegarty marked this conversation as resolved.
Show resolved Hide resolved
private final Arena arena;
private final AtomicInteger state;

RefCountedSharedArena(String segmentName, Runnable removeFromMap) {
this.segmentName = segmentName;
this.removeFromMap = removeFromMap;
this.arena = Arena.ofShared();
this.state = new AtomicInteger(OPEN);
}

// for debugging
String getSegmentName() {
return segmentName;
}

boolean acquire() {
int value;
while (true) {
value = state.get();
if (value < OPEN) {
return false;
}
if (state.compareAndSet(value, value + 1)) {
return true;
}
}
}

void release() {
int value;
while (true) {
value = state.get();
if (value <= OPEN) {
throw new IllegalStateException("already closed");
}
if (state.compareAndSet(value, value - 1)) {
ChrisHegarty marked this conversation as resolved.
Show resolved Hide resolved
if (value - 1 == OPEN && state.compareAndSet(OPEN, CLOSED)) {
removeFromMap.run();
arena.close();
}
return;
}
}
}

@Override
public void close() {
release();
}

@Override
public MemorySegment allocate(long byteSize, long byteAlignment) {
throw new UnsupportedOperationException();
}

@Override
public MemorySegment.Scope scope() {
return arena.scope();
}

@Override
public String toString() {
return "RefCountedArena[segmentName="
+ segmentName
+ ", value="
+ state.get()
+ ", arena="
+ arena
+ "]";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,21 @@
*/
package org.apache.lucene.store;

import static java.util.stream.Collectors.toList;

import java.io.IOException;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.IntStream;
import org.apache.lucene.tests.store.BaseDirectoryTestCase;
import org.apache.lucene.util.Constants;
import org.apache.lucene.util.NamedThreadFactory;
Expand Down Expand Up @@ -172,4 +179,76 @@ static Object getAndUnwrap(Future<Object> future) throws Throwable {
throw ee.getCause();
}
}

public void testArenas() throws Exception {
// First create a number of file name lists to test with.
var exts =
List.of(
".si", ".cfs", ".cfe", ".dvd", ".dvm", "nvd", ".nvm", ".fdt", ".vec", ".vex", ".vemf");
var names =
IntStream.range(0, 50)
.mapToObj(i -> "_" + i)
.flatMap(s -> exts.stream().map(ext -> s + ext))
.collect(toList());
IntStream.range(0, 50).mapToObj(i -> "segment_" + i).forEach(names::add);
Collections.shuffle(names, random());

final int size = 6;
byte[] bytes = new byte[size];
random().nextBytes(bytes);

try (var dir = new MMapDirectory(createTempDir("testArenas"))) {
for (var name : names) {
try (IndexOutput out = dir.createOutput(name, IOContext.DEFAULT)) {
out.writeBytes(bytes, 0, bytes.length);
}
}

int nThreads = 10;
int perListSize = (names.size() + nThreads) / nThreads;
List<List<String>> nameLists =
IntStream.range(0, nThreads)
.mapToObj(
i ->
names.subList(
perListSize * i, Math.min(perListSize * i + perListSize, names.size())))
.toList();

var threadFactory = new NamedThreadFactory("testArenas");
try (var executor = Executors.newFixedThreadPool(nThreads, threadFactory)) {
var tasks = nameLists.stream().map(l -> new IndicesOpenTask(l, dir)).toList();
var futures = tasks.stream().map(executor::submit).toList();
for (var future : futures) {
future.get();
}
}

if (!(dir.attachment instanceof ConcurrentHashMap<?, ?> map)) {
throw new AssertionError("unexpected attachment: " + dir.attachment);
}
assertEquals(0, map.size());
}
}

static class IndicesOpenTask implements Callable<Void> {
final List<String> names;
final Directory dir;

IndicesOpenTask(List<String> names, Directory dir) {
this.names = names;
this.dir = dir;
}

@Override
public Void call() throws Exception {
List<IndexInput> closeables = new ArrayList<>();
for (var name : names) {
closeables.add(dir.openInput(name, IOContext.DEFAULT));
}
for (IndexInput closeable : closeables) {
closeable.close();
}
return null;
}
}
}