diff --git a/codecov.yml b/codecov.yml index 794112ad1..5de294a26 100644 --- a/codecov.yml +++ b/codecov.yml @@ -30,6 +30,8 @@ ignore: - "**/metadata/database/dao/*" - "**/*Exception*" - "apache/rocketmq/controller/**/*" + - "**/thirdparty/**/*" + - "**/benchmark/**/*" comment: layout: "diff, components, tree" diff --git a/s3stream/pom.xml b/s3stream/pom.xml index 95c250a25..60593b802 100644 --- a/s3stream/pom.xml +++ b/s3stream/pom.xml @@ -50,17 +50,6 @@ bucket4j-core 8.5.0 - - moe.cnkirito.kdio - kdio-core - 1.0.0 - - - log4j - log4j - - - org.apache.commons commons-lang3 @@ -100,6 +89,11 @@ argparse4j 0.9.0 + + net.java.dev.jna + jna + 5.2.0 + @@ -118,6 +112,7 @@ true true **/generated*/**/* + **/thirdparty/**/* check @@ -172,6 +167,18 @@ + + org.apache.maven.plugins + maven-compiler-plugin + + ${maven.compiler.source} + ${maven.compiler.target} + + --add-exports=java.base/sun.nio.ch=ALL-UNNAMED + --add-exports=java.base/jdk.internal.access=ALL-UNNAMED + + + org.apache.maven.plugins maven-surefire-plugin diff --git a/s3stream/src/main/java/com/automq/stream/s3/wal/util/WALBlockDeviceChannel.java b/s3stream/src/main/java/com/automq/stream/s3/wal/util/WALBlockDeviceChannel.java index 4f190d0a1..3dc3805d5 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/wal/util/WALBlockDeviceChannel.java +++ b/s3stream/src/main/java/com/automq/stream/s3/wal/util/WALBlockDeviceChannel.java @@ -17,20 +17,16 @@ package com.automq.stream.s3.wal.util; +import com.automq.stream.thirdparty.moe.cnkirito.kdio.DirectIOLib; +import com.automq.stream.thirdparty.moe.cnkirito.kdio.DirectIOUtils; +import com.automq.stream.thirdparty.moe.cnkirito.kdio.DirectRandomAccessFile; import io.netty.buffer.ByteBuf; -import moe.cnkirito.kdio.DirectIOLib; -import moe.cnkirito.kdio.DirectRandomAccessFile; import java.io.File; import java.io.IOException; import java.nio.ByteBuffer; public class WALBlockDeviceChannel implements WALChannel { - private static final int BLOCK_SIZE = 
Integer.parseInt(System.getProperty( - "automq.ebswal.blocksize", - "4096" - )); - private static final DirectIOLib DIRECT_IO_LIB = DirectIOLib.getLibForPath("/"); // TODO: move these to config private static final int PREALLOCATED_BYTE_BUFFER_SIZE = Integer.parseInt(System.getProperty( "automq.ebswal.preallocatedByteBufferSize", @@ -43,31 +39,34 @@ public class WALBlockDeviceChannel implements WALChannel { final String blockDevicePath; final long capacityWant; + final DirectIOLib directIOLib; long capacityFact = 0; DirectRandomAccessFile randomAccessFile; - ThreadLocal threadLocalByteBuffer = new ThreadLocal() { + ThreadLocal<ByteBuffer> threadLocalByteBuffer = new ThreadLocal<>() { @Override protected ByteBuffer initialValue() { - return ByteBuffer.allocateDirect(PREALLOCATED_BYTE_BUFFER_SIZE); + return DirectIOUtils.allocateForDirectIO(directIOLib, PREALLOCATED_BYTE_BUFFER_SIZE); } }; public WALBlockDeviceChannel(String blockDevicePath, long blockDeviceCapacityWant) { this.blockDevicePath = blockDevicePath; this.capacityWant = blockDeviceCapacityWant; + DirectIOLib lib = DirectIOLib.getLibForPath(blockDevicePath); + if (null == lib || !DirectIOLib.binit) { + throw new RuntimeException("O_DIRECT not supported"); + } else { + this.directIOLib = lib; + } } @Override public void open() throws IOException { - if (DirectIOLib.binit) { - randomAccessFile = new DirectRandomAccessFile(new File(blockDevicePath), "rw"); - capacityFact = randomAccessFile.length(); - } else { - throw new RuntimeException("your system do not support direct io"); - } + randomAccessFile = new DirectRandomAccessFile(new File(blockDevicePath), "rw"); + capacityFact = randomAccessFile.length(); } @Override @@ -88,9 +87,11 @@ public long capacity() { private void makeThreadLocalBytebufferMatchDirectIO(int inputBufferDirectIOAlignedSize) { ByteBuffer byteBufferWrite = threadLocalByteBuffer.get(); if (inputBufferDirectIOAlignedSize > byteBufferWrite.capacity()) { - if (inputBufferDirectIOAlignedSize <= 
PREALLOCATED_BYTE_BUFFER_MAX_SIZE) + if (inputBufferDirectIOAlignedSize <= PREALLOCATED_BYTE_BUFFER_MAX_SIZE) { - threadLocalByteBuffer.set(ByteBuffer.allocateDirect(inputBufferDirectIOAlignedSize)); + threadLocalByteBuffer.set(DirectIOUtils.allocateForDirectIO(directIOLib, inputBufferDirectIOAlignedSize)); /* grown buffer must be block-aligned like the initial one */ - else throw new RuntimeException("too large write size"); + } else { + throw new RuntimeException("too large write size"); + } } } @@ -107,18 +108,18 @@ public void write(ByteBuf buf, long position) throws IOException { byteBufferWrite.position(0).limit(bufferDirectIOAlignedSize); int remaining = byteBufferWrite.limit(); - int writen = 0; + int written = 0; do { - ByteBuffer slice = byteBufferWrite.slice().position(writen).limit(remaining); + ByteBuffer slice = byteBufferWrite.slice().position(written).limit(written + remaining); /* limit is an absolute index: end of the unwritten region */ // FIXME: make sure the position is aligned - int write = randomAccessFile.write(slice, position + writen); + int write = randomAccessFile.write(slice, position + written); if (write == -1) { throw new IOException("write -1"); - } else if (write % BLOCK_SIZE != 0) { + } else if (write % WALUtil.BLOCK_SIZE != 0) { // Should not happen. If it happens, it means that the system does not support direct IO - write -= write % BLOCK_SIZE; + write -= write % WALUtil.BLOCK_SIZE; } - writen += write; + written += write; remaining -= write; } while (remaining > 0); } diff --git a/s3stream/src/main/java/com/automq/stream/thirdparty/moe/cnkirito/kdio/DirectChannel.java b/s3stream/src/main/java/com/automq/stream/thirdparty/moe/cnkirito/kdio/DirectChannel.java new file mode 100755 index 000000000..33e4c88f3 --- /dev/null +++ b/s3stream/src/main/java/com/automq/stream/thirdparty/moe/cnkirito/kdio/DirectChannel.java @@ -0,0 +1,76 @@ +/** + * Copyright 2019 xujingfeng (kirito.moe@foxmail.com) + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.automq.stream.thirdparty.moe.cnkirito.kdio; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.Channel; + +public interface DirectChannel extends Channel { + /** + * Writes from the src buffer into this channel at position. + * + * @param src + * The {@link ByteBuffer} to write from + * + * @param position + * The position within the file at which to start writing + * + * @return How many bytes were written from src into the file + * @throws IOException if an I/O error occurs + */ + int write(ByteBuffer src, long position) throws IOException; + + /** + * Reads from this channel into the dst buffer from position. + * + * @param dst + * The {@link ByteBuffer} to read into + * + * @param position + * The position within the file at which to start reading + * + * @return How many bytes were placed into dst + * @throws IOException if an I/O error occurs + */ + int read(ByteBuffer dst, long position) throws IOException; + + /** + * @return The file size for this channel + */ + long size(); + + /** + * @return true if this channel is read only, false otherwise + */ + boolean isReadOnly(); + + /** + * Truncates this file's length to fileLength. 
+ * + * @param fileLength The length to which to truncate + * + * @return This channel + * + * @throws IOException if an I/O error occurs + */ + DirectChannel truncate(long fileLength) throws IOException; + + /** + * @return The file descriptor for this channel + */ + int getFD(); +} diff --git a/s3stream/src/main/java/com/automq/stream/thirdparty/moe/cnkirito/kdio/DirectChannelImpl.java b/s3stream/src/main/java/com/automq/stream/thirdparty/moe/cnkirito/kdio/DirectChannelImpl.java new file mode 100755 index 000000000..7ba12863b --- /dev/null +++ b/s3stream/src/main/java/com/automq/stream/thirdparty/moe/cnkirito/kdio/DirectChannelImpl.java @@ -0,0 +1,128 @@ +/** + * Copyright 2019 xujingfeng (kirito.moe@foxmail.com) + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.automq.stream.thirdparty.moe.cnkirito.kdio; + +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.ClosedChannelException; +import java.nio.channels.NonWritableChannelException; + +public class DirectChannelImpl implements DirectChannel { + private final DirectIOLib lib; + private final int fd; + private boolean isOpen; + private long fileLength; + private final boolean isReadOnly; + + private DirectChannelImpl(DirectIOLib lib, int fd, long fileLength, boolean readOnly) { + this.lib = lib; + this.fd = fd; + this.isOpen = true; + this.isReadOnly = readOnly; + this.fileLength = fileLength; + } + + public static DirectChannel getChannel(File file, boolean readOnly) throws IOException { + DirectIOLib lib = DirectIOLib.getLibForPath(file.toString()); + if (null == lib) { + throw new IOException("No DirectIOLib found for path " + file); + } + return getChannel(lib, file, readOnly); + } + + public static DirectChannel getChannel(DirectIOLib lib, File file, boolean readOnly) throws IOException { + int fd = lib.oDirectOpen(file.toString(), readOnly); + long length = file.length(); /* NOTE(review): java.io.File#length may report 0 for device paths -- confirm callers that open block devices do not rely on size() */ + return new DirectChannelImpl(lib, fd, length, readOnly); + } + + private void ensureOpen() throws ClosedChannelException { + if (!isOpen()) { + throw new ClosedChannelException(); + } + } + + private void ensureWritable() { + if (isReadOnly()) { + throw new NonWritableChannelException(); + } + } + + @Override + public int read(ByteBuffer dst, long position) throws IOException { + ensureOpen(); + return lib.pread(fd, dst, position); + } + + @Override + public int write(ByteBuffer src, long position) throws 
IOException { + ensureOpen(); + ensureWritable(); + assert src.position() == lib.blockStart(src.position()); /* buffer position must be block-aligned; only checked when assertions are enabled */ + + int written = lib.pwrite(fd, src, position); + + // update file length if we wrote past it + fileLength = Math.max(position + written, fileLength); + return written; + } + + @Override + public DirectChannel truncate(final long length) throws IOException { + ensureOpen(); + ensureWritable(); + if (DirectIOLib.ftruncate(fd, length) < 0) { + throw new IOException("Error during truncate on descriptor " + fd + ": " + + DirectIOLib.getLastError()); + } + fileLength = length; + return this; + } + + @Override + public long size() { + return fileLength; + } + + @Override + public int getFD() { + return fd; + } + + + @Override + public boolean isOpen() { + return isOpen; + } + + @Override + public boolean isReadOnly() { + return isReadOnly; + } + + @Override + public void close() throws IOException { + if (!isOpen()) { + return; + } + isOpen = false; /* flagged closed before the native call, so a failed close is not retried on a later close() */ + if (lib.close(fd) < 0) { + throw new IOException("Error closing file with descriptor " + fd + ": " + + DirectIOLib.getLastError()); + } + } +} diff --git a/s3stream/src/main/java/com/automq/stream/thirdparty/moe/cnkirito/kdio/DirectIOLib.java b/s3stream/src/main/java/com/automq/stream/thirdparty/moe/cnkirito/kdio/DirectIOLib.java new file mode 100755 index 000000000..52f0d3cfa --- /dev/null +++ b/s3stream/src/main/java/com/automq/stream/thirdparty/moe/cnkirito/kdio/DirectIOLib.java @@ -0,0 +1,389 @@ +/** + * Copyright (C) 2014 Stephen Macke (smacke@cs.stanford.edu) + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.automq.stream.thirdparty.moe.cnkirito.kdio; + +import com.sun.jna.Native; +import com.sun.jna.NativeLong; +import com.sun.jna.Platform; +import com.sun.jna.Pointer; +import com.sun.jna.ptr.PointerByReference; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import sun.nio.ch.DirectBuffer; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; + +/** + * Class containing native hooks and utility methods for performing direct I/O, using + * the Linux O_DIRECT flag. + *

+ * This class is initialized at class load time, by registering JNA hooks into native methods. + * It also calculates Linux kernel version-dependent alignment amount (in bytes) for use with the O_DIRECT flag, + * when given a string for a file or directory. + */ +public class DirectIOLib { + private static final Logger logger = LoggerFactory.getLogger(DirectIOLib.class); + public static boolean binit; + static final int PC_REC_XFER_ALIGN = 0x11; + + static { + binit = false; + try { + if (!Platform.isLinux()) { + logger.warn("Not running Linux, jaydio support disabled"); + } else { // now check to see if we have O_DIRECT... + + final int linuxVersion = 0; + final int majorRev = 1; + final int minorRev = 2; + + List<Integer> versionNumbers = new ArrayList<>(); + for (String v : System.getProperty("os.version").split("[.\\-]")) { + if (v.matches("\\d+")) { /* keep multi-digit components such as "15" in 5.15.0 so the indices below stay aligned */ + versionNumbers.add(Integer.parseInt(v)); + } + } + + /* From "man 2 open": + * + * O_DIRECT support was added under Linux in kernel version 2.4.10. Older Linux kernels simply ignore this flag. Some file systems may not implement + * the flag and open() will fail with EINVAL if it is used. 
+ */ + + // test to see whether kernel version >= 2.4.10 + if (versionNumbers.get(linuxVersion) > 2) { + binit = true; + } else if (versionNumbers.get(linuxVersion) == 2) { + if (versionNumbers.get(majorRev) > 4) { + binit = true; + } else if (versionNumbers.get(majorRev) == 4 && versionNumbers.get(minorRev) >= 10) { + binit = true; + } + } + + if (binit) { + // get access to open(), pread(), etc + Native.register(Platform.C_LIBRARY_NAME); + } else { + logger.warn(String.format("O_DIRECT not supported on your version of Linux: %s", System.getProperty("os.version"))); + } + } + } catch (Throwable e) { + logger.warn("Unable to register libc at class load time: " + e.getMessage(), e); + } + } + + private final int fsBlockSize; + private final long fsBlockNotMask; + + public DirectIOLib(int fsBlockSize) { + this.fsBlockSize = fsBlockSize; + this.fsBlockNotMask = -((long) fsBlockSize); + } + + + /** + * Static method to register JNA hooks for doing direct I/O + * + * @param workingDir A directory within the mounted file system on which we'll be working + * Should preferably BE the directory in which we'll be working. + */ + public static DirectIOLib getLibForPath(String workingDir) { + int fsBlockSize = initilizeSoftBlockSize(workingDir); + if (fsBlockSize == -1) { + logger.warn("O_DIRECT support non available on your version of Linux (" + System.getProperty("os.version") + "), " + + "please upgrade your kernel in order to use jaydio."); + return null; + } + return new DirectIOLib(fsBlockSize); + } + + /** + * Finds a block size for use with O_DIRECT. Choose it in the most paranoid + * way possible to maximize probability that things work. + * + * @param fileOrDir A file or directory within which O_DIRECT access will be performed. 
+ */ + private static int initilizeSoftBlockSize(String fileOrDir) { + + int fsBlockSize = -1; + + if (binit) { + // get file system block size for use with workingDir + // see "man 3 posix_memalign" for why we do this + fsBlockSize = pathconf(fileOrDir, PC_REC_XFER_ALIGN); + /* conservative for version >= 2.6 + * "man 2 open": + * + * Under Linux 2.6, alignment + * to 512-byte boundaries suffices. + */ + + // Since O_DIRECT requires pages to be memory aligned with the file system block size, + // we will do this too in case the page size and the block size are different for + // whatever reason. By taking the least common multiple, everything should be happy: + int pageSize = getpagesize(); + fsBlockSize = lcm(fsBlockSize, pageSize); + + // just being completely paranoid: + // (512 is the rule for 2.6+ kernels as mentioned before) + fsBlockSize = lcm(fsBlockSize, 512); + + // lastly, a sanity check + if (fsBlockSize <= 0 || ((fsBlockSize & (fsBlockSize - 1)) != 0)) { + logger.warn("file system block size should be a power of two, was found to be " + fsBlockSize); + logger.warn("Disabling O_DIRECT support"); + return -1; + } + } + + return fsBlockSize; + } + + + // -- Java interfaces to native methods + + /** + * Hooks into errno using Native.getLastError(), and parses it with native strerror function. + * + * @return An error message corresponding to the last errno + */ + public static String getLastError() { + return strerror(Native.getLastError()); + } + + /** + * Static variant of {@link #blockEnd(int)}. 
+ * + * @param blockSize + * @param position + * @return The smallest number greater than or equal to position + * which is a multiple of the blockSize + */ + public static long blockEnd(int blockSize, long position) { + long ceil = (position + blockSize - 1) / blockSize; + return ceil * blockSize; + } + + /** + * Euclid's algo for gcd is more general than we need + * since we only have powers of 2, but w/e + * + * @param x + * @param y + * @return The least common multiple of x and y + */ + public static int lcm(long x, long y) { + // will hold gcd + long g = x; + long yc = y; + + // get the gcd first + while (yc != 0) { + long t = g; + g = yc; + yc = t % yc; + } + + return (int) (x * y / g); + } + + /** + * Given a pointer-to-pointer memptr, sets the dereferenced value to point to the start + * of an allocated block of size bytes, where the starting address is a multiple of + * alignment. It is guaranteed that the block may be freed by calling {@link #free(Pointer)} + * on the starting address. See "man 3 posix_memalign". + * + * @param memptr The pointer-to-pointer which will point to the address of the allocated aligned block + * @param alignment The alignment multiple of the starting address of the allocated block + * @param size The number of bytes to allocate + * @return 0 on success, one of the C error codes on failure. + */ + public static native int posix_memalign(PointerByReference memptr, NativeLong alignment, NativeLong size); + + + // -- alignment logic utility methods + + /** + * See "man 3 free". 
+ * + * @param ptr The pointer to the hunk of memory which needs freeing + */ + public static native void free(Pointer ptr); + + public static native int ftruncate(int fd, long length); + + private static native NativeLong pwrite(int fd, Pointer buf, NativeLong count, NativeLong offset); + + private static native NativeLong pread(int fd, Pointer buf, NativeLong count, NativeLong offset); + + private static native int open(String pathname, int flags); + + private static native int open(String pathname, int flags, int mode); + + private static native int getpagesize(); + + private static native int pathconf(String path, int name); + + private static native String strerror(int errnum); + + /** + * Interface into native pread function. Always reads an entire buffer, + * unlike {@link #pwrite(int, ByteBuffer, long) pwrite()} which uses buffer state + * to determine how much of buffer to write. + * + * @param fd A file descriptor to pass to native pread + * @param buf The direct buffer into which to record the file read + * @param offset The file offset at which to read + * @return The number of bytes successfully read from the file + * @throws IOException + */ + public int pread(int fd, ByteBuffer buf, long offset) throws IOException { + buf.clear(); // so that we read an entire buffer + final long address = ((DirectBuffer) buf).address(); + Pointer pointer = new Pointer(address); + int n = pread(fd, pointer, new NativeLong(buf.capacity()), new NativeLong(offset)).intValue(); + if (n < 0) { + throw new IOException("error reading file at offset " + offset + ": " + getLastError()); + } + buf.position(n); + return n; + } + + /** + * Interface into native pwrite function. Writes bytes corresponding to the nearest file + * system block boundaries between buf.position() and buf.limit(). 
+ * + * @param fd A file descriptor to pass to native pwrite + * @param buf The direct buffer from which to write + * @param offset The file offset at which to write + * @return The number of bytes successfully written to the file + * @throws IOException + */ + public int pwrite(int fd, ByteBuffer buf, long offset) throws IOException { + + // must always write to end of current block + // To handle writes past the logical file size, + // we will later truncate. + final int start = buf.position(); + assert start == blockStart(start); + final int toWrite = blockEnd(buf.limit()) - start; + + final long address = ((DirectBuffer) buf).address(); + Pointer pointer = new Pointer(address); + + int n = pwrite(fd, pointer.share(start), new NativeLong(toWrite), new NativeLong(offset)).intValue(); + if (n < 0) { + throw new IOException("error writing file at offset " + offset + ": " + getLastError()); + } + return n; + } + + // -- more native function hooks -- + + /** + * Use the open Linux system call and pass in the O_DIRECT flag. + * Currently the only other flags passed in are O_RDONLY if readOnly + * is true, and (if not) O_RDWR and O_CREAT. + * + * @param pathname The path to the file to open. If file does not exist and we are opening + * with readOnly, this will throw an error. Otherwise, if it does + * not exist but we have readOnly set to false, create the file. 
+ * @param readOnly Whether to pass in O_RDONLY + * @return An integer file descriptor for the opened file + */ + public int oDirectOpen(String pathname, boolean readOnly) throws IOException { + int flags = OpenFlags.O_DIRECT; + if (readOnly) { + flags |= OpenFlags.O_RDONLY; + } else { + flags |= OpenFlags.O_RDWR | OpenFlags.O_CREAT; + } + int fd = open(pathname, flags, 00644); + if (fd < 0) { + throw new IOException("Error opening " + pathname + ", got " + getLastError()); + } + return fd; + } + + /** + * @return The soft block size for use with transfer multiples + * and memory alignment multiples + */ + public int blockSize() { + return fsBlockSize; + } + + /** + * Returns the default buffer size for file channels doing O_DIRECT + * I/O. By default this is equal to the block size. + * + * @return The default buffer size + */ + public int defaultBufferSize() { + return fsBlockSize; + } + + /** + * Given value, find the largest number less than or equal + * to value which is a multiple of the fs block size. + * + * @param value + * @return The largest number less than or equal to value + * which is a multiple of the soft block size + */ + public long blockStart(long value) { + return value & fsBlockNotMask; + } + + /** + * @see #blockStart(long) + */ + public int blockStart(int value) { + return (int) (value & fsBlockNotMask); + } + + /** + * Given value, find the smallest number greater than or equal + * to value which is a multiple of the fs block size. 
+ * + * @param value + * @return The smallest number greater than or equal to value + * which is a multiple of the soft block size + */ + public long blockEnd(long value) { + return (value + fsBlockSize - 1) & fsBlockNotMask; + } + + /** + * @see #blockEnd(long) + */ + public int blockEnd(int value) { + return (int) ((value + fsBlockSize - 1) & fsBlockNotMask); + } + + /** + * See "man 2 close" + * + * @param fd The file descriptor of the file to close + * @return 0 on success, -1 on error + */ + public native int close(int fd); // mustn't forget to do this + +} diff --git a/s3stream/src/main/java/com/automq/stream/thirdparty/moe/cnkirito/kdio/DirectIOUtils.java b/s3stream/src/main/java/com/automq/stream/thirdparty/moe/cnkirito/kdio/DirectIOUtils.java new file mode 100755 index 000000000..60bf15e6a --- /dev/null +++ b/s3stream/src/main/java/com/automq/stream/thirdparty/moe/cnkirito/kdio/DirectIOUtils.java @@ -0,0 +1,64 @@ +/** + * Copyright 2019 xujingfeng (kirito.moe@foxmail.com) + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.automq.stream.thirdparty.moe.cnkirito.kdio; + +import com.sun.jna.NativeLong; +import com.sun.jna.Pointer; +import com.sun.jna.ptr.PointerByReference; +import jdk.internal.access.SharedSecrets; + +import java.nio.ByteBuffer; +import java.nio.ByteOrder; + +public class DirectIOUtils { + public static final ByteOrder NATIVE_BYTE_ORDER = ByteOrder.nativeOrder(); + + /** + * Allocate capacity bytes of native memory for use as a buffer, and + * return a {@link ByteBuffer} which gives an interface to this memory. The + * memory is allocated with + * {@link DirectIOLib#posix_memalign(PointerByReference, NativeLong, NativeLong) DirectIOLib#posix_memalign()} + * to ensure that the buffer can be used with O_DIRECT. + * + * @param lib The {@link DirectIOLib} whose block size is used as the alignment; capacity must be a multiple of it + * + * @param capacity The requested number of bytes to allocate + * @return A new direct {@link ByteBuffer} of capacity bytes aligned in native memory. + */ + public static ByteBuffer allocateForDirectIO(DirectIOLib lib, int capacity) { + if (capacity % lib.blockSize() > 0) { + throw new IllegalArgumentException("Capacity (" + capacity + ") must be a multiple " + + "of the block size (" + lib.blockSize() + ")"); + } + NativeLong blockSize = new NativeLong(lib.blockSize()); + PointerByReference pointerToPointer = new PointerByReference(); + + // align memory for use with O_DIRECT; a non-zero return code means nothing was allocated, so never wrap the pointer in that case + if (DirectIOLib.posix_memalign(pointerToPointer, blockSize, new NativeLong(capacity)) != 0) { throw new IllegalStateException("posix_memalign failed to allocate " + capacity + " bytes aligned to " + lib.blockSize()); } + return wrapPointer(Pointer.nativeValue(pointerToPointer.getValue()), capacity); + } + + /** + * @param ptr Pointer to wrap. + * @param len Memory location length. + * @return Byte buffer wrapping the given memory. 
+ */ + public static ByteBuffer wrapPointer(long ptr, int len) { + ByteBuffer buf = SharedSecrets.getJavaNioAccess().newDirectByteBuffer(ptr, len, null, null); + + assert buf.isDirect(); + return buf; + } +} diff --git a/s3stream/src/main/java/com/automq/stream/thirdparty/moe/cnkirito/kdio/DirectRandomAccessFile.java b/s3stream/src/main/java/com/automq/stream/thirdparty/moe/cnkirito/kdio/DirectRandomAccessFile.java new file mode 100755 index 000000000..aabad015a --- /dev/null +++ b/s3stream/src/main/java/com/automq/stream/thirdparty/moe/cnkirito/kdio/DirectRandomAccessFile.java @@ -0,0 +1,86 @@ +/** + * Copyright (C) 2014 Stephen Macke (smacke@cs.stanford.edu) + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.automq.stream.thirdparty.moe.cnkirito.kdio; + +import java.io.Closeable; +import java.io.File; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.RandomAccessFile; +import java.nio.ByteBuffer; + +/** + * Class to emulate the behavior of {@link RandomAccessFile}, but using direct I/O. + * + */ +public class DirectRandomAccessFile implements Closeable { + + private final DirectChannel channel; + + + /** + * @param file The file to open + * + * @param mode Either "rw" or "r", depending on whether this file is read only + * + * @throws IOException if the file cannot be opened for direct I/O + */ + public DirectRandomAccessFile(File file, String mode) + throws IOException { + + boolean readOnly = false; + if ("r".equals(mode)) { + readOnly = true; + } else if (!"rw".equals(mode)) { + throw new IllegalArgumentException("only r and rw modes supported"); + } + + if (readOnly && !file.isFile()) { + throw new FileNotFoundException("couldn't find file " + file); + } + + this.channel = DirectChannelImpl.getChannel(file, readOnly); + } + + @Override + public void close() throws IOException { + channel.close(); + } + + + public int write(ByteBuffer src, long position) throws IOException { + return channel.write(src, position); + } + + public int read(ByteBuffer dst, long position) throws IOException { + return channel.read(dst, position); + } + + /** + * @return The file descriptor of the underlying channel. NOTE(review): despite the method name this is not a file position -- confirm intent + */ + public long getFilePointer() { + return channel.getFD(); + } + + /** + * @return The current length of the file + */ + public long length() { + return channel.size(); + } + +} diff --git 
a/s3stream/src/main/java/com/automq/stream/thirdparty/moe/cnkirito/kdio/OpenFlags.java b/s3stream/src/main/java/com/automq/stream/thirdparty/moe/cnkirito/kdio/OpenFlags.java new file mode 100755 index 000000000..08671a741 --- /dev/null +++ b/s3stream/src/main/java/com/automq/stream/thirdparty/moe/cnkirito/kdio/OpenFlags.java @@ -0,0 +1,32 @@ +/** + * Copyright (C) 2014 Stephen Macke (smacke@cs.stanford.edu) + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.automq.stream.thirdparty.moe.cnkirito.kdio; + +/** + * Constants for {@link DirectIOLib#oDirectOpen(String, boolean)}, written as Java octal literals. NOTE(review): presumably the x86 Linux open(2) flag values; O_DIRECT differs on some architectures -- confirm for the deployment target. + */ +public final class OpenFlags { + public static final int O_RDONLY = 00; + public static final int O_WRONLY = 01; + public static final int O_RDWR = 02; + public static final int O_CREAT = 0100; + public static final int O_TRUNC = 01000; + public static final int O_DIRECT = 040000; + public static final int O_SYNC = 04000000; + + private OpenFlags() { + } +}