Skip to content

Commit

Permalink
working prorotype
Browse files Browse the repository at this point in the history
  • Loading branch information
filkeith committed Oct 14, 2024
1 parent 8dcf706 commit 8df3cf9
Show file tree
Hide file tree
Showing 3 changed files with 8 additions and 12 deletions.
2 changes: 1 addition & 1 deletion handler_backup_records.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,6 @@ func (bh *backupRecordsHandler) recordReaderConfigForNode(
},
bh.scanLimiter,
bh.config.NoTTLOnly,
1000,
100,
)
}
12 changes: 4 additions & 8 deletions io/aerospike/custom_record_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ type scanResult struct {

func newScanResult(bufferSize int64) *scanResult {
return &scanResult{
records: make([]*a.Result, bufferSize),
records: make([]*a.Result, 0, bufferSize),
}
}

Expand Down Expand Up @@ -54,8 +54,6 @@ func (r *RecordReader) CustomRead() (*models.Token, error) {

// startCustomScan starts the scan for the RecordReader only for state save!
func (r *RecordReader) startCustomScan() (*customRecordSets, error) {
fmt.Println("START CUSTOM SCAN")

scanPolicy := *r.config.scanPolicy
scanPolicy.FilterExpression = getScanExpression(r.config.timeBounds, r.config.noTTLOnly)

Expand Down Expand Up @@ -100,17 +98,16 @@ func (r *RecordReader) scanPartitions(scanPolicy *a.ScanPolicy,
) ([]*scanResult, error) {
results := make([]*scanResult, 0)
scanPolicy.MaxRecords = r.config.pageSize
pf := *partitionFilter

for {
curFilter, err := models.NewPartitionFilterSerialized(&pf)
curFilter, err := models.NewPartitionFilterSerialized(partitionFilter)
if err != nil {
return nil, fmt.Errorf("failed to serialize partition filter: %w", err)
}

recSet, aErr := r.client.ScanPartitions(
scanPolicy,
&pf,
partitionFilter,
r.config.namespace,
set,
r.config.binList...,
Expand All @@ -126,7 +123,6 @@ func (r *RecordReader) scanPartitions(scanPolicy *a.ScanPolicy,
for res := range recSet.Results() {
counter++
if res.Err != nil {
fmt.Println("ERROR:", res.Err)
continue
} else {
result.records = append(result.records, res)
Expand All @@ -144,6 +140,6 @@ func (r *RecordReader) scanPartitions(scanPolicy *a.ScanPolicy,
break
}
}
fmt.Println("end scan")

return results, nil
}
6 changes: 3 additions & 3 deletions io/aerospike/custom_record_set.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package aerospike

import (
"fmt"
"log/slog"

a "github.com/aerospike/aerospike-client-go/v7"
Expand Down Expand Up @@ -49,11 +48,12 @@ func streamData(data []*scanResult, out chan *customResult) {
}

for _, d := range data {

for _, n := range d.records {
fmt.Println(n)

out <- newCustomResult(n, &d.Filter)
}
}
fmt.Println("closing")

close(out)
}

0 comments on commit 8df3cf9

Please sign in to comment.