Skip to content

Commit

Permalink
Support for Senders to use a shared metrics registry and return error…
Browse files Browse the repository at this point in the history
… channel on construction (#26)

* adds support for Senders to use a shared registry which is managed externally to libkflow

* sender unregisters its own metrics

* lower goleak version

* Add functionality to return an internally managed error channel when creating a Sender

* func name changes and comments added
  • Loading branch information
ktkenny authored Jan 31, 2024
1 parent 0ed6079 commit e782ee1
Show file tree
Hide file tree
Showing 84 changed files with 13,630 additions and 75 deletions.
6 changes: 6 additions & 0 deletions api/test/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ type Server struct {
res chan *api.DNSResponse
mux *mux.Router
listener net.Listener
cron *cron.Cron
}

var (
Expand Down Expand Up @@ -109,6 +110,7 @@ func (s *Server) Serve(email, token string, dev *api.Device) error {
s.mux.HandleFunc(API+"/devices", s.wrap(s.devices))

c := cron.New()
s.cron = c
c.AddFunc("* * * * * *", func() {
flows := atomic.SwapUint64(&flowCounter, 0)
packets := atomic.SwapUint64(&packetCounter, 0)
Expand All @@ -122,6 +124,10 @@ func (s *Server) Serve(email, token string, dev *api.Device) error {
return http.Serve(s.listener, s.mux)
}

func (s *Server) Close() {
s.cron.Stop()
}

func (s *Server) URL(path string) *url.URL {
url, _ := url.Parse(fmt.Sprintf("http://%s:%d%s", s.Host, s.Port, path))
return url
Expand Down
61 changes: 44 additions & 17 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"strconv"
"time"

go_metrics "github.com/kentik/kit/go/legacy/go-metrics"
"github.com/kentik/libkflow/agg"
"github.com/kentik/libkflow/api"
"github.com/kentik/libkflow/log"
Expand All @@ -17,19 +18,21 @@ import (

// Config describes the libkflow configuration.
type Config struct {
email string
token string
capture Capture
proxy *url.URL
api *url.URL
flow *url.URL
metrics *url.URL
sample int
timeout time.Duration
retries int
logger interface{}
program string
version string
email string
token string
capture Capture
proxy *url.URL
api *url.URL
flow *url.URL
metrics *url.URL
sample int
timeout time.Duration
retries int
logger interface{}
program string
version string
registry go_metrics.Registry
useInternalErrors bool

metricsPrefix string
metricsInterval time.Duration
Expand Down Expand Up @@ -147,6 +150,11 @@ func (c *Config) SetMetricsInterval(dur time.Duration) {
c.metricsInterval = dur
}

// WithRegistry allows setting a registry which will act as a shared registry between multiple Senders.
func (c *Config) WithRegistry(registry go_metrics.Registry) {
c.registry = registry
}

func (c *Config) client() *api.Client {
return api.NewClient(api.ClientConfig{
Email: c.email,
Expand All @@ -160,22 +168,41 @@ func (c *Config) client() *api.Client {
})
}

func (c *Config) startWithInternalErrors(client *api.Client, dev *api.Device) (*Sender, <-chan error, error) {
errChan := make(chan error)
sender, err := c.start(client, dev, errChan)
if err != nil {
close(errChan)
return nil, nil, err
}

sender.useInternalErrors = true

return sender, errChan, nil
}

func (c *Config) start(client *api.Client, dev *api.Device, errors chan<- error) (*Sender, error) {
if c.metricsInterval == 0 {
c.metricsInterval = 60 * time.Second
}
metrics := c.NewMetrics(dev)
metrics.Start(c.metrics.String(), c.email, c.token, c.metricsPrefix, c.metricsInterval, c.proxy)

agg, err := agg.NewAgg(time.Second, dev.MaxFlowRate, metrics)
var senderMetrics *metrics.Metrics
if c.registry == nil {
senderMetrics = c.NewMetrics(dev)
senderMetrics.Start(c.metrics.String(), c.email, c.token, c.metricsPrefix, c.metricsInterval, c.proxy)
} else {
senderMetrics = metrics.NewWithRegistry(c.registry, dev.CompanyID, dev.ID, c.program, c.version)
}

agg, err := agg.NewAgg(time.Second, dev.MaxFlowRate, senderMetrics)
if err != nil {
return nil, fmt.Errorf("agg setup error: %s", err)
}

sender := newSender(c.flow, c.timeout)
sender.Errors = errors
sender.sample = c.sample
sender.Metrics = metrics
sender.Metrics = senderMetrics

if c.capture.Device != "" {
nif, err := net.InterfaceByName(c.capture.Device)
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ require (
github.com/robfig/cron v0.0.0-20160927164231-9585fd555638
github.com/stretchr/testify v1.8.0
github.com/tinylib/msgp v1.1.6
go.uber.org/goleak v1.1.10
zombiezen.com/go/capnproto2 v2.18.2+incompatible
)

Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1024,6 +1024,7 @@ go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
go.uber.org/atomic v1.5.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/automaxprocs v1.4.0/go.mod h1:/mTEdr7LvHhs0v7mjdxDreTz1OG5zdZGqgOnhWiR/+Q=
go.uber.org/goleak v1.1.10 h1:z+mqJhf6ss6BSfSM671tgKyZBFPTTJM+HLxnhPC3wu0=
go.uber.org/goleak v1.1.10/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A=
go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
go.uber.org/multierr v1.3.0/go.mod h1:VgVr7evmIr6uPjLBxg28wmKNXyqE9akIJ5XnfpiKl+4=
Expand Down Expand Up @@ -1086,6 +1087,7 @@ golang.org/x/lint v0.0.0-20191125180803-fdd1cda4f05f/go.mod h1:5qLYkcX4OjUUV8bRu
golang.org/x/lint v0.0.0-20200130185559-910be7a94367/go.mod h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY=
golang.org/x/lint v0.0.0-20200302205851-738671d3881b/go.mod h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY=
golang.org/x/lint v0.0.0-20201208152925-83fdc39ff7b5/go.mod h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY=
golang.org/x/lint v0.0.0-20210508222113-6edffad5e616 h1:VLliZ0d+/avPrXXH+OakdXhpJuEoBZuwh1m2j7U6Iug=
golang.org/x/lint v0.0.0-20210508222113-6edffad5e616/go.mod h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY=
golang.org/x/mobile v0.0.0-20190312151609-d3739f865fa6/go.mod h1:z+o9i4GpDbdi3rU15maQ/Ox0txvL9dWGYEHz965HBQE=
golang.org/x/mobile v0.0.0-20190719004257-d2bd2a29d028/go.mod h1:E/iHnbuqvinMTCcRqshq8CkpyQDoeVncDDYHnLhea+o=
Expand All @@ -1098,6 +1100,7 @@ golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.4.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.4.1/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4 h1:6zppjxzCulZykYSLyVDYbneBfbaBIQPYMevg0bEwv2s=
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4=
golang.org/x/net v0.0.0-20180530234432-1e491301e022/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
Expand Down Expand Up @@ -1380,6 +1383,7 @@ golang.org/x/tools v0.1.0/go.mod h1:xkSsbof2nBLbhDlRMhhhyNLN/zl3eTqcnHD5viDpcZ0=
golang.org/x/tools v0.1.2/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
golang.org/x/tools v0.1.6-0.20210726203631-07bc1bf47fb2/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
golang.org/x/tools v0.1.12 h1:VveCTK38A2rkS8ZqFY25HIDFscX5X9OoEhJd3quQmXU=
golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
Expand Down
24 changes: 24 additions & 0 deletions lib.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,18 @@ func NewSenderWithDeviceName(name string, errors chan<- error, cfg *Config) (*Se
return cfg.start(client, d, errors)
}

// NewSenderWithDeviceNameWithErrors creates a new flow Sender given a device name address and Config.
// The channel is closed after Sender.Stop is called and all flow has been dispatched.
func NewSenderWithDeviceNameWithErrors(name string, cfg *Config) (*Sender, <-chan error, error) {
client := cfg.client()
d, err := lookupdev(client.GetDeviceByName(name))
if err != nil {
return nil, nil, err
}

return cfg.startWithInternalErrors(client, d)
}

// NewSenderWithNewDevice creates a new device given device creation parameters,
// and then creates a new flow Sender with that device, the error channel, and
// the Config.
Expand All @@ -80,6 +92,18 @@ func NewSenderWithNewDevice(dev *api.DeviceCreate, errors chan<- error, cfg *Con
return cfg.start(client, d, errors)
}

// NewSenderWithNewDeviceWithErrors creates a new device and returns a new flow Sender and an error channel which will
// report errors generated from the Sender. The channel is closed after Sender.Stop is called and all flow has been dispatched
func NewSenderWithNewDeviceWithErrors(dev *api.DeviceCreate, cfg *Config) (*Sender, <-chan error, error) {
client := cfg.client()
d, err := client.CreateDevice(dev)
if err != nil {
return nil, nil, err
}

return cfg.startWithInternalErrors(client, d)
}

func NewSenderWithNewSiteAndDevice(siteAndDevice *api.SiteAndDeviceCreate, errors chan<- error, cfg *Config) (*Sender, error) {
client := cfg.client()
d, err := client.CreateDeviceAndSite(siteAndDevice)
Expand Down
Loading

0 comments on commit e782ee1

Please sign in to comment.