Skip to content

Commit

Permalink
scheduler: add disable to independent config (#8567)
Browse files Browse the repository at this point in the history
ref #8474

Signed-off-by: okJiang <[email protected]>

Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
  • Loading branch information
okJiang and ti-chi-bot[bot] authored Sep 18, 2024
1 parent 71f6f96 commit 23d544f
Show file tree
Hide file tree
Showing 32 changed files with 381 additions and 119 deletions.
1 change: 1 addition & 0 deletions pkg/mcs/scheduling/server/config/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ type Watcher struct {
*PersistConfig
// Some data, like the scheduler configs, should be loaded into the storage
// to make sure the coordinator could access them correctly.
// It is a memory storage.
storage storage.Storage
// schedulersController is used to trigger the scheduler's config reloading.
// Store as `*schedulers.Controller`.
Expand Down
31 changes: 17 additions & 14 deletions pkg/schedule/schedulers/balance_leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ import (
"github.com/tikv/pd/pkg/schedule/plan"
"github.com/tikv/pd/pkg/schedule/types"
"github.com/tikv/pd/pkg/utils/reflectutil"
"github.com/tikv/pd/pkg/utils/syncutil"
"github.com/tikv/pd/pkg/utils/typeutil"
"github.com/unrolled/render"
"go.uber.org/zap"
Expand All @@ -51,32 +50,36 @@ const (
transferOut = "transfer-out"
)

type balanceLeaderSchedulerConfig struct {
syncutil.RWMutex
schedulerConfig

type balanceLeaderSchedulerParam struct {
Ranges []core.KeyRange `json:"ranges"`
// Batch is used to generate multiple operators by one scheduling
Batch int `json:"batch"`
}

type balanceLeaderSchedulerConfig struct {
baseDefaultSchedulerConfig
balanceLeaderSchedulerParam
}

func (conf *balanceLeaderSchedulerConfig) update(data []byte) (int, any) {
conf.Lock()
defer conf.Unlock()

oldConfig, _ := json.Marshal(conf)
param := &conf.balanceLeaderSchedulerParam
oldConfig, _ := json.Marshal(param)

if err := json.Unmarshal(data, conf); err != nil {
if err := json.Unmarshal(data, param); err != nil {
return http.StatusInternalServerError, err.Error()
}
newConfig, _ := json.Marshal(conf)
newConfig, _ := json.Marshal(param)
if !bytes.Equal(oldConfig, newConfig) {
if !conf.validateLocked() {
if err := json.Unmarshal(oldConfig, conf); err != nil {
if err := json.Unmarshal(oldConfig, param); err != nil {
return http.StatusInternalServerError, err.Error()
}
return http.StatusBadRequest, "invalid batch size which should be an integer between 1 and 10"
}
conf.balanceLeaderSchedulerParam = *param
if err := conf.save(); err != nil {
log.Warn("failed to save balance-leader-scheduler config", errs.ZapError(err))
}
Expand All @@ -87,23 +90,23 @@ func (conf *balanceLeaderSchedulerConfig) update(data []byte) (int, any) {
if err := json.Unmarshal(data, &m); err != nil {
return http.StatusInternalServerError, err.Error()
}
ok := reflectutil.FindSameFieldByJSON(conf, m)
ok := reflectutil.FindSameFieldByJSON(param, m)
if ok {
return http.StatusOK, "Config is the same with origin, so do nothing."
}
return http.StatusBadRequest, "Config item is not found."
}

func (conf *balanceLeaderSchedulerConfig) validateLocked() bool {
func (conf *balanceLeaderSchedulerParam) validateLocked() bool {
return conf.Batch >= 1 && conf.Batch <= 10
}

func (conf *balanceLeaderSchedulerConfig) clone() *balanceLeaderSchedulerConfig {
func (conf *balanceLeaderSchedulerConfig) clone() *balanceLeaderSchedulerParam {
conf.RLock()
defer conf.RUnlock()
ranges := make([]core.KeyRange, len(conf.Ranges))
copy(ranges, conf.Ranges)
return &balanceLeaderSchedulerConfig{
return &balanceLeaderSchedulerParam{
Ranges: ranges,
Batch: conf.Batch,
}
Expand Down Expand Up @@ -164,7 +167,7 @@ type balanceLeaderScheduler struct {
// each store balanced.
func newBalanceLeaderScheduler(opController *operator.Controller, conf *balanceLeaderSchedulerConfig, options ...BalanceLeaderCreateOption) Scheduler {
s := &balanceLeaderScheduler{
BaseScheduler: NewBaseScheduler(opController, types.BalanceLeaderScheduler),
BaseScheduler: NewBaseScheduler(opController, types.BalanceLeaderScheduler, conf),
retryQuota: newRetryQuota(),
conf: conf,
handler: newBalanceLeaderHandler(conf),
Expand Down
6 changes: 4 additions & 2 deletions pkg/schedule/schedulers/balance_leader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,10 @@ func TestBalanceLeaderSchedulerConfigClone(t *testing.T) {
re := require.New(t)
keyRanges1, _ := getKeyRanges([]string{"a", "b", "c", "d"})
conf := &balanceLeaderSchedulerConfig{
Ranges: keyRanges1,
Batch: 10,
balanceLeaderSchedulerParam: balanceLeaderSchedulerParam{
Ranges: keyRanges1,
Batch: 10,
},
}
conf2 := conf.clone()
re.Equal(conf.Batch, conf2.Batch)
Expand Down
4 changes: 3 additions & 1 deletion pkg/schedule/schedulers/balance_region.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ import (
)

type balanceRegionSchedulerConfig struct {
baseDefaultSchedulerConfig

Ranges []core.KeyRange `json:"ranges"`
// TODO: When we prepare to use Ranges, we will need to implement the ReloadConfig function for this scheduler.
}
Expand All @@ -48,7 +50,7 @@ type balanceRegionScheduler struct {
// each store balanced.
func newBalanceRegionScheduler(opController *operator.Controller, conf *balanceRegionSchedulerConfig, opts ...BalanceRegionCreateOption) Scheduler {
scheduler := &balanceRegionScheduler{
BaseScheduler: NewBaseScheduler(opController, types.BalanceRegionScheduler),
BaseScheduler: NewBaseScheduler(opController, types.BalanceRegionScheduler, conf),
retryQuota: newRetryQuota(),
name: types.BalanceRegionScheduler.String(),
conf: conf,
Expand Down
2 changes: 1 addition & 1 deletion pkg/schedule/schedulers/balance_witness.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ type balanceWitnessScheduler struct {
// each store balanced.
func newBalanceWitnessScheduler(opController *operator.Controller, conf *balanceWitnessSchedulerConfig, options ...BalanceWitnessCreateOption) Scheduler {
s := &balanceWitnessScheduler{
BaseScheduler: NewBaseScheduler(opController, types.BalanceWitnessScheduler),
BaseScheduler: NewBaseScheduler(opController, types.BalanceWitnessScheduler, conf),
retryQuota: newRetryQuota(),
conf: conf,
handler: newBalanceWitnessHandler(conf),
Expand Down
33 changes: 31 additions & 2 deletions pkg/schedule/schedulers/base_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,11 +65,16 @@ type BaseScheduler struct {

name string
tp types.CheckerSchedulerType
conf schedulerConfig
}

// NewBaseScheduler returns a basic scheduler
func NewBaseScheduler(opController *operator.Controller, tp types.CheckerSchedulerType) *BaseScheduler {
return &BaseScheduler{OpController: opController, tp: tp}
func NewBaseScheduler(
opController *operator.Controller,
tp types.CheckerSchedulerType,
conf schedulerConfig,
) *BaseScheduler {
return &BaseScheduler{OpController: opController, tp: tp, conf: conf}
}

func (*BaseScheduler) ServeHTTP(w http.ResponseWriter, _ *http.Request) {
Expand Down Expand Up @@ -114,3 +119,27 @@ func (s *BaseScheduler) GetName() string {
func (s *BaseScheduler) GetType() types.CheckerSchedulerType {
return s.tp
}

// IsDisable implements the Scheduler interface.
func (s *BaseScheduler) IsDisable() bool {
if conf, ok := s.conf.(defaultSchedulerConfig); ok {
return conf.isDisable()
}
return false
}

// SetDisable implements the Scheduler interface.
func (s *BaseScheduler) SetDisable(disable bool) error {
if conf, ok := s.conf.(defaultSchedulerConfig); ok {
return conf.setDisable(disable)
}
return nil
}

// IsDefault returns if the scheduler is a default scheduler.
func (s *BaseScheduler) IsDefault() bool {
if _, ok := s.conf.(defaultSchedulerConfig); ok {
return true
}
return false
}
47 changes: 46 additions & 1 deletion pkg/schedule/schedulers/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,21 @@ package schedulers
import (
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/utils/syncutil"
"go.uber.org/zap"
)

type schedulerConfig interface {
init(name string, storage endpoint.ConfigStorage, data any)
save() error
load(any) error
init(name string, storage endpoint.ConfigStorage, data any)
}

var _ schedulerConfig = &baseSchedulerConfig{}

type baseSchedulerConfig struct {
name string
storage endpoint.ConfigStorage
Expand Down Expand Up @@ -58,3 +64,42 @@ func (b *baseSchedulerConfig) load(v any) error {
}
return DecodeConfig([]byte(data), v)
}

// defaultSchedulerConfig is the interface to represent the default scheduler
// config. It is used in the BaseScheduler.
type defaultSchedulerConfig interface {
schedulerConfig

isDisable() bool
setDisable(disabled bool) error
}

type baseDefaultSchedulerConfig struct {
schedulerConfig
syncutil.RWMutex

Disabled bool `json:"disabled"`
}

func newBaseDefaultSchedulerConfig() baseDefaultSchedulerConfig {
return baseDefaultSchedulerConfig{
schedulerConfig: &baseSchedulerConfig{},
}
}

func (b *baseDefaultSchedulerConfig) isDisable() bool {
b.Lock()
defer b.Unlock()
if err := b.load(b); err != nil {
log.Warn("failed to load scheduler config, maybe the config never persist", errs.ZapError(err))
}
return b.Disabled
}

func (b *baseDefaultSchedulerConfig) setDisable(disabled bool) error {
b.Lock()
defer b.Unlock()
b.Disabled = disabled
log.Info("set scheduler disable", zap.Bool("disabled", disabled))
return b.save()
}
34 changes: 34 additions & 0 deletions pkg/schedule/schedulers/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,3 +48,37 @@ func TestSchedulerConfig(t *testing.T) {
// report error because the config is empty and cannot be decoded
require.Error(t, cfg2.load(newTc))
}

func TestDefaultSchedulerConfig(t *testing.T) {
s := storage.NewStorageWithMemoryBackend()

type testConfig struct {
balanceLeaderSchedulerConfig
Value string `json:"value"`
}

cfg := &testConfig{
balanceLeaderSchedulerConfig: balanceLeaderSchedulerConfig{
baseDefaultSchedulerConfig: newBaseDefaultSchedulerConfig(),
},
Value: "test",
}
cfg.init("test", s, cfg)
require.False(t, cfg.isDisable())
require.NoError(t, cfg.setDisable(true))
require.True(t, cfg.isDisable())

cfg2 := &testConfig{
balanceLeaderSchedulerConfig: balanceLeaderSchedulerConfig{
baseDefaultSchedulerConfig: newBaseDefaultSchedulerConfig(),
},
}
cfg2.init("test", s, cfg2)
require.True(t, cfg2.isDisable())
require.Equal(t, "", cfg2.Value)

cfg3 := &testConfig{}
require.NoError(t, cfg2.load(cfg3))
require.Equal(t, cfg.Value, cfg3.Value)
require.True(t, cfg3.Disabled)
}
2 changes: 1 addition & 1 deletion pkg/schedule/schedulers/evict_leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ type evictLeaderScheduler struct {
func newEvictLeaderScheduler(opController *operator.Controller, conf *evictLeaderSchedulerConfig) Scheduler {
handler := newEvictLeaderHandler(conf)
return &evictLeaderScheduler{
BaseScheduler: NewBaseScheduler(opController, types.EvictLeaderScheduler),
BaseScheduler: NewBaseScheduler(opController, types.EvictLeaderScheduler, conf),
conf: conf,
handler: handler,
}
Expand Down
14 changes: 6 additions & 8 deletions pkg/schedule/schedulers/evict_slow_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"github.com/tikv/pd/pkg/schedule/plan"
"github.com/tikv/pd/pkg/schedule/types"
"github.com/tikv/pd/pkg/utils/apiutil"
"github.com/tikv/pd/pkg/utils/syncutil"
"github.com/unrolled/render"
"go.uber.org/zap"
)
Expand All @@ -39,8 +38,7 @@ const (
)

type evictSlowStoreSchedulerConfig struct {
syncutil.RWMutex
schedulerConfig
baseDefaultSchedulerConfig

cluster *core.BasicCluster
// Last timestamp of the chosen slow store for eviction.
Expand All @@ -52,10 +50,10 @@ type evictSlowStoreSchedulerConfig struct {

func initEvictSlowStoreSchedulerConfig() *evictSlowStoreSchedulerConfig {
return &evictSlowStoreSchedulerConfig{
schedulerConfig: &baseSchedulerConfig{},
lastSlowStoreCaptureTS: time.Time{},
RecoveryDurationGap: defaultRecoveryDurationGap,
EvictedStores: make([]uint64, 0),
baseDefaultSchedulerConfig: newBaseDefaultSchedulerConfig(),
lastSlowStoreCaptureTS: time.Time{},
RecoveryDurationGap: defaultRecoveryDurationGap,
EvictedStores: make([]uint64, 0),
}
}

Expand Down Expand Up @@ -314,7 +312,7 @@ func (s *evictSlowStoreScheduler) Schedule(cluster sche.SchedulerCluster, _ bool
func newEvictSlowStoreScheduler(opController *operator.Controller, conf *evictSlowStoreSchedulerConfig) Scheduler {
handler := newEvictSlowStoreHandler(conf)
return &evictSlowStoreScheduler{
BaseScheduler: NewBaseScheduler(opController, types.EvictSlowStoreScheduler),
BaseScheduler: NewBaseScheduler(opController, types.EvictSlowStoreScheduler, conf),
conf: conf,
handler: handler,
}
Expand Down
Loading

0 comments on commit 23d544f

Please sign in to comment.