Skip to content

Commit

Permalink
Add close support to the websocket clients (#171)
Browse files Browse the repository at this point in the history
  • Loading branch information
cmmarslender authored Nov 12, 2024
1 parent 44868f8 commit 9bba038
Show file tree
Hide file tree
Showing 5 changed files with 47 additions and 10 deletions.
5 changes: 5 additions & 0 deletions pkg/httpclient/httpclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,11 @@ func (c *HTTPClient) Do(req *rpcinterface.Request, v interface{}) (*http.Respons
return resp, err
}

// Close closes the connection. Not applicable for the HTTPClient, so just returns no error
func (c *HTTPClient) Close() error {
return nil
}

func (c *HTTPClient) generateHTTPClientForService(service rpcinterface.ServiceType) (*http.Client, error) {
var (
keyPair *tls.Certificate
Expand Down
5 changes: 5 additions & 0 deletions pkg/publichttpclient/httpclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,11 @@ func (c *HTTPClient) Do(req *rpcinterface.Request, v interface{}) (*http.Respons
return resp, err
}

// Close closes the connection. Not applicable for the HTTPClient, so just returns no error
func (c *HTTPClient) Close() error {
return nil
}

func (c *HTTPClient) generateHTTPClientForService(service rpcinterface.ServiceType) (*http.Client, error) {
var transport http.RoundTripper

Expand Down
5 changes: 5 additions & 0 deletions pkg/rpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,11 @@ func (c *Client) Do(req *rpcinterface.Request, v interface{}) (*http.Response, e
return c.activeClient.Do(req, v)
}

// Close calls the close method on the active client
func (c *Client) Close() error {
return c.activeClient.Close()
}

// The following has a bunch of methods that are currently only used for the websocket implementation

// SubscribeSelf subscribes to responses to requests from this service
Expand Down
1 change: 1 addition & 0 deletions pkg/rpcinterface/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
type Client interface {
NewRequest(service ServiceType, rpcEndpoint Endpoint, opt interface{}) (*Request, error)
Do(req *Request, v interface{}) (*http.Response, error)
Close() error
SetBaseURL(url *url.URL) error

// SetLogHandler sets a slog compatible log handler
Expand Down
41 changes: 31 additions & 10 deletions pkg/websocketclient/websocketclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ type WebsocketClient struct {
conn *websocket.Conn
lock sync.Mutex

// listenCancel is the cancel method of the listen context to stop listening
listenCancel context.CancelFunc

// listenSyncActive is tracking whether a client has opted to temporarily listen in sync mode for ALL requests
listenSyncActive bool

Expand Down Expand Up @@ -188,6 +191,17 @@ func (c *WebsocketClient) Do(req *rpcinterface.Request, v interface{}) (*http.Re
return c.responseHelper(request, v)
}

// Close closes the client/websocket
func (c *WebsocketClient) Close() error {
if c.listenCancel != nil {
c.listenCancel()
}
if c.conn != nil {
return c.conn.Close()
}
return nil
}

// responseHelper implements the logic to either immediately return in async mode
// or to wait for the expected response up to the defined timeout and returns the
// response in a synchronous fashion
Expand Down Expand Up @@ -399,6 +413,8 @@ func (c *WebsocketClient) ensureConnection() error {
// passed to the handler to deal with
func (c *WebsocketClient) listen() {
if !c.listenSyncActive {
var ctx context.Context
ctx, c.listenCancel = context.WithCancel(context.Background())
c.listenSyncActive = true
defer func() {
c.listenSyncActive = false
Expand All @@ -412,18 +428,23 @@ func (c *WebsocketClient) listen() {
for {
_, message, err := c.conn.ReadMessage()
if err != nil {
c.logger.Error("Error reading message on chia websocket", "error", err.Error())
var closeError *websocket.CloseError
if !errors.As(err, &closeError) {
c.logger.Debug("Chia websocket sent close message, attempting to close connection...")
closeConnErr := c.conn.Close()
if closeConnErr != nil {
c.logger.Error("Error closing chia websocket connection", "error", closeConnErr.Error())
select {
case <-ctx.Done():
return
default:
c.logger.Error("Error reading message on chia websocket", "error", err.Error())
var closeError *websocket.CloseError
if !errors.As(err, &closeError) {
c.logger.Debug("Chia websocket sent close message, attempting to close connection...")
closeConnErr := c.conn.Close()
if closeConnErr != nil {
c.logger.Error("Error closing chia websocket connection", "error", closeConnErr.Error())
}
}
c.conn = nil
c.reconnectLoop()
continue
}
c.conn = nil
c.reconnectLoop()
continue
}
messageChan <- message
}
Expand Down

0 comments on commit 9bba038

Please sign in to comment.