diff --git a/.gitignore b/.gitignore index 8c2197a..3ba20ba 100644 --- a/.gitignore +++ b/.gitignore @@ -10,6 +10,7 @@ # Output of the go coverage tool, specifically when used with LiteIDE *.out +*.pprof # Others vendor/ diff --git a/examples/echo_bench/echo_bench.go b/examples/echo_bench/echo_bench.go index 229bd30..11b2102 100644 --- a/examples/echo_bench/echo_bench.go +++ b/examples/echo_bench/echo_bench.go @@ -6,11 +6,10 @@ import ( "flag" "log" "math/rand" - "net/http" - _ "net/http/pprof" "sync" "time" + "github.com/pkg/profile" "github.com/rsocket/rsocket-go" "github.com/rsocket/rsocket-go/core/transport" "github.com/rsocket/rsocket-go/internal/common" @@ -26,10 +25,6 @@ func init() { flag.Parse() rand.Seed(time.Now().UnixNano()) tp = rsocket.TCPClient().SetHostAndPort("127.0.0.1", 7878).Build() - - go func() { - log.Println(http.ListenAndServe(":5555", nil)) - }() } func main() { @@ -37,10 +32,16 @@ func main() { n int payloadSize int mtu int + pprof bool ) flag.IntVar(&n, "n", 100*10000, "request amount.") flag.IntVar(&payloadSize, "size", 1024, "payload data size.") flag.IntVar(&mtu, "mtu", 0, "mut size, zero means disabled.") + flag.BoolVar(&pprof, "pprof", false, "enable pprof") + + if pprof { + defer profile.Start(profile.MemProfileHeap(), profile.CPUProfile, profile.ProfilePath(".")).Stop() + } client, err := createClient(mtu) if err != nil { diff --git a/go.mod b/go.mod index 1b8c45f..814b433 100644 --- a/go.mod +++ b/go.mod @@ -8,6 +8,7 @@ require ( github.com/gorilla/websocket v1.4.2 github.com/jjeffcaii/reactor-go v0.4.2 github.com/pkg/errors v0.9.1 + github.com/pkg/profile v1.5.0 github.com/stretchr/testify v1.6.1 github.com/urfave/cli/v2 v2.1.1 go.uber.org/atomic v1.7.0 diff --git a/go.sum b/go.sum index 67486c0..0df19cc 100644 --- a/go.sum +++ b/go.sum @@ -16,6 +16,8 @@ github.com/panjf2000/ants/v2 v2.4.3 h1:wHghL17YKFanB62QjPQ9o+DuM4q7WrQ7zAhoX8+eB github.com/panjf2000/ants/v2 v2.4.3/go.mod h1:f6F0NZVFsGCp5A7QW/Zj/m92atWwOkY0OIhFxRNFr4A= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pkg/profile v1.5.0 h1:042Buzk+NhDI+DeSAA62RwJL8VAuZUMQZUjCsRz1Mug= +github.com/pkg/profile v1.5.0/go.mod h1:qBsxPvzyUincmltOk6iyRVxHYg4adc0OFOv72ZdLa18= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/russross/blackfriday/v2 v2.0.1 h1:lPqVAte+HuHNfhJ/0LC98ESWRz8afy9tM/0RK8m9o+Q= diff --git a/internal/socket/duplex.go b/internal/socket/duplex.go index 7f8c228..300a750 100644 --- a/internal/socket/duplex.go +++ b/internal/socket/duplex.go @@ -31,7 +31,6 @@ const _minRequestSchedulerSize = 1000 var errSocketClosed = errors.New("rsocket: socket closed already") var errRequestFailed = errors.New("rsocket: send request failed") -var _errRespondFailed = errors.New("rsocket: create responder failed") var ( unsupportedRequestStream = []byte("Request-Stream not implemented.") @@ -39,6 +38,13 @@ var ( unsupportedRequestChannel = []byte("Request-Channel not implemented.") ) +func mustExecute(sc scheduler.Scheduler, handler func()) { + if err := sc.Worker().Do(handler); err == nil { + return + } + go handler() +} + // DuplexConnection represents a socket of RSocket which can be a requester or a responder. type DuplexConnection struct { reqSche scheduler.Scheduler @@ -180,26 +186,42 @@ func (dc *DuplexConnection) destroySndBacklog() { } // FireAndForget start a request of FireAndForget. -func (dc *DuplexConnection) FireAndForget(sending payload.Payload) { - data := sending.Data() - size := core.FrameHeaderLen + len(sending.Data()) - m, ok := sending.Metadata() +func (dc *DuplexConnection) FireAndForget(req payload.Payload) { + data := req.Data() + size := core.FrameHeaderLen + len(req.Data()) + m, ok := req.Metadata() if ok { size += 3 + len(m) } sid := dc.nextStreamID() + + releasable, isReleasable := req.(common.Releasable) + if isReleasable { + releasable.IncRef() + } + if !dc.shouldSplit(size) { - dc.sendFrame(framing.NewWriteableFireAndForgetFrame(sid, data, m, 0)) + outMsg := framing.NewWriteableFireAndForgetFrame(sid, data, m, 0) + if isReleasable { + outMsg.HandleDone(func() { + releasable.Release() + }) + } + dc.sendFrame(outMsg) return } dc.doSplit(data, m, func(index int, result fragmentation.SplitResult) { - var f core.WriteableFrame + var outMsg core.WriteableFrame if index == 0 { - f = framing.NewWriteableFireAndForgetFrame(sid, result.Data, result.Metadata, result.Flag) + outMsg = framing.NewWriteableFireAndForgetFrame(sid, result.Data, result.Metadata, result.Flag) } else { - f = framing.NewWriteablePayloadFrame(sid, result.Data, result.Metadata, result.Flag|core.FlagNext) + outMsg = framing.NewWriteablePayloadFrame(sid, result.Data, result.Metadata, result.Flag|core.FlagNext) + } + + if !result.Flag.Check(core.FlagFollow) && isReleasable { + releasable.Release() } - dc.sendFrame(f) + dc.sendFrame(outMsg) }) } @@ -492,14 +514,23 @@ func (dc *DuplexConnection) respondRequestResponse(receiving fragmentation.Heade sid := receiving.Header().StreamID() // execute socket handler - sending, err := func() (mono mono.Mono, err error) { + sending, err := func() (resp mono.Mono, err error) { defer func() { - if e := recover(); e != nil { - logger.Errorf("respond REQUEST_RESPONSE failed: %s\n", e) - err = _errRespondFailed + rec := recover() + if rec == nil { + return } + if e, ok := rec.(error); ok { + err = errors.WithStack(e) + } else { + err = errors.Errorf("%v", e) + } + logger.Errorf("handle request-response failed: %+v\n", err) }() - mono = dc.responder.RequestResponse(receiving) + resp = dc.responder.RequestResponse(receiving) + if resp == nil { + err = framing.NewWriteableErrorFrame(sid, core.ErrorCodeApplicationError, unsupportedRequestResponse) + } return }() // sending error with panic @@ -508,12 +539,6 @@ func (dc *DuplexConnection) respondRequestResponse(receiving fragmentation.Heade dc.writeError(sid, err) return nil } - // sending error with unsupported handler - if sending == nil { - common.TryRelease(receiving) - dc.writeError(sid, framing.NewWriteableErrorFrame(sid, core.ErrorCodeApplicationError, unsupportedRequestResponse)) - return nil - } // async subscribe publisher sub := borrowRequestResponseSubscriber(dc, sid, receiving) @@ -521,12 +546,9 @@ func (dc *DuplexConnection) respondRequestResponse(receiving fragmentation.Heade sending.SubscribeWith(context.Background(), sub) return nil } - - if err := dc.resSche.Worker().Do(func() { + mustExecute(dc.resSche, func() { sending.SubscribeWith(context.Background(), sub) - }); err != nil { - go sending.SubscribeWith(context.Background(), sub) - } + }) return nil } @@ -587,15 +609,21 @@ func (dc *DuplexConnection) respondRequestChannel(req fragmentation.HeaderAndPay SubscribeOn(dc.reqSche) // TODO: if receiving == sending ??? - sending, err := func() (flux flux.Flux, err error) { + sending, err := func() (resp flux.Flux, err error) { defer func() { - if e := recover(); e != nil { - logger.Errorf("respond REQUEST_CHANNEL failed: %s\n", e) - err = _errRespondFailed + rec := recover() + if rec == nil { + return + } + if e, ok := rec.(error); ok { + err = errors.WithStack(e) + } else { + err = errors.Errorf("%v", e) } + logger.Errorf("handle request-channel failed: %+v\n", err) }() - flux = dc.responder.RequestChannel(receiving) - if flux == nil { + resp = dc.responder.RequestChannel(receiving) + if resp == nil { err = framing.NewWriteableErrorFrame(sid, core.ErrorCodeApplicationError, unsupportedRequestChannel) } return @@ -622,27 +650,35 @@ func (dc *DuplexConnection) respondRequestChannel(req fragmentation.HeaderAndPay calls: finallyRequests, } - if err := dc.reqSche.Worker().Do(func() { + mustExecute(dc.reqSche, func() { sending.SubscribeWith(context.Background(), sub) - }); err != nil { - go sending.SubscribeWith(context.Background(), sub) - } + }) <-subscribed return nil } -func (dc *DuplexConnection) respondMetadataPush(input core.BufferedFrame) (err error) { - p := input.(*framing.MetadataPushFrame) - defer func() { - p.Release() - if e := recover(); e != nil { - logger.Errorf("respond METADATA_PUSH failed: %s\n", e) - } - }() - dc.responder.MetadataPush(p) - return +func (dc *DuplexConnection) respondMetadataPush(input core.BufferedFrame) error { + req := input.(*framing.MetadataPushFrame) + mustExecute(dc.resSche, func() { + defer func() { + req.Release() + rec := recover() + if rec == nil { + return + } + var err error + if e, ok := rec.(error); ok { + err = errors.WithStack(e) + } else { + err = errors.Errorf("%v", e) + } + logger.Errorf("handle metadata-push failed: %+v\n", err) + }() + dc.responder.MetadataPush(req) + }) + return nil } func (dc *DuplexConnection) onFrameFNF(frame core.BufferedFrame) error { @@ -650,26 +686,36 @@ func (dc *DuplexConnection) onFrameFNF(frame core.BufferedFrame) error { if !ok { return nil } - return dc.respondFNF(receiving) + return dc.respondFireAndForget(receiving) } -func (dc *DuplexConnection) respondFNF(receiving fragmentation.HeaderAndPayload) (err error) { - defer func() { - common.TryRelease(receiving) - if e := recover(); e != nil { - logger.Errorf("respond FIRE_AND_FORGET failed: %s\n", e) - } - }() - dc.responder.FireAndForget(receiving) - return +func (dc *DuplexConnection) respondFireAndForget(receiving fragmentation.HeaderAndPayload) error { + mustExecute(dc.resSche, func() { + defer func() { + common.TryRelease(receiving) + rec := recover() + if rec == nil { + return + } + var err error + if e, ok := rec.(error); ok { + err = errors.WithStack(e) + } else { + err = errors.Errorf("%v", e) + } + logger.Errorf("handle fire-and-forget failed: %+v\n", err) + }() + dc.responder.FireAndForget(receiving) + }) + return nil } func (dc *DuplexConnection) onFrameRequestStream(frame core.BufferedFrame) error { receiving, ok := dc.doFragment(frame.(*framing.RequestStreamFrame)) if !ok { return nil - } + } return dc.respondRequestStream(receiving) } @@ -680,10 +726,16 @@ func (dc *DuplexConnection) respondRequestStream(receiving fragmentation.HeaderA // execute request stream handler sending, err := func() (resp flux.Flux, err error) { defer func() { - if e := recover(); e != nil { - logger.Errorf("respond REQUEST_STREAM failed: %s\n", e) - err = _errRespondFailed + rec := recover() + if rec == nil { + return + } + if e, ok := rec.(error); ok { + err = errors.WithStack(e) + } else { + err = errors.Errorf("%v", err) } + logger.Errorf("handle request-stream failed: %+v\n", err) }() resp = dc.responder.RequestStream(receiving) if resp == nil { @@ -735,7 +787,6 @@ func (dc *DuplexConnection) onFrameKeepalive(frame core.BufferedFrame) (err erro f := frame.(*framing.KeepaliveFrame) if !f.HasFlag(core.FlagRespond) { return - } // TODO: optimize, if keepalive frame support modify data. data := common.CloneBytes(f.Data()) @@ -858,7 +909,7 @@ func (dc *DuplexConnection) onFramePayload(frame core.BufferedFrame) error { switch h.Type() { case core.FrameTypeRequestFNF: - return dc.respondFNF(next) + return dc.respondFireAndForget(next) case core.FrameTypeRequestResponse: return dc.respondRequestResponse(next) case core.FrameTypeRequestStream: @@ -1100,6 +1151,7 @@ func (dc *DuplexConnection) drain(leaseChan <-chan lease.Lease) bool { var flush bool cycle := len(dc.sndQueue) if cycle < 1 { + runtime.Gosched() cycle = 1 } for i := 0; i < cycle; i++ {