Skip to content

Commit

Permalink
Change refresh=true to refresh=wait_for
Browse files Browse the repository at this point in the history
  • Loading branch information
michel-laterman committed Jul 18, 2023
1 parent 0314cf9 commit 2347397
Show file tree
Hide file tree
Showing 7 changed files with 40 additions and 14 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# Kind can be one of:
# - breaking-change: a change to previously-documented behavior
# - deprecation: functionality that is being removed in a later release
# - bug-fix: fixes a problem in a previous version
# - enhancement: extends functionality but does not break or fix existing behavior
# - feature: new functionality
# - known-issue: problems that we are aware of in a given version
# - security: impacts on the security of a product or a user’s deployment.
# - upgrade: important information for someone upgrading from a prior version
# - other: does not fit into any of the other categories
kind: feature

# Change summary; a 80ish characters long description of the change.
summary: Change refresh=true to refresh=wait_for

# Long description; in case the summary is not enough to describe the change
# this field accommodate a description without length limits.
# NOTE: This field will be rendered only for breaking-change and known-issue kinds at the moment.
description: Change the refresh param from "true" to "wait_for" to better support upcoming serverless offering. Remove refresh param from mget requests.

# Affected component; a word indicating the component this changeset affects.
component:

# PR URL; optional; the PR number that added the changeset.
# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added.
# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number.
# Please provide it if you are adding a fragment for a different PR.
#pr: https://github.com/owner/repo/1234

# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of).
# If not present is automatically filled by the tooling with the issue linked to the PR number.
issue: 2581
2 changes: 2 additions & 0 deletions internal/pkg/apikey/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ import (
)

// Create generates a new APIKey in Elasticsearch using the given client.
//
// NOTE this is always invoked with refresh="false"
func Create(ctx context.Context, client *elasticsearch.Client, name, ttl, refresh string, roles []byte, meta interface{}) (*APIKey, error) {
payload := struct {
Name string `json:"name,omitempty"`
Expand Down
6 changes: 3 additions & 3 deletions internal/pkg/bulk/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -392,7 +392,7 @@ func (b *Bulker) dispatch(ctx context.Context, blk *bulkT) respT {
Err(ctx.Err()).
Str("mod", kModBulk).
Str("action", blk.action.String()).
Bool("refresh", blk.flags.Has(flagRefresh)).
Bool("refresh", blk.flags.Has(flagRefresh)). // TODO change refresh to string and fix mapping
Dur("rtt", time.Since(start)).
Msg("Dispatch abort queue")
return respT{err: ctx.Err()}
Expand All @@ -405,7 +405,7 @@ func (b *Bulker) dispatch(ctx context.Context, blk *bulkT) respT {
Err(resp.err).
Str("mod", kModBulk).
Str("action", blk.action.String()).
Bool("refresh", blk.flags.Has(flagRefresh)).
Bool("refresh", blk.flags.Has(flagRefresh)). // TODO change refresh to string and fix mapping
Dur("rtt", time.Since(start)).
Msg("Dispatch OK")

Expand All @@ -415,7 +415,7 @@ func (b *Bulker) dispatch(ctx context.Context, blk *bulkT) respT {
Err(ctx.Err()).
Str("mod", kModBulk).
Str("action", blk.action.String()).
Bool("refresh", blk.flags.Has(flagRefresh)).
Bool("refresh", blk.flags.Has(flagRefresh)). // TODO change refresh to string and fix mapping
Dur("rtt", time.Since(start)).
Msg("Dispatch abort response")
}
Expand Down
4 changes: 2 additions & 2 deletions internal/pkg/bulk/opBulk.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ func (b *Bulker) flushBulk(ctx context.Context, queue queueT) error {
}

if queue.ty == kQueueRefreshBulk {
req.Refresh = "true"
req.Refresh = "wait_for"
}

res, err := req.Do(ctx, b.es)
Expand Down Expand Up @@ -238,7 +238,7 @@ func (b *Bulker) flushBulk(ctx context.Context, queue queueT) error {

log.Trace().
Err(err).
Bool("refresh", queue.ty == kQueueRefreshBulk).
Bool("refresh", queue.ty == kQueueRefreshBulk). // TODO we may want to switch this to string, but we will need to ensure that the attribute mapping is correct
Str("mod", kModBulk).
Int("took", blk.Took).
Dur("rtt", time.Since(start)).
Expand Down
7 changes: 0 additions & 7 deletions internal/pkg/bulk/opRead.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,12 +80,6 @@ func (b *Bulker) flushRead(ctx context.Context, queue queueT) error {
Body: bytes.NewReader(payload),
}

var refresh bool
if queue.ty == kQueueRefreshRead {
refresh = true
req.Refresh = &refresh
}

res, err := req.Do(ctx, b.es)

if err != nil {
Expand Down Expand Up @@ -121,7 +115,6 @@ func (b *Bulker) flushRead(ctx context.Context, queue queueT) error {

log.Trace().
Err(err).
Bool("refresh", refresh).
Str("mod", kModBulk).
Dur("rtt", time.Since(start)).
Int("cnt", len(blk.Items)).
Expand Down
1 change: 0 additions & 1 deletion internal/pkg/dl/migration.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,6 @@ func applyMigration(ctx context.Context, name string, index string, bulker bulk.
opts := []func(*esapi.UpdateByQueryRequest){
client.UpdateByQuery.WithBody(reader),
client.UpdateByQuery.WithContext(ctx),
client.UpdateByQuery.WithRefresh(true),
client.UpdateByQuery.WithConflicts("proceed"),
}

Expand Down
2 changes: 1 addition & 1 deletion internal/pkg/file/uploader/es.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func IndexChunk(ctx context.Context, client *elasticsearch.Client, body *cbor.Ch
}
req.Header.Set("Content-Type", "application/cbor")
req.Header.Set("Accept", "application/json")
req.Refresh = "true"
req.Refresh = "wait_for"
})
if err != nil {
return err
Expand Down

0 comments on commit 2347397

Please sign in to comment.