
Commit

add test for aggregator
PDOK-16462
roelarents committed May 21, 2024
1 parent 2050bf1 commit 9eea8c4
Showing 6 changed files with 164 additions and 16 deletions.
2 changes: 1 addition & 1 deletion cmd/main.go
@@ -117,7 +117,7 @@ func createAggregatorFromCliCtx(c *cli.Context) (*agg.Aggregator, error) {
duReader,
config.Labels,
config.Rules,
), nil
)
}

func loadConfig(configFile string) (*Config, error) {
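The dropped ", nil" follows from the aggregator.go change further down in this commit: agg.NewAggregator now returns (*Aggregator, error) itself, so createAggregatorFromCliCtx can hand its result straight through. A minimal, hypothetical sketch of such a caller (the duReader/config wiring is assumed; only the return shape comes from this diff):

    // duReader, config.Labels and config.Rules are assumed to be prepared earlier.
    aggregator, err := agg.NewAggregator(duReader, config.Labels, config.Rules)
    if err != nil {
        // since this change, an invalid label set (e.g. using "deleted" as a label)
        // surfaces here instead of being silently accepted
        return nil, err
    }
    return aggregator, nil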
4 changes: 2 additions & 2 deletions example/config.yaml
@@ -3,7 +3,7 @@ labels:
type: other
tenant: other
rules:
- pattern: ^(?P<type>Y2U0ZWI1Zjc3OD|NTg0NmRjZmUwNW|YTAzZTI1ZWE2NT)/.+
- pattern: ^(?P<type>Y2U0ZWI1Zjc3OD|NTg0NmRjZmUwNW|YTAzZTI1ZWE2NT)(/|$)
labels:
tenant: special
- pattern: ^(?P<type>[^/]+)/(?P<tenant>[^/]+)/.+
- pattern: ^(?P<type>[^/]+)/(?P<tenant>[^/]+)
2 changes: 1 addition & 1 deletion example/pdok-config.yaml
@@ -4,5 +4,5 @@ labels:
owner: other
dataset: other
rules:
- pattern: ^(?P<container>argo-artifacts|container-logs|mimir-blocks|elasticsearch-snapshots)/
- pattern: ^(?P<container>argo-artifacts|container-logs|mimir-blocks|elasticsearch-snapshots)(/|$)
- pattern: ^(?P<container>[^/]+)/(?P<owner>[^/]+)/(?P<dataset>[^/]+)
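Both example configs get the same pattern change: the old rules required a trailing slash (or /.+), so a path consisting of just the bare top-level name never matched and fell through to the default labels, while (/|$) also accepts the bare name. A small standalone check, illustrative only, with the alternation trimmed; the real rules are compiled through oriser/regroup, but the matching semantics here are plain regexp:

    package main

    import (
        "fmt"
        "regexp"
    )

    func main() {
        oldPattern := regexp.MustCompile(`^(?P<container>argo-artifacts|container-logs)/`)
        newPattern := regexp.MustCompile(`^(?P<container>argo-artifacts|container-logs)(/|$)`)

        for _, dir := range []string{"container-logs", "container-logs/pod-x"} {
            fmt.Printf("%-22s old=%-5v new=%v\n", dir, oldPattern.MatchString(dir), newPattern.MatchString(dir))
        }
        // container-logs         old=false new=true   <- the bare container name now matches
        // container-logs/pod-x   old=true  new=true
    }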
7 changes: 5 additions & 2 deletions go.mod
@@ -5,11 +5,13 @@ go 1.22
require (
github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.3.2
github.com/go-co-op/gocron/v2 v2.5.0
github.com/google/uuid v1.6.0
github.com/iancoleman/strcase v0.3.0
github.com/jmoiron/sqlx v1.4.0
github.com/marcboeker/go-duckdb v1.6.3
github.com/oriser/regroup v0.0.0-20230527212431-1b00c9bdbc5b
github.com/prometheus/client_golang v1.19.1
github.com/stretchr/testify v1.9.0
github.com/urfave/cli/v2 v2.27.2
golang.org/x/exp v0.0.0-20240416160154-fe59bbe5cc7f
gopkg.in/yaml.v2 v2.4.0
@@ -22,20 +24,20 @@ require (
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/cpuguy83/go-md2man/v2 v2.0.4 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/goccy/go-json v0.10.2 // indirect
github.com/google/flatbuffers v23.5.26+incompatible // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/jonboulle/clockwork v0.4.0 // indirect
github.com/klauspost/compress v1.16.7 // indirect
github.com/klauspost/cpuid/v2 v2.2.5 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/pierrec/lz4/v4 v4.1.18 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_model v0.5.0 // indirect
github.com/prometheus/common v0.48.0 // indirect
github.com/prometheus/procfs v0.12.0 // indirect
github.com/robfig/cron/v3 v3.0.1 // indirect
github.com/russross/blackfriday/v2 v2.1.0 // indirect
github.com/tobshub/go-sortedmap v1.0.3 // indirect
github.com/xrash/smetrics v0.0.0-20240312152122-5f08fbb34913 // indirect
github.com/zeebo/xxh3 v1.0.2 // indirect
golang.org/x/mod v0.17.0 // indirect
@@ -46,4 +48,5 @@
golang.org/x/tools v0.20.0 // indirect
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
google.golang.org/protobuf v1.33.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
26 changes: 16 additions & 10 deletions internal/agg/aggregator.go
@@ -2,6 +2,7 @@ package agg

import (
"encoding/json"
"errors"
"log"
"slices"
"time"
@@ -42,12 +43,15 @@ type Aggregator struct {
rules []AggregationRule
}

func NewAggregator(duReader du.Reader, labels Labels, rules []AggregationRule) *Aggregator {
func NewAggregator(duReader du.Reader, labels Labels, rules []AggregationRule) (*Aggregator, error) {
if _, exists := labels[Deleted]; exists {
return nil, errors.New("cannot use deleted as a label")
}
return &Aggregator{
duReader: duReader,
labelsWithDefaults: labels,
rules: rules,
}
}, nil
}

func (a *Aggregator) GetLabelNames() []string {
@@ -63,23 +67,25 @@ func (a *Aggregator) Aggregate(previousRunDate time.Time) (aggregationResults []
return nil, runDate, err
}
if !runDate.After(previousRunDate) {
return nil, runDate, err
return nil, runDate, nil
}

intermediateResults := make(map[string]du.StorageUsage)
i := 0
for rowsCh != nil && errCh != nil {
select {
case err, open := <-errCh:
if !open {
case err, ok := <-errCh:
if !ok {
errCh = nil
continue
}
if err != nil {
return nil, runDate, err
}
case row, open := <-rowsCh:
if !open {
case row, ok := <-rowsCh:
if !ok {
rowsCh = nil
continue
}
aggregationGroup := a.applyRulesToAggregate(row)
intermediateResults[marshalAggregationGroup(aggregationGroup)] += row.Bytes
@@ -89,12 +95,12 @@
i++
}
}
log.Printf("done aggregating/querying blob inventory, %d du rows processed", i)
log.Printf("done aggregating blob inventory, %d du rows processed", i)

return intermediateResultsToAggregationResults(intermediateResults), runDate, nil
}
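The reworked select loop relies on a common Go idiom: once a channel is detected as closed, its variable is set to nil, and because receiving from a nil channel blocks forever, that case effectively drops out of the select. A self-contained sketch of the idiom with stand-in channels (not this package's types):

    package main

    import (
        "fmt"
        "log"
    )

    func main() {
        rowsCh := make(chan string)
        errCh := make(chan error)
        go func() {
            rowsCh <- "dir1/dir2"
            rowsCh <- "special/delivery"
            close(rowsCh)
            close(errCh)
        }()

        for rowsCh != nil && errCh != nil { // same loop-condition shape as Aggregate
            select {
            case err, ok := <-errCh:
                if !ok {
                    errCh = nil // closed: a nil channel blocks forever, so this case stops firing
                    continue
                }
                log.Fatal(err)
            case row, ok := <-rowsCh:
                if !ok {
                    rowsCh = nil
                    continue
                }
                fmt.Println("processing", row)
            }
        }
    }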

// The key in intermediate results of aggregateRun is a JSON representation of AggregationGroup
// The key in intermediate results of Aggregator.Aggregate is a JSON representation of AggregationGroup
// because a map is not a comparable type.
// Property order in the JSON is predictable/constant.
func marshalAggregationGroup(aggregationGroup AggregationGroup) string {
@@ -146,7 +152,7 @@ func (a *Aggregator) applyRulesToAggregate(row du.Row) AggregationGroup {
}

func (a *Aggregator) applyRuleDefaults(labelsFromPattern Labels, rule AggregationRule) Labels {
labels := a.labelsWithDefaults
labels := maps.Clone(a.labelsWithDefaults)
for label, defaultVal := range labels {
labels[label] = defaultStr(
labelsFromPattern[label], // first use a match group
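The maps.Clone change in applyRuleDefaults fixes a subtle aliasing problem: the old code took the shared labelsWithDefaults map by reference and then wrote row-specific values into it, so one row's labels could bleed into the defaults used for the next row. An isolated illustration of the difference (standard-library maps package, not this package's own code):

    package main

    import (
        "fmt"
        "maps"
    )

    func main() {
        defaults := map[string]string{"tenant": "other", "type": "other"}

        // Without cloning: filling in labels for one row mutates the shared defaults.
        labels := defaults
        labels["tenant"] = "special"
        fmt.Println(defaults["tenant"]) // special  <- the default has been overwritten

        // With cloning (as applyRuleDefaults now does): the defaults stay intact.
        defaults = map[string]string{"tenant": "other", "type": "other"}
        labels = maps.Clone(defaults)
        labels["tenant"] = "special"
        fmt.Println(defaults["tenant"]) // other
    }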
139 changes: 139 additions & 0 deletions internal/agg/aggregator_test.go
@@ -0,0 +1,139 @@
package agg

import (
"errors"
"reflect"
"testing"
"time"

"github.com/PDOK/azure-storage-usage-exporter/internal/du"
"github.com/stretchr/testify/require"
)

func TestAggregator_Aggregate(t *testing.T) {
someFixedTime, _ := time.Parse(time.DateOnly, "2024-04-20")
type fields struct {
duReader du.Reader
labelsWithDefaults Labels
rules []AggregationRule
}
type args struct {
previousRunDate time.Time
}
tests := []struct {
name string
fields fields
args args
wantAggregationResults []AggregationResult
wantRunDate time.Time
wantErr bool
}{{
name: "basic",
fields: fields{
duReader: &fakeDuReader{
runDate: someFixedTime,
rows: []du.Row{
{Dir: "dir1/dir2", Deleted: boolPtr(false), Bytes: 100, Count: 12},
{Dir: "unallocatable", Deleted: boolPtr(false), Bytes: 666, Count: 666},
{Dir: "dir1/dir2", Deleted: boolPtr(true), Bytes: 200, Count: 30},
{Dir: "special/delivery", Deleted: boolPtr(false), Bytes: 321, Count: 1},
},
},
labelsWithDefaults: Labels{
"level1": "default1",
"level2": "default2",
},
rules: []AggregationRule{
{Pattern: NewReGroup(`^(?P<level1>special)(/|$)`), StaticLabels: Labels{"level2": "sauce"}},
{Pattern: NewReGroup(`^(?P<level1>[^/]+)/(?P<level2>[^/]+)`), StaticLabels: Labels{}},
},
},
args: args{
previousRunDate: someFixedTime.Add(-24 * time.Hour),
},
wantAggregationResults: []AggregationResult{
{AggregationGroup: AggregationGroup{Labels: Labels{"level1": "default1", "level2": "default2"}, Deleted: false}, StorageUsage: 666},
{AggregationGroup: AggregationGroup{Labels: Labels{"level1": "special", "level2": "sauce"}, Deleted: false}, StorageUsage: 321},
{AggregationGroup: AggregationGroup{Labels: Labels{"level1": "dir1", "level2": "dir2"}, Deleted: true}, StorageUsage: 200},
{AggregationGroup: AggregationGroup{Labels: Labels{"level1": "dir1", "level2": "dir2"}, Deleted: false}, StorageUsage: 100},
},
wantRunDate: someFixedTime,
wantErr: false,
}, {
name: "error starting to read",
fields: fields{
duReader: &fakeDuReader{
runDate: someFixedTime,
errorImmediately: true,
},
},
args: args{
previousRunDate: someFixedTime.Add(-24 * time.Hour),
},
wantRunDate: time.Time{},
wantErr: true,
}, {
name: "error while reading",
fields: fields{
duReader: &fakeDuReader{
runDate: someFixedTime,
errorInChannel: true,
},
},
args: args{
previousRunDate: someFixedTime.Add(-24 * time.Hour),
},
wantRunDate: someFixedTime,
wantErr: true,
}}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
a, err := NewAggregator(tt.fields.duReader, tt.fields.labelsWithDefaults, tt.fields.rules)
require.Nil(t, err)
gotAggregationResults, gotRunDate, err := a.Aggregate(tt.args.previousRunDate)
if (err != nil) != tt.wantErr {
t.Errorf("Aggregate() error = %v, wantErr %v", err, tt.wantErr)
return
}
if !reflect.DeepEqual(gotAggregationResults, tt.wantAggregationResults) {
t.Errorf("Aggregate() gotAggregationResults = %v, want %v", gotAggregationResults, tt.wantAggregationResults)
}
if !reflect.DeepEqual(gotRunDate, tt.wantRunDate) {
t.Errorf("Aggregate() gotRunDate = %v, want %v", gotRunDate, tt.wantRunDate)
}
})
}
}

type fakeDuReader struct {
runDate time.Time
rows []du.Row
errorImmediately bool
errorInChannel bool
}

func (f *fakeDuReader) Read(previousRunDate time.Time) (time.Time, <-chan du.Row, <-chan error, error) {
if f.errorImmediately {
return time.Time{}, nil, nil, errors.New("error starting to read")
}
if !f.runDate.After(previousRunDate) {
return f.runDate, nil, nil, errors.New("last run date is not after previous run date")
}
rowsCh := make(chan du.Row)
errCh := make(chan error)
go func() {
for _, row := range f.rows {
rowsCh <- row
}
if f.errorInChannel {
errCh <- errors.New("error while reading")
}
close(rowsCh)
close(errCh)
}()
return f.runDate, rowsCh, errCh, nil
}

func boolPtr(b bool) *bool {
return &b
}
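With testify added to go.mod, the new test can be run in isolation using the standard Go toolchain, for example:

    go test ./internal/agg/ -run TestAggregator_Aggregate -v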
