Skip to content

Commit

Permalink
history storage refactored
Browse files Browse the repository at this point in the history
  • Loading branch information
ribaraka committed Jan 29, 2025
1 parent 38fb989 commit e99014a
Show file tree
Hide file tree
Showing 7 changed files with 33 additions and 15 deletions.
4 changes: 4 additions & 0 deletions common/persistence/data_manager_interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -1357,6 +1357,8 @@ type (

// DomainName to get metrics created with the domain
DomainName string

CreatedTime time.Time
}

// AppendHistoryNodesResponse is a response to AppendHistoryNodesRequest
Expand Down Expand Up @@ -1438,6 +1440,8 @@ type (
ShardID *int
// DomainName to create metrics for Domain Cost Attribution
DomainName string

CreatedTime time.Time
}

// ForkHistoryBranchResponse is the response to ForkHistoryBranchRequest
Expand Down
4 changes: 4 additions & 0 deletions common/persistence/data_store_interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -540,6 +540,8 @@ type (
TransactionID int64
// Used in sharded data stores to identify which shard to use
ShardID int

CreatedTime time.Time
}

// InternalGetWorkflowExecutionRequest is used to retrieve the info of a workflow execution
Expand Down Expand Up @@ -578,6 +580,8 @@ type (
Info string
// Used in sharded data stores to identify which shard to use
ShardID int

CreatedTime time.Time
}

// InternalForkHistoryBranchResponse is the response to ForkHistoryBranchRequest
Expand Down
5 changes: 5 additions & 0 deletions common/persistence/history_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (

workflow "github.com/uber/cadence/.gen/go/shared"
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/clock"
"github.com/uber/cadence/common/codec"
"github.com/uber/cadence/common/dynamicconfig"
"github.com/uber/cadence/common/log"
Expand Down Expand Up @@ -65,6 +66,7 @@ type (
deserializeTokenFn func([]byte, int64) (*historyV2PagingToken, error)
readRawHistoryBranchFn func(context.Context, *ReadHistoryBranchRequest) ([]*DataBlob, *historyV2PagingToken, int, log.Logger, error)
readHistoryBranchFn func(context.Context, bool, *ReadHistoryBranchRequest) ([]*types.HistoryEvent, []*types.History, []byte, int, int64, error)
timeSrc clock.TimeSource
}
)

Expand Down Expand Up @@ -96,6 +98,7 @@ func NewHistoryV2ManagerImpl(
transactionSizeLimit: transactionSizeLimit,
serializeTokenFn: serializeToken,
deserializeTokenFn: deserializeToken,
timeSrc: clock.NewRealTimeSource(),
}
hm.readRawHistoryBranchFn = hm.readRawHistoryBranch
hm.readHistoryBranchFn = hm.readHistoryBranch
Expand Down Expand Up @@ -134,6 +137,7 @@ func (m *historyV2ManagerImpl) ForkHistoryBranch(
NewBranchID: uuid.New(),
Info: request.Info,
ShardID: shardID,
CreatedTime: m.timeSrc.Now(),
}

resp, err := m.persistence.ForkHistoryBranch(ctx, req)
Expand Down Expand Up @@ -272,6 +276,7 @@ func (m *historyV2ManagerImpl) AppendHistoryNodes(
Events: blob,
TransactionID: request.TransactionID,
ShardID: shardID,
CreatedTime: m.timeSrc.Now(),
}

err = m.persistence.AppendHistoryNodes(ctx, req)
Expand Down
8 changes: 5 additions & 3 deletions common/persistence/nosql/nosql_execution_store_util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ import (
"github.com/uber/cadence/common/types"
)

var FixedTime = time.Date(2025, 1, 6, 15, 0, 0, 0, time.UTC)

func TestNosqlExecutionStoreUtils(t *testing.T) {
testCases := []struct {
name string
Expand All @@ -60,7 +62,7 @@ func TestNosqlExecutionStoreUtils(t *testing.T) {
Data: []byte(`[{"Branches":[{"BranchID":"test-branch-id","BeginNodeID":1,"EndNodeID":2}]}]`),
},
}
return store.prepareCreateWorkflowExecutionRequestWithMaps(workflowSnapshot, time.Now())
return store.prepareCreateWorkflowExecutionRequestWithMaps(workflowSnapshot, FixedTime)
},
input: &persistence.InternalWorkflowSnapshot{},
validate: func(t *testing.T, req *nosqlplugin.WorkflowExecutionRequest, err error) {
Expand All @@ -85,7 +87,7 @@ func TestNosqlExecutionStoreUtils(t *testing.T) {
},
Checksum: checksum.Checksum{Value: nil},
}
return store.prepareCreateWorkflowExecutionRequestWithMaps(workflowSnapshot, time.Now())
return store.prepareCreateWorkflowExecutionRequestWithMaps(workflowSnapshot, FixedTime)
},
validate: func(t *testing.T, req *nosqlplugin.WorkflowExecutionRequest, err error) {
assert.NoError(t, err)
Expand All @@ -108,7 +110,7 @@ func TestNosqlExecutionStoreUtils(t *testing.T) {
Data: []byte("[]"), // Empty VersionHistories
},
}
return store.prepareCreateWorkflowExecutionRequestWithMaps(workflowSnapshot, time.Now())
return store.prepareCreateWorkflowExecutionRequestWithMaps(workflowSnapshot, FixedTime)
},
validate: func(t *testing.T, req *nosqlplugin.WorkflowExecutionRequest, err error) {
assert.NoError(t, err)
Expand Down
14 changes: 7 additions & 7 deletions common/persistence/nosql/nosql_history_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ package nosql

import (
"context"
"time"

"github.com/uber/cadence/common"
"github.com/uber/cadence/common/config"
Expand Down Expand Up @@ -76,7 +75,7 @@ func (h *nosqlHistoryStore) AppendHistoryNodes(
TreeID: branchInfo.TreeID,
BranchID: branchInfo.BranchID,
Ancestors: ancestors,
CreateTimestamp: time.Now(),
CreateTimestamp: request.CreatedTime,
Info: request.Info,
}
}
Expand Down Expand Up @@ -279,11 +278,12 @@ func (h *nosqlHistoryStore) ForkHistoryBranch(
ancestors = append(ancestors, anc)
}
treeRow := &nosqlplugin.HistoryTreeRow{
ShardID: request.ShardID,
TreeID: treeID,
BranchID: request.NewBranchID,
Ancestors: ancestors,
CreateTimestamp: time.Now(),
ShardID: request.ShardID,
TreeID: treeID,
BranchID: request.NewBranchID,
Ancestors: ancestors,
// TODO: Added time from `persistMngr` here as well, though it wasn’t in the plan. Is this okay?
CreateTimestamp: request.CreatedTime,
Info: request.Info,
}

Expand Down
3 changes: 3 additions & 0 deletions common/persistence/nosql/nosql_history_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ func validInternalAppendHistoryNodesRequest() *persistence.InternalAppendHistory
},
TransactionID: testTransactionID,
ShardID: testShardID,
CreatedTime: time.Now(),
}
}

