Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor peristence layer to support inserting history tasks of new categories #6671

Merged
merged 1 commit into from
Feb 20, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 43 additions & 12 deletions common/persistence/data_manager_interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,46 @@ const (
TaskListKindSticky
)

// HistoryTaskCategory represents various categories of history tasks
type HistoryTaskCategory struct {
categoryType int
categoryID int
}

func (c *HistoryTaskCategory) Type() int {
return c.categoryType
}

func (c *HistoryTaskCategory) ID() int {
return c.categoryID
}

const (
HistoryTaskCategoryTypeImmediate = iota + 1
HistoryTaskCategoryTypeScheduled
)

const (
HistoryTaskCategoryIDTransfer = 1
HistoryTaskCategoryIDTimer = 2
HistoryTaskCategoryIDReplication = 3
Comment on lines +197 to +199
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

category id to category type mapping is 1 to 1. Do we need the category type as a separate enum?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, at some place, I found I need to tell if the task is a scheduled task or immediate task.
And that requires another map to track which category is scheduled task and which category is immediate task.

)

var (
HistoryTaskCategoryTransfer = HistoryTaskCategory{
categoryType: HistoryTaskCategoryTypeImmediate,
categoryID: HistoryTaskCategoryIDTransfer,
}
HistoryTaskCategoryTimer = HistoryTaskCategory{
categoryType: HistoryTaskCategoryTypeScheduled,
categoryID: HistoryTaskCategoryIDTimer,
}
HistoryTaskCategoryReplication = HistoryTaskCategory{
categoryType: HistoryTaskCategoryTypeImmediate,
categoryID: HistoryTaskCategoryIDReplication,
}
)

