Skip to content

Commit

Permalink
feat: more friendly panic handling (#91)
Browse files Browse the repository at this point in the history
* feat: more friendly panic handling with stack info

* fix: add simple pprof
  • Loading branch information
jjeffcaii authored Nov 29, 2020
1 parent 0fced05 commit 27b1748
Show file tree
Hide file tree
Showing 5 changed files with 125 additions and 68 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

# Output of the go coverage tool, specifically when used with LiteIDE
*.out
*.pprof

# Others
vendor/
Expand Down
13 changes: 7 additions & 6 deletions examples/echo_bench/echo_bench.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,10 @@ import (
"flag"
"log"
"math/rand"
"net/http"
_ "net/http/pprof"
"sync"
"time"

"github.com/pkg/profile"
"github.com/rsocket/rsocket-go"
"github.com/rsocket/rsocket-go/core/transport"
"github.com/rsocket/rsocket-go/internal/common"
Expand All @@ -26,21 +25,23 @@ func init() {
flag.Parse()
rand.Seed(time.Now().UnixNano())
tp = rsocket.TCPClient().SetHostAndPort("127.0.0.1", 7878).Build()

go func() {
log.Println(http.ListenAndServe(":5555", nil))
}()
}

func main() {
var (
n int
payloadSize int
mtu int
pprof bool
)
flag.IntVar(&n, "n", 100*10000, "request amount.")
flag.IntVar(&payloadSize, "size", 1024, "payload data size.")
flag.IntVar(&mtu, "mtu", 0, "mut size, zero means disabled.")
flag.BoolVar(&pprof, "pprof", false, "enable pprof")

if pprof {
defer profile.Start(profile.MemProfileHeap(), profile.CPUProfile, profile.ProfilePath(".")).Stop()
}

client, err := createClient(mtu)
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ require (
github.com/gorilla/websocket v1.4.2
github.com/jjeffcaii/reactor-go v0.4.2
github.com/pkg/errors v0.9.1
github.com/pkg/profile v1.5.0
github.com/stretchr/testify v1.6.1
github.com/urfave/cli/v2 v2.1.1
go.uber.org/atomic v1.7.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ github.com/panjf2000/ants/v2 v2.4.3 h1:wHghL17YKFanB62QjPQ9o+DuM4q7WrQ7zAhoX8+eB
github.com/panjf2000/ants/v2 v2.4.3/go.mod h1:f6F0NZVFsGCp5A7QW/Zj/m92atWwOkY0OIhFxRNFr4A=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/profile v1.5.0 h1:042Buzk+NhDI+DeSAA62RwJL8VAuZUMQZUjCsRz1Mug=
github.com/pkg/profile v1.5.0/go.mod h1:qBsxPvzyUincmltOk6iyRVxHYg4adc0OFOv72ZdLa18=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/russross/blackfriday/v2 v2.0.1 h1:lPqVAte+HuHNfhJ/0LC98ESWRz8afy9tM/0RK8m9o+Q=
Expand Down
176 changes: 114 additions & 62 deletions internal/socket/duplex.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,20 @@ const _minRequestSchedulerSize = 1000

var errSocketClosed = errors.New("rsocket: socket closed already")
var errRequestFailed = errors.New("rsocket: send request failed")
var _errRespondFailed = errors.New("rsocket: create responder failed")

var (
unsupportedRequestStream = []byte("Request-Stream not implemented.")
unsupportedRequestResponse = []byte("Request-Response not implemented.")
unsupportedRequestChannel = []byte("Request-Channel not implemented.")
)

func mustExecute(sc scheduler.Scheduler, handler func()) {
if err := sc.Worker().Do(handler); err == nil {
return
}
go handler()
}

// DuplexConnection represents a socket of RSocket which can be a requester or a responder.
type DuplexConnection struct {
reqSche scheduler.Scheduler
Expand Down Expand Up @@ -180,26 +186,42 @@ func (dc *DuplexConnection) destroySndBacklog() {
}

// FireAndForget start a request of FireAndForget.
func (dc *DuplexConnection) FireAndForget(sending payload.Payload) {
data := sending.Data()
size := core.FrameHeaderLen + len(sending.Data())
m, ok := sending.Metadata()
func (dc *DuplexConnection) FireAndForget(req payload.Payload) {
data := req.Data()
size := core.FrameHeaderLen + len(req.Data())
m, ok := req.Metadata()
if ok {
size += 3 + len(m)
}
sid := dc.nextStreamID()

releasable, isReleasable := req.(common.Releasable)
if isReleasable {
releasable.IncRef()
}

if !dc.shouldSplit(size) {
dc.sendFrame(framing.NewWriteableFireAndForgetFrame(sid, data, m, 0))
outMsg := framing.NewWriteableFireAndForgetFrame(sid, data, m, 0)
if isReleasable {
outMsg.HandleDone(func() {
releasable.Release()
})
}
dc.sendFrame(outMsg)
return
}
dc.doSplit(data, m, func(index int, result fragmentation.SplitResult) {
var f core.WriteableFrame
var outMsg core.WriteableFrame
if index == 0 {
f = framing.NewWriteableFireAndForgetFrame(sid, result.Data, result.Metadata, result.Flag)
outMsg = framing.NewWriteableFireAndForgetFrame(sid, result.Data, result.Metadata, result.Flag)
} else {
f = framing.NewWriteablePayloadFrame(sid, result.Data, result.Metadata, result.Flag|core.FlagNext)
outMsg = framing.NewWriteablePayloadFrame(sid, result.Data, result.Metadata, result.Flag|core.FlagNext)
}

if !result.Flag.Check(core.FlagFollow) && isReleasable {
releasable.Release()
}
dc.sendFrame(f)
dc.sendFrame(outMsg)
})
}

Expand Down Expand Up @@ -492,14 +514,23 @@ func (dc *DuplexConnection) respondRequestResponse(receiving fragmentation.Heade
sid := receiving.Header().StreamID()

// execute socket handler
sending, err := func() (mono mono.Mono, err error) {
sending, err := func() (resp mono.Mono, err error) {
defer func() {
if e := recover(); e != nil {
logger.Errorf("respond REQUEST_RESPONSE failed: %s\n", e)
err = _errRespondFailed
rec := recover()
if rec == nil {
return
}
if e, ok := rec.(error); ok {
err = errors.WithStack(e)
} else {
err = errors.Errorf("%v", e)
}
logger.Errorf("handle request-response failed: %+v\n", err)
}()
mono = dc.responder.RequestResponse(receiving)
resp = dc.responder.RequestResponse(receiving)
if resp == nil {
err = framing.NewWriteableErrorFrame(sid, core.ErrorCodeApplicationError, unsupportedRequestResponse)
}
return
}()
// sending error with panic
Expand All @@ -508,25 +539,16 @@ func (dc *DuplexConnection) respondRequestResponse(receiving fragmentation.Heade
dc.writeError(sid, err)
return nil
}
// sending error with unsupported handler
if sending == nil {
common.TryRelease(receiving)
dc.writeError(sid, framing.NewWriteableErrorFrame(sid, core.ErrorCodeApplicationError, unsupportedRequestResponse))
return nil
}

// async subscribe publisher
sub := borrowRequestResponseSubscriber(dc, sid, receiving)
if mono.IsSubscribeAsync(sending) {
sending.SubscribeWith(context.Background(), sub)
return nil
}

if err := dc.resSche.Worker().Do(func() {
mustExecute(dc.resSche, func() {
sending.SubscribeWith(context.Background(), sub)
}); err != nil {
go sending.SubscribeWith(context.Background(), sub)
}
})
return nil
}

Expand Down Expand Up @@ -587,15 +609,21 @@ func (dc *DuplexConnection) respondRequestChannel(req fragmentation.HeaderAndPay
SubscribeOn(dc.reqSche)

// TODO: if receiving == sending ???
sending, err := func() (flux flux.Flux, err error) {
sending, err := func() (resp flux.Flux, err error) {
defer func() {
if e := recover(); e != nil {
logger.Errorf("respond REQUEST_CHANNEL failed: %s\n", e)
err = _errRespondFailed
rec := recover()
if rec == nil {
return
}
if e, ok := rec.(error); ok {
err = errors.WithStack(e)
} else {
err = errors.Errorf("%v", e)
}
logger.Errorf("handle request-channel failed: %+v\n", err)
}()
flux = dc.responder.RequestChannel(receiving)
if flux == nil {
resp = dc.responder.RequestChannel(receiving)
if resp == nil {
err = framing.NewWriteableErrorFrame(sid, core.ErrorCodeApplicationError, unsupportedRequestChannel)
}
return
Expand All @@ -622,54 +650,72 @@ func (dc *DuplexConnection) respondRequestChannel(req fragmentation.HeaderAndPay
calls: finallyRequests,
}

if err := dc.reqSche.Worker().Do(func() {
mustExecute(dc.reqSche, func() {
sending.SubscribeWith(context.Background(), sub)
}); err != nil {
go sending.SubscribeWith(context.Background(), sub)
}
})

<-subscribed

return nil
}

func (dc *DuplexConnection) respondMetadataPush(input core.BufferedFrame) (err error) {
p := input.(*framing.MetadataPushFrame)
defer func() {
p.Release()
if e := recover(); e != nil {
logger.Errorf("respond METADATA_PUSH failed: %s\n", e)
}
}()
dc.responder.MetadataPush(p)
return
func (dc *DuplexConnection) respondMetadataPush(input core.BufferedFrame) error {
req := input.(*framing.MetadataPushFrame)
mustExecute(dc.resSche, func() {
defer func() {
req.Release()
rec := recover()
if rec == nil {
return
}
var err error
if e, ok := rec.(error); ok {
err = errors.WithStack(e)
} else {
err = errors.Errorf("%v", e)
}
logger.Errorf("handle metadata-push failed: %+v\n", err)
}()
dc.responder.MetadataPush(req)
})
return nil
}

func (dc *DuplexConnection) onFrameFNF(frame core.BufferedFrame) error {
receiving, ok := dc.doFragment(frame.(*framing.FireAndForgetFrame))
if !ok {
return nil
}
return dc.respondFNF(receiving)
return dc.respondFireAndForget(receiving)
}

func (dc *DuplexConnection) respondFNF(receiving fragmentation.HeaderAndPayload) (err error) {
defer func() {
common.TryRelease(receiving)
if e := recover(); e != nil {
logger.Errorf("respond FIRE_AND_FORGET failed: %s\n", e)
}
}()
dc.responder.FireAndForget(receiving)
return
func (dc *DuplexConnection) respondFireAndForget(receiving fragmentation.HeaderAndPayload) error {
mustExecute(dc.resSche, func() {
defer func() {
common.TryRelease(receiving)
rec := recover()
if rec == nil {
return
}
var err error
if e, ok := rec.(error); ok {
err = errors.WithStack(e)
} else {
err = errors.Errorf("%v", e)
}
logger.Errorf("handle fire-and-forget failed: %+v\n", err)
}()
dc.responder.FireAndForget(receiving)
})
return nil
}

func (dc *DuplexConnection) onFrameRequestStream(frame core.BufferedFrame) error {
receiving, ok := dc.doFragment(frame.(*framing.RequestStreamFrame))
if !ok {
return nil
}

}
return dc.respondRequestStream(receiving)
}

Expand All @@ -680,10 +726,16 @@ func (dc *DuplexConnection) respondRequestStream(receiving fragmentation.HeaderA
// execute request stream handler
sending, err := func() (resp flux.Flux, err error) {
defer func() {
if e := recover(); e != nil {
logger.Errorf("respond REQUEST_STREAM failed: %s\n", e)
err = _errRespondFailed
rec := recover()
if rec == nil {
return
}
if e, ok := rec.(error); ok {
err = errors.WithStack(e)
} else {
err = errors.Errorf("%v", err)
}
logger.Errorf("handle request-stream failed: %+v\n", err)
}()
resp = dc.responder.RequestStream(receiving)
if resp == nil {
Expand Down Expand Up @@ -735,7 +787,6 @@ func (dc *DuplexConnection) onFrameKeepalive(frame core.BufferedFrame) (err erro
f := frame.(*framing.KeepaliveFrame)
if !f.HasFlag(core.FlagRespond) {
return

}
// TODO: optimize, if keepalive frame support modify data.
data := common.CloneBytes(f.Data())
Expand Down Expand Up @@ -858,7 +909,7 @@ func (dc *DuplexConnection) onFramePayload(frame core.BufferedFrame) error {

switch h.Type() {
case core.FrameTypeRequestFNF:
return dc.respondFNF(next)
return dc.respondFireAndForget(next)
case core.FrameTypeRequestResponse:
return dc.respondRequestResponse(next)
case core.FrameTypeRequestStream:
Expand Down Expand Up @@ -1100,6 +1151,7 @@ func (dc *DuplexConnection) drain(leaseChan <-chan lease.Lease) bool {
var flush bool
cycle := len(dc.sndQueue)
if cycle < 1 {
runtime.Gosched()
cycle = 1
}
for i := 0; i < cycle; i++ {
Expand Down

0 comments on commit 27b1748

Please sign in to comment.