Expand Down Expand Up @@ -349,6 +350,7 @@ func validInternalForkHistoryBranchRequest(forkNodeID int64) *persistence.Intern
NewBranchID: "TestNewBranchID",
Info: "TestInfo",
ShardID: testShardID,
CreatedTime: time.Now(),
}
}

Expand Down Expand Up @@ -394,6 +396,7 @@ func expectedTreeRow() *nosqlplugin.HistoryTreeRow {
}

func treeRowEqual(t *testing.T, expected, actual *nosqlplugin.HistoryTreeRow) {
t.Helper()
assert.Equal(t, expected.ShardID, actual.ShardID)
assert.Equal(t, expected.TreeID, actual.TreeID)
assert.Equal(t, expected.BranchID, actual.BranchID)
Expand Down
10 changes: 5 additions & 5 deletions common/persistence/nosql/nosqlplugin/cassandra/history_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ import (

// InsertIntoHistoryTreeAndNode inserts one or two rows: tree row and node row(at least one of them)
func (db *cdb) InsertIntoHistoryTreeAndNode(ctx context.Context, treeRow *nosqlplugin.HistoryTreeRow, nodeRow *nosqlplugin.HistoryNodeRow) error {
timeStamp := db.timeSrc.Now()
if treeRow == nil && nodeRow == nil {
return fmt.Errorf("require at least a tree row or a node row to insert")
}
Expand All @@ -54,19 +53,20 @@ func (db *cdb) InsertIntoHistoryTreeAndNode(ctx context.Context, treeRow *nosqlp
// Note: for perf, prefer using batch for inserting more than one records
batch := db.session.NewBatch(gocql.LoggedBatch).WithContext(ctx)
batch.Query(v2templateInsertTree,
treeRow.TreeID, treeRow.BranchID, ancs, persistence.UnixNanoToDBTimestamp(treeRow.CreateTimestamp.UnixNano()), treeRow.Info, timeStamp)
// TODO: Should the `persistence.UnixNanoToDBTimestamp(treeRow.CreateTimestamp.UnixNano())` logic be handled by `persistenceManager` as well, or should it remain as is?
treeRow.TreeID, treeRow.BranchID, ancs, persistence.UnixNanoToDBTimestamp(treeRow.CreateTimestamp.UnixNano()), treeRow.Info)
batch.Query(v2templateUpsertData,
nodeRow.TreeID, nodeRow.BranchID, nodeRow.NodeID, nodeRow.TxnID, nodeRow.Data, nodeRow.DataEncoding, timeStamp)
nodeRow.TreeID, nodeRow.BranchID, nodeRow.NodeID, nodeRow.TxnID, nodeRow.Data, nodeRow.DataEncoding)
err = db.session.ExecuteBatch(batch)
} else {
var query gocql.Query
if treeRow != nil {
query = db.session.Query(v2templateInsertTree,
treeRow.TreeID, treeRow.BranchID, ancs, persistence.UnixNanoToDBTimestamp(treeRow.CreateTimestamp.UnixNano()), treeRow.Info, timeStamp).WithContext(ctx)
treeRow.TreeID, treeRow.BranchID, ancs, persistence.UnixNanoToDBTimestamp(treeRow.CreateTimestamp.UnixNano()), treeRow.Info).WithContext(ctx)
}
if nodeRow != nil {
query = db.session.Query(v2templateUpsertData,
nodeRow.TreeID, nodeRow.BranchID, nodeRow.NodeID, nodeRow.TxnID, nodeRow.Data, nodeRow.DataEncoding, timeStamp).WithContext(ctx)
nodeRow.TreeID, nodeRow.BranchID, nodeRow.NodeID, nodeRow.TxnID, nodeRow.Data, nodeRow.DataEncoding).WithContext(ctx)
}
err = query.Exec()
}
Expand Down

0 comments on commit e99014a

Please sign in to comment.