diff --git a/fluss-common/src/main/java/com/alibaba/fluss/utils/BytesUtils.java b/fluss-common/src/main/java/com/alibaba/fluss/utils/BytesUtils.java index 6a609d869..db6d22212 100644 --- a/fluss-common/src/main/java/com/alibaba/fluss/utils/BytesUtils.java +++ b/fluss-common/src/main/java/com/alibaba/fluss/utils/BytesUtils.java @@ -17,8 +17,11 @@ package com.alibaba.fluss.utils; import com.alibaba.fluss.annotation.Internal; +import com.alibaba.fluss.utils.crc.Java; +import java.lang.reflect.Method; import java.nio.ByteBuffer; +import java.util.Arrays; /** Utils for bytes. */ @Internal @@ -58,4 +61,100 @@ public static byte[] toArray(ByteBuffer buffer, int offset, int size) { } return dest; } + + /** + * Check if the given first byte array ({@code prefix}) is a prefix of the second byte array + * ({@code bytes}). + * + * @param prefix the prefix byte array + * @param bytes the byte array to check if it has the prefix + * @return true if the given bytes has the given prefix, false otherwise. + */ + public static boolean prefixEquals(byte[] prefix, byte[] bytes) { + return BEST_EQUAL_COMPARER.prefixEquals(prefix, bytes); + } + + // ------------------------------------------------------------------------------------------- + + private static final BytesPrefixComparer BEST_EQUAL_COMPARER; + + static { + if (Java.IS_JAVA9_COMPATIBLE) { + BEST_EQUAL_COMPARER = new Java9BytesPrefixComparer(); + } else { + BEST_EQUAL_COMPARER = new PureJavaBytesPrefixComparer(); + } + } + + /** Compare two byte arrays for equality. */ + private interface BytesPrefixComparer { + + /** + * Check if the given first byte array ({@code prefix}) is a prefix of the second byte array + * ({@code bytes}). + * + * @param prefix The prefix byte array + * @param bytes The byte array to check if it has the prefix + * @return true if the given bytes has the given prefix, false otherwise + */ + boolean prefixEquals(byte[] prefix, byte[] bytes); + } + + private static final class Java9BytesPrefixComparer implements BytesPrefixComparer { + private static final Method EQUALS_METHOD; + + static { + try { + EQUALS_METHOD = + Class.forName(Arrays.class.getName()) + .getMethod( + "equals", + byte[].class, + int.class, + int.class, + byte[].class, + int.class, + int.class); + } catch (Exception e) { + throw new RuntimeException("Failed to load Arrays.equals method", e); + } + } + + @Override + public boolean prefixEquals(byte[] prefix, byte[] bytes) { + if (prefix.length > bytes.length) { + return false; + } + try { + int fromIndex = 0; // inclusive + int toIndex = prefix.length; // exclusive + return (boolean) + EQUALS_METHOD.invoke( + null, prefix, fromIndex, toIndex, bytes, fromIndex, toIndex); + } catch (Throwable e) { + // should never happen + throw new RuntimeException(e); + } + } + } + + /** + * A pure Java implementation of the {@link BytesPrefixComparer} that does not rely on Java 9 + * APIs and compares bytes one by one which is slower. + */ + private static final class PureJavaBytesPrefixComparer implements BytesPrefixComparer { + + @Override + public boolean prefixEquals(byte[] prefix, byte[] bytes) { + if (prefix.length > bytes.length) { + return false; + } + for (int i = 0; i < prefix.length; i++) { + if (prefix[i] != bytes[i]) { + return false; + } + } + return true; + } + } } diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/kv/rocksdb/RocksDBKv.java b/fluss-server/src/main/java/com/alibaba/fluss/server/kv/rocksdb/RocksDBKv.java index 7b3ad9bec..015bcd882 100644 --- a/fluss-server/src/main/java/com/alibaba/fluss/server/kv/rocksdb/RocksDBKv.java +++ b/fluss-server/src/main/java/com/alibaba/fluss/server/kv/rocksdb/RocksDBKv.java @@ -19,8 +19,8 @@ import com.alibaba.fluss.exception.FlussRuntimeException; import com.alibaba.fluss.rocksdb.RocksDBOperationUtils; import com.alibaba.fluss.server.utils.ResourceGuard; +import com.alibaba.fluss.utils.BytesUtils; import com.alibaba.fluss.utils.IOUtils; -import com.alibaba.fluss.utils.crc.Java; import org.rocksdb.ColumnFamilyHandle; import org.rocksdb.ColumnFamilyOptions; @@ -33,9 +33,7 @@ import javax.annotation.Nullable; import java.io.IOException; -import java.lang.reflect.Method; import java.util.ArrayList; -import java.util.Arrays; import java.util.List; /** A wrapper for the operation of {@link org.rocksdb.RocksDB}. */ @@ -53,8 +51,6 @@ public class RocksDBKv implements AutoCloseable { /** The write options to use in the states. We disable write ahead logging. */ private final WriteOptions writeOptions; - private final PrefixComparer prefixComparer; - /** * We are not using the default column family for KV ops, but we still need to remember this * handle so that we can close it properly when the kv is closed. Note that the one returned by @@ -79,7 +75,6 @@ public RocksDBKv( this.rocksDBResourceGuard = rocksDBResourceGuard; this.writeOptions = optionsContainer.getWriteOptions(); this.defaultColumnFamilyHandle = defaultColumnFamilyHandle; - this.prefixComparer = new PrefixComparer(); } public ResourceGuard getResourceGuard() { @@ -112,7 +107,7 @@ public List prefixLookup(byte[] prefixKey) { RocksIterator iterator = db.newIterator(defaultColumnFamilyHandle, readOptions); try { iterator.seek(prefixKey); - while (iterator.isValid() && prefixComparer.isPrefixEquals(prefixKey, iterator.key())) { + while (iterator.isValid() && BytesUtils.prefixEquals(prefixKey, iterator.key())) { pkList.add(iterator.value()); iterator.next(); } @@ -210,82 +205,4 @@ public void close() throws Exception { public RocksDB getDb() { return db; } - - private static class PrefixComparer { - private static final boolean isJava9OrAbove; - private static Method compareMethod; - - static { - if (Java.IS_JAVA9_COMPATIBLE) { - isJava9OrAbove = true; - try { - compareMethod = - Arrays.class.getMethod( - "compare", - byte[].class, - int.class, - int.class, - byte[].class, - int.class, - int.class); - } catch (NoSuchMethodException e) { - throw new RuntimeException("Failed to get compare method for byte arrays", e); - } - } else { - isJava9OrAbove = false; - } - } - - public boolean isPrefixEquals(byte[] prefix, byte[] bytes) { - if (isJava9OrAbove) { - return isPrefixEqualsJava9(prefix, bytes); - } else { - return isPrefixEqualsBelowJava9(prefix, bytes); - } - } - - /** - * Check if the given first byte array ({@code prefix}) is a prefix of the second byte array - * ({@code bytes}) upper java9. - * - * @param prefix The prefix byte array - * @param bytes The byte array to check if it has the prefix - * @return true if the given bytes has the given prefix, false otherwise - */ - private boolean isPrefixEqualsJava9(byte[] prefix, byte[] bytes) { - try { - if (prefix.length > bytes.length) { - return false; - } - - int result = - (int) - compareMethod.invoke( - null, prefix, 0, prefix.length, bytes, 0, prefix.length); - return result == 0; - } catch (Exception e) { - throw new RuntimeException("Error invoking compare method", e); - } - } - - /** - * Check if the given first byte array ({@code prefix}) is a prefix of the second byte array - * ({@code bytes}) below java9. - * - * @param prefix The prefix byte array - * @param bytes The byte array to check if it has the prefix - * @return true if the given bytes has the given prefix, false otherwise - */ - private static boolean isPrefixEqualsBelowJava9(byte[] prefix, byte[] bytes) { - if (prefix.length > bytes.length) { - return false; - } - for (int i = 0; i < prefix.length; i++) { - if (prefix[i] != bytes[i]) { - return false; - } - } - return true; - } - } }