Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add new 'WithErrors' constructors #28

Merged
merged 2 commits into from
Apr 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions lib.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ func NewSenderWithNewDeviceWithErrors(dev *api.DeviceCreate, cfg *Config) (*Send
return cfg.startWithInternalErrors(client, d)
}

// NewSenderWithNewSiteAndDevice creates a new device and site then returns a flow Sender for that newly created device
func NewSenderWithNewSiteAndDevice(siteAndDevice *api.SiteAndDeviceCreate, errors chan<- error, cfg *Config) (*Sender, error) {
client := cfg.client()
d, err := client.CreateDeviceAndSite(siteAndDevice)
Expand All @@ -114,11 +115,31 @@ func NewSenderWithNewSiteAndDevice(siteAndDevice *api.SiteAndDeviceCreate, error
return cfg.start(client, d, errors)
}

// NewSenderWithNewSiteAndDeviceWithErrors is the same as NewSenderWithNewSiteAndDeviceWithErrors except rather than
// passing in a channel to receive errors, a channel is returned by the function. The channel is closed after Sender.Stop is called
// and all flow has been dispatched
func NewSenderWithNewSiteAndDeviceWithErrors(siteAndDevice *api.SiteAndDeviceCreate, cfg *Config) (*Sender, <-chan error, error) {
client := cfg.client()
d, err := client.CreateDeviceAndSite(siteAndDevice)
if err != nil {
return nil, nil, err
}

return cfg.startWithInternalErrors(client, d)
}

// NewSenderFromDevice returns a Sender for an existing Device
func NewSenderFromDevice(d *api.Device, errors chan<- error, cfg *Config) (*Sender, error) {
client := cfg.client()
return cfg.start(client, d, errors)
}

// NewSenderFromDeviceWithErrors returns a Sender and an error channel for an existing Device
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would be nice if the other public constructors also got this documentation

func NewSenderFromDeviceWithErrors(d *api.Device, cfg *Config) (*Sender, <-chan error, error) {
client := cfg.client()
return cfg.startWithInternalErrors(client, d)
}

func lookupdev(dev *api.Device, err error) (*api.Device, error) {
if err != nil {
switch {
Expand Down
118 changes: 118 additions & 0 deletions lib_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/kentik/libkflow/flow"
metrics2 "github.com/kentik/libkflow/metrics"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/goleak"
)

Expand Down Expand Up @@ -281,6 +282,123 @@ func TestNewSenderWithDeviceNameLeaks(t *testing.T) {
goleak.VerifyNone(t, ignore)
}

func TestNewSenderFromDeviceWithErrors(t *testing.T) {
client, server, device, err := test.NewClientServer()
if err != nil {
t.Fatal(err)
}

flowServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(400)
}))

apiurl = server.URL(test.API)
flowurl = server.URL(flowServer.URL)
metricsurl = server.URL(test.TSDB)

email = client.Email
token = client.Token

config := libkflow.NewConfig(email, token, "test", "0.0.1")
config.OverrideURLs(apiurl, flowurl, metricsurl)

l := stubLeveledLogger{}

registry := metrics.NewRegistry()
metrics2.StartWithSetConf(registry, &l, metricsurl.String(), email, token, "chf")
config.WithRegistry(registry)

s, errors, err := libkflow.NewSenderFromDeviceWithErrors(device, config)

require.NotNil(t, s)
require.Nil(t, err)

errorsFromChan := make([]error, 0)

wg := sync.WaitGroup{}
wg.Add(1)
go func() {
for err := range errors {
errorsFromChan = append(errorsFromChan, err)
}
wg.Done()
}()

s.Send(&flow.Flow{
TimestampNano: time.Now().UnixNano(),
})

s.Stop(time.Second)

wg.Wait()

assert.Len(t, errorsFromChan, 1)
}

func TestNewSenderWithNewSiteAndDeviceWithErrors(t *testing.T) {
client, server, device, err := test.NewClientServer()
if err != nil {
t.Fatal(err)
}

flowServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(400)
}))

apiurl = server.URL(test.API)
flowurl = server.URL(flowServer.URL)
metricsurl = server.URL(test.TSDB)

email = client.Email
token = client.Token

config := libkflow.NewConfig(email, token, "test", "0.0.1")
config.OverrideURLs(apiurl, flowurl, metricsurl)

l := stubLeveledLogger{}

registry := metrics.NewRegistry()
metrics2.StartWithSetConf(registry, &l, metricsurl.String(), email, token, "chf")
config.WithRegistry(registry)

s, errors, err := libkflow.NewSenderWithNewSiteAndDeviceWithErrors(&api.SiteAndDeviceCreate{
Site: &api.SiteCreate{
Title: "",
City: "",
Region: "",
Country: "",
},
Device: &api.DeviceCreate{
AllowNoIP: true,
Name: device.Name,
},
}, config)

require.NotNil(t, s)
require.Nil(t, err)

errorsFromChan := make([]error, 0)

wg := sync.WaitGroup{}
wg.Add(1)
go func() {
for err := range errors {
errorsFromChan = append(errorsFromChan, err)
}
wg.Done()
}()

s.Send(&flow.Flow{
TimestampNano: time.Now().UnixNano(),
})

s.Stop(time.Second)

wg.Wait()

assert.Len(t, errorsFromChan, 1)
}

func TestNewSenderFromDevice(t *testing.T) {
dev, assert := setupLibTest(t)

Expand Down
28 changes: 28 additions & 0 deletions vendor/github.com/stretchr/testify/require/doc.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 16 additions & 0 deletions vendor/github.com/stretchr/testify/require/forward_requirements.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading