-
Notifications
You must be signed in to change notification settings - Fork 815
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Move time source from db layer to PersistenceManager #6646
base: master
Are you sure you want to change the base?
Move time source from db layer to PersistenceManager #6646
Conversation
@@ -60,7 +60,7 @@ func TestNosqlExecutionStoreUtils(t *testing.T) { | |||
Data: []byte(`[{"Branches":[{"BranchID":"test-branch-id","BeginNodeID":1,"EndNodeID":2}]}]`), | |||
}, | |||
} | |||
return store.prepareCreateWorkflowExecutionRequestWithMaps(workflowSnapshot) | |||
return store.prepareCreateWorkflowExecutionRequestWithMaps(workflowSnapshot, time.Now()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Have you had any error when running the unit tests? I was thinking to use a constant time stamp in the tests like this so that in the unit tests it will always keep the same
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, there haven't been any errors in the tests with this implementation. However, I refactored it to use a FixedTime variable for consistency.
e99014a
to
f1bb070
Compare
@bowenxia added another commit with a refactor of the history storage. If you have time, please take a look. I'll be sending the work in small chunks, little by little. There are a few TODOs— if you could reply to them, I’d really appreciate it. |
@@ -54,19 +53,20 @@ func (db *cdb) InsertIntoHistoryTreeAndNode(ctx context.Context, treeRow *nosqlp | |||
// Note: for perf, prefer using batch for inserting more than one records | |||
batch := db.session.NewBatch(gocql.LoggedBatch).WithContext(ctx) | |||
batch.Query(v2templateInsertTree, | |||
treeRow.TreeID, treeRow.BranchID, ancs, persistence.UnixNanoToDBTimestamp(treeRow.CreateTimestamp.UnixNano()), treeRow.Info, timeStamp) | |||
// TODO: Should the `persistence.UnixNanoToDBTimestamp(treeRow.CreateTimestamp.UnixNano())` logic be handled by `persistenceManager` as well, or should it remain as is? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's leave it as is, since this PR is more related to the created_time and last_updated_time timestamps.
LGTM so far. Lemme know if you are planning to refactor other persistence managers here, or in a new PR. |
I'm planning to do everything in one PR. I figured it would be more convenient to have the refactor in a single place rather than splitting it up, unless there are any objections. |
3472e3e
to
daf8015
Compare
@bowenxia, I refactored the task manager—please take a look. I've added some questions marked as TODOs. |
ttl = &ttlInt // TTL is an integer | ||
} | ||
} | ||
// TODO: no usage of ttl |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's leave it as-is. Because this is not the purpose of this PR.
If there's something wrong, it would be easier to find root cause.
if ttl != nil { | ||
t.Expiry = db.timeSrc.Now().Add(time.Duration(*ttl) * time.Second) | ||
} | ||
// TODO: removing this, because there is no usage of it |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same here
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Another issue at this low-level layer is that TTL expiry computation is happening here, whereas I understand that we should remove any computation from db layer entirely. Moving this logic to upstream led nowhere, as TTL isn't used there either.
Since the task is to remove timeSrc
from db
and we need to discuss this matter later, I could replace the current logic with time.Now()
and keep the TODO.
@@ -110,7 +110,7 @@ func fromTaskListPartitionConfig(config *persistence.TaskListPartitionConfig) ma | |||
// InsertTaskList insert a single tasklist row | |||
// Return TaskOperationConditionFailure if the condition doesn't meet | |||
func (db *cdb) InsertTaskList(ctx context.Context, row *nosqlplugin.TaskListRow) error { | |||
timeStamp := db.timeSrc.Now() | |||
timeStamp := row.LastUpdatedTime |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
insert is a create operation. It would be confused to have lastUpdatedTime. I understand that you might want to reuse the LastUpdateTime in TaskListRow struct, but
- This lastUpdatedTime is used for task object, not for a TaskList row in Cassandra. Thus, it uses time.now() in nosql_task_store.
- Changing it would break the logic of creating a task obejct, which is not related to what we are doing in this PR.
- Maybe add a field like "timestamp" would be a better idea.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I suggest using a single expression, like currentTimestamp
, for all functions (insert, update). What do you think?
@@ -282,7 +282,7 @@ func (db *cdb) InsertTasks( | |||
domainID := tasklistCondition.DomainID | |||
taskListName := tasklistCondition.TaskListName | |||
taskListType := tasklistCondition.TaskListType | |||
timeStamp := db.timeSrc.Now() | |||
timeStamp := tasklistCondition.LastUpdatedTime |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same here. Insert is a create operation
@bowenxia when you have a moment, please take another look—especially at the Queue Manager. It requires additional attention on how to handle it. |
@@ -200,13 +200,13 @@ type ( | |||
EnqueueMessage(ctx context.Context, messagePayload []byte) error | |||
ReadMessages(ctx context.Context, lastMessageID int64, maxCount int) ([]*InternalQueueMessage, error) | |||
DeleteMessagesBefore(ctx context.Context, messageID int64) error | |||
UpdateAckLevel(ctx context.Context, messageID int64, clusterName string) error | |||
UpdateAckLevel(ctx context.Context, messageID int64, clusterName string, now time.Time) error |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit:
maybe consider changing those to currentTimeStamp as well to make those more descriptive.
@@ -100,7 +101,7 @@ func (q *nosqlQueueStore) EnqueueMessage( | |||
if err != nil { | |||
return err | |||
} | |||
_, err = q.tryEnqueue(ctx, q.queueType, getNextID(ackLevels, lastMessageID), messagePayload) | |||
_, err = q.tryEnqueue(ctx, q.queueType, getNextID(ackLevels, lastMessageID), messagePayload, time.Now()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
let's change the signature of newNoSQLQueueStore() to have a currentTimestamp as a parameter, in that case we can pass the time from persistence manager
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry, but I don't see how adding the currentTimestamp parameter to newNoSQLQueueStore
would allow passing the time from the persistence manager. The newNoSQLQueueStore
leads to the factory and then to the bean.
That said, I still was able to resolve the selected code.
However, my main challenge now is avoiding time generation at the store level, which happens within newNoSQLQueueStore
, specifically in the createQueueMetadataEntryIfNotExist
method. If the current solution is okay, please let me know.
I've updated the PR. Please take another look. Skip the tests for now, as they are pending our decision on the solution for newNoSQLQueueStore
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes you are right.
Adding timeSrc to nosql_queue_store might be a way. @taylanisikdemir Do you have an opinion? Since it is too hard to add timeSrc in queue_manager and pass it down to nosql_queue_store.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Update:
Sync with Taylan offilne. Let's add timeSrc as a field in to nosql_queue_store, and then create it during the call of its constructor for now. Also, could you mark this part as a to do for future work? Thanks!
@@ -352,7 +353,7 @@ func TestUpdateAckLevel(t *testing.T) { | |||
require.NoError(t, err, "Failed to create sql queue store") | |||
|
|||
tc.mockSetup(mockDB, mockTx) | |||
err = store.UpdateAckLevel(context.Background(), 0, tc.clusterName) | |||
err = store.UpdateAckLevel(context.Background(), 0, tc.clusterName, time.Now()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe change those to fixedTime as well
@@ -719,7 +720,7 @@ func TestUpdateDLQAckLevel(t *testing.T) { | |||
require.NoError(t, err, "Failed to create sql queue store") | |||
|
|||
tc.mockSetup(mockDB, mockTx) | |||
err = store.UpdateDLQAckLevel(context.Background(), 0, tc.clusterName) | |||
err = store.UpdateDLQAckLevel(context.Background(), 0, tc.clusterName, time.Now()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same
@@ -248,20 +244,21 @@ func (q *nosqlQueueStore) insertInitialQueueMetadataRecord( | |||
queueType persistence.QueueType, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's add a currentTimestamp here as an extra field
@@ -114,16 +115,11 @@ func (q *nosqlQueueStore) EnqueueMessageToDLQ( | |||
return err | |||
} | |||
|
|||
_, err = q.tryEnqueue(ctx, q.getDLQTypeFromQueueType(), lastMessageID+1, messagePayload) | |||
_, err = q.tryEnqueue(ctx, q.getDLQTypeFromQueueType(), lastMessageID+1, messagePayload, time.Now()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's do the same as you did for UpdateAckLevel and UpdateDLQAckLevel to have a currentTimestamp as an extra parameter
4c8cfb7
to
2b03e69
Compare
d9d0012
to
fc3bf14
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
shard storage refactored LGTM
RangeID: initialRangeID, | ||
TaskListKind: request.TaskListKind, | ||
AckLevel: initialAckLevel, | ||
LastUpdatedTime: now, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: consider change now
to be a more descriptive name like currentTimeStamp
TaskListName: request.TaskListInfo.Name, | ||
TaskListType: request.TaskListInfo.TaskType, | ||
RangeID: request.TaskListInfo.RangeID, | ||
LastUpdatedTime: now, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: same
ts := []persistence.Task{task} | ||
|
||
tasks, err := d.prepareReplicationTasksForWorkflowTxn(task.DomainID, rowTypeReplicationWorkflowID, rowTypeReplicationRunID, ts) | ||
if err != nil { | ||
return err | ||
} | ||
tasks[i].CurrentTimeStamp = request.CurrentTimeStamp |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is redundant here to add timestamp to each task since we are storing the entire tasklist into Cassandra; also we do have a creationTime for each task.
I am thinking how to pass one timestamp into the method d.db.InsertReplicationTask
without changing the signature. @taylanisikdemir Any suggestions? I am thinking to add the timestamp into ctx, but not sure if it is the best way.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There has been update in this block of code (i.e. Refactor peristence layer to support inserting history tasks of new categories #6671). Adjusted the code accordingly.
# Conflicts: # common/persistence/nosql/nosqlplugin/cassandra/tasks_test.go
# Conflicts: # common/persistence/nosql/nosql_execution_store.go
ac8f49b
to
bbcb70d
Compare
bbcb70d
to
b19b80a
Compare
Reviewing new changes |
8949948
to
0ec186a
Compare
@bowenxia, I’ve made some minor changes and added one TODO—please take a look. Looks like I’ve refactored everything required for this task. The Queue store is still a bit tricky though. I think it's ready to remove WIP and start to wrap it up 😄 |
Hi @ribaraka sounds good. I was too busy to review the PR today. I'll try get back to you tmr or the day after. Thanks for your contribution! |
Refactored the system so that the PersistenceManager level handles the timestamp
Why?
#6610
How did you test it?
unit tests
Potential risks
Release notes
Documentation Changes