From a53ef52d2e8742780511817585122e47a50aaedf Mon Sep 17 00:00:00 2001 From: Nigel Foucha <73838612+nfoucha@users.noreply.github.com> Date: Fri, 15 Nov 2024 10:43:11 -0500 Subject: [PATCH] feat(operator): update pod definition and nifi reconciliation for istio compatibility (#476) * feat(operator): update pod definition and nifi reconciliation for istio compatibility this pushes the zk connectivity check into the pod command instead of the init container and removes istio overrides from reconciliation checks relates to #172 * fix ordering for container list merge to prefer incoming spec * update CHANGELOG.md * add more details to CHANGELOG.md * Update CHANGELOG.md Co-authored-by: Juldrixx <31806759+juldrixx@users.noreply.github.com> --------- Co-authored-by: Juldrixx <31806759+juldrixx@users.noreply.github.com> --- CHANGELOG.md | 6 ++++ pkg/resources/nifi/nifi.go | 57 ++++++++++++++++++++++++++++-- pkg/resources/nifi/pod.go | 72 ++++++++++++++++---------------------- 3 files changed, 91 insertions(+), 44 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index eb81b1b562..bfb35645aa 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,10 +2,16 @@ ### Added +- [PR #476](https://github.com/konpyutaika/nifikop/pull/476) - **[Operator/NifiCluster]** Added logic to include injected containers and init containers in desired pod spec. + ### Changed +- [PR #476](https://github.com/konpyutaika/nifikop/pull/476) - **[Operator/NifiCluster]** Move Zookeeper connectivity check from init container to main container. + ### Fixed Bugs +- [PR #476](https://github.com/konpyutaika/nifikop/pull/476) - **[Operator/NifiCluster]** Fixed istio init race condition and pod reconciliation. + ### Deprecated ### Removed diff --git a/pkg/resources/nifi/nifi.go b/pkg/resources/nifi/nifi.go index 80c821d775..c758c6e280 100644 --- a/pkg/resources/nifi/nifi.go +++ b/pkg/resources/nifi/nifi.go @@ -661,6 +661,12 @@ func (r *Reconciler) reconcileNifiPod(log zap.Logger, desiredPod *corev1.Pod) (e } if err == nil { + // k8s-objectmatcher options + opts := []patch.CalculateOption{ + patch.IgnoreStatusFields(), + patch.IgnoreVolumeClaimTemplateTypeMetaAndStatus(), + patch.IgnorePDBSelector(), + } // Since toleration does not support patchStrategy:"merge,retainKeys", we need to add all toleration from the current pod if the toleration is set in the CR if len(desiredPod.Spec.Tolerations) > 0 { desiredPod.Spec.Tolerations = append(desiredPod.Spec.Tolerations, currentPod.Spec.Tolerations...) @@ -674,8 +680,55 @@ func (r *Reconciler) reconcileNifiPod(log zap.Logger, desiredPod *corev1.Pod) (e } desiredPod.Spec.Tolerations = uniqueTolerations } + // If there are extra initContainers from webhook injections we need to add them + if len(currentPod.Spec.InitContainers) > len(desiredPod.Spec.InitContainers) { + desiredPod.Spec.InitContainers = append(desiredPod.Spec.InitContainers, currentPod.Spec.InitContainers...) + uniqueContainers := []corev1.Container{} + keys := make(map[string]bool) + for _, c := range desiredPod.Spec.InitContainers { + if _, value := keys[c.Name]; !value { + keys[c.Name] = true + uniqueContainers = append(uniqueContainers, c) + } + } + desiredPod.Spec.InitContainers = uniqueContainers + } + // If there are extra containers from webhook injections we need to add them + if len(currentPod.Spec.Containers) > len(desiredPod.Spec.Containers) { + desiredPod.Spec.Containers = append(desiredPod.Spec.Containers, currentPod.Spec.Containers...) + uniqueContainers := []corev1.Container{} + keys := make(map[string]bool) + for _, c := range desiredPod.Spec.Containers { + if _, value := keys[c.Name]; !value { + keys[c.Name] = true + uniqueContainers = append(uniqueContainers, c) + } + } + desiredPod.Spec.Containers = uniqueContainers + } + // Remove problematic fields if istio + if _, ok := currentPod.Annotations["istio.io/rev"]; ok { + // Prometheus scrape port is overridden by istio injection + delete(currentPod.Annotations, "prometheus.io/port") + delete(desiredPod.Annotations, "prometheus.io/port") + // Liveness probe port is overridden by istio injection + desiredContainer := corev1.Container{} + for _, c := range desiredPod.Spec.Containers { + if c.Name == "nifi" { + desiredContainer = c + } + } + currentContainers := []corev1.Container{} + for _, c := range currentPod.Spec.Containers { + if c.Name == "nifi" { + c.LivenessProbe = desiredContainer.LivenessProbe + } + currentContainers = append(currentContainers, c) + } + currentPod.Spec.Containers = currentContainers + } // Check if the resource actually updated - patchResult, err := patch.DefaultPatchMaker.Calculate(currentPod, desiredPod) + patchResult, err := patch.DefaultPatchMaker.Calculate(currentPod, desiredPod, opts...) if err != nil { log.Error("could not match pod objects", zap.String("clusterName", r.NifiCluster.Name), @@ -697,7 +750,7 @@ func (r *Reconciler) reconcileNifiPod(log zap.Logger, desiredPod *corev1.Pod) (e log.Debug("pod resource is in sync", zap.String("clusterName", r.NifiCluster.Name), - zap.String("podName", desiredPod.Name)) + zap.String("podName", currentPod.Name)) return nil, k8sutil.PodReady(currentPod) } diff --git a/pkg/resources/nifi/pod.go b/pkg/resources/nifi/pod.go index 013fcfad02..f341698dfe 100644 --- a/pkg/resources/nifi/pod.go +++ b/pkg/resources/nifi/pod.go @@ -95,46 +95,6 @@ func (r *Reconciler) pod(node v1.Node, nodeConfig *v1.NodeConfig, pvcs []corev1. }...) podInitContainers := r.injectAdditionalFields(nodeConfig, initContainers) - if len(zkAddress) > 0 { - podInitContainers = r.injectAdditionalFields(nodeConfig, append(initContainers, []corev1.Container{ - { - Name: "zookeeper", - Image: r.NifiCluster.Spec.GetInitContainerImage(), - ImagePullPolicy: nodeConfig.GetImagePullPolicy(), - Env: []corev1.EnvVar{ - { - Name: "ZK_ADDRESS", - Value: zkAddress, - }, - }, - // The zookeeper init check here just ensures that at least one configured zookeeper host is alive - Command: []string{"bash", "-c", ` -set -e -echo "Trying to contact Zookeeper using connection string: ${ZK_ADDRESS}" - -connected=0 -IFS=',' read -r -a zk_hosts <<< "${ZK_ADDRESS}" -until [ $connected -eq 1 ] -do -for zk_host in "${zk_hosts[@]}" -do - IFS=':' read -r -a zk_host_port <<< "${zk_host}" - - echo "Checking Zookeeper Host: [${zk_host_port[0]}] Port: [${zk_host_port[1]}]" - nc -vzw 1 ${zk_host_port[0]} ${zk_host_port[1]} - if [ $? -eq 0 ]; then - echo "Connected to ${zk_host_port}" - connected=1 - fi -done - -sleep 1 -done -`}, - Resources: generateInitContainerResources(), - }, - }...)) - } sort.Slice(podVolumes, func(i, j int) bool { return podVolumes[i].Name < podVolumes[j].Name @@ -534,12 +494,40 @@ do notMatchedIp=false fi done -echo "Hostname is successfully binded withy IP address"`, nodeAddress, nodeAddress) +echo "Hostname is successfully binded with IP address"`, nodeAddress, nodeAddress) + } + + zkResolve := "" + if len(zkAddress) > 0 { + zkResolve = `echo "Trying to contact Zookeeper using connection string: ${NIFI_ZOOKEEPER_CONNECT_STRING}" + +connected=0 +IFS=',' read -r -a zk_hosts <<< "${NIFI_ZOOKEEPER_CONNECT_STRING}" +until [ $connected -eq 1 ] +do + for zk_host in "${zk_hosts[@]}" + do + IFS=':' read -r -a zk_host_port <<< "${zk_host}" + + echo "Checking Zookeeper Host: [${zk_host_port[0]}] Port: [${zk_host_port[1]}]" + set +e + curl --telnet-option 'BOGUS=1' --connect-timeout 2 -s telnet://${zk_host_port[0]}:${zk_host_port[1]} < /dev/null + if [ $? -eq 48 ]; then + echo "Connected to ${zk_host_port}" + connected=1 + fi + set -e + done + + sleep 1 +done` } + command := []string{"bash", "-ce", fmt.Sprintf(`cp ${NIFI_HOME}/tmp/* ${NIFI_HOME}/conf/ %s %s -exec bin/nifi.sh run`, resolveIp, singleUser)} +%s +exec bin/nifi.sh run`, zkResolve, resolveIp, singleUser)} return corev1.Container{ Name: ContainerName,