Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Move time source from db layer to PersistenceManager #6638

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 36 additions & 12 deletions common/persistence/nosql/nosql_execution_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ package nosql
import (
"context"
"fmt"
"time"

"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/log/tag"
Expand Down Expand Up @@ -114,9 +115,14 @@ func (d *nosqlExecutionStore) CreateWorkflowExecution(
err = d.db.InsertWorkflowExecutionWithTasks(
ctx,
workflowRequestsWriteRequest,
currentWorkflowWriteReq, workflowExecutionWriteReq,
transferTasks, crossClusterTasks, replicationTasks, timerTasks,
currentWorkflowWriteReq,
workflowExecutionWriteReq,
transferTasks,
crossClusterTasks,
replicationTasks,
timerTasks,
shardCondition,
time.Now(),
)
if err != nil {
conditionFailureErr, isConditionFailedError := err.(*nosqlplugin.WorkflowOperationConditionFailure)
Expand Down Expand Up @@ -337,10 +343,19 @@ func (d *nosqlExecutionStore) UpdateWorkflowExecution(
}

err = d.db.UpdateWorkflowExecutionWithTasks(
ctx, workflowRequestsWriteRequest, currentWorkflowWriteReq,
mutateExecution, insertExecution, nil, // no workflow to reset here
nosqlTransferTasks, nosqlCrossClusterTasks, nosqlReplicationTasks, nosqlTimerTasks,
shardCondition)
ctx,
workflowRequestsWriteRequest,
currentWorkflowWriteReq,
mutateExecution,
insertExecution,
nil,
nosqlTransferTasks,
nosqlCrossClusterTasks,
nosqlReplicationTasks,
nosqlTimerTasks,
shardCondition,
time.Now(),
)

return d.processUpdateWorkflowResult(err, request.RangeID)
}
Expand Down Expand Up @@ -489,10 +504,19 @@ func (d *nosqlExecutionStore) ConflictResolveWorkflowExecution(
}

err = d.db.UpdateWorkflowExecutionWithTasks(
ctx, workflowRequestsWriteRequest, currentWorkflowWriteReq,
mutateExecution, insertExecution, resetExecution,
nosqlTransferTasks, nosqlCrossClusterTasks, nosqlReplicationTasks, nosqlTimerTasks,
shardCondition)
ctx,
workflowRequestsWriteRequest,
currentWorkflowWriteReq,
mutateExecution,
insertExecution,
resetExecution,
nosqlTransferTasks,
nosqlCrossClusterTasks,
nosqlReplicationTasks,
nosqlTimerTasks,
shardCondition,
time.Now(),
)
return d.processUpdateWorkflowResult(err, request.RangeID)
}

Expand Down Expand Up @@ -759,7 +783,7 @@ func (d *nosqlExecutionStore) PutReplicationTaskToDLQ(
ctx context.Context,
request *persistence.InternalPutReplicationTaskToDLQRequest,
) error {
err := d.db.InsertReplicationDLQTask(ctx, d.shardID, request.SourceClusterName, *request.TaskInfo)
err := d.db.InsertReplicationDLQTask(ctx, d.shardID, request.SourceClusterName, *request.TaskInfo, time.Now())
if err != nil {
return convertCommonErrors(d.db, "PutReplicationTaskToDLQ", err)
}
Expand Down Expand Up @@ -843,7 +867,7 @@ func (d *nosqlExecutionStore) CreateFailoverMarkerTasks(
err := d.db.InsertReplicationTask(ctx, nosqlTasks, nosqlplugin.ShardCondition{
ShardID: d.shardID,
RangeID: request.RangeID,
})
}, time.Now())

