diff --git a/s3stream/pom.xml b/s3stream/pom.xml index 349be288f..451be8c40 100644 --- a/s3stream/pom.xml +++ b/s3stream/pom.xml @@ -167,6 +167,20 @@ + + org.apache.maven.plugins + maven-compiler-plugin + + ${maven.compiler.source} + ${maven.compiler.target} + + --add-exports + java.base/sun.nio.ch=ALL-UNNAMED + --add-exports + java.base/jdk.internal.access=ALL-UNNAMED + + + org.apache.maven.plugins maven-surefire-plugin diff --git a/s3stream/src/main/java/com/automq/stream/s3/wal/util/WALBlockDeviceChannel.java b/s3stream/src/main/java/com/automq/stream/s3/wal/util/WALBlockDeviceChannel.java index 4f190d0a1..3dc3805d5 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/wal/util/WALBlockDeviceChannel.java +++ b/s3stream/src/main/java/com/automq/stream/s3/wal/util/WALBlockDeviceChannel.java @@ -17,20 +17,16 @@ package com.automq.stream.s3.wal.util; +import com.automq.stream.thirdparty.moe.cnkirito.kdio.DirectIOLib; +import com.automq.stream.thirdparty.moe.cnkirito.kdio.DirectIOUtils; +import com.automq.stream.thirdparty.moe.cnkirito.kdio.DirectRandomAccessFile; import io.netty.buffer.ByteBuf; -import moe.cnkirito.kdio.DirectIOLib; -import moe.cnkirito.kdio.DirectRandomAccessFile; import java.io.File; import java.io.IOException; import java.nio.ByteBuffer; public class WALBlockDeviceChannel implements WALChannel { - private static final int BLOCK_SIZE = Integer.parseInt(System.getProperty( - "automq.ebswal.blocksize", - "4096" - )); - private static final DirectIOLib DIRECT_IO_LIB = DirectIOLib.getLibForPath("/"); // TODO: move these to config private static final int PREALLOCATED_BYTE_BUFFER_SIZE = Integer.parseInt(System.getProperty( "automq.ebswal.preallocatedByteBufferSize", @@ -43,31 +39,34 @@ public class WALBlockDeviceChannel implements WALChannel { final String blockDevicePath; final long capacityWant; + final DirectIOLib directIOLib; long capacityFact = 0; DirectRandomAccessFile randomAccessFile; - ThreadLocal threadLocalByteBuffer = new ThreadLocal() { + ThreadLocal threadLocalByteBuffer = new ThreadLocal<>() { @Override protected ByteBuffer initialValue() { - return ByteBuffer.allocateDirect(PREALLOCATED_BYTE_BUFFER_SIZE); + return DirectIOUtils.allocateForDirectIO(directIOLib, PREALLOCATED_BYTE_BUFFER_SIZE); } }; public WALBlockDeviceChannel(String blockDevicePath, long blockDeviceCapacityWant) { this.blockDevicePath = blockDevicePath; this.capacityWant = blockDeviceCapacityWant; + DirectIOLib lib = DirectIOLib.getLibForPath(blockDevicePath); + if (null == lib || !DirectIOLib.binit) { + throw new RuntimeException("O_DIRECT not supported"); + } else { + this.directIOLib = lib; + } } @Override public void open() throws IOException { - if (DirectIOLib.binit) { - randomAccessFile = new DirectRandomAccessFile(new File(blockDevicePath), "rw"); - capacityFact = randomAccessFile.length(); - } else { - throw new RuntimeException("your system do not support direct io"); - } + randomAccessFile = new DirectRandomAccessFile(new File(blockDevicePath), "rw"); + capacityFact = randomAccessFile.length(); } @Override @@ -88,9 +87,11 @@ public long capacity() { private void makeThreadLocalBytebufferMatchDirectIO(int inputBufferDirectIOAlignedSize) { ByteBuffer byteBufferWrite = threadLocalByteBuffer.get(); if (inputBufferDirectIOAlignedSize > byteBufferWrite.capacity()) { - if (inputBufferDirectIOAlignedSize <= PREALLOCATED_BYTE_BUFFER_MAX_SIZE) + if (inputBufferDirectIOAlignedSize <= PREALLOCATED_BYTE_BUFFER_MAX_SIZE) { threadLocalByteBuffer.set(ByteBuffer.allocateDirect(inputBufferDirectIOAlignedSize)); - else throw new RuntimeException("too large write size"); + } else { + throw new RuntimeException("too large write size"); + } } } @@ -107,18 +108,18 @@ public void write(ByteBuf buf, long position) throws IOException { byteBufferWrite.position(0).limit(bufferDirectIOAlignedSize); int remaining = byteBufferWrite.limit(); - int writen = 0; + int written = 0; do { - ByteBuffer slice = byteBufferWrite.slice().position(writen).limit(remaining); + ByteBuffer slice = byteBufferWrite.slice().position(written).limit(remaining); // FIXME: make sure the position is aligned - int write = randomAccessFile.write(slice, position + writen); + int write = randomAccessFile.write(slice, position + written); if (write == -1) { throw new IOException("write -1"); - } else if (write % BLOCK_SIZE != 0) { + } else if (write % WALUtil.BLOCK_SIZE != 0) { // Should not happen. If it happens, it means that the system does not support direct IO - write -= write % BLOCK_SIZE; + write -= write % WALUtil.BLOCK_SIZE; } - writen += write; + written += write; remaining -= write; } while (remaining > 0); }