[history] Adding more metrics for replication #6673

Merged: 3 commits, Feb 21, 2025
4 changes: 4 additions & 0 deletions common/metrics/defs.go
@@ -2526,6 +2526,8 @@ const (
ReplicationTasksApplied
ReplicationTasksFailed
ReplicationTasksLag
ReplicationTasksLagRaw
ReplicationTasksDelay
ReplicationTasksFetched
ReplicationTasksReturned
ReplicationTasksReturnedDiff
@@ -3228,6 +3230,8 @@ var MetricDefs = map[ServiceIdx]map[int]metricDefinition{
ReplicationTasksApplied: {metricName: "replication_tasks_applied", metricType: Counter},
ReplicationTasksFailed: {metricName: "replication_tasks_failed", metricType: Counter},
ReplicationTasksLag: {metricName: "replication_tasks_lag", metricType: Timer},
ReplicationTasksLagRaw: {metricName: "replication_tasks_lag_raw", metricType: Timer},
ReplicationTasksDelay: {metricName: "replication_tasks_delay", metricType: Timer},
ReplicationTasksFetched: {metricName: "replication_tasks_fetched", metricType: Timer},
ReplicationTasksReturned: {metricName: "replication_tasks_returned", metricType: Timer},
ReplicationTasksReturnedDiff: {metricName: "replication_tasks_returned_diff", metricType: Timer},
1 change: 1 addition & 0 deletions service/history/engine/engineimpl/history_engine.go
@@ -223,6 +223,7 @@ func NewEngineWithShardContext(
shard.GetLogger(),
replicationReader,
replicationTaskStore,
shard.GetTimeSource(),
),
replicationTaskStore: replicationTaskStore,
replicationMetricsEmitter: replication.NewMetricsEmitter(
56 changes: 40 additions & 16 deletions service/history/replication/task_ack_manager.go
@@ -24,10 +24,12 @@ package replication

import (
"context"
"errors"
"strconv"
"time"

"github.com/uber/cadence/common"
"github.com/uber/cadence/common/clock"
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/log/tag"
"github.com/uber/cadence/common/metrics"
@@ -45,6 +47,8 @@ type (

reader taskReader
store *TaskStore

timeSource clock.TimeSource
}

ackLevelStore interface {
@@ -66,6 +70,7 @@ func NewTaskAckManager(
logger log.Logger,
reader taskReader,
store *TaskStore,
timeSource clock.TimeSource,
) TaskAckManager {

return TaskAckManager{
@@ -74,9 +79,10 @@
metrics.ReplicatorQueueProcessorScope,
metrics.InstanceTag(strconv.Itoa(shardID)),
),
logger: logger.WithTags(tag.ComponentReplicationAckManager),
reader: reader,
store: store,
logger: logger.WithTags(tag.ComponentReplicationAckManager),
reader: reader,
store: store,
timeSource: timeSource,
}
}

@@ -92,29 +98,47 @@ func (t *TaskAckManager) GetTasks(ctx context.Context, pollingCluster string, la
return nil, err
}

var replicationTasks []*types.ReplicationTask
readLevel := lastReadTaskID
TaskInfoLoop:
// Happy path assumption - we will push all tasks to replication tasks.
replicationTasks := make([]*types.ReplicationTask, 0, len(tasks))

var (
readLevel = lastReadTaskID
oldestUnprocessedTaskTimestamp = t.timeSource.Now().UnixNano()
oldestUnprocessedTaskID = t.ackLevels.GetTransferMaxReadLevel()
)

if len(tasks) > 0 {
// it does not matter if we can process task or not, but we need to know what was the oldest task information we have read.
// tasks must be ordered by taskID/time.
oldestUnprocessedTaskID = tasks[0].TaskID
oldestUnprocessedTaskTimestamp = tasks[0].CreationTime
}

for _, task := range tasks {
replicationTask, err := t.store.Get(ctx, pollingCluster, *task)

switch err.(type) {
case nil:
// No action
case *types.BadRequestError, *types.InternalDataInconsistencyError, *types.EntityNotExistsError:
t.logger.Warn("Failed to get replication task.", tag.Error(err))
default:
t.logger.Error("Failed to get replication task. Return what we have so far.", tag.Error(err))
hasMore = true
break TaskInfoLoop
if err != nil {
if errors.As(err, new(*types.BadRequestError)) ||
errors.As(err, new(*types.InternalDataInconsistencyError)) ||
errors.As(err, new(*types.EntityNotExistsError)) {
t.logger.Warn("Failed to get replication task.", tag.Error(err))
} else {
t.logger.Error("Failed to get replication task. Return what we have so far.", tag.Error(err))
hasMore = true
break
}
}

// We update readLevel only if we have found matching replication tasks on the passive side.
readLevel = task.TaskID
if replicationTask != nil {
replicationTasks = append(replicationTasks, replicationTask)
}
}
taskGeneratedTimer.Stop()

t.scope.RecordTimer(metrics.ReplicationTasksLagRaw, time.Duration(t.ackLevels.GetTransferMaxReadLevel()-oldestUnprocessedTaskID))
Member:
I might be missing how this will help with "If passive cluster cannot fetch tasks, does not report replication lag". oldestUnprocessedTaskID is almost identical to readLevel except when the loop breaks, so it can be off by one?
I'd assume we would want to report the lag when t.reader.Read() returns an error, based on lastReadTaskID. That part is untouched.

Contributor Author:
Good point. However, the issue was: we never update readLevel if we fail in t.store.Get (e.g. due to hitting MaxQPS or Cassandra failures), so we never reported any lag because the value was never updated.
I'll try it out and add a check to the test case to verify what we get from the metric.
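For context, the replaced loop reassembled from the removed lines in the hunk above (a condensed excerpt, not a standalone program): an unexpected error from t.store.Get breaks out of the labelled loop before readLevel = task.TaskID runs, so readLevel stays at lastReadTaskID for that batch.

var replicationTasks []*types.ReplicationTask
readLevel := lastReadTaskID
TaskInfoLoop:
for _, task := range tasks {
	replicationTask, err := t.store.Get(ctx, pollingCluster, *task)

	switch err.(type) {
	case nil:
		// No action
	case *types.BadRequestError, *types.InternalDataInconsistencyError, *types.EntityNotExistsError:
		t.logger.Warn("Failed to get replication task.", tag.Error(err))
	default:
		// e.g. MaxQPS throttling or Cassandra failures land here:
		// the loop exits before readLevel is advanced for this batch.
		t.logger.Error("Failed to get replication task. Return what we have so far.", tag.Error(err))
		hasMore = true
		break TaskInfoLoop
	}

	readLevel = task.TaskID
	if replicationTask != nil {
		replicationTasks = append(replicationTasks, replicationTask)
	}
}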

Contributor Author:
I've updated the logic so that we always pick the oldest task from the batch and report no delay if there are no tasks to replicate.
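Concretely, the new bookkeeping from the hunk above (a condensed excerpt with hydration and error handling elided): the oldest-task values are captured before any per-task failure can occur, and the defaults mean an empty batch reports no raw lag and no delay.

// Defaults: with an empty batch, raw lag and delay both come out as zero.
oldestUnprocessedTaskTimestamp := t.timeSource.Now().UnixNano()
oldestUnprocessedTaskID := t.ackLevels.GetTransferMaxReadLevel()

if len(tasks) > 0 {
	// Tasks are ordered by taskID/time, so the first entry is the oldest,
	// whether or not it can be hydrated later in the loop.
	oldestUnprocessedTaskID = tasks[0].TaskID
	oldestUnprocessedTaskTimestamp = tasks[0].CreationTime
}

// Recorded unconditionally, even if t.store.Get fails part-way through the batch.
t.scope.RecordTimer(metrics.ReplicationTasksLagRaw,
	time.Duration(t.ackLevels.GetTransferMaxReadLevel()-oldestUnprocessedTaskID))
t.scope.RecordTimer(metrics.ReplicationTasksDelay,
	time.Duration(oldestUnprocessedTaskTimestamp-t.timeSource.Now().UnixNano()))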

Member:
Should we also report the metrics before returning at line 98?

Contributor Author:
If we fail to read history from the active side, we don't know the delay (as far as I understand), and we should be notified when we fail to pull the history.

Member:
I see. I thought we could still report lag based on lastReadTaskID.
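For reference, a hypothetical sketch of that alternative (not part of this PR): recording the existing lag timer in the early-return path when reading the task batch fails, using the caller-supplied lastReadTaskID as the best available read level.

// Hypothetical, not in this change: if t.reader.Read fails, we could still
// report lag relative to the last task ID the polling cluster acknowledged.
if err != nil {
	t.scope.RecordTimer(metrics.ReplicationTasksLag,
		time.Duration(t.ackLevels.GetTransferMaxReadLevel()-lastReadTaskID))
	return nil, err
}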

t.scope.RecordTimer(metrics.ReplicationTasksDelay, time.Duration(oldestUnprocessedTaskTimestamp-t.timeSource.Now().UnixNano()))

t.scope.RecordTimer(metrics.ReplicationTasksLag, time.Duration(t.ackLevels.GetTransferMaxReadLevel()-readLevel))
t.scope.RecordTimer(metrics.ReplicationTasksFetched, time.Duration(len(tasks)))
t.scope.RecordTimer(metrics.ReplicationTasksReturned, time.Duration(len(replicationTasks)))
6 changes: 4 additions & 2 deletions service/history/replication/task_ack_manager_test.go
@@ -31,6 +31,7 @@ import (
"github.com/stretchr/testify/require"

"github.com/uber/cadence/common/cache"
"github.com/uber/cadence/common/clock"
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/metrics"
"github.com/uber/cadence/common/persistence"
@@ -142,7 +143,7 @@ func TestTaskAckManager_GetTasks(t *testing.T) {
pollingCluster: testClusterB,
lastReadLevel: 5,
expectResult: &types.ReplicationMessages{
ReplicationTasks: nil,
ReplicationTasks: []*types.ReplicationTask{},
LastRetrievedMessageID: 11,
HasMore: false,
},
@@ -189,6 +190,7 @@
pollingCluster: testClusterA,
lastReadLevel: 5,
expectResult: &types.ReplicationMessages{
ReplicationTasks: []*types.ReplicationTask{},
LastRetrievedMessageID: 5,
HasMore: true,
},
@@ -216,7 +218,7 @@ func TestTaskAckManager_GetTasks(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
taskStore := createTestTaskStore(t, tt.domains, tt.hydrator)
ackManager := NewTaskAckManager(testShardID, tt.ackLevels, metrics.NewNoopMetricsClient(), log.NewNoop(), tt.reader, taskStore)
ackManager := NewTaskAckManager(testShardID, tt.ackLevels, metrics.NewNoopMetricsClient(), log.NewNoop(), tt.reader, taskStore, clock.NewMockedTimeSource())
result, err := ackManager.GetTasks(context.Background(), tt.pollingCluster, tt.lastReadLevel)

if tt.expectErr != "" {