From 6756eec778ae0b69263301700a7d69b326db77db Mon Sep 17 00:00:00 2001 From: Anis Eleuch Date: Tue, 27 Aug 2024 10:24:19 +0100 Subject: [PATCH] entrust: Close body to avoid some conn leaks Also ensure that we drain the body before closing to it to reuse the existing connections when possible. --- cmd/kes/update.go | 6 ++--- internal/http/close.go | 29 ++++++++++++++++++++++++ internal/http/retry.go | 9 ++++++++ internal/keystore/aws/secrets-manager.go | 2 +- internal/keystore/azure/key-vault.go | 2 +- internal/keystore/entrust/keycontrol.go | 9 +++++--- internal/keystore/fortanix/keystore.go | 4 ++-- internal/keystore/gcp/secret-manager.go | 2 +- internal/keystore/gemalto/client.go | 2 +- internal/keystore/gemalto/key-secure.go | 10 ++++---- 10 files changed, 58 insertions(+), 17 deletions(-) create mode 100644 internal/http/close.go diff --git a/cmd/kes/update.go b/cmd/kes/update.go index 2ba846c5..3893a263 100644 --- a/cmd/kes/update.go +++ b/cmd/kes/update.go @@ -138,7 +138,7 @@ func updateCmd(args []string) { if err != nil { cli.Fatalf("failed to download KES release information: %v", err) } - defer resp.Body.Close() + defer xhttp.DrainBody(resp.Body) var response map[string]any if err = json.NewDecoder(mem.LimitReader(resp.Body, MaxBody)).Decode(&response); err != nil { @@ -195,7 +195,7 @@ func updateCmd(args []string) { if err != nil { cli.Fatalf("failed to download minisign signature: %v", err) } - defer resp.Body.Close() + defer xhttp.DrainBody(resp.Body) bytes, err := io.ReadAll(io.LimitReader(resp.Body, int64(1*mem.MB))) if err != nil { @@ -218,7 +218,7 @@ func updateCmd(args []string) { if err != nil { cli.Fatalf("failed to download binary: %v", err) } - defer resp.Body.Close() + defer xhttp.DrainBody(resp.Body) // If the outputFile does not exist we create an empty // one such that selfupdate can do a successful rename diff --git a/internal/http/close.go b/internal/http/close.go new file mode 100644 index 00000000..ac336375 --- /dev/null +++ b/internal/http/close.go @@ -0,0 +1,29 @@ +// Copyright 2024 - MinIO, Inc. All rights reserved. +// Use of this source code is governed by the AGPLv3 +// license that can be found in the LICENSE file. + +package http + +import ( + "io" +) + +// DrainBody close non nil response with any response Body. +// convenient wrapper to drain any remaining data on response body. +// +// Subsequently this allows golang http RoundTripper +// to reuse the same connection for future requests. +func DrainBody(respBody io.ReadCloser) { + // Callers should close resp.Body when done reading from it. + // If resp.Body is not closed, the Client's underlying RoundTripper + // (typically Transport) may not be able to reuse a persistent TCP + // connection to the server for a subsequent "keep-alive" request. + if respBody != nil { + // Drain any remaining Body and then close the connection. + // Without this closing connection would disallow re-using + // the same connection for future uses. + // - http://stackoverflow.com/a/17961593/4465767 + io.Copy(io.Discard, respBody) + respBody.Close() + } +} diff --git a/internal/http/retry.go b/internal/http/retry.go index fa7ae9d3..9cba2080 100644 --- a/internal/http/retry.go +++ b/internal/http/retry.go @@ -188,6 +188,11 @@ func (r *Retry) Do(req *http.Request) (*http.Response, error) { resp, err := r.Client.Do(req) for N > 0 && (isTemporary(err) || (resp != nil && resp.StatusCode >= http.StatusInternalServerError)) { + if resp != nil { + xhttp.DrainBody(resp.Body) + resp = nil + } + N-- var delay time.Duration switch { @@ -222,6 +227,10 @@ func (r *Retry) Do(req *http.Request) (*http.Response, error) { resp, err = r.Client.Do(req) // Now, retry. } if isTemporary(err) { + if resp != nil { + xhttp.DrainBody(resp.Body) + resp = nil + } // If the request still fails with a temporary error // we wrap the error to provide more information to the // caller. diff --git a/internal/keystore/aws/secrets-manager.go b/internal/keystore/aws/secrets-manager.go index 96a9c0a9..7b4620f6 100644 --- a/internal/keystore/aws/secrets-manager.go +++ b/internal/keystore/aws/secrets-manager.go @@ -116,7 +116,7 @@ func (s *Store) Status(ctx context.Context) (kes.KeyStoreState, error) { if err != nil { return kes.KeyStoreState{}, &keystore.ErrUnreachable{Err: err} } - defer resp.Body.Close() + defer xhttp.DrainBody(resp.Body) return kes.KeyStoreState{ Latency: time.Since(start), diff --git a/internal/keystore/azure/key-vault.go b/internal/keystore/azure/key-vault.go index 654d0a23..5e1fca58 100644 --- a/internal/keystore/azure/key-vault.go +++ b/internal/keystore/azure/key-vault.go @@ -63,7 +63,7 @@ func (s *Store) Status(ctx context.Context) (kes.KeyStoreState, error) { if err != nil { return kes.KeyStoreState{}, &keystore.ErrUnreachable{Err: err} } - defer resp.Body.Close() + defer xhttp.DrainBody(resp.Body) return kes.KeyStoreState{ Latency: time.Since(start), diff --git a/internal/keystore/entrust/keycontrol.go b/internal/keystore/entrust/keycontrol.go index a5d06a17..33a621c3 100644 --- a/internal/keystore/entrust/keycontrol.go +++ b/internal/keystore/entrust/keycontrol.go @@ -148,6 +148,7 @@ func (kc *KeyControl) Status(ctx context.Context) (kes.KeyStoreState, error) { Err: fmt.Errorf("keycontrol: failed to fetch status: %v", err), } } + defer xhttp.DrainBody(resp.Body) latency := time.Since(start) if resp.StatusCode != http.StatusOK { @@ -247,7 +248,7 @@ func (kc *KeyControl) Get(ctx context.Context, name string) ([]byte, error) { if err != nil { return nil, fmt.Errorf("keycontrol: failed to fetch key: %v", err) } - defer resp.Body.Close() + defer xhttp.DrainBody(resp.Body) if resp.StatusCode != http.StatusOK { return nil, parseErrorResponse(resp) @@ -295,6 +296,7 @@ func (kc *KeyControl) Delete(ctx context.Context, name string) error { if err != nil { return fmt.Errorf("keycontrol: failed to delete key: %v", err) } + defer xhttp.DrainBody(resp.Body) if resp.StatusCode != http.StatusOK { return parseErrorResponse(resp) } @@ -380,6 +382,7 @@ func (kc *KeyControl) List(ctx context.Context, prefix string, n int) ([]string, if err != nil { return nil, "", fmt.Errorf("keycontrol: failed to list keys: %v", err) } + defer xhttp.DrainBody(resp.Body) if resp.StatusCode != http.StatusOK { return nil, "", parseErrorResponse(resp) } @@ -491,7 +494,7 @@ func login(ctx context.Context, rt http.RoundTripper, endpoint, vaultID, usernam if err != nil { return "", time.Time{}, err } - defer resp.Body.Close() + defer xhttp.DrainBody(resp.Body) if resp.StatusCode != http.StatusOK { return "", time.Time{}, parseErrorResponse(resp) @@ -539,7 +542,7 @@ func renewToken(ctx context.Context, rt http.RoundTripper, endpoint, token strin if err != nil { return "", time.Time{}, err } - defer resp.Body.Close() + defer xhttp.DrainBody(resp.Body) if resp.StatusCode != http.StatusOK { return "", time.Time{}, parseErrorResponse(resp) diff --git a/internal/keystore/fortanix/keystore.go b/internal/keystore/fortanix/keystore.go index 145ee3b0..358f4036 100644 --- a/internal/keystore/fortanix/keystore.go +++ b/internal/keystore/fortanix/keystore.go @@ -193,7 +193,7 @@ func (s *Store) Status(ctx context.Context) (kes.KeyStoreState, error) { if err != nil { return kes.KeyStoreState{}, &keystore.ErrUnreachable{Err: err} } - defer resp.Body.Close() + defer xhttp.DrainBody(resp.Body) return kes.KeyStoreState{ Latency: time.Since(start), @@ -476,7 +476,7 @@ func parseErrorResponse(resp *http.Response) error { if resp.Body == nil { return kesdk.NewError(resp.StatusCode, resp.Status) } - defer resp.Body.Close() + defer xhttp.DrainBody(resp.Body) const MaxSize = 1 * mem.MiB size := mem.Size(resp.ContentLength) diff --git a/internal/keystore/gcp/secret-manager.go b/internal/keystore/gcp/secret-manager.go index b7a14609..0d8fae7e 100644 --- a/internal/keystore/gcp/secret-manager.go +++ b/internal/keystore/gcp/secret-manager.go @@ -116,7 +116,7 @@ func (s *Store) Status(ctx context.Context) (kes.KeyStoreState, error) { if err != nil { return kes.KeyStoreState{}, &keystore.ErrUnreachable{Err: err} } - defer resp.Body.Close() + defer xhttp.DrainBody(resp.Body) return kes.KeyStoreState{ Latency: time.Since(start), diff --git a/internal/keystore/gemalto/client.go b/internal/keystore/gemalto/client.go index 41bc025e..79fd50c0 100644 --- a/internal/keystore/gemalto/client.go +++ b/internal/keystore/gemalto/client.go @@ -78,7 +78,7 @@ func (c *client) Authenticate(ctx context.Context, endpoint string, login Creden if err != nil { return err } - defer resp.Body.Close() + defer xhttp.DrainBody(resp.Body) if resp.StatusCode != http.StatusOK { response, err := parseServerError(resp) diff --git a/internal/keystore/gemalto/key-secure.go b/internal/keystore/gemalto/key-secure.go index feae5159..b530513f 100644 --- a/internal/keystore/gemalto/key-secure.go +++ b/internal/keystore/gemalto/key-secure.go @@ -126,7 +126,7 @@ func (s *Store) Status(ctx context.Context) (kes.KeyStoreState, error) { if err != nil { return kes.KeyStoreState{}, &keystore.ErrUnreachable{Err: err} } - defer resp.Body.Close() + defer xhttp.DrainBody(resp.Body) return kes.KeyStoreState{ Latency: time.Since(start), @@ -167,7 +167,7 @@ func (s *Store) Create(ctx context.Context, name string, value []byte) error { if err != nil { return fmt.Errorf("gemalto: failed to create key '%s': %v", name, err) } - defer resp.Body.Close() + defer xhttp.DrainBody(resp.Body) if resp.StatusCode == http.StatusConflict { return kesdk.ErrKeyExists @@ -210,7 +210,7 @@ func (s *Store) Get(ctx context.Context, name string) ([]byte, error) { if err != nil { return nil, fmt.Errorf("gemalto: failed to access key '%s': %v", name, err) } - defer resp.Body.Close() + defer xhttp.DrainBody(resp.Body) if resp.StatusCode == http.StatusNotFound { return nil, kesdk.ErrKeyNotFound @@ -250,7 +250,7 @@ func (s *Store) Delete(ctx context.Context, name string) error { if err != nil { return fmt.Errorf("gemalto: failed to delete key '%s': %v", name, err) } - defer resp.Body.Close() + defer xhttp.DrainBody(resp.Body) if resp.StatusCode != http.StatusNoContent && resp.StatusCode != http.StatusNotFound { // BUG(aead): The KeySecure server returns 404 NotFound if the @@ -320,7 +320,7 @@ func (s *Store) List(ctx context.Context, prefix string, n int) ([]string, strin if err != nil { return nil, "", err } - defer resp.Body.Close() + defer xhttp.DrainBody(resp.Body) if resp.StatusCode != http.StatusOK { response, err := parseServerError(resp)