Skip to content

Commit

Permalink
fix: use ConfigureAwait(false) on all awaits
Browse files Browse the repository at this point in the history
  • Loading branch information
gnjack committed Apr 24, 2024
1 parent 16f7851 commit 4c9bcd8
Show file tree
Hide file tree
Showing 15 changed files with 61 additions and 55 deletions.
10 changes: 5 additions & 5 deletions src/KafkaFlow/Batching/BatchConsumeMiddleware.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public BatchConsumeMiddleware(

public async Task Invoke(IMessageContext context, MiddlewareDelegate next)
{
await _dispatchSemaphore.WaitAsync();
await _dispatchSemaphore.WaitAsync().ConfigureAwait(false);

try
{
Expand All @@ -59,7 +59,7 @@ public async Task Invoke(IMessageContext context, MiddlewareDelegate next)

if (_batch.Count >= _batchSize)
{
await this.TriggerDispatchAndWaitAsync();
await this.TriggerDispatchAndWaitAsync().ConfigureAwait(false);
}
}

Expand All @@ -72,11 +72,11 @@ public void Dispose()

private async Task TriggerDispatchAndWaitAsync()
{
await _dispatchSemaphore.WaitAsync();
await _dispatchSemaphore.WaitAsync().ConfigureAwait(false);
_dispatchTokenSource?.Cancel();
_dispatchSemaphore.Release();

await (_dispatchTask ?? Task.CompletedTask);
await (_dispatchTask ?? Task.CompletedTask).ConfigureAwait(false);
}

private void ScheduleExecution(IMessageContext context, MiddlewareDelegate next)
Expand All @@ -92,7 +92,7 @@ private void ScheduleExecution(IMessageContext context, MiddlewareDelegate next)

private async Task DispatchAsync(IMessageContext context, MiddlewareDelegate next)
{
await _dispatchSemaphore.WaitAsync();
await _dispatchSemaphore.WaitAsync().ConfigureAwait(false);

_dispatchTokenSource.Dispose();
_dispatchTokenSource = null;
Expand Down
6 changes: 3 additions & 3 deletions src/KafkaFlow/Clusters/ClusterManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public async Task<IEnumerable<TopicPartitionOffset>> GetConsumerGroupOffsetsAsyn

foreach (var name in topicsName)
{
topicsMetadata.Add((name, await this.GetTopicMetadataAsync(name)));
topicsMetadata.Add((name, await this.GetTopicMetadataAsync(name).ConfigureAwait(false)));
}

var topics =
Expand All @@ -98,7 +98,7 @@ public async Task<IEnumerable<TopicPartitionOffset>> GetConsumerGroupOffsetsAsyn
.ToList();

var result = await _lazyAdminClient.Value.ListConsumerGroupOffsetsAsync(
new[] { new ConsumerGroupTopicPartitions(consumerGroup, topics) });
new[] { new ConsumerGroupTopicPartitions(consumerGroup, topics) }).ConfigureAwait(false);

if (!result.Any())
{
Expand All @@ -125,7 +125,7 @@ public async Task CreateIfNotExistsAsync(IEnumerable<TopicConfiguration> configu
})
.ToArray();

await _lazyAdminClient.Value.CreateTopicsAsync(topics);
await _lazyAdminClient.Value.CreateTopicsAsync(topics).ConfigureAwait(false);
}
catch (CreateTopicsException exception)
{
Expand Down
6 changes: 3 additions & 3 deletions src/KafkaFlow/Consumers/Consumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ public async ValueTask<ConsumeResult<byte[], byte[]>> ConsumeAsync(CancellationT
try
{
this.EnsureConsumer();
await _flowManager.BlockHeartbeat(cancellationToken);
await _flowManager.BlockHeartbeat(cancellationToken).ConfigureAwait(false);
return _consumer.Consume(cancellationToken);
}
catch (OperationCanceledException)
Expand All @@ -176,7 +176,7 @@ public async ValueTask<ConsumeResult<byte[], byte[]>> ConsumeAsync(CancellationT
"Max Poll Interval Exceeded",
new { this.Configuration.ConsumerName });

await _maxPollIntervalExceeded.FireAsync();
await _maxPollIntervalExceeded.FireAsync().ConfigureAwait(false);
}
catch (KafkaException ex) when (ex.Error.IsFatal)
{
Expand All @@ -187,7 +187,7 @@ public async ValueTask<ConsumeResult<byte[], byte[]>> ConsumeAsync(CancellationT

this.InvalidateConsumer();

await Task.Delay(5000, cancellationToken);
await Task.Delay(5000, cancellationToken).ConfigureAwait(false);
}
catch (Exception ex)
{
Expand Down
13 changes: 7 additions & 6 deletions src/KafkaFlow/Consumers/ConsumerManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -70,14 +70,14 @@ private void StartEvaluateWorkerCountTimer() => _evaluateWorkersCountTimer?.Chan

private async Task EvaluateWorkersCountAsync()
{
var newWorkersCount = await this.CalculateWorkersCount(this.Consumer.Assignment);
var newWorkersCount = await this.CalculateWorkersCount(this.Consumer.Assignment).ConfigureAwait(false);

if (newWorkersCount == this.WorkerPool.CurrentWorkersCount)
{
return;
}

await this.ChangeWorkersCountAsync(newWorkersCount);
await this.ChangeWorkersCountAsync(newWorkersCount).ConfigureAwait(false);
}

private async Task ChangeWorkersCountAsync(int workersCount)
Expand All @@ -86,10 +86,10 @@ private async Task ChangeWorkersCountAsync(int workersCount)
{
this.StopEvaluateWorkerCountTimer();

await this.Feeder.StopAsync();
await this.WorkerPool.StopAsync();
await this.Feeder.StopAsync().ConfigureAwait(false);
await this.WorkerPool.StopAsync().ConfigureAwait(false);

await this.WorkerPool.StartAsync(this.Consumer.Assignment, workersCount);
await this.WorkerPool.StartAsync(this.Consumer.Assignment, workersCount).ConfigureAwait(false);
this.Feeder.Start();

this.StartEvaluateWorkerCountTimer();
Expand Down Expand Up @@ -155,7 +155,8 @@ private async Task<int> CalculateWorkersCount(IEnumerable<Confluent.Kafka.TopicP
.Select(x => x.Partition.Value)
.ToList()))
.ToList()),
_dependencyResolver);
_dependencyResolver)
.ConfigureAwait(false);
}
catch (Exception e)
{
Expand Down
27 changes: 14 additions & 13 deletions src/KafkaFlow/Consumers/ConsumerWorker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -75,19 +75,20 @@ public Task StartAsync(CancellationToken stopCancellationToken)

try
{
await foreach (var context in _messagesBuffer.Reader.ReadAllItemsAsync(stopCancellationToken))
await foreach (var context in _messagesBuffer.Reader.ReadAllItemsAsync(stopCancellationToken).ConfigureAwait(false))
{
currentContext = context;

await this
.ProcessMessageAsync(context, stopCancellationToken)
.WithCancellation(stopCancellationToken, true);
.WithCancellation(stopCancellationToken, true)
.ConfigureAwait(false);
}
}
catch (OperationCanceledException)
{
currentContext?.ConsumerContext.Discard();
await this.DiscardBufferedContextsAsync();
await this.DiscardBufferedContextsAsync().ConfigureAwait(false);
}
catch (Exception ex)
{
Expand All @@ -101,13 +102,13 @@ await this

public async Task StopAsync()
{
await _workerStoppingEvent.FireAsync();
await _workerStoppingEvent.FireAsync().ConfigureAwait(false);

_messagesBuffer.Writer.TryComplete();

await _backgroundTask;
await _backgroundTask.ConfigureAwait(false);

await _workerStoppedEvent.FireAsync();
await _workerStoppedEvent.FireAsync().ConfigureAwait(false);
}

public void Dispose()
Expand All @@ -118,7 +119,7 @@ public void Dispose()

private async Task DiscardBufferedContextsAsync()
{
await foreach (var context in _messagesBuffer.Reader.ReadAllItemsAsync(CancellationToken.None))
await foreach (var context in _messagesBuffer.Reader.ReadAllItemsAsync(CancellationToken.None).ConfigureAwait(false))
{
context.ConsumerContext.Discard();
}
Expand All @@ -130,29 +131,29 @@ private async Task ProcessMessageAsync(IMessageContext context, CancellationToke
{
try
{
await _globalEvents.FireMessageConsumeStartedAsync(new MessageEventContext(context));
await _globalEvents.FireMessageConsumeStartedAsync(new MessageEventContext(context)).ConfigureAwait(false);

_ = context.ConsumerContext.Completion.ContinueWith(
async task =>
{
if (task.IsFaulted)
{
await _globalEvents.FireMessageConsumeErrorAsync(new MessageErrorEventContext(context, task.Exception));
await _globalEvents.FireMessageConsumeErrorAsync(new MessageErrorEventContext(context, task.Exception)).ConfigureAwait(false);
}

await _globalEvents.FireMessageConsumeCompletedAsync(new MessageEventContext(context));
await _globalEvents.FireMessageConsumeCompletedAsync(new MessageEventContext(context)).ConfigureAwait(false);
},
CancellationToken.None);

await _middlewareExecutor.Execute(context, _ => Task.CompletedTask);
await _middlewareExecutor.Execute(context, _ => Task.CompletedTask).ConfigureAwait(false);
}
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
{
context.ConsumerContext.ShouldStoreOffset = false;
}
catch (Exception ex)
{
await _globalEvents.FireMessageConsumeErrorAsync(new MessageErrorEventContext(context, ex));
await _globalEvents.FireMessageConsumeErrorAsync(new MessageErrorEventContext(context, ex)).ConfigureAwait(false);

_logHandler.Error(
"Error processing message",
Expand All @@ -172,7 +173,7 @@ private async Task ProcessMessageAsync(IMessageContext context, CancellationToke
context.ConsumerContext.Complete();
}

await _workerProcessingEnded.FireAsync(context);
await _workerProcessingEnded.FireAsync(context).ConfigureAwait(false);
}
}
catch (Exception ex)
Expand Down
20 changes: 11 additions & 9 deletions src/KafkaFlow/Consumers/ConsumerWorkerPool.cs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public async Task StartAsync(IReadOnlyCollection<TopicPartition> partitions, int
new NullOffsetManager() :
new OffsetManager(_offsetCommitter, partitions);

await _offsetCommitter.StartAsync();
await _offsetCommitter.StartAsync().ConfigureAwait(false);

this.CurrentWorkersCount = workersCount;

Expand Down Expand Up @@ -94,7 +94,8 @@ await Task.WhenAll(
_workers.Add(worker);

return worker.StartAsync(_stopCancellationTokenSource.Token);
}));
}))
.ConfigureAwait(false);

_distributionStrategy = _distributionStrategyFactory(_consumerDependencyResolver);
_distributionStrategy.Initialize(_workers.AsReadOnly());
Expand Down Expand Up @@ -129,24 +130,25 @@ public async Task StopAsync()
_stopCancellationTokenSource.CancelAfter(_consumer.Configuration.WorkerStopTimeout);
}

