Skip to content

Commit

Permalink
queue storage refactored
Browse files Browse the repository at this point in the history
  • Loading branch information
ribaraka committed Feb 7, 2025
1 parent 4846e42 commit 4c8cfb7
Show file tree
Hide file tree
Showing 13 changed files with 71 additions and 79 deletions.
4 changes: 2 additions & 2 deletions common/persistence/data_store_interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,13 +200,13 @@ type (
EnqueueMessage(ctx context.Context, messagePayload []byte) error
ReadMessages(ctx context.Context, lastMessageID int64, maxCount int) ([]*InternalQueueMessage, error)
DeleteMessagesBefore(ctx context.Context, messageID int64) error
UpdateAckLevel(ctx context.Context, messageID int64, clusterName string) error
UpdateAckLevel(ctx context.Context, messageID int64, clusterName string, now time.Time) error
GetAckLevels(ctx context.Context) (map[string]int64, error)
EnqueueMessageToDLQ(ctx context.Context, messagePayload []byte) error
ReadMessagesFromDLQ(ctx context.Context, firstMessageID int64, lastMessageID int64, pageSize int, pageToken []byte) ([]*InternalQueueMessage, []byte, error)
DeleteMessageFromDLQ(ctx context.Context, messageID int64) error
RangeDeleteMessagesFromDLQ(ctx context.Context, firstMessageID int64, lastMessageID int64) error
UpdateDLQAckLevel(ctx context.Context, messageID int64, clusterName string) error
UpdateDLQAckLevel(ctx context.Context, messageID int64, clusterName string, now time.Time) error
GetDLQAckLevels(ctx context.Context) (map[string]int64, error)
GetDLQSize(ctx context.Context) (int64, error)
}
Expand Down
40 changes: 17 additions & 23 deletions common/persistence/nosql/nosql_queue_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ package nosql
import (
"context"
"fmt"
"time"

"github.com/uber/cadence/common/config"
"github.com/uber/cadence/common/log"
Expand Down Expand Up @@ -100,7 +101,7 @@ func (q *nosqlQueueStore) EnqueueMessage(
if err != nil {
return err
}
_, err = q.tryEnqueue(ctx, q.queueType, getNextID(ackLevels, lastMessageID), messagePayload)
_, err = q.tryEnqueue(ctx, q.queueType, getNextID(ackLevels, lastMessageID), messagePayload, time.Now())
return err
}

Expand All @@ -114,16 +115,11 @@ func (q *nosqlQueueStore) EnqueueMessageToDLQ(
return err
}

_, err = q.tryEnqueue(ctx, q.getDLQTypeFromQueueType(), lastMessageID+1, messagePayload)
_, err = q.tryEnqueue(ctx, q.getDLQTypeFromQueueType(), lastMessageID+1, messagePayload, time.Now())
return err
}

