Skip to content

Commit

Permalink
task storage refactored
Browse files Browse the repository at this point in the history
  • Loading branch information
ribaraka committed Feb 4, 2025
1 parent f1bb070 commit 3472e3e
Show file tree
Hide file tree
Showing 8 changed files with 62 additions and 37 deletions.
3 changes: 3 additions & 0 deletions common/persistence/data_manager_interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -1019,6 +1019,7 @@ type (
TaskType int
TaskListKind int
RangeID int64
UpdatedTime time.Time
}

// LeaseTaskListResponse is response to LeaseTaskListRequest
Expand All @@ -1041,6 +1042,7 @@ type (
UpdateTaskListRequest struct {
TaskListInfo *TaskListInfo
DomainName string
UpdatedTime time.Time
}

// UpdateTaskListResponse is the response to UpdateTaskList
Expand Down Expand Up @@ -1085,6 +1087,7 @@ type (
TaskListInfo *TaskListInfo
Tasks []*CreateTaskInfo
DomainName string
CreatedTime time.Time
}

// CreateTaskInfo describes a task to be created in CreateTasksRequest
Expand Down
2 changes: 2 additions & 0 deletions common/persistence/execution_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ var (
testSearchAttributes1 = map[string]interface{}{"TestAttr1": "val1", "TestAttr2": 2, "TestAttr3": false}
testSearchAttributes2 = map[string]interface{}{"TestAttr1": "val2", "TestAttr2": 2, "TestAttr3": false}
testSearchAttributes3 = map[string]interface{}{"TestAttr2": 2, "TestAttr3": false}
fixedTime = time.Date(2025, 1, 6, 15, 0, 0, 0, time.UTC)
)

func TestExecutionManager_ProxyStoreMethods(t *testing.T) {
Expand Down Expand Up @@ -1001,6 +1002,7 @@ func TestCreateWorkflowExecution(t *testing.T) {
PreviousLastWriteVersion: 1,
NewWorkflowSnapshot: *sampleInternalWorkflowSnapshot(),
WorkflowRequestMode: CreateWorkflowRequestModeReplicated,
CreatedTime: time.Now(),
}).Return(nil, nil)

// Prepare DeserializeWorkflow call
Expand Down
15 changes: 8 additions & 7 deletions common/persistence/nosql/nosql_task_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func (t *nosqlTaskStore) LeaseTaskList(
Message: "LeaseTaskList requires non empty task list",
}
}
now := time.Now()
now := request.UpdatedTime
var err, selectErr error
var currTL *nosqlplugin.TaskListRow
storeShard, err := t.GetStoreShardByTaskList(request.DomainID, request.TaskList, request.TaskType)
Expand Down Expand Up @@ -217,7 +217,7 @@ func (t *nosqlTaskStore) UpdateTaskList(
RangeID: tli.RangeID,
TaskListKind: tli.Kind,
AckLevel: tli.AckLevel,
LastUpdatedTime: time.Now(),
LastUpdatedTime: request.UpdatedTime,
AdaptivePartitionConfig: tli.AdaptivePartitionConfig,
}
storeShard, err := t.GetStoreShardByTaskList(tli.DomainID, tli.Name, tli.TaskType)
Expand Down Expand Up @@ -287,7 +287,7 @@ func (t *nosqlTaskStore) CreateTasks(
ctx context.Context,
request *persistence.CreateTasksRequest,
) (*persistence.CreateTasksResponse, error) {
now := time.Now()
now := request.CreatedTime
var tasks []*nosqlplugin.TaskRowForInsert
for _, taskRequest := range request.Tasks {
task := &nosqlplugin.TaskRow{
Expand Down Expand Up @@ -326,10 +326,11 @@ func (t *nosqlTaskStore) CreateTasks(
}

tasklistCondition := &nosqlplugin.TaskListRow{
DomainID: request.TaskListInfo.DomainID,
TaskListName: request.TaskListInfo.Name,
TaskListType: request.TaskListInfo.TaskType,
RangeID: request.TaskListInfo.RangeID,
DomainID: request.TaskListInfo.DomainID,
TaskListName: request.TaskListInfo.Name,
TaskListType: request.TaskListInfo.TaskType,
RangeID: request.TaskListInfo.RangeID,
LastUpdatedTime: now,
}

tli := request.TaskListInfo
Expand Down
20 changes: 12 additions & 8 deletions common/persistence/nosql/nosql_task_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -544,10 +544,11 @@ func TestCreateTasks(t *testing.T) {
name: "success - skipping task with Expiry expired",
setupMock: func(dbMock *nosqlplugin.MockDB) {
dbMock.EXPECT().InsertTasks(gomock.Any(), gomock.Any(), &nosqlplugin.TaskListRow{
DomainID: TestDomainID,
TaskListName: TestTaskListName,
TaskListType: TestTaskType,
RangeID: 1,
DomainID: TestDomainID,
TaskListName: TestTaskListName,
TaskListType: TestTaskType,
RangeID: 1,
LastUpdatedTime: now,
}).Do(func(_ context.Context, tasks []*nosqlplugin.TaskRowForInsert, _ *nosqlplugin.TaskListRow) {
assert.Len(t, tasks, 0)
}).Return(nil).Times(1)
Expand All @@ -559,6 +560,7 @@ func TestCreateTasks(t *testing.T) {
TaskType: TestTaskType,
RangeID: 1,
},
CreatedTime: now,
Tasks: []*persistence.CreateTaskInfo{
{
TaskID: 100,
Expand Down Expand Up @@ -666,6 +668,7 @@ func getValidLeaseTaskListRequest() *persistence.LeaseTaskListRequest {
TaskType: int(types.TaskListTypeDecision),
TaskListKind: int(types.TaskListKindNormal),
RangeID: 0,
UpdatedTime: FixedTime,
}
}

Expand All @@ -685,7 +688,7 @@ func checkTaskListInfoExpected(t *testing.T, taskListInfo *persistence.TaskListI
assert.Equal(t, initialRangeID, taskListInfo.RangeID)
assert.Equal(t, initialAckLevel, taskListInfo.AckLevel)
assert.Equal(t, int(types.TaskListKindNormal), taskListInfo.Kind)
assert.WithinDuration(t, time.Now(), taskListInfo.LastUpdated, time.Second)
assert.WithinDuration(t, FixedTime, taskListInfo.LastUpdated, time.Second)
}

func taskRowEqualTaskInfo(t *testing.T, taskrow1 nosqlplugin.TaskRow, taskInfo1 *persistence.TaskInfo) {
Expand Down Expand Up @@ -727,7 +730,7 @@ func getExpectedTaskListRow() *nosqlplugin.TaskListRow {
RangeID: initialRangeID,
TaskListKind: int(types.TaskListKindNormal),
AckLevel: initialAckLevel,
LastUpdatedTime: time.Now(),
LastUpdatedTime: FixedTime,
}
}

Expand All @@ -739,7 +742,7 @@ func getExpectedTaskListRowWithPartitionConfig() *nosqlplugin.TaskListRow {
RangeID: initialRangeID,
TaskListKind: int(types.TaskListKindNormal),
AckLevel: initialAckLevel,
LastUpdatedTime: time.Now(),
LastUpdatedTime: FixedTime,
AdaptivePartitionConfig: &persistence.TaskListPartitionConfig{
Version: 1,
NumReadPartitions: 2,
Expand All @@ -749,6 +752,7 @@ func getExpectedTaskListRowWithPartitionConfig() *nosqlplugin.TaskListRow {
}

func checkTaskListRowExpected(t *testing.T, expectedRow *nosqlplugin.TaskListRow, taskList *nosqlplugin.TaskListRow) {
t.Helper()
// Check the duration
assert.WithinDuration(t, expectedRow.LastUpdatedTime, taskList.LastUpdatedTime, time.Second)

Expand All @@ -765,7 +769,7 @@ func getExpectedTaskListInfo() *persistence.TaskListInfo {
RangeID: initialRangeID,
AckLevel: initialAckLevel,
Kind: int(types.TaskListKindNormal),
LastUpdated: time.Now(),
LastUpdated: FixedTime,
AdaptivePartitionConfig: &persistence.TaskListPartitionConfig{
Version: 1,
NumReadPartitions: 2,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ func (db *cdb) InsertIntoHistoryTreeAndNode(ctx context.Context, treeRow *nosqlp
// Note: for perf, prefer using batch for inserting more than one records
batch := db.session.NewBatch(gocql.LoggedBatch).WithContext(ctx)
batch.Query(v2templateInsertTree,
// TODO: Should the `persistence.UnixNanoToDBTimestamp(treeRow.CreateTimestamp.UnixNano())` logic be handled by `persistenceManager` as well, or should it remain as is?
treeRow.TreeID, treeRow.BranchID, ancs, persistence.UnixNanoToDBTimestamp(treeRow.CreateTimestamp.UnixNano()), treeRow.Info, treeRow.CreateTimestamp)
batch.Query(v2templateUpsertData,
nodeRow.TreeID, nodeRow.BranchID, nodeRow.NodeID, nodeRow.TxnID, nodeRow.Data, nodeRow.DataEncoding, nodeRow.CreateTimestamp)
Expand Down
36 changes: 19 additions & 17 deletions common/persistence/nosql/nosqlplugin/cassandra/tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func fromTaskListPartitionConfig(config *persistence.TaskListPartitionConfig) ma
// InsertTaskList insert a single tasklist row
// Return TaskOperationConditionFailure if the condition doesn't meet
func (db *cdb) InsertTaskList(ctx context.Context, row *nosqlplugin.TaskListRow) error {
timeStamp := db.timeSrc.Now()
timeStamp := row.LastUpdatedTime
query := db.session.Query(templateInsertTaskListQuery,
row.DomainID,
row.TaskListName,
Expand Down Expand Up @@ -144,7 +144,7 @@ func (db *cdb) UpdateTaskList(
row *nosqlplugin.TaskListRow,
previousRangeID int64,
) error {
timeStamp := db.timeSrc.Now()
timeStamp := row.LastUpdatedTime
query := db.session.Query(templateUpdateTaskListQuery,
row.RangeID,
row.DomainID,
Expand Down Expand Up @@ -198,7 +198,7 @@ func (db *cdb) UpdateTaskListWithTTL(
row *nosqlplugin.TaskListRow,
previousRangeID int64,
) error {
timeStamp := db.timeSrc.Now()
timeStamp := row.LastUpdatedTime
batch := db.session.NewBatch(gocql.LoggedBatch).WithContext(ctx)
// part 1 is used to set TTL on primary key as UPDATE can't set TTL for primary key
batch.Query(templateUpdateTaskListQueryWithTTLPart1,
Expand Down Expand Up @@ -282,7 +282,7 @@ func (db *cdb) InsertTasks(
domainID := tasklistCondition.DomainID
taskListName := tasklistCondition.TaskListName
taskListType := tasklistCondition.TaskListType
timeStamp := db.timeSrc.Now()
timeStamp := tasklistCondition.LastUpdatedTime

for _, task := range tasksToInsert {
scheduleID := task.ScheduledID
Expand Down Expand Up @@ -387,23 +387,25 @@ PopulateTasks:
continue
}

// Extract the TTL value
ttlValue, ttlExists := task["ttl"]

// Check if TTL is null or an integer
var ttl *int
if ttlExists && ttlValue != nil {
if ttlInt, ok := ttlValue.(int); ok {
ttl = &ttlInt // TTL is an integer
}
}
// TODO: no usage of ttl
//// Extract the TTL value
//ttlValue, ttlExists := task["ttl"]
//
//// Check if TTL is null or an integer
//var ttl *int
//if ttlExists && ttlValue != nil {
// if ttlInt, ok := ttlValue.(int); ok {
// ttl = &ttlInt // TTL is an integer
// }
//}

t := createTaskInfo(task["task"].(map[string]interface{}))
t.TaskID = taskID.(int64)

if ttl != nil {
t.Expiry = db.timeSrc.Now().Add(time.Duration(*ttl) * time.Second)
}
// TODO: removing this, because there is no usage of it
//if ttl != nil {
// t.Expiry = db.timeSrc.Now().Add(time.Duration(*ttl) * time.Second)
//}

response = append(response, t)
if len(response) == filter.BatchSize {
Expand Down
7 changes: 7 additions & 0 deletions common/persistence/task_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,14 @@ package persistence

import (
"context"

"github.com/uber/cadence/common/clock"
)

type (
taskManager struct {
persistence TaskStore
timeSrc clock.TimeSource
}
)

Expand All @@ -40,6 +43,7 @@ func NewTaskManager(
) TaskManager {
return &taskManager{
persistence: persistence,
timeSrc: clock.NewRealTimeSource(),
}
}

Expand All @@ -52,6 +56,7 @@ func (t *taskManager) Close() {
}

func (t *taskManager) LeaseTaskList(ctx context.Context, request *LeaseTaskListRequest) (*LeaseTaskListResponse, error) {
request.UpdatedTime = t.timeSrc.Now()
return t.persistence.LeaseTaskList(ctx, request)
}

Expand All @@ -60,6 +65,7 @@ func (t *taskManager) GetTaskList(ctx context.Context, request *GetTaskListReque
}

func (t *taskManager) UpdateTaskList(ctx context.Context, request *UpdateTaskListRequest) (*UpdateTaskListResponse, error) {
request.UpdatedTime = t.timeSrc.Now()
return t.persistence.UpdateTaskList(ctx, request)
}

Expand All @@ -76,6 +82,7 @@ func (t *taskManager) GetTaskListSize(ctx context.Context, request *GetTaskListS
}

func (t *taskManager) CreateTasks(ctx context.Context, request *CreateTasksRequest) (*CreateTasksResponse, error) {
request.CreatedTime = t.timeSrc.Now()
return t.persistence.CreateTasks(ctx, request)
}

Expand Down
15 changes: 11 additions & 4 deletions service/matching/tasklist/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"sync"
"sync/atomic"

"github.com/uber/cadence/common/clock"
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/log/tag"
"github.com/uber/cadence/common/persistence"
Expand All @@ -44,6 +45,7 @@ type (
partitionConfig *persistence.TaskListPartitionConfig
store persistence.TaskManager
logger log.Logger
timeSrc clock.TimeSource
}
taskListState struct {
rangeID int64
Expand All @@ -70,6 +72,7 @@ func newTaskListDB(store persistence.TaskManager, domainID string, domainName st
taskType: taskType,
store: store,
logger: logger,
timeSrc: clock.NewRealTimeSource(),
}
}

Expand Down Expand Up @@ -103,6 +106,7 @@ func (db *taskListDB) RenewLease() (taskListState, error) {
TaskListKind: db.taskListKind,
RangeID: atomic.LoadInt64(&db.rangeID),
DomainName: db.domainName,
UpdatedTime: db.timeSrc.Now(),
})
if err != nil {
return taskListState{}, err
Expand All @@ -127,7 +131,8 @@ func (db *taskListDB) UpdateState(ackLevel int64) error {
Kind: db.taskListKind,
AdaptivePartitionConfig: db.partitionConfig,
},
DomainName: db.domainName,
DomainName: db.domainName,
UpdatedTime: db.timeSrc.Now(),
})
if err != nil {
return err
Expand All @@ -149,7 +154,8 @@ func (db *taskListDB) UpdateTaskListPartitionConfig(partitionConfig *persistence
Kind: db.taskListKind,
AdaptivePartitionConfig: partitionConfig,
},
DomainName: db.domainName,
DomainName: db.domainName,
UpdatedTime: db.timeSrc.Now(),
})
if err != nil {
return err
Expand All @@ -169,8 +175,9 @@ func (db *taskListDB) CreateTasks(tasks []*persistence.CreateTaskInfo) (*persist
TaskType: db.taskType,
RangeID: db.rangeID,
},
Tasks: tasks,
DomainName: db.domainName,
Tasks: tasks,
DomainName: db.domainName,
CreatedTime: db.timeSrc.Now(),
})
}

Expand Down

0 comments on commit 3472e3e

Please sign in to comment.