From 9e09674c453167817387cf90183cc78825b9c6fe Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johan=20Andr=C3=A9n?= Date: Fri, 10 Mar 2023 16:23:20 +0100 Subject: [PATCH] fix: GroupBy incorrectly failing for element on closed substream #30205 (#31872) --- .../akka/stream/scaladsl/FlowGroupBySpec.scala | 17 +++++++++++++++++ .../stream/impl/fusing/StreamOfStreams.scala | 6 +++--- 2 files changed, 20 insertions(+), 3 deletions(-) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGroupBySpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGroupBySpec.scala index 8a12c5592ee..116c8bfd672 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGroupBySpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGroupBySpec.scala @@ -680,6 +680,23 @@ class FlowGroupBySpec extends StreamSpec(""" queue.complete() } + "not throw tooManySubstreamsOpenException for element on closed substream" in { + val publisher = TestPublisher.Probe[(Int, Boolean)]() + val outProbe = + Source.fromPublisher(publisher).groupBy(2, _._1).takeWhile(_._2 != false).mergeSubstreams.runWith(TestSink()) + outProbe.request(4) + publisher.sendNext((1, true)) + outProbe.expectNext((1, true)) + publisher.sendNext((2, true)) + outProbe.expectNext((2, true)) + publisher.sendNext((2, false)) // substream 2 completed + publisher.sendNext((2, false)) // should be dropped, not crash the stream + publisher.sendNext((1, true)) + outProbe.expectNext((1, true)) + + outProbe.cancel() + } + } } diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala index ec475d46822..640308938b4 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala @@ -351,10 +351,10 @@ import akka.util.ccompat.JavaConverters._ nextElementValue = elem } } else { - if (activeSubstreamsMap.size + closedSubstreams.size == maxSubstreams) - throw tooManySubstreamsOpenException - else if (closedSubstreams.contains(key) && !hasBeenPulled(in)) + if (closedSubstreams.contains(key)) pull(in) + else if (activeSubstreamsMap.size + closedSubstreams.size == maxSubstreams) + throw tooManySubstreamsOpenException else runSubstream(key, elem) } } catch {