merge from develop
LGouellec committed Jul 30, 2024
2 parents 597bac7 + 602e514 commit 35c47ac
Showing 115 changed files with 5,742 additions and 657 deletions.
9 changes: 9 additions & 0 deletions core/Crosscutting/Bytes.cs
@@ -82,6 +82,11 @@ public Bytes(byte[] bytes)
{
Get = bytes;
}

/// <summary>
/// Create an empty Bytes with no backing byte array.
/// </summary>
public Bytes(){}

/// <summary>
///
@@ -127,6 +132,10 @@ internal static Bytes Wrap(byte[] bytes)
public virtual int CompareTo(Bytes other)
{
BytesComparer comparer = new BytesComparer();
if (other == null || other.Get == null)
return 1;
if (Get == null)
return -1;
return comparer.Compare(this, other);
}
}
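The two changes above give Bytes a parameterless constructor and make CompareTo tolerant of missing data: comparing against null, or against an instance whose Get is null, returns 1, while an instance whose own Get is null sorts first. A minimal sketch of the resulting ordering (illustrative, not part of the commit):

// Assumes Streamiz.Kafka.Net.Crosscutting and System are in scope.
var data  = new Bytes(new byte[] { 1, 2, 3 });
var empty = new Bytes();                  // Get is null

Console.WriteLine(data.CompareTo(null));  // 1  -> non-null content sorts after null
Console.WriteLine(data.CompareTo(empty)); // 1  -> other.Get is null, so this sorts after
Console.WriteLine(empty.CompareTo(data)); // -1 -> own Get is null, so this sorts first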
19 changes: 19 additions & 0 deletions core/Crosscutting/DictionaryExtensions.cs
@@ -32,6 +32,25 @@ public static bool AddOrUpdate<K, V>(this IDictionary<K, V> map, K key, V value)
return true;
}

/// <summary>
/// Add the element if the key doesn't exist
/// </summary>
/// <typeparam name="K">Key type</typeparam>
/// <typeparam name="V">Value type</typeparam>
/// <param name="map">Source dictionary</param>
/// <param name="key">New key</param>
/// <param name="value">Value</param>
/// <returns>True if the key/value pair was added, false otherwise</returns>
public static bool AddIfNotExist<K, V>(this IDictionary<K, V> map, K key, V value)
{
if (!map.ContainsKey(key))
{
map.Add(key, value);
return true;
}
return false;
}

/// <summary>
/// Convert enumerable of <see cref="KeyValuePair{K, V}"/> to <see cref="IDictionary{K, V}"/>
/// </summary>
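AddIfNotExist complements the existing AddOrUpdate: it only inserts when the key is absent, reports whether the insert happened, and leaves an existing value untouched. A short usage sketch:

var offsets = new Dictionary<string, long>();

offsets.AddIfNotExist("topic-a", 10);  // true  -> key added
offsets.AddIfNotExist("topic-a", 99);  // false -> existing value kept
// offsets["topic-a"] is still 10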
33 changes: 19 additions & 14 deletions core/Crosscutting/SortedDictionaryExtensions.cs
@@ -1,49 +1,54 @@
using System.Collections.Generic;
using System;
using System.Collections.Generic;

namespace Streamiz.Kafka.Net.Crosscutting
{
internal static class SortedDictionaryExtensions
{
internal static IEnumerable<KeyValuePair<K, V>> HeadMap<K, V>(this SortedDictionary<K, V> sortedDic, K key, bool inclusive)
internal static IEnumerable<KeyValuePair<K, V>> HeadMap<K, V>(this IEnumerable<KeyValuePair<K, V>> enumerable, K key, bool inclusive)
where K : IComparable<K>
{
foreach (K k in sortedDic.Keys) {
int r = sortedDic.Comparer.Compare(key, k);
foreach (var kv in enumerable)
{
int r = key.CompareTo(kv.Key);
if ((inclusive && r >= 0) || (!inclusive && r > 0))
yield return new KeyValuePair<K, V>(k, sortedDic[k]);
yield return new KeyValuePair<K, V>(kv.Key, kv.Value);
else
break;
}
}

internal static IEnumerable<KeyValuePair<K, V>> SubMap<K, V>(this SortedDictionary<K, V> sortedDic, K keyFrom, K keyTo , bool inclusiveFrom, bool inclusiveTo)
internal static IEnumerable<KeyValuePair<K, V>> SubMap<K, V>(this IEnumerable<KeyValuePair<K, V>> enumerable, K keyFrom, K keyTo , bool inclusiveFrom, bool inclusiveTo)
where K : IComparable<K>
{
foreach (K k in sortedDic.Keys)
foreach (var kv in enumerable)
{
int rF = sortedDic.Comparer.Compare(keyFrom, k);
int rT = sortedDic.Comparer.Compare(keyTo, k);
int rF = keyFrom.CompareTo(kv.Key);
int rT = keyTo.CompareTo(kv.Key);

if((inclusiveFrom && rF <= 0) || (!inclusiveFrom && rF < 0))
{
if ((inclusiveTo && rT >= 0) || (!inclusiveTo && rT > 0))
{
yield return new KeyValuePair<K, V>(k, sortedDic[k]);
yield return new KeyValuePair<K, V>(kv.Key, kv.Value);
}
else
break;
}
}
}

internal static IEnumerable<KeyValuePair<K, V>> TailMap<K, V>(this SortedDictionary<K, V> sortedDic, K keyFrom,
internal static IEnumerable<KeyValuePair<K, V>> TailMap<K, V>(this IEnumerable<KeyValuePair<K, V>> enumerable, K keyFrom,
bool inclusive)
where K : IComparable<K>
{
foreach (K k in sortedDic.Keys)
foreach (var kv in enumerable)
{
int rT = sortedDic.Comparer.Compare(keyFrom, k);
int rT = keyFrom.CompareTo(kv.Key);

if ((inclusive && rT <= 0) || (!inclusive && rT < 0))
{
yield return new KeyValuePair<K, V>(k, sortedDic[k]);
yield return new KeyValuePair<K, V>(kv.Key, kv.Value);
}
}
}
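HeadMap, SubMap and TailMap now extend any IEnumerable<KeyValuePair<K, V>> whose key implements IComparable<K>, instead of requiring a SortedDictionary and its Comparer. HeadMap and SubMap still stop at the first key past the upper bound, so they assume the input enumerates in ascending key order (which a SortedDictionary does). A rough sketch of the semantics (the helpers are internal to Streamiz, so this is illustrative only):

var dic = new SortedDictionary<int, string> { [1] = "a", [3] = "b", [5] = "c", [7] = "d" };

var head = dic.HeadMap(5, inclusive: true);                            // (1,a) (3,b) (5,c)
var sub  = dic.SubMap(3, 7, inclusiveFrom: true, inclusiveTo: false);  // (3,b) (5,c)
var tail = dic.TailMap(3, inclusive: true);                            // (3,b) (5,c) (7,d)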
3 changes: 2 additions & 1 deletion core/Kafka/IRecordCollector.cs
@@ -1,4 +1,4 @@
using System.Collections.Generic;
using System.Collections.Generic;
using Confluent.Kafka;
using Streamiz.Kafka.Net.SerDes;

@@ -12,5 +12,6 @@ internal interface IRecordCollector
void Close();
void Send<K, V>(string topic, K key, V value, Headers headers, long timestamp, ISerDes<K> keySerializer, ISerDes<V> valueSerializer);
void Send<K, V>(string topic, K key, V value, Headers headers, int partition, long timestamp, ISerDes<K> keySerializer, ISerDes<V> valueSerializer);
int PartitionsFor(string topic);
}
}
1 change: 0 additions & 1 deletion core/Kafka/Internal/DefaultKafkaClientSupplier.cs
@@ -110,7 +110,6 @@ public IProducer<byte[], byte[]> GetProducer(ProducerConfig config)
producerStatisticsHandler.Publish(statistics);
});
}

return builder.Build();
}

37 changes: 32 additions & 5 deletions core/Kafka/Internal/RecordCollector.cs
@@ -7,6 +7,7 @@
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Text;
using Microsoft.Extensions.Logging;
using Streamiz.Kafka.Net.Metrics;
@@ -81,24 +82,28 @@ public void Clear()
private readonly IStreamConfig configuration;
private readonly TaskId id;
private readonly Sensor droppedRecordsSensor;
private readonly IAdminClient _adminClient;
private Exception exception = null;

private readonly string logPrefix;
private readonly ILogger log = Logger.GetLogger(typeof(RecordCollector));

private readonly ConcurrentDictionary<TopicPartition, long> collectorsOffsets =
new ConcurrentDictionary<TopicPartition, long>();
private readonly ConcurrentDictionary<TopicPartition, long> collectorsOffsets = new();

private readonly RetryRecordContext retryRecordContext = new RetryRecordContext();
private readonly RetryRecordContext retryRecordContext = new();

public IDictionary<TopicPartition, long> CollectorOffsets => collectorsOffsets.ToDictionary();

public RecordCollector(string logPrefix, IStreamConfig configuration, TaskId id, Sensor droppedRecordsSensor)
public IDictionary<string, (int, DateTime)> cachePartitionsForTopics =
new Dictionary<string, (int, DateTime)>();

public RecordCollector(string logPrefix, IStreamConfig configuration, TaskId id, Sensor droppedRecordsSensor, IAdminClient adminClient)
{
this.logPrefix = $"{logPrefix}";
this.configuration = configuration;
this.id = id;
this.droppedRecordsSensor = droppedRecordsSensor;
_adminClient = adminClient;
}

public void Init(ref IProducer<byte[], byte[]> producer)
Expand Down Expand Up @@ -136,6 +141,8 @@ public void Close()
}
}
}

_adminClient?.Dispose();
}

public void Flush()
@@ -318,7 +325,12 @@ private void HandleError(DeliveryReport<byte[], byte[]> report)
log.LogDebug(
"{LogPrefix}Record persisted: (timestamp {Timestamp}) topic=[{Topic}] partition=[{Partition}] offset=[{Offset}]",
logPrefix, report.Message.Timestamp.UnixTimestampMs, report.Topic, report.Partition, report.Offset);
collectorsOffsets.AddOrUpdate(report.TopicPartition, report.Offset.Value);
if (collectorsOffsets.ContainsKey(report.TopicPartition) &&
collectorsOffsets[report.TopicPartition] < report.Offset.Value)
collectorsOffsets.TryUpdate(report.TopicPartition, report.Offset.Value,
collectorsOffsets[report.TopicPartition]);
else
collectorsOffsets.TryAdd(report.TopicPartition, report.Offset);
retryRecordContext.AckRecord(report);
}
}
@@ -416,5 +428,20 @@ private void CheckForException()
throw e;
}
}

public int PartitionsFor(string topic)
{
var adminConfig = configuration.ToAdminConfig("");
var refreshInterval = adminConfig.TopicMetadataRefreshIntervalMs ?? 5 * 60 * 1000;

if (cachePartitionsForTopics.ContainsKey(topic) &&
cachePartitionsForTopics[topic].Item2 + TimeSpan.FromMilliseconds(refreshInterval) > DateTime.Now)
return cachePartitionsForTopics[topic].Item1;

var metadata = _adminClient.GetMetadata(topic, TimeSpan.FromSeconds(5));
var partitionCount = metadata.Topics.FirstOrDefault(t => t.Topic.Equals(topic))!.Partitions.Count;
cachePartitionsForTopics[topic] = (partitionCount, DateTime.Now); // indexer upsert, so a stale entry is refreshed instead of throwing
return partitionCount;
}
}
}
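PartitionsFor resolves a topic's partition count through the injected IAdminClient and caches it per topic, only querying the broker again once TopicMetadataRefreshIntervalMs (5 minutes when unset) has elapsed. A condensed, illustrative sketch of that TTL-cache pattern — the names below are not the Streamiz API:

using System;
using System.Collections.Generic;
using System.Linq;
using Confluent.Kafka;

// Illustrative only: a TTL cache around IAdminClient.GetMetadata, mirroring PartitionsFor above.
internal static class PartitionCountCache
{
    private static readonly Dictionary<string, (int Count, DateTime FetchedAt)> cache = new();

    public static int Get(IAdminClient admin, string topic, TimeSpan ttl)
    {
        if (cache.TryGetValue(topic, out var entry) && entry.FetchedAt + ttl > DateTime.Now)
            return entry.Count;

        var metadata = admin.GetMetadata(topic, TimeSpan.FromSeconds(5));
        var count = metadata.Topics.First(t => t.Topic == topic).Partitions.Count;
        cache[topic] = (count, DateTime.Now); // indexer upsert, so a stale entry is refreshed in place
        return count;
    }
}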
12 changes: 8 additions & 4 deletions core/KafkaStream.cs
@@ -431,7 +431,8 @@ public async Task StartAsync(CancellationToken? token = null)
Dispose();
});
}
await Task.Factory.StartNew(async () =>

await Task.Factory.StartNew(() =>
{
if (SetState(State.REBALANCING))
{
@@ -449,7 +450,8 @@ await Task.Factory.StartNew(async () =>
SetState(State.PENDING_SHUTDOWN);
SetState(State.ERROR);
}
return;

return Task.CompletedTask;
}

RunMiddleware(true, true);
@@ -464,6 +466,8 @@ await Task.Factory.StartNew(async () =>

RunMiddleware(false, true);
}

return Task.CompletedTask;
}, token ?? _cancelSource.Token);


@@ -625,8 +629,8 @@ private async Task InitializeInternalTopicManagerAsync()
{
// Create internal topics (changelogs & repartition) if need
var adminClientInternalTopicManager = kafkaSupplier.GetAdmin(configuration.ToAdminConfig(StreamThread.GetSharedAdminClientId($"{configuration.ApplicationId.ToLower()}-admin-internal-topic-manager")));
using(var internalTopicManager = new DefaultTopicManager(configuration, adminClientInternalTopicManager))
await InternalTopicManagerUtils.New().CreateInternalTopicsAsync(internalTopicManager, topology.Builder);
using var internalTopicManager = new DefaultTopicManager(configuration, adminClientInternalTopicManager);
await InternalTopicManagerUtils.New().CreateInternalTopicsAsync(internalTopicManager, topology.Builder);
}

private void RunMiddleware(bool before, bool start)
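The StartAsync change replaces the async lambda with a synchronous one that returns Task.CompletedTask. Task.Factory.StartNew with an async delegate produces a Task<Task>: awaiting the outer task only waits until the delegate hands back its inner task, not until the body has finished. A minimal illustration of that pitfall (general .NET behaviour, not Streamiz code):

var wrapped = Task.Factory.StartNew(async () =>
{
    await Task.Delay(1000);
    Console.WriteLine("inner body done");
});

await wrapped;            // completes almost immediately: only the outer Task<Task> is awaited
await wrapped.Unwrap();   // waits for the inner body as well (TaskExtensions.Unwrap)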
2 changes: 1 addition & 1 deletion core/Metrics/StreamMetric.cs
@@ -15,7 +15,7 @@ public class StreamMetric
private readonly IMetricValueProvider metricValueProvider;
private readonly MetricConfig config;

private object lastValue;
private object lastValue = -1L;

internal StreamMetric(
MetricName metricName,
1 change: 0 additions & 1 deletion core/Mock/ClusterInMemoryTopologyDriver.cs
@@ -8,7 +8,6 @@
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading;
using Streamiz.Kafka.Net.Metrics;
using Streamiz.Kafka.Net.Stream.Internal;
17 changes: 13 additions & 4 deletions core/Mock/Kafka/MockCluster.cs
@@ -756,7 +756,13 @@ internal ConsumeResult<byte[], byte[]> Consume(MockConsumer mockConsumer, TimeSp
Topic = p.Topic,
Partition = p.Partition,
Message = new Message<byte[], byte[]>
{Key = record.Key, Value = record.Value}
{
Key = record.Key,
Value = record.Value,
Timestamp = new Timestamp(
record.Timestamp ?? DateTime.Now, TimestampType.NotAvailable),
Headers = record.Headers
}
};
++offset.OffsetConsumed;
log.LogDebug($"Consumer {mockConsumer.Name} consume message from topic/partition {p}, offset {offset.OffsetConsumed}");
Expand Down Expand Up @@ -795,7 +801,7 @@ internal DeliveryReport<byte[], byte[]> Produce(string topic, Message<byte[], by
else
partition = Math.Abs(MurMurHash3.Hash(new MemoryStream(message.Key))) % topics[topic].PartitionNumber;

topics[topic].AddMessage(message.Key, message.Value, partition);
topics[topic].AddMessage(message.Key, message.Value, partition, message.Timestamp.UnixTimestampMs, message.Headers);

r.Message = message;
r.Partition = partition;
@@ -809,18 +815,21 @@ internal DeliveryReport<byte[], byte[]> Produce(TopicPartition topicPartition, Message<byte[], byte[]> message)

internal DeliveryReport<byte[], byte[]> Produce(TopicPartition topicPartition, Message<byte[], byte[]> message)
{
if (topicPartition.Partition.Equals(Partition.Any))
return Produce(topicPartition.Topic, message);

DeliveryReport<byte[], byte[]> r = new DeliveryReport<byte[], byte[]>();
r.Status = PersistenceStatus.NotPersisted;
CreateTopic(topicPartition.Topic);
if (topics[topicPartition.Topic].PartitionNumber > topicPartition.Partition)
{
topics[topicPartition.Topic].AddMessage(message.Key, message.Value, topicPartition.Partition);
topics[topicPartition.Topic].AddMessage(message.Key, message.Value, topicPartition.Partition, message.Timestamp.UnixTimestampMs, message.Headers);
r.Status = PersistenceStatus.Persisted;
}
else
{
topics[topicPartition.Topic].CreateNewPartitions(topicPartition.Partition);
topics[topicPartition.Topic].AddMessage(message.Key, message.Value, topicPartition.Partition);
topics[topicPartition.Topic].AddMessage(message.Key, message.Value, topicPartition.Partition, message.Timestamp.UnixTimestampMs, message.Headers);
r.Status = PersistenceStatus.Persisted;
}

14 changes: 10 additions & 4 deletions core/Mock/Kafka/MockPartition.cs
@@ -7,7 +7,7 @@ namespace Streamiz.Kafka.Net.Mock.Kafka
{
internal class MockPartition
{
private readonly List<(byte[], byte[])> log = new();
private readonly List<(byte[], byte[], long, Headers)> log = new();
private readonly Dictionary<long, long> mappingOffsets = new();

public MockPartition(int indice)
@@ -21,10 +21,10 @@ public MockPartition(int indice)
public long LowOffset { get; private set; } = Offset.Unset;
public long HighOffset { get; private set; } = Offset.Unset;

internal void AddMessageInLog(byte[] key, byte[] value)
internal void AddMessageInLog(byte[] key, byte[] value, long timestamp, Headers headers)
{
mappingOffsets.Add(Size, log.Count);
log.Add((key, value));
log.Add((key, value, timestamp, headers));
++Size;
UpdateOffset();
}
@@ -43,7 +43,13 @@ internal TestRecord<byte[], byte[]> GetMessage(long offset)
if (mappingOffsets.ContainsKey(offset))
{
var record = log[(int) mappingOffsets[offset]];
return new TestRecord<byte[], byte[]> {Key = record.Item1, Value = record.Item2};
return new TestRecord<byte[], byte[]>
{
Key = record.Item1,
Value = record.Item2,
Timestamp = record.Item3.FromMilliseconds(),
Headers = record.Item4
};
}

return null;
4 changes: 2 additions & 2 deletions core/Mock/Kafka/MockTopic.cs
@@ -23,9 +23,9 @@ public MockTopic(string topic, int part)
public int PartitionNumber { get; private set; }
public IEnumerable<MockPartition> Partitions => partitions.AsReadOnly();

public void AddMessage(byte[] key, byte[] value, int partition)
public void AddMessage(byte[] key, byte[] value, int partition, long timestamp = 0, Headers headers = null)
{
partitions[partition].AddMessageInLog(key, value);
partitions[partition].AddMessageInLog(key, value, timestamp, headers);
}

public TestRecord<byte[], byte[]> GetMessage(int partition, long consumerOffset)
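With the changes above, the in-memory mock carries timestamps and headers end to end: Produce forwards Message.Timestamp.UnixTimestampMs and Headers into MockTopic.AddMessage, MockPartition stores them in its log tuple, and GetMessage restores them on the returned TestRecord. A rough sketch of the round trip (these are internal test types, so this is illustrative only):

// Assumes Confluent.Kafka, System and System.Text are in scope.
var topic   = new MockTopic("orders", 1);
var headers = new Headers { { "source", Encoding.UTF8.GetBytes("unit-test") } };

topic.AddMessage(
    key: Encoding.UTF8.GetBytes("k1"),
    value: Encoding.UTF8.GetBytes("v1"),
    partition: 0,
    timestamp: DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(),
    headers: headers);

var record = topic.GetMessage(0, 0);
// record.Timestamp and record.Headers are now populated instead of being dropped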
3 changes: 1 addition & 2 deletions core/Mock/MockChangelogRegister.cs
@@ -1,5 +1,4 @@
using System;
using System.Collections.Generic;
using System.Collections.Generic;
using Confluent.Kafka;
using Streamiz.Kafka.Net.Processors.Internal;

3 changes: 1 addition & 2 deletions core/Mock/MockOffsetCheckpointManager.cs
@@ -1,5 +1,4 @@
using System;
using System.Collections.Generic;
using System.Collections.Generic;
using Confluent.Kafka;
using Streamiz.Kafka.Net.Processors.Internal;
using Streamiz.Kafka.Net.State;