Skip to content

Commit

Permalink
fix: potential append race
Browse files Browse the repository at this point in the history
  • Loading branch information
Jorres committed Mar 19, 2024
1 parent 0312590 commit 3448a70
Showing 1 changed file with 19 additions and 8 deletions.
27 changes: 19 additions & 8 deletions pkg/rolling/rolling.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ type Rolling struct {
state *state
opts options.RestartOptions
restarter restarters.Restarter

// TODO jorres@: maybe turn this into a local `map`
// variable in `processActionGroupStates`
completedActions []*Ydb_Maintenance.ActionUid
}

type state struct {
Expand Down Expand Up @@ -231,9 +235,11 @@ func (r *Rolling) processActionGroupStates(actions []*Ydb_Maintenance.ActionGrou

r.logger.Infof("%d ActionGroupStates moved to PERFORMED, will restart now...", len(performed))

actionsCompletedThisStep := []*Ydb_Maintenance.ActionUid{}
r.completedActions = []*Ydb_Maintenance.ActionUid{}
wg := new(sync.WaitGroup)

mutexForCompletedActions := new(sync.Mutex)

for _, gs := range performed {
var (
as = gs.ActionStates[0]
Expand All @@ -242,7 +248,7 @@ func (r *Rolling) processActionGroupStates(actions []*Ydb_Maintenance.ActionGrou
)

if collections.Contains(r.state.unreportedButFinishedActionIds, as.ActionUid.ActionId) {
actionsCompletedThisStep = append(actionsCompletedThisStep, as.ActionUid)
r.completedActions = append(r.completedActions, as.ActionUid)
r.logger.Debugf(
"Node id %v already restarted, but CompleteAction failed on last iteration, "+
"so CMS does not know it is complete yet.",
Expand Down Expand Up @@ -271,22 +277,19 @@ func (r *Rolling) processActionGroupStates(actions []*Ydb_Maintenance.ActionGrou
r.state.retriesMadeForNode[node.NodeId]++

if r.state.retriesMadeForNode[node.NodeId] == r.opts.RestartRetryNumber {
// TODO reduce copypaste with literally 5 lines below
r.state.unreportedButFinishedActionIds = append(r.state.unreportedButFinishedActionIds, as.ActionUid.ActionId)
actionsCompletedThisStep = append(actionsCompletedThisStep, as.ActionUid)
r.atomicRememberComplete(mutexForCompletedActions, as.ActionUid)
r.logger.Warnf("Failed to retry node %v specified number of times %v", node.NodeId, r.opts.RestartRetryNumber)
}
} else {
r.state.unreportedButFinishedActionIds = append(r.state.unreportedButFinishedActionIds, as.ActionUid.ActionId)
actionsCompletedThisStep = append(actionsCompletedThisStep, as.ActionUid)
r.atomicRememberComplete(mutexForCompletedActions, as.ActionUid)
r.logger.Debugf("Successfully restarted node with id: %d", node.NodeId)
}
}()
}

wg.Wait()

result, err := r.cms.CompleteAction(actionsCompletedThisStep)
result, err := r.cms.CompleteAction(r.completedActions)
if err != nil {
r.logger.Warnf("Failed to complete action: %+v", err)
return false
Expand All @@ -298,6 +301,14 @@ func (r *Rolling) processActionGroupStates(actions []*Ydb_Maintenance.ActionGrou
return len(actions) == len(result.ActionStatuses)
}

// atomicRememberComplete records actionUID as completed while holding m.
//
// With the lock held it appends actionUID.ActionId to
// r.state.unreportedButFinishedActionIds and actionUID itself to
// r.completedActions; the lock is released when the function returns.
// The mutex only serializes callers that pass the same m.
func (r *Rolling) atomicRememberComplete(m *sync.Mutex, actionUID *Ydb_Maintenance.ActionUid) {
	m.Lock()
	defer m.Unlock()

	// r.state is a pointer, so writes through st land on the shared state.
	st := r.state
	st.unreportedButFinishedActionIds = append(st.unreportedButFinishedActionIds, actionUID.ActionId)

	r.completedActions = append(r.completedActions, actionUID)
}

func (r *Rolling) prepareState() (*state, error) {
tenants, err := r.cms.Tenants()
if err != nil {
Expand Down

0 comments on commit 3448a70

Please sign in to comment.