Skip to content

Commit

Permalink
Optimisations for indirect ingestion (#24)
Browse files Browse the repository at this point in the history
* First batch of optimisations

* completed optimisation on multiget

* new batching system

* Delete internals/ingester/test.json

* Delete internals/ingester/.gitignore

* Update .gitignore
  • Loading branch information
SchawnnDev authored Sep 3, 2024
1 parent f4d5189 commit 68f3473
Show file tree
Hide file tree
Showing 9 changed files with 370 additions and 236 deletions.
6 changes: 5 additions & 1 deletion config/ingester-api.toml
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,13 @@ ELASTICSEARCH_HTTP_TIMEOUT = "1m"
# Default value: false
ELASTICSEARCH_DIRECT_MULTI_GET_MODE = "false"

# How many documents will be in the mget request of each batch (max)
# Default value: 1000
ELASTICSEARCH_MGET_BATCH_SIZE = "500"

# Specify the maximum number of concurrent worker by type of ingested document (1 type of document = n workers)
# Default value: 2
INGESTER_MAXIMUM_WORKERS = "1"
INGESTER_MAXIMUM_WORKERS = "2"

# Specify the typed-ingesters maximum queue size
# Default value: "5000"
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ require (
github.com/go-chi/chi/v5 v5.0.8
github.com/go-kit/kit v0.12.0
github.com/golang/protobuf v1.5.3
github.com/google/go-cmp v0.5.9
github.com/google/uuid v1.3.0
github.com/json-iterator/go v1.1.12
github.com/myrteametrics/myrtea-sdk/v4 v4.6.0
Expand Down
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,7 @@ github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/
github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs=
github.com/google/martian/v3 v3.0.0/go.mod h1:y5Zk1BBys9G+gd6Jrk0W3cC1+ELVxBWuIGO+w/tUAp0=
Expand Down
1 change: 1 addition & 0 deletions internals/configuration/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ var AllowedConfigKey = [][]helpers.ConfigKey{
{
{Type: helpers.StringFlag, Name: "ELASTICSEARCH_HTTP_TIMEOUT", DefaultValue: "1m", Description: "Elasticsearch HTTP Client timeout"},
{Type: helpers.StringFlag, Name: "ELASTICSEARCH_DIRECT_MULTI_GET_MODE", DefaultValue: "true", Description: "Elasticsearch direct multi-get mode enabled"},
{Type: helpers.StringFlag, Name: "ELASTICSEARCH_MGET_BATCH_SIZE", DefaultValue: "1000", Description: "Elasticsearch Mget max batch size"},
{Type: helpers.StringFlag, Name: "INGESTER_MAXIMUM_WORKERS", DefaultValue: "2", Description: "Typed Ingester's maximum parallel workers"},
{Type: helpers.StringFlag, Name: "TYPEDINGESTER_QUEUE_BUFFER_SIZE", DefaultValue: "5000", Description: "Typed ingester's internal queue size"},
{Type: helpers.StringFlag, Name: "WORKER_QUEUE_BUFFER_SIZE", DefaultValue: "5000", Description: "Worker's internal queue size"},
Expand Down
3 changes: 2 additions & 1 deletion internals/ingester/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,14 @@ type IndexingWorker interface {

func NewIndexingWorker(typedIngester *TypedIngester, id int) (IndexingWorker, error) {
version := viper.GetInt("ELASTICSEARCH_VERSION")
mgetBatchSize := viper.GetInt("ELASTICSEARCH_MGET_BATCH_SIZE")
switch version {
case 6:
return NewIndexingWorkerV6(typedIngester, id), nil
case 7:
fallthrough
case 8:
return NewIndexingWorkerV8(typedIngester, id), nil
return NewIndexingWorkerV8(typedIngester, id, mgetBatchSize), nil
default:
zap.L().Fatal("Unsupported Elasticsearch version", zap.Int("version", version))
return nil, errors.New("Unsupported Elasticsearch version")
Expand Down
1 change: 1 addition & 0 deletions internals/ingester/worker_v6.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ type IndexingWorkerV6 struct {
}

// NewIndexingWorkerV6 returns a new IndexingWorkerV6
// Deprecated
func NewIndexingWorkerV6(typedIngester *TypedIngester, id int) *IndexingWorkerV6 {

var data chan UpdateCommand
Expand Down
448 changes: 220 additions & 228 deletions internals/ingester/worker_v8.go

Large diffs are not rendered by default.

130 changes: 130 additions & 0 deletions internals/ingester/worker_v8_direct.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
package ingester

import (
	"context"
	"encoding/json"
	"errors"
	"time"

	"github.com/elastic/go-elasticsearch/v8/typedapi/core/mget"
	"github.com/elastic/go-elasticsearch/v8/typedapi/some"
	"github.com/elastic/go-elasticsearch/v8/typedapi/types"
	"github.com/myrteametrics/myrtea-sdk/v4/elasticsearchv8"
	"github.com/myrteametrics/myrtea-sdk/v4/models"
	"go.uber.org/zap"
)

// directBulkChainedUpdate runs the "direct" ingestion pipeline for a batch of
// update command groups (ELASTICSEARCH_DIRECT_MULTI_GET_MODE=true):
// multi-get the existing documents, merge each command group into its
// reference document, then bulk-index the results. Each phase's duration is
// recorded in the worker metrics; errors are logged and the pipeline
// continues best-effort.
func (worker *IndexingWorkerV8) directBulkChainedUpdate(updateCommandGroups [][]UpdateCommand) {
	// logStep emits one debug line per pipeline phase, tagged with this worker's identity.
	logStep := func(step string) {
		zap.L().Debug("DirectBulkChainUpdate",
			zap.String("TypedIngester", worker.TypedIngester.DocumentType),
			zap.Int("WorkerID", worker.ID),
			zap.String("step", step))
	}

	logStep("starting")

	// Phase 1: fetch the current version of every targeted document in one _mget.
	logStep("directMultiGetDocs")
	t := time.Now()
	refDocs, err := worker.directMultiGetDocs(updateCommandGroups)
	worker.metricWorkerDirectMultiGetDuration.Observe(time.Since(t).Seconds())
	if err != nil {
		zap.L().Error("directMultiGetDocs", zap.Error(err))
	}

	// Phase 2: merge each command group into its reference document.
	logStep("applyMerges")
	t = time.Now()
	push, err := worker.applyDirectMerges(updateCommandGroups, refDocs)
	worker.metricWorkerApplyMergesDuration.Observe(time.Since(t).Seconds())
	if err != nil {
		zap.L().Error("applyDirectMerges", zap.Error(err))
	}

	// Phase 3: push the merged documents back with a single bulk request.
	logStep("bulkIndex")
	t = time.Now()
	err = worker.bulkIndex(push)
	worker.metricWorkerBulkIndexDuration.Observe(time.Since(t).Seconds())
	if err != nil {
		zap.L().Error("bulkIndex", zap.Error(err))
	}

	logStep("done")
}

// applyDirectMerges builds the documents to push, pairing each update command
// group with the reference document fetched by directMultiGetDocs (part of the
// ELASTICSEARCH_DIRECT_MULTI_GET_MODE=true path). When no reference document
// exists for a group, the first command's NewDoc seeds the document; every
// subsequent command is merged on top with ApplyMergeLight. The returned slice
// is index-aligned with updateCommandGroups; the error is always nil.
func (worker *IndexingWorkerV8) applyDirectMerges(updateCommandGroups [][]UpdateCommand, refDocs []models.Document) ([]models.Document, error) {
	push := make([]models.Document, 0, len(updateCommandGroups))

	for groupIdx, group := range updateCommandGroups {
		var doc models.Document
		// Start from the fetched reference document when one exists for this group.
		if groupIdx < len(refDocs) {
			ref := refDocs[groupIdx]
			doc = models.Document{ID: ref.ID, Index: ref.Index, IndexType: ref.IndexType, Source: ref.Source}
		}

		for _, cmd := range group {
			if doc.ID == "" {
				// No document yet (not found, or zero placeholder): seed from the command.
				doc = models.Document{ID: cmd.NewDoc.ID, Index: cmd.NewDoc.Index, IndexType: cmd.NewDoc.IndexType, Source: cmd.NewDoc.Source}
			} else {
				doc = ApplyMergeLight(doc, cmd)
			}
		}

		push = append(push, doc)
	}

	return push, nil
}

// directMultiGetDocs fetches, in a single Elasticsearch _mget request, the
// current version of every document referenced by the given update command
// groups (part of the ELASTICSEARCH_DIRECT_MULTI_GET_MODE=true path).
//
// The returned slice is index-aligned with updateCommandGroups: position i
// holds the existing document for group i, or a zero models.Document when the
// document was not found or its source could not be decoded. A non-nil error
// is returned when the mget request itself fails or yields no documents.
func (worker *IndexingWorkerV8) directMultiGetDocs(updateCommandGroups [][]UpdateCommand) ([]models.Document, error) {
	// Each group targets a single document: the first command carries its index and ID.
	ops := make([]types.MgetOperation, len(updateCommandGroups))
	for i, group := range updateCommandGroups {
		ops[i] = types.MgetOperation{Index_: some.String(group[0].Index), Id_: group[0].DocumentID}
	}

	zap.L().Debug("Executing multiget", zap.String("TypedIngester", worker.TypedIngester.DocumentType), zap.Int("WorkerID", worker.ID))

	req := mget.NewRequest()
	req.Docs = ops

	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute)
	defer cancel()

	response, err := worker.perfomMgetRequest(elasticsearchv8.C().Mget().Request(req), ctx)
	if err != nil {
		// BUGFIX: response may be nil on error — bail out before dereferencing it.
		zap.L().Error("perfomMgetRequest", zap.Error(err))
		return nil, err
	}
	if len(response.Docs) == 0 {
		zap.L().Error("MultiGet (self): empty response")
		return nil, errors.New("mget returned no documents")
	}

	zap.L().Debug("Executing multiget", zap.String("TypedIngester", worker.TypedIngester.DocumentType), zap.Int("WorkerID", worker.ID), zap.String("status", "done"))

	refDocs := make([]models.Document, 0, len(response.Docs))
	for _, d := range response.Docs {
		// A missing/empty source means the document was not found or errored:
		// keep a zero placeholder so refDocs stays aligned with updateCommandGroups.
		if !d.Found || len(d.Source_) == 0 {
			refDocs = append(refDocs, models.Document{})
			continue
		}

		// BUGFIX: decode this document's own raw _source. The previous code
		// attached the shared mget *request* payload as the Source of every
		// found document, so merges never saw the stored document content.
		var src map[string]interface{}
		if err := json.Unmarshal(d.Source_, &src); err != nil {
			zap.L().Warn("unmarshal mget source", zap.String("id", d.Id_), zap.Error(err))
			refDocs = append(refDocs, models.Document{})
			continue
		}
		refDocs = append(refDocs, models.Document{ID: d.Id_, Index: d.Index_, IndexType: "_doc", Source: src})
	}

	return refDocs, nil
}
15 changes: 9 additions & 6 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package main
import (
"context"
"errors"
"github.com/myrteametrics/myrtea-sdk/v4/connector"
"net/http"
"os"
"os/signal"
Expand Down Expand Up @@ -56,12 +57,15 @@ func main() {
serverTLSCert := viper.GetString("HTTP_SERVER_TLS_FILE_CRT")
serverTLSKey := viper.GetString("HTTP_SERVER_TLS_FILE_KEY")

router := router.NewChiRouterSimple(router.ConfigSimple{
Production: viper.GetBool("LOGGER_PRODUCTION"),
CORS: viper.GetBool("HTTP_SERVER_API_ENABLE_CORS"),
Security: viper.GetBool("HTTP_SERVER_API_ENABLE_SECURITY"),
GatewayMode: viper.GetBool("HTTP_SERVER_API_ENABLE_GATEWAY_MODE"),
done := make(chan os.Signal, 1)
apiKey := viper.GetString("ENGINE_API_KEY")

router := router.NewChiRouterSimple(router.ConfigSimple{
Production: viper.GetBool("LOGGER_PRODUCTION"),
CORS: viper.GetBool("HTTP_SERVER_API_ENABLE_CORS"),
Security: viper.GetBool("HTTP_SERVER_API_ENABLE_SECURITY"),
GatewayMode: viper.GetBool("HTTP_SERVER_API_ENABLE_GATEWAY_MODE"),
Restarter: connector.NewRestarter(done, apiKey),
VerboseError: false,
AuthenticationMode: "BASIC",
LogLevel: zapConfig.Level,
Expand All @@ -80,7 +84,6 @@ func main() {
srv = server.NewUnsecuredServer(serverPort, router)
}

done := make(chan os.Signal, 1)
signal.Notify(done, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)

go func() {
Expand Down

0 comments on commit 68f3473

Please sign in to comment.