From 5eee3be9a39cb2f28425d7f7149eccd56bfc12cb Mon Sep 17 00:00:00 2001 From: LGouellec Date: Tue, 9 Jul 2024 16:31:28 -0700 Subject: [PATCH 1/2] Reproducer #314 --- core/State/InMemory/InMemoryWindowStore.cs | 7 +- .../InMemory/Internal/ConcurrentHashSet.cs | 244 ++++++++++++++++++ environment/datagen_connector.json | 14 + environment/docker-compose-with-connect.yml | 8 +- samples/sample-stream/Program.cs | 2 +- samples/sample-stream/Reproducer314.cs | 75 ++++++ 6 files changed, 342 insertions(+), 8 deletions(-) create mode 100644 core/State/InMemory/Internal/ConcurrentHashSet.cs create mode 100644 environment/datagen_connector.json create mode 100644 samples/sample-stream/Reproducer314.cs diff --git a/core/State/InMemory/InMemoryWindowStore.cs b/core/State/InMemory/InMemoryWindowStore.cs index 997c9401..7ab51a39 100644 --- a/core/State/InMemory/InMemoryWindowStore.cs +++ b/core/State/InMemory/InMemoryWindowStore.cs @@ -12,6 +12,7 @@ using Streamiz.Kafka.Net.Metrics; using Streamiz.Kafka.Net.Metrics.Internal; using Streamiz.Kafka.Net.State.Helper; +using Streamiz.Kafka.Net.State.InMemory.Internal; namespace Streamiz.Kafka.Net.State.InMemory { @@ -260,7 +261,8 @@ public Windowed PeekNextKey() #endregion - internal class InMemoryWindowStore : IWindowStore + internal class + InMemoryWindowStore : IWindowStore { private readonly TimeSpan retention; private readonly long size; @@ -272,8 +274,7 @@ internal class InMemoryWindowStore : IWindowStore private int seqnum = 0; private readonly ConcurrentDictionary> map = new(); - - private readonly ISet openIterators = new HashSet(); + private readonly ConcurrentSet openIterators = new(); private readonly ILogger logger = Logger.GetLogger(typeof(InMemoryWindowStore)); diff --git a/core/State/InMemory/Internal/ConcurrentHashSet.cs b/core/State/InMemory/Internal/ConcurrentHashSet.cs new file mode 100644 index 00000000..57ed5895 --- /dev/null +++ b/core/State/InMemory/Internal/ConcurrentHashSet.cs @@ -0,0 +1,244 @@ +using System; +using System.Collections; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Linq; + +namespace Streamiz.Kafka.Net.State.InMemory.Internal +{ + internal class ConcurrentSet : IEnumerable, ISet, ICollection + { + private readonly ConcurrentDictionary _dictionary = new(); + + /// + /// Returns an enumerator that iterates through the collection. + /// + /// + /// A that can be used to iterate through the collection. + /// + public IEnumerator GetEnumerator() + { + return _dictionary.Keys.GetEnumerator(); + } + + /// + /// Returns an enumerator that iterates through a collection. + /// + /// + /// An object that can be used to iterate through the collection. + /// + IEnumerator IEnumerable.GetEnumerator() + { + return GetEnumerator(); + } + + /// + /// Removes the first occurrence of a specific object from the . + /// + /// + /// true if was successfully removed from the ; otherwise, false. This method also returns false if is not found in the original . + /// + /// The object to remove from the .The is read-only. + public bool Remove(T item) + { + return TryRemove(item); + } + + /// + /// Gets the number of elements in the set. + /// + public int Count => _dictionary.Count; + + /// + /// Gets a value indicating whether the is read-only. + /// + /// + /// true if the is read-only; otherwise, false. + /// + public bool IsReadOnly => false; + + /// + /// Gets a value that indicates if the set is empty. + /// + public bool IsEmpty => _dictionary.IsEmpty; + + private ICollection Values => _dictionary.Keys; + + /// + /// Adds an item to the . + /// + /// The object to add to the .The is read-only. + void ICollection.Add(T item) + { + if (!Add(item)) + throw new ArgumentException("Item already exists in set."); + } + + /// + /// Modifies the current set so that it contains all elements that are present in both the current set and in the specified collection. + /// + /// The collection to compare to the current set. is null. + public void UnionWith(IEnumerable other) + { + foreach (var item in other) + TryAdd(item); + } + + /// + /// Modifies the current set so that it contains only elements that are also in a specified collection. + /// + /// The collection to compare to the current set. is null. + public void IntersectWith(IEnumerable other) + { + var enumerable = other as IList ?? other.ToArray(); + foreach (var item in this) + { + if (!enumerable.Contains(item)) + TryRemove(item); + } + } + + /// + /// Removes all elements in the specified collection from the current set. + /// + /// The collection of items to remove from the set. is null. + public void ExceptWith(IEnumerable other) + { + foreach (var item in other) + TryRemove(item); + } + + /// + /// Modifies the current set so that it contains only elements that are present either in the current set or in the specified collection, but not both. + /// + /// The collection to compare to the current set. is null. + public void SymmetricExceptWith(IEnumerable other) + { + throw new NotImplementedException(); + } + + /// + /// Determines whether a set is a subset of a specified collection. + /// + /// + /// true if the current set is a subset of ; otherwise, false. + /// + /// The collection to compare to the current set. is null. + public bool IsSubsetOf(IEnumerable other) + { + var enumerable = other as IList ?? other.ToArray(); + return this.AsParallel().All(enumerable.Contains); + } + + /// + /// Determines whether the current set is a superset of a specified collection. + /// + /// + /// true if the current set is a superset of ; otherwise, false. + /// + /// The collection to compare to the current set. is null. + public bool IsSupersetOf(IEnumerable other) + { + return other.AsParallel().All(Contains); + } + + /// + /// Determines whether the current set is a correct superset of a specified collection. + /// + /// + /// true if the object is a correct superset of ; otherwise, false. + /// + /// The collection to compare to the current set. is null. + public bool IsProperSupersetOf(IEnumerable other) + { + var enumerable = other as IList ?? other.ToArray(); + return this.Count != enumerable.Count && IsSupersetOf(enumerable); + } + + /// + /// Determines whether the current set is a property (strict) subset of a specified collection. + /// + /// + /// true if the current set is a correct subset of ; otherwise, false. + /// + /// The collection to compare to the current set. is null. + public bool IsProperSubsetOf(IEnumerable other) + { + var enumerable = other as IList ?? other.ToArray(); + return Count != enumerable.Count && IsSubsetOf(enumerable); + } + + /// + /// Determines whether the current set overlaps with the specified collection. + /// + /// + /// true if the current set and share at least one common element; otherwise, false. + /// + /// The collection to compare to the current set. is null. + public bool Overlaps(IEnumerable other) + { + return other.AsParallel().Any(Contains); + } + + /// + /// Determines whether the current set and the specified collection contain the same elements. + /// + /// + /// true if the current set is equal to ; otherwise, false. + /// + /// The collection to compare to the current set. is null. + public bool SetEquals(IEnumerable other) + { + var enumerable = other as IList ?? other.ToArray(); + return Count == enumerable.Count && enumerable.AsParallel().All(Contains); + } + + /// + /// Adds an element to the current set and returns a value to indicate if the element was successfully added. + /// + /// + /// true if the element is added to the set; false if the element is already in the set. + /// + /// The element to add to the set. + public bool Add(T item) + { + return TryAdd(item); + } + + public void Clear() + { + _dictionary.Clear(); + } + + public bool Contains(T item) + { + return _dictionary.ContainsKey(item); + } + + /// + /// Copies the elements of the to an , starting at a particular index. + /// + /// The one-dimensional that is the destination of the elements copied from . The must have zero-based indexing.The zero-based index in at which copying begins. is null. is less than 0. is multidimensional.-or-The number of elements in the source is greater than the available space from to the end of the destination .-or-Type cannot be cast automatically to the type of the destination . + public void CopyTo(T[] array, int arrayIndex) + { + Values.CopyTo(array, arrayIndex); + } + + public T[] ToArray() + { + return _dictionary.Keys.ToArray(); + } + + public bool TryAdd(T item) + { + return _dictionary.TryAdd(item, default(byte)); + } + + public bool TryRemove(T item) + { + byte donotcare; + return _dictionary.TryRemove(item, out donotcare); + } + } + +} \ No newline at end of file diff --git a/environment/datagen_connector.json b/environment/datagen_connector.json new file mode 100644 index 00000000..66abd216 --- /dev/null +++ b/environment/datagen_connector.json @@ -0,0 +1,14 @@ +{ + "name": "datagen-users", + "config": { + "connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector", + "kafka.topic": "users", + "quickstart": "users", + "key.converter": "org.apache.kafka.connect.storage.StringConverter", + "value.converter": "org.apache.kafka.connect.json.JsonConverter", + "value.converter.schemas.enable": "false", + "max.interval": 1000, + "iterations": 10000000, + "tasks.max": "1" + } +} \ No newline at end of file diff --git a/environment/docker-compose-with-connect.yml b/environment/docker-compose-with-connect.yml index c65dad8d..3a4489ca 100644 --- a/environment/docker-compose-with-connect.yml +++ b/environment/docker-compose-with-connect.yml @@ -2,7 +2,7 @@ version: '2' services: zookeeper: - image: confluentinc/cp-zookeeper:7.4.0 + image: confluentinc/cp-zookeeper:7.6.1 hostname: zookeeper container_name: zookeeper ports: @@ -13,7 +13,7 @@ services: KAFKA_OPTS: "-Dzookeeper.4lw.commands.whitelist=*" broker: - image: confluentinc/cp-server:7.4.0 + image: confluentinc/cp-server:7.6.1 hostname: broker container_name: broker depends_on: @@ -42,7 +42,7 @@ services: CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous' schema-registry: - image: confluentinc/cp-schema-registry:7.4.0 + image: confluentinc/cp-schema-registry:7.6.1 hostname: schema-registry container_name: schema-registry depends_on: @@ -55,7 +55,7 @@ services: SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081 connect: - image: cnfldemos/kafka-connect-datagen:0.6.0-7.3.0 + image: cnfldemos/kafka-connect-datagen:0.6.4-7.6.0 container_name: connect depends_on: - broker diff --git a/samples/sample-stream/Program.cs b/samples/sample-stream/Program.cs index 3238a83a..b1af4a78 100644 --- a/samples/sample-stream/Program.cs +++ b/samples/sample-stream/Program.cs @@ -14,7 +14,7 @@ namespace sample_stream { public static class Program { - public static async Task Main(string[] args) + public static async Task Main2(string[] args) { var config = new StreamConfig{ ApplicationId = $"test-app", diff --git a/samples/sample-stream/Reproducer314.cs b/samples/sample-stream/Reproducer314.cs new file mode 100644 index 00000000..44230759 --- /dev/null +++ b/samples/sample-stream/Reproducer314.cs @@ -0,0 +1,75 @@ +using System; +using System.Threading; +using System.Threading.Tasks; +using Confluent.Kafka; +using Streamiz.Kafka.Net; +using Streamiz.Kafka.Net.SerDes; +using Streamiz.Kafka.Net.State; +using Streamiz.Kafka.Net.Stream; +using Streamiz.Kafka.Net.Table; + +namespace sample_stream; + +public class Reproducer314 +{ + public static async Task Main(string[] args) + { + Console.WriteLine("Hello Streams"); + + var config = new StreamConfig + { + ApplicationId = $"test-windowedtable-bis", + BootstrapServers = "localhost:9092", + AutoOffsetReset = AutoOffsetReset.Earliest + }; + + var builder = CreateWindowedStore(); + var t = builder.Build(); + var windowedTableStream = new KafkaStream(t, config); + + await windowedTableStream.StartAsync(); + + // wait for the store to be restored and ready + Thread.Sleep(10000); + + GetValueFromWindowedStore(windowedTableStream, DateTime.UtcNow.AddHours(-1), new CancellationToken()); + + Console.WriteLine("Finished"); + } + + private static void GetValueFromWindowedStore(KafkaStream windowedTableStream, DateTime startUtcForWindowLookup, CancellationToken cancellationToken) + { + var windowedStore = windowedTableStream.Store(StoreQueryParameters.FromNameAndType("store", QueryableStoreTypes.WindowStore())); + + while (!cancellationToken.IsCancellationRequested) + { + var records = windowedStore.FetchAll(startUtcForWindowLookup, DateTime.UtcNow).ToList(); + + if (records.Count > 0) + { + foreach (var item in records) + { + Console.WriteLine($"Value from windowed store : KEY = {item.Key} VALUE = {item.Value}"); + } + + startUtcForWindowLookup = DateTime.UtcNow; + } + } + } + + private static StreamBuilder CreateWindowedStore() + { + var builder = new StreamBuilder(); + + builder + .Stream("users") + .GroupByKey() + .WindowedBy(TumblingWindowOptions.Of(60000)) + .Aggregate( + () => 0, + (k, v, agg) => Math.Max(v.Length, agg), + InMemoryWindows.As("store").WithValueSerdes()); + + return builder; + } +} \ No newline at end of file From 52bca511a9bc580dfa1d76425e0c9e613db30d3f Mon Sep 17 00:00:00 2001 From: LGouellec Date: Wed, 10 Jul 2024 15:16:26 -0700 Subject: [PATCH 2/2] Add unit tests --- core/State/InMemory/InMemoryWindowStore.cs | 5 +- .../InMemory/Internal/ConcurrentHashSet.cs | 183 +----------------- .../Private/ConcurrentSetTests.cs | 93 +++++++++ 3 files changed, 101 insertions(+), 180 deletions(-) create mode 100644 test/Streamiz.Kafka.Net.Tests/Private/ConcurrentSetTests.cs diff --git a/core/State/InMemory/InMemoryWindowStore.cs b/core/State/InMemory/InMemoryWindowStore.cs index 7ab51a39..fb981db5 100644 --- a/core/State/InMemory/InMemoryWindowStore.cs +++ b/core/State/InMemory/InMemoryWindowStore.cs @@ -310,8 +310,9 @@ public void Close() if (openIterators.Count != 0) { logger.LogWarning("Closing {OpenIteratorCount} open iterators for store {Name}", openIterators.Count, Name); - for (int i = 0; i< openIterators.Count; ++i) - openIterators.ElementAt(i).Close(); + foreach(var iterator in openIterators) + iterator.Close(); + openIterators.Clear(); } map.Clear(); diff --git a/core/State/InMemory/Internal/ConcurrentHashSet.cs b/core/State/InMemory/Internal/ConcurrentHashSet.cs index 57ed5895..e898b396 100644 --- a/core/State/InMemory/Internal/ConcurrentHashSet.cs +++ b/core/State/InMemory/Internal/ConcurrentHashSet.cs @@ -1,12 +1,9 @@ -using System; -using System.Collections; using System.Collections.Concurrent; using System.Collections.Generic; -using System.Linq; namespace Streamiz.Kafka.Net.State.InMemory.Internal { - internal class ConcurrentSet : IEnumerable, ISet, ICollection + internal class ConcurrentSet { private readonly ConcurrentDictionary _dictionary = new(); @@ -21,17 +18,6 @@ public IEnumerator GetEnumerator() return _dictionary.Keys.GetEnumerator(); } - /// - /// Returns an enumerator that iterates through a collection. - /// - /// - /// An object that can be used to iterate through the collection. - /// - IEnumerator IEnumerable.GetEnumerator() - { - return GetEnumerator(); - } - /// /// Removes the first occurrence of a specific object from the . /// @@ -49,150 +35,6 @@ public bool Remove(T item) /// public int Count => _dictionary.Count; - /// - /// Gets a value indicating whether the is read-only. - /// - /// - /// true if the is read-only; otherwise, false. - /// - public bool IsReadOnly => false; - - /// - /// Gets a value that indicates if the set is empty. - /// - public bool IsEmpty => _dictionary.IsEmpty; - - private ICollection Values => _dictionary.Keys; - - /// - /// Adds an item to the . - /// - /// The object to add to the .The is read-only. - void ICollection.Add(T item) - { - if (!Add(item)) - throw new ArgumentException("Item already exists in set."); - } - - /// - /// Modifies the current set so that it contains all elements that are present in both the current set and in the specified collection. - /// - /// The collection to compare to the current set. is null. - public void UnionWith(IEnumerable other) - { - foreach (var item in other) - TryAdd(item); - } - - /// - /// Modifies the current set so that it contains only elements that are also in a specified collection. - /// - /// The collection to compare to the current set. is null. - public void IntersectWith(IEnumerable other) - { - var enumerable = other as IList ?? other.ToArray(); - foreach (var item in this) - { - if (!enumerable.Contains(item)) - TryRemove(item); - } - } - - /// - /// Removes all elements in the specified collection from the current set. - /// - /// The collection of items to remove from the set. is null. - public void ExceptWith(IEnumerable other) - { - foreach (var item in other) - TryRemove(item); - } - - /// - /// Modifies the current set so that it contains only elements that are present either in the current set or in the specified collection, but not both. - /// - /// The collection to compare to the current set. is null. - public void SymmetricExceptWith(IEnumerable other) - { - throw new NotImplementedException(); - } - - /// - /// Determines whether a set is a subset of a specified collection. - /// - /// - /// true if the current set is a subset of ; otherwise, false. - /// - /// The collection to compare to the current set. is null. - public bool IsSubsetOf(IEnumerable other) - { - var enumerable = other as IList ?? other.ToArray(); - return this.AsParallel().All(enumerable.Contains); - } - - /// - /// Determines whether the current set is a superset of a specified collection. - /// - /// - /// true if the current set is a superset of ; otherwise, false. - /// - /// The collection to compare to the current set. is null. - public bool IsSupersetOf(IEnumerable other) - { - return other.AsParallel().All(Contains); - } - - /// - /// Determines whether the current set is a correct superset of a specified collection. - /// - /// - /// true if the object is a correct superset of ; otherwise, false. - /// - /// The collection to compare to the current set. is null. - public bool IsProperSupersetOf(IEnumerable other) - { - var enumerable = other as IList ?? other.ToArray(); - return this.Count != enumerable.Count && IsSupersetOf(enumerable); - } - - /// - /// Determines whether the current set is a property (strict) subset of a specified collection. - /// - /// - /// true if the current set is a correct subset of ; otherwise, false. - /// - /// The collection to compare to the current set. is null. - public bool IsProperSubsetOf(IEnumerable other) - { - var enumerable = other as IList ?? other.ToArray(); - return Count != enumerable.Count && IsSubsetOf(enumerable); - } - - /// - /// Determines whether the current set overlaps with the specified collection. - /// - /// - /// true if the current set and share at least one common element; otherwise, false. - /// - /// The collection to compare to the current set. is null. - public bool Overlaps(IEnumerable other) - { - return other.AsParallel().Any(Contains); - } - - /// - /// Determines whether the current set and the specified collection contain the same elements. - /// - /// - /// true if the current set is equal to ; otherwise, false. - /// - /// The collection to compare to the current set. is null. - public bool SetEquals(IEnumerable other) - { - var enumerable = other as IList ?? other.ToArray(); - return Count == enumerable.Count && enumerable.AsParallel().All(Contains); - } - /// /// Adds an element to the current set and returns a value to indicate if the element was successfully added. /// @@ -215,29 +57,14 @@ public bool Contains(T item) return _dictionary.ContainsKey(item); } - /// - /// Copies the elements of the to an , starting at a particular index. - /// - /// The one-dimensional that is the destination of the elements copied from . The must have zero-based indexing.The zero-based index in at which copying begins. is null. is less than 0. is multidimensional.-or-The number of elements in the source is greater than the available space from to the end of the destination .-or-Type cannot be cast automatically to the type of the destination . - public void CopyTo(T[] array, int arrayIndex) - { - Values.CopyTo(array, arrayIndex); - } - - public T[] ToArray() - { - return _dictionary.Keys.ToArray(); - } - - public bool TryAdd(T item) + private bool TryAdd(T item) { - return _dictionary.TryAdd(item, default(byte)); + return _dictionary.TryAdd(item, default); } - public bool TryRemove(T item) + private bool TryRemove(T item) { - byte donotcare; - return _dictionary.TryRemove(item, out donotcare); + return _dictionary.TryRemove(item, out _); } } diff --git a/test/Streamiz.Kafka.Net.Tests/Private/ConcurrentSetTests.cs b/test/Streamiz.Kafka.Net.Tests/Private/ConcurrentSetTests.cs new file mode 100644 index 00000000..d76168a0 --- /dev/null +++ b/test/Streamiz.Kafka.Net.Tests/Private/ConcurrentSetTests.cs @@ -0,0 +1,93 @@ +using System; +using System.Collections.Generic; +using System.Threading.Tasks; +using NUnit.Framework; +using Streamiz.Kafka.Net.State.InMemory.Internal; + +namespace Streamiz.Kafka.Net.Tests.Private; + +public class ConcurrentSetTests +{ + private ConcurrentSet concurrentSet; + + [SetUp] + public void Init() + { + concurrentSet = new(); + } + + [TearDown] + public void Dispose() + { + concurrentSet.Clear(); + } + + [TestCase(1000)] + public void ConcurrencyAdded(int numberTasks) + { + var taskList = new List(); + for (int i = 0; i < numberTasks; i++) + { + taskList.Add(Task.Factory.StartNew((Object obj) => + { + concurrentSet.Add(Guid.NewGuid().ToString()); + }, null)); + } + Task.WaitAll(taskList.ToArray()); + Assert.AreEqual(numberTasks, concurrentSet.Count); + } + + [TestCase(1000)] + public void ConcurrencyRemoved(int numberTasks) + { + for (int i = 0; i < numberTasks; i++) + concurrentSet.Add(i.ToString()); + + var taskList = new List(); + for (int i = 0; i < numberTasks; i++) + { + taskList.Add(Task.Factory.StartNew((Object obj) => + { + concurrentSet.Remove(obj.ToString()); + }, i)); + } + + Task.WaitAll(taskList.ToArray()); + Assert.AreEqual(0, concurrentSet.Count); + } + + [TestCase(10000)] + public void ConcurrencyAddedAndForeach(int numberTasks) + { + var taskList = new List(); + for (int i = 0; i < numberTasks; i++) + { + taskList.Add(Task.Factory.StartNew((Object obj) => + { + concurrentSet.Add(Guid.NewGuid().ToString()); + foreach (var c in concurrentSet) + ; + }, null)); + } + Task.WaitAll(taskList.ToArray()); + Assert.AreEqual(numberTasks, concurrentSet.Count); + } + + [TestCase(10000)] + public void ConcurrencyAddedAndContains(int numberTasks) + { + var taskList = new List(); + for (int i = 0; i < numberTasks; i++) + { + taskList.Add(Task.Factory.StartNew((Object obj) => + { + var guid = Guid.NewGuid().ToString(); + concurrentSet.Add(guid); + Assert.IsTrue(concurrentSet.Contains(guid)); + }, null)); + } + Task.WaitAll(taskList.ToArray()); + Assert.AreEqual(numberTasks, concurrentSet.Count); + } + +} \ No newline at end of file