Skip to content

Commit

Permalink
test: reduce beater tests time (#12472)
Browse files Browse the repository at this point in the history
TestTimeoutMiddleware: replace the hardcoded 1s time.Sleep with twice the context timeout. In
this case 2ns. Test time went from 1s to 0s.

TestServerWaitForIntegrationElasticsearch: replace mutex with atomic int to reduce lock contention
and set flush_interval to 1ms (default 1s). Test time went from a bit over 1s to 0.1s

TestServerElasticsearchOutput: replace flush_bytes 1kb with flush_interval 1ms. The flush_bytes setting
had no effect on this case even if the test data is >1kb because compression is enabled so it's
never going to hit the flush_bytes and is hitting the default timeout instead (1s). Test time
went from a bit over 1s to 0.1s
  • Loading branch information
kruskall authored Jan 22, 2024
1 parent 8ad7b49 commit 398ab6e
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 20 deletions.
5 changes: 3 additions & 2 deletions internal/beater/middleware/timeout_middleware_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,12 @@ func TestTimeoutMiddleware(t *testing.T) {
}
}()
test(t, request.Handler(func(c *request.Context) {
timeout := 1 * time.Nanosecond
ctx := c.Request.Context()
ctx, cancel = context.WithTimeout(ctx, time.Nanosecond)
ctx, cancel = context.WithTimeout(ctx, timeout)
r := c.Request.WithContext(ctx)
c.Request = r
time.Sleep(time.Second)
time.Sleep(2 * timeout) // let the context expire
}))
})
}
33 changes: 15 additions & 18 deletions internal/beater/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ import (
"reflect"
"runtime"
"strings"
"sync"
"sync/atomic"
"testing"
"time"
Expand Down Expand Up @@ -386,10 +385,9 @@ func TestServerWaitForIntegrationKibana(t *testing.T) {
}

func TestServerWaitForIntegrationElasticsearch(t *testing.T) {
var mu sync.Mutex
var tracesRequests int
tracesRequestsCh := make(chan int)
bulkCh := make(chan struct{}, 1)
var tracesRequests atomic.Int64
tracesRequestsCh := make(chan int, 2)
bulkCh := make(chan struct{}, 2)
mux := http.NewServeMux()
mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("X-Elastic-Product", "Elasticsearch")
Expand All @@ -398,15 +396,13 @@ func TestServerWaitForIntegrationElasticsearch(t *testing.T) {
fmt.Fprintln(w, `{"version":{"number":"1.2.3"}}`)
})
mux.HandleFunc("/_index_template/", func(w http.ResponseWriter, r *http.Request) {
mu.Lock()
defer mu.Unlock()
template := path.Base(r.URL.Path)
if template == "traces-apm" {
tracesRequests++
if tracesRequests == 1 {
count := tracesRequests.Add(1)
if count == 1 {
w.WriteHeader(404)
}
tracesRequestsCh <- tracesRequests
tracesRequestsCh <- int(count)
}
})
mux.HandleFunc("/_bulk", func(w http.ResponseWriter, r *http.Request) {
Expand All @@ -424,9 +420,10 @@ func TestServerWaitForIntegrationElasticsearch(t *testing.T) {
"data_streams.wait_for_integration": true,
},
"output.elasticsearch": map[string]interface{}{
"hosts": []string{elasticsearchServer.URL},
"backoff": map[string]interface{}{"init": "10ms", "max": "10ms"},
"max_retries": 1000,
"hosts": []string{elasticsearchServer.URL},
"backoff": map[string]interface{}{"init": "10ms", "max": "10ms"},
"max_retries": 1000,
"flush_interval": "1ms",
},
})))

Expand Down Expand Up @@ -638,11 +635,11 @@ func TestServerElasticsearchOutput(t *testing.T) {

srv := beatertest.NewServer(t, beatertest.WithConfig(agentconfig.MustNewConfigFrom(map[string]interface{}{
"output.elasticsearch": map[string]interface{}{
"hosts": []string{elasticsearchServer.URL},
"flush_bytes": "1kb", // test data is >1kb
"backoff": map[string]interface{}{"init": "1ms", "max": "1ms"},
"max_retries": 0,
"max_requests": 10,
"hosts": []string{elasticsearchServer.URL},
"flush_interval": "1ms",
"backoff": map[string]interface{}{"init": "1ms", "max": "1ms"},
"max_retries": 0,
"max_requests": 10,
},
})))

Expand Down

0 comments on commit 398ab6e

Please sign in to comment.