diff --git a/common/metrics/defs.go b/common/metrics/defs.go
index 2edb6983d95..33ba84cd021 100644
--- a/common/metrics/defs.go
+++ b/common/metrics/defs.go
@@ -2526,6 +2526,8 @@ const (
 	ReplicationTasksApplied
 	ReplicationTasksFailed
 	ReplicationTasksLag
+	ReplicationTasksLagRaw
+	ReplicationTasksDelay
 	ReplicationTasksFetched
 	ReplicationTasksReturned
 	ReplicationTasksReturnedDiff
@@ -3228,6 +3230,8 @@ var MetricDefs = map[ServiceIdx]map[int]metricDefinition{
 		ReplicationTasksApplied:      {metricName: "replication_tasks_applied", metricType: Counter},
 		ReplicationTasksFailed:       {metricName: "replication_tasks_failed", metricType: Counter},
 		ReplicationTasksLag:          {metricName: "replication_tasks_lag", metricType: Timer},
+		ReplicationTasksLagRaw:       {metricName: "replication_tasks_lag_raw", metricType: Timer},
+		ReplicationTasksDelay:        {metricName: "replication_tasks_delay", metricType: Timer},
 		ReplicationTasksFetched:      {metricName: "replication_tasks_fetched", metricType: Timer},
 		ReplicationTasksReturned:     {metricName: "replication_tasks_returned", metricType: Timer},
 		ReplicationTasksReturnedDiff: {metricName: "replication_tasks_returned_diff", metricType: Timer},
diff --git a/service/history/engine/engineimpl/history_engine.go b/service/history/engine/engineimpl/history_engine.go
index 7881e1c02e4..59c1844823e 100644
--- a/service/history/engine/engineimpl/history_engine.go
+++ b/service/history/engine/engineimpl/history_engine.go
@@ -223,6 +223,7 @@ func NewEngineWithShardContext(
 			shard.GetLogger(),
 			replicationReader,
 			replicationTaskStore,
+			shard.GetTimeSource(),
 		),
 		replicationTaskStore: replicationTaskStore,
 		replicationMetricsEmitter: replication.NewMetricsEmitter(
diff --git a/service/history/replication/task_ack_manager.go b/service/history/replication/task_ack_manager.go
index 5dd1e82cb1a..e765ffe3383 100644
--- a/service/history/replication/task_ack_manager.go
+++ b/service/history/replication/task_ack_manager.go
@@ -24,10 +24,12 @@ package replication
 
 import (
 	"context"
+	"errors"
 	"strconv"
 	"time"
 
 	"github.com/uber/cadence/common"
+	"github.com/uber/cadence/common/clock"
 	"github.com/uber/cadence/common/log"
 	"github.com/uber/cadence/common/log/tag"
 	"github.com/uber/cadence/common/metrics"
@@ -45,6 +47,8 @@ type (
 
 		reader taskReader
 		store  *TaskStore
+
+		timeSource clock.TimeSource
 	}
 
 	ackLevelStore interface {
@@ -66,6 +70,7 @@ func NewTaskAckManager(
 	logger log.Logger,
 	reader taskReader,
 	store *TaskStore,
+	timeSource clock.TimeSource,
 ) TaskAckManager {
 
 	return TaskAckManager{
@@ -74,9 +79,10 @@
 			metrics.ReplicatorQueueProcessorScope,
 			metrics.InstanceTag(strconv.Itoa(shardID)),
 		),
-		logger: logger.WithTags(tag.ComponentReplicationAckManager),
-		reader: reader,
-		store:  store,
+		logger:     logger.WithTags(tag.ComponentReplicationAckManager),
+		reader:     reader,
+		store:      store,
+		timeSource: timeSource,
 	}
 }
 
@@ -92,22 +98,37 @@ func (t *TaskAckManager) GetTasks(ctx context.Context, pollingCluster string, la
 		return nil, err
 	}
 
-	var replicationTasks []*types.ReplicationTask
-	readLevel := lastReadTaskID
-TaskInfoLoop:
+	// Happy-path assumption: every task read will become a replication task.
+	replicationTasks := make([]*types.ReplicationTask, 0, len(tasks))
+
+	var (
+		readLevel                      = lastReadTaskID
+		oldestUnprocessedTaskTimestamp = t.timeSource.Now().UnixNano()
+		oldestUnprocessedTaskID        = t.ackLevels.GetTransferMaxReadLevel()
+	)
+
+	if len(tasks) > 0 {
+		// Whether or not the task can be processed, record the oldest task information we have read.
+		// Tasks must be ordered by taskID/time.
+		oldestUnprocessedTaskID = tasks[0].TaskID
+		oldestUnprocessedTaskTimestamp = tasks[0].CreationTime
+	}
+
 	for _, task := range tasks {
 		replicationTask, err := t.store.Get(ctx, pollingCluster, *task)
-
-		switch err.(type) {
-		case nil:
-			// No action
-		case *types.BadRequestError, *types.InternalDataInconsistencyError, *types.EntityNotExistsError:
-			t.logger.Warn("Failed to get replication task.", tag.Error(err))
-		default:
-			t.logger.Error("Failed to get replication task. Return what we have so far.", tag.Error(err))
-			hasMore = true
-			break TaskInfoLoop
+		if err != nil {
+			if errors.As(err, new(*types.BadRequestError)) ||
+				errors.As(err, new(*types.InternalDataInconsistencyError)) ||
+				errors.As(err, new(*types.EntityNotExistsError)) {
+				t.logger.Warn("Failed to get replication task.", tag.Error(err))
+			} else {
+				t.logger.Error("Failed to get replication task. Return what we have so far.", tag.Error(err))
+				hasMore = true
+				break
+			}
 		}
+
+		// Advance readLevel past this task even if it did not produce a replication task for the polling cluster.
 		readLevel = task.TaskID
 		if replicationTask != nil {
 			replicationTasks = append(replicationTasks, replicationTask)
@@ -115,6 +136,9 @@ TaskInfoLoop:
 	}
 	taskGeneratedTimer.Stop()
 
+	t.scope.RecordTimer(metrics.ReplicationTasksLagRaw, time.Duration(t.ackLevels.GetTransferMaxReadLevel()-oldestUnprocessedTaskID))
+	t.scope.RecordTimer(metrics.ReplicationTasksDelay, time.Duration(oldestUnprocessedTaskTimestamp-t.timeSource.Now().UnixNano()))
+
 	t.scope.RecordTimer(metrics.ReplicationTasksLag, time.Duration(t.ackLevels.GetTransferMaxReadLevel()-readLevel))
 	t.scope.RecordTimer(metrics.ReplicationTasksFetched, time.Duration(len(tasks)))
 	t.scope.RecordTimer(metrics.ReplicationTasksReturned, time.Duration(len(replicationTasks)))
diff --git a/service/history/replication/task_ack_manager_test.go b/service/history/replication/task_ack_manager_test.go
index df8ec036897..e05cde18fe7 100644
--- a/service/history/replication/task_ack_manager_test.go
+++ b/service/history/replication/task_ack_manager_test.go
@@ -31,6 +31,7 @@ import (
 	"github.com/stretchr/testify/require"
 
 	"github.com/uber/cadence/common/cache"
+	"github.com/uber/cadence/common/clock"
 	"github.com/uber/cadence/common/log"
 	"github.com/uber/cadence/common/metrics"
 	"github.com/uber/cadence/common/persistence"
@@ -142,7 +143,7 @@ func TestTaskAckManager_GetTasks(t *testing.T) {
 			pollingCluster: testClusterB,
 			lastReadLevel:  5,
 			expectResult: &types.ReplicationMessages{
-				ReplicationTasks:       nil,
+				ReplicationTasks:       []*types.ReplicationTask{},
 				LastRetrievedMessageID: 11,
 				HasMore:                false,
 			},
@@ -189,6 +190,7 @@ func TestTaskAckManager_GetTasks(t *testing.T) {
 			pollingCluster: testClusterA,
 			lastReadLevel:  5,
 			expectResult: &types.ReplicationMessages{
+				ReplicationTasks:       []*types.ReplicationTask{},
 				LastRetrievedMessageID: 5,
 				HasMore:                true,
 			},
@@ -216,7 +218,7 @@ func TestTaskAckManager_GetTasks(t *testing.T) {
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
 			taskStore := createTestTaskStore(t, tt.domains, tt.hydrator)
-			ackManager := NewTaskAckManager(testShardID, tt.ackLevels, metrics.NewNoopMetricsClient(), log.NewNoop(), tt.reader, taskStore)
+			ackManager := NewTaskAckManager(testShardID, tt.ackLevels, metrics.NewNoopMetricsClient(), log.NewNoop(), tt.reader, taskStore, clock.NewMockedTimeSource())
 
 			result, err := ackManager.GetTasks(context.Background(), tt.pollingCluster, tt.lastReadLevel)
 			if tt.expectErr != "" {
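For reference, a minimal, self-contained sketch of how the two new timers relate to the values tracked in GetTasks. The variable names and sample values below are illustrative only and are not part of the change: replication_tasks_lag_raw is a distance in task IDs between the shard's transfer max read level and the oldest task returned by the read (before any filtering), while replication_tasks_delay is recorded as the oldest fetched task's creation timestamp minus the current time, exactly as the diff computes it.

package main

import (
	"fmt"
	"time"
)

func main() {
	now := time.Now()

	// Hypothetical sample values standing in for the fields used in GetTasks.
	maxReadLevel := int64(120)                                              // t.ackLevels.GetTransferMaxReadLevel()
	oldestUnprocessedTaskID := int64(100)                                   // tasks[0].TaskID
	oldestUnprocessedTaskTimestamp := now.Add(-30 * time.Second).UnixNano() // tasks[0].CreationTime

	// replication_tasks_lag_raw: task-ID distance, emitted as a timer like the existing replication_tasks_lag.
	lagRaw := time.Duration(maxReadLevel - oldestUnprocessedTaskID)

	// replication_tasks_delay: creation time of the oldest fetched task minus now.
	delay := time.Duration(oldestUnprocessedTaskTimestamp - now.UnixNano())

	fmt.Println(lagRaw, delay)
}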