diff --git a/internal/k6runner/k6runner.go b/internal/k6runner/k6runner.go index 8ea54186..a2d08824 100644 --- a/internal/k6runner/k6runner.go +++ b/internal/k6runner/k6runner.go @@ -4,7 +4,6 @@ import ( "bytes" "context" "encoding/json" - "errors" "fmt" "io" "net/http" @@ -61,39 +60,44 @@ func NewScript(script []byte, k6runner Runner) (*Script, error) { return &r, nil } -func (r Script) Run(ctx context.Context, registry *prometheus.Registry, logger logger.Logger, internalLogger zerolog.Logger) error { +func (r Script) Run(ctx context.Context, registry *prometheus.Registry, logger logger.Logger, internalLogger zerolog.Logger) (bool, error) { k6runner := r.runner.WithLogger(&internalLogger) - var runResult error - - result, scriptExited := k6runner.Run(ctx, r.script) - - if scriptExited != nil { - runResult = scriptExited + result, err := k6runner.Run(ctx, r.script) + if err != nil { internalLogger.Debug(). - Err(scriptExited). + Err(err). Msg("k6 script exited with error code") + return false, err } - if !result.Success && runResult == nil { - runResult = errors.New("finished run with errors") - } + var ( + collector sampleCollector + resultCollector checkResultCollector + ) - if err := textToRegistry(result.Metrics, registry, internalLogger); err != nil { + if err := extractMetricSamples(result.Metrics, internalLogger, collector.process, resultCollector.process); err != nil { internalLogger.Debug(). Err(err). - Msg("cannot add metrics to registry") - return err + Msg("cannot extract metric samples") + return false, err + } + + if err := registry.Register(&collector.collector); err != nil { + internalLogger.Error(). + Err(err). + Msg("cannot register collector") + return false, err } if err := k6LogsToLogger(result.Logs, logger); err != nil { internalLogger.Debug(). Err(err). Msg("cannot load logs to logger") - return err + return false, err } - return runResult + return !resultCollector.failure, nil } type customCollector struct { @@ -114,8 +118,60 @@ func (c *customCollector) Collect(ch chan<- prometheus.Metric) { } } -func textToRegistry(metrics []byte, registry prometheus.Registerer, logger zerolog.Logger) error { - collector := &customCollector{} +type sampleProcessorFunc func(*dto.MetricFamily, *model.Sample) error + +type sampleCollector struct { + collector customCollector +} + +func (sc *sampleCollector) process(mf *dto.MetricFamily, sample *model.Sample) error { + // TODO(mem): This is really crappy. We have a + // set of metric families, and we are + // converting back to samples so that we can + // add that to the registry. We need to rework + // the logic in the prober so that it can + // return a set of metric families. The probes + // that don't have this, can create a registry + // locally and get the metric families from + // that. + name, found := sample.Metric[model.MetricNameLabel] + if !found { + return fmt.Errorf("missing metric name") + } + + desc := prometheus.NewDesc(string(name), mf.GetHelp(), nil, metricToLabels(sample.Metric)) + // TODO(mem): maybe change this to untyped? + m, err := prometheus.NewConstMetric(desc, prometheus.GaugeValue, float64(sample.Value)) + if err != nil { + return fmt.Errorf("creating prometheus metric: %w", err) + } + + sc.collector.metrics = append(sc.collector.metrics, m) + + return nil +} + +type checkResultCollector struct { + failure bool +} + +func (rc *checkResultCollector) process(mf *dto.MetricFamily, sample *model.Sample) error { + if sample.Metric[model.MetricNameLabel] != "probe_checks_total" { + return nil + } + + if sample.Metric["result"] != "fail" { + return nil + } + + if sample.Value != 0 { + rc.failure = true + } + + return nil +} + +func extractMetricSamples(metrics []byte, logger zerolog.Logger, processors ...sampleProcessorFunc) error { promDecoder := expfmt.NewDecoder(bytes.NewBuffer(metrics), expfmt.FmtText) decoderOpts := expfmt.DecodeOptions{Timestamp: model.Now()} for { @@ -130,40 +186,17 @@ func textToRegistry(metrics []byte, registry prometheus.Registerer, logger zerol } for _, sample := range samples { - // TODO(mem): This is really crappy. We have a - // set of metric families, and we are - // converting back to samples so that we can - // add that to the registry. We need to rework - // the logic in the prober so that it can - // return a set of metric families. The probes - // that don't have this, can create a registry - // locally and get the metric families from - // that. - name, found := sample.Metric[model.MetricNameLabel] - if !found { - logger.Error().Err(err).Msg("missing metric name") - return fmt.Errorf("missing metric name") + for _, p := range processors { + err := p(&mf, sample) + if err != nil { + logger.Error().Err(err).Msg("processing samples") + return err + } } - - delete(sample.Metric, model.MetricNameLabel) - - desc := prometheus.NewDesc(string(name), mf.GetHelp(), nil, metricToLabels(sample.Metric)) - // TODO(mem): maybe change this to untyped? - m, err := prometheus.NewConstMetric(desc, prometheus.GaugeValue, float64(sample.Value)) - if err != nil { - logger.Error().Err(err).Msg("creating prometheus metric") - return err - } - - collector.metrics = append(collector.metrics, m) } case io.EOF: // nothing was returned, we are done - if err := registry.Register(collector); err != nil { - return err - } - return nil default: @@ -177,7 +210,11 @@ func metricToLabels(metrics model.Metric) prometheus.Labels { // Ugh. labels := make(prometheus.Labels) for name, value := range metrics { - labels[string(name)] = string(value) + name := string(name) + if name == model.MetricNameLabel { + continue + } + labels[name] = string(value) } return labels } @@ -228,7 +265,6 @@ type RunRequest struct { type RunResponse struct { Metrics []byte `json:"metrics"` Logs []byte `json:"logs"` - Success bool `json:"success"` } func (r HttpRunner) WithLogger(logger *zerolog.Logger) Runner { @@ -279,14 +315,6 @@ func (r HttpRunner) Run(ctx context.Context, script []byte) (*RunResponse, error r.logger.Debug().Bytes("metrics", result.Metrics).Bytes("logs", result.Logs).Msg("script result") - result.Success = true - - for _, log := range result.Logs { - if strings.Contains(string(log), "Assertion failed") { - result.Success = false - } - } - return &result, nil } @@ -381,17 +409,13 @@ func (r LocalRunner) Run(ctx context.Context, script []byte) (*RunResponse, erro cmd.Stdout = &stdout cmd.Stderr = &stderr - var checkError error - start := time.Now() r.logger.Info().Str("command", cmd.String()).Bytes("script", script).Msg("running k6 script") if err := cmd.Run(); err != nil { r.logger.Error().Err(err).Str("stdout", stdout.String()).Str("stderr", stderr.String()).Msg("k6 exited with error") - // We can't return early here, because exiting with an error status might just mean a check failure, not an actual runtime problem - // return nil, fmt.Errorf("cannot execute k6 script: %w", err) - checkError = err + return nil, fmt.Errorf("cannot execute k6 script: %w", err) } duration := time.Since(start) @@ -413,18 +437,6 @@ func (r LocalRunner) Run(ctx context.Context, script []byte) (*RunResponse, erro return nil, fmt.Errorf("cannot read logs: %w", err) } - if checkError == nil { - result.Success = true - } else { - result.Success = false - } - - for _, log := range result.Logs { - if strings.Contains(string(log), "Assertion failed") { - result.Success = false - } - } - r.logger.Debug().Bytes("metrics", result.Metrics).Bytes("logs", result.Logs).Msg("k6 result") return &result, nil diff --git a/internal/k6runner/k6runner_test.go b/internal/k6runner/k6runner_test.go index 34661e9a..a54de693 100644 --- a/internal/k6runner/k6runner_test.go +++ b/internal/k6runner/k6runner_test.go @@ -65,8 +65,9 @@ func TestScriptRun(t *testing.T) { // We already know tha parsing the metrics and the logs is working, so // we are only interested in verifying that the script runs without // errors. - err = script.Run(ctx, registry, &logger, zlogger) + success, err := script.Run(ctx, registry, &logger, zlogger) require.NoError(t, err) + require.True(t, success) } func TestHttpRunnerRun(t *testing.T) { @@ -154,7 +155,6 @@ func (r *testRunner) Run(ctx context.Context, script []byte) (*RunResponse, erro return &RunResponse{ Metrics: r.metrics, Logs: r.logs, - Success: true, }, nil } @@ -183,10 +183,16 @@ DEC_LOOP: } } - registry := prometheus.NewRegistry() + var sampleCollector sampleCollector + buf := bytes.Buffer{} logger := zerolog.New(&buf) - err := textToRegistry(data, registry, logger) + + err := extractMetricSamples(data, logger, sampleCollector.process) + require.NoError(t, err) + + registry := prometheus.NewRegistry() + err = registry.Register(&sampleCollector.collector) require.NoError(t, err) mfs, err := registry.Gather() diff --git a/internal/prober/k6/k6.go b/internal/prober/k6/k6.go index a095f225..3ba5316b 100644 --- a/internal/prober/k6/k6.go +++ b/internal/prober/k6/k6.go @@ -54,13 +54,13 @@ func (p Prober) Name() string { } func (p Prober) Probe(ctx context.Context, target string, registry *prometheus.Registry, logger logger.Logger) bool { - err := p.script.Run(ctx, registry, logger, p.logger) + success, err := p.script.Run(ctx, registry, logger, p.logger) if err != nil { p.logger.Warn().Err(err).Msg("running probe") return false } - return true + return success } func settingsToModule(settings *sm.K6Settings) Module { diff --git a/internal/prober/multihttp/multihttp.go b/internal/prober/multihttp/multihttp.go index 779bd9ab..d123ca35 100644 --- a/internal/prober/multihttp/multihttp.go +++ b/internal/prober/multihttp/multihttp.go @@ -70,13 +70,13 @@ func (p Prober) Name() string { } func (p Prober) Probe(ctx context.Context, target string, registry *prometheus.Registry, logger logger.Logger) bool { - err := p.script.Run(ctx, registry, logger, p.logger) + success, err := p.script.Run(ctx, registry, logger, p.logger) if err != nil { p.logger.Warn().Err(err).Msg("running probe") return false } - return true + return success } func settingsToModule(settings *sm.MultiHttpSettings) Module {