Skip to content

Commit

Permalink
feat(s3stream): add a third party package moe.cnkirito.kdio (#654)
Browse files Browse the repository at this point in the history
Signed-off-by: Ning Yu <[email protected]>
  • Loading branch information
Chillax-0v0 authored and SCNieh committed Nov 17, 2023
1 parent 8f09da5 commit ee9e776
Show file tree
Hide file tree
Showing 9 changed files with 819 additions and 34 deletions.
2 changes: 2 additions & 0 deletions codecov.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ ignore:
- "**/metadata/database/dao/*"
- "**/*Exception*"
- "apache/rocketmq/controller/**/*"
- "**/thirdparty/**/*"
- "**/benchmark/**/*"

comment:
layout: "diff, components, tree"
Expand Down
29 changes: 18 additions & 11 deletions s3stream/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -50,17 +50,6 @@
<artifactId>bucket4j-core</artifactId>
<version>8.5.0</version>
</dependency>
<dependency>
<groupId>moe.cnkirito.kdio</groupId>
<artifactId>kdio-core</artifactId>
<version>1.0.0</version>
<exclusions>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
Expand Down Expand Up @@ -100,6 +89,11 @@
<artifactId>argparse4j</artifactId>
<version>0.9.0</version>
</dependency>
<dependency>
<groupId>net.java.dev.jna</groupId>
<artifactId>jna</artifactId>
<version>5.2.0</version>
</dependency>
</dependencies>

<build>
Expand All @@ -118,6 +112,7 @@
<failsOnError>true</failsOnError>
<includeTestSourceDirectory>true</includeTestSourceDirectory>
<excludes>**/generated*/**/*</excludes>
<excludes>**/thirdparty/**/*</excludes>
</configuration>
<goals>
<goal>check</goal>
Expand Down Expand Up @@ -172,6 +167,18 @@
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>${maven.compiler.source}</source>
<target>${maven.compiler.target}</target>
<compilerArgs>
<arg>--add-exports=java.base/sun.nio.ch=ALL-UNNAMED</arg>
<arg>--add-exports=java.base/jdk.internal.access=ALL-UNNAMED</arg>
</compilerArgs>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,16 @@

package com.automq.stream.s3.wal.util;

import com.automq.stream.thirdparty.moe.cnkirito.kdio.DirectIOLib;
import com.automq.stream.thirdparty.moe.cnkirito.kdio.DirectIOUtils;
import com.automq.stream.thirdparty.moe.cnkirito.kdio.DirectRandomAccessFile;
import io.netty.buffer.ByteBuf;
import moe.cnkirito.kdio.DirectIOLib;
import moe.cnkirito.kdio.DirectRandomAccessFile;

import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;

public class WALBlockDeviceChannel implements WALChannel {
private static final int BLOCK_SIZE = Integer.parseInt(System.getProperty(
"automq.ebswal.blocksize",
"4096"
));
private static final DirectIOLib DIRECT_IO_LIB = DirectIOLib.getLibForPath("/");
// TODO: move these to config
private static final int PREALLOCATED_BYTE_BUFFER_SIZE = Integer.parseInt(System.getProperty(
"automq.ebswal.preallocatedByteBufferSize",
Expand All @@ -43,31 +39,34 @@ public class WALBlockDeviceChannel implements WALChannel {

final String blockDevicePath;
final long capacityWant;
final DirectIOLib directIOLib;

long capacityFact = 0;

DirectRandomAccessFile randomAccessFile;

ThreadLocal<ByteBuffer> threadLocalByteBuffer = new ThreadLocal<ByteBuffer>() {
ThreadLocal<ByteBuffer> threadLocalByteBuffer = new ThreadLocal<>() {
@Override
protected ByteBuffer initialValue() {
return ByteBuffer.allocateDirect(PREALLOCATED_BYTE_BUFFER_SIZE);
return DirectIOUtils.allocateForDirectIO(directIOLib, PREALLOCATED_BYTE_BUFFER_SIZE);
}
};

public WALBlockDeviceChannel(String blockDevicePath, long blockDeviceCapacityWant) {
this.blockDevicePath = blockDevicePath;
this.capacityWant = blockDeviceCapacityWant;
DirectIOLib lib = DirectIOLib.getLibForPath(blockDevicePath);
if (null == lib || !DirectIOLib.binit) {
throw new RuntimeException("O_DIRECT not supported");
} else {
this.directIOLib = lib;
}
}

@Override
public void open() throws IOException {
if (DirectIOLib.binit) {
randomAccessFile = new DirectRandomAccessFile(new File(blockDevicePath), "rw");
capacityFact = randomAccessFile.length();
} else {
throw new RuntimeException("your system do not support direct io");
}
randomAccessFile = new DirectRandomAccessFile(new File(blockDevicePath), "rw");
capacityFact = randomAccessFile.length();
}

@Override
Expand All @@ -88,9 +87,11 @@ public long capacity() {
private void makeThreadLocalBytebufferMatchDirectIO(int inputBufferDirectIOAlignedSize) {
ByteBuffer byteBufferWrite = threadLocalByteBuffer.get();
if (inputBufferDirectIOAlignedSize > byteBufferWrite.capacity()) {
if (inputBufferDirectIOAlignedSize <= PREALLOCATED_BYTE_BUFFER_MAX_SIZE)
if (inputBufferDirectIOAlignedSize <= PREALLOCATED_BYTE_BUFFER_MAX_SIZE) {
threadLocalByteBuffer.set(ByteBuffer.allocateDirect(inputBufferDirectIOAlignedSize));
else throw new RuntimeException("too large write size");
} else {
throw new RuntimeException("too large write size");
}
}
}

Expand All @@ -107,18 +108,18 @@ public void write(ByteBuf buf, long position) throws IOException {
byteBufferWrite.position(0).limit(bufferDirectIOAlignedSize);

int remaining = byteBufferWrite.limit();
int writen = 0;
int written = 0;
do {
ByteBuffer slice = byteBufferWrite.slice().position(writen).limit(remaining);
ByteBuffer slice = byteBufferWrite.slice().position(written).limit(remaining);
// FIXME: make sure the position is aligned
int write = randomAccessFile.write(slice, position + writen);
int write = randomAccessFile.write(slice, position + written);
if (write == -1) {
throw new IOException("write -1");
} else if (write % BLOCK_SIZE != 0) {
} else if (write % WALUtil.BLOCK_SIZE != 0) {
// Should not happen. If it happens, it means that the system does not support direct IO
write -= write % BLOCK_SIZE;
write -= write % WALUtil.BLOCK_SIZE;
}
writen += write;
written += write;
remaining -= write;
} while (remaining > 0);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/**
* Copyright 2019 xujingfeng ([email protected])
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.automq.stream.thirdparty.moe.cnkirito.kdio;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channel;

public interface DirectChannel extends Channel {
/**
* Writes from the <tt>src</tt> buffer into this channel at <tt>position</tt>.
*
* @param src
* The {@link ByteBuffer} to write from
*
* @param position
* The position within the file at which to start writing
*
* @return How many bytes were written from <tt>src</tt> into the file
* @throws IOException
*/
int write(ByteBuffer src, long position) throws IOException;

/**
* Reads from this channel into the <tt>dst</tt> buffer from <tt>position</tt>.
*
* @param dst
* The {@link ByteBuffer} to read into
*
* @param position
* The position within the file at which to start reading
*
* @return How many bytes were placed into <tt>dst</tt>
* @throws IOException
*/
int read(ByteBuffer dst, long position) throws IOException;

/**
* @return The file size for this channel
*/
long size();

/**
* @return <tt>true</tt> if this channel is read only, <tt>false</tt> otherwise
*/
boolean isReadOnly();

/**
* Truncates this file's length to <tt>fileLength</tt>.
*
* @param fileLength The length to which to truncate
*
* @return This UnsafeByteAlignedChannel
*
* @throws IOException
*/
DirectChannel truncate(long fileLength) throws IOException;

/**
* @return The file descriptor for this channel
*/
int getFD();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
/**
* Copyright 2019 xujingfeng ([email protected])
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.automq.stream.thirdparty.moe.cnkirito.kdio;

import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.NonWritableChannelException;

public class DirectChannelImpl implements DirectChannel {
private final DirectIOLib lib;
private final int fd;
private boolean isOpen;
private long fileLength;
private final boolean isReadOnly;

private DirectChannelImpl(DirectIOLib lib, int fd, long fileLength, boolean readOnly) {
this.lib = lib;
this.fd = fd;
this.isOpen = true;
this.isReadOnly = readOnly;
this.fileLength = fileLength;
}

public static DirectChannel getChannel(File file, boolean readOnly) throws IOException {
DirectIOLib lib = DirectIOLib.getLibForPath(file.toString());
if (null == lib) {
throw new IOException("No DirectIOLib found for path " + file);
}
return getChannel(lib, file, readOnly);
}

public static DirectChannel getChannel(DirectIOLib lib, File file, boolean readOnly) throws IOException {
int fd = lib.oDirectOpen(file.toString(), readOnly);
long length = file.length();
return new DirectChannelImpl(lib, fd, length, readOnly);
}

private void ensureOpen() throws ClosedChannelException {
if (!isOpen()) {
throw new ClosedChannelException();
}
}

private void ensureWritable() {
if (isReadOnly()) {
throw new NonWritableChannelException();
}
}

@Override
public int read(ByteBuffer dst, long position) throws IOException {
ensureOpen();
return lib.pread(fd, dst, position);
}

@Override
public int write(ByteBuffer src, long position) throws IOException {
ensureOpen();
ensureWritable();
assert src.position() == lib.blockStart(src.position());

int written = lib.pwrite(fd, src, position);

// update file length if we wrote past it
fileLength = Math.max(position + written, fileLength);
return written;
}

@Override
public DirectChannel truncate(final long length) throws IOException {
ensureOpen();
ensureWritable();
if (DirectIOLib.ftruncate(fd, length) < 0) {
throw new IOException("Error during truncate on descriptor " + fd + ": " +
DirectIOLib.getLastError());
}
fileLength = length;
return this;
}

@Override
public long size() {
return fileLength;
}

@Override
public int getFD() {
return fd;
}


@Override
public boolean isOpen() {
return isOpen;
}

@Override
public boolean isReadOnly() {
return isReadOnly;
}

@Override
public void close() throws IOException {
if (!isOpen()) {
return;
}
isOpen = false;
if (lib.close(fd) < 0) {
throw new IOException("Error closing file with descriptor " + fd + ": " +
DirectIOLib.getLastError());
}
}
}
Loading

0 comments on commit ee9e776

Please sign in to comment.