Skip to content

Commit

Permalink
BUGFIX: process incoming response correctly if request has been cance…
Browse files Browse the repository at this point in the history
…lled. (#70)
  • Loading branch information
jjeffcaii authored Sep 24, 2020
1 parent e5e6690 commit 2fa5447
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 32 deletions.
43 changes: 23 additions & 20 deletions core/transport/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ func (p *Transport) Start(ctx context.Context) error {
if err == io.EOF {
return nil
}
return errors.Wrap(err, "read and delivery frame failed")
return errors.Wrap(err, "dispatch incoming frame failed")
}
}
}
Expand All @@ -177,54 +177,51 @@ func (p *Transport) DispatchFrame(_ context.Context, frame core.Frame) (err erro

var handler FrameHandler

p.RLock()
defer p.RUnlock()

switch t {
case core.FrameTypeSetup:
p.maxLifetime = frame.(*framing.SetupFrame).MaxLifetime()
handler = p.handlers[OnSetup]
handler = p.getHandler(OnSetup)
case core.FrameTypeResume:
handler = p.handlers[OnResume]
handler = p.getHandler(OnResume)
case core.FrameTypeResumeOK:
p.lastRcvPos = frame.(*framing.ResumeOKFrame).LastReceivedClientPosition()
handler = p.handlers[OnResumeOK]
handler = p.getHandler(OnResumeOK)
case core.FrameTypeRequestFNF:
handler = p.handlers[OnFireAndForget]
handler = p.getHandler(OnFireAndForget)
case core.FrameTypeMetadataPush:
if sid != 0 {
// skip invalid metadata push
logger.Warnf("rsocket.Transport: omit MetadataPush with non-zero stream id %d\n", sid)
logger.Warnf("rsocket: omit MetadataPush with non-zero stream id %d\n", sid)
return
}
handler = p.handlers[OnMetadataPush]
handler = p.getHandler(OnMetadataPush)
case core.FrameTypeRequestResponse:
handler = p.handlers[OnRequestResponse]
handler = p.getHandler(OnRequestResponse)
case core.FrameTypeRequestStream:
handler = p.handlers[OnRequestStream]
handler = p.getHandler(OnRequestStream)
case core.FrameTypeRequestChannel:
handler = p.handlers[OnRequestChannel]
handler = p.getHandler(OnRequestChannel)
case core.FrameTypePayload:
handler = p.handlers[OnPayload]
handler = p.getHandler(OnPayload)
case core.FrameTypeRequestN:
handler = p.handlers[OnRequestN]
handler = p.getHandler(OnRequestN)
case core.FrameTypeError:
if sid == 0 {
err = errors.New(frame.(*framing.ErrorFrame).Error())
if call := p.handlers[OnErrorWithZeroStreamID]; call != nil {
if call := p.getHandler(OnErrorWithZeroStreamID); call != nil {
_ = call(frame)
}
return
}
handler = p.handlers[OnError]
handler = p.getHandler(OnError)
case core.FrameTypeCancel:
handler = p.handlers[OnCancel]
handler = p.getHandler(OnCancel)
case core.FrameTypeKeepalive:
ka := frame.(*framing.KeepaliveFrame)
p.lastRcvPos = ka.LastReceivedPosition()
handler = p.handlers[OnKeepalive]
handler = p.getHandler(OnKeepalive)
case core.FrameTypeLease:
handler = p.handlers[OnLease]
handler = p.getHandler(OnLease)
}

// Set deadline.
Expand All @@ -248,6 +245,12 @@ func (p *Transport) DispatchFrame(_ context.Context, frame core.Frame) (err erro
return
}

func (p *Transport) getHandler(t EventType) FrameHandler {
p.RLock()
defer p.RUnlock()
return p.handlers[t]
}

// NewTransport creates a new transport.
func NewTransport(c Conn) *Transport {
return &Transport{
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ require (
github.com/golang/mock v1.4.3
github.com/google/uuid v1.1.1
github.com/gorilla/websocket v1.4.1
github.com/jjeffcaii/reactor-go v0.2.4
github.com/jjeffcaii/reactor-go v0.2.5
github.com/pkg/errors v0.9.1
github.com/stretchr/testify v1.4.0
github.com/urfave/cli/v2 v2.1.1
Expand Down
7 changes: 5 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ github.com/google/uuid v1.1.1 h1:Gkbcsh/GbpXz7lPftLA3P6TYMwjCLYm83jiFQZF/3gY=
github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/gorilla/websocket v1.4.1 h1:q7AeDBpnBk8AogcD4DSag/Ukw/KV+YhzLj2bP5HvKCM=
github.com/gorilla/websocket v1.4.1/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/jjeffcaii/reactor-go v0.2.4 h1:Q3N/0Ngt1Ywi7ezye2LQ+mU1vNdHxyG5ZRk3W2EWmYA=
github.com/jjeffcaii/reactor-go v0.2.4/go.mod h1:I4qZrpZcsqjzo3pjq0XWGBTpdFXB95XeYinrPYETNL4=
github.com/jjeffcaii/reactor-go v0.2.5 h1:6jVIWgaVBUx1qiWLZNWU/j+dbbirje6GntS/I8RKARI=
github.com/jjeffcaii/reactor-go v0.2.5/go.mod h1:I4qZrpZcsqjzo3pjq0XWGBTpdFXB95XeYinrPYETNL4=
github.com/panjf2000/ants/v2 v2.4.1 h1:7RtUqj5lGOw0WnZhSKDZ2zzJhaX5490ZW1sUolRXCxY=
github.com/panjf2000/ants/v2 v2.4.1/go.mod h1:f6F0NZVFsGCp5A7QW/Zj/m92atWwOkY0OIhFxRNFr4A=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
Expand All @@ -31,6 +31,7 @@ github.com/urfave/cli/v2 v2.1.1/go.mod h1:SE9GqnLQmjVa0iPEY0f1w3ygNIYcIJ0OKPMoW2
go.uber.org/atomic v1.5.1 h1:rsqfU5vBkVknbhUGbAUwQKR2H4ItV8tjJ+6kJX4cxHM=
go.uber.org/atomic v1.5.1/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/lint v0.0.0-20190930215403-16217165b5de h1:5hukYrvBGR8/eNkX5mdUezrA6JiaEZDtJb9Ei+1LlBs=
golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
Expand All @@ -40,8 +41,10 @@ golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fq
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
golang.org/x/tools v0.0.0-20190425150028-36563e24a262/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q=
golang.org/x/tools v0.0.0-20191029041327-9cc4af7d6b2c h1:IGkKhmfzcztjm6gYkykvu/NiS8kaqbCWAEWWAyf8J5U=
golang.org/x/tools v0.0.0-20191029041327-9cc4af7d6b2c/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.7 h1:VUgggvou5XRW9mHwD/yXxIYSMtY0zoKQf/v226p2nyo=
Expand Down
22 changes: 13 additions & 9 deletions internal/socket/duplex.go
Original file line number Diff line number Diff line change
Expand Up @@ -659,7 +659,10 @@ func (dc *DuplexConnection) onFrameCancel(frame core.Frame) (err error) {

v, ok := dc.messages.Load(sid)
if !ok {
logger.Warnf("nothing cancelled: sid=%d\n", sid)
dc.fragments.Delete(sid)
if logger.IsDebugEnabled() {
logger.Debugf("unmatched frame CANCEL(id=%d), maybe original request has been cancelled\n", sid)
}
return
}

Expand All @@ -680,12 +683,14 @@ func (dc *DuplexConnection) onFrameCancel(frame core.Frame) (err error) {

func (dc *DuplexConnection) onFrameError(input core.Frame) (err error) {
f := input.(*framing.ErrorFrame)
logger.Errorf("handle error frame: %s\n", f)
sid := f.Header().StreamID()

v, ok := dc.messages.Load(sid)
if !ok {
err = fmt.Errorf("invalid stream id: %d", sid)
dc.fragments.Delete(sid)
if logger.IsDebugEnabled() {
logger.Debugf("unmatched frame ERROR(id=%d), maybe original request has been cancelled\n", sid)
}
return
}

Expand All @@ -707,8 +712,9 @@ func (dc *DuplexConnection) onFrameRequestN(input core.Frame) (err error) {
sid := f.Header().StreamID()
v, ok := dc.messages.Load(sid)
if !ok {
dc.fragments.Delete(sid)
if logger.IsDebugEnabled() {
logger.Debugf("ignore non-exists RequestN: id=%d\n", sid)
logger.Debugf("unmatched frame REQUEST_N(id=%d), maybe original request has been cancelled\n", sid)
}
return
}
Expand All @@ -720,8 +726,6 @@ func (dc *DuplexConnection) onFrameRequestN(input core.Frame) (err error) {
vv.snd.Request(n)
case requestChannelCallbackReverse:
vv.snd.Request(n)
default:
panic(fmt.Errorf("illegal requestN for %+v", vv))
}
return
}
Expand Down Expand Up @@ -771,7 +775,9 @@ func (dc *DuplexConnection) onFramePayload(frame core.Frame) error {
sid := h.StreamID()
v, ok := dc.messages.Load(sid)
if !ok {
logger.Warnf("unoccupied Payload(id=%d), maybe it has been canceled(server=%T)\n", sid, dc.sids)
if logger.IsDebugEnabled() {
logger.Debugf("unmatched frame PAYLOAD(id=%d), maybe original request has been cancelled\n", sid)
}
return nil
}

Expand Down Expand Up @@ -806,8 +812,6 @@ func (dc *DuplexConnection) onFramePayload(frame core.Frame) error {
if fg.Check(core.FlagComplete) {
vv.rcv.Complete()
}
default:
panic(fmt.Errorf("illegal Payload for %v", vv))
}
return nil
}
Expand Down

0 comments on commit 2fa5447

Please sign in to comment.