Skip to content

Commit

Permalink
rename to retry policy
Browse files Browse the repository at this point in the history
  • Loading branch information
korotkov-aerospike committed Aug 15, 2024
1 parent d99ea42 commit 7a6de88
Show file tree
Hide file tree
Showing 6 changed files with 22 additions and 20 deletions.
2 changes: 1 addition & 1 deletion config_restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ type RestoreConfig struct {
CompressionPolicy *CompressionPolicy
// Configuration of retries for each restore write operation.
// If nil, no retries will be performed.
Retry *models.RetryPolicy
RetryPolicy *models.RetryPolicy
// Secret agent config.
SecretAgentConfig *SecretAgentConfig
// The sets to restore (optional, given an empty list, all sets will be restored).
Expand Down
2 changes: 1 addition & 1 deletion handler_restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ func (rh *RestoreHandler) runRestoreBatch(
rh.logger,
useBatchWrites,
rh.config.BatchSize,
rh.config.Retry,
rh.config.RetryPolicy,
)

statsWriter := newWriterWithTokenStats(writer, &rh.stats, rh.logger)
Expand Down
15 changes: 8 additions & 7 deletions io/aerospike/record_batch_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ type batchRecordWriter struct {
writePolicy *a.WritePolicy
stats *models.RestoreStats
logger *slog.Logger
retry *models.RetryPolicy
retryPolicy *models.RetryPolicy
operationBuffer []a.BatchRecordIfc
batchSize int
}
Expand Down Expand Up @@ -93,18 +93,19 @@ func (rw *batchRecordWriter) flushBuffer() error {
}

func (rw *batchRecordWriter) executeBatchOperation() error {
var err a.Error
var (
err a.Error
attempt int
)

var attempt int

for attemptsLeft(rw.retry, attempt) {
for attemptsLeft(rw.retryPolicy, attempt) {
err = rw.asc.BatchOperate(nil, rw.operationBuffer)
if err == nil || isAcceptableError(err) {
return nil
}

if shouldRetry(err) {
sleep(rw.retry, attempt)
sleep(rw.retryPolicy, attempt)

attempt++

Expand All @@ -114,7 +115,7 @@ func (rw *batchRecordWriter) executeBatchOperation() error {
return err
}

return fmt.Errorf("max retry reached: %w", err)
return fmt.Errorf("max retryPolicy reached: %w", err)
}

func (rw *batchRecordWriter) processOperationResults() {
Expand Down
13 changes: 7 additions & 6 deletions io/aerospike/record_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ type singleRecordWriter struct {
asc dbWriter
writePolicy *a.WritePolicy
stats *models.RestoreStats
retry *models.RetryPolicy
retryPolicy *models.RetryPolicy
}

func (rw *singleRecordWriter) writeRecord(record *models.Record) error {
Expand All @@ -46,11 +46,12 @@ func (rw *singleRecordWriter) writeRecord(record *models.Record) error {
}

func (rw *singleRecordWriter) executeWrite(writePolicy *a.WritePolicy, record *models.Record) error {
var aerr a.Error
var (
aerr a.Error
attempt int
)

var attempt int

for attemptsLeft(rw.retry, attempt) {
for attemptsLeft(rw.retryPolicy, attempt) {
aerr = rw.asc.Put(writePolicy, record.Key, record.Bins)
if aerr == nil {
rw.stats.IncrRecordsInserted()
Expand All @@ -70,7 +71,7 @@ func (rw *singleRecordWriter) executeWrite(writePolicy *a.WritePolicy, record *m
}

if shouldRetry(aerr) {
sleep(rw.retry, attempt)
sleep(rw.retryPolicy, attempt)

attempt++

Expand Down
10 changes: 5 additions & 5 deletions io/aerospike/restore_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ type restoreWriter struct {

// NewRestoreWriter creates a new restoreWriter.
func NewRestoreWriter(asc dbWriter, writePolicy *a.WritePolicy, stats *models.RestoreStats,
logger *slog.Logger, useBatchWrites bool, batchSize int, maxRetries *models.RetryPolicy,
logger *slog.Logger, useBatchWrites bool, batchSize int, retryPolicy *models.RetryPolicy,
) pipeline.DataWriter[*models.Token] {
logger = logging.WithWriter(logger, uuid.NewString(), logging.WriterTypeRestore)
logger.Debug("created new restore writer")
Expand All @@ -61,7 +61,7 @@ func NewRestoreWriter(asc dbWriter, writePolicy *a.WritePolicy, stats *models.Re
writePolicy: writePolicy,
logger: logger,
},
recordWriter: newRecordWriter(asc, writePolicy, stats, logger, useBatchWrites, batchSize, maxRetries),
recordWriter: newRecordWriter(asc, writePolicy, stats, logger, useBatchWrites, batchSize, retryPolicy),
logger: logger,
}
}
Expand All @@ -71,7 +71,7 @@ func newRecordWriter(asc dbWriter, writePolicy *a.WritePolicy,
logger *slog.Logger,
useBatchWrites bool,
batchSize int,
retry *models.RetryPolicy,
retryPolicy *models.RetryPolicy,
) recordWriter {
if useBatchWrites {
return &batchRecordWriter{
Expand All @@ -80,15 +80,15 @@ func newRecordWriter(asc dbWriter, writePolicy *a.WritePolicy,
stats: stats,
logger: logger,
batchSize: batchSize,
retry: retry,
retryPolicy: retryPolicy,
}
}

return &singleRecordWriter{
asc: asc,
writePolicy: writePolicy,
stats: stats,
retry: retry,
retryPolicy: retryPolicy,
}
}

Expand Down
File renamed without changes.

0 comments on commit 7a6de88

Please sign in to comment.