diff --git a/core/State/RocksDb/RocksDbKeyValueStore.cs b/core/State/RocksDb/RocksDbKeyValueStore.cs index cab1a2f2..9e949144 100644 --- a/core/State/RocksDb/RocksDbKeyValueStore.cs +++ b/core/State/RocksDb/RocksDbKeyValueStore.cs @@ -5,6 +5,7 @@ using Streamiz.Kafka.Net.State.Enumerator; using System; using System.Collections; +using System.Collections.Concurrent; using System.Collections.Generic; using System.IO; using Microsoft.Extensions.Logging; @@ -61,8 +62,7 @@ public class RocksDbKeyValueStore : IKeyValueStore { private static readonly ILogger log = Logger.GetLogger(typeof(RocksDbKeyValueStore)); - private readonly ISet openIterators - = new HashSet(); + private readonly ConcurrentDictionary openIterators = new(); private const Compression COMPRESSION_TYPE = Compression.No; @@ -162,8 +162,8 @@ public void Close() if (openIterators.Count != 0) { log.LogWarning("Closing {openIteratorsCount} open iterators for store {Name}", openIterators.Count, Name); - foreach (WrappedRocksRbKeyValueEnumerator enumerator in openIterators) - enumerator.Dispose(); + foreach (KeyValuePair entry in openIterators) + entry.Key.Dispose(); } IsOpen = false; @@ -440,16 +440,18 @@ private IKeyValueEnumerator Range(Bytes from, Bytes to, bool forw CheckStateStoreOpen(); var rocksEnumerator = DbAdapter.Range(from, to, forward); - var wrapped = new WrappedRocksRbKeyValueEnumerator(rocksEnumerator, openIterators.Remove); - openIterators.Add(wrapped); + Func remove = it => openIterators.TryRemove(it, out _); + var wrapped = new WrappedRocksRbKeyValueEnumerator(rocksEnumerator, remove); + openIterators.TryAdd(wrapped, true); return wrapped; } private IEnumerable> All(bool forward) { var enumerator = DbAdapter.All(forward); - var wrapped = new WrappedRocksRbKeyValueEnumerator(enumerator, openIterators.Remove); - openIterators.Add(wrapped); + Func remove = it => openIterators.TryRemove(it, out _); + var wrapped = new WrappedRocksRbKeyValueEnumerator(enumerator, remove); + openIterators.AddOrUpdate(wrapped, true); return new KeyValueEnumerable(Name, wrapped); } diff --git a/core/State/Stores.cs b/core/State/Stores.cs index e3b7863a..a0184e90 100644 --- a/core/State/Stores.cs +++ b/core/State/Stores.cs @@ -1,8 +1,10 @@ -using Streamiz.Kafka.Net.SerDes; -using Streamiz.Kafka.Net.State.InMemory; + + +using System; +using Streamiz.Kafka.Net.SerDes; using Streamiz.Kafka.Net.State; +using Streamiz.Kafka.Net.State.InMemory; using Streamiz.Kafka.Net.State.Supplier; -using System; namespace Streamiz.Kafka.Net.State {