Skip to content

Commit

Permalink
Merge pull request #107 from HSGamer/generic-key
Browse files Browse the repository at this point in the history
  • Loading branch information
HSGamer authored Jun 1, 2024
2 parents 53c433e + ab559ab commit 04719c6
Show file tree
Hide file tree
Showing 27 changed files with 246 additions and 242 deletions.
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
package me.hsgamer.topper.core.agent.snapshot;

import java.util.UUID;
public class DataSnapshot<K, V> {
public final K key;
public final V value;

public class DataSnapshot<T> {
public final UUID uuid;
public final T value;

DataSnapshot(UUID uuid, T value) {
this.uuid = uuid;
DataSnapshot(K key, V value) {
this.key = key;
this.value = value;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,26 +11,26 @@
import java.util.stream.IntStream;
import java.util.stream.Stream;

public class SnapshotAgent<T> extends TaskAgent {
private final AtomicReference<List<DataSnapshot<T>>> topSnapshot = new AtomicReference<>(Collections.emptyList());
private final AtomicReference<Map<UUID, Integer>> indexMap = new AtomicReference<>(Collections.emptyMap());
private final DataHolder<T> holder;
private final List<Predicate<DataSnapshot<T>>> filters = new ArrayList<>();
private Comparator<T> comparator;
public class SnapshotAgent<K, V> extends TaskAgent {
private final AtomicReference<List<DataSnapshot<K, V>>> topSnapshot = new AtomicReference<>(Collections.emptyList());
private final AtomicReference<Map<K, Integer>> indexMap = new AtomicReference<>(Collections.emptyMap());
private final DataHolder<K, V> holder;
private final List<Predicate<DataSnapshot<K, V>>> filters = new ArrayList<>();
private Comparator<V> comparator;

public SnapshotAgent(DataHolder<T> holder) {
public SnapshotAgent(DataHolder<K, V> holder) {
this.holder = holder;
}

@Override
protected Runnable getRunnable() {
return () -> {
List<DataSnapshot<T>> list = getUrgentSnapshot();
List<DataSnapshot<K, V>> list = getUrgentSnapshot();
topSnapshot.set(getUrgentSnapshot());

Map<UUID, Integer> map = IntStream.range(0, list.size())
Map<K, Integer> map = IntStream.range(0, list.size())
.boxed()
.collect(Collectors.toMap(i -> list.get(i).uuid, i -> i));
.collect(Collectors.toMap(i -> list.get(i).key, i -> i));
indexMap.set(map);
};
}
Expand All @@ -42,39 +42,39 @@ public void stop() {
indexMap.set(Collections.emptyMap());
}

public List<DataSnapshot<T>> getUrgentSnapshot() {
Stream<DataSnapshot<T>> stream = holder.getEntryMap().entrySet().stream()
public List<DataSnapshot<K, V>> getUrgentSnapshot() {
Stream<DataSnapshot<K, V>> stream = holder.getEntryMap().entrySet().stream()
.map(entry -> new DataSnapshot<>(entry.getKey(), entry.getValue().getValue()))
.filter(snapshot -> filters.stream().allMatch(filter -> filter.test(snapshot)));
if (comparator != null) {
stream = stream.sorted(Comparator.<DataSnapshot<T>, T>comparing(snapshot -> snapshot.value, comparator).reversed());
stream = stream.sorted(Comparator.<DataSnapshot<K, V>, V>comparing(snapshot -> snapshot.value, comparator).reversed());
}
return stream.collect(Collectors.toList());
}

public List<DataSnapshot<T>> getSnapshot() {
public List<DataSnapshot<K, V>> getSnapshot() {
return topSnapshot.get();
}

public int getSnapshotIndex(UUID uuid) {
return indexMap.get().getOrDefault(uuid, -1);
public int getSnapshotIndex(K key) {
return indexMap.get().getOrDefault(key, -1);
}

public Optional<DataSnapshot<T>> getSnapshotByIndex(int index) {
List<DataSnapshot<T>> list = getSnapshot();
public Optional<DataSnapshot<K, V>> getSnapshotByIndex(int index) {
List<DataSnapshot<K, V>> list = getSnapshot();
if (index < 0 || index >= list.size()) return Optional.empty();
return Optional.of(list.get(index));
}

public Optional<DataEntry<T>> getEntryByIndex(int index) {
return getSnapshotByIndex(index).flatMap(snapshot -> holder.getEntry(snapshot.uuid));
public Optional<DataEntry<K, V>> getEntryByIndex(int index) {
return getSnapshotByIndex(index).flatMap(snapshot -> holder.getEntry(snapshot.key));
}

public void setComparator(Comparator<T> comparator) {
public void setComparator(Comparator<V> comparator) {
this.comparator = comparator;
}

public void addFilter(Predicate<DataSnapshot<T>> filter) {
public void addFilter(Predicate<DataSnapshot<K, V>> filter) {
filters.add(filter);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,65 +9,64 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.logging.Level;
import java.util.logging.Logger;

public class StorageAgent<T> extends TaskAgent {
public class StorageAgent<K, V> extends TaskAgent {
public static final EntryTempFlag NEED_SAVING = new EntryTempFlag("needSaving");
public static final EntryTempFlag IS_SAVING = new EntryTempFlag("isSaving");
private final Queue<UUID> saveQueue = new ConcurrentLinkedQueue<>();
private final Queue<K> saveQueue = new ConcurrentLinkedQueue<>();
private final List<Runnable> onLoadListeners = new ArrayList<>();
private final Logger logger;
private final DataHolder<T> holder;
private final DataStorage<T> storage;
private final DataHolder<K, V> holder;
private final DataStorage<K, V> storage;
private int maxEntryPerCall = 10;
private boolean urgentSave = false;
private boolean loadOnCreate = false;
private boolean urgentLoad = true;

public StorageAgent(Logger logger, DataStorage<T> storage) {
public StorageAgent(Logger logger, DataStorage<K, V> storage) {
this.logger = logger;
this.holder = storage.getHolder();
this.storage = storage;
}

private void save(DataEntry<T> entry) {
private void save(DataEntry<K, V> entry) {
if (entry.hasFlag(IS_SAVING)) return;
if (!entry.hasFlag(NEED_SAVING)) return;
entry.removeFlag(NEED_SAVING);
entry.addFlag(IS_SAVING);
storage.save(entry.getUuid(), entry.getValue(), urgentSave).whenComplete((result, throwable) -> entry.removeFlag(IS_SAVING));
storage.save(entry.getKey(), entry.getValue(), urgentSave).whenComplete((result, throwable) -> entry.removeFlag(IS_SAVING));
}

private void load(DataEntry<T> entry) {
storage.load(entry.getUuid(), urgentLoad).whenComplete((result, throwable) -> {
private void load(DataEntry<K, V> entry) {
storage.load(entry.getKey(), urgentLoad).whenComplete((result, throwable) -> {
if (throwable != null) {
logger.log(Level.WARNING, throwable, () -> "Failed to load " + entry.getUuid());
logger.log(Level.WARNING, throwable, () -> "Failed to load " + entry.getKey());
} else {
result.ifPresent(entry::setValue);
}
});
}

public void loadIfExist(UUID uuid) {
holder.getEntry(uuid).ifPresent(this::load);
public void loadIfExist(K key) {
holder.getEntry(key).ifPresent(this::load);
}

@Override
public void start() {
holder.getCreateListenerManager().add(entry -> {
saveQueue.add(entry.getUuid());
holder.getListenerManager().add(DataHolder.EventStates.CREATE, entry -> {
saveQueue.add(entry.getKey());
if (loadOnCreate) {
load(entry);
}
});
holder.getRemoveListenerManager().add(entry -> {
holder.getListenerManager().add(DataHolder.EventStates.REMOVE, entry -> {
save(entry);
saveQueue.remove(entry.getUuid());
saveQueue.remove(entry.getKey());
});
holder.getUpdateListenerManager().add(entry -> entry.addFlag(NEED_SAVING));
holder.getListenerManager().add(DataHolder.EventStates.UPDATE, entry -> entry.addFlag(NEED_SAVING));
storage.onRegister();
storage.load()
.whenComplete((entries, throwable) -> {
Expand Down Expand Up @@ -96,13 +95,13 @@ public void beforeStop() {
@Override
protected Runnable getRunnable() {
return () -> {
List<UUID> list = new ArrayList<>();
List<K> list = new ArrayList<>();
for (int i = 0; i < maxEntryPerCall; i++) {
UUID uuid = saveQueue.poll();
if (uuid == null) break;
DataEntry<T> entry = holder.getOrCreateEntry(uuid);
K k = saveQueue.poll();
if (k == null) break;
DataEntry<K, V> entry = holder.getOrCreateEntry(k);
save(entry);
list.add(uuid);
list.add(k);
}
if (!list.isEmpty()) {
saveQueue.addAll(list);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,43 +5,46 @@
import me.hsgamer.topper.core.flag.EntryTempFlag;
import me.hsgamer.topper.core.holder.DataHolder;

import java.util.*;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Function;

public class UpdateAgent<T> extends TaskAgent {
public class UpdateAgent<K, V> extends TaskAgent {
public static final EntryTempFlag IS_UPDATING = new EntryTempFlag("isUpdating");
public static final EntryTempFlag IGNORE_UPDATE = new EntryTempFlag("ignoreUpdate");
private final Queue<UUID> updateQueue = new ConcurrentLinkedQueue<>();
private final DataHolder<T> holder;
private final Queue<K> updateQueue = new ConcurrentLinkedQueue<>();
private final DataHolder<K, V> holder;
private int maxEntryPerCall = 10;
private Function<UUID, CompletableFuture<Optional<T>>> updateFunction;
private Function<K, CompletableFuture<Optional<V>>> updateFunction;

public UpdateAgent(DataHolder<T> holder) {
public UpdateAgent(DataHolder<K, V> holder) {
this.holder = holder;
}

public void setMaxEntryPerCall(int maxEntryPerCall) {
this.maxEntryPerCall = maxEntryPerCall;
}

public void setUpdateFunction(Function<UUID, CompletableFuture<Optional<T>>> updateFunction) {
public void setUpdateFunction(Function<K, CompletableFuture<Optional<V>>> updateFunction) {
this.updateFunction = updateFunction;
}

@Override
protected Runnable getRunnable() {
return () -> {
List<UUID> list = new ArrayList<>();
List<K> list = new ArrayList<>();
for (int i = 0; i < maxEntryPerCall; i++) {
UUID uuid = updateQueue.poll();
if (uuid == null) {
K k = updateQueue.poll();
if (k == null) {
break;
}
DataEntry<T> entry = holder.getOrCreateEntry(uuid);
DataEntry<K, V> entry = holder.getOrCreateEntry(k);
updateEntry(entry);
list.add(uuid);
list.add(k);
}
if (!list.isEmpty()) {
updateQueue.addAll(list);
Expand All @@ -54,16 +57,16 @@ public void start() {
if (updateFunction == null) {
throw new IllegalStateException("Update function is not set");
}
holder.getCreateListenerManager().add(entry -> updateQueue.add(entry.getUuid()));
holder.getRemoveListenerManager().add(entry -> updateQueue.remove(entry.getUuid()));
holder.getListenerManager().add(DataHolder.EventStates.CREATE, entry -> updateQueue.add(entry.getKey()));
holder.getListenerManager().add(DataHolder.EventStates.REMOVE, entry -> updateQueue.remove(entry.getKey()));
super.start();
}

private void updateEntry(DataEntry<T> entry) {
private void updateEntry(DataEntry<K, V> entry) {
if (entry.hasFlag(IGNORE_UPDATE)) return;
if (entry.hasFlag(IS_UPDATING)) return;
entry.addFlag(IS_UPDATING);
updateFunction.apply(entry.getUuid()).thenAccept(optional -> {
updateFunction.apply(entry.getKey()).thenAccept(optional -> {
optional.ifPresent(entry::setValue);
entry.removeFlag(IS_UPDATING);
});
Expand Down
39 changes: 19 additions & 20 deletions core/src/main/java/me/hsgamer/topper/core/entry/DataEntry.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,50 +5,49 @@

import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.UnaryOperator;

public final class DataEntry<T> {
private final UUID uuid;
private final DataHolder<T> holder;
private final AtomicReference<T> value;
public final class DataEntry<K, V> {
private final K key;
private final DataHolder<K, V> holder;
private final AtomicReference<V> value;
private final Map<EntryTempFlag, Boolean> tempFlags = new ConcurrentHashMap<>();

public DataEntry(UUID uuid, DataHolder<T> holder) {
this.uuid = uuid;
public DataEntry(K key, DataHolder<K, V> holder) {
this.key = key;
this.holder = holder;
this.value = new AtomicReference<>(holder.getDefaultValue());
}

public UUID getUuid() {
return uuid;
public K getKey() {
return key;
}

public T getValue() {
public V getValue() {
return value.get();
}

public void setValue(T value) {
public void setValue(V value) {
setValue(value, true);
}

public void setValue(UnaryOperator<T> operator) {
public void setValue(UnaryOperator<V> operator) {
setValue(operator, true);
}

public void setValue(T value, boolean notify) {
public void setValue(V value, boolean notify) {
if (Objects.equals(this.value.get(), value)) return;
this.value.set(value);
if (notify) holder.getUpdateListenerManager().notifyListeners(this);
if (notify) holder.getListenerManager().call(DataHolder.EventStates.UPDATE, this);
}

public void setValue(UnaryOperator<T> operator, boolean notify) {
public void setValue(UnaryOperator<V> operator, boolean notify) {
setValue(operator.apply(value.get()), notify);
}

public DataHolder<T> getHolder() {
public DataHolder<K, V> getHolder() {
return holder;
}

Expand All @@ -67,13 +66,13 @@ public void removeFlag(EntryTempFlag flag) {
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (!(o instanceof DataEntry)) return false;
DataEntry<?> dataEntry = (DataEntry<?>) o;
return getUuid().equals(dataEntry.getUuid()) && getHolder().equals(dataEntry.getHolder()) && getValue().equals(dataEntry.getValue());
if (o == null || getClass() != o.getClass()) return false;
DataEntry<?, ?> dataEntry = (DataEntry<?, ?>) o;
return Objects.equals(getKey(), dataEntry.getKey()) && Objects.equals(getHolder(), dataEntry.getHolder()) && Objects.equals(getValue(), dataEntry.getValue());
}

@Override
public int hashCode() {
return Objects.hash(getUuid(), getHolder(), getValue());
return Objects.hash(getKey(), getHolder(), getValue());
}
}
Loading

0 comments on commit 04719c6

Please sign in to comment.