Skip to content

Commit

Permalink
Merge pull request #13 from xataio/add-es-internal-library
Browse files Browse the repository at this point in the history
Add es internal library
  • Loading branch information
eminano authored May 20, 2024
2 parents c252e97 + 3867ce2 commit 1e9cfda
Show file tree
Hide file tree
Showing 5 changed files with 888 additions and 0 deletions.
8 changes: 8 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@ go 1.22.2

require (
github.com/cenkalti/backoff v2.2.1+incompatible
github.com/elastic/go-elasticsearch/v8 v8.13.1
github.com/jackc/pglogrepl v0.0.0-20240307033717-828fbfe908e9
github.com/jackc/pgx/v5 v5.5.5
github.com/mitchellh/mapstructure v1.5.0
github.com/rs/xid v1.5.0
github.com/rs/zerolog v1.32.0
github.com/segmentio/kafka-go v0.4.47
Expand All @@ -15,6 +17,9 @@ require (

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/elastic/elastic-transport-go/v8 v8.5.0 // indirect
github.com/go-logr/logr v1.3.0 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/jackc/pgio v1.0.0 // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect
Expand All @@ -23,6 +28,9 @@ require (
github.com/mattn/go-isatty v0.0.19 // indirect
github.com/pierrec/lz4/v4 v4.1.15 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
go.opentelemetry.io/otel v1.21.0 // indirect
go.opentelemetry.io/otel/metric v1.21.0 // indirect
go.opentelemetry.io/otel/trace v1.21.0 // indirect
golang.org/x/crypto v0.20.0 // indirect
golang.org/x/sys v0.17.0 // indirect
golang.org/x/text v0.14.0 // indirect
Expand Down
21 changes: 21 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,18 @@ github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSV
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/elastic/elastic-transport-go/v8 v8.5.0 h1:v5membAl7lvQgBTexPRDBO/RdnlQX+FM9fUVDyXxvH0=
github.com/elastic/elastic-transport-go/v8 v8.5.0/go.mod h1:YLHer5cj0csTzNFXoNQ8qhtGY1GTvSqPnKWKaqQE3Hk=
github.com/elastic/go-elasticsearch/v8 v8.13.1 h1:du5F8IzUUyCkzxyHdrO9AtopcG95I/qwi2WK8Kf1xlg=
github.com/elastic/go-elasticsearch/v8 v8.13.1/go.mod h1:DIn7HopJs4oZC/w0WoJR13uMUxtHeq92eI5bqv5CRfI=
github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/logr v1.3.0 h1:2y3SDp0ZXuc6/cjLSZ+Q3ir+QB9T/iG5yYRXqsagWSY=
github.com/go-logr/logr v1.3.0/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/jackc/pgio v1.0.0 h1:g12B9UwVnzGhueNavwioyEEpAmqMe1E/BN9ES+8ovkE=
github.com/jackc/pgio v1.0.0/go.mod h1:oP+2QK2wFfUWgr+gxjoBH9KGBb31Eio69xUb0w5bYf8=
github.com/jackc/pglogrepl v0.0.0-20240307033717-828fbfe908e9 h1:86CQbMauoZdLS0HDLcEHYo6rErjiCBjVvcxGsioIn7s=
Expand All @@ -28,6 +39,8 @@ github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovk
github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM=
github.com/mattn/go-isatty v0.0.19 h1:JITubQf0MOLdlGRuRq+jtsDlekdYPia9ZFsB8h/APPA=
github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY=
github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo=
github.com/pierrec/lz4/v4 v4.1.15 h1:MO0/ucJhngq7299dKLwIMtgTfbkoSPF6AoMYDd8Q4q0=
github.com/pierrec/lz4/v4 v4.1.15/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
Expand Down Expand Up @@ -56,6 +69,14 @@ github.com/xdg-go/scram v1.1.2/go.mod h1:RT/sEzTbU5y00aCK8UOx6R7YryM0iF1N2MOmC3k
github.com/xdg-go/stringprep v1.0.4 h1:XLI/Ng3O1Atzq0oBs3TWm+5ZVgkq2aqdlvP9JtoZ6c8=
github.com/xdg-go/stringprep v1.0.4/go.mod h1:mPGuuIYwz7CmR2bT9j4GbQqutWS1zV24gijq1dTyGkM=
github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
go.opentelemetry.io/otel v1.21.0 h1:hzLeKBZEL7Okw2mGzZ0cc4k/A7Fta0uoPgaJCr8fsFc=
go.opentelemetry.io/otel v1.21.0/go.mod h1:QZzNPQPm1zLX4gZK4cMi+71eaorMSGT3A4znnUvNNEo=
go.opentelemetry.io/otel/metric v1.21.0 h1:tlYWfeo+Bocx5kLEloTjbcDwBuELRrIFxwdQ36PlJu4=
go.opentelemetry.io/otel/metric v1.21.0/go.mod h1:o1p3CA8nNHW8j5yuQLdc1eeqEaPfzug24uvsyIEJRWM=
go.opentelemetry.io/otel/sdk v1.21.0 h1:FTt8qirL1EysG6sTQRZ5TokkU8d0ugCj8htOgThZXQ8=
go.opentelemetry.io/otel/sdk v1.21.0/go.mod h1:Nna6Yv7PWTdgJHVRD9hIYywQBRx7pbox6nwBnZIxl/E=
go.opentelemetry.io/otel/trace v1.21.0 h1:WD9i5gzvoUPuXIXH24ZNBudiarZDKuekPqi/E8fpfLc=
go.opentelemetry.io/otel/trace v1.21.0/go.mod h1:LGbsEB0f9LGjN+OZaQQ26sohbOmiMR+BaslueVtS/qQ=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/crypto v0.14.0/go.mod h1:MVFd36DqK4CsrnJYDkBA3VC4m2GkXAM0PvzMCn4JQf4=
Expand Down
182 changes: 182 additions & 0 deletions internal/es/es_api.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
// SPDX-License-Identifier: Apache-2.0

package es

import (
"bytes"
"encoding/json"
"fmt"
"io"
)

type SearchRequest struct {
Index *string
ReturnVersion *bool
Size *int
From *int
Sort *string
SourceIncludes *string
Query io.Reader
}

type DeleteByQueryRequest struct {
Index []string
Query map[string]any
Refresh bool
}

type IndexRequest struct {
Index string
Body []byte
Refresh string
}

type IndexWithIDRequest struct {
Index string
ID string
Body []byte
Refresh string
}

type BulkItem struct {
Index *BulkIndex `json:"index,omitempty"`
Delete *BulkIndex `json:"delete,omitempty"`
Doc map[string]any `json:"-"`
Status int `json:"-"`
Error json.RawMessage `json:"-"`
}

type BulkIndex struct {
Index string `json:"_index"`
ID string `json:"_id"`
Version *int `json:"version,omitempty"`
VersionType string `json:"version_type,omitempty"`
}

type BulkResponseItem struct {
Index struct {
Status int `json:"status"`
Error json.RawMessage `json:"error"`
} `json:"index"`
Delete struct {
Status int `json:"status"`
Error json.RawMessage `json:"error"`
} `json:"delete"`
}

type Hit struct {
ID string `json:"_id"`
Index string `json:"_index"`
Version int `json:"_version"`
Source map[string]any `json:"_source"`
Score float64 `json:"_score"`
Highlight map[string]any `json:"highlight"`
}

type Hits struct {
Total struct {
Value int `json:"value"`
Relation string `json:"relation"`
} `json:"total"`
Hits []Hit `json:"hits"`
}

type SearchResponse struct {
Hits Hits `json:"hits"`
Aggregations map[string]any `json:"aggregations"`
}

type BulkResponse struct {
Errors bool `json:"errors"`
Items []BulkResponseItem
}

type IndexStats struct {
Index string `json:"index,omitempty"`
PrimarySizeBytes uint64 `json:"primary_size_bytes,omitempty"`
TotalSizeBytes uint64 `json:"total_size_bytes,omitempty"`
}

type Mappings struct {
Properties map[string]any
Dynamic string
}

type mappingResponse map[string]struct {
Mappings Mappings
}

type indexStatsResponse struct {
Indices map[string]struct {
Primaries struct {
Store struct {
SizeInBytes int `json:"size_in_bytes"`
} `json:"store"`
} `json:"primaries"`
Total struct {
Store struct {
SizeInBytes int `json:"size_in_bytes"`
} `json:"store"`
} `json:"total"`
} `json:"indices"`
}

type countResponse struct {
Count int `json:"count"`
}

func encodeBulkItems(buffer *bytes.Buffer, items []BulkItem) error {
encoder := json.NewEncoder(buffer)

for _, item := range items {
if err := encoder.Encode(item); err != nil {
return fmt.Errorf("bulk item [%v]: encode item action %w", item, err)
}

if item.Delete != nil {
continue
}

if item.Doc == nil {
buffer.WriteString("{}\n")
continue
}

if err := encoder.Encode(item.Doc); err != nil {
return fmt.Errorf("bulk item [%v]: encode item document action %w", item, err)
}
}

return nil
}

func verifyResponse(bodyBytes []byte, items []BulkItem) (failed []BulkItem, err error) {
var esResponse BulkResponse

if err := json.Unmarshal(bodyBytes, &esResponse); err != nil {
return nil, fmt.Errorf("error unmarshaling response from es: %w (%s)", err, bodyBytes)
}

if !esResponse.Errors {
return []BulkItem{}, nil
}

failed = []BulkItem{}
for i, respItem := range esResponse.Items {
if items[i].Index != nil {
if respItem.Index.Status > 299 {
items[i].Status = respItem.Index.Status
items[i].Error = respItem.Index.Error
failed = append(failed, items[i])
}
} else if items[i].Delete != nil {
if respItem.Delete.Status > 299 {
items[i].Status = respItem.Delete.Status
items[i].Error = respItem.Delete.Error
failed = append(failed, items[i])
}
}
}

return failed, nil
}
Loading

0 comments on commit 1e9cfda

Please sign in to comment.