From 9bba0386c1de6d813fb00d82456effa5989aaa56 Mon Sep 17 00:00:00 2001 From: Chris Marslender Date: Tue, 12 Nov 2024 17:18:55 -0600 Subject: [PATCH] Add close support to the websocket clients (#171) --- pkg/httpclient/httpclient.go | 5 ++++ pkg/publichttpclient/httpclient.go | 5 ++++ pkg/rpc/client.go | 5 ++++ pkg/rpcinterface/client.go | 1 + pkg/websocketclient/websocketclient.go | 41 +++++++++++++++++++------- 5 files changed, 47 insertions(+), 10 deletions(-) diff --git a/pkg/httpclient/httpclient.go b/pkg/httpclient/httpclient.go index b8692b9..407b214 100644 --- a/pkg/httpclient/httpclient.go +++ b/pkg/httpclient/httpclient.go @@ -193,6 +193,11 @@ func (c *HTTPClient) Do(req *rpcinterface.Request, v interface{}) (*http.Respons return resp, err } +// Close closes the connection. Not applicable for the HTTPClient, so just returns no error +func (c *HTTPClient) Close() error { + return nil +} + func (c *HTTPClient) generateHTTPClientForService(service rpcinterface.ServiceType) (*http.Client, error) { var ( keyPair *tls.Certificate diff --git a/pkg/publichttpclient/httpclient.go b/pkg/publichttpclient/httpclient.go index ed6c852..7ca935e 100644 --- a/pkg/publichttpclient/httpclient.go +++ b/pkg/publichttpclient/httpclient.go @@ -154,6 +154,11 @@ func (c *HTTPClient) Do(req *rpcinterface.Request, v interface{}) (*http.Respons return resp, err } +// Close closes the connection. Not applicable for the HTTPClient, so just returns no error +func (c *HTTPClient) Close() error { + return nil +} + func (c *HTTPClient) generateHTTPClientForService(service rpcinterface.ServiceType) (*http.Client, error) { var transport http.RoundTripper diff --git a/pkg/rpc/client.go b/pkg/rpc/client.go index be28104..d4e2280 100644 --- a/pkg/rpc/client.go +++ b/pkg/rpc/client.go @@ -91,6 +91,11 @@ func (c *Client) Do(req *rpcinterface.Request, v interface{}) (*http.Response, e return c.activeClient.Do(req, v) } +// Close calls the close method on the active client +func (c *Client) Close() error { + return c.activeClient.Close() +} + // The following has a bunch of methods that are currently only used for the websocket implementation // SubscribeSelf subscribes to responses to requests from this service diff --git a/pkg/rpcinterface/client.go b/pkg/rpcinterface/client.go index 1de52ff..6453dd4 100644 --- a/pkg/rpcinterface/client.go +++ b/pkg/rpcinterface/client.go @@ -13,6 +13,7 @@ import ( type Client interface { NewRequest(service ServiceType, rpcEndpoint Endpoint, opt interface{}) (*Request, error) Do(req *Request, v interface{}) (*http.Response, error) + Close() error SetBaseURL(url *url.URL) error // SetLogHandler sets a slog compatible log handler diff --git a/pkg/websocketclient/websocketclient.go b/pkg/websocketclient/websocketclient.go index e1b7621..6608391 100644 --- a/pkg/websocketclient/websocketclient.go +++ b/pkg/websocketclient/websocketclient.go @@ -40,6 +40,9 @@ type WebsocketClient struct { conn *websocket.Conn lock sync.Mutex + // listenCancel is the cancel method of the listen context to stop listening + listenCancel context.CancelFunc + // listenSyncActive is tracking whether a client has opted to temporarily listen in sync mode for ALL requests listenSyncActive bool @@ -188,6 +191,17 @@ func (c *WebsocketClient) Do(req *rpcinterface.Request, v interface{}) (*http.Re return c.responseHelper(request, v) } +// Close closes the client/websocket +func (c *WebsocketClient) Close() error { + if c.listenCancel != nil { + c.listenCancel() + } + if c.conn != nil { + return c.conn.Close() + } + return nil +} + // responseHelper implements the logic to either immediately return in async mode // or to wait for the expected response up to the defined timeout and returns the // response in a synchronous fashion @@ -399,6 +413,8 @@ func (c *WebsocketClient) ensureConnection() error { // passed to the handler to deal with func (c *WebsocketClient) listen() { if !c.listenSyncActive { + var ctx context.Context + ctx, c.listenCancel = context.WithCancel(context.Background()) c.listenSyncActive = true defer func() { c.listenSyncActive = false @@ -412,18 +428,23 @@ func (c *WebsocketClient) listen() { for { _, message, err := c.conn.ReadMessage() if err != nil { - c.logger.Error("Error reading message on chia websocket", "error", err.Error()) - var closeError *websocket.CloseError - if !errors.As(err, &closeError) { - c.logger.Debug("Chia websocket sent close message, attempting to close connection...") - closeConnErr := c.conn.Close() - if closeConnErr != nil { - c.logger.Error("Error closing chia websocket connection", "error", closeConnErr.Error()) + select { + case <-ctx.Done(): + return + default: + c.logger.Error("Error reading message on chia websocket", "error", err.Error()) + var closeError *websocket.CloseError + if !errors.As(err, &closeError) { + c.logger.Debug("Chia websocket sent close message, attempting to close connection...") + closeConnErr := c.conn.Close() + if closeConnErr != nil { + c.logger.Error("Error closing chia websocket connection", "error", closeConnErr.Error()) + } } + c.conn = nil + c.reconnectLoop() + continue } - c.conn = nil - c.reconnectLoop() - continue } messageChan <- message }