Skip to content

Commit

Permalink
observation: cleanup channel after send response
Browse files Browse the repository at this point in the history
  • Loading branch information
jkralik committed Jan 4, 2021
1 parent e9522e2 commit ec27cba
Show file tree
Hide file tree
Showing 2 changed files with 89 additions and 61 deletions.
74 changes: 44 additions & 30 deletions tcp/clientobserve.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,26 +13,56 @@ import (
"github.com/plgd-dev/go-coap/v2/tcp/message/pool"
)

func NewObservationHandler(obsertionTokenHandler *HandlerContainer, next HandlerFunc) HandlerFunc {
return func(w *ResponseWriter, r *pool.Message) {
v, err := obsertionTokenHandler.Get(r.Token())
if err != nil {
next(w, r)
return
}
v(w, r)
}
}

//Observation represents subscription to resource on the server
type Observation struct {
token message.Token
path string
token message.Token
path string
cc *ClientConn
observeFunc func(req *pool.Message)
respCodeChan chan codes.Code

obsSequence uint32
etag []byte
cc *ClientConn
lastEvent time.Time
mutex sync.Mutex

mutex sync.Mutex
waitForReponse uint32
}

func NewObservationHandler(obsertionTokenHandler *HandlerContainer, next HandlerFunc) HandlerFunc {
return func(w *ResponseWriter, r *pool.Message) {
v, err := obsertionTokenHandler.Get(r.Token())
if err != nil {
next(w, r)
return
func newObservation(token message.Token, path string, cc *ClientConn, observeFunc func(req *pool.Message), respCodeChan chan codes.Code) *Observation {
return &Observation{
token: token,
path: path,
obsSequence: 0,
cc: cc,
waitForReponse: 1,
respCodeChan: respCodeChan,
observeFunc: observeFunc,
}
}

func (o *Observation) handler(w *ResponseWriter, r *pool.Message) {
code := r.Code()
if atomic.CompareAndSwapUint32(&o.waitForReponse, 1, 0) {
select {
case o.respCodeChan <- code:
default:
}
v(w, r)
o.respCodeChan = nil
}
if o.wantBeNotified(r) {
o.observeFunc(r)
}
}

Expand Down Expand Up @@ -89,14 +119,9 @@ func (cc *ClientConn) Observe(ctx context.Context, path string, observeFunc func
defer pool.ReleaseMessage(req)
token := req.Token()
req.SetObserve(0)
o := &Observation{
token: token,
path: path,
obsSequence: 0,
cc: cc,
}

respCodeChan := make(chan codes.Code, 1)
waitForReponse := uint32(1)
o := newObservation(token, path, cc, observeFunc, respCodeChan)

options, err := req.Options().Clone()
if err != nil {
Expand All @@ -110,18 +135,7 @@ func (cc *ClientConn) Observe(ctx context.Context, path string, observeFunc func
Options: options,
}
cc.observationRequests.Store(token.String(), obs)
err = o.cc.observationTokenHandler.Insert(token.String(), func(w *ResponseWriter, r *pool.Message) {
code := r.Code()
if atomic.CompareAndSwapUint32(&waitForReponse, 1, 0) {
select {
case respCodeChan <- code:
default:
}
}
if o.wantBeNotified(r) {
observeFunc(r)
}
})
err = o.cc.observationTokenHandler.Insert(token.String(), o.handler)
defer func(err *error) {
if *err != nil {
o.cleanUp()
Expand Down
76 changes: 45 additions & 31 deletions udp/client/clientobserve.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,6 @@ import (
"github.com/plgd-dev/go-coap/v2/udp/message/pool"
)

//Observation represents subscription to resource on the server
type Observation struct {
token message.Token
path string
obsSequence uint32
etag []byte
cc *ClientConn
lastEvent time.Time

mutex sync.Mutex
}

func NewObservationHandler(obsertionTokenHandler *HandlerContainer, next HandlerFunc) HandlerFunc {
return func(w *ResponseWriter, r *pool.Message) {
v, err := obsertionTokenHandler.Get(r.Token())
Expand All @@ -41,6 +29,34 @@ func NewObservationHandler(obsertionTokenHandler *HandlerContainer, next Handler
}
}

//Observation represents subscription to resource on the server
type Observation struct {
token message.Token
path string
cc *ClientConn
observeFunc func(req *pool.Message)
respCodeChan chan codes.Code

obsSequence uint32
etag []byte
lastEvent time.Time
mutex sync.Mutex

waitForReponse uint32
}

func newObservation(token message.Token, path string, cc *ClientConn, observeFunc func(req *pool.Message), respCodeChan chan codes.Code) *Observation {
return &Observation{
token: token,
path: path,
obsSequence: 0,
cc: cc,
waitForReponse: 1,
respCodeChan: respCodeChan,
observeFunc: observeFunc,
}
}

func (o *Observation) cleanUp() {
o.cc.observationTokenHandler.Pop(o.token)
registeredRequest, ok := o.cc.observationRequests.PullOut(o.token.String())
Expand All @@ -49,6 +65,20 @@ func (o *Observation) cleanUp() {
}
}

func (o *Observation) handler(w *ResponseWriter, r *pool.Message) {
code := r.Code()
if atomic.CompareAndSwapUint32(&o.waitForReponse, 1, 0) {
select {
case o.respCodeChan <- code:
default:
}
o.respCodeChan = nil
}
if o.wantBeNotified(r) {
o.observeFunc(r)
}
}

// Cancel remove observation from server. For recreate observation use Observe.
func (o *Observation) Cancel(ctx context.Context) error {
o.cleanUp()
Expand Down Expand Up @@ -97,27 +127,11 @@ func (cc *ClientConn) Observe(ctx context.Context, path string, observeFunc func
}
token := req.Token()
req.SetObserve(0)
o := &Observation{
token: token,
path: path,
obsSequence: 0,
cc: cc,
}
respCodeChan := make(chan codes.Code, 1)
waitForReponse := uint32(1)
o := newObservation(token, path, cc, observeFunc, respCodeChan)

cc.observationRequests.Store(token.String(), req)
err = o.cc.observationTokenHandler.Insert(token.String(), func(w *ResponseWriter, r *pool.Message) {
code := r.Code()
if atomic.CompareAndSwapUint32(&waitForReponse, 1, 0) {
select {
case respCodeChan <- code:
default:
}
}
if o.wantBeNotified(r) {
observeFunc(r)
}
})
err = o.cc.observationTokenHandler.Insert(token.String(), o.handler)
defer func(err *error) {
if *err != nil {
o.cleanUp()
Expand Down

0 comments on commit ec27cba

Please sign in to comment.