Skip to content

Commit

Permalink
Backport of services: retry failed Nomad service deregistrations from…
Browse files Browse the repository at this point in the history
… client into release/1.7.x (#20607)

Co-authored-by: Tim Gross <[email protected]>
  • Loading branch information
hc-github-team-nomad-core and tgross authored May 16, 2024
1 parent 297e3ff commit 9199d4b
Show file tree
Hide file tree
Showing 3 changed files with 99 additions and 30 deletions.
3 changes: 3 additions & 0 deletions .changelog/20596.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:bug
services: Added retry to Nomad service deregistration RPCs during alloc stop
```
78 changes: 60 additions & 18 deletions client/serviceregistration/nsd/nsd.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,13 @@ import (
"fmt"
"strings"
"sync"
"time"

"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-multierror"
"github.com/hashicorp/nomad/client/serviceregistration"
"github.com/hashicorp/nomad/nomad/structs"
"oss.indeed.com/go/libtime/decay"
)

type ServiceRegistrationHandler struct {
Expand All @@ -34,6 +36,9 @@ type ServiceRegistrationHandler struct {
// shutDownCh coordinates shutting down the handler and any long-running
// processes, such as the RPC retry.
shutDownCh chan struct{}

backoffMax time.Duration
backoffInitial time.Duration
}

// ServiceRegistrationHandlerCfg holds critical information used during the
Expand Down Expand Up @@ -62,20 +67,39 @@ type ServiceRegistrationHandlerCfg struct {
// CheckWatcher watches checks of services in the Nomad service provider,
// and restarts associated tasks in accordance with their check_restart block.
CheckWatcher serviceregistration.CheckWatcher

// BackoffMax is the maximum amount of time failed RemoveWorkload RPCs will
// be retried, defaults to 1s
BackoffMax time.Duration

// BackoffInitial is the initial gap before retrying failed RemoveWorkload
// RPCs, defaults to 100ms. This will double each attempt until BackoffMax
// is reached
BackoffInitial time.Duration
}

// NewServiceRegistrationHandler returns a ready to use
// ServiceRegistrationHandler which implements the serviceregistration.Handler
// interface.
func NewServiceRegistrationHandler(log hclog.Logger, cfg *ServiceRegistrationHandlerCfg) serviceregistration.Handler {
go cfg.CheckWatcher.Run(context.TODO())
return &ServiceRegistrationHandler{

s := &ServiceRegistrationHandler{
cfg: cfg,
log: log.Named("service_registration.nomad"),
registrationEnabled: cfg.Enabled,
checkWatcher: cfg.CheckWatcher,
shutDownCh: make(chan struct{}),
backoffMax: cfg.BackoffMax,
backoffInitial: cfg.BackoffInitial,
}
if s.backoffInitial == 0 {
s.backoffInitial = 100 * time.Millisecond
}
if s.backoffMax == 0 {
s.backoffMax = time.Second
}
return s
}

func (s *ServiceRegistrationHandler) RegisterWorkload(workload *serviceregistration.WorkloadServices) error {
Expand Down Expand Up @@ -183,26 +207,44 @@ func (s *ServiceRegistrationHandler) removeWorkload(

var deleteResp structs.ServiceRegistrationDeleteByIDResponse

err := s.cfg.RPCFn(structs.ServiceRegistrationDeleteByIDRPCMethod, &deleteArgs, &deleteResp)
if err == nil {
return
backoffOpts := decay.BackoffOptions{
MaxSleepTime: s.backoffMax,
InitialGapSize: s.backoffInitial,
}
backoffErr := decay.Backoff(func() (bool, error) {

// The Nomad API exposes service registration deletion to handle
// orphaned service registrations. In the event a service is removed
// accidentally that is still running, we will hit this error when we
// eventually want to remove it. We therefore want to handle this,
// while ensuring the operator can see.
if strings.Contains(err.Error(), "service registration not found") {
s.log.Info("attempted to delete non-existent service registration",
"service_id", id, "namespace", workload.ProviderNamespace)
return
}
select {
case <-s.shutDownCh:
return true, nil
default:
}

err := s.cfg.RPCFn(structs.ServiceRegistrationDeleteByIDRPCMethod,
&deleteArgs, &deleteResp)
if err == nil {
return false, nil
}

// Log the error as there is nothing left to do, so the operator can see it
// and identify any problems.
s.log.Error("failed to delete service registration",
"error", err, "service_id", id, "namespace", workload.ProviderNamespace)
// The Nomad API exposes service registration deletion to handle
// orphaned service registrations. In the event a service is removed
// accidentally that is still running, we will hit this error when we
// eventually want to remove it. We therefore want to handle this,
// while ensuring the operator can see.
if strings.Contains(err.Error(), "service registration not found") {
s.log.Info("attempted to delete non-existent service registration",
"service_id", id, "namespace", workload.ProviderNamespace)
return false, nil
}

return true, err
}, backoffOpts)

if backoffErr != nil {
// Log the error as there is nothing left to do, so the operator can see
// it and identify any problems.
s.log.Error("failed to delete service registration",
"error", backoffErr, "service_id", id, "namespace", workload.ProviderNamespace)
}
}

func (s *ServiceRegistrationHandler) UpdateWorkload(old, new *serviceregistration.WorkloadServices) error {
Expand Down
48 changes: 36 additions & 12 deletions client/serviceregistration/nsd/nsd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@ import (

"github.com/hashicorp/go-hclog"
"github.com/hashicorp/nomad/client/serviceregistration"
"github.com/hashicorp/nomad/helper/testlog"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/shoenig/test"
"github.com/shoenig/test/must"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -110,6 +112,7 @@ func TestServiceRegistrationHandler_RemoveWorkload(t *testing.T) {
name string
inputCfg *ServiceRegistrationHandlerCfg
inputWorkload *serviceregistration.WorkloadServices
returnedDeleteErr error
expectedRPCs map[string]int
expectedError error
expWatch, expUnWatch int
Expand Down Expand Up @@ -138,26 +141,39 @@ func TestServiceRegistrationHandler_RemoveWorkload(t *testing.T) {
expWatch: 0,
expUnWatch: 2,
},
{
name: "failed deregister",
inputCfg: &ServiceRegistrationHandlerCfg{
Enabled: true,
CheckWatcher: new(mockCheckWatcher),
BackoffMax: 75 * time.Millisecond,
BackoffInitial: 50 * time.Millisecond,
},
inputWorkload: mockWorkload(),
returnedDeleteErr: errors.New("unrecoverable error"),
expectedRPCs: map[string]int{structs.ServiceRegistrationDeleteByIDRPCMethod: 4},
expectedError: nil,
expWatch: 0,
expUnWatch: 2,
},
}

// Create a logger we can use for all tests.
log := hclog.NewNullLogger()

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {

// Add the mock RPC functionality.
mockRPC := mockRPC{callCounts: map[string]int{}}
mockRPC := mockRPC{
callCounts: map[string]int{},
deleteResponseErr: tc.returnedDeleteErr,
}
tc.inputCfg.RPCFn = mockRPC.RPC

// Create the handler and run the tests.
h := NewServiceRegistrationHandler(log, tc.inputCfg)
h := NewServiceRegistrationHandler(testlog.HCLogger(t), tc.inputCfg)

h.RemoveWorkload(tc.inputWorkload)

require.Eventually(t, func() bool {
return assert.Equal(t, tc.expectedRPCs, mockRPC.calls())
}, 100*time.Millisecond, 10*time.Millisecond)
must.Eq(t, tc.expectedRPCs, mockRPC.calls())
tc.inputCfg.CheckWatcher.(*mockCheckWatcher).assert(t, tc.expWatch, tc.expUnWatch)
})
}
Expand Down Expand Up @@ -647,6 +663,9 @@ type mockRPC struct {
// lock should be used to access this.
callCounts map[string]int
l sync.RWMutex

deleteResponseErr error
upsertResponseErr error
}

// calls returns the mapping counting the number of calls made to each RPC
Expand All @@ -659,12 +678,17 @@ func (mr *mockRPC) calls() map[string]int {

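// RPC mocks the server RPCs, counting each upsert or delete call and returning
// the response error configured for that method (nil unless set); any other
// method returns an "unexpected RPC method" error.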
func (mr *mockRPC) RPC(method string, _, _ interface{}) error {
mr.l.Lock()
defer mr.l.Unlock()

switch method {
case structs.ServiceRegistrationUpsertRPCMethod, structs.ServiceRegistrationDeleteByIDRPCMethod:
mr.l.Lock()
case structs.ServiceRegistrationUpsertRPCMethod:
mr.callCounts[method]++
return mr.upsertResponseErr

case structs.ServiceRegistrationDeleteByIDRPCMethod:
mr.callCounts[method]++
mr.l.Unlock()
return nil
return mr.deleteResponseErr
default:
return fmt.Errorf("unexpected RPC method: %v", method)
}
Expand Down

0 comments on commit 9199d4b

Please sign in to comment.