// Transfer task types
const (
TransferTaskTypeDecisionTask = iota
Expand Down Expand Up @@ -806,10 +846,7 @@ type (
NewBufferedEvents []*types.HistoryEvent
ClearBufferedEvents bool

TransferTasks []Task
CrossClusterTasks []Task
ReplicationTasks []Task
TimerTasks []Task
TasksByCategory map[HistoryTaskCategory][]Task

WorkflowRequests []*WorkflowRequest

Expand All @@ -830,10 +867,7 @@ type (
SignalInfos []*SignalInfo
SignalRequestedIDs []string

TransferTasks []Task
CrossClusterTasks []Task
ReplicationTasks []Task
TimerTasks []Task
TasksByCategory map[HistoryTaskCategory][]Task

WorkflowRequests []*WorkflowRequest

Expand Down Expand Up @@ -1326,10 +1360,7 @@ type (
DeleteSignalInfoCount int
DeleteRequestCancelInfoCount int

TransferTasksCount int
CrossClusterTaskCount int
TimerTasksCount int
ReplicationTasksCount int
TaskCountByCategory map[HistoryTaskCategory]int
}

// UpdateWorkflowExecutionResponse is response for UpdateWorkflowExecutionRequest
Expand Down
10 changes: 2 additions & 8 deletions common/persistence/data_store_interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -466,10 +466,7 @@ type (
NewBufferedEvents *DataBlob
ClearBufferedEvents bool

TransferTasks []Task
CrossClusterTasks []Task
TimerTasks []Task
ReplicationTasks []Task
TasksByCategory map[HistoryTaskCategory][]Task

WorkflowRequests []*WorkflowRequest

Expand All @@ -493,10 +490,7 @@ type (
SignalInfos []*SignalInfo
SignalRequestedIDs []string

TransferTasks []Task
CrossClusterTasks []Task
TimerTasks []Task
ReplicationTasks []Task
TasksByCategory map[HistoryTaskCategory][]Task

WorkflowRequests []*WorkflowRequest

Expand Down
10 changes: 2 additions & 8 deletions common/persistence/execution_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -676,10 +676,7 @@ func (m *executionManagerImpl) SerializeWorkflowMutation(
NewBufferedEvents: serializedNewBufferedEvents,
ClearBufferedEvents: input.ClearBufferedEvents,

TransferTasks: input.TransferTasks,
CrossClusterTasks: input.CrossClusterTasks,
ReplicationTasks: input.ReplicationTasks,
TimerTasks: input.TimerTasks,
TasksByCategory: input.TasksByCategory,

WorkflowRequests: input.WorkflowRequests,

Expand Down Expand Up @@ -742,10 +739,7 @@ func (m *executionManagerImpl) SerializeWorkflowSnapshot(
SignalInfos: input.SignalInfos,
SignalRequestedIDs: input.SignalRequestedIDs,

TransferTasks: input.TransferTasks,
CrossClusterTasks: input.CrossClusterTasks,
ReplicationTasks: input.ReplicationTasks,
TimerTasks: input.TimerTasks,
TasksByCategory: input.TasksByCategory,

WorkflowRequests: input.WorkflowRequests,

Expand Down
68 changes: 36 additions & 32 deletions common/persistence/execution_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -426,14 +426,15 @@ func TestExecutionManager_UpdateWorkflowExecution(t *testing.T) {
res, err := manager.UpdateWorkflowExecution(context.Background(), request)
assert.NoError(t, err)
stats := &MutableStateUpdateSessionStats{
MutableStateSize: 90,
ExecutionInfoSize: 40,
ActivityInfoSize: 20,
TimerInfoSize: 10,
ChildInfoSize: 20,
ActivityInfoCount: 1,
TimerInfoCount: 2,
ChildInfoCount: 1,
MutableStateSize: 90,
ExecutionInfoSize: 40,
ActivityInfoSize: 20,
TimerInfoSize: 10,
ChildInfoSize: 20,
ActivityInfoCount: 1,
TimerInfoCount: 1,
ChildInfoCount: 1,
TaskCountByCategory: map[HistoryTaskCategory]int{},
}
assert.Equal(t, stats, res.MutableStateUpdateSessionStats)
}
Expand Down Expand Up @@ -1016,14 +1017,15 @@ func TestCreateWorkflowExecution(t *testing.T) {
checkRes: func(t *testing.T, response *CreateWorkflowExecutionResponse, err error) {
assert.Equal(t, &CreateWorkflowExecutionResponse{
MutableStateUpdateSessionStats: &MutableStateUpdateSessionStats{
MutableStateSize: 91,
ExecutionInfoSize: 20,
ActivityInfoSize: 29,
TimerInfoSize: 22,
ChildInfoSize: 20,
ActivityInfoCount: 1,
TimerInfoCount: 2,
ChildInfoCount: 1,
MutableStateSize: 91,
ExecutionInfoSize: 20,
ActivityInfoSize: 29,
TimerInfoSize: 22,
ChildInfoSize: 20,
ActivityInfoCount: 1,
TimerInfoCount: 2,
ChildInfoCount: 1,
TaskCountByCategory: map[HistoryTaskCategory]int{},
},
}, response)
},
Expand Down Expand Up @@ -1118,14 +1120,15 @@ func TestConflictResolveWorkflowExecution(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, &ConflictResolveWorkflowExecutionResponse{
MutableStateUpdateSessionStats: &MutableStateUpdateSessionStats{
MutableStateSize: 91,
ExecutionInfoSize: 20,
ActivityInfoSize: 29,
TimerInfoSize: 22,
ChildInfoSize: 20,
ActivityInfoCount: 1,
TimerInfoCount: 2,
ChildInfoCount: 1,
MutableStateSize: 91,
ExecutionInfoSize: 20,
ActivityInfoSize: 29,
TimerInfoSize: 22,
ChildInfoSize: 20,
ActivityInfoCount: 1,
TimerInfoCount: 2,
ChildInfoCount: 1,
TaskCountByCategory: map[HistoryTaskCategory]int{},
},
}, response)
},
Expand Down Expand Up @@ -1184,14 +1187,15 @@ func TestConflictResolveWorkflowExecution(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, &ConflictResolveWorkflowExecutionResponse{
MutableStateUpdateSessionStats: &MutableStateUpdateSessionStats{
MutableStateSize: 161,
ExecutionInfoSize: 40,
ActivityInfoSize: 49,
TimerInfoSize: 32,
ChildInfoSize: 40,
ActivityInfoCount: 2,
TimerInfoCount: 6,
ChildInfoCount: 2,
MutableStateSize: 161,
ExecutionInfoSize: 40,
ActivityInfoSize: 49,
TimerInfoSize: 32,
ChildInfoSize: 40,
ActivityInfoCount: 2,
TimerInfoCount: 3,
ChildInfoCount: 2,
TaskCountByCategory: map[HistoryTaskCategory]int{},
},
}, response)
},
Expand Down
67 changes: 35 additions & 32 deletions common/persistence/nosql/nosql_execution_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,10 +90,11 @@ func (d *nosqlExecutionStore) CreateWorkflowExecution(
return nil, err
}

transferTasks, crossClusterTasks, replicationTasks, timerTasks, err := d.prepareNoSQLTasksForWorkflowTxn(
tasksByCategory := map[persistence.HistoryTaskCategory][]*nosqlplugin.HistoryMigrationTask{}
err = d.prepareNoSQLTasksForWorkflowTxn(
domainID, workflowID, runID,
newWorkflow.TransferTasks, newWorkflow.CrossClusterTasks, newWorkflow.ReplicationTasks, newWorkflow.TimerTasks,
nil, nil, nil, nil,
newWorkflow.TasksByCategory,
tasksByCategory,
)
if err != nil {
return nil, err
Expand All @@ -115,7 +116,7 @@ func (d *nosqlExecutionStore) CreateWorkflowExecution(
ctx,
workflowRequestsWriteRequest,
currentWorkflowWriteReq, workflowExecutionWriteReq,
transferTasks, crossClusterTasks, replicationTasks, timerTasks,
tasksByCategory,
shardCondition,
)
if err != nil {
Expand Down Expand Up @@ -287,21 +288,18 @@ func (d *nosqlExecutionStore) UpdateWorkflowExecution(
}

var mutateExecution, insertExecution *nosqlplugin.WorkflowExecutionRequest
var nosqlTransferTasks []*nosqlplugin.TransferTask
var nosqlCrossClusterTasks []*nosqlplugin.CrossClusterTask
var nosqlReplicationTasks []*nosqlplugin.ReplicationTask
var nosqlTimerTasks []*nosqlplugin.TimerTask
var workflowRequests []*nosqlplugin.WorkflowRequestRow
tasksByCategory := map[persistence.HistoryTaskCategory][]*nosqlplugin.HistoryMigrationTask{}

// 1. current
mutateExecution, err = d.prepareUpdateWorkflowExecutionRequestWithMapsAndEventBuffer(&updateWorkflow)
if err != nil {
return err
}
nosqlTransferTasks, nosqlCrossClusterTasks, nosqlReplicationTasks, nosqlTimerTasks, err = d.prepareNoSQLTasksForWorkflowTxn(
err = d.prepareNoSQLTasksForWorkflowTxn(
domainID, workflowID, updateWorkflow.ExecutionInfo.RunID,
updateWorkflow.TransferTasks, updateWorkflow.CrossClusterTasks, updateWorkflow.ReplicationTasks, updateWorkflow.TimerTasks,
nosqlTransferTasks, nosqlCrossClusterTasks, nosqlReplicationTasks, nosqlTimerTasks,
updateWorkflow.TasksByCategory,
tasksByCategory,
)
if err != nil {
return err
Expand All @@ -315,10 +313,10 @@ func (d *nosqlExecutionStore) UpdateWorkflowExecution(
return err
}

nosqlTransferTasks, nosqlCrossClusterTasks, nosqlReplicationTasks, nosqlTimerTasks, err = d.prepareNoSQLTasksForWorkflowTxn(
err = d.prepareNoSQLTasksForWorkflowTxn(
domainID, workflowID, newWorkflow.ExecutionInfo.RunID,
newWorkflow.TransferTasks, newWorkflow.CrossClusterTasks, newWorkflow.ReplicationTasks, newWorkflow.TimerTasks,
nosqlTransferTasks, nosqlCrossClusterTasks, nosqlReplicationTasks, nosqlTimerTasks,
newWorkflow.TasksByCategory,
tasksByCategory,
)
if err != nil {
return err
Expand All @@ -339,7 +337,7 @@ func (d *nosqlExecutionStore) UpdateWorkflowExecution(
err = d.db.UpdateWorkflowExecutionWithTasks(
ctx, workflowRequestsWriteRequest, currentWorkflowWriteReq,
mutateExecution, insertExecution, nil, // no workflow to reset here
nosqlTransferTasks, nosqlCrossClusterTasks, nosqlReplicationTasks, nosqlTimerTasks,
tasksByCategory,
shardCondition)

return d.processUpdateWorkflowResult(err, request.RangeID)
Expand Down Expand Up @@ -422,22 +420,19 @@ func (d *nosqlExecutionStore) ConflictResolveWorkflowExecution(
}

var mutateExecution, insertExecution, resetExecution *nosqlplugin.WorkflowExecutionRequest
var nosqlTransferTasks []*nosqlplugin.TransferTask
var nosqlCrossClusterTasks []*nosqlplugin.CrossClusterTask
var nosqlReplicationTasks []*nosqlplugin.ReplicationTask
var nosqlTimerTasks []*nosqlplugin.TimerTask
var workflowRequests []*nosqlplugin.WorkflowRequestRow
tasksByCategory := map[persistence.HistoryTaskCategory][]*nosqlplugin.HistoryMigrationTask{}

// 1. current
if currentWorkflow != nil {
mutateExecution, err = d.prepareUpdateWorkflowExecutionRequestWithMapsAndEventBuffer(currentWorkflow)
if err != nil {
return err
}
nosqlTransferTasks, nosqlCrossClusterTasks, nosqlReplicationTasks, nosqlTimerTasks, err = d.prepareNoSQLTasksForWorkflowTxn(
err = d.prepareNoSQLTasksForWorkflowTxn(
domainID, workflowID, currentWorkflow.ExecutionInfo.RunID,
currentWorkflow.TransferTasks, currentWorkflow.CrossClusterTasks, currentWorkflow.ReplicationTasks, currentWorkflow.TimerTasks,
nosqlTransferTasks, nosqlCrossClusterTasks, nosqlReplicationTasks, nosqlTimerTasks,
currentWorkflow.TasksByCategory,
tasksByCategory,
)
if err != nil {
return err
Expand All @@ -450,10 +445,10 @@ func (d *nosqlExecutionStore) ConflictResolveWorkflowExecution(
if err != nil {
return err
}
nosqlTransferTasks, nosqlCrossClusterTasks, nosqlReplicationTasks, nosqlTimerTasks, err = d.prepareNoSQLTasksForWorkflowTxn(
err = d.prepareNoSQLTasksForWorkflowTxn(
domainID, workflowID, resetWorkflow.ExecutionInfo.RunID,
resetWorkflow.TransferTasks, resetWorkflow.CrossClusterTasks, resetWorkflow.ReplicationTasks, resetWorkflow.TimerTasks,
nosqlTransferTasks, nosqlCrossClusterTasks, nosqlReplicationTasks, nosqlTimerTasks,
resetWorkflow.TasksByCategory,
tasksByCategory,
)
if err != nil {
return err
Expand All @@ -467,10 +462,10 @@ func (d *nosqlExecutionStore) ConflictResolveWorkflowExecution(
return err
}

nosqlTransferTasks, nosqlCrossClusterTasks, nosqlReplicationTasks, nosqlTimerTasks, err = d.prepareNoSQLTasksForWorkflowTxn(
err = d.prepareNoSQLTasksForWorkflowTxn(
domainID, workflowID, newWorkflow.ExecutionInfo.RunID,
newWorkflow.TransferTasks, newWorkflow.CrossClusterTasks, newWorkflow.ReplicationTasks, newWorkflow.TimerTasks,
nosqlTransferTasks, nosqlCrossClusterTasks, nosqlReplicationTasks, nosqlTimerTasks,
newWorkflow.TasksByCategory,
tasksByCategory,
)
if err != nil {
return err
Expand All @@ -491,7 +486,7 @@ func (d *nosqlExecutionStore) ConflictResolveWorkflowExecution(
err = d.db.UpdateWorkflowExecutionWithTasks(
ctx, workflowRequestsWriteRequest, currentWorkflowWriteReq,
mutateExecution, insertExecution, resetExecution,
nosqlTransferTasks, nosqlCrossClusterTasks, nosqlReplicationTasks, nosqlTimerTasks,
tasksByCategory,
shardCondition)
return d.processUpdateWorkflowResult(err, request.RangeID)
}
Expand Down Expand Up @@ -759,7 +754,10 @@ func (d *nosqlExecutionStore) PutReplicationTaskToDLQ(
ctx context.Context,
request *persistence.InternalPutReplicationTaskToDLQRequest,
) error {
err := d.db.InsertReplicationDLQTask(ctx, d.shardID, request.SourceClusterName, *request.TaskInfo)
err := d.db.InsertReplicationDLQTask(ctx, d.shardID, request.SourceClusterName, &nosqlplugin.HistoryMigrationTask{
Replication: request.TaskInfo,
Task: nil, // TODO: encode task infor into datablob
})
if err != nil {
return convertCommonErrors(d.db, "PutReplicationTaskToDLQ", err)
}
Expand Down Expand Up @@ -829,15 +827,20 @@ func (d *nosqlExecutionStore) CreateFailoverMarkerTasks(
request *persistence.CreateFailoverMarkersRequest,
) error {

var nosqlTasks []*nosqlplugin.ReplicationTask
var nosqlTasks []*nosqlplugin.HistoryMigrationTask
for _, task := range request.Markers {
ts := []persistence.Task{task}

tasks, err := d.prepareReplicationTasksForWorkflowTxn(task.DomainID, rowTypeReplicationWorkflowID, rowTypeReplicationRunID, ts)
if err != nil {
return err
}
nosqlTasks = append(nosqlTasks, tasks...)
for _, task := range tasks {
nosqlTasks = append(nosqlTasks, &nosqlplugin.HistoryMigrationTask{
Replication: task,
Task: nil, // TODO: encode replication task into datablob
})
}
}

err := d.db.InsertReplicationTask(ctx, nosqlTasks, nosqlplugin.ShardCondition{
Expand Down
Loading
Loading