Merge pull request #366 from LGouellec/feat/eos-v2
Exactly Once Semantics V2
LGouellec authored Sep 27, 2024
2 parents 68741e7 + 563eb64 commit 61456fa
Showing 106 changed files with 1,595 additions and 2,446 deletions.
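For context before the per-file diffs: the core of this change is the new StreamsProducer class (below), which owns the thread-level transactional producer and derives its transactional id from the application id plus a persisted process id ({ApplicationId}-{processId}). From the application side, opting in is a configuration switch. A minimal sketch, assuming the library's public StreamConfig, StringSerDes, StreamBuilder, and KafkaStream APIs; the topic names and timeout value are illustrative, not part of this diff:

using System;
using System.Threading.Tasks;
using Streamiz.Kafka.Net;
using Streamiz.Kafka.Net.SerDes;
using Streamiz.Kafka.Net.Stream;

public static class EosDemo
{
    public static async Task Main()
    {
        var config = new StreamConfig<StringSerDes, StringSerDes>();
        config.ApplicationId = "eos-v2-demo";        // becomes the transactional id prefix
        config.BootstrapServers = "localhost:9092";
        config.Guarantee = ProcessingGuarantee.EXACTLY_ONCE;
        config.TransactionTimeout = TimeSpan.FromSeconds(10); // see StreamsProducer.InitTransaction below

        var builder = new StreamBuilder();
        builder.Stream<string, string>("input").To("output");

        var stream = new KafkaStream(builder.Build(), config);
        await stream.StartAsync();
    }
}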
3 changes: 2 additions & 1 deletion .gitignore
@@ -11,4 +11,5 @@ TestResults
.idea/
.vscode/

-confidential
+confidential
+roadmap.md
2 changes: 1 addition & 1 deletion .gitpod.yml
@@ -22,7 +22,7 @@ tasks:
echo "1" > run
echo "🚀 Enjoy Streamiz the .NET Stream processing library for Apache Kafka (TM)";
sleep 2
-dotnet run -f net6.0 --project samples/sample-stream-demo/sample-stream-demo.csproj --no-build --no-restore
+dotnet run -f net6.0 --project launcher/sample-stream-demo/sample-stream-demo.csproj --no-build --no-restore
- name: producer
command: while [ ! -f run ]; do sleep 1; done; docker-compose -f environment/docker-compose.yml exec broker kafka-console-producer --bootstrap-server broker:29092 --topic input
- name: consumer
4 changes: 0 additions & 4 deletions Pointers.md

This file was deleted.

2 changes: 1 addition & 1 deletion core/Crosscutting/Bytes.cs
@@ -77,7 +77,7 @@ public class Bytes : IEquatable<Bytes>, IComparable<Bytes>
/// Create a Bytes using the byte array.
/// </summary>
/// <param name="bytes">This array becomes the backing storage for the object.</param>
[Obsolete("Will be removed last release version")]
[Obsolete("Will be removed in 1.8.0")]
public Bytes(byte[] bytes)
{
Get = bytes;
17 changes: 17 additions & 0 deletions core/Errors/StreamProducerException.cs
@@ -0,0 +1,17 @@
using System;
using Confluent.Kafka;

namespace Streamiz.Kafka.Net.Errors
{
internal class StreamProducerException : Exception
{
public ProduceException<byte[], byte[]> OriginalProduceException { get; set; }
public ProductionExceptionHandlerResponse Response { get; set; }

public StreamProducerException(ProduceException<byte[], byte[]> originalProduceException, ProductionExceptionHandlerResponse response)
{
OriginalProduceException = originalProduceException;
Response = response;
}
}
}
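A note on intent: this exception carries both the original librdkafka produce failure and the verdict of the configured production exception handler, so upper layers can decide whether to fail the task or keep going. A hypothetical internal call site, for illustration only; the handler delegate shown here is an assumption, and the real wiring lives in RecordCollector.cs, whose diff is collapsed below:

using System;
using Confluent.Kafka;

namespace Streamiz.Kafka.Net.Errors
{
    internal static class DeliveryFailureSketch
    {
        // Illustrative only: not code from this PR.
        internal static void OnDeliveryFailed(
            ProduceException<byte[], byte[]> exception,
            Func<ProduceException<byte[], byte[]>, ProductionExceptionHandlerResponse> handler)
        {
            // Attach the handler's decision (e.g. CONTINUE or FAIL) to the rethrow.
            throw new StreamProducerException(exception, handler(exception));
        }
    }
}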
4 changes: 2 additions & 2 deletions core/Kafka/IRecordCollector.cs
@@ -7,9 +7,9 @@ namespace Streamiz.Kafka.Net.Kafka
internal interface IRecordCollector
{
IDictionary<TopicPartition, long> CollectorOffsets { get; }
-void Init(ref IProducer<byte[], byte[]> producer);
+void Initialize();
void Flush();
-void Close();
+void Close(bool dirty);
void Send<K, V>(string topic, K key, V value, Headers headers, long timestamp, ISerDes<K> keySerializer, ISerDes<V> valueSerializer);
void Send<K, V>(string topic, K key, V value, Headers headers, int partition, long timestamp, ISerDes<K> keySerializer, ISerDes<V> valueSerializer);
int PartitionsFor(string topic);
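Worth calling out in this interface change: the producer is no longer injected by reference; after Initialize() the collector presumably reaches Kafka through the new StreamsProducer wrapper rather than holding a raw IProducer. The new Close(bool dirty) overload lets callers flag an unclean shutdown, for instance so an in-flight transaction can be aborted rather than committed. The reworked call sites are in RecordCollector.cs: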
263 changes: 106 additions & 157 deletions core/Kafka/Internal/RecordCollector.cs

Large diffs are not rendered by default.

232 changes: 232 additions & 0 deletions core/Kafka/Internal/StreamsProducer.cs
@@ -0,0 +1,232 @@
using System;
using System.Collections.Generic;
using System.Linq;
using Confluent.Kafka;
using Microsoft.Extensions.Logging;
using Streamiz.Kafka.Net.Errors;
using Streamiz.Kafka.Net.Processors;
using Streamiz.Kafka.Net.Crosscutting;

namespace Streamiz.Kafka.Net.Kafka.Internal
{
/// <summary>
/// Manages the producers within a Kafka Streams application.
/// If EOS is enabled, it is responsible for initializing and beginning transactions when necessary.
/// It also tracks the transaction status, i.e., whether a transaction is in-flight.
/// For non-EOS, callers should not invoke transaction-related methods.
/// </summary>
internal class StreamsProducer : IDisposable
{
private readonly ILogger log = Logger.GetLogger(typeof(StreamsProducer));
private readonly string _logPrefix;
private IProducer<byte[], byte[]> _producer;
private bool _transactionInitialized = false;
private readonly IStreamConfig _configuration;
private readonly IKafkaSupplier _kafkaSupplier;
private readonly ProducerConfig _producerConfig;
private readonly IAdminClient _adminClient;

public bool EosEnabled { get; }
public bool TransactionInFlight { get; private set; }

// for testing
internal IProducer<byte[], byte[]> Producer => _producer;

public string Name => _producer.Name;

private IDictionary<string, (int, DateTime)> _cachePartitionsForTopics =
new Dictionary<string, (int, DateTime)>();

public StreamsProducer(
IStreamConfig config,
String threadId,
Guid processId,
IKafkaSupplier kafkaSupplier,
String logPrefix)
{
_logPrefix = logPrefix;
_configuration = config;
_kafkaSupplier = kafkaSupplier;

_producerConfig = config.ToProducerConfig(StreamThread.GetThreadProducerClientId(threadId))
.Wrap(threadId);

_adminClient = kafkaSupplier.GetAdmin(config.ToAdminConfig(threadId));

switch (config.Guarantee)
{
case ProcessingGuarantee.AT_LEAST_ONCE:
break;
case ProcessingGuarantee.EXACTLY_ONCE:
_producerConfig.TransactionalId = $"{config.ApplicationId}-{processId}";
break;
default:
throw new StreamsException($"Guarantee {config.Guarantee} is not supported yet");
}

_producer = _kafkaSupplier.GetProducer(_producerConfig);
EosEnabled = config.Guarantee == ProcessingGuarantee.EXACTLY_ONCE;
}

internal static bool IsRecoverable(Error error)
{
return error.Code == ErrorCode.InvalidProducerIdMapping ||
error.Code == ErrorCode.ProducerFenced ||
error.Code == ErrorCode.InvalidProducerEpoch ||
error.Code == ErrorCode.UnknownProducerId;
}

public void InitTransaction()
{
if (!EosEnabled)
throw new IllegalStateException("Exactly-once is not enabled");

if (!_transactionInitialized)
{
try
{
_producer.InitTransactions(_configuration.TransactionTimeout);
_transactionInitialized = true;
}
catch (KafkaRetriableException)
{
log.LogWarning(
"Timeout exception caught trying to initialize transactions. " +
"The broker is either slow or in bad state (like not having enough replicas) in " +
"responding to the request, or the connection to broker was interrupted sending " +
"the request or receiving the response. " +
"Will retry initializing the task in the next loop. " +
"Consider overwriting 'transaction.timeout' to a larger value to avoid timeout errors"
);
throw;
}
catch (KafkaException kafkaException)
{
throw new StreamsException("Error encountered trying to initialize transactions", kafkaException);
}
}
}

public void ResetProducer()
{
Close();
_producer = _kafkaSupplier.GetProducer(_producerConfig);
}

private void StartTransaction()
{
if (EosEnabled && !TransactionInFlight)
{
try
{
_producer.BeginTransaction();
TransactionInFlight = true;
}
catch (KafkaException kafkaException)
{
if (IsRecoverable(kafkaException.Error))
throw new TaskMigratedException(
$"Producer got fenced trying to begin a new transaction : {kafkaException.Message}");

throw new StreamsException("Error encountered trying to begin a new transaction", kafkaException);
}
}
}

public void Produce(TopicPartition topicPartition, Message<byte[], byte[]> message,
Action<DeliveryReport<byte[], byte[]>> deliveryReport)
{
StartTransaction();
_producer.Produce(topicPartition, message, deliveryReport);
}

public void Produce(string topic, Message<byte[], byte[]> message,
Action<DeliveryReport<byte[], byte[]>> deliveryReport)
{
StartTransaction();
_producer.Produce(topic, message, deliveryReport);
}

public void CommitTransaction(IEnumerable<TopicPartitionOffset> offsets, IConsumerGroupMetadata groupMetadata)
{
if (!EosEnabled)
throw new IllegalStateException("Exactly-once is not enabled");

StartTransaction();
try
{
_producer.SendOffsetsToTransaction(offsets, groupMetadata, _configuration.TransactionTimeout);
_producer.CommitTransaction();
TransactionInFlight = false;
}
catch (KafkaException kafkaException)
{
if (IsRecoverable(kafkaException.Error))
throw new TaskMigratedException(
$"Producer got fenced trying to commit a transaction : {kafkaException.Message}");

throw new StreamsException("Error encountered trying to commit a transaction", kafkaException);
}
}

public void AbortTransaction()
{
if (!EosEnabled)
throw new IllegalStateException("Exactly-once is not enabled");

if (TransactionInFlight)
{
try
{
_producer.AbortTransaction();
}
catch (KafkaException kafkaException)
{
if (IsRecoverable(kafkaException.Error))
log.LogDebug(
$"Encountered {kafkaException.Message} while aborting the transaction; this is expected and hence swallowed");
else
throw new StreamsException("Error encounter trying to abort a transaction", kafkaException);

}
finally
{
TransactionInFlight = false;
}
}
}

public void Flush()
{
_producer.Flush();
}

private void Close()
{
TransactionInFlight = false;
_transactionInitialized = false;
}

public void Dispose()
{
Close();
_producer.Dispose();
_adminClient?.Dispose();
}

public int PartitionsFor(string topic)
{
var adminConfig = _configuration.ToAdminConfig("");
var refreshInterval = adminConfig.TopicMetadataRefreshIntervalMs ?? 5 * 60 * 1000;

if (_cachePartitionsForTopics.ContainsKey(topic) &&
_cachePartitionsForTopics[topic].Item2 + TimeSpan.FromMilliseconds(refreshInterval) > DateTime.Now)
return _cachePartitionsForTopics[topic].Item1;

var metadata = _adminClient.GetMetadata(topic, TimeSpan.FromSeconds(5));
var partitionCount = metadata.Topics.FirstOrDefault(t => t.Topic.Equals(topic))!.Partitions.Count;
_cachePartitionsForTopics.AddOrUpdate(topic, (partitionCount, DateTime.Now));
return partitionCount;
}
}
}
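To make the lifecycle concrete, here is a sketch of how a commit might drive this class from inside the library. Illustrative only: StreamsProducer is internal, and the topic name, offsets, and group metadata are placeholders, not code from this PR:

using System.Collections.Generic;
using Confluent.Kafka;

namespace Streamiz.Kafka.Net.Kafka.Internal
{
    internal static class EosCommitSketch
    {
        internal static void CommitStep(
            StreamsProducer producer,
            Message<byte[], byte[]> message,
            IEnumerable<TopicPartitionOffset> consumedOffsets,
            IConsumerGroupMetadata groupMetadata)
        {
            producer.InitTransaction();                  // one-time init; throws IllegalStateException if EOS is off
            producer.Produce("output-topic", message, _ => { });        // the first send begins the transaction
            producer.CommitTransaction(consumedOffsets, groupMetadata); // offsets commit atomically with the records
        }
    }
}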
31 changes: 19 additions & 12 deletions core/Kafka/Internal/StreamsRebalanceListener.cs
@@ -30,19 +30,26 @@ public void PartitionsAssigned(IConsumer<byte[], byte[]> consumer, List<TopicPar
{
try
{
log.LogInformation($"New partitions assign requested : {string.Join(",", partitions)}");

DateTime start = DateTime.Now;
manager.RebalanceInProgress = true;
manager.CreateTasks(partitions);
Thread.SetState(ThreadState.PARTITIONS_ASSIGNED);
Thread.LastPartitionAssignedTime = start.GetMilliseconds();
manager.RebalanceInProgress = false;
lock (manager._lock)
{
if (Thread.IsRunning)
{
log.LogInformation($"New partitions assign requested : {string.Join(",", partitions)}");

StringBuilder sb = new StringBuilder();
sb.AppendLine($"Partition assignment took {DateTime.Now - start} ms.");
sb.AppendLine($"\tCurrently assigned active tasks: {string.Join(",", this.manager.ActiveTaskIds)}");
log.LogInformation(sb.ToString());
DateTime start = DateTime.Now;
manager.RebalanceInProgress = true;
manager.CreateTasks(partitions);
Thread.SetState(ThreadState.PARTITIONS_ASSIGNED);
Thread.LastPartitionAssignedTime = start.GetMilliseconds();
manager.RebalanceInProgress = false;

StringBuilder sb = new StringBuilder();
sb.AppendLine($"Partition assignment took {DateTime.Now - start} ms.");
sb.AppendLine(
$"\tCurrently assigned active tasks: {string.Join(",", this.manager.ActiveTaskIds)}");
log.LogInformation(sb.ToString());
}
}
}
catch (Exception e)
{
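The net effect of the rebalance-listener change: partition assignment now takes manager._lock and creates tasks only while the thread is still running, so an assignment callback that races with shutdown is skipped instead of resurrecting tasks.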
10 changes: 7 additions & 3 deletions core/KafkaStream.cs
@@ -261,6 +261,7 @@ public override string ToString()

private readonly CancellationTokenSource _cancelSource = new();
private readonly SequentiallyGracefullyShutdownHook shutdownHook;
+private readonly Guid processId;

internal State StreamState { get; private set; }

@@ -301,8 +302,6 @@ public KafkaStream(Topology topology, IStreamConfig configuration, IKafkaSupplie
throw new StreamConfigException($"Stream configuration is not correct. Please set ApplicationId and BootstrapServers as minimal.");
}

-var processID = Guid.NewGuid();
-clientId = string.IsNullOrEmpty(configuration.ClientId) ? $"{configuration.ApplicationId.ToLower()}-{processID}" : configuration.ClientId;
logPrefix = $"stream-application[{configuration.ApplicationId}] ";
metricsRegistry = new StreamMetricsRegistry(clientId, configuration.MetricsRecording);
this.kafkaSupplier.MetricsRegistry = metricsRegistry;
@@ -314,6 +313,10 @@ public KafkaStream(Topology topology, IStreamConfig configuration, IKafkaSupplie

// sanity check
var processorTopology = topology.Builder.BuildTopology();
+processId = StateDirectory
+    .GetInstance(this.configuration, topology.Builder.HasPersistentStores)
+    .InitializeProcessId();
+clientId = string.IsNullOrEmpty(configuration.ClientId) ? $"{configuration.ApplicationId.ToLower()}-{processId}" : configuration.ClientId;

int numStreamThreads = topology.Builder.HasNoNonGlobalTopology ? 0 : configuration.NumStreamThreads;

@@ -376,6 +379,7 @@ string Protect(string str)

threads[i] = StreamThread.Create(
threadId,
+processId,
clientId,
this.topology.Builder,
metricsRegistry,
@@ -418,7 +422,7 @@ string Protect(string str)
/// Because threads are started in the background, this method does not block.
/// </summary>
/// <param name="token">Token for propagates notification that the stream should be canceled.</param>
[Obsolete("This method is deprecated, please use StartAsync(...) instead. It will be removed in next release version.")]
[Obsolete("This method is deprecated, please use StartAsync(...) instead. It will be removed in 1.8.0.")]
public void Start(CancellationToken? token = null)
{
StartAsync(token).ConfigureAwait(false).GetAwaiter().GetResult();
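Since Start(...) is now slated for removal in 1.8.0, migrating callers is mechanical; a sketch, where stream is a KafkaStream as in the configuration example near the top of this page and the token handling is illustrative:

using System.Threading;

var cts = new CancellationTokenSource();
await stream.StartAsync(cts.Token); // threads start in the background, so this call does not block
// ... run until shutdown is requested
cts.Cancel();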
1 change: 1 addition & 0 deletions core/Mock/ClusterInMemoryTopologyDriver.cs
@@ -64,6 +64,7 @@ public ClusterInMemoryTopologyDriver(string clientId, InternalTopologyBuilder to

threadTopology = StreamThread.Create(
$"{this.configuration.ApplicationId.ToLower()}-stream-thread-0",
+Guid.NewGuid(),
clientId,
topologyBuilder,
metricsRegistry,
1 change: 1 addition & 0 deletions core/Mock/Pipes/SyncPipeOutput.cs
@@ -4,6 +4,7 @@
using System;
using System.Collections.Generic;
using System.Threading;
+using Streamiz.Kafka.Net.Kafka.Internal;

namespace Streamiz.Kafka.Net.Mock.Pipes
{
(The remaining changed files are not rendered here.)
