Skip to content

Commit

Permalink
properly support deletion in StorageAgent
Browse files Browse the repository at this point in the history
  • Loading branch information
HSGamer committed Feb 12, 2025
1 parent 756198d commit 1d07076
Showing 1 changed file with 21 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import me.hsgamer.topper.core.DataEntry;
import me.hsgamer.topper.core.DataHolder;
import me.hsgamer.topper.storage.core.DataStorage;
import org.jetbrains.annotations.Nullable;

import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
Expand All @@ -21,9 +22,9 @@ public class StorageAgent<K, V> implements Agent, DataEntryAgent<K, V>, Runnable

private final DataHolder<K, V> holder;
private final DataStorage<K, V> storage;
private final Queue<Map.Entry<K, V>> queue = new ConcurrentLinkedQueue<>(); // Value can be null representing removal
private final AtomicReference<Map<K, V>> storeMap = new AtomicReference<>(new ConcurrentHashMap<>());
private final AtomicReference<Map<K, V>> savingMap = new AtomicReference<>();
private final Queue<Map.Entry<K, ValueWrapper<V>>> queue = new ConcurrentLinkedQueue<>(); // Value can be null representing removal
private final AtomicReference<Map<K, ValueWrapper<V>>> storeMap = new AtomicReference<>(new ConcurrentHashMap<>());
private final AtomicReference<Map<K, ValueWrapper<V>>> savingMap = new AtomicReference<>();
private final AtomicBoolean saving = new AtomicBoolean(false);
private int maxEntryPerCall = 10;

Expand All @@ -39,14 +40,16 @@ private void save(boolean urgent) {
storeMap.getAndSet(new ConcurrentHashMap<>())
.forEach((key, value) -> queue.add(new AbstractMap.SimpleEntry<>(key, value)));

Map<K, V> map = savingMap.updateAndGet(old -> old == null ? new HashMap<>() : old);
Map<K, ValueWrapper<V>> map = savingMap.updateAndGet(old -> old == null ? new HashMap<>() : old);

for (int i = 0; i < (urgent || maxEntryPerCall <= 0 ? Integer.MAX_VALUE : maxEntryPerCall); i++) {
Map.Entry<K, V> entry = queue.poll();
int entryIndex = 0;
while (urgent || maxEntryPerCall <= 0 || entryIndex < maxEntryPerCall) {
Map.Entry<K, ValueWrapper<V>> entry = queue.poll();
if (entry == null) {
break;
}
map.put(entry.getKey(), entry.getValue());
entryIndex++;
}

if (map.isEmpty()) {
Expand All @@ -59,13 +62,13 @@ private void save(boolean urgent) {
Map<K, V> finalMap = map.entrySet()
.stream()
.filter(entry -> {
if (entry.getValue() == null) {
if (entry.getValue().value == null) {
removeKeys.add(entry.getKey());
return false;
}
return true;
})
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
.collect(Collectors.toMap(Map.Entry::getKey, entry -> entry.getValue().value));


Optional<DataStorage.Modifier<K, V>> optionalModifier = storage.modify();
Expand Down Expand Up @@ -112,12 +115,12 @@ public void beforeStop() {

@Override
public void onUpdate(DataEntry<K, V> entry, V oldValue, V newValue) {
storeMap.get().put(entry.getKey(), newValue);
storeMap.get().put(entry.getKey(), new ValueWrapper<>(newValue));
}

@Override
public void onRemove(DataEntry<K, V> entry) {
storeMap.get().put(entry.getKey(), null);
storeMap.get().put(entry.getKey(), new ValueWrapper<>(null));
}

@Override
Expand All @@ -132,4 +135,12 @@ public DataStorage<K, V> getStorage() {
public void setMaxEntryPerCall(int taskSaveEntryPerTick) {
this.maxEntryPerCall = taskSaveEntryPerTick;
}

private static final class ValueWrapper<V> {
private final @Nullable V value;

private ValueWrapper(@Nullable V value) {
this.value = value;
}
}
}

0 comments on commit 1d07076

Please sign in to comment.