Skip to content

Commit

Permalink
Update Cassandra to write history tasks to new columns
Browse files Browse the repository at this point in the history
  • Loading branch information
Shaddoll committed Feb 24, 2025
1 parent df2683f commit cb01189
Show file tree
Hide file tree
Showing 25 changed files with 2,241 additions and 1,398 deletions.
21 changes: 18 additions & 3 deletions common/persistence/client/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -526,10 +526,20 @@ func (f *factoryImpl) init(clusterName string, limiters map[string]quotas.Limite
defaultDataStore := Datastore{ratelimit: limiters[f.config.DefaultStore]}
switch {
case defaultCfg.NoSQL != nil:
parser, err := serialization.NewParser(common.EncodingTypeThriftRW, common.EncodingTypeThriftRW)
if err != nil {
f.logger.Fatal("failed to construct parser", tag.Error(err))
}
taskSerializer := serialization.NewTaskSerializer(parser)
shardedNoSQLConfig := defaultCfg.NoSQL.ConvertToShardedNoSQLConfig()
defaultDataStore.factory = nosql.NewFactory(*shardedNoSQLConfig, clusterName, f.logger, f.dc)
defaultDataStore.factory = nosql.NewFactory(*shardedNoSQLConfig, clusterName, f.logger, taskSerializer, f.dc)
case defaultCfg.ShardedNoSQL != nil:
defaultDataStore.factory = nosql.NewFactory(*defaultCfg.ShardedNoSQL, clusterName, f.logger, f.dc)
parser, err := serialization.NewParser(common.EncodingTypeThriftRW, common.EncodingTypeThriftRW)
if err != nil {
f.logger.Fatal("failed to construct parser", tag.Error(err))
}
taskSerializer := serialization.NewTaskSerializer(parser)
defaultDataStore.factory = nosql.NewFactory(*defaultCfg.ShardedNoSQL, clusterName, f.logger, taskSerializer, f.dc)
case defaultCfg.SQL != nil:
if defaultCfg.SQL.EncodingType == "" {
defaultCfg.SQL.EncodingType = string(common.EncodingTypeThriftRW)
Expand Down Expand Up @@ -572,8 +582,13 @@ func (f *factoryImpl) init(clusterName string, limiters map[string]quotas.Limite
visibilityDataStore := Datastore{ratelimit: limiters[f.config.VisibilityStore]}
switch {
case visibilityCfg.NoSQL != nil:
parser, err := serialization.NewParser(common.EncodingTypeThriftRW, common.EncodingTypeThriftRW)
if err != nil {
f.logger.Fatal("failed to construct parser", tag.Error(err))
}
taskSerializer := serialization.NewTaskSerializer(parser)
shardedNoSQLConfig := visibilityCfg.NoSQL.ConvertToShardedNoSQLConfig()
visibilityDataStore.factory = nosql.NewFactory(*shardedNoSQLConfig, clusterName, f.logger, f.dc)
visibilityDataStore.factory = nosql.NewFactory(*shardedNoSQLConfig, clusterName, f.logger, taskSerializer, f.dc)
case visibilityCfg.SQL != nil:
var decodingTypes []common.EncodingType
for _, dt := range visibilityCfg.SQL.DecodingTypes {
Expand Down
20 changes: 13 additions & 7 deletions common/persistence/nosql/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/uber/cadence/common/config"
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/persistence"
"github.com/uber/cadence/common/persistence/serialization"
)

type (
Expand All @@ -37,22 +38,25 @@ type (
logger log.Logger
execStoreFactory *executionStoreFactory
dc *persistence.DynamicConfiguration
taskSerializer serialization.TaskSerializer
}

executionStoreFactory struct {
logger log.Logger
shardedNosqlStore shardedNosqlStore
taskSerializer serialization.TaskSerializer
}
)

// NewFactory returns an instance of a factory object which can be used to create
// datastores that are backed by cassandra
func NewFactory(cfg config.ShardedNoSQL, clusterName string, logger log.Logger, dc *persistence.DynamicConfiguration) *Factory {
func NewFactory(cfg config.ShardedNoSQL, clusterName string, logger log.Logger, taskSerializer serialization.TaskSerializer, dc *persistence.DynamicConfiguration) *Factory {
return &Factory{
cfg: cfg,
clusterName: clusterName,
logger: logger,
dc: dc,
cfg: cfg,
clusterName: clusterName,
logger: logger,
taskSerializer: taskSerializer,
dc: dc,
}
}

Expand Down Expand Up @@ -122,7 +126,7 @@ func (f *Factory) executionStoreFactory() (*executionStoreFactory, error) {
return f.execStoreFactory, nil
}

factory, err := newExecutionStoreFactory(f.cfg, f.logger, f.dc)
factory, err := newExecutionStoreFactory(f.cfg, f.logger, f.taskSerializer, f.dc)
if err != nil {
return nil, err
}
Expand All @@ -134,13 +138,15 @@ func (f *Factory) executionStoreFactory() (*executionStoreFactory, error) {
func newExecutionStoreFactory(
cfg config.ShardedNoSQL,
logger log.Logger,
taskSerializer serialization.TaskSerializer,
dc *persistence.DynamicConfiguration,
) (*executionStoreFactory, error) {
s, err := newShardedNosqlStore(cfg, logger, dc)
if err != nil {
return nil, err
}
return &executionStoreFactory{
taskSerializer: taskSerializer,
logger: logger,
shardedNosqlStore: s,
}, nil
Expand All @@ -156,7 +162,7 @@ func (f *executionStoreFactory) new(shardID int) (persistence.ExecutionStore, er
if err != nil {
return nil, err
}
pmgr, err := NewExecutionStore(shardID, storeShard.db, f.logger)
pmgr, err := NewExecutionStore(shardID, storeShard.db, f.logger, f.taskSerializer)
if err != nil {
return nil, err
}
Expand Down
13 changes: 6 additions & 7 deletions common/persistence/nosql/nosql_execution_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,27 +28,31 @@ import (
"github.com/uber/cadence/common/log/tag"
"github.com/uber/cadence/common/persistence"
"github.com/uber/cadence/common/persistence/nosql/nosqlplugin"
"github.com/uber/cadence/common/persistence/serialization"
"github.com/uber/cadence/common/types"
)

// Implements ExecutionStore
type nosqlExecutionStore struct {
shardID int
nosqlStore
taskSerializer serialization.TaskSerializer
}

// NewExecutionStore is used to create an instance of ExecutionStore implementation
func NewExecutionStore(
shardID int,
db nosqlplugin.DB,
logger log.Logger,
taskSerializer serialization.TaskSerializer,
) (persistence.ExecutionStore, error) {
return &nosqlExecutionStore{
nosqlStore: nosqlStore{
logger: logger,
db: db,
},
shardID: shardID,
shardID: shardID,
taskSerializer: taskSerializer,
}, nil
}

Expand Down Expand Up @@ -835,12 +839,7 @@ func (d *nosqlExecutionStore) CreateFailoverMarkerTasks(
if err != nil {
return err
}
for _, task := range tasks {
nosqlTasks = append(nosqlTasks, &nosqlplugin.HistoryMigrationTask{
Replication: task,
Task: nil, // TODO: encode replication task into datablob
})
}
nosqlTasks = append(nosqlTasks, tasks...)
}

err := d.db.InsertReplicationTask(ctx, nosqlTasks, nosqlplugin.ShardCondition{
Expand Down
40 changes: 33 additions & 7 deletions common/persistence/nosql/nosql_execution_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/persistence"
"github.com/uber/cadence/common/persistence/nosql/nosqlplugin"
"github.com/uber/cadence/common/persistence/serialization"
"github.com/uber/cadence/common/types"
"github.com/uber/cadence/service/history/constants"
)
Expand Down Expand Up @@ -284,7 +285,8 @@ func TestUpdateWorkflowExecution(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
controller := gomock.NewController(t)
mockDB := nosqlplugin.NewMockDB(controller)
store, _ := NewExecutionStore(1, mockDB, log.NewNoop())
mockTaskSerializer := serialization.NewMockTaskSerializer(controller)
store, _ := NewExecutionStore(1, mockDB, log.NewNoop(), mockTaskSerializer)

tc.setupMock(mockDB, 1)

Expand Down Expand Up @@ -1346,7 +1348,7 @@ func TestCreateFailoverMarkerTasks(t *testing.T) {
name string
rangeID int64
markers []*persistence.FailoverMarkerTask
setupMock func(*nosqlplugin.MockDB)
setupMock func(*nosqlplugin.MockDB, *serialization.MockTaskSerializer)
expectedError error
}{
{
Expand All @@ -1358,13 +1360,31 @@ func TestCreateFailoverMarkerTasks(t *testing.T) {
DomainID: "testDomainID",
},
},
setupMock: func(mockDB *nosqlplugin.MockDB) {
setupMock: func(mockDB *nosqlplugin.MockDB, mockTaskSerializer *serialization.MockTaskSerializer) {
mockTaskSerializer.EXPECT().SerializeTask(persistence.HistoryTaskCategoryReplication, gomock.Any()).Return(persistence.DataBlob{
Data: []byte("1"),
Encoding: common.EncodingTypeThriftRW,
}, nil)
mockDB.EXPECT().
InsertReplicationTask(ctx, gomock.Any(), nosqlplugin.ShardCondition{ShardID: shardID, RangeID: 123}).
Return(nil)
},
expectedError: nil,
},
{
name: "serialization error",
rangeID: 123,
markers: []*persistence.FailoverMarkerTask{
{
TaskData: persistence.TaskData{},
DomainID: "testDomainID",
},
},
setupMock: func(mockDB *nosqlplugin.MockDB, mockTaskSerializer *serialization.MockTaskSerializer) {
mockTaskSerializer.EXPECT().SerializeTask(persistence.HistoryTaskCategoryReplication, gomock.Any()).Return(persistence.DataBlob{}, errors.New("some error"))
},
expectedError: errors.New("some error"),
},
{
name: "CreateFailoverMarkerTasks failure - ShardOperationConditionFailure",
rangeID: 123,
Expand All @@ -1374,7 +1394,11 @@ func TestCreateFailoverMarkerTasks(t *testing.T) {
DomainID: "testDomainID",
},
},
setupMock: func(mockDB *nosqlplugin.MockDB) {
setupMock: func(mockDB *nosqlplugin.MockDB, mockTaskSerializer *serialization.MockTaskSerializer) {
mockTaskSerializer.EXPECT().SerializeTask(persistence.HistoryTaskCategoryReplication, gomock.Any()).Return(persistence.DataBlob{
Data: []byte("1"),
Encoding: common.EncodingTypeThriftRW,
}, nil)
conditionFailureErr := &nosqlplugin.ShardOperationConditionFailure{
RangeID: 123, // Use direct int64 value
Details: "Shard condition failed", // Use direct string value
Expand All @@ -1394,9 +1418,10 @@ func TestCreateFailoverMarkerTasks(t *testing.T) {
controller := gomock.NewController(t)

mockDB := nosqlplugin.NewMockDB(controller)
store := newTestNosqlExecutionStore(mockDB, log.NewNoop())
mockTaskSerializer := serialization.NewMockTaskSerializer(controller)
store := newTestNosqlExecutionStoreWithTaskSerializer(mockDB, log.NewNoop(), mockTaskSerializer)

tc.setupMock(mockDB)
tc.setupMock(mockDB, mockTaskSerializer)

err := store.CreateFailoverMarkerTasks(ctx, &persistence.CreateFailoverMarkersRequest{
RangeID: tc.rangeID,
Expand Down Expand Up @@ -1483,7 +1508,8 @@ func TestConflictResolveWorkflowExecution(t *testing.T) {
gomockController := gomock.NewController(t)

mockDB := nosqlplugin.NewMockDB(gomockController)
store, err := NewExecutionStore(1, mockDB, log.NewNoop())
mockTaskSerializer := serialization.NewMockTaskSerializer(gomockController)
store, err := NewExecutionStore(1, mockDB, log.NewNoop(), mockTaskSerializer)
require.NoError(t, err)

tests := []struct {
Expand Down
Loading

0 comments on commit cb01189

Please sign in to comment.