Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Experimental trivial vectorized reads implementation for RAR #1487

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion build.xml
Original file line number Diff line number Diff line change
Expand Up @@ -717,7 +717,7 @@
<dependency groupId="org.apache.lucene" artifactId="lucene-core" version="9.8.0-5ea8bb4f21" />
<dependency groupId="org.apache.lucene" artifactId="lucene-analysis-common" version="9.8.0-5ea8bb4f21" />
<dependency groupId="org.apache.lucene" artifactId="lucene-backward-codecs" version="9.8.0-5ea8bb4f21" />
<dependency groupId="io.github.jbellis" artifactId="jvector" version="3.0.6" />
<dependency groupId="io.github.jbellis" artifactId="jvector" version="3.0.7-13de754a" />
<dependency groupId="com.bpodgursky" artifactId="jbool_expressions" version="1.14" scope="test"/>

<dependency groupId="com.carrotsearch.randomizedtesting" artifactId="randomizedtesting-runner" version="2.1.2" scope="test">
Expand Down
6 changes: 6 additions & 0 deletions src/java/org/apache/cassandra/cache/ChunkCache.java
Original file line number Diff line number Diff line change
Expand Up @@ -385,6 +385,12 @@ public CachingRebufferer(ChunkReader file)
alignmentMask = -chunkSize;
}

/**
 * Whether this rebufferer can serve {@link #rebuffer(long)} calls from multiple threads at once.
 * Simply delegates to the wrapped {@link ChunkReader}, which knows whether its chunks can be
 * read concurrently.
 */
@Override
public boolean supportsConcurrentRebuffer()
{
return source.supportsReadingChunksConcurrently();
}

@Override
public BufferHolder rebuffer(long position)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -568,7 +568,13 @@ public enum CassandraRelevantProperties
* Do not wait for gossip to be enabled before starting stabilisation period. This is required especially for tests
* which do not enable gossip at all.
*/
CLUSTER_VERSION_PROVIDER_SKIP_WAIT_FOR_GOSSIP("cassandra.test.cluster_version_provider.skip_wait_for_gossip");
CLUSTER_VERSION_PROVIDER_SKIP_WAIT_FOR_GOSSIP("cassandra.test.cluster_version_provider.skip_wait_for_gossip"),


/**
* (Experimental) Thread pool size for RandomAccessReader "vectored" reads.
*/
RAR_VECTORED_READS_THREAD_POOL_SIZE("cassandra.rar.vectored_reads_thread_pool_size");

