From e423f9d21d63f21126667d00d5e1f1b6105c677d Mon Sep 17 00:00:00 2001 From: SchawnnDev Date: Mon, 3 Jun 2024 15:42:09 +0200 Subject: [PATCH 1/2] Added ignore_available & allow_no_indices es options to export --- internals/export/elasticsearch.go | 49 +++++++++++++++++---------- internals/export/worker.go | 10 +++++- internals/export/wrapper.go | 15 +++++--- internals/handlers/export_handlers.go | 10 +++--- 4 files changed, 57 insertions(+), 27 deletions(-) diff --git a/internals/export/elasticsearch.go b/internals/export/elasticsearch.go index e0311e7..1d8ae8f 100644 --- a/internals/export/elasticsearch.go +++ b/internals/export/elasticsearch.go @@ -22,6 +22,15 @@ type StreamedExport struct { Data chan []reader.Hit } +type ElasticParams struct { + Indices string + Client string + Limit int64 + SearchRequest *search.Request + IgnoreUnavailable bool + AllowNoIndices bool +} + // NewStreamedExport returns a pointer to a new StreamedExport instance // One instance per StreamedExport since the channel Data will be closed after export is finished func NewStreamedExport() *StreamedExport { @@ -62,25 +71,30 @@ func (export StreamedExport) StreamedExportFactHitsFull(ctx context.Context, f e indices := fact.FindIndices(f, ti, false) indicesStr := strings.Join(indices, ",") - return export.ProcessStreamedExport(ctx, "", indicesStr, searchRequest, limit) + params := ElasticParams{ + Indices: indicesStr, + Limit: limit, + SearchRequest: searchRequest, + } + return export.ProcessStreamedExport(ctx, params) } // ProcessStreamedExport export data from ElasticSearch to a channel // using a given elastic client, request and indices to query // Please note that the channel is not closed when this function is executed -func (export StreamedExport) ProcessStreamedExport(ctx context.Context, elasticClient, indicesStr string, searchRequest *search.Request, limit int64) error { +func (export StreamedExport) ProcessStreamedExport(ctx context.Context, params ElasticParams) error { cli := elasticsearch.C() // defaults to singleton instance - if elasticClient != "" { - config, b, err := esconfig.R().GetByName(elasticClient) + if params.Client != "" { + config, b, err := esconfig.R().GetByName(params.Client) if err != nil { zap.L().Error("Error when getting esconfig from repository", - zap.String("elasticClient", elasticClient), zap.Error(err)) + zap.String("elasticClient", params.Client), zap.Error(err)) return err } if !b { zap.L().Error("Error when getting esconfig from repository", - zap.String("elasticClient", elasticClient), zap.Error(err)) + zap.String("elasticClient", params.Client), zap.Error(err)) return errors.New("selected elasticClient not exists") } if !config.ExportActivated { @@ -102,7 +116,7 @@ func (export StreamedExport) ProcessStreamedExport(ctx context.Context, elasticC } // handle pit creation - pit, err := cli.OpenPointInTime(indicesStr).KeepAlive("5m").Do(context.Background()) + pit, err := cli.OpenPointInTime(params.Indices).KeepAlive("5m").Do(context.Background()) if err != nil { zap.L().Error("OpenPointInTime failed", zap.Error(err)) return err @@ -121,34 +135,35 @@ func (export StreamedExport) ProcessStreamedExport(ctx context.Context, elasticC } }() - searchRequest.Pit = &types.PointInTimeReference{Id: pit.Id, KeepAlive: "5m"} - searchRequest.SearchAfter = []types.FieldValue{} - searchRequest.Sort = append(searchRequest.Sort, "_shard_doc") + params.SearchRequest.Pit = &types.PointInTimeReference{Id: pit.Id, KeepAlive: "5m"} + params.SearchRequest.SearchAfter = []types.FieldValue{} + params.SearchRequest.Sort = append(params.SearchRequest.Sort, "_shard_doc") // searchRequest.TrackTotalHits = false // Speeds up pagination (maybe impl?) processed := int64(0) - hasLimit := limit > 0 + hasLimit := params.Limit > 0 var size int for { - if hasLimit && processed >= limit { + if hasLimit && processed >= params.Limit { break } - if !hasLimit || limit-processed > 10000 { + if !hasLimit || params.Limit-processed > 10000 { size = 10000 } else { - size = int(limit - processed) + size = int(params.Limit - processed) } response, err := cli.Search(). - Request(searchRequest). + Request(params.SearchRequest). Size(size). + IgnoreUnavailable(params.IgnoreUnavailable). + AllowNoIndices(params.AllowNoIndices). Do(context.Background()) if err != nil { zap.L().Error("ES Search failed", zap.Error(err)) - // TODO: maybe close PIT (defer close function?) return err } @@ -168,7 +183,7 @@ func (export StreamedExport) ProcessStreamedExport(ctx context.Context, elasticC } // Handle SearchAfter to paginate - searchRequest.SearchAfter = response.Hits.Hits[hitsLen-1].Sort + params.SearchRequest.SearchAfter = response.Hits.Hits[hitsLen-1].Sort // if ctx was cancelled, stop data pulling select { diff --git a/internals/export/worker.go b/internals/export/worker.go index 3cafd4c..2b7e66d 100644 --- a/internals/export/worker.go +++ b/internals/export/worker.go @@ -162,7 +162,15 @@ func (e *ExportWorker) Start(item WrapperItem, ctx context.Context) { defer close(streamedExport.Data) if item.Custom { - writerErr = streamedExport.ProcessStreamedExport(ctx, item.ElasticName, item.Indices, item.SearchRequest, item.Params.Limit) + params := ElasticParams{ + Indices: item.Indices, + Limit: item.Params.Limit, + Client: item.ElasticName, + SearchRequest: item.SearchRequest, + IgnoreUnavailable: item.IgnoreUnavailable, + AllowNoIndices: item.AllowNoIndices, + } + writerErr = streamedExport.ProcessStreamedExport(ctx, params) } else { for _, f := range item.Facts { writerErr = streamedExport.StreamedExportFactHitsFull(ctx, f, item.Params.Limit, item.FactParameters) diff --git a/internals/export/wrapper.go b/internals/export/wrapper.go index 1ba283f..2d4d896 100644 --- a/internals/export/wrapper.go +++ b/internals/export/wrapper.go @@ -55,10 +55,12 @@ type WrapperItem struct { FactParameters map[string]string `json:"factParameters"` // For custom export requests - Custom bool `json:"custom"` - ElasticName string `json:"elasticName"` - Indices string `json:"-"` - SearchRequest *search.Request `json:"-"` + Custom bool `json:"custom"` + ElasticName string `json:"elasticName"` + Indices string `json:"-"` + SearchRequest *search.Request `json:"-"` + AllowNoIndices bool `json:"-"` + IgnoreUnavailable bool `json:"-"` } type Wrapper struct { @@ -231,7 +233,8 @@ func (ew *Wrapper) AddToQueue(facts []engine.Fact, title string, params CSVParam // AddToQueueCustom Adds a new export with a custom elastic connection and a custom search request // // addHashPrefix adds a hash as prefix to resulting files, to avoid duplicates -func (ew *Wrapper) AddToQueueCustom(elasticName string, request *search.Request, indices, title string, params CSVParameters, user users.User, addHashPrefix bool) (*WrapperItem, int) { +func (ew *Wrapper) AddToQueueCustom(elasticName string, request *search.Request, indices, title string, params CSVParameters, + user users.User, addHashPrefix, ignoreUnavailable, allowNoIndices bool) (*WrapperItem, int) { ew.queueMutex.Lock() defer ew.queueMutex.Unlock() @@ -260,6 +263,8 @@ func (ew *Wrapper) AddToQueueCustom(elasticName string, request *search.Request, item.ElasticName = elasticName item.SearchRequest = request item.Indices = indices + item.IgnoreUnavailable = ignoreUnavailable + item.AllowNoIndices = allowNoIndices ew.queue = append(ew.queue, item) return item, CodeAdded } diff --git a/internals/handlers/export_handlers.go b/internals/handlers/export_handlers.go index 67dc070..4e8534e 100644 --- a/internals/handlers/export_handlers.go +++ b/internals/handlers/export_handlers.go @@ -45,10 +45,12 @@ type ExportRequest struct { // CustomExportRequest represents a request for an custom export type CustomExportRequest struct { export.CSVParameters - Title string `json:"title"` - Indices string `json:"indices"` - SearchRequest search.Request `json:"searchRequest"` - ElasticName string `json:"elasticName"` + Title string `json:"title"` + Indices string `json:"indices"` + SearchRequest search.Request `json:"searchRequest"` + ElasticName string `json:"elasticName"` + IgnoreUnavailableIndices bool `json:"ignoreUnavailableIndices"` + AllowNoIndices bool `json:"allowNoIndices"` } // ExportFactStreamed godoc From f1b2dd507f8bdeac21203ade9ccaa5cf814fe8a3 Mon Sep 17 00:00:00 2001 From: SchawnnDev Date: Mon, 3 Jun 2024 15:54:44 +0200 Subject: [PATCH 2/2] Added missing params to export handler & fixed tests --- internals/export/wrapper.go | 13 ++++++------- internals/export/wrapper_test.go | 9 +++++---- internals/handlers/export_handlers.go | 11 +++++++++-- 3 files changed, 20 insertions(+), 13 deletions(-) diff --git a/internals/export/wrapper.go b/internals/export/wrapper.go index 2d4d896..297a3d2 100644 --- a/internals/export/wrapper.go +++ b/internals/export/wrapper.go @@ -233,8 +233,7 @@ func (ew *Wrapper) AddToQueue(facts []engine.Fact, title string, params CSVParam // AddToQueueCustom Adds a new export with a custom elastic connection and a custom search request // // addHashPrefix adds a hash as prefix to resulting files, to avoid duplicates -func (ew *Wrapper) AddToQueueCustom(elasticName string, request *search.Request, indices, title string, params CSVParameters, - user users.User, addHashPrefix, ignoreUnavailable, allowNoIndices bool) (*WrapperItem, int) { +func (ew *Wrapper) AddToQueueCustom(title string, esParams ElasticParams, params CSVParameters, user users.User, addHashPrefix bool) (*WrapperItem, int) { ew.queueMutex.Lock() defer ew.queueMutex.Unlock() @@ -260,11 +259,11 @@ func (ew *Wrapper) AddToQueueCustom(elasticName string, request *search.Request, item := NewWrapperItem([]engine.Fact{}, title, params, user, map[string]string{}, addHashPrefix) item.Custom = true - item.ElasticName = elasticName - item.SearchRequest = request - item.Indices = indices - item.IgnoreUnavailable = ignoreUnavailable - item.AllowNoIndices = allowNoIndices + item.ElasticName = esParams.Client + item.SearchRequest = esParams.SearchRequest + item.Indices = esParams.Indices + item.IgnoreUnavailable = esParams.IgnoreUnavailable + item.AllowNoIndices = esParams.AllowNoIndices ew.queue = append(ew.queue, item) return item, CodeAdded } diff --git a/internals/export/wrapper_test.go b/internals/export/wrapper_test.go index 3d01633..e030d63 100644 --- a/internals/export/wrapper_test.go +++ b/internals/export/wrapper_test.go @@ -91,14 +91,15 @@ func TestAddToQueueCustom(t *testing.T) { wrapper := NewWrapper("/tmp", 1, 1, 1) user1 := users.User{Login: "bla"} user2 := users.User{Login: "blabla"} + esParams := ElasticParams{} csvParams := CSVParameters{} - _, result := wrapper.AddToQueueCustom("", nil, "", "test.txt", csvParams, user1, false) + _, result := wrapper.AddToQueueCustom("test.txt", esParams, csvParams, user1, false) expression.AssertEqual(t, result, CodeAdded, "AddToQueueCustom should return CodeAdded") - _, result = wrapper.AddToQueueCustom("", nil, "", "test.txt", csvParams, user1, false) + _, result = wrapper.AddToQueueCustom("test.txt", esParams, csvParams, user1, false) expression.AssertEqual(t, result, CodeUserExists, "AddToQueueCustom should return CodeUserExists") - _, result = wrapper.AddToQueueCustom("", nil, "", "test.txt", csvParams, user2, false) + _, result = wrapper.AddToQueueCustom("test.txt", esParams, csvParams, user2, false) expression.AssertEqual(t, result, CodeUserAdded, "AddToQueueCustom should return CodeUserAdded") - _, result = wrapper.AddToQueueCustom("", nil, "", "test2.txt", csvParams, user2, false) + _, result = wrapper.AddToQueueCustom("test2.txt", esParams, csvParams, user2, false) expression.AssertEqual(t, result, CodeQueueFull, "AddToQueueCustom should return CodeQueueFull") } diff --git a/internals/handlers/export_handlers.go b/internals/handlers/export_handlers.go index 4e8534e..e335f6c 100644 --- a/internals/handlers/export_handlers.go +++ b/internals/handlers/export_handlers.go @@ -445,8 +445,15 @@ func (e *ExportHandler) ExportCustom(w http.ResponseWriter, r *http.Request) { return } - item, status := e.exportWrapper.AddToQueueCustom(elastic.Name, &request.SearchRequest, - request.Indices, request.Title, request.CSVParameters, userCtx.User, true) + params := export.ElasticParams{ + Indices: request.Indices, + Limit: request.Limit, + Client: elastic.Name, + SearchRequest: &request.SearchRequest, + IgnoreUnavailable: request.IgnoreUnavailableIndices, + AllowNoIndices: request.AllowNoIndices, + } + item, status := e.exportWrapper.AddToQueueCustom(request.Title, params, request.CSVParameters, userCtx.User, true) e.handleAddToQueueResponse(w, r, status, item) }