Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move time source from db layer to PersistenceManager #6646

Open
wants to merge 13 commits into
base: master
Choose a base branch
from
6 changes: 4 additions & 2 deletions common/persistence/config_store_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ package persistence

import (
"context"
"time"

"github.com/uber/cadence/common"
"github.com/uber/cadence/common/clock"
"github.com/uber/cadence/common/log"
)

Expand All @@ -35,6 +35,7 @@ type (
serializer PayloadSerializer
persistence ConfigStore
logger log.Logger
timeSrc clock.TimeSource
}
)

Expand All @@ -46,6 +47,7 @@ func NewConfigStoreManagerImpl(persistence ConfigStore, logger log.Logger) Confi
serializer: NewPayloadSerializer(),
persistence: persistence,
logger: logger,
timeSrc: clock.NewRealTimeSource(),
}
}

Expand Down Expand Up @@ -79,7 +81,7 @@ func (m *configStoreManagerImpl) UpdateDynamicConfig(ctx context.Context, reques
entry := &InternalConfigStoreEntry{
RowType: int(cfgType),
Version: request.Snapshot.Version,
Timestamp: time.Now(),
Timestamp: m.timeSrc.Now(),
Values: blob,
}

Expand Down
2 changes: 2 additions & 0 deletions common/persistence/config_store_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"go.uber.org/mock/gomock"

"github.com/uber/cadence/common"
"github.com/uber/cadence/common/clock"
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/types"
)
Expand All @@ -46,6 +47,7 @@ func setUpMocksForConfigStoreManager(t *testing.T) (*configStoreManagerImpl, *Mo
serializer: mockSerializer,
persistence: mockStore,
logger: logger,
timeSrc: clock.NewRealTimeSource(),
}, mockStore, mockSerializer
}

