Skip to content

Commit

Permalink
Fix missing handler causing timeout (#21)
Browse files Browse the repository at this point in the history
  • Loading branch information
alei121 authored Mar 17, 2023
1 parent 5e69fc4 commit d92dc52
Show file tree
Hide file tree
Showing 2 changed files with 110 additions and 61 deletions.
104 changes: 43 additions & 61 deletions internal/pubsub/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,48 +3,50 @@

// Package pubsub implements functionality to interact with DxHub using PubSub semantics.
//
// Examples
// # Examples
//
// Following examples explain how to use the pubsub package at the high level.
//
// Connection
//
// config := &pubsub.Config{App: "test-app", Domain: "test-domain", APIKeyProvider: func() string { return "APIKey"}}
// conn, err := pubsub.NewConnection(config)
// err = conn.Connect(context.Background())
// go func() {
// err = <- conn.Error // Monitor connection for any errors
// }()
// ...
// conn.Disconnect() // Disconnect from the server
// config := &pubsub.Config{App: "test-app", Domain: "test-domain", APIKeyProvider: func() string { return "APIKey"}}
// conn, err := pubsub.NewConnection(config)
// err = conn.Connect(context.Background())
// go func() {
// err = <- conn.Error // Monitor connection for any errors
// }()
// ...
// conn.Disconnect() // Disconnect from the server
//
// Subscribe
// conn.Subscribe("stream", func(err error, id string, headers map[string]string, payload []byte){
// fmt.Printf("Received new message: %s, %v, %s", id, headers, payload)
// })
//
// conn.Subscribe("stream", func(err error, id string, headers map[string]string, payload []byte){
// fmt.Printf("Received new message: %s, %v, %s", id, headers, payload)
// })
//
// Publish
// ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
// headers := map[string]string{
// "key1": "value1",
// "key2": "value2",
// }
// payload := []byte("message payload")
// resp, err := conn.Publish(ctx, "stream", headers, payload)
// fmt.Printf("Message publish result: %v", resp)
//
// ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
// headers := map[string]string{
// "key1": "value1",
// "key2": "value2",
// }
// payload := []byte("message payload")
// resp, err := conn.Publish(ctx, "stream", headers, payload)
// fmt.Printf("Message publish result: %v", resp)
//
// PublishAsync
//
// headers := map[string]string{
// "key1": "value1",
// "key2": "value2",
// }
// payload := []byte("message payload")
// respCh := make(chan *PublishResult)
// id, cancel, err := conn.PublishAsync("stream", headers, payload, respCh)
// defer cancel()
// resp := <- respCh
// fmt.Printf("Message publish result: %v", resp)
// headers := map[string]string{
// "key1": "value1",
// "key2": "value2",
// }
// payload := []byte("message payload")
// respCh := make(chan *PublishResult)
// id, cancel, err := conn.PublishAsync("stream", headers, payload, respCh)
// defer cancel()
// resp := <- respCh
// fmt.Printf("Message publish result: %v", resp)
package pubsub

import (
Expand All @@ -69,6 +71,7 @@ var (
pingPeriod = 55 * time.Second
pongWait = 60 * time.Second
defaultPollInterval = 1 * time.Second
handlersExpiration = 3 * time.Minute
webSocketScheme = "wss"
httpScheme = "https"
apiPaths = struct {
Expand Down Expand Up @@ -135,10 +138,7 @@ type internalConnection struct {
table map[string]*subscription // map of subscriptions indexed by stream name
sync.Mutex // lock to protect the table
}
msgHandlers struct { // message handlers
table map[string]func(*rpc.Response) // map of handlers indexed by message id
sync.Mutex // lock to protect the table
}
msgHandlers *handlerMap

// consumeTimeout to signify there was a consume timeout within subscriber
consumeTimeout bool
Expand All @@ -161,15 +161,15 @@ func newInternalConnection(config Config) (*internalConnection, error) {
httpClient.SetTransport(config.Transport)
}
c := &internalConnection{
config: config,
restClient: httpClient,
closed: make(chan struct{}),
Error: make(chan error, 1), // buffer of 1 to make sure that error is not lost
readerCh: make(chan []byte, 64), // buffer of 64 helps with latency and provides a buffer to catch up during processing
writerCh: make(chan *msgRequest, 64), // buffer of 64 helps with latency and provides a buffer to catch up during processing
config: config,
restClient: httpClient,
closed: make(chan struct{}),
Error: make(chan error, 1), // buffer of 1 to make sure that error is not lost
readerCh: make(chan []byte, 64), // buffer of 64 helps with latency and provides a buffer to catch up during processing
writerCh: make(chan *msgRequest, 64), // buffer of 64 helps with latency and provides a buffer to catch up during processing
msgHandlers: NewHandlerMap(handlersExpiration),
}
c.subs.table = make(map[string]*subscription)
c.msgHandlers.table = make(map[string]func(*rpc.Response))

if config.APIKeyProvider != nil {
c.authHeader.key = headerStrApiKey
Expand Down Expand Up @@ -267,9 +267,7 @@ loop:
defer cancel()

if msg.handler != nil {
c.msgHandlers.Lock()
c.msgHandlers.table[msg.req.ID] = msg.handler
c.msgHandlers.Unlock()
c.msgHandlers.Set(msg.req.ID, msg.handler)
}
err := c.ws.Write(ctx, websocket.MessageText, msg.req.Bytes())
if err != nil {
Expand All @@ -287,8 +285,6 @@ loop:
// processor goroutine processes the incoming messages from the WebSocket connection and sends ping
// messages when required
func (c *internalConnection) processor() {
cleaner := time.NewTicker(3 * time.Minute)
var oldHandlersTable map[string]func(*rpc.Response)
loop:
for {
select {
Expand All @@ -300,26 +296,12 @@ loop:
log.Logger.Errorf("Received unknown message: %s", msg)
continue
}

c.msgHandlers.Lock()
handler := c.msgHandlers.table[resp.ID]
delete(c.msgHandlers.table, resp.ID)
c.msgHandlers.Unlock()

handler := c.msgHandlers.GetAndDelete(resp.ID)
if handler != nil {
handler(resp)
}
case <-time.After(pingPeriod):
c.ping()
case <-cleaner.C:
// cleanup old messages for which we didn't receive any response from the server
for id, handler := range oldHandlersTable {
handler(rpc.NewErrorResponse(id, fmt.Errorf("timed out waiting for response from server")))
}
c.msgHandlers.Lock()
oldHandlersTable = c.msgHandlers.table
c.msgHandlers.table = make(map[string]func(*rpc.Response))
c.msgHandlers.Unlock()
}
}
log.Logger.Debugf("processor shutdown complete")
Expand Down
67 changes: 67 additions & 0 deletions internal/pubsub/handlermap.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package pubsub

import (
"fmt"
"sync"
"time"

rpc "github.com/cisco-pxgrid/cloud-sdk-go/internal/rpc"
)

// handlerMap stores response handlers keyed by message id and expires the
// ones that are never claimed.
//
// Rather than tracking a timestamp per entry, it keeps two generations of
// handlers. New entries go into currentHandlers. On each rotation the
// current generation becomes olderHandlers and the previous older generation
// is expired. Entries in olderHandlers are still valid and can be claimed, so
// an entry survives for at least expireDuration after it was stored.
//
// There is no background goroutine: the rotation check runs only when
// GetAndDelete or Set is called.
// NOTE(review): the original comment states these calls happen constantly
// because the code consumes every second — confirm against the callers.
type handlerMap struct {
	// expireDuration is the interval between rotations.
	expireDuration time.Duration

	// The embedded lock is intended to protect the fields below.
	sync.RWMutex
	// newExpireTime is the earliest instant at which the next rotation may run.
	newExpireTime time.Time
	// currentHandlers holds handlers stored since the last rotation.
	currentHandlers map[string]func(*rpc.Response)
	// olderHandlers holds handlers that have survived one rotation.
	olderHandlers map[string]func(*rpc.Response)
}

// NewHandlerMap returns an empty handlerMap whose first rotation becomes due
// expireDuration from now, and every expireDuration after each rotation.
func NewHandlerMap(expireDuration time.Duration) *handlerMap {
	h := new(handlerMap)
	h.expireDuration = expireDuration
	h.newExpireTime = time.Now().Add(expireDuration)
	h.currentHandlers = map[string]func(*rpc.Response){}
	h.olderHandlers = map[string]func(*rpc.Response){}
	return h
}

// GetAndDelete removes and returns the handler registered for id, looking in
// the current generation first and then in the older one. It returns nil when
// no handler is registered for id. A rotation check runs before the lookup.
func (h *handlerMap) GetAndDelete(id string) func(*rpc.Response) {
	h.expireCheck()

	// The exclusive lock is required: delete mutates the maps, and a read
	// lock would let two callers write the same map concurrently.
	h.Lock()
	defer h.Unlock()

	handler := h.currentHandlers[id]
	delete(h.currentHandlers, id)
	if handler == nil {
		handler = h.olderHandlers[id]
		delete(h.olderHandlers, id)
	}
	return handler
}

// Set registers handler under id in the current generation, replacing any
// handler already stored there for the same id. A rotation check runs first.
func (h *handlerMap) Set(id string, handler func(*rpc.Response)) {
	h.expireCheck()

	h.Lock()
	h.currentHandlers[id] = handler
	h.Unlock()
}

// expireCheck rotates the generations once the rotation deadline has passed:
// the older generation is dropped, the current generation becomes the older
// one, and a fresh current generation is started. Every handler dropped this
// way is invoked with a timeout error response.
//
// The deadline check and the rotation happen in one critical section. This
// keeps the read of newExpireTime synchronized with its writers, and ensures
// that two goroutines arriving together cannot both rotate, which would
// expire handlers that had only just been stored.
func (h *handlerMap) expireCheck() {
	now := time.Now()

	h.Lock()
	if now.Before(h.newExpireTime) {
		h.Unlock()
		return
	}
	h.newExpireTime = now.Add(h.expireDuration)
	expiredHandlers := h.olderHandlers
	h.olderHandlers = h.currentHandlers
	h.currentHandlers = make(map[string]func(*rpc.Response))
	h.Unlock()

	// Handlers run after the lock is released so that a handler calling back
	// into this map does not deadlock.
	for id, handler := range expiredHandlers {
		if handler == nil {
			// GetAndDelete treats a nil handler as absent; do the same here
			// instead of panicking on a nil call.
			continue
		}
		handler(rpc.NewErrorResponse(id, fmt.Errorf("timed out waiting for response from server")))
	}
}

0 comments on commit d92dc52

Please sign in to comment.