Skip to content

Commit

Permalink
fix unit tests
Browse files Browse the repository at this point in the history
  • Loading branch information
LGouellec committed Jul 3, 2024
1 parent c601bd5 commit ef9dea7
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 15 deletions.
28 changes: 28 additions & 0 deletions core/Processors/DefaultStreamPartitioner.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
using Confluent.Kafka;

namespace Streamiz.Kafka.Net.Processors
{
/// <summary>
/// Forward the source partition as the sink partition of the record if there is enough sink partitions, Partition.Any otherwise
/// </summary>
/// <typeparam name="K">Key record type</typeparam>
/// <typeparam name="V">Value record type</typeparam>
public class DefaultStreamPartitioner<K, V> : IStreamPartitioner<K, V>
{
/// <summary>
/// Function used to determine how records are distributed among partitions of the topic
/// </summary>
/// <param name="topic">Sink topic name</param>
/// <param name="key">record's key</param>
/// <param name="value">record's value</param>
/// <param name="sourcePartition">record's source partition</param>
/// <param name="numPartitions">number partitions of the sink topic</param>
/// <returns>Return the source partition as the sink partition of the record if there is enough sink partitions, Partition.Any otherwise</returns>
public Partition Partition(string topic, K key, V value, Partition sourcePartition, int numPartitions)
{
if (sourcePartition.Value <= numPartitions - 1)
return sourcePartition;
return Confluent.Kafka.Partition.Any;
}
}
}
9 changes: 9 additions & 0 deletions core/Processors/IStreamPartitioner.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,15 @@ namespace Streamiz.Kafka.Net.Processors
{
public interface IStreamPartitioner<K, V>
{
/// <summary>
/// Function used to determine how records are distributed among partitions of the topic
/// </summary>
/// <param name="topic">Sink topic name</param>
/// <param name="key">record's key</param>
/// <param name="value">record's value</param>
/// <param name="sourcePartition">record's source partition</param>
/// <param name="numPartitions">number partitions of the sink topic</param>
/// <returns>Return the destination partition for the current record</returns>
Partition Partition(string topic, K key, V value, Partition sourcePartition, int numPartitions);
}
}
15 changes: 0 additions & 15 deletions core/Processors/Internal/DefaultStreamPartitioner.cs

This file was deleted.

0 comments on commit ef9dea7

Please sign in to comment.