func (q *nosqlQueueStore) tryEnqueue(
ctx context.Context,
queueType persistence.QueueType,
messageID int64,
messagePayload []byte,
) (int64, error) {
func (q *nosqlQueueStore) tryEnqueue(ctx context.Context, queueType persistence.QueueType, messageID int64, messagePayload []byte, now time.Time) (int64, error) {
err := q.db.InsertIntoQueue(ctx, &nosqlplugin.QueueMessageRow{
QueueType: queueType,
ID: messageID,
Expand Down Expand Up @@ -248,20 +244,21 @@ func (q *nosqlQueueStore) insertInitialQueueMetadataRecord(
queueType persistence.QueueType,
) error {
version := int64(0)
if err := q.db.InsertQueueMetadata(ctx, queueType, version); err != nil {

// TODO: kind of stuck here. don't know if there is better solution than this
row := nosqlplugin.QueueMetadataRow{
Version: version,
CurrentTimeStamp: time.Now(),
}
if err := q.db.InsertQueueMetadata(ctx, row); err != nil {
return convertCommonErrors(q.db, fmt.Sprintf("InsertInitialQueueMetadataRecord, Type: %v", queueType), err)
}

return nil
}

func (q *nosqlQueueStore) UpdateAckLevel(
ctx context.Context,
messageID int64,
clusterName string,
) error {

return q.updateAckLevel(ctx, messageID, clusterName, q.queueType)
func (q *nosqlQueueStore) UpdateAckLevel(ctx context.Context, messageID int64, clusterName string, now time.Time) error {
return q.updateAckLevel(ctx, messageID, clusterName, q.queueType, now)
}

func (q *nosqlQueueStore) GetAckLevels(
Expand All @@ -275,13 +272,8 @@ func (q *nosqlQueueStore) GetAckLevels(
return queueMetadata.ClusterAckLevels, nil
}

func (q *nosqlQueueStore) UpdateDLQAckLevel(
ctx context.Context,
messageID int64,
clusterName string,
) error {

return q.updateAckLevel(ctx, messageID, clusterName, q.getDLQTypeFromQueueType())
func (q *nosqlQueueStore) UpdateDLQAckLevel(ctx context.Context, messageID int64, clusterName string, now time.Time) error {
return q.updateAckLevel(ctx, messageID, clusterName, q.getDLQTypeFromQueueType(), now)
}

func (q *nosqlQueueStore) GetDLQAckLevels(
Expand Down Expand Up @@ -352,6 +344,7 @@ func (q *nosqlQueueStore) updateAckLevel(
messageID int64,
clusterName string,
queueType persistence.QueueType,
now time.Time,
) error {

queueMetadata, err := q.getQueueMetadata(ctx, queueType)
Expand All @@ -366,6 +359,7 @@ func (q *nosqlQueueStore) updateAckLevel(

queueMetadata.ClusterAckLevels[clusterName] = messageID
queueMetadata.Version++
queueMetadata.CurrentTimeStamp = now

// Use negative queue type as the dlq type
err = q.updateQueueMetadata(ctx, queueMetadata)
Expand Down
14 changes: 5 additions & 9 deletions common/persistence/nosql/nosqlplugin/cassandra/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func (db *cdb) InsertIntoQueue(
ctx context.Context,
row *nosqlplugin.QueueMessageRow,
) error {
timeStamp := db.timeSrc.Now()
timeStamp := row.CurrentTimeStamp
query := db.session.Query(templateEnqueueMessageQuery, row.QueueType, row.ID, row.Payload, timeStamp).WithContext(ctx)
previous := make(map[string]interface{})
applied, err := query.MapScanCAS(previous)
Expand Down Expand Up @@ -168,14 +168,10 @@ func (db *cdb) DeleteMessage(
}

// Insert an empty metadata row, starting from a version
func (db *cdb) InsertQueueMetadata(
ctx context.Context,
queueType persistence.QueueType,
version int64,
) error {
timeStamp := db.timeSrc.Now()
func (db *cdb) InsertQueueMetadata(ctx context.Context, row nosqlplugin.QueueMetadataRow) error {
timeStamp := row.CurrentTimeStamp
clusterAckLevels := map[string]int64{}
query := db.session.Query(templateInsertQueueMetadataQuery, queueType, clusterAckLevels, version, timeStamp).WithContext(ctx)
query := db.session.Query(templateInsertQueueMetadataQuery, row.QueueType, clusterAckLevels, row.Version, timeStamp).WithContext(ctx)

// NOTE: Must pass nils to be compatible with ScyllaDB's LWT behavior
// "Scylla always returns the old version of the row, regardless of whether the condition is true or not."
Expand All @@ -195,7 +191,7 @@ func (db *cdb) UpdateQueueMetadataCas(
ctx context.Context,
row nosqlplugin.QueueMetadataRow,
) error {
timeStamp := db.timeSrc.Now()
timeStamp := row.CurrentTimeStamp
query := db.session.Query(templateUpdateQueueMetadataQuery,
row.ClusterAckLevels,
row.Version,
Expand Down
15 changes: 11 additions & 4 deletions common/persistence/nosql/nosqlplugin/cassandra/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -831,7 +831,12 @@ func TestInsertQueueMetadata(t *testing.T) {
db := newCassandraDBFromSession(cfg, session, logger, dc, dbWithClient(client))
db.timeSrc = clock.NewMockedTimeSourceAt(FixedTime)

err := db.InsertQueueMetadata(context.Background(), tc.queueType, tc.version)
row := nosqlplugin.QueueMetadataRow{
QueueType: tc.queueType,
Version: tc.version,
CurrentTimeStamp: FixedTime,
}
err := db.InsertQueueMetadata(context.Background(), row)

if (err != nil) != tc.wantErr {
t.Errorf("Got error = %v, wantErr %v", err, tc.wantErr)
Expand Down Expand Up @@ -862,6 +867,7 @@ func TestUpdateQueueMetadataCas(t *testing.T) {
QueueType: persistence.QueueType(2),
ClusterAckLevels: map[string]int64{"cluster1": 1000, "cluster2": 2000},
Version: 25,
CurrentTimeStamp: FixedTime,
},
queryMockFn: func(query *gocql.MockQuery) {
query.EXPECT().WithContext(gomock.Any()).Return(query).Times(1)
Expand Down Expand Up @@ -933,8 +939,9 @@ func TestUpdateQueueMetadataCas(t *testing.T) {
}
func queueMessageRow(id int64) *nosqlplugin.QueueMessageRow {
return &nosqlplugin.QueueMessageRow{
QueueType: persistence.DomainReplicationQueueType,
ID: id,
Payload: []byte(fmt.Sprintf("test-payload-%d", id)),
QueueType: persistence.DomainReplicationQueueType,
ID: id,
Payload: []byte(fmt.Sprintf("test-payload-%d", id)),
CurrentTimeStamp: FixedTime,
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -67,5 +67,6 @@ func NewDomainRow(ts time.Time) *nosqlplugin.DomainRow {
FailoverEndTime: &ts,
LastUpdatedTime: ts,
NotificationVersion: 5,
CurrentTimeStamp: time.Now(),
}
}
6 changes: 1 addition & 5 deletions common/persistence/nosql/nosqlplugin/dynamodb/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,11 +91,7 @@ func (db *ddb) DeleteMessage(
}

// Insert an empty metadata row, starting from a version
func (db *ddb) InsertQueueMetadata(
ctx context.Context,
queueType persistence.QueueType,
version int64,
) error {
func (db *ddb) InsertQueueMetadata(ctx context.Context, row nosqlplugin.QueueMetadataRow) error {
panic("TODO")
}

Expand Down
2 changes: 1 addition & 1 deletion common/persistence/nosql/nosqlplugin/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ type (
DeleteMessage(ctx context.Context, queueType persistence.QueueType, messageID int64) error

// Insert an empty metadata row, starting from a version
InsertQueueMetadata(ctx context.Context, queueType persistence.QueueType, version int64) error
InsertQueueMetadata(ctx context.Context, row QueueMetadataRow) error
// **Conditionally** update a queue metadata row, if current version is matched(meaning current == row.Version - 1),
// then the current version will increase by one when updating the metadata row
// Must return conditionFailed error if the condition is not met
Expand Down
24 changes: 12 additions & 12 deletions common/persistence/nosql/nosqlplugin/interfaces_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 1 addition & 5 deletions common/persistence/nosql/nosqlplugin/mongodb/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,11 +92,7 @@ func (db *mdb) DeleteMessage(
}

// Insert an empty metadata row, starting from a version
func (db *mdb) InsertQueueMetadata(
ctx context.Context,
queueType persistence.QueueType,
version int64,
) error {
func (db *mdb) InsertQueueMetadata(ctx context.Context, row nosqlplugin.QueueMetadataRow) error {
fmt.Println("not implemented, ignore the eror for testing")
return nil
}
Expand Down
8 changes: 5 additions & 3 deletions common/persistence/nosql/nosqlplugin/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,16 +248,18 @@ type (

// QueueMessageRow defines the row struct for queue message
QueueMessageRow struct {
QueueType persistence.QueueType
ID int64
Payload []byte
QueueType persistence.QueueType
ID int64
Payload []byte
CurrentTimeStamp time.Time
}

// QueueMetadataRow defines the row struct for metadata
QueueMetadataRow struct {
QueueType persistence.QueueType
ClusterAckLevels map[string]int64
Version int64
CurrentTimeStamp time.Time
}

// HistoryNodeRow represents a row in history_node table
Expand Down
12 changes: 9 additions & 3 deletions common/persistence/queue_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,16 @@

package persistence

import "context"
import (
"context"

"github.com/uber/cadence/common/clock"
)

type (
queueManager struct {
persistence Queue
timeSrc clock.TimeSource
}
)

Expand All @@ -38,6 +43,7 @@ func NewQueueManager(
) QueueManager {
return &queueManager{
persistence: persistence,
timeSrc: clock.NewRealTimeSource(),
}
}

Expand Down Expand Up @@ -66,7 +72,7 @@ func (q *queueManager) DeleteMessagesBefore(ctx context.Context, messageID int64
}

func (q *queueManager) UpdateAckLevel(ctx context.Context, messageID int64, clusterName string) error {
return q.persistence.UpdateAckLevel(ctx, messageID, clusterName)
return q.persistence.UpdateAckLevel(ctx, messageID, clusterName, q.timeSrc.Now())
}

func (q *queueManager) GetAckLevels(ctx context.Context) (map[string]int64, error) {
Expand Down Expand Up @@ -98,7 +104,7 @@ func (q *queueManager) RangeDeleteMessagesFromDLQ(ctx context.Context, firstMess
}

func (q *queueManager) UpdateDLQAckLevel(ctx context.Context, messageID int64, clusterName string) error {
return q.persistence.UpdateDLQAckLevel(ctx, messageID, clusterName)
return q.persistence.UpdateDLQAckLevel(ctx, messageID, clusterName, q.timeSrc.Now())
}

func (q *queueManager) GetDLQAckLevels(ctx context.Context) (map[string]int64, error) {
Expand Down
13 changes: 3 additions & 10 deletions common/persistence/sql/sql_queue_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"context"
"database/sql"
"fmt"
"time"

"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/persistence"
Expand Down Expand Up @@ -117,11 +118,7 @@ func (q *sqlQueueStore) DeleteMessagesBefore(
return nil
}

func (q *sqlQueueStore) UpdateAckLevel(
ctx context.Context,
messageID int64,
clusterName string,
) error {
func (q *sqlQueueStore) UpdateAckLevel(ctx context.Context, messageID int64, clusterName string, now time.Time) error {
return q.txExecute(ctx, sqlplugin.DbDefaultShard, "UpdateAckLevel", func(tx sqlplugin.Tx) error {
clusterAckLevels, err := tx.GetAckLevels(ctx, q.queueType, true)
if err != nil {
Expand Down Expand Up @@ -229,11 +226,7 @@ func (q *sqlQueueStore) RangeDeleteMessagesFromDLQ(
return nil
}

func (q *sqlQueueStore) UpdateDLQAckLevel(
ctx context.Context,
messageID int64,
clusterName string,
) error {
func (q *sqlQueueStore) UpdateDLQAckLevel(ctx context.Context, messageID int64, clusterName string, now time.Time) error {
return q.txExecute(ctx, sqlplugin.DbDefaultShard, "UpdateDLQAckLevel", func(tx sqlplugin.Tx) error {
clusterAckLevels, err := tx.GetAckLevels(ctx, q.getDLQTypeFromQueueType(), true)
if err != nil {
Expand Down
Loading

0 comments on commit 4c8cfb7

Please sign in to comment.