
Use dual indexer and separate consumers for OpenSearch migration (#6559)
* Use dual indexer and separate consumers for OpenSearch migration

* Add consumer name, improve failure-handling logic in ES processor
neil-xie authored Jan 9, 2025
1 parent 2e2c20a commit ad2d506
Showing 10 changed files with 127 additions and 139 deletions.
3 changes: 3 additions & 0 deletions common/config/elasticsearch.go
@@ -58,6 +58,9 @@ type (
// optional to add custom headers
CustomHeaders map[string]string `yaml:"customHeaders,omitempty"`
Migration VisibilityMigration `yaml:"migration"`
// optional, will use default consumer name if not provided
// default consumerName is topic + "-consumer"
ConsumerName string `yaml:"consumerName"`
}

// AWSSigning contains config to enable signing,
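The new `ConsumerName` knob is optional; when it is empty, the indexer falls back to a name derived from the topic, per the comment above. A minimal sketch of that fallback under the documented default (`topic + "-consumer"`) — `resolveConsumerName` is a hypothetical helper for illustration, while `getConsumerName` mirrors the helper referenced in `service/worker/indexer/indexer.go` below:

```go
package main

import "fmt"

// getConsumerName mirrors the documented default: topic + "-consumer".
func getConsumerName(topic string) string {
	return fmt.Sprintf("%s-consumer", topic)
}

// resolveConsumerName is a hypothetical illustration of the fallback;
// in the real code the check lives inline in NewIndexer.
func resolveConsumerName(configured, topic string) string {
	if configured != "" {
		return configured // an explicit yaml consumerName wins
	}
	return getConsumerName(topic)
}

func main() {
	fmt.Println(resolveConsumerName("", "cadence-visibility-dev"))
	// prints: cadence-visibility-dev-consumer
}
```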
1 change: 1 addition & 0 deletions config/development_es_opensearch_migration.yaml
@@ -25,6 +25,7 @@ persistence:
visibility: cadence-visibility-dev
migration:
enabled: true
consumerName: "cadence-visibility-dev-os-consumer"

kafka:
tls:
3 changes: 2 additions & 1 deletion config/dynamicconfig/development_es.yaml
@@ -2,6 +2,8 @@ frontend.enableClientVersionCheck:
- value: true
system.advancedVisibilityWritingMode:
- value: "on"
system.advancedVisibilityMigrationWritingMode:
- value: "source"
system.enableReadVisibilityFromES:
- value: true
frontend.validSearchAttributes:
@@ -43,4 +45,3 @@ system.minRetentionDays:
history.EnableConsistentQueryByDomain:
- value: true
constraints: {}

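The new `system.advancedVisibilityMigrationWritingMode` entry controls where the migration indexer writes; `"source"` keeps writes on the existing Elasticsearch cluster during the first migration stage. A hedged sketch of what a later cut-over stage might look like — note that `"source"` is the only value this diff confirms, so `"dual"` below is an assumption to verify against the dynamicconfig documentation:

```yaml
# Assumed later migration stage: double-write to both clusters.
# Only "source" is confirmed by this change; "dual" is hypothetical.
system.advancedVisibilityMigrationWritingMode:
- value: "dual"
```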
1 change: 1 addition & 0 deletions host/onebox.go
@@ -1006,6 +1006,7 @@ func (c *cadenceImpl) startWorkerIndexer(params *resource.Params, service Servic
c.messagingClient,
c.esClient,
c.esConfig.Indices[common.VisibilityAppName],
c.esConfig.ConsumerName,
c.logger,
service.GetMetricsClient())
if err := c.indexer.Start(); err != nil {
133 changes: 28 additions & 105 deletions service/worker/indexer/esProcessor.go
@@ -48,7 +48,7 @@ const (
type (
// ESProcessorImpl implements ESProcessor, it's an agent of GenericBulkProcessor
ESProcessorImpl struct {
bulkProcessor []bulk.GenericBulkProcessor
bulkProcessor bulk.GenericBulkProcessor
mapToKafkaMsg collection.ConcurrentTxMap // used to map ES request to kafka message
config *Config
logger log.Logger
@@ -92,67 +92,18 @@ func newESProcessor(
return nil, err
}

p.bulkProcessor = []bulk.GenericBulkProcessor{processor}
p.mapToKafkaMsg = collection.NewShardedConcurrentTxMap(1024, p.hashFn)
return p, nil
}

// newESDualProcessor creates new ESDualProcessor which handles double writes to dual visibility clients
func newESDualProcessor(
name string,
config *Config,
esclient es.GenericClient,
osclient es.GenericClient,
logger log.Logger,
metricsClient metrics.Client,
) (*ESProcessorImpl, error) {
p := &ESProcessorImpl{
config: config,
logger: logger.WithTags(tag.ComponentIndexerESProcessor),
scope: metricsClient.Scope(metrics.ESProcessorScope),
msgEncoder: defaultEncoder,
}

params := &bulk.BulkProcessorParameters{
Name: name,
NumOfWorkers: config.ESProcessorNumOfWorkers(),
BulkActions: config.ESProcessorBulkActions(),
BulkSize: config.ESProcessorBulkSize(),
FlushInterval: config.ESProcessorFlushInterval(),
Backoff: bulk.NewExponentialBackoff(esProcessorInitialRetryInterval, esProcessorMaxRetryInterval),
BeforeFunc: p.bulkBeforeAction,
AfterFunc: p.bulkAfterAction,
}
esprocessor, err := esclient.RunBulkProcessor(context.Background(), params)
if err != nil {
return nil, err
}

// for the secondary processor, we use a shadow bulk after func which only logs errors and does not ack/nack messages
// the primary processor will be the source of truth
shadowParams := *params
shadowParams.AfterFunc = p.shadowBulkAfterAction
osprocessor, err := osclient.RunBulkProcessor(context.Background(), &shadowParams)
if err != nil {
return nil, err
}

p.bulkProcessor = []bulk.GenericBulkProcessor{esprocessor, osprocessor}
p.bulkProcessor = processor
p.mapToKafkaMsg = collection.NewShardedConcurrentTxMap(1024, p.hashFn)
return p, nil
}

func (p *ESProcessorImpl) Start() {
// current implementation (v6 and v7) allows to invoke Start() multiple times
for _, processor := range p.bulkProcessor {
processor.Start(context.Background())
}
p.bulkProcessor.Start(context.Background())

}
func (p *ESProcessorImpl) Stop() {
for _, processor := range p.bulkProcessor {
processor.Stop() //nolint:errcheck
}
p.bulkProcessor.Stop() //nolint:errcheck
p.mapToKafkaMsg = nil
}

@@ -167,9 +118,7 @@ func (p *ESProcessorImpl) Add(request *bulk.GenericBulkableAddRequest, key strin
if isDup {
return
}
for _, processor := range p.bulkProcessor {
processor.Add(request)
}
p.bulkProcessor.Add(request)
}

// bulkBeforeAction is triggered before bulk bulkProcessor commit
@@ -186,7 +135,10 @@ func (p *ESProcessorImpl) bulkAfterAction(id int64, requests []bulk.GenericBulka

isRetryable := isResponseRetriable(err.Status)
for _, request := range requests {
if !isRetryable {
if isRetryable {
// retryable errors will be retried by the bulk processor
p.logger.Error("ES request failed", tag.ESRequest(request.String()))
} else {
key := p.retrieveKafkaKey(request)
if key == "" {
continue
@@ -203,26 +155,31 @@ func (p *ESProcessorImpl) bulkAfterAction(id int64, requests []bulk.GenericBulka
// 404 means the document does not exist
// 409 means the document's version does not match (or the document has been updated or deleted by another process)
// this can happen during the data migration: the doc was deleted in the old index but does not exist in the new index
if err.Status == 409 || err.Status == 404 {
status := err.Status
if err.Status == 409 {
p.logger.Info("Request encountered a version conflict. Acknowledging to prevent retry.",
tag.ESResponseStatus(err.Status), tag.ESRequest(request.String()),
tag.WorkflowID(wid),
tag.WorkflowRunID(rid),
tag.WorkflowDomainID(domainID))
p.ackKafkaMsg(key)
} else if err.Status == 404 {
req, err := request.Source()
if err == nil {
if p.isDeleteRequest(req) {
p.logger.Info("Delete request encountered a version conflict. Acknowledging to prevent retry.",
tag.ESResponseStatus(status), tag.ESRequest(request.String()),
tag.WorkflowID(wid),
tag.WorkflowRunID(rid),
tag.WorkflowDomainID(domainID))
p.ackKafkaMsg(key)
}
if err == nil && p.isDeleteRequest(req) {
p.logger.Info("Document has been deleted. Acknowledging to prevent retry.",
tag.ESResponseStatus(404), tag.ESRequest(request.String()),
tag.WorkflowID(wid),
tag.WorkflowRunID(rid),
tag.WorkflowDomainID(domainID))
p.ackKafkaMsg(key)
} else {
p.logger.Error("Get request source err.", tag.Error(err), tag.ESRequest(request.String()))
p.scope.IncCounter(metrics.ESProcessorCorruptedData)
p.nackKafkaMsg(key)
}
} else {
// For all other non-retryable errors, nack the message
p.nackKafkaMsg(key)
}
p.nackKafkaMsg(key)
} else {
p.logger.Error("ES request failed", tag.ESRequest(request.String()))
}
p.scope.IncCounter(metrics.ESProcessorFailures)
}
@@ -254,40 +211,6 @@ func (p *ESProcessorImpl) bulkAfterAction(id int64, requests []bulk.GenericBulka
}
}

// shadowBulkAfterAction is triggered after bulk bulkProcessor commit
func (p *ESProcessorImpl) shadowBulkAfterAction(id int64, requests []bulk.GenericBulkableRequest, response *bulk.GenericBulkResponse, err *bulk.GenericError) {
if err != nil {
// This happens after configured retry, which means something bad happens on cluster or index
// When cluster back to live, bulkProcessor will re-commit those failure requests
p.logger.Error("Error commit bulk request in secondary processor.", tag.Error(err.Details))

for _, request := range requests {
p.logger.Error("ES request failed in secondary processor",
tag.ESResponseStatus(err.Status),
tag.ESRequest(request.String()))
}
return
}
responseItems := response.Items
for i := 0; i < len(requests) && i < len(responseItems); i++ {
key := p.retrieveKafkaKey(requests[i])
if key == "" {
continue
}
responseItem := responseItems[i]
// It is possible for err to be nil while the responses in response.Items might still contain errors or unsuccessful statuses for individual requests.
// This is because the err variable refers to the overall bulk request operation, but each individual request in the bulk operation has its own status code.
for _, resp := range responseItem {
if !isResponseSuccess(resp.Status) {
wid, rid, domainID := p.getMsgWithInfo(key)
p.logger.Error("ES request failed in secondary processor",
tag.ESResponseStatus(resp.Status), tag.ESResponseError(getErrorMsgFromESResp(resp)), tag.WorkflowID(wid), tag.WorkflowRunID(rid),
tag.WorkflowDomainID(domainID))
}
}
}
}

func (p *ESProcessorImpl) ackKafkaMsg(key string) {
p.ackKafkaMsgHelper(key, false)
}
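The reworked `bulkAfterAction` reduces to a small per-request decision tree: retryable statuses are only logged (the bulk processor re-commits them), a 409 conflict is acked, a 404 is acked only for delete requests, and everything else is nacked. A condensed, self-contained sketch of that flow — the status classification inside `isResponseRetriable` is an assumption, and the ack/nack calls are stubbed with prints:

```go
package main

import "fmt"

// isResponseRetriable is assumed to treat throttling and server-side
// failures as retryable; the real classification lives in esProcessor.go.
func isResponseRetriable(status int) bool {
	return status == 408 || status == 429 || status >= 500
}

// handleFailedRequest condenses the decision tree from bulkAfterAction.
// ack/nack are stubbed; in the real code they resolve the Kafka message.
func handleFailedRequest(status int, isDelete bool, sourceErr error, key string) {
	if isResponseRetriable(status) {
		fmt.Println("log only; the bulk processor will retry")
		return
	}
	switch {
	case status == 409:
		// version conflict: safe to ack, a retry would never succeed
		fmt.Println("ack", key)
	case status == 404 && sourceErr == nil && isDelete:
		// deleting an already-missing doc: ack to prevent pointless retry
		fmt.Println("ack", key)
	default:
		// corrupted source, 404 on a non-delete, or other non-retryable: nack
		fmt.Println("nack", key)
	}
}

func main() {
	handleFailedRequest(409, false, nil, "wid-1") // ack
	handleFailedRequest(404, true, nil, "wid-2")  // ack
	handleFailedRequest(503, false, nil, "wid-3") // retried by processor
	handleFailedRequest(400, false, nil, "wid-4") // nack
}
```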
14 changes: 1 addition & 13 deletions service/worker/indexer/esProcessor_test.go
@@ -86,7 +86,7 @@ func (s *esProcessorSuite) SetupTest() {
msgEncoder: defaultEncoder,
}
p.mapToKafkaMsg = collection.NewShardedConcurrentTxMap(1024, p.hashFn)
p.bulkProcessor = []bulk.GenericBulkProcessor{s.mockBulkProcessor}
p.bulkProcessor = s.mockBulkProcessor

s.esProcessor = p

@@ -526,18 +526,12 @@ func (s *esProcessorSuite) TestBulkAfterAction_Nack_Shadow_WithError() {
mapVal := newKafkaMessageWithMetrics(mockKafkaMsg, &testStopWatch)
s.esProcessor.mapToKafkaMsg.Put(testKey, mapVal)

// Add mocked secondary processor
secondaryProcessor := &mocks2.GenericBulkProcessor{}
s.esProcessor.bulkProcessor = append(s.esProcessor.bulkProcessor, secondaryProcessor)

// Mock Kafka message Nack and Value
mockKafkaMsg.On("Nack").Return(nil).Once()
mockKafkaMsg.On("Value").Return(payload).Once()
s.mockScope.On("IncCounter", mock.AnythingOfType("int")).Return()
// Execute bulkAfterAction for primary processor with error
s.esProcessor.bulkAfterAction(0, requests, response, mockErr)
// Mocking secondary processor to test shadowBulkAfterAction with error
s.esProcessor.shadowBulkAfterAction(0, requests, response, mockErr)
}

func (s *esProcessorSuite) TestBulkAfterAction_Shadow_Fail_WithoutError() {
@@ -572,16 +566,10 @@ func (s *esProcessorSuite) TestBulkAfterAction_Shadow_Fail_WithoutError() {
mapVal := newKafkaMessageWithMetrics(mockKafkaMsg, &testStopWatch)
s.esProcessor.mapToKafkaMsg.Put(testKey, mapVal)

// Add mocked secondary processor
secondaryProcessor := &mocks2.GenericBulkProcessor{}
s.esProcessor.bulkProcessor = append(s.esProcessor.bulkProcessor, secondaryProcessor)

// Mock Kafka message Nack and Value
mockKafkaMsg.On("Nack").Return(nil).Once()
mockKafkaMsg.On("Value").Return(payload).Once()
s.mockScope.On("IncCounter", mock.AnythingOfType("int")).Return()
// Execute bulkAfterAction for primary processor with error
s.esProcessor.bulkAfterAction(0, requests, response, nil)
// Mocking secondary processor to test shadowBulkAfterAction with error
s.esProcessor.shadowBulkAfterAction(0, requests, response, nil)
}
17 changes: 14 additions & 3 deletions service/worker/indexer/indexer.go
@@ -44,8 +44,9 @@ import (
)

const (
versionTypeExternal = "external"
processorName = "visibility-processor"
versionTypeExternal = "external"
processorName = "visibility-processor"
migrationProcessorName = "migration-visibility-processor"
)

var (
@@ -74,6 +75,11 @@ type (
shutdownCh chan struct{}
}

DualIndexer struct {
SourceIndexer *Indexer
DestIndexer *Indexer
}

// Config contains all configs for indexer
Config struct {
IndexerConcurrency dynamicconfig.IntPropertyFn
@@ -92,6 +98,7 @@ func NewIndexer(
client messaging.Client,
visibilityClient es.GenericClient,
visibilityName string,
consumerName string,
logger log.Logger,
metricsClient metrics.Client,
) *Indexer {
@@ -102,7 +109,11 @@
logger.Fatal("Index ES processor state changed", tag.LifeCycleStartFailed, tag.Error(err))
}

consumer, err := client.NewConsumer(common.VisibilityAppName, getConsumerName(visibilityName))
if consumerName == "" {
consumerName = getConsumerName(visibilityName)
}

consumer, err := client.NewConsumer(common.VisibilityAppName, consumerName)
if err != nil {
logger.Fatal("Index consumer state changed", tag.LifeCycleStartFailed, tag.Error(err))
}
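`DualIndexer` pairs two fully independent `Indexer` instances — one per cluster, each with its own Kafka consumer group — so the destination cluster can lag or fail without blocking the source. The `Start`/`Stop` methods fall outside the shown hunks, so the delegation below is a sketch of the likely shape (assuming the package's `Indexer` type), not the repository's code:

```go
// Sketch only: assumes the package's Indexer type with Start/Stop.
func (i *DualIndexer) Start() error {
	if err := i.SourceIndexer.Start(); err != nil {
		return err
	}
	if err := i.DestIndexer.Start(); err != nil {
		i.SourceIndexer.Stop() // assumed cleanup on a partial start
		return err
	}
	return nil
}

func (i *DualIndexer) Stop() {
	i.SourceIndexer.Stop()
	i.DestIndexer.Stop()
}
```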
6 changes: 4 additions & 2 deletions service/worker/indexer/indexer_test.go
@@ -54,15 +54,17 @@ func TestNewDualIndexer(t *testing.T) {
ESProcessorFlushInterval: dynamicconfig.GetDurationPropertyFn(1 * time.Minute),
}
processorName := "test-bulkProcessor"
consumerName := "test-bulkProcessor-os-consumer"
mockESClient := &esMocks.GenericClient{}
mockESClient.On("RunBulkProcessor", mock.Anything, mock.MatchedBy(func(input *bulk.BulkProcessorParameters) bool {
return true
})).Return(&mocks2.GenericBulkProcessor{}, nil).Times(2)

mockMessagingClient := messaging.NewMockClient(ctrl)
mockMessagingClient.EXPECT().NewConsumer("visibility", "test-bulkProcessor-consumer").Return(nil, nil).Times(1)
mockMessagingClient.EXPECT().NewConsumer("visibility", "test-bulkProcessor-os-consumer").Return(nil, nil).Times(1)

indexer := NewMigrationIndexer(config, mockMessagingClient, mockESClient, mockESClient, processorName, testlogger.New(t), metrics.NewNoopMetricsClient())
indexer := NewMigrationDualIndexer(config, mockMessagingClient, mockESClient, mockESClient, processorName, processorName, "", consumerName, testlogger.New(t), metrics.NewNoopMetricsClient())
assert.NotNil(t, indexer)
}

@@ -85,7 +87,7 @@ func TestNewIndexer(t *testing.T) {
mockMessagingClient := messaging.NewMockClient(ctrl)
mockMessagingClient.EXPECT().NewConsumer("visibility", "test-bulkProcessor-consumer").Return(nil, nil).Times(1)

indexer := NewIndexer(config, mockMessagingClient, mockESClient, processorName, testlogger.New(t), metrics.NewNoopMetricsClient())
indexer := NewIndexer(config, mockMessagingClient, mockESClient, processorName, "", testlogger.New(t), metrics.NewNoopMetricsClient())
assert.NotNil(t, indexer)
}

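The updated test exercises both consumer-name paths at once: the source indexer is passed an empty name and falls back to the `topic + "-consumer"` default (hence the `test-bulkProcessor-consumer` expectation), while the destination indexer receives the explicit `test-bulkProcessor-os-consumer` name — which is why `NewConsumer` is expected exactly once for each.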
(2 more changed files not shown.)
