Skip to content

Commit

Permalink
perf: Reduce loops in when clean queue in BroadcastHub (#1628)
Browse files Browse the repository at this point in the history
  • Loading branch information
He-Pin authored Dec 30, 2024
1 parent 136319f commit 9596ea4
Showing 1 changed file with 15 additions and 4 deletions.
19 changes: 15 additions & 4 deletions stream/src/main/scala/org/apache/pekko/stream/scaladsl/Hub.scala
Original file line number Diff line number Diff line change
Expand Up @@ -605,10 +605,7 @@ private[pekko] class BroadcastHub[T](startAfterNrOfConsumers: Int, bufferSize: I
else if (head != finalOffset) {
// If our final consumer goes away, we roll forward the buffer so a subsequent consumer does not
// see the already consumed elements. This feature is quite handy.
while (head != finalOffset) {
queue(head & Mask) = null
head += 1
}
cleanQueueInRange(head, finalOffset)
head = finalOffset
tryPull()
}
Expand All @@ -617,6 +614,20 @@ private[pekko] class BroadcastHub[T](startAfterNrOfConsumers: Int, bufferSize: I
}
}

private def cleanQueueInRange(headOffset: Int, upToOffset: Int): Unit = {
// We need to clean the queue from headOffset to upToOffset
if (headOffset != upToOffset) {
val startIdx = headOffset & Mask
val endIdx = upToOffset & Mask
if (startIdx <= endIdx) {
java.util.Arrays.fill(queue, startIdx, endIdx, null)
} else {
java.util.Arrays.fill(queue, startIdx, queue.length, null)
java.util.Arrays.fill(queue, 0, endIdx, null)
}
}
}

// Producer API
// We are full if the distance between the slowest (known) consumer and the fastest (known) consumer is
// the buffer size. We must wait until the slowest either advances, or cancels.
Expand Down

0 comments on commit 9596ea4

Please sign in to comment.