Skip to content

Commit

Permalink
schedule: use config interface for scheduler controller (#6774)
Browse files Browse the repository at this point in the history
ref #5839

Signed-off-by: Ryan Leung <[email protected]>

Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
  • Loading branch information
rleungx and ti-chi-bot[bot] authored Jul 11, 2023
1 parent 1d0bbb8 commit 3cce629
Show file tree
Hide file tree
Showing 6 changed files with 64 additions and 72 deletions.
5 changes: 5 additions & 0 deletions pkg/schedule/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/tikv/pd/pkg/core/constant"
"github.com/tikv/pd/pkg/core/storelimit"
"github.com/tikv/pd/pkg/storage/endpoint"
)

// RejectLeader is the label property type that suggests a store should not
Expand All @@ -30,6 +31,10 @@ func IsSchedulerRegistered(name string) bool {
// Config is the interface that wraps the Config related methods.
type Config interface {
IsSchedulingHalted() bool
IsSchedulerDisabled(string) bool
AddSchedulerCfg(string, []string)
RemoveSchedulerCfg(string)
Persist(endpoint.ConfigStorage) error

GetReplicaScheduleLimit() uint64
GetRegionScheduleLimit() uint64
Expand Down
8 changes: 4 additions & 4 deletions pkg/schedule/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,8 @@ type Coordinator struct {
// NewCoordinator creates a new Coordinator.
func NewCoordinator(ctx context.Context, cluster sche.ClusterInformer, hbStreams *hbstream.HeartbeatStreams) *Coordinator {
ctx, cancel := context.WithCancel(ctx)
opController := operator.NewController(ctx, cluster.GetBasicCluster(), cluster.GetPersistOptions(), hbStreams)
schedulers := schedulers.NewController(ctx, cluster, opController)
opController := operator.NewController(ctx, cluster.GetBasicCluster(), cluster.GetOpts(), hbStreams)
schedulers := schedulers.NewController(ctx, cluster, cluster.GetStorage(), opController)
c := &Coordinator{
ctx: ctx,
cancel: cancel,
Expand All @@ -101,7 +101,7 @@ func NewCoordinator(ctx context.Context, cluster sche.ClusterInformer, hbStreams
hbStreams: hbStreams,
pluginInterface: NewPluginInterface(),
}
c.diagnosticManager = diagnostic.NewManager(schedulers, cluster.GetPersistOptions())
c.diagnosticManager = diagnostic.NewManager(schedulers, cluster.GetOpts())
return c
}

Expand Down Expand Up @@ -316,7 +316,7 @@ func (c *Coordinator) RunUntilStop() {
c.Run()
<-c.ctx.Done()
log.Info("Coordinator is stopping")
c.GetSchedulersController().GetWaitGroup().Wait()
c.GetSchedulersController().Wait()
c.wg.Wait()
log.Info("Coordinator has been stopped")
}
Expand Down
16 changes: 4 additions & 12 deletions pkg/schedule/diagnostic/diagnostic_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,18 @@ import (
"time"

"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/schedule/config"
"github.com/tikv/pd/pkg/schedule/schedulers"
"github.com/tikv/pd/server/config"
)

// Manager is used to manage the diagnostic result of schedulers for now.
type Manager struct {
config *config.PersistOptions
config config.Config
schedulerController *schedulers.Controller
}

// NewManager creates a new Manager.
func NewManager(schedulerController *schedulers.Controller, config *config.PersistOptions) *Manager {
func NewManager(schedulerController *schedulers.Controller, config config.Config) *Manager {
return &Manager{
config: config,
schedulerController: schedulerController,
Expand All @@ -48,15 +48,7 @@ func (d *Manager) GetDiagnosticResult(name string) (*schedulers.DiagnosticResult
res := &schedulers.DiagnosticResult{Name: name, Timestamp: ts, Status: schedulers.Disabled}
return res, nil
}
var isDisabled bool
t := scheduler.Scheduler.GetType()
scheduleConfig := d.config.GetScheduleConfig()
for _, s := range scheduleConfig.Schedulers {
if t == s.Type {
isDisabled = s.Disable
break
}
}
isDisabled := d.config.IsSchedulerDisabled(scheduler.Scheduler.GetType())
if isDisabled {
ts := uint64(time.Now().Unix())
res := &schedulers.DiagnosticResult{Name: name, Timestamp: ts, Status: schedulers.Disabled}
Expand Down
63 changes: 15 additions & 48 deletions pkg/schedule/schedulers/scheduler_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,8 @@ import (
"github.com/tikv/pd/pkg/schedule/labeler"
"github.com/tikv/pd/pkg/schedule/operator"
"github.com/tikv/pd/pkg/schedule/plan"
"github.com/tikv/pd/pkg/storage"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/utils/logutil"
"github.com/tikv/pd/server/config"
"go.uber.org/zap"
)

Expand All @@ -43,24 +42,26 @@ type Controller struct {
sync.RWMutex
wg sync.WaitGroup
ctx context.Context
cluster sche.ClusterInformer
cluster sche.ScheduleCluster
storage endpoint.ConfigStorage
schedulers map[string]*ScheduleController
opController *operator.Controller
}

// NewController creates a scheduler controller.
func NewController(ctx context.Context, cluster sche.ClusterInformer, opController *operator.Controller) *Controller {
func NewController(ctx context.Context, cluster sche.ScheduleCluster, storage endpoint.ConfigStorage, opController *operator.Controller) *Controller {
return &Controller{
ctx: ctx,
cluster: cluster,
storage: storage,
schedulers: make(map[string]*ScheduleController),
opController: opController,
}
}

// GetWaitGroup returns the waitGroup of the controller.
func (c *Controller) GetWaitGroup() *sync.WaitGroup {
return &c.wg
// Wait blocks until the controller's internal WaitGroup has been fully
// released. AddScheduler adds to that group once for each scheduler
// goroutine it starts, so callers use Wait after cancelling the context
// to make sure no scheduler goroutine is still running.
func (c *Controller) Wait() {
	c.wg.Wait()
}

// GetScheduler returns a schedule controller by name.
Expand Down Expand Up @@ -108,7 +109,7 @@ func (c *Controller) CollectSchedulerMetrics() {
}

func (c *Controller) isSchedulingHalted() bool {
return c.cluster.GetPersistOptions().IsSchedulingHalted()
return c.cluster.GetOpts().IsSchedulingHalted()
}

// ResetSchedulerMetrics resets metrics of all schedulers.
Expand All @@ -133,7 +134,7 @@ func (c *Controller) AddScheduler(scheduler Scheduler, args ...string) error {
c.wg.Add(1)
go c.runScheduler(s)
c.schedulers[s.Scheduler.GetName()] = s
c.cluster.GetPersistOptions().AddSchedulerCfg(s.Scheduler.GetType(), args)
c.cluster.GetOpts().AddSchedulerCfg(s.Scheduler.GetType(), args)
return nil
}

Expand All @@ -149,18 +150,14 @@ func (c *Controller) RemoveScheduler(name string) error {
return errs.ErrSchedulerNotFound.FastGenByArgs()
}

opt := c.cluster.GetPersistOptions()
if err := c.removeOptScheduler(opt, name); err != nil {
log.Error("can not remove scheduler", zap.String("scheduler-name", name), errs.ZapError(err))
return err
}

if err := opt.Persist(c.cluster.GetStorage()); err != nil {
opt := c.cluster.GetOpts()
opt.RemoveSchedulerCfg(s.Scheduler.GetType())
if err := opt.Persist(c.storage); err != nil {
log.Error("the option can not persist scheduler config", errs.ZapError(err))
return err
}

if err := c.cluster.GetStorage().RemoveScheduleConfig(name); err != nil {
if err := c.storage.RemoveScheduleConfig(name); err != nil {
log.Error("can not remove the scheduler config", errs.ZapError(err))
return err
}
Expand All @@ -172,29 +169,6 @@ func (c *Controller) RemoveScheduler(name string) error {
return nil
}

// removeOptScheduler updates the schedule config held by o so that the
// scheduler called name is no longer active. It edits a clone of the config
// and stores the clone back with SetScheduleConfig only when an entry with a
// matching name is found. A default scheduler keeps its entry but is marked
// as disabled; any other scheduler has its entry removed from the list.
//
// It returns the error from CreateScheduler if a temporary scheduler cannot
// be built, and nil otherwise, including when no entry matches name.
func (c *Controller) removeOptScheduler(o *config.PersistOptions, name string) error {
	cfg := o.GetScheduleConfig().Clone()
	for idx, entry := range cfg.Schedulers {
		// A temporary scheduler is created only to obtain the scheduler's name.
		decode := ConfigSliceDecoder(entry.Type, entry.Args)
		tmp, err := CreateScheduler(entry.Type, c.opController, storage.NewStorageWithMemoryBackend(), decode, c.RemoveScheduler)
		if err != nil {
			return err
		}
		if tmp.GetName() != name {
			continue
		}

		if config.IsDefaultScheduler(tmp.GetType()) {
			entry.Disable = true
			cfg.Schedulers[idx] = entry
		} else {
			cfg.Schedulers = append(cfg.Schedulers[:idx], cfg.Schedulers[idx+1:]...)
		}
		o.SetScheduleConfig(cfg)
		return nil
	}
	return nil
}

// PauseOrResumeScheduler pauses or resumes a scheduler by name.
func (c *Controller) PauseOrResumeScheduler(name string, t int64) error {
c.Lock()
Expand Down Expand Up @@ -265,14 +239,7 @@ func (c *Controller) IsSchedulerDisabled(name string) (bool, error) {
if !ok {
return false, errs.ErrSchedulerNotFound.FastGenByArgs()
}
t := s.Scheduler.GetType()
scheduleConfig := c.cluster.GetPersistOptions().GetScheduleConfig()
for _, s := range scheduleConfig.Schedulers {
if t == s.Type {
return s.Disable, nil
}
}
return false, nil
return c.cluster.GetOpts().IsSchedulerDisabled(s.Scheduler.GetType()), nil
}

// IsSchedulerExisted returns whether a scheduler exists.
Expand Down
16 changes: 8 additions & 8 deletions server/cluster/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2440,7 +2440,7 @@ func prepare(setCfg func(*config.ScheduleConfig), setTc func(*testCluster), run
}
return tc, co, func() {
co.Stop()
co.GetSchedulersController().GetWaitGroup().Wait()
co.GetSchedulersController().Wait()
co.GetWaitGroup().Wait()
hbStreams.Close()
cancel()
Expand Down Expand Up @@ -2723,7 +2723,7 @@ func TestCheckCache(t *testing.T) {
re.Len(oc.GetOperators(), 1)
re.Empty(co.GetCheckerController().GetWaitingRegions())

co.GetSchedulersController().GetWaitGroup().Wait()
co.GetSchedulersController().Wait()
co.GetWaitGroup().Wait()
re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/schedule/break-patrol"))
}
Expand Down Expand Up @@ -2976,7 +2976,7 @@ func TestPersistScheduler(t *testing.T) {
re.Len(sc.GetSchedulerNames(), defaultCount-3)
re.NoError(co.GetCluster().GetPersistOptions().Persist(storage))
co.Stop()
co.GetSchedulersController().GetWaitGroup().Wait()
co.GetSchedulersController().Wait()
co.GetWaitGroup().Wait()
// make a new coordinator for testing
// whether the schedulers added or removed in dynamic way are recorded in opt
Expand Down Expand Up @@ -3006,7 +3006,7 @@ func TestPersistScheduler(t *testing.T) {
sc = co.GetSchedulersController()
re.Len(sc.GetSchedulerNames(), 3)
co.Stop()
co.GetSchedulersController().GetWaitGroup().Wait()
co.GetSchedulersController().Wait()
co.GetWaitGroup().Wait()
// suppose restart PD again
_, newOpt, err = newTestScheduleConfig()
Expand Down Expand Up @@ -3034,7 +3034,7 @@ func TestPersistScheduler(t *testing.T) {
re.Len(sc.GetSchedulerNames(), 4)
re.NoError(co.GetCluster().GetPersistOptions().Persist(co.GetCluster().GetStorage()))
co.Stop()
co.GetSchedulersController().GetWaitGroup().Wait()
co.GetSchedulersController().Wait()
co.GetWaitGroup().Wait()
_, newOpt, err = newTestScheduleConfig()
re.NoError(err)
Expand Down Expand Up @@ -3091,7 +3091,7 @@ func TestRemoveScheduler(t *testing.T) {
re.Empty(sc.GetSchedulerNames())
re.NoError(co.GetCluster().GetPersistOptions().Persist(co.GetCluster().GetStorage()))
co.Stop()
co.GetSchedulersController().GetWaitGroup().Wait()
co.GetSchedulersController().Wait()
co.GetWaitGroup().Wait()

// suppose restart PD again
Expand All @@ -3105,7 +3105,7 @@ func TestRemoveScheduler(t *testing.T) {
// the option remains default scheduler
re.Len(co.GetCluster().GetPersistOptions().GetSchedulers(), defaultCount)
co.Stop()
co.GetSchedulersController().GetWaitGroup().Wait()
co.GetSchedulersController().Wait()
co.GetWaitGroup().Wait()
}

Expand Down Expand Up @@ -3137,7 +3137,7 @@ func TestRestart(t *testing.T) {
re.NoError(dispatchHeartbeat(co, region, stream))
region = waitPromoteLearner(re, stream, region, 2)
co.Stop()
co.GetSchedulersController().GetWaitGroup().Wait()
co.GetSchedulersController().Wait()
co.GetWaitGroup().Wait()

// Recreate coordinator then add another replica on store 3.
Expand Down
28 changes: 28 additions & 0 deletions server/config/persist_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -665,6 +665,17 @@ func (o *PersistOptions) GetSchedulers() SchedulerConfigs {
return o.GetScheduleConfig().Schedulers
}

// IsSchedulerDisabled reports whether the scheduler of type t is marked as
// disabled in the current schedule config. Only the first entry whose Type
// equals t is consulted; if no entry has that type the result is false.
func (o *PersistOptions) IsSchedulerDisabled(t string) bool {
	entries := o.GetScheduleConfig().Schedulers
	for i := range entries {
		if entries[i].Type != t {
			continue
		}
		return entries[i].Disable
	}
	return false
}

// GetHotRegionsWriteInterval gets interval for PD to store Hot Region information.
func (o *PersistOptions) GetHotRegionsWriteInterval() time.Duration {
return o.GetScheduleConfig().HotRegionsWriteInterval.Duration
Expand Down Expand Up @@ -697,6 +708,23 @@ func (o *PersistOptions) AddSchedulerCfg(tp string, args []string) {
o.SetScheduleConfig(v)
}

// RemoveSchedulerCfg deactivates the configuration of the scheduler type tp.
// It works on a clone of the schedule config and looks for the first entry
// whose Type equals tp. A default scheduler (per IsDefaultScheduler) keeps its
// entry but has Disable set to true; any other scheduler has its entry
// removed from the list. The clone is stored back with SetScheduleConfig only
// when a matching entry was found; otherwise the config is left untouched.
func (o *PersistOptions) RemoveSchedulerCfg(tp string) {
	cfg := o.GetScheduleConfig().Clone()

	// Locate the first entry of the requested type.
	pos := -1
	for i := range cfg.Schedulers {
		if cfg.Schedulers[i].Type == tp {
			pos = i
			break
		}
	}
	if pos < 0 {
		return
	}

	if IsDefaultScheduler(tp) {
		entry := cfg.Schedulers[pos]
		entry.Disable = true
		cfg.Schedulers[pos] = entry
	} else {
		cfg.Schedulers = append(cfg.Schedulers[:pos], cfg.Schedulers[pos+1:]...)
	}
	o.SetScheduleConfig(cfg)
}

// SetLabelProperty sets the label property.
func (o *PersistOptions) SetLabelProperty(typ, labelKey, labelValue string) {
cfg := o.GetLabelPropertyConfig().Clone()
Expand Down

0 comments on commit 3cce629

Please sign in to comment.