diff --git a/plugins/outputs/all/all.go b/plugins/outputs/all/all.go index 1a6c64721f262..8cc0d594108ad 100644 --- a/plugins/outputs/all/all.go +++ b/plugins/outputs/all/all.go @@ -1 +1,38 @@ package all + +import ( + _ "github.com/influxdata/telegraf/plugins/outputs/amon" + _ "github.com/influxdata/telegraf/plugins/outputs/amqp" + _ "github.com/influxdata/telegraf/plugins/outputs/application_insights" + _ "github.com/influxdata/telegraf/plugins/outputs/azure_monitor" + _ "github.com/influxdata/telegraf/plugins/outputs/cloud_pubsub" + _ "github.com/influxdata/telegraf/plugins/outputs/cloudwatch" + _ "github.com/influxdata/telegraf/plugins/outputs/cratedb" + _ "github.com/influxdata/telegraf/plugins/outputs/datadog" + _ "github.com/influxdata/telegraf/plugins/outputs/discard" + _ "github.com/influxdata/telegraf/plugins/outputs/elasticsearch" + _ "github.com/influxdata/telegraf/plugins/outputs/exec" + _ "github.com/influxdata/telegraf/plugins/outputs/file" + _ "github.com/influxdata/telegraf/plugins/outputs/graphite" + _ "github.com/influxdata/telegraf/plugins/outputs/graylog" + _ "github.com/influxdata/telegraf/plugins/outputs/health" + _ "github.com/influxdata/telegraf/plugins/outputs/http" + _ "github.com/influxdata/telegraf/plugins/outputs/influxdb" + _ "github.com/influxdata/telegraf/plugins/outputs/influxdb_v2" + _ "github.com/influxdata/telegraf/plugins/outputs/instrumental" + _ "github.com/influxdata/telegraf/plugins/outputs/kafka" + _ "github.com/influxdata/telegraf/plugins/outputs/kinesis" + _ "github.com/influxdata/telegraf/plugins/outputs/librato" + _ "github.com/influxdata/telegraf/plugins/outputs/mqtt" + _ "github.com/influxdata/telegraf/plugins/outputs/nats" + _ "github.com/influxdata/telegraf/plugins/outputs/nsq" + _ "github.com/influxdata/telegraf/plugins/outputs/opentsdb" + _ "github.com/influxdata/telegraf/plugins/outputs/prometheus_client" + _ "github.com/influxdata/telegraf/plugins/outputs/riemann" + _ "github.com/influxdata/telegraf/plugins/outputs/signalfx" + _ "github.com/influxdata/telegraf/plugins/outputs/socket_writer" + _ "github.com/influxdata/telegraf/plugins/outputs/stackdriver" + _ "github.com/influxdata/telegraf/plugins/outputs/syslog" + _ "github.com/influxdata/telegraf/plugins/outputs/warp10" + _ "github.com/influxdata/telegraf/plugins/outputs/wavefront" +) diff --git a/plugins/outputs/signalfx/README.md b/plugins/outputs/signalfx/README.md index a9bf1dc03e8fe..7562ce87716bc 100644 --- a/plugins/outputs/signalfx/README.md +++ b/plugins/outputs/signalfx/README.md @@ -42,3 +42,20 @@ to use them. ## metric name in the following list. included_event_names = ["plugin.metric_name"] ``` +```toml + ## SignalFx API Token + APIToken = "my-secret-key" # required. + + ## Ingest URL + DatapointIngestURL = "https://ingest.signalfx.com/v2/datapoint" + EventIngestURL = "https://ingest.signalfx.com/v2/event" + + ## Exclude metrics by metric name + Exclude = ["plugin.metric_name", "plugin2.metric_name"] + + ## Events or String typed metrics are omitted by default, + ## with the exception of host property events which are emitted by + ## the SignalFx Metadata Plugin. If you require a string typed metric + ## you must specify the metric name in the following list + Include = ["plugin.metric_name", "plugin2.metric_name"] +``` \ No newline at end of file diff --git a/plugins/outputs/signalfx/parse/parse.go b/plugins/outputs/signalfx/parse/parse.go new file mode 100644 index 0000000000000..41b7a43fafd2e --- /dev/null +++ b/plugins/outputs/signalfx/parse/parse.go @@ -0,0 +1,52 @@ +package parse + +import ( + "fmt" +) + +// RemoveSFXDimensions removes dimensions used only to identify special metrics for SignalFx +func RemoveSFXDimensions(metricDims map[string]string) { + // remove the sf_metric dimension + delete(metricDims, "sf_metric") +} + +// SetPluginDimension sets the plugin dimension to the metric name if it is not already supplied +func SetPluginDimension(metricName string, metricDims map[string]string) { + // If the plugin doesn't define a plugin name use metric.Name() + if _, in := metricDims["plugin"]; !in { + metricDims["plugin"] = metricName + } +} + +// GetMetricName combines telegraf fields and tags into a full metric name +func GetMetricName(metric string, field string, dims map[string]string) (string, bool) { + // If sf_metric is provided + if sfmetric, isSFX := dims["sf_metric"]; isSFX { + return sfmetric, isSFX + } + + // If it isn't a sf_metric then use metric name + name := metric + + // Include field in metric name when it adds to the metric name + if field != "value" { + name = fmt.Sprintf("%s.%s", name, field) + } + + return name, false +} + +// ExtractProperty of the metric according to the following rules +func ExtractProperty(name string, dims map[string]string) (map[string]interface{}, error) { + props := make(map[string]interface{}, 1) + // if the metric is a metadata object + if name == "objects.host-meta-data" { + // If property exists remap it + if _, in := dims["property"]; !in { + return props, fmt.Errorf("E! objects.host-metadata object doesn't have a property") + } + props["property"] = dims["property"] + delete(dims, "property") + } + return props, nil +} diff --git a/plugins/outputs/signalfx/parse/parse_test.go b/plugins/outputs/signalfx/parse/parse_test.go new file mode 100644 index 0000000000000..f1d9ba4b1b482 --- /dev/null +++ b/plugins/outputs/signalfx/parse/parse_test.go @@ -0,0 +1,186 @@ +package parse + +import ( + "reflect" + "testing" +) + +func TestRemoveSFXDimensions(t *testing.T) { + type args struct { + metricDims map[string]string + } + tests := []struct { + name string + args args + }{ + { + name: "remove sf_metric from sfx dimensions", + args: args{ + metricDims: map[string]string{ + "sf_metric": "", + "dimensionKey": "dimensionVal", + }, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + RemoveSFXDimensions(tt.args.metricDims) + if _, isIn := tt.args.metricDims["sf_metric"]; isIn { + t.Errorf("RemoveSFXDimensions() got metricDims %v, but 'sf_metric' shouldn't be in it", tt.args.metricDims) + } + }) + } +} + +func TestSetPluginDimension(t *testing.T) { + type args struct { + metricName string + metricDims map[string]string + } + tests := []struct { + name string + args args + }{ + { + name: "", + args: args{ + metricName: "metricName", + metricDims: map[string]string{ + "dimensionKey": "dimensionVal", + }, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if original, in := tt.args.metricDims["plugin"]; !in { + SetPluginDimension(tt.args.metricName, tt.args.metricDims) + if val, in := tt.args.metricDims["plugin"]; !in || val != tt.args.metricName { + t.Errorf("SetPluginDimension() got %v but wanted plugin dimension with value %s", tt.args.metricDims, tt.args.metricName) + } + } else { + SetPluginDimension(tt.args.metricName, tt.args.metricDims) + if val, in := tt.args.metricDims["plugin"]; !in || val != original { + t.Errorf("SetPluginDImension() got %v but wanted plugin dimension with value %s", tt.args.metricDims, original) + } + } + }) + } +} + +func TestGetMetricName(t *testing.T) { + type args struct { + metric string + field string + dims map[string]string + } + tests := []struct { + name string + args args + want string + wantsfx bool + }{ + { + name: "use sf_metric tag as metric name", + args: args{ + metric: "datapoint", + field: "test", + dims: map[string]string{ + "sf_metric": "sfxtestmetricname", + }, + }, + want: "sfxtestmetricname", + wantsfx: true, + }, + { + name: "fields that equal value should not be append to metricname", + args: args{ + metric: "datapoint", + field: "value", + dims: map[string]string{ + "testDimKey": "testDimVal", + }, + }, + want: "datapoint", + wantsfx: false, + }, + { + name: "fields other than 'value' with out sf_metric dim should return measurement.fieldname as metric name", + args: args{ + metric: "datapoint", + field: "test", + dims: map[string]string{ + "testDimKey": "testDimVal", + }, + }, + want: "datapoint.test", + wantsfx: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, got1 := GetMetricName(tt.args.metric, tt.args.field, tt.args.dims) + if got != tt.want { + t.Errorf("GetMetricName() got = %v, want %v", got, tt.want) + } + if got1 != tt.wantsfx { + t.Errorf("GetMetricName() got1 = %v, want %v", got1, tt.wantsfx) + } + }) + } +} + +func TestExtractProperty(t *testing.T) { + type args struct { + name string + dims map[string]string + } + tests := []struct { + name string + args args + wantProps map[string]interface{} + wantErr bool + }{ + { + name: "ensure that sfx host metadata events remap dimension with key 'property' to properties", + args: args{ + name: "objects.host-meta-data", + dims: map[string]string{ + "property": "propertyValue", + "dimensionKey": "dimensionValue", + }, + }, + wantProps: map[string]interface{}{ + "property": "propertyValue", + }, + wantErr: false, + }, + { + name: "malformed sfx host metadata event should return an error", + args: args{ + name: "objects.host-meta-data", + dims: map[string]string{ + "dimensionKey": "dimensionValue", + }, + }, + wantProps: map[string]interface{}{}, + wantErr: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + gotProps, err := ExtractProperty(tt.args.name, tt.args.dims) + if (err != nil) != tt.wantErr { + t.Errorf("ExtractProperty() error = %v, wantErr %v", err, tt.wantErr) + return + } + if !reflect.DeepEqual(gotProps, tt.wantProps) { + t.Errorf("ExtractProperty() = %v, want %v", gotProps, tt.wantProps) + } + if _, ok := tt.args.dims["property"]; ok { + t.Errorf("ExtractProperty() did not remove property from dims %v", tt.args.dims) + } + }) + } +} diff --git a/plugins/outputs/signalfx/signalfx.go b/plugins/outputs/signalfx/signalfx.go index 0a9e00d9866cc..aca64b1856ca7 100644 --- a/plugins/outputs/signalfx/signalfx.go +++ b/plugins/outputs/signalfx/signalfx.go @@ -1,241 +1,324 @@ -//go:generate ../../../tools/readme_config_includer/generator package signalfx import ( "context" - _ "embed" - "errors" - "fmt" - "strings" + "log" + "sync" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/plugins/outputs" + "github.com/influxdata/telegraf/plugins/outputs/signalfx/parse" "github.com/signalfx/golib/v3/datapoint" "github.com/signalfx/golib/v3/datapoint/dpsink" "github.com/signalfx/golib/v3/event" "github.com/signalfx/golib/v3/sfxclient" - - "github.com/influxdata/telegraf" - "github.com/influxdata/telegraf/config" - "github.com/influxdata/telegraf/plugins/outputs" ) -//go:embed sample.conf -var sampleConfig string - -// init initializes the plugin context -func init() { - outputs.Add("signalfx", func() telegraf.Output { - return NewSignalFx() - }) +/*SignalFx plugin context*/ +type SignalFx struct { + APIToken string + BatchSize int + ChannelSize int + DatapointIngestURL string + EventIngestURL string + Exclude []string + Include []string + exclude map[string]bool + include map[string]bool + ctx context.Context + client dpsink.Sink + dps chan *datapoint.Datapoint + evts chan *event.Event + done chan struct{} + wg sync.WaitGroup } -// SignalFx plugin context -type SignalFx struct { - AccessToken config.Secret `toml:"access_token"` - SignalFxRealm string `toml:"signalfx_realm"` - IngestURL string `toml:"ingest_url"` - IncludedEventNames []string `toml:"included_event_names"` +var sampleConfig = ` + ## SignalFx API Token + APIToken = "my-secret-key" # required. - Log telegraf.Logger `toml:"-"` + ## BatchSize + BatchSize = 1000 - includedEventSet map[string]bool - client dpsink.Sink + ## Ingest URL + DatapointIngestURL = "https://ingest.signalfx.com/v2/datapoint" + EventIngestURL = "https://ingest.signalfx.com/v2/event" - ctx context.Context - cancel context.CancelFunc -} + ## Exclude metrics by metric name + Exclude = ["plugin.metric_name", ""] + + ## Events or String typed metrics are omitted by default, + ## with the exception of host property events which are emitted by + ## the SignalFx Metadata Plugin. If you require a string typed metric + ## you must specify the metric name in the following list + Include = ["plugin.metric_name", ""] +` -// GetMetricType returns the equivalent telegraf ValueType for a signalfx metric type -func GetMetricType(mtype telegraf.ValueType) (metricType datapoint.MetricType) { +// GetMetricType casts a telegraf ValueType to a signalfx metric type +func GetMetricType(mtype telegraf.ValueType) (metricType datapoint.MetricType, metricTypeString string) { switch mtype { case telegraf.Counter: + metricTypeString = "counter" metricType = datapoint.Counter case telegraf.Gauge: + metricTypeString = "gauge" metricType = datapoint.Gauge case telegraf.Summary: + metricTypeString = "summary" metricType = datapoint.Gauge + log.Println("D! Output [signalfx] GetMetricType() summary metrics will be sent as gauges") case telegraf.Histogram: + metricTypeString = "histogram" metricType = datapoint.Gauge + log.Println("D! Output [signalfx] GetMetricType() histogram metrics will be sent as gauges") case telegraf.Untyped: + metricTypeString = "untyped" metricType = datapoint.Gauge + log.Println("D! Output [signalfx] GetMetricType() untyped metrics will be sent as gauges") default: + metricTypeString = "unrecognized" metricType = datapoint.Gauge + log.Println("D! Output [signalfx] GetMetricType() unrecognized metric type defaulting to gauge") } - return metricType + return } // NewSignalFx - returns a new context for the SignalFx output plugin func NewSignalFx() *SignalFx { - ctx, cancel := context.WithCancel(context.Background()) return &SignalFx{ - IncludedEventNames: []string{""}, - ctx: ctx, - cancel: cancel, - client: sfxclient.NewHTTPSink(), + APIToken: "", + BatchSize: 1000, + ChannelSize: 100000, + DatapointIngestURL: "https://ingest.signalfx.com/v2/datapoint", + EventIngestURL: "https://ingest.signalfx.com/v2/event", + Exclude: []string{""}, + Include: []string{""}, + done: make(chan struct{}), } } -func (*SignalFx) SampleConfig() string { +/*Description returns a description for the plugin*/ +func (s *SignalFx) Description() string { + return "Send metrics to SignalFx" +} + +/*SampleConfig returns the sample configuration for the plugin*/ +func (s *SignalFx) SampleConfig() string { return sampleConfig } -// Connect establishes a connection to SignalFx +/*Connect establishes a connection to SignalFx*/ func (s *SignalFx) Connect() error { - client := s.client.(*sfxclient.HTTPSink) - - token, err := s.AccessToken.Get() - if err != nil { - return fmt.Errorf("getting token failed: %w", err) - } - client.AuthToken = token.String() - token.Destroy() - - if s.IngestURL != "" { - client.DatapointEndpoint = datapointEndpointForIngestURL(s.IngestURL) - client.EventEndpoint = eventEndpointForIngestURL(s.IngestURL) - } else if s.SignalFxRealm != "" { - client.DatapointEndpoint = datapointEndpointForRealm(s.SignalFxRealm) - client.EventEndpoint = eventEndpointForRealm(s.SignalFxRealm) - } else { - return errors.New("signalfx_realm or ingest_url must be configured") - } - + // Make a connection to the URL here + client := sfxclient.NewHTTPSink() + client.AuthToken = s.APIToken + client.DatapointEndpoint = s.DatapointIngestURL + client.EventEndpoint = s.EventIngestURL + s.client = client + s.ctx = context.Background() + s.dps = make(chan *datapoint.Datapoint, s.ChannelSize) + s.evts = make(chan *event.Event, s.ChannelSize) + s.wg.Add(2) + go func() { + s.emitDatapoints() + s.wg.Done() + }() + go func() { + s.emitEvents() + s.wg.Done() + }() + log.Printf("I! Output [signalfx] batch size is %d\n", s.BatchSize) return nil } -// Close closes any connections to SignalFx +/*Close closes the connection to SignalFx*/ func (s *SignalFx) Close() error { - s.cancel() - s.client.(*sfxclient.HTTPSink).Client.CloseIdleConnections() + close(s.done) // drain the input channels + s.wg.Wait() // wait for the input channels to be drained + s.client = nil // destroy the client return nil } -func (s *SignalFx) ConvertToSignalFx(metrics []telegraf.Metric) ([]*datapoint.Datapoint, []*event.Event) { - var dps []*datapoint.Datapoint - var events []*event.Event +func (s *SignalFx) emitDatapoints() { + var buf []*datapoint.Datapoint + for { + select { + case <-s.done: + return + case dp := <-s.dps: + buf = append(buf, dp) + s.fillAndSendDatapoints(buf) + buf = buf[:0] + } + } +} + +func (s *SignalFx) fillAndSendDatapoints(buf []*datapoint.Datapoint) { +outer: + for { + select { + case dp := <-s.dps: + buf = append(buf, dp) + if len(buf) >= s.BatchSize { + if err := s.client.AddDatapoints(s.ctx, buf); err != nil { + log.Println("E! Output [signalfx] ", err) + } + buf = buf[:0] + } + default: + break outer + } + } + if len(buf) > 0 { + if err := s.client.AddDatapoints(s.ctx, buf); err != nil { + log.Println("E! Output [signalfx] ", err) + } + } +} + +func (s *SignalFx) emitEvents() { + var buf []*event.Event + for { + select { + case <-s.done: + return + case e := <-s.evts: + buf = append(buf, e) + s.fillAndSendEvents(buf) + buf = buf[:0] + } + } +} + +func (s *SignalFx) fillAndSendEvents(buf []*event.Event) { +outer: + for { + select { + case e := <-s.evts: + buf = append(buf, e) + if len(buf) >= s.BatchSize { + if err := s.client.AddEvents(s.ctx, buf); err != nil { + log.Println("E! Output [signalfx] ", err) + } + buf = buf[:0] + } + default: + break outer + } + } + if len(buf) > 0 { + if err := s.client.AddEvents(s.ctx, buf); err != nil { + log.Println("E! Output [signalfx] ", err) + } + } +} +// GetObjects - converts telegraf metrics to signalfx datapoints and events, and pushes them on to the supplied channels +func (s *SignalFx) GetObjects(metrics []telegraf.Metric, dps chan *datapoint.Datapoint, evts chan *event.Event) { for _, metric := range metrics { - s.Log.Debugf("Processing the following measurement: %v", metric) + log.Println("D! Outputs [signalfx] processing the following measurement: ", metric) var timestamp = metric.Time() + var metricType datapoint.MetricType + var metricTypeString string + + metricType, metricTypeString = GetMetricType(metric.Type()) - metricType := GetMetricType(metric.Type()) for field, val := range metric.Fields() { // Copy the metric tags because they are meant to be treated as // immutable var metricDims = metric.Tags() // Generate the metric name - metricName := getMetricName(metric.Name(), field) + var metricName, isSFX = parse.GetMetricName(metric.Name(), field, metricDims) + + // Check if the metric is explicitly excluded + if s.isExcluded(metricName) { + log.Println("D! Outputs [signalfx] excluding the following metric: ", metricName, metric) + continue + } + + // If eligible, move the dimension "property" to properties + metricProps, propErr := parse.ExtractProperty(metricName, metricDims) + if propErr != nil { + log.Printf("E! Output [signalfx] %v", propErr) + continue + } + + // Add common dimensions + metricDims["agent"] = "telegraf" + metricDims["telegraf_type"] = metricTypeString + parse.SetPluginDimension(metric.Name(), metricDims) + parse.RemoveSFXDimensions(metricDims) // Get the metric value as a datapoint value - if metricValue, err := datapoint.CastMetricValueWithBool(val); err == nil { + if metricValue, err := datapoint.CastMetricValue(val); err == nil { var dp = datapoint.New(metricName, metricDims, - metricValue, + metricValue.(datapoint.Value), metricType, timestamp) - s.Log.Debugf("Datapoint: %v", dp.String()) + // log metric + log.Println("D! Output [signalfx] ", dp.String()) - dps = append(dps, dp) + // Add metric as a datapoint + dps <- dp } else { - // Skip if it's not an explicitly included event - if !s.isEventIncluded(metricName) { + // Skip if it's not an sfx event and it's not included + if !isSFX && !s.isIncluded(metricName) { continue } // We've already type checked field, so set property with value - metricProps := map[string]interface{}{"message": val} + metricProps["message"] = val var ev = event.NewWithProperties(metricName, event.AGENT, metricDims, metricProps, timestamp) - s.Log.Debugf("Event: %v", ev.String()) + // log event + log.Println("D! Output [signalfx] ", ev.String()) - events = append(events, ev) + // Add event + evts <- ev } } } - - return dps, events } -// Write call back for writing metrics +/*Write call back for writing metrics*/ func (s *SignalFx) Write(metrics []telegraf.Metric) error { - dps, events := s.ConvertToSignalFx(metrics) - - if len(dps) > 0 { - err := s.client.AddDatapoints(s.ctx, dps) - if err != nil { - return err - } - } - - if len(events) > 0 { - if err := s.client.AddEvents(s.ctx, events); err != nil { - // If events error out but we successfully sent some datapoints, - // don't return an error so that it won't ever retry -- that way we - // don't send the same datapoints twice. - if len(dps) == 0 { - return err - } - s.Log.Errorf("Failed to send SignalFx event: %v", err) - } - } - + s.GetObjects(metrics, s.dps, s.evts) return nil } -// isEventIncluded - checks whether a metric name for an event was put on the whitelist -func (s *SignalFx) isEventIncluded(name string) bool { - if s.includedEventSet == nil { - s.includedEventSet = make(map[string]bool, len(s.includedEventSet)) - for _, include := range s.IncludedEventNames { - s.includedEventSet[include] = true +// isExcluded - checks whether a metric name was put on the exclude list +func (s *SignalFx) isExcluded(name string) bool { + if s.exclude == nil { + s.exclude = make(map[string]bool, len(s.Exclude)) + for _, exclude := range s.Exclude { + s.exclude[exclude] = true } } - return s.includedEventSet[name] + return s.exclude[name] } -// getMetricName combines telegraf fields and tags into a full metric name -func getMetricName(metric string, field string) string { - name := metric - - // Include field in metric name when it adds to the metric name - if field != "value" { - name = fmt.Sprintf("%s.%s", name, field) +// isIncluded - checks whether a metric name was put on the include list +func (s *SignalFx) isIncluded(name string) bool { + if s.include == nil { + s.include = make(map[string]bool, len(s.Include)) + for _, include := range s.Include { + s.include[include] = true + } } - - return name -} - -// ingestURLForRealm returns the base ingest URL for a particular SignalFx -// realm -func ingestURLForRealm(realm string) string { - return fmt.Sprintf("https://ingest.%s.signalfx.com", realm) -} - -// datapointEndpointForRealm returns the endpoint to which datapoints should be -// POSTed for a particular realm. -func datapointEndpointForRealm(realm string) string { - return datapointEndpointForIngestURL(ingestURLForRealm(realm)) -} - -// datapointEndpointForRealm returns the endpoint to which datapoints should be -// POSTed for a particular ingest base URL. -func datapointEndpointForIngestURL(ingestURL string) string { - return strings.TrimRight(ingestURL, "/") + "/v2/datapoint" -} - -// eventEndpointForRealm returns the endpoint to which events should be -// POSTed for a particular realm. -func eventEndpointForRealm(realm string) string { - return eventEndpointForIngestURL(ingestURLForRealm(realm)) + return s.include[name] } -// eventEndpointForRealm returns the endpoint to which events should be -// POSTed for a particular ingest base URL. -func eventEndpointForIngestURL(ingestURL string) string { - return strings.TrimRight(ingestURL, "/") + "/v2/event" +/*init initializes the plugin context*/ +func init() { + outputs.Add("signalfx", func() telegraf.Output { + return NewSignalFx() + }) } diff --git a/plugins/outputs/signalfx/signalfx_test.go b/plugins/outputs/signalfx/signalfx_test.go index fea88ee4151ea..08cc65d8253f7 100644 --- a/plugins/outputs/signalfx/signalfx_test.go +++ b/plugins/outputs/signalfx/signalfx_test.go @@ -7,14 +7,13 @@ import ( "testing" "time" - "github.com/signalfx/golib/v3/datapoint" - "github.com/signalfx/golib/v3/event" - "github.com/stretchr/testify/require" - "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/metric" "github.com/influxdata/telegraf/plugins/outputs" - "github.com/influxdata/telegraf/testutil" + "github.com/signalfx/golib/v3/datapoint" + "github.com/signalfx/golib/v3/datapoint/dpsink" + "github.com/signalfx/golib/v3/event" + "github.com/stretchr/testify/require" ) type sink struct { @@ -22,11 +21,11 @@ type sink struct { evs []*event.Event } -func (s *sink) AddDatapoints(_ context.Context, points []*datapoint.Datapoint) error { +func (s *sink) AddDatapoints(ctx context.Context, points []*datapoint.Datapoint) error { s.dps = append(s.dps, points...) return nil } -func (s *sink) AddEvents(_ context.Context, events []*event.Event) error { +func (s *sink) AddEvents(ctx context.Context, events []*event.Event) error { s.evs = append(s.evs, events...) return nil } @@ -36,10 +35,10 @@ type errorsink struct { evs []*event.Event } -func (e *errorsink) AddDatapoints(_ context.Context, _ []*datapoint.Datapoint) error { +func (e *errorsink) AddDatapoints(ctx context.Context, points []*datapoint.Datapoint) error { return errors.New("not sending datapoints") } -func (e *errorsink) AddEvents(_ context.Context, _ []*event.Event) error { +func (e *errorsink) AddEvents(ctx context.Context, events []*event.Event) error { return errors.New("not sending events") } func TestSignalFx_SignalFx(t *testing.T) { @@ -51,7 +50,8 @@ func TestSignalFx_SignalFx(t *testing.T) { tp telegraf.ValueType } type fields struct { - IncludedEvents []string + Exclude []string + Include []string } type want struct { datapoints []*datapoint.Datapoint @@ -108,25 +108,16 @@ func TestSignalFx_SignalFx(t *testing.T) { fields: map[string]interface{}{"mymeasurement": float64(3.14)}, time: time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC), }, - { - name: "datapoint", - tags: map[string]string{"host": "192.168.0.1"}, - fields: map[string]interface{}{"myboolmeasurement": true}, - time: time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC), - }, - { - name: "datapoint", - tags: map[string]string{"host": "192.168.0.1"}, - fields: map[string]interface{}{"myboolmeasurement": false}, - time: time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC), - }, }, want: want{ datapoints: []*datapoint.Datapoint{ datapoint.New( "datapoint.mymeasurement", map[string]string{ - "host": "192.168.0.1", + "plugin": "datapoint", + "agent": "telegraf", + "telegraf_type": "counter", + "host": "192.168.0.1", }, datapoint.NewFloatValue(float64(3.14)), datapoint.Counter, @@ -134,7 +125,10 @@ func TestSignalFx_SignalFx(t *testing.T) { datapoint.New( "datapoint.mymeasurement", map[string]string{ - "host": "192.168.0.1", + "plugin": "datapoint", + "agent": "telegraf", + "telegraf_type": "gauge", + "host": "192.168.0.1", }, datapoint.NewFloatValue(float64(3.14)), datapoint.Gauge, @@ -142,7 +136,10 @@ func TestSignalFx_SignalFx(t *testing.T) { datapoint.New( "datapoint.mymeasurement", map[string]string{ - "host": "192.168.0.1", + "plugin": "datapoint", + "agent": "telegraf", + "telegraf_type": "summary", + "host": "192.168.0.1", }, datapoint.NewFloatValue(float64(3.14)), datapoint.Gauge, @@ -150,7 +147,10 @@ func TestSignalFx_SignalFx(t *testing.T) { datapoint.New( "datapoint.mymeasurement", map[string]string{ - "host": "192.168.0.1", + "plugin": "datapoint", + "agent": "telegraf", + "telegraf_type": "histogram", + "host": "192.168.0.1", }, datapoint.NewFloatValue(float64(3.14)), datapoint.Gauge, @@ -158,7 +158,10 @@ func TestSignalFx_SignalFx(t *testing.T) { datapoint.New( "datapoint.mymeasurement", map[string]string{ - "host": "192.168.0.1", + "plugin": "datapoint", + "agent": "telegraf", + "telegraf_type": "untyped", + "host": "192.168.0.1", }, datapoint.NewFloatValue(float64(3.14)), datapoint.Gauge, @@ -166,27 +169,14 @@ func TestSignalFx_SignalFx(t *testing.T) { datapoint.New( "datapoint.mymeasurement", map[string]string{ - "host": "192.168.0.1", + "plugin": "datapoint", + "agent": "telegraf", + "telegraf_type": "unrecognized", + "host": "192.168.0.1", }, datapoint.NewFloatValue(float64(3.14)), datapoint.Gauge, time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC)), - datapoint.New( - "datapoint.myboolmeasurement", - map[string]string{ - "host": "192.168.0.1", - }, - datapoint.NewIntValue(int64(1)), - datapoint.Gauge, - time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC)), - datapoint.New( - "datapoint.myboolmeasurement", - map[string]string{ - "host": "192.168.0.1", - }, - datapoint.NewIntValue(int64(0)), - datapoint.Gauge, - time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC)), }, events: []*event.Event{}, }, @@ -194,7 +184,7 @@ func TestSignalFx_SignalFx(t *testing.T) { { name: "add events of all types", fields: fields{ - IncludedEvents: []string{"event.mymeasurement"}, + Include: []string{"event.mymeasurement"}, }, measurements: []*measurement{ { @@ -246,7 +236,10 @@ func TestSignalFx_SignalFx(t *testing.T) { "event.mymeasurement", event.AGENT, map[string]string{ - "host": "192.168.0.1", + "plugin": "event", + "agent": "telegraf", + "telegraf_type": "counter", + "host": "192.168.0.1", }, map[string]interface{}{ "message": "hello world", @@ -256,7 +249,10 @@ func TestSignalFx_SignalFx(t *testing.T) { "event.mymeasurement", event.AGENT, map[string]string{ - "host": "192.168.0.1", + "plugin": "event", + "agent": "telegraf", + "telegraf_type": "gauge", + "host": "192.168.0.1", }, map[string]interface{}{ "message": "hello world", @@ -266,7 +262,10 @@ func TestSignalFx_SignalFx(t *testing.T) { "event.mymeasurement", event.AGENT, map[string]string{ - "host": "192.168.0.1", + "plugin": "event", + "agent": "telegraf", + "telegraf_type": "summary", + "host": "192.168.0.1", }, map[string]interface{}{ "message": "hello world", @@ -276,7 +275,10 @@ func TestSignalFx_SignalFx(t *testing.T) { "event.mymeasurement", event.AGENT, map[string]string{ - "host": "192.168.0.1", + "plugin": "event", + "agent": "telegraf", + "telegraf_type": "histogram", + "host": "192.168.0.1", }, map[string]interface{}{ "message": "hello world", @@ -286,7 +288,10 @@ func TestSignalFx_SignalFx(t *testing.T) { "event.mymeasurement", event.AGENT, map[string]string{ - "host": "192.168.0.1", + "plugin": "event", + "agent": "telegraf", + "telegraf_type": "untyped", + "host": "192.168.0.1", }, map[string]interface{}{ "message": "hello world", @@ -296,7 +301,10 @@ func TestSignalFx_SignalFx(t *testing.T) { "event.mymeasurement", event.AGENT, map[string]string{ - "host": "192.168.0.1", + "plugin": "event", + "agent": "telegraf", + "telegraf_type": "unrecognized", + "host": "192.168.0.1", }, map[string]interface{}{ "message": "hello world", @@ -306,9 +314,18 @@ func TestSignalFx_SignalFx(t *testing.T) { }, }, { - name: "exclude events by default", - fields: fields{}, + name: "exclude datapoints and events", + fields: fields{ + Exclude: []string{"datapoint"}, + }, measurements: []*measurement{ + { + name: "datapoint", + tags: map[string]string{"host": "192.168.0.1"}, + fields: map[string]interface{}{"value": float64(3.14)}, + time: time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC), + tp: telegraf.Gauge, + }, { name: "event", tags: map[string]string{"host": "192.168.0.1"}, @@ -339,7 +356,10 @@ func TestSignalFx_SignalFx(t *testing.T) { datapoint.New( "datapoint", map[string]string{ - "host": "192.168.0.1", + "plugin": "datapoint", + "agent": "telegraf", + "telegraf_type": "gauge", + "host": "192.168.0.1", }, datapoint.NewFloatValue(float64(3.14)), datapoint.Gauge, @@ -351,7 +371,7 @@ func TestSignalFx_SignalFx(t *testing.T) { { name: "add event", fields: fields{ - IncludedEvents: []string{"event.mymeasurement"}, + Include: []string{"event.mymeasurement"}, }, measurements: []*measurement{ { @@ -369,7 +389,10 @@ func TestSignalFx_SignalFx(t *testing.T) { "event.mymeasurement", event.AGENT, map[string]string{ - "host": "192.168.0.1", + "plugin": "event", + "agent": "telegraf", + "telegraf_type": "untyped", + "host": "192.168.0.1", }, map[string]interface{}{ "message": "hello world", @@ -416,11 +439,10 @@ func TestSignalFx_SignalFx(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { s := outputs.Outputs["signalfx"]().(*SignalFx) - s.IncludedEventNames = tt.fields.IncludedEvents - s.SignalFxRealm = "test" - s.Log = testutil.Logger{} + s.Exclude = tt.fields.Exclude + s.Include = tt.fields.Include - require.NoError(t, s.Connect()) + s.Connect() s.client = &sink{ dps: []*datapoint.Datapoint{}, @@ -433,21 +455,23 @@ func TestSignalFx_SignalFx(t *testing.T) { m := metric.New( measurement.name, measurement.tags, measurement.fields, measurement.time, measurement.tp, ) - measurements = append(measurements, m) } - err := s.Write(measurements) - require.NoError(t, err) - require.Eventually(t, func() bool { return len(s.client.(*sink).dps) == len(tt.want.datapoints) }, 5*time.Second, 10*time.Millisecond) - require.Eventually(t, func() bool { return len(s.client.(*sink).evs) == len(tt.want.events) }, 5*time.Second, 10*time.Millisecond) - + for !(len(s.client.(*sink).dps) == len(tt.want.datapoints) && len(s.client.(*sink).evs) == len(tt.want.events)) { + time.Sleep(1 * time.Second) + } if !reflect.DeepEqual(s.client.(*sink).dps, tt.want.datapoints) { t.Errorf("Collected datapoints do not match desired. Collected: %v Desired: %v", s.client.(*sink).dps, tt.want.datapoints) } if !reflect.DeepEqual(s.client.(*sink).evs, tt.want.events) { t.Errorf("Collected events do not match desired. Collected: %v Desired: %v", s.client.(*sink).evs, tt.want.events) } + + _err := s.Close() + if _err != nil { + t.Errorf("Failed to close the plugin %v", err) + } }) } } @@ -461,7 +485,8 @@ func TestSignalFx_Errors(t *testing.T) { tp telegraf.ValueType } type fields struct { - IncludedEvents []string + Exclude []string + Include []string } type want struct { datapoints []*datapoint.Datapoint @@ -527,7 +552,7 @@ func TestSignalFx_Errors(t *testing.T) { { name: "add events of all types", fields: fields{ - IncludedEvents: []string{"event.mymeasurement"}, + Include: []string{"event.mymeasurement"}, }, measurements: []*measurement{ { @@ -582,11 +607,11 @@ func TestSignalFx_Errors(t *testing.T) { t.Run(tt.name, func(t *testing.T) { s := outputs.Outputs["signalfx"]().(*SignalFx) // constrain the buffer to cover code that emits when batch size is met - s.IncludedEventNames = tt.fields.IncludedEvents - s.SignalFxRealm = "test" - s.Log = testutil.Logger{} + s.BatchSize = 2 + s.Exclude = tt.fields.Exclude + s.Include = tt.fields.Include - require.NoError(t, s.Connect()) + s.Connect() s.client = &errorsink{ dps: []*datapoint.Datapoint{}, @@ -597,7 +622,6 @@ func TestSignalFx_Errors(t *testing.T) { m := metric.New( measurement.name, measurement.tags, measurement.fields, measurement.time, measurement.tp, ) - err := s.Write([]telegraf.Metric{m}) require.Error(t, err) } @@ -610,50 +634,676 @@ func TestSignalFx_Errors(t *testing.T) { if !reflect.DeepEqual(s.client.(*errorsink).evs, tt.want.events) { t.Errorf("Collected events do not match desired. Collected: %v Desired: %v", s.client.(*errorsink).evs, tt.want.events) } + + err := s.Close() + if err != nil { + t.Errorf("Failed to close the plugin %v", err) + } }) } } -func TestGetMetricName(t *testing.T) { +func TestSignalFx_fillAndSendDatapoints(t *testing.T) { + type fields struct { + APIToken string + BatchSize int + ChannelSize int + client dpsink.Sink + dps chan *datapoint.Datapoint + evts chan *event.Event + } type args struct { - metric string - field string - dims map[string]string + in []*datapoint.Datapoint + buf []*datapoint.Datapoint } tests := []struct { - name string - args args - want string - wantsfx bool + name string + fields fields + args args + want []*datapoint.Datapoint }{ { - name: "fields that equal value should not be append to metricname", + name: "test buffer fills until batch size is met", + fields: fields{ + BatchSize: 3, + dps: make(chan *datapoint.Datapoint, 10), + client: &sink{ + dps: []*datapoint.Datapoint{}, + }, + }, args: args{ - metric: "datapoint", - field: "value", - dims: map[string]string{ - "testDimKey": "testDimVal", + buf: []*datapoint.Datapoint{}, + in: []*datapoint.Datapoint{ + datapoint.New( + "datapoint.mymeasurement", + map[string]string{ + "plugin": "datapoint", + "agent": "telegraf", + "telegraf_type": "counter", + "host": "192.168.0.1", + }, + datapoint.NewFloatValue(float64(3.14)), + datapoint.Counter, + time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC)), + datapoint.New( + "datapoint.mymeasurement", + map[string]string{ + "plugin": "datapoint", + "agent": "telegraf", + "telegraf_type": "gauge", + "host": "192.168.0.1", + }, + datapoint.NewFloatValue(float64(3.14)), + datapoint.Gauge, + time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC)), + datapoint.New( + "datapoint.mymeasurement", + map[string]string{ + "plugin": "datapoint", + "agent": "telegraf", + "telegraf_type": "summary", + "host": "192.168.0.1", + }, + datapoint.NewFloatValue(float64(3.14)), + datapoint.Gauge, + time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC)), }, }, - want: "datapoint", + want: []*datapoint.Datapoint{ + datapoint.New( + "datapoint.mymeasurement", + map[string]string{ + "plugin": "datapoint", + "agent": "telegraf", + "telegraf_type": "counter", + "host": "192.168.0.1", + }, + datapoint.NewFloatValue(float64(3.14)), + datapoint.Counter, + time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC)), + datapoint.New( + "datapoint.mymeasurement", + map[string]string{ + "plugin": "datapoint", + "agent": "telegraf", + "telegraf_type": "gauge", + "host": "192.168.0.1", + }, + datapoint.NewFloatValue(float64(3.14)), + datapoint.Gauge, + time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC)), + datapoint.New( + "datapoint.mymeasurement", + map[string]string{ + "plugin": "datapoint", + "agent": "telegraf", + "telegraf_type": "summary", + "host": "192.168.0.1", + }, + datapoint.NewFloatValue(float64(3.14)), + datapoint.Gauge, + time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC)), + }, }, { - name: "fields other than 'value' with out sf_metric dim should return measurement.fieldname as metric name", + name: "test buffer fills until batch size is and has 1 remaining when it breaks", + fields: fields{ + BatchSize: 2, + dps: make(chan *datapoint.Datapoint, 10), + client: &sink{ + dps: []*datapoint.Datapoint{}, + }, + }, args: args{ - metric: "datapoint", - field: "test", - dims: map[string]string{ - "testDimKey": "testDimVal", + buf: []*datapoint.Datapoint{}, + in: []*datapoint.Datapoint{ + datapoint.New( + "datapoint.mymeasurement", + map[string]string{ + "plugin": "datapoint", + "agent": "telegraf", + "telegraf_type": "counter", + "host": "192.168.0.1", + }, + datapoint.NewFloatValue(float64(3.14)), + datapoint.Counter, + time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC)), + datapoint.New( + "datapoint.mymeasurement", + map[string]string{ + "plugin": "datapoint", + "agent": "telegraf", + "telegraf_type": "gauge", + "host": "192.168.0.1", + }, + datapoint.NewFloatValue(float64(3.14)), + datapoint.Gauge, + time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC)), + datapoint.New( + "datapoint.mymeasurement", + map[string]string{ + "plugin": "datapoint", + "agent": "telegraf", + "telegraf_type": "summary", + "host": "192.168.0.1", + }, + datapoint.NewFloatValue(float64(3.14)), + datapoint.Gauge, + time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC)), + }, + }, + want: []*datapoint.Datapoint{ + datapoint.New( + "datapoint.mymeasurement", + map[string]string{ + "plugin": "datapoint", + "agent": "telegraf", + "telegraf_type": "counter", + "host": "192.168.0.1", + }, + datapoint.NewFloatValue(float64(3.14)), + datapoint.Counter, + time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC)), + datapoint.New( + "datapoint.mymeasurement", + map[string]string{ + "plugin": "datapoint", + "agent": "telegraf", + "telegraf_type": "gauge", + "host": "192.168.0.1", + }, + datapoint.NewFloatValue(float64(3.14)), + datapoint.Gauge, + time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC)), + datapoint.New( + "datapoint.mymeasurement", + map[string]string{ + "plugin": "datapoint", + "agent": "telegraf", + "telegraf_type": "summary", + "host": "192.168.0.1", + }, + datapoint.NewFloatValue(float64(3.14)), + datapoint.Gauge, + time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC)), + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + s := &SignalFx{ + APIToken: tt.fields.APIToken, + BatchSize: tt.fields.BatchSize, + ChannelSize: tt.fields.ChannelSize, + client: tt.fields.client, + dps: tt.fields.dps, + evts: tt.fields.evts, + } + for _, e := range tt.args.in { + s.dps <- e + } + s.fillAndSendDatapoints(tt.args.buf) + if !reflect.DeepEqual(s.client.(*sink).dps, tt.want) { + t.Errorf("fillAndSendDatapoints() datapoints do not match desired. Collected: %v Desired: %v", s.client.(*sink).dps, tt.want) + } + }) + } +} + +func TestSignalFx_fillAndSendDatapointsWithError(t *testing.T) { + type fields struct { + APIToken string + BatchSize int + ChannelSize int + client dpsink.Sink + dps chan *datapoint.Datapoint + evts chan *event.Event + } + type args struct { + in []*datapoint.Datapoint + buf []*datapoint.Datapoint + } + tests := []struct { + name string + fields fields + args args + want []*datapoint.Datapoint + }{ + { + name: "test buffer fills until batch size is met", + fields: fields{ + BatchSize: 3, + dps: make(chan *datapoint.Datapoint, 10), + client: &errorsink{ + dps: []*datapoint.Datapoint{}, }, }, - want: "datapoint.test", + args: args{ + buf: []*datapoint.Datapoint{}, + in: []*datapoint.Datapoint{ + datapoint.New( + "datapoint.mymeasurement", + map[string]string{ + "plugin": "datapoint", + "agent": "telegraf", + "telegraf_type": "counter", + "host": "192.168.0.1", + }, + datapoint.NewFloatValue(float64(3.14)), + datapoint.Counter, + time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC)), + datapoint.New( + "datapoint.mymeasurement", + map[string]string{ + "plugin": "datapoint", + "agent": "telegraf", + "telegraf_type": "gauge", + "host": "192.168.0.1", + }, + datapoint.NewFloatValue(float64(3.14)), + datapoint.Gauge, + time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC)), + datapoint.New( + "datapoint.mymeasurement", + map[string]string{ + "plugin": "datapoint", + "agent": "telegraf", + "telegraf_type": "summary", + "host": "192.168.0.1", + }, + datapoint.NewFloatValue(float64(3.14)), + datapoint.Gauge, + time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC)), + }, + }, + want: []*datapoint.Datapoint{}, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + s := &SignalFx{ + APIToken: tt.fields.APIToken, + BatchSize: tt.fields.BatchSize, + ChannelSize: tt.fields.ChannelSize, + client: tt.fields.client, + dps: tt.fields.dps, + evts: tt.fields.evts, + } + for _, e := range tt.args.in { + s.dps <- e + } + s.fillAndSendDatapoints(tt.args.buf) + if !reflect.DeepEqual(s.client.(*errorsink).dps, tt.want) { + t.Errorf("fillAndSendDatapoints() datapoints do not match desired. Collected: %v Desired: %v", s.client.(*errorsink).dps, tt.want) + } + }) + } +} + +func TestSignalFx_fillAndSendEvents(t *testing.T) { + type fields struct { + APIToken string + BatchSize int + ChannelSize int + client dpsink.Sink + dps chan *datapoint.Datapoint + evts chan *event.Event + } + type args struct { + in []*event.Event + buf []*event.Event + } + tests := []struct { + name string + fields fields + args args + want []*event.Event + }{ + { + name: "test buffer fills until batch size is met", + fields: fields{ + BatchSize: 3, + evts: make(chan *event.Event, 10), + client: &sink{ + evs: []*event.Event{}, + }, + }, + args: args{ + buf: []*event.Event{}, + in: []*event.Event{ + event.NewWithProperties( + "event.mymeasurement", + event.AGENT, + map[string]string{ + "plugin": "event", + "agent": "telegraf", + "telegraf_type": "counter", + "host": "192.168.0.1", + }, + map[string]interface{}{ + "message": "hello world", + }, + time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC)), + event.NewWithProperties( + "event.mymeasurement", + event.AGENT, + map[string]string{ + "plugin": "event", + "agent": "telegraf", + "telegraf_type": "gauge", + "host": "192.168.0.1", + }, + map[string]interface{}{ + "message": "hello world", + }, + time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC)), + event.NewWithProperties( + "event.mymeasurement", + event.AGENT, + map[string]string{ + "plugin": "event", + "agent": "telegraf", + "telegraf_type": "summary", + "host": "192.168.0.1", + }, + map[string]interface{}{ + "message": "hello world", + }, + time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC)), + }, + }, + want: []*event.Event{ + event.NewWithProperties( + "event.mymeasurement", + event.AGENT, + map[string]string{ + "plugin": "event", + "agent": "telegraf", + "telegraf_type": "counter", + "host": "192.168.0.1", + }, + map[string]interface{}{ + "message": "hello world", + }, + time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC)), + event.NewWithProperties( + "event.mymeasurement", + event.AGENT, + map[string]string{ + "plugin": "event", + "agent": "telegraf", + "telegraf_type": "gauge", + "host": "192.168.0.1", + }, + map[string]interface{}{ + "message": "hello world", + }, + time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC)), + event.NewWithProperties( + "event.mymeasurement", + event.AGENT, + map[string]string{ + "plugin": "event", + "agent": "telegraf", + "telegraf_type": "summary", + "host": "192.168.0.1", + }, + map[string]interface{}{ + "message": "hello world", + }, + time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC)), + }, + }, + { + name: "test buffer fills until batch size is and has 1 remaining when it breaks", + fields: fields{ + BatchSize: 2, + evts: make(chan *event.Event, 10), + client: &sink{ + evs: []*event.Event{}, + }, + }, + args: args{ + buf: []*event.Event{}, + in: []*event.Event{ + event.NewWithProperties( + "event.mymeasurement", + event.AGENT, + map[string]string{ + "plugin": "event", + "agent": "telegraf", + "telegraf_type": "counter", + "host": "192.168.0.1", + }, + map[string]interface{}{ + "message": "hello world", + }, + time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC)), + event.NewWithProperties( + "event.mymeasurement", + event.AGENT, + map[string]string{ + "plugin": "event", + "agent": "telegraf", + "telegraf_type": "gauge", + "host": "192.168.0.1", + }, + map[string]interface{}{ + "message": "hello world", + }, + time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC)), + event.NewWithProperties( + "event.mymeasurement", + event.AGENT, + map[string]string{ + "plugin": "event", + "agent": "telegraf", + "telegraf_type": "summary", + "host": "192.168.0.1", + }, + map[string]interface{}{ + "message": "hello world", + }, + time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC)), + }, + }, + want: []*event.Event{ + event.NewWithProperties( + "event.mymeasurement", + event.AGENT, + map[string]string{ + "plugin": "event", + "agent": "telegraf", + "telegraf_type": "counter", + "host": "192.168.0.1", + }, + map[string]interface{}{ + "message": "hello world", + }, + time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC)), + event.NewWithProperties( + "event.mymeasurement", + event.AGENT, + map[string]string{ + "plugin": "event", + "agent": "telegraf", + "telegraf_type": "gauge", + "host": "192.168.0.1", + }, + map[string]interface{}{ + "message": "hello world", + }, + time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC)), + event.NewWithProperties( + "event.mymeasurement", + event.AGENT, + map[string]string{ + "plugin": "event", + "agent": "telegraf", + "telegraf_type": "summary", + "host": "192.168.0.1", + }, + map[string]interface{}{ + "message": "hello world", + }, + time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC)), + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + s := &SignalFx{ + APIToken: tt.fields.APIToken, + BatchSize: tt.fields.BatchSize, + ChannelSize: tt.fields.ChannelSize, + client: tt.fields.client, + dps: tt.fields.dps, + evts: tt.fields.evts, + } + for _, e := range tt.args.in { + s.evts <- e + } + s.fillAndSendEvents(tt.args.buf) + if !reflect.DeepEqual(s.client.(*sink).evs, tt.want) { + t.Errorf("fillAndSendEvents() datapoints do not match desired. Collected: %v Desired: %v", s.client.(*sink).evs, tt.want) + } + }) + } +} + +func TestSignalFx_fillAndSendEventsWithErrors(t *testing.T) { + type fields struct { + APIToken string + BatchSize int + ChannelSize int + client dpsink.Sink + dps chan *datapoint.Datapoint + evts chan *event.Event + } + type args struct { + in []*event.Event + buf []*event.Event + } + tests := []struct { + name string + fields fields + args args + want []*event.Event + }{ + { + name: "test buffer fills until batch size is met, but add events returns error", + fields: fields{ + BatchSize: 3, + evts: make(chan *event.Event, 10), + client: &errorsink{ + evs: []*event.Event{}, + }, + }, + args: args{ + buf: []*event.Event{}, + in: []*event.Event{ + event.NewWithProperties( + "event.mymeasurement", + event.AGENT, + map[string]string{ + "plugin": "event", + "agent": "telegraf", + "telegraf_type": "counter", + "host": "192.168.0.1", + }, + map[string]interface{}{ + "message": "hello world", + }, + time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC)), + event.NewWithProperties( + "event.mymeasurement", + event.AGENT, + map[string]string{ + "plugin": "event", + "agent": "telegraf", + "telegraf_type": "gauge", + "host": "192.168.0.1", + }, + map[string]interface{}{ + "message": "hello world", + }, + time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC)), + event.NewWithProperties( + "event.mymeasurement", + event.AGENT, + map[string]string{ + "plugin": "event", + "agent": "telegraf", + "telegraf_type": "summary", + "host": "192.168.0.1", + }, + map[string]interface{}{ + "message": "hello world", + }, + time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC)), + }, + }, + want: []*event.Event{}, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + s := &SignalFx{ + APIToken: tt.fields.APIToken, + BatchSize: tt.fields.BatchSize, + ChannelSize: tt.fields.ChannelSize, + client: tt.fields.client, + dps: tt.fields.dps, + evts: tt.fields.evts, + } + for _, e := range tt.args.in { + s.evts <- e + } + s.fillAndSendEvents(tt.args.buf) + if !reflect.DeepEqual(s.client.(*errorsink).evs, tt.want) { + t.Errorf("fillAndSendEvents() datapoints do not match desired. Collected: %v Desired: %v", s.client.(*errorsink).evs, tt.want) + } + }) + } +} + +// this is really just for complete code coverage +func TestSignalFx_Description(t *testing.T) { + tests := []struct { + name string + want string + }{ + { + name: "verify description is correct", + want: "Send metrics to SignalFx", + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + s := &SignalFx{} + if got := s.Description(); got != tt.want { + t.Errorf("SignalFx.Description() = %v, want %v", got, tt.want) + } + }) + } +} + +// this is also just for complete code coverage +func TestSignalFx_SampleConfig(t *testing.T) { + tests := []struct { + name string + want string + }{ + { + name: "verify sample config is returned", + want: sampleConfig, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - got := getMetricName(tt.args.metric, tt.args.field) - if got != tt.want { - t.Errorf("getMetricName() got = %v, want %v", got, tt.want) + s := &SignalFx{} + if got := s.SampleConfig(); got != tt.want { + t.Errorf("SignalFx.SampleConfig() = %v, want %v", got, tt.want) } }) }