diff --git a/core/State/InMemory/InMemoryWindowStore.cs b/core/State/InMemory/InMemoryWindowStore.cs index 7ab51a39..fb981db5 100644 --- a/core/State/InMemory/InMemoryWindowStore.cs +++ b/core/State/InMemory/InMemoryWindowStore.cs @@ -310,8 +310,9 @@ public void Close() if (openIterators.Count != 0) { logger.LogWarning("Closing {OpenIteratorCount} open iterators for store {Name}", openIterators.Count, Name); - for (int i = 0; i< openIterators.Count; ++i) - openIterators.ElementAt(i).Close(); + foreach(var iterator in openIterators) + iterator.Close(); + openIterators.Clear(); } map.Clear(); diff --git a/core/State/InMemory/Internal/ConcurrentHashSet.cs b/core/State/InMemory/Internal/ConcurrentHashSet.cs index 57ed5895..e898b396 100644 --- a/core/State/InMemory/Internal/ConcurrentHashSet.cs +++ b/core/State/InMemory/Internal/ConcurrentHashSet.cs @@ -1,12 +1,9 @@ -using System; -using System.Collections; using System.Collections.Concurrent; using System.Collections.Generic; -using System.Linq; namespace Streamiz.Kafka.Net.State.InMemory.Internal { - internal class ConcurrentSet : IEnumerable, ISet, ICollection + internal class ConcurrentSet { private readonly ConcurrentDictionary _dictionary = new(); @@ -21,17 +18,6 @@ public IEnumerator GetEnumerator() return _dictionary.Keys.GetEnumerator(); } - /// - /// Returns an enumerator that iterates through a collection. - /// - /// - /// An object that can be used to iterate through the collection. - /// - IEnumerator IEnumerable.GetEnumerator() - { - return GetEnumerator(); - } - /// /// Removes the first occurrence of a specific object from the . /// @@ -49,150 +35,6 @@ public bool Remove(T item) /// public int Count => _dictionary.Count; - /// - /// Gets a value indicating whether the is read-only. - /// - /// - /// true if the is read-only; otherwise, false. - /// - public bool IsReadOnly => false; - - /// - /// Gets a value that indicates if the set is empty. - /// - public bool IsEmpty => _dictionary.IsEmpty; - - private ICollection Values => _dictionary.Keys; - - /// - /// Adds an item to the . - /// - /// The object to add to the .The is read-only. - void ICollection.Add(T item) - { - if (!Add(item)) - throw new ArgumentException("Item already exists in set."); - } - - /// - /// Modifies the current set so that it contains all elements that are present in both the current set and in the specified collection. - /// - /// The collection to compare to the current set. is null. - public void UnionWith(IEnumerable other) - { - foreach (var item in other) - TryAdd(item); - } - - /// - /// Modifies the current set so that it contains only elements that are also in a specified collection. - /// - /// The collection to compare to the current set. is null. - public void IntersectWith(IEnumerable other) - { - var enumerable = other as IList ?? other.ToArray(); - foreach (var item in this) - { - if (!enumerable.Contains(item)) - TryRemove(item); - } - } - - /// - /// Removes all elements in the specified collection from the current set. - /// - /// The collection of items to remove from the set. is null. - public void ExceptWith(IEnumerable other) - { - foreach (var item in other) - TryRemove(item); - } - - /// - /// Modifies the current set so that it contains only elements that are present either in the current set or in the specified collection, but not both. - /// - /// The collection to compare to the current set. is null. - public void SymmetricExceptWith(IEnumerable other) - { - throw new NotImplementedException(); - } - - /// - /// Determines whether a set is a subset of a specified collection. - /// - /// - /// true if the current set is a subset of ; otherwise, false. - /// - /// The collection to compare to the current set. is null. - public bool IsSubsetOf(IEnumerable other) - { - var enumerable = other as IList ?? other.ToArray(); - return this.AsParallel().All(enumerable.Contains); - } - - /// - /// Determines whether the current set is a superset of a specified collection. - /// - /// - /// true if the current set is a superset of ; otherwise, false. - /// - /// The collection to compare to the current set. is null. - public bool IsSupersetOf(IEnumerable other) - { - return other.AsParallel().All(Contains); - } - - /// - /// Determines whether the current set is a correct superset of a specified collection. - /// - /// - /// true if the object is a correct superset of ; otherwise, false. - /// - /// The collection to compare to the current set. is null. - public bool IsProperSupersetOf(IEnumerable other) - { - var enumerable = other as IList ?? other.ToArray(); - return this.Count != enumerable.Count && IsSupersetOf(enumerable); - } - - /// - /// Determines whether the current set is a property (strict) subset of a specified collection. - /// - /// - /// true if the current set is a correct subset of ; otherwise, false. - /// - /// The collection to compare to the current set. is null. - public bool IsProperSubsetOf(IEnumerable other) - { - var enumerable = other as IList ?? other.ToArray(); - return Count != enumerable.Count && IsSubsetOf(enumerable); - } - - /// - /// Determines whether the current set overlaps with the specified collection. - /// - /// - /// true if the current set and share at least one common element; otherwise, false. - /// - /// The collection to compare to the current set. is null. - public bool Overlaps(IEnumerable other) - { - return other.AsParallel().Any(Contains); - } - - /// - /// Determines whether the current set and the specified collection contain the same elements. - /// - /// - /// true if the current set is equal to ; otherwise, false. - /// - /// The collection to compare to the current set. is null. - public bool SetEquals(IEnumerable other) - { - var enumerable = other as IList ?? other.ToArray(); - return Count == enumerable.Count && enumerable.AsParallel().All(Contains); - } - /// /// Adds an element to the current set and returns a value to indicate if the element was successfully added. /// @@ -215,29 +57,14 @@ public bool Contains(T item) return _dictionary.ContainsKey(item); } - /// - /// Copies the elements of the to an , starting at a particular index. - /// - /// The one-dimensional that is the destination of the elements copied from . The must have zero-based indexing.The zero-based index in at which copying begins. is null. is less than 0. is multidimensional.-or-The number of elements in the source is greater than the available space from to the end of the destination .-or-Type cannot be cast automatically to the type of the destination . - public void CopyTo(T[] array, int arrayIndex) - { - Values.CopyTo(array, arrayIndex); - } - - public T[] ToArray() - { - return _dictionary.Keys.ToArray(); - } - - public bool TryAdd(T item) + private bool TryAdd(T item) { - return _dictionary.TryAdd(item, default(byte)); + return _dictionary.TryAdd(item, default); } - public bool TryRemove(T item) + private bool TryRemove(T item) { - byte donotcare; - return _dictionary.TryRemove(item, out donotcare); + return _dictionary.TryRemove(item, out _); } } diff --git a/test/Streamiz.Kafka.Net.Tests/Private/ConcurrentSetTests.cs b/test/Streamiz.Kafka.Net.Tests/Private/ConcurrentSetTests.cs new file mode 100644 index 00000000..d76168a0 --- /dev/null +++ b/test/Streamiz.Kafka.Net.Tests/Private/ConcurrentSetTests.cs @@ -0,0 +1,93 @@ +using System; +using System.Collections.Generic; +using System.Threading.Tasks; +using NUnit.Framework; +using Streamiz.Kafka.Net.State.InMemory.Internal; + +namespace Streamiz.Kafka.Net.Tests.Private; + +public class ConcurrentSetTests +{ + private ConcurrentSet concurrentSet; + + [SetUp] + public void Init() + { + concurrentSet = new(); + } + + [TearDown] + public void Dispose() + { + concurrentSet.Clear(); + } + + [TestCase(1000)] + public void ConcurrencyAdded(int numberTasks) + { + var taskList = new List(); + for (int i = 0; i < numberTasks; i++) + { + taskList.Add(Task.Factory.StartNew((Object obj) => + { + concurrentSet.Add(Guid.NewGuid().ToString()); + }, null)); + } + Task.WaitAll(taskList.ToArray()); + Assert.AreEqual(numberTasks, concurrentSet.Count); + } + + [TestCase(1000)] + public void ConcurrencyRemoved(int numberTasks) + { + for (int i = 0; i < numberTasks; i++) + concurrentSet.Add(i.ToString()); + + var taskList = new List(); + for (int i = 0; i < numberTasks; i++) + { + taskList.Add(Task.Factory.StartNew((Object obj) => + { + concurrentSet.Remove(obj.ToString()); + }, i)); + } + + Task.WaitAll(taskList.ToArray()); + Assert.AreEqual(0, concurrentSet.Count); + } + + [TestCase(10000)] + public void ConcurrencyAddedAndForeach(int numberTasks) + { + var taskList = new List(); + for (int i = 0; i < numberTasks; i++) + { + taskList.Add(Task.Factory.StartNew((Object obj) => + { + concurrentSet.Add(Guid.NewGuid().ToString()); + foreach (var c in concurrentSet) + ; + }, null)); + } + Task.WaitAll(taskList.ToArray()); + Assert.AreEqual(numberTasks, concurrentSet.Count); + } + + [TestCase(10000)] + public void ConcurrencyAddedAndContains(int numberTasks) + { + var taskList = new List(); + for (int i = 0; i < numberTasks; i++) + { + taskList.Add(Task.Factory.StartNew((Object obj) => + { + var guid = Guid.NewGuid().ToString(); + concurrentSet.Add(guid); + Assert.IsTrue(concurrentSet.Contains(guid)); + }, null)); + } + Task.WaitAll(taskList.ToArray()); + Assert.AreEqual(numberTasks, concurrentSet.Count); + } + +} \ No newline at end of file