diff --git a/common/config/elasticsearch.go b/common/config/elasticsearch.go
index 203f403b6c3..24a1bfee856 100644
--- a/common/config/elasticsearch.go
+++ b/common/config/elasticsearch.go
@@ -58,6 +58,9 @@ type (
 		// optional to add custom headers
 		CustomHeaders map[string]string `yaml:"customHeaders,omitempty"`
 		Migration     VisibilityMigration `yaml:"migration"`
+		// optional, a default consumer name is used if not provided
+		// the default consumer name is topic + "-consumer"
+		ConsumerName string `yaml:"consumerName"`
 	}
 
 	// AWSSigning contains config to enable signing,
diff --git a/config/development_es_opensearch_migration.yaml b/config/development_es_opensearch_migration.yaml
index 91731e63a43..678845be144 100644
--- a/config/development_es_opensearch_migration.yaml
+++ b/config/development_es_opensearch_migration.yaml
@@ -25,6 +25,7 @@ persistence:
         visibility: cadence-visibility-dev
       migration:
         enabled: true
+      consumerName: "cadence-visibility-dev-os-consumer"
 
 kafka:
   tls:
diff --git a/config/dynamicconfig/development_es.yaml b/config/dynamicconfig/development_es.yaml
index 3131b24c915..310d7f47a79 100644
--- a/config/dynamicconfig/development_es.yaml
+++ b/config/dynamicconfig/development_es.yaml
@@ -2,6 +2,8 @@ frontend.enableClientVersionCheck:
 - value: true
 system.advancedVisibilityWritingMode:
 - value: "on"
+system.advancedVisibilityMigrationWritingMode:
+- value: "source"
 system.enableReadVisibilityFromES:
 - value: true
 frontend.validSearchAttributes:
@@ -43,4 +45,3 @@ system.minRetentionDays:
 history.EnableConsistentQueryByDomain:
 - value: true
   constraints: {}
-
diff --git a/host/onebox.go b/host/onebox.go
index 4c8de66b165..27ab3ee01c8 100644
--- a/host/onebox.go
+++ b/host/onebox.go
@@ -1006,6 +1006,7 @@ func (c *cadenceImpl) startWorkerIndexer(params *resource.Params, service Servic
 		c.messagingClient,
 		c.esClient,
 		c.esConfig.Indices[common.VisibilityAppName],
+		c.esConfig.ConsumerName,
 		c.logger,
 		service.GetMetricsClient())
 	if err := c.indexer.Start(); err != nil {
diff --git a/service/worker/indexer/esProcessor.go b/service/worker/indexer/esProcessor.go
index ea59e5b1257..1692ff51b1c 100644
--- a/service/worker/indexer/esProcessor.go
+++ b/service/worker/indexer/esProcessor.go
@@ -48,7 +48,7 @@ const (
 type (
 	// ESProcessorImpl implements ESProcessor, it's an agent of GenericBulkProcessor
 	ESProcessorImpl struct {
-		bulkProcessor []bulk.GenericBulkProcessor
+		bulkProcessor bulk.GenericBulkProcessor
 		mapToKafkaMsg collection.ConcurrentTxMap // used to map ES request to kafka message
 		config        *Config
 		logger        log.Logger
@@ -92,67 +92,18 @@ func newESProcessor(
 		return nil, err
 	}
 
-	p.bulkProcessor = []bulk.GenericBulkProcessor{processor}
-	p.mapToKafkaMsg = collection.NewShardedConcurrentTxMap(1024, p.hashFn)
-	return p, nil
-}
-
-// newESDualProcessor creates new ESDualProcessor which handles double writes to dual visibility clients
-func newESDualProcessor(
-	name string,
-	config *Config,
-	esclient es.GenericClient,
-	osclient es.GenericClient,
-	logger log.Logger,
-	metricsClient metrics.Client,
-) (*ESProcessorImpl, error) {
-	p := &ESProcessorImpl{
-		config:     config,
-		logger:     logger.WithTags(tag.ComponentIndexerESProcessor),
-		scope:      metricsClient.Scope(metrics.ESProcessorScope),
-		msgEncoder: defaultEncoder,
-	}
-
-	params := &bulk.BulkProcessorParameters{
-		Name:          name,
-		NumOfWorkers:  config.ESProcessorNumOfWorkers(),
-		BulkActions:   config.ESProcessorBulkActions(),
-		BulkSize:      config.ESProcessorBulkSize(),
-		FlushInterval: config.ESProcessorFlushInterval(),
-		Backoff:       bulk.NewExponentialBackoff(esProcessorInitialRetryInterval, esProcessorMaxRetryInterval),
-		BeforeFunc:    p.bulkBeforeAction,
-		AfterFunc:     p.bulkAfterAction,
-	}
-	esprocessor, err := esclient.RunBulkProcessor(context.Background(), params)
-	if err != nil {
-		return nil, err
-	}
-
-	// for the sceondary processor, we use shadow bulk after func which only logs errors and not ack/nack messages
-	// the primary processor will be the source of truth
-	shadowParams := *params
-	shadowParams.AfterFunc = p.shadowBulkAfterAction
-	osprocessor, err := osclient.RunBulkProcessor(context.Background(), &shadowParams)
-	if err != nil {
-		return nil, err
-	}
-
-	p.bulkProcessor = []bulk.GenericBulkProcessor{esprocessor, osprocessor}
+	p.bulkProcessor = processor
 	p.mapToKafkaMsg = collection.NewShardedConcurrentTxMap(1024, p.hashFn)
 	return p, nil
 }
 
 func (p *ESProcessorImpl) Start() {
 	// current implementation (v6 and v7) allows to invoke Start() multiple times
-	for _, processor := range p.bulkProcessor {
-		processor.Start(context.Background())
-	}
+	p.bulkProcessor.Start(context.Background())
 }
 
 func (p *ESProcessorImpl) Stop() {
-	for _, processor := range p.bulkProcessor {
-		processor.Stop() //nolint:errcheck
-	}
+	p.bulkProcessor.Stop() //nolint:errcheck
 	p.mapToKafkaMsg = nil
 }
 
@@ -167,9 +118,7 @@ func (p *ESProcessorImpl) Add(request *bulk.GenericBulkableAddRequest, key strin
 	if isDup {
 		return
 	}
-	for _, processor := range p.bulkProcessor {
-		processor.Add(request)
-	}
+	p.bulkProcessor.Add(request)
 }
 
 // bulkBeforeAction is triggered before bulk bulkProcessor commit
@@ -186,7 +135,10 @@ func (p *ESProcessorImpl) bulkAfterAction(id int64, requests []bulk.GenericBulka
 		isRetryable := isResponseRetriable(err.Status)
 
 		for _, request := range requests {
-			if !isRetryable {
+			if isRetryable {
+				// retryable errors will be retried by the bulk processor
+				p.logger.Error("ES request failed", tag.ESRequest(request.String()))
+			} else {
 				key := p.retrieveKafkaKey(request)
 				if key == "" {
 					continue
@@ -203,26 +155,31 @@ func (p *ESProcessorImpl) bulkAfterAction(id int64, requests []bulk.GenericBulka
 				// 404 means the document does not exist
 				// 409 means the document's version does not match (or the document has been updated or deleted by another process)
 				// this can happen during the data migration, the doc was deleted in the old index but does not exist in the new index
-				if err.Status == 409 || err.Status == 404 {
-					status := err.Status
+				if err.Status == 409 {
+					p.logger.Info("Request encountered a version conflict. Acknowledging to prevent retry.",
+						tag.ESResponseStatus(err.Status), tag.ESRequest(request.String()),
+						tag.WorkflowID(wid),
+						tag.WorkflowRunID(rid),
+						tag.WorkflowDomainID(domainID))
+					p.ackKafkaMsg(key)
+				} else if err.Status == 404 {
 					req, err := request.Source()
-					if err == nil {
-						if p.isDeleteRequest(req) {
-							p.logger.Info("Delete request encountered a version conflict. Acknowledging to prevent retry.",
-								tag.ESResponseStatus(status), tag.ESRequest(request.String()),
-								tag.WorkflowID(wid),
-								tag.WorkflowRunID(rid),
-								tag.WorkflowDomainID(domainID))
-							p.ackKafkaMsg(key)
-						}
+					if err == nil && p.isDeleteRequest(req) {
+						p.logger.Info("Document has been deleted. Acknowledging to prevent retry.",
+							tag.ESResponseStatus(404), tag.ESRequest(request.String()),
+							tag.WorkflowID(wid),
+							tag.WorkflowRunID(rid),
+							tag.WorkflowDomainID(domainID))
+						p.ackKafkaMsg(key)
 					} else {
 						p.logger.Error("Get request source err.", tag.Error(err), tag.ESRequest(request.String()))
 						p.scope.IncCounter(metrics.ESProcessorCorruptedData)
+						p.nackKafkaMsg(key)
 					}
+				} else {
+					// For all other non-retryable errors, nack the message
+					p.nackKafkaMsg(key)
 				}
-				p.nackKafkaMsg(key)
-			} else {
-				p.logger.Error("ES request failed", tag.ESRequest(request.String()))
 			}
 			p.scope.IncCounter(metrics.ESProcessorFailures)
 		}
@@ -254,40 +211,6 @@ func (p *ESProcessorImpl) bulkAfterAction(id int64, requests []bulk.GenericBulka
 	}
 }
 
-// shadowBulkAfterAction is triggered after bulk bulkProcessor commit
-func (p *ESProcessorImpl) shadowBulkAfterAction(id int64, requests []bulk.GenericBulkableRequest, response *bulk.GenericBulkResponse, err *bulk.GenericError) {
-	if err != nil {
-		// This happens after configured retry, which means something bad happens on cluster or index
-		// When cluster back to live, bulkProcessor will re-commit those failure requests
-		p.logger.Error("Error commit bulk request in secondary processor.", tag.Error(err.Details))
-
-		for _, request := range requests {
-			p.logger.Error("ES request failed in secondary processor",
-				tag.ESResponseStatus(err.Status),
-				tag.ESRequest(request.String()))
-		}
-		return
-	}
-	responseItems := response.Items
-	for i := 0; i < len(requests) && i < len(responseItems); i++ {
-		key := p.retrieveKafkaKey(requests[i])
-		if key == "" {
-			continue
-		}
-		responseItem := responseItems[i]
-		// It is possible for err to be nil while the responses in response.Items might still contain errors or unsuccessful statuses for individual requests.
-		// This is because the err variable refers to the overall bulk request operation, but each individual request in the bulk operation has its own status code.
-		for _, resp := range responseItem {
-			if !isResponseSuccess(resp.Status) {
-				wid, rid, domainID := p.getMsgWithInfo(key)
-				p.logger.Error("ES request failed in secondary processor",
-					tag.ESResponseStatus(resp.Status), tag.ESResponseError(getErrorMsgFromESResp(resp)), tag.WorkflowID(wid), tag.WorkflowRunID(rid),
-					tag.WorkflowDomainID(domainID))
-			}
-		}
-	}
-}
-
 func (p *ESProcessorImpl) ackKafkaMsg(key string) {
 	p.ackKafkaMsgHelper(key, false)
 }
diff --git a/service/worker/indexer/esProcessor_test.go b/service/worker/indexer/esProcessor_test.go
index 6476eb5f6d4..866a42746f0 100644
--- a/service/worker/indexer/esProcessor_test.go
+++ b/service/worker/indexer/esProcessor_test.go
@@ -86,7 +86,7 @@ func (s *esProcessorSuite) SetupTest() {
 		msgEncoder: defaultEncoder,
 	}
 	p.mapToKafkaMsg = collection.NewShardedConcurrentTxMap(1024, p.hashFn)
-	p.bulkProcessor = []bulk.GenericBulkProcessor{s.mockBulkProcessor}
+	p.bulkProcessor = s.mockBulkProcessor
 
 	s.esProcessor = p
 
@@ -526,18 +526,12 @@ func (s *esProcessorSuite) TestBulkAfterAction_Nack_Shadow_WithError() {
 	mapVal := newKafkaMessageWithMetrics(mockKafkaMsg, &testStopWatch)
 	s.esProcessor.mapToKafkaMsg.Put(testKey, mapVal)
 
-	// Add mocked secondary processor
-	secondaryProcessor := &mocks2.GenericBulkProcessor{}
-	s.esProcessor.bulkProcessor = append(s.esProcessor.bulkProcessor, secondaryProcessor)
-
 	// Mock Kafka message Nack and Value
 	mockKafkaMsg.On("Nack").Return(nil).Once()
 	mockKafkaMsg.On("Value").Return(payload).Once()
 	s.mockScope.On("IncCounter", mock.AnythingOfType("int")).Return()
 	// Execute bulkAfterAction for primary processor with error
 	s.esProcessor.bulkAfterAction(0, requests, response, mockErr)
-	// Mocking secondary processor to test shadowBulkAfterAction with error
-	s.esProcessor.shadowBulkAfterAction(0, requests, response, mockErr)
 }
 
 func (s *esProcessorSuite) TestBulkAfterAction_Shadow_Fail_WithoutError() {
@@ -572,16 +566,10 @@ func (s *esProcessorSuite) TestBulkAfterAction_Shadow_Fail_WithoutError() {
 	mapVal := newKafkaMessageWithMetrics(mockKafkaMsg, &testStopWatch)
 	s.esProcessor.mapToKafkaMsg.Put(testKey, mapVal)
 
-	// Add mocked secondary processor
-	secondaryProcessor := &mocks2.GenericBulkProcessor{}
-	s.esProcessor.bulkProcessor = append(s.esProcessor.bulkProcessor, secondaryProcessor)
-
 	// Mock Kafka message Nack and Value
 	mockKafkaMsg.On("Nack").Return(nil).Once()
 	mockKafkaMsg.On("Value").Return(payload).Once()
 	s.mockScope.On("IncCounter", mock.AnythingOfType("int")).Return()
 	// Execute bulkAfterAction for primary processor with error
 	s.esProcessor.bulkAfterAction(0, requests, response, nil)
-	// Mocking secondary processor to test shadowBulkAfterAction with error
-	s.esProcessor.shadowBulkAfterAction(0, requests, response, nil)
 }
diff --git a/service/worker/indexer/indexer.go b/service/worker/indexer/indexer.go
index d645911c8e9..a6388015eca 100644
--- a/service/worker/indexer/indexer.go
+++ b/service/worker/indexer/indexer.go
@@ -44,8 +44,9 @@ import (
 )
 
 const (
-	versionTypeExternal = "external"
-	processorName       = "visibility-processor"
+	versionTypeExternal    = "external"
+	processorName          = "visibility-processor"
+	migrationProcessorName = "migration-visibility-processor"
 )
 
 var (
@@ -74,6 +75,11 @@ type (
 		shutdownCh          chan struct{}
 	}
 
+	DualIndexer struct {
+		SourceIndexer *Indexer
+		DestIndexer   *Indexer
+	}
+
 	// Config contains all configs for indexer
 	Config struct {
 		IndexerConcurrency dynamicconfig.IntPropertyFn
@@ -92,6 +98,7 @@ func NewIndexer(
 	client messaging.Client,
 	visibilityClient es.GenericClient,
 	visibilityName string,
+	consumerName string,
 	logger log.Logger,
 	metricsClient metrics.Client,
 ) *Indexer {
@@ -102,7 +109,11 @@ func NewIndexer(
 		logger.Fatal("Index ES processor state changed", tag.LifeCycleStartFailed, tag.Error(err))
 	}
 
-	consumer, err := client.NewConsumer(common.VisibilityAppName, getConsumerName(visibilityName))
+	if consumerName == "" {
+		consumerName = getConsumerName(visibilityName)
+	}
+
+	consumer, err := client.NewConsumer(common.VisibilityAppName, consumerName)
 	if err != nil {
 		logger.Fatal("Index consumer state changed", tag.LifeCycleStartFailed, tag.Error(err))
 	}
diff --git a/service/worker/indexer/indexer_test.go b/service/worker/indexer/indexer_test.go
index 589531362d4..96357aee09c 100644
--- a/service/worker/indexer/indexer_test.go
+++ b/service/worker/indexer/indexer_test.go
@@ -54,6 +54,7 @@ func TestNewDualIndexer(t *testing.T) {
 		ESProcessorFlushInterval: dynamicconfig.GetDurationPropertyFn(1 * time.Minute),
 	}
 	processorName := "test-bulkProcessor"
+	consumerName := "test-bulkProcessor-os-consumer"
 	mockESClient := &esMocks.GenericClient{}
 	mockESClient.On("RunBulkProcessor", mock.Anything, mock.MatchedBy(func(input *bulk.BulkProcessorParameters) bool {
 		return true
@@ -61,8 +62,9 @@ func TestNewDualIndexer(t *testing.T) {
 
 	mockMessagingClient := messaging.NewMockClient(ctrl)
 	mockMessagingClient.EXPECT().NewConsumer("visibility", "test-bulkProcessor-consumer").Return(nil, nil).Times(1)
+	mockMessagingClient.EXPECT().NewConsumer("visibility", "test-bulkProcessor-os-consumer").Return(nil, nil).Times(1)
 
-	indexer := NewMigrationIndexer(config, mockMessagingClient, mockESClient, mockESClient, processorName, testlogger.New(t), metrics.NewNoopMetricsClient())
+	indexer := NewMigrationDualIndexer(config, mockMessagingClient, mockESClient, mockESClient, processorName, processorName, "", consumerName, testlogger.New(t), metrics.NewNoopMetricsClient())
 	assert.NotNil(t, indexer)
 }
 
@@ -85,7 +87,7 @@ func TestNewIndexer(t *testing.T) {
 	mockMessagingClient := messaging.NewMockClient(ctrl)
 	mockMessagingClient.EXPECT().NewConsumer("visibility", "test-bulkProcessor-consumer").Return(nil, nil).Times(1)
 
-	indexer := NewIndexer(config, mockMessagingClient, mockESClient, processorName, testlogger.New(t), metrics.NewNoopMetricsClient())
+	indexer := NewIndexer(config, mockMessagingClient, mockESClient, processorName, "", testlogger.New(t), metrics.NewNoopMetricsClient())
 	assert.NotNil(t, indexer)
 }
 
diff --git a/service/worker/indexer/migration_indexer.go b/service/worker/indexer/migration_indexer.go
index f38cfa2ba0a..f13ac8dcaf7 100644
--- a/service/worker/indexer/migration_indexer.go
+++ b/service/worker/indexer/migration_indexer.go
@@ -29,31 +29,37 @@ import (
 	"github.com/uber/cadence/common/metrics"
 )
 
-// NewMigrationIndexer create a new Indexer that can index to both ES and OS
-func NewMigrationIndexer(
-	config *Config,
+// NewMigrationDualIndexer creates a new DualIndexer that is used during visibility migration
+// When migrating from ES to OS, this indexer writes to both ES and OS
+func NewMigrationDualIndexer(config *Config,
 	client messaging.Client,
 	primaryClient es.GenericClient,
 	secondaryClient es.GenericClient,
-	visibilityName string,
+	primaryVisibilityName string,
+	secondaryVisibilityName string,
+	primaryConsumerName string,
+	secondaryConsumerName string,
 	logger log.Logger,
-	metricsClient metrics.Client,
-) *Indexer {
+	metricsClient metrics.Client) *DualIndexer {
+
 	logger = logger.WithTags(tag.ComponentIndexer)
 
-	visibilityProcessor, err := newESDualProcessor(processorName, config, primaryClient, secondaryClient, logger, metricsClient)
+	visibilityProcessor, err := newESProcessor(processorName, config, primaryClient, logger, metricsClient)
 	if err != nil {
 		logger.Fatal("Index ES processor state changed", tag.LifeCycleStartFailed, tag.Error(err))
 	}
 
-	consumer, err := client.NewConsumer(common.VisibilityAppName, getConsumerName(visibilityName))
+	if primaryConsumerName == "" {
+		primaryConsumerName = getConsumerName(primaryVisibilityName)
+	}
+	consumer, err := client.NewConsumer(common.VisibilityAppName, primaryConsumerName)
 	if err != nil {
 		logger.Fatal("Index consumer state changed", tag.LifeCycleStartFailed, tag.Error(err))
 	}
 
-	return &Indexer{
+	sourceIndexer := &Indexer{
 		config:              config,
-		esIndexName:         visibilityName,
+		esIndexName:         primaryVisibilityName,
 		consumer:            consumer,
 		logger:              logger.WithTags(tag.ComponentIndexerProcessor),
 		scope:               metricsClient.Scope(metrics.IndexProcessorScope),
@@ -61,4 +67,52 @@
 		visibilityProcessor: visibilityProcessor,
 		msgEncoder:          defaultEncoder,
 	}
+
+	secondaryVisibilityProcessor, err := newESProcessor(migrationProcessorName, config, secondaryClient, logger, metricsClient)
+	if err != nil {
+		logger.Fatal("Migration Index ES processor state changed", tag.LifeCycleStartFailed, tag.Error(err))
+	}
+
+	if secondaryConsumerName == "" {
+		secondaryConsumerName = getConsumerName(secondaryVisibilityName)
+	}
+	secondaryConsumer, err := client.NewConsumer(common.VisibilityAppName, secondaryConsumerName)
+	if err != nil {
+		logger.Fatal("Migration Index consumer state changed", tag.LifeCycleStartFailed, tag.Error(err))
+	}
+
+	destIndexer := &Indexer{
+		config:              config,
+		esIndexName:         secondaryVisibilityName,
+		consumer:            secondaryConsumer,
+		logger:              logger.WithTags(tag.ComponentIndexerProcessor),
+		scope:               metricsClient.Scope(metrics.IndexProcessorScope),
+		shutdownCh:          make(chan struct{}),
+		visibilityProcessor: secondaryVisibilityProcessor,
+		msgEncoder:          defaultEncoder,
+	}
+
+	return &DualIndexer{
+		SourceIndexer: sourceIndexer,
+		DestIndexer:   destIndexer,
+	}
+}
+
+func (i *DualIndexer) Start() error {
+	if err := i.SourceIndexer.Start(); err != nil {
+		i.SourceIndexer.Stop()
+		return err
+	}
+
+	if err := i.DestIndexer.Start(); err != nil {
+		i.DestIndexer.Stop()
+		return err
+	}
+
+	return nil
+}
+
+func (i *DualIndexer) Stop() {
+	i.SourceIndexer.Stop()
+	i.DestIndexer.Stop()
 }
diff --git a/service/worker/service.go b/service/worker/service.go
index 699bffa19bb..cb48c30e5f1 100644
--- a/service/worker/service.go
+++ b/service/worker/service.go
@@ -221,7 +221,7 @@ func (s *Service) Start() {
 	s.startFixerWorkflowWorker()
 	if s.config.IndexerCfg != nil {
 		if shouldStartMigrationIndexer(s.params) {
-			s.startMigrationIndexer()
+			s.startMigrationDualIndexer()
 		} else {
 			s.startIndexer()
 		}
@@ -386,6 +386,7 @@ func (s *Service) startIndexer() {
 		s.GetMessagingClient(),
 		s.params.ESClient,
 		s.params.ESConfig.Indices[common.VisibilityAppName],
+		s.params.ESConfig.ConsumerName,
 		s.GetLogger(),
 		s.GetMetricsClient(),
 	)
@@ -395,18 +396,21 @@ func (s *Service) startIndexer() {
 	}
 }
 
-func (s *Service) startMigrationIndexer() {
-	visibilityIndexer := indexer.NewMigrationIndexer(
+func (s *Service) startMigrationDualIndexer() {
+	visibilityDualIndexer := indexer.NewMigrationDualIndexer(
 		s.config.IndexerCfg,
 		s.GetMessagingClient(),
 		s.params.ESClient,
 		s.params.OSClient,
 		s.params.ESConfig.Indices[common.VisibilityAppName],
+		s.params.OSConfig.Indices[common.VisibilityAppName],
+		s.params.ESConfig.ConsumerName,
+		s.params.OSConfig.ConsumerName,
 		s.GetLogger(),
 		s.GetMetricsClient(),
 	)
-	if err := visibilityIndexer.Start(); err != nil {
-		visibilityIndexer.Stop()
+	if err := visibilityDualIndexer.Start(); err != nil {
+		// no need to call visibilityDualIndexer.Stop() since it is already called inside Start()
 		s.GetLogger().Fatal("fail to start indexer", tag.Error(err))
 	}
 }
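
Note on consumer groups during migration: the dual indexer registers two Kafka consumers on the visibility topic, one per cluster, and each consumer name acts as its own consumer group; if both indexers shared a group, the topic's messages would be split between them instead of delivered to both. A minimal config sketch under these assumptions follows — the datastore block names and the ES-side consumer name here are illustrative, not taken from this change:

```yaml
# illustrative sketch: exact datastore names and nesting depend on the deployment's persistence config
datastores:
  es-visibility:
    elasticsearch:
      indices:
        visibility: cadence-visibility-dev
      consumerName: "cadence-visibility-dev-es-consumer"   # optional; defaults to topic + "-consumer"
  os-visibility:
    elasticsearch:
      indices:
        visibility: cadence-visibility-dev
      migration:
        enabled: true
      consumerName: "cadence-visibility-dev-os-consumer"   # distinct group so both indexers receive every message
```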