Exactly Once Semantics V2 #366

Merged · 7 commits · Sep 27, 2024
3 changes: 2 additions & 1 deletion .gitignore
@@ -11,4 +11,5 @@ TestResults
.idea/
.vscode/

-confidential
+confidential
+roadmap.md
2 changes: 1 addition & 1 deletion .gitpod.yml
@@ -22,7 +22,7 @@ tasks:
echo "1" > run
echo "🚀 Enjoy Streamiz the .NET Stream processing library for Apache Kafka (TM)";
sleep 2
-dotnet run -f net6.0 --project samples/sample-stream-demo/sample-stream-demo.csproj --no-build --no-restore
+dotnet run -f net6.0 --project launcher/sample-stream-demo/sample-stream-demo.csproj --no-build --no-restore
- name: producer
command: while [ ! -f run ]; do sleep 1; done; docker-compose -f environment/docker-compose.yml exec broker kafka-console-producer --bootstrap-server broker:29092 --topic input
- name: consumer
4 changes: 0 additions & 4 deletions Pointers.md

This file was deleted.

2 changes: 1 addition & 1 deletion core/Crosscutting/Bytes.cs
@@ -77,7 +77,7 @@ public class Bytes : IEquatable<Bytes>, IComparable<Bytes>
/// Create a Bytes using the byte array.
/// </summary>
/// <param name="bytes">This array becomes the backing storage for the object.</param>
[Obsolete("Will be removed last release version")]
[Obsolete("Will be removed in 1.8.0")]
public Bytes(byte[] bytes)
{
Get = bytes;
17 changes: 17 additions & 0 deletions core/Errors/StreamProducerException.cs
@@ -0,0 +1,17 @@
using System;
using Confluent.Kafka;

namespace Streamiz.Kafka.Net.Errors
{
internal class StreamProducerException : Exception
{
public ProduceException<byte[], byte[]> OriginalProduceException { get; set; }
public ProductionExceptionHandlerResponse Response { get; set; }

public StreamProducerException(ProduceException<byte[], byte[]> originalProduceException, ProductionExceptionHandlerResponse response)
{
OriginalProduceException = originalProduceException;
Response = response;
}
}
}
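
Note: a minimal sketch of how this exception type is meant to be consumed, assuming the usual Streamiz production-exception-handler flow. The method name and the log/response wiring below are illustrative assumptions, not the actual RecordCollector code (that diff is not rendered in this view).

// Hypothetical delivery-failure path (sketch only): the handler's decision
// travels together with the original ProduceException, so whoever observes
// the exception later knows whether to fail the thread or keep going.
private void OnDeliveryFailure(ProduceException<byte[], byte[]> e, ProductionExceptionHandlerResponse response)
{
    if (response == ProductionExceptionHandlerResponse.FAIL)
        throw new StreamProducerException(e, response);

    // CONTINUE: log and drop the record
    log.LogWarning($"Record dropped after production error: {e.Error.Reason}");
}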
4 changes: 2 additions & 2 deletions core/Kafka/IRecordCollector.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ namespace Streamiz.Kafka.Net.Kafka
internal interface IRecordCollector
{
IDictionary<TopicPartition, long> CollectorOffsets { get; }
void Init(ref IProducer<byte[], byte[]> producer);
void Initialize();
void Flush();
void Close();
void Close(bool dirty);
void Send<K, V>(string topic, K key, V value, Headers headers, long timestamp, ISerDes<K> keySerializer, ISerDes<V> valueSerializer);
void Send<K, V>(string topic, K key, V value, Headers headers, int partition, long timestamp, ISerDes<K> keySerializer, ISerDes<V> valueSerializer);
int PartitionsFor(string topic);
Expand Down
263 changes: 106 additions & 157 deletions core/Kafka/Internal/RecordCollector.cs

Large diffs are not rendered by default.
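
Note: since this 263-line diff is not rendered, the following is a sketch of the shape the rework implies, inferred only from the IRecordCollector changes above (Initialize() replacing Init(ref IProducer<byte[], byte[]>), Close(bool dirty)) and the StreamsProducer class shown next. The member names and wiring are assumptions, not the real implementation; the other IRecordCollector members (Send, Flush, PartitionsFor, ...) are omitted.

using Streamiz.Kafka.Net.Kafka.Internal;

// Illustrative fragment: the collector now owns a StreamsProducer wrapper
// instead of receiving a raw producer by ref.
internal class RecordCollectorSketch
{
    private readonly StreamsProducer _streamsProducer;

    public RecordCollectorSketch(StreamsProducer streamsProducer)
        => _streamsProducer = streamsProducer;

    public void Initialize()
    {
        // EOS: make sure the transactional producer is initialized before the first send
        if (_streamsProducer.EosEnabled)
            _streamsProducer.InitTransaction();
    }

    public void Close(bool dirty)
    {
        // A dirty close (e.g. the task migrated to another instance) must not
        // flush or commit; any in-flight transaction is aborted instead.
        if (dirty && _streamsProducer.TransactionInFlight)
            _streamsProducer.AbortTransaction();
        else
            _streamsProducer.Flush();
    }
}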

232 changes: 232 additions & 0 deletions core/Kafka/Internal/StreamsProducer.cs
@@ -0,0 +1,232 @@
using System;
using System.Collections.Generic;
using System.Linq;
using Confluent.Kafka;
using Microsoft.Extensions.Logging;
using Streamiz.Kafka.Net.Errors;
using Streamiz.Kafka.Net.Processors;
using Streamiz.Kafka.Net.Crosscutting;

namespace Streamiz.Kafka.Net.Kafka.Internal
{
/// <summary>
/// Manages the producers within a Kafka Streams application.
/// If EOS is enabled, it is responsible for initializing and beginning transactions when necessary.
/// It also tracks the transaction status, i.e., whether a transaction is in-flight.
/// For non-EOS, the user should not call transaction-related methods.
/// </summary>
internal class StreamsProducer : IDisposable
{
private readonly ILogger log = Logger.GetLogger(typeof(StreamsProducer));
private readonly string _logPrefix;
private IProducer<byte[], byte[]> _producer;
private bool _transactionInitialized = false;
private readonly IStreamConfig _configuration;
private readonly IKafkaSupplier _kafkaSupplier;
private readonly ProducerConfig _producerConfig;
private readonly IAdminClient _adminClient;

public bool EosEnabled { get; }
public bool TransactionInFlight { get; private set; }

// for testing
internal IProducer<byte[], byte[]> Producer => _producer;

public string Name => _producer.Name;

private IDictionary<string, (int, DateTime)> _cachePartitionsForTopics =
new Dictionary<string, (int, DateTime)>();

public StreamsProducer(
IStreamConfig config,
String threadId,
Guid processId,
IKafkaSupplier kafkaSupplier,
String logPrefix)
{
_logPrefix = logPrefix;
_configuration = config;
_kafkaSupplier = kafkaSupplier;

_producerConfig = config.ToProducerConfig(StreamThread.GetThreadProducerClientId(threadId))
.Wrap(threadId);

_adminClient = kafkaSupplier.GetAdmin(config.ToAdminConfig(threadId));

switch (config.Guarantee)
{
case ProcessingGuarantee.AT_LEAST_ONCE:
break;
case ProcessingGuarantee.EXACTLY_ONCE:
_producerConfig.TransactionalId = $"{config.ApplicationId}-{processId}";
break;
default:
throw new StreamsException($"Guarantee {config.Guarantee} is not supported yet");
}

_producer = _kafkaSupplier.GetProducer(_producerConfig);
EosEnabled = config.Guarantee == ProcessingGuarantee.EXACTLY_ONCE;
}

internal static bool IsRecoverable(Error error)
{
return error.Code == ErrorCode.InvalidProducerIdMapping ||
error.Code == ErrorCode.ProducerFenced ||
error.Code == ErrorCode.InvalidProducerEpoch ||
error.Code == ErrorCode.UnknownProducerId;
}
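// Note: these error codes all indicate that this producer's transactional
// session is no longer valid (fenced by a newer instance, epoch bumped, or
// the producer id expired broker-side). "Recoverable" here means the stream
// thread can rejoin the group and re-create its tasks, which is why callers
// below translate these into TaskMigratedException rather than a fatal
// StreamsException.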

public void InitTransaction()
{
if (!EosEnabled)
throw new IllegalStateException("Exactly-once is not enabled");

if (!_transactionInitialized)
{
try
{
_producer.InitTransactions(_configuration.TransactionTimeout);
_transactionInitialized = true;
}
catch (KafkaRetriableException)
{
log.LogWarning(
"Timeout exception caught trying to initialize transactions. " +
"The broker is either slow or in bad state (like not having enough replicas) in " +
"responding to the request, or the connection to broker was interrupted sending " +
"the request or receiving the response. " +
"Will retry initializing the task in the next loop. " +
"Consider overwriting 'transaction.timeout' to a larger value to avoid timeout errors"
);
throw;
}
catch (KafkaException kafkaException)
{
throw new StreamsException("Error encountered trying to initialize transactions", kafkaException);
}
}
}

public void ResetProducer()
{
Close();
_producer = _kafkaSupplier.GetProducer(_producerConfig);
}

private void StartTransaction()
{
if (EosEnabled && !TransactionInFlight)
{
try
{
_producer.BeginTransaction();
TransactionInFlight = true;
}
catch (KafkaException kafkaException)
{
if (IsRecoverable(kafkaException.Error))
throw new TaskMigratedException(
$"Producer got fenced trying to begin a new transaction : {kafkaException.Message}");

throw new StreamsException("Error encountered trying to begin a new transaction", kafkaException);
}
}
}

public void Produce(TopicPartition topicPartition, Message<byte[], byte[]> message,
Action<DeliveryReport<byte[], byte[]>> deliveryReport)
{
StartTransaction();
_producer.Produce(topicPartition, message, deliveryReport);
}

public void Produce(string topic, Message<byte[], byte[]> message,
Action<DeliveryReport<byte[], byte[]>> deliveryReport)
{
StartTransaction();
_producer.Produce(topic, message, deliveryReport);
}

public void CommitTransaction(IEnumerable<TopicPartitionOffset> offsets, IConsumerGroupMetadata groupMetadata)
{
if (!EosEnabled)
throw new IllegalStateException("Exactly-once is not enabled");

StartTransaction();
try
{
_producer.SendOffsetsToTransaction(offsets, groupMetadata, _configuration.TransactionTimeout);
_producer.CommitTransaction();
TransactionInFlight = false;
}
catch (KafkaException kafkaException)
{
if (IsRecoverable(kafkaException.Error))
throw new TaskMigratedException(
$"Producer got fenced trying to commit a transaction : {kafkaException.Message}");

throw new StreamsException("Error encountered trying to commit a transaction", kafkaException);
}
}

public void AbortTransaction()
{
if (!EosEnabled)
throw new IllegalStateException("Exactly-once is not enabled");

if (TransactionInFlight)
{
try
{
_producer.AbortTransaction();
}
catch (KafkaException kafkaException)
{
if (IsRecoverable(kafkaException.Error))
log.LogDebug(
$"Encountered {kafkaException.Message} while aborting the transaction; this is expected and hence swallowed");
else
throw new StreamsException("Error encounter trying to abort a transaction", kafkaException);

}
finally
{
TransactionInFlight = false;
}
}
}

public void Flush()
{
_producer.Flush();
}

private void Close()
{
TransactionInFlight = false;
_transactionInitialized = false;
}

public void Dispose()
{
Close();
_producer.Dispose();
_adminClient?.Dispose();
}

public int PartitionsFor(string topic)
{
var adminConfig = _configuration.ToAdminConfig("");
var refreshInterval = adminConfig.TopicMetadataRefreshIntervalMs ?? 5 * 60 * 1000;

if (_cachePartitionsForTopics.ContainsKey(topic) &&
_cachePartitionsForTopics[topic].Item2 + TimeSpan.FromMilliseconds(refreshInterval) > DateTime.Now)
return _cachePartitionsForTopics[topic].Item1;

var metadata = _adminClient.GetMetadata(topic, TimeSpan.FromSeconds(5));
var partitionCount = metadata.Topics.FirstOrDefault(t => t.Topic.Equals(topic))!.Partitions.Count;
_cachePartitionsForTopics.AddOrUpdate(topic, (partitionCount, DateTime.Now));
return partitionCount;
}
}
}
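
Note: for orientation, here is a sketch of the intended EOS calling sequence, assembled from the public members above. The surrounding variables (config, threadId, processId, kafkaSupplier, logPrefix, consumer, offsetsToCommit, key, value) are assumed to be in scope, and the real task/thread plumbing is more involved than this fragment.

// EOS happy path against StreamsProducer's public surface (sketch only).
var producer = new StreamsProducer(config, threadId, processId, kafkaSupplier, logPrefix);

producer.InitTransaction(); // once per producer; throws IllegalStateException if EOS is off

// Produce() lazily begins a transaction on the first send after a commit.
producer.Produce(new TopicPartition("output", new Partition(0)),
    new Message<byte[], byte[]> { Key = key, Value = value },
    report => { /* inspect report.Error for per-record delivery failures */ });

try
{
    // Atomically publishes the pending records together with the consumed offsets.
    producer.CommitTransaction(offsetsToCommit, consumer.ConsumerGroupMetadata);
}
catch (TaskMigratedException)
{
    // Another instance fenced this producer: abort (recoverable errors are
    // swallowed by AbortTransaction) and let the thread rejoin the group.
    producer.AbortTransaction();
    throw;
}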
31 changes: 19 additions & 12 deletions core/Kafka/Internal/StreamsRebalanceListener.cs
@@ -30,19 +30,26 @@ public void PartitionsAssigned(IConsumer<byte[], byte[]> consumer, List<TopicPar
{
try
{
log.LogInformation($"New partitions assign requested : {string.Join(",", partitions)}");

DateTime start = DateTime.Now;
manager.RebalanceInProgress = true;
manager.CreateTasks(partitions);
Thread.SetState(ThreadState.PARTITIONS_ASSIGNED);
Thread.LastPartitionAssignedTime = start.GetMilliseconds();
manager.RebalanceInProgress = false;
lock (manager._lock)
{
if (Thread.IsRunning)
{
log.LogInformation($"New partitions assign requested : {string.Join(",", partitions)}");

StringBuilder sb = new StringBuilder();
sb.AppendLine($"Partition assignment took {DateTime.Now - start} ms.");
sb.AppendLine($"\tCurrently assigned active tasks: {string.Join(",", this.manager.ActiveTaskIds)}");
log.LogInformation(sb.ToString());
DateTime start = DateTime.Now;
manager.RebalanceInProgress = true;
manager.CreateTasks(partitions);
Thread.SetState(ThreadState.PARTITIONS_ASSIGNED);
Thread.LastPartitionAssignedTime = start.GetMilliseconds();
manager.RebalanceInProgress = false;

StringBuilder sb = new StringBuilder();
sb.AppendLine($"Partition assignment took {DateTime.Now - start} ms.");
sb.AppendLine(
$"\tCurrently assigned active tasks: {string.Join(",", this.manager.ActiveTaskIds)}");
log.LogInformation(sb.ToString());
}
}
}
catch (Exception e)
{
10 changes: 7 additions & 3 deletions core/KafkaStream.cs
@@ -261,6 +261,7 @@ public override string ToString()

private readonly CancellationTokenSource _cancelSource = new();
private readonly SequentiallyGracefullyShutdownHook shutdownHook;
+private readonly Guid processId;

internal State StreamState { get; private set; }

@@ -301,8 +302,6 @@ public KafkaStream(Topology topology, IStreamConfig configuration, IKafkaSupplie
throw new StreamConfigException($"Stream configuration is not correct. Please set ApplicationId and BootstrapServers as minimal.");
}

-var processID = Guid.NewGuid();
-clientId = string.IsNullOrEmpty(configuration.ClientId) ? $"{configuration.ApplicationId.ToLower()}-{processID}" : configuration.ClientId;
logPrefix = $"stream-application[{configuration.ApplicationId}] ";
metricsRegistry = new StreamMetricsRegistry(clientId, configuration.MetricsRecording);
this.kafkaSupplier.MetricsRegistry = metricsRegistry;
@@ -314,6 +313,10 @@ public KafkaStream(Topology topology, IStreamConfig configuration, IKafkaSupplie

// sanity check
var processorTopology = topology.Builder.BuildTopology();
+processId = StateDirectory
+    .GetInstance(this.configuration, topology.Builder.HasPersistentStores)
+    .InitializeProcessId();
+clientId = string.IsNullOrEmpty(configuration.ClientId) ? $"{configuration.ApplicationId.ToLower()}-{processId}" : configuration.ClientId;

int numStreamThreads = topology.Builder.HasNoNonGlobalTopology ? 0 : configuration.NumStreamThreads;

@@ -376,6 +379,7 @@ string Protect(string str)

threads[i] = StreamThread.Create(
threadId,
+processId,
clientId,
this.topology.Builder,
metricsRegistry,
@@ -418,7 +422,7 @@
/// Because threads are started in the background, this method does not block.
/// </summary>
/// <param name="token">Token for propagates notification that the stream should be canceled.</param>
[Obsolete("This method is deprecated, please use StartAsync(...) instead. It will be removed in next release version.")]
[Obsolete("This method is deprecated, please use StartAsync(...) instead. It will be removed in 1.8.0.")]
public void Start(CancellationToken? token = null)
{
StartAsync(token).ConfigureAwait(false).GetAwaiter().GetResult();
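
Note: the practical effect of this change is that processId now comes from the state directory (presumably persisted on disk, given the HasPersistentStores argument) instead of a fresh Guid.NewGuid() on every start. Since the StreamsProducer constructor above composes the transactional id from it, a restarted instance reuses the same transactional.id, which is what lets the broker fence zombie producers left over from the previous incarnation:

// From StreamsProducer's constructor (EXACTLY_ONCE case):
_producerConfig.TransactionalId = $"{config.ApplicationId}-{processId}";
// Stable processId => same transactional.id across restarts => zombie fencing works.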
1 change: 1 addition & 0 deletions core/Mock/ClusterInMemoryTopologyDriver.cs
@@ -64,6 +64,7 @@ public ClusterInMemoryTopologyDriver(string clientId, InternalTopologyBuilder to

threadTopology = StreamThread.Create(
$"{this.configuration.ApplicationId.ToLower()}-stream-thread-0",
+Guid.NewGuid(),
clientId,
topologyBuilder,
metricsRegistry,
Expand Down
1 change: 1 addition & 0 deletions core/Mock/Pipes/SyncPipeOutput.cs
@@ -4,6 +4,7 @@
using System;
using System.Collections.Generic;
using System.Threading;
+using Streamiz.Kafka.Net.Kafka.Internal;

namespace Streamiz.Kafka.Net.Mock.Pipes
{