Skip to content

Commit

Permalink
Add retry (#113)
Browse files Browse the repository at this point in the history
Signed-off-by: Jarema <[email protected]>
  • Loading branch information
Jarema authored Feb 21, 2023
1 parent f9408bd commit 4bd2b10
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 30 deletions.
43 changes: 25 additions & 18 deletions controllers/jetstream/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
k8sapi "k8s.io/api/core/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
k8smeta "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/util/retry"
)

func (c *Controller) runConsumerQueue() {
Expand Down Expand Up @@ -434,15 +435,18 @@ func setConsumerOK(ctx context.Context, s *apis.Consumer, i typed.ConsumerInterf
Message: "Consumer successfully created",
})

ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()

res, err := i.UpdateStatus(ctx, sc, k8smeta.UpdateOptions{})
if err != nil {
return nil, fmt.Errorf("failed to set consumer %q status: %w", s.Spec.DurableName, err)
}

return res, nil
var res *apis.Consumer
err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
var err error
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
res, err = i.UpdateStatus(ctx, sc, k8smeta.UpdateOptions{})
if err != nil {
return fmt.Errorf("failed to set consumer %q status: %w", s.Spec.DurableName, err)
}
return nil
})
return res, err
}

func setConsumerErrored(ctx context.Context, s *apis.Consumer, sif typed.ConsumerInterface, err error) (*apis.Consumer, error) {
Expand All @@ -459,13 +463,16 @@ func setConsumerErrored(ctx context.Context, s *apis.Consumer, sif typed.Consume
Message: err.Error(),
})

ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()

res, err := sif.UpdateStatus(ctx, sc, k8smeta.UpdateOptions{})
if err != nil {
return nil, fmt.Errorf("failed to set consumer errored status: %w", err)
}

return res, nil
var res *apis.Consumer
err = retry.RetryOnConflict(retry.DefaultRetry, func() error {
var err error
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
res, err = sif.UpdateStatus(ctx, sc, k8smeta.UpdateOptions{})
if err != nil {
return fmt.Errorf("failed to set consumer errored status: %w", err)
}
return nil
})
return res, err
}
37 changes: 25 additions & 12 deletions controllers/jetstream/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
k8sapi "k8s.io/api/core/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
k8smeta "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/util/retry"
)

func (c *Controller) runStreamQueue() {
Expand Down Expand Up @@ -517,12 +518,18 @@ func setStreamErrored(ctx context.Context, s *apis.Stream, sif typed.StreamInter
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()

res, err := sif.UpdateStatus(ctx, sc, k8smeta.UpdateOptions{})
if err != nil {
return nil, fmt.Errorf("failed to set stream errored status: %w", err)
}

return res, nil
var res *apis.Stream
err = retry.RetryOnConflict(retry.DefaultRetry, func() error {
var err error
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
res, err = sif.UpdateStatus(ctx, sc, k8smeta.UpdateOptions{})
if err != nil {
return fmt.Errorf("failed to set stream errored status: %w", err)
}
return nil
})
return res, err
}

func setStreamOK(ctx context.Context, s *apis.Stream, i typed.StreamInterface) (*apis.Stream, error) {
Expand All @@ -540,12 +547,18 @@ func setStreamOK(ctx context.Context, s *apis.Stream, i typed.StreamInterface) (
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()

res, err := i.UpdateStatus(ctx, sc, k8smeta.UpdateOptions{})
if err != nil {
return nil, fmt.Errorf("failed to set stream %q status: %w", s.Spec.Name, err)
}

return res, nil
var res *apis.Stream
err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
var err error
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
res, err = i.UpdateStatus(ctx, sc, k8smeta.UpdateOptions{})
if err != nil {
return fmt.Errorf("failed to set stream %q status: %w", s.Spec.Name, err)
}
return nil
})
return res, err
}

func getMaxAge(v string) (time.Duration, error) {
Expand Down

0 comments on commit 4bd2b10

Please sign in to comment.