From b19b80a4401133bc0e94b886c8556ab617660136 Mon Sep 17 00:00:00 2001 From: Stanislav Bychkov Date: Sun, 23 Feb 2025 14:16:52 +0200 Subject: [PATCH] conflict and test fixes --- common/persistence/execution_manager_test.go | 53 +++++++++++++------ .../nosql/nosql_execution_store.go | 5 +- common/persistence/nosql/nosql_queue_store.go | 7 ++- .../nosql/nosql_queue_store_test.go | 16 +++--- common/persistence/nosql/nosql_task_store.go | 22 ++++---- .../cassandra/testdata/workflow_execution.go | 3 +- .../nosql/nosqlplugin/cassandra/workflow.go | 6 +-- common/persistence/nosql/nosqlplugin/types.go | 3 +- common/persistence/shard_manager_test.go | 15 +++++- 9 files changed, 82 insertions(+), 48 deletions(-) diff --git a/common/persistence/execution_manager_test.go b/common/persistence/execution_manager_test.go index ededa46bc7a..0b6e6197b13 100644 --- a/common/persistence/execution_manager_test.go +++ b/common/persistence/execution_manager_test.go @@ -108,12 +108,6 @@ func TestExecutionManager_ProxyStoreMethods(t *testing.T) { mockedStore.EXPECT().RangeDeleteReplicationTaskFromDLQ(gomock.Any(), gomock.Any()).Return(nil, nil) }, }, - { - method: "CreateFailoverMarkerTasks", - prepareMocks: func(mockedStore *MockExecutionStore) { - mockedStore.EXPECT().CreateFailoverMarkerTasks(gomock.Any(), gomock.Any()).Return(nil) - }, - }, { method: "GetTimerIndexTasks", prepareMocks: func(mockedStore *MockExecutionStore) { @@ -642,7 +636,7 @@ func TestPutReplicationTaskToDLQ(t *testing.T) { mockedStore := NewMockExecutionStore(ctrl) manager := NewExecutionManagerImpl(mockedStore, testlogger.New(t), nil) - now := time.Now().UTC().Round(time.Second) + now := time.Now().UTC() task := &PutReplicationTaskToDLQRequest{ SourceClusterName: "test-cluster", @@ -655,14 +649,12 @@ func TestPutReplicationTaskToDLQ(t *testing.T) { } mockedStore.EXPECT().PutReplicationTaskToDLQ(gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, req *InternalPutReplicationTaskToDLQRequest) error { - assert.Equal(t, &InternalPutReplicationTaskToDLQRequest{ - SourceClusterName: "test-cluster", - TaskInfo: &InternalReplicationTaskInfo{ - DomainID: testDomainID, - WorkflowID: testWorkflowID, - CreationTime: now, - }, - }, req) + assert.Equal(t, "test-cluster", req.SourceClusterName) + assert.Equal(t, testDomainID, req.TaskInfo.DomainID) + assert.Equal(t, testWorkflowID, req.TaskInfo.WorkflowID) + assert.Equal(t, now, req.TaskInfo.CreationTime) + + assert.WithinDuration(t, now, req.TaskInfo.CurrentTimeStamp, time.Second) return nil }) @@ -1262,6 +1254,37 @@ func TestConflictResolveWorkflowExecution(t *testing.T) { } } +func TestCreateFailoverMarkerTasks(t *testing.T) { + ctrl := gomock.NewController(t) + mockedStore := NewMockExecutionStore(ctrl) + manager := NewExecutionManagerImpl(mockedStore, testlogger.New(t), nil) + + req := &CreateFailoverMarkersRequest{ + Markers: []*FailoverMarkerTask{{ + TaskData: TaskData{ + Version: 0, + TaskID: 0, + VisibilityTimestamp: time.Time{}, + }, + DomainID: "1", + }}, + RangeID: 1, + CurrentTimeStamp: time.Now(), + } + + mockedStore.EXPECT().CreateFailoverMarkerTasks(gomock.Any(), gomock.Any()). + DoAndReturn(func(ctx context.Context, req *CreateFailoverMarkersRequest) error { + assert.Equal(t, req.RangeID, req.RangeID) + assert.Equal(t, req.Markers, req.Markers) + + assert.WithinDuration(t, req.CurrentTimeStamp, req.CurrentTimeStamp, time.Second) + return nil + }) + + err := manager.CreateFailoverMarkerTasks(context.Background(), req) + assert.NoError(t, err) +} + func sampleInternalActivityInfo(name string) *InternalActivityInfo { return &InternalActivityInfo{ Version: 1, diff --git a/common/persistence/nosql/nosql_execution_store.go b/common/persistence/nosql/nosql_execution_store.go index 9d0aca9a1fb..aabe056e7ee 100644 --- a/common/persistence/nosql/nosql_execution_store.go +++ b/common/persistence/nosql/nosql_execution_store.go @@ -829,8 +829,6 @@ func (d *nosqlExecutionStore) CreateFailoverMarkerTasks( var nosqlTasks []*nosqlplugin.HistoryMigrationTask for _, task := range request.Markers { - //var nosqlTasks []*nosqlplugin.ReplicationTask - //for i, task := range request.Markers { ts := []persistence.Task{task} tasks, err := d.prepareReplicationTasksForWorkflowTxn(task.DomainID, rowTypeReplicationWorkflowID, rowTypeReplicationRunID, ts) @@ -838,13 +836,12 @@ func (d *nosqlExecutionStore) CreateFailoverMarkerTasks( return err } for _, task := range tasks { + task.CurrentTimeStamp = request.CurrentTimeStamp nosqlTasks = append(nosqlTasks, &nosqlplugin.HistoryMigrationTask{ Replication: task, Task: nil, // TODO: encode replication task into datablob }) } - //tasks[i].CurrentTimeStamp = request.CurrentTimeStamp - //nosqlTasks = append(nosqlTasks, tasks...) } err := d.db.InsertReplicationTask(ctx, nosqlTasks, nosqlplugin.ShardCondition{ diff --git a/common/persistence/nosql/nosql_queue_store.go b/common/persistence/nosql/nosql_queue_store.go index 0cd47babe4b..cee46dd1a24 100644 --- a/common/persistence/nosql/nosql_queue_store.go +++ b/common/persistence/nosql/nosql_queue_store.go @@ -25,6 +25,7 @@ import ( "fmt" "time" + "github.com/uber/cadence/common/clock" "github.com/uber/cadence/common/config" "github.com/uber/cadence/common/log" "github.com/uber/cadence/common/persistence" @@ -39,6 +40,7 @@ const ( type nosqlQueueStore struct { queueType persistence.QueueType nosqlStore + timeSrc clock.TimeSource } func newNoSQLQueueStore( @@ -54,10 +56,11 @@ func newNoSQLQueueStore( queue := &nosqlQueueStore{ nosqlStore: shardedStore.GetDefaultShard(), queueType: queueType, + // TODO: move the time generation to the persistence manager layer + timeSrc: clock.NewRealTimeSource(), } - currentTimestamp := time.Now() - if err := queue.createQueueMetadataEntryIfNotExist(currentTimestamp); err != nil { + if err := queue.createQueueMetadataEntryIfNotExist(queue.timeSrc.Now()); err != nil { return nil, fmt.Errorf("failed to check and create queue metadata entry: %v", err) } diff --git a/common/persistence/nosql/nosql_queue_store_test.go b/common/persistence/nosql/nosql_queue_store_test.go index cdcd4c74ec5..eaf97ec6724 100644 --- a/common/persistence/nosql/nosql_queue_store_test.go +++ b/common/persistence/nosql/nosql_queue_store_test.go @@ -219,9 +219,10 @@ func TestEnqueueMessage_Succeeds(t *testing.T) { assert.Equal( t, &nosqlplugin.QueueMessageRow{ - QueueType: testQueueType, - ID: lastMessageID + 20 + 1, // should be the max of cluster AckLevels + 1 - Payload: testPayload, + QueueType: testQueueType, + ID: lastMessageID + 20 + 1, // should be the max of cluster AckLevels + 1 + Payload: testPayload, + CurrentTimeStamp: FixedTime, }, row, ) @@ -286,9 +287,10 @@ func TestEnqueueMessageToDLQ_Succeeds(t *testing.T) { assert.Equal( t, &nosqlplugin.QueueMessageRow{ - QueueType: dlqMessageType, - ID: lastMessageID + 1, - Payload: testPayload, + QueueType: dlqMessageType, + ID: lastMessageID + 1, + Payload: testPayload, + CurrentTimeStamp: FixedTime, }, row, ) @@ -545,7 +547,7 @@ func TestUpdateAckLevel_Succeeds(t *testing.T) { assert.Equal(t, expectedClusterAckLevels, newMeta.ClusterAckLevels) }).Return(nil) - assert.NoError(t, store.UpdateDLQAckLevel(ctx, messageID, clusterName, FixedTime)) + assert.NoError(t, store.UpdateAckLevel(ctx, messageID, clusterName, FixedTime)) } func TestUpdateAckLevel_FailsIfSelectMetadataFails(t *testing.T) { diff --git a/common/persistence/nosql/nosql_task_store.go b/common/persistence/nosql/nosql_task_store.go index 3624c95c92c..74e8ca3206e 100644 --- a/common/persistence/nosql/nosql_task_store.go +++ b/common/persistence/nosql/nosql_task_store.go @@ -98,7 +98,7 @@ func (t *nosqlTaskStore) LeaseTaskList( Message: "LeaseTaskList requires non empty task list", } } - now := request.CurrentTimeStamp + currentTimeStamp := request.CurrentTimeStamp var err, selectErr error var currTL *nosqlplugin.TaskListRow storeShard, err := t.GetStoreShardByTaskList(request.DomainID, request.TaskList, request.TaskType) @@ -121,8 +121,8 @@ func (t *nosqlTaskStore) LeaseTaskList( RangeID: initialRangeID, TaskListKind: request.TaskListKind, AckLevel: initialAckLevel, - LastUpdatedTime: now, - CurrentTimeStamp: now, + LastUpdatedTime: currentTimeStamp, + CurrentTimeStamp: currentTimeStamp, } err = storeShard.db.InsertTaskList(ctx, currTL) } else { @@ -149,8 +149,8 @@ func (t *nosqlTaskStore) LeaseTaskList( RangeID: currTL.RangeID, TaskListKind: currTL.TaskListKind, AckLevel: currTL.AckLevel, - LastUpdatedTime: now, - CurrentTimeStamp: now, + LastUpdatedTime: currentTimeStamp, + CurrentTimeStamp: currentTimeStamp, AdaptivePartitionConfig: currTL.AdaptivePartitionConfig, }, currTL.RangeID-1) } @@ -171,7 +171,7 @@ func (t *nosqlTaskStore) LeaseTaskList( RangeID: currTL.RangeID, AckLevel: currTL.AckLevel, Kind: request.TaskListKind, - LastUpdated: now, + LastUpdated: currentTimeStamp, AdaptivePartitionConfig: currTL.AdaptivePartitionConfig, } return &persistence.LeaseTaskListResponse{TaskListInfo: tli}, nil @@ -290,7 +290,7 @@ func (t *nosqlTaskStore) CreateTasks( ctx context.Context, request *persistence.CreateTasksRequest, ) (*persistence.CreateTasksResponse, error) { - now := request.CurrentTimeStamp + currentTimeStamp := request.CurrentTimeStamp var tasks []*nosqlplugin.TaskRowForInsert for _, taskRequest := range request.Tasks { task := &nosqlplugin.TaskRow{ @@ -301,7 +301,7 @@ func (t *nosqlTaskStore) CreateTasks( WorkflowID: taskRequest.Data.WorkflowID, RunID: taskRequest.Data.RunID, ScheduledID: taskRequest.Data.ScheduleID, - CreatedTime: now, + CreatedTime: currentTimeStamp, PartitionConfig: taskRequest.Data.PartitionConfig, } @@ -309,7 +309,7 @@ func (t *nosqlTaskStore) CreateTasks( // If the Data has a non-zero Expiry value, means that the ask is being re-added to the tasks table. // If that's the case, use the Expiry value to calculate the new TTL value to match history's timeout value. if !taskRequest.Data.Expiry.IsZero() { - scheduleToStartTimeoutSeconds := int(taskRequest.Data.Expiry.Sub(now).Seconds()) + scheduleToStartTimeoutSeconds := int(taskRequest.Data.Expiry.Sub(currentTimeStamp).Seconds()) if scheduleToStartTimeoutSeconds > 0 { ttl = scheduleToStartTimeoutSeconds @@ -333,8 +333,8 @@ func (t *nosqlTaskStore) CreateTasks( TaskListName: request.TaskListInfo.Name, TaskListType: request.TaskListInfo.TaskType, RangeID: request.TaskListInfo.RangeID, - LastUpdatedTime: now, - CurrentTimeStamp: now, + LastUpdatedTime: currentTimeStamp, + CurrentTimeStamp: currentTimeStamp, } tli := request.TaskListInfo diff --git a/common/persistence/nosql/nosqlplugin/cassandra/testdata/workflow_execution.go b/common/persistence/nosql/nosqlplugin/cassandra/testdata/workflow_execution.go index a95200170f7..67a10cfd187 100644 --- a/common/persistence/nosql/nosqlplugin/cassandra/testdata/workflow_execution.go +++ b/common/persistence/nosql/nosqlplugin/cassandra/testdata/workflow_execution.go @@ -70,8 +70,7 @@ func WFExecRequest(opts ...WFExecRequestOption) *nosqlplugin.WorkflowExecutionRe Value: []byte("test-checksum"), }, PreviousNextEventIDCondition: common.Int64Ptr(123), - CreatedTime: ts, - UpdatedTime: ts, + CurrentTimeStamp: ts, } for _, opt := range opts { diff --git a/common/persistence/nosql/nosqlplugin/cassandra/workflow.go b/common/persistence/nosql/nosqlplugin/cassandra/workflow.go index eb8f830b570..419ee98d75d 100644 --- a/common/persistence/nosql/nosqlplugin/cassandra/workflow.go +++ b/common/persistence/nosql/nosqlplugin/cassandra/workflow.go @@ -47,7 +47,7 @@ func (db *cdb) InsertWorkflowExecutionWithTasks( shardID := shardCondition.ShardID domainID := execution.DomainID workflowID := execution.WorkflowID - timeStamp := execution.CreatedTime + timeStamp := execution.CurrentTimeStamp batch := db.session.NewBatch(gocql.LoggedBatch).WithContext(ctx) @@ -122,7 +122,7 @@ func (db *cdb) UpdateWorkflowExecutionWithTasks( shardID := shardCondition.ShardID var domainID, workflowID string var previousNextEventIDCondition int64 - timeStamp := insertedExecution.UpdatedTime + timeStamp := insertedExecution.CurrentTimeStamp if mutatedExecution != nil { domainID = mutatedExecution.DomainID workflowID = mutatedExecution.WorkflowID @@ -721,7 +721,7 @@ func (db *cdb) InsertReplicationTask(ctx context.Context, tasks []*nosqlplugin.H shardID := shardCondition.ShardID batch := db.session.NewBatch(gocql.LoggedBatch).WithContext(ctx) - timeStamp := tasks[0].CurrentTimeStamp + timeStamp := tasks[0].Replication.CurrentTimeStamp for _, task := range tasks { createReplicationTasks(batch, shardID, task.Replication.DomainID, task.Replication.WorkflowID, []*nosqlplugin.HistoryMigrationTask{task}, timeStamp) } diff --git a/common/persistence/nosql/nosqlplugin/types.go b/common/persistence/nosql/nosqlplugin/types.go index ad56e720b9d..3fc516a3cbb 100644 --- a/common/persistence/nosql/nosqlplugin/types.go +++ b/common/persistence/nosql/nosqlplugin/types.go @@ -39,8 +39,7 @@ type ( VersionHistories *persistence.DataBlob Checksums *checksum.Checksum LastWriteVersion int64 - CreatedTime time.Time - UpdatedTime time.Time + CurrentTimeStamp time.Time // condition checking for updating execution info PreviousNextEventIDCondition *int64 diff --git a/common/persistence/shard_manager_test.go b/common/persistence/shard_manager_test.go index eafeb47a7fe..f986604daaa 100644 --- a/common/persistence/shard_manager_test.go +++ b/common/persistence/shard_manager_test.go @@ -97,7 +97,12 @@ func TestShardManagerCreateShard(t *testing.T) { t.Run(name, func(t *testing.T) { store := NewMockShardStore(ctrl) if test.internalRequest != nil { - store.EXPECT().CreateShard(gomock.Any(), gomock.Eq(test.internalRequest)).Return(test.internalResponse) + store.EXPECT().CreateShard(gomock.Any(), gomock.Any()). + Do(func(ctx context.Context, req *InternalCreateShardRequest) { + assert.Equal(t, test.internalRequest.ShardInfo, req.ShardInfo) + + assert.WithinDuration(t, time.Now(), req.CurrentTimeStamp, time.Second) + }).Return(test.internalResponse) } manager := NewShardManager(store, WithSerializer(test.serializer)) @@ -237,7 +242,13 @@ func TestShardManagerUpdateShard(t *testing.T) { t.Run(name, func(t *testing.T) { store := NewMockShardStore(ctrl) if test.internalRequest != nil { - store.EXPECT().UpdateShard(gomock.Any(), gomock.Eq(test.internalRequest)).Return(test.internalResponse) + store.EXPECT().UpdateShard(gomock.Any(), gomock.Any()). + Do(func(ctx context.Context, req *InternalUpdateShardRequest) { + assert.Equal(t, test.internalRequest.PreviousRangeID, req.PreviousRangeID) + assert.Equal(t, test.internalRequest.ShardInfo, req.ShardInfo) + + assert.WithinDuration(t, time.Now(), req.CurrentTimeStamp, time.Second) + }).Return(test.internalResponse) } manager := NewShardManager(store, WithSerializer(test.serializer))