From 6ca011e21905c432b0806a506766183938c1d87e Mon Sep 17 00:00:00 2001 From: PengFei Li Date: Wed, 23 Aug 2023 19:32:54 +0800 Subject: [PATCH] [Bugfix] Fix NPE caused by concurrent access (#270) Signed-off-by: PengFei Li --- .../data/load/stream/v2/StreamLoadManagerV2.java | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/v2/StreamLoadManagerV2.java b/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/v2/StreamLoadManagerV2.java index f18db987..7663dfeb 100644 --- a/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/v2/StreamLoadManagerV2.java +++ b/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/v2/StreamLoadManagerV2.java @@ -35,10 +35,10 @@ import java.io.Serializable; import java.nio.charset.StandardCharsets; -import java.util.HashMap; -import java.util.LinkedList; import java.util.Map; import java.util.Queue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; @@ -84,7 +84,7 @@ enum State { private final long maxCacheBytes; // threshold to block write private final long maxWriteBlockCacheBytes; - private final Map regions = new HashMap<>(); + private final Map regions = new ConcurrentHashMap<>(); private final AtomicLong currentCacheBytes = new AtomicLong(0L); private final AtomicLong totalFlushRows = new AtomicLong(0L); @@ -105,7 +105,7 @@ enum State { private final AtomicReference state = new AtomicReference<>(State.INACTIVE); private volatile Throwable e; - private final Queue flushQ = new LinkedList<>(); + private final Queue flushQ = new ConcurrentLinkedQueue<>(); /** * Whether write() has triggered a flush after currentCacheBytes > maxCacheBytes. @@ -419,6 +419,8 @@ protected TableRegion getCacheRegion(String uniqueKey, String database, String t TableRegion region = regions.get(uniqueKey); if (region == null) { + // currently write() will not be called concurrently, so regions will also not be + // created concurrently, but for future extension, protect it with synchronized synchronized (regions) { region = regions.get(uniqueKey); if (region == null) {