From 2c241f1978b1a366c115aa9abd417f2fda3530a6 Mon Sep 17 00:00:00 2001 From: Jaime Soriano Pastor Date: Mon, 13 Nov 2023 10:50:46 +0100 Subject: [PATCH] Review handling of status code in HTTP requests (#1549) In a number of places we were not handling HTTP status codes, what may hide issues or complicate debugging. Also ensure that body is closed in some cases where it was not being closed. --- internal/benchrunner/runners/rally/metrics.go | 34 ++--- internal/benchrunner/runners/rally/runner.go | 116 +++++++++++------ .../benchrunner/runners/system/metrics.go | 2 +- internal/benchrunner/runners/system/runner.go | 118 ++++++++++++------ internal/dump/componenttemplates.go | 4 + internal/dump/ilmpolicies.go | 4 + internal/dump/indextemplates.go | 4 + internal/dump/ingestpipelines.go | 4 + internal/elasticsearch/client.go | 8 ++ internal/elasticsearch/ingest/pipeline.go | 19 ++- internal/kibana/dashboards.go | 5 + internal/kibana/saved_objects.go | 5 + internal/testrunner/runners/system/runner.go | 57 +++++++-- 13 files changed, 272 insertions(+), 108 deletions(-) diff --git a/internal/benchrunner/runners/rally/metrics.go b/internal/benchrunner/runners/rally/metrics.go index 11f5d405f..8cac07cae 100644 --- a/internal/benchrunner/runners/rally/metrics.go +++ b/internal/benchrunner/runners/rally/metrics.go @@ -112,9 +112,14 @@ func (c *collector) stop() { } func (c *collector) collectMetricsBeforeRallyRun() { - _, err := c.esAPI.Indices.Refresh(c.esAPI.Indices.Refresh.WithIndex(c.datastream)) + resp, err := c.esAPI.Indices.Refresh(c.esAPI.Indices.Refresh.WithIndex(c.datastream)) if err != nil { - logger.Errorf("unable to refresh data stream at the beginning of rally run") + logger.Errorf("unable to refresh data stream at the beginning of rally run: %s", err) + return + } + defer resp.Body.Close() + if resp.IsError() { + logger.Errorf("unable to refresh data stream at the beginning of rally run: %s", resp.String()) return } @@ -157,19 +162,13 @@ func (c *collector) publish(events [][]byte) { logger.Errorf("error indexing event in metricstore: %w", err) return } - - if resp.Body == nil { - logger.Errorf("empty index response body from metricstore: %w", err) - return - } + defer resp.Body.Close() body, err := io.ReadAll(resp.Body) if err != nil { logger.Errorf("failed to read index response body from metricstore: %w", err) } - resp.Body.Close() - if resp.StatusCode != 201 { logger.Errorf("error indexing event in metricstore (%d): %s: %v", resp.StatusCode, resp.Status(), elasticsearch.NewError(body)) } @@ -187,7 +186,7 @@ func (c *collector) createMetricsIndex() { logger.Debugf("creating %s index in metricstore...", c.indexName()) - createRes, err := c.metricsAPI.Indices.Create( + resp, err := c.metricsAPI.Indices.Create( c.indexName(), c.metricsAPI.Indices.Create.WithBody(reader), ) @@ -195,10 +194,10 @@ func (c *collector) createMetricsIndex() { logger.Errorf("could not create index: %w", err) return } - createRes.Body.Close() + defer resp.Body.Close() - if createRes.IsError() { - logger.Errorf("got a response error while creating index") + if resp.IsError() { + logger.Errorf("got a response error while creating index: %s", resp.String()) } } @@ -287,9 +286,14 @@ func (c *collector) collectDiskUsage() map[string]ingest.DiskUsage { } func (c *collector) collectMetricsAfterRallyRun() { - _, err := c.esAPI.Indices.Refresh(c.esAPI.Indices.Refresh.WithIndex(c.datastream)) + resp, err := c.esAPI.Indices.Refresh(c.esAPI.Indices.Refresh.WithIndex(c.datastream)) if err != nil { - logger.Errorf("unable to refresh data stream at the end of rally run") + logger.Errorf("unable to refresh data stream at the end of rally run: %s", err) + return + } + defer resp.Body.Close() + if resp.IsError() { + logger.Errorf("unable to refresh data stream at the end of rally run: %s", resp.String()) return } diff --git a/internal/benchrunner/runners/rally/runner.go b/internal/benchrunner/runners/rally/runner.go index 1fae90bc9..48e53e421 100644 --- a/internal/benchrunner/runners/rally/runner.go +++ b/internal/benchrunner/runners/rally/runner.go @@ -12,6 +12,7 @@ import ( "errors" "fmt" "io" + "net/http" "os" "os/exec" "path/filepath" @@ -336,10 +337,20 @@ func (r *runner) collectAndSummarizeMetrics() (*metricsSummary, error) { func (r *runner) deleteDataStreamDocs(dataStream string) error { body := strings.NewReader(`{ "query": { "match_all": {} } }`) - _, err := r.options.ESAPI.DeleteByQuery([]string{dataStream}, body) + resp, err := r.options.ESAPI.DeleteByQuery([]string{dataStream}, body) if err != nil { - return err + return fmt.Errorf("failed to delete data stream docs for data stream %s: %w", dataStream, err) + } + defer resp.Body.Close() + + if resp.StatusCode == http.StatusNotFound { + // Unavailable index is ok, this means that data is already not there. + return nil } + if resp.IsError() { + return fmt.Errorf("failed to delete data stream docs for data stream %s: %s", dataStream, resp.String()) + } + return nil } @@ -665,6 +676,9 @@ func (r *runner) reindexData() error { return fmt.Errorf("error getting mapping: %w", err) } defer mappingRes.Body.Close() + if mappingRes.IsError() { + return fmt.Errorf("error getting mapping: %s", mappingRes) + } body, err := io.ReadAll(mappingRes.Body) if err != nil { @@ -709,7 +723,7 @@ func (r *runner) reindexData() error { defer createRes.Body.Close() if createRes.IsError() { - return errors.New("got a response error while creating index") + return fmt.Errorf("got a response error while creating index: %s", createRes) } bodyReader := strings.NewReader(`{"query":{"match_all":{}}}`) @@ -725,21 +739,13 @@ func (r *runner) reindexData() error { return fmt.Errorf("error executing search: %w", err) } defer res.Body.Close() - - type searchRes struct { - Error *struct { - Reason string `json:"reson"` - } `json:"error"` - ScrollID string `json:"_scroll_id"` - Hits []struct { - ID string `json:"_id"` - Source map[string]interface{} `json:"_source"` - } `json:"hits"` + if res.IsError() { + return fmt.Errorf("error executing search: %s", res) } // Iterate through the search results using the Scroll API for { - var sr searchRes + var sr searchResponse if err := json.NewDecoder(res.Body).Decode(&sr); err != nil { return fmt.Errorf("error decoding search response: %w", err) } @@ -752,40 +758,67 @@ func (r *runner) reindexData() error { break } - var bulkBodyBuilder strings.Builder - for _, hit := range sr.Hits { - bulkBodyBuilder.WriteString(fmt.Sprintf("{\"index\":{\"_index\":\"%s\",\"_id\":\"%s\"}}\n", indexName, hit.ID)) - enriched := r.enrichEventWithBenchmarkMetadata(hit.Source) - src, err := json.Marshal(enriched) - if err != nil { - return fmt.Errorf("error decoding _source: %w", err) - } - bulkBodyBuilder.WriteString(fmt.Sprintf("%s\n", string(src))) + err := r.bulkMetrics(indexName, sr) + if err != nil { + return err } + } + + logger.Debug("reindexing operation finished") + return nil +} - logger.Debugf("bulk request of %d events...", len(sr.Hits)) +type searchResponse struct { + Error *struct { + Reason string `json:"reson"` + } `json:"error"` + ScrollID string `json:"_scroll_id"` + Hits []struct { + ID string `json:"_id"` + Source map[string]interface{} `json:"_source"` + } `json:"hits"` +} - bulkRes, err := r.options.ESMetricsAPI.Bulk(strings.NewReader(bulkBodyBuilder.String())) +func (r *runner) bulkMetrics(indexName string, sr searchResponse) error { + var bulkBodyBuilder strings.Builder + for _, hit := range sr.Hits { + bulkBodyBuilder.WriteString(fmt.Sprintf("{\"index\":{\"_index\":\"%s\",\"_id\":\"%s\"}}\n", indexName, hit.ID)) + enriched := r.enrichEventWithBenchmarkMetadata(hit.Source) + src, err := json.Marshal(enriched) if err != nil { - return fmt.Errorf("error performing the bulk index request: %w", err) + return fmt.Errorf("error decoding _source: %w", err) } - bulkRes.Body.Close() + bulkBodyBuilder.WriteString(fmt.Sprintf("%s\n", string(src))) + } - if sr.ScrollID == "" { - return errors.New("error getting scroll ID") - } + logger.Debugf("bulk request of %d events...", len(sr.Hits)) - res, err = r.options.ESAPI.Scroll( - r.options.ESAPI.Scroll.WithScrollID(sr.ScrollID), - r.options.ESAPI.Scroll.WithScroll(time.Minute), - ) - if err != nil { - return fmt.Errorf("error executing scroll: %s", err) - } - res.Body.Close() + resp, err := r.options.ESMetricsAPI.Bulk(strings.NewReader(bulkBodyBuilder.String())) + if err != nil { + return fmt.Errorf("error performing the bulk index request: %w", err) + } + defer resp.Body.Close() + if resp.IsError() { + return fmt.Errorf("error performing the bulk index request: %s", resp.String()) + } + + if sr.ScrollID == "" { + return errors.New("error getting scroll ID") + } + + resp, err = r.options.ESAPI.Scroll( + r.options.ESAPI.Scroll.WithScrollID(sr.ScrollID), + r.options.ESAPI.Scroll.WithScroll(time.Minute), + ) + if err != nil { + return fmt.Errorf("error executing scroll: %s", err) + } + defer resp.Body.Close() + + if resp.IsError() { + return fmt.Errorf("error executing scroll: %s", resp.String()) } - logger.Debug("reindexing operation finished") return nil } @@ -809,12 +842,17 @@ func (r *runner) enrichEventWithBenchmarkMetadata(e map[string]interface{}) map[ func getTotalHits(esapi *elasticsearch.API, dataStream string) (int, error) { resp, err := esapi.Count( esapi.Count.WithIndex(dataStream), + esapi.Count.WithIgnoreUnavailable(true), ) if err != nil { return 0, fmt.Errorf("could not search data stream: %w", err) } defer resp.Body.Close() + if resp.IsError() { + return 0, fmt.Errorf("failed to get hits count: %s", resp.String()) + } + var results struct { Count int Error *struct { diff --git a/internal/benchrunner/runners/system/metrics.go b/internal/benchrunner/runners/system/metrics.go index 2b48f227d..bd3c0cb51 100644 --- a/internal/benchrunner/runners/system/metrics.go +++ b/internal/benchrunner/runners/system/metrics.go @@ -196,7 +196,7 @@ func (c *collector) createMetricsIndex() { createRes.Body.Close() if createRes.IsError() { - logger.Debug("got a response error while creating index") + logger.Debug("got a response error while creating index: %s", createRes) } } diff --git a/internal/benchrunner/runners/system/runner.go b/internal/benchrunner/runners/system/runner.go index 868463a10..44119e9fe 100644 --- a/internal/benchrunner/runners/system/runner.go +++ b/internal/benchrunner/runners/system/runner.go @@ -11,6 +11,7 @@ import ( "errors" "fmt" "io" + "net/http" "os" "path/filepath" "strings" @@ -333,10 +334,20 @@ func (r *runner) collectAndSummarizeMetrics() (*metricsSummary, error) { func (r *runner) deleteDataStreamDocs(dataStream string) error { body := strings.NewReader(`{ "query": { "match_all": {} } }`) - _, err := r.options.ESAPI.DeleteByQuery([]string{dataStream}, body) + resp, err := r.options.ESAPI.DeleteByQuery([]string{dataStream}, body) if err != nil { - return err + return fmt.Errorf("failed to delete docs for data stream %s: %w", dataStream, err) + } + defer resp.Body.Close() + + if resp.StatusCode == http.StatusNotFound { + // Unavailable index is ok, this means that data is already not there. + return nil } + if resp.IsError() { + return fmt.Errorf("failed to delete data stream docs for data stream %s: %s", dataStream, resp.String()) + } + return nil } @@ -732,6 +743,9 @@ func (r *runner) reindexData() error { return fmt.Errorf("error getting mapping: %w", err) } defer mappingRes.Body.Close() + if mappingRes.IsError() { + return fmt.Errorf("error getting mapping: %s", mappingRes) + } body, err := io.ReadAll(mappingRes.Body) if err != nil { @@ -782,7 +796,7 @@ func (r *runner) reindexData() error { bodyReader := strings.NewReader(`{"query":{"match_all":{}}}`) logger.Debug("starting scrolling of events...") - res, err := r.options.ESAPI.Search( + resp, err := r.options.ESAPI.Search( r.options.ESAPI.Search.WithIndex(r.runtimeDataStream), r.options.ESAPI.Search.WithBody(bodyReader), r.options.ESAPI.Search.WithScroll(time.Minute), @@ -791,23 +805,16 @@ func (r *runner) reindexData() error { if err != nil { return fmt.Errorf("error executing search: %w", err) } - defer res.Body.Close() + defer resp.Body.Close() - type searchRes struct { - Error *struct { - Reason string `json:"reson"` - } `json:"error"` - ScrollID string `json:"_scroll_id"` - Hits []struct { - ID string `json:"_id"` - Source map[string]interface{} `json:"_source"` - } `json:"hits"` + if resp.IsError() { + return fmt.Errorf("failed to search events in data stream %s: %s", r.runtimeDataStream, resp.String()) } // Iterate through the search results using the Scroll API for { - var sr searchRes - if err := json.NewDecoder(res.Body).Decode(&sr); err != nil { + var sr searchResponse + if err := json.NewDecoder(resp.Body).Decode(&sr); err != nil { return fmt.Errorf("error decoding search response: %w", err) } @@ -819,40 +826,66 @@ func (r *runner) reindexData() error { break } - var bulkBodyBuilder strings.Builder - for _, hit := range sr.Hits { - bulkBodyBuilder.WriteString(fmt.Sprintf("{\"index\":{\"_index\":\"%s\",\"_id\":\"%s\"}}\n", indexName, hit.ID)) - enriched := r.enrichEventWithBenchmarkMetadata(hit.Source) - src, err := json.Marshal(enriched) - if err != nil { - return fmt.Errorf("error decoding _source: %w", err) - } - bulkBodyBuilder.WriteString(fmt.Sprintf("%s\n", string(src))) + err := r.bulkMetrics(indexName, sr) + if err != nil { + return err } + } - logger.Debugf("bulk request of %d events...", len(sr.Hits)) + logger.Debug("reindexing operation finished") + return nil +} + +type searchResponse struct { + Error *struct { + Reason string `json:"reson"` + } `json:"error"` + ScrollID string `json:"_scroll_id"` + Hits []struct { + ID string `json:"_id"` + Source map[string]interface{} `json:"_source"` + } `json:"hits"` +} - bulkRes, err := r.options.ESMetricsAPI.Bulk(strings.NewReader(bulkBodyBuilder.String())) +func (r *runner) bulkMetrics(indexName string, sr searchResponse) error { + var bulkBodyBuilder strings.Builder + for _, hit := range sr.Hits { + bulkBodyBuilder.WriteString(fmt.Sprintf("{\"index\":{\"_index\":\"%s\",\"_id\":\"%s\"}}\n", indexName, hit.ID)) + enriched := r.enrichEventWithBenchmarkMetadata(hit.Source) + src, err := json.Marshal(enriched) if err != nil { - return fmt.Errorf("error performing the bulk index request: %w", err) + return fmt.Errorf("error decoding _source: %w", err) } - bulkRes.Body.Close() + bulkBodyBuilder.WriteString(fmt.Sprintf("%s\n", string(src))) + } - if sr.ScrollID == "" { - return errors.New("error getting scroll ID") - } + logger.Debugf("bulk request of %d events...", len(sr.Hits)) - res, err = r.options.ESAPI.Scroll( - r.options.ESAPI.Scroll.WithScrollID(sr.ScrollID), - r.options.ESAPI.Scroll.WithScroll(time.Minute), - ) - if err != nil { - return fmt.Errorf("error executing scroll: %s", err) - } - res.Body.Close() + resp, err := r.options.ESMetricsAPI.Bulk(strings.NewReader(bulkBodyBuilder.String())) + if err != nil { + return fmt.Errorf("error performing the bulk index request: %w", err) + } + defer resp.Body.Close() + if resp.IsError() { + return fmt.Errorf("error performing the bulk index request: %s", resp.String()) + } + + if sr.ScrollID == "" { + return errors.New("error getting scroll ID") + } + + resp, err = r.options.ESAPI.Scroll( + r.options.ESAPI.Scroll.WithScrollID(sr.ScrollID), + r.options.ESAPI.Scroll.WithScroll(time.Minute), + ) + if err != nil { + return fmt.Errorf("error executing scroll: %s", err) + } + defer resp.Body.Close() + if resp.IsError() { + return fmt.Errorf("error executing scroll: %s", resp.String()) } - logger.Debug("reindexing operation finished") return nil } @@ -876,12 +909,17 @@ func (r *runner) enrichEventWithBenchmarkMetadata(e map[string]interface{}) map[ func getTotalHits(esapi *elasticsearch.API, dataStream string) (int, error) { resp, err := esapi.Count( esapi.Count.WithIndex(dataStream), + esapi.Count.WithIgnoreUnavailable(true), ) if err != nil { return 0, fmt.Errorf("could not search data stream: %w", err) } defer resp.Body.Close() + if resp.IsError() { + return 0, fmt.Errorf("failed to get hits count: %s", resp.String()) + } + var results struct { Count int Error *struct { diff --git a/internal/dump/componenttemplates.go b/internal/dump/componenttemplates.go index c6dec8447..a2a9fef6d 100644 --- a/internal/dump/componenttemplates.go +++ b/internal/dump/componenttemplates.go @@ -71,6 +71,10 @@ func getComponentTemplatesByName(ctx context.Context, api *elasticsearch.API, na } defer resp.Body.Close() + if resp.IsError() { + return nil, fmt.Errorf("failed to get component template %s: %s", name, resp.String()) + } + d, err := io.ReadAll(resp.Body) if err != nil { return nil, fmt.Errorf("failed to read response body: %w", err) diff --git a/internal/dump/ilmpolicies.go b/internal/dump/ilmpolicies.go index 60d2b5906..801bd5cf1 100644 --- a/internal/dump/ilmpolicies.go +++ b/internal/dump/ilmpolicies.go @@ -57,6 +57,10 @@ func getILMPolicyByName(ctx context.Context, api *elasticsearch.API, policy stri } defer resp.Body.Close() + if resp.IsError() { + return nil, fmt.Errorf("failed to get policy %s: %s", policy, resp.String()) + } + d, err := io.ReadAll(resp.Body) if err != nil { return nil, fmt.Errorf("failed to read response body: %w", err) diff --git a/internal/dump/indextemplates.go b/internal/dump/indextemplates.go index 5010c06b9..d48479d78 100644 --- a/internal/dump/indextemplates.go +++ b/internal/dump/indextemplates.go @@ -77,6 +77,10 @@ func getIndexTemplatesForPackage(ctx context.Context, api *elasticsearch.API, pa } defer resp.Body.Close() + if resp.IsError() { + return nil, fmt.Errorf("failed to get index templates: %s", resp.String()) + } + d, err := io.ReadAll(resp.Body) if err != nil { return nil, fmt.Errorf("failed to read response body: %w", err) diff --git a/internal/dump/ingestpipelines.go b/internal/dump/ingestpipelines.go index 71fc083b6..5e17b1ffa 100644 --- a/internal/dump/ingestpipelines.go +++ b/internal/dump/ingestpipelines.go @@ -71,6 +71,10 @@ func getIngestPipelineByID(ctx context.Context, api *elasticsearch.API, id strin } defer resp.Body.Close() + if resp.IsError() { + return nil, fmt.Errorf("failed to get ingest pipeline %s: %s", id, resp.String()) + } + d, err := io.ReadAll(resp.Body) if err != nil { return nil, fmt.Errorf("failed to read response body: %w", err) diff --git a/internal/elasticsearch/client.go b/internal/elasticsearch/client.go index ba43aa33e..986442102 100644 --- a/internal/elasticsearch/client.go +++ b/internal/elasticsearch/client.go @@ -132,6 +132,10 @@ func (client *Client) CheckHealth(ctx context.Context) error { } defer resp.Body.Close() + if resp.StatusCode != http.StatusOK { + return fmt.Errorf("failed to check cluster health: %s", resp.String()) + } + body, err := io.ReadAll(resp.Body) if err != nil { return fmt.Errorf("error reading cluster health response: %w", err) @@ -178,6 +182,10 @@ func (client *Client) redHealthCause(ctx context.Context) (string, error) { return "", fmt.Errorf("error reading internal health response: %w", err) } + if resp.StatusCode != http.StatusOK { + return "", fmt.Errorf("failed to get cause of red health; API status code = %d; response body = %s", resp.StatusCode, string(body)) + } + var internalHealth struct { Status string `json:"status"` Indicators map[string]struct { diff --git a/internal/elasticsearch/ingest/pipeline.go b/internal/elasticsearch/ingest/pipeline.go index f4fc1677c..c40f5b9bf 100644 --- a/internal/elasticsearch/ingest/pipeline.go +++ b/internal/elasticsearch/ingest/pipeline.go @@ -117,11 +117,24 @@ func SimulatePipeline(api *elasticsearch.API, pipelineName string, events []json func UninstallPipelines(api *elasticsearch.API, pipelines []Pipeline) error { for _, p := range pipelines { - resp, err := api.Ingest.DeletePipeline(p.Name) + err := uninstallPipeline(api, p.Name) if err != nil { - return fmt.Errorf("delete pipeline API call failed (pipelineName: %s): %w", p.Name, err) + return err } - resp.Body.Close() } return nil } + +func uninstallPipeline(api *elasticsearch.API, name string) error { + resp, err := api.Ingest.DeletePipeline(name) + if err != nil { + return fmt.Errorf("delete pipeline API call failed (pipelineName: %s): %w", name, err) + } + defer resp.Body.Close() + + if resp.IsError() { + return fmt.Errorf("failed to uninstall pipeline %s: %s", name, resp.String()) + } + + return nil +} diff --git a/internal/kibana/dashboards.go b/internal/kibana/dashboards.go index 9b47fccf7..38988a94c 100644 --- a/internal/kibana/dashboards.go +++ b/internal/kibana/dashboards.go @@ -8,6 +8,7 @@ import ( "encoding/json" "errors" "fmt" + "net/http" "strings" "github.com/elastic/elastic-package/internal/common" @@ -37,6 +38,10 @@ func (c *Client) Export(dashboardIDs []string) ([]common.MapStr, error) { return nil, fmt.Errorf("could not export dashboards; API status code = %d; response body = %s: %w", statusCode, respBody, err) } + if statusCode != http.StatusOK { + return nil, fmt.Errorf("could not export dashboards; API status code = %d; response body = %s", statusCode, respBody) + } + var exported exportedType err = json.Unmarshal(respBody, &exported) if err != nil { diff --git a/internal/kibana/saved_objects.go b/internal/kibana/saved_objects.go index 2956bf112..348332fb4 100644 --- a/internal/kibana/saved_objects.go +++ b/internal/kibana/saved_objects.go @@ -7,6 +7,7 @@ package kibana import ( "encoding/json" "fmt" + "net/http" "sort" "strings" @@ -95,6 +96,10 @@ func (c *Client) findDashboardsNextPage(page int) (*savedObjectsResponse, error) return nil, fmt.Errorf("could not find dashboards; API status code = %d; response body = %s: %w", statusCode, respBody, err) } + if statusCode != http.StatusOK { + return nil, fmt.Errorf("could not find dashboards; API status code = %d; response body = %s", statusCode, respBody) + } + var r savedObjectsResponse err = json.Unmarshal(respBody, &r) if err != nil { diff --git a/internal/testrunner/runners/system/runner.go b/internal/testrunner/runners/system/runner.go index 0e25fc39a..5f1ef8237 100644 --- a/internal/testrunner/runners/system/runner.go +++ b/internal/testrunner/runners/system/runner.go @@ -9,6 +9,7 @@ import ( "errors" "fmt" "math/rand" + "net/http" "os" "path/filepath" "regexp" @@ -322,15 +323,24 @@ func createTestRunID() string { } func (r *runner) isSyntheticsEnabled(dataStream, componentTemplatePackage string) (bool, error) { - logger.Debugf("check whether or not synthetics is enabled (component template %s)...", componentTemplatePackage) resp, err := r.options.API.Cluster.GetComponentTemplate( r.options.API.Cluster.GetComponentTemplate.WithName(componentTemplatePackage), ) if err != nil { - return false, fmt.Errorf("could not get component template from data stream %s: %w", dataStream, err) + return false, fmt.Errorf("could not get component template %s from data stream %s: %w", componentTemplatePackage, dataStream, err) } defer resp.Body.Close() + if resp.StatusCode == http.StatusNotFound { + // @package component template doesn't exist before 8.2. On these versions synthetics was not supported + // in any case, so just return false. + logger.Debugf("no component template %s found for data stream %s", componentTemplatePackage, dataStream) + return false, nil + } + if resp.IsError() { + return false, fmt.Errorf("could not get component template %s for data stream %s: %s", componentTemplatePackage, dataStream, resp.String()) + } + var results struct { ComponentTemplates []struct { Name string `json:"name"` @@ -351,11 +361,11 @@ func (r *runner) isSyntheticsEnabled(dataStream, componentTemplatePackage string } if len(results.ComponentTemplates) == 0 { - logger.Debugf("no component template found for data stream %s", dataStream) + logger.Debugf("no component template %s found for data stream %s", componentTemplatePackage, dataStream) return false, nil } if len(results.ComponentTemplates) != 1 { - return false, fmt.Errorf("unexpected response, not found component template") + return false, fmt.Errorf("ambiguous response, expected one component template for %s, found %d", componentTemplatePackage, len(results.ComponentTemplates)) } template := results.ComponentTemplates[0] @@ -390,12 +400,22 @@ func (r *runner) getDocs(dataStream string) (*hits, error) { r.options.API.Search.WithSize(elasticsearchQuerySize), r.options.API.Search.WithSource("true"), r.options.API.Search.WithBody(strings.NewReader(allFieldsBody)), + r.options.API.Search.WithIgnoreUnavailable(true), ) if err != nil { return nil, fmt.Errorf("could not search data stream: %w", err) } defer resp.Body.Close() + if resp.StatusCode == http.StatusServiceUnavailable && strings.Contains(resp.String(), "no_shard_available_action_exception") { + // Index is being created, but no shards are available yet. + // See https://github.com/elastic/elasticsearch/issues/65846 + return &hits{}, nil + } + if resp.IsError() { + return nil, fmt.Errorf("failed to search docs for data stream %s: %s", dataStream, resp.String()) + } + var results struct { Hits struct { Total struct { @@ -583,7 +603,10 @@ func (r *runner) runTest(config *testConfig, ctxt servicedeployer.ServiceContext } hits, err := r.getDocs(dataStream) - return hits.size() == 0, err + if err != nil { + return false, err + } + return hits.size() == 0, nil }, 2*time.Minute) if err != nil || !cleared { if err == nil { @@ -645,10 +668,13 @@ func (r *runner) runTest(config *testConfig, ctxt servicedeployer.ServiceContext var err error hits, err = r.getDocs(dataStream) + if err != nil { + return false, err + } if config.Assert.HitCount > 0 { if hits.size() < config.Assert.HitCount { - return false, err + return false, nil } ret := hits.size() == oldHits @@ -657,9 +683,10 @@ func (r *runner) runTest(config *testConfig, ctxt servicedeployer.ServiceContext time.Sleep(4 * time.Second) } - return ret, err + return ret, nil } - return hits.size() > 0, err + + return hits.size() > 0, nil }, waitForDataTimeout) if config.Service != "" && !config.IgnoreServiceError { @@ -681,6 +708,7 @@ func (r *runner) runTest(config *testConfig, ctxt servicedeployer.ServiceContext return result.WithError(fmt.Errorf("%s", result.FailureMsg)) } + logger.Debugf("check whether or not synthetics is enabled (component template %s)...", componentTemplatePackage) syntheticEnabled, err := r.isSyntheticsEnabled(dataStream, componentTemplatePackage) if err != nil { return result.WithError(fmt.Errorf("failed to check if synthetic source is enabled: %w", err)) @@ -1160,9 +1188,18 @@ func (r *runner) previewTransform(transformId string) ([]common.MapStr, error) { func deleteDataStreamDocs(api *elasticsearch.API, dataStream string) error { body := strings.NewReader(`{ "query": { "match_all": {} } }`) - _, err := api.DeleteByQuery([]string{dataStream}, body) + resp, err := api.DeleteByQuery([]string{dataStream}, body) if err != nil { - return err + return fmt.Errorf("failed to delete data stream docs: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode == http.StatusNotFound { + // Unavailable index is ok, this means that data is already not there. + return nil + } + if resp.IsError() { + return fmt.Errorf("failed to delete data stream docs for data stream %s: %s", dataStream, resp.String()) } return nil