await Task.WhenAll(currentWorkers.Select(x => x.StopAsync()));
await Task.WhenAll(currentWorkers.Select(x => x.StopAsync())).ConfigureAwait(false);
await _offsetManager
.WaitContextsCompletionAsync()
.WithCancellation(_stopCancellationTokenSource.Token, false);
.WithCancellation(_stopCancellationTokenSource.Token, false)
.ConfigureAwait(false);

currentWorkers.ForEach(worker => worker.Dispose());
_stopCancellationTokenSource?.Dispose();

_offsetManager = null;

await _workerPoolStoppedSubject.FireAsync();
await _workerPoolStoppedSubject.FireAsync().ConfigureAwait(false);

await _offsetCommitter.StopAsync();
await _offsetCommitter.StopAsync().ConfigureAwait(false);
}

public async Task EnqueueAsync(ConsumeResult<byte[], byte[]> message, CancellationToken stopCancellationToken)
{
await _startedTaskSource.Task;
await _startedTaskSource.Task.ConfigureAwait(false);

var worker = (IConsumerWorker)await _distributionStrategy
.GetWorkerAsync(
Expand All @@ -155,7 +157,7 @@ public async Task EnqueueAsync(ConsumeResult<byte[], byte[]> message, Cancellati
message.Topic,
message.Partition.Value,
message.Message.Key,
stopCancellationToken));
stopCancellationToken)).ConfigureAwait(false);

