Skip to content

Commit

Permalink
conflict and test fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
ribaraka committed Feb 24, 2025
1 parent 274e058 commit b19b80a
Show file tree
Hide file tree
Showing 9 changed files with 82 additions and 48 deletions.
53 changes: 38 additions & 15 deletions common/persistence/execution_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,12 +108,6 @@ func TestExecutionManager_ProxyStoreMethods(t *testing.T) {
mockedStore.EXPECT().RangeDeleteReplicationTaskFromDLQ(gomock.Any(), gomock.Any()).Return(nil, nil)
},
},
{
method: "CreateFailoverMarkerTasks",
prepareMocks: func(mockedStore *MockExecutionStore) {
mockedStore.EXPECT().CreateFailoverMarkerTasks(gomock.Any(), gomock.Any()).Return(nil)
},
},
{
method: "GetTimerIndexTasks",
prepareMocks: func(mockedStore *MockExecutionStore) {
Expand Down Expand Up @@ -642,7 +636,7 @@ func TestPutReplicationTaskToDLQ(t *testing.T) {
mockedStore := NewMockExecutionStore(ctrl)
manager := NewExecutionManagerImpl(mockedStore, testlogger.New(t), nil)

now := time.Now().UTC().Round(time.Second)
now := time.Now().UTC()

task := &PutReplicationTaskToDLQRequest{
SourceClusterName: "test-cluster",
Expand All @@ -655,14 +649,12 @@ func TestPutReplicationTaskToDLQ(t *testing.T) {
}

mockedStore.EXPECT().PutReplicationTaskToDLQ(gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, req *InternalPutReplicationTaskToDLQRequest) error {
assert.Equal(t, &InternalPutReplicationTaskToDLQRequest{
SourceClusterName: "test-cluster",
TaskInfo: &InternalReplicationTaskInfo{
DomainID: testDomainID,
WorkflowID: testWorkflowID,
CreationTime: now,
},
}, req)
assert.Equal(t, "test-cluster", req.SourceClusterName)
assert.Equal(t, testDomainID, req.TaskInfo.DomainID)
assert.Equal(t, testWorkflowID, req.TaskInfo.WorkflowID)
assert.Equal(t, now, req.TaskInfo.CreationTime)

assert.WithinDuration(t, now, req.TaskInfo.CurrentTimeStamp, time.Second)
return nil
})

Expand Down Expand Up @@ -1262,6 +1254,37 @@ func TestConflictResolveWorkflowExecution(t *testing.T) {
}
}

func TestCreateFailoverMarkerTasks(t *testing.T) {
ctrl := gomock.NewController(t)
mockedStore := NewMockExecutionStore(ctrl)
manager := NewExecutionManagerImpl(mockedStore, testlogger.New(t), nil)

req := &CreateFailoverMarkersRequest{
Markers: []*FailoverMarkerTask{{
TaskData: TaskData{
Version: 0,
TaskID: 0,
VisibilityTimestamp: time.Time{},
},
DomainID: "1",
}},
RangeID: 1,
CurrentTimeStamp: time.Now(),
}

mockedStore.EXPECT().CreateFailoverMarkerTasks(gomock.Any(), gomock.Any()).
DoAndReturn(func(ctx context.Context, req *CreateFailoverMarkersRequest) error {
assert.Equal(t, req.RangeID, req.RangeID)
assert.Equal(t, req.Markers, req.Markers)

assert.WithinDuration(t, req.CurrentTimeStamp, req.CurrentTimeStamp, time.Second)
return nil
})

err := manager.CreateFailoverMarkerTasks(context.Background(), req)
assert.NoError(t, err)
}

func sampleInternalActivityInfo(name string) *InternalActivityInfo {
return &InternalActivityInfo{
Version: 1,
Expand Down
5 changes: 1 addition & 4 deletions common/persistence/nosql/nosql_execution_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -829,22 +829,19 @@ func (d *nosqlExecutionStore) CreateFailoverMarkerTasks(

var nosqlTasks []*nosqlplugin.HistoryMigrationTask
for _, task := range request.Markers {
//var nosqlTasks []*nosqlplugin.ReplicationTask
//for i, task := range request.Markers {
ts := []persistence.Task{task}

tasks, err := d.prepareReplicationTasksForWorkflowTxn(task.DomainID, rowTypeReplicationWorkflowID, rowTypeReplicationRunID, ts)
if err != nil {
return err
}
for _, task := range tasks {
task.CurrentTimeStamp = request.CurrentTimeStamp
nosqlTasks = append(nosqlTasks, &nosqlplugin.HistoryMigrationTask{
Replication: task,
Task: nil, // TODO: encode replication task into datablob
})
}
//tasks[i].CurrentTimeStamp = request.CurrentTimeStamp
//nosqlTasks = append(nosqlTasks, tasks...)
}

err := d.db.InsertReplicationTask(ctx, nosqlTasks, nosqlplugin.ShardCondition{
Expand Down
7 changes: 5 additions & 2 deletions common/persistence/nosql/nosql_queue_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"fmt"
"time"

"github.com/uber/cadence/common/clock"
"github.com/uber/cadence/common/config"
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/persistence"
Expand All @@ -39,6 +40,7 @@ const (
type nosqlQueueStore struct {
queueType persistence.QueueType
nosqlStore
timeSrc clock.TimeSource
}

func newNoSQLQueueStore(
Expand All @@ -54,10 +56,11 @@ func newNoSQLQueueStore(
queue := &nosqlQueueStore{
nosqlStore: shardedStore.GetDefaultShard(),
queueType: queueType,
// TODO: move the time generation to the persistence manager layer
timeSrc: clock.NewRealTimeSource(),
}

currentTimestamp := time.Now()
if err := queue.createQueueMetadataEntryIfNotExist(currentTimestamp); err != nil {
if err := queue.createQueueMetadataEntryIfNotExist(queue.timeSrc.Now()); err != nil {
return nil, fmt.Errorf("failed to check and create queue metadata entry: %v", err)
}

Expand Down
16 changes: 9 additions & 7 deletions common/persistence/nosql/nosql_queue_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,9 +219,10 @@ func TestEnqueueMessage_Succeeds(t *testing.T) {
assert.Equal(
t,
&nosqlplugin.QueueMessageRow{
QueueType: testQueueType,
ID: lastMessageID + 20 + 1, // should be the max of cluster AckLevels + 1
Payload: testPayload,
QueueType: testQueueType,
ID: lastMessageID + 20 + 1, // should be the max of cluster AckLevels + 1
Payload: testPayload,
CurrentTimeStamp: FixedTime,
},
row,
)
Expand Down Expand Up @@ -286,9 +287,10 @@ func TestEnqueueMessageToDLQ_Succeeds(t *testing.T) {
assert.Equal(
t,
&nosqlplugin.QueueMessageRow{
QueueType: dlqMessageType,
ID: lastMessageID + 1,
Payload: testPayload,
QueueType: dlqMessageType,
ID: lastMessageID + 1,
Payload: testPayload,
CurrentTimeStamp: FixedTime,
},
row,
)
Expand Down Expand Up @@ -545,7 +547,7 @@ func TestUpdateAckLevel_Succeeds(t *testing.T) {
assert.Equal(t, expectedClusterAckLevels, newMeta.ClusterAckLevels)
}).Return(nil)

assert.NoError(t, store.UpdateDLQAckLevel(ctx, messageID, clusterName, FixedTime))
assert.NoError(t, store.UpdateAckLevel(ctx, messageID, clusterName, FixedTime))
}

func TestUpdateAckLevel_FailsIfSelectMetadataFails(t *testing.T) {
Expand Down
22 changes: 11 additions & 11 deletions common/persistence/nosql/nosql_task_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func (t *nosqlTaskStore) LeaseTaskList(
Message: "LeaseTaskList requires non empty task list",
}
}
now := request.CurrentTimeStamp
currentTimeStamp := request.CurrentTimeStamp
var err, selectErr error
var currTL *nosqlplugin.TaskListRow
storeShard, err := t.GetStoreShardByTaskList(request.DomainID, request.TaskList, request.TaskType)
Expand All @@ -121,8 +121,8 @@ func (t *nosqlTaskStore) LeaseTaskList(
RangeID: initialRangeID,
TaskListKind: request.TaskListKind,
AckLevel: initialAckLevel,
LastUpdatedTime: now,
CurrentTimeStamp: now,
LastUpdatedTime: currentTimeStamp,
CurrentTimeStamp: currentTimeStamp,
}
err = storeShard.db.InsertTaskList(ctx, currTL)
} else {
Expand All @@ -149,8 +149,8 @@ func (t *nosqlTaskStore) LeaseTaskList(
RangeID: currTL.RangeID,
TaskListKind: currTL.TaskListKind,
AckLevel: currTL.AckLevel,
LastUpdatedTime: now,
CurrentTimeStamp: now,
LastUpdatedTime: currentTimeStamp,
CurrentTimeStamp: currentTimeStamp,
AdaptivePartitionConfig: currTL.AdaptivePartitionConfig,
}, currTL.RangeID-1)
}
Expand All @@ -171,7 +171,7 @@ func (t *nosqlTaskStore) LeaseTaskList(
RangeID: currTL.RangeID,
AckLevel: currTL.AckLevel,
Kind: request.TaskListKind,
LastUpdated: now,
LastUpdated: currentTimeStamp,
AdaptivePartitionConfig: currTL.AdaptivePartitionConfig,
}
return &persistence.LeaseTaskListResponse{TaskListInfo: tli}, nil
Expand Down Expand Up @@ -290,7 +290,7 @@ func (t *nosqlTaskStore) CreateTasks(
ctx context.Context,
request *persistence.CreateTasksRequest,
) (*persistence.CreateTasksResponse, error) {
now := request.CurrentTimeStamp
currentTimeStamp := request.CurrentTimeStamp
var tasks []*nosqlplugin.TaskRowForInsert
for _, taskRequest := range request.Tasks {
task := &nosqlplugin.TaskRow{
Expand All @@ -301,15 +301,15 @@ func (t *nosqlTaskStore) CreateTasks(
WorkflowID: taskRequest.Data.WorkflowID,
RunID: taskRequest.Data.RunID,
ScheduledID: taskRequest.Data.ScheduleID,
CreatedTime: now,
CreatedTime: currentTimeStamp,
PartitionConfig: taskRequest.Data.PartitionConfig,
}

var ttl int
// If the Data has a non-zero Expiry value, means that the ask is being re-added to the tasks table.
// If that's the case, use the Expiry value to calculate the new TTL value to match history's timeout value.
if !taskRequest.Data.Expiry.IsZero() {
scheduleToStartTimeoutSeconds := int(taskRequest.Data.Expiry.Sub(now).Seconds())
scheduleToStartTimeoutSeconds := int(taskRequest.Data.Expiry.Sub(currentTimeStamp).Seconds())

if scheduleToStartTimeoutSeconds > 0 {
ttl = scheduleToStartTimeoutSeconds
Expand All @@ -333,8 +333,8 @@ func (t *nosqlTaskStore) CreateTasks(
TaskListName: request.TaskListInfo.Name,
TaskListType: request.TaskListInfo.TaskType,
RangeID: request.TaskListInfo.RangeID,
LastUpdatedTime: now,
CurrentTimeStamp: now,
LastUpdatedTime: currentTimeStamp,
CurrentTimeStamp: currentTimeStamp,
}

tli := request.TaskListInfo
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,7 @@ func WFExecRequest(opts ...WFExecRequestOption) *nosqlplugin.WorkflowExecutionRe
Value: []byte("test-checksum"),
},
PreviousNextEventIDCondition: common.Int64Ptr(123),
CreatedTime: ts,
UpdatedTime: ts,
CurrentTimeStamp: ts,
}

for _, opt := range opts {
Expand Down
6 changes: 3 additions & 3 deletions common/persistence/nosql/nosqlplugin/cassandra/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func (db *cdb) InsertWorkflowExecutionWithTasks(
shardID := shardCondition.ShardID
domainID := execution.DomainID
workflowID := execution.WorkflowID
timeStamp := execution.CreatedTime
timeStamp := execution.CurrentTimeStamp

batch := db.session.NewBatch(gocql.LoggedBatch).WithContext(ctx)

Expand Down Expand Up @@ -122,7 +122,7 @@ func (db *cdb) UpdateWorkflowExecutionWithTasks(
shardID := shardCondition.ShardID
var domainID, workflowID string
var previousNextEventIDCondition int64
timeStamp := insertedExecution.UpdatedTime
timeStamp := insertedExecution.CurrentTimeStamp
if mutatedExecution != nil {
domainID = mutatedExecution.DomainID
workflowID = mutatedExecution.WorkflowID
Expand Down Expand Up @@ -721,7 +721,7 @@ func (db *cdb) InsertReplicationTask(ctx context.Context, tasks []*nosqlplugin.H

shardID := shardCondition.ShardID
batch := db.session.NewBatch(gocql.LoggedBatch).WithContext(ctx)
timeStamp := tasks[0].CurrentTimeStamp
timeStamp := tasks[0].Replication.CurrentTimeStamp
for _, task := range tasks {
createReplicationTasks(batch, shardID, task.Replication.DomainID, task.Replication.WorkflowID, []*nosqlplugin.HistoryMigrationTask{task}, timeStamp)
}
Expand Down
3 changes: 1 addition & 2 deletions common/persistence/nosql/nosqlplugin/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,7 @@ type (
VersionHistories *persistence.DataBlob
Checksums *checksum.Checksum
LastWriteVersion int64
CreatedTime time.Time
UpdatedTime time.Time
CurrentTimeStamp time.Time
// condition checking for updating execution info
PreviousNextEventIDCondition *int64

Expand Down
15 changes: 13 additions & 2 deletions common/persistence/shard_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,12 @@ func TestShardManagerCreateShard(t *testing.T) {
t.Run(name, func(t *testing.T) {
store := NewMockShardStore(ctrl)
if test.internalRequest != nil {
store.EXPECT().CreateShard(gomock.Any(), gomock.Eq(test.internalRequest)).Return(test.internalResponse)
store.EXPECT().CreateShard(gomock.Any(), gomock.Any()).
Do(func(ctx context.Context, req *InternalCreateShardRequest) {
assert.Equal(t, test.internalRequest.ShardInfo, req.ShardInfo)

assert.WithinDuration(t, time.Now(), req.CurrentTimeStamp, time.Second)
}).Return(test.internalResponse)
}

manager := NewShardManager(store, WithSerializer(test.serializer))
Expand Down Expand Up @@ -237,7 +242,13 @@ func TestShardManagerUpdateShard(t *testing.T) {
t.Run(name, func(t *testing.T) {
store := NewMockShardStore(ctrl)
if test.internalRequest != nil {
store.EXPECT().UpdateShard(gomock.Any(), gomock.Eq(test.internalRequest)).Return(test.internalResponse)
store.EXPECT().UpdateShard(gomock.Any(), gomock.Any()).
Do(func(ctx context.Context, req *InternalUpdateShardRequest) {
assert.Equal(t, test.internalRequest.PreviousRangeID, req.PreviousRangeID)
assert.Equal(t, test.internalRequest.ShardInfo, req.ShardInfo)

assert.WithinDuration(t, time.Now(), req.CurrentTimeStamp, time.Second)
}).Return(test.internalResponse)
}

manager := NewShardManager(store, WithSerializer(test.serializer))
Expand Down

0 comments on commit b19b80a

Please sign in to comment.