Commit

Merge branch 'develop' into feature/remote-storage
LGouellec authored Jul 30, 2024
2 parents 597bac7 + 9cfac50 commit 40686a3
Showing 83 changed files with 2,721 additions and 446 deletions.
3 changes: 2 additions & 1 deletion core/Kafka/IRecordCollector.cs
@@ -1,4 +1,4 @@
using System.Collections.Generic;
using System.Collections.Generic;
using Confluent.Kafka;
using Streamiz.Kafka.Net.SerDes;

@@ -12,5 +12,6 @@ internal interface IRecordCollector
void Close();
void Send<K, V>(string topic, K key, V value, Headers headers, long timestamp, ISerDes<K> keySerializer, ISerDes<V> valueSerializer);
void Send<K, V>(string topic, K key, V value, Headers headers, int partition, long timestamp, ISerDes<K> keySerializer, ISerDes<V> valueSerializer);
int PartitionsFor(string topic);
}
}
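
The added PartitionsFor(string topic) member lets a caller resolve a topic's partition count before routing a record through the partition-aware Send overload. A minimal caller sketch, assuming compilation inside the assembly (the interface is internal); the SendHashed helper is hypothetical and not part of this commit:

using System;
using Confluent.Kafka;
using Streamiz.Kafka.Net.SerDes;

internal static class PartitionAwareSendExample
{
    // Hypothetical helper: derive the target partition from the key's hash,
    // bounded by the partition count that PartitionsFor reports.
    public static void SendHashed<K, V>(IRecordCollector collector, string topic,
        K key, V value, ISerDes<K> keySerdes, ISerDes<V> valueSerdes)
    {
        int partitions = collector.PartitionsFor(topic);
        int partition = (key.GetHashCode() & 0x7FFFFFFF) % partitions; // non-negative hash
        long timestampMs = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
        collector.Send(topic, key, value, new Headers(), partition, timestampMs,
            keySerdes, valueSerdes);
    }
}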
1 change: 0 additions & 1 deletion core/Kafka/Internal/DefaultKafkaClientSupplier.cs
@@ -110,7 +110,6 @@ public IProducer<byte[], byte[]> GetProducer(ProducerConfig config)
producerStatisticsHandler.Publish(statistics);
});
}

return builder.Build();
}

30 changes: 26 additions & 4 deletions core/Kafka/Internal/RecordCollector.cs
@@ -7,6 +7,7 @@
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Text;
using Microsoft.Extensions.Logging;
using Streamiz.Kafka.Net.Metrics;
@@ -81,24 +82,28 @@ public void Clear()
private readonly IStreamConfig configuration;
private readonly TaskId id;
private readonly Sensor droppedRecordsSensor;
private readonly IAdminClient _adminClient;
private Exception exception = null;

private readonly string logPrefix;
private readonly ILogger log = Logger.GetLogger(typeof(RecordCollector));

private readonly ConcurrentDictionary<TopicPartition, long> collectorsOffsets =
new ConcurrentDictionary<TopicPartition, long>();
private readonly ConcurrentDictionary<TopicPartition, long> collectorsOffsets = new();

private readonly RetryRecordContext retryRecordContext = new RetryRecordContext();
private readonly RetryRecordContext retryRecordContext = new();

public IDictionary<TopicPartition, long> CollectorOffsets => collectorsOffsets.ToDictionary();

public RecordCollector(string logPrefix, IStreamConfig configuration, TaskId id, Sensor droppedRecordsSensor)
public IDictionary<string, (int, DateTime)> cachePartitionsForTopics =
new Dictionary<string, (int, DateTime)>();

public RecordCollector(string logPrefix, IStreamConfig configuration, TaskId id, Sensor droppedRecordsSensor, IAdminClient adminClient)
{
this.logPrefix = $"{logPrefix}";
this.configuration = configuration;
this.id = id;
this.droppedRecordsSensor = droppedRecordsSensor;
_adminClient = adminClient;
}

public void Init(ref IProducer<byte[], byte[]> producer)
@@ -136,6 +141,8 @@ public void Close()
}
}
}

_adminClient?.Dispose();
}

public void Flush()
@@ -416,5 +423,20 @@ private void CheckForException()
throw e;
}
}

public int PartitionsFor(string topic)
{
var adminConfig = configuration.ToAdminConfig("");
var refreshInterval = adminConfig.TopicMetadataRefreshIntervalMs ?? 5 * 60 * 1000;

if (cachePartitionsForTopics.ContainsKey(topic) &&
cachePartitionsForTopics[topic].Item2 + TimeSpan.FromMilliseconds(refreshInterval) > DateTime.Now)
return cachePartitionsForTopics[topic].Item1;

var metadata = _adminClient.GetMetadata(topic, TimeSpan.FromSeconds(5));
var partitionCount = metadata.Topics.FirstOrDefault(t => t.Topic.Equals(topic))!.Partitions.Count;
cachePartitionsForTopics.Add(topic, (partitionCount, DateTime.Now));
return partitionCount;
}
}
}
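
The PartitionsFor implementation above caches the partition count per topic and only refetches metadata once TopicMetadataRefreshIntervalMs (defaulting here to five minutes) has elapsed, saving an admin round-trip on the hot send path. A self-contained sketch of the same pattern, assuming only Confluent.Kafka's IAdminClient; it writes through the dictionary indexer so a refresh overwrites a stale entry rather than calling Add on an existing key:

using System;
using System.Collections.Generic;
using System.Linq;
using Confluent.Kafka;

internal class PartitionCountCache
{
    private readonly IAdminClient _adminClient;
    private readonly TimeSpan _refreshInterval;
    private readonly Dictionary<string, (int Count, DateTime FetchedAt)> _cache = new();

    public PartitionCountCache(IAdminClient adminClient, TimeSpan refreshInterval)
    {
        _adminClient = adminClient;
        _refreshInterval = refreshInterval;
    }

    public int PartitionsFor(string topic)
    {
        // Serve from the cache while the entry is younger than the refresh interval.
        if (_cache.TryGetValue(topic, out var entry) &&
            entry.FetchedAt + _refreshInterval > DateTime.Now)
            return entry.Count;

        // Otherwise fetch fresh metadata; the indexer replaces any stale entry.
        var metadata = _adminClient.GetMetadata(topic, TimeSpan.FromSeconds(5));
        int count = metadata.Topics.First(t => t.Topic == topic).Partitions.Count;
        _cache[topic] = (count, DateTime.Now);
        return count;
    }
}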
2 changes: 1 addition & 1 deletion core/Metrics/StreamMetric.cs
@@ -15,7 +15,7 @@ public class StreamMetric
private readonly IMetricValueProvider metricValueProvider;
private readonly MetricConfig config;

private object lastValue;
private object lastValue = -1L;

internal StreamMetric(
MetricName metricName,
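
Initializing lastValue to -1L means a metric read before its first recording now yields a numeric sentinel rather than null, presumably so readers can always treat the value as a number. A minimal illustration of the behaviour (simplified model, not the actual class):

class SimplifiedMetric
{
    private object lastValue = -1L; // was: null until the first recording
    public object Value => lastValue;
    public void Record(object newValue) => lastValue = newValue;
}

// new SimplifiedMetric().Value now reads -1L instead of null.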
1 change: 0 additions & 1 deletion core/Mock/ClusterInMemoryTopologyDriver.cs
@@ -8,7 +8,6 @@
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading;
using Streamiz.Kafka.Net.Metrics;
using Streamiz.Kafka.Net.Stream.Internal;
16 changes: 12 additions & 4 deletions core/Mock/Kafka/MockCluster.cs
@@ -756,7 +756,12 @@ internal ConsumeResult<byte[], byte[]> Consume(MockConsumer mockConsumer, TimeSp
Topic = p.Topic,
Partition = p.Partition,
Message = new Message<byte[], byte[]>
{Key = record.Key, Value = record.Value}
{
Key = record.Key,
Value = record.Value,
Timestamp = new Timestamp(
record.Timestamp ?? DateTime.Now, TimestampType.NotAvailable)
}
};
++offset.OffsetConsumed;
log.LogDebug($"Consumer {mockConsumer.Name} consume message from topic/partition {p}, offset {offset.OffsetConsumed}");
@@ -795,7 +800,7 @@ internal DeliveryReport<byte[], byte[]> Produce(string topic, Message<byte[], by
else
partition = Math.Abs(MurMurHash3.Hash(new MemoryStream(message.Key))) % topics[topic].PartitionNumber;

topics[topic].AddMessage(message.Key, message.Value, partition);
topics[topic].AddMessage(message.Key, message.Value, partition, message.Timestamp.UnixTimestampMs);

r.Message = message;
r.Partition = partition;
@@ -809,18 +814,21 @@ internal DeliveryReport<byte[], byte[]> Produce(string topic, Message<byte[], by

internal DeliveryReport<byte[], byte[]> Produce(TopicPartition topicPartition, Message<byte[], byte[]> message)
{
if (topicPartition.Partition.Equals(Partition.Any))
return Produce(topicPartition.Topic, message);

DeliveryReport<byte[], byte[]> r = new DeliveryReport<byte[], byte[]>();
r.Status = PersistenceStatus.NotPersisted;
CreateTopic(topicPartition.Topic);
if (topics[topicPartition.Topic].PartitionNumber > topicPartition.Partition)
{
topics[topicPartition.Topic].AddMessage(message.Key, message.Value, topicPartition.Partition);
topics[topicPartition.Topic].AddMessage(message.Key, message.Value, topicPartition.Partition, message.Timestamp.UnixTimestampMs);
r.Status = PersistenceStatus.Persisted;
}
else
{
topics[topicPartition.Topic].CreateNewPartitions(topicPartition.Partition);
topics[topicPartition.Topic].AddMessage(message.Key, message.Value, topicPartition.Partition);
topics[topicPartition.Topic].AddMessage(message.Key, message.Value, topicPartition.Partition, message.Timestamp.UnixTimestampMs);
r.Status = PersistenceStatus.Persisted;
}

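
With these changes the mock cluster persists each message's timestamp (as epoch milliseconds) on produce and restores it on consume, falling back to DateTime.Now when none was recorded, so time-based operators see consistent event times in tests. A small round-trip sketch using Confluent.Kafka's Timestamp type, mirroring what the mock does internally:

using System;
using Confluent.Kafka;

class TimestampRoundTrip
{
    static void Main()
    {
        // Produce side: Timestamp carries epoch milliseconds, which the mock
        // log now stores alongside key and value.
        var produced = new Timestamp(new DateTime(2024, 7, 30, 12, 0, 0, DateTimeKind.Utc));
        long storedMs = produced.UnixTimestampMs;

        // Consume side: the mock rebuilds a Timestamp from the stored value,
        // so the consumed record keeps the original event time.
        var consumed = new Timestamp(storedMs, TimestampType.NotAvailable);
        Console.WriteLine(consumed.UtcDateTime); // 2024-07-30 12:00:00Z
    }
}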
13 changes: 9 additions & 4 deletions core/Mock/Kafka/MockPartition.cs
@@ -7,7 +7,7 @@ namespace Streamiz.Kafka.Net.Mock.Kafka
{
internal class MockPartition
{
private readonly List<(byte[], byte[])> log = new();
private readonly List<(byte[], byte[], long)> log = new();
private readonly Dictionary<long, long> mappingOffsets = new();

public MockPartition(int indice)
@@ -21,10 +21,10 @@ public MockPartition(int indice)
public long LowOffset { get; private set; } = Offset.Unset;
public long HighOffset { get; private set; } = Offset.Unset;

internal void AddMessageInLog(byte[] key, byte[] value)
internal void AddMessageInLog(byte[] key, byte[] value, long timestamp)
{
mappingOffsets.Add(Size, log.Count);
log.Add((key, value));
log.Add((key, value, timestamp));
++Size;
UpdateOffset();
}
@@ -43,7 +43,12 @@ internal TestRecord<byte[], byte[]> GetMessage(long offset)
if (mappingOffsets.ContainsKey(offset))
{
var record = log[(int) mappingOffsets[offset]];
return new TestRecord<byte[], byte[]> {Key = record.Item1, Value = record.Item2};
return new TestRecord<byte[], byte[]>
{
Key = record.Item1,
Value = record.Item2,
Timestamp = record.Item3.FromMilliseconds()
};
}

return null;
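
The log entry type widens from (byte[], byte[]) to (byte[], byte[], long) so every stored record keeps its epoch-millisecond timestamp, which GetMessage turns back into a DateTime via the library's FromMilliseconds() extension. An assumed-equivalent conversion, shown as a standalone helper since the extension's exact behaviour is not part of this diff:

using System;

static class EpochMillisecondsExample
{
    // Assumed equivalent of the FromMilliseconds() extension used above:
    // epoch milliseconds -> UTC DateTime.
    public static DateTime FromMs(this long ms) =>
        DateTimeOffset.FromUnixTimeMilliseconds(ms).UtcDateTime;

    static void Main()
    {
        long stored = 1722340800000; // timestamp slot of a (key, value, timestamp) entry
        Console.WriteLine(stored.FromMs()); // 2024-07-30 12:00:00Z
    }
}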
4 changes: 2 additions & 2 deletions core/Mock/Kafka/MockTopic.cs
@@ -23,9 +23,9 @@ public MockTopic(string topic, int part)
public int PartitionNumber { get; private set; }
public IEnumerable<MockPartition> Partitions => partitions.AsReadOnly();

public void AddMessage(byte[] key, byte[] value, int partition)
public void AddMessage(byte[] key, byte[] value, int partition, long timestamp = 0)
{
partitions[partition].AddMessageInLog(key, value);
partitions[partition].AddMessageInLog(key, value, timestamp);
}

public TestRecord<byte[], byte[]> GetMessage(int partition, long consumerOffset)
3 changes: 1 addition & 2 deletions core/Mock/MockChangelogRegister.cs
@@ -1,5 +1,4 @@
using System;
using System.Collections.Generic;
using System.Collections.Generic;
using Confluent.Kafka;
using Streamiz.Kafka.Net.Processors.Internal;

3 changes: 1 addition & 2 deletions core/Mock/MockOffsetCheckpointManager.cs
@@ -1,5 +1,4 @@
using System;
using System.Collections.Generic;
using System.Collections.Generic;
using Confluent.Kafka;
using Streamiz.Kafka.Net.Processors.Internal;
using Streamiz.Kafka.Net.State;
38 changes: 28 additions & 10 deletions core/Mock/Sync/SyncAdminClient.cs
@@ -2,6 +2,7 @@
using Confluent.Kafka.Admin;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Streamiz.Kafka.Net.Mock.Kafka;

@@ -10,11 +11,13 @@ namespace Streamiz.Kafka.Net.Mock.Sync
internal class SyncAdminClient : BasedAdminClient
{
private readonly SyncProducer producer;
private readonly bool _autoCreateTopic;
private AdminClientConfig config;

public SyncAdminClient(SyncProducer producer)
public SyncAdminClient(SyncProducer producer, bool autoCreateTopic)
{
this.producer = producer;
_autoCreateTopic = autoCreateTopic;
}

internal void UseConfig(AdminClientConfig config)
@@ -66,20 +69,35 @@ public override void Dispose()
public override Metadata GetMetadata(string topic, TimeSpan timeout)
{
var error = new Error(ErrorCode.NoError);

var brokersMetadata = new List<BrokerMetadata> {
new BrokerMetadata(1, "localhost", 9092)
};

var partitionsMetadata = new List<PartitionMetadata>
var topics = producer.GetAllTopics();
var brokersMetadata = new List<BrokerMetadata>
{
new PartitionMetadata(1, 1, new int[1]{1}, new int[1]{1}, error)
new(1, "localhost", 9092)
};

if (topics.Contains(topic))
{
var partitionsMetadata = new List<PartitionMetadata>
{
new(1, 1, new int[1] { 1 }, new int[1] { 1 }, error)
};

var topicMetadata = new TopicMetadata(topic, partitionsMetadata, error);
var topicMetadata = new TopicMetadata(topic, partitionsMetadata, error);

return new Metadata(brokersMetadata,
new List<TopicMetadata>() { topicMetadata },
1, "localhost");
}

if (_autoCreateTopic)
{
// auto create topic if not exist
producer.CreateTopic(topic);
return GetMetadata(topic, timeout);
}

return new Metadata(brokersMetadata,
new List<TopicMetadata>() { topicMetadata },
new List<TopicMetadata>(),
1, "localhost");
}

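
GetMetadata now distinguishes three cases: the topic exists in the sync producer (its partitions are reported), it is missing but _autoCreateTopic is set (the topic is created and the call recurses), or it is missing and auto-creation is off (the topic list comes back empty). A hypothetical test sketch, assuming access to these internal mock types:

using System;
using Streamiz.Kafka.Net.Mock.Sync;

var producer = new SyncProducer();
var admin = new SyncAdminClient(producer, autoCreateTopic: true);

producer.CreateTopic("known-topic");
var known = admin.GetMetadata("known-topic", TimeSpan.FromSeconds(1));
// known.Topics contains "known-topic" with its single mock partition.

// "fresh-topic" does not exist yet: auto-creation kicks in, and the
// recursive GetMetadata call then reports it like any existing topic.
var fresh = admin.GetMetadata("fresh-topic", TimeSpan.FromSeconds(1));

// Had the client been built with autoCreateTopic: false, the same call
// would return a Metadata instance with an empty topic list.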
4 changes: 2 additions & 2 deletions core/Mock/Sync/SyncKafkaSupplier.cs
@@ -14,10 +14,10 @@ internal class SyncKafkaSupplier : IKafkaSupplier
protected SyncProducer producer = null;
protected SyncAdminClient admin = null;

public SyncKafkaSupplier()
public SyncKafkaSupplier(bool autoCreateTopic = true)
{
producer = new SyncProducer();
admin = new SyncAdminClient(producer);
admin = new SyncAdminClient(producer, autoCreateTopic);
}

public virtual IAdminClient GetAdmin(AdminClientConfig config)
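
The flag defaults to true, so existing tests keep the old always-available-topic behaviour; a test that must observe a genuinely missing topic can opt out. A hypothetical call site:

// Default: topics spring into existence on first metadata lookup.
var supplier = new SyncKafkaSupplier();

// Opt out when a test must see an unknown topic reported as absent.
var strictSupplier = new SyncKafkaSupplier(autoCreateTopic: false);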
8 changes: 3 additions & 5 deletions core/Mock/TaskSynchronousTopologyDriver.cs
@@ -7,13 +7,10 @@
using Streamiz.Kafka.Net.SerDes;
using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.Data.SqlTypes;
using System.IO;
using System.Linq;
using System.Threading;
using Streamiz.Kafka.Net.Crosscutting;
using Streamiz.Kafka.Net.Kafka.Internal;
using Streamiz.Kafka.Net.Metrics;
using Streamiz.Kafka.Net.Stream.Internal;

@@ -78,13 +75,13 @@ public TaskSynchronousTopologyDriver(string clientId, InternalTopologyBuilder to
partitionsByTaskId.Add(taskId, new List<TopicPartition> {part});
}

var adminClient = this.supplier.GetAdmin(configuration.ToAdminConfig($"{clientId}-admin"));
ProcessorTopology globalTaskTopology = topologyBuilder.BuildGlobalStateTopology();
hasGlobalTopology = globalTaskTopology != null;
if (hasGlobalTopology)
{
var globalConsumer =
this.supplier.GetGlobalConsumer(configuration.ToGlobalConsumerConfig($"{clientId}-global-consumer"));
var adminClient = this.supplier.GetAdmin(configuration.ToAdminConfig($"{clientId}-admin"));
var stateManager =
new GlobalStateManager(globalConsumer, globalTaskTopology, adminClient, configuration);
globalProcessorContext = new GlobalProcessorContext(configuration, stateManager, metricsRegistry);
@@ -105,7 +102,8 @@ public TaskSynchronousTopologyDriver(string clientId, InternalTopologyBuilder to
topologyBuilder.BuildTopology(taskId).GetSourceProcessor(requestTopic),
this.supplier.GetProducer(configuration.ToProducerConfig($"ext-thread-producer-{requestTopic}")),
configuration,
metricsRegistry));
metricsRegistry,
adminClient));
}
}

3 changes: 1 addition & 2 deletions core/Mock/TestMultiInputTopic.cs
@@ -1,5 +1,4 @@

using Confluent.Kafka;
using Confluent.Kafka;
using Streamiz.Kafka.Net.Mock.Pipes;
using Streamiz.Kafka.Net.SerDes;
using System;
1 change: 0 additions & 1 deletion core/Mock/TestOutputTopic.cs
Expand Up @@ -3,7 +3,6 @@
using Streamiz.Kafka.Net.Errors;
using Streamiz.Kafka.Net.Mock.Pipes;
using Streamiz.Kafka.Net.SerDes;
using System;
using System.Collections.Generic;
using System.Linq;
using Microsoft.Extensions.Logging;
6 changes: 2 additions & 4 deletions core/ProcessorContext.cs
@@ -68,13 +68,11 @@ public class ProcessorContext
/// </summary>
public virtual string StateDir => $"{Path.Combine(Configuration.StateDir, Configuration.ApplicationId, Id.ToString())}";

internal bool ConfigEnableCache => Configuration.StateStoreCacheMaxBytes > 0;

// FOR TESTING
internal ProcessorContext()
{

}

internal ProcessorContext(AbstractTask task, IStreamConfig configuration, IStateManager stateManager,
StreamMetricsRegistry streamMetricsRegistry)
{
3 changes: 1 addition & 2 deletions core/Processors/AbstractKTableProcessor.cs
@@ -36,8 +36,7 @@ public override void Init(ProcessorContext context)
new Change<VS>(sendOldValues ? kv.Value.OldValue.Value : default, kv.Value.NewValue.Value),
kv.Value.NewValue.Timestamp);
},
sendOldValues,
context.ConfigEnableCache);
sendOldValues);
}

if (throwException && (queryableStoreName == null || store == null || tupleForwarder == null))
