Add refresh param config
michel-laterman committed Jul 19, 2023
1 parent 2347397 commit d45babd
Showing 10 changed files with 56 additions and 18 deletions.
@@ -11,12 +11,12 @@
kind: feature

# Change summary; a 80ish characters long description of the change.
summary: Change refresh=true to refresh=wait_for
summary: Add refresh param config

# Long description; in case the summary is not enough to describe the change
# this field accommodate a description without length limits.
# NOTE: This field will be rendered only for breaking-change and known-issue kinds at the moment.
description: Change the refresh param from "true" to "wait_for" to better support upcoming serverless offering. Remove refresh param from mget requests.
description: Add a parameter that allows the refresh param to be changed from "true" to "wait_for"; this affects file upload and bulk creation calls.

# Affected component; a word indicating the component this changeset affects.
component:
1 change: 1 addition & 0 deletions fleet-server.reference.yml
@@ -192,6 +192,7 @@ fleet:
# flush_threshold_cnt: 2048
# flush_threshold_size: 1048567 # 1MiB
# flush_max_pending: 8
# refresh: "true" # The refresh param, must be one of [true, false, wait_for], defaults to "true"
#
# # gc controls fleet-server index garbage collection operations
# # currently manages actions cleanup
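For illustration, a sketch of the bulk section with the new setting uncommented is shown below. The nesting under inputs/server/bulk is an assumption based on the surrounding flush_* settings, and "wait_for" is used purely as an example value.

inputs:
  - type: fleet-server
    server:
      bulk:
        flush_max_pending: 8
        refresh: "wait_for"  # one of true, false, wait_for; defaults to "true"
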
28 changes: 15 additions & 13 deletions internal/pkg/api/handleUpload.go
@@ -44,12 +44,13 @@ var (

// FIXME Should we use the structs in openapi.gen.go instead of the generic ones? Will need to rework the uploader if we do
type UploadT struct {
bulker bulk.Bulk
chunkClient *elasticsearch.Client
cache cache.Cache
uploader *uploader.Uploader
authAgent func(*http.Request, *string, bulk.Bulk, cache.Cache) (*model.Agent, error) // injectable for testing purposes
authAPIKey func(*http.Request, bulk.Bulk, cache.Cache) (*apikey.APIKey, error) // as above
bulker bulk.Bulk
chunkClient *elasticsearch.Client
cache cache.Cache
uploader *uploader.Uploader
authAgent func(*http.Request, *string, bulk.Bulk, cache.Cache) (*model.Agent, error) // injectable for testing purposes
authAPIKey func(*http.Request, bulk.Bulk, cache.Cache) (*apikey.APIKey, error) // as above
refreshParam string
}

func NewUploadT(cfg *config.Server, bulker bulk.Bulk, chunkClient *elasticsearch.Client, cache cache.Cache) *UploadT {
@@ -59,12 +60,13 @@ func NewUploadT(cfg *config.Server, bulker bulk.Bulk, chunkClient *elasticsearch
Msg("upload limits")

return &UploadT{
chunkClient: chunkClient,
bulker: bulker,
cache: cache,
uploader: uploader.New(chunkClient, bulker, cache, maxFileSize, maxUploadTimer),
authAgent: authAgent,
authAPIKey: authAPIKey,
chunkClient: chunkClient,
bulker: bulker,
cache: cache,
uploader: uploader.New(chunkClient, bulker, cache, maxFileSize, maxUploadTimer),
authAgent: authAgent,
authAPIKey: authAPIKey,
refreshParam: string(cfg.Bulk.Refresh),
}
}

@@ -126,7 +128,7 @@ func (ut *UploadT) handleUploadChunk(zlog zerolog.Logger, w http.ResponseWriter,
copier := io.TeeReader(data, hash)

ce := cbor.NewChunkWriter(copier, chunkInfo.Last, chunkInfo.BID, chunkInfo.SHA2, upinfo.ChunkSize)
if err := uploader.IndexChunk(r.Context(), ut.chunkClient, ce, upinfo.Source, chunkInfo.BID, chunkInfo.Pos); err != nil {
if err := uploader.IndexChunk(r.Context(), ut.chunkClient, ce, upinfo.Source, chunkInfo.BID, chunkInfo.Pos, ut.refreshParam); err != nil {
return err
}

1 change: 1 addition & 0 deletions internal/pkg/bulk/engine.go
@@ -84,6 +84,7 @@ const (
defaultBlockQueueSz = 32 // Small capacity to allow multiOp to spin fast
defaultAPIKeyMaxParallel = 32
defaultApikeyMaxReqSize = 100 * 1024 * 1024
defaultRefreshParam = "true"
)

func NewBulker(es esapi.Transport, tracer *apm.Tracer, opts ...BulkOpt) *Bulker {
2 changes: 1 addition & 1 deletion internal/pkg/bulk/opBulk.go
@@ -189,7 +189,7 @@ func (b *Bulker) flushBulk(ctx context.Context, queue queueT) error {
}

if queue.ty == kQueueRefreshBulk {
req.Refresh = "wait_for"
req.Refresh = b.opts.refreshParam
}

res, err := req.Do(ctx, b.es)
8 changes: 8 additions & 0 deletions internal/pkg/bulk/opRead.go
@@ -80,6 +80,13 @@ func (b *Bulker) flushRead(ctx context.Context, queue queueT) error {
Body: bytes.NewReader(payload),
}

// FIXME change refresh into a string param once mget supports strings.
var refresh bool
if queue.ty == kQueueRefreshRead {
refresh = true
req.Refresh = &refresh
}

res, err := req.Do(ctx, b.es)

if err != nil {
@@ -115,6 +122,7 @@ func (b *Bulker) flushRead(ctx context.Context, queue queueT) error {

log.Trace().
Err(err).
Bool("refresh", refresh).
Str("mod", kModBulk).
Dur("rtt", time.Since(start)).
Int("cnt", len(blk.Items)).
9 changes: 9 additions & 0 deletions internal/pkg/bulk/opt.go
@@ -63,6 +63,7 @@ type bulkOptT struct {
blockQueueSz int
apikeyMaxParallel int
apikeyMaxReqSize int
refreshParam string
}

type BulkOpt func(*bulkOptT)
@@ -118,6 +119,12 @@ func WithAPIKeyMaxRequestSize(maxBytes int) BulkOpt {
}
}

func WithRefreshParam(refresh config.Refresh) BulkOpt {
return func(opt *bulkOptT) {
opt.refreshParam = string(refresh)
}
}

func parseBulkOpts(opts ...BulkOpt) bulkOptT {
bopt := bulkOptT{
flushInterval: defaultFlushInterval,
@@ -127,6 +134,7 @@ func parseBulkOpts(opts ...BulkOpt) bulkOptT {
apikeyMaxParallel: defaultAPIKeyMaxParallel,
blockQueueSz: defaultBlockQueueSz,
apikeyMaxReqSize: defaultApikeyMaxReqSize,
refreshParam: defaultRefreshParam,
}

for _, f := range opts {
@@ -165,5 +173,6 @@ func BulkOptsFromCfg(cfg *config.Config) []BulkOpt {
WithMaxPending(bulkCfg.FlushMaxPending),
WithAPIKeyMaxParallel(maxKeyParallel),
WithAPIKeyMaxRequestSize(cfg.Output.Elasticsearch.MaxContentLength),
WithRefreshParam(bulkCfg.Refresh),
}
}
16 changes: 16 additions & 0 deletions internal/pkg/config/input.go
@@ -42,18 +42,34 @@ type ServerTLS struct {
Cert string `config:"cert"`
}

// Refresh represents the refresh parameter used in Elasticsearch requests
type Refresh string

// Possible values of the refresh param as defined by [Elasticsearch Docs].
// Note that not all operations support all values.
// For example mget requests still define the refresh param as a boolean.
//
// [Elasticsearch Docs]: https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-refresh.html
const (
RefreshTrue = "true"
RefreshFalse = "false"
RefreshWaitFor = "wait_for"
)

type ServerBulk struct {
FlushInterval time.Duration `config:"flush_interval"`
FlushThresholdCount int `config:"flush_threshold_cnt"`
FlushThresholdSize int `config:"flush_threshold_size"`
FlushMaxPending int `config:"flush_max_pending"`
Refresh Refresh `config:"refresh"`
}

func (c *ServerBulk) InitDefaults() {
c.FlushInterval = 250 * time.Millisecond
c.FlushThresholdCount = 2048
c.FlushThresholdSize = 1024 * 1024
c.FlushMaxPending = 8
c.Refresh = RefreshTrue
}

// Server is the configuration for the server
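To illustrate the string-vs-boolean split called out in the Refresh doc comment above, here is a minimal, self-contained sketch against the go-elasticsearch v8 client; the client version, index name, and document ID are assumptions made for the example.

package main

import (
	"bytes"
	"context"
	"log"

	"github.com/elastic/go-elasticsearch/v8"
	"github.com/elastic/go-elasticsearch/v8/esapi"
)

func main() {
	es, err := elasticsearch.NewDefaultClient()
	if err != nil {
		log.Fatal(err)
	}
	ctx := context.Background()

	// Create/index/bulk requests take refresh as a string, so any of
	// "true", "false", or "wait_for" can be passed through directly.
	create := esapi.CreateRequest{
		Index:      "example-index",
		DocumentID: "example-doc",
		Body:       bytes.NewReader([]byte(`{"field":"value"}`)),
		Refresh:    "wait_for",
	}
	res, err := create.Do(ctx, es)
	if err != nil {
		log.Fatal(err)
	}
	res.Body.Close()

	// Mget requests still model refresh as a *bool, which is why the
	// read path keeps the boolean form for now.
	refresh := true
	mget := esapi.MgetRequest{
		Index:   "example-index",
		Body:    bytes.NewReader([]byte(`{"ids":["example-doc"]}`)),
		Refresh: &refresh,
	}
	res, err = mget.Do(ctx, es)
	if err != nil {
		log.Fatal(err)
	}
	res.Body.Close()
}
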
1 change: 1 addition & 0 deletions internal/pkg/dl/migration.go
@@ -95,6 +95,7 @@ func applyMigration(ctx context.Context, name string, index string, bulker bulk.
opts := []func(*esapi.UpdateByQueryRequest){
client.UpdateByQuery.WithBody(reader),
client.UpdateByQuery.WithContext(ctx),
client.UpdateByQuery.WithRefresh(true), // FIXME change to wait_for once the API supports it.
client.UpdateByQuery.WithConflicts("proceed"),
}

4 changes: 2 additions & 2 deletions internal/pkg/file/uploader/es.go
@@ -114,7 +114,7 @@ func UpdateFileDoc(ctx context.Context, bulker bulk.Bulk, source string, fileID
Chunk Operations
*/

func IndexChunk(ctx context.Context, client *elasticsearch.Client, body *cbor.ChunkEncoder, source string, fileID string, chunkNum int) error {
func IndexChunk(ctx context.Context, client *elasticsearch.Client, body *cbor.ChunkEncoder, source string, fileID string, chunkNum int, refreshParam string) error {
chunkDocID := fmt.Sprintf("%s.%d", fileID, chunkNum)
resp, err := client.Create(fmt.Sprintf(UploadDataIndexPattern, source), chunkDocID, body, func(req *esapi.CreateRequest) {
req.DocumentID = chunkDocID
@@ -123,7 +123,7 @@ func IndexChunk(ctx context.Context, client *elasticsearch.Client, body *cbor.Ch
}
req.Header.Set("Content-Type", "application/cbor")
req.Header.Set("Accept", "application/json")
req.Refresh = "wait_for"
req.Refresh = refreshParam
})
if err != nil {
return err
