Skip to content

Commit

Permalink
complete MergeHub Sink gracefully on NormalShutdownException
Browse files Browse the repository at this point in the history
  • Loading branch information
anpin committed Jan 14, 2025
1 parent 6fd8701 commit 1542899
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 2 deletions.
31 changes: 31 additions & 0 deletions src/core/Akka.Streams.Tests/Dsl/HubSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
using FluentAssertions.Extensions;
using Xunit.Abstractions;
using static FluentAssertions.FluentActions;
using Akka.Streams.Implementation;

namespace Akka.Streams.Tests.Dsl
{
Expand Down Expand Up @@ -276,6 +277,36 @@ await EventFilter.Error(contains: "Upstream producer failed with exception").Exp
}, Materializer);
}

[Fact]
public async Task MergeHub_must_not_log_normal_shutdown_exception()
{
await this.AssertAllStagesStoppedAsync(async () =>
{
var (sink, task) = MergeHub.Source<int>(16).Take(10).ToMaterialized(Sink.Seq<int>(), Keep.Both).Run(Materializer);

await WithinAsync(10.Seconds(), async () =>
{
await EventFilter
.Custom((e) =>
{
if (e.Cause?.InnerException is NormalShutdownException nse && nse == ActorPublisher.NormalShutdownReason)
return true;
else
return false;
})

// await EventFilter.Error(contains: ActorPublisher.NormalShutdownReasonMessage)
.ExpectAsync(0, async () =>
{
Source.Failed<int>(ActorPublisher.NormalShutdownReason).RunWith(sink, Materializer);
Source.From(Enumerable.Range(1, 10)).RunWith(sink, Materializer);
var result = await task.ShouldCompleteWithin(3.Seconds());
result.Should().BeEquivalentTo(Enumerable.Range(1, 10));
});
});
}, Materializer);
}

[Fact]
public async Task BroadcastHub_must_work_in_the_happy_case()
{
Expand Down
8 changes: 6 additions & 2 deletions src/core/Akka.Streams/Dsl/Hub.cs
Original file line number Diff line number Diff line change
Expand Up @@ -380,8 +380,12 @@ private void PullWithDemand()
// Make some noise
public override void OnUpstreamFailure(Exception e)
{
throw new MergeHub.ProducerFailed(
"Upstream producer failed with exception, removing from MergeHub now", e);
if(e is Implementation.NormalShutdownException)
CompleteStage();
else {
throw new MergeHub.ProducerFailed(
"Upstream producer failed with exception, removing from MergeHub now", e);
}
}

private void OnDemand(long moreDemand)
Expand Down

0 comments on commit 1542899

Please sign in to comment.