diff --git a/CHANGELOG.md b/CHANGELOG.md index eb81b1b562..bfb35645aa 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,10 +2,16 @@ ### Added +- [PR #476](https://github.com/konpyutaika/nifikop/pull/476) - **[Operator/NifiCluster]** Added logic to include injected containers and init containers in desired pod spec. + ### Changed +- [PR #476](https://github.com/konpyutaika/nifikop/pull/476) - **[Operator/NifiCluster]** Move Zookeeper connectivity check from init container to main container. + ### Fixed Bugs +- [PR #476](https://github.com/konpyutaika/nifikop/pull/476) - **[Operator/NifiCluster]** Fixed istio init race condition and pod reconciliation. + ### Deprecated ### Removed diff --git a/pkg/resources/nifi/nifi.go b/pkg/resources/nifi/nifi.go index 80c821d775..c758c6e280 100644 --- a/pkg/resources/nifi/nifi.go +++ b/pkg/resources/nifi/nifi.go @@ -661,6 +661,12 @@ func (r *Reconciler) reconcileNifiPod(log zap.Logger, desiredPod *corev1.Pod) (e } if err == nil { + // k8s-objectmatcher options + opts := []patch.CalculateOption{ + patch.IgnoreStatusFields(), + patch.IgnoreVolumeClaimTemplateTypeMetaAndStatus(), + patch.IgnorePDBSelector(), + } // Since toleration does not support patchStrategy:"merge,retainKeys", we need to add all toleration from the current pod if the toleration is set in the CR if len(desiredPod.Spec.Tolerations) > 0 { desiredPod.Spec.Tolerations = append(desiredPod.Spec.Tolerations, currentPod.Spec.Tolerations...) @@ -674,8 +680,55 @@ func (r *Reconciler) reconcileNifiPod(log zap.Logger, desiredPod *corev1.Pod) (e } desiredPod.Spec.Tolerations = uniqueTolerations } + // If there are extra initContainers from webhook injections we need to add them + if len(currentPod.Spec.InitContainers) > len(desiredPod.Spec.InitContainers) { + desiredPod.Spec.InitContainers = append(desiredPod.Spec.InitContainers, currentPod.Spec.InitContainers...) + uniqueContainers := []corev1.Container{} + keys := make(map[string]bool) + for _, c := range desiredPod.Spec.InitContainers { + if _, value := keys[c.Name]; !value { + keys[c.Name] = true + uniqueContainers = append(uniqueContainers, c) + } + } + desiredPod.Spec.InitContainers = uniqueContainers + } + // If there are extra containers from webhook injections we need to add them + if len(currentPod.Spec.Containers) > len(desiredPod.Spec.Containers) { + desiredPod.Spec.Containers = append(desiredPod.Spec.Containers, currentPod.Spec.Containers...) + uniqueContainers := []corev1.Container{} + keys := make(map[string]bool) + for _, c := range desiredPod.Spec.Containers { + if _, value := keys[c.Name]; !value { + keys[c.Name] = true + uniqueContainers = append(uniqueContainers, c) + } + } + desiredPod.Spec.Containers = uniqueContainers + } + // Remove problematic fields if istio + if _, ok := currentPod.Annotations["istio.io/rev"]; ok { + // Prometheus scrape port is overridden by istio injection + delete(currentPod.Annotations, "prometheus.io/port") + delete(desiredPod.Annotations, "prometheus.io/port") + // Liveness probe port is overridden by istio injection + desiredContainer := corev1.Container{} + for _, c := range desiredPod.Spec.Containers { + if c.Name == "nifi" { + desiredContainer = c + } + } + currentContainers := []corev1.Container{} + for _, c := range currentPod.Spec.Containers { + if c.Name == "nifi" { + c.LivenessProbe = desiredContainer.LivenessProbe + } + currentContainers = append(currentContainers, c) + } + currentPod.Spec.Containers = currentContainers + } // Check if the resource actually updated - patchResult, err := patch.DefaultPatchMaker.Calculate(currentPod, desiredPod) + patchResult, err := patch.DefaultPatchMaker.Calculate(currentPod, desiredPod, opts...) if err != nil { log.Error("could not match pod objects", zap.String("clusterName", r.NifiCluster.Name), @@ -697,7 +750,7 @@ func (r *Reconciler) reconcileNifiPod(log zap.Logger, desiredPod *corev1.Pod) (e log.Debug("pod resource is in sync", zap.String("clusterName", r.NifiCluster.Name), - zap.String("podName", desiredPod.Name)) + zap.String("podName", currentPod.Name)) return nil, k8sutil.PodReady(currentPod) } diff --git a/pkg/resources/nifi/pod.go b/pkg/resources/nifi/pod.go index 013fcfad02..f341698dfe 100644 --- a/pkg/resources/nifi/pod.go +++ b/pkg/resources/nifi/pod.go @@ -95,46 +95,6 @@ func (r *Reconciler) pod(node v1.Node, nodeConfig *v1.NodeConfig, pvcs []corev1. }...) podInitContainers := r.injectAdditionalFields(nodeConfig, initContainers) - if len(zkAddress) > 0 { - podInitContainers = r.injectAdditionalFields(nodeConfig, append(initContainers, []corev1.Container{ - { - Name: "zookeeper", - Image: r.NifiCluster.Spec.GetInitContainerImage(), - ImagePullPolicy: nodeConfig.GetImagePullPolicy(), - Env: []corev1.EnvVar{ - { - Name: "ZK_ADDRESS", - Value: zkAddress, - }, - }, - // The zookeeper init check here just ensures that at least one configured zookeeper host is alive - Command: []string{"bash", "-c", ` -set -e -echo "Trying to contact Zookeeper using connection string: ${ZK_ADDRESS}" - -connected=0 -IFS=',' read -r -a zk_hosts <<< "${ZK_ADDRESS}" -until [ $connected -eq 1 ] -do -for zk_host in "${zk_hosts[@]}" -do - IFS=':' read -r -a zk_host_port <<< "${zk_host}" - - echo "Checking Zookeeper Host: [${zk_host_port[0]}] Port: [${zk_host_port[1]}]" - nc -vzw 1 ${zk_host_port[0]} ${zk_host_port[1]} - if [ $? -eq 0 ]; then - echo "Connected to ${zk_host_port}" - connected=1 - fi -done - -sleep 1 -done -`}, - Resources: generateInitContainerResources(), - }, - }...)) - } sort.Slice(podVolumes, func(i, j int) bool { return podVolumes[i].Name < podVolumes[j].Name @@ -534,12 +494,40 @@ do notMatchedIp=false fi done -echo "Hostname is successfully binded withy IP address"`, nodeAddress, nodeAddress) +echo "Hostname is successfully binded with IP address"`, nodeAddress, nodeAddress) + } + + zkResolve := "" + if len(zkAddress) > 0 { + zkResolve = `echo "Trying to contact Zookeeper using connection string: ${NIFI_ZOOKEEPER_CONNECT_STRING}" + +connected=0 +IFS=',' read -r -a zk_hosts <<< "${NIFI_ZOOKEEPER_CONNECT_STRING}" +until [ $connected -eq 1 ] +do + for zk_host in "${zk_hosts[@]}" + do + IFS=':' read -r -a zk_host_port <<< "${zk_host}" + + echo "Checking Zookeeper Host: [${zk_host_port[0]}] Port: [${zk_host_port[1]}]" + set +e + curl --telnet-option 'BOGUS=1' --connect-timeout 2 -s telnet://${zk_host_port[0]}:${zk_host_port[1]} < /dev/null + if [ $? -eq 48 ]; then + echo "Connected to ${zk_host_port}" + connected=1 + fi + set -e + done + + sleep 1 +done` } + command := []string{"bash", "-ce", fmt.Sprintf(`cp ${NIFI_HOME}/tmp/* ${NIFI_HOME}/conf/ %s %s -exec bin/nifi.sh run`, resolveIp, singleUser)} +%s +exec bin/nifi.sh run`, zkResolve, resolveIp, singleUser)} return corev1.Container{ Name: ContainerName,