diff --git a/core/Kafka/IRecordCollector.cs b/core/Kafka/IRecordCollector.cs index f2d5f85a..4f03c8a8 100644 --- a/core/Kafka/IRecordCollector.cs +++ b/core/Kafka/IRecordCollector.cs @@ -1,4 +1,4 @@ -using System.Collections.Generic; +using System.Collections.Generic; using Confluent.Kafka; using Streamiz.Kafka.Net.SerDes; @@ -12,5 +12,6 @@ internal interface IRecordCollector void Close(); void Send(string topic, K key, V value, Headers headers, long timestamp, ISerDes keySerializer, ISerDes valueSerializer); void Send(string topic, K key, V value, Headers headers, int partition, long timestamp, ISerDes keySerializer, ISerDes valueSerializer); + int PartitionsFor(string topic); } } diff --git a/core/Kafka/Internal/DefaultKafkaClientSupplier.cs b/core/Kafka/Internal/DefaultKafkaClientSupplier.cs index f2606f52..df80259a 100644 --- a/core/Kafka/Internal/DefaultKafkaClientSupplier.cs +++ b/core/Kafka/Internal/DefaultKafkaClientSupplier.cs @@ -110,7 +110,6 @@ public IProducer GetProducer(ProducerConfig config) producerStatisticsHandler.Publish(statistics); }); } - return builder.Build(); } diff --git a/core/Kafka/Internal/RecordCollector.cs b/core/Kafka/Internal/RecordCollector.cs index e0be4679..a05d320c 100644 --- a/core/Kafka/Internal/RecordCollector.cs +++ b/core/Kafka/Internal/RecordCollector.cs @@ -7,6 +7,7 @@ using System.Collections.Concurrent; using System.Collections.Generic; using System.Diagnostics; +using System.Linq; using System.Text; using Microsoft.Extensions.Logging; using Streamiz.Kafka.Net.Metrics; @@ -81,24 +82,28 @@ public void Clear() private readonly IStreamConfig configuration; private readonly TaskId id; private readonly Sensor droppedRecordsSensor; + private readonly IAdminClient _adminClient; private Exception exception = null; private readonly string logPrefix; private readonly ILogger log = Logger.GetLogger(typeof(RecordCollector)); - private readonly ConcurrentDictionary collectorsOffsets = - new ConcurrentDictionary(); + private readonly ConcurrentDictionary collectorsOffsets = new(); - private readonly RetryRecordContext retryRecordContext = new RetryRecordContext(); + private readonly RetryRecordContext retryRecordContext = new(); public IDictionary CollectorOffsets => collectorsOffsets.ToDictionary(); - public RecordCollector(string logPrefix, IStreamConfig configuration, TaskId id, Sensor droppedRecordsSensor) + public IDictionary cachePartitionsForTopics = + new Dictionary(); + + public RecordCollector(string logPrefix, IStreamConfig configuration, TaskId id, Sensor droppedRecordsSensor, IAdminClient adminClient) { this.logPrefix = $"{logPrefix}"; this.configuration = configuration; this.id = id; this.droppedRecordsSensor = droppedRecordsSensor; + _adminClient = adminClient; } public void Init(ref IProducer producer) @@ -136,6 +141,8 @@ public void Close() } } } + + _adminClient?.Dispose(); } public void Flush() @@ -416,5 +423,20 @@ private void CheckForException() throw e; } } + + public int PartitionsFor(string topic) + { + var adminConfig = configuration.ToAdminConfig(""); + var refreshInterval = adminConfig.TopicMetadataRefreshIntervalMs ?? 
5 * 60 * 1000; + + if (cachePartitionsForTopics.ContainsKey(topic) && + cachePartitionsForTopics[topic].Item2 + TimeSpan.FromMilliseconds(refreshInterval) > DateTime.Now) + return cachePartitionsForTopics[topic].Item1; + + var metadata = _adminClient.GetMetadata(topic, TimeSpan.FromSeconds(5)); + var partitionCount = metadata.Topics.FirstOrDefault(t => t.Topic.Equals(topic))!.Partitions.Count; + cachePartitionsForTopics.Add(topic, (partitionCount, DateTime.Now)); + return partitionCount; + } } } \ No newline at end of file diff --git a/core/Metrics/StreamMetric.cs b/core/Metrics/StreamMetric.cs index f57283cf..23bcef8f 100644 --- a/core/Metrics/StreamMetric.cs +++ b/core/Metrics/StreamMetric.cs @@ -15,7 +15,7 @@ public class StreamMetric private readonly IMetricValueProvider metricValueProvider; private readonly MetricConfig config; - private object lastValue; + private object lastValue = -1L; internal StreamMetric( MetricName metricName, diff --git a/core/Mock/Kafka/MockCluster.cs b/core/Mock/Kafka/MockCluster.cs index 806a59d1..70d559da 100644 --- a/core/Mock/Kafka/MockCluster.cs +++ b/core/Mock/Kafka/MockCluster.cs @@ -756,7 +756,12 @@ internal ConsumeResult Consume(MockConsumer mockConsumer, TimeSp Topic = p.Topic, Partition = p.Partition, Message = new Message - {Key = record.Key, Value = record.Value} + { + Key = record.Key, + Value = record.Value, + Timestamp = new Timestamp( + record.Timestamp ?? DateTime.Now, TimestampType.NotAvailable) + } }; ++offset.OffsetConsumed; log.LogDebug($"Consumer {mockConsumer.Name} consume message from topic/partition {p}, offset {offset.OffsetConsumed}"); @@ -795,7 +800,7 @@ internal DeliveryReport Produce(string topic, Message Produce(string topic, Message Produce(TopicPartition topicPartition, Message message) { + if (topicPartition.Partition.Equals(Partition.Any)) + return Produce(topicPartition.Topic, message); + DeliveryReport r = new DeliveryReport(); r.Status = PersistenceStatus.NotPersisted; CreateTopic(topicPartition.Topic); if (topics[topicPartition.Topic].PartitionNumber > topicPartition.Partition) { - topics[topicPartition.Topic].AddMessage(message.Key, message.Value, topicPartition.Partition); + topics[topicPartition.Topic].AddMessage(message.Key, message.Value, topicPartition.Partition, message.Timestamp.UnixTimestampMs); r.Status = PersistenceStatus.Persisted; } else { topics[topicPartition.Topic].CreateNewPartitions(topicPartition.Partition); - topics[topicPartition.Topic].AddMessage(message.Key, message.Value, topicPartition.Partition); + topics[topicPartition.Topic].AddMessage(message.Key, message.Value, topicPartition.Partition, message.Timestamp.UnixTimestampMs); r.Status = PersistenceStatus.Persisted; } diff --git a/core/Mock/Kafka/MockPartition.cs b/core/Mock/Kafka/MockPartition.cs index a7bd00b9..987ab39f 100644 --- a/core/Mock/Kafka/MockPartition.cs +++ b/core/Mock/Kafka/MockPartition.cs @@ -7,7 +7,7 @@ namespace Streamiz.Kafka.Net.Mock.Kafka { internal class MockPartition { - private readonly List<(byte[], byte[])> log = new(); + private readonly List<(byte[], byte[], long)> log = new(); private readonly Dictionary mappingOffsets = new(); public MockPartition(int indice) @@ -21,10 +21,10 @@ public MockPartition(int indice) public long LowOffset { get; private set; } = Offset.Unset; public long HighOffset { get; private set; } = Offset.Unset; - internal void AddMessageInLog(byte[] key, byte[] value) + internal void AddMessageInLog(byte[] key, byte[] value, long timestamp) { mappingOffsets.Add(Size, log.Count); - log.Add((key, 
value)); + log.Add((key, value, timestamp)); ++Size; UpdateOffset(); } @@ -43,7 +43,12 @@ internal TestRecord GetMessage(long offset) if (mappingOffsets.ContainsKey(offset)) { var record = log[(int) mappingOffsets[offset]]; - return new TestRecord {Key = record.Item1, Value = record.Item2}; + return new TestRecord + { + Key = record.Item1, + Value = record.Item2, + Timestamp = record.Item3.FromMilliseconds() + }; } return null; diff --git a/core/Mock/Kafka/MockTopic.cs b/core/Mock/Kafka/MockTopic.cs index dfda305f..d96f82fa 100644 --- a/core/Mock/Kafka/MockTopic.cs +++ b/core/Mock/Kafka/MockTopic.cs @@ -23,9 +23,9 @@ public MockTopic(string topic, int part) public int PartitionNumber { get; private set; } public IEnumerable Partitions => partitions.AsReadOnly(); - public void AddMessage(byte[] key, byte[] value, int partition) + public void AddMessage(byte[] key, byte[] value, int partition, long timestamp = 0) { - partitions[partition].AddMessageInLog(key, value); + partitions[partition].AddMessageInLog(key, value, timestamp); } public TestRecord GetMessage(int partition, long consumerOffset) diff --git a/core/Mock/TaskSynchronousTopologyDriver.cs b/core/Mock/TaskSynchronousTopologyDriver.cs index df9b06a4..08200679 100644 --- a/core/Mock/TaskSynchronousTopologyDriver.cs +++ b/core/Mock/TaskSynchronousTopologyDriver.cs @@ -78,13 +78,13 @@ public TaskSynchronousTopologyDriver(string clientId, InternalTopologyBuilder to partitionsByTaskId.Add(taskId, new List {part}); } + var adminClient = this.supplier.GetAdmin(configuration.ToAdminConfig($"{clientId}-admin")); ProcessorTopology globalTaskTopology = topologyBuilder.BuildGlobalStateTopology(); hasGlobalTopology = globalTaskTopology != null; if (hasGlobalTopology) { var globalConsumer = this.supplier.GetGlobalConsumer(configuration.ToGlobalConsumerConfig($"{clientId}-global-consumer")); - var adminClient = this.supplier.GetAdmin(configuration.ToAdminConfig($"{clientId}-admin")); var stateManager = new GlobalStateManager(globalConsumer, globalTaskTopology, adminClient, configuration); globalProcessorContext = new GlobalProcessorContext(configuration, stateManager, metricsRegistry); @@ -105,7 +105,8 @@ public TaskSynchronousTopologyDriver(string clientId, InternalTopologyBuilder to topologyBuilder.BuildTopology(taskId).GetSourceProcessor(requestTopic), this.supplier.GetProducer(configuration.ToProducerConfig($"ext-thread-producer-{requestTopic}")), configuration, - metricsRegistry)); + metricsRegistry, + adminClient)); } } diff --git a/core/Processors/DefaultStreamPartitioner.cs b/core/Processors/DefaultStreamPartitioner.cs new file mode 100644 index 00000000..3345b6ac --- /dev/null +++ b/core/Processors/DefaultStreamPartitioner.cs @@ -0,0 +1,28 @@ +using Confluent.Kafka; + +namespace Streamiz.Kafka.Net.Processors +{ + /// + /// Forward the source partition as the sink partition of the record if there is enough sink partitions, Partition.Any otherwise + /// + /// Key record type + /// Value record type + public class DefaultStreamPartitioner : IStreamPartitioner + { + /// + /// Function used to determine how records are distributed among partitions of the topic + /// + /// Sink topic name + /// record's key + /// record's value + /// record's source partition + /// number partitions of the sink topic + /// Return the source partition as the sink partition of the record if there is enough sink partitions, Partition.Any otherwise + public Partition Partition(string topic, K key, V value, Partition sourcePartition, int numPartitions) + { + if 
(sourcePartition.Value <= numPartitions - 1) + return sourcePartition; + return Confluent.Kafka.Partition.Any; + } + } +} \ No newline at end of file diff --git a/core/Processors/ExternalProcessorTopologyExecutor.cs b/core/Processors/ExternalProcessorTopologyExecutor.cs index d3f9f6bf..76e681d2 100644 --- a/core/Processors/ExternalProcessorTopologyExecutor.cs +++ b/core/Processors/ExternalProcessorTopologyExecutor.cs @@ -125,7 +125,8 @@ public ExternalProcessorTopologyExecutor( ISourceProcessor sourceProcessor, IProducer producer, IStreamConfig config, - StreamMetricsRegistry streamMetricsRegistry) + StreamMetricsRegistry streamMetricsRegistry, + IAdminClient adminClient) { this.threadId = threadId; this.producerId = producer.Name; @@ -140,7 +141,7 @@ public ExternalProcessorTopologyExecutor( processSensor = TaskMetrics.ProcessSensor(threadId, taskId, streamMetricsRegistry); processLatencySensor = TaskMetrics.ProcessLatencySensor(threadId, taskId, streamMetricsRegistry); - recordCollector = new RecordCollector(logPrefix, config, taskId, droppedRecordsSensor); + recordCollector = new RecordCollector(logPrefix, config, taskId, droppedRecordsSensor, adminClient); recordCollector.Init(ref producer); context = new ProcessorContext(ExternalStreamTask.Create(taskId), config, null, streamMetricsRegistry); diff --git a/core/Processors/ExternalStreamThread.cs b/core/Processors/ExternalStreamThread.cs index 9020636b..480045fb 100644 --- a/core/Processors/ExternalStreamThread.cs +++ b/core/Processors/ExternalStreamThread.cs @@ -372,7 +372,8 @@ private ExternalProcessorTopologyExecutor GetExternalProcessorTopology(string to topology.GetSourceProcessor(topic), producer, configuration, - streamMetricsRegistry); + streamMetricsRegistry, + adminClient); externalProcessorTopologies.Add(topic, externalProcessorTopologyExecutor); return externalProcessorTopologyExecutor; diff --git a/core/Processors/IStreamPartitioner.cs b/core/Processors/IStreamPartitioner.cs new file mode 100644 index 00000000..c29d5318 --- /dev/null +++ b/core/Processors/IStreamPartitioner.cs @@ -0,0 +1,18 @@ +using Confluent.Kafka; + +namespace Streamiz.Kafka.Net.Processors +{ + public interface IStreamPartitioner + { + /// + /// Function used to determine how records are distributed among partitions of the topic + /// + /// Sink topic name + /// record's key + /// record's value + /// record's source partition + /// number partitions of the sink topic + /// Return the destination partition for the current record + Partition Partition(string topic, K key, V value, Partition sourcePartition, int numPartitions); + } +} \ No newline at end of file diff --git a/core/Processors/Internal/NodeFactory.cs b/core/Processors/Internal/NodeFactory.cs index 3527a292..b92abc06 100644 --- a/core/Processors/Internal/NodeFactory.cs +++ b/core/Processors/Internal/NodeFactory.cs @@ -80,21 +80,16 @@ internal class SinkNodeFactory : NodeFactory, ISinkNodeFactory public IRecordTimestampExtractor TimestampExtractor { get; } public ISerDes KeySerdes { get; } public ISerDes ValueSerdes { get; } - public Func ProducedPartitioner { get; } + public IStreamPartitioner ProducedPartitioner { get; } - public string Topic - { - get - { - return TopicExtractor is StaticTopicNameExtractor ? 
- ((StaticTopicNameExtractor)TopicExtractor).TopicName : - null; - } - } + public string Topic => + (TopicExtractor as StaticTopicNameExtractor)?.TopicName; - public SinkNodeFactory(string name, string[] previous, ITopicNameExtractor topicExtractor, + public SinkNodeFactory(string name, string[] previous, ITopicNameExtractor topicExtractor, IRecordTimestampExtractor timestampExtractor, - ISerDes keySerdes, ISerDes valueSerdes, Func producedPartitioner) + ISerDes keySerdes, + ISerDes valueSerdes, + IStreamPartitioner producedPartitioner) : base(name, previous) { TopicExtractor = topicExtractor; diff --git a/core/Processors/Internal/WrapperStreamPartitioner.cs b/core/Processors/Internal/WrapperStreamPartitioner.cs new file mode 100644 index 00000000..608c8ec2 --- /dev/null +++ b/core/Processors/Internal/WrapperStreamPartitioner.cs @@ -0,0 +1,18 @@ +using System; +using Confluent.Kafka; + +namespace Streamiz.Kafka.Net.Processors.Internal +{ + internal class WrapperStreamPartitioner : IStreamPartitioner + { + private readonly Func _partitioner; + + public WrapperStreamPartitioner(Func partitioner) + { + _partitioner = partitioner; + } + + public Partition Partition(string topic, K key, V value, Partition sourcePartition, int numPartitions) + => _partitioner(topic, key, value, sourcePartition, numPartitions); + } +} \ No newline at end of file diff --git a/core/Processors/SinkProcessor.cs b/core/Processors/SinkProcessor.cs index 63c4a128..9470a9f6 100644 --- a/core/Processors/SinkProcessor.cs +++ b/core/Processors/SinkProcessor.cs @@ -1,4 +1,5 @@ using System; +using Confluent.Kafka; using Microsoft.Extensions.Logging; using Streamiz.Kafka.Net.Errors; using Streamiz.Kafka.Net.Processors.Internal; @@ -15,9 +16,15 @@ internal class SinkProcessor : AbstractProcessor, ISinkProcessor { private ITopicNameExtractor topicNameExtractor; private readonly IRecordTimestampExtractor timestampExtractor; - private readonly Func partitioner; + private readonly IStreamPartitioner partitioner; - internal SinkProcessor(string name, ITopicNameExtractor topicNameExtractor, IRecordTimestampExtractor timestampExtractor, ISerDes keySerdes, ISerDes valueSerdes, Func partitioner = null) + internal SinkProcessor( + string name, + ITopicNameExtractor topicNameExtractor, + IRecordTimestampExtractor timestampExtractor, + ISerDes keySerdes, + ISerDes valueSerdes, + IStreamPartitioner partitioner = null) : base(name, keySerdes, valueSerdes) { this.topicNameExtractor = topicNameExtractor; @@ -62,11 +69,21 @@ public override void Process(K key, V value) { throw new StreamsException($"Invalid (negative) timestamp of {timestamp} for output record <{key}:{value}>."); } - + if (partitioner != null) { - int partition = partitioner.Invoke(topicName, key, value); - Context.RecordCollector.Send(topicName, key, value, Context.RecordContext.Headers, partition, timestamp, KeySerDes, + Partition partition = partitioner.Partition( + topicName, + key, value, + Context.Partition, + Context.RecordCollector.PartitionsFor(topicName)); + + Context.RecordCollector.Send( + topicName, + key, value, + Context.RecordContext.Headers, + partition, + timestamp, KeySerDes, ValueSerDes); } else diff --git a/core/Processors/StreamTask.cs b/core/Processors/StreamTask.cs index 10b004c0..7c2c4096 100644 --- a/core/Processors/StreamTask.cs +++ b/core/Processors/StreamTask.cs @@ -73,7 +73,8 @@ public StreamTask(string threadId, TaskId id, IEnumerable partit } var droppedRecordsSensor = TaskMetrics.DroppedRecordsSensor(this.threadId, Id, 
this.streamMetricsRegistry); - collector = new RecordCollector(logPrefix, configuration, id, droppedRecordsSensor); + var adminClient = kafkaSupplier.GetAdmin(configuration.ToAdminConfig(this.threadId)); + collector = new RecordCollector(logPrefix, configuration, id, droppedRecordsSensor, adminClient); collector.Init(ref this.producer); Context = new ProcessorContext(this, configuration, stateMgr, streamMetricsRegistry) diff --git a/core/Stream/IKStream.cs b/core/Stream/IKStream.cs index f2972074..04c830a3 100644 --- a/core/Stream/IKStream.cs +++ b/core/Stream/IKStream.cs @@ -7,6 +7,7 @@ using System.Collections; using System.Collections.Generic; using System.Threading.Tasks; +using Confluent.Kafka; using Streamiz.Kafka.Net.Processors.Public; using Streamiz.Kafka.Net.Processors.Internal; @@ -113,6 +114,30 @@ public interface IKStream /// Throw if is null /// /// Throw if is incorrect void To(string topicName, string named = null); + + /// + /// Materialize this stream to a topic using default serializers specified in the config and producer's. + /// The specified topic should be manually created before it is used(i.e., before the Kafka Streams application is + /// started). + /// + /// the topic name + /// The function used to determine how records are distributed among partitions of the topic + /// A config used to name the processor in the topology. Default : null + /// Throw if is null + /// /// Throw if is incorrect + void To(string topicName, Func partitioner, string named = null); + + /// + /// Materialize this stream to a topic using default serializers specified in the config and producer's. + /// The specified topic should be manually created before it is used(i.e., before the Kafka Streams application is + /// started). + /// + /// the topic name + /// The function used to determine how records are distributed among partitions of the topic + /// A config used to name the processor in the topology. Default : null + /// Throw if is null + /// /// Throw if is incorrect + void To(string topicName, IStreamPartitioner partitioner, string named = null); /// /// Materialize this stream to a topic using serializers specified in the method parameters. @@ -126,6 +151,20 @@ public interface IKStream /// Throw if is null /// /// Throw if is incorrect void To(string topicName, ISerDes keySerdes, ISerDes valueSerdes, string named = null); + + /// + /// Materialize this stream to a topic using serializers specified in the method parameters. + /// The specified topic should be manually created before it is used(i.e., before the Kafka Streams application is + /// started). + /// + /// the topic name + /// The function used to determine how records are distributed among partitions of the topic + /// Key serializer + /// Value serializer + /// A config used to name the processor in the topology. Default : null + /// Throw if is null + /// /// Throw if is incorrect + void To(string topicName, Func partitioner, ISerDes keySerdes, ISerDes valueSerdes, string named = null); /// /// Dynamically materialize this stream to topics using default serializers specified in the config and producer's. @@ -134,6 +173,15 @@ public interface IKStream /// Extractor function to determine the name of the Kafka topic to write to for each record /// A config used to name the processor in the topology. Default : null void To(Func topicExtractor, string named = null); + + /// + /// Dynamically materialize this stream to topics using default serializers specified in the config and producer's. 
+ /// The topic names for each record to send to is dynamically determined based on the Func<K, V, IRecordContext, string>. + /// + /// Extractor function to determine the name of the Kafka topic to write to for each record + /// The function used to determine how records are distributed among partitions of the topic + /// A config used to name the processor in the topology. Default : null + void To(Func topicExtractor, Func partitioner, string named = null); /// /// Dynamically materialize this stream to topics using default serializers specified in the config and producer's. @@ -145,6 +193,18 @@ public interface IKStream /// A config used to name the processor in the topology. Default : null void To(Func topicExtractor, ISerDes keySerdes, ISerDes valueSerdes, string named = null); + /// + /// Dynamically materialize this stream to topics using default serializers specified in the config and producer's. + /// The topic names for each record to send to is dynamically determined based on the Func<K, V, IRecordContext, string>. + /// + /// Extractor function to determine the name of the Kafka topic to write to for each record + /// The function used to determine how records are distributed among partitions of the topic + /// Key serializer + /// Value serializer + /// A config used to name the processor in the topology. Default : null + void To(Func topicExtractor, Func partitioner, ISerDes keySerdes, ISerDes valueSerdes, string named = null); + + /// /// Dynamically materialize this stream to topics using default serializers specified in the config and producer's. /// The topic names for each record to send to is dynamically determined based on the }. @@ -152,7 +212,87 @@ public interface IKStream /// The extractor to determine the name of the Kafka topic to write to for each record /// A config used to name the processor in the topology. Default : null void To(ITopicNameExtractor topicExtractor, string named = null); + + /// + /// Dynamically materialize this stream to a topic using serializers specified in the method parameters. + /// The topic names for each record to send to is dynamically determined based on the }. + /// + /// The extractor to determine the name of the Kafka topic to write to for each record + /// Key serializer + /// Value serializer + /// A onfig used to name the processor in the topology. Default : null + void To(ITopicNameExtractor topicExtractor, ISerDes keySerdes, ISerDes valueSerdes, string named = null); + + + /// + /// Dynamically materialize this stream to a topic using serializers specified in the method parameters. + /// The topic names for each record to send to is dynamically determined based on the }. + /// + /// The extractor to determine the name of the Kafka topic to write to for each record + /// The function used to determine how records are distributed among partitions of the topic + /// Key serializer + /// Value serializer + /// A onfig used to name the processor in the topology. Default : null + void To(ITopicNameExtractor topicExtractor, IStreamPartitioner partitioner, ISerDes keySerdes, ISerDes valueSerdes, string named = null); + + /// + /// Dynamically materialize this stream to topics using default serializers specified in the config and producer's. + /// The topic names for each record to send to is dynamically determined based on the Func<K, V, IRecordContext, string>. 
+ /// + /// Extractor function to determine the name of the Kafka topic to write to for each record + /// Extractor function to determine the timestamp of the record stored in the Kafka topic + /// A config used to name the processor in the topology. Default : null + void To(Func topicExtractor, Func recordTimestampExtractor, string named = null); + + /// + /// Dynamically materialize this stream to topics using default serializers specified in the config and producer's. + /// The topic names for each record to send to is dynamically determined based on the Func<K, V, IRecordContext, string>. + /// + /// Extractor function to determine the name of the Kafka topic to write to for each record + /// Extractor function to determine the timestamp of the record stored in the Kafka topic + /// The function used to determine how records are distributed among partitions of the topic + /// A config used to name the processor in the topology. Default : null + void To(Func topicExtractor, Func recordTimestampExtractor, Func partitioner, string named = null); + /// + /// Dynamically materialize this stream to topics using default serializers specified in the config and producer's. + /// The topic names for each record to send to is dynamically determined based on the Func<K, V, IRecordContext, string>. + /// + /// Extractor function to determine the name of the Kafka topic to write to for each record + /// Key serializer + /// Value serializer + /// Extractor function to determine the timestamp of the record stored in the Kafka topic + /// A config used to name the processor in the topology. Default : null + void To(Func topicExtractor, ISerDes keySerdes, ISerDes valueSerdes, Func recordTimestampExtractor, string named = null); + + /// + /// Dynamically materialize this stream to topics using default serializers specified in the config and producer's. + /// The topic names for each record to send to is dynamically determined based on the }. + /// + /// The extractor to determine the name of the Kafka topic to write to for each record + /// Extractor function to determine the timestamp of the record stored in the Kafka topic + /// A config used to name the processor in the topology. Default : null + void To(ITopicNameExtractor topicExtractor, IRecordTimestampExtractor recordTimestampExtractor, string named = null); + + /// + /// Dynamically materialize this stream to topics using default serializers specified in the config and producer's. + /// The topic names for each record to send to is dynamically determined based on the }. + /// + /// The extractor to determine the name of the Kafka topic to write to for each record + /// The function used to determine how records are distributed among partitions of the topic + /// A config used to name the processor in the topology. Default : null + void To(ITopicNameExtractor topicExtractor, IStreamPartitioner partitioner, string named = null); + + /// + /// Dynamically materialize this stream to topics using default serializers specified in the config and producer's. + /// The topic names for each record to send to is dynamically determined based on the }. + /// + /// The extractor to determine the name of the Kafka topic to write to for each record + /// Extractor function to determine the timestamp of the record stored in the Kafka topic + /// The function used to determine how records are distributed among partitions of the topic + /// A config used to name the processor in the topology. 
Default : null + void To(ITopicNameExtractor topicExtractor, IRecordTimestampExtractor recordTimestampExtractor, IStreamPartitioner partitioner, string named = null); + /// /// Dynamically materialize this stream to a topic using serializers specified in the method parameters. /// The topic names for each record to send to is dynamically determined based on the }. @@ -160,49 +300,22 @@ public interface IKStream /// The extractor to determine the name of the Kafka topic to write to for each record4 /// Key serializer /// Value serializer + /// Extractor function to determine the timestamp of the record stored in the Kafka topic /// A config used to name the processor in the topology. Default : null - void To(ITopicNameExtractor topicExtractor, ISerDes keySerdes, ISerDes valueSerdes, string named = null); - - /// - /// Dynamically materialize this stream to topics using default serializers specified in the config and producer's. - /// The topic names for each record to send to is dynamically determined based on the Func<K, V, IRecordContext, string>. - /// - /// Extractor function to determine the name of the Kafka topic to write to for each record - /// Extractor function to determine the timestamp of the record stored in the Kafka topic - /// A config used to name the processor in the topology. Default : null - void To(Func topicExtractor, Func recordTimestampExtractor, string named = null); - - /// - /// Dynamically materialize this stream to topics using default serializers specified in the config and producer's. - /// The topic names for each record to send to is dynamically determined based on the Func<K, V, IRecordContext, string>. - /// - /// Extractor function to determine the name of the Kafka topic to write to for each record - /// Key serializer - /// Value serializer - /// Extractor function to determine the timestamp of the record stored in the Kafka topic - /// A config used to name the processor in the topology. Default : null - void To(Func topicExtractor, ISerDes keySerdes, ISerDes valueSerdes, Func recordTimestampExtractor, string named = null); - - /// - /// Dynamically materialize this stream to topics using default serializers specified in the config and producer's. - /// The topic names for each record to send to is dynamically determined based on the }. - /// - /// The extractor to determine the name of the Kafka topic to write to for each record - /// Extractor function to determine the timestamp of the record stored in the Kafka topic - /// A config used to name the processor in the topology. Default : null - void To(ITopicNameExtractor topicExtractor, IRecordTimestampExtractor recordTimestampExtractor, string named = null); - - /// - /// Dynamically materialize this stream to a topic using serializers specified in the method parameters. - /// The topic names for each record to send to is dynamically determined based on the }. - /// - /// The extractor to determine the name of the Kafka topic to write to for each record4 - /// Key serializer - /// Value serializer - /// Extractor function to determine the timestamp of the record stored in the Kafka topic - /// A config used to name the processor in the topology. Default : null void To(ITopicNameExtractor topicExtractor, ISerDes keySerdes, ISerDes valueSerdes, IRecordTimestampExtractor recordTimestampExtractor, string named = null); + /// + /// Dynamically materialize this stream to a topic using serializers specified in the method parameters. 
+ /// The topic names for each record to send to is dynamically determined based on the }. + /// + /// The extractor to determine the name of the Kafka topic to write to for each record4 + /// Key serializer + /// Value serializer + /// Extractor function to determine the timestamp of the record stored in the Kafka topic + /// The function used to determine how records are distributed among partitions of the topic + /// A config used to name the processor in the topology. Default : null + void To(ITopicNameExtractor topicExtractor, ISerDes keySerdes, ISerDes valueSerdes, IRecordTimestampExtractor recordTimestampExtractor, IStreamPartitioner partitioner, string named = null); + /// /// Materialize this stream to a topic using and serializers specified in the method parameters. /// The specified topic should be manually created before it is used(i.e., before the Kafka Streams application is @@ -213,6 +326,29 @@ public interface IKStream /// the topic name /// A config used to name the processor in the topology. Default : null void To(string topicName, string named = null) where KS : ISerDes, new() where VS : ISerDes, new(); + + /// + /// Materialize this stream to a topic using and serializers specified in the method parameters. + /// The specified topic should be manually created before it is used(i.e., before the Kafka Streams application is + /// started). + /// + /// New type key serializer + /// New type value serializer + /// the topic name + /// The function used to determine how records are distributed among partitions of the topic + /// A config used to name the processor in the topology. Default : null + void To(string topicName, Func partitioner, string named = null) where KS : ISerDes, new() where VS : ISerDes, new(); + + /// + /// Dynamically materialize this stream to a topic using and serializers specified in the method parameters. + /// The topic names for each record to send to is dynamically determined based on the Func<K, V, IRecordContext, string>. + /// + /// New type key serializer + /// New type value serializer + /// Extractor function to determine the name of the Kafka topic to write to for each record + /// The function used to determine how records are distributed among partitions of the topic + /// A config used to name the processor in the topology. Default : null + void To(Func topicExtractor, Func partitioner, string named = null) where KS : ISerDes, new() where VS : ISerDes, new(); /// /// Dynamically materialize this stream to a topic using and serializers specified in the method parameters. @@ -223,7 +359,62 @@ public interface IKStream /// Extractor function to determine the name of the Kafka topic to write to for each record /// A config used to name the processor in the topology. Default : null void To(Func topicExtractor, string named = null) where KS : ISerDes, new() where VS : ISerDes, new(); + + /// + /// Dynamically materialize this stream to a topic using and serializers specified in the method parameters. + /// The topic names for each record to send to is dynamically determined based on the }. + /// + /// New type key serializer + /// New type value serializer + /// The extractor to determine the name of the Kafka topic to write to for each record + /// A config used to name the processor in the topology. 
Default : null + void To(ITopicNameExtractor topicExtractor, string named = null) where KS : ISerDes, new() where VS : ISerDes, new(); + + /// + /// Dynamically materialize this stream to a topic using and serializers specified in the method parameters. + /// The topic names for each record to send to is dynamically determined based on the }. + /// + /// New type key serializer + /// New type value serializer + /// The extractor to determine the name of the Kafka topic to write to for each record + /// The function used to determine how records are distributed among partitions of the topic + /// A config used to name the processor in the topology. Default : null + void To(ITopicNameExtractor topicExtractor, IStreamPartitioner partitioner, string named = null) where KS : ISerDes, new() where VS : ISerDes, new(); + /// + /// Dynamically materialize this stream to a topic using and serializers specified in the method parameters. + /// The topic names for each record to send to is dynamically determined based on the Func<K, V, IRecordContext, string>. + /// + /// New type key serializer + /// New type value serializer + /// Extractor function to determine the name of the Kafka topic to write to for each record + /// Extractor function to determine the timestamp of the record stored in the Kafka topic + /// A config used to name the processor in the topology. Default : null + void To(Func topicExtractor, Func recordTimestampExtractor, string named = null) where KS : ISerDes, new() where VS : ISerDes, new(); + + /// + /// Dynamically materialize this stream to a topic using and serializers specified in the method parameters. + /// The topic names for each record to send to is dynamically determined based on the Func<K, V, IRecordContext, string>. + /// + /// New type key serializer + /// New type value serializer + /// Extractor function to determine the name of the Kafka topic to write to for each record + /// Extractor function to determine the timestamp of the record stored in the Kafka topic + /// The function used to determine how records are distributed among partitions of the topic + /// A config used to name the processor in the topology. Default : null + void To(Func topicExtractor, Func recordTimestampExtractor, Func partitioner, string named = null) where KS : ISerDes, new() where VS : ISerDes, new(); + + /// + /// Dynamically materialize this stream to a topic using and serializers specified in the method parameters. + /// The topic names for each record to send to is dynamically determined based on the }. + /// + /// New type key serializer + /// New type value serializer + /// The extractor to determine the name of the Kafka topic to write to for each record + /// Extractor function to determine the timestamp of the record stored in the Kafka topic + /// A config used to name the processor in the topology. Default : null + void To(ITopicNameExtractor topicExtractor, IRecordTimestampExtractor recordTimestampExtractor, string named = null) where KS : ISerDes, new() where VS : ISerDes, new(); + /// /// Dynamically materialize this stream to a topic using and serializers specified in the method parameters. /// The topic names for each record to send to is dynamically determined based on the }. 
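The new To overloads above accept the partitioner either as an IStreamPartitioner or as an equivalent delegate. A minimal caller-side sketch, assuming the delegate shape Func<string, K, V, Partition, int, Partition> (inferred from WrapperStreamPartitioner) and hypothetical topic names:

using System;
using Confluent.Kafka;
using Streamiz.Kafka.Net;
using Streamiz.Kafka.Net.SerDes;
using Streamiz.Kafka.Net.Stream;

// Hypothetical topology: spread records by key hash, falling back to Partition.Any
// for null keys. Topic names and serdes are illustrative, not part of this change.
var builder = new StreamBuilder();
builder.Stream<string, string>("input")
    .To("output",
        (topic, key, value, sourcePartition, numPartitions) =>
            key == null
                ? Partition.Any
                : new Partition(Math.Abs(key.GetHashCode() % numPartitions)),
        new StringSerDes(),
        new StringSerDes());
Topology topology = builder.Build();

Internally the delegate is wrapped in a WrapperStreamPartitioner, so both styles go through the same sink path.
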
@@ -231,35 +422,15 @@ public interface IKStream /// New type key serializer /// New type value serializer /// The extractor to determine the name of the Kafka topic to write to for each record + /// Extractor function to determine the timestamp of the record stored in the Kafka topic + /// The function used to determine how records are distributed among partitions of the topic /// A config used to name the processor in the topology. Default : null - void To(ITopicNameExtractor topicExtractor, string named = null) where KS : ISerDes, new() where VS : ISerDes, new(); - - /// - /// Dynamically materialize this stream to a topic using and serializers specified in the method parameters. - /// The topic names for each record to send to is dynamically determined based on the Func<K, V, IRecordContext, string>. - /// - /// New type key serializer - /// New type value serializer - /// Extractor function to determine the name of the Kafka topic to write to for each record - /// Extractor function to determine the timestamp of the record stored in the Kafka topic - /// A config used to name the processor in the topology. Default : null - void To(Func topicExtractor, Func recordTimestampExtractor, string named = null) where KS : ISerDes, new() where VS : ISerDes, new(); - - /// - /// Dynamically materialize this stream to a topic using and serializers specified in the method parameters. - /// The topic names for each record to send to is dynamically determined based on the }. - /// - /// New type key serializer - /// New type value serializer - /// The extractor to determine the name of the Kafka topic to write to for each record - /// Extractor function to determine the timestamp of the record stored in the Kafka topic - /// A config used to name the processor in the topology. Default : null - void To(ITopicNameExtractor topicExtractor, IRecordTimestampExtractor recordTimestampExtractor, string named = null) where KS : ISerDes, new() where VS : ISerDes, new(); + void To(ITopicNameExtractor topicExtractor, IRecordTimestampExtractor recordTimestampExtractor, IStreamPartitioner partitioner, string named = null) where KS : ISerDes, new() where VS : ISerDes, new(); #endregion - + #region FlatMap - + /// /// Transform each record of the input stream into zero or more records in the output stream (bot /// can be altered arbitrarily). @@ -1982,12 +2153,12 @@ void ForeachAsync( Func, ExternalContext, Task> asyncAction, RetryPolicy retryPolicy = null, RequestSerDes requestSerDes = null, - string named = null); - + string named = null); + #endregion - + #region Process - + /// /// Process all records in this stream, one record at a time, by applying a processor (provided by the given /// . @@ -2026,12 +2197,12 @@ void ForeachAsync( /// an instance of which contains the processor and a potential state store. Use to build this supplier. /// A config used to name the processor in the topology. Default : null /// The names of the state stores used by the processor. - void Process(ProcessorSupplier processorSupplier, string named = null, params string[] storeNames); - + void Process(ProcessorSupplier processorSupplier, string named = null, params string[] storeNames); + #endregion - + #region Transform - + /// /// Transform each record of the input stream into zero or one record in the output stream (both key and value type /// can be altered arbitrarily). 
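Where the routing rule is reusable, the same contract can be implemented as a class against the IStreamPartitioner interface introduced by this change (assumed generic over key and value types, as DefaultStreamPartitioner and WrapperStreamPartitioner suggest). The priority-key rule below is purely illustrative:

using System;
using Confluent.Kafka;
using Streamiz.Kafka.Net.Processors;

// Illustrative partitioner: pin keys starting with "priority-" to partition 0,
// spread everything else by key hash, and let the producer decide for null keys.
public class PriorityPartitioner<V> : IStreamPartitioner<string, V>
{
    public Partition Partition(string topic, string key, V value,
        Partition sourcePartition, int numPartitions)
    {
        if (key == null)
            return Confluent.Kafka.Partition.Any; // fully qualified: the enclosing method is also named Partition

        if (key.StartsWith("priority-", StringComparison.Ordinal))
            return new Confluent.Kafka.Partition(0);

        return new Confluent.Kafka.Partition(Math.Abs(key.GetHashCode() % numPartitions));
    }
}

An instance can then be passed to the IStreamPartitioner-based To overloads above, or to Repartitioned.Partitioner / WithStreamPartitioner further down in this change.
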
@@ -2076,8 +2247,8 @@ void ForeachAsync( /// the key type of the new stream /// the value type of the new stream /// a that contains more or less records with new key and value (possibly of different type) - IKStream Transform(TransformerSupplier transformerSupplier, string named = null, params string[] storeNames); - + IKStream Transform(TransformerSupplier transformerSupplier, string named = null, params string[] storeNames); + /// /// Transform each record of the input stream into zero or one record in the output stream (only value type, the original key will be through to the upstream processors ). /// A (provided by the given ) is applied to each input record and @@ -2120,20 +2291,20 @@ void ForeachAsync( /// The names of the state stores used by the processor. /// the value type of the new stream /// a that contains more or less records with new key and value (possibly of different type) - IKStream TransformValues(TransformerSupplier transformerSupplier, string named = null, params string[] storeNames); - + IKStream TransformValues(TransformerSupplier transformerSupplier, string named = null, params string[] storeNames); + #endregion - + #region WithRecordTimestamp - - /// - /// Updates the timestamp of the record with one provided by the function. Negative timestamps will be ignored. - /// - /// - /// A config used to name the processor in the topology. Default : null - /// a that has records with timestamp explicitly provided by - IKStream WithRecordTimestamp(Func timestampExtractor, string named = null); - + + /// + /// Updates the timestamp of the record with one provided by the function. Negative timestamps will be ignored. + /// + /// + /// A config used to name the processor in the topology. Default : null + /// a that has records with timestamp explicitly provided by + IKStream WithRecordTimestamp(Func timestampExtractor, string named = null); + #endregion } } diff --git a/core/Stream/Internal/Graph/Nodes/RepartitionNode.cs b/core/Stream/Internal/Graph/Nodes/RepartitionNode.cs index ce4f14e9..292bb265 100644 --- a/core/Stream/Internal/Graph/Nodes/RepartitionNode.cs +++ b/core/Stream/Internal/Graph/Nodes/RepartitionNode.cs @@ -1,4 +1,5 @@ using System; +using Streamiz.Kafka.Net.Processors; using Streamiz.Kafka.Net.Processors.Internal; using Streamiz.Kafka.Net.SerDes; @@ -12,7 +13,7 @@ public RepartitionNode(string streamGraphNode, string sourceName, ProcessorParam } public int? 
NumberOfPartition { get; set; } - public Func StreamPartitioner { get; set; } + public IStreamPartitioner StreamPartitioner { get; set; } public override void WriteToTopology(InternalTopologyBuilder builder) { diff --git a/core/Stream/Internal/Graph/Nodes/StreamSinkNode.cs b/core/Stream/Internal/Graph/Nodes/StreamSinkNode.cs index 2ed39ac2..0ccb0663 100644 --- a/core/Stream/Internal/Graph/Nodes/StreamSinkNode.cs +++ b/core/Stream/Internal/Graph/Nodes/StreamSinkNode.cs @@ -17,7 +17,10 @@ internal class StreamSinkNode : StreamSinkNode private readonly Produced produced; private readonly IRecordTimestampExtractor timestampExtractor; - public StreamSinkNode(ITopicNameExtractor topicNameExtractor, IRecordTimestampExtractor timestampExtractor, string streamGraphNode, Produced produced) + public StreamSinkNode( + ITopicNameExtractor topicNameExtractor, + IRecordTimestampExtractor timestampExtractor, + string streamGraphNode, Produced produced) : base(streamGraphNode) { this.topicNameExtractor = topicNameExtractor; @@ -27,7 +30,7 @@ public StreamSinkNode(ITopicNameExtractor topicNameExtractor, IRecordTimes public override void WriteToTopology(InternalTopologyBuilder builder) { - builder.AddSinkOperator(topicNameExtractor, timestampExtractor, this.streamGraphNode, produced, ParentNodeNames()); + builder.AddSinkOperator(topicNameExtractor, timestampExtractor, streamGraphNode, produced, ParentNodeNames()); } } } diff --git a/core/Stream/Internal/KStream.cs b/core/Stream/Internal/KStream.cs index b3bb2076..f21ed209 100644 --- a/core/Stream/Internal/KStream.cs +++ b/core/Stream/Internal/KStream.cs @@ -13,6 +13,7 @@ using System.Collections.Generic; using System.Linq; using System.Threading.Tasks; +using Confluent.Kafka; using Streamiz.Kafka.Net.Processors.Public; namespace Streamiz.Kafka.Net.Stream.Internal @@ -178,55 +179,177 @@ public IKStream Repartition(Repartitioned repartitioned = null) public void To(string topicName, string named = null) { - if (topicName == null) + if (string.IsNullOrEmpty(topicName)) { - throw new ArgumentNullException(nameof(topicName)); + throw new ArgumentException("topicName must not be null or empty"); } + To(new StaticTopicNameExtractor(topicName), new DefaultStreamPartitioner(), named); + } + + public void To(string topicName, Func partitioner, string named = null) + { if (string.IsNullOrEmpty(topicName)) { - throw new ArgumentException("topicName must be empty"); + throw new ArgumentException("topicName must not be null or empty"); } - To(new StaticTopicNameExtractor(topicName), named); + To( + new StaticTopicNameExtractor(topicName), + new WrapperStreamPartitioner(partitioner), + named); } - public void To(string topicName, ISerDes keySerdes, ISerDes valueSerdes, string named = null) + public void To(string topicName, IStreamPartitioner partitioner, string named = null) { - if (topicName == null) + if (string.IsNullOrEmpty(topicName)) { - throw new ArgumentNullException(nameof(topicName)); + throw new ArgumentException("topicName must not be null or empty"); } + To( + new StaticTopicNameExtractor(topicName), + partitioner, + named); + } + + public void To(string topicName, ISerDes keySerdes, ISerDes valueSerdes, string named = null) + { + if (string.IsNullOrEmpty(topicName)) + throw new ArgumentException("topicName must not be null or empty"); + + To(new StaticTopicNameExtractor(topicName), new DefaultStreamPartitioner(), keySerdes, valueSerdes, named); + } + + public void To(string topicName, Func partitioner, ISerDes keySerdes, ISerDes valueSerdes, string named = 
null) + { if (string.IsNullOrEmpty(topicName)) - { throw new ArgumentException("topicName must be empty"); - } - To(new StaticTopicNameExtractor(topicName), keySerdes, valueSerdes, named); + To(new StaticTopicNameExtractor(topicName), + new WrapperStreamPartitioner(partitioner), + keySerdes, valueSerdes, named); } - public void To(ITopicNameExtractor topicExtractor, string named = null) => DoTo(topicExtractor, new DefaultRecordTimestampExtractor(), Produced.Create(KeySerdes, ValueSerdes).WithName(named)); + public void To(Func topicExtractor, + Func partitioner, ISerDes keySerdes, ISerDes valueSerdes, + string named = null) + => To( + new WrapperTopicNameExtractor(topicExtractor), + KeySerdes, ValueSerdes, + new DefaultRecordTimestampExtractor(), + new WrapperStreamPartitioner(partitioner), + named); + + public void To(ITopicNameExtractor topicExtractor, string named = null) + => To( + topicExtractor, + KeySerdes, ValueSerdes, + new DefaultRecordTimestampExtractor(), + new DefaultStreamPartitioner(), + named); + + public void To(ITopicNameExtractor topicExtractor, ISerDes keySerdes, ISerDes valueSerdes, + string named = null) + => To(topicExtractor, + keySerdes, valueSerdes, + new DefaultRecordTimestampExtractor(), + new DefaultStreamPartitioner(), + named); - public void To(ITopicNameExtractor topicExtractor, ISerDes keySerdes, ISerDes valueSerdes, string named = null) - => DoTo(topicExtractor, new DefaultRecordTimestampExtractor(), Produced.Create(keySerdes, valueSerdes).WithName(named)); + public void To(ITopicNameExtractor topicExtractor, IStreamPartitioner partitioner, + ISerDes keySerdes, ISerDes valueSerdes, + string named = null) + => To(topicExtractor, + keySerdes, valueSerdes, + new DefaultRecordTimestampExtractor(), + partitioner, + named); - public void To(Func topicExtractor, string named = null) => To(new WrapperTopicNameExtractor(topicExtractor), named); + public void To(Func topicExtractor, string named = null) + => To(new WrapperTopicNameExtractor(topicExtractor), named); + + public void To(Func topicExtractor, + Func partitioner, string named = null) + => To(new WrapperTopicNameExtractor(topicExtractor), + new WrapperStreamPartitioner(partitioner), + named); public void To(Func topicExtractor, ISerDes keySerdes, ISerDes valueSerdes, string named = null) - => To(new WrapperTopicNameExtractor(topicExtractor), keySerdes, valueSerdes, named); - - public void To(Func topicExtractor, Func recordTimestampExtractor, string named = null) => - DoTo(new WrapperTopicNameExtractor(topicExtractor), new WrapperRecordTimestampExtractor(recordTimestampExtractor), Produced.Create(KeySerdes, ValueSerdes).WithName(named)); - - public void To(Func topicExtractor, ISerDes keySerdes, ISerDes valueSerdes, Func recordTimestampExtractor, string named = null) => - DoTo(new WrapperTopicNameExtractor(topicExtractor), new WrapperRecordTimestampExtractor(recordTimestampExtractor), Produced.Create(keySerdes, valueSerdes).WithName(named)); - - public void To(ITopicNameExtractor topicExtractor, IRecordTimestampExtractor recordTimestampExtractor, string named = null) => - DoTo(topicExtractor, recordTimestampExtractor, Produced.Create(KeySerdes, ValueSerdes).WithName(named)); - - public void To(ITopicNameExtractor topicExtractor, ISerDes keySerdes, ISerDes valueSerdes, IRecordTimestampExtractor recordTimestampExtractor, string named = null) => - DoTo(topicExtractor, recordTimestampExtractor, Produced.Create(keySerdes, valueSerdes).WithName(named)); + => To(new WrapperTopicNameExtractor(topicExtractor), 
keySerdes, valueSerdes, named); + + public void To(Func topicExtractor, + Func recordTimestampExtractor, string named = null) + => To( + new WrapperTopicNameExtractor(topicExtractor), + KeySerdes, ValueSerdes, + new WrapperRecordTimestampExtractor(recordTimestampExtractor), + new DefaultStreamPartitioner(), + named); + + public void To(Func topicExtractor, + Func recordTimestampExtractor, Func partitioner, + string named = null) + => To( + new WrapperTopicNameExtractor(topicExtractor), + KeySerdes, ValueSerdes, + new WrapperRecordTimestampExtractor(recordTimestampExtractor), + new WrapperStreamPartitioner(partitioner), + named); + + public void To(Func topicExtractor, ISerDes keySerdes, ISerDes valueSerdes, + Func recordTimestampExtractor, string named = null) + => To(new WrapperTopicNameExtractor(topicExtractor), + keySerdes, valueSerdes, + new WrapperRecordTimestampExtractor(recordTimestampExtractor), + new DefaultStreamPartitioner(), + named); + + public void To(ITopicNameExtractor topicExtractor, + IRecordTimestampExtractor recordTimestampExtractor, string named = null) + => To(topicExtractor, + KeySerdes, ValueSerdes, + recordTimestampExtractor, + new DefaultStreamPartitioner(), + named); + + public void To(ITopicNameExtractor topicExtractor, IStreamPartitioner partitioner, + string named = null) + => To(topicExtractor, + KeySerdes, ValueSerdes, + new DefaultRecordTimestampExtractor(), + partitioner, + named); + + public void To(ITopicNameExtractor topicExtractor, IRecordTimestampExtractor recordTimestampExtractor, + IStreamPartitioner partitioner, string named = null) + => To(topicExtractor, + KeySerdes, ValueSerdes, + recordTimestampExtractor, + partitioner, + named); + + public void To(ITopicNameExtractor topicExtractor, ISerDes keySerdes, ISerDes valueSerdes, IRecordTimestampExtractor recordTimestampExtractor, string named = null) + => To( + topicExtractor, + keySerdes, valueSerdes, + recordTimestampExtractor, + new DefaultStreamPartitioner(), + named); + + public void To(ITopicNameExtractor topicExtractor, ISerDes keySerdes, ISerDes valueSerdes, + IRecordTimestampExtractor recordTimestampExtractor, IStreamPartitioner partitioner, string named = null) + => DoTo( + topicExtractor, + recordTimestampExtractor, + partitioner, + Produced.Create(keySerdes, valueSerdes).WithName(named)); + + public void To(Func topicExtractor, Func partitioner, string named = null) where KS : ISerDes, new() where VS : ISerDes, new() + => To( + new WrapperTopicNameExtractor(topicExtractor), + new WrapperStreamPartitioner(partitioner), + named); public void To(Func topicExtractor, string named = null) where KS : ISerDes, new() @@ -238,20 +361,44 @@ public void To(string topicName, string named = null) where VS : ISerDes, new() => To(new StaticTopicNameExtractor(topicName), named); + public void To(string topicName, Func partitioner, string named = null) where KS : ISerDes, new() where VS : ISerDes, new() + => To(new StaticTopicNameExtractor(topicName), + new WrapperStreamPartitioner(partitioner), + named); + public void To(ITopicNameExtractor topicExtractor, string named = null) where KS : ISerDes, new() where VS : ISerDes, new() - => DoTo(topicExtractor, new DefaultRecordTimestampExtractor(), Produced.Create().WithName(named)); - - public void To(Func topicExtractor, Func recordTimestampExtractor, string named = null) - where KS : ISerDes, new() - where VS : ISerDes, new() - => To(new WrapperTopicNameExtractor(topicExtractor), new WrapperRecordTimestampExtractor(recordTimestampExtractor), named); - - 
public void To(ITopicNameExtractor topicExtractor, IRecordTimestampExtractor recordTimestampExtractor, string named = null) - where KS : ISerDes, new() - where VS : ISerDes, new() - => DoTo(topicExtractor, recordTimestampExtractor, Produced.Create().WithName(named)); + => To(topicExtractor, new DefaultStreamPartitioner(), named); + + public void To(ITopicNameExtractor topicExtractor, IStreamPartitioner partitioner, string named = null) where KS : ISerDes, new() where VS : ISerDes, new() + => To( + topicExtractor, + new DefaultRecordTimestampExtractor(), + partitioner, + named); + + public void To(Func topicExtractor, Func recordTimestampExtractor, string named = null) + where KS : ISerDes, new() + where VS : ISerDes, new() + => To(new WrapperTopicNameExtractor(topicExtractor), new WrapperRecordTimestampExtractor(recordTimestampExtractor), named); + + public void To(Func topicExtractor, Func recordTimestampExtractor, Func partitioner, string named = null) where KS : ISerDes, new() where VS : ISerDes, new() + => To(new WrapperTopicNameExtractor(topicExtractor), new WrapperRecordTimestampExtractor(recordTimestampExtractor), new WrapperStreamPartitioner(partitioner), named); + + public void To(ITopicNameExtractor topicExtractor, + IRecordTimestampExtractor recordTimestampExtractor, string named = null) + where KS : ISerDes, new() + where VS : ISerDes, new() + => To(topicExtractor, recordTimestampExtractor, new DefaultStreamPartitioner(), named); + + public void To(ITopicNameExtractor topicExtractor, IRecordTimestampExtractor recordTimestampExtractor, + IStreamPartitioner partitioner, string named = null) where KS : ISerDes, new() where VS : ISerDes, new() + => DoTo( + topicExtractor, + recordTimestampExtractor, + partitioner, + Produced.Create().WithName(named)); #endregion @@ -1018,10 +1165,10 @@ public IKTable ToTable(Materialized> m tableSource, tableNode, builder); - } - + } + #endregion - + #region WithRecordTimestamp public IKStream WithRecordTimestamp(Func timestampExtractor, string named = null) @@ -1029,22 +1176,22 @@ public IKStream WithRecordTimestamp(Func timestampExtractor, s if (timestampExtractor == null) { throw new ArgumentNullException($"Extractor function can't be null"); - } - + } + String name = new Named(named).OrElseGenerateWithPrefix(builder, KStream.RECORD_TIMESTAMP_NAME); ProcessorParameters processorParameters = new ProcessorParameters(new KStreamTimestampExtractor(timestampExtractor), name); ProcessorGraphNode timestampExtractorNode = new ProcessorGraphNode(name, processorParameters); - builder.AddGraphNode(Node, timestampExtractorNode); - + builder.AddGraphNode(Node, timestampExtractorNode); + return new KStream(name, KeySerdes, ValueSerdes, SetSourceNodes, RepartitionRequired, timestampExtractorNode, builder); } - + #endregion - + #region Private - + private IKStream DoFilter(Func predicate, string named, bool not) { if (predicate == null) @@ -1058,13 +1205,17 @@ private IKStream DoFilter(Func predicate, string named, bool n builder.AddGraphNode(Node, filterProcessorNode); return new KStream(name, KeySerdes, ValueSerdes, SetSourceNodes, RepartitionRequired, filterProcessorNode, builder); - } - - private void DoTo(ITopicNameExtractor topicExtractor, IRecordTimestampExtractor timestampExtractor, Produced produced) + } + + private void DoTo(ITopicNameExtractor topicExtractor, IRecordTimestampExtractor timestampExtractor, IStreamPartitioner partitioner, Produced produced) { string name = new Named(produced.Named).OrElseGenerateWithPrefix(builder, KStream.SINK_NAME); - 
StreamSinkNode sinkNode = new StreamSinkNode(topicExtractor, timestampExtractor, name, produced); + StreamSinkNode sinkNode = new StreamSinkNode( + topicExtractor, + timestampExtractor, + name, + produced.WithPartitioner(partitioner)); builder.AddGraphNode(Node, sinkNode); } @@ -1342,8 +1493,11 @@ string asyncProcessorName responseSinkProcessorName, responseSourceProcessorName, asyncProcessorName); - } - + } + #endregion + + + } } \ No newline at end of file diff --git a/core/Stream/Internal/Produced.cs b/core/Stream/Internal/Produced.cs index 35f79a94..dcb96a5c 100644 --- a/core/Stream/Internal/Produced.cs +++ b/core/Stream/Internal/Produced.cs @@ -1,4 +1,5 @@ using System; +using Streamiz.Kafka.Net.Processors; using Streamiz.Kafka.Net.SerDes; namespace Streamiz.Kafka.Net.Stream.Internal @@ -9,7 +10,7 @@ internal class Produced internal ISerDes ValueSerdes { get; } internal string Named { get; private set; } - internal Func Partitioner { get; private set; } + internal IStreamPartitioner Partitioner { get; private set; } internal Produced(ISerDes keySerdes, ISerDes valueSerdes) { @@ -35,9 +36,9 @@ internal Produced WithName(string named) return this; } - internal Produced WithPartitioner(Func partitioner) + internal Produced WithPartitioner(IStreamPartitioner partitioner) { - this.Partitioner = partitioner; + Partitioner = partitioner; return this; } } diff --git a/core/Stream/Repartitioned.cs b/core/Stream/Repartitioned.cs index 0ab205f9..acde2cba 100644 --- a/core/Stream/Repartitioned.cs +++ b/core/Stream/Repartitioned.cs @@ -1,4 +1,7 @@ using System; +using Confluent.Kafka; +using Streamiz.Kafka.Net.Processors; +using Streamiz.Kafka.Net.Processors.Internal; using Streamiz.Kafka.Net.SerDes; namespace Streamiz.Kafka.Net.Stream @@ -14,14 +17,14 @@ public class Repartitioned internal ISerDes KeySerdes { get; private set; } internal ISerDes ValueSerdes { get; private set; } internal int? NumberOfPartition { get; private set; } - internal Func StreamPartitioner { get; private set; } + internal IStreamPartitioner StreamPartitioner { get; private set; } private Repartitioned( string named, ISerDes keySerdes, ISerDes valueSerdes, int? numberOfPartition, - Func streamPartitioner) + IStreamPartitioner streamPartitioner) { Named = named; KeySerdes = keySerdes; @@ -72,7 +75,16 @@ public static Repartitioned Create() /// /// the function used to determine how records are distributed among partitions of the topic /// A new instance of - public static Repartitioned Partitioner(Func streamPartitioner) => new(null, null, null, null, streamPartitioner); + public static Repartitioned Partitioner(IStreamPartitioner streamPartitioner) => new(null, null, null, null, streamPartitioner); + + /// + /// Create a instance with provided partitioner. 
+ /// + /// the function used to determine how records are distributed among partitions of the topic + /// A new instance of + public static Repartitioned Partitioner(Func streamPartitioner) + => Partitioner(new WrapperStreamPartitioner(streamPartitioner)); + #endregion @@ -127,12 +139,21 @@ public Repartitioned WithNumberOfPartitions(int numberOfPartitions) /// /// Function to determine the partition where the repartition record will be persist /// this - public Repartitioned WithStreamPartitioner(Func streamPartitioner) + public Repartitioned WithStreamPartitioner(IStreamPartitioner streamPartitioner) { StreamPartitioner = streamPartitioner; return this; } - + + /// + /// Set the stream partitioner + /// + /// Function to determine the partition where the repartition record will be persist + /// this + public Repartitioned WithStreamPartitioner(Func streamPartitioner) + => WithStreamPartitioner(new WrapperStreamPartitioner(streamPartitioner)); + + #endregion } } \ No newline at end of file diff --git a/samples/sample-stream/Program.cs b/samples/sample-stream/Program.cs index 59468230..19060913 100644 --- a/samples/sample-stream/Program.cs +++ b/samples/sample-stream/Program.cs @@ -7,9 +7,7 @@ using Streamiz.Kafka.Net.Metrics; using Streamiz.Kafka.Net.Metrics.Prometheus; using Streamiz.Kafka.Net.Stream; -using System.Collections.Generic; using Microsoft.Extensions.Logging; -using Streamiz.Kafka.Net.Metrics.Prometheus; using Streamiz.Kafka.Net.Table; namespace sample_stream @@ -22,9 +20,6 @@ public static async Task Main(string[] args) ApplicationId = $"test-app", BootstrapServers = "localhost:9092", AutoOffsetReset = AutoOffsetReset.Earliest, - PartitionAssignmentStrategy = PartitionAssignmentStrategy.RoundRobin, - Partitioner = Partitioner.ConsistentRandom, - Debug = "broker,topic,msg", Logger = LoggerFactory.Create((b) => { b.AddConsole(); @@ -49,7 +44,7 @@ private static Topology BuildTopology() TimeSpan windowSize = TimeSpan.FromHours(1); var builder = new StreamBuilder(); - builder.Stream("input") + /*builder.Stream("input") .GroupByKey() .WindowedBy(TumblingWindowOptions.Of(windowSize)) .Count(RocksDbWindows.As("count-store") @@ -60,7 +55,11 @@ private static Topology BuildTopology() .Map((k,v) => new KeyValuePair(k.ToString(), v.ToString())) .To("output", new StringSerDes(), - new StringSerDes()); + new StringSerDes());*/ + + builder.Stream("input") + .To( + "output");//, (s, s1, arg3, arg4) => new Partition(0)); return builder.Build(); diff --git a/test/Streamiz.Kafka.Net.Tests/Helpers/ExecutorService.cs b/test/Streamiz.Kafka.Net.Tests/Helpers/ExecutorService.cs deleted file mode 100644 index f7684a5b..00000000 --- a/test/Streamiz.Kafka.Net.Tests/Helpers/ExecutorService.cs +++ /dev/null @@ -1,72 +0,0 @@ -using System; -using System.Threading; -using System.Threading.Tasks; -using NUnit.Framework; - -namespace Streamiz.Kafka.Net.Tests.Helpers -{ - internal class ExecutorService : IDisposable - { - private Action action; - private readonly TimeSpan timeout; - private Thread _thread; - private bool disposed = false; - - private ExecutorService(Action action, TimeSpan timeout) - { - this.action = action; - this.timeout = timeout; - _thread = new Thread(() => - { - DateTime dt = DateTime.Now; - bool @continue = true; - try - { - while (@continue) - { - try - { - action(); - @continue = false; - } - catch (AssertionException e) - { - if (dt.Add(timeout) < DateTime.Now) - { - @continue = false; - throw; - } - } - } - } - catch (Exception e) - { - Console.WriteLine(e.Message); - throw; - 
} - }); - } - - public static ExecutorService NewSingleThreadExecutor(Action action, TimeSpan timeout) - { - return new ExecutorService(action, timeout); - } - - public void Start() - { - _thread.Start(); - } - - public void Stop() - { - _thread.Join(); - disposed = true; - } - - public void Dispose() - { - if(!disposed) - Stop(); - } - } -} diff --git a/test/Streamiz.Kafka.Net.Tests/Metrics/StateStoreMetricTests.cs b/test/Streamiz.Kafka.Net.Tests/Metrics/StateStoreMetricTests.cs index fb4ca9e5..9fa7a3f2 100644 --- a/test/Streamiz.Kafka.Net.Tests/Metrics/StateStoreMetricTests.cs +++ b/test/Streamiz.Kafka.Net.Tests/Metrics/StateStoreMetricTests.cs @@ -241,8 +241,8 @@ public void KeyValueStoreMetricsTest() StreamMetricsRegistry.LATENCY_SUFFIX + StreamMetricsRegistry.MAX_SUFFIX, StreamMetricsRegistry.STATE_STORE_LEVEL_GROUP); - Assert.IsTrue((double)latencyAvg.Value > 0); - Assert.IsTrue((double)latencyMax.Value > 0); + Assert.IsTrue((double)latencyAvg.Value >= 0); + Assert.IsTrue((double)latencyMax.Value >= 0); for(int i = 0 ; i < nbMessage2 ; ++i) meteredKeyValueStore.Put($"test{i}", $"test{i}"); @@ -346,8 +346,8 @@ private void AssertAvgAndMaxLatency(string sensorName) sensorName, StreamMetricsRegistry.LATENCY_SUFFIX + StreamMetricsRegistry.MAX_SUFFIX, StreamMetricsRegistry.STATE_STORE_LEVEL_GROUP); - Assert.IsTrue((double)latencyAvg.Value > 0); - Assert.IsTrue((double)latencyMax.Value > 0); + Assert.IsTrue((double)latencyAvg.Value >= 0); + Assert.IsTrue((double)latencyMax.Value >= 0); } private StreamMetric GetSensorMetric(string sensorName, string metricSuffix, string group) diff --git a/test/Streamiz.Kafka.Net.Tests/Metrics/TaskMetricsTests.cs b/test/Streamiz.Kafka.Net.Tests/Metrics/TaskMetricsTests.cs index 09218b55..75136493 100644 --- a/test/Streamiz.Kafka.Net.Tests/Metrics/TaskMetricsTests.cs +++ b/test/Streamiz.Kafka.Net.Tests/Metrics/TaskMetricsTests.cs @@ -169,7 +169,7 @@ public void TaskMetricsTest() Assert.IsTrue( (double)processLatency.Metrics[MetricName.NameAndGroup( TaskMetrics.PROCESS_LATENCY + StreamMetricsRegistry.MAX_SUFFIX, - StreamMetricsRegistry.TASK_LEVEL_GROUP)].Value > 0d); + StreamMetricsRegistry.TASK_LEVEL_GROUP)].Value >= 0d); var commitSensor = sensors.FirstOrDefault(s => s.Name.Equals(GetSensorName(TaskMetrics.COMMIT))); Assert.AreEqual(2, commitSensor.Metrics.Count()); diff --git a/test/Streamiz.Kafka.Net.Tests/Metrics/ThreadMetricsTests.cs b/test/Streamiz.Kafka.Net.Tests/Metrics/ThreadMetricsTests.cs index d831c293..30f813ad 100644 --- a/test/Streamiz.Kafka.Net.Tests/Metrics/ThreadMetricsTests.cs +++ b/test/Streamiz.Kafka.Net.Tests/Metrics/ThreadMetricsTests.cs @@ -172,11 +172,11 @@ public void ThreadMetricsTest() Assert.IsTrue( (double)commitSensor.Metrics[MetricName.NameAndGroup( ThreadMetrics.COMMIT + StreamMetricsRegistry.LATENCY_SUFFIX + StreamMetricsRegistry.AVG_SUFFIX, - StreamMetricsRegistry.THREAD_LEVEL_GROUP)].Value > 0d); + StreamMetricsRegistry.THREAD_LEVEL_GROUP)].Value >= 0d); Assert.IsTrue( (double)commitSensor.Metrics[MetricName.NameAndGroup( ThreadMetrics.COMMIT + StreamMetricsRegistry.LATENCY_SUFFIX + StreamMetricsRegistry.MAX_SUFFIX, - StreamMetricsRegistry.THREAD_LEVEL_GROUP)].Value > 0d); + StreamMetricsRegistry.THREAD_LEVEL_GROUP)].Value >= 0d); var pollSensor = sensors.FirstOrDefault(s => s.Name.Equals(GetSensorName(ThreadMetrics.POLL))); Assert.AreEqual(4, pollSensor.Metrics.Count()); @@ -191,11 +191,11 @@ public void ThreadMetricsTest() Assert.IsTrue( (double)pollSensor.Metrics[MetricName.NameAndGroup( ThreadMetrics.POLL + 
StreamMetricsRegistry.LATENCY_SUFFIX + StreamMetricsRegistry.AVG_SUFFIX, - StreamMetricsRegistry.THREAD_LEVEL_GROUP)].Value > 0d); + StreamMetricsRegistry.THREAD_LEVEL_GROUP)].Value >= 0d); Assert.IsTrue( (double)pollSensor.Metrics[MetricName.NameAndGroup( ThreadMetrics.POLL + StreamMetricsRegistry.LATENCY_SUFFIX + StreamMetricsRegistry.MAX_SUFFIX, - StreamMetricsRegistry.THREAD_LEVEL_GROUP)].Value > 0d); + StreamMetricsRegistry.THREAD_LEVEL_GROUP)].Value >= 0d); var pollRecordsSensor = sensors.FirstOrDefault(s => s.Name.Equals(GetSensorName(ThreadMetrics.POLL + StreamMetricsRegistry.RECORDS_SUFFIX))); Assert.AreEqual(2, pollRecordsSensor.Metrics.Count()); @@ -235,11 +235,11 @@ public void ThreadMetricsTest() Assert.IsTrue( (double)processLatencySensor.Metrics[MetricName.NameAndGroup( ThreadMetrics.PROCESS + StreamMetricsRegistry.LATENCY_SUFFIX + StreamMetricsRegistry.AVG_SUFFIX, - StreamMetricsRegistry.THREAD_LEVEL_GROUP)].Value > 0d); + StreamMetricsRegistry.THREAD_LEVEL_GROUP)].Value >= 0d); Assert.IsTrue( (double)processLatencySensor.Metrics[MetricName.NameAndGroup( ThreadMetrics.PROCESS + StreamMetricsRegistry.LATENCY_SUFFIX + StreamMetricsRegistry.MAX_SUFFIX, - StreamMetricsRegistry.THREAD_LEVEL_GROUP)].Value > 0d); + StreamMetricsRegistry.THREAD_LEVEL_GROUP)].Value >= 0d); // Punctuate sensor var punctuateSensor = sensors.FirstOrDefault(s => s.Name.Equals(GetSensorName(ThreadMetrics.PUNCTUATE))); @@ -255,11 +255,11 @@ public void ThreadMetricsTest() Assert.IsTrue( (double)punctuateSensor.Metrics[MetricName.NameAndGroup( ThreadMetrics.PUNCTUATE + StreamMetricsRegistry.LATENCY_SUFFIX + StreamMetricsRegistry.AVG_SUFFIX, - StreamMetricsRegistry.THREAD_LEVEL_GROUP)].Value > 0d); + StreamMetricsRegistry.THREAD_LEVEL_GROUP)].Value >= 0d); Assert.IsTrue( (double)punctuateSensor.Metrics[MetricName.NameAndGroup( ThreadMetrics.PUNCTUATE + StreamMetricsRegistry.LATENCY_SUFFIX + StreamMetricsRegistry.MAX_SUFFIX, - StreamMetricsRegistry.THREAD_LEVEL_GROUP)].Value > 0d); + StreamMetricsRegistry.THREAD_LEVEL_GROUP)].Value >= 0d); // ratio sensors var processRatioSensor = sensors.FirstOrDefault(s => s.Name.Equals(GetSensorName(ThreadMetrics.PROCESS + StreamMetricsRegistry.RATIO_SUFFIX))); diff --git a/test/Streamiz.Kafka.Net.Tests/Private/RecordCollectorTests.cs b/test/Streamiz.Kafka.Net.Tests/Private/RecordCollectorTests.cs index 6bc2c6f8..827c8209 100644 --- a/test/Streamiz.Kafka.Net.Tests/Private/RecordCollectorTests.cs +++ b/test/Streamiz.Kafka.Net.Tests/Private/RecordCollectorTests.cs @@ -1,6 +1,7 @@ using System; using Confluent.Kafka; using NUnit.Framework; +using Moq; using Streamiz.Kafka.Net.Crosscutting; using Streamiz.Kafka.Net.Kafka.Internal; using Streamiz.Kafka.Net.Metrics.Internal; @@ -21,11 +22,13 @@ public void Init() config = new StreamConfig(); config.ApplicationId = "collector-unit-test"; config.ProductionExceptionHandler = (_) => ProductionExceptionHandlerResponse.RETRY; + var client = new Mock(); collector = new RecordCollector( "test-collector", config, new TaskId {Id = 0, Partition = 0}, - NoRunnableSensor.Empty); + NoRunnableSensor.Empty, + client.Object); } [TearDown] diff --git a/test/Streamiz.Kafka.Net.Tests/Processors/KStreamPassThoughTests.cs b/test/Streamiz.Kafka.Net.Tests/Processors/KStreamPassThoughTests.cs index a66faf25..9a84b876 100644 --- a/test/Streamiz.Kafka.Net.Tests/Processors/KStreamPassThoughTests.cs +++ b/test/Streamiz.Kafka.Net.Tests/Processors/KStreamPassThoughTests.cs @@ -73,7 +73,7 @@ public void ShouldNotAllowNullTopicDest() var builder = new 
StreamBuilder(); var stream = builder.Stream("topic"); string topicDes = null; - Assert.Throws(() => stream.To(topicDes)); + Assert.Throws(() => stream.To(topicDes)); } [Test] diff --git a/test/Streamiz.Kafka.Net.Tests/Processors/KStreamRepartitionTests.cs b/test/Streamiz.Kafka.Net.Tests/Processors/KStreamRepartitionTests.cs index f4817ba0..d26c2ea1 100644 --- a/test/Streamiz.Kafka.Net.Tests/Processors/KStreamRepartitionTests.cs +++ b/test/Streamiz.Kafka.Net.Tests/Processors/KStreamRepartitionTests.cs @@ -89,7 +89,7 @@ public void RepartitionWithPartitioner() { builder .Stream("topic") - .Repartition(Repartitioned.Empty().WithStreamPartitioner((t,k,v) => 0)) + .Repartition(Repartitioned.Empty().WithStreamPartitioner((t,k,v,_, c) => 0)) .To("output"); }, TopologyTestDriver.Mode.ASYNC_CLUSTER_IN_MEMORY, 10); diff --git a/test/Streamiz.Kafka.Net.Tests/Processors/KStreamToTests.cs b/test/Streamiz.Kafka.Net.Tests/Processors/KStreamToTests.cs new file mode 100644 index 00000000..d3e73c98 --- /dev/null +++ b/test/Streamiz.Kafka.Net.Tests/Processors/KStreamToTests.cs @@ -0,0 +1,1031 @@ +using System; +using System.Linq; +using Confluent.Kafka; +using NUnit.Framework; +using Streamiz.Kafka.Net.Crosscutting; +using Streamiz.Kafka.Net.Mock; +using Streamiz.Kafka.Net.Mock.Kafka; +using Streamiz.Kafka.Net.Processors; +using Streamiz.Kafka.Net.SerDes; +using Streamiz.Kafka.Net.Stream; + +namespace Streamiz.Kafka.Net.Tests.Processors; + +public class KStreamToTests +{ + private class MyStreamPartitioner : IStreamPartitioner + { + public Partition Partition(string topic, string key, string value, Partition sourcePartition, int numPartitions) + { + switch (key) + { + case "order": + return new Partition(0); + case "sale": + return new Partition(1); + default: + return Confluent.Kafka.Partition.Any; + } + } + } + + private class MyTopicNameExtractor : ITopicNameExtractor + { + public string Extract(string key, string value, IRecordContext recordContext) + => "output"; + } + + private class MyRecordTimestampExtractor : IRecordTimestampExtractor + { + private readonly long _ts; + + public MyRecordTimestampExtractor(long ts) + { + _ts = ts; + } + + public long Extract(string key, string value, IRecordContext recordContext) + => _ts; + } + + + [Test] + public void StreamToLambdaCustomPartitionerTest() + { + var config = new StreamConfig + { + ApplicationId = "test-stream-table-left-join" + }; + + StreamBuilder builder = new StreamBuilder(); + + builder + .Stream("stream") + .MapValues((v) => v.ToUpper()) + .To("output", + (_, _, _,_, _) => new Partition(0)); + + Topology t = builder.Build(); + + var mockSupplier = new MockKafkaSupplier(4); + mockSupplier.CreateTopic("output"); + mockSupplier.CreateTopic("stream"); + + using var driver = new TopologyTestDriver(t.Builder, config, TopologyTestDriver.Mode.ASYNC_CLUSTER_IN_MEMORY, mockSupplier); + var inputTopic = driver.CreateInputTopic("stream"); + var outputTopic = driver.CreateOuputTopic("output"); + inputTopic.PipeInput("test1", "test1"); + inputTopic.PipeInput("test2", "test2"); + inputTopic.PipeInput("test3", "test3"); + var record = IntegrationTestUtils.WaitUntilMinKeyValueRecordsReceived(outputTopic, 3); + Assert.IsNotNull(record); + Assert.AreEqual(3, record.Count); + Assert.AreEqual("test1", record[0].Message.Key); + Assert.AreEqual("TEST1", record[0].Message.Value); + Assert.AreEqual(0, record[0].Partition.Value); + + + Assert.AreEqual("test2", record[1].Message.Key); + Assert.AreEqual("TEST2", record[1].Message.Value); + Assert.AreEqual(0, 
record[1].Partition.Value); + + + Assert.AreEqual("test3", record[2].Message.Key); + Assert.AreEqual("TEST3", record[2].Message.Value); + Assert.AreEqual(0, record[2].Partition.Value); + } + + [Test] + public void StreamToLambdaCustomPartitionerWithSerdesTest() + { + var config = new StreamConfig + { + ApplicationId = "test-stream-table-left-join" + }; + + StreamBuilder builder = new StreamBuilder(); + + builder + .Stream("stream") + .MapValues((v) => v.ToUpper()) + .To("output", + (_, _, _,_, _) => new Partition(0), + new StringSerDes(), + new StringSerDes()); + + Topology t = builder.Build(); + + var mockSupplier = new MockKafkaSupplier(4); + mockSupplier.CreateTopic("output"); + mockSupplier.CreateTopic("stream"); + + using var driver = new TopologyTestDriver(t.Builder, config, TopologyTestDriver.Mode.ASYNC_CLUSTER_IN_MEMORY, mockSupplier); + var inputTopic = driver.CreateInputTopic("stream"); + var outputTopic = driver.CreateOuputTopic("output"); + inputTopic.PipeInput("test1", "test1"); + inputTopic.PipeInput("test2", "test2"); + inputTopic.PipeInput("test3", "test3"); + var record = IntegrationTestUtils.WaitUntilMinKeyValueRecordsReceived(outputTopic, 3); + Assert.IsNotNull(record); + Assert.AreEqual(3, record.Count); + Assert.AreEqual("test1", record[0].Message.Key); + Assert.AreEqual("TEST1", record[0].Message.Value); + Assert.AreEqual(0, record[0].Partition.Value); + + + Assert.AreEqual("test2", record[1].Message.Key); + Assert.AreEqual("TEST2", record[1].Message.Value); + Assert.AreEqual(0, record[1].Partition.Value); + + + Assert.AreEqual("test3", record[2].Message.Key); + Assert.AreEqual("TEST3", record[2].Message.Value); + Assert.AreEqual(0, record[2].Partition.Value); + } + + [Test] + public void StreamToLambdaCustomPartitionerWithSerdesTypeTest() + { + var config = new StreamConfig + { + ApplicationId = "test-stream-table-left-join" + }; + + StreamBuilder builder = new StreamBuilder(); + + builder + .Stream("stream") + .MapValues((v) => v.ToUpper()) + .To("output", + (_, _,_, _, _) => new Partition(0)); + + Topology t = builder.Build(); + + var mockSupplier = new MockKafkaSupplier(4); + mockSupplier.CreateTopic("output"); + mockSupplier.CreateTopic("stream"); + + using var driver = new TopologyTestDriver(t.Builder, config, TopologyTestDriver.Mode.ASYNC_CLUSTER_IN_MEMORY, mockSupplier); + var inputTopic = driver.CreateInputTopic("stream"); + var outputTopic = driver.CreateOuputTopic("output"); + inputTopic.PipeInput("test1", "test1"); + inputTopic.PipeInput("test2", "test2"); + inputTopic.PipeInput("test3", "test3"); + var record = IntegrationTestUtils.WaitUntilMinKeyValueRecordsReceived(outputTopic, 3); + Assert.IsNotNull(record); + Assert.AreEqual(3, record.Count); + Assert.AreEqual("test1", record[0].Message.Key); + Assert.AreEqual("TEST1", record[0].Message.Value); + Assert.AreEqual(0, record[0].Partition.Value); + + + Assert.AreEqual("test2", record[1].Message.Key); + Assert.AreEqual("TEST2", record[1].Message.Value); + Assert.AreEqual(0, record[1].Partition.Value); + + + Assert.AreEqual("test3", record[2].Message.Key); + Assert.AreEqual("TEST3", record[2].Message.Value); + Assert.AreEqual(0, record[2].Partition.Value); + } + + [Test] + public void StreamToTopicExtractorLambdaCustomPartitionerWithSerdesTest() + { + var config = new StreamConfig + { + ApplicationId = "test-stream-table-left-join" + }; + + StreamBuilder builder = new StreamBuilder(); + + builder + .Stream("stream") + .MapValues((v) => v.ToUpper()) + .To((_,_,_) => "output", + (_, _,_, _, _) => new 
Partition(0), + new StringSerDes(), + new StringSerDes()); + + Topology t = builder.Build(); + + var mockSupplier = new MockKafkaSupplier(4); + mockSupplier.CreateTopic("output"); + mockSupplier.CreateTopic("stream"); + + using var driver = new TopologyTestDriver(t.Builder, config, TopologyTestDriver.Mode.ASYNC_CLUSTER_IN_MEMORY, mockSupplier); + var inputTopic = driver.CreateInputTopic("stream"); + var outputTopic = driver.CreateOuputTopic("output"); + inputTopic.PipeInput("test1", "test1"); + inputTopic.PipeInput("test2", "test2"); + inputTopic.PipeInput("test3", "test3"); + var record = IntegrationTestUtils.WaitUntilMinKeyValueRecordsReceived(outputTopic, 3); + Assert.IsNotNull(record); + Assert.AreEqual(3, record.Count); + Assert.AreEqual("test1", record[0].Message.Key); + Assert.AreEqual("TEST1", record[0].Message.Value); + Assert.AreEqual(0, record[0].Partition.Value); + + + Assert.AreEqual("test2", record[1].Message.Key); + Assert.AreEqual("TEST2", record[1].Message.Value); + Assert.AreEqual(0, record[1].Partition.Value); + + + Assert.AreEqual("test3", record[2].Message.Key); + Assert.AreEqual("TEST3", record[2].Message.Value); + Assert.AreEqual(0, record[2].Partition.Value); + } + + [Test] + public void StreamToTopicExtractorLambdaCustomPartitionerWithSerdesTypeTest() + { + var config = new StreamConfig + { + ApplicationId = "test-stream-table-left-join" + }; + + StreamBuilder builder = new StreamBuilder(); + + builder + .Stream("stream") + .MapValues((v) => v.ToUpper()) + .To((_,_,_) => "output", + (_, _,_, _, _) => new Partition(0)); + + Topology t = builder.Build(); + + var mockSupplier = new MockKafkaSupplier(4); + mockSupplier.CreateTopic("output"); + mockSupplier.CreateTopic("stream"); + + using var driver = new TopologyTestDriver(t.Builder, config, TopologyTestDriver.Mode.ASYNC_CLUSTER_IN_MEMORY, mockSupplier); + var inputTopic = driver.CreateInputTopic("stream"); + var outputTopic = driver.CreateOuputTopic("output"); + inputTopic.PipeInput("test1", "test1"); + inputTopic.PipeInput("test2", "test2"); + inputTopic.PipeInput("test3", "test3"); + var record = IntegrationTestUtils.WaitUntilMinKeyValueRecordsReceived(outputTopic, 3); + Assert.IsNotNull(record); + Assert.AreEqual(3, record.Count); + Assert.AreEqual("test1", record[0].Message.Key); + Assert.AreEqual("TEST1", record[0].Message.Value); + Assert.AreEqual(0, record[0].Partition.Value); + + + Assert.AreEqual("test2", record[1].Message.Key); + Assert.AreEqual("TEST2", record[1].Message.Value); + Assert.AreEqual(0, record[1].Partition.Value); + + + Assert.AreEqual("test3", record[2].Message.Key); + Assert.AreEqual("TEST3", record[2].Message.Value); + Assert.AreEqual(0, record[2].Partition.Value); + } + + [Test] + public void StreamToTopicExtractorLambdaCustomPartitionerTest() + { + var config = new StreamConfig { + ApplicationId = "test-stream-table-left-join" + }; + + StreamBuilder builder = new StreamBuilder(); + + builder + .Stream("stream") + .MapValues((v) => v.ToUpper()) + .To((_,_,_) => "output", + (_, _,_, _, _) => new Partition(0)); + + Topology t = builder.Build(); + + var mockSupplier = new MockKafkaSupplier(4); + mockSupplier.CreateTopic("output"); + mockSupplier.CreateTopic("stream"); + + using var driver = new TopologyTestDriver(t.Builder, config, TopologyTestDriver.Mode.ASYNC_CLUSTER_IN_MEMORY, mockSupplier); + var inputTopic = driver.CreateInputTopic("stream"); + var outputTopic = driver.CreateOuputTopic("output"); + inputTopic.PipeInput("test1", "test1"); + inputTopic.PipeInput("test2", "test2"); + 
inputTopic.PipeInput("test3", "test3"); + var record = IntegrationTestUtils.WaitUntilMinKeyValueRecordsReceived(outputTopic, 3); + Assert.IsNotNull(record); + Assert.AreEqual(3, record.Count); + Assert.AreEqual("test1", record[0].Message.Key); + Assert.AreEqual("TEST1", record[0].Message.Value); + Assert.AreEqual(0, record[0].Partition.Value); + + + Assert.AreEqual("test2", record[1].Message.Key); + Assert.AreEqual("TEST2", record[1].Message.Value); + Assert.AreEqual(0, record[1].Partition.Value); + + + Assert.AreEqual("test3", record[2].Message.Key); + Assert.AreEqual("TEST3", record[2].Message.Value); + Assert.AreEqual(0, record[2].Partition.Value); + } + + [Test] + public void StreamToTopicExtractorWithSerdesTest() + { + var config = new StreamConfig + { + ApplicationId = "test-stream-table-left-join" + }; + + StreamBuilder builder = new StreamBuilder(); + + builder + .Stream("stream") + .MapValues((v) => v.ToUpper()) + .To(new MyTopicNameExtractor(), + new StringSerDes(), + new StringSerDes()); + + Topology t = builder.Build(); + + var mockSupplier = new MockKafkaSupplier(4); + mockSupplier.CreateTopic("output"); + mockSupplier.CreateTopic("stream"); + + using var driver = new TopologyTestDriver(t.Builder, config, TopologyTestDriver.Mode.ASYNC_CLUSTER_IN_MEMORY, mockSupplier); + var inputTopic = driver.CreateInputTopic("stream"); + var outputTopic = driver.CreateOuputTopic("output"); + inputTopic.PipeInput("test1", "test1"); + inputTopic.PipeInput("test2", "test2"); + inputTopic.PipeInput("test3", "test3"); + var record = IntegrationTestUtils.WaitUntilMinKeyValueRecordsReceived(outputTopic, 3); + Assert.IsNotNull(record); + Assert.AreEqual(3, record.Count); + Assert.AreEqual("test1", record[0].Message.Key); + Assert.AreEqual("TEST1", record[0].Message.Value); + + Assert.AreEqual("test2", record[1].Message.Key); + Assert.AreEqual("TEST2", record[1].Message.Value); + + Assert.AreEqual("test3", record[2].Message.Key); + Assert.AreEqual("TEST3", record[2].Message.Value); + } + + [Test] + public void StreamToTopicExtractorWithSerdesTypeTest() + { + var config = new StreamConfig + { + ApplicationId = "test-stream-table-left-join" + }; + + StreamBuilder builder = new StreamBuilder(); + + builder + .Stream("stream") + .MapValues((v) => v.ToUpper()) + .To(new MyTopicNameExtractor()); + + Topology t = builder.Build(); + + var mockSupplier = new MockKafkaSupplier(4); + mockSupplier.CreateTopic("output"); + mockSupplier.CreateTopic("stream"); + + using var driver = new TopologyTestDriver(t.Builder, config, TopologyTestDriver.Mode.ASYNC_CLUSTER_IN_MEMORY, mockSupplier); + var inputTopic = driver.CreateInputTopic("stream"); + var outputTopic = driver.CreateOuputTopic("output"); + inputTopic.PipeInput("test1", "test1"); + inputTopic.PipeInput("test2", "test2"); + inputTopic.PipeInput("test3", "test3"); + var record = IntegrationTestUtils.WaitUntilMinKeyValueRecordsReceived(outputTopic, 3); + Assert.IsNotNull(record); + Assert.AreEqual(3, record.Count); + Assert.AreEqual("test1", record[0].Message.Key); + Assert.AreEqual("TEST1", record[0].Message.Value); + + Assert.AreEqual("test2", record[1].Message.Key); + Assert.AreEqual("TEST2", record[1].Message.Value); + + Assert.AreEqual("test3", record[2].Message.Key); + Assert.AreEqual("TEST3", record[2].Message.Value); + } + + [Test] + public void StreamToCustomPartitionerTest() + { + var config = new StreamConfig + { + ApplicationId = "test-stream-table-left-join" + }; + + StreamBuilder builder = new StreamBuilder(); + + builder + .Stream("stream") + 
.MapValues((v) => v.ToUpper()) + .To("output", new MyStreamPartitioner()); + + Topology t = builder.Build(); + + var mockSupplier = new MockKafkaSupplier(4); + mockSupplier.CreateTopic("output"); + mockSupplier.CreateTopic("stream"); + + using var driver = new TopologyTestDriver(t.Builder, config, TopologyTestDriver.Mode.ASYNC_CLUSTER_IN_MEMORY, mockSupplier); + var inputTopic = driver.CreateInputTopic("stream"); + var outputTopic = driver.CreateOuputTopic("output", TimeSpan.FromSeconds(2)); + inputTopic.PipeInput("order", "order1"); + inputTopic.PipeInput("order", "order2"); + inputTopic.PipeInput("sale", "sale1"); + inputTopic.PipeInput("other", "other"); + inputTopic.PipeInput("test", "test1"); + + var record = IntegrationTestUtils.WaitUntilMinKeyValueRecordsReceived(outputTopic, 5) + .ToList(); + + var orderRecord = record.Where(o => o.Message.Key.Equals("order")).ToList(); + var saleRecord = record.Where(o => o.Message.Key.Equals("sale")).ToList(); + var otherRecord = record.Where(o => o.Message.Key.Equals("other") || o.Message.Key.Equals("test")).ToList(); + + Assert.IsNotNull(record); + Assert.AreEqual(5, record.Count); + + Assert.AreEqual("order", orderRecord[0].Message.Key); + Assert.AreEqual("ORDER1", orderRecord[0].Message.Value); + Assert.AreEqual(0, orderRecord[0].Partition.Value); + + + Assert.AreEqual("order", orderRecord[1].Message.Key); + Assert.AreEqual("ORDER2", orderRecord[1].Message.Value); + Assert.AreEqual(0, orderRecord[1].Partition.Value); + + + Assert.AreEqual("sale", saleRecord[0].Message.Key); + Assert.AreEqual("SALE1", saleRecord[0].Message.Value); + Assert.AreEqual(1, saleRecord[0].Partition.Value); + + Assert.AreEqual(2, otherRecord.Count); + } + + [Test] + public void StreamToTopicExtractorLambdaCustomPartitionerRecordExtractorTimestampTest() + { + var config = new StreamConfig { + ApplicationId = "test-stream-table-left-join" + }; + + StreamBuilder builder = new StreamBuilder(); + + var tst = DateTime.Now.GetMilliseconds(); + builder + .Stream("stream") + .MapValues((v) => v.ToUpper()) + .To((_,_,_) => "output", + (_,_,_) => tst, + (_, _,_, _, _) => new Partition(0)); + + Topology t = builder.Build(); + + var mockSupplier = new MockKafkaSupplier(4); + mockSupplier.CreateTopic("output"); + mockSupplier.CreateTopic("stream"); + + using var driver = new TopologyTestDriver(t.Builder, config, TopologyTestDriver.Mode.ASYNC_CLUSTER_IN_MEMORY, mockSupplier); + var inputTopic = driver.CreateInputTopic("stream"); + var outputTopic = driver.CreateOuputTopic("output"); + inputTopic.PipeInput("test1", "test1"); + inputTopic.PipeInput("test2", "test2"); + inputTopic.PipeInput("test3", "test3"); + var record = IntegrationTestUtils.WaitUntilMinKeyValueRecordsReceived(outputTopic, 3); + Assert.IsNotNull(record); + Assert.AreEqual(3, record.Count); + Assert.AreEqual("test1", record[0].Message.Key); + Assert.AreEqual("TEST1", record[0].Message.Value); + Assert.AreEqual(tst, record[0].Message.Timestamp.UnixTimestampMs); + Assert.AreEqual(0, record[0].Partition.Value); + + + Assert.AreEqual("test2", record[1].Message.Key); + Assert.AreEqual("TEST2", record[1].Message.Value); + Assert.AreEqual(tst, record[1].Message.Timestamp.UnixTimestampMs); + Assert.AreEqual(0, record[1].Partition.Value); + + + Assert.AreEqual("test3", record[2].Message.Key); + Assert.AreEqual("TEST3", record[2].Message.Value); + Assert.AreEqual(tst, record[2].Message.Timestamp.UnixTimestampMs); + Assert.AreEqual(0, record[2].Partition.Value); + } + + [Test] + public void 
StreamToTopicExtractorRecordExtractorTimestampWithSerdesTest() + { + var config = new StreamConfig { + ApplicationId = "test-stream-table-left-join" + }; + + StreamBuilder builder = new StreamBuilder(); + + var tst = DateTime.Now.GetMilliseconds(); + builder + .Stream("stream") + .MapValues((v) => v.ToUpper()) + .To((_,_,_) => "output", + new StringSerDes(), new StringSerDes(), + (_,_,_) => tst); + + Topology t = builder.Build(); + + var mockSupplier = new MockKafkaSupplier(4); + mockSupplier.CreateTopic("output"); + mockSupplier.CreateTopic("stream"); + + using var driver = new TopologyTestDriver(t.Builder, config, TopologyTestDriver.Mode.ASYNC_CLUSTER_IN_MEMORY, mockSupplier); + var inputTopic = driver.CreateInputTopic("stream"); + var outputTopic = driver.CreateOuputTopic("output"); + inputTopic.PipeInput("test1", "test1"); + inputTopic.PipeInput("test2", "test2"); + inputTopic.PipeInput("test3", "test3"); + var record = IntegrationTestUtils.WaitUntilMinKeyValueRecordsReceived(outputTopic, 3); + Assert.IsNotNull(record); + Assert.AreEqual(3, record.Count); + Assert.AreEqual("test1", record[0].Message.Key); + Assert.AreEqual("TEST1", record[0].Message.Value); + Assert.AreEqual(tst, record[0].Message.Timestamp.UnixTimestampMs); + + Assert.AreEqual("test2", record[1].Message.Key); + Assert.AreEqual("TEST2", record[1].Message.Value); + Assert.AreEqual(tst, record[1].Message.Timestamp.UnixTimestampMs); + + Assert.AreEqual("test3", record[2].Message.Key); + Assert.AreEqual("TEST3", record[2].Message.Value); + Assert.AreEqual(tst, record[2].Message.Timestamp.UnixTimestampMs); + } + + [Test] + public void StreamToTopicExtractorRecordExtractorTimestampWithSerdesTypeTest() + { + var config = new StreamConfig { + ApplicationId = "test-stream-table-left-join" + }; + + StreamBuilder builder = new StreamBuilder(); + + var tst = DateTime.Now.GetMilliseconds(); + builder + .Stream("stream") + .MapValues((v) => v.ToUpper()) + .To((_,_,_) => "output", + (_,_,_) => tst); + + Topology t = builder.Build(); + + var mockSupplier = new MockKafkaSupplier(4); + mockSupplier.CreateTopic("output"); + mockSupplier.CreateTopic("stream"); + + using var driver = new TopologyTestDriver(t.Builder, config, TopologyTestDriver.Mode.ASYNC_CLUSTER_IN_MEMORY, mockSupplier); + var inputTopic = driver.CreateInputTopic("stream"); + var outputTopic = driver.CreateOuputTopic("output"); + inputTopic.PipeInput("test1", "test1"); + inputTopic.PipeInput("test2", "test2"); + inputTopic.PipeInput("test3", "test3"); + var record = IntegrationTestUtils.WaitUntilMinKeyValueRecordsReceived(outputTopic, 3); + Assert.IsNotNull(record); + Assert.AreEqual(3, record.Count); + Assert.AreEqual("test1", record[0].Message.Key); + Assert.AreEqual("TEST1", record[0].Message.Value); + Assert.AreEqual(tst, record[0].Message.Timestamp.UnixTimestampMs); + + Assert.AreEqual("test2", record[1].Message.Key); + Assert.AreEqual("TEST2", record[1].Message.Value); + Assert.AreEqual(tst, record[1].Message.Timestamp.UnixTimestampMs); + + Assert.AreEqual("test3", record[2].Message.Key); + Assert.AreEqual("TEST3", record[2].Message.Value); + Assert.AreEqual(tst, record[2].Message.Timestamp.UnixTimestampMs); + } + + [Test] + public void StreamToTopicExtractorRecordExtractorTimestampTest() + { + var config = new StreamConfig { + ApplicationId = "test-stream-table-left-join" + }; + + StreamBuilder builder = new StreamBuilder(); + + var tst = DateTime.Now.GetMilliseconds(); + builder + .Stream("stream") + .MapValues((v) => v.ToUpper()) + .To(new MyTopicNameExtractor(), 
+ new MyRecordTimestampExtractor(tst)); + + Topology t = builder.Build(); + + var mockSupplier = new MockKafkaSupplier(4); + mockSupplier.CreateTopic("output"); + mockSupplier.CreateTopic("stream"); + + using var driver = new TopologyTestDriver(t.Builder, config, TopologyTestDriver.Mode.ASYNC_CLUSTER_IN_MEMORY, mockSupplier); + var inputTopic = driver.CreateInputTopic("stream"); + var outputTopic = driver.CreateOuputTopic("output"); + inputTopic.PipeInput("test1", "test1"); + inputTopic.PipeInput("test2", "test2"); + inputTopic.PipeInput("test3", "test3"); + var record = IntegrationTestUtils.WaitUntilMinKeyValueRecordsReceived(outputTopic, 3); + Assert.IsNotNull(record); + Assert.AreEqual(3, record.Count); + Assert.AreEqual("test1", record[0].Message.Key); + Assert.AreEqual("TEST1", record[0].Message.Value); + Assert.AreEqual(tst, record[0].Message.Timestamp.UnixTimestampMs); + + Assert.AreEqual("test2", record[1].Message.Key); + Assert.AreEqual("TEST2", record[1].Message.Value); + Assert.AreEqual(tst, record[1].Message.Timestamp.UnixTimestampMs); + + Assert.AreEqual("test3", record[2].Message.Key); + Assert.AreEqual("TEST3", record[2].Message.Value); + Assert.AreEqual(tst, record[2].Message.Timestamp.UnixTimestampMs); + } + + [Test] + public void StreamToTopicExtractorStreamPartitionerTest() + { + var config = new StreamConfig { + ApplicationId = "test-stream-table-left-join" + }; + + StreamBuilder builder = new StreamBuilder(); + + var tst = DateTime.Now.GetMilliseconds(); + builder + .Stream("stream") + .MapValues((v) => v.ToUpper()) + .To(new MyTopicNameExtractor(), + new MyStreamPartitioner()); + + Topology t = builder.Build(); + + var mockSupplier = new MockKafkaSupplier(4); + mockSupplier.CreateTopic("output"); + mockSupplier.CreateTopic("stream"); + + using var driver = new TopologyTestDriver(t.Builder, config, TopologyTestDriver.Mode.ASYNC_CLUSTER_IN_MEMORY, mockSupplier); + var inputTopic = driver.CreateInputTopic("stream"); + var outputTopic = driver.CreateOuputTopic("output"); + inputTopic.PipeInput("test1", "test1"); + inputTopic.PipeInput("test2", "test2"); + inputTopic.PipeInput("test3", "test3"); + var record = IntegrationTestUtils.WaitUntilMinKeyValueRecordsReceived(outputTopic, 3); + Assert.IsNotNull(record); + Assert.AreEqual(3, record.Count); + Assert.AreEqual("test1", record[0].Message.Key); + Assert.AreEqual("TEST1", record[0].Message.Value); + Assert.IsTrue(record[0].TopicPartition.Partition.Value >= 0); + + Assert.AreEqual("test2", record[1].Message.Key); + Assert.AreEqual("TEST2", record[1].Message.Value); + Assert.IsTrue(record[1].TopicPartition.Partition.Value >= 0); + + Assert.AreEqual("test3", record[2].Message.Key); + Assert.AreEqual("TEST3", record[2].Message.Value); + Assert.IsTrue(record[2].TopicPartition.Partition.Value >= 0); + } + + [Test] + public void StreamToTopicExtractorRecordExtractorTimestampStreamPartitionerTest() + { + var config = new StreamConfig { + ApplicationId = "test-stream-table-left-join" + }; + + StreamBuilder builder = new StreamBuilder(); + + var tst = DateTime.Now.GetMilliseconds(); + builder + .Stream("stream") + .MapValues((v) => v.ToUpper()) + .To(new MyTopicNameExtractor(), + new MyRecordTimestampExtractor(tst), + new MyStreamPartitioner()); + + Topology t = builder.Build(); + + var mockSupplier = new MockKafkaSupplier(4); + mockSupplier.CreateTopic("output"); + mockSupplier.CreateTopic("stream"); + + using var driver = new TopologyTestDriver(t.Builder, config, TopologyTestDriver.Mode.ASYNC_CLUSTER_IN_MEMORY, mockSupplier); + 
var inputTopic = driver.CreateInputTopic("stream"); + var outputTopic = driver.CreateOuputTopic("output"); + inputTopic.PipeInput("test1", "test1"); + inputTopic.PipeInput("test2", "test2"); + inputTopic.PipeInput("test3", "test3"); + var record = IntegrationTestUtils.WaitUntilMinKeyValueRecordsReceived(outputTopic, 3); + Assert.IsNotNull(record); + Assert.AreEqual(3, record.Count); + Assert.AreEqual("test1", record[0].Message.Key); + Assert.AreEqual("TEST1", record[0].Message.Value); + Assert.AreEqual(tst, record[0].Message.Timestamp.UnixTimestampMs); + Assert.IsTrue(record[0].TopicPartition.Partition.Value >= 0); + + Assert.AreEqual("test2", record[1].Message.Key); + Assert.AreEqual("TEST2", record[1].Message.Value); + Assert.AreEqual(tst, record[1].Message.Timestamp.UnixTimestampMs); + Assert.IsTrue(record[1].TopicPartition.Partition.Value >= 0); + + Assert.AreEqual("test3", record[2].Message.Key); + Assert.AreEqual("TEST3", record[2].Message.Value); + Assert.AreEqual(tst, record[2].Message.Timestamp.UnixTimestampMs); + Assert.IsTrue(record[2].TopicPartition.Partition.Value >= 0); + } + + [Test] + public void StreamToTopicExtractorRecordExtractorTimestampWithSerdesWrapperTest() + { + var config = new StreamConfig { + ApplicationId = "test-stream-table-left-join" + }; + + StreamBuilder builder = new StreamBuilder(); + + var tst = DateTime.Now.GetMilliseconds(); + builder + .Stream("stream") + .MapValues((v) => v.ToUpper()) + .To(new MyTopicNameExtractor(), + new StringSerDes(), new StringSerDes(), + new MyRecordTimestampExtractor(tst)); + + Topology t = builder.Build(); + + var mockSupplier = new MockKafkaSupplier(4); + mockSupplier.CreateTopic("output"); + mockSupplier.CreateTopic("stream"); + + using var driver = new TopologyTestDriver(t.Builder, config, TopologyTestDriver.Mode.ASYNC_CLUSTER_IN_MEMORY, mockSupplier); + var inputTopic = driver.CreateInputTopic("stream"); + var outputTopic = driver.CreateOuputTopic("output"); + inputTopic.PipeInput("test1", "test1"); + inputTopic.PipeInput("test2", "test2"); + inputTopic.PipeInput("test3", "test3"); + var record = IntegrationTestUtils.WaitUntilMinKeyValueRecordsReceived(outputTopic, 3); + Assert.IsNotNull(record); + Assert.AreEqual(3, record.Count); + Assert.AreEqual("test1", record[0].Message.Key); + Assert.AreEqual("TEST1", record[0].Message.Value); + Assert.AreEqual(tst, record[0].Message.Timestamp.UnixTimestampMs); + + Assert.AreEqual("test2", record[1].Message.Key); + Assert.AreEqual("TEST2", record[1].Message.Value); + Assert.AreEqual(tst, record[1].Message.Timestamp.UnixTimestampMs); + + Assert.AreEqual("test3", record[2].Message.Key); + Assert.AreEqual("TEST3", record[2].Message.Value); + Assert.AreEqual(tst, record[2].Message.Timestamp.UnixTimestampMs); + } + + [Test] + public void StreamToTopicExtractorRecordExtractorTimestampWithSerdesTypeWrapperTest() + { + var config = new StreamConfig { + ApplicationId = "test-stream-table-left-join" + }; + + StreamBuilder builder = new StreamBuilder(); + + var tst = DateTime.Now.GetMilliseconds(); + builder + .Stream("stream") + .MapValues((v) => v.ToUpper()) + .To(new MyTopicNameExtractor(), + new MyRecordTimestampExtractor(tst)); + + Topology t = builder.Build(); + + var mockSupplier = new MockKafkaSupplier(4); + mockSupplier.CreateTopic("output"); + mockSupplier.CreateTopic("stream"); + + using var driver = new TopologyTestDriver(t.Builder, config, TopologyTestDriver.Mode.ASYNC_CLUSTER_IN_MEMORY, mockSupplier); + var inputTopic = driver.CreateInputTopic("stream"); + var outputTopic = 
driver.CreateOuputTopic("output"); + inputTopic.PipeInput("test1", "test1"); + inputTopic.PipeInput("test2", "test2"); + inputTopic.PipeInput("test3", "test3"); + var record = IntegrationTestUtils.WaitUntilMinKeyValueRecordsReceived(outputTopic, 3); + Assert.IsNotNull(record); + Assert.AreEqual(3, record.Count); + Assert.AreEqual("test1", record[0].Message.Key); + Assert.AreEqual("TEST1", record[0].Message.Value); + Assert.AreEqual(tst, record[0].Message.Timestamp.UnixTimestampMs); + + Assert.AreEqual("test2", record[1].Message.Key); + Assert.AreEqual("TEST2", record[1].Message.Value); + Assert.AreEqual(tst, record[1].Message.Timestamp.UnixTimestampMs); + + Assert.AreEqual("test3", record[2].Message.Key); + Assert.AreEqual("TEST3", record[2].Message.Value); + Assert.AreEqual(tst, record[2].Message.Timestamp.UnixTimestampMs); + } + + + [Test] + public void StreamToLambdaTopicExtractorWithSerdesTest() + { + var config = new StreamConfig { + ApplicationId = "test-stream-table-left-join" + }; + + StreamBuilder builder = new StreamBuilder(); + + var tst = DateTime.Now.GetMilliseconds(); + builder + .Stream("stream") + .MapValues((v) => v.ToUpper()) + .To((_, _, _) => "output", new StringSerDes(), new StringSerDes()); + + Topology t = builder.Build(); + + var mockSupplier = new MockKafkaSupplier(4); + mockSupplier.CreateTopic("output"); + mockSupplier.CreateTopic("stream"); + + using var driver = new TopologyTestDriver(t.Builder, config, TopologyTestDriver.Mode.ASYNC_CLUSTER_IN_MEMORY, mockSupplier); + var inputTopic = driver.CreateInputTopic("stream"); + var outputTopic = driver.CreateOuputTopic("output"); + inputTopic.PipeInput("test1", "test1"); + inputTopic.PipeInput("test2", "test2"); + inputTopic.PipeInput("test3", "test3"); + var record = IntegrationTestUtils.WaitUntilMinKeyValueRecordsReceived(outputTopic, 3); + Assert.IsNotNull(record); + Assert.AreEqual(3, record.Count); + Assert.AreEqual("test1", record[0].Message.Key); + Assert.AreEqual("TEST1", record[0].Message.Value); + + Assert.AreEqual("test2", record[1].Message.Key); + Assert.AreEqual("TEST2", record[1].Message.Value); + + Assert.AreEqual("test3", record[2].Message.Key); + Assert.AreEqual("TEST3", record[2].Message.Value); + } + + [Test] + public void StreamToLambdaTopicExtractorWithSerdesTypeTest() + { + var config = new StreamConfig { + ApplicationId = "test-stream-table-left-join" + }; + + StreamBuilder builder = new StreamBuilder(); + + var tst = DateTime.Now.GetMilliseconds(); + builder + .Stream("stream") + .MapValues((v) => v.ToUpper()) + .To((_, _, _) => "output"); + + Topology t = builder.Build(); + + var mockSupplier = new MockKafkaSupplier(4); + mockSupplier.CreateTopic("output"); + mockSupplier.CreateTopic("stream"); + + using var driver = new TopologyTestDriver(t.Builder, config, TopologyTestDriver.Mode.ASYNC_CLUSTER_IN_MEMORY, mockSupplier); + var inputTopic = driver.CreateInputTopic("stream"); + var outputTopic = driver.CreateOuputTopic("output"); + inputTopic.PipeInput("test1", "test1"); + inputTopic.PipeInput("test2", "test2"); + inputTopic.PipeInput("test3", "test3"); + var record = IntegrationTestUtils.WaitUntilMinKeyValueRecordsReceived(outputTopic, 3); + Assert.IsNotNull(record); + Assert.AreEqual(3, record.Count); + Assert.AreEqual("test1", record[0].Message.Key); + Assert.AreEqual("TEST1", record[0].Message.Value); + + Assert.AreEqual("test2", record[1].Message.Key); + Assert.AreEqual("TEST2", record[1].Message.Value); + + Assert.AreEqual("test3", record[2].Message.Key); + Assert.AreEqual("TEST3", 
record[2].Message.Value); + } + + [Test] + public void StreamToAllLambdaWithSerdesTypeTest() + { + var config = new StreamConfig { + ApplicationId = "test-stream-table-left-join" + }; + + StreamBuilder builder = new StreamBuilder(); + + var tst = DateTime.Now.GetMilliseconds(); + builder + .Stream("stream") + .MapValues((v) => v.ToUpper()) + .To( + (_, _, _) => "output", + (_,_,_) => tst, + (_,_,_,_,_) => new Partition(0)); + + Topology t = builder.Build(); + + var mockSupplier = new MockKafkaSupplier(4); + mockSupplier.CreateTopic("output"); + mockSupplier.CreateTopic("stream"); + + using var driver = new TopologyTestDriver(t.Builder, config, TopologyTestDriver.Mode.ASYNC_CLUSTER_IN_MEMORY, mockSupplier); + var inputTopic = driver.CreateInputTopic("stream"); + var outputTopic = driver.CreateOuputTopic("output"); + inputTopic.PipeInput("test1", "test1"); + inputTopic.PipeInput("test2", "test2"); + inputTopic.PipeInput("test3", "test3"); + var record = IntegrationTestUtils.WaitUntilMinKeyValueRecordsReceived(outputTopic, 3); + Assert.IsNotNull(record); + Assert.AreEqual(3, record.Count); + Assert.AreEqual("test1", record[0].Message.Key); + Assert.AreEqual("TEST1", record[0].Message.Value); + Assert.AreEqual(tst, record[0].Message.Timestamp.UnixTimestampMs); + Assert.AreEqual(0, record[0].Partition.Value); + + Assert.AreEqual("test2", record[1].Message.Key); + Assert.AreEqual("TEST2", record[1].Message.Value); + Assert.AreEqual(tst, record[1].Message.Timestamp.UnixTimestampMs); + Assert.AreEqual(0, record[1].Partition.Value); + + Assert.AreEqual("test3", record[2].Message.Key); + Assert.AreEqual("TEST3", record[2].Message.Value); + Assert.AreEqual(tst, record[2].Message.Timestamp.UnixTimestampMs); + Assert.AreEqual(0, record[2].Partition.Value); + } + + [Test] + public void StreamArgumentExceptionTopicEmptyTest() + { + var config = new StreamConfig { + ApplicationId = "test-stream-table-left-join" + }; + + StreamBuilder builder = new StreamBuilder(); + + var stream = builder + .Stream("stream") + .MapValues((v) => v.ToUpper()); + + Assert.Throws(() => stream.To( + string.Empty, (_,_,_,_,_) => Partition.Any)); + } + + [Test] + public void StreamArgumentExceptionTopicEmpty2Test() + { + var config = new StreamConfig { + ApplicationId = "test-stream-table-left-join" + }; + + StreamBuilder builder = new StreamBuilder(); + + var stream = builder + .Stream("stream") + .MapValues((v) => v.ToUpper()); + + Assert.Throws(() => stream.To( + string.Empty, new MyStreamPartitioner())); + } + + [Test] + public void StreamArgumentExceptionTopicEmpty3Test() + { + var config = new StreamConfig { + ApplicationId = "test-stream-table-left-join" + }; + + StreamBuilder builder = new StreamBuilder(); + + var stream = builder + .Stream("stream") + .MapValues((v) => v.ToUpper()); + + Assert.Throws(() => stream.To( + string.Empty, new StringSerDes(), new StringSerDes())); + } + + [Test] + public void StreamArgumentExceptionTopicEmpty4Test() + { + var config = new StreamConfig { + ApplicationId = "test-stream-table-left-join" + }; + + StreamBuilder builder = new StreamBuilder(); + + var stream = builder + .Stream("stream") + .MapValues((v) => v.ToUpper()); + + Assert.Throws(() => stream.To( + string.Empty, (_,_,_,_,_) => Partition.Any, new StringSerDes(), new StringSerDes())); + } +} \ No newline at end of file diff --git a/test/Streamiz.Kafka.Net.Tests/Public/ProcessorContextTests.cs b/test/Streamiz.Kafka.Net.Tests/Public/ProcessorContextTests.cs index 59c30e9d..075f1fee 100644 --- 
a/test/Streamiz.Kafka.Net.Tests/Public/ProcessorContextTests.cs +++ b/test/Streamiz.Kafka.Net.Tests/Public/ProcessorContextTests.cs @@ -79,12 +79,13 @@ private StreamTask GetTask(TaskId taskId) config.ClientId = "test"; config.ApplicationId = "test-app"; + var syncKafkaSupplier = new SyncKafkaSupplier(true); var streamTask = new StreamTask( "thread", taskId, new List(), new Stream.Internal.ProcessorTopology(null, null, null, null, null, null, null, null), - null, config , null, new SyncProducer(config.ToProducerConfig()), new MockChangelogRegister(), new StreamMetricsRegistry()); + null, config , syncKafkaSupplier, new SyncProducer(config.ToProducerConfig()), new MockChangelogRegister(), new StreamMetricsRegistry()); return streamTask; } diff --git a/test/Streamiz.Kafka.Net.Tests/Stores/ChangeLoggingTimestampedKeyValueBytesStoreTests.cs b/test/Streamiz.Kafka.Net.Tests/Stores/ChangeLoggingTimestampedKeyValueBytesStoreTests.cs index be7e7e9b..a0a14c16 100644 --- a/test/Streamiz.Kafka.Net.Tests/Stores/ChangeLoggingTimestampedKeyValueBytesStoreTests.cs +++ b/test/Streamiz.Kafka.Net.Tests/Stores/ChangeLoggingTimestampedKeyValueBytesStoreTests.cs @@ -51,9 +51,14 @@ public void Begin() var producerConfig = new ProducerConfig(); producerConfig.ClientId = "producer-1"; - var producerClient = kafkaSupplier.GetProducer(producerConfig); - recordCollector = new RecordCollector("p-1", config, id, new NoRunnableSensor("s", "s", MetricsRecordingLevel.DEBUG)); + var adminConfig = new AdminClientConfig(); + adminConfig.ClientId = "admin-client"; + + var producerClient = kafkaSupplier.GetProducer(producerConfig); + var adminClient = kafkaSupplier.GetAdmin(adminConfig); + + recordCollector = new RecordCollector("p-1", config, id, new NoRunnableSensor("s", "s", MetricsRecordingLevel.DEBUG), adminClient); recordCollector.Init(ref producerClient); var changelogsTopics = new Dictionary{ diff --git a/test/Streamiz.Kafka.Net.Tests/Stores/ChangeLoggingTimestampedWindowBytesStoreTests.cs b/test/Streamiz.Kafka.Net.Tests/Stores/ChangeLoggingTimestampedWindowBytesStoreTests.cs index 35d4da6d..5870c20c 100644 --- a/test/Streamiz.Kafka.Net.Tests/Stores/ChangeLoggingTimestampedWindowBytesStoreTests.cs +++ b/test/Streamiz.Kafka.Net.Tests/Stores/ChangeLoggingTimestampedWindowBytesStoreTests.cs @@ -50,9 +50,14 @@ public void Begin() var producerConfig = new ProducerConfig(); producerConfig.ClientId = "producer-1"; + + var adminConfig = new AdminClientConfig(); + adminConfig.ClientId = "admin-client"; + var producerClient = kafkaSupplier.GetProducer(producerConfig); - - recordCollector = new RecordCollector("p-1", config, id, new NoRunnableSensor("s", "s", MetricsRecordingLevel.DEBUG)); + var adminClient = kafkaSupplier.GetAdmin(adminConfig); + + recordCollector = new RecordCollector("p-1", config, id, new NoRunnableSensor("s", "s", MetricsRecordingLevel.DEBUG), adminClient); recordCollector.Init(ref producerClient); var changelogsTopics = new Dictionary{ diff --git a/test/Streamiz.Kafka.Net.Tests/Stores/ChangeLoggingWindowBytesStoreTests.cs b/test/Streamiz.Kafka.Net.Tests/Stores/ChangeLoggingWindowBytesStoreTests.cs index 329029f7..1d45821c 100644 --- a/test/Streamiz.Kafka.Net.Tests/Stores/ChangeLoggingWindowBytesStoreTests.cs +++ b/test/Streamiz.Kafka.Net.Tests/Stores/ChangeLoggingWindowBytesStoreTests.cs @@ -47,9 +47,14 @@ public void Begin() var producerConfig = new ProducerConfig(); producerConfig.ClientId = "producer-1"; + + var adminConfig = new AdminClientConfig(); + adminConfig.ClientId = "admin-client"; + var 
producerClient = kafkaSupplier.GetProducer(producerConfig); - - recordCollector = new RecordCollector("p-1", config, id, new NoRunnableSensor("s", "s", MetricsRecordingLevel.DEBUG)); + var adminClient = kafkaSupplier.GetAdmin(adminConfig); + + recordCollector = new RecordCollector("p-1", config, id, new NoRunnableSensor("s", "s", MetricsRecordingLevel.DEBUG), adminClient); recordCollector.Init(ref producerClient); var changelogsTopics = new Dictionary{
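
A minimal, self-contained usage sketch of the sink overloads introduced above, following the IStreamPartitioner shape and the lambda form exercised in the new KStreamToTests. The OrderPartitioner class, the topic names, and the key-based routing are illustrative assumptions only, not code from this change set; the generic arguments are inferred since they are elided in the patch text.

using Confluent.Kafka;
using Streamiz.Kafka.Net;
using Streamiz.Kafka.Net.Processors;
using Streamiz.Kafka.Net.SerDes;

// Hypothetical partitioner: pins "order" records to partition 0 and leaves the
// decision to the producer for everything else, mirroring MyStreamPartitioner
// from KStreamToTests.
internal class OrderPartitioner : IStreamPartitioner<string, string>
{
    public Partition Partition(string topic, string key, string value,
        Partition sourcePartition, int numPartitions)
        => key == "order" ? new Partition(0) : Confluent.Kafka.Partition.Any;
}

internal static class SinkOverloadSketch
{
    public static Topology Build()
    {
        var builder = new StreamBuilder();

        // Sink with an explicit IStreamPartitioner instance.
        builder.Stream<string, string, StringSerDes, StringSerDes>("orders-input")
            .To("orders-output", new OrderPartitioner());

        // Same idea with the lambda overload; the delegate is wrapped in a
        // WrapperStreamPartitioner internally.
        builder.Stream<string, string, StringSerDes, StringSerDes>("events-input")
            .To("events-output",
                (topic, key, value, sourcePartition, numPartitions) => new Partition(0));

        return builder.Build();
    }
}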