CassandraRelevantProperties(String key, String defaultVal)
{
Expand Down
5 changes: 5 additions & 0 deletions src/java/org/apache/cassandra/io/util/ChunkReader.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,11 @@ public interface ChunkReader extends RebuffererFactory
*/
void readChunk(long position, ByteBuffer buffer);

/**
 * Whether {@link #readChunk} may safely be invoked from multiple threads concurrently (for
 * different chunks). Defaults to {@code false}; thread-safe implementations override this to
 * {@code true}, which enables the experimental vectored-read path in RandomAccessReader.
 */
default boolean supportsReadingChunksConcurrently()
{
return false;
}

/**
* Buffer size required for this rebufferer. Must be power of 2 if alignment is required.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,12 @@ public void readChunk(long position, ByteBuffer uncompressed)
}
}

/**
 * This reader opts into concurrent chunk reads, enabling vectored reads on top of it.
 * NOTE(review): assumes {@code readChunk} above is safe to call from multiple threads for
 * different positions — confirm no shared mutable decompression state is reused across calls.
 */
@Override
public boolean supportsReadingChunksConcurrently()
{
return true;
}

@Override
public void invalidateIfCached(long position)
{
Expand Down
113 changes: 113 additions & 0 deletions src/java/org/apache/cassandra/io/util/RandomAccessReader.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,31 @@
import java.nio.FloatBuffer;
import java.nio.IntBuffer;
import java.nio.LongBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.concurrent.NotThreadSafe;

import com.google.common.primitives.Ints;

import io.netty.util.concurrent.FastThreadLocal;
import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
import org.apache.cassandra.concurrent.NamedThreadFactory;
import org.apache.cassandra.config.CassandraRelevantProperties;
import org.apache.cassandra.io.compress.BufferType;
import org.apache.cassandra.io.util.Rebufferer.BufferHolder;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.Throwables;

@NotThreadSafe
public class RandomAccessReader extends RebufferingInputStream implements FileDataInput, io.github.jbellis.jvector.disk.RandomAccessReader
{
private static final int VECTORED_READS_POOL_SIZE = CassandraRelevantProperties.RAR_VECTORED_READS_THREAD_POOL_SIZE.getInt(FBUtilities.getAvailableProcessors() / 2);

// The default buffer size when the client doesn't specify it
public static final int DEFAULT_BUFFER_SIZE = 4096;

Expand All @@ -43,6 +58,8 @@ public class RandomAccessReader extends RebufferingInputStream implements FileDa
private final ByteOrder order;
private BufferHolder bufferHolder;

private VectoredReadsPool vectoredReadsPool;

/**
* Only created through Builder
*
Expand Down Expand Up @@ -179,6 +196,55 @@ public void read(int[] dest, int offset, int count) throws IOException
readInts(buffer, order, dest, offset, count);
}

/**
 * Vectored read: fills each {@code ints[i]} with consecutive ints read starting at file offset
 * {@code positions[i]}. When the underlying rebufferer supports concurrent access, all but the
 * last destination array are read in parallel on a dedicated thread pool; otherwise this falls
 * back to the sequential default implementation from the jvector interface.
 *
 * @param ints destination arrays; {@code ints[i].length} ints are read for each entry
 * @param positions file offsets to read from; must have the same length as {@code ints}
 * @throws IllegalArgumentException if {@code ints.length != positions.length}
 * @throws IOException on read failure
 */
@Override
public void read(int[][] ints, long[] positions) throws IOException
{
    if (!rebufferer.supportsConcurrentRebuffer())
    {
        io.github.jbellis.jvector.disk.RandomAccessReader.super.read(ints, positions);
        return;
    }

    if (ints.length != positions.length)
        throw new IllegalArgumentException(String.format("ints.length %d != positions.length %d", ints.length, positions.length));

    if (ints.length == 0)
        return;

    maybeInitVectoredReadsPool();
    // All but the last destination array are read asynchronously on the pool.
    List<CompletableFuture<Void>> futures = new ArrayList<>(ints.length - 1);
    for (int i = 0; i < ints.length - 1; i++)
    {
        if (ints[i].length == 0)
            continue;

        futures.add(vectoredReadsPool.readAsync(ints[i], positions[i]));
    }

    // Read last array in the current thread both because "why not" and also so this RAR is set
    // "as if" the read was done synchronously. Do not propagate a failure until the background
    // reads have completed: otherwise pool threads could still be writing into the caller's
    // arrays after this method returns.
    Exception failure = null;
    try
    {
        seek(positions[ints.length - 1]);
        read(ints[ints.length - 1], 0, ints[ints.length - 1].length);
    }
    catch (Exception e)
    {
        failure = e;
    }

    for (CompletableFuture<Void> future : futures)
    {
        try
        {
            future.get();
        }
        catch (InterruptedException e)
        {
            // Restore the interrupt flag so callers further up the stack can observe it.
            Thread.currentThread().interrupt();
            if (failure == null)
                failure = e;
        }
        catch (Exception e)
        {
            // Keep draining remaining futures; report only the first failure.
            if (failure == null)
                failure = e;
        }
    }

    if (failure != null)
        throw Throwables.cleaned(failure);
}

/**
 * Lazily creates the vectored-reads pool on first use. No synchronization is needed because
 * this class is declared {@code @NotThreadSafe} and is used by a single thread.
 */
private void maybeInitVectoredReadsPool()
{
if (vectoredReadsPool == null)
vectoredReadsPool = new VectoredReadsPool(VECTORED_READS_POOL_SIZE, rebufferer, order);
}

private static void readFloats(ByteBuffer buffer, ByteOrder order, float[] dest, int offset, int count)
{
FloatBuffer floatBuffer = updateBufferByteOrderIfNeeded(buffer, order).asFloatBuffer();
Expand Down Expand Up @@ -478,4 +544,51 @@ public static RandomAccessReader open(File file)
}
}

/**
 * Thread pool backing the experimental vectored reads of {@link #read(int[][], long[])}.
 * Each pool thread keeps its own {@link RandomAccessReader} over the shared rebufferer
 * (via a {@link FastThreadLocal}) so reads can proceed concurrently without sharing
 * per-reader state such as the current position.
 */
private static class VectoredReadsPool
{
private final ThreadPoolExecutor executor;
// One dedicated reader per pool thread; closed when the thread-local entry is removed.
private final FastThreadLocal<RandomAccessReader> perThreadReaders;
// Distinguishes thread names across pool instances.
protected static final AtomicInteger id = new AtomicInteger(1);

VectoredReadsPool(int size, Rebufferer rebufferer, ByteOrder order)
{
// Fixed-size pool with an unbounded queue; keep-alive is effectively infinite.
this.executor = new DebuggableThreadPoolExecutor(size, Integer.MAX_VALUE, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), new NamedThreadFactory("rar-vectored-reads-" + id.getAndIncrement()));
this.perThreadReaders = new FastThreadLocal<>()
{
@Override
protected RandomAccessReader initialValue()
{
return new RandomAccessReader(rebufferer, order, Rebufferer.EMPTY);
}

@Override
protected void onRemoval(RandomAccessReader reader)
{
reader.close();
}
};
}

/**
 * Schedules an asynchronous read of {@code ints.length} ints starting at {@code position},
 * using the calling pool thread's dedicated reader.
 */
CompletableFuture<Void> readAsync(int[] ints, long position)
{
return CompletableFuture.runAsync(() -> {
try
{
RandomAccessReader reader = perThreadReaders.get();
reader.seek(position);
reader.read(ints, 0, ints.length);
}
catch (Exception e)
{
throw Throwables.cleaned(e);
}
}, executor);
}

// NOTE(review): shutdown() neither waits for in-flight tasks nor removes the per-thread
// readers; also no caller of close() is visible in this view — confirm the pool is closed
// when the owning reader is closed, otherwise threads and readers leak.
void close()
{
executor.shutdown();
}
}

}
5 changes: 5 additions & 0 deletions src/java/org/apache/cassandra/io/util/Rebufferer.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,11 @@ public interface Rebufferer extends ReaderFileProxy
*/
BufferHolder rebuffer(long position);

