From 179a385890fa4354ba0400e133e5348be1c8057a Mon Sep 17 00:00:00 2001 From: Nigel Foucha Date: Thu, 7 Nov 2024 15:20:31 -0500 Subject: [PATCH] feat(operator): update pod definition and nifi reconciliation for istio compatibility this pushes the zk connectivity check into the pod command instead of the init container and removes istio overrides from reconciliation checks relates to #172 --- pkg/resources/nifi/nifi.go | 57 ++++++++++++++++++++++++++++-- pkg/resources/nifi/pod.go | 72 ++++++++++++++++---------------------- 2 files changed, 85 insertions(+), 44 deletions(-) diff --git a/pkg/resources/nifi/nifi.go b/pkg/resources/nifi/nifi.go index 80c821d775..2111eeaebb 100644 --- a/pkg/resources/nifi/nifi.go +++ b/pkg/resources/nifi/nifi.go @@ -661,6 +661,12 @@ func (r *Reconciler) reconcileNifiPod(log zap.Logger, desiredPod *corev1.Pod) (e } if err == nil { + // k8s-objectmatcher options + opts := []patch.CalculateOption{ + patch.IgnoreStatusFields(), + patch.IgnoreVolumeClaimTemplateTypeMetaAndStatus(), + patch.IgnorePDBSelector(), + } // Since toleration does not support patchStrategy:"merge,retainKeys", we need to add all toleration from the current pod if the toleration is set in the CR if len(desiredPod.Spec.Tolerations) > 0 { desiredPod.Spec.Tolerations = append(desiredPod.Spec.Tolerations, currentPod.Spec.Tolerations...) @@ -674,8 +680,55 @@ func (r *Reconciler) reconcileNifiPod(log zap.Logger, desiredPod *corev1.Pod) (e } desiredPod.Spec.Tolerations = uniqueTolerations } + // If there are extra initContainers from webhook injections we need to add them + if len(currentPod.Spec.InitContainers) > len(desiredPod.Spec.InitContainers) { + desiredPod.Spec.InitContainers = append(currentPod.Spec.InitContainers, desiredPod.Spec.InitContainers...) + uniqueContainers := []corev1.Container{} + keys := make(map[string]bool) + for _, c := range desiredPod.Spec.InitContainers { + if _, value := keys[c.Name]; !value { + keys[c.Name] = true + uniqueContainers = append(uniqueContainers, c) + } + } + desiredPod.Spec.InitContainers = uniqueContainers + } + // If there are extra containers from webhook injections we need to add them + if len(currentPod.Spec.Containers) > len(desiredPod.Spec.Containers) { + desiredPod.Spec.Containers = append(currentPod.Spec.Containers, desiredPod.Spec.Containers...) + uniqueContainers := []corev1.Container{} + keys := make(map[string]bool) + for _, c := range desiredPod.Spec.Containers { + if _, value := keys[c.Name]; !value { + keys[c.Name] = true + uniqueContainers = append(uniqueContainers, c) + } + } + desiredPod.Spec.Containers = uniqueContainers + } + // Remove problematic fields if istio + if _, ok := currentPod.Annotations["istio.io/rev"]; ok { + // Prometheus scrape port is overridden by istio injection + delete(currentPod.Annotations, "prometheus.io/port") + delete(desiredPod.Annotations, "prometheus.io/port") + // Liveness probe port is overridden by istio injection + desiredContainer := corev1.Container{} + for _, c := range desiredPod.Spec.Containers { + if c.Name == "nifi" { + desiredContainer = c + } + } + currentContainers := []corev1.Container{} + for _, c := range currentPod.Spec.Containers { + if c.Name == "nifi" { + c.LivenessProbe = desiredContainer.LivenessProbe + } + currentContainers = append(currentContainers, c) + } + currentPod.Spec.Containers = currentContainers + } // Check if the resource actually updated - patchResult, err := patch.DefaultPatchMaker.Calculate(currentPod, desiredPod) + patchResult, err := patch.DefaultPatchMaker.Calculate(currentPod, desiredPod, opts...) if err != nil { log.Error("could not match pod objects", zap.String("clusterName", r.NifiCluster.Name), @@ -697,7 +750,7 @@ func (r *Reconciler) reconcileNifiPod(log zap.Logger, desiredPod *corev1.Pod) (e log.Debug("pod resource is in sync", zap.String("clusterName", r.NifiCluster.Name), - zap.String("podName", desiredPod.Name)) + zap.String("podName", currentPod.Name)) return nil, k8sutil.PodReady(currentPod) } diff --git a/pkg/resources/nifi/pod.go b/pkg/resources/nifi/pod.go index 013fcfad02..f341698dfe 100644 --- a/pkg/resources/nifi/pod.go +++ b/pkg/resources/nifi/pod.go @@ -95,46 +95,6 @@ func (r *Reconciler) pod(node v1.Node, nodeConfig *v1.NodeConfig, pvcs []corev1. }...) podInitContainers := r.injectAdditionalFields(nodeConfig, initContainers) - if len(zkAddress) > 0 { - podInitContainers = r.injectAdditionalFields(nodeConfig, append(initContainers, []corev1.Container{ - { - Name: "zookeeper", - Image: r.NifiCluster.Spec.GetInitContainerImage(), - ImagePullPolicy: nodeConfig.GetImagePullPolicy(), - Env: []corev1.EnvVar{ - { - Name: "ZK_ADDRESS", - Value: zkAddress, - }, - }, - // The zookeeper init check here just ensures that at least one configured zookeeper host is alive - Command: []string{"bash", "-c", ` -set -e -echo "Trying to contact Zookeeper using connection string: ${ZK_ADDRESS}" - -connected=0 -IFS=',' read -r -a zk_hosts <<< "${ZK_ADDRESS}" -until [ $connected -eq 1 ] -do -for zk_host in "${zk_hosts[@]}" -do - IFS=':' read -r -a zk_host_port <<< "${zk_host}" - - echo "Checking Zookeeper Host: [${zk_host_port[0]}] Port: [${zk_host_port[1]}]" - nc -vzw 1 ${zk_host_port[0]} ${zk_host_port[1]} - if [ $? -eq 0 ]; then - echo "Connected to ${zk_host_port}" - connected=1 - fi -done - -sleep 1 -done -`}, - Resources: generateInitContainerResources(), - }, - }...)) - } sort.Slice(podVolumes, func(i, j int) bool { return podVolumes[i].Name < podVolumes[j].Name @@ -534,12 +494,40 @@ do notMatchedIp=false fi done -echo "Hostname is successfully binded withy IP address"`, nodeAddress, nodeAddress) +echo "Hostname is successfully binded with IP address"`, nodeAddress, nodeAddress) + } + + zkResolve := "" + if len(zkAddress) > 0 { + zkResolve = `echo "Trying to contact Zookeeper using connection string: ${NIFI_ZOOKEEPER_CONNECT_STRING}" + +connected=0 +IFS=',' read -r -a zk_hosts <<< "${NIFI_ZOOKEEPER_CONNECT_STRING}" +until [ $connected -eq 1 ] +do + for zk_host in "${zk_hosts[@]}" + do + IFS=':' read -r -a zk_host_port <<< "${zk_host}" + + echo "Checking Zookeeper Host: [${zk_host_port[0]}] Port: [${zk_host_port[1]}]" + set +e + curl --telnet-option 'BOGUS=1' --connect-timeout 2 -s telnet://${zk_host_port[0]}:${zk_host_port[1]} < /dev/null + if [ $? -eq 48 ]; then + echo "Connected to ${zk_host_port}" + connected=1 + fi + set -e + done + + sleep 1 +done` } + command := []string{"bash", "-ce", fmt.Sprintf(`cp ${NIFI_HOME}/tmp/* ${NIFI_HOME}/conf/ %s %s -exec bin/nifi.sh run`, resolveIp, singleUser)} +%s +exec bin/nifi.sh run`, zkResolve, resolveIp, singleUser)} return corev1.Container{ Name: ContainerName,