Skip to content

Commit

Permalink
fix: GroupBy incorrectly failing for element on closed substream akka…
Browse files Browse the repository at this point in the history
  • Loading branch information
johanandren authored Mar 10, 2023
1 parent da61ab2 commit 9e09674
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -680,6 +680,23 @@ class FlowGroupBySpec extends StreamSpec("""
queue.complete()
}

"not throw tooManySubstreamsOpenException for element on closed substream" in {
val publisher = TestPublisher.Probe[(Int, Boolean)]()
val outProbe =
Source.fromPublisher(publisher).groupBy(2, _._1).takeWhile(_._2 != false).mergeSubstreams.runWith(TestSink())
outProbe.request(4)
publisher.sendNext((1, true))
outProbe.expectNext((1, true))
publisher.sendNext((2, true))
outProbe.expectNext((2, true))
publisher.sendNext((2, false)) // substream 2 completed
publisher.sendNext((2, false)) // should be dropped, not crash the stream
publisher.sendNext((1, true))
outProbe.expectNext((1, true))

outProbe.cancel()
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -351,10 +351,10 @@ import akka.util.ccompat.JavaConverters._
nextElementValue = elem
}
} else {
if (activeSubstreamsMap.size + closedSubstreams.size == maxSubstreams)
throw tooManySubstreamsOpenException
else if (closedSubstreams.contains(key) && !hasBeenPulled(in))
if (closedSubstreams.contains(key))
pull(in)
else if (activeSubstreamsMap.size + closedSubstreams.size == maxSubstreams)
throw tooManySubstreamsOpenException
else runSubstream(key, elem)
}
} catch {
Expand Down

0 comments on commit 9e09674

Please sign in to comment.