diff --git a/internal/socket/duplex.go b/internal/socket/duplex.go index ce5cecc..7e5c014 100644 --- a/internal/socket/duplex.go +++ b/internal/socket/duplex.go @@ -510,6 +510,7 @@ func (dc *DuplexConnection) respondRequestResponse(receiving fragmentation.Heade sending.SubscribeWith(context.Background(), sub) return nil } + if err := scheduler.Elastic().Worker().Do(func() { sending.SubscribeWith(context.Background(), sub) }); err != nil { @@ -866,7 +867,8 @@ func (dc *DuplexConnection) onFramePayload(frame core.BufferedFrame) error { switch handler := v.(type) { case *requestResponseCallback: handler.cache = next - handler.sink.Success(next) + // TODO: workaround for processor sink bug + go handler.sink.Success(next) case requestStreamCallback: fg := h.Flag() isNext := fg.Check(core.FlagNext)