Skip to content

Commit

Permalink
Merge pull request LGouellec#345 from LGouellec/issue/314
Browse files Browse the repository at this point in the history
Fix Issue/314
  • Loading branch information
LGouellec authored Jul 10, 2024
2 parents cc2fdf4 + 52bca51 commit 9cfac50
Show file tree
Hide file tree
Showing 7 changed files with 265 additions and 10 deletions.
12 changes: 7 additions & 5 deletions core/State/InMemory/InMemoryWindowStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
using Streamiz.Kafka.Net.Metrics;
using Streamiz.Kafka.Net.Metrics.Internal;
using Streamiz.Kafka.Net.State.Helper;
using Streamiz.Kafka.Net.State.InMemory.Internal;

namespace Streamiz.Kafka.Net.State.InMemory
{
Expand Down Expand Up @@ -260,7 +261,8 @@ public Windowed<Bytes> PeekNextKey()

#endregion

internal class InMemoryWindowStore : IWindowStore<Bytes, byte[]>
internal class
InMemoryWindowStore : IWindowStore<Bytes, byte[]>
{
private readonly TimeSpan retention;
private readonly long size;
Expand All @@ -272,8 +274,7 @@ internal class InMemoryWindowStore : IWindowStore<Bytes, byte[]>
private int seqnum = 0;

private readonly ConcurrentDictionary<long, ConcurrentDictionary<Bytes, byte[]>> map = new();

private readonly ISet<InMemoryWindowStoreEnumeratorWrapper> openIterators = new HashSet<InMemoryWindowStoreEnumeratorWrapper>();
private readonly ConcurrentSet<InMemoryWindowStoreEnumeratorWrapper> openIterators = new();

private readonly ILogger logger = Logger.GetLogger(typeof(InMemoryWindowStore));

Expand Down Expand Up @@ -309,8 +310,9 @@ public void Close()
if (openIterators.Count != 0)
{
logger.LogWarning("Closing {OpenIteratorCount} open iterators for store {Name}", openIterators.Count, Name);
for (int i = 0; i< openIterators.Count; ++i)
openIterators.ElementAt(i).Close();
foreach(var iterator in openIterators)
iterator.Close();
openIterators.Clear();
}

map.Clear();
Expand Down
71 changes: 71 additions & 0 deletions core/State/InMemory/Internal/ConcurrentHashSet.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
using System.Collections.Concurrent;
using System.Collections.Generic;

namespace Streamiz.Kafka.Net.State.InMemory.Internal
{
internal class ConcurrentSet<T>
{
private readonly ConcurrentDictionary<T, byte> _dictionary = new();

/// <summary>
/// Returns an enumerator that iterates through the collection.
/// </summary>
/// <returns>
/// A <see cref="T:System.Collections.Generic.IEnumerator`1"/> that can be used to iterate through the collection.
/// </returns>
public IEnumerator<T> GetEnumerator()
{
return _dictionary.Keys.GetEnumerator();
}

/// <summary>
/// Removes the first occurrence of a specific object from the <see cref="T:System.Collections.Generic.ICollection`1"/>.
/// </summary>
/// <returns>
/// true if <paramref name="item"/> was successfully removed from the <see cref="T:System.Collections.Generic.ICollection`1"/>; otherwise, false. This method also returns false if <paramref name="item"/> is not found in the original <see cref="T:System.Collections.Generic.ICollection`1"/>.
/// </returns>
/// <param name="item">The object to remove from the <see cref="T:System.Collections.Generic.ICollection`1"/>.</param><exception cref="T:System.NotSupportedException">The <see cref="T:System.Collections.Generic.ICollection`1"/> is read-only.</exception>
public bool Remove(T item)
{
return TryRemove(item);
}

/// <summary>
/// Gets the number of elements in the set.
/// </summary>
public int Count => _dictionary.Count;

/// <summary>
/// Adds an element to the current set and returns a value to indicate if the element was successfully added.
/// </summary>
/// <returns>
/// true if the element is added to the set; false if the element is already in the set.
/// </returns>
/// <param name="item">The element to add to the set.</param>
public bool Add(T item)
{
return TryAdd(item);
}

public void Clear()
{
_dictionary.Clear();
}

public bool Contains(T item)
{
return _dictionary.ContainsKey(item);
}

private bool TryAdd(T item)
{
return _dictionary.TryAdd(item, default);
}

private bool TryRemove(T item)
{
return _dictionary.TryRemove(item, out _);
}
}

}
14 changes: 14 additions & 0 deletions environment/datagen_connector.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
{
"name": "datagen-users",
"config": {
"connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
"kafka.topic": "users",
"quickstart": "users",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
"max.interval": 1000,
"iterations": 10000000,
"tasks.max": "1"
}
}
8 changes: 4 additions & 4 deletions environment/docker-compose-with-connect.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
version: '2'
services:
zookeeper:
image: confluentinc/cp-zookeeper:7.4.0
image: confluentinc/cp-zookeeper:7.6.1
hostname: zookeeper
container_name: zookeeper
ports:
Expand All @@ -13,7 +13,7 @@ services:
KAFKA_OPTS: "-Dzookeeper.4lw.commands.whitelist=*"

broker:
image: confluentinc/cp-server:7.4.0
image: confluentinc/cp-server:7.6.1
hostname: broker
container_name: broker
depends_on:
Expand Down Expand Up @@ -42,7 +42,7 @@ services:
CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'

schema-registry:
image: confluentinc/cp-schema-registry:7.4.0
image: confluentinc/cp-schema-registry:7.6.1
hostname: schema-registry
container_name: schema-registry
depends_on:
Expand All @@ -55,7 +55,7 @@ services:
SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081

connect:
image: cnfldemos/kafka-connect-datagen:0.6.0-7.3.0
image: cnfldemos/kafka-connect-datagen:0.6.4-7.6.0
container_name: connect
depends_on:
- broker
Expand Down
2 changes: 1 addition & 1 deletion samples/sample-stream/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ namespace sample_stream
{
public static class Program
{
public static async Task Main(string[] args)
public static async Task Main2(string[] args)
{
var config = new StreamConfig<StringSerDes, StringSerDes>{
ApplicationId = $"test-app",
Expand Down
75 changes: 75 additions & 0 deletions samples/sample-stream/Reproducer314.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using Confluent.Kafka;
using Streamiz.Kafka.Net;
using Streamiz.Kafka.Net.SerDes;
using Streamiz.Kafka.Net.State;
using Streamiz.Kafka.Net.Stream;
using Streamiz.Kafka.Net.Table;

namespace sample_stream;

public class Reproducer314
{
public static async Task Main(string[] args)
{
Console.WriteLine("Hello Streams");

var config = new StreamConfig<StringSerDes, StringSerDes>
{
ApplicationId = $"test-windowedtable-bis",
BootstrapServers = "localhost:9092",
AutoOffsetReset = AutoOffsetReset.Earliest
};

var builder = CreateWindowedStore();
var t = builder.Build();
var windowedTableStream = new KafkaStream(t, config);

await windowedTableStream.StartAsync();

// wait for the store to be restored and ready
Thread.Sleep(10000);

GetValueFromWindowedStore(windowedTableStream, DateTime.UtcNow.AddHours(-1), new CancellationToken());

Console.WriteLine("Finished");
}

private static void GetValueFromWindowedStore(KafkaStream windowedTableStream, DateTime startUtcForWindowLookup, CancellationToken cancellationToken)
{
var windowedStore = windowedTableStream.Store(StoreQueryParameters.FromNameAndType("store", QueryableStoreTypes.WindowStore<string, int>()));

while (!cancellationToken.IsCancellationRequested)
{
var records = windowedStore.FetchAll(startUtcForWindowLookup, DateTime.UtcNow).ToList();

if (records.Count > 0)
{
foreach (var item in records)
{
Console.WriteLine($"Value from windowed store : KEY = {item.Key} VALUE = {item.Value}");
}

startUtcForWindowLookup = DateTime.UtcNow;
}
}
}

private static StreamBuilder CreateWindowedStore()
{
var builder = new StreamBuilder();

builder
.Stream<string, string>("users")
.GroupByKey()
.WindowedBy(TumblingWindowOptions.Of(60000))
.Aggregate(
() => 0,
(k, v, agg) => Math.Max(v.Length, agg),
InMemoryWindows.As<string, int>("store").WithValueSerdes<Int32SerDes>());

return builder;
}
}
93 changes: 93 additions & 0 deletions test/Streamiz.Kafka.Net.Tests/Private/ConcurrentSetTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using NUnit.Framework;
using Streamiz.Kafka.Net.State.InMemory.Internal;

namespace Streamiz.Kafka.Net.Tests.Private;

public class ConcurrentSetTests
{
private ConcurrentSet<string> concurrentSet;

[SetUp]
public void Init()
{
concurrentSet = new();
}

[TearDown]
public void Dispose()
{
concurrentSet.Clear();
}

[TestCase(1000)]
public void ConcurrencyAdded(int numberTasks)
{
var taskList = new List<Task>();
for (int i = 0; i < numberTasks; i++)
{
taskList.Add(Task.Factory.StartNew((Object obj) =>
{
concurrentSet.Add(Guid.NewGuid().ToString());
}, null));
}
Task.WaitAll(taskList.ToArray());
Assert.AreEqual(numberTasks, concurrentSet.Count);
}

[TestCase(1000)]
public void ConcurrencyRemoved(int numberTasks)
{
for (int i = 0; i < numberTasks; i++)
concurrentSet.Add(i.ToString());

var taskList = new List<Task>();
for (int i = 0; i < numberTasks; i++)
{
taskList.Add(Task.Factory.StartNew((Object obj) =>
{
concurrentSet.Remove(obj.ToString());
}, i));
}

Task.WaitAll(taskList.ToArray());
Assert.AreEqual(0, concurrentSet.Count);
}

[TestCase(10000)]
public void ConcurrencyAddedAndForeach(int numberTasks)
{
var taskList = new List<Task>();
for (int i = 0; i < numberTasks; i++)
{
taskList.Add(Task.Factory.StartNew((Object obj) =>
{
concurrentSet.Add(Guid.NewGuid().ToString());
foreach (var c in concurrentSet)
;
}, null));
}
Task.WaitAll(taskList.ToArray());
Assert.AreEqual(numberTasks, concurrentSet.Count);
}

[TestCase(10000)]
public void ConcurrencyAddedAndContains(int numberTasks)
{
var taskList = new List<Task>();
for (int i = 0; i < numberTasks; i++)
{
taskList.Add(Task.Factory.StartNew((Object obj) =>
{
var guid = Guid.NewGuid().ToString();
concurrentSet.Add(guid);
Assert.IsTrue(concurrentSet.Contains(guid));
}, null));
}
Task.WaitAll(taskList.ToArray());
Assert.AreEqual(numberTasks, concurrentSet.Count);
}

}

0 comments on commit 9cfac50

Please sign in to comment.