From e7f58fbdfe539a76c9ac178ed2d4c75dee289247 Mon Sep 17 00:00:00 2001 From: Stanislav Bychkov Date: Thu, 23 Jan 2025 19:57:47 +0200 Subject: [PATCH] Move time source from db layer to PersistenceManager --- .../nosql/nosql_execution_store.go | 48 ++++++++++++++----- common/persistence/nosql/nosql_shard_store.go | 5 +- common/persistence/nosql/nosql_task_store.go | 2 +- .../nosql/nosqlplugin/cassandra/db.go | 9 ---- .../nosql/nosqlplugin/cassandra/shard.go | 8 ++-- .../nosql/nosqlplugin/cassandra/shard_test.go | 26 ++++------ .../nosql/nosqlplugin/cassandra/tasks.go | 3 +- .../nosql/nosqlplugin/cassandra/tasks_test.go | 9 ++-- .../nosql/nosqlplugin/cassandra/workflow.go | 17 ++++--- .../nosqlplugin/cassandra/workflow_test.go | 6 ++- .../nosql/nosqlplugin/dynamodb/shard.go | 5 +- .../nosql/nosqlplugin/dynamodb/task.go | 2 + .../nosql/nosqlplugin/dynamodb/workflow.go | 6 ++- .../nosql/nosqlplugin/interfaces.go | 12 +++-- .../nosql/nosqlplugin/interfaces_mock.go | 42 ++++++++-------- .../nosql/nosqlplugin/mongodb/shard.go | 5 +- .../nosql/nosqlplugin/mongodb/task.go | 2 + .../nosql/nosqlplugin/mongodb/workflow.go | 6 ++- 18 files changed, 118 insertions(+), 95 deletions(-) diff --git a/common/persistence/nosql/nosql_execution_store.go b/common/persistence/nosql/nosql_execution_store.go index d0b7da568aa..eae8b74d432 100644 --- a/common/persistence/nosql/nosql_execution_store.go +++ b/common/persistence/nosql/nosql_execution_store.go @@ -23,6 +23,7 @@ package nosql import ( "context" "fmt" + "time" "github.com/uber/cadence/common/log" "github.com/uber/cadence/common/log/tag" @@ -114,9 +115,14 @@ func (d *nosqlExecutionStore) CreateWorkflowExecution( err = d.db.InsertWorkflowExecutionWithTasks( ctx, workflowRequestsWriteRequest, - currentWorkflowWriteReq, workflowExecutionWriteReq, - transferTasks, crossClusterTasks, replicationTasks, timerTasks, + currentWorkflowWriteReq, + workflowExecutionWriteReq, + transferTasks, + crossClusterTasks, + replicationTasks, + timerTasks, shardCondition, + time.Now(), ) if err != nil { conditionFailureErr, isConditionFailedError := err.(*nosqlplugin.WorkflowOperationConditionFailure) @@ -337,10 +343,19 @@ func (d *nosqlExecutionStore) UpdateWorkflowExecution( } err = d.db.UpdateWorkflowExecutionWithTasks( - ctx, workflowRequestsWriteRequest, currentWorkflowWriteReq, - mutateExecution, insertExecution, nil, // no workflow to reset here - nosqlTransferTasks, nosqlCrossClusterTasks, nosqlReplicationTasks, nosqlTimerTasks, - shardCondition) + ctx, + workflowRequestsWriteRequest, + currentWorkflowWriteReq, + mutateExecution, + insertExecution, + nil, + nosqlTransferTasks, + nosqlCrossClusterTasks, + nosqlReplicationTasks, + nosqlTimerTasks, + shardCondition, + time.Now(), + ) return d.processUpdateWorkflowResult(err, request.RangeID) } @@ -489,10 +504,19 @@ func (d *nosqlExecutionStore) ConflictResolveWorkflowExecution( } err = d.db.UpdateWorkflowExecutionWithTasks( - ctx, workflowRequestsWriteRequest, currentWorkflowWriteReq, - mutateExecution, insertExecution, resetExecution, - nosqlTransferTasks, nosqlCrossClusterTasks, nosqlReplicationTasks, nosqlTimerTasks, - shardCondition) + ctx, + workflowRequestsWriteRequest, + currentWorkflowWriteReq, + mutateExecution, + insertExecution, + resetExecution, + nosqlTransferTasks, + nosqlCrossClusterTasks, + nosqlReplicationTasks, + nosqlTimerTasks, + shardCondition, + time.Now(), + ) return d.processUpdateWorkflowResult(err, request.RangeID) } @@ -759,7 +783,7 @@ func (d *nosqlExecutionStore) PutReplicationTaskToDLQ( ctx context.Context, request *persistence.InternalPutReplicationTaskToDLQRequest, ) error { - err := d.db.InsertReplicationDLQTask(ctx, d.shardID, request.SourceClusterName, *request.TaskInfo) + err := d.db.InsertReplicationDLQTask(ctx, d.shardID, request.SourceClusterName, *request.TaskInfo, time.Now()) if err != nil { return convertCommonErrors(d.db, "PutReplicationTaskToDLQ", err) } @@ -843,7 +867,7 @@ func (d *nosqlExecutionStore) CreateFailoverMarkerTasks( err := d.db.InsertReplicationTask(ctx, nosqlTasks, nosqlplugin.ShardCondition{ ShardID: d.shardID, RangeID: request.RangeID, - }) + }, time.Now()) if err != nil { conditionFailureErr, isConditionFailedError := err.(*nosqlplugin.ShardOperationConditionFailure) diff --git a/common/persistence/nosql/nosql_shard_store.go b/common/persistence/nosql/nosql_shard_store.go index 1705b206524..5568703f930 100644 --- a/common/persistence/nosql/nosql_shard_store.go +++ b/common/persistence/nosql/nosql_shard_store.go @@ -23,6 +23,7 @@ package nosql import ( "context" "fmt" + "time" "github.com/uber/cadence/common/config" "github.com/uber/cadence/common/log" @@ -63,7 +64,7 @@ func (sh *nosqlShardStore) CreateShard( if err != nil { return err } - err = storeShard.db.InsertShard(ctx, request.ShardInfo) + err = storeShard.db.InsertShard(ctx, request.ShardInfo, time.Now()) if err != nil { conditionFailure, ok := err.(*nosqlplugin.ShardOperationConditionFailure) if ok { @@ -161,7 +162,7 @@ func (sh *nosqlShardStore) UpdateShard( if err != nil { return err } - err = storeShard.db.UpdateShard(ctx, request.ShardInfo, request.PreviousRangeID) + err = storeShard.db.UpdateShard(ctx, request.ShardInfo, request.PreviousRangeID, time.Now()) if err != nil { conditionFailure, ok := err.(*nosqlplugin.ShardOperationConditionFailure) if ok { diff --git a/common/persistence/nosql/nosql_task_store.go b/common/persistence/nosql/nosql_task_store.go index 4b713afa04c..1743cb731c0 100644 --- a/common/persistence/nosql/nosql_task_store.go +++ b/common/persistence/nosql/nosql_task_store.go @@ -225,7 +225,7 @@ func (t *nosqlTaskStore) UpdateTaskList( } if tli.Kind == persistence.TaskListKindSticky { // if task_list is sticky, then update with TTL - err = storeShard.db.UpdateTaskListWithTTL(ctx, stickyTaskListTTL, taskListToUpdate, tli.RangeID) + err = storeShard.db.UpdateTaskListWithTTL(ctx, stickyTaskListTTL, taskListToUpdate, tli.RangeID, time.Now()) } else { err = storeShard.db.UpdateTaskList(ctx, taskListToUpdate, tli.RangeID) } diff --git a/common/persistence/nosql/nosqlplugin/cassandra/db.go b/common/persistence/nosql/nosqlplugin/cassandra/db.go index cc00c1e1e80..7668857ca5f 100644 --- a/common/persistence/nosql/nosqlplugin/cassandra/db.go +++ b/common/persistence/nosql/nosqlplugin/cassandra/db.go @@ -21,7 +21,6 @@ package cassandra import ( - "github.com/uber/cadence/common/clock" "github.com/uber/cadence/common/config" "github.com/uber/cadence/common/log" "github.com/uber/cadence/common/log/tag" @@ -37,7 +36,6 @@ type cdb struct { session gocql.Session cfg *config.NoSQL dc *persistence.DynamicConfiguration - timeSrc clock.TimeSource } var _ nosqlplugin.DB = (*cdb)(nil) @@ -53,12 +51,6 @@ func dbWithClient(client gocql.Client) cassandraDBOption { } } -func dbWithTimeSource(timeSrc clock.TimeSource) cassandraDBOption { - return func(db *cdb) { - db.timeSrc = timeSrc - } -} - // newCassandraDBFromSession returns a DB from a session func newCassandraDBFromSession( cfg *config.NoSQL, @@ -72,7 +64,6 @@ func newCassandraDBFromSession( logger: logger, cfg: cfg, dc: dc, - timeSrc: clock.NewRealTimeSource(), } for _, opt := range opts { diff --git a/common/persistence/nosql/nosqlplugin/cassandra/shard.go b/common/persistence/nosql/nosqlplugin/cassandra/shard.go index 301ffe09be2..fefdef2770d 100644 --- a/common/persistence/nosql/nosqlplugin/cassandra/shard.go +++ b/common/persistence/nosql/nosqlplugin/cassandra/shard.go @@ -33,8 +33,8 @@ import ( // InsertShard creates a new shard, return error is there is any. // Return ShardOperationConditionFailure if the condition doesn't meet -func (db *cdb) InsertShard(ctx context.Context, row *nosqlplugin.ShardRow) error { - cqlNowTimestamp := persistence.UnixNanoToDBTimestamp(db.timeSrc.Now().UnixNano()) +func (db *cdb) InsertShard(ctx context.Context, row *nosqlplugin.ShardRow, timeStamp time.Time) error { + cqlNowTimestamp := persistence.UnixNanoToDBTimestamp(timeStamp.UnixNano()) markerData, markerEncoding := persistence.FromDataBlob(row.PendingFailoverMarkers) transferPQS, transferPQSEncoding := persistence.FromDataBlob(row.TransferProcessingQueueStates) timerPQS, timerPQSEncoding := persistence.FromDataBlob(row.TimerProcessingQueueStates) @@ -233,8 +233,8 @@ func (db *cdb) UpdateRangeID(ctx context.Context, shardID int, rangeID int64, pr // UpdateShard updates a shard, return error is there is any. // Return ShardOperationConditionFailure if the condition doesn't meet -func (db *cdb) UpdateShard(ctx context.Context, row *nosqlplugin.ShardRow, previousRangeID int64) error { - cqlNowTimestamp := persistence.UnixNanoToDBTimestamp(db.timeSrc.Now().UnixNano()) +func (db *cdb) UpdateShard(ctx context.Context, row *nosqlplugin.ShardRow, previousRangeID int64, timeStamp time.Time) error { + cqlNowTimestamp := persistence.UnixNanoToDBTimestamp(timeStamp.UnixNano()) markerData, markerEncoding := persistence.FromDataBlob(row.PendingFailoverMarkers) transferPQS, transferPQSEncoding := persistence.FromDataBlob(row.TransferProcessingQueueStates) timerPQS, timerPQSEncoding := persistence.FromDataBlob(row.TimerProcessingQueueStates) diff --git a/common/persistence/nosql/nosqlplugin/cassandra/shard_test.go b/common/persistence/nosql/nosqlplugin/cassandra/shard_test.go index 33692f9fbe2..38933497520 100644 --- a/common/persistence/nosql/nosqlplugin/cassandra/shard_test.go +++ b/common/persistence/nosql/nosqlplugin/cassandra/shard_test.go @@ -29,7 +29,6 @@ import ( "github.com/google/go-cmp/cmp" "go.uber.org/mock/gomock" - "github.com/uber/cadence/common/clock" "github.com/uber/cadence/common/config" "github.com/uber/cadence/common/log/testlogger" "github.com/uber/cadence/common/persistence" @@ -112,11 +111,9 @@ func TestInsertShard(t *testing.T) { cfg := &config.NoSQL{} logger := testlogger.New(t) dc := &persistence.DynamicConfiguration{} - timeSrc := clock.NewMockedTimeSourceAt(ts) - db := newCassandraDBFromSession(cfg, session, logger, dc, dbWithClient(client), dbWithTimeSource(timeSrc)) - - err := db.InsertShard(context.Background(), tc.row) + db := newCassandraDBFromSession(cfg, session, logger, dc, dbWithClient(client)) + err := db.InsertShard(context.Background(), tc.row, ts) if (err != nil) != tc.wantErr { t.Errorf("InsertShard() error = %v, wantErr %v", err, tc.wantErr) } @@ -258,8 +255,8 @@ func TestSelectShard(t *testing.T) { cfg := &config.NoSQL{} logger := testlogger.New(t) dc := &persistence.DynamicConfiguration{} - timeSrc := clock.NewMockedTimeSourceAt(ts) - db := newCassandraDBFromSession(cfg, session, logger, dc, dbWithClient(client), dbWithTimeSource(timeSrc)) + + db := newCassandraDBFromSession(cfg, session, logger, dc, dbWithClient(client)) gotRangeID, gotShardInfo, err := db.SelectShard(context.Background(), tc.shardID, tc.cluster) @@ -287,11 +284,6 @@ func TestSelectShard(t *testing.T) { } func TestUpdateRangeID(t *testing.T) { - ts, err := time.Parse(time.RFC3339, "2024-04-02T18:00:00Z") - if err != nil { - t.Fatalf("Failed to parse time: %v", err) - } - tests := []struct { name string shardID int @@ -365,8 +357,8 @@ func TestUpdateRangeID(t *testing.T) { cfg := &config.NoSQL{} logger := testlogger.New(t) dc := &persistence.DynamicConfiguration{} - timeSrc := clock.NewMockedTimeSourceAt(ts) - db := newCassandraDBFromSession(cfg, session, logger, dc, dbWithClient(client), dbWithTimeSource(timeSrc)) + + db := newCassandraDBFromSession(cfg, session, logger, dc, dbWithClient(client)) err := db.UpdateRangeID(context.Background(), tc.shardID, tc.rangeID, tc.prevRangeID) @@ -480,11 +472,9 @@ func TestUpdateShard(t *testing.T) { cfg := &config.NoSQL{} logger := testlogger.New(t) dc := &persistence.DynamicConfiguration{} - timeSrc := clock.NewMockedTimeSourceAt(ts) - db := newCassandraDBFromSession(cfg, session, logger, dc, dbWithClient(client), dbWithTimeSource(timeSrc)) - - err := db.UpdateShard(context.Background(), tc.row, tc.prevRangeID) + db := newCassandraDBFromSession(cfg, session, logger, dc, dbWithClient(client)) + err := db.UpdateShard(context.Background(), tc.row, tc.prevRangeID, ts) if (err != nil) != tc.wantErr { t.Errorf("UpdateShard() error = %v, wantErr %v", err, tc.wantErr) } diff --git a/common/persistence/nosql/nosqlplugin/cassandra/tasks.go b/common/persistence/nosql/nosqlplugin/cassandra/tasks.go index f647463cdda..21ff7025bdf 100644 --- a/common/persistence/nosql/nosqlplugin/cassandra/tasks.go +++ b/common/persistence/nosql/nosqlplugin/cassandra/tasks.go @@ -261,6 +261,7 @@ func (db *cdb) UpdateTaskListWithTTL( ttlSeconds int64, row *nosqlplugin.TaskListRow, previousRangeID int64, + timeStamp time.Time, ) error { batch := db.session.NewBatch(gocql.LoggedBatch).WithContext(ctx) // part 1 is used to set TTL on primary key as UPDATE can't set TTL for primary key @@ -281,7 +282,7 @@ func (db *cdb) UpdateTaskListWithTTL( row.TaskListType, row.AckLevel, row.TaskListKind, - db.timeSrc.Now(), + timeStamp, fromTaskListPartitionConfig(row.AdaptivePartitionConfig), row.DomainID, row.TaskListName, diff --git a/common/persistence/nosql/nosqlplugin/cassandra/tasks_test.go b/common/persistence/nosql/nosqlplugin/cassandra/tasks_test.go index b7ee238ee9b..7fdfff264b3 100644 --- a/common/persistence/nosql/nosqlplugin/cassandra/tasks_test.go +++ b/common/persistence/nosql/nosqlplugin/cassandra/tasks_test.go @@ -30,7 +30,6 @@ import ( "github.com/google/go-cmp/cmp" "go.uber.org/mock/gomock" - "github.com/uber/cadence/common/clock" "github.com/uber/cadence/common/config" "github.com/uber/cadence/common/log/testlogger" "github.com/uber/cadence/common/persistence" @@ -543,10 +542,9 @@ func TestUpdateTaskListWithTTL(t *testing.T) { logger := testlogger.New(t) dc := &persistence.DynamicConfiguration{} - timeSrc := clock.NewMockedTimeSourceAt(ts) - db := newCassandraDBFromSession(cfg, session, logger, dc, dbWithClient(client), dbWithTimeSource(timeSrc)) + db := newCassandraDBFromSession(cfg, session, logger, dc, dbWithClient(client)) - err := db.UpdateTaskListWithTTL(context.Background(), tc.ttlSeconds, tc.row, tc.prevRangeID) + err := db.UpdateTaskListWithTTL(context.Background(), tc.ttlSeconds, tc.row, tc.prevRangeID, ts) if (err != nil) != tc.wantErr { t.Errorf("UpdateTaskListWithTTL() error = %v, wantErr %v", err, tc.wantErr) @@ -823,8 +821,7 @@ func TestInsertTasks(t *testing.T) { logger := testlogger.New(t) dc := &persistence.DynamicConfiguration{} - timeSrc := clock.NewMockedTimeSourceAt(ts) - db := newCassandraDBFromSession(cfg, session, logger, dc, dbWithClient(client), dbWithTimeSource(timeSrc)) + db := newCassandraDBFromSession(cfg, session, logger, dc, dbWithClient(client)) err := db.InsertTasks(context.Background(), tc.tasksToInsert, tc.tasklistCond) diff --git a/common/persistence/nosql/nosqlplugin/cassandra/workflow.go b/common/persistence/nosql/nosqlplugin/cassandra/workflow.go index ae42074ada8..a3cb214ddfa 100644 --- a/common/persistence/nosql/nosqlplugin/cassandra/workflow.go +++ b/common/persistence/nosql/nosqlplugin/cassandra/workflow.go @@ -46,11 +46,11 @@ func (db *cdb) InsertWorkflowExecutionWithTasks( replicationTasks []*nosqlplugin.ReplicationTask, timerTasks []*nosqlplugin.TimerTask, shardCondition *nosqlplugin.ShardCondition, + timeStamp time.Time, ) error { shardID := shardCondition.ShardID domainID := execution.DomainID workflowID := execution.WorkflowID - timeStamp := db.timeSrc.Now() batch := db.session.NewBatch(gocql.LoggedBatch).WithContext(ctx) @@ -126,11 +126,11 @@ func (db *cdb) UpdateWorkflowExecutionWithTasks( replicationTasks []*nosqlplugin.ReplicationTask, timerTasks []*nosqlplugin.TimerTask, shardCondition *nosqlplugin.ShardCondition, + timeStamp time.Time, ) error { shardID := shardCondition.ShardID var domainID, workflowID string var previousNextEventIDCondition int64 - timeStamp := db.timeSrc.Now() if mutatedExecution != nil { domainID = mutatedExecution.DomainID workflowID = mutatedExecution.WorkflowID @@ -626,7 +626,7 @@ func (db *cdb) RangeDeleteCrossClusterTasks(ctx context.Context, shardID int, ta return query.Exec() } -func (db *cdb) InsertReplicationDLQTask(ctx context.Context, shardID int, sourceCluster string, task nosqlplugin.ReplicationTask) error { +func (db *cdb) InsertReplicationDLQTask(ctx context.Context, shardID int, sourceCluster string, task nosqlplugin.ReplicationTask, timeStamp time.Time) error { // Use source cluster name as the workflow id for replication dlq query := db.session.Query(templateCreateReplicationTaskQuery, shardID, @@ -650,7 +650,7 @@ func (db *cdb) InsertReplicationDLQTask(ctx context.Context, shardID int, source defaultVisibilityTimestamp, defaultVisibilityTimestamp, task.TaskID, - db.timeSrc.Now(), + timeStamp, ).WithContext(ctx) return query.Exec() @@ -720,14 +720,19 @@ func (db *cdb) RangeDeleteReplicationDLQTasks(ctx context.Context, shardID int, return db.executeWithConsistencyAll(query) } -func (db *cdb) InsertReplicationTask(ctx context.Context, tasks []*nosqlplugin.ReplicationTask, shardCondition nosqlplugin.ShardCondition) error { +func (db *cdb) InsertReplicationTask( + ctx context.Context, + tasks []*nosqlplugin.ReplicationTask, + shardCondition nosqlplugin.ShardCondition, + timeStamp time.Time, +) error { if len(tasks) == 0 { return nil } shardID := shardCondition.ShardID batch := db.session.NewBatch(gocql.LoggedBatch).WithContext(ctx) - timeStamp := db.timeSrc.Now() + for _, task := range tasks { createReplicationTasks(batch, shardID, task.DomainID, task.WorkflowID, []*nosqlplugin.ReplicationTask{task}, timeStamp) } diff --git a/common/persistence/nosql/nosqlplugin/cassandra/workflow_test.go b/common/persistence/nosql/nosqlplugin/cassandra/workflow_test.go index 0649bc52b80..e8e1c07e0fb 100644 --- a/common/persistence/nosql/nosqlplugin/cassandra/workflow_test.go +++ b/common/persistence/nosql/nosqlplugin/cassandra/workflow_test.go @@ -147,6 +147,7 @@ func TestInsertWorkflowExecutionWithTasks(t *testing.T) { tc.replicationTasks, tc.timerTasks, tc.shardCondition, + time.Now(), ) if (err != nil) != tc.wantErr { @@ -458,6 +459,7 @@ func TestUpdateWorkflowExecutionWithTasks(t *testing.T) { tc.replicationTasks, tc.timerTasks, tc.shardCondition, + time.Now(), ) if (err != nil) != tc.wantErr { @@ -1942,7 +1944,7 @@ func TestInsertReplicationDLQTask(t *testing.T) { logger := testlogger.New(t) db := newCassandraDBFromSession(nil, session, logger, nil, dbWithClient(gocql.NewMockClient(ctrl))) - err := db.InsertReplicationDLQTask(context.Background(), tc.shardID, tc.sourceCluster, tc.task) + err := db.InsertReplicationDLQTask(context.Background(), tc.shardID, tc.sourceCluster, tc.task, time.Now()) if (err != nil) != tc.wantErr { t.Errorf("RangeDeleteCrossClusterTasks() error: %v, wantErr: %v", err, tc.wantErr) @@ -2315,7 +2317,7 @@ func TestInsertReplicationTask(t *testing.T) { logger := testlogger.New(t) db := newCassandraDBFromSession(nil, session, logger, nil, dbWithClient(gocql.NewMockClient(ctrl))) - err := db.InsertReplicationTask(context.Background(), tc.tasks, tc.shardCondition) + err := db.InsertReplicationTask(context.Background(), tc.tasks, tc.shardCondition, time.Now()) if (err != nil) != tc.wantErr { t.Errorf("InsertReplicationTask() error: %v, wantErr: %v", err, tc.wantErr) diff --git a/common/persistence/nosql/nosqlplugin/dynamodb/shard.go b/common/persistence/nosql/nosqlplugin/dynamodb/shard.go index 6c08853bbde..2423ac7c5a0 100644 --- a/common/persistence/nosql/nosqlplugin/dynamodb/shard.go +++ b/common/persistence/nosql/nosqlplugin/dynamodb/shard.go @@ -22,13 +22,14 @@ package dynamodb import ( "context" + "time" "github.com/uber/cadence/common/persistence/nosql/nosqlplugin" ) // InsertShard creates a new shard, return error is there is any. // Return ShardOperationConditionFailure if the condition doesn't meet -func (db *ddb) InsertShard(ctx context.Context, row *nosqlplugin.ShardRow) error { +func (db *ddb) InsertShard(ctx context.Context, row *nosqlplugin.ShardRow, timeStamp time.Time) error { panic("TODO") } @@ -45,6 +46,6 @@ func (db *ddb) UpdateRangeID(ctx context.Context, shardID int, rangeID int64, pr // UpdateShard updates a shard, return error is there is any. // Return ShardOperationConditionFailure if the condition doesn't meet -func (db *ddb) UpdateShard(ctx context.Context, row *nosqlplugin.ShardRow, previousRangeID int64) error { +func (db *ddb) UpdateShard(ctx context.Context, row *nosqlplugin.ShardRow, previousRangeID int64, timeStamp time.Time) error { panic("TODO") } diff --git a/common/persistence/nosql/nosqlplugin/dynamodb/task.go b/common/persistence/nosql/nosqlplugin/dynamodb/task.go index 2f2e2aac050..937450cb430 100644 --- a/common/persistence/nosql/nosqlplugin/dynamodb/task.go +++ b/common/persistence/nosql/nosqlplugin/dynamodb/task.go @@ -22,6 +22,7 @@ package dynamodb import ( "context" + "time" "github.com/uber/cadence/common/persistence/nosql/nosqlplugin" ) @@ -57,6 +58,7 @@ func (db *ddb) UpdateTaskListWithTTL( ttlSeconds int64, row *nosqlplugin.TaskListRow, previousRangeID int64, + timeStamp time.Time, ) error { panic("TODO") } diff --git a/common/persistence/nosql/nosqlplugin/dynamodb/workflow.go b/common/persistence/nosql/nosqlplugin/dynamodb/workflow.go index dd8e6215578..87a80bfa5e1 100644 --- a/common/persistence/nosql/nosqlplugin/dynamodb/workflow.go +++ b/common/persistence/nosql/nosqlplugin/dynamodb/workflow.go @@ -41,6 +41,7 @@ func (db *ddb) InsertWorkflowExecutionWithTasks( replicationTasks []*nosqlplugin.ReplicationTask, timerTasks []*nosqlplugin.TimerTask, shardCondition *nosqlplugin.ShardCondition, + timeStamp time.Time, ) error { panic("TODO") } @@ -57,6 +58,7 @@ func (db *ddb) UpdateWorkflowExecutionWithTasks( replicationTasks []*nosqlplugin.ReplicationTask, timerTasks []*nosqlplugin.TimerTask, shardCondition *nosqlplugin.ShardCondition, + timeStamp time.Time, ) error { panic("TODO") } @@ -125,7 +127,7 @@ func (db *ddb) RangeDeleteReplicationTasks(ctx context.Context, shardID int, inc panic("TODO") } -func (db *ddb) InsertReplicationTask(ctx context.Context, tasks []*nosqlplugin.ReplicationTask, condition nosqlplugin.ShardCondition) error { +func (db *ddb) InsertReplicationTask(ctx context.Context, tasks []*nosqlplugin.ReplicationTask, condition nosqlplugin.ShardCondition, timeStamp time.Time) error { panic("TODO") } @@ -141,7 +143,7 @@ func (db *ddb) RangeDeleteCrossClusterTasks(ctx context.Context, shardID int, ta panic("TODO") } -func (db *ddb) InsertReplicationDLQTask(ctx context.Context, shardID int, sourceCluster string, task nosqlplugin.ReplicationTask) error { +func (db *ddb) InsertReplicationDLQTask(ctx context.Context, shardID int, sourceCluster string, task nosqlplugin.ReplicationTask, timeStamp time.Time) error { panic("TODO") } diff --git a/common/persistence/nosql/nosqlplugin/interfaces.go b/common/persistence/nosql/nosqlplugin/interfaces.go index fae199878cb..9f884852f79 100644 --- a/common/persistence/nosql/nosqlplugin/interfaces.go +++ b/common/persistence/nosql/nosqlplugin/interfaces.go @@ -205,7 +205,7 @@ type ( // InsertShard creates a new shard. // Return error is there is any thing wrong // Return the ShardOperationConditionFailure when doesn't meet the condition - InsertShard(ctx context.Context, row *ShardRow) error + InsertShard(ctx context.Context, row *ShardRow, timeStamp time.Time) error // SelectShard gets a shard, rangeID is the current rangeID in shard row SelectShard(ctx context.Context, shardID int, currentClusterName string) (rangeID int64, shard *ShardRow, err error) // UpdateRangeID updates the rangeID @@ -215,7 +215,7 @@ type ( // UpdateShard updates a shard // Return error is there is any thing wrong // Return the ShardOperationConditionFailure when doesn't meet the condition - UpdateShard(ctx context.Context, row *ShardRow, previousRangeID int64) error + UpdateShard(ctx context.Context, row *ShardRow, previousRangeID int64, timeStamp time.Time) error } /** @@ -329,7 +329,7 @@ type ( // Return TaskOperationConditionFailure if the condition doesn't meet // Ignore TTL if it's not supported, which becomes exactly the same as UpdateTaskList, but ListTaskList must be // implemented for TaskListScavenger - UpdateTaskListWithTTL(ctx context.Context, ttlSeconds int64, row *TaskListRow, previousRangeID int64) error + UpdateTaskListWithTTL(ctx context.Context, ttlSeconds int64, row *TaskListRow, previousRangeID int64, timeStamp time.Time) error // ListTaskList returns all tasklists. // Noop if TTL is already implemented in other methods ListTaskList(ctx context.Context, pageSize int, nextPageToken []byte) (*ListTaskListResult, error) @@ -421,6 +421,7 @@ type ( replicationTasks []*ReplicationTask, timerTasks []*TimerTask, shardCondition *ShardCondition, + timeStamp time.Time, ) error // UpdateWorkflowExecutionWithTasks is for updating a new workflow execution record. @@ -451,6 +452,7 @@ type ( replicationTasks []*ReplicationTask, timerTasks []*TimerTask, shardCondition *ShardCondition, + timeStamp time.Time, ) error // current_workflow table @@ -495,7 +497,7 @@ type ( // delete a range of replication tasks RangeDeleteReplicationTasks(ctx context.Context, shardID int, inclusiveEndTaskID int64) error // insert replication task with shard condition check - InsertReplicationTask(ctx context.Context, tasks []*ReplicationTask, condition ShardCondition) error + InsertReplicationTask(ctx context.Context, tasks []*ReplicationTask, condition ShardCondition, timeStamp time.Time) error // cross_cluster_task table // within a shard, paging through replication tasks order by taskID(ASC), filtered by minTaskID(exclusive) and maxTaskID(inclusive) @@ -507,7 +509,7 @@ type ( // replication_dlq_task // insert a new replication task to DLQ - InsertReplicationDLQTask(ctx context.Context, shardID int, sourceCluster string, task ReplicationTask) error + InsertReplicationDLQTask(ctx context.Context, shardID int, sourceCluster string, task ReplicationTask, timeStamp time.Time) error // within a shard, for a sourceCluster, paging through replication tasks order by taskID(ASC), filtered by minTaskID(exclusive) and maxTaskID(inclusive) SelectReplicationDLQTasksOrderByTaskID(ctx context.Context, shardID int, sourceCluster string, pageSize int, pageToken []byte, exclusiveMinTaskID, inclusiveMaxTaskID int64) ([]*ReplicationTask, []byte, error) // return the DLQ size diff --git a/common/persistence/nosql/nosqlplugin/interfaces_mock.go b/common/persistence/nosql/nosqlplugin/interfaces_mock.go index 3efd68ad426..693cd70968b 100644 --- a/common/persistence/nosql/nosqlplugin/interfaces_mock.go +++ b/common/persistence/nosql/nosqlplugin/interfaces_mock.go @@ -482,7 +482,7 @@ func (mr *MockDBMockRecorder) InsertQueueMetadata(ctx, queueType, version any) * } // InsertReplicationDLQTask mocks base method. -func (m *MockDB) InsertReplicationDLQTask(ctx context.Context, shardID int, sourceCluster string, task ReplicationTask) error { +func (m *MockDB) InsertReplicationDLQTask(ctx context.Context, shardID int, sourceCluster string, task ReplicationTask, timeStamp time.Time) error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "InsertReplicationDLQTask", ctx, shardID, sourceCluster, task) ret0, _ := ret[0].(error) @@ -496,7 +496,7 @@ func (mr *MockDBMockRecorder) InsertReplicationDLQTask(ctx, shardID, sourceClust } // InsertReplicationTask mocks base method. -func (m *MockDB) InsertReplicationTask(ctx context.Context, tasks []*ReplicationTask, condition ShardCondition) error { +func (m *MockDB) InsertReplicationTask(ctx context.Context, tasks []*ReplicationTask, condition ShardCondition, timeStamp time.Time) error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "InsertReplicationTask", ctx, tasks, condition) ret0, _ := ret[0].(error) @@ -510,7 +510,7 @@ func (mr *MockDBMockRecorder) InsertReplicationTask(ctx, tasks, condition any) * } // InsertShard mocks base method. -func (m *MockDB) InsertShard(ctx context.Context, row *ShardRow) error { +func (m *MockDB) InsertShard(ctx context.Context, row *ShardRow, timeStamp time.Time) error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "InsertShard", ctx, row) ret0, _ := ret[0].(error) @@ -566,7 +566,7 @@ func (mr *MockDBMockRecorder) InsertVisibility(ctx, ttlSeconds, row any) *gomock } // InsertWorkflowExecutionWithTasks mocks base method. -func (m *MockDB) InsertWorkflowExecutionWithTasks(ctx context.Context, requests *WorkflowRequestsWriteRequest, currentWorkflowRequest *CurrentWorkflowWriteRequest, execution *WorkflowExecutionRequest, transferTasks []*TransferTask, crossClusterTasks []*CrossClusterTask, replicationTasks []*ReplicationTask, timerTasks []*TimerTask, shardCondition *ShardCondition) error { +func (m *MockDB) InsertWorkflowExecutionWithTasks(ctx context.Context, requests *WorkflowRequestsWriteRequest, currentWorkflowRequest *CurrentWorkflowWriteRequest, execution *WorkflowExecutionRequest, transferTasks []*TransferTask, crossClusterTasks []*CrossClusterTask, replicationTasks []*ReplicationTask, timerTasks []*TimerTask, shardCondition *ShardCondition, timeStamp time.Time) error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "InsertWorkflowExecutionWithTasks", ctx, requests, currentWorkflowRequest, execution, transferTasks, crossClusterTasks, replicationTasks, timerTasks, shardCondition) ret0, _ := ret[0].(error) @@ -1208,7 +1208,7 @@ func (mr *MockDBMockRecorder) UpdateRangeID(ctx, shardID, rangeID, previousRange } // UpdateShard mocks base method. -func (m *MockDB) UpdateShard(ctx context.Context, row *ShardRow, previousRangeID int64) error { +func (m *MockDB) UpdateShard(ctx context.Context, row *ShardRow, previousRangeID int64, timeStamp time.Time) error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "UpdateShard", ctx, row, previousRangeID) ret0, _ := ret[0].(error) @@ -1236,7 +1236,7 @@ func (mr *MockDBMockRecorder) UpdateTaskList(ctx, row, previousRangeID any) *gom } // UpdateTaskListWithTTL mocks base method. -func (m *MockDB) UpdateTaskListWithTTL(ctx context.Context, ttlSeconds int64, row *TaskListRow, previousRangeID int64) error { +func (m *MockDB) UpdateTaskListWithTTL(ctx context.Context, ttlSeconds int64, row *TaskListRow, previousRangeID int64, timeStamp time.Time) error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "UpdateTaskListWithTTL", ctx, ttlSeconds, row, previousRangeID) ret0, _ := ret[0].(error) @@ -1264,7 +1264,7 @@ func (mr *MockDBMockRecorder) UpdateVisibility(ctx, ttlSeconds, row any) *gomock } // UpdateWorkflowExecutionWithTasks mocks base method. -func (m *MockDB) UpdateWorkflowExecutionWithTasks(ctx context.Context, requests *WorkflowRequestsWriteRequest, currentWorkflowRequest *CurrentWorkflowWriteRequest, mutatedExecution, insertedExecution, resetExecution *WorkflowExecutionRequest, transferTasks []*TransferTask, crossClusterTasks []*CrossClusterTask, replicationTasks []*ReplicationTask, timerTasks []*TimerTask, shardCondition *ShardCondition) error { +func (m *MockDB) UpdateWorkflowExecutionWithTasks(ctx context.Context, requests *WorkflowRequestsWriteRequest, currentWorkflowRequest *CurrentWorkflowWriteRequest, mutatedExecution, insertedExecution, resetExecution *WorkflowExecutionRequest, transferTasks []*TransferTask, crossClusterTasks []*CrossClusterTask, replicationTasks []*ReplicationTask, timerTasks []*TimerTask, shardCondition *ShardCondition, timeStamp time.Time) error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "UpdateWorkflowExecutionWithTasks", ctx, requests, currentWorkflowRequest, mutatedExecution, insertedExecution, resetExecution, transferTasks, crossClusterTasks, replicationTasks, timerTasks, shardCondition) ret0, _ := ret[0].(error) @@ -1598,7 +1598,7 @@ func (mr *MocktableCRUDMockRecorder) InsertQueueMetadata(ctx, queueType, version } // InsertReplicationDLQTask mocks base method. -func (m *MocktableCRUD) InsertReplicationDLQTask(ctx context.Context, shardID int, sourceCluster string, task ReplicationTask) error { +func (m *MocktableCRUD) InsertReplicationDLQTask(ctx context.Context, shardID int, sourceCluster string, task ReplicationTask, timeStamp time.Time) error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "InsertReplicationDLQTask", ctx, shardID, sourceCluster, task) ret0, _ := ret[0].(error) @@ -1612,7 +1612,7 @@ func (mr *MocktableCRUDMockRecorder) InsertReplicationDLQTask(ctx, shardID, sour } // InsertReplicationTask mocks base method. -func (m *MocktableCRUD) InsertReplicationTask(ctx context.Context, tasks []*ReplicationTask, condition ShardCondition) error { +func (m *MocktableCRUD) InsertReplicationTask(ctx context.Context, tasks []*ReplicationTask, condition ShardCondition, timeStamp time.Time) error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "InsertReplicationTask", ctx, tasks, condition) ret0, _ := ret[0].(error) @@ -1626,7 +1626,7 @@ func (mr *MocktableCRUDMockRecorder) InsertReplicationTask(ctx, tasks, condition } // InsertShard mocks base method. -func (m *MocktableCRUD) InsertShard(ctx context.Context, row *ShardRow) error { +func (m *MocktableCRUD) InsertShard(ctx context.Context, row *ShardRow, timeStamp time.Time) error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "InsertShard", ctx, row) ret0, _ := ret[0].(error) @@ -1682,7 +1682,7 @@ func (mr *MocktableCRUDMockRecorder) InsertVisibility(ctx, ttlSeconds, row any) } // InsertWorkflowExecutionWithTasks mocks base method. -func (m *MocktableCRUD) InsertWorkflowExecutionWithTasks(ctx context.Context, requests *WorkflowRequestsWriteRequest, currentWorkflowRequest *CurrentWorkflowWriteRequest, execution *WorkflowExecutionRequest, transferTasks []*TransferTask, crossClusterTasks []*CrossClusterTask, replicationTasks []*ReplicationTask, timerTasks []*TimerTask, shardCondition *ShardCondition) error { +func (m *MocktableCRUD) InsertWorkflowExecutionWithTasks(ctx context.Context, requests *WorkflowRequestsWriteRequest, currentWorkflowRequest *CurrentWorkflowWriteRequest, execution *WorkflowExecutionRequest, transferTasks []*TransferTask, crossClusterTasks []*CrossClusterTask, replicationTasks []*ReplicationTask, timerTasks []*TimerTask, shardCondition *ShardCondition, timeStamp time.Time) error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "InsertWorkflowExecutionWithTasks", ctx, requests, currentWorkflowRequest, execution, transferTasks, crossClusterTasks, replicationTasks, timerTasks, shardCondition) ret0, _ := ret[0].(error) @@ -2254,7 +2254,7 @@ func (mr *MocktableCRUDMockRecorder) UpdateRangeID(ctx, shardID, rangeID, previo } // UpdateShard mocks base method. -func (m *MocktableCRUD) UpdateShard(ctx context.Context, row *ShardRow, previousRangeID int64) error { +func (m *MocktableCRUD) UpdateShard(ctx context.Context, row *ShardRow, previousRangeID int64, timeStamp time.Time) error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "UpdateShard", ctx, row, previousRangeID) ret0, _ := ret[0].(error) @@ -2282,7 +2282,7 @@ func (mr *MocktableCRUDMockRecorder) UpdateTaskList(ctx, row, previousRangeID an } // UpdateTaskListWithTTL mocks base method. -func (m *MocktableCRUD) UpdateTaskListWithTTL(ctx context.Context, ttlSeconds int64, row *TaskListRow, previousRangeID int64) error { +func (m *MocktableCRUD) UpdateTaskListWithTTL(ctx context.Context, ttlSeconds int64, row *TaskListRow, previousRangeID int64, timeStamp time.Time) error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "UpdateTaskListWithTTL", ctx, ttlSeconds, row, previousRangeID) ret0, _ := ret[0].(error) @@ -2310,7 +2310,7 @@ func (mr *MocktableCRUDMockRecorder) UpdateVisibility(ctx, ttlSeconds, row any) } // UpdateWorkflowExecutionWithTasks mocks base method. -func (m *MocktableCRUD) UpdateWorkflowExecutionWithTasks(ctx context.Context, requests *WorkflowRequestsWriteRequest, currentWorkflowRequest *CurrentWorkflowWriteRequest, mutatedExecution, insertedExecution, resetExecution *WorkflowExecutionRequest, transferTasks []*TransferTask, crossClusterTasks []*CrossClusterTask, replicationTasks []*ReplicationTask, timerTasks []*TimerTask, shardCondition *ShardCondition) error { +func (m *MocktableCRUD) UpdateWorkflowExecutionWithTasks(ctx context.Context, requests *WorkflowRequestsWriteRequest, currentWorkflowRequest *CurrentWorkflowWriteRequest, mutatedExecution, insertedExecution, resetExecution *WorkflowExecutionRequest, transferTasks []*TransferTask, crossClusterTasks []*CrossClusterTask, replicationTasks []*ReplicationTask, timerTasks []*TimerTask, shardCondition *ShardCondition, timeStamp time.Time) error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "UpdateWorkflowExecutionWithTasks", ctx, requests, currentWorkflowRequest, mutatedExecution, insertedExecution, resetExecution, transferTasks, crossClusterTasks, replicationTasks, timerTasks, shardCondition) ret0, _ := ret[0].(error) @@ -2822,7 +2822,7 @@ func (m *MockShardCRUD) EXPECT() *MockShardCRUDMockRecorder { } // InsertShard mocks base method. -func (m *MockShardCRUD) InsertShard(ctx context.Context, row *ShardRow) error { +func (m *MockShardCRUD) InsertShard(ctx context.Context, row *ShardRow, timeStamp time.Time) error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "InsertShard", ctx, row) ret0, _ := ret[0].(error) @@ -2866,7 +2866,7 @@ func (mr *MockShardCRUDMockRecorder) UpdateRangeID(ctx, shardID, rangeID, previo } // UpdateShard mocks base method. -func (m *MockShardCRUD) UpdateShard(ctx context.Context, row *ShardRow, previousRangeID int64) error { +func (m *MockShardCRUD) UpdateShard(ctx context.Context, row *ShardRow, previousRangeID int64, timeStamp time.Time) error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "UpdateShard", ctx, row, previousRangeID) ret0, _ := ret[0].(error) @@ -3131,7 +3131,7 @@ func (mr *MockTaskCRUDMockRecorder) UpdateTaskList(ctx, row, previousRangeID any } // UpdateTaskListWithTTL mocks base method. -func (m *MockTaskCRUD) UpdateTaskListWithTTL(ctx context.Context, ttlSeconds int64, row *TaskListRow, previousRangeID int64) error { +func (m *MockTaskCRUD) UpdateTaskListWithTTL(ctx context.Context, ttlSeconds int64, row *TaskListRow, previousRangeID int64, timeStamp time.Time) error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "UpdateTaskListWithTTL", ctx, ttlSeconds, row, previousRangeID) ret0, _ := ret[0].(error) @@ -3267,7 +3267,7 @@ func (mr *MockWorkflowCRUDMockRecorder) DeleteWorkflowExecution(ctx, shardID, do } // InsertReplicationDLQTask mocks base method. -func (m *MockWorkflowCRUD) InsertReplicationDLQTask(ctx context.Context, shardID int, sourceCluster string, task ReplicationTask) error { +func (m *MockWorkflowCRUD) InsertReplicationDLQTask(ctx context.Context, shardID int, sourceCluster string, task ReplicationTask, timeStamp time.Time) error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "InsertReplicationDLQTask", ctx, shardID, sourceCluster, task) ret0, _ := ret[0].(error) @@ -3281,7 +3281,7 @@ func (mr *MockWorkflowCRUDMockRecorder) InsertReplicationDLQTask(ctx, shardID, s } // InsertReplicationTask mocks base method. -func (m *MockWorkflowCRUD) InsertReplicationTask(ctx context.Context, tasks []*ReplicationTask, condition ShardCondition) error { +func (m *MockWorkflowCRUD) InsertReplicationTask(ctx context.Context, tasks []*ReplicationTask, condition ShardCondition, timeStamp time.Time) error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "InsertReplicationTask", ctx, tasks, condition) ret0, _ := ret[0].(error) @@ -3295,7 +3295,7 @@ func (mr *MockWorkflowCRUDMockRecorder) InsertReplicationTask(ctx, tasks, condit } // InsertWorkflowExecutionWithTasks mocks base method. -func (m *MockWorkflowCRUD) InsertWorkflowExecutionWithTasks(ctx context.Context, requests *WorkflowRequestsWriteRequest, currentWorkflowRequest *CurrentWorkflowWriteRequest, execution *WorkflowExecutionRequest, transferTasks []*TransferTask, crossClusterTasks []*CrossClusterTask, replicationTasks []*ReplicationTask, timerTasks []*TimerTask, shardCondition *ShardCondition) error { +func (m *MockWorkflowCRUD) InsertWorkflowExecutionWithTasks(ctx context.Context, requests *WorkflowRequestsWriteRequest, currentWorkflowRequest *CurrentWorkflowWriteRequest, execution *WorkflowExecutionRequest, transferTasks []*TransferTask, crossClusterTasks []*CrossClusterTask, replicationTasks []*ReplicationTask, timerTasks []*TimerTask, shardCondition *ShardCondition, timeStamp time.Time) error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "InsertWorkflowExecutionWithTasks", ctx, requests, currentWorkflowRequest, execution, transferTasks, crossClusterTasks, replicationTasks, timerTasks, shardCondition) ret0, _ := ret[0].(error) @@ -3551,7 +3551,7 @@ func (mr *MockWorkflowCRUDMockRecorder) SelectWorkflowExecution(ctx, shardID, do } // UpdateWorkflowExecutionWithTasks mocks base method. -func (m *MockWorkflowCRUD) UpdateWorkflowExecutionWithTasks(ctx context.Context, requests *WorkflowRequestsWriteRequest, currentWorkflowRequest *CurrentWorkflowWriteRequest, mutatedExecution, insertedExecution, resetExecution *WorkflowExecutionRequest, transferTasks []*TransferTask, crossClusterTasks []*CrossClusterTask, replicationTasks []*ReplicationTask, timerTasks []*TimerTask, shardCondition *ShardCondition) error { +func (m *MockWorkflowCRUD) UpdateWorkflowExecutionWithTasks(ctx context.Context, requests *WorkflowRequestsWriteRequest, currentWorkflowRequest *CurrentWorkflowWriteRequest, mutatedExecution, insertedExecution, resetExecution *WorkflowExecutionRequest, transferTasks []*TransferTask, crossClusterTasks []*CrossClusterTask, replicationTasks []*ReplicationTask, timerTasks []*TimerTask, shardCondition *ShardCondition, timeStamp time.Time) error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "UpdateWorkflowExecutionWithTasks", ctx, requests, currentWorkflowRequest, mutatedExecution, insertedExecution, resetExecution, transferTasks, crossClusterTasks, replicationTasks, timerTasks, shardCondition) ret0, _ := ret[0].(error) diff --git a/common/persistence/nosql/nosqlplugin/mongodb/shard.go b/common/persistence/nosql/nosqlplugin/mongodb/shard.go index d2c9d53a36f..24cacc96a33 100644 --- a/common/persistence/nosql/nosqlplugin/mongodb/shard.go +++ b/common/persistence/nosql/nosqlplugin/mongodb/shard.go @@ -23,13 +23,14 @@ package mongodb import ( "context" "log" + "time" "github.com/uber/cadence/common/persistence/nosql/nosqlplugin" ) // InsertShard creates a new shard, return error is there is any. // Return ShardOperationConditionFailure if the condition doesn't meet -func (db *mdb) InsertShard(ctx context.Context, row *nosqlplugin.ShardRow) error { +func (db *mdb) InsertShard(ctx context.Context, row *nosqlplugin.ShardRow, timeStamp time.Time) error { log.Println("not implemented...ignore the error for testing...") return nil } @@ -47,6 +48,6 @@ func (db *mdb) UpdateRangeID(ctx context.Context, shardID int, rangeID int64, pr // UpdateShard updates a shard, return error is there is any. // Return ShardOperationConditionFailure if the condition doesn't meet -func (db *mdb) UpdateShard(ctx context.Context, row *nosqlplugin.ShardRow, previousRangeID int64) error { +func (db *mdb) UpdateShard(ctx context.Context, row *nosqlplugin.ShardRow, previousRangeID int64, timeStamp time.Time) error { panic("TODO") } diff --git a/common/persistence/nosql/nosqlplugin/mongodb/task.go b/common/persistence/nosql/nosqlplugin/mongodb/task.go index be9181f2222..f00f33052b2 100644 --- a/common/persistence/nosql/nosqlplugin/mongodb/task.go +++ b/common/persistence/nosql/nosqlplugin/mongodb/task.go @@ -22,6 +22,7 @@ package mongodb import ( "context" + "time" "github.com/uber/cadence/common/persistence/nosql/nosqlplugin" ) @@ -57,6 +58,7 @@ func (db *mdb) UpdateTaskListWithTTL( ttlSeconds int64, row *nosqlplugin.TaskListRow, previousRangeID int64, + timeStamp time.Time, ) error { panic("TODO") } diff --git a/common/persistence/nosql/nosqlplugin/mongodb/workflow.go b/common/persistence/nosql/nosqlplugin/mongodb/workflow.go index 95ad3ace3bf..831791693b3 100644 --- a/common/persistence/nosql/nosqlplugin/mongodb/workflow.go +++ b/common/persistence/nosql/nosqlplugin/mongodb/workflow.go @@ -41,6 +41,7 @@ func (db *mdb) InsertWorkflowExecutionWithTasks( replicationTasks []*nosqlplugin.ReplicationTask, timerTasks []*nosqlplugin.TimerTask, shardCondition *nosqlplugin.ShardCondition, + timeStamp time.Time, ) error { panic("TODO") } @@ -57,6 +58,7 @@ func (db *mdb) UpdateWorkflowExecutionWithTasks( replicationTasks []*nosqlplugin.ReplicationTask, timerTasks []*nosqlplugin.TimerTask, shardCondition *nosqlplugin.ShardCondition, + timeStamp time.Time, ) error { panic("TODO") } @@ -125,7 +127,7 @@ func (db *mdb) RangeDeleteReplicationTasks(ctx context.Context, shardID int, inc panic("TODO") } -func (db *mdb) InsertReplicationTask(ctx context.Context, tasks []*nosqlplugin.ReplicationTask, condition nosqlplugin.ShardCondition) error { +func (db *mdb) InsertReplicationTask(ctx context.Context, tasks []*nosqlplugin.ReplicationTask, condition nosqlplugin.ShardCondition, timeStamp time.Time) error { panic("TODO") } @@ -141,7 +143,7 @@ func (db *mdb) RangeDeleteCrossClusterTasks(ctx context.Context, shardID int, ta panic("TODO") } -func (db *mdb) InsertReplicationDLQTask(ctx context.Context, shardID int, sourceCluster string, task nosqlplugin.ReplicationTask) error { +func (db *mdb) InsertReplicationDLQTask(ctx context.Context, shardID int, sourceCluster string, task nosqlplugin.ReplicationTask, timeStamp time.Time) error { panic("TODO") }