Skip to content

Commit

Permalink
[ARCTIC-1751] Update the memory map in spillable map when new value w…
Browse files Browse the repository at this point in the history
…as existed for the key (apache#1752)

* fix spillable map bug

* fix test checkstyle

* review

* checkstyle
  • Loading branch information
XBaith authored Jul 31, 2023
1 parent 9ae4406 commit cbb8bb8
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 9 deletions.
2 changes: 1 addition & 1 deletion core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
<url>https://arctic.netease.com</url>

<properties>
<rocksdb.version>5.17.2</rocksdb.version>
<rocksdb.version>7.10.2</rocksdb.version>
<kryo.version>2.24.0</kryo.version>
<jol.version>0.16</jol.version>
</properties>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,11 @@ public void put(K key, T value) {
this.currentInMemoryMapSize = this.memoryMap.size() * this.estimatedPayloadSize;
}

if (memoryMap.containsKey(key)) {
memoryMap.put(key, value);
return;
}

if (this.currentInMemoryMapSize < maxInMemorySizeInBytes) {
if (memoryMap.put(key, value) == null) {
currentInMemoryMapSize += this.estimatedPayloadSize;
Expand All @@ -127,9 +132,8 @@ public void delete(K key) {
if (memoryMap.containsKey(key)) {
currentInMemoryMapSize -= estimatedPayloadSize;
memoryMap.remove(key);
} else {
diskBasedMap.ifPresent(map -> map.delete(key));
}
diskBasedMap.ifPresent(map -> map.delete(key));
}

public void close() {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.netease.arctic.utils.map;

import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
Expand Down Expand Up @@ -46,6 +47,71 @@ public void testSpillableMap() {
map.close();
}

@Test
public void testSpillableMapConsistency() {
SimpleSpillableMap<Key, Value> actualMap = new SimpleSpillableMap<>(
5 * (keySize + valueSize),
null,
new DefaultSizeEstimator<>(),
new DefaultSizeEstimator<>());

Map<Key, Value> expectedMap = Maps.newHashMap();
for (int i = 0; i < 10; i++) {
Key key = new Key();
Value value = new Value();
expectedMap.put(key, value);
actualMap.put(key, value);
}
Assert.assertTrue(actualMap.getSizeOfFileOnDiskInBytes() > 0);
Assert.assertTrue(actualMap.getMemoryMapSize() < expectedMap.size());
assertSimpleMaps(actualMap, expectedMap);

// update new value
Sets.newHashSet(expectedMap.keySet())
.forEach(k -> {
Value newValue = new Value();
actualMap.put(k, newValue);
expectedMap.put(k, newValue);
assertSimpleMaps(actualMap, expectedMap);
});

Sets.newHashSet(expectedMap.keySet())
.forEach(k -> {
actualMap.delete(k);
expectedMap.remove(k);
assertSimpleMaps(actualMap, expectedMap);
});
}

@Test
public void testSpillableMapRePut() {
SimpleSpillableMap<Key, Value> actualMap = new SimpleSpillableMap<>(
(keySize + valueSize),
null,
new DefaultSizeEstimator<>(),
new DefaultSizeEstimator<>());

Key k1 = new Key();
Value v1 = new Value();
Key k2 = new Key();
Value v2 = new Value();

actualMap.put(k1, v1);
actualMap.put(k2, v2);

// remove k1 from memory
actualMap.delete(k1);
Assert.assertEquals(v2, actualMap.get(k2));
// put a new value for k2
Value v3 = new Value();
actualMap.put(k2, v3);
Assert.assertEquals(v3, actualMap.get(k2));

actualMap.delete(k2);
// should not exist in memory or on disk
Assert.assertNull(actualMap.get(k2));
}

private SimpleSpillableMap<Key, Value> testMap(long expectMemorySize, int expectKeyCount) {
SimpleSpillableMap<Key, Value> actualMap = new SimpleSpillableMap<>(expectMemorySize * (keySize + valueSize),
null, new DefaultSizeEstimator<>(), new DefaultSizeEstimator<>());
Expand All @@ -57,21 +123,23 @@ private SimpleSpillableMap<Key, Value> testMap(long expectMemorySize, int expect
expectedMap.put(key, value);
actualMap.put(key, value);
}
for (Key key : expectedMap.keySet()) {
Assert.assertEquals(expectedMap.get(key), actualMap.get(key));
}
assertSimpleMaps(actualMap, expectedMap);
Assert.assertEquals(expectMemorySize, actualMap.getMemoryMapSize());
Assert.assertEquals(
expectMemorySize * (keySize + valueSize),
actualMap.getMemoryMapSpaceSize());
return actualMap;
}

private <K, V> void assertSimpleMaps(SimpleMap<K, V> actualMap, Map<K, V> expectedMap) {
for (K key : expectedMap.keySet()) {
Assert.assertEquals(expectedMap.get(key), actualMap.get(key));
}
}

private static class Key implements Serializable {
String id = UUID.randomUUID().toString();

Long num = random.nextLong();

@Override
public int hashCode() {
return id.hashCode();
Expand Down

0 comments on commit cbb8bb8

Please sign in to comment.