diff --git a/common/persistence/data_manager_interfaces.go b/common/persistence/data_manager_interfaces.go index ba7b586c3de..4cb5b3a6572 100644 --- a/common/persistence/data_manager_interfaces.go +++ b/common/persistence/data_manager_interfaces.go @@ -1357,6 +1357,8 @@ type ( // DomainName to get metrics created with the domain DomainName string + + CreatedTime time.Time } // AppendHistoryNodesResponse is a response to AppendHistoryNodesRequest @@ -1438,6 +1440,8 @@ type ( ShardID *int // DomainName to create metrics for Domain Cost Attribution DomainName string + + CreatedTime time.Time } // ForkHistoryBranchResponse is the response to ForkHistoryBranchRequest diff --git a/common/persistence/data_store_interfaces.go b/common/persistence/data_store_interfaces.go index 4a5a32bf634..6ccc5a1282c 100644 --- a/common/persistence/data_store_interfaces.go +++ b/common/persistence/data_store_interfaces.go @@ -540,6 +540,8 @@ type ( TransactionID int64 // Used in sharded data stores to identify which shard to use ShardID int + + CreatedTime time.Time } // InternalGetWorkflowExecutionRequest is used to retrieve the info of a workflow execution @@ -578,6 +580,8 @@ type ( Info string // Used in sharded data stores to identify which shard to use ShardID int + + CreatedTime time.Time } // InternalForkHistoryBranchResponse is the response to ForkHistoryBranchRequest diff --git a/common/persistence/history_manager.go b/common/persistence/history_manager.go index dbaa5dd7f24..ade02f92d5f 100644 --- a/common/persistence/history_manager.go +++ b/common/persistence/history_manager.go @@ -29,6 +29,7 @@ import ( workflow "github.com/uber/cadence/.gen/go/shared" "github.com/uber/cadence/common" + "github.com/uber/cadence/common/clock" "github.com/uber/cadence/common/codec" "github.com/uber/cadence/common/dynamicconfig" "github.com/uber/cadence/common/log" @@ -65,6 +66,7 @@ type ( deserializeTokenFn func([]byte, int64) (*historyV2PagingToken, error) readRawHistoryBranchFn func(context.Context, *ReadHistoryBranchRequest) ([]*DataBlob, *historyV2PagingToken, int, log.Logger, error) readHistoryBranchFn func(context.Context, bool, *ReadHistoryBranchRequest) ([]*types.HistoryEvent, []*types.History, []byte, int, int64, error) + timeSrc clock.TimeSource } ) @@ -96,6 +98,7 @@ func NewHistoryV2ManagerImpl( transactionSizeLimit: transactionSizeLimit, serializeTokenFn: serializeToken, deserializeTokenFn: deserializeToken, + timeSrc: clock.NewRealTimeSource(), } hm.readRawHistoryBranchFn = hm.readRawHistoryBranch hm.readHistoryBranchFn = hm.readHistoryBranch @@ -134,6 +137,7 @@ func (m *historyV2ManagerImpl) ForkHistoryBranch( NewBranchID: uuid.New(), Info: request.Info, ShardID: shardID, + CreatedTime: m.timeSrc.Now(), } resp, err := m.persistence.ForkHistoryBranch(ctx, req) @@ -272,6 +276,7 @@ func (m *historyV2ManagerImpl) AppendHistoryNodes( Events: blob, TransactionID: request.TransactionID, ShardID: shardID, + CreatedTime: m.timeSrc.Now(), } err = m.persistence.AppendHistoryNodes(ctx, req) diff --git a/common/persistence/nosql/nosql_execution_store_util_test.go b/common/persistence/nosql/nosql_execution_store_util_test.go index 7c1a166483c..72d9a9861a1 100644 --- a/common/persistence/nosql/nosql_execution_store_util_test.go +++ b/common/persistence/nosql/nosql_execution_store_util_test.go @@ -39,6 +39,8 @@ import ( "github.com/uber/cadence/common/types" ) +var FixedTime = time.Date(2025, 1, 6, 15, 0, 0, 0, time.UTC) + func TestNosqlExecutionStoreUtils(t *testing.T) { testCases := []struct { name string @@ -60,7 +62,7 @@ func TestNosqlExecutionStoreUtils(t *testing.T) { Data: []byte(`[{"Branches":[{"BranchID":"test-branch-id","BeginNodeID":1,"EndNodeID":2}]}]`), }, } - return store.prepareCreateWorkflowExecutionRequestWithMaps(workflowSnapshot, time.Now()) + return store.prepareCreateWorkflowExecutionRequestWithMaps(workflowSnapshot, FixedTime) }, input: &persistence.InternalWorkflowSnapshot{}, validate: func(t *testing.T, req *nosqlplugin.WorkflowExecutionRequest, err error) { @@ -85,7 +87,7 @@ func TestNosqlExecutionStoreUtils(t *testing.T) { }, Checksum: checksum.Checksum{Value: nil}, } - return store.prepareCreateWorkflowExecutionRequestWithMaps(workflowSnapshot, time.Now()) + return store.prepareCreateWorkflowExecutionRequestWithMaps(workflowSnapshot, FixedTime) }, validate: func(t *testing.T, req *nosqlplugin.WorkflowExecutionRequest, err error) { assert.NoError(t, err) @@ -108,7 +110,7 @@ func TestNosqlExecutionStoreUtils(t *testing.T) { Data: []byte("[]"), // Empty VersionHistories }, } - return store.prepareCreateWorkflowExecutionRequestWithMaps(workflowSnapshot, time.Now()) + return store.prepareCreateWorkflowExecutionRequestWithMaps(workflowSnapshot, FixedTime) }, validate: func(t *testing.T, req *nosqlplugin.WorkflowExecutionRequest, err error) { assert.NoError(t, err) diff --git a/common/persistence/nosql/nosql_history_store.go b/common/persistence/nosql/nosql_history_store.go index 49b4bf3ac7c..ee90b6659cb 100644 --- a/common/persistence/nosql/nosql_history_store.go +++ b/common/persistence/nosql/nosql_history_store.go @@ -22,7 +22,6 @@ package nosql import ( "context" - "time" "github.com/uber/cadence/common" "github.com/uber/cadence/common/config" @@ -76,7 +75,7 @@ func (h *nosqlHistoryStore) AppendHistoryNodes( TreeID: branchInfo.TreeID, BranchID: branchInfo.BranchID, Ancestors: ancestors, - CreateTimestamp: time.Now(), + CreateTimestamp: request.CreatedTime, Info: request.Info, } } @@ -279,11 +278,12 @@ func (h *nosqlHistoryStore) ForkHistoryBranch( ancestors = append(ancestors, anc) } treeRow := &nosqlplugin.HistoryTreeRow{ - ShardID: request.ShardID, - TreeID: treeID, - BranchID: request.NewBranchID, - Ancestors: ancestors, - CreateTimestamp: time.Now(), + ShardID: request.ShardID, + TreeID: treeID, + BranchID: request.NewBranchID, + Ancestors: ancestors, + // TODO: Added time from `persistMngr` here as well, though it wasn’t in the plan. Is this okay? + CreateTimestamp: request.CreatedTime, Info: request.Info, } diff --git a/common/persistence/nosql/nosql_history_store_test.go b/common/persistence/nosql/nosql_history_store_test.go index 8fe2bee5bf8..22f43ed4860 100644 --- a/common/persistence/nosql/nosql_history_store_test.go +++ b/common/persistence/nosql/nosql_history_store_test.go @@ -68,6 +68,7 @@ func validInternalAppendHistoryNodesRequest() *persistence.InternalAppendHistory }, TransactionID: testTransactionID, ShardID: testShardID, + CreatedTime: time.Now(), } } @@ -349,6 +350,7 @@ func validInternalForkHistoryBranchRequest(forkNodeID int64) *persistence.Intern NewBranchID: "TestNewBranchID", Info: "TestInfo", ShardID: testShardID, + CreatedTime: time.Now(), } } @@ -394,6 +396,7 @@ func expectedTreeRow() *nosqlplugin.HistoryTreeRow { } func treeRowEqual(t *testing.T, expected, actual *nosqlplugin.HistoryTreeRow) { + t.Helper() assert.Equal(t, expected.ShardID, actual.ShardID) assert.Equal(t, expected.TreeID, actual.TreeID) assert.Equal(t, expected.BranchID, actual.BranchID) diff --git a/common/persistence/nosql/nosqlplugin/cassandra/history_events.go b/common/persistence/nosql/nosqlplugin/cassandra/history_events.go index ec9e4cac838..9055d21973a 100644 --- a/common/persistence/nosql/nosqlplugin/cassandra/history_events.go +++ b/common/persistence/nosql/nosqlplugin/cassandra/history_events.go @@ -34,7 +34,6 @@ import ( // InsertIntoHistoryTreeAndNode inserts one or two rows: tree row and node row(at least one of them) func (db *cdb) InsertIntoHistoryTreeAndNode(ctx context.Context, treeRow *nosqlplugin.HistoryTreeRow, nodeRow *nosqlplugin.HistoryNodeRow) error { - timeStamp := db.timeSrc.Now() if treeRow == nil && nodeRow == nil { return fmt.Errorf("require at least a tree row or a node row to insert") } @@ -54,19 +53,20 @@ func (db *cdb) InsertIntoHistoryTreeAndNode(ctx context.Context, treeRow *nosqlp // Note: for perf, prefer using batch for inserting more than one records batch := db.session.NewBatch(gocql.LoggedBatch).WithContext(ctx) batch.Query(v2templateInsertTree, - treeRow.TreeID, treeRow.BranchID, ancs, persistence.UnixNanoToDBTimestamp(treeRow.CreateTimestamp.UnixNano()), treeRow.Info, timeStamp) + // TODO: Should the `persistence.UnixNanoToDBTimestamp(treeRow.CreateTimestamp.UnixNano())` logic be handled by `persistenceManager` as well, or should it remain as is? + treeRow.TreeID, treeRow.BranchID, ancs, persistence.UnixNanoToDBTimestamp(treeRow.CreateTimestamp.UnixNano()), treeRow.Info) batch.Query(v2templateUpsertData, - nodeRow.TreeID, nodeRow.BranchID, nodeRow.NodeID, nodeRow.TxnID, nodeRow.Data, nodeRow.DataEncoding, timeStamp) + nodeRow.TreeID, nodeRow.BranchID, nodeRow.NodeID, nodeRow.TxnID, nodeRow.Data, nodeRow.DataEncoding) err = db.session.ExecuteBatch(batch) } else { var query gocql.Query if treeRow != nil { query = db.session.Query(v2templateInsertTree, - treeRow.TreeID, treeRow.BranchID, ancs, persistence.UnixNanoToDBTimestamp(treeRow.CreateTimestamp.UnixNano()), treeRow.Info, timeStamp).WithContext(ctx) + treeRow.TreeID, treeRow.BranchID, ancs, persistence.UnixNanoToDBTimestamp(treeRow.CreateTimestamp.UnixNano()), treeRow.Info).WithContext(ctx) } if nodeRow != nil { query = db.session.Query(v2templateUpsertData, - nodeRow.TreeID, nodeRow.BranchID, nodeRow.NodeID, nodeRow.TxnID, nodeRow.Data, nodeRow.DataEncoding, timeStamp).WithContext(ctx) + nodeRow.TreeID, nodeRow.BranchID, nodeRow.NodeID, nodeRow.TxnID, nodeRow.Data, nodeRow.DataEncoding).WithContext(ctx) } err = query.Exec() }