Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Bugfix] Fix NPE caused by concurrent access #270

Merged
merged 2 commits into from
Aug 23, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,10 @@

import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -84,7 +84,7 @@ enum State {
private final long maxCacheBytes;
// threshold to block write
private final long maxWriteBlockCacheBytes;
private final Map<String, TableRegion> regions = new HashMap<>();
private final Map<String, TableRegion> regions = new ConcurrentHashMap<>();
private final AtomicLong currentCacheBytes = new AtomicLong(0L);
private final AtomicLong totalFlushRows = new AtomicLong(0L);

Expand All @@ -105,7 +105,7 @@ enum State {
private final AtomicReference<State> state = new AtomicReference<>(State.INACTIVE);
private volatile Throwable e;

private final Queue<TransactionTableRegion> flushQ = new LinkedList<>();
private final Queue<TransactionTableRegion> flushQ = new ConcurrentLinkedQueue<>();

/**
* Whether write() has triggered a flush after currentCacheBytes > maxCacheBytes.
Expand Down Expand Up @@ -419,6 +419,8 @@ protected TableRegion getCacheRegion(String uniqueKey, String database, String t

TableRegion region = regions.get(uniqueKey);
if (region == null) {
// currently write() will not be called concurrently, so regions will also not be
// created concurrently, but for future extension, protect it with synchronized
synchronized (regions) {
region = regions.get(uniqueKey);
if (region == null) {
Expand Down
Loading