Skip to content

Commit

Permalink
drain channel instead of making new channel
Browse files Browse the repository at this point in the history
  • Loading branch information
jonfung-dydx committed May 28, 2024
1 parent d69c744 commit 040ad24
Showing 1 changed file with 7 additions and 9 deletions.
16 changes: 7 additions & 9 deletions protocol/streaming/grpc/grpc_streaming_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,14 +68,11 @@ func NewGrpcStreamingManager(

// Worker goroutine to consistently read from channel and send out updates
go func() {
for {
for internalResponse := range grpcStreamingManager.updateBuffer {
grpcStreamingManager.logger.Info("start polling a response", "len", len(grpcStreamingManager.updateBuffer))
grpcStreamingManager.sendUpdateResponse(internalResponse)
grpcStreamingManager.logger.Info("finish polling a response", "len", len(grpcStreamingManager.updateBuffer))

}
for internalResponse := range grpcStreamingManager.updateBuffer {
grpcStreamingManager.logger.Info("start polling a response", "len", len(grpcStreamingManager.updateBuffer))
grpcStreamingManager.sendUpdateResponse(internalResponse)
}
grpcStreamingManager.logger.Error("Should never see this. Poller has failed.")
}()

return grpcStreamingManager
Expand Down Expand Up @@ -270,8 +267,9 @@ func (sm *GrpcStreamingManagerImpl) mustEnqueueOrderbookUpdate(internalResponse
sm.removeSubscription(k)
}
// Clear out the buffer
close(sm.updateBuffer)
sm.updateBuffer = make(chan bufferInternalResponse, sm.updateBufferWindowSize)
for len(sm.updateBuffer) > 0 {
<-sm.updateBuffer
}
}
sm.EmitMetrics()
}
Expand Down

0 comments on commit 040ad24

Please sign in to comment.