if err != nil {
conditionFailureErr, isConditionFailedError := err.(*nosqlplugin.ShardOperationConditionFailure)
Expand Down
5 changes: 3 additions & 2 deletions common/persistence/nosql/nosql_shard_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ package nosql
import (
"context"
"fmt"
"time"

"github.com/uber/cadence/common/config"
"github.com/uber/cadence/common/log"
Expand Down Expand Up @@ -63,7 +64,7 @@ func (sh *nosqlShardStore) CreateShard(
if err != nil {
return err
}
err = storeShard.db.InsertShard(ctx, request.ShardInfo)
err = storeShard.db.InsertShard(ctx, request.ShardInfo, time.Now())
if err != nil {
conditionFailure, ok := err.(*nosqlplugin.ShardOperationConditionFailure)
if ok {
Expand Down Expand Up @@ -161,7 +162,7 @@ func (sh *nosqlShardStore) UpdateShard(
if err != nil {
return err
}
err = storeShard.db.UpdateShard(ctx, request.ShardInfo, request.PreviousRangeID)
err = storeShard.db.UpdateShard(ctx, request.ShardInfo, request.PreviousRangeID, time.Now())
if err != nil {
conditionFailure, ok := err.(*nosqlplugin.ShardOperationConditionFailure)
if ok {
Expand Down
2 changes: 1 addition & 1 deletion common/persistence/nosql/nosql_task_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ func (t *nosqlTaskStore) UpdateTaskList(
}

if tli.Kind == persistence.TaskListKindSticky { // if task_list is sticky, then update with TTL
err = storeShard.db.UpdateTaskListWithTTL(ctx, stickyTaskListTTL, taskListToUpdate, tli.RangeID)
err = storeShard.db.UpdateTaskListWithTTL(ctx, stickyTaskListTTL, taskListToUpdate, tli.RangeID, time.Now())
} else {
err = storeShard.db.UpdateTaskList(ctx, taskListToUpdate, tli.RangeID)
}
Expand Down
9 changes: 0 additions & 9 deletions common/persistence/nosql/nosqlplugin/cassandra/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
package cassandra

import (
"github.com/uber/cadence/common/clock"
"github.com/uber/cadence/common/config"
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/log/tag"
Expand All @@ -37,7 +36,6 @@ type cdb struct {
session gocql.Session
cfg *config.NoSQL
dc *persistence.DynamicConfiguration
timeSrc clock.TimeSource
}

var _ nosqlplugin.DB = (*cdb)(nil)
Expand All @@ -53,12 +51,6 @@ func dbWithClient(client gocql.Client) cassandraDBOption {
}
}

func dbWithTimeSource(timeSrc clock.TimeSource) cassandraDBOption {
return func(db *cdb) {
db.timeSrc = timeSrc
}
}

// newCassandraDBFromSession returns a DB from a session
func newCassandraDBFromSession(
cfg *config.NoSQL,
Expand All @@ -72,7 +64,6 @@ func newCassandraDBFromSession(
logger: logger,
cfg: cfg,
dc: dc,
timeSrc: clock.NewRealTimeSource(),
}

for _, opt := range opts {
Expand Down
8 changes: 4 additions & 4 deletions common/persistence/nosql/nosqlplugin/cassandra/shard.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ import (

// InsertShard creates a new shard, return error is there is any.
// Return ShardOperationConditionFailure if the condition doesn't meet
func (db *cdb) InsertShard(ctx context.Context, row *nosqlplugin.ShardRow) error {
cqlNowTimestamp := persistence.UnixNanoToDBTimestamp(db.timeSrc.Now().UnixNano())
func (db *cdb) InsertShard(ctx context.Context, row *nosqlplugin.ShardRow, timeStamp time.Time) error {
cqlNowTimestamp := persistence.UnixNanoToDBTimestamp(timeStamp.UnixNano())
markerData, markerEncoding := persistence.FromDataBlob(row.PendingFailoverMarkers)
transferPQS, transferPQSEncoding := persistence.FromDataBlob(row.TransferProcessingQueueStates)
timerPQS, timerPQSEncoding := persistence.FromDataBlob(row.TimerProcessingQueueStates)
Expand Down Expand Up @@ -233,8 +233,8 @@ func (db *cdb) UpdateRangeID(ctx context.Context, shardID int, rangeID int64, pr

// UpdateShard updates a shard, return error is there is any.
// Return ShardOperationConditionFailure if the condition doesn't meet
func (db *cdb) UpdateShard(ctx context.Context, row *nosqlplugin.ShardRow, previousRangeID int64) error {
cqlNowTimestamp := persistence.UnixNanoToDBTimestamp(db.timeSrc.Now().UnixNano())
func (db *cdb) UpdateShard(ctx context.Context, row *nosqlplugin.ShardRow, previousRangeID int64, timeStamp time.Time) error {
cqlNowTimestamp := persistence.UnixNanoToDBTimestamp(timeStamp.UnixNano())
markerData, markerEncoding := persistence.FromDataBlob(row.PendingFailoverMarkers)
transferPQS, transferPQSEncoding := persistence.FromDataBlob(row.TransferProcessingQueueStates)
timerPQS, timerPQSEncoding := persistence.FromDataBlob(row.TimerProcessingQueueStates)
Expand Down
26 changes: 8 additions & 18 deletions common/persistence/nosql/nosqlplugin/cassandra/shard_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (
"github.com/google/go-cmp/cmp"
"go.uber.org/mock/gomock"

"github.com/uber/cadence/common/clock"
"github.com/uber/cadence/common/config"
"github.com/uber/cadence/common/log/testlogger"
"github.com/uber/cadence/common/persistence"
Expand Down Expand Up @@ -112,11 +111,9 @@ func TestInsertShard(t *testing.T) {
cfg := &config.NoSQL{}
logger := testlogger.New(t)
dc := &persistence.DynamicConfiguration{}
timeSrc := clock.NewMockedTimeSourceAt(ts)
db := newCassandraDBFromSession(cfg, session, logger, dc, dbWithClient(client), dbWithTimeSource(timeSrc))

err := db.InsertShard(context.Background(), tc.row)
db := newCassandraDBFromSession(cfg, session, logger, dc, dbWithClient(client))

err := db.InsertShard(context.Background(), tc.row, ts)
if (err != nil) != tc.wantErr {
t.Errorf("InsertShard() error = %v, wantErr %v", err, tc.wantErr)
}
Expand Down Expand Up @@ -258,8 +255,8 @@ func TestSelectShard(t *testing.T) {
cfg := &config.NoSQL{}
logger := testlogger.New(t)
dc := &persistence.DynamicConfiguration{}
timeSrc := clock.NewMockedTimeSourceAt(ts)
db := newCassandraDBFromSession(cfg, session, logger, dc, dbWithClient(client), dbWithTimeSource(timeSrc))

db := newCassandraDBFromSession(cfg, session, logger, dc, dbWithClient(client))

gotRangeID, gotShardInfo, err := db.SelectShard(context.Background(), tc.shardID, tc.cluster)

Expand Down Expand Up @@ -287,11 +284,6 @@ func TestSelectShard(t *testing.T) {
}

func TestUpdateRangeID(t *testing.T) {
ts, err := time.Parse(time.RFC3339, "2024-04-02T18:00:00Z")
if err != nil {
t.Fatalf("Failed to parse time: %v", err)
}

tests := []struct {
name string
shardID int
Expand Down Expand Up @@ -365,8 +357,8 @@ func TestUpdateRangeID(t *testing.T) {
cfg := &config.NoSQL{}
logger := testlogger.New(t)
dc := &persistence.DynamicConfiguration{}
timeSrc := clock.NewMockedTimeSourceAt(ts)
db := newCassandraDBFromSession(cfg, session, logger, dc, dbWithClient(client), dbWithTimeSource(timeSrc))

db := newCassandraDBFromSession(cfg, session, logger, dc, dbWithClient(client))

err := db.UpdateRangeID(context.Background(), tc.shardID, tc.rangeID, tc.prevRangeID)

Expand Down Expand Up @@ -480,11 +472,9 @@ func TestUpdateShard(t *testing.T) {
cfg := &config.NoSQL{}
logger := testlogger.New(t)
dc := &persistence.DynamicConfiguration{}
timeSrc := clock.NewMockedTimeSourceAt(ts)
db := newCassandraDBFromSession(cfg, session, logger, dc, dbWithClient(client), dbWithTimeSource(timeSrc))

err := db.UpdateShard(context.Background(), tc.row, tc.prevRangeID)
db := newCassandraDBFromSession(cfg, session, logger, dc, dbWithClient(client))

err := db.UpdateShard(context.Background(), tc.row, tc.prevRangeID, ts)
if (err != nil) != tc.wantErr {
t.Errorf("UpdateShard() error = %v, wantErr %v", err, tc.wantErr)
}
Expand Down
3 changes: 2 additions & 1 deletion common/persistence/nosql/nosqlplugin/cassandra/tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,7 @@ func (db *cdb) UpdateTaskListWithTTL(
ttlSeconds int64,
row *nosqlplugin.TaskListRow,
previousRangeID int64,
timeStamp time.Time,
) error {
batch := db.session.NewBatch(gocql.LoggedBatch).WithContext(ctx)
// part 1 is used to set TTL on primary key as UPDATE can't set TTL for primary key
Expand All @@ -281,7 +282,7 @@ func (db *cdb) UpdateTaskListWithTTL(
row.TaskListType,
row.AckLevel,
row.TaskListKind,
db.timeSrc.Now(),
timeStamp,
fromTaskListPartitionConfig(row.AdaptivePartitionConfig),
row.DomainID,
row.TaskListName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import (
"github.com/google/go-cmp/cmp"
"go.uber.org/mock/gomock"

"github.com/uber/cadence/common/clock"
"github.com/uber/cadence/common/config"
"github.com/uber/cadence/common/log/testlogger"
"github.com/uber/cadence/common/persistence"
Expand Down Expand Up @@ -543,10 +542,9 @@ func TestUpdateTaskListWithTTL(t *testing.T) {
logger := testlogger.New(t)
dc := &persistence.DynamicConfiguration{}

timeSrc := clock.NewMockedTimeSourceAt(ts)
db := newCassandraDBFromSession(cfg, session, logger, dc, dbWithClient(client), dbWithTimeSource(timeSrc))
db := newCassandraDBFromSession(cfg, session, logger, dc, dbWithClient(client))

err := db.UpdateTaskListWithTTL(context.Background(), tc.ttlSeconds, tc.row, tc.prevRangeID)
err := db.UpdateTaskListWithTTL(context.Background(), tc.ttlSeconds, tc.row, tc.prevRangeID, ts)

if (err != nil) != tc.wantErr {
t.Errorf("UpdateTaskListWithTTL() error = %v, wantErr %v", err, tc.wantErr)
Expand Down Expand Up @@ -823,8 +821,7 @@ func TestInsertTasks(t *testing.T) {
logger := testlogger.New(t)
dc := &persistence.DynamicConfiguration{}

timeSrc := clock.NewMockedTimeSourceAt(ts)
db := newCassandraDBFromSession(cfg, session, logger, dc, dbWithClient(client), dbWithTimeSource(timeSrc))
db := newCassandraDBFromSession(cfg, session, logger, dc, dbWithClient(client))

err := db.InsertTasks(context.Background(), tc.tasksToInsert, tc.tasklistCond)

Expand Down
17 changes: 11 additions & 6 deletions common/persistence/nosql/nosqlplugin/cassandra/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,11 @@ func (db *cdb) InsertWorkflowExecutionWithTasks(
replicationTasks []*nosqlplugin.ReplicationTask,
timerTasks []*nosqlplugin.TimerTask,
shardCondition *nosqlplugin.ShardCondition,
timeStamp time.Time,
) error {
shardID := shardCondition.ShardID
domainID := execution.DomainID
workflowID := execution.WorkflowID
timeStamp := db.timeSrc.Now()

batch := db.session.NewBatch(gocql.LoggedBatch).WithContext(ctx)

Expand Down Expand Up @@ -126,11 +126,11 @@ func (db *cdb) UpdateWorkflowExecutionWithTasks(
replicationTasks []*nosqlplugin.ReplicationTask,
timerTasks []*nosqlplugin.TimerTask,
shardCondition *nosqlplugin.ShardCondition,
timeStamp time.Time,
) error {
shardID := shardCondition.ShardID
var domainID, workflowID string
var previousNextEventIDCondition int64
timeStamp := db.timeSrc.Now()
if mutatedExecution != nil {
domainID = mutatedExecution.DomainID
workflowID = mutatedExecution.WorkflowID
Expand Down Expand Up @@ -626,7 +626,7 @@ func (db *cdb) RangeDeleteCrossClusterTasks(ctx context.Context, shardID int, ta
return query.Exec()
}

func (db *cdb) InsertReplicationDLQTask(ctx context.Context, shardID int, sourceCluster string, task nosqlplugin.ReplicationTask) error {
func (db *cdb) InsertReplicationDLQTask(ctx context.Context, shardID int, sourceCluster string, task nosqlplugin.ReplicationTask, timeStamp time.Time) error {
// Use source cluster name as the workflow id for replication dlq
query := db.session.Query(templateCreateReplicationTaskQuery,
shardID,
Expand All @@ -650,7 +650,7 @@ func (db *cdb) InsertReplicationDLQTask(ctx context.Context, shardID int, source
defaultVisibilityTimestamp,
defaultVisibilityTimestamp,
task.TaskID,
db.timeSrc.Now(),
timeStamp,
).WithContext(ctx)

return query.Exec()
Expand Down Expand Up @@ -720,14 +720,19 @@ func (db *cdb) RangeDeleteReplicationDLQTasks(ctx context.Context, shardID int,
return db.executeWithConsistencyAll(query)
}

func (db *cdb) InsertReplicationTask(ctx context.Context, tasks []*nosqlplugin.ReplicationTask, shardCondition nosqlplugin.ShardCondition) error {
func (db *cdb) InsertReplicationTask(
ctx context.Context,
tasks []*nosqlplugin.ReplicationTask,
shardCondition nosqlplugin.ShardCondition,
timeStamp time.Time,
) error {
if len(tasks) == 0 {
return nil
}

shardID := shardCondition.ShardID
batch := db.session.NewBatch(gocql.LoggedBatch).WithContext(ctx)
timeStamp := db.timeSrc.Now()

for _, task := range tasks {
createReplicationTasks(batch, shardID, task.DomainID, task.WorkflowID, []*nosqlplugin.ReplicationTask{task}, timeStamp)
}
Expand Down
Loading