Expand Down
31 changes: 18 additions & 13 deletions common/persistence/data_manager_interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -1051,12 +1051,13 @@ type (

// LeaseTaskListRequest is used to request lease of a task list
LeaseTaskListRequest struct {
DomainID string
DomainName string
TaskList string
TaskType int
TaskListKind int
RangeID int64
DomainID string
DomainName string
TaskList string
TaskType int
TaskListKind int
RangeID int64
CurrentTimeStamp time.Time
}

// LeaseTaskListResponse is response to LeaseTaskListRequest
Expand All @@ -1077,8 +1078,9 @@ type (

// UpdateTaskListRequest is used to update task list implementation information
UpdateTaskListRequest struct {
TaskListInfo *TaskListInfo
DomainName string
TaskListInfo *TaskListInfo
DomainName string
CurrentTimeStamp time.Time
}

// UpdateTaskListResponse is the response to UpdateTaskList
Expand Down Expand Up @@ -1120,9 +1122,10 @@ type (

// CreateTasksRequest is used to create a new task for a workflow exectution
CreateTasksRequest struct {
TaskListInfo *TaskListInfo
Tasks []*CreateTaskInfo
DomainName string
TaskListInfo *TaskListInfo
Tasks []*CreateTaskInfo
DomainName string
CurrentTimeStamp time.Time
}

// CreateTaskInfo describes a task to be created in CreateTasksRequest
Expand Down Expand Up @@ -1243,6 +1246,7 @@ type (
ConfigVersion int64
FailoverVersion int64
LastUpdatedTime int64
CurrentTimeStamp time.Time
}

// CreateDomainResponse is the response for CreateDomain
Expand Down Expand Up @@ -1545,8 +1549,9 @@ type (

// CreateFailoverMarkersRequest is request to create failover markers
CreateFailoverMarkersRequest struct {
RangeID int64
Markers []*FailoverMarkerTask
RangeID int64
Markers []*FailoverMarkerTask
CurrentTimeStamp time.Time
}

// FetchDynamicConfigResponse is a response to FetchDynamicConfigResponse
Expand Down
29 changes: 22 additions & 7 deletions common/persistence/data_store_interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,16 +197,16 @@ type (
// Queue is a store to enqueue and get messages
Queue interface {
Closeable
EnqueueMessage(ctx context.Context, messagePayload []byte) error
EnqueueMessage(ctx context.Context, messagePayload []byte, currentTimeStamp time.Time) error
ReadMessages(ctx context.Context, lastMessageID int64, maxCount int) ([]*InternalQueueMessage, error)
DeleteMessagesBefore(ctx context.Context, messageID int64) error
UpdateAckLevel(ctx context.Context, messageID int64, clusterName string) error
UpdateAckLevel(ctx context.Context, messageID int64, clusterName string, currentTimestamp time.Time) error
GetAckLevels(ctx context.Context) (map[string]int64, error)
EnqueueMessageToDLQ(ctx context.Context, messagePayload []byte) error
EnqueueMessageToDLQ(ctx context.Context, messagePayload []byte, currentTimeStamp time.Time) error
ReadMessagesFromDLQ(ctx context.Context, firstMessageID int64, lastMessageID int64, pageSize int, pageToken []byte) ([]*InternalQueueMessage, []byte, error)
DeleteMessageFromDLQ(ctx context.Context, messageID int64) error
RangeDeleteMessagesFromDLQ(ctx context.Context, firstMessageID int64, lastMessageID int64) error
UpdateDLQAckLevel(ctx context.Context, messageID int64, clusterName string) error
UpdateDLQAckLevel(ctx context.Context, messageID int64, clusterName string, currentTimestamp time.Time) error
GetDLQAckLevels(ctx context.Context) (map[string]int64, error)
GetDLQSize(ctx context.Context) (int64, error)
}
Expand Down Expand Up @@ -238,6 +238,8 @@ type (
NewWorkflowSnapshot InternalWorkflowSnapshot

WorkflowRequestMode CreateWorkflowRequestMode

CurrentTimeStamp time.Time
}

// InternalGetReplicationTasksResponse is the response to GetReplicationTask
Expand Down Expand Up @@ -269,6 +271,7 @@ type (
BranchToken []byte
NewRunBranchToken []byte
CreationTime time.Time
CurrentTimeStamp time.Time
}

// InternalWorkflowExecutionInfo describes a workflow execution for Persistence Interface
Expand Down Expand Up @@ -424,6 +427,8 @@ type (
NewWorkflowSnapshot *InternalWorkflowSnapshot

WorkflowRequestMode CreateWorkflowRequestMode

CurrentTimeStamp time.Time
}

// InternalConflictResolveWorkflowExecutionRequest is used to reset workflow execution state for Persistence Interface
Expand All @@ -442,6 +447,8 @@ type (
CurrentWorkflowMutation *InternalWorkflowMutation

WorkflowRequestMode CreateWorkflowRequestMode

CurrentTimeStamp time.Time
}

// InternalWorkflowMutation is used as generic workflow execution state mutation for Persistence Interface
Expand Down Expand Up @@ -528,6 +535,8 @@ type (
TransactionID int64
// Used in sharded data stores to identify which shard to use
ShardID int

CurrentTimeStamp time.Time
}

// InternalGetWorkflowExecutionRequest is used to retrieve the info of a workflow execution
Expand Down Expand Up @@ -566,6 +575,8 @@ type (
Info string
// Used in sharded data stores to identify which shard to use
ShardID int

CurrentTimeStamp time.Time
}

// InternalForkHistoryBranchResponse is the response to ForkHistoryBranchRequest
Expand Down Expand Up @@ -810,6 +821,7 @@ type (
ConfigVersion int64
FailoverVersion int64
LastUpdatedTime time.Time
CurrentTimeStamp time.Time
}

// InternalGetDomainResponse is the response for GetDomain
Expand Down Expand Up @@ -865,11 +877,13 @@ type (
ClusterReplicationLevel map[string]int64 `json:"cluster_replication_level"`
DomainNotificationVersion int64 `json:"domain_notification_version"`
PendingFailoverMarkers *DataBlob `json:"pending_failover_markers"`
CurrentTimestamp time.Time
}

// InternalCreateShardRequest is request to CreateShard
InternalCreateShardRequest struct {
ShardInfo *InternalShardInfo
ShardInfo *InternalShardInfo
CurrentTimeStamp time.Time
}

// InternalGetShardRequest is used to get shard information
Expand All @@ -879,8 +893,9 @@ type (

// InternalUpdateShardRequest is used to update shard information
InternalUpdateShardRequest struct {
ShardInfo *InternalShardInfo
PreviousRangeID int64
ShardInfo *InternalShardInfo
PreviousRangeID int64
CurrentTimeStamp time.Time
}

// InternalGetShardResponse is the response to GetShard
Expand Down
4 changes: 4 additions & 0 deletions common/persistence/domain_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"time"

"github.com/uber/cadence/common"
"github.com/uber/cadence/common/clock"
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/types"
)
Expand All @@ -36,6 +37,7 @@ type (
serializer PayloadSerializer
persistence DomainStore
logger log.Logger
timeSrc clock.TimeSource
}
)

Expand All @@ -45,6 +47,7 @@ func NewDomainManagerImpl(persistence DomainStore, logger log.Logger, serializer
serializer: serializer,
persistence: persistence,
logger: logger,
timeSrc: clock.NewRealTimeSource(),
}
}

Expand All @@ -68,6 +71,7 @@ func (m *domainManagerImpl) CreateDomain(
ConfigVersion: request.ConfigVersion,
FailoverVersion: request.FailoverVersion,
LastUpdatedTime: time.Unix(0, request.LastUpdatedTime),
CurrentTimeStamp: m.timeSrc.Now(),
})
}

Expand Down
47 changes: 27 additions & 20 deletions common/persistence/domain_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,27 +110,34 @@ func TestCreateDomain(t *testing.T) {
mockSerializer.EXPECT().
SerializeAsyncWorkflowsConfig(&types.AsyncWorkflowConfiguration{Enabled: true, PredefinedQueueName: "q", QueueType: "kafka"}, common.EncodingTypeThriftRW).
Return(&DataBlob{Encoding: common.EncodingTypeThriftRW, Data: []byte("async-workflow-config")}, nil).Times(1)

expectedReq := &InternalCreateDomainRequest{
Info: testFixtureDomainInfo(),
Config: &InternalDomainConfig{
Retention: common.DaysToDuration(1),
EmitMetric: true,
HistoryArchivalStatus: types.ArchivalStatusEnabled,
HistoryArchivalURI: "s3://abc",
VisibilityArchivalStatus: types.ArchivalStatusEnabled,
VisibilityArchivalURI: "s3://xyz",
BadBinaries: &DataBlob{Encoding: common.EncodingTypeThriftRW, Data: []byte("bad-binaries")},
IsolationGroups: &DataBlob{Encoding: common.EncodingTypeThriftRW, Data: []byte("isolation-groups")},
AsyncWorkflowsConfig: &DataBlob{Encoding: common.EncodingTypeThriftRW, Data: []byte("async-workflow-config")},
},
ReplicationConfig: testFixtureDomainReplicationConfig(),
IsGlobalDomain: true,
ConfigVersion: 1,
FailoverVersion: 10,
LastUpdatedTime: time.Unix(0, 100),
CurrentTimeStamp: time.Now(),
}

mockStore.EXPECT().
CreateDomain(gomock.Any(), &InternalCreateDomainRequest{
Info: testFixtureDomainInfo(),
Config: &InternalDomainConfig{
Retention: common.DaysToDuration(1),
EmitMetric: true,
HistoryArchivalStatus: types.ArchivalStatusEnabled,
HistoryArchivalURI: "s3://abc",
VisibilityArchivalStatus: types.ArchivalStatusEnabled,
VisibilityArchivalURI: "s3://xyz",
BadBinaries: &DataBlob{Encoding: common.EncodingTypeThriftRW, Data: []byte("bad-binaries")},
IsolationGroups: &DataBlob{Encoding: common.EncodingTypeThriftRW, Data: []byte("isolation-groups")},
AsyncWorkflowsConfig: &DataBlob{Encoding: common.EncodingTypeThriftRW, Data: []byte("async-workflow-config")},
},
ReplicationConfig: testFixtureDomainReplicationConfig(),
IsGlobalDomain: true,
ConfigVersion: 1,
FailoverVersion: 10,
LastUpdatedTime: time.Unix(0, 100),
}).
Return(&CreateDomainResponse{}, nil).Times(1)
CreateDomain(gomock.Any(), gomock.Any()).DoAndReturn(
func(_ context.Context, actualRequest *InternalCreateDomainRequest) (*CreateDomainResponse, error) {
assert.WithinDuration(t, expectedReq.CurrentTimeStamp, actualRequest.CurrentTimeStamp, time.Second)
return nil, nil
})
},
request: &CreateDomainRequest{
Info: testFixtureDomainInfo(),
Expand Down
11 changes: 11 additions & 0 deletions common/persistence/execution_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"time"

"github.com/uber/cadence/common"
"github.com/uber/cadence/common/clock"
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/types"
)
Expand All @@ -37,6 +38,7 @@ type (
persistence ExecutionStore
statsComputer statsComputer
logger log.Logger
timeSrc clock.TimeSource
}
)

Expand All @@ -53,6 +55,7 @@ func NewExecutionManagerImpl(
persistence: persistence,
statsComputer: statsComputer{},
logger: logger,
timeSrc: clock.NewRealTimeSource(),
}
}

Expand Down Expand Up @@ -348,6 +351,8 @@ func (m *executionManagerImpl) UpdateWorkflowExecution(
NewWorkflowSnapshot: serializedNewWorkflowSnapshot,

WorkflowRequestMode: request.WorkflowRequestMode,

CurrentTimeStamp: m.timeSrc.Now(),
}
msuss := m.statsComputer.computeMutableStateUpdateStats(newRequest)
err = m.persistence.UpdateWorkflowExecution(ctx, newRequest)
Expand Down Expand Up @@ -567,6 +572,8 @@ func (m *executionManagerImpl) ConflictResolveWorkflowExecution(
CurrentWorkflowMutation: serializedCurrentWorkflowMutation,

WorkflowRequestMode: request.WorkflowRequestMode,

CurrentTimeStamp: m.timeSrc.Now(),
}
msuss := m.statsComputer.computeMutableStateConflictResolveStats(newRequest)
err = m.persistence.ConflictResolveWorkflowExecution(ctx, newRequest)
Expand Down Expand Up @@ -599,6 +606,8 @@ func (m *executionManagerImpl) CreateWorkflowExecution(
NewWorkflowSnapshot: *serializedNewWorkflowSnapshot,

WorkflowRequestMode: request.WorkflowRequestMode,

CurrentTimeStamp: m.timeSrc.Now(),
}

msuss := m.statsComputer.computeMutableStateCreateStats(newRequest)
Expand Down Expand Up @@ -940,6 +949,7 @@ func (m *executionManagerImpl) CreateFailoverMarkerTasks(
ctx context.Context,
request *CreateFailoverMarkersRequest,
) error {
request.CurrentTimeStamp = m.timeSrc.Now()
return m.persistence.CreateFailoverMarkerTasks(ctx, request)
}

Expand Down Expand Up @@ -1017,6 +1027,7 @@ func (m *executionManagerImpl) toInternalReplicationTaskInfo(info *ReplicationTa
BranchToken: info.BranchToken,
NewRunBranchToken: info.NewRunBranchToken,
CreationTime: time.Unix(0, info.CreationTime).UTC(),
CurrentTimeStamp: m.timeSrc.Now(),
}
}

Expand Down
Loading