From e6d17857e0981372757d4d9279dfeec59189ee20 Mon Sep 17 00:00:00 2001 From: Andrew Cholakian Date: Wed, 30 Aug 2023 02:58:27 +0000 Subject: [PATCH] Add summarizer tests --- heartbeat/monitors/jobs/testing.go | 11 +- .../wrappers/summarizer/summarizer.go | 23 +++- .../wrappers/summarizer/summarizer_test.go | 128 +++++++++++++++++- 3 files changed, 152 insertions(+), 10 deletions(-) diff --git a/heartbeat/monitors/jobs/testing.go b/heartbeat/monitors/jobs/testing.go index a654ea3e517..a28ca7a8002 100644 --- a/heartbeat/monitors/jobs/testing.go +++ b/heartbeat/monitors/jobs/testing.go @@ -45,17 +45,12 @@ func ExecJobAndConts(t *testing.T, j Job) ([]*beat.Event, error) { event := &beat.Event{} results = append(results, event) cont, err := j(event) - if err != nil { - return nil, err - } for _, cj := range cont { - cjResults, err := ExecJobAndConts(t, cj) - if err != nil { - return nil, err - } + var cjResults []*beat.Event + cjResults, err = ExecJobAndConts(t, cj) results = append(results, cjResults...) } - return results, nil + return results, err } diff --git a/heartbeat/monitors/wrappers/summarizer/summarizer.go b/heartbeat/monitors/wrappers/summarizer/summarizer.go index 60d90b98304..df5b200d81e 100644 --- a/heartbeat/monitors/wrappers/summarizer/summarizer.go +++ b/heartbeat/monitors/wrappers/summarizer/summarizer.go @@ -20,6 +20,7 @@ package summarizer import ( "fmt" "sync" + "time" "github.com/gofrs/uuid" @@ -40,9 +41,11 @@ type Summarizer struct { checkGroup string stateTracker *monitorstate.Tracker sf stdfields.StdMonitorFields + retryDelay time.Duration } type JobSummary struct { + Id int Attempt uint16 `json:"attempt"` MaxAttempts uint16 `json:"max_attempts"` FinalAttempt bool `json:"final_attempt"` @@ -65,15 +68,21 @@ func NewSummarizer(rootJob jobs.Job, sf stdfields.StdMonitorFields, mst *monitor checkGroup: uu.String(), stateTracker: mst, sf: sf, + // private property, but can be overriden in tests to speed them up + retryDelay: time.Second, } } +var id int + func NewJobSummary(attempt uint16, maxAttempts uint16, retryGroup string) *JobSummary { if maxAttempts < 1 { maxAttempts = 1 } + id++ return &JobSummary{ + Id: id, MaxAttempts: maxAttempts, Attempt: attempt, RetryGroup: retryGroup, @@ -137,7 +146,19 @@ func (s *Summarizer) Wrap(j jobs.Job) jobs.Job { s.jobSummary = NewJobSummary(js.Attempt+1, js.MaxAttempts, js.RetryGroup) s.contsRemaining = 1 s.checkGroup = fmt.Sprintf("%s-%d", s.checkGroup, s.jobSummary.Attempt) - conts = []jobs.Job{s.rootJob} + + // Delay retries by 1s for two reasons: + // 1. Since ES timestamps are millisecond resolution they can happen so fast + // that it's hard to tell the sequence in which jobs executed apart in our + // kibana queries + // 2. If the site error is very short 1s gives it a tiny bit of time to recover + delayedRootJob := jobs.Wrap(s.rootJob, func(j jobs.Job) jobs.Job { + return func(event *beat.Event) ([]jobs.Job, error) { + time.Sleep(s.retryDelay) + return j(event) + } + }) + conts = []jobs.Job{delayedRootJob} } } diff --git a/heartbeat/monitors/wrappers/summarizer/summarizer_test.go b/heartbeat/monitors/wrappers/summarizer/summarizer_test.go index 97d9063ff03..f46492863b8 100644 --- a/heartbeat/monitors/wrappers/summarizer/summarizer_test.go +++ b/heartbeat/monitors/wrappers/summarizer/summarizer_test.go @@ -17,8 +17,134 @@ package summarizer -import "testing" +import ( + "fmt" + "testing" + "time" + + "github.com/elastic/beats/v7/heartbeat/monitors/jobs" + "github.com/elastic/beats/v7/heartbeat/monitors/stdfields" + "github.com/elastic/beats/v7/heartbeat/monitors/wrappers/monitorstate" + "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/elastic-agent-libs/mapstr" + "github.com/stretchr/testify/require" +) func TestSummarizer(t *testing.T) { + t.Parallel() + charToStatus := func(c uint8) monitorstate.StateStatus { + if c == 'u' { + return monitorstate.StatusUp + } else { + return monitorstate.StatusDown + } + } + + tests := []struct { + name string + maxAttempts int + statusSequence string + expectedStates string + }{ + { + "start down, transition to up", + 2, + "du", + "du", + }, + { + "start up, stay up", + 2, + "uuuuuuuu", + "uuuuuuuu", + }, + { + "start down, stay down", + 2, + "dddddddd", + "dddddddd", + }, + { + "start up - go down with one retry - thenrecover", + 2, + "udddduuu", + "uuddduuu", + }, + { + "start up, transient down, recover", + 2, + "uuuduuuu", + "uuuuuuuu", + }, + { + "start up, multiple transient down, recover", + 2, + "uuudududu", + "uuuuuuuuu", + }, + { + "no retries, single down", + 1, + "uuuduuuu", + "uuuduuuu", + }, + } + + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + dummyErr := fmt.Errorf("dummyerr") + + // The job runs through each char in the status sequence and + // returns an error if it's set to 'd' + pos := 0 + job := func(event *beat.Event) (j []jobs.Job, retErr error) { + status := charToStatus(tt.statusSequence[pos]) + if status == monitorstate.StatusDown { + retErr = dummyErr + } + event.Fields = mapstr.M{ + "monitor": mapstr.M{ + "id": "test", + "status": string(status), + }, + } + + pos++ + return nil, retErr + } + + tracker := monitorstate.NewTracker(monitorstate.NilStateLoader, false) + sf := stdfields.StdMonitorFields{ID: "testmon", Name: "testmon", MaxAttempts: uint16(tt.maxAttempts)} + rcvdStatuses := "" + rcvdStates := "" + i := 0 + for { + s := NewSummarizer(job, sf, tracker) + // Shorten retry delay to make tests run faster + s.retryDelay = 2 * time.Millisecond + wrapped := s.Wrap(job) + events, _ := jobs.ExecJobAndConts(t, wrapped) + for _, event := range events { + eventStatus, _ := event.GetValue("monitor.status") + eventStatusStr := eventStatus.(string) + rcvdStatuses += eventStatusStr[:1] + state, _ := event.GetValue("state") + if state != nil { + rcvdStates += string(state.(*monitorstate.State).Status)[:1] + } else { + rcvdStates += "_" + } + } + i += len(events) + if i >= len(tt.statusSequence) { + break + } + } + require.Equal(t, tt.statusSequence, rcvdStatuses) + require.Equal(t, tt.expectedStates, rcvdStates) + }) + } }