diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGroupBySpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGroupBySpec.scala index 8a12c5592ee..116c8bfd672 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGroupBySpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGroupBySpec.scala @@ -680,6 +680,23 @@ class FlowGroupBySpec extends StreamSpec(""" queue.complete() } + "not throw tooManySubstreamsOpenException for element on closed substream" in { + val publisher = TestPublisher.Probe[(Int, Boolean)]() + val outProbe = + Source.fromPublisher(publisher).groupBy(2, _._1).takeWhile(_._2 != false).mergeSubstreams.runWith(TestSink()) + outProbe.request(4) + publisher.sendNext((1, true)) + outProbe.expectNext((1, true)) + publisher.sendNext((2, true)) + outProbe.expectNext((2, true)) + publisher.sendNext((2, false)) // substream 2 completed + publisher.sendNext((2, false)) // should be dropped, not crash the stream + publisher.sendNext((1, true)) + outProbe.expectNext((1, true)) + + outProbe.cancel() + } + } } diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala index ec475d46822..640308938b4 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala @@ -351,10 +351,10 @@ import akka.util.ccompat.JavaConverters._ nextElementValue = elem } } else { - if (activeSubstreamsMap.size + closedSubstreams.size == maxSubstreams) - throw tooManySubstreamsOpenException - else if (closedSubstreams.contains(key) && !hasBeenPulled(in)) + if (closedSubstreams.contains(key)) pull(in) + else if (activeSubstreamsMap.size + closedSubstreams.size == maxSubstreams) + throw tooManySubstreamsOpenException else runSubstream(key, elem) } } catch {