Skip to content

Commit

Permalink
fix: worker assignment race (#397)
Browse files Browse the repository at this point in the history
* new api-contract for workflow run events

* feat: initial implementation for new subscribe listener

* fix: sync issues and send workflow runs immediately

* refactor: add context to all engine db queries, fix deadlocking query

* fix: use new ctx for deleting dispatcher and ticker

* fix: retry worker assignment when no worker is available

* Update internal/repository/prisma/get_group_key_run.go

Co-authored-by: abelanger5 <[email protected]>

---------

Co-authored-by: Alexander Belanger <[email protected]>
  • Loading branch information
grutt and abelanger5 authored Apr 19, 2024
1 parent 4ce1dd8 commit 4c39dce
Show file tree
Hide file tree
Showing 4 changed files with 83 additions and 24 deletions.
2 changes: 1 addition & 1 deletion internal/repository/prisma/dbsqlc/step_runs.sql
Original file line number Diff line number Diff line change
Expand Up @@ -457,7 +457,7 @@ WITH selected_worker AS (
ws."workerId" IS NULL OR
ws."slots" > 0
)
ORDER BY ws."slots" DESC NULLS FIRST
ORDER BY ws."slots" DESC NULLS FIRST, RANDOM()
LIMIT 1
),
step_run AS (
Expand Down
2 changes: 1 addition & 1 deletion internal/repository/prisma/dbsqlc/step_runs.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions internal/repository/prisma/get_group_key_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func (s *getGroupKeyRunRepository) AssignGetGroupKeyRunToWorker(ctx context.Cont
// var assigned
var assigned *dbsqlc.AssignGetGroupKeyRunToWorkerRow

err = retrier(s.l, func() (err error) {
err = deadlockRetry(s.l, func() (err error) {
assigned, err = s.queries.AssignGetGroupKeyRunToWorker(ctx, s.pool, dbsqlc.AssignGetGroupKeyRunToWorkerParams{
Getgroupkeyrunid: sqlchelpers.UUIDFromStr(getGroupKeyRunId),
Tenantid: sqlchelpers.UUIDFromStr(tenantId),
Expand Down Expand Up @@ -74,7 +74,7 @@ func (s *getGroupKeyRunRepository) AssignGetGroupKeyRunToTicker(ctx context.Cont
// var assigned
var assigned *dbsqlc.AssignGetGroupKeyRunToTickerRow

err = retrier(s.l, func() (err error) {
err = deadlockRetry(s.l, func() (err error) {
assigned, err = s.queries.AssignGetGroupKeyRunToTicker(ctx, s.pool, dbsqlc.AssignGetGroupKeyRunToTickerParams{
Getgroupkeyrunid: sqlchelpers.UUIDFromStr(getGroupKeyRunId),
Tenantid: sqlchelpers.UUIDFromStr(tenantId),
Expand Down
99 changes: 79 additions & 20 deletions internal/repository/prisma/step_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,22 +248,34 @@ func (s *stepRunEngineRepository) ListStepRunsToReassign(ctx context.Context, te
return stepRuns, nil
}

var retrier = func(l *zerolog.Logger, f func() error) error {
// deadlockRetry runs f via genericRetry with maxRetries set to 3 and the
// label "deadlock". An error is treated as retryable when its message text
// contains "deadlock detected"; the match is on the error string, not on a
// typed or sentinel error.
var deadlockRetry = func(l *zerolog.Logger, f func() error) error {
return genericRetry(l, 3, f, "deadlock", func(err error) bool {
return strings.Contains(err.Error(), "deadlock detected")
})
}

// unassignedRetry runs f via genericRetry with maxRetries set to 5 and the
// label "unassigned". An error is treated as retryable when errors.Is reports
// that it is, or wraps, repository.ErrNoWorkerAvailable.
var unassignedRetry = func(l *zerolog.Logger, f func() error) error {
return genericRetry(l, 5, f, "unassigned", func(err error) bool {
return errors.Is(err, repository.ErrNoWorkerAvailable)
})
}

var genericRetry = func(l *zerolog.Logger, maxRetries int, f func() error, msg string, condition func(err error) bool) error {
retries := 0

for {
err := f()

if err != nil {
// deadlock detected, retry
if strings.Contains(err.Error(), "deadlock detected") {
// condition detected, retry
if condition(err) {
retries++

if retries > 3 {
if retries > maxRetries {
return err
}

l.Err(err).Msgf("deadlock detected, retry %d", retries)
l.Err(err).Msgf("retry (%s) condition met, retry %d", msg, retries)

// sleep with jitter
sleepWithJitter(100*time.Millisecond, 300*time.Millisecond)
Expand All @@ -274,7 +286,7 @@ var retrier = func(l *zerolog.Logger, f func() error) error {

if err == nil {
if retries > 0 {
l.Info().Msgf("deadlock resolved after %d retries", retries)
l.Info().Msgf("retry (%s) condition resolved after %d retries", msg, retries)
}

break
Expand All @@ -284,11 +296,11 @@ var retrier = func(l *zerolog.Logger, f func() error) error {
return nil
}

func (s *stepRunEngineRepository) AssignStepRunToWorker(ctx context.Context, stepRun *dbsqlc.GetStepRunForEngineRow) (string, string, error) {
func (s *stepRunEngineRepository) incrementWorkerSemaphore(ctx context.Context, stepRun *dbsqlc.GetStepRunForEngineRow) error {
tx, err := s.pool.Begin(ctx)

if err != nil {
return "", "", err
return err
}

defer deferRollback(ctx, s.l, tx.Rollback)
Expand All @@ -304,7 +316,23 @@ func (s *stepRunEngineRepository) AssignStepRunToWorker(ctx context.Context, ste
isNoRowsErr := err != nil && errors.Is(err, pgx.ErrNoRows)

if err != nil && !isNoRowsErr {
return "", "", fmt.Errorf("could not upsert old worker semaphore: %w", err)
return fmt.Errorf("could not upsert old worker semaphore: %w", err)
}

return nil
}

func (s *stepRunEngineRepository) assignStepRunToWorkerAttempt(ctx context.Context, stepRun *dbsqlc.GetStepRunForEngineRow) (*dbsqlc.AssignStepRunToWorkerRow, error) {
tx, err := s.pool.Begin(ctx)

if err != nil {
return nil, err
}

defer deferRollback(ctx, s.l, tx.Rollback)

if err != nil {
return nil, err
}

assigned, err := s.queries.AssignStepRunToWorker(ctx, tx, dbsqlc.AssignStepRunToWorkerParams{
Expand All @@ -316,10 +344,10 @@ func (s *stepRunEngineRepository) AssignStepRunToWorker(ctx context.Context, ste

if err != nil {
if errors.Is(err, pgx.ErrNoRows) {
return "", "", repository.ErrNoWorkerAvailable
return nil, repository.ErrNoWorkerAvailable
}

return "", "", fmt.Errorf("query to assign worker failed: %w", err)
return nil, fmt.Errorf("query to assign worker failed: %w", err)
}

semaphore, err := s.queries.UpdateWorkerSemaphore(ctx, tx, dbsqlc.UpdateWorkerSemaphoreParams{
Expand All @@ -328,14 +356,14 @@ func (s *stepRunEngineRepository) AssignStepRunToWorker(ctx context.Context, ste
Tenantid: stepRun.StepRun.TenantId,
})

isNoRowsErr = err != nil && errors.Is(err, pgx.ErrNoRows)
isNoRowsErr := err != nil && errors.Is(err, pgx.ErrNoRows)

if err != nil && !isNoRowsErr {
return "", "", fmt.Errorf("could not upsert new worker semaphore: %w", err)
return nil, fmt.Errorf("could not upsert new worker semaphore: %w", err)
}

if !isNoRowsErr && semaphore.Slots < 0 {
return "", "", repository.ErrNoWorkerAvailable
return nil, repository.ErrNoWorkerAvailable
}

rateLimits, err := s.queries.UpdateStepRateLimits(ctx, tx, dbsqlc.UpdateStepRateLimitsParams{
Expand All @@ -344,19 +372,50 @@ func (s *stepRunEngineRepository) AssignStepRunToWorker(ctx context.Context, ste
})

if err != nil && !errors.Is(err, pgx.ErrNoRows) {
return "", "", fmt.Errorf("could not update rate limit: %w", err)
return nil, fmt.Errorf("could not update rate limit: %w", err)
}

if len(rateLimits) > 0 {
for _, rateLimit := range rateLimits {
if rateLimit.Value < 0 {
return "", "", repository.ErrRateLimitExceeded
return nil, repository.ErrRateLimitExceeded
}
}
}

err = tx.Commit(ctx)

if err != nil {
return nil, err
}

return assigned, nil
}

func (s *stepRunEngineRepository) AssignStepRunToWorker(ctx context.Context, stepRun *dbsqlc.GetStepRunForEngineRow) (string, string, error) {

err := s.incrementWorkerSemaphore(ctx, stepRun)

if err != nil {
return "", "", err
}

var assigned *dbsqlc.AssignStepRunToWorkerRow

err = unassignedRetry(s.l, func() (err error) {
assigned, err = s.assignStepRunToWorkerAttempt(ctx, stepRun)

if err != nil {
if errors.Is(err, repository.ErrNoWorkerAvailable) {
return err
}

return fmt.Errorf("could not assign worker: %w", err)
}

return nil
})

if err != nil {
return "", "", err
}
Expand All @@ -382,7 +441,7 @@ func (s *stepRunEngineRepository) UpdateStepRun(ctx context.Context, tenantId, s

var stepRun *dbsqlc.GetStepRunForEngineRow

err = retrier(s.l, func() error {
err = deadlockRetry(s.l, func() error {
tx, err := s.pool.Begin(ctx)

if err != nil {
Expand All @@ -406,7 +465,7 @@ func (s *stepRunEngineRepository) UpdateStepRun(ctx context.Context, tenantId, s
return nil, nil, err
}

err = retrier(s.l, func() error {
err = deadlockRetry(s.l, func() error {
tx, err := s.pool.Begin(ctx)

if err != nil {
Expand Down Expand Up @@ -542,7 +601,7 @@ func (s *stepRunEngineRepository) QueueStepRun(ctx context.Context, tenantId, st
var stepRun *dbsqlc.GetStepRunForEngineRow
var isNotPending bool

retrierErr := retrier(s.l, func() error {
retrierErr := deadlockRetry(s.l, func() error {
tx, err := s.pool.Begin(ctx)

if err != nil {
Expand Down Expand Up @@ -594,7 +653,7 @@ func (s *stepRunEngineRepository) QueueStepRun(ctx context.Context, tenantId, st
return nil, repository.ErrStepRunIsNotPending
}

retrierExtraErr := retrier(s.l, func() error {
retrierExtraErr := deadlockRetry(s.l, func() error {
tx, err := s.pool.Begin(ctx)

if err != nil {
Expand Down

0 comments on commit 4c39dce

Please sign in to comment.