From cbb8bb83c2a9afa6c42b8af1367d0d7916e971f3 Mon Sep 17 00:00:00 2001 From: Xavier Bai Date: Mon, 31 Jul 2023 12:13:38 +0800 Subject: [PATCH] [ARCTIC-1751] Update the memory map in spillable map when new value was existed for the key (#1752) * fix spillable map bug * fix test checkstyle * review * checkstyle --- core/pom.xml | 2 +- .../arctic/utils/map/SimpleSpillableMap.java | 8 +- .../utils/map/TestSimpleSpillableMap.java | 80 +++++++++++++++++-- 3 files changed, 81 insertions(+), 9 deletions(-) diff --git a/core/pom.xml b/core/pom.xml index 42c07f9c01..80ba37dbbc 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -33,7 +33,7 @@ https://arctic.netease.com - 5.17.2 + 7.10.2 2.24.0 0.16 diff --git a/core/src/main/java/com/netease/arctic/utils/map/SimpleSpillableMap.java b/core/src/main/java/com/netease/arctic/utils/map/SimpleSpillableMap.java index 7a29c6e3c2..7534e62576 100644 --- a/core/src/main/java/com/netease/arctic/utils/map/SimpleSpillableMap.java +++ b/core/src/main/java/com/netease/arctic/utils/map/SimpleSpillableMap.java @@ -111,6 +111,11 @@ public void put(K key, T value) { this.currentInMemoryMapSize = this.memoryMap.size() * this.estimatedPayloadSize; } + if (memoryMap.containsKey(key)) { + memoryMap.put(key, value); + return; + } + if (this.currentInMemoryMapSize < maxInMemorySizeInBytes) { if (memoryMap.put(key, value) == null) { currentInMemoryMapSize += this.estimatedPayloadSize; @@ -127,9 +132,8 @@ public void delete(K key) { if (memoryMap.containsKey(key)) { currentInMemoryMapSize -= estimatedPayloadSize; memoryMap.remove(key); - } else { - diskBasedMap.ifPresent(map -> map.delete(key)); } + diskBasedMap.ifPresent(map -> map.delete(key)); } public void close() { diff --git a/core/src/test/java/com/netease/arctic/utils/map/TestSimpleSpillableMap.java b/core/src/test/java/com/netease/arctic/utils/map/TestSimpleSpillableMap.java index 57d15d7f61..6cd4a5066c 100644 --- a/core/src/test/java/com/netease/arctic/utils/map/TestSimpleSpillableMap.java +++ b/core/src/test/java/com/netease/arctic/utils/map/TestSimpleSpillableMap.java @@ -1,6 +1,7 @@ package com.netease.arctic.utils.map; -import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -46,6 +47,71 @@ public void testSpillableMap() { map.close(); } + @Test + public void testSpillableMapConsistency() { + SimpleSpillableMap actualMap = new SimpleSpillableMap<>( + 5 * (keySize + valueSize), + null, + new DefaultSizeEstimator<>(), + new DefaultSizeEstimator<>()); + + Map expectedMap = Maps.newHashMap(); + for (int i = 0; i < 10; i++) { + Key key = new Key(); + Value value = new Value(); + expectedMap.put(key, value); + actualMap.put(key, value); + } + Assert.assertTrue(actualMap.getSizeOfFileOnDiskInBytes() > 0); + Assert.assertTrue(actualMap.getMemoryMapSize() < expectedMap.size()); + assertSimpleMaps(actualMap, expectedMap); + + // update new value + Sets.newHashSet(expectedMap.keySet()) + .forEach(k -> { + Value newValue = new Value(); + actualMap.put(k, newValue); + expectedMap.put(k, newValue); + assertSimpleMaps(actualMap, expectedMap); + }); + + Sets.newHashSet(expectedMap.keySet()) + .forEach(k -> { + actualMap.delete(k); + expectedMap.remove(k); + assertSimpleMaps(actualMap, expectedMap); + }); + } + + @Test + public void testSpillableMapRePut() { + SimpleSpillableMap actualMap = new SimpleSpillableMap<>( + (keySize + valueSize), + null, + new DefaultSizeEstimator<>(), + new DefaultSizeEstimator<>()); + + Key k1 = new Key(); + Value v1 = new Value(); + Key k2 = new Key(); + Value v2 = new Value(); + + actualMap.put(k1, v1); + actualMap.put(k2, v2); + + // remove k1 from memory + actualMap.delete(k1); + Assert.assertEquals(v2, actualMap.get(k2)); + // put a new value for k2 + Value v3 = new Value(); + actualMap.put(k2, v3); + Assert.assertEquals(v3, actualMap.get(k2)); + + actualMap.delete(k2); + // should not exist in memory or on disk + Assert.assertNull(actualMap.get(k2)); + } + private SimpleSpillableMap testMap(long expectMemorySize, int expectKeyCount) { SimpleSpillableMap actualMap = new SimpleSpillableMap<>(expectMemorySize * (keySize + valueSize), null, new DefaultSizeEstimator<>(), new DefaultSizeEstimator<>()); @@ -57,9 +123,7 @@ private SimpleSpillableMap testMap(long expectMemorySize, int expect expectedMap.put(key, value); actualMap.put(key, value); } - for (Key key : expectedMap.keySet()) { - Assert.assertEquals(expectedMap.get(key), actualMap.get(key)); - } + assertSimpleMaps(actualMap, expectedMap); Assert.assertEquals(expectMemorySize, actualMap.getMemoryMapSize()); Assert.assertEquals( expectMemorySize * (keySize + valueSize), @@ -67,11 +131,15 @@ private SimpleSpillableMap testMap(long expectMemorySize, int expect return actualMap; } + private void assertSimpleMaps(SimpleMap actualMap, Map expectedMap) { + for (K key : expectedMap.keySet()) { + Assert.assertEquals(expectedMap.get(key), actualMap.get(key)); + } + } + private static class Key implements Serializable { String id = UUID.randomUUID().toString(); - Long num = random.nextLong(); - @Override public int hashCode() { return id.hashCode();