From 3867ce29d5bc6a60d76ad07831ac7580796523a2 Mon Sep 17 00:00:00 2001 From: eminano Date: Mon, 20 May 2024 14:16:32 +0200 Subject: [PATCH] Add es internal library --- go.mod | 8 + go.sum | 21 ++ internal/es/es_api.go | 182 ++++++++++++++ internal/es/es_client.go | 498 +++++++++++++++++++++++++++++++++++++++ internal/es/es_errors.go | 179 ++++++++++++++ 5 files changed, 888 insertions(+) create mode 100644 internal/es/es_api.go create mode 100644 internal/es/es_client.go create mode 100644 internal/es/es_errors.go diff --git a/go.mod b/go.mod index 9326790..06c5806 100644 --- a/go.mod +++ b/go.mod @@ -4,8 +4,10 @@ go 1.22.2 require ( github.com/cenkalti/backoff v2.2.1+incompatible + github.com/elastic/go-elasticsearch/v8 v8.13.1 github.com/jackc/pglogrepl v0.0.0-20240307033717-828fbfe908e9 github.com/jackc/pgx/v5 v5.5.5 + github.com/mitchellh/mapstructure v1.5.0 github.com/rs/xid v1.5.0 github.com/rs/zerolog v1.32.0 github.com/segmentio/kafka-go v0.4.47 @@ -15,6 +17,9 @@ require ( require ( github.com/davecgh/go-spew v1.1.1 // indirect + github.com/elastic/elastic-transport-go/v8 v8.5.0 // indirect + github.com/go-logr/logr v1.3.0 // indirect + github.com/go-logr/stdr v1.2.2 // indirect github.com/jackc/pgio v1.0.0 // indirect github.com/jackc/pgpassfile v1.0.0 // indirect github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect @@ -23,6 +28,9 @@ require ( github.com/mattn/go-isatty v0.0.19 // indirect github.com/pierrec/lz4/v4 v4.1.15 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect + go.opentelemetry.io/otel v1.21.0 // indirect + go.opentelemetry.io/otel/metric v1.21.0 // indirect + go.opentelemetry.io/otel/trace v1.21.0 // indirect golang.org/x/crypto v0.20.0 // indirect golang.org/x/sys v0.17.0 // indirect golang.org/x/text v0.14.0 // indirect diff --git a/go.sum b/go.sum index 71720cd..272adcc 100644 --- a/go.sum +++ b/go.sum @@ -4,7 +4,18 @@ github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSV github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/elastic/elastic-transport-go/v8 v8.5.0 h1:v5membAl7lvQgBTexPRDBO/RdnlQX+FM9fUVDyXxvH0= +github.com/elastic/elastic-transport-go/v8 v8.5.0/go.mod h1:YLHer5cj0csTzNFXoNQ8qhtGY1GTvSqPnKWKaqQE3Hk= +github.com/elastic/go-elasticsearch/v8 v8.13.1 h1:du5F8IzUUyCkzxyHdrO9AtopcG95I/qwi2WK8Kf1xlg= +github.com/elastic/go-elasticsearch/v8 v8.13.1/go.mod h1:DIn7HopJs4oZC/w0WoJR13uMUxtHeq92eI5bqv5CRfI= +github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= +github.com/go-logr/logr v1.3.0 h1:2y3SDp0ZXuc6/cjLSZ+Q3ir+QB9T/iG5yYRXqsagWSY= +github.com/go-logr/logr v1.3.0/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= +github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= +github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= +github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/jackc/pgio v1.0.0 h1:g12B9UwVnzGhueNavwioyEEpAmqMe1E/BN9ES+8ovkE= github.com/jackc/pgio v1.0.0/go.mod h1:oP+2QK2wFfUWgr+gxjoBH9KGBb31Eio69xUb0w5bYf8= github.com/jackc/pglogrepl v0.0.0-20240307033717-828fbfe908e9 h1:86CQbMauoZdLS0HDLcEHYo6rErjiCBjVvcxGsioIn7s= @@ -28,6 +39,8 @@ github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovk github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= github.com/mattn/go-isatty v0.0.19 h1:JITubQf0MOLdlGRuRq+jtsDlekdYPia9ZFsB8h/APPA= github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= +github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY= +github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo= github.com/pierrec/lz4/v4 v4.1.15 h1:MO0/ucJhngq7299dKLwIMtgTfbkoSPF6AoMYDd8Q4q0= github.com/pierrec/lz4/v4 v4.1.15/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= @@ -56,6 +69,14 @@ github.com/xdg-go/scram v1.1.2/go.mod h1:RT/sEzTbU5y00aCK8UOx6R7YryM0iF1N2MOmC3k github.com/xdg-go/stringprep v1.0.4 h1:XLI/Ng3O1Atzq0oBs3TWm+5ZVgkq2aqdlvP9JtoZ6c8= github.com/xdg-go/stringprep v1.0.4/go.mod h1:mPGuuIYwz7CmR2bT9j4GbQqutWS1zV24gijq1dTyGkM= github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= +go.opentelemetry.io/otel v1.21.0 h1:hzLeKBZEL7Okw2mGzZ0cc4k/A7Fta0uoPgaJCr8fsFc= +go.opentelemetry.io/otel v1.21.0/go.mod h1:QZzNPQPm1zLX4gZK4cMi+71eaorMSGT3A4znnUvNNEo= +go.opentelemetry.io/otel/metric v1.21.0 h1:tlYWfeo+Bocx5kLEloTjbcDwBuELRrIFxwdQ36PlJu4= +go.opentelemetry.io/otel/metric v1.21.0/go.mod h1:o1p3CA8nNHW8j5yuQLdc1eeqEaPfzug24uvsyIEJRWM= +go.opentelemetry.io/otel/sdk v1.21.0 h1:FTt8qirL1EysG6sTQRZ5TokkU8d0ugCj8htOgThZXQ8= +go.opentelemetry.io/otel/sdk v1.21.0/go.mod h1:Nna6Yv7PWTdgJHVRD9hIYywQBRx7pbox6nwBnZIxl/E= +go.opentelemetry.io/otel/trace v1.21.0 h1:WD9i5gzvoUPuXIXH24ZNBudiarZDKuekPqi/E8fpfLc= +go.opentelemetry.io/otel/trace v1.21.0/go.mod h1:LGbsEB0f9LGjN+OZaQQ26sohbOmiMR+BaslueVtS/qQ= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.14.0/go.mod h1:MVFd36DqK4CsrnJYDkBA3VC4m2GkXAM0PvzMCn4JQf4= diff --git a/internal/es/es_api.go b/internal/es/es_api.go new file mode 100644 index 0000000..d0ae04b --- /dev/null +++ b/internal/es/es_api.go @@ -0,0 +1,182 @@ +// SPDX-License-Identifier: Apache-2.0 + +package es + +import ( + "bytes" + "encoding/json" + "fmt" + "io" +) + +type SearchRequest struct { + Index *string + ReturnVersion *bool + Size *int + From *int + Sort *string + SourceIncludes *string + Query io.Reader +} + +type DeleteByQueryRequest struct { + Index []string + Query map[string]any + Refresh bool +} + +type IndexRequest struct { + Index string + Body []byte + Refresh string +} + +type IndexWithIDRequest struct { + Index string + ID string + Body []byte + Refresh string +} + +type BulkItem struct { + Index *BulkIndex `json:"index,omitempty"` + Delete *BulkIndex `json:"delete,omitempty"` + Doc map[string]any `json:"-"` + Status int `json:"-"` + Error json.RawMessage `json:"-"` +} + +type BulkIndex struct { + Index string `json:"_index"` + ID string `json:"_id"` + Version *int `json:"version,omitempty"` + VersionType string `json:"version_type,omitempty"` +} + +type BulkResponseItem struct { + Index struct { + Status int `json:"status"` + Error json.RawMessage `json:"error"` + } `json:"index"` + Delete struct { + Status int `json:"status"` + Error json.RawMessage `json:"error"` + } `json:"delete"` +} + +type Hit struct { + ID string `json:"_id"` + Index string `json:"_index"` + Version int `json:"_version"` + Source map[string]any `json:"_source"` + Score float64 `json:"_score"` + Highlight map[string]any `json:"highlight"` +} + +type Hits struct { + Total struct { + Value int `json:"value"` + Relation string `json:"relation"` + } `json:"total"` + Hits []Hit `json:"hits"` +} + +type SearchResponse struct { + Hits Hits `json:"hits"` + Aggregations map[string]any `json:"aggregations"` +} + +type BulkResponse struct { + Errors bool `json:"errors"` + Items []BulkResponseItem +} + +type IndexStats struct { + Index string `json:"index,omitempty"` + PrimarySizeBytes uint64 `json:"primary_size_bytes,omitempty"` + TotalSizeBytes uint64 `json:"total_size_bytes,omitempty"` +} + +type Mappings struct { + Properties map[string]any + Dynamic string +} + +type mappingResponse map[string]struct { + Mappings Mappings +} + +type indexStatsResponse struct { + Indices map[string]struct { + Primaries struct { + Store struct { + SizeInBytes int `json:"size_in_bytes"` + } `json:"store"` + } `json:"primaries"` + Total struct { + Store struct { + SizeInBytes int `json:"size_in_bytes"` + } `json:"store"` + } `json:"total"` + } `json:"indices"` +} + +type countResponse struct { + Count int `json:"count"` +} + +func encodeBulkItems(buffer *bytes.Buffer, items []BulkItem) error { + encoder := json.NewEncoder(buffer) + + for _, item := range items { + if err := encoder.Encode(item); err != nil { + return fmt.Errorf("bulk item [%v]: encode item action %w", item, err) + } + + if item.Delete != nil { + continue + } + + if item.Doc == nil { + buffer.WriteString("{}\n") + continue + } + + if err := encoder.Encode(item.Doc); err != nil { + return fmt.Errorf("bulk item [%v]: encode item document action %w", item, err) + } + } + + return nil +} + +func verifyResponse(bodyBytes []byte, items []BulkItem) (failed []BulkItem, err error) { + var esResponse BulkResponse + + if err := json.Unmarshal(bodyBytes, &esResponse); err != nil { + return nil, fmt.Errorf("error unmarshaling response from es: %w (%s)", err, bodyBytes) + } + + if !esResponse.Errors { + return []BulkItem{}, nil + } + + failed = []BulkItem{} + for i, respItem := range esResponse.Items { + if items[i].Index != nil { + if respItem.Index.Status > 299 { + items[i].Status = respItem.Index.Status + items[i].Error = respItem.Index.Error + failed = append(failed, items[i]) + } + } else if items[i].Delete != nil { + if respItem.Delete.Status > 299 { + items[i].Status = respItem.Delete.Status + items[i].Error = respItem.Delete.Error + failed = append(failed, items[i]) + } + } + } + + return failed, nil +} diff --git a/internal/es/es_client.go b/internal/es/es_client.go new file mode 100644 index 0000000..15d3bf1 --- /dev/null +++ b/internal/es/es_client.go @@ -0,0 +1,498 @@ +// SPDX-License-Identifier: Apache-2.0 + +package es + +import ( + "bufio" + "bytes" + "context" + "encoding/json" + "errors" + "fmt" + "io" + "net/http" + + "github.com/elastic/go-elasticsearch/v8" + "github.com/elastic/go-elasticsearch/v8/esapi" +) + +type Client struct { + client *elasticsearch.Client +} + +var ( + ErrResourceNotFound = errors.New("elasticsearch resource not found") + errInvalidSearchEnvelope = errors.New("invalid search response") +) + +func NewClient(url string) (*Client, error) { + es, err := newClient(url) + if err != nil { + return nil, fmt.Errorf("create elasticsearch client: %w", err) + } + return &Client{client: es}, nil +} + +func (ec *Client) CloseIndex(ctx context.Context, index string) error { + res, err := ec.client.Indices.Close( + []string{index}, + ec.client.Indices.Close.WithContext(ctx), + ) + if err != nil { + return fmt.Errorf("[CloseIndex] error from Elasticsearch: %w", err) + } + defer res.Body.Close() + + if err := ec.isErrResponse(res); err != nil { + return fmt.Errorf("[CloseIndex] error response from Elasticsearch: %w", err) + } + + return nil +} + +func (ec *Client) Count(ctx context.Context, index string) (int, error) { + res, err := ec.client.Count( + ec.client.Count.WithIndex(index), + ec.client.Count.WithContext(ctx)) + if err != nil { + return 0, fmt.Errorf("[Count] error from Elasticsearch: %w", err) + } + defer res.Body.Close() + + if err := ec.isErrResponse(res); err != nil { + return 0, fmt.Errorf("[Count] error response from Elasticsearch: %w", err) + } + + count := &countResponse{} + if err := json.NewDecoder(res.Body).Decode(count); err != nil { + return 0, fmt.Errorf("[Count] error decoding Elasticsearch response: %w", err) + } + + return count.Count, nil +} + +func (ec *Client) CreateIndex(ctx context.Context, index string, body map[string]any) error { + reader, err := createReader(body) + if err != nil { + return err + } + res, err := ec.client.Indices.Create(index, + ec.client.Indices.Create.WithContext(ctx), + ec.client.Indices.Create.WithBody(reader), + ) + if err != nil { + return fmt.Errorf("[CreateIndex] error from Elasticsearch: %w", err) + } + defer res.Body.Close() + + if err := ec.isErrResponse(res); err != nil { + return fmt.Errorf("[CreateIndex] error response from Elasticsearch: %w", err) + } + + return nil +} + +func (ec *Client) DeleteByQuery(ctx context.Context, req *DeleteByQueryRequest) error { + reader, err := createReader(req.Query) + if err != nil { + return err + } + + res, err := ec.client.DeleteByQuery(req.Index, + reader, + ec.client.DeleteByQuery.WithContext(ctx), + ec.client.DeleteByQuery.WithSlices("auto"), + ec.client.DeleteByQuery.WithWaitForCompletion(false), + ec.client.DeleteByQuery.WithRefresh(req.Refresh), + ) + if err != nil { + return fmt.Errorf("[DeleteByQuery] error from Elasticsearch: %w", err) + } + defer res.Body.Close() + + if err := ec.isErrResponse(res); err != nil { + return fmt.Errorf("[DeleteByQuery] error response from Elasticsearch: %w", err) + } + + return nil +} + +func (ec *Client) DeleteIndex(ctx context.Context, index []string) error { + res, err := ec.client.Indices.Delete( + index, + ec.client.Indices.Delete.WithContext(ctx), + ) + if err != nil { + return fmt.Errorf("[DeleteIndex] error from Elasticsearch: %w", err) + } + defer res.Body.Close() + + if err := ec.isErrResponse(res); err != nil { + return fmt.Errorf("[DeleteIndex] error response from Elasticsearch: %w", err) + } + + return nil +} + +func (ec *Client) Index(ctx context.Context, req *IndexRequest) error { + res, err := ec.client.Index(req.Index, + bytes.NewReader(req.Body), + ec.client.Index.WithContext(ctx), + ec.client.Index.WithRefresh(req.Refresh), + ) + if err != nil { + return fmt.Errorf("[Index] error from Elasticsearch: %w", err) + } + defer res.Body.Close() + + if err := ec.isErrResponse(res); err != nil { + return fmt.Errorf("[Index] error response from Elasticsearch: %w", err) + } + + return nil +} + +func (ec *Client) IndexWithID(ctx context.Context, req *IndexWithIDRequest) error { + res, err := ec.client.Index(req.Index, + bytes.NewReader(req.Body), + ec.client.Index.WithContext(ctx), + ec.client.Index.WithRefresh(req.Refresh), + ec.client.Index.WithDocumentID(req.ID), + ) + if err != nil { + return fmt.Errorf("[IndexWithID] error from Elasticsearch: %w", err) + } + defer res.Body.Close() + + if err := ec.isErrResponse(res); err != nil { + return fmt.Errorf("[IndexWithID] error response from Elasticsearch: %w", err) + } + + return nil +} + +func (ec *Client) IndexExists(ctx context.Context, index string) (bool, error) { + res, err := ec.client.Indices.Exists([]string{index}, + ec.client.Indices.Exists.WithContext(ctx), + ) + if err != nil { + return false, fmt.Errorf("[IndexExists] error from Elasticsearch: %w", err) + } + defer res.Body.Close() + + if res.IsError() && res.StatusCode != http.StatusNotFound { + return false, fmt.Errorf("[IndexExists] error response from Elasticsearch: %w", err) + } + + return res.StatusCode == http.StatusOK, nil +} + +func (ec *Client) GetIndexAlias(ctx context.Context, name string) (map[string]any, error) { + res, err := ec.client.Indices.GetAlias( + ec.client.Indices.GetAlias.WithContext(ctx), + ec.client.Indices.GetAlias.WithName(name), + ) + if err != nil { + return nil, fmt.Errorf("[GetIndexAlias] error from Elasticsearch: %w", err) + } + defer res.Body.Close() + + if err := ec.isErrResponse(res); err != nil { + return nil, fmt.Errorf("[GetIndexAlias] error response from Elasticsearch: %w", err) + } + + resMap := map[string]any{} + resData, err := io.ReadAll(res.Body) + if err != nil { + return nil, fmt.Errorf("[GetIndexAlias] error reading Elasticsearch response body: %w", err) + } + + if err := json.Unmarshal(resData, &resMap); err != nil { + return nil, fmt.Errorf("[GetIndexAlias] error unmarshalling Elasticsearch response: %w", err) + } + return resMap, nil +} + +func (ec *Client) GetIndexMappings(ctx context.Context, index string) (*Mappings, error) { + res, err := ec.client.Indices.GetMapping( + ec.client.Indices.GetMapping.WithIndex(index), + ec.client.Indices.GetMapping.WithContext(ctx)) + if err != nil { + return nil, fmt.Errorf("[GetIndexMapping] error from Elasticsearch: %w", err) + } + defer res.Body.Close() + + if err := ec.isErrResponse(res); err != nil { + return nil, fmt.Errorf("[GetIndexMapping] error response from Elasticsearch: %w", err) + } + + var indexMappings mappingResponse + if err = json.NewDecoder(res.Body).Decode(&indexMappings); err != nil { + return nil, err + } + + mappings := indexMappings[index] + + return &mappings.Mappings, nil +} + +// GetIndicesStats uses the index stats API to fetch statistics about indices. indexPattern is a +// wildcard pattern used to select the indices we care about. +func (ec *Client) GetIndicesStats(ctx context.Context, indexPattern string) ([]IndexStats, error) { + res, err := ec.client.Indices.Stats( + ec.client.Indices.Stats.WithContext(ctx), + ec.client.Indices.Stats.WithIndex(indexPattern), + ) + if err != nil { + return nil, fmt.Errorf("[GetIndicesStats] querying OpenSearch Cat API: %w", err) + } + defer res.Body.Close() + + var response indexStatsResponse + if err = json.NewDecoder(res.Body).Decode(&response); err != nil { + return nil, fmt.Errorf("[GetIndicesStats] decoding response body: %w", err) + } + + usage := make([]IndexStats, 0, len(response.Indices)) + for index, r := range response.Indices { + usage = append(usage, IndexStats{ + Index: index, + TotalSizeBytes: uint64(r.Total.Store.SizeInBytes), + PrimarySizeBytes: uint64(r.Primaries.Store.SizeInBytes), + }) + } + + return usage, nil +} + +// ListIndices returns the list of indices that match the index name pattern on +// input from the OS cluster +func (ec *Client) ListIndices(ctx context.Context, indices []string) ([]string, error) { + res, err := ec.client.Cat.Indices( + ec.client.Cat.Indices.WithContext(ctx), + ec.client.Cat.Indices.WithIndex(indices...), + ec.client.Cat.Indices.WithH("index"), + ) + if err != nil { + return []string{}, fmt.Errorf("[ListIndices] error from Elasticsearch: %w", err) + } + defer res.Body.Close() + + if err := ec.isErrResponse(res); err != nil { + return []string{}, fmt.Errorf("[ListIndices] error response from Elasticsearch: %w", err) + } + + scanner := bufio.NewScanner(res.Body) + scanner.Split(bufio.ScanLines) + + resp := []string{} + for scanner.Scan() { + line := scanner.Text() + resp = append(resp, line) + } + + if err := scanner.Err(); err != nil { + return nil, fmt.Errorf("[ListIndices] error scanning response from Elasticsearch: %w", err) + } + + return resp, nil +} + +func (ec *Client) PutIndexAlias(ctx context.Context, index []string, name string) error { + res, err := ec.client.Indices.PutAlias( + index, + name, + ec.client.Indices.PutAlias.WithContext(ctx), + ) + if err != nil { + return fmt.Errorf("[PutIndexAlias] error from Elasticsearch: %w", err) + } + defer res.Body.Close() + + if err := ec.isErrResponse(res); err != nil { + return fmt.Errorf("[PutIndexAlias] error response from Elasticsearch: %w", err) + } + + return nil +} + +// PutIndexMappings add field type mapping data to a previously created ES index +// Dynamic mapping is disabled upon index creation, so it is a requirement to explicitly define mappings for each column +func (ec *Client) PutIndexMappings(ctx context.Context, index string, mapping map[string]any) error { + reader, err := createReader(mapping) + if err != nil { + return err + } + res, err := ec.client.Indices.PutMapping( + []string{index}, + reader, + ec.client.Indices.PutMapping.WithContext(ctx)) + if err != nil { + return fmt.Errorf("[PutIndexMappings] error from Elasticsearch: %w", err) + } + defer res.Body.Close() + + if err := ec.isErrResponse(res); err != nil { + return fmt.Errorf("[PutIndexMappings] error response from Elasticsearch: %w", err) + } + + return nil +} + +func (ec *Client) PutIndexSettings(ctx context.Context, index string, settings map[string]any) error { + reader, err := createReader(settings) + if err != nil { + return err + } + res, err := ec.client.Indices.PutSettings( + reader, + ec.client.Indices.PutSettings.WithContext(ctx), + ec.client.Indices.PutSettings.WithIndex(index)) + if err != nil { + return fmt.Errorf("[PutIndexSettings] error from Elasticsearch: %w", err) + } + defer res.Body.Close() + + if err := ec.isErrResponse(res); err != nil { + return fmt.Errorf("[PutIndexSettings] error response from Elasticsearch: %w", err) + } + + return nil +} + +func (ec *Client) RefreshIndex(ctx context.Context, index string) error { + res, err := ec.client.Indices.Refresh( + ec.client.Indices.Refresh.WithIndex(index), + ec.client.Indices.Refresh.WithContext(ctx), + ) + if err != nil { + return fmt.Errorf("[RefreshIndex] error from Elasticsearch: %w", err) + } + defer res.Body.Close() + + if err := ec.isErrResponse(res); err != nil { + return fmt.Errorf("[RefreshIndex] error response from Elasticsearch: %w", err) + } + + return nil +} + +func (ec *Client) Perform(req *http.Request) (*http.Response, error) { + return ec.client.Transport.Perform(req) +} + +func (ec *Client) Search(ctx context.Context, req *SearchRequest) (*SearchResponse, error) { + res, err := ec.client.Search(ec.parseSearchRequest(ctx, req)...) + if err != nil { + return nil, fmt.Errorf("[Search] error from Elasticsearch: %w", err) + } + defer res.Body.Close() + if err := ec.isErrResponse(res); err != nil { + return nil, fmt.Errorf("[Search] error response from Elasticsearch: %w", err) + } + + var response SearchResponse + err = json.NewDecoder(res.Body).Decode(&response) + if err != nil { + return nil, fmt.Errorf("[Search] decoding response body: %w: %w", errInvalidSearchEnvelope, err) + } + + return &response, nil +} + +// SendBulkRequest can perform multiple indexing or delete operations in a single call +func (ec *Client) SendBulkRequest(ctx context.Context, items []BulkItem) ([]BulkItem, error) { + buffer := new(bytes.Buffer) + + if err := encodeBulkItems(buffer, items); err != nil { + return nil, err + } + + req, err := http.NewRequest("POST", "/_bulk", buffer) + if err != nil { + return nil, fmt.Errorf("new http request: %w", err) + } + req.Header.Add("Content-Type", "application/x-ndjson") + req = req.WithContext(ctx) + + resp, err := ec.Perform(req) + if err != nil { + return nil, fmt.Errorf("perform: %w", err) + } + defer resp.Body.Close() + bodyBytes, err := io.ReadAll(resp.Body) + if err != nil { + return nil, fmt.Errorf("read body: %w", err) + } + if resp.StatusCode > 299 { + return nil, fmt.Errorf("error from Elasticsearch: %d: %s", resp.StatusCode, bodyBytes) + } + + return verifyResponse(bodyBytes, items) +} + +func (ec *Client) parseSearchRequest(ctx context.Context, req *SearchRequest) []func(*esapi.SearchRequest) { + opts := []func(*esapi.SearchRequest){ + ec.client.Search.WithContext(ctx), + } + if req.Index != nil { + opts = append(opts, ec.client.Search.WithIndex(*req.Index)) + } + if req.ReturnVersion != nil { + opts = append(opts, ec.client.Search.WithVersion(*req.ReturnVersion)) + } + if req.Size != nil { + opts = append(opts, ec.client.Search.WithSize(*req.Size)) + } + if req.From != nil { + opts = append(opts, ec.client.Search.WithFrom(*req.From)) + } + if req.Sort != nil { + opts = append(opts, ec.client.Search.WithSort(*req.Sort)) + } + if req.Query != nil { + opts = append(opts, ec.client.Search.WithBody(req.Query)) + } + if req.SourceIncludes != nil { + opts = append(opts, ec.client.Search.WithSourceIncludes(*req.SourceIncludes)) + } + + return opts +} + +func (ec *Client) isErrResponse(res *esapi.Response) error { + if res.IsError() { + if res.StatusCode == http.StatusNotFound { + return fmt.Errorf("%w: %w", ErrResourceNotFound, extractResponseError(res)) + } + return extractResponseError(res) + } + + return nil +} + +func newClient(address string) (*elasticsearch.Client, error) { + if address == "" { + return nil, errors.New("no address provided") + } + + cfg := elasticsearch.Config{ + Addresses: []string{ + address, + }, + Transport: http.DefaultTransport, + } + + return elasticsearch.NewClient(cfg) +} + +// createReader returns a reader on the JSON representation of the given value. +func createReader(value any) (*bytes.Reader, error) { + bytesValue, err := json.Marshal(value) + if err != nil { + return nil, fmt.Errorf("unexpected marshaling error: %w", err) + } + return bytes.NewReader(bytesValue), nil +} + +func Ptr[T any](i T) *T { return &i } diff --git a/internal/es/es_errors.go b/internal/es/es_errors.go new file mode 100644 index 0000000..42e3b48 --- /dev/null +++ b/internal/es/es_errors.go @@ -0,0 +1,179 @@ +// SPDX-License-Identifier: Apache-2.0 + +package es + +import ( + "encoding/json" + "errors" + "fmt" + "net/http" + "strings" + + "github.com/elastic/go-elasticsearch/v8/esapi" + "github.com/mitchellh/mapstructure" +) + +type ResponseError struct { + Type string `mapstructure:"type"` + Reason string `mapstructure:"reason"` + FailedShards []map[string]any `mapstructure:"failed_shards"` + CausedBy *CausedBy `mapstructure:"caused_by"` + RootCause []RootCause `mapstructure:"root_cause"` +} + +type CausedBy struct { + Type string `mapstructure:"type"` + Reason string `mapstructure:"reason"` + MaxBuckets int `mapstructure:"max_buckets"` +} + +type RootCause struct { + Type string `mapstructure:"type"` + Reason string `mapstructure:"reason"` +} + +type RetryableError struct { + Cause error +} + +func (r RetryableError) Error() string { + return fmt.Sprintf("%v", r.Cause) +} + +func (r RetryableError) Unwrap() error { + return r.Cause +} + +type ErrIllegalArgument struct { + Reason string +} + +func (e *ErrIllegalArgument) Error() string { + return e.Reason +} + +type ErrResourceAlreadyExists struct { + Reason string +} + +func (e ErrResourceAlreadyExists) Error() string { + return fmt.Sprintf("resource already exists: %s", e.Reason) +} + +type ErrQueryInvalid struct { + Cause error +} + +func (e ErrQueryInvalid) Error() string { + return e.Cause.Error() +} + +const ( + SearchExecutionException = "search_phase_execution_exception" + TooManyBucketsException = "too_many_buckets_exception" + TooManyNestedClausesException = "too_many_nested_clauses" + TooManyClausesException = "too_many_clauses" + IllegalArgumentException = "illegal_argument_exception" + SnapshotInProgressException = "snapshot_in_progress_exception" + ResourceAlreadyExistsException = "resource_already_exists_exception" +) + +var ( + ErrTooManyRequests = errors.New("too many requests") + ErrTooManyBuckets = errors.New("too many buckets") + ErrTooManyNestedClauses = errors.New("too many nested clauses") + ErrTooManyClauses = errors.New("too many clauses") +) + +func extractResponseError(res *esapi.Response) error { + var e map[string]any + if err := json.NewDecoder(res.Body).Decode(&e); err != nil { + return fmt.Errorf("decoding error response: %w", err) + } + + var errType, errReason any + if eErr, ok := e["error"]; ok { + var esError ResponseError + err := mapstructure.Decode(eErr, &esError) + if err != nil { + errType = "" + errReason = "" + } else { + errType = esError.Type + errReason = esError.Reason + if esError.Type == SearchExecutionException { + marshalled, _ := json.Marshal(esError.FailedShards) + errReason = string(marshalled) + + if esError.CausedBy != nil { + if esError.CausedBy.Type == TooManyBucketsException { + return ErrTooManyBuckets + } + if esError.CausedBy.Type == TooManyNestedClausesException { + return ErrTooManyNestedClauses + } + if esError.CausedBy.Type == TooManyClausesException { + return ErrTooManyClauses + } + if esError.CausedBy.Type == IllegalArgumentException { + return &ErrIllegalArgument{Reason: esError.CausedBy.Reason} + } + } + if len(esError.RootCause) > 0 { + if esError.RootCause[0].Type == TooManyNestedClausesException { + return ErrTooManyNestedClauses + } + if strings.Contains(esError.RootCause[0].Reason, "function score query returned an invalid score") { + return &ErrIllegalArgument{Reason: esError.RootCause[0].Reason} + } + } + } + } + } + + if err, ok := getRetryableError(res.StatusCode); ok { + return RetryableError{Cause: err} + } + + if res.StatusCode == http.StatusNotFound { + return fmt.Errorf("%w: [%d]: %s: %s", ErrResourceNotFound, res.StatusCode, errType, errReason) + } + + if res.StatusCode == http.StatusBadRequest { + switch errType { + case ResourceAlreadyExistsException: + reason, _ := errReason.(string) + return ErrResourceAlreadyExists{Reason: reason} + case SnapshotInProgressException: + return RetryableError{Cause: fmt.Errorf("[%d] %s: %s", res.StatusCode, errType, errReason)} + default: + // Generic bad request + return ErrQueryInvalid{ + Cause: fmt.Errorf("%v", errReason), + } + } + } + + return fmt.Errorf("[%d] %s: %s", res.StatusCode, errType, errReason) +} + +func getRetryableError(statusCode int) (error, bool) { + switch statusCode { + case http.StatusRequestTimeout: + return errors.New("request timeout"), true + case http.StatusLocked: + return errors.New("resource locked"), true + case http.StatusTooEarly: + return errors.New("too early"), true + case http.StatusTooManyRequests: + return ErrTooManyRequests, true + case http.StatusBadGateway: + return errors.New("bad gateway"), true + case http.StatusServiceUnavailable: + return errors.New("service unavailable"), true + case http.StatusGatewayTimeout: + return errors.New("gateway timeout"), true + } + + return nil, false +}