/**
 * Whether {@link #rebuffer(long)} may safely be called from multiple threads concurrently.
 * Defaults to {@code false}; rebufferers backed by concurrency-safe chunk readers override
 * this to enable the vectored-read path in RandomAccessReader.
 */
default boolean supportsConcurrentRebuffer()
{
return false;
}

/**
* Called when a reader is closed. Should clean up reader-specific data.
*/
Expand Down
6 changes: 6 additions & 0 deletions src/java/org/apache/cassandra/io/util/SimpleChunkReader.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,12 @@ public void readChunk(long position, ByteBuffer buffer)
buffer.flip();
}

/**
 * SimpleChunkReader opts into concurrent chunk reads, enabling vectored reads.
 * NOTE(review): assumes the {@code readChunk} implementation above holds no shared mutable
 * state across calls — confirm the channel read is position-based and thread-safe.
 */
@Override
public boolean supportsReadingChunksConcurrently()
{
return true;
}

@Override
public int chunkSize()
{
Expand Down
130 changes: 130 additions & 0 deletions test/unit/org/apache/cassandra/io/util/RandomAccessReaderTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;

import org.junit.Assert;
import org.junit.BeforeClass;
Expand All @@ -47,6 +48,7 @@

import static org.junit.Assert.*;

import org.apache.cassandra.cache.ChunkCache;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.io.compress.BufferType;
import org.apache.cassandra.utils.ByteBufferUtil;
Expand Down Expand Up @@ -508,6 +510,134 @@ private void testReadFullyIntArray(int shift, ByteOrder order) throws IOExceptio
}
}

// read vectored array tests - ints

/**
 * Describes one invocation of {@code RandomAccessReader.read(int[][], long[])}: the arrays of
 * sequential values expected back from the reader, plus the total number of bytes those values
 * occupy on disk.
 */
private static final class VectoredIntReadArrayCase
{
    private final int[][] expected;
    private final long totalBytes;

    // Running value written to disk; shared across all cases so every element is unique.
    static int counter;

    VectoredIntReadArrayCase(int... numElements)
    {
        long byteCount = 0;
        this.expected = new int[numElements.length][];
        for (int arrayIdx = 0; arrayIdx < numElements.length; arrayIdx++)
        {
            int size = numElements[arrayIdx];
            byteCount += ((long) size) * Integer.BYTES;
            int[] values = new int[size];
            for (int elemIdx = 0; elemIdx < size; elemIdx++)
                values[elemIdx] = counter++;
            this.expected[arrayIdx] = values;
        }
        this.totalBytes = byteCount;
    }

