From 68f347374f9024cdee7bf5335894834f56bc8e4d Mon Sep 17 00:00:00 2001
From: Paul
Date: Tue, 3 Sep 2024 17:03:09 +0200
Subject: [PATCH] Optimisations for indirect ingestion (#24)

* First batch of optimisations
* completed optimisation on multiget
* new batching system
* Delete internals/ingester/test.json
* Delete internals/ingester/.gitignore
* Update .gitignore
---
 config/ingester-api.toml               |   6 +-
 go.mod                                 |   1 +
 go.sum                                 |   1 +
 internals/configuration/constants.go   |   1 +
 internals/ingester/worker.go           |   3 +-
 internals/ingester/worker_v6.go        |   1 +
 internals/ingester/worker_v8.go        | 448 ++++++++++++-------------
 internals/ingester/worker_v8_direct.go | 130 +++++++
 main.go                                |  15 +-
 9 files changed, 370 insertions(+), 236 deletions(-)
 create mode 100644 internals/ingester/worker_v8_direct.go

diff --git a/config/ingester-api.toml b/config/ingester-api.toml
index dfab3ca..f9d98bc 100644
--- a/config/ingester-api.toml
+++ b/config/ingester-api.toml
@@ -84,9 +84,13 @@ ELASTICSEARCH_HTTP_TIMEOUT = "1m"
 # Default value: false
 ELASTICSEARCH_DIRECT_MULTI_GET_MODE = "false"
 
+# Maximum number of documents in the mget request of each batch
+# Default value: 1000
+ELASTICSEARCH_MGET_BATCH_SIZE = "500"
+
 # Specify the maximum number of concurrent worker by type of ingested document (1 type of document = n workers)
 # Default value: 2
-INGESTER_MAXIMUM_WORKERS = "1"
+INGESTER_MAXIMUM_WORKERS = "2"
 
 # Specify the typed-ingesters maximum queue size
 # Default value: "5000"
diff --git a/go.mod b/go.mod
index 97374a7..c19e6d0 100644
--- a/go.mod
+++ b/go.mod
@@ -8,6 +8,7 @@ require (
 	github.com/go-chi/chi/v5 v5.0.8
 	github.com/go-kit/kit v0.12.0
 	github.com/golang/protobuf v1.5.3
+	github.com/google/go-cmp v0.5.9
 	github.com/google/uuid v1.3.0
 	github.com/json-iterator/go v1.1.12
 	github.com/myrteametrics/myrtea-sdk/v4 v4.6.0
diff --git a/go.sum b/go.sum
index ddfa33e..128e331 100644
--- a/go.sum
+++ b/go.sum
@@ -186,6 +186,7 @@ github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/
 github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
 github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
 github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
+github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
 github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
 github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs=
 github.com/google/martian/v3 v3.0.0/go.mod h1:y5Zk1BBys9G+gd6Jrk0W3cC1+ELVxBWuIGO+w/tUAp0=
diff --git a/internals/configuration/constants.go b/internals/configuration/constants.go
index 3a3ca36..1bc0d8e 100644
--- a/internals/configuration/constants.go
+++ b/internals/configuration/constants.go
@@ -21,6 +21,7 @@ var AllowedConfigKey = [][]helpers.ConfigKey{
 	{
 		{Type: helpers.StringFlag, Name: "ELASTICSEARCH_HTTP_TIMEOUT", DefaultValue: "1m", Description: "Elasticsearch HTTP Client timeout"},
 		{Type: helpers.StringFlag, Name: "ELASTICSEARCH_DIRECT_MULTI_GET_MODE", DefaultValue: "true", Description: "Elasticsearch direct multi-get mode enabled"},
+		{Type: helpers.StringFlag, Name: "ELASTICSEARCH_MGET_BATCH_SIZE", DefaultValue: "1000", Description: "Elasticsearch Mget max batch size"},
		{Type: helpers.StringFlag, Name: "INGESTER_MAXIMUM_WORKERS", DefaultValue: "2", Description: "Typed Ingester's maximum parallel workers"},
 		{Type: helpers.StringFlag, Name: "TYPEDINGESTER_QUEUE_BUFFER_SIZE", DefaultValue: "5000", Description: "Typed ingester's internal queue size"},
"5000", Description: "Typed ingester's internal queue size"}, {Type: helpers.StringFlag, Name: "WORKER_QUEUE_BUFFER_SIZE", DefaultValue: "5000", Description: "Worker's internal queue size"}, diff --git a/internals/ingester/worker.go b/internals/ingester/worker.go index ed74af7..8585b10 100644 --- a/internals/ingester/worker.go +++ b/internals/ingester/worker.go @@ -24,13 +24,14 @@ type IndexingWorker interface { func NewIndexingWorker(typedIngester *TypedIngester, id int) (IndexingWorker, error) { version := viper.GetInt("ELASTICSEARCH_VERSION") + mgetBatchSize := viper.GetInt("ELASTICSEARCH_MGET_BATCH_SIZE") switch version { case 6: return NewIndexingWorkerV6(typedIngester, id), nil case 7: fallthrough case 8: - return NewIndexingWorkerV8(typedIngester, id), nil + return NewIndexingWorkerV8(typedIngester, id, mgetBatchSize), nil default: zap.L().Fatal("Unsupported Elasticsearch version", zap.Int("version", version)) return nil, errors.New("Unsupported Elasticsearch version") diff --git a/internals/ingester/worker_v6.go b/internals/ingester/worker_v6.go index 31b09fc..8f3f781 100644 --- a/internals/ingester/worker_v6.go +++ b/internals/ingester/worker_v6.go @@ -35,6 +35,7 @@ type IndexingWorkerV6 struct { } // NewIndexingWorkerV6 returns a new IndexingWorkerV6 +// Deprecated func NewIndexingWorkerV6(typedIngester *TypedIngester, id int) *IndexingWorkerV6 { var data chan UpdateCommand diff --git a/internals/ingester/worker_v8.go b/internals/ingester/worker_v8.go index b981815..0ec0e31 100644 --- a/internals/ingester/worker_v8.go +++ b/internals/ingester/worker_v8.go @@ -10,7 +10,6 @@ import ( "github.com/elastic/go-elasticsearch/v8/esapi" "github.com/elastic/go-elasticsearch/v8/typedapi/core/mget" - "github.com/elastic/go-elasticsearch/v8/typedapi/some" "github.com/elastic/go-elasticsearch/v8/typedapi/types" "github.com/go-kit/kit/metrics" "github.com/google/uuid" @@ -28,6 +27,7 @@ type IndexingWorkerV8 struct { TypedIngester *TypedIngester ID int Data chan UpdateCommand + mgetBatchSize int metricWorkerQueueGauge metrics.Gauge metricWorkerMessage metrics.Counter metricWorkerFlushDuration metrics.Histogram @@ -40,8 +40,7 @@ type IndexingWorkerV8 struct { } // NewIndexingWorkerV8 returns a new IndexingWorkerV8 -func NewIndexingWorkerV8(typedIngester *TypedIngester, id int) *IndexingWorkerV8 { - +func NewIndexingWorkerV8(typedIngester *TypedIngester, id, mgetBatchSize int) *IndexingWorkerV8 { var data chan UpdateCommand if workerQueueSize := viper.GetInt("WORKER_QUEUE_BUFFER_SIZE"); workerQueueSize > 0 { data = make(chan UpdateCommand, workerQueueSize) @@ -53,6 +52,7 @@ func NewIndexingWorkerV8(typedIngester *TypedIngester, id int) *IndexingWorkerV8 Uuid: uuid.New(), TypedIngester: typedIngester, ID: id, + mgetBatchSize: mgetBatchSize, Data: data, metricWorkerQueueGauge: _metricWorkerQueueGauge.With("typedingester", typedIngester.DocumentType, "workerid", strconv.Itoa(id)), metricWorkerMessage: _metricWorkerMessage.With("typedingester", typedIngester.DocumentType, "workerid", strconv.Itoa(id)), @@ -107,9 +107,15 @@ func (worker *IndexingWorkerV8) Run() { // Send indexing bulk (when buffer is full or on timeout) case <-forceFlush: - zap.L().Info("Try on after timeout reached", zap.String("typedIngesterUUID", worker.TypedIngester.Uuid.String()), zap.String("workerUUID", worker.Uuid.String()), zap.String("TypedIngester", worker.TypedIngester.DocumentType), zap.Int("WorkerID", worker.ID), zap.Int("Messages", len(buffer)), zap.Int("workerLen", len(worker.Data)), zap.Int("Timeout", 
-			zap.L().Info("Try on after timeout reached", zap.String("typedIngesterUUID", worker.TypedIngester.Uuid.String()), zap.String("workerUUID", worker.Uuid.String()), zap.String("TypedIngester", worker.TypedIngester.DocumentType), zap.Int("WorkerID", worker.ID), zap.Int("Messages", len(buffer)), zap.Int("workerLen", len(worker.Data)), zap.Int("Timeout", forceFlushTimeout))
+			zap.L().Info("Try on after timeout reached", zap.String("typedIngesterUUID", worker.TypedIngester.Uuid.String()),
+				zap.String("workerUUID", worker.Uuid.String()), zap.String("TypedIngester", worker.TypedIngester.DocumentType),
+				zap.Int("WorkerID", worker.ID), zap.Int("Messages", len(buffer)), zap.Int("workerLen", len(worker.Data)),
+				zap.Int("Timeout", forceFlushTimeout))
 			if len(buffer) > 0 {
-				zap.L().Info("Flushing on timeout reached", zap.String("typedIngesterUUID", worker.TypedIngester.Uuid.String()), zap.String("workerUUID", worker.Uuid.String()), zap.String("TypedIngester", worker.TypedIngester.DocumentType), zap.Int("WorkerID", worker.ID), zap.Int("Messages", len(buffer)), zap.Int("workerLen", len(worker.Data)), zap.Int("Timeout", forceFlushTimeout))
+				zap.L().Info("Flushing on timeout reached", zap.String("typedIngesterUUID", worker.TypedIngester.Uuid.String()),
+					zap.String("workerUUID", worker.Uuid.String()), zap.String("TypedIngester", worker.TypedIngester.DocumentType),
+					zap.Int("WorkerID", worker.ID), zap.Int("Messages", len(buffer)), zap.Int("workerLen", len(worker.Data)),
+					zap.Int("Timeout", forceFlushTimeout))
 				worker.flushEsBuffer(buffer)
 				buffer = buffer[:0]
 			}
@@ -117,10 +123,14 @@
 
 		// Build indexing bulk
 		case uc := <-worker.Data:
-			zap.L().Debug("Receive UpdateCommand", zap.String("typedIngesterUUID", worker.TypedIngester.Uuid.String()), zap.String("workerUUID", worker.Uuid.String()), zap.String("TypedIngester", worker.TypedIngester.DocumentType), zap.Int("WorkerID", worker.ID), zap.Any("UpdateCommand", uc))
+			zap.L().Debug("Receive UpdateCommand", zap.String("typedIngesterUUID", worker.TypedIngester.Uuid.String()),
+				zap.String("workerUUID", worker.Uuid.String()), zap.String("TypedIngester", worker.TypedIngester.DocumentType),
+				zap.Int("WorkerID", worker.ID), zap.Any("UpdateCommand", uc))
 			buffer = append(buffer, uc)
 			if len(buffer) >= bufferLength {
-				zap.L().Info("Try flushing on full buffer", zap.String("typedIngesterUUID", worker.TypedIngester.Uuid.String()), zap.String("workerUUID", worker.Uuid.String()), zap.String("TypedIngester", worker.TypedIngester.DocumentType), zap.Int("WorkerID", worker.ID), zap.Int("Messages", len(buffer)), zap.Int("workerLen", len(worker.Data)))
+				zap.L().Info("Try flushing on full buffer", zap.String("typedIngesterUUID", worker.TypedIngester.Uuid.String()),
+					zap.String("workerUUID", worker.Uuid.String()), zap.String("TypedIngester", worker.TypedIngester.DocumentType),
+					zap.Int("WorkerID", worker.ID), zap.Int("Messages", len(buffer)), zap.Int("workerLen", len(worker.Data)))
 				worker.flushEsBuffer(buffer)
 				buffer = buffer[:0]
 				forceFlush = worker.resetForceFlush(forceFlushTimeout)
@@ -167,150 +177,26 @@ func (worker *IndexingWorkerV8) flushEsBuffer(buffer []UpdateCommand) {
 	worker.metricWorkerFlushDuration.Observe(float64(time.Since(start).Nanoseconds()) / 1e9)
 }
 
-// directBulkChainedUpdate part of ELASTICSEARCH_DIRECT_MULTI_GET_MODE=true
-func (worker *IndexingWorkerV8) directBulkChainedUpdate(updateCommandGroups [][]UpdateCommand) {
-	zap.L().Debug("DirectBulkChainUpdate", zap.String("TypedIngester", worker.TypedIngester.DocumentType), zap.Int("WorkerID", worker.ID), zap.String("step", "starting"))
-	zap.L().Debug("DirectBulkChainUpdate", zap.String("TypedIngester", worker.TypedIngester.DocumentType), zap.Int("WorkerID", worker.ID), zap.String("step", "directMultiGetDocs"))
-
-	start := time.Now()
-	refDocs, err := worker.directMultiGetDocs(updateCommandGroups)
-	worker.metricWorkerDirectMultiGetDuration.Observe(float64(time.Since(start).Nanoseconds()) / 1e9)
-
-	if err != nil {
-		zap.L().Error("directMultiGetDocs", zap.Error(err))
-	}
-
-	zap.L().Debug("DirectBulkChainUpdate", zap.String("TypedIngester", worker.TypedIngester.DocumentType), zap.Int("WorkerID", worker.ID), zap.String("step", "applyMerges"))
-
-	start = time.Now()
-	push, err := worker.applyDirectMerges(updateCommandGroups, refDocs)
-	worker.metricWorkerApplyMergesDuration.Observe(float64(time.Since(start).Nanoseconds()) / 1e9)
-
-	if err != nil {
-		zap.L().Error("applyDirectMerges", zap.Error(err))
-	}
-
-	zap.L().Debug("DirectBulkChainUpdate", zap.String("TypedIngester", worker.TypedIngester.DocumentType), zap.Int("WorkerID", worker.ID), zap.String("step", "bulkIndex"))
-
-	start = time.Now()
-	err = worker.bulkIndex(push)
-	worker.metricWorkerBulkIndexDuration.Observe(float64(time.Since(start).Nanoseconds()) / 1e9)
-
-	if err != nil {
-		zap.L().Error("bulkIndex", zap.Error(err))
-	}
-	zap.L().Debug("DirectBulkChainUpdate", zap.String("TypedIngester", worker.TypedIngester.DocumentType), zap.Int("WorkerID", worker.ID), zap.String("step", "done"))
-}
-
-// directMultiGetDocs part of ELASTICSEARCH_DIRECT_MULTI_GET_MODE=true
-func (worker *IndexingWorkerV8) directMultiGetDocs(updateCommandGroups [][]UpdateCommand) ([]models.Document, error) {
-	docs := make([]*models.Document, 0)
-	for _, updateCommandGroup := range updateCommandGroups {
-		docs = append(docs, &models.Document{Index: updateCommandGroup[0].Index, ID: updateCommandGroup[0].DocumentID})
-	}
-
-	zap.L().Debug("Executing multiget", zap.String("TypedIngester", worker.TypedIngester.DocumentType), zap.Int("WorkerID", worker.ID))
-
-	source := make(map[string]interface{})
-	sourceItems := make([]types.MgetOperation, len(docs))
-	for i, doc := range docs {
-		sourceItems[i] = types.MgetOperation{Index_: some.String(doc.Index), Id_: doc.ID}
-	}
-	source["docs"] = sourceItems
-
-	req := mget.NewRequest()
-	req.Docs = sourceItems
-
-	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
-	defer cancel()
-	response, err := elasticsearchv8.C().Mget().Request(req).Do(ctx)
-	if err != nil {
-		zap.L().Warn("json encode source", zap.Error(err))
-	}
-	zap.L().Debug("Executing multiget", zap.String("TypedIngester", worker.TypedIngester.DocumentType), zap.Int("WorkerID", worker.ID), zap.String("status", "done"))
-
-	if err != nil || response.Docs == nil || len(response.Docs) == 0 {
-		zap.L().Error("MultiGet (self)", zap.Error(err))
-	}
-
-	refDocs := make([]models.Document, 0)
-	for _, d := range response.Docs {
-		switch typedDoc := d.(type) {
-		// case types.MultiGetError:
-		// not working :(
-
-		// case types.GetResult:
-		// not working :(
-
-		case map[string]interface{}:
-			jsonString, err := jsoniter.Marshal(typedDoc)
-			if err != nil {
-				zap.L().Error("update multiget unmarshal", zap.Error(err))
-				refDocs = append(refDocs, models.Document{})
-				continue
-			}
-
-			var typedDocOk types.GetResult
-			err = jsoniter.Unmarshal(jsonString, &typedDocOk)
-			if err != nil {
-				zap.L().Error("update multiget unmarshal", zap.Error(err))
-				refDocs = append(refDocs, models.Document{})
-				continue
-			}
-			if len(typedDocOk.Source_) == 0 {
-				// no source => MultiGetError
-				refDocs = append(refDocs, models.Document{})
-				continue
-			}
-
-			var source map[string]interface{}
-			err = jsoniter.Unmarshal(typedDocOk.Source_, &source)
-			if err != nil {
-				zap.L().Error("update multiget unmarshal", zap.Error(err))
-				refDocs = append(refDocs, models.Document{})
-				continue
-			}
-
-			if typedDocOk.Found {
-				refDocs = append(refDocs, models.Document{ID: typedDocOk.Id_, Index: typedDocOk.Index_, IndexType: "_doc", Source: source})
-			} else {
-				refDocs = append(refDocs, models.Document{})
-			}
-
-			// if len(refDocs) > i && refDocs[i].ID == "" {
-			// 	if typedDocOk.Found {
-			// 		refDocs[i] = models.Document{ID: typedDocOk.Id_, Index: typedDocOk.Index_, IndexType: "_doc", Source: source}
-			// 	}
-			// } else {
-			// 	if typedDocOk.Found {
-			// 		refDocs = append(refDocs, models.Document{ID: typedDocOk.Id_, Index: typedDocOk.Index_, IndexType: "_doc", Source: source})
-			// 	} else {
-			// 		refDocs = append(refDocs, models.Document{})
-			// 	}
-			// }
-		default:
-			zap.L().Error("Unknown response type", zap.Any("typedDoc", typedDoc), zap.Any("type", reflect.TypeOf(typedDoc)))
-		}
-	}
-
-	return refDocs, nil
-}
-
 // bulkChainedUpdate process multiple groups of UpdateCommand
 // It execute sequentialy every single UpdateCommand on a specific "source" document, for each group of commands
 // part of ELASTICSEARCH_DIRECT_MULTI_GET_MODE=false
 func (worker *IndexingWorkerV8) bulkChainedUpdate(updateCommandGroups [][]UpdateCommand) {
-	zap.L().Debug("BulkChainUpdate", zap.String("TypedIngester", worker.TypedIngester.DocumentType), zap.Int("WorkerID", worker.ID), zap.String("step", "starting"))
+	zap.L().Debug("BulkChainUpdate", zap.String("TypedIngester", worker.TypedIngester.DocumentType),
+		zap.Int("WorkerID", worker.ID), zap.String("step", "starting"))
+
 	docs := make([]GetQuery, 0)
 	for _, commands := range updateCommandGroups {
 		docs = append(docs, GetQuery{DocumentType: commands[0].DocumentType, ID: commands[0].DocumentID})
 	}
 	if len(docs) == 0 {
-		zap.L().Warn("empty docs update", zap.String("typedIngesterUUID", worker.TypedIngester.Uuid.String()), zap.String("workerUUID", worker.Uuid.String()), zap.String("TypedIngester", worker.TypedIngester.DocumentType), zap.Int("WorkerID", worker.ID))
+		zap.L().Warn("empty docs update", zap.String("typedIngesterUUID", worker.TypedIngester.Uuid.String()),
+			zap.String("workerUUID", worker.Uuid.String()),
+			zap.String("TypedIngester", worker.TypedIngester.DocumentType), zap.Int("WorkerID", worker.ID))
 		return
 	}
 
-	zap.L().Debug("BulkChainUpdate", zap.String("TypedIngester", worker.TypedIngester.DocumentType), zap.Int("WorkerID", worker.ID), zap.String("step", "getindices"))
+	zap.L().Debug("BulkChainUpdate", zap.String("TypedIngester", worker.TypedIngester.DocumentType),
+		zap.Int("WorkerID", worker.ID), zap.String("step", "getindices"))
 	start := time.Now()
 	indices, err := worker.getIndices(docs[0].DocumentType)
 	worker.metricWorkerGetIndicesDuration.Observe(float64(time.Since(start).Nanoseconds()) / 1e9)
@@ -318,17 +204,20 @@
 		zap.L().Error("getIndices", zap.Error(err))
 	}
 
-	zap.L().Debug("BulkChainUpdate", zap.String("TypedIngester", worker.TypedIngester.DocumentType), zap.Int("WorkerID", worker.ID), zap.String("step", "multiGetFindRefDocsFull"))
+	zap.L().Debug("BulkChainUpdate", zap.String("TypedIngester", worker.TypedIngester.DocumentType),
+		zap.Int("WorkerID", worker.ID), zap.String("step", "multiGetFindRefDocsFull"))
 
 	start = time.Now()
-	refDocs, err := worker.multiGetFindRefDocsFull(indices, docs)
+	refDocs, err := worker.multiGetFindRefDocsFullV2(indices, docs)
 	worker.metricWorkerDirectMultiGetDuration.Observe(float64(time.Since(start).Nanoseconds()) / 1e9)
 
 	if err != nil {
 		zap.L().Error("multiGetFindRefDocsFull", zap.Error(err))
 	}
 
-	zap.L().Debug("BulkChainUpdate", zap.String("TypedIngester", worker.TypedIngester.DocumentType), zap.Int("WorkerID", worker.ID), zap.String("step", "applyMerges"))
zap.L().Debug("BulkChainUpdate", zap.String("TypedIngester", worker.TypedIngester.DocumentType), zap.Int("WorkerID", worker.ID), zap.String("step", "applyMerges")) + zap.L().Debug("BulkChainUpdate", zap.String("TypedIngester", worker.TypedIngester.DocumentType), + zap.Int("WorkerID", worker.ID), zap.String("step", "applyMerges")) + start = time.Now() push, err := worker.applyMerges(updateCommandGroups, refDocs) worker.metricWorkerApplyMergesDuration.Observe(float64(time.Since(start).Nanoseconds()) / 1e9) @@ -337,7 +226,9 @@ func (worker *IndexingWorkerV8) bulkChainedUpdate(updateCommandGroups [][]Update zap.L().Error("applyMerges", zap.Error(err)) } - zap.L().Debug("BulkChainUpdate", zap.String("TypedIngester", worker.TypedIngester.DocumentType), zap.Int("WorkerID", worker.ID), zap.String("step", "bulkIndex")) + zap.L().Debug("BulkChainUpdate", zap.String("TypedIngester", worker.TypedIngester.DocumentType), + zap.Int("WorkerID", worker.ID), zap.String("step", "bulkIndex")) + start = time.Now() err = worker.bulkIndex(push) worker.metricWorkerBulkIndexDuration.Observe(float64(time.Since(start).Nanoseconds()) / 1e9) @@ -345,7 +236,8 @@ func (worker *IndexingWorkerV8) bulkChainedUpdate(updateCommandGroups [][]Update zap.L().Error("bulkIndex", zap.Error(err)) } - zap.L().Info("BulkChainUpdate", zap.String("TypedIngester", worker.TypedIngester.DocumentType), zap.Int("WorkerID", worker.ID), zap.String("step", "done")) + zap.L().Info("BulkChainUpdate", zap.String("TypedIngester", worker.TypedIngester.DocumentType), + zap.Int("WorkerID", worker.ID), zap.String("step", "done")) } // getIndices part of ELASTICSEARCH_DIRECT_MULTI_GET_MODE=false @@ -371,7 +263,8 @@ func (worker *IndexingWorkerV8) getIndices(documentType string) ([]string, error Aliases map[string]interface{} `json:"aliases"` }) if err = jsoniter.NewDecoder(res.Body).Decode(&r); err != nil { - zap.L().Error("decode alias response", zap.Error(err), zap.String("alias", alias), zap.Any("request", esapi.IndicesGetAliasRequest{Name: []string{alias}})) + zap.L().Error("decode alias response", zap.Error(err), zap.String("alias", alias), + zap.Any("request", esapi.IndicesGetAliasRequest{Name: []string{alias}})) return make([]string, 0), errors.New("alias not found") } @@ -382,9 +275,170 @@ func (worker *IndexingWorkerV8) getIndices(documentType string) ([]string, error return indices, err } +// multiGetFindRefDocsFull part of ELASTICSEARCH_DIRECT_MULTI_GET_MODE=false +func (worker *IndexingWorkerV8) multiGetFindRefDocsFullV2(indices []string, docs []GetQuery) (map[string]models.Document, error) { + refDocs := map[string]models.Document{} + var mgetBatches []map[string]GetQuery + currentMgetBatch := map[string]GetQuery{} + + for i := 0; i < len(docs); i++ { + if i != 0 && worker.mgetBatchSize != 0 && i%worker.mgetBatchSize == 0 { + mgetBatches = append(mgetBatches, currentMgetBatch) + currentMgetBatch = map[string]GetQuery{} + } + + currentMgetBatch[docs[i].ID] = docs[i] + } + mgetBatches = append(mgetBatches, currentMgetBatch) + + // all batches must be checked on all indices, + // so we loop on indices and then on batches + for _, idx := range indices { + for _, batch := range mgetBatches { + if len(batch) == 0 { + continue + } + + responseDocs, err := worker.multiGetFindRefDocsV2(idx, batch) + if err != nil { + zap.L().Error("multiGetFindRefDocs", zap.Error(err)) + } + + for _, doc := range responseDocs.Docs { + if !doc.Found { + continue + } + + // a document was found, add it to the refDocs map + refDocs[doc.Id_] = models.Document{ID: 
+
+	// all batches must be checked on all indices,
+	// so we loop on indices and then on batches
+	for _, idx := range indices {
+		for _, batch := range mgetBatches {
+			if len(batch) == 0 {
+				continue
+			}
+
+			responseDocs, err := worker.multiGetFindRefDocsV2(idx, batch)
+			if err != nil {
+				zap.L().Error("multiGetFindRefDocsV2", zap.Error(err))
+			}
+
+			for _, doc := range responseDocs.Docs {
+				if !doc.Found {
+					continue
+				}
+
+				// a document was found, add it to the refDocs map
+				refDocs[doc.Id_] = models.Document{ID: doc.Id_, Index: doc.Index_, IndexType: "_doc", Source: doc.Source_}
+
+				// remove it from the batch
+				delete(batch, doc.Id_)
+			}
+
+			// Should we?
+			// mgetBatches = worker.reorderBatches(mgetBatches)
+
+		}
+	}
+
+	return refDocs, nil
+}
+
+// reorderBatches FIXME: not really optimised; should be rewritten before being used
+func (worker *IndexingWorkerV8) reorderBatches(mgetBatch []map[string]GetQuery) []map[string]GetQuery {
+	// here we reorder the batches, so that the first batch is the one with the most found documents (to avoid useless requests)
+	// earlier batches must always hold the most documents, capped at worker.mgetBatchSize
+	if len(mgetBatch) <= 1 {
+		return mgetBatch
+	}
+	for i := 1; i < len(mgetBatch); i++ {
+		// check if batch is full
+		if len(mgetBatch[i]) >= worker.mgetBatchSize {
+			continue
+		}
+
+		// continue if there is no next batch
+		if i+1 >= len(mgetBatch) {
+			continue
+		}
+
+		// move as much as possible from the next batch to the current one (limited by worker.mgetBatchSize)
+		for k, v := range mgetBatch[i+1] {
+			if len(mgetBatch[i]) >= worker.mgetBatchSize {
+				break
+			}
+			mgetBatch[i][k] = v
+			delete(mgetBatch[i+1], k)
+		}
+	}
+	return mgetBatch
+}
+
+type multiGetResponseItem struct {
+	//Fields map[string]jsoniter.RawMessage `json:"fields,omitempty"`
+	Found  bool   `json:"found"`
+	Id_    string `json:"_id"`
+	Index_ string `json:"_index"`
+	//PrimaryTerm_ *int64 `json:"_primary_term,omitempty"`
+	//Routing_ *string `json:"_routing,omitempty"`
+	//SeqNo_ *int64 `json:"_seq_no,omitempty"`
+	Source_ map[string]interface{} `json:"_source,omitempty"`
+	//Version_ *int64 `json:"_version,omitempty"`
+}
+
+type multiGetResponse struct {
+	Docs []multiGetResponseItem `json:"docs"`
+}
+
+// multiGetFindRefDocsV2 part of ELASTICSEARCH_DIRECT_MULTI_GET_MODE=false
+func (worker *IndexingWorkerV8) multiGetFindRefDocsV2(index string, queries map[string]GetQuery) (*multiGetResponse, error) {
+	if len(queries) == 0 {
+		return nil, errors.New("docs[] is empty")
+	}
+
+	zap.L().Debug("Executing multiget", zap.String("TypedIngester", worker.TypedIngester.DocumentType), zap.Int("WorkerID", worker.ID), zap.String("index", index))
+
+	sourceItems := make([]types.MgetOperation, len(queries))
+	i := 0
+	for id := range queries {
+		sourceItems[i] = types.MgetOperation{Id_: id}
+		i++
+	}
+
+	req := mget.NewRequest()
+	req.Docs = sourceItems
+
+	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute)
+	defer cancel()
+
+	response, err := worker.performMgetRequest(elasticsearchv8.C().Mget().Index(index).Request(req), ctx)
+	if err != nil {
+		zap.L().Warn("performMgetRequest", zap.Error(err))
+	}
+
+	zap.L().Debug("Executing multiget", zap.String("TypedIngester", worker.TypedIngester.DocumentType), zap.Int("WorkerID", worker.ID), zap.String("index", index), zap.String("status", "done"))
+
+	if err != nil || response.Docs == nil || len(response.Docs) == 0 {
+		zap.L().Error("MultiGet (self)", zap.Error(err))
+		return &multiGetResponse{Docs: make([]multiGetResponseItem, 0)}, err
+	}
+	return response, nil
+}
+
+// performMgetRequest part of ELASTICSEARCH_DIRECT_MULTI_GET_MODE=false
+func (worker *IndexingWorkerV8) performMgetRequest(r *mget.Mget, ctx context.Context) (*multiGetResponse, error) {
+	response := &multiGetResponse{}
+
+	res, err := r.Perform(ctx)
+	if err != nil {
+		return nil, err
+	}
+	defer res.Body.Close()
+
+	if res.StatusCode < 300 {
+		err = jsoniter.NewDecoder(res.Body).Decode(response)
+		if err != nil {
+			return nil, err
+		}
+		return response, nil
+	}
+
+	errorResponse := types.NewElasticsearchError()
+	err = jsoniter.NewDecoder(res.Body).Decode(errorResponse)
+	if err != nil {
+		return nil, err
+	}
+
+	return nil, errorResponse
+}
+
 // multiGetFindRefDocsFull part of ELASTICSEARCH_DIRECT_MULTI_GET_MODE=false
 func (worker *IndexingWorkerV8) multiGetFindRefDocsFull(indices []string, docs []GetQuery) ([]models.Document, error) {
 	refDocs := make([]models.Document, 0)
+	var findDocs bool
 	for _, doc := range docs {
 		sliceDoc := []GetQuery{doc}
 
@@ -423,11 +477,14 @@
 			if typedDocOk.Found {
 				findDocs = true
-				refDocs = append(refDocs, models.Document{ID: typedDocOk.Id_, Index: typedDocOk.Index_, IndexType: "_doc", Source: source})
+				refDocs = append(refDocs, models.Document{
+					ID: typedDocOk.Id_, Index: typedDocOk.Index_, IndexType: "_doc", Source: source,
+				})
 				break
 			}
 		default:
-			zap.L().Error("Unknown response type", zap.Any("typedDoc", typedDoc), zap.Any("type", reflect.TypeOf(typedDoc)))
+			zap.L().Error("Unknown response type", zap.Any("typedDoc", typedDoc),
+				zap.Any("type", reflect.TypeOf(typedDoc)))
 		}
 	}
 
@@ -450,7 +507,8 @@ func (worker *IndexingWorkerV8) multiGetFindRefDocs(index string, queries []GetQ
 		return nil, errors.New("docs[] is empty")
 	}
 
-	zap.L().Debug("Executing multiget", zap.String("TypedIngester", worker.TypedIngester.DocumentType), zap.Int("WorkerID", worker.ID), zap.String("index", index))
+	zap.L().Debug("Executing multiget", zap.String("TypedIngester", worker.TypedIngester.DocumentType),
+		zap.Int("WorkerID", worker.ID), zap.String("index", index))
 
 	source := make(map[string]interface{})
 	sourceItems := make([]types.MgetOperation, len(queries))
@@ -469,7 +527,8 @@
 		zap.L().Warn("json encode source", zap.Error(err))
 	}
 
-	zap.L().Debug("Executing multiget", zap.String("TypedIngester", worker.TypedIngester.DocumentType), zap.Int("WorkerID", worker.ID), zap.String("index", index), zap.String("status", "done"))
+	zap.L().Debug("Executing multiget", zap.String("TypedIngester", worker.TypedIngester.DocumentType),
+		zap.Int("WorkerID", worker.ID), zap.String("index", index), zap.String("status", "done"))
 
 	if err != nil || response.Docs == nil || len(response.Docs) == 0 {
 		zap.L().Error("MultiGet (self)", zap.Error(err))
@@ -479,51 +538,30 @@
 }
 
 // applyMerges part of ELASTICSEARCH_DIRECT_MULTI_GET_MODE=false
-func (worker *IndexingWorkerV8) applyMerges(documents [][]UpdateCommand, refDocs []models.Document) ([]models.Document, error) {
+func (worker *IndexingWorkerV8) applyMerges(documents [][]UpdateCommand, refDocs map[string]models.Document) ([]models.Document, error) {
 	var push = make([]models.Document, 0)
-	var i int
+
 	for _, commands := range documents {
 		var doc models.Document
-		if len(refDocs) > i {
-			doc = refDocs[i]
-			if doc.Index == "" {
-				doc.ID = commands[0].DocumentID
-				doc.Index = buildAliasName(commands[0].DocumentType, index.Last)
+
+		docID := commands[0].DocumentID
+
+		if ref, ok := refDocs[docID]; ok && ref.Index != "" {
+			doc = ref
+		} else {
+			doc = models.Document{
+				ID:    docID,
+				Index: buildAliasName(commands[0].DocumentType, index.Last),
 			}
 		}
 
 		// Index setup should probably not be here (be before in the indexing chain)
 		for _, command := range commands {
-			// if command.NewDoc.Index == "" {
-			// 	command.NewDoc.Index = buildAliasName(command.DocumentType, index.Last)
-			// }
 			doc = ApplyMergeLight(doc, command)
 		}
 		doc.IndexType = "document"
 		push = append(push, doc)
-		i++ // synchronise map iteration with reponse.Docs
-	}
-	return push, nil
-}
-
-// multiGetFindRefDocs part of ELASTICSEARCH_DIRECT_MULTI_GET_MODE=true
-func (worker *IndexingWorkerV8) applyDirectMerges(updateCommandGroups [][]UpdateCommand, refDocs []models.Document) ([]models.Document, error) {
-
-	push := make([]models.Document, 0)
-	for i, updateCommandGroup := range updateCommandGroups {
-		var pushDoc models.Document
-		if len(refDocs) > i {
-			pushDoc = models.Document{ID: refDocs[i].ID, Index: refDocs[i].Index, IndexType: refDocs[i].IndexType, Source: refDocs[i].Source}
-		}
-		for _, command := range updateCommandGroup {
-			if pushDoc.ID == "" {
-				pushDoc = models.Document{ID: command.NewDoc.ID, Index: command.NewDoc.Index, IndexType: command.NewDoc.IndexType, Source: command.NewDoc.Source}
-			} else {
-				pushDoc = ApplyMergeLight(pushDoc, command)
-			}
-		}
-		push = append(push, pushDoc)
 	}
 
 	return push, nil
 }
@@ -585,7 +623,7 @@ func (worker *IndexingWorkerV8) bulkIndex(docs []models.Document) error {
 		return err
 	}
 	if res.IsError() {
-		zap.L().Error("error")
+		zap.L().Error("bulkIndex request failed", zap.Strings("warnings", res.Warnings()), zap.String("response", res.String()))
 		return errors.New("error during bulkrequest")
 	}
 
@@ -628,49 +666,3 @@ func (worker *IndexingWorkerV8) bulkIndex(docs []models.Document) error {
 	}
 	return nil
 }
-
-// func (worker *IndexingWorkerV8) bulkIndexNew(docs []models.Document) error {
-
-// 	client := &elasticsearch.Client{
-// 		BaseClient: elasticsearchv8.C().BaseClient,
-// 	}
-// 	client.API = esapi.New(client)
-
-// 	bulkIndexer, err := esutil.NewBulkIndexer(esutil.BulkIndexerConfig{
-// 		Client: client,
-// 	})
-// 	if err != nil {
-// 		return err
-// 	}
-
-// 	for _, doc := range docs {
-// 		sourceStr, err := jsoniter.Marshal(doc.Source)
-// 		if err != nil {
-// 			return err
-// 		}
-// 		err = bulkIndexer.Add(context.Background(), esutil.BulkIndexerItem{
-// 			Index:      doc.Index,
-// 			DocumentID: doc.ID,
-// 			Body:       bytes.NewReader(sourceStr),
-// 			// OnSuccess: func(ctx context.Context, item esutil.BulkIndexerItem, res esutil.BulkIndexerResponseItem) {
-// 			// 	fmt.Printf("[%d] %s test/%s", res.Status, res.Result, item.DocumentID)
-// 			// },
-// 			OnFailure: func(ctx context.Context, item esutil.BulkIndexerItem, res esutil.BulkIndexerResponseItem, err error) {
-// 				if err != nil {
-// 					zap.L().Warn("Fail to index document to elasticsearch", zap.Error(err))
-// 				} else {
-// 					zap.L().Warn("Fail to index document to elasticsearch",
-// 						zap.String("errorType", res.Error.Type),
-// 						zap.String("errorReason", res.Error.Reason))
-// 				}
-// 			},
-// 		})
-// 		if err != nil {
-// 			return err
-// 		}
-// 	}
-
-// 	err = bulkIndexer.Close(context.Background())
-
-// 	zap.L().Info("bulk", zap.Any("stats", bulkIndexer.Stats()))
-// }
diff --git a/internals/ingester/worker_v8_direct.go b/internals/ingester/worker_v8_direct.go
new file mode 100644
index 0000000..394096d
--- /dev/null
+++ b/internals/ingester/worker_v8_direct.go
@@ -0,0 +1,130 @@
+package ingester
+
+import (
+	"context"
+	"github.com/elastic/go-elasticsearch/v8/typedapi/core/mget"
+	"github.com/elastic/go-elasticsearch/v8/typedapi/some"
+	"github.com/elastic/go-elasticsearch/v8/typedapi/types"
+	"github.com/myrteametrics/myrtea-sdk/v4/elasticsearchv8"
+	"github.com/myrteametrics/myrtea-sdk/v4/models"
+	"go.uber.org/zap"
+	"time"
+)
+
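+// This file groups the direct multi-get ingestion path
+// (ELASTICSEARCH_DIRECT_MULTI_GET_MODE=true) that previously lived in worker_v8.go.
+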
+// directBulkChainedUpdate part of ELASTICSEARCH_DIRECT_MULTI_GET_MODE=true
+func (worker *IndexingWorkerV8) directBulkChainedUpdate(updateCommandGroups [][]UpdateCommand) {
+	zap.L().Debug("DirectBulkChainUpdate", zap.String("TypedIngester", worker.TypedIngester.DocumentType), zap.Int("WorkerID", worker.ID), zap.String("step", "starting"))
+	zap.L().Debug("DirectBulkChainUpdate", zap.String("TypedIngester", worker.TypedIngester.DocumentType), zap.Int("WorkerID", worker.ID), zap.String("step", "directMultiGetDocs"))
+
+	start := time.Now()
+	refDocs, err := worker.directMultiGetDocs(updateCommandGroups)
+	worker.metricWorkerDirectMultiGetDuration.Observe(float64(time.Since(start).Nanoseconds()) / 1e9)
+
+	if err != nil {
+		zap.L().Error("directMultiGetDocs", zap.Error(err))
+	}
+
+	zap.L().Debug("DirectBulkChainUpdate", zap.String("TypedIngester", worker.TypedIngester.DocumentType), zap.Int("WorkerID", worker.ID), zap.String("step", "applyMerges"))
+
+	start = time.Now()
+	push, err := worker.applyDirectMerges(updateCommandGroups, refDocs)
+	worker.metricWorkerApplyMergesDuration.Observe(float64(time.Since(start).Nanoseconds()) / 1e9)
+
+	if err != nil {
+		zap.L().Error("applyDirectMerges", zap.Error(err))
+	}
+
+	zap.L().Debug("DirectBulkChainUpdate", zap.String("TypedIngester", worker.TypedIngester.DocumentType), zap.Int("WorkerID", worker.ID), zap.String("step", "bulkIndex"))
+
+	start = time.Now()
+	err = worker.bulkIndex(push)
+	worker.metricWorkerBulkIndexDuration.Observe(float64(time.Since(start).Nanoseconds()) / 1e9)
+
+	if err != nil {
+		zap.L().Error("bulkIndex", zap.Error(err))
+	}
+	zap.L().Debug("DirectBulkChainUpdate", zap.String("TypedIngester", worker.TypedIngester.DocumentType), zap.Int("WorkerID", worker.ID), zap.String("step", "done"))
+}
+
+// applyDirectMerges part of ELASTICSEARCH_DIRECT_MULTI_GET_MODE=true
+func (worker *IndexingWorkerV8) applyDirectMerges(updateCommandGroups [][]UpdateCommand, refDocs []models.Document) ([]models.Document, error) {
+	push := make([]models.Document, 0)
+	for i, updateCommandGroup := range updateCommandGroups {
+		var pushDoc models.Document
+		if len(refDocs) > i {
+			pushDoc = models.Document{ID: refDocs[i].ID, Index: refDocs[i].Index, IndexType: refDocs[i].IndexType, Source: refDocs[i].Source}
+		}
+		for _, command := range updateCommandGroup {
+			if pushDoc.ID == "" {
+				pushDoc = models.Document{ID: command.NewDoc.ID, Index: command.NewDoc.Index, IndexType: command.NewDoc.IndexType, Source: command.NewDoc.Source}
+			} else {
+				pushDoc = ApplyMergeLight(pushDoc, command)
+			}
+		}
+		push = append(push, pushDoc)
+	}
+
+	return push, nil
+}
+
+// directMultiGetDocs part of ELASTICSEARCH_DIRECT_MULTI_GET_MODE=true
+func (worker *IndexingWorkerV8) directMultiGetDocs(updateCommandGroups [][]UpdateCommand) ([]models.Document, error) {
+	docs := make([]*models.Document, 0)
+	for _, updateCommandGroup := range updateCommandGroups {
+		docs = append(docs, &models.Document{Index: updateCommandGroup[0].Index, ID: updateCommandGroup[0].DocumentID})
+	}
+
+	zap.L().Debug("Executing multiget", zap.String("TypedIngester", worker.TypedIngester.DocumentType), zap.Int("WorkerID", worker.ID))
+
+	sourceItems := make([]types.MgetOperation, len(docs))
+	for i, doc := range docs {
+		sourceItems[i] = types.MgetOperation{Index_: some.String(doc.Index), Id_: doc.ID}
+	}
+
+	req := mget.NewRequest()
+	req.Docs = sourceItems
+
+	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute)
+	defer cancel()
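+
+	// perform the mget request and decode the response; performMgetRequest is
+	// shared with the indirect path in worker_v8.go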
zap.L().Debug("Executing multiget", zap.String("TypedIngester", worker.TypedIngester.DocumentType), zap.Int("WorkerID", worker.ID), zap.String("status", "done")) + response, err := worker.perfomMgetRequest(elasticsearchv8.C().Mget().Request(req), ctx) + if err != nil { + zap.L().Warn("perfomMgetRequest", zap.Error(err)) + } + + if err != nil || response.Docs == nil || len(response.Docs) == 0 { + zap.L().Error("MultiGet (self)", zap.Error(err)) + } + + refDocs := make([]models.Document, 0) + for _, d := range response.Docs { + if len(d.Source_) == 0 { + // no source => MultiGetError + refDocs = append(refDocs, models.Document{}) + continue + } + + if d.Found { + refDocs = append(refDocs, models.Document{ID: d.Id_, Index: d.Index_, IndexType: "_doc", Source: source}) + } else { + refDocs = append(refDocs, models.Document{}) + } + + // if len(refDocs) > i && refDocs[i].ID == "" { + // if typedDocOk.Found { + // refDocs[i] = models.Document{ID: typedDocOk.Id_, Index: typedDocOk.Index_, IndexType: "_doc", Source: source} + // } + // } else { + // if typedDocOk.Found { + // refDocs = append(refDocs, models.Document{ID: typedDocOk.Id_, Index: typedDocOk.Index_, IndexType: "_doc", Source: source}) + // } else { + // refDocs = append(refDocs, models.Document{}) + // } + // } + } + + return refDocs, nil +} diff --git a/main.go b/main.go index fd5630d..48b530b 100644 --- a/main.go +++ b/main.go @@ -3,6 +3,7 @@ package main import ( "context" "errors" + "github.com/myrteametrics/myrtea-sdk/v4/connector" "net/http" "os" "os/signal" @@ -56,12 +57,15 @@ func main() { serverTLSCert := viper.GetString("HTTP_SERVER_TLS_FILE_CRT") serverTLSKey := viper.GetString("HTTP_SERVER_TLS_FILE_KEY") - router := router.NewChiRouterSimple(router.ConfigSimple{ - Production: viper.GetBool("LOGGER_PRODUCTION"), - CORS: viper.GetBool("HTTP_SERVER_API_ENABLE_CORS"), - Security: viper.GetBool("HTTP_SERVER_API_ENABLE_SECURITY"), - GatewayMode: viper.GetBool("HTTP_SERVER_API_ENABLE_GATEWAY_MODE"), + done := make(chan os.Signal, 1) + apiKey := viper.GetString("ENGINE_API_KEY") + router := router.NewChiRouterSimple(router.ConfigSimple{ + Production: viper.GetBool("LOGGER_PRODUCTION"), + CORS: viper.GetBool("HTTP_SERVER_API_ENABLE_CORS"), + Security: viper.GetBool("HTTP_SERVER_API_ENABLE_SECURITY"), + GatewayMode: viper.GetBool("HTTP_SERVER_API_ENABLE_GATEWAY_MODE"), + Restarter: connector.NewRestarter(done, apiKey), VerboseError: false, AuthenticationMode: "BASIC", LogLevel: zapConfig.Level, @@ -80,7 +84,6 @@ func main() { srv = server.NewUnsecuredServer(serverPort, router) } - done := make(chan os.Signal, 1) signal.Notify(done, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) go func() {