if (worker is null)
{
Expand All @@ -166,7 +168,7 @@ public async Task EnqueueAsync(ConsumeResult<byte[], byte[]> message, Cancellati

_offsetManager.Enqueue(context.ConsumerContext);

await worker.EnqueueAsync(context);
await worker.EnqueueAsync(context).ConfigureAwait(false);
}

private MessageContext CreateMessageContext(ConsumeResult<byte[], byte[]> message, IConsumerWorker worker)
Expand Down
2 changes: 1 addition & 1 deletion src/KafkaFlow/Consumers/WorkerPoolFeeder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,6 @@ public async Task StopAsync()
_stopTokenSource.Dispose();
}

await (_feederTask ?? Task.CompletedTask);
await (_feederTask ?? Task.CompletedTask).ConfigureAwait(false);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public ConsumerLagWorkerBalancer(

public async Task<int> GetWorkersCountAsync(WorkersCountContext context)
{
var workers = await this.CalculateAsync(context);
var workers = await this.CalculateAsync(context).ConfigureAwait(false);

_logHandler.Info(
"New workers count calculated",
Expand Down Expand Up @@ -97,13 +97,14 @@ private async Task<int> CalculateAsync(WorkersCountContext context)
return DefaultWorkersCount;
}

var topicsMetadata = await this.GetTopicsMetadataAsync(context);
var topicsMetadata = await this.GetTopicsMetadataAsync(context).ConfigureAwait(false);

var lastOffsets = this.GetPartitionsLastOffset(context.ConsumerName, topicsMetadata);

var partitionsOffset = await _clusterManager.GetConsumerGroupOffsetsAsync(
context.ConsumerGroupId,
context.AssignedTopicsPartitions.Select(t => t.Name));
context.AssignedTopicsPartitions.Select(t => t.Name))
.ConfigureAwait(false);

var partitionsLag = CalculatePartitionsLag(lastOffsets, partitionsOffset);
var instanceLag = CalculateMyPartitionsLag(context, partitionsLag);
Expand Down Expand Up @@ -156,7 +157,7 @@ private async Task<int> CalculateAsync(WorkersCountContext context)

foreach (var topic in context.AssignedTopicsPartitions)
{
topicsMetadata.Add((topic.Name, await _clusterManager.GetTopicMetadataAsync(topic.Name)));
topicsMetadata.Add((topic.Name, await _clusterManager.GetTopicMetadataAsync(topic.Name).ConfigureAwait(false)));
}

return topicsMetadata;
Expand Down
2 changes: 1 addition & 1 deletion src/KafkaFlow/Extensions/ChannelExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ public static async IAsyncEnumerable<T> ReadAllItemsAsync<T>(
this ChannelReader<T> reader,
[EnumeratorCancellation] CancellationToken cancellationToken = default)
{
while (await reader.WaitToReadAsync(cancellationToken))
while (await reader.WaitToReadAsync(cancellationToken).ConfigureAwait(false))
{
while (reader.TryRead(out var item))
{
Expand Down
2 changes: 1 addition & 1 deletion src/KafkaFlow/Extensions/TaskExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ public static async Task WithCancellation(
return;
}

await Task.WhenAny(task, tcs.Task);
await Task.WhenAny(task, tcs.Task).ConfigureAwait(false);

void TrySetResult()
{
Expand Down
5 changes: 3 additions & 2 deletions src/KafkaFlow/KafkaBus.cs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public async Task StartAsync(CancellationToken stopCancellationToken = default)

foreach (var cluster in _configuration.Clusters)
{
await this.CreateMissingClusterTopics(cluster);
await this.CreateMissingClusterTopics(cluster).ConfigureAwait(false);

foreach (var consumerConfiguration in cluster.Consumers)
{
Expand Down Expand Up @@ -101,6 +101,7 @@ private async Task CreateMissingClusterTopics(ClusterConfiguration cluster)
}

await _clusterManagerAccessor[cluster.Name].CreateIfNotExistsAsync(
cluster.TopicsToCreateIfNotExist);
cluster.TopicsToCreateIfNotExist)
.ConfigureAwait(false);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ public async Task<bool> TryExecuteActionAsync(long metricValue)
return false;
}

await _action.ExecuteAsync();
await _action.ExecuteAsync().ConfigureAwait(false);

return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public async Task Invoke(IMessageContext context, MiddlewareDelegate next)
return;
}

var messageType = await _typeResolver.OnConsumeAsync(context);
var messageType = await _typeResolver.OnConsumeAsync(context).ConfigureAwait(false);

if (messageType is null)
{
Expand Down
Loading

0 comments on commit 4c9bcd8

Please sign in to comment.