From 3448a70a38ec55aabb92ff5b81dd334a924e3e39 Mon Sep 17 00:00:00 2001 From: Jorres Tarasov Date: Tue, 19 Mar 2024 16:41:19 +0300 Subject: [PATCH] fix: potential append race --- pkg/rolling/rolling.go | 27 +++++++++++++++++++-------- 1 file changed, 19 insertions(+), 8 deletions(-) diff --git a/pkg/rolling/rolling.go b/pkg/rolling/rolling.go index c8c3c2c..c15f10c 100644 --- a/pkg/rolling/rolling.go +++ b/pkg/rolling/rolling.go @@ -30,6 +30,10 @@ type Rolling struct { state *state opts options.RestartOptions restarter restarters.Restarter + + // TODO jorres@: maybe turn this into a local `map` + // variable in `processActionGroupStates` + completedActions []*Ydb_Maintenance.ActionUid } type state struct { @@ -231,9 +235,11 @@ func (r *Rolling) processActionGroupStates(actions []*Ydb_Maintenance.ActionGrou r.logger.Infof("%d ActionGroupStates moved to PERFORMED, will restart now...", len(performed)) - actionsCompletedThisStep := []*Ydb_Maintenance.ActionUid{} + r.completedActions = []*Ydb_Maintenance.ActionUid{} wg := new(sync.WaitGroup) + mutexForCompletedActions := new(sync.Mutex) + for _, gs := range performed { var ( as = gs.ActionStates[0] @@ -242,7 +248,7 @@ func (r *Rolling) processActionGroupStates(actions []*Ydb_Maintenance.ActionGrou ) if collections.Contains(r.state.unreportedButFinishedActionIds, as.ActionUid.ActionId) { - actionsCompletedThisStep = append(actionsCompletedThisStep, as.ActionUid) + r.completedActions = append(r.completedActions, as.ActionUid) r.logger.Debugf( "Node id %v already restarted, but CompleteAction failed on last iteration, "+ "so CMS does not know it is complete yet.", @@ -271,14 +277,11 @@ func (r *Rolling) processActionGroupStates(actions []*Ydb_Maintenance.ActionGrou r.state.retriesMadeForNode[node.NodeId]++ if r.state.retriesMadeForNode[node.NodeId] == r.opts.RestartRetryNumber { - // TODO reduce copypaste with literally 5 lines below - r.state.unreportedButFinishedActionIds = append(r.state.unreportedButFinishedActionIds, as.ActionUid.ActionId) - actionsCompletedThisStep = append(actionsCompletedThisStep, as.ActionUid) + r.atomicRememberComplete(mutexForCompletedActions, as.ActionUid) r.logger.Warnf("Failed to retry node %v specified number of times %v", node.NodeId, r.opts.RestartRetryNumber) } } else { - r.state.unreportedButFinishedActionIds = append(r.state.unreportedButFinishedActionIds, as.ActionUid.ActionId) - actionsCompletedThisStep = append(actionsCompletedThisStep, as.ActionUid) + r.atomicRememberComplete(mutexForCompletedActions, as.ActionUid) r.logger.Debugf("Successfully restarted node with id: %d", node.NodeId) } }() @@ -286,7 +289,7 @@ func (r *Rolling) processActionGroupStates(actions []*Ydb_Maintenance.ActionGrou wg.Wait() - result, err := r.cms.CompleteAction(actionsCompletedThisStep) + result, err := r.cms.CompleteAction(r.completedActions) if err != nil { r.logger.Warnf("Failed to complete action: %+v", err) return false @@ -298,6 +301,14 @@ func (r *Rolling) processActionGroupStates(actions []*Ydb_Maintenance.ActionGrou return len(actions) == len(result.ActionStatuses) } +func (r *Rolling) atomicRememberComplete(m *sync.Mutex, actionUID *Ydb_Maintenance.ActionUid) { + m.Lock() + defer m.Unlock() + + r.state.unreportedButFinishedActionIds = append(r.state.unreportedButFinishedActionIds, actionUID.ActionId) + r.completedActions = append(r.completedActions, actionUID) +} + func (r *Rolling) prepareState() (*state, error) { tenants, err := r.cms.Tenants() if err != nil {