Skip to content

Commit

Permalink
Using ClusterCacheTracker instead of remote.NewClusterClient
Browse files Browse the repository at this point in the history
  • Loading branch information
laozc committed Aug 4, 2023
1 parent b6b8803 commit 215e49e
Show file tree
Hide file tree
Showing 6 changed files with 163 additions and 32 deletions.
36 changes: 33 additions & 3 deletions controllers/controllers_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@ import (
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/kubernetes/scheme"
clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
"sigs.k8s.io/cluster-api/controllers/remote"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"

infrav1 "sigs.k8s.io/cluster-api-provider-vsphere/apis/v1beta1"
Expand Down Expand Up @@ -63,15 +65,43 @@ func setup() {

testEnv = helpers.NewTestEnvironment()

secretCachingClient, err := client.New(testEnv.Manager.GetConfig(), client.Options{
HTTPClient: testEnv.Manager.GetHTTPClient(),
Cache: &client.CacheOptions{
Reader: testEnv.Manager.GetCache(),
},
})
if err != nil {
panic("unable to create secret caching client")
}

tracker, err := remote.NewClusterCacheTracker(
testEnv.Manager,
remote.ClusterCacheTrackerOptions{
SecretCachingClient: secretCachingClient,
ControllerName: "testenv-manager",
},
)
if err != nil {
panic(fmt.Sprintf("unable to setup ClusterCacheTracker: %v", err))
}

controllerOpts := controller.Options{MaxConcurrentReconciles: 10}

if err := (&remote.ClusterCacheReconciler{
Client: testEnv.Manager.GetClient(),
Tracker: tracker,
}).SetupWithManager(ctx, testEnv.Manager, controllerOpts); err != nil {
panic(fmt.Sprintf("unable to create ClusterCacheReconciler controller: %v", err))
}

if err := AddClusterControllerToManager(testEnv.GetContext(), testEnv.Manager, &infrav1.VSphereCluster{}, controllerOpts); err != nil {
panic(fmt.Sprintf("unable to setup VsphereCluster controller: %v", err))
}
if err := AddMachineControllerToManager(testEnv.GetContext(), testEnv.Manager, &infrav1.VSphereMachine{}, controllerOpts); err != nil {
panic(fmt.Sprintf("unable to setup VsphereMachine controller: %v", err))
}
if err := AddVMControllerToManager(testEnv.GetContext(), testEnv.Manager, controllerOpts); err != nil {
if err := AddVMControllerToManager(testEnv.GetContext(), testEnv.Manager, tracker, controllerOpts); err != nil {
panic(fmt.Sprintf("unable to setup VsphereVM controller: %v", err))
}
if err := AddVsphereClusterIdentityControllerToManager(testEnv.GetContext(), testEnv.Manager, controllerOpts); err != nil {
Expand All @@ -80,10 +110,10 @@ func setup() {
if err := AddVSphereDeploymentZoneControllerToManager(testEnv.GetContext(), testEnv.Manager, controllerOpts); err != nil {
panic(fmt.Sprintf("unable to setup VSphereDeploymentZone controller: %v", err))
}
if err := AddServiceAccountProviderControllerToManager(testEnv.GetContext(), testEnv.Manager, controllerOpts); err != nil {
if err := AddServiceAccountProviderControllerToManager(testEnv.GetContext(), testEnv.Manager, tracker, controllerOpts); err != nil {
panic(fmt.Sprintf("unable to setup ServiceAccount controller: %v", err))
}
if err := AddServiceDiscoveryControllerToManager(testEnv.GetContext(), testEnv.Manager, controllerOpts); err != nil {
if err := AddServiceDiscoveryControllerToManager(testEnv.GetContext(), testEnv.Manager, tracker, controllerOpts); err != nil {
panic(fmt.Sprintf("unable to setup SvcDiscovery controller: %v", err))
}

Expand Down
14 changes: 9 additions & 5 deletions controllers/serviceaccount_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ const (
)

// AddServiceAccountProviderControllerToManager adds this controller to the provided manager.
func AddServiceAccountProviderControllerToManager(ctx *context.ControllerManagerContext, mgr manager.Manager, options controller.Options) error {
func AddServiceAccountProviderControllerToManager(ctx *context.ControllerManagerContext, mgr manager.Manager, tracker *remote.ClusterCacheTracker, options controller.Options) error {
var (
controlledType = &vmwarev1.ProviderServiceAccount{}
controlledTypeName = reflect.TypeOf(controlledType).Elem().Name()
Expand All @@ -82,8 +82,8 @@ func AddServiceAccountProviderControllerToManager(ctx *context.ControllerManager
Logger: ctx.Logger.WithName(controllerNameShort),
}
r := ServiceAccountReconciler{
ControllerContext: controllerContext,
remoteClientGetter: remote.NewClusterClient,
ControllerContext: controllerContext,
remoteClusterCacheTracker: tracker,
}

clusterToInfraFn := clusterToSupervisorInfrastructureMapFunc(ctx)
Expand Down Expand Up @@ -134,7 +134,7 @@ func clusterToSupervisorInfrastructureMapFunc(managerContext *context.Controller
type ServiceAccountReconciler struct {
*context.ControllerContext

remoteClientGetter remote.ClusterClientGetter
remoteClusterCacheTracker *remote.ClusterCacheTracker
}

func (r ServiceAccountReconciler) Reconcile(_ goctx.Context, req reconcile.Request) (_ reconcile.Result, reterr error) {
Expand Down Expand Up @@ -202,8 +202,12 @@ func (r ServiceAccountReconciler) Reconcile(_ goctx.Context, req reconcile.Reque
// then just return a no-op and wait for the next sync. This will occur when
// the Cluster's status is updated with a reference to the secret that has
// the Kubeconfig data used to access the target cluster.
guestClient, err := r.remoteClientGetter(clusterContext, ProviderServiceAccountControllerName, clusterContext.Client, client.ObjectKeyFromObject(cluster))
guestClient, err := r.remoteClusterCacheTracker.GetClient(clusterContext, client.ObjectKeyFromObject(cluster))
if err != nil {
if errors.Is(err, remote.ErrClusterLocked) {
r.Logger.V(5).Info("Requeuing because another worker has the lock on the ClusterCacheTracker")
return ctrl.Result{Requeue: true}, nil
}
clusterContext.Logger.Info("The control plane is not ready yet", "err", err)
return reconcile.Result{RequeueAfter: clusterNotReadyRequeueTime}, nil
}
Expand Down
14 changes: 9 additions & 5 deletions controllers/servicediscovery_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ const (
// +kubebuilder:rbac:groups="",resources=configmaps/status,verbs=get

// AddServiceDiscoveryControllerToManager adds the ServiceDiscovery controller to the provided manager.
func AddServiceDiscoveryControllerToManager(ctx *context.ControllerManagerContext, mgr manager.Manager, options controller.Options) error {
func AddServiceDiscoveryControllerToManager(ctx *context.ControllerManagerContext, mgr manager.Manager, tracker *remote.ClusterCacheTracker, options controller.Options) error {
var (
controllerNameShort = ServiceDiscoveryControllerName
controllerNameLong = fmt.Sprintf("%s/%s/%s", ctx.Namespace, ctx.Name, ServiceDiscoveryControllerName)
Expand All @@ -84,8 +84,8 @@ func AddServiceDiscoveryControllerToManager(ctx *context.ControllerManagerContex
Logger: ctx.Logger.WithName(controllerNameShort),
}
r := serviceDiscoveryReconciler{
ControllerContext: controllerContext,
remoteClientGetter: remote.NewClusterClient,
ControllerContext: controllerContext,
remoteClusterCacheTracker: tracker,
}

configMapCache, err := cache.New(mgr.GetConfig(), cache.Options{
Expand Down Expand Up @@ -128,7 +128,7 @@ func AddServiceDiscoveryControllerToManager(ctx *context.ControllerManagerContex
type serviceDiscoveryReconciler struct {
*context.ControllerContext

remoteClientGetter remote.ClusterClientGetter
remoteClusterCacheTracker *remote.ClusterCacheTracker
}

func (r serviceDiscoveryReconciler) Reconcile(_ goctx.Context, req reconcile.Request) (_ reconcile.Result, reterr error) {
Expand Down Expand Up @@ -189,8 +189,12 @@ func (r serviceDiscoveryReconciler) Reconcile(_ goctx.Context, req reconcile.Req

// We cannot proceed until we are able to access the target cluster. Until
// then just return a no-op and wait for the next sync.
guestClient, err := r.remoteClientGetter(clusterContext, ServiceDiscoveryControllerName, clusterContext.Client, client.ObjectKeyFromObject(cluster))
guestClient, err := r.remoteClusterCacheTracker.GetClient(clusterContext, client.ObjectKeyFromObject(cluster))
if err != nil {
if errors.Is(err, remote.ErrClusterLocked) {
r.Logger.V(5).Info("Requeuing because another worker has the lock on the ClusterCacheTracker")
return ctrl.Result{Requeue: true}, nil
}
logger.Info("The control plane is not ready yet", "err", err)
return reconcile.Result{RequeueAfter: clusterNotReadyRequeueTime}, nil
}
Expand Down
32 changes: 22 additions & 10 deletions controllers/vspherevm_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ import (
// AddVMControllerToManager adds the VM controller to the provided manager.
//
//nolint:forcetypeassert
func AddVMControllerToManager(ctx *context.ControllerManagerContext, mgr manager.Manager, options controller.Options) error {
func AddVMControllerToManager(ctx *context.ControllerManagerContext, mgr manager.Manager, tracker *remote.ClusterCacheTracker, options controller.Options) error {
var (
controlledType = &infrav1.VSphereVM{}
controlledTypeName = reflect.TypeOf(controlledType).Elem().Name()
Expand All @@ -88,8 +88,9 @@ func AddVMControllerToManager(ctx *context.ControllerManagerContext, mgr manager
Logger: ctx.Logger.WithName(controllerNameShort),
}
r := vmReconciler{
ControllerContext: controllerContext,
VMService: &govmomi.VMService{},
ControllerContext: controllerContext,
VMService: &govmomi.VMService{},
remoteClusterCacheTracker: tracker,
}
controller, err := ctrl.NewControllerManagedBy(mgr).
// Watch the controlled, infrastructure resource.
Expand Down Expand Up @@ -158,7 +159,8 @@ func AddVMControllerToManager(ctx *context.ControllerManagerContext, mgr manager
type vmReconciler struct {
*context.ControllerContext

VMService services.VirtualMachineService
VMService services.VirtualMachineService
remoteClusterCacheTracker *remote.ClusterCacheTracker
}

// Reconcile ensures the back-end state reflects the Kubernetes resource state intent.
Expand Down Expand Up @@ -344,9 +346,14 @@ func (r vmReconciler) reconcileDelete(ctx *context.VMContext) (reconcile.Result,
}

// Attempt to delete the node corresponding to the vsphere VM
if err := r.deleteNode(ctx, vm.Name); err != nil {
result, err = r.deleteNode(ctx, vm.Name)
if err != nil {
r.Logger.V(6).Info("unable to delete node", "err", err)
}
if !result.IsZero() {
// a non-zero value means we need to requeue the request before proceed.
return result, nil
}

if err := r.deleteIPAddressClaims(ctx); err != nil {
return reconcile.Result{}, err
Expand All @@ -362,15 +369,19 @@ func (r vmReconciler) reconcileDelete(ctx *context.VMContext) (reconcile.Result,
// This is necessary since CAPI does not the nodeRef field on the owner Machine object
// until the node moves to Ready state. Hence, on Machine deletion it is unable to delete
// the kubernetes node corresponding to the VM.
func (r vmReconciler) deleteNode(ctx *context.VMContext, name string) error {
func (r vmReconciler) deleteNode(ctx *context.VMContext, name string) (reconcile.Result, error) {
// Fetching the cluster object from the VSphereVM object to create a remote client to the cluster
cluster, err := clusterutilv1.GetClusterFromMetadata(r.ControllerContext, r.Client, ctx.VSphereVM.ObjectMeta)
if err != nil {
return err
return ctrl.Result{}, err
}
clusterClient, err := remote.NewClusterClient(ctx, r.ControllerContext.Name, r.Client, ctrlclient.ObjectKeyFromObject(cluster))
clusterClient, err := r.remoteClusterCacheTracker.GetClient(ctx, ctrlclient.ObjectKeyFromObject(cluster))
if err != nil {
return err
if errors.Is(err, remote.ErrClusterLocked) {
r.Logger.V(5).Info("Requeuing because another worker has the lock on the ClusterCacheTracker")
return ctrl.Result{Requeue: true}, nil
}
return ctrl.Result{}, err
}

// Attempt to delete the corresponding node
Expand All @@ -379,7 +390,8 @@ func (r vmReconciler) deleteNode(ctx *context.VMContext, name string) error {
Name: name,
},
}
return clusterClient.Delete(ctx, node)
err = clusterClient.Delete(ctx, node)
return ctrl.Result{}, err
}

func (r vmReconciler) reconcileNormal(ctx *context.VMContext) (reconcile.Result, error) {
Expand Down
37 changes: 35 additions & 2 deletions controllers/vspherevm_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,13 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
apirecord "k8s.io/client-go/tools/record"
clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
"sigs.k8s.io/cluster-api/controllers/remote"
ipamv1 "sigs.k8s.io/cluster-api/exp/ipam/api/v1alpha1"
"sigs.k8s.io/cluster-api/util"
"sigs.k8s.io/cluster-api/util/conditions"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

Expand Down Expand Up @@ -73,6 +75,36 @@ func TestReconcileNormal_WaitingForIPAddrAllocation(t *testing.T) {
}
defer simr.Destroy()

secretCachingClient, err := client.New(testEnv.Manager.GetConfig(), client.Options{
HTTPClient: testEnv.Manager.GetHTTPClient(),
Cache: &client.CacheOptions{
Reader: testEnv.Manager.GetCache(),
},
})
if err != nil {
panic("unable to create secret caching client")
}

tracker, err := remote.NewClusterCacheTracker(
testEnv.Manager,
remote.ClusterCacheTrackerOptions{
SecretCachingClient: secretCachingClient,
ControllerName: "testvspherevm-manager",
},
)
if err != nil {
t.Fatalf("unable to setup ClusterCacheTracker: %v", err)
}

controllerOpts := controller.Options{MaxConcurrentReconciles: 10}

if err := (&remote.ClusterCacheReconciler{
Client: testEnv.Manager.GetClient(),
Tracker: tracker,
}).SetupWithManager(ctx, testEnv.Manager, controllerOpts); err != nil {
panic(fmt.Sprintf("unable to create ClusterCacheReconciler controller: %v", err))
}

create := func(netSpec infrav1.NetworkSpec) func() {
return func() {
vsphereCluster = &infrav1.VSphereCluster{
Expand Down Expand Up @@ -177,8 +209,9 @@ func TestReconcileNormal_WaitingForIPAddrAllocation(t *testing.T) {
Logger: log.Log,
}
return vmReconciler{
ControllerContext: controllerContext,
VMService: vmService,
ControllerContext: controllerContext,
VMService: vmService,
remoteClusterCacheTracker: tracker,
}
}

Expand Down
Loading

0 comments on commit 215e49e

Please sign in to comment.