Skip to content

Commit

Permalink
[Fleet] Ignore not yet created .fleet-policies index (#3218)
Browse files Browse the repository at this point in the history
  • Loading branch information
nchaulet authored Jan 11, 2024
1 parent 4b398a7 commit 75594ac
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 4 deletions.
23 changes: 23 additions & 0 deletions internal/pkg/bulk/bulk_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,29 @@ func TestBulkSearch(t *testing.T) {
}
}

func TestBulkSearchWithIgnoreUnavailable(t *testing.T) {
ctx, cn := context.WithCancel(context.Background())
defer cn()
ctx = testlog.SetLogger(t).WithContext(ctx)

_, bulker := SetupIndexWithBulk(ctx, t, testPolicy)

// Search
dsl := fmt.Sprintf(`{"query": { "term": {"kwval": "%s"}}}`, "random")

res, err := bulker.Search(ctx, ".fleet-policies-do-not-exists-yet", []byte(dsl), WithIgnoreUnavailble())

if err != nil {
t.Fatal(err)
}
if len(res.Hits) != 0 {
t.Fatalf("hit mismatch: %d", len(res.Hits))
}
if res == nil {
t.Fatal(nil)
}
}

func TestBulkDelete(t *testing.T) {
ctx, cn := context.WithCancel(context.Background())
defer cn()
Expand Down
12 changes: 10 additions & 2 deletions internal/pkg/bulk/opSearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func (b *Bulker) Search(ctx context.Context, index string, body []byte, opts ...
const kSlop = 64
blk.buf.Grow(len(body) + kSlop)

if err := b.writeMsearchMeta(&blk.buf, index, opt.Indices, opt.WaitForCheckpoints); err != nil {
if err := b.writeMsearchMeta(&blk.buf, index, opt.Indices, opt.WaitForCheckpoints, opt.IgnoreUnavailable); err != nil {
return nil, err
}

Expand All @@ -60,7 +60,7 @@ func (b *Bulker) Search(ctx context.Context, index string, body []byte, opts ...
return &es.ResultT{HitsT: r.Hits, Aggregations: r.Aggregations}, nil
}

func (b *Bulker) writeMsearchMeta(buf *Buf, index string, moreIndices []string, checkpoints []int64) error {
func (b *Bulker) writeMsearchMeta(buf *Buf, index string, moreIndices []string, checkpoints []int64, ignoreUnavailble bool) error {
if err := b.validateIndex(index); err != nil {
return err
}
Expand Down Expand Up @@ -91,6 +91,14 @@ func (b *Bulker) writeMsearchMeta(buf *Buf, index string, moreIndices []string,
needComma = false
}

if ignoreUnavailble {
if needComma {
_, _ = buf.WriteString(`,`)
}
_, _ = buf.WriteString(`"ignore_unavailable": true`)
needComma = true
}

if len(checkpoints) > 0 {
if needComma {
_, _ = buf.WriteString(`,`)
Expand Down
7 changes: 7 additions & 0 deletions internal/pkg/bulk/opt.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ type optionsT struct {
RetryOnConflict string
Indices []string
WaitForCheckpoints []int64
IgnoreUnavailable bool
spanLink *apm.SpanLink
}

Expand All @@ -35,6 +36,12 @@ func WithRefresh() Opt {
}
}

func WithIgnoreUnavailble() Opt {
return func(opt *optionsT) {
opt.IgnoreUnavailable = true
}
}

func WithRetryOnConflict(n int) Opt {
return func(opt *optionsT) {
opt.RetryOnConflict = strconv.Itoa(n)
Expand Down
5 changes: 3 additions & 2 deletions internal/pkg/dl/policies.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,15 @@ func prepareQueryLatestPolicies() []byte {
// QueryLatestPolicies gets the latest revision for a policy
func QueryLatestPolicies(ctx context.Context, bulker bulk.Bulk, opt ...Option) ([]model.Policy, error) {
o := newOption(FleetPolicies, opt...)
res, err := bulker.Search(ctx, o.indexName, tmplQueryLatestPolicies)
res, err := bulker.Search(ctx, o.indexName, tmplQueryLatestPolicies, bulk.WithIgnoreUnavailble())
if err != nil {
return nil, err
}

policyID, ok := res.Aggregations[FieldPolicyID]
if !ok {
return nil, ErrMissingAggregations
// Aggregation will not be here if there index is not available
return []model.Policy{}, nil
}
if len(policyID.Buckets) == 0 {
return []model.Policy{}, nil
Expand Down

0 comments on commit 75594ac

Please sign in to comment.