Skip to content

Commit

Permalink
Determine whether the script was successful or not based on assertions
Browse files Browse the repository at this point in the history
Signed-off-by: Marcelo E. Magallon <[email protected]>
  • Loading branch information
mem committed Oct 5, 2023
1 parent 0547651 commit c801c5b
Show file tree
Hide file tree
Showing 4 changed files with 101 additions and 83 deletions.
162 changes: 87 additions & 75 deletions internal/k6runner/k6runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
Expand Down Expand Up @@ -61,39 +60,44 @@ func NewScript(script []byte, k6runner Runner) (*Script, error) {
return &r, nil
}

func (r Script) Run(ctx context.Context, registry *prometheus.Registry, logger logger.Logger, internalLogger zerolog.Logger) error {
func (r Script) Run(ctx context.Context, registry *prometheus.Registry, logger logger.Logger, internalLogger zerolog.Logger) (bool, error) {
k6runner := r.runner.WithLogger(&internalLogger)

var runResult error

result, scriptExited := k6runner.Run(ctx, r.script)

if scriptExited != nil {
runResult = scriptExited
result, err := k6runner.Run(ctx, r.script)
if err != nil {
internalLogger.Debug().
Err(scriptExited).
Err(err).
Msg("k6 script exited with error code")
return false, err
}

if !result.Success && runResult == nil {
runResult = errors.New("finished run with errors")
}
var (
collector sampleCollector
resultCollector checkResultCollector
)

if err := textToRegistry(result.Metrics, registry, internalLogger); err != nil {
if err := extractMetricSamples(result.Metrics, internalLogger, collector.process, resultCollector.process); err != nil {
internalLogger.Debug().
Err(err).
Msg("cannot add metrics to registry")
return err
Msg("cannot extract metric samples")
return false, err
}

if err := registry.Register(&collector.collector); err != nil {
internalLogger.Error().
Err(err).
Msg("cannot register collector")
return false, err
}

if err := k6LogsToLogger(result.Logs, logger); err != nil {
internalLogger.Debug().
Err(err).
Msg("cannot load logs to logger")
return err
return false, err
}

return runResult
return !resultCollector.failure, nil
}

type customCollector struct {
Expand All @@ -114,8 +118,60 @@ func (c *customCollector) Collect(ch chan<- prometheus.Metric) {
}
}

func textToRegistry(metrics []byte, registry prometheus.Registerer, logger zerolog.Logger) error {
collector := &customCollector{}
type sampleProcessorFunc func(*dto.MetricFamily, *model.Sample) error

type sampleCollector struct {
collector customCollector
}

func (sc *sampleCollector) process(mf *dto.MetricFamily, sample *model.Sample) error {
// TODO(mem): This is really crappy. We have a
// set of metric families, and we are
// converting back to samples so that we can
// add that to the registry. We need to rework
// the logic in the prober so that it can
// return a set of metric families. The probes
// that don't have this, can create a registry
// locally and get the metric families from
// that.
name, found := sample.Metric[model.MetricNameLabel]
if !found {
return fmt.Errorf("missing metric name")
}

desc := prometheus.NewDesc(string(name), mf.GetHelp(), nil, metricToLabels(sample.Metric))
// TODO(mem): maybe change this to untyped?
m, err := prometheus.NewConstMetric(desc, prometheus.GaugeValue, float64(sample.Value))
if err != nil {
return fmt.Errorf("creating prometheus metric: %w", err)
}

sc.collector.metrics = append(sc.collector.metrics, m)

return nil
}

type checkResultCollector struct {
failure bool
}

func (rc *checkResultCollector) process(mf *dto.MetricFamily, sample *model.Sample) error {
if sample.Metric[model.MetricNameLabel] != "probe_checks_total" {
return nil
}

if sample.Metric["result"] != "fail" {
return nil
}

if sample.Value != 0 {
rc.failure = true
}

return nil
}

func extractMetricSamples(metrics []byte, logger zerolog.Logger, processors ...sampleProcessorFunc) error {
promDecoder := expfmt.NewDecoder(bytes.NewBuffer(metrics), expfmt.FmtText)
decoderOpts := expfmt.DecodeOptions{Timestamp: model.Now()}
for {
Expand All @@ -130,40 +186,17 @@ func textToRegistry(metrics []byte, registry prometheus.Registerer, logger zerol
}

for _, sample := range samples {
// TODO(mem): This is really crappy. We have a
// set of metric families, and we are
// converting back to samples so that we can
// add that to the registry. We need to rework
// the logic in the prober so that it can
// return a set of metric families. The probes
// that don't have this, can create a registry
// locally and get the metric families from
// that.
name, found := sample.Metric[model.MetricNameLabel]
if !found {
logger.Error().Err(err).Msg("missing metric name")
return fmt.Errorf("missing metric name")
for _, p := range processors {
err := p(&mf, sample)
if err != nil {
logger.Error().Err(err).Msg("processing samples")
return err
}
}

delete(sample.Metric, model.MetricNameLabel)

desc := prometheus.NewDesc(string(name), mf.GetHelp(), nil, metricToLabels(sample.Metric))
// TODO(mem): maybe change this to untyped?
m, err := prometheus.NewConstMetric(desc, prometheus.GaugeValue, float64(sample.Value))
if err != nil {
logger.Error().Err(err).Msg("creating prometheus metric")
return err
}

collector.metrics = append(collector.metrics, m)
}

case io.EOF:
// nothing was returned, we are done
if err := registry.Register(collector); err != nil {
return err
}

return nil

default:
Expand All @@ -177,7 +210,11 @@ func metricToLabels(metrics model.Metric) prometheus.Labels {
// Ugh.
labels := make(prometheus.Labels)
for name, value := range metrics {
labels[string(name)] = string(value)
name := string(name)
if name == model.MetricNameLabel {
continue
}
labels[name] = string(value)
}
return labels
}
Expand Down Expand Up @@ -228,7 +265,6 @@ type RunRequest struct {
type RunResponse struct {
Metrics []byte `json:"metrics"`
Logs []byte `json:"logs"`
Success bool `json:"success"`
}

func (r HttpRunner) WithLogger(logger *zerolog.Logger) Runner {
Expand Down Expand Up @@ -279,14 +315,6 @@ func (r HttpRunner) Run(ctx context.Context, script []byte) (*RunResponse, error

r.logger.Debug().Bytes("metrics", result.Metrics).Bytes("logs", result.Logs).Msg("script result")

result.Success = true

for _, log := range result.Logs {
if strings.Contains(string(log), "Assertion failed") {
result.Success = false
}
}

return &result, nil
}

Expand Down Expand Up @@ -381,17 +409,13 @@ func (r LocalRunner) Run(ctx context.Context, script []byte) (*RunResponse, erro
cmd.Stdout = &stdout
cmd.Stderr = &stderr

var checkError error

start := time.Now()

r.logger.Info().Str("command", cmd.String()).Bytes("script", script).Msg("running k6 script")

if err := cmd.Run(); err != nil {
r.logger.Error().Err(err).Str("stdout", stdout.String()).Str("stderr", stderr.String()).Msg("k6 exited with error")
// We can't return early here, because exiting with an error status might just mean a check failure, not an actual runtime problem
// return nil, fmt.Errorf("cannot execute k6 script: %w", err)
checkError = err
return nil, fmt.Errorf("cannot execute k6 script: %w", err)
}

duration := time.Since(start)
Expand All @@ -413,18 +437,6 @@ func (r LocalRunner) Run(ctx context.Context, script []byte) (*RunResponse, erro
return nil, fmt.Errorf("cannot read logs: %w", err)
}

if checkError == nil {
result.Success = true
} else {
result.Success = false
}

for _, log := range result.Logs {
if strings.Contains(string(log), "Assertion failed") {
result.Success = false
}
}

r.logger.Debug().Bytes("metrics", result.Metrics).Bytes("logs", result.Logs).Msg("k6 result")

return &result, nil
Expand Down
14 changes: 10 additions & 4 deletions internal/k6runner/k6runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,9 @@ func TestScriptRun(t *testing.T) {
// We already know tha parsing the metrics and the logs is working, so
// we are only interested in verifying that the script runs without
// errors.
err = script.Run(ctx, registry, &logger, zlogger)
success, err := script.Run(ctx, registry, &logger, zlogger)
require.NoError(t, err)
require.True(t, success)
}

func TestHttpRunnerRun(t *testing.T) {
Expand Down Expand Up @@ -154,7 +155,6 @@ func (r *testRunner) Run(ctx context.Context, script []byte) (*RunResponse, erro
return &RunResponse{
Metrics: r.metrics,
Logs: r.logs,
Success: true,
}, nil
}

Expand Down Expand Up @@ -183,10 +183,16 @@ DEC_LOOP:
}
}

registry := prometheus.NewRegistry()
var sampleCollector sampleCollector

buf := bytes.Buffer{}
logger := zerolog.New(&buf)
err := textToRegistry(data, registry, logger)

err := extractMetricSamples(data, logger, sampleCollector.process)
require.NoError(t, err)

registry := prometheus.NewRegistry()
err = registry.Register(&sampleCollector.collector)
require.NoError(t, err)

mfs, err := registry.Gather()
Expand Down
4 changes: 2 additions & 2 deletions internal/prober/k6/k6.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,13 @@ func (p Prober) Name() string {
}

func (p Prober) Probe(ctx context.Context, target string, registry *prometheus.Registry, logger logger.Logger) bool {
err := p.script.Run(ctx, registry, logger, p.logger)
success, err := p.script.Run(ctx, registry, logger, p.logger)
if err != nil {
p.logger.Warn().Err(err).Msg("running probe")
return false
}

return true
return success
}

func settingsToModule(settings *sm.K6Settings) Module {
Expand Down
4 changes: 2 additions & 2 deletions internal/prober/multihttp/multihttp.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,13 +70,13 @@ func (p Prober) Name() string {
}

func (p Prober) Probe(ctx context.Context, target string, registry *prometheus.Registry, logger logger.Logger) bool {
err := p.script.Run(ctx, registry, logger, p.logger)
success, err := p.script.Run(ctx, registry, logger, p.logger)
if err != nil {
p.logger.Warn().Err(err).Msg("running probe")
return false
}

return true
return success
}

func settingsToModule(settings *sm.MultiHttpSettings) Module {
Expand Down

0 comments on commit c801c5b

Please sign in to comment.