diff --git a/internal/benchrunner/runners/rally/metrics.go b/internal/benchrunner/runners/rally/metrics.go
index 11f5d405f..8cac07cae 100644
--- a/internal/benchrunner/runners/rally/metrics.go
+++ b/internal/benchrunner/runners/rally/metrics.go
@@ -112,9 +112,14 @@ func (c *collector) stop() {
 }
 
 func (c *collector) collectMetricsBeforeRallyRun() {
-	_, err := c.esAPI.Indices.Refresh(c.esAPI.Indices.Refresh.WithIndex(c.datastream))
+	resp, err := c.esAPI.Indices.Refresh(c.esAPI.Indices.Refresh.WithIndex(c.datastream))
 	if err != nil {
-		logger.Errorf("unable to refresh data stream at the beginning of rally run")
+		logger.Errorf("unable to refresh data stream at the beginning of rally run: %s", err)
+		return
+	}
+	defer resp.Body.Close()
+	if resp.IsError() {
+		logger.Errorf("unable to refresh data stream at the beginning of rally run: %s", resp.String())
 		return
 	}
 
@@ -157,19 +162,13 @@ func (c *collector) publish(events [][]byte) {
 			logger.Errorf("error indexing event in metricstore: %w", err)
 			return
 		}
-
-		if resp.Body == nil {
-			logger.Errorf("empty index response body from metricstore: %w", err)
-			return
-		}
+		defer resp.Body.Close()
 
 		body, err := io.ReadAll(resp.Body)
 		if err != nil {
 			logger.Errorf("failed to read index response body from metricstore: %w", err)
 		}
 
-		resp.Body.Close()
-
 		if resp.StatusCode != 201 {
 			logger.Errorf("error indexing event in metricstore (%d): %s: %v", resp.StatusCode, resp.Status(), elasticsearch.NewError(body))
 		}
@@ -187,7 +186,7 @@ func (c *collector) createMetricsIndex() {
 
 	logger.Debugf("creating %s index in metricstore...", c.indexName())
 
-	createRes, err := c.metricsAPI.Indices.Create(
+	resp, err := c.metricsAPI.Indices.Create(
 		c.indexName(),
 		c.metricsAPI.Indices.Create.WithBody(reader),
 	)
@@ -195,10 +194,10 @@ func (c *collector) createMetricsIndex() {
 	if err != nil {
 		logger.Errorf("could not create index: %w", err)
 		return
 	}
-	createRes.Body.Close()
+	defer resp.Body.Close()
 
-	if createRes.IsError() {
-		logger.Errorf("got a response error while creating index")
+	if resp.IsError() {
+		logger.Errorf("got a response error while creating index: %s", resp.String())
 	}
 }
@@ -287,9 +286,14 @@ func (c *collector) collectDiskUsage() map[string]ingest.DiskUsage {
 }
 
 func (c *collector) collectMetricsAfterRallyRun() {
-	_, err := c.esAPI.Indices.Refresh(c.esAPI.Indices.Refresh.WithIndex(c.datastream))
+	resp, err := c.esAPI.Indices.Refresh(c.esAPI.Indices.Refresh.WithIndex(c.datastream))
 	if err != nil {
-		logger.Errorf("unable to refresh data stream at the end of rally run")
+		logger.Errorf("unable to refresh data stream at the end of rally run: %s", err)
+		return
+	}
+	defer resp.Body.Close()
+	if resp.IsError() {
+		logger.Errorf("unable to refresh data stream at the end of rally run: %s", resp.String())
 		return
 	}
 
diff --git a/internal/benchrunner/runners/rally/runner.go b/internal/benchrunner/runners/rally/runner.go
index 1fae90bc9..48e53e421 100644
--- a/internal/benchrunner/runners/rally/runner.go
+++ b/internal/benchrunner/runners/rally/runner.go
@@ -12,6 +12,7 @@ import (
 	"errors"
 	"fmt"
 	"io"
+	"net/http"
 	"os"
 	"os/exec"
 	"path/filepath"
@@ -336,10 +337,20 @@ func (r *runner) collectAndSummarizeMetrics() (*metricsSummary, error) {
 
 func (r *runner) deleteDataStreamDocs(dataStream string) error {
 	body := strings.NewReader(`{ "query": { "match_all": {} } }`)
-	_, err := r.options.ESAPI.DeleteByQuery([]string{dataStream}, body)
+	resp, err := r.options.ESAPI.DeleteByQuery([]string{dataStream}, body)
 	if err != nil {
-		return err
+		return fmt.Errorf("failed to delete data stream docs for data stream %s: %w", dataStream, err)
+	}
+	defer resp.Body.Close()
+
+	if resp.StatusCode == http.StatusNotFound {
+		// Unavailable index is ok, this means that data is already not there.
+		return nil
 	}
+	if resp.IsError() {
+		return fmt.Errorf("failed to delete data stream docs for data stream %s: %s", dataStream, resp.String())
+	}
+
 	return nil
 }
@@ -665,6 +676,9 @@ func (r *runner) reindexData() error {
 		return fmt.Errorf("error getting mapping: %w", err)
 	}
 	defer mappingRes.Body.Close()
+	if mappingRes.IsError() {
+		return fmt.Errorf("error getting mapping: %s", mappingRes)
+	}
 
 	body, err := io.ReadAll(mappingRes.Body)
 	if err != nil {
@@ -709,7 +723,7 @@ func (r *runner) reindexData() error {
 	defer createRes.Body.Close()
 
 	if createRes.IsError() {
-		return errors.New("got a response error while creating index")
+		return fmt.Errorf("got a response error while creating index: %s", createRes)
 	}
 
 	bodyReader := strings.NewReader(`{"query":{"match_all":{}}}`)
@@ -725,21 +739,13 @@ func (r *runner) reindexData() error {
 		return fmt.Errorf("error executing search: %w", err)
 	}
 	defer res.Body.Close()
-
-	type searchRes struct {
-		Error *struct {
-			Reason string `json:"reson"`
-		} `json:"error"`
-		ScrollID string `json:"_scroll_id"`
-		Hits     []struct {
-			ID     string                 `json:"_id"`
-			Source map[string]interface{} `json:"_source"`
-		} `json:"hits"`
+	if res.IsError() {
+		return fmt.Errorf("error executing search: %s", res)
 	}
 
 	// Iterate through the search results using the Scroll API
 	for {
-		var sr searchRes
+		var sr searchResponse
 		if err := json.NewDecoder(res.Body).Decode(&sr); err != nil {
 			return fmt.Errorf("error decoding search response: %w", err)
 		}
@@ -752,40 +758,67 @@ func (r *runner) reindexData() error {
 			break
 		}
 
-		var bulkBodyBuilder strings.Builder
-		for _, hit := range sr.Hits {
-			bulkBodyBuilder.WriteString(fmt.Sprintf("{\"index\":{\"_index\":\"%s\",\"_id\":\"%s\"}}\n", indexName, hit.ID))
-			enriched := r.enrichEventWithBenchmarkMetadata(hit.Source)
-			src, err := json.Marshal(enriched)
-			if err != nil {
-				return fmt.Errorf("error decoding _source: %w", err)
-			}
-			bulkBodyBuilder.WriteString(fmt.Sprintf("%s\n", string(src)))
+		err := r.bulkMetrics(indexName, sr)
+		if err != nil {
+			return err
 		}
+	}
+
+	logger.Debug("reindexing operation finished")
+	return nil
+}
 
-		logger.Debugf("bulk request of %d events...", len(sr.Hits))
+type searchResponse struct {
+	Error *struct {
+		Reason string `json:"reson"`
+	} `json:"error"`
+	ScrollID string `json:"_scroll_id"`
+	Hits     []struct {
+		ID     string                 `json:"_id"`
+		Source map[string]interface{} `json:"_source"`
+	} `json:"hits"`
+}
 
-		bulkRes, err := r.options.ESMetricsAPI.Bulk(strings.NewReader(bulkBodyBuilder.String()))
+func (r *runner) bulkMetrics(indexName string, sr searchResponse) error {
+	var bulkBodyBuilder strings.Builder
+	for _, hit := range sr.Hits {
+		bulkBodyBuilder.WriteString(fmt.Sprintf("{\"index\":{\"_index\":\"%s\",\"_id\":\"%s\"}}\n", indexName, hit.ID))
+		enriched := r.enrichEventWithBenchmarkMetadata(hit.Source)
+		src, err := json.Marshal(enriched)
 		if err != nil {
-			return fmt.Errorf("error performing the bulk index request: %w", err)
+			return fmt.Errorf("error decoding _source: %w", err)
 		}
-		bulkRes.Body.Close()
+		bulkBodyBuilder.WriteString(fmt.Sprintf("%s\n", string(src)))
+	}
 
-		if sr.ScrollID == "" {
-			return errors.New("error getting scroll ID")
-		}
+	logger.Debugf("bulk request of %d events...", len(sr.Hits))
 
-		res, err = r.options.ESAPI.Scroll(
-			r.options.ESAPI.Scroll.WithScrollID(sr.ScrollID),
-			r.options.ESAPI.Scroll.WithScroll(time.Minute),
-		)
-		if err != nil {
-			return fmt.Errorf("error executing scroll: %s", err)
-		}
-		res.Body.Close()
+	resp, err := r.options.ESMetricsAPI.Bulk(strings.NewReader(bulkBodyBuilder.String()))
+	if err != nil {
+		return fmt.Errorf("error performing the bulk index request: %w", err)
+	}
+	defer resp.Body.Close()
+	if resp.IsError() {
+		return fmt.Errorf("error performing the bulk index request: %s", resp.String())
+	}
+
+	if sr.ScrollID == "" {
+		return errors.New("error getting scroll ID")
+	}
+
+	resp, err = r.options.ESAPI.Scroll(
+		r.options.ESAPI.Scroll.WithScrollID(sr.ScrollID),
+		r.options.ESAPI.Scroll.WithScroll(time.Minute),
+	)
+	if err != nil {
+		return fmt.Errorf("error executing scroll: %s", err)
+	}
+	defer resp.Body.Close()
+
+	if resp.IsError() {
+		return fmt.Errorf("error executing scroll: %s", resp.String())
 	}
 
-	logger.Debug("reindexing operation finished")
 	return nil
 }
@@ -809,12 +842,17 @@ func (r *runner) enrichEventWithBenchmarkMetadata(e map[string]interface{}) map[
 func getTotalHits(esapi *elasticsearch.API, dataStream string) (int, error) {
 	resp, err := esapi.Count(
 		esapi.Count.WithIndex(dataStream),
+		esapi.Count.WithIgnoreUnavailable(true),
 	)
 	if err != nil {
 		return 0, fmt.Errorf("could not search data stream: %w", err)
 	}
 	defer resp.Body.Close()
 
+	if resp.IsError() {
+		return 0, fmt.Errorf("failed to get hits count: %s", resp.String())
+	}
+
 	var results struct {
 		Count int
 		Error *struct {
diff --git a/internal/benchrunner/runners/system/metrics.go b/internal/benchrunner/runners/system/metrics.go
index 2b48f227d..bd3c0cb51 100644
--- a/internal/benchrunner/runners/system/metrics.go
+++ b/internal/benchrunner/runners/system/metrics.go
@@ -196,7 +196,7 @@ func (c *collector) createMetricsIndex() {
 	createRes.Body.Close()
 
 	if createRes.IsError() {
-		logger.Debug("got a response error while creating index")
+		logger.Debugf("got a response error while creating index: %s", createRes)
 	}
 }
 
diff --git a/internal/benchrunner/runners/system/runner.go b/internal/benchrunner/runners/system/runner.go
index 868463a10..44119e9fe 100644
--- a/internal/benchrunner/runners/system/runner.go
+++ b/internal/benchrunner/runners/system/runner.go
@@ -11,6 +11,7 @@ import (
 	"errors"
 	"fmt"
 	"io"
+	"net/http"
 	"os"
 	"path/filepath"
 	"strings"
@@ -333,10 +334,20 @@ func (r *runner) collectAndSummarizeMetrics() (*metricsSummary, error) {
 
 func (r *runner) deleteDataStreamDocs(dataStream string) error {
 	body := strings.NewReader(`{ "query": { "match_all": {} } }`)
-	_, err := r.options.ESAPI.DeleteByQuery([]string{dataStream}, body)
+	resp, err := r.options.ESAPI.DeleteByQuery([]string{dataStream}, body)
 	if err != nil {
-		return err
+		return fmt.Errorf("failed to delete docs for data stream %s: %w", dataStream, err)
+	}
+	defer resp.Body.Close()
+
+	if resp.StatusCode == http.StatusNotFound {
+		// Unavailable index is ok, this means that data is already not there.
+		return nil
 	}
+	if resp.IsError() {
+		return fmt.Errorf("failed to delete data stream docs for data stream %s: %s", dataStream, resp.String())
+	}
+
 	return nil
 }
@@ -732,6 +743,9 @@ func (r *runner) reindexData() error {
 		return fmt.Errorf("error getting mapping: %w", err)
 	}
 	defer mappingRes.Body.Close()
+	if mappingRes.IsError() {
+		return fmt.Errorf("error getting mapping: %s", mappingRes)
+	}
 
 	body, err := io.ReadAll(mappingRes.Body)
 	if err != nil {
@@ -782,7 +796,7 @@ func (r *runner) reindexData() error {
 	bodyReader := strings.NewReader(`{"query":{"match_all":{}}}`)
 
 	logger.Debug("starting scrolling of events...")
-	res, err := r.options.ESAPI.Search(
+	resp, err := r.options.ESAPI.Search(
 		r.options.ESAPI.Search.WithIndex(r.runtimeDataStream),
 		r.options.ESAPI.Search.WithBody(bodyReader),
 		r.options.ESAPI.Search.WithScroll(time.Minute),
@@ -791,23 +805,16 @@ func (r *runner) reindexData() error {
 	if err != nil {
 		return fmt.Errorf("error executing search: %w", err)
 	}
-	defer res.Body.Close()
+	defer resp.Body.Close()
 
-	type searchRes struct {
-		Error *struct {
-			Reason string `json:"reson"`
-		} `json:"error"`
-		ScrollID string `json:"_scroll_id"`
-		Hits     []struct {
-			ID     string                 `json:"_id"`
-			Source map[string]interface{} `json:"_source"`
-		} `json:"hits"`
+	if resp.IsError() {
+		return fmt.Errorf("failed to search events in data stream %s: %s", r.runtimeDataStream, resp.String())
 	}
 
 	// Iterate through the search results using the Scroll API
 	for {
-		var sr searchRes
-		if err := json.NewDecoder(res.Body).Decode(&sr); err != nil {
+		var sr searchResponse
+		if err := json.NewDecoder(resp.Body).Decode(&sr); err != nil {
 			return fmt.Errorf("error decoding search response: %w", err)
 		}
@@ -819,40 +826,66 @@ func (r *runner) reindexData() error {
 			break
 		}
 
-		var bulkBodyBuilder strings.Builder
-		for _, hit := range sr.Hits {
-			bulkBodyBuilder.WriteString(fmt.Sprintf("{\"index\":{\"_index\":\"%s\",\"_id\":\"%s\"}}\n", indexName, hit.ID))
-			enriched := r.enrichEventWithBenchmarkMetadata(hit.Source)
-			src, err := json.Marshal(enriched)
-			if err != nil {
-				return fmt.Errorf("error decoding _source: %w", err)
-			}
-			bulkBodyBuilder.WriteString(fmt.Sprintf("%s\n", string(src)))
+		err := r.bulkMetrics(indexName, sr)
+		if err != nil {
+			return err
 		}
+	}
 
-		logger.Debugf("bulk request of %d events...", len(sr.Hits))
+	logger.Debug("reindexing operation finished")
+	return nil
+}
+
+type searchResponse struct {
+	Error *struct {
+		Reason string `json:"reson"`
+	} `json:"error"`
+	ScrollID string `json:"_scroll_id"`
+	Hits     []struct {
+		ID     string                 `json:"_id"`
+		Source map[string]interface{} `json:"_source"`
+	} `json:"hits"`
+}
 
-		bulkRes, err := r.options.ESMetricsAPI.Bulk(strings.NewReader(bulkBodyBuilder.String()))
+func (r *runner) bulkMetrics(indexName string, sr searchResponse) error {
+	var bulkBodyBuilder strings.Builder
+	for _, hit := range sr.Hits {
+		bulkBodyBuilder.WriteString(fmt.Sprintf("{\"index\":{\"_index\":\"%s\",\"_id\":\"%s\"}}\n", indexName, hit.ID))
+		enriched := r.enrichEventWithBenchmarkMetadata(hit.Source)
+		src, err := json.Marshal(enriched)
 		if err != nil {
-			return fmt.Errorf("error performing the bulk index request: %w", err)
+			return fmt.Errorf("error decoding _source: %w", err)
 		}
-		bulkRes.Body.Close()
+		bulkBodyBuilder.WriteString(fmt.Sprintf("%s\n", string(src)))
+	}
 
-		if sr.ScrollID == "" {
-			return errors.New("error getting scroll ID")
-		}
+	logger.Debugf("bulk request of %d events...", len(sr.Hits))
 
-		res, err = r.options.ESAPI.Scroll(
-			r.options.ESAPI.Scroll.WithScrollID(sr.ScrollID),
-			r.options.ESAPI.Scroll.WithScroll(time.Minute),
-		)
-		if err != nil {
-			return fmt.Errorf("error executing scroll: %s", err)
-		}
-		res.Body.Close()
+	resp, err := r.options.ESMetricsAPI.Bulk(strings.NewReader(bulkBodyBuilder.String()))
+	if err != nil {
+		return fmt.Errorf("error performing the bulk index request: %w", err)
+	}
+	defer resp.Body.Close()
+	if resp.IsError() {
+		return fmt.Errorf("error performing the bulk index request: %s", resp.String())
+	}
+
+	if sr.ScrollID == "" {
+		return errors.New("error getting scroll ID")
+	}
+
+	resp, err = r.options.ESAPI.Scroll(
+		r.options.ESAPI.Scroll.WithScrollID(sr.ScrollID),
+		r.options.ESAPI.Scroll.WithScroll(time.Minute),
+	)
+	if err != nil {
+		return fmt.Errorf("error executing scroll: %s", err)
+	}
+	defer resp.Body.Close()
+	if resp.IsError() {
+		return fmt.Errorf("error executing scroll: %s", resp.String())
 	}
 
-	logger.Debug("reindexing operation finished")
 	return nil
 }
@@ -876,12 +909,17 @@ func (r *runner) enrichEventWithBenchmarkMetadata(e map[string]interface{}) map[
 func getTotalHits(esapi *elasticsearch.API, dataStream string) (int, error) {
 	resp, err := esapi.Count(
 		esapi.Count.WithIndex(dataStream),
+		esapi.Count.WithIgnoreUnavailable(true),
 	)
 	if err != nil {
 		return 0, fmt.Errorf("could not search data stream: %w", err)
 	}
 	defer resp.Body.Close()
 
+	if resp.IsError() {
+		return 0, fmt.Errorf("failed to get hits count: %s", resp.String())
+	}
+
 	var results struct {
 		Count int
 		Error *struct {
diff --git a/internal/dump/componenttemplates.go b/internal/dump/componenttemplates.go
index c6dec8447..a2a9fef6d 100644
--- a/internal/dump/componenttemplates.go
+++ b/internal/dump/componenttemplates.go
@@ -71,6 +71,10 @@ func getComponentTemplatesByName(ctx context.Context, api *elasticsearch.API, na
 	}
 	defer resp.Body.Close()
+	if resp.IsError() {
+		return nil, fmt.Errorf("failed to get component template %s: %s", name, resp.String())
+	}
+
 	d, err := io.ReadAll(resp.Body)
 	if err != nil {
 		return nil, fmt.Errorf("failed to read response body: %w", err)
 	}
diff --git a/internal/dump/ilmpolicies.go b/internal/dump/ilmpolicies.go
index 60d2b5906..801bd5cf1 100644
--- a/internal/dump/ilmpolicies.go
+++ b/internal/dump/ilmpolicies.go
@@ -57,6 +57,10 @@ func getILMPolicyByName(ctx context.Context, api *elasticsearch.API, policy stri
 	}
 	defer resp.Body.Close()
+	if resp.IsError() {
+		return nil, fmt.Errorf("failed to get policy %s: %s", policy, resp.String())
+	}
+
 	d, err := io.ReadAll(resp.Body)
 	if err != nil {
 		return nil, fmt.Errorf("failed to read response body: %w", err)
 	}
diff --git a/internal/dump/indextemplates.go b/internal/dump/indextemplates.go
index 5010c06b9..d48479d78 100644
--- a/internal/dump/indextemplates.go
+++ b/internal/dump/indextemplates.go
@@ -77,6 +77,10 @@ func getIndexTemplatesForPackage(ctx context.Context, api *elasticsearch.API, pa
 	}
 	defer resp.Body.Close()
+	if resp.IsError() {
+		return nil, fmt.Errorf("failed to get index templates: %s", resp.String())
+	}
+
 	d, err := io.ReadAll(resp.Body)
 	if err != nil {
 		return nil, fmt.Errorf("failed to read response body: %w", err)
 	}
diff --git a/internal/dump/ingestpipelines.go b/internal/dump/ingestpipelines.go
index 71fc083b6..5e17b1ffa 100644
--- a/internal/dump/ingestpipelines.go
+++ b/internal/dump/ingestpipelines.go
@@ -71,6 +71,10 @@ func getIngestPipelineByID(ctx context.Context, api *elasticsearch.API, id strin
 	}
 	defer resp.Body.Close()
+	if resp.IsError() {
+		return nil, fmt.Errorf("failed to get ingest pipeline %s: %s", id, resp.String())
+	}
+
 	d, err := io.ReadAll(resp.Body)
 	if err != nil {
 		return nil, fmt.Errorf("failed to read response body: %w", err)
 	}
diff --git a/internal/elasticsearch/client.go b/internal/elasticsearch/client.go
index ba43aa33e..986442102 100644
--- a/internal/elasticsearch/client.go
+++ b/internal/elasticsearch/client.go
@@ -132,6 +132,10 @@ func (client *Client) CheckHealth(ctx context.Context) error {
 	}
 	defer resp.Body.Close()
+	if resp.StatusCode != http.StatusOK {
+		return fmt.Errorf("failed to check cluster health: %s", resp.String())
+	}
+
 	body, err := io.ReadAll(resp.Body)
 	if err != nil {
 		return fmt.Errorf("error reading cluster health response: %w", err)
 	}
@@ -178,6 +182,10 @@ func (client *Client) redHealthCause(ctx context.Context) (string, error) {
 		return "", fmt.Errorf("error reading internal health response: %w", err)
 	}
 
+	if resp.StatusCode != http.StatusOK {
+		return "", fmt.Errorf("failed to get cause of red health; API status code = %d; response body = %s", resp.StatusCode, string(body))
+	}
+
 	var internalHealth struct {
 		Status     string `json:"status"`
 		Indicators map[string]struct {
diff --git a/internal/elasticsearch/ingest/pipeline.go b/internal/elasticsearch/ingest/pipeline.go
index f4fc1677c..c40f5b9bf 100644
--- a/internal/elasticsearch/ingest/pipeline.go
+++ b/internal/elasticsearch/ingest/pipeline.go
@@ -117,11 +117,24 @@ func SimulatePipeline(api *elasticsearch.API, pipelineName string, events []json
 
 func UninstallPipelines(api *elasticsearch.API, pipelines []Pipeline) error {
 	for _, p := range pipelines {
-		resp, err := api.Ingest.DeletePipeline(p.Name)
+		err := uninstallPipeline(api, p.Name)
 		if err != nil {
-			return fmt.Errorf("delete pipeline API call failed (pipelineName: %s): %w", p.Name, err)
+			return err
 		}
-		resp.Body.Close()
 	}
 	return nil
 }
+
+func uninstallPipeline(api *elasticsearch.API, name string) error {
+	resp, err := api.Ingest.DeletePipeline(name)
+	if err != nil {
+		return fmt.Errorf("delete pipeline API call failed (pipelineName: %s): %w", name, err)
+	}
+	defer resp.Body.Close()
+
+	if resp.IsError() {
+		return fmt.Errorf("failed to uninstall pipeline %s: %s", name, resp.String())
+	}
+
+	return nil
+}
diff --git a/internal/kibana/dashboards.go b/internal/kibana/dashboards.go
index 9b47fccf7..38988a94c 100644
--- a/internal/kibana/dashboards.go
+++ b/internal/kibana/dashboards.go
@@ -8,6 +8,7 @@ import (
 	"encoding/json"
 	"errors"
 	"fmt"
+	"net/http"
 	"strings"
 
 	"github.com/elastic/elastic-package/internal/common"
@@ -37,6 +38,10 @@ func (c *Client) Export(dashboardIDs []string) ([]common.MapStr, error) {
 		return nil, fmt.Errorf("could not export dashboards; API status code = %d; response body = %s: %w", statusCode, respBody, err)
 	}
 
+	if statusCode != http.StatusOK {
+		return nil, fmt.Errorf("could not export dashboards; API status code = %d; response body = %s", statusCode, respBody)
+	}
+
 	var exported exportedType
 	err = json.Unmarshal(respBody, &exported)
 	if err != nil {
diff --git a/internal/kibana/saved_objects.go b/internal/kibana/saved_objects.go
index 2956bf112..348332fb4 100644
--- a/internal/kibana/saved_objects.go
+++ b/internal/kibana/saved_objects.go
@@ -7,6 +7,7 @@ package kibana
 import (
 	"encoding/json"
 	"fmt"
+	"net/http"
 	"sort"
 	"strings"
 
@@ -95,6 +96,10 @@ func (c *Client) findDashboardsNextPage(page int) (*savedObjectsResponse, error)
 		return nil, fmt.Errorf("could not find dashboards; API status code = %d; response body = %s: %w", statusCode, respBody, err)
 	}
 
+	if statusCode != http.StatusOK {
+		return nil, fmt.Errorf("could not find dashboards; API status code = %d; response body = %s", statusCode, respBody)
+	}
+
 	var r savedObjectsResponse
 	err = json.Unmarshal(respBody, &r)
 	if err != nil {
diff --git a/internal/testrunner/runners/system/runner.go b/internal/testrunner/runners/system/runner.go
index 0e25fc39a..5f1ef8237 100644
--- a/internal/testrunner/runners/system/runner.go
+++ b/internal/testrunner/runners/system/runner.go
@@ -9,6 +9,7 @@ import (
 	"errors"
 	"fmt"
 	"math/rand"
+	"net/http"
 	"os"
 	"path/filepath"
 	"regexp"
@@ -322,15 +323,24 @@ func createTestRunID() string {
 }
 
 func (r *runner) isSyntheticsEnabled(dataStream, componentTemplatePackage string) (bool, error) {
-	logger.Debugf("check whether or not synthetics is enabled (component template %s)...", componentTemplatePackage)
 	resp, err := r.options.API.Cluster.GetComponentTemplate(
 		r.options.API.Cluster.GetComponentTemplate.WithName(componentTemplatePackage),
 	)
 	if err != nil {
-		return false, fmt.Errorf("could not get component template from data stream %s: %w", dataStream, err)
+		return false, fmt.Errorf("could not get component template %s from data stream %s: %w", componentTemplatePackage, dataStream, err)
 	}
 	defer resp.Body.Close()
 
+	if resp.StatusCode == http.StatusNotFound {
+		// @package component template doesn't exist before 8.2. On these versions synthetics was not supported
+		// in any case, so just return false.
+		logger.Debugf("no component template %s found for data stream %s", componentTemplatePackage, dataStream)
+		return false, nil
+	}
+	if resp.IsError() {
+		return false, fmt.Errorf("could not get component template %s for data stream %s: %s", componentTemplatePackage, dataStream, resp.String())
+	}
+
 	var results struct {
 		ComponentTemplates []struct {
 			Name string `json:"name"`
@@ -351,11 +361,11 @@ func (r *runner) isSyntheticsEnabled(dataStream, componentTemplatePackage string
 	}
 
 	if len(results.ComponentTemplates) == 0 {
-		logger.Debugf("no component template found for data stream %s", dataStream)
+		logger.Debugf("no component template %s found for data stream %s", componentTemplatePackage, dataStream)
 		return false, nil
 	}
 	if len(results.ComponentTemplates) != 1 {
-		return false, fmt.Errorf("unexpected response, not found component template")
+		return false, fmt.Errorf("ambiguous response, expected one component template for %s, found %d", componentTemplatePackage, len(results.ComponentTemplates))
 	}
 
 	template := results.ComponentTemplates[0]
@@ -390,12 +400,22 @@ func (r *runner) getDocs(dataStream string) (*hits, error) {
 		r.options.API.Search.WithSize(elasticsearchQuerySize),
 		r.options.API.Search.WithSource("true"),
 		r.options.API.Search.WithBody(strings.NewReader(allFieldsBody)),
+		r.options.API.Search.WithIgnoreUnavailable(true),
 	)
 	if err != nil {
 		return nil, fmt.Errorf("could not search data stream: %w", err)
 	}
 	defer resp.Body.Close()
 
+	if resp.StatusCode == http.StatusServiceUnavailable && strings.Contains(resp.String(), "no_shard_available_action_exception") {
+		// Index is being created, but no shards are available yet.
+		// See https://github.com/elastic/elasticsearch/issues/65846
+		return &hits{}, nil
+	}
+	if resp.IsError() {
+		return nil, fmt.Errorf("failed to search docs for data stream %s: %s", dataStream, resp.String())
+	}
+
 	var results struct {
 		Hits struct {
 			Total struct {
@@ -583,7 +603,10 @@ func (r *runner) runTest(config *testConfig, ctxt servicedeployer.ServiceContext
 		}
 
 		hits, err := r.getDocs(dataStream)
-		return hits.size() == 0, err
+		if err != nil {
+			return false, err
+		}
+		return hits.size() == 0, nil
 	}, 2*time.Minute)
 	if err != nil || !cleared {
 		if err == nil {
@@ -645,10 +668,13 @@ func (r *runner) runTest(config *testConfig, ctxt servicedeployer.ServiceContext
 
 		var err error
 		hits, err = r.getDocs(dataStream)
+		if err != nil {
+			return false, err
+		}
 
 		if config.Assert.HitCount > 0 {
 			if hits.size() < config.Assert.HitCount {
-				return false, err
+				return false, nil
 			}
 
 			ret := hits.size() == oldHits
@@ -657,9 +683,10 @@ func (r *runner) runTest(config *testConfig, ctxt servicedeployer.ServiceContext
 				time.Sleep(4 * time.Second)
 			}
 
-			return ret, err
+			return ret, nil
 		}
-		return hits.size() > 0, err
+
+		return hits.size() > 0, nil
 	}, waitForDataTimeout)
 
 	if config.Service != "" && !config.IgnoreServiceError {
@@ -681,6 +708,7 @@ func (r *runner) runTest(config *testConfig, ctxt servicedeployer.ServiceContext
 		return result.WithError(fmt.Errorf("%s", result.FailureMsg))
 	}
 
+	logger.Debugf("check whether or not synthetics is enabled (component template %s)...", componentTemplatePackage)
 	syntheticEnabled, err := r.isSyntheticsEnabled(dataStream, componentTemplatePackage)
 	if err != nil {
 		return result.WithError(fmt.Errorf("failed to check if synthetic source is enabled: %w", err))
@@ -1160,9 +1188,18 @@ func (r *runner) previewTransform(transformId string) ([]common.MapStr, error) {
 
 func deleteDataStreamDocs(api *elasticsearch.API, dataStream string) error {
 	body := strings.NewReader(`{ "query": { "match_all": {} } }`)
-	_, err := api.DeleteByQuery([]string{dataStream}, body)
+	resp, err := api.DeleteByQuery([]string{dataStream}, body)
 	if err != nil {
-		return err
+		return fmt.Errorf("failed to delete data stream docs: %w", err)
+	}
+	defer resp.Body.Close()
+
+	if resp.StatusCode == http.StatusNotFound {
+		// Unavailable index is ok, this means that data is already not there.
+		return nil
+	}
+	if resp.IsError() {
+		return fmt.Errorf("failed to delete data stream docs for data stream %s: %s", dataStream, resp.String())
 	}
 
 	return nil