Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

(finalizers): refactor #124

Merged
merged 3 commits into from
Jan 14, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion controllers/druid/druid_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func (r *DruidReconciler) Reconcile(ctx context.Context, request reconcile.Reque
return ctrl.Result{}, err
}

// Intialize Emit Events
// Initialize Emit Events
var emitEvent EventEmitter = EmitEventFuncs{r.Recorder}

if err := deployDruidCluster(ctx, r.Client, instance, emitEvent); err != nil {
Expand Down
114 changes: 114 additions & 0 deletions controllers/druid/finalizers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
package druid

import (
"context"
"fmt"

"github.com/datainfrahq/druid-operator/apis/druid/v1alpha1"
appsv1 "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
)

const (
deletePVCFinalizerName = "deletepvc.finalizers.druid.apache.org"
)

var (
defaultFinalizers []string
)

func updateFinalizers(ctx context.Context, sdk client.Client, m *v1alpha1.Druid, emitEvents EventEmitter) error {
desiredFinalizers := m.GetFinalizers()
additionFinalizers := defaultFinalizers

desiredFinalizers = RemoveString(desiredFinalizers, deletePVCFinalizerName)
if !m.Spec.DisablePVCDeletionFinalizer {
additionFinalizers = append(additionFinalizers, deletePVCFinalizerName)
}

for _, finalizer := range additionFinalizers {
if !ContainsString(desiredFinalizers, finalizer) {
desiredFinalizers = append(desiredFinalizers, finalizer)
}
}

m.SetFinalizers(desiredFinalizers)
_, err := writers.Update(ctx, sdk, m, m, emitEvents)
if err != nil {
return err
}

return nil
}

func executeFinalizers(ctx context.Context, sdk client.Client, m *v1alpha1.Druid, emitEvents EventEmitter) error {
if m.Spec.DisablePVCDeletionFinalizer == false {
if err := executePVCFinalizer(ctx, sdk, m, emitEvents); err != nil {
return err
}
}
return nil
}

/*
executePVCFinalizer will execute a PVC deletion of all Druid's PVCs.
Flow:
1. Get sts List and PVC List
2. Range and Delete sts first and then delete pvc. PVC must be deleted after sts termination has been executed
else pvc finalizer shall block deletion since a pod/sts is referencing it.
3. Once delete is executed we block program and return.
*/
func executePVCFinalizer(ctx context.Context, sdk client.Client, m *v1alpha1.Druid, emitEvents EventEmitter) error {
if ContainsString(m.ObjectMeta.Finalizers, deletePVCFinalizerName) {
pvcLabels := map[string]string{
"druid_cr": m.Name,
}

pvcList, err := readers.List(ctx, sdk, m, pvcLabels, emitEvents, func() objectList { return &v1.PersistentVolumeClaimList{} }, func(listObj runtime.Object) []object {
items := listObj.(*v1.PersistentVolumeClaimList).Items
result := make([]object, len(items))
for i := 0; i < len(items); i++ {
result[i] = &items[i]
}
return result
})
if err != nil {
return err
}

stsList, err := readers.List(ctx, sdk, m, makeLabelsForDruid(m.Name), emitEvents, func() objectList { return &appsv1.StatefulSetList{} }, func(listObj runtime.Object) []object {
items := listObj.(*appsv1.StatefulSetList).Items
result := make([]object, len(items))
for i := 0; i < len(items); i++ {
result[i] = &items[i]
}
return result
})
if err != nil {
return err
}

msg := fmt.Sprintf("Trigerring finalizer for CR [%s] in namespace [%s]", m.Name, m.Namespace)
// sendEvent(sdk, m, v1.EventTypeNormal, DruidFinalizer, msg)
logger.Info(msg)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as we our refactoring, we can convert this to an event.

if err := deleteSTSAndPVC(ctx, sdk, m, stsList, pvcList, emitEvents); err != nil {
return err
} else {
msg := fmt.Sprintf("Finalizer success for CR [%s] in namespace [%s]", m.Name, m.Namespace)
// sendEvent(sdk, m, v1.EventTypeNormal, DruidFinalizerSuccess, msg)
logger.Info(msg)
AdheipSingh marked this conversation as resolved.
Show resolved Hide resolved
}

// remove our finalizer from the list and update it.
m.ObjectMeta.Finalizers = RemoveString(m.ObjectMeta.Finalizers, deletePVCFinalizerName)

_, err = writers.Update(ctx, sdk, m, m, emitEvents)
if err != nil {
return err
}

}
return nil
}
141 changes: 141 additions & 0 deletions controllers/druid/finalizers_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
package druid

import (
"time"

druidv1alpha1 "github.com/datainfrahq/druid-operator/apis/druid/v1alpha1"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"k8s.io/apimachinery/pkg/types"
)

// +kubebuilder:docs-gen:collapse=Imports

/*
finalizers_test
*/
var _ = Describe("Test finalizers logic", func() {
const (
filePath = "testdata/finalizers.yaml"
timeout = time.Second * 45
interval = time.Millisecond * 250
)

var (
druid = &druidv1alpha1.Druid{}
)

Context("When creating a druid cluster", func() {
It("Should create the druid object", func() {
By("Creating a new druid")
druidCR, err := readDruidClusterSpecFromFile(filePath)
Expect(err).Should(BeNil())
Expect(k8sClient.Create(ctx, druidCR)).To(Succeed())

By("Getting a newly created druid")
Eventually(func() bool {
err := k8sClient.Get(ctx, types.NamespacedName{Name: druidCR.Name, Namespace: druidCR.Namespace}, druid)
return err == nil
}, timeout, interval).Should(BeTrue())
})
It("Should add the delete PVC finalizer", func() {
By("Waiting for the finalizer to be created")
Eventually(func() bool {
err := k8sClient.Get(ctx, types.NamespacedName{Name: druid.Name, Namespace: druid.Namespace}, druid)
if err == nil && ContainsString(druid.GetFinalizers(), deletePVCFinalizerName) {
return true
}
return false
}, timeout, interval).Should(BeTrue())
})
It("Should delete druid successfully", func() {
By("Waiting for the druid cluster to be deleted")
Expect(k8sClient.Delete(ctx, druid)).To(Succeed())
Eventually(func() bool {
err := k8sClient.Get(ctx, types.NamespacedName{Name: druid.Name, Namespace: druid.Namespace}, druid)
return err != nil
}, timeout, interval).Should(BeTrue())
})
})

Context("When creating a druid cluster with disablePVCDeletion", func() {
It("Should create the druid object", func() {
By("Creating a new druid")
druidCR, err := readDruidClusterSpecFromFile(filePath)
druidCR.Spec.DisablePVCDeletionFinalizer = true
Expect(err).Should(BeNil())
Expect(k8sClient.Create(ctx, druidCR)).To(Succeed())

By("Getting a newly created druid")
Eventually(func() bool {
err := k8sClient.Get(ctx, types.NamespacedName{Name: druidCR.Name, Namespace: druidCR.Namespace}, druid)
return err == nil
}, timeout, interval).Should(BeTrue())
})
It("Should not add the delete PVC finalizer", func() {
By("Call for the update finalizer function")
Expect(updateFinalizers(ctx, k8sClient, druid, emitEvent)).Should(BeNil())

By("Getting a updated druid")
Eventually(func() bool {
err := k8sClient.Get(ctx, types.NamespacedName{Name: druid.Name, Namespace: druid.Namespace}, druid)
return err == nil
}, timeout, interval).Should(BeTrue())

By("Checking the absence of the finalizer")
Expect(ContainsString(druid.GetFinalizers(), deletePVCFinalizerName)).Should(BeFalse())
})
It("Should delete druid successfully", func() {
Expect(k8sClient.Delete(ctx, druid)).To(Succeed())
Eventually(func() bool {
err := k8sClient.Get(ctx, types.NamespacedName{Name: druid.Name, Namespace: druid.Namespace}, druid)
return err != nil
}, timeout, interval).Should(BeTrue())
})
})

Context("When creating a druid cluster", func() {
It("Should create the druid object", func() {
By("Creating a new druid")
druidCR, err := readDruidClusterSpecFromFile(filePath)
Expect(err).Should(BeNil())
Expect(k8sClient.Create(ctx, druidCR)).To(Succeed())

By("Getting the CR")
Eventually(func() bool {
err := k8sClient.Get(ctx, types.NamespacedName{Name: druidCR.Name, Namespace: druidCR.Namespace}, druid)
return err == nil
}, timeout, interval).Should(BeTrue())
})
It("Should add the delete PVC finalizer", func() {
By("Waiting for the finalizer to be created")
Eventually(func() bool {
err := k8sClient.Get(ctx, types.NamespacedName{Name: druid.Name, Namespace: druid.Namespace}, druid)
if err == nil && ContainsString(druid.GetFinalizers(), deletePVCFinalizerName) {
return true
}
return false
}, timeout, interval).Should(BeTrue())
})
It("Should remove the delete PVC finalizer", func() {
By("Disabling the deletePVC finalizer")
druid.Spec.DisablePVCDeletionFinalizer = true
Expect(k8sClient.Update(ctx, druid)).To(BeNil())
By("Waiting for the finalizer to be deleted")
Eventually(func() bool {
err := k8sClient.Get(ctx, types.NamespacedName{Name: druid.Name, Namespace: druid.Namespace}, druid)
if err == nil && !ContainsString(druid.GetFinalizers(), deletePVCFinalizerName) {
return true
}
return false
}, timeout, interval).Should(BeTrue())
})
It("Should delete druid successfully", func() {
Expect(k8sClient.Delete(ctx, druid)).To(Succeed())
Eventually(func() bool {
err := k8sClient.Get(ctx, types.NamespacedName{Name: druid.Name, Namespace: druid.Namespace}, druid)
return err != nil
}, timeout, interval).Should(BeTrue())
})
})
})
90 changes: 6 additions & 84 deletions controllers/druid/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,35 +80,12 @@ func deployDruidCluster(ctx context.Context, sdk client.Client, m *v1alpha1.Drui
return err
}

/*
Default Behavior: Finalizer shall be always executed resulting in deletion of pvc post deletion of Druid CR
When the object (druid CR) has for deletion time stamp set, execute the finalizer.
Finalizer shall execute the following flow :
1. Get sts List and PVC List
2. Range and Delete sts first and then delete pvc. PVC must be deleted after sts termination has been executed
else pvc finalizer shall block deletion since a pod/sts is referencing it.
3. Once delete is executed we block program and return.
*/

if m.Spec.DisablePVCDeletionFinalizer == false {
md := m.GetDeletionTimestamp() != nil
if md {
return executeFinalizers(ctx, sdk, m, emitEvents)
}
/*
If finalizer isn't present add it to object meta.
In case cr is already deleted do not call this function
*/
cr := checkIfCRExists(ctx, sdk, m, emitEvents)
if cr {
if !ContainsString(m.ObjectMeta.Finalizers, finalizerName) {
m.SetFinalizers(append(m.GetFinalizers(), finalizerName))
_, err := writers.Update(context.Background(), sdk, m, m, emitEvents)
if err != nil {
return err
}
}
}
if m.GetDeletionTimestamp() != nil {
return executeFinalizers(ctx, sdk, m, emitEvents)
}

if err := updateFinalizers(ctx, sdk, m, emitEvents); err != nil {
return err
}

for _, elem := range allNodeSpecs {
Expand Down Expand Up @@ -574,61 +551,6 @@ func setPVCLabels(ctx context.Context, sdk client.Client, drd *v1alpha1.Druid, e
return nil
}

func executeFinalizers(ctx context.Context, sdk client.Client, m *v1alpha1.Druid, emitEvents EventEmitter) error {

if ContainsString(m.ObjectMeta.Finalizers, finalizerName) {
pvcLabels := map[string]string{
"druid_cr": m.Name,
}

pvcList, err := readers.List(ctx, sdk, m, pvcLabels, emitEvents, func() objectList { return &v1.PersistentVolumeClaimList{} }, func(listObj runtime.Object) []object {
items := listObj.(*v1.PersistentVolumeClaimList).Items
result := make([]object, len(items))
for i := 0; i < len(items); i++ {
result[i] = &items[i]
}
return result
})
if err != nil {
return err
}

stsList, err := readers.List(ctx, sdk, m, makeLabelsForDruid(m.Name), emitEvents, func() objectList { return &appsv1.StatefulSetList{} }, func(listObj runtime.Object) []object {
items := listObj.(*appsv1.StatefulSetList).Items
result := make([]object, len(items))
for i := 0; i < len(items); i++ {
result[i] = &items[i]
}
return result
})
if err != nil {
return err
}

msg := fmt.Sprintf("Trigerring finalizer for CR [%s] in namespace [%s]", m.Name, m.Namespace)
// sendEvent(sdk, m, v1.EventTypeNormal, DruidFinalizer, msg)
logger.Info(msg)
if err := deleteSTSAndPVC(ctx, sdk, m, stsList, pvcList, emitEvents); err != nil {
return err
} else {
msg := fmt.Sprintf("Finalizer success for CR [%s] in namespace [%s]", m.Name, m.Namespace)
// sendEvent(sdk, m, v1.EventTypeNormal, DruidFinalizerSuccess, msg)
logger.Info(msg)
}

// remove our finalizer from the list and update it.
m.ObjectMeta.Finalizers = RemoveString(m.ObjectMeta.Finalizers, finalizerName)

_, err = writers.Update(ctx, sdk, m, m, emitEvents)
if err != nil {
return err
}

}
return nil

}

func execCheckCrashStatus(ctx context.Context, sdk client.Client, nodeSpec *v1alpha1.DruidNodeSpec, m *v1alpha1.Druid, event EventEmitter) {
if m.Spec.ForceDeleteStsPodOnError == false {
return
Expand Down
3 changes: 3 additions & 0 deletions controllers/druid/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ var (
testEnv *envtest.Environment
ctx context.Context
cancel context.CancelFunc
emitEvent EventEmitter
)

func TestAPIs(t *testing.T) {
Expand Down Expand Up @@ -93,6 +94,8 @@ var _ = BeforeSuite(func() {
}).SetupWithManager(k8sManager)
Expect(err).ToNot(HaveOccurred())

emitEvent = EmitEventFuncs{k8sManager.GetEventRecorderFor("druid-operator")}

go func() {
defer GinkgoRecover()
err = k8sManager.Start(ctx)
Expand Down
Loading
Loading