Skip to content

Commit

Permalink
refactor(tasks): simplify context cluster task (#6081)
Browse files Browse the repository at this point in the history
  • Loading branch information
liubog2008 authored Feb 18, 2025
1 parent 702e096 commit ffbbc1f
Show file tree
Hide file tree
Showing 29 changed files with 246 additions and 137 deletions.
8 changes: 0 additions & 8 deletions pkg/apiutil/core/v1alpha1/group.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,6 @@ import (
"github.com/pingcap/tidb-operator/pkg/runtime/scope"
)

func Cluster[
S scope.Object[F, T],
F client.Object,
T runtime.Object,
](f F) string {
return scope.From[S](f).Cluster()
}

func Version[
S scope.Group[F, T],
F client.Object,
Expand Down
29 changes: 29 additions & 0 deletions pkg/apiutil/core/v1alpha1/object.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package coreutil

import (
"github.com/pingcap/tidb-operator/pkg/client"
"github.com/pingcap/tidb-operator/pkg/runtime"
"github.com/pingcap/tidb-operator/pkg/runtime/scope"
)

func Cluster[
S scope.Object[F, T],
F client.Object,
T runtime.Object,
](f F) string {
return scope.From[S](f).Cluster()
}
16 changes: 8 additions & 8 deletions pkg/controllers/common/resource_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,19 +33,19 @@ func TestResource(t *testing.T) {
}{
{
desc: "normal",
ns: Namespace("aaa"),
ns: Namespace("xxx"),
name: Name("bbb"),
obj: 42,
expectedNs: "aaa",
expectedNs: "xxx",
expectedName: "bbb",
expectedObj: 42,
},
{
desc: "use name func",
ns: Lazy[string](func() string { return "aaa" }),
ns: Lazy[string](func() string { return "xxx" }),
name: Lazy[string](func() string { return "bbb" }),
obj: 42,
expectedNs: "aaa",
expectedNs: "xxx",
expectedName: "bbb",
expectedObj: 42,
},
Expand Down Expand Up @@ -85,21 +85,21 @@ func TestResourceSlice(t *testing.T) {
}{
{
desc: "normal",
ns: Namespace("aaa"),
ns: Namespace("nnn"),
labels: Labels(map[string]string{"xxx": "yyy"}),
objs: []*int{ptr.To(42)},
expectedNs: "aaa",
expectedNs: "nnn",
expectedLabels: map[string]string{
"xxx": "yyy",
},
expectedObjs: []*int{ptr.To(42)},
},
{
desc: "use func",
ns: Lazy[string](func() string { return "aaa" }),
ns: Lazy[string](func() string { return "nnn" }),
labels: LabelsFunc(func() map[string]string { return map[string]string{"xxx": "yyy"} }),
objs: []*int{ptr.To(42)},
expectedNs: "aaa",
expectedNs: "nnn",
expectedLabels: map[string]string{
"xxx": "yyy",
},
Expand Down
40 changes: 35 additions & 5 deletions pkg/controllers/common/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,11 @@ import (
"k8s.io/apimachinery/pkg/types"

"github.com/pingcap/tidb-operator/api/v2/core/v1alpha1"
coreutil "github.com/pingcap/tidb-operator/pkg/apiutil/core/v1alpha1"
"github.com/pingcap/tidb-operator/pkg/client"
"github.com/pingcap/tidb-operator/pkg/features"
"github.com/pingcap/tidb-operator/pkg/runtime"
"github.com/pingcap/tidb-operator/pkg/runtime/scope"
"github.com/pingcap/tidb-operator/pkg/utils/task/v3"
)

Expand Down Expand Up @@ -114,11 +117,6 @@ func TaskContextTiFlash(state TiFlashStateInitializer, c client.Client) task.Tas
return taskContextResource("TiFlash", w, c, false)
}

func TaskContextCluster(state ClusterStateInitializer, c client.Client) task.Task {
w := state.ClusterInitializer()
return taskContextResource("Cluster", w, c, true)
}

func TaskContextPod(state PodStateInitializer, c client.Client) task.Task {
w := state.PodInitializer()
return taskContextResource("Pod", w, c, false)
Expand Down Expand Up @@ -193,3 +191,35 @@ func TaskFeatureGates(state ClusterState) task.Task {
return task.Complete().With("feature gates are initialized")
})
}

type ContextClusterNewer[
F client.Object,
] interface {
Object() F
SetCluster(c *v1alpha1.Cluster)
}

func TaskContextCluster[
S scope.Object[F, T],
F client.Object,
T runtime.Object,
](state ContextClusterNewer[F], c client.Client) task.Task {
return task.NameTaskFunc("ContextCluster", func(ctx context.Context) task.Result {
cluster := v1alpha1.Cluster{}
obj := state.Object()

key := types.NamespacedName{
Namespace: obj.GetNamespace(),
Name: coreutil.Cluster[S](obj),
}
if err := c.Get(ctx, key, &cluster); err != nil {
if !errors.IsNotFound(err) {
return task.Fail().With("can't get %s: %v", key, err)
}

return task.Fail().With("cannot find %s: %v", key, err)
}
state.SetCluster(&cluster)
return task.Complete().With("cluster is set")
})
}
67 changes: 47 additions & 20 deletions pkg/controllers/common/task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/pingcap/tidb-operator/api/v2/core/v1alpha1"
"github.com/pingcap/tidb-operator/pkg/client"
"github.com/pingcap/tidb-operator/pkg/features"
"github.com/pingcap/tidb-operator/pkg/runtime/scope"
"github.com/pingcap/tidb-operator/pkg/utils/fake"
"github.com/pingcap/tidb-operator/pkg/utils/task/v3"
)
Expand Down Expand Up @@ -95,10 +96,35 @@ func TestTaskContextPD(t *testing.T) {
}
}

type fakeObjectState[
F client.Object,
] struct {
obj F
cluster *v1alpha1.Cluster
}

func (s *fakeObjectState[F]) Object() F {
return s.obj
}

func (s *fakeObjectState[F]) SetCluster(c *v1alpha1.Cluster) {
s.cluster = c
}

func newFakeObjectState[
F client.Object,
](f F) *fakeObjectState[F] {
return &fakeObjectState[F]{
obj: f,
}
}

func TestTaskContextCluster(t *testing.T) {
const ns = "aaa"
const name = "bbb"
cases := []struct {
desc string
state *fakeState[v1alpha1.Cluster]
state *fakeObjectState[*v1alpha1.PD]
objs []client.Object
unexpectedErr bool

Expand All @@ -107,32 +133,35 @@ func TestTaskContextCluster(t *testing.T) {
}{
{
desc: "success",
state: &fakeState[v1alpha1.Cluster]{
ns: "aaa",
name: "aaa",
},
state: newFakeObjectState(fake.FakeObj(name, func(obj *v1alpha1.PD) *v1alpha1.PD {
obj.Namespace = ns
obj.Spec.Cluster.Name = name
return obj
})),
objs: []client.Object{
fake.FakeObj("aaa", fake.SetNamespace[v1alpha1.Cluster]("aaa")),
fake.FakeObj(name, fake.SetNamespace[v1alpha1.Cluster](ns)),
},
expectedResult: task.SComplete,
expectedObj: fake.FakeObj("aaa", fake.SetNamespace[v1alpha1.Cluster]("aaa")),
expectedObj: fake.FakeObj(name, fake.SetNamespace[v1alpha1.Cluster](ns)),
},
{
desc: "not found",
state: &fakeState[v1alpha1.Cluster]{
ns: "aaa",
name: "aaa",
},
state: newFakeObjectState(fake.FakeObj(name, func(obj *v1alpha1.PD) *v1alpha1.PD {
obj.Namespace = ns
obj.Spec.Cluster.Name = name
return obj
})),
expectedResult: task.SFail,
},
{
desc: "has unexpected error",
state: &fakeState[v1alpha1.Cluster]{
ns: "aaa",
name: "aaa",
},
state: newFakeObjectState(fake.FakeObj(name, func(obj *v1alpha1.PD) *v1alpha1.PD {
obj.Namespace = ns
obj.Spec.Cluster.Name = name
return obj
})),
objs: []client.Object{
fake.FakeObj("aaa", fake.SetNamespace[v1alpha1.Cluster]("aaa")),
fake.FakeObj(name, fake.SetNamespace[v1alpha1.Cluster](ns)),
},
unexpectedErr: true,
expectedResult: task.SFail,
Expand All @@ -149,12 +178,10 @@ func TestTaskContextCluster(t *testing.T) {
if c.unexpectedErr {
fc.WithError("*", "*", errors.NewInternalError(fmt.Errorf("fake internal err")))
}
s := &fakeClusterState{s: c.state}

res, done := task.RunTask(context.Background(), TaskContextCluster(s, fc))
res, done := task.RunTask(context.Background(), TaskContextCluster[scope.PD](c.state, fc))
assert.Equal(tt, c.expectedResult, res.Status(), c.desc)
assert.False(tt, done, c.desc)
assert.Equal(tt, c.expectedObj, c.state.obj, c.desc)
assert.Equal(tt, c.expectedObj, c.state.cluster, c.desc)
})
}
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/controllers/pd/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/pingcap/tidb-operator/pkg/controllers/common"
"github.com/pingcap/tidb-operator/pkg/controllers/pd/tasks"
"github.com/pingcap/tidb-operator/pkg/runtime"
"github.com/pingcap/tidb-operator/pkg/runtime/scope"
"github.com/pingcap/tidb-operator/pkg/utils/task/v3"
)

Expand All @@ -29,7 +30,7 @@ func (r *Reconciler) NewRunner(state *tasks.ReconcileContext, reporter task.Task
task.IfBreak(common.CondInstanceHasBeenDeleted(state)),

// get cluster
common.TaskContextCluster(state, r.Client),
common.TaskContextCluster[scope.PD](state, r.Client),
common.TaskFeatureGates(state),
// if it's paused just return
task.IfBreak(common.CondClusterIsPaused(state)),
Expand Down
21 changes: 11 additions & 10 deletions pkg/controllers/pd/tasks/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ type state struct {

type State interface {
common.PDStateInitializer
common.ClusterStateInitializer
common.PodStateInitializer
common.PDSliceStateInitializer

Expand All @@ -46,6 +45,9 @@ type State interface {
common.PDSliceState

common.InstanceState[*runtime.PD]

common.ContextClusterNewer[*v1alpha1.PD]

SetPod(*corev1.Pod)
}

Expand All @@ -56,6 +58,10 @@ func NewState(key types.NamespacedName) State {
return s
}

func (s *state) Object() *v1alpha1.PD {
return s.pd
}

func (s *state) PD() *v1alpha1.PD {
return s.pd
}
Expand All @@ -76,6 +82,10 @@ func (s *state) SetPod(pod *corev1.Pod) {
s.pod = pod
}

func (s *state) SetCluster(cluster *v1alpha1.Cluster) {
s.cluster = cluster
}

func (s *state) PDSlice() []*v1alpha1.PD {
return s.pds
}
Expand All @@ -87,15 +97,6 @@ func (s *state) PDInitializer() common.PDInitializer {
Initializer()
}

func (s *state) ClusterInitializer() common.ClusterInitializer {
return common.NewResource(func(cluster *v1alpha1.Cluster) { s.cluster = cluster }).
WithNamespace(common.Namespace(s.key.Namespace)).
WithName(common.Lazy[string](func() string {
return s.pd.Spec.Cluster.Name
})).
Initializer()
}

func (s *state) PodInitializer() common.PodInitializer {
return common.NewResource(s.SetPod).
WithNamespace(common.Namespace(s.key.Namespace)).
Expand Down
3 changes: 2 additions & 1 deletion pkg/controllers/pd/tasks/state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/pingcap/tidb-operator/api/v2/core/v1alpha1"
"github.com/pingcap/tidb-operator/pkg/client"
"github.com/pingcap/tidb-operator/pkg/controllers/common"
"github.com/pingcap/tidb-operator/pkg/runtime/scope"
"github.com/pingcap/tidb-operator/pkg/utils/fake"
"github.com/pingcap/tidb-operator/pkg/utils/task/v3"
)
Expand Down Expand Up @@ -120,7 +121,7 @@ func TestState(t *testing.T) {
ctx := context.Background()
res, done := task.RunTask(ctx, task.Block(
common.TaskContextPD(s, fc),
common.TaskContextCluster(s, fc),
common.TaskContextCluster[scope.PD](s, fc),
common.TaskContextPDSlice(s, fc),
common.TaskContextPod(s, fc),
))
Expand Down
3 changes: 2 additions & 1 deletion pkg/controllers/pdgroup/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/pingcap/tidb-operator/pkg/controllers/common"
"github.com/pingcap/tidb-operator/pkg/controllers/pdgroup/tasks"
"github.com/pingcap/tidb-operator/pkg/runtime"
"github.com/pingcap/tidb-operator/pkg/runtime/scope"
"github.com/pingcap/tidb-operator/pkg/utils/task/v3"
)

Expand All @@ -29,7 +30,7 @@ func (r *Reconciler) NewRunner(state *tasks.ReconcileContext, reporter task.Task
task.IfBreak(common.CondGroupHasBeenDeleted(state)),

// get cluster
common.TaskContextCluster(state, r.Client),
common.TaskContextCluster[scope.PDGroup](state, r.Client),
common.TaskFeatureGates(state),
// if it's paused just return
task.IfBreak(common.CondClusterIsPaused(state)),
Expand Down
Loading

0 comments on commit ffbbc1f

Please sign in to comment.