    /** Returns a zero-filled array-of-arrays with the same shape as {@link #expected}. */
    int[][] createSizedArray()
    {
        int[][] result = new int[expected.length][];
        for (int idx = 0; idx < expected.length; idx++)
            result[idx] = new int[expected[idx].length];
        return result;
    }

    /** Computes the starting file offset of each expected array, the first being {@code initial}. */
    long[] computePositions(long initial)
    {
        long[] offsets = new long[expected.length];
        long current = initial;
        for (int idx = 0; idx < expected.length; idx++)
        {
            offsets[idx] = current;
            current += ((long) expected[idx].length) * Integer.BYTES;
        }
        return offsets;
    }

    @Override
    public String toString()
    {
        // Render as the list of array sizes, e.g. "[10, 0, 10]".
        return Arrays.stream(expected)
                     .map(array -> String.valueOf(array.length))
                     .collect(Collectors.joining(", ", "[", "]"));
    }
}

// Exercises the vectored int-array read path for both byte orders.
@Test
public void testVectoredIntArray() throws IOException
{
testVectoredReadIntArray(ByteOrder.BIG_ENDIAN);
testVectoredReadIntArray(ByteOrder.LITTLE_ENDIAN);
}

/**
 * Writes the concatenated expected values of every test case to a file, then reads them back
 * through {@link RandomAccessReader#read(int[][], long[])} and verifies the contents, once
 * from disk and once through the chunk cache.
 */
private void testVectoredReadIntArray(ByteOrder order) throws IOException
{
// Note: using a small buffer would not work; the code in FileHandle enforces at least 4096 in
// the case we care about (simple chunk reader), so reflecting this here.
int bufferSize = 4096;

// Cases mix empty arrays, arrays smaller than a buffer, and arrays spanning multiple buffers.
List<VectoredIntReadArrayCase> cases = new ArrayList<>();
cases.add(new VectoredIntReadArrayCase(0, 0, 0));
cases.add(new VectoredIntReadArrayCase(10, 0, 10));
cases.add(new VectoredIntReadArrayCase(17, 100, 4, 12));
cases.add(new VectoredIntReadArrayCase(100, 100, 100));
cases.add(new VectoredIntReadArrayCase(121));
cases.add(new VectoredIntReadArrayCase(10000, 1));
cases.add(new VectoredIntReadArrayCase(1000, 20000, 10000, 30000));

File file = writeFile(writer -> {
try
{
writer.order(order);
for (VectoredIntReadArrayCase testCase : cases)
{
for (int[] array : testCase.expected)
for (int f : array)
writer.writeInt(f);
}
return false;
}
catch (IOException e)
{
throw new RuntimeException(e);
}
});

try (ChannelProxy channel = new ChannelProxy(file);
FileHandle.Builder builder = new FileHandle.Builder(channel)
.order(order)
.bufferType(BufferType.OFF_HEAP)
.mmapped(false)
.withChunkCache(ChunkCache.instance)
.bufferSize(bufferSize);
FileHandle fh = builder.complete();
RandomAccessReader reader = fh.createReader())
{
assertEquals(channel.size(), reader.length());
assertEquals(channel.size(), reader.bytesRemaining());
assertEquals(file.length(), reader.available());

// Running twice: first run will mostly read from the file; the 2nd run will use the chunk cache
doTestVectoredReadIntArray(reader, cases);
doTestVectoredReadIntArray(reader, cases);
}
}

/**
 * Runs every test case against {@code reader}, starting at file offset 0, and checks that the
 * reader ends up exactly at EOF once all cases have been consumed.
 */
private void doTestVectoredReadIntArray(RandomAccessReader reader, List<VectoredIntReadArrayCase> cases) throws IOException
{
    long offset = 0;
    for (VectoredIntReadArrayCase c : cases)
    {
        long[] startPositions = c.computePositions(offset);
        int[][] actual = c.createSizedArray();

        reader.read(actual, startPositions);
        assertArrayEquals(c.expected, actual);

        offset += c.totalBytes;
    }

    // The cases collectively cover the whole file, so nothing should remain.
    assertTrue(reader.isEOF());
    assertEquals(0, reader.bytesRemaining());
}

/** A fake file channel that simply increments the position and doesn't
* actually read anything. We use it to simulate very large files, > 2G.
*/
Expand Down