diff --git a/docs/modules/components/pages/outputs/elasticsearch.adoc b/docs/modules/components/pages/outputs/elasticsearch.adoc index 0627054f78..7fd8e6e991 100644 --- a/docs/modules/components/pages/outputs/elasticsearch.adoc +++ b/docs/modules/components/pages/outputs/elasticsearch.adoc @@ -67,6 +67,7 @@ output: id: ${!counter()}-${!timestamp_unix()} type: "" routing: "" + retry_on_conflict: 0 sniff: true healthcheck: true timeout: 5s @@ -224,6 +225,15 @@ This field supports xref:configuration:interpolation.adoc#bloblang-queries[inter *Default*: `""` +=== `retry_on_conflict` + +When using the update or upsert action, retry_on_conflict can be used to specify how many times an update should be retried in the case of a version conflict. + + +*Type*: `int` + +*Default*: `0` + === `sniff` Prompts Redpanda Connect to sniff for brokers to connect to when establishing a connection. diff --git a/internal/impl/elasticsearch/output.go b/internal/impl/elasticsearch/output.go index 1c6c0387be..c15504c57a 100644 --- a/internal/impl/elasticsearch/output.go +++ b/internal/impl/elasticsearch/output.go @@ -33,23 +33,24 @@ import ( ) const ( - esoFieldURLs = "urls" - esoFieldSniff = "sniff" - esoFieldHealthcheck = "healthcheck" - esoFieldID = "id" - esoFieldAction = "action" - esoFieldIndex = "index" - esoFieldPipeline = "pipeline" - esoFieldRouting = "routing" - esoFieldType = "type" - esoFieldTimeout = "timeout" - esoFieldTLS = "tls" - esoFieldAuth = "basic_auth" - esoFieldAuthEnabled = "enabled" - esoFieldAuthUsername = "username" - esoFieldAuthPassword = "password" - esoFieldAPIKey = "api_key" - esoFieldAWS = "aws" + esoFieldURLs = "urls" + esoFieldSniff = "sniff" + esoFieldHealthcheck = "healthcheck" + esoFieldID = "id" + esoFieldAction = "action" + esoFieldIndex = "index" + esoFieldPipeline = "pipeline" + esoFieldRouting = "routing" + esoFieldRetryOnConflict = "retry_on_conflict" + esoFieldType = "type" + esoFieldTimeout = "timeout" + esoFieldTLS = "tls" + esoFieldAuth = "basic_auth" + esoFieldAuthEnabled = "enabled" + esoFieldAuthUsername = "username" + esoFieldAuthPassword = "password" + esoFieldAPIKey = "api_key" + esoFieldAWS = "aws" // ESOFieldAWSEnabled enabled field. ESOFieldAWSEnabled = "enabled" esoFieldGzipCompression = "gzip_compression" @@ -245,6 +246,10 @@ It's possible to enable AWS connectivity with this output using the `+"`aws`"+` Description("The routing key to use for the document."). Advanced(). Default(""), + service.NewIntField(esoFieldRetryOnConflict). + Description("When using the update or upsert action, retry_on_conflict can be used to specify how many times an update should be retried in the case of a version conflict."). + Advanced(). + Default(0), service.NewBoolField(esoFieldSniff). Description("Prompts Redpanda Connect to sniff for brokers to connect to when establishing a connection."). Advanced(). @@ -366,13 +371,14 @@ func shouldRetry(s int) bool { } type pendingBulkIndex struct { - Action string - Index string - Pipeline string - Routing string - Type string - Doc any - ID string + Action string + Index string + Pipeline string + Routing string + RetryOnConflict int + Type string + Doc any + ID string } // WriteBatch writes a message batch to the output. @@ -491,6 +497,7 @@ func (e *Output) buildBulkableRequest(p *pendingBulkIndex) (elastic.BulkableRequ r := elastic.NewBulkUpdateRequest(). Index(p.Index). Routing(p.Routing). + RetryOnConflict(p.RetryOnConflict). Id(p.ID). Doc(p.Doc) if p.Type != "" { @@ -501,6 +508,7 @@ func (e *Output) buildBulkableRequest(p *pendingBulkIndex) (elastic.BulkableRequ r := elastic.NewBulkUpdateRequest(). Index(p.Index). Routing(p.Routing). + RetryOnConflict(p.RetryOnConflict). Id(p.ID). DocAsUpsert(true). Doc(p.Doc)