Skip to content

Commit

Permalink
feat(elasticsearch): add retry_on_conflict option on output (#3147)
Browse files Browse the repository at this point in the history
* feat(elasticsearch): add retry_on_conflict option on output

* fix: generate doc
  • Loading branch information
defgenx authored Feb 4, 2025
1 parent 4d7f9ad commit cb682fd
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 24 deletions.
10 changes: 10 additions & 0 deletions docs/modules/components/pages/outputs/elasticsearch.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ output:
id: ${!counter()}-${!timestamp_unix()}
type: ""
routing: ""
retry_on_conflict: 0
sniff: true
healthcheck: true
timeout: 5s
Expand Down Expand Up @@ -224,6 +225,15 @@ This field supports xref:configuration:interpolation.adoc#bloblang-queries[inter
*Default*: `""`
=== `retry_on_conflict`
When using the update or upsert action, retry_on_conflict can be used to specify how many times an update should be retried in the case of a version conflict.
*Type*: `int`
*Default*: `0`
=== `sniff`
Prompts Redpanda Connect to sniff for brokers to connect to when establishing a connection.
Expand Down
56 changes: 32 additions & 24 deletions internal/impl/elasticsearch/output.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,23 +33,24 @@ import (
)

const (
esoFieldURLs = "urls"
esoFieldSniff = "sniff"
esoFieldHealthcheck = "healthcheck"
esoFieldID = "id"
esoFieldAction = "action"
esoFieldIndex = "index"
esoFieldPipeline = "pipeline"
esoFieldRouting = "routing"
esoFieldType = "type"
esoFieldTimeout = "timeout"
esoFieldTLS = "tls"
esoFieldAuth = "basic_auth"
esoFieldAuthEnabled = "enabled"
esoFieldAuthUsername = "username"
esoFieldAuthPassword = "password"
esoFieldAPIKey = "api_key"
esoFieldAWS = "aws"
esoFieldURLs = "urls"
esoFieldSniff = "sniff"
esoFieldHealthcheck = "healthcheck"
esoFieldID = "id"
esoFieldAction = "action"
esoFieldIndex = "index"
esoFieldPipeline = "pipeline"
esoFieldRouting = "routing"
esoFieldRetryOnConflict = "retry_on_conflict"
esoFieldType = "type"
esoFieldTimeout = "timeout"
esoFieldTLS = "tls"
esoFieldAuth = "basic_auth"
esoFieldAuthEnabled = "enabled"
esoFieldAuthUsername = "username"
esoFieldAuthPassword = "password"
esoFieldAPIKey = "api_key"
esoFieldAWS = "aws"
// ESOFieldAWSEnabled enabled field.
ESOFieldAWSEnabled = "enabled"
esoFieldGzipCompression = "gzip_compression"
Expand Down Expand Up @@ -245,6 +246,10 @@ It's possible to enable AWS connectivity with this output using the `+"`aws`"+`
Description("The routing key to use for the document.").
Advanced().
Default(""),
service.NewIntField(esoFieldRetryOnConflict).
Description("When using the update or upsert action, retry_on_conflict can be used to specify how many times an update should be retried in the case of a version conflict.").
Advanced().
Default(0),
service.NewBoolField(esoFieldSniff).
Description("Prompts Redpanda Connect to sniff for brokers to connect to when establishing a connection.").
Advanced().
Expand Down Expand Up @@ -366,13 +371,14 @@ func shouldRetry(s int) bool {
}

type pendingBulkIndex struct {
Action string
Index string
Pipeline string
Routing string
Type string
Doc any
ID string
Action string
Index string
Pipeline string
Routing string
RetryOnConflict int
Type string
Doc any
ID string
}

// WriteBatch writes a message batch to the output.
Expand Down Expand Up @@ -491,6 +497,7 @@ func (e *Output) buildBulkableRequest(p *pendingBulkIndex) (elastic.BulkableRequ
r := elastic.NewBulkUpdateRequest().
Index(p.Index).
Routing(p.Routing).
RetryOnConflict(p.RetryOnConflict).
Id(p.ID).
Doc(p.Doc)
if p.Type != "" {
Expand All @@ -501,6 +508,7 @@ func (e *Output) buildBulkableRequest(p *pendingBulkIndex) (elastic.BulkableRequ
r := elastic.NewBulkUpdateRequest().
Index(p.Index).
Routing(p.Routing).
RetryOnConflict(p.RetryOnConflict).
Id(p.ID).
DocAsUpsert(true).
Doc(p.Doc)
Expand Down

0 comments on commit cb682fd

Please sign in to comment.