Skip to content

Commit

Permalink
[HUDI-6850] Add tests and docs for ported Bloom Filter classes (apach…
Browse files Browse the repository at this point in the history
  • Loading branch information
yihua committed Feb 26, 2024
1 parent 232255e commit d0e98e1
Show file tree
Hide file tree
Showing 21 changed files with 345 additions and 18 deletions.
15 changes: 14 additions & 1 deletion LICENSE
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,20 @@ This product includes code from Apache Hadoop

* org.apache.hudi.common.bloom.InternalDynamicBloomFilter.java adapted from org.apache.hadoop.util.bloom.DynamicBloomFilter.java

* org.apache.hudi.common.bloom.InternalFilter copied from classes in org.apache.hadoop.util.bloom package
* org.apache.hudi.common.bloom.InternalFilter.java adapted from org.apache.hadoop.util.bloom.Filter.java
and org.apache.hadoop.io.Writable.java

* org.apache.hudi.common.bloom.InternalBloomFilter adapted from org.apache.hadoop.util.bloom.BloomFilter.java

* org.apache.hudi.common.bloom.Key.java adapted from org.apache.hadoop.util.bloom.Key.java

* org.apache.hudi.common.bloom.HashFunction.java ported from org.apache.hadoop.util.bloom.HashFunction.java

* org.apache.hudi.common.util.hash.Hash.java ported from org.apache.hadoop.util.hash.Hash.java

* org.apache.hudi.common.util.hash.JenkinsHash.java ported from org.apache.hadoop.util.hash.JenkinsHash.java

* org.apache.hudi.common.util.hash.MurmurHash.java ported from org.apache.hadoop.util.hash.MurmurHash.java

with the following license

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,51 @@
* specific language governing permissions and limitations
* under the License.
*/
/**
* Copyright (c) 2005, European Commission project OneLab under contract 034819
* (http://www.one-lab.org)
* <p>
* All rights reserved.
* Redistribution and use in source and binary forms, with or
* without modification, are permitted provided that the following
* conditions are met:
* - Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* - Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in
* the documentation and/or other materials provided with the distribution.
* - Neither the name of the University Catholique de Louvain - UCL
* nor the names of its contributors may be used to endorse or
* promote products derived from this software without specific prior
* written permission.
* <p>
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
* FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
* COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
* INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
* BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
* LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
* CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
* ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/

package org.apache.hudi.common.bloom;

import org.apache.hudi.common.util.hash.Hash;

/**
* Implements a hash object that returns a certain number of hashed values.
* <p>
 * The code in this class is ported from {@link org.apache.hadoop.util.bloom.HashFunction} in Apache Hadoop.
*
* @see Key The general behavior of a key being stored in a bloom filter
* @see InternalBloomFilter The general behavior of a bloom filter
*/
public class HashFunction {
public final class HashFunction {
/**
* The number of hashed values.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@
* Implements a <i>Bloom filter</i>, as defined by Bloom in 1970.
* <p>
 * The code in this class is adapted from {@link org.apache.hadoop.util.bloom.BloomFilter} in Apache Hadoop.
* The serialization and deserialization are completely the same as and compatible with Hadoop's
* {@link org.apache.hadoop.util.bloom.BloomFilter}, so that this class correctly reads bloom
* filters serialized by older Hudi versions using Hadoop's BloomFilter.
* <p>
 * Hudi serializes bloom filter(s) and writes them to Parquet file footers and metadata table's
* bloom filter partition containing bloom filters for all data files. We want to maintain the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,20 @@
import java.util.List;

/**
* Ported from {@link org.apache.hadoop.util.bloom.Filter}.
* Defines the general behavior of a filter.
* <p>
 * The code in this class is adapted from {@link org.apache.hadoop.util.bloom.Filter} in Apache Hadoop.
* <p>
* A filter is a data structure which aims at offering a lossy summary of a set <code>A</code>. The
* key idea is to map entries of <code>A</code> (also called <i>keys</i>) into several positions
* in a vector through the use of several hash functions.
* <p>
* Typically, a filter will be implemented as a Bloom filter (or a Bloom filter extension).
* <p>
* It must be extended in order to define the real behavior.
*
* @see Key The general behavior of a key
* @see HashFunction A hash function
*/
abstract class InternalFilter {
private static final int VERSION = -1; // negative to accommodate for old format
Expand Down Expand Up @@ -160,13 +173,28 @@ public void add(Key[] keys) {
}
} //end add()

/**
 * Serialize the fields of this object to <code>out</code>.
 * <p>
 * The on-wire layout is: VERSION (int, negative to distinguish from the old
 * non-versioned format), followed by nbHash (int), hashType (byte), and
 * vectorSize (int). The write order must not change, so that the bytes stay
 * compatible with filters serialized by Hadoop's bloom filter classes.
 *
 * @param out <code>DataOutput</code> to serialize this object into.
 * @throws IOException if the fields cannot be written to <code>out</code>.
 */
public void write(DataOutput out) throws IOException {
  out.writeInt(VERSION);
  out.writeInt(this.nbHash);
  out.writeByte(this.hashType);
  out.writeInt(this.vectorSize);
}

/**
* Deserialize the fields of this object from <code>in</code>.
*
* <p>For efficiency, implementations should attempt to re-use storage in the
* existing object where possible.</p>
*
 * @param in <code>DataInput</code> to deserialize this object from.
* @throws IOException
*/
public void readFields(DataInput in) throws IOException {
int ver = in.readInt();
if (ver > 0) { // old non-versioned format
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,12 @@

/**
* The general behavior of a key that must be stored in a bloom filter.
* <p>
 * The code in this class is adapted from {@link org.apache.hadoop.util.bloom.Key} in Apache Hadoop.
*
* @see InternalBloomFilter The general behavior of a bloom filter and how the key is used.
*/
public final class Key implements Comparable<Key> {
public class Key implements Comparable<Key> {
/**
* Byte value of key
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
/**
* This class represents a common API for hashing functions used by
* {@link InternalBloomFilter}.
* <p>
 * The code in this class is ported from {@link org.apache.hadoop.util.hash.Hash} in Apache Hadoop.
*/
public abstract class Hash {
/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@

/**
* Produces 32-bit hash for hash table lookup.
*
* <p>
 * The code in this class is ported from {@link org.apache.hadoop.util.hash.JenkinsHash} in Apache Hadoop.
* <p>
* <pre>lookup3.c, by Bob Jenkins, May 2006, Public Domain.
*
* You can use this free for any purpose. It's in the public domain.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@
/**
* This is a very fast, non-cryptographic hash suitable for general hash-based
* lookup. See http://murmurhash.googlepages.com/ for more details.
*
* <p>
 * The code in this class is ported from {@link org.apache.hadoop.util.hash.MurmurHash} in Apache Hadoop.
* <p>
* <p>The C version of MurmurHash 2.0 found at that site was ported
* to Java by Andrzej Bialecki (ab at getopt org).</p>
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,21 @@

package org.apache.hudi.common.bloom;

import org.apache.hudi.common.util.hash.Hash;

import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.UUID;
import java.util.stream.Collectors;

import static org.apache.hudi.common.testutils.FileSystemTestUtils.readLastLineFromResourceFile;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

/**
Expand Down Expand Up @@ -92,11 +98,75 @@ public void testSerialize(String typeCode) {
}
}

/**
 * Parameter sets for the deserialization tests: the serializing library
 * ("hadoop" or "hudi"), the bloom filter type code, the number of entries,
 * the target error rate, the hash type, and the max entries (only relevant
 * for the dynamic bloom filter; -1 otherwise).
 */
public static List<Arguments> bloomFilterParams() {
  String simple = BloomFilterTypeCode.SIMPLE.name();
  String dynamic = BloomFilterTypeCode.DYNAMIC_V0.name();
  List<Arguments> params = new ArrayList<>();
  params.add(Arguments.of("hadoop", simple, 200, 0.000001, Hash.MURMUR_HASH, -1));
  params.add(Arguments.of("hadoop", simple, 1000, 0.000001, Hash.MURMUR_HASH, -1));
  params.add(Arguments.of("hadoop", simple, 5000, 0.000001, Hash.MURMUR_HASH, -1));
  params.add(Arguments.of("hadoop", simple, 10000, 0.000001, Hash.MURMUR_HASH, -1));
  params.add(Arguments.of("hadoop", simple, 5000, 0.000001, Hash.JENKINS_HASH, -1));
  params.add(Arguments.of("hadoop", dynamic, 200, 0.000001, Hash.MURMUR_HASH, 1000));
  params.add(Arguments.of("hadoop", dynamic, 1000, 0.000001, Hash.MURMUR_HASH, 5000));
  params.add(Arguments.of("hadoop", dynamic, 1000, 0.000001, Hash.JENKINS_HASH, 5000));
  params.add(Arguments.of("hudi", simple, 1000, 0.000001, Hash.MURMUR_HASH, -1));
  params.add(Arguments.of("hudi", simple, 5000, 0.000001, Hash.MURMUR_HASH, -1));
  params.add(Arguments.of("hudi", dynamic, 1000, 0.000001, Hash.MURMUR_HASH, 5000));
  return params;
}

/**
 * Verifies that a bloom filter deserialized via {@link BloomFilterFactory#fromString}
 * contains every key that was added before serialization.
 * <p>
 * When {@code lib} is "hadoop", the serialized bytes are read from pre-generated
 * resource files (written by Hadoop's bloom filter classes), which exercises
 * backwards compatibility of Hudi's ported implementation. When {@code lib} is
 * "hudi", the filter is built and serialized in-process and round-tripped.
 */
@ParameterizedTest
@MethodSource("bloomFilterParams")
public void testDeserialize(String lib, String typeCode, int numEntries,
                            double errorRate, int hashType, int maxEntries) throws IOException {
  String keysLine =
      readLastLineFromResourceFile("/format/bloom-filter/hadoop/all_10000.keys.data");
  List<String> keyList = Arrays.stream(keysLine.split(",")).collect(Collectors.toList());

  final String serializedFilter;
  if ("hadoop".equals(lib)) {
    // Resource file names encode the filter type, entry count, error rate,
    // hash name, and (for dynamic filters) the max entries.
    boolean isDynamic = BloomFilterTypeCode.DYNAMIC_V0.name().equals(typeCode);
    StringBuilder fileName = new StringBuilder(isDynamic ? "dynamic" : "simple")
        .append("_").append(numEntries)
        .append("_000001_")
        .append(hashType == Hash.MURMUR_HASH ? "murmur" : "jenkins");
    if (isDynamic) {
      fileName.append("_").append(maxEntries);
    }
    fileName.append(".bf.data");
    serializedFilter = readLastLineFromResourceFile("/format/bloom-filter/hadoop/" + fileName);
  } else {
    BloomFilter inputFilter = getBloomFilter(typeCode, numEntries, errorRate, maxEntries);
    keyList.forEach(inputFilter::add);
    serializedFilter = inputFilter.serializeToString();
  }
  validateBloomFilter(
      serializedFilter, keyList, lib, typeCode, numEntries, errorRate, hashType, maxEntries);
}

/**
 * Creates a bloom filter of the requested type via {@link BloomFilterFactory}.
 * For the SIMPLE type the {@code maxEntries} argument is irrelevant and -1 is
 * passed instead; dynamic filters receive {@code maxEntries} as given.
 */
BloomFilter getBloomFilter(String typeCode, int numEntries, double errorRate, int maxEntries) {
  boolean isSimple = typeCode.equalsIgnoreCase(BloomFilterTypeCode.SIMPLE.name());
  return BloomFilterFactory.createBloomFilter(
      numEntries, errorRate, isSimple ? -1 : maxEntries, typeCode);
}

/**
 * Asserts that the deserialized filter reports every key as present, and,
 * for Hadoop-serialized Murmur-hash filters, that re-building the same filter
 * with Hudi's implementation yields byte-identical serialized output.
 */
private void validateBloomFilter(String serializedFilter, List<String> keyList, String lib,
                                 String typeCode, int numEntries, double errorRate,
                                 int hashType, int maxEntries) {
  BloomFilter deserializedFilter = BloomFilterFactory.fromString(serializedFilter, typeCode);
  keyList.forEach(key ->
      assertTrue(deserializedFilter.mightContain(key),
          "Filter should have returned true for " + key));

  if ("hadoop".equals(lib) && hashType == Hash.MURMUR_HASH) {
    BloomFilter rebuiltFilter = getBloomFilter(typeCode, numEntries, errorRate, maxEntries);
    keyList.forEach(rebuiltFilter::add);
    // Hadoop library-serialized bloom filter should be exactly the same as Hudi one,
    // unless we made our customization in the future
    assertEquals(rebuiltFilter.serializeToString(), serializedFilter);
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,10 @@

package org.apache.hudi.common.table.log;

import org.apache.hudi.common.util.FileIOUtils;

import org.junit.jupiter.api.Test;
import org.roaringbitmap.longlong.Roaring64NavigableMap;

import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
Expand All @@ -35,6 +32,7 @@
import java.util.Set;
import java.util.stream.Collectors;

import static org.apache.hudi.common.testutils.FileSystemTestUtils.readLastLineFromResourceFile;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;

Expand Down Expand Up @@ -92,11 +90,4 @@ public static void assertPositionEquals(List<Long> expectedPositions,
assertFalse(expectedIterator.hasNext());
assertFalse(iterator.hasNext());
}

/**
 * Reads the last line of the given classpath resource.
 *
 * @param resourceName absolute classpath resource path (e.g. "/format/...").
 * @return the last line of the resource's content.
 * @throws IOException if the resource does not exist or has no lines.
 */
private String readLastLineFromResourceFile(String resourceName) throws IOException {
  try (InputStream inputStream = TestLogReaderUtils.class.getResourceAsStream(resourceName)) {
    // getResourceAsStream returns null for a missing resource; fail with a clear
    // message instead of an NPE deep inside the reader.
    if (inputStream == null) {
      throw new IOException("Resource not found on classpath: " + resourceName);
    }
    List<String> lines = FileIOUtils.readAsUTFStringLines(inputStream);
    if (lines.isEmpty()) {
      throw new IOException("Resource is empty: " + resourceName);
    }
    return lines.get(lines.size() - 1);
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import org.apache.hudi.common.fs.inline.InLineFSUtils;
import org.apache.hudi.common.fs.inline.InLineFileSystem;
import org.apache.hudi.common.fs.inline.InMemoryFileSystem;
import org.apache.hudi.common.table.log.TestLogReaderUtils;
import org.apache.hudi.common.util.FileIOUtils;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
Expand All @@ -30,6 +32,7 @@

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
Expand Down Expand Up @@ -86,4 +89,11 @@ public static List<FileStatus> listFiles(FileSystem fs, Path path, boolean recur
}
return statuses;
}

/**
 * Reads the last line of the given classpath resource.
 *
 * @param resourceName absolute classpath resource path (e.g. "/format/...").
 *                     NOTE(review): the stream is resolved against
 *                     {@code TestLogReaderUtils.class}; relative paths would
 *                     resolve against that class's package — callers should
 *                     pass absolute paths.
 * @return the last line of the resource's content.
 * @throws IOException if the resource does not exist or has no lines.
 */
public static String readLastLineFromResourceFile(String resourceName) throws IOException {
  try (InputStream inputStream = TestLogReaderUtils.class.getResourceAsStream(resourceName)) {
    // getResourceAsStream returns null for a missing resource; fail with a clear
    // message instead of an NPE deep inside the reader.
    if (inputStream == null) {
      throw new IOException("Resource not found on classpath: " + resourceName);
    }
    List<String> lines = FileIOUtils.readAsUTFStringLines(inputStream);
    if (lines.isEmpty()) {
      throw new IOException("Resource is empty: " + resourceName);
    }
    return lines.get(lines.size() - 1);
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

package org.apache.hudi.common.testutils;

import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hudi.common.fs.HoodieWrapperFileSystem;
import org.apache.hudi.common.model.HoodieAvroPayload;
import org.apache.hudi.common.model.HoodieFileFormat;
Expand All @@ -34,6 +33,8 @@
import com.esotericsoftware.kryo.io.Output;
import com.esotericsoftware.kryo.serializers.JavaSerializer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.junit.jupiter.api.Assumptions;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
Expand All @@ -44,7 +45,6 @@
import java.util.Objects;
import java.util.Properties;
import java.util.UUID;
import org.junit.jupiter.api.Assumptions;

/**
* A utility class for testing.
Expand Down

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Loading

0 comments on commit d0e98e1

Please sign in to comment.