Added ignore_unavailable & allow_no_indices ES options to export #188

Merged · 2 commits · Jun 3, 2024
49 changes: 32 additions & 17 deletions internals/export/elasticsearch.go
@@ -22,6 +22,15 @@ type StreamedExport struct {
Data chan []reader.Hit
}

type ElasticParams struct {
Indices string
Client string
Limit int64
SearchRequest *search.Request
IgnoreUnavailable bool
AllowNoIndices bool
}

// NewStreamedExport returns a pointer to a new StreamedExport instance
// One instance per StreamedExport since the channel Data will be closed after export is finished
func NewStreamedExport() *StreamedExport {
@@ -62,25 +71,30 @@ func (export StreamedExport) StreamedExportFactHitsFull(ctx context.Context, f e
indices := fact.FindIndices(f, ti, false)
indicesStr := strings.Join(indices, ",")

return export.ProcessStreamedExport(ctx, "", indicesStr, searchRequest, limit)
params := ElasticParams{
Indices: indicesStr,
Limit: limit,
SearchRequest: searchRequest,
}
return export.ProcessStreamedExport(ctx, params)
}

// ProcessStreamedExport export data from ElasticSearch to a channel
// using a given elastic client, request and indices to query
// Please note that the channel is not closed when this function is executed
func (export StreamedExport) ProcessStreamedExport(ctx context.Context, elasticClient, indicesStr string, searchRequest *search.Request, limit int64) error {
func (export StreamedExport) ProcessStreamedExport(ctx context.Context, params ElasticParams) error {
cli := elasticsearch.C() // defaults to singleton instance

if elasticClient != "" {
config, b, err := esconfig.R().GetByName(elasticClient)
if params.Client != "" {
config, b, err := esconfig.R().GetByName(params.Client)
if err != nil {
zap.L().Error("Error when getting esconfig from repository",
zap.String("elasticClient", elasticClient), zap.Error(err))
zap.String("elasticClient", params.Client), zap.Error(err))
return err
}
if !b {
zap.L().Error("Error when getting esconfig from repository",
zap.String("elasticClient", elasticClient), zap.Error(err))
zap.String("elasticClient", params.Client), zap.Error(err))
return errors.New("selected elasticClient not exists")
}
if !config.ExportActivated {
@@ -102,7 +116,7 @@ func (export StreamedExport) ProcessStreamedExport(ctx context.Context, elasticC
}

// handle pit creation
pit, err := cli.OpenPointInTime(indicesStr).KeepAlive("5m").Do(context.Background())
pit, err := cli.OpenPointInTime(params.Indices).KeepAlive("5m").Do(context.Background())
if err != nil {
zap.L().Error("OpenPointInTime failed", zap.Error(err))
return err
@@ -121,34 +135,35 @@ func (export StreamedExport) ProcessStreamedExport(ctx context.Context, elasticC
}
}()

searchRequest.Pit = &types.PointInTimeReference{Id: pit.Id, KeepAlive: "5m"}
searchRequest.SearchAfter = []types.FieldValue{}
searchRequest.Sort = append(searchRequest.Sort, "_shard_doc")
params.SearchRequest.Pit = &types.PointInTimeReference{Id: pit.Id, KeepAlive: "5m"}
params.SearchRequest.SearchAfter = []types.FieldValue{}
params.SearchRequest.Sort = append(params.SearchRequest.Sort, "_shard_doc")
// searchRequest.TrackTotalHits = false // Speeds up pagination (maybe impl?)

processed := int64(0)
hasLimit := limit > 0
hasLimit := params.Limit > 0
var size int

for {

if hasLimit && processed >= limit {
if hasLimit && processed >= params.Limit {
break
}

if !hasLimit || limit-processed > 10000 {
if !hasLimit || params.Limit-processed > 10000 {
size = 10000
} else {
size = int(limit - processed)
size = int(params.Limit - processed)
}

response, err := cli.Search().
Request(searchRequest).
Request(params.SearchRequest).
Size(size).
IgnoreUnavailable(params.IgnoreUnavailable).
AllowNoIndices(params.AllowNoIndices).
Do(context.Background())
if err != nil {
zap.L().Error("ES Search failed", zap.Error(err))
// TODO: maybe close PIT (defer close function?)
return err
}

@@ -168,7 +183,7 @@ func (export StreamedExport) ProcessStreamedExport(ctx context.Context, elasticC
}

// Handle SearchAfter to paginate
searchRequest.SearchAfter = response.Hits.Hits[hitsLen-1].Sort
params.SearchRequest.SearchAfter = response.Hits.Hits[hitsLen-1].Sort

// if ctx was cancelled, stop data pulling
select {
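
For orientation, here is a minimal sketch (not part of the diff) of how a caller might drive the refactored ProcessStreamedExport with the new options. The index pattern, limit and query are placeholder assumptions; search.Request is the typed Elasticsearch client request type this package already imports.

// Sketch only: assumes this package's NewStreamedExport, ElasticParams and zap logger.
export := NewStreamedExport()
go func() {
    defer close(export.Data) // ProcessStreamedExport never closes the channel itself
    params := ElasticParams{
        Indices:           "logs-2024-*",     // hypothetical index pattern
        Client:            "",                // empty string keeps the default singleton client
        Limit:             50000,             // <= 0 means no limit
        SearchRequest:     &search.Request{}, // caller-supplied query
        IgnoreUnavailable: true,              // skip missing or closed indices instead of failing
        AllowNoIndices:    true,              // tolerate wildcard patterns that match nothing
    }
    if err := export.ProcessStreamedExport(context.Background(), params); err != nil {
        zap.L().Error("custom export failed", zap.Error(err))
    }
}()
for hits := range export.Data {
    _ = hits // consume batches, e.g. write CSV rows
}
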
10 changes: 9 additions & 1 deletion internals/export/worker.go
@@ -162,7 +162,15 @@ func (e *ExportWorker) Start(item WrapperItem, ctx context.Context) {
defer close(streamedExport.Data)

if item.Custom {
writerErr = streamedExport.ProcessStreamedExport(ctx, item.ElasticName, item.Indices, item.SearchRequest, item.Params.Limit)
params := ElasticParams{
Indices: item.Indices,
Limit: item.Params.Limit,
Client: item.ElasticName,
SearchRequest: item.SearchRequest,
IgnoreUnavailable: item.IgnoreUnavailable,
AllowNoIndices: item.AllowNoIndices,
}
writerErr = streamedExport.ProcessStreamedExport(ctx, params)
} else {
for _, f := range item.Facts {
writerErr = streamedExport.StreamedExportFactHitsFull(ctx, f, item.Params.Limit, item.FactParameters)
20 changes: 12 additions & 8 deletions internals/export/wrapper.go
@@ -55,10 +55,12 @@ type WrapperItem struct {
FactParameters map[string]string `json:"factParameters"`

// For custom export requests
Custom bool `json:"custom"`
ElasticName string `json:"elasticName"`
Indices string `json:"-"`
SearchRequest *search.Request `json:"-"`
Custom bool `json:"custom"`
ElasticName string `json:"elasticName"`
Indices string `json:"-"`
SearchRequest *search.Request `json:"-"`
AllowNoIndices bool `json:"-"`
IgnoreUnavailable bool `json:"-"`
}

type Wrapper struct {
@@ -231,7 +233,7 @@ func (ew *Wrapper) AddToQueue(facts []engine.Fact, title string, params CSVParam
// AddToQueueCustom Adds a new export with a custom elastic connection and a custom search request
//
// addHashPrefix adds a hash as prefix to resulting files, to avoid duplicates
func (ew *Wrapper) AddToQueueCustom(elasticName string, request *search.Request, indices, title string, params CSVParameters, user users.User, addHashPrefix bool) (*WrapperItem, int) {
func (ew *Wrapper) AddToQueueCustom(title string, esParams ElasticParams, params CSVParameters, user users.User, addHashPrefix bool) (*WrapperItem, int) {
ew.queueMutex.Lock()
defer ew.queueMutex.Unlock()

@@ -257,9 +259,11 @@ func (ew *Wrapper) AddToQueueCustom(elasticName string, request *search.Request,

item := NewWrapperItem([]engine.Fact{}, title, params, user, map[string]string{}, addHashPrefix)
item.Custom = true
item.ElasticName = elasticName
item.SearchRequest = request
item.Indices = indices
item.ElasticName = esParams.Client
item.SearchRequest = esParams.SearchRequest
item.Indices = esParams.Indices
item.IgnoreUnavailable = esParams.IgnoreUnavailable
item.AllowNoIndices = esParams.AllowNoIndices
ew.queue = append(ew.queue, item)
return item, CodeAdded
}
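
A hedged sketch of the reworked AddToQueueCustom call site, mirroring the new signature above; the esconfig name, title, user and CSV parameters are illustrative values only.

esParams := ElasticParams{
    Client:            "monitoring-cluster", // name of a configured esconfig entry (hypothetical)
    Indices:           "events-*",
    SearchRequest:     &search.Request{},
    IgnoreUnavailable: true,
    AllowNoIndices:    false,
}
item, code := ew.AddToQueueCustom("events export", esParams, CSVParameters{}, users.User{Login: "jdoe"}, true)
if code != CodeAdded && code != CodeUserAdded {
    // CodeUserExists, CodeQueueFull, ... — see wrapper_test.go below
}
_ = item
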
9 changes: 5 additions & 4 deletions internals/export/wrapper_test.go
@@ -91,14 +91,15 @@ func TestAddToQueueCustom(t *testing.T) {
wrapper := NewWrapper("/tmp", 1, 1, 1)
user1 := users.User{Login: "bla"}
user2 := users.User{Login: "blabla"}
esParams := ElasticParams{}
csvParams := CSVParameters{}
_, result := wrapper.AddToQueueCustom("", nil, "", "test.txt", csvParams, user1, false)
_, result := wrapper.AddToQueueCustom("test.txt", esParams, csvParams, user1, false)
expression.AssertEqual(t, result, CodeAdded, "AddToQueueCustom should return CodeAdded")
_, result = wrapper.AddToQueueCustom("", nil, "", "test.txt", csvParams, user1, false)
_, result = wrapper.AddToQueueCustom("test.txt", esParams, csvParams, user1, false)
expression.AssertEqual(t, result, CodeUserExists, "AddToQueueCustom should return CodeUserExists")
_, result = wrapper.AddToQueueCustom("", nil, "", "test.txt", csvParams, user2, false)
_, result = wrapper.AddToQueueCustom("test.txt", esParams, csvParams, user2, false)
expression.AssertEqual(t, result, CodeUserAdded, "AddToQueueCustom should return CodeUserAdded")
_, result = wrapper.AddToQueueCustom("", nil, "", "test2.txt", csvParams, user2, false)
_, result = wrapper.AddToQueueCustom("test2.txt", esParams, csvParams, user2, false)
expression.AssertEqual(t, result, CodeQueueFull, "AddToQueueCustom should return CodeQueueFull")
}

21 changes: 15 additions & 6 deletions internals/handlers/export_handlers.go
@@ -45,10 +45,12 @@ type ExportRequest struct {
// CustomExportRequest represents a request for a custom export
type CustomExportRequest struct {
export.CSVParameters
Title string `json:"title"`
Indices string `json:"indices"`
SearchRequest search.Request `json:"searchRequest"`
ElasticName string `json:"elasticName"`
Title string `json:"title"`
Indices string `json:"indices"`
SearchRequest search.Request `json:"searchRequest"`
ElasticName string `json:"elasticName"`
IgnoreUnavailableIndices bool `json:"ignoreUnavailableIndices"`
AllowNoIndices bool `json:"allowNoIndices"`
}

// ExportFactStreamed godoc
@@ -443,8 +445,15 @@ func (e *ExportHandler) ExportCustom(w http.ResponseWriter, r *http.Request) {
return
}

item, status := e.exportWrapper.AddToQueueCustom(elastic.Name, &request.SearchRequest,
request.Indices, request.Title, request.CSVParameters, userCtx.User, true)
params := export.ElasticParams{
Indices: request.Indices,
Limit: request.Limit,
Client: elastic.Name,
SearchRequest: &request.SearchRequest,
IgnoreUnavailable: request.IgnoreUnavailableIndices,
AllowNoIndices: request.AllowNoIndices,
}
item, status := e.exportWrapper.AddToQueueCustom(request.Title, params, request.CSVParameters, userCtx.User, true)

e.handleAddToQueueResponse(w, r, status, item)
}
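
Finally, a rough sketch of the payload a client might send to the custom export handler once these flags are wired in. Field names follow the JSON tags of CustomExportRequest above; the title, indices, elasticName and query values are placeholders, and the embedded CSVParameters fields are omitted.

// Sketch only: builds the request body with encoding/json; assumes the handlers package types.
payload := CustomExportRequest{
    Title:                    "my-custom-export",   // placeholder
    Indices:                  "events-*",           // placeholder index pattern
    ElasticName:              "monitoring-cluster", // configured esconfig name (hypothetical)
    SearchRequest:            search.Request{},     // caller-supplied query
    IgnoreUnavailableIndices: true,
    AllowNoIndices:           true,
}
body, err := json.Marshal(payload)
if err != nil {
    // handle marshalling error
}
_ = body // POST this JSON to the custom export endpoint
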