Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce task scheduler rate limiter #6677

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions common/clock/ratelimiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.

//go:generate mockgen -package=$GOPACKAGE -destination=ratelimiter_mock.go github.com/uber/cadence/common/clock Reservation

package clock

import (
Expand Down
88 changes: 88 additions & 0 deletions common/clock/ratelimiter_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 18 additions & 0 deletions common/dynamicconfig/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -943,6 +943,7 @@ const (
// Default value: 1
// Allowed filters: N/A
TaskSchedulerDispatcherCount
TaskSchedulerGlobalDomainRPS
// TaskCriticalRetryCount is the critical retry count for background tasks
// when task attempt exceeds this threshold:
// - task attempt metrics and additional error logs will be emitted
Expand Down Expand Up @@ -1702,6 +1703,8 @@ const (
// Default value: false
// Allowed filters: N/A
TransferProcessorEnableValidator
TaskSchedulerEnableRateLimiter
TaskSchedulerEnableRateLimiterShadowMode
// EnableAdminProtection is whether to enable admin checking
// KeyName: history.enableAdminProtection
// Value type: Bool
Expand Down Expand Up @@ -3392,6 +3395,11 @@ var IntKeys = map[IntKey]DynamicInt{
Description: "TaskSchedulerDispatcherCount is the number of task dispatcher in task scheduler (only applies to host level task scheduler)",
DefaultValue: 1,
},
TaskSchedulerGlobalDomainRPS: {
KeyName: "history.taskSchedulerGlobalDomainRPS",
Description: "TaskSchedulerGlobalDomainRPS is the task scheduling domain rate limit per second for the whole Cadence cluster",
DefaultValue: 1000,
},
TaskCriticalRetryCount: {
KeyName: "history.taskCriticalRetryCount",
Description: "TaskCriticalRetryCount is the critical retry count for background tasks, when task attempt exceeds this threshold:- task attempt metrics and additional error logs will be emitted- task priority will be lowered",
Expand Down Expand Up @@ -4063,6 +4071,16 @@ var BoolKeys = map[BoolKey]DynamicBool{
Description: "TransferProcessorEnableValidator is whether validator should be enabled for transferQueueProcessor",
DefaultValue: false,
},
TaskSchedulerEnableRateLimiter: {
KeyName: "history.taskSchedulerEnableRateLimiter",
Description: "TaskSchedulerEnableRateLimiter indicates whether the task scheduler rate limiter is enabled",
DefaultValue: false,
},
TaskSchedulerEnableRateLimiterShadowMode: {
KeyName: "history.taskSchedulerEnableRateLimiterShadowMode",
Description: "TaskSchedulerEnableRateLimiterShadowMode indicates whether the task scheduler rate limiter is in shadow mode",
DefaultValue: true,
},
EnableAdminProtection: {
KeyName: "history.enableAdminProtection",
Description: "EnableAdminProtection is whether to enable admin checking",
Expand Down
5 changes: 5 additions & 0 deletions common/metrics/defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -815,6 +815,8 @@ const (
ParallelTaskProcessingScope
// TaskSchedulerScope is used by task scheduler logic
TaskSchedulerScope
// TaskSchedulerRateLimiterScope is used by task scheduler rate limiter logic
TaskSchedulerRateLimiterScope

// HistoryArchiverScope is used by history archivers
HistoryArchiverScope
Expand Down Expand Up @@ -1755,6 +1757,7 @@ var ScopeDefs = map[ServiceIdx]map[int]scopeDefinition{
SequentialTaskProcessingScope: {operation: "SequentialTaskProcessing"},
ParallelTaskProcessingScope: {operation: "ParallelTaskProcessing"},
TaskSchedulerScope: {operation: "TaskScheduler"},
TaskSchedulerRateLimiterScope: {operation: "TaskSchedulerRateLimiter"},

HistoryArchiverScope: {operation: "HistoryArchiver"},
VisibilityArchiverScope: {operation: "VisibilityArchiver"},
Expand Down Expand Up @@ -2334,6 +2337,7 @@ const (
TransferTaskMissingEventCounterPerDomain
ReplicationTasksAppliedPerDomain
WorkflowTerminateCounterPerDomain
TaskSchedulerThrottledCounterPerDomain

TaskRedispatchQueuePendingTasksTimer

Expand Down Expand Up @@ -3045,6 +3049,7 @@ var MetricDefs = map[ServiceIdx]map[int]metricDefinition{
TransferTaskMissingEventCounterPerDomain: {metricName: "transfer_task_missing_event_counter_per_domain", metricRollupName: "transfer_task_missing_event_counter", metricType: Counter},
ReplicationTasksAppliedPerDomain: {metricName: "replication_tasks_applied_per_domain", metricRollupName: "replication_tasks_applied", metricType: Counter},
WorkflowTerminateCounterPerDomain: {metricName: "workflow_terminate_counter_per_domain", metricRollupName: "workflow_terminate_counter", metricType: Counter},
TaskSchedulerThrottledCounterPerDomain: {metricName: "task_scheduler_throttled_counter_per_domain", metricRollupName: "task_scheduler_throttled_counter", metricType: Counter},

TaskBatchCompleteCounter: {metricName: "task_batch_complete_counter", metricType: Counter},
TaskBatchCompleteFailure: {metricName: "task_batch_complete_error", metricType: Counter},
Expand Down
2 changes: 2 additions & 0 deletions common/quotas/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

//go:generate mockgen -package=$GOPACKAGE -destination=collection_mock.go github.com/uber/cadence/common/quotas ICollection

package quotas

import "sync"
Expand Down
76 changes: 76 additions & 0 deletions common/quotas/collection_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

62 changes: 34 additions & 28 deletions service/history/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,20 +88,23 @@ type Config struct {
StandbyTaskMissingEventsDiscardDelay dynamicconfig.DurationPropertyFn

// Task process settings
TaskProcessRPS dynamicconfig.IntPropertyFnWithDomainFilter
TaskSchedulerType dynamicconfig.IntPropertyFn
TaskSchedulerWorkerCount dynamicconfig.IntPropertyFn
TaskSchedulerShardWorkerCount dynamicconfig.IntPropertyFn
TaskSchedulerQueueSize dynamicconfig.IntPropertyFn
TaskSchedulerShardQueueSize dynamicconfig.IntPropertyFn
TaskSchedulerDispatcherCount dynamicconfig.IntPropertyFn
TaskSchedulerRoundRobinWeights dynamicconfig.MapPropertyFn
TaskCriticalRetryCount dynamicconfig.IntPropertyFn
ActiveTaskRedispatchInterval dynamicconfig.DurationPropertyFn
StandbyTaskRedispatchInterval dynamicconfig.DurationPropertyFn
StandbyTaskReReplicationContextTimeout dynamicconfig.DurationPropertyFnWithDomainIDFilter
EnableDropStuckTaskByDomainID dynamicconfig.BoolPropertyFnWithDomainIDFilter
ResurrectionCheckMinDelay dynamicconfig.DurationPropertyFnWithDomainFilter
TaskProcessRPS dynamicconfig.IntPropertyFnWithDomainFilter
TaskSchedulerType dynamicconfig.IntPropertyFn
TaskSchedulerWorkerCount dynamicconfig.IntPropertyFn
TaskSchedulerShardWorkerCount dynamicconfig.IntPropertyFn
TaskSchedulerQueueSize dynamicconfig.IntPropertyFn
TaskSchedulerShardQueueSize dynamicconfig.IntPropertyFn
TaskSchedulerDispatcherCount dynamicconfig.IntPropertyFn
TaskSchedulerRoundRobinWeights dynamicconfig.MapPropertyFn
TaskSchedulerGlobalDomainRPS dynamicconfig.IntPropertyFnWithDomainFilter
TaskSchedulerEnableRateLimiter dynamicconfig.BoolPropertyFn
TaskSchedulerEnableRateLimiterShadowMode dynamicconfig.BoolPropertyFnWithDomainFilter
TaskCriticalRetryCount dynamicconfig.IntPropertyFn
ActiveTaskRedispatchInterval dynamicconfig.DurationPropertyFn
StandbyTaskRedispatchInterval dynamicconfig.DurationPropertyFn
StandbyTaskReReplicationContextTimeout dynamicconfig.DurationPropertyFnWithDomainIDFilter
EnableDropStuckTaskByDomainID dynamicconfig.BoolPropertyFnWithDomainIDFilter
ResurrectionCheckMinDelay dynamicconfig.DurationPropertyFnWithDomainFilter

// QueueProcessor settings
QueueProcessorEnableSplit dynamicconfig.BoolPropertyFn
Expand Down Expand Up @@ -365,20 +368,23 @@ func New(dc *dynamicconfig.Collection, numberOfShards int, maxMessageSize int, i
DeleteHistoryEventContextTimeout: dc.GetIntProperty(dynamicconfig.DeleteHistoryEventContextTimeout),
MaxResponseSize: maxMessageSize,

TaskProcessRPS: dc.GetIntPropertyFilteredByDomain(dynamicconfig.TaskProcessRPS),
TaskSchedulerType: dc.GetIntProperty(dynamicconfig.TaskSchedulerType),
TaskSchedulerWorkerCount: dc.GetIntProperty(dynamicconfig.TaskSchedulerWorkerCount),
TaskSchedulerShardWorkerCount: dc.GetIntProperty(dynamicconfig.TaskSchedulerShardWorkerCount),
TaskSchedulerQueueSize: dc.GetIntProperty(dynamicconfig.TaskSchedulerQueueSize),
TaskSchedulerShardQueueSize: dc.GetIntProperty(dynamicconfig.TaskSchedulerShardQueueSize),
TaskSchedulerDispatcherCount: dc.GetIntProperty(dynamicconfig.TaskSchedulerDispatcherCount),
TaskSchedulerRoundRobinWeights: dc.GetMapProperty(dynamicconfig.TaskSchedulerRoundRobinWeights),
TaskCriticalRetryCount: dc.GetIntProperty(dynamicconfig.TaskCriticalRetryCount),
ActiveTaskRedispatchInterval: dc.GetDurationProperty(dynamicconfig.ActiveTaskRedispatchInterval),
StandbyTaskRedispatchInterval: dc.GetDurationProperty(dynamicconfig.StandbyTaskRedispatchInterval),
StandbyTaskReReplicationContextTimeout: dc.GetDurationPropertyFilteredByDomainID(dynamicconfig.StandbyTaskReReplicationContextTimeout),
EnableDropStuckTaskByDomainID: dc.GetBoolPropertyFilteredByDomainID(dynamicconfig.EnableDropStuckTaskByDomainID),
ResurrectionCheckMinDelay: dc.GetDurationPropertyFilteredByDomain(dynamicconfig.ResurrectionCheckMinDelay),
TaskProcessRPS: dc.GetIntPropertyFilteredByDomain(dynamicconfig.TaskProcessRPS),
TaskSchedulerType: dc.GetIntProperty(dynamicconfig.TaskSchedulerType),
TaskSchedulerWorkerCount: dc.GetIntProperty(dynamicconfig.TaskSchedulerWorkerCount),
TaskSchedulerShardWorkerCount: dc.GetIntProperty(dynamicconfig.TaskSchedulerShardWorkerCount),
TaskSchedulerQueueSize: dc.GetIntProperty(dynamicconfig.TaskSchedulerQueueSize),
TaskSchedulerShardQueueSize: dc.GetIntProperty(dynamicconfig.TaskSchedulerShardQueueSize),
TaskSchedulerDispatcherCount: dc.GetIntProperty(dynamicconfig.TaskSchedulerDispatcherCount),
TaskSchedulerRoundRobinWeights: dc.GetMapProperty(dynamicconfig.TaskSchedulerRoundRobinWeights),
TaskSchedulerGlobalDomainRPS: dc.GetIntPropertyFilteredByDomain(dynamicconfig.TaskSchedulerGlobalDomainRPS),
TaskSchedulerEnableRateLimiter: dc.GetBoolProperty(dynamicconfig.TaskSchedulerEnableRateLimiter),
TaskSchedulerEnableRateLimiterShadowMode: dc.GetBoolPropertyFilteredByDomain(dynamicconfig.TaskSchedulerEnableRateLimiterShadowMode),
TaskCriticalRetryCount: dc.GetIntProperty(dynamicconfig.TaskCriticalRetryCount),
ActiveTaskRedispatchInterval: dc.GetDurationProperty(dynamicconfig.ActiveTaskRedispatchInterval),
StandbyTaskRedispatchInterval: dc.GetDurationProperty(dynamicconfig.StandbyTaskRedispatchInterval),
StandbyTaskReReplicationContextTimeout: dc.GetDurationPropertyFilteredByDomainID(dynamicconfig.StandbyTaskReReplicationContextTimeout),
EnableDropStuckTaskByDomainID: dc.GetBoolPropertyFilteredByDomainID(dynamicconfig.EnableDropStuckTaskByDomainID),
ResurrectionCheckMinDelay: dc.GetDurationPropertyFilteredByDomain(dynamicconfig.ResurrectionCheckMinDelay),

QueueProcessorEnableSplit: dc.GetBoolProperty(dynamicconfig.QueueProcessorEnableSplit),
QueueProcessorSplitMaxLevel: dc.GetIntProperty(dynamicconfig.QueueProcessorSplitMaxLevel),
Expand Down
3 changes: 3 additions & 0 deletions service/history/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,9 @@ func TestNewConfig(t *testing.T) {
"GlobalRatelimiterUpdateInterval": {dynamicconfig.GlobalRatelimiterUpdateInterval, time.Second},
"GlobalRatelimiterDecayAfter": {dynamicconfig.HistoryGlobalRatelimiterDecayAfter, time.Second},
"GlobalRatelimiterGCAfter": {dynamicconfig.HistoryGlobalRatelimiterGCAfter, time.Second},
"TaskSchedulerGlobalDomainRPS": {dynamicconfig.TaskSchedulerGlobalDomainRPS, 97},
"TaskSchedulerEnableRateLimiterShadowMode": {dynamicconfig.TaskSchedulerEnableRateLimiterShadowMode, false},
"TaskSchedulerEnableRateLimiter": {dynamicconfig.TaskSchedulerEnableRateLimiter, true},
"HostName": {nil, hostname},
}
client := dynamicconfig.NewInMemoryClient()
Expand Down
22 changes: 16 additions & 6 deletions service/history/handler/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,14 @@ func (h *handlerImpl) Start() {
h.config,
)

h.queueTaskProcessor, err = task.NewProcessor(
h.controller = shard.NewShardController(
h.Resource,
h,
h.config,
)

var taskProcessor task.Processor
taskProcessor, err = task.NewProcessor(
taskPriorityAssigner,
h.config,
h.GetLogger(),
Expand All @@ -138,13 +145,16 @@ func (h *handlerImpl) Start() {
if err != nil {
h.GetLogger().Fatal("Creating priority task processor failed", tag.Error(err))
}
h.queueTaskProcessor.Start()

h.controller = shard.NewShardController(
h.Resource,
h,
taskRateLimiter := task.NewRateLimiter(
h.GetLogger(),
h.GetMetricsClient(),
h.GetDomainCache(),
h.config,
h.controller,
)
h.queueTaskProcessor = task.NewRateLimitedProcessor(taskProcessor, taskRateLimiter)
h.queueTaskProcessor.Start()

h.historyEventNotifier = events.NewNotifier(h.GetTimeSource(), h.GetMetricsClient(), h.config.GetShardID)
// events notifier must start before controller
h.historyEventNotifier.Start()
Expand Down
Loading
Loading