diff --git a/handler_backup_records.go b/handler_backup_records.go index 5efb3f6a..7b3dfe3a 100644 --- a/handler_backup_records.go +++ b/handler_backup_records.go @@ -314,6 +314,6 @@ func (bh *backupRecordsHandler) recordReaderConfigForNode( }, bh.scanLimiter, bh.config.NoTTLOnly, - 1000, + 100, ) } diff --git a/io/aerospike/custom_record_reader.go b/io/aerospike/custom_record_reader.go index b2abdfb5..ad1d25b7 100644 --- a/io/aerospike/custom_record_reader.go +++ b/io/aerospike/custom_record_reader.go @@ -15,7 +15,7 @@ type scanResult struct { func newScanResult(bufferSize int64) *scanResult { return &scanResult{ - records: make([]*a.Result, bufferSize), + records: make([]*a.Result, 0, bufferSize), } } @@ -54,8 +54,6 @@ func (r *RecordReader) CustomRead() (*models.Token, error) { // startCustomScan starts the scan for the RecordReader only for state save! func (r *RecordReader) startCustomScan() (*customRecordSets, error) { - fmt.Println("START CUSTOM SCAN") - scanPolicy := *r.config.scanPolicy scanPolicy.FilterExpression = getScanExpression(r.config.timeBounds, r.config.noTTLOnly) @@ -100,17 +98,16 @@ func (r *RecordReader) scanPartitions(scanPolicy *a.ScanPolicy, ) ([]*scanResult, error) { results := make([]*scanResult, 0) scanPolicy.MaxRecords = r.config.pageSize - pf := *partitionFilter for { - curFilter, err := models.NewPartitionFilterSerialized(&pf) + curFilter, err := models.NewPartitionFilterSerialized(partitionFilter) if err != nil { return nil, fmt.Errorf("failed to serialize partition filter: %w", err) } recSet, aErr := r.client.ScanPartitions( scanPolicy, - &pf, + partitionFilter, r.config.namespace, set, r.config.binList..., @@ -126,7 +123,6 @@ func (r *RecordReader) scanPartitions(scanPolicy *a.ScanPolicy, for res := range recSet.Results() { counter++ if res.Err != nil { - fmt.Println("ERROR:", res.Err) continue } else { result.records = append(result.records, res) @@ -144,6 +140,6 @@ func (r *RecordReader) scanPartitions(scanPolicy *a.ScanPolicy, break } } - fmt.Println("end scan") + return results, nil } diff --git a/io/aerospike/custom_record_set.go b/io/aerospike/custom_record_set.go index 8a284984..b85580b0 100644 --- a/io/aerospike/custom_record_set.go +++ b/io/aerospike/custom_record_set.go @@ -1,7 +1,6 @@ package aerospike import ( - "fmt" "log/slog" a "github.com/aerospike/aerospike-client-go/v7" @@ -49,11 +48,12 @@ func streamData(data []*scanResult, out chan *customResult) { } for _, d := range data { + for _, n := range d.records { - fmt.Println(n) + out <- newCustomResult(n, &d.Filter) } } - fmt.Println("closing") + close(out) }