Skip to content

Commit

Permalink
[history] Adding more metrics for replication (#6673)
Browse files Browse the repository at this point in the history
* [history] Adding more metrics for replication
  • Loading branch information
3vilhamster authored Feb 21, 2025
1 parent e8da0e9 commit eeba22d
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 18 deletions.
4 changes: 4 additions & 0 deletions common/metrics/defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -2526,6 +2526,8 @@ const (
ReplicationTasksApplied
ReplicationTasksFailed
ReplicationTasksLag
ReplicationTasksLagRaw
ReplicationTasksDelay
ReplicationTasksFetched
ReplicationTasksReturned
ReplicationTasksReturnedDiff
Expand Down Expand Up @@ -3228,6 +3230,8 @@ var MetricDefs = map[ServiceIdx]map[int]metricDefinition{
ReplicationTasksApplied: {metricName: "replication_tasks_applied", metricType: Counter},
ReplicationTasksFailed: {metricName: "replication_tasks_failed", metricType: Counter},
ReplicationTasksLag: {metricName: "replication_tasks_lag", metricType: Timer},
ReplicationTasksLagRaw: {metricName: "replication_tasks_lag_raw", metricType: Timer},
ReplicationTasksDelay: {metricName: "replication_tasks_delay", metricType: Timer},
ReplicationTasksFetched: {metricName: "replication_tasks_fetched", metricType: Timer},
ReplicationTasksReturned: {metricName: "replication_tasks_returned", metricType: Timer},
ReplicationTasksReturnedDiff: {metricName: "replication_tasks_returned_diff", metricType: Timer},
Expand Down
1 change: 1 addition & 0 deletions service/history/engine/engineimpl/history_engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,7 @@ func NewEngineWithShardContext(
shard.GetLogger(),
replicationReader,
replicationTaskStore,
shard.GetTimeSource(),
),
replicationTaskStore: replicationTaskStore,
replicationMetricsEmitter: replication.NewMetricsEmitter(
Expand Down
56 changes: 40 additions & 16 deletions service/history/replication/task_ack_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,12 @@ package replication

import (
"context"
"errors"
"strconv"
"time"

"github.com/uber/cadence/common"
"github.com/uber/cadence/common/clock"
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/log/tag"
"github.com/uber/cadence/common/metrics"
Expand All @@ -45,6 +47,8 @@ type (

reader taskReader
store *TaskStore

timeSource clock.TimeSource
}

ackLevelStore interface {
Expand All @@ -66,6 +70,7 @@ func NewTaskAckManager(
logger log.Logger,
reader taskReader,
store *TaskStore,
timeSource clock.TimeSource,
) TaskAckManager {

return TaskAckManager{
Expand All @@ -74,9 +79,10 @@ func NewTaskAckManager(
metrics.ReplicatorQueueProcessorScope,
metrics.InstanceTag(strconv.Itoa(shardID)),
),
logger: logger.WithTags(tag.ComponentReplicationAckManager),
reader: reader,
store: store,
logger: logger.WithTags(tag.ComponentReplicationAckManager),
reader: reader,
store: store,
timeSource: timeSource,
}
}

Expand All @@ -92,29 +98,47 @@ func (t *TaskAckManager) GetTasks(ctx context.Context, pollingCluster string, la
return nil, err
}

var replicationTasks []*types.ReplicationTask
readLevel := lastReadTaskID
TaskInfoLoop:
// Happy path assumption - we will push all tasks to replication tasks.
replicationTasks := make([]*types.ReplicationTask, 0, len(tasks))

var (
readLevel = lastReadTaskID
oldestUnprocessedTaskTimestamp = t.timeSource.Now().UnixNano()
oldestUnprocessedTaskID = t.ackLevels.GetTransferMaxReadLevel()
)

if len(tasks) > 0 {
// it does not matter if we can process task or not, but we need to know what was the oldest task information we have read.
// tasks must be ordered by taskID/time.
oldestUnprocessedTaskID = tasks[0].TaskID
oldestUnprocessedTaskTimestamp = tasks[0].CreationTime
}

for _, task := range tasks {
replicationTask, err := t.store.Get(ctx, pollingCluster, *task)

switch err.(type) {
case nil:
// No action
case *types.BadRequestError, *types.InternalDataInconsistencyError, *types.EntityNotExistsError:
t.logger.Warn("Failed to get replication task.", tag.Error(err))
default:
t.logger.Error("Failed to get replication task. Return what we have so far.", tag.Error(err))
hasMore = true
break TaskInfoLoop
if err != nil {
if errors.As(err, new(*types.BadRequestError)) ||
errors.As(err, new(*types.InternalDataInconsistencyError)) ||
errors.As(err, new(*types.EntityNotExistsError)) {
t.logger.Warn("Failed to get replication task.", tag.Error(err))
} else {
t.logger.Error("Failed to get replication task. Return what we have so far.", tag.Error(err))
hasMore = true
break
}
}

// We update readLevel only if we have found matching replication tasks on the passive side.
readLevel = task.TaskID
if replicationTask != nil {
replicationTasks = append(replicationTasks, replicationTask)
}
}
taskGeneratedTimer.Stop()

t.scope.RecordTimer(metrics.ReplicationTasksLagRaw, time.Duration(t.ackLevels.GetTransferMaxReadLevel()-oldestUnprocessedTaskID))
t.scope.RecordTimer(metrics.ReplicationTasksDelay, time.Duration(oldestUnprocessedTaskTimestamp-t.timeSource.Now().UnixNano()))

t.scope.RecordTimer(metrics.ReplicationTasksLag, time.Duration(t.ackLevels.GetTransferMaxReadLevel()-readLevel))
t.scope.RecordTimer(metrics.ReplicationTasksFetched, time.Duration(len(tasks)))
t.scope.RecordTimer(metrics.ReplicationTasksReturned, time.Duration(len(replicationTasks)))
Expand Down
6 changes: 4 additions & 2 deletions service/history/replication/task_ack_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/stretchr/testify/require"

"github.com/uber/cadence/common/cache"
"github.com/uber/cadence/common/clock"
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/metrics"
"github.com/uber/cadence/common/persistence"
Expand Down Expand Up @@ -142,7 +143,7 @@ func TestTaskAckManager_GetTasks(t *testing.T) {
pollingCluster: testClusterB,
lastReadLevel: 5,
expectResult: &types.ReplicationMessages{
ReplicationTasks: nil,
ReplicationTasks: []*types.ReplicationTask{},
LastRetrievedMessageID: 11,
HasMore: false,
},
Expand Down Expand Up @@ -189,6 +190,7 @@ func TestTaskAckManager_GetTasks(t *testing.T) {
pollingCluster: testClusterA,
lastReadLevel: 5,
expectResult: &types.ReplicationMessages{
ReplicationTasks: []*types.ReplicationTask{},
LastRetrievedMessageID: 5,
HasMore: true,
},
Expand Down Expand Up @@ -216,7 +218,7 @@ func TestTaskAckManager_GetTasks(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
taskStore := createTestTaskStore(t, tt.domains, tt.hydrator)
ackManager := NewTaskAckManager(testShardID, tt.ackLevels, metrics.NewNoopMetricsClient(), log.NewNoop(), tt.reader, taskStore)
ackManager := NewTaskAckManager(testShardID, tt.ackLevels, metrics.NewNoopMetricsClient(), log.NewNoop(), tt.reader, taskStore, clock.NewMockedTimeSource())
result, err := ackManager.GetTasks(context.Background(), tt.pollingCluster, tt.lastReadLevel)

if tt.expectErr != "" {
Expand Down

0 comments on commit eeba22d

Please sign in to comment.