diff --git a/src/core/Akka.Streams.Tests/Dsl/HubSpec.cs b/src/core/Akka.Streams.Tests/Dsl/HubSpec.cs index 42b7a345aae..e362a340b46 100644 --- a/src/core/Akka.Streams.Tests/Dsl/HubSpec.cs +++ b/src/core/Akka.Streams.Tests/Dsl/HubSpec.cs @@ -24,6 +24,7 @@ using FluentAssertions.Extensions; using Xunit.Abstractions; using static FluentAssertions.FluentActions; +using Akka.Streams.Implementation; namespace Akka.Streams.Tests.Dsl { @@ -276,6 +277,36 @@ await EventFilter.Error(contains: "Upstream producer failed with exception").Exp }, Materializer); } + [Fact] + public async Task MergeHub_must_not_log_normal_shutdown_exception() + { + await this.AssertAllStagesStoppedAsync(async () => + { + var (sink, task) = MergeHub.Source(16).Take(10).ToMaterialized(Sink.Seq(), Keep.Both).Run(Materializer); + + await WithinAsync(10.Seconds(), async () => + { + await EventFilter + .Custom((e) => + { + if (e.Cause?.InnerException is NormalShutdownException nse && nse == ActorPublisher.NormalShutdownReason) + return true; + else + return false; + }) + + // await EventFilter.Error(contains: ActorPublisher.NormalShutdownReasonMessage) + .ExpectAsync(0, async () => + { + Source.Failed(ActorPublisher.NormalShutdownReason).RunWith(sink, Materializer); + Source.From(Enumerable.Range(1, 10)).RunWith(sink, Materializer); + var result = await task.ShouldCompleteWithin(3.Seconds()); + result.Should().BeEquivalentTo(Enumerable.Range(1, 10)); + }); + }); + }, Materializer); + } + [Fact] public async Task BroadcastHub_must_work_in_the_happy_case() { diff --git a/src/core/Akka.Streams/Dsl/Hub.cs b/src/core/Akka.Streams/Dsl/Hub.cs index 49dfc19359a..cdc087be1d1 100644 --- a/src/core/Akka.Streams/Dsl/Hub.cs +++ b/src/core/Akka.Streams/Dsl/Hub.cs @@ -380,8 +380,12 @@ private void PullWithDemand() // Make some noise public override void OnUpstreamFailure(Exception e) { - throw new MergeHub.ProducerFailed( - "Upstream producer failed with exception, removing from MergeHub now", e); + if(e is Implementation.NormalShutdownException) + CompleteStage(); + else { + throw new MergeHub.ProducerFailed( + "Upstream producer failed with exception, removing from MergeHub now", e); + } } private void OnDemand(long moreDemand)