FMWK-570-backup-restore-state
- added nil checks
filkeith committed Oct 10, 2024
1 parent 2d87705 commit edfcae2
Showing 9 changed files with 67 additions and 13 deletions.
25 changes: 21 additions & 4 deletions handler_backup.go
@@ -269,7 +269,11 @@ func (bh *BackupHandler) makeWriteWorkers(
 	writeWorkers := make([]pipeline.Worker[*models.Token], len(backupWriters))
 
 	for i, w := range backupWriters {
-		var dataWriter pipeline.DataWriter[*models.Token] = newTokenWriter(bh.encoder, w, bh.logger, bh.state.RecordsChan)
+		var dataWriter pipeline.DataWriter[*models.Token] = newTokenWriter(bh.encoder, w, bh.logger, nil)
+		if bh.state != nil {
+			dataWriter = newTokenWriter(bh.encoder, w, bh.logger, bh.state.RecordsChan)
+		}
+
 		dataWriter = newWriterWithTokenStats(dataWriter, &bh.stats, bh.logger)
 		writeWorkers[i] = pipeline.NewWriteWorker(dataWriter, bh.limiter)
 	}
@@ -313,7 +317,11 @@ func (bh *BackupHandler) newWriter(ctx context.Context) (io.WriteCloser, error)
 }
 
 func (bh *BackupHandler) newConfiguredWriter(ctx context.Context) (io.WriteCloser, error) {
-	filename := bh.encoder.GenerateFilename(bh.state.getFileSuffix())
+	suffix := ""
+	if bh.state != nil {
+		suffix = bh.state.getFileSuffix()
+	}
+	filename := bh.encoder.GenerateFilename(suffix)
 
 	storageWriter, err := bh.writer.NewWriter(ctx, filename)
 	if err != nil {

Check failure on line 324 in handler_backup.go (GitHub Actions / lint): assignments should only be cuddled with other assignments (wsl)
@@ -437,7 +445,11 @@ func (bh *BackupHandler) backupSIndexes(
 	reader := aerospike.NewSIndexReader(bh.infoClient, bh.config.Namespace, bh.logger)
 	sindexReadWorker := pipeline.NewReadWorker[*models.Token](reader)
 
-	sindexWriter := pipeline.DataWriter[*models.Token](newTokenWriter(bh.encoder, writer, bh.logger, bh.state.RecordsChan))
+	sindexWriter := pipeline.DataWriter[*models.Token](newTokenWriter(bh.encoder, writer, bh.logger, nil))
+	if bh.state != nil {
+		sindexWriter = pipeline.DataWriter[*models.Token](newTokenWriter(bh.encoder, writer, bh.logger, bh.state.RecordsChan))
+	}
+
 	sindexWriter = newWriterWithTokenStats(sindexWriter, &bh.stats, bh.logger)
 	sindexWriteWorker := pipeline.NewWriteWorker(sindexWriter, bh.limiter)

@@ -456,7 +468,12 @@ func (bh *BackupHandler) backupUDFs(
 	reader := aerospike.NewUDFReader(bh.infoClient, bh.logger)
 	udfReadWorker := pipeline.NewReadWorker[*models.Token](reader)
 
-	udfWriter := pipeline.DataWriter[*models.Token](newTokenWriter(bh.encoder, writer, bh.logger, bh.state.RecordsChan))
+	udfWriter := pipeline.DataWriter[*models.Token](newTokenWriter(bh.encoder, writer, bh.logger, nil))
+
+	if bh.state != nil {
+		udfWriter = pipeline.DataWriter[*models.Token](newTokenWriter(bh.encoder, writer, bh.logger, bh.state.RecordsChan))
+	}
+
 	udfWriter = newWriterWithTokenStats(udfWriter, &bh.stats, bh.logger)
 	udfWriteWorker := pipeline.NewWriteWorker(udfWriter, bh.limiter)

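All three writer hunks above apply the same guard: construct the token writer with a nil state channel by default, and wire in bh.state.RecordsChan only when bh.state exists (presumably state is only set when backup-state tracking is enabled). A minimal runnable sketch of the pattern, with simplified stand-ins for the library's state and tokenWriter types:

package main

import "fmt"

// Simplified stand-ins for the library's backup state and tokenWriter types.
type state struct {
	RecordsChan chan string
}

type tokenWriter struct {
	stateChan chan string
}

func newTokenWriter(stateChan chan string) *tokenWriter {
	return &tokenWriter{stateChan: stateChan}
}

// makeWriter mirrors the guard in the diff: default to a nil channel,
// attach the state channel only when state is actually present.
func makeWriter(s *state) *tokenWriter {
	w := newTokenWriter(nil)
	if s != nil {
		w = newTokenWriter(s.RecordsChan)
	}
	return w
}

func main() {
	fmt.Println(makeWriter(nil).stateChan == nil) // true: no state configured, no tracking
}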
15 changes: 10 additions & 5 deletions io/aerospike/record_reader.go
@@ -142,12 +142,17 @@ func (r *RecordReader) Read() (*models.Token, error) {
 			Record: res.Record,
 		}
 
-	pfs, err := models.NewPartitionFilterSerialized(r.config.partitionFilter)
-	if err != nil {
-		return nil, fmt.Errorf("failed to serialize partition filter: %w", err)
-	}
-
-	recToken := models.NewRecordToken(&rec, 0, pfs)
+	recToken := models.NewRecordToken(&rec, 0, nil)
+
+	// For indexes and udf, partition filter will be nil.
+	if r.config.partitionFilter != nil {
+		pfs, err := models.NewPartitionFilterSerialized(r.config.partitionFilter)
+		if err != nil {
+			return nil, fmt.Errorf("failed to serialize partition filter: %w", err)
+		}
+
+		recToken = models.NewRecordToken(&rec, 0, pfs)
+	}
 
 	return recToken, nil
 }
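The record reader change moves filter serialization behind a nil check: sindex and UDF readers run without a partition filter, so tokens on those paths should simply carry a nil filter instead of attempting to serialize one. A runnable sketch of that control flow, using hypothetical stand-ins for the models types:

package main

import (
	"errors"
	"fmt"
)

// Hypothetical stand-ins for models.PartitionFilter and its serialized form.
type partitionFilter struct{ begin, count int }

func serializeFilter(pf *partitionFilter) ([]byte, error) {
	if pf == nil {
		return nil, errors.New("nil partition filter")
	}
	return []byte(fmt.Sprintf("%d:%d", pf.begin, pf.count)), nil
}

type token struct {
	filter []byte // stays nil for sindex/UDF tokens
}

func newToken(pf *partitionFilter) (*token, error) {
	t := &token{} // default: no filter attached
	// Mirrors the diff: only serialize when a filter is actually set.
	if pf != nil {
		data, err := serializeFilter(pf)
		if err != nil {
			return nil, fmt.Errorf("failed to serialize partition filter: %w", err)
		}
		t.filter = data
	}
	return t, nil
}

func main() {
	t, err := newToken(nil)
	fmt.Println(t.filter == nil, err) // true <nil>: the sindex/UDF path
}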
10 changes: 10 additions & 0 deletions io/aws/s3/options.go
@@ -30,6 +30,8 @@ type options struct {
 	// startAfter is where you want Amazon S3 to start listing from. Amazon S3 starts
 	// listing after this specified key. StartAfter can be any key in the bucket.
 	startAfter string
+	// skipDirCheck if true, backup directory won't be checked.
+	skipDirCheck bool
 }
 
 type Opt func(*options)
@@ -80,3 +82,11 @@ func WithStartAfter(v string) Opt {
 		r.startAfter = v
 	}
 }
+
+// WithSkipDirCheck sets the skipDirCheck flag,
+// so the backup directory won't be checked for emptiness.
+func WithSkipDirCheck() Opt {
+	return func(r *options) {
+		r.skipDirCheck = true
+	}
+}
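WithSkipDirCheck follows the functional-options pattern this package already uses: each Opt is a closure that mutates a private options struct, and the constructor folds the closures over a zero-value instance. A self-contained sketch of how such options compose; the option names mirror the diff, while newOptions is an assumed helper for illustration (the real package presumably applies Opts inside its constructors):

package main

import "fmt"

type options struct {
	startAfter   string
	skipDirCheck bool
}

type Opt func(*options)

func WithStartAfter(v string) Opt {
	return func(r *options) { r.startAfter = v }
}

func WithSkipDirCheck() Opt {
	return func(r *options) { r.skipDirCheck = true }
}

// newOptions is an illustrative helper that applies each option in order.
func newOptions(opts ...Opt) *options {
	o := &options{}
	for _, opt := range opts {
		opt(o)
	}
	return o
}

func main() {
	o := newOptions(WithStartAfter("backup_0001.asb"), WithSkipDirCheck())
	fmt.Printf("%+v\n", o) // &{startAfter:backup_0001.asb skipDirCheck:true}
}

The same helper is added verbatim to the Azure and GCP option sets below.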
2 changes: 1 addition & 1 deletion io/aws/s3/writer.go
@@ -90,7 +90,7 @@ func NewWriter(
 		return nil, fmt.Errorf("bucket does not exist or you don't have access: %w", err)
 	}
 
-	if w.isDir {
+	if w.isDir && !w.skipDirCheck {
 		// Check if backup dir is empty.
 		isEmpty, err := isEmptyDirectory(ctx, client, bucketName, w.prefix)
 		if err != nil {
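This one-line change (repeated in the Azure and GCP writers below) is the consumer of that option: the directory-emptiness check now runs only when it has not been explicitly skipped, presumably so a resumed backup can keep writing into a directory that already holds earlier backup files. A minimal sketch of the guard, with isEmptyDirectory simplified to a stand-in for the storage-specific helpers:

package main

import (
	"errors"
	"fmt"
)

// Simplified stand-in for the storage-specific isEmptyDirectory helpers.
func isEmptyDirectory(files []string) bool { return len(files) == 0 }

func checkBackupDir(isDir, skipDirCheck bool, files []string) error {
	// Mirrors the diff: the check only runs when it hasn't been skipped.
	if isDir && !skipDirCheck {
		if !isEmptyDirectory(files) {
			return errors.New("backup directory is not empty")
		}
	}
	return nil
}

func main() {
	// A resumed backup targets a non-empty directory; skipping the check lets it proceed.
	fmt.Println(checkBackupDir(true, true, []string{"backup_0001.asb"}))  // <nil>
	fmt.Println(checkBackupDir(true, false, []string{"backup_0001.asb"})) // error
}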
10 changes: 10 additions & 0 deletions io/azure/blob/options.go
@@ -38,6 +38,8 @@ type options struct {
 	// as the value for the marker parameter in a subsequent call to request the next
 	// page of list items. The marker value is opaque to the client.
 	marker string
+	// skipDirCheck if true, backup directory won't be checked.
+	skipDirCheck bool
 }
 
 type Opt func(*options)
@@ -98,3 +100,11 @@ func WithMarker(v string) Opt {
 		r.marker = v
 	}
 }
+
+// WithSkipDirCheck sets the skipDirCheck flag,
+// so the backup directory won't be checked for emptiness.
+func WithSkipDirCheck() Opt {
+	return func(r *options) {
+		r.skipDirCheck = true
+	}
+}
2 changes: 1 addition & 1 deletion io/azure/blob/writer.go
@@ -80,7 +80,7 @@ func NewWriter(
 		return nil, fmt.Errorf("unable to get container properties: %w", err)
 	}
 
-	if w.isDir {
+	if w.isDir && !w.skipDirCheck {
 		// Check if backup dir is empty.
 		isEmpty, err := isEmptyDirectory(ctx, client, containerName, prefix)
 		if err != nil {
10 changes: 10 additions & 0 deletions io/gcp/storage/options.go
@@ -30,6 +30,8 @@ type options struct {
 	// startOffset is used to filter results to objects whose names are
 	// lexicographically equal to or after startOffset.
 	startOffset string
+	// skipDirCheck if true, backup directory won't be checked.
+	skipDirCheck bool
 }
 
 type Opt func(*options)
@@ -82,3 +84,11 @@ func WithStartOffset(v string) Opt {
 		r.startOffset = v
 	}
 }
+
+// WithSkipDirCheck sets the skipDirCheck flag,
+// so the backup directory won't be checked for emptiness.
+func WithSkipDirCheck() Opt {
+	return func(r *options) {
+		r.skipDirCheck = true
+	}
+}
2 changes: 1 addition & 1 deletion io/gcp/storage/writer.go
@@ -80,7 +80,7 @@ func NewWriter(
 		return nil, fmt.Errorf("failed to get bucketHandler %s attr: %w", bucketName, err)
 	}
 
-	if w.isDir {
+	if w.isDir && !w.skipDirCheck {
 		// Check if backup dir is empty.
 		isEmpty, err := isEmptyDirectory(ctx, bucketHandler, prefix)
 		if err != nil {
4 changes: 3 additions & 1 deletion writers.go
@@ -117,7 +117,9 @@ func (w *tokenWriter) Write(v *models.Token) (int, error) {
 		return 0, fmt.Errorf("error encoding token: %w", err)
 	}
 
-	w.stateChan <- v.Filter
+	if w.stateChan != nil {
+		w.stateChan <- v.Filter
+	}
 
 	return w.output.Write(data)
 }
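The guard in tokenWriter.Write is more than defensive style: in Go, a send on a nil channel blocks forever, so without the check every Write on a state-less backup would deadlock its worker goroutine. A small runnable illustration:

package main

import "fmt"

type tokenWriter struct {
	stateChan chan string // nil when backup-state tracking is disabled
}

func (w *tokenWriter) write(filter string) {
	// Without this guard, `w.stateChan <- filter` on a nil channel
	// would block this goroutine forever.
	if w.stateChan != nil {
		w.stateChan <- filter
	}
}

func main() {
	w := &tokenWriter{stateChan: nil}
	w.write("serialized-partition-filter") // returns immediately instead of deadlocking
	fmt.Println("write completed without state tracking")
}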
