Skip to content

Commit

Permalink
polling goroutine permanant
Browse files Browse the repository at this point in the history
  • Loading branch information
jonfung-dydx committed May 24, 2024
1 parent 1066d2b commit d69c744
Showing 1 changed file with 7 additions and 4 deletions.
11 changes: 7 additions & 4 deletions protocol/streaming/grpc/grpc_streaming_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,11 +68,13 @@ func NewGrpcStreamingManager(

// Worker goroutine to consistently read from channel and send out updates
go func() {
for internalResponse := range grpcStreamingManager.updateBuffer {
grpcStreamingManager.logger.Info("start polling a response", "len", len(grpcStreamingManager.updateBuffer))
grpcStreamingManager.sendUpdateResponse(internalResponse)
grpcStreamingManager.logger.Info("finish polling a response", "len", len(grpcStreamingManager.updateBuffer))
for {
for internalResponse := range grpcStreamingManager.updateBuffer {
grpcStreamingManager.logger.Info("start polling a response", "len", len(grpcStreamingManager.updateBuffer))
grpcStreamingManager.sendUpdateResponse(internalResponse)
grpcStreamingManager.logger.Info("finish polling a response", "len", len(grpcStreamingManager.updateBuffer))

}
}
}()

Expand Down Expand Up @@ -268,6 +270,7 @@ func (sm *GrpcStreamingManagerImpl) mustEnqueueOrderbookUpdate(internalResponse
sm.removeSubscription(k)
}
// Clear out the buffer
close(sm.updateBuffer)
sm.updateBuffer = make(chan bufferInternalResponse, sm.updateBufferWindowSize)
}
sm.EmitMetrics()
Expand Down

0 comments on commit d69c744

Please sign in to comment.