Skip to content

Commit

Permalink
merge manually PR
Browse files Browse the repository at this point in the history
  • Loading branch information
LGouellec committed Jun 20, 2024
1 parent bf0400d commit 6ffb5c6
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 11 deletions.
18 changes: 10 additions & 8 deletions core/State/RocksDb/RocksDbKeyValueStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using Streamiz.Kafka.Net.State.Enumerator;
using System;
using System.Collections;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO;
using Microsoft.Extensions.Logging;
Expand Down Expand Up @@ -61,8 +62,7 @@ public class RocksDbKeyValueStore : IKeyValueStore<Bytes, byte[]>
{
private static readonly ILogger log = Logger.GetLogger(typeof(RocksDbKeyValueStore));

private readonly ISet<WrappedRocksRbKeyValueEnumerator> openIterators
= new HashSet<WrappedRocksRbKeyValueEnumerator>();
private readonly ConcurrentDictionary<WrappedRocksRbKeyValueEnumerator, bool> openIterators = new();


private const Compression COMPRESSION_TYPE = Compression.No;
Expand Down Expand Up @@ -162,8 +162,8 @@ public void Close()
if (openIterators.Count != 0)
{
log.LogWarning("Closing {openIteratorsCount} open iterators for store {Name}", openIterators.Count, Name);
foreach (WrappedRocksRbKeyValueEnumerator enumerator in openIterators)
enumerator.Dispose();
foreach (KeyValuePair<WrappedRocksRbKeyValueEnumerator, bool> entry in openIterators)
entry.Key.Dispose();
}

IsOpen = false;
Expand Down Expand Up @@ -440,16 +440,18 @@ private IKeyValueEnumerator<Bytes, byte[]> Range(Bytes from, Bytes to, bool forw
CheckStateStoreOpen();

var rocksEnumerator = DbAdapter.Range(from, to, forward);
var wrapped = new WrappedRocksRbKeyValueEnumerator(rocksEnumerator, openIterators.Remove);
openIterators.Add(wrapped);
Func<WrappedRocksRbKeyValueEnumerator, bool> remove = it => openIterators.TryRemove(it, out _);
var wrapped = new WrappedRocksRbKeyValueEnumerator(rocksEnumerator, remove);
openIterators.TryAdd(wrapped, true);
return wrapped;
}

private IEnumerable<KeyValuePair<Bytes, byte[]>> All(bool forward)
{
var enumerator = DbAdapter.All(forward);
var wrapped = new WrappedRocksRbKeyValueEnumerator(enumerator, openIterators.Remove);
openIterators.Add(wrapped);
Func<WrappedRocksRbKeyValueEnumerator, bool> remove = it => openIterators.TryRemove(it, out _);
var wrapped = new WrappedRocksRbKeyValueEnumerator(enumerator, remove);
openIterators.AddOrUpdate(wrapped, true);
return new KeyValueEnumerable(Name, wrapped);
}

Expand Down
8 changes: 5 additions & 3 deletions core/State/Stores.cs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
using Streamiz.Kafka.Net.SerDes;
using Streamiz.Kafka.Net.State.InMemory;


using System;
using Streamiz.Kafka.Net.SerDes;
using Streamiz.Kafka.Net.State;
using Streamiz.Kafka.Net.State.InMemory;
using Streamiz.Kafka.Net.State.Supplier;
using System;

namespace Streamiz.Kafka.Net.State
{
Expand Down

0 comments on commit 6ffb5c6

Please sign in to comment.