Skip to content

Commit

Permalink
feat(operator): update pod definition and nifi reconciliation for ist…
Browse files Browse the repository at this point in the history
…io compatibility

this pushes the zk connectivity check into the pod command instead of the init container and removes istio overrides from reconciliation checks

relates to konpyutaika#172
  • Loading branch information
nfoucha committed Nov 15, 2024
1 parent bf5159e commit 179a385
Show file tree
Hide file tree
Showing 2 changed files with 85 additions and 44 deletions.
57 changes: 55 additions & 2 deletions pkg/resources/nifi/nifi.go
Original file line number Diff line number Diff line change
Expand Up @@ -661,6 +661,12 @@ func (r *Reconciler) reconcileNifiPod(log zap.Logger, desiredPod *corev1.Pod) (e
}

if err == nil {
// k8s-objectmatcher options
opts := []patch.CalculateOption{
patch.IgnoreStatusFields(),
patch.IgnoreVolumeClaimTemplateTypeMetaAndStatus(),
patch.IgnorePDBSelector(),
}
// Since toleration does not support patchStrategy:"merge,retainKeys", we need to add all toleration from the current pod if the toleration is set in the CR
if len(desiredPod.Spec.Tolerations) > 0 {
desiredPod.Spec.Tolerations = append(desiredPod.Spec.Tolerations, currentPod.Spec.Tolerations...)
Expand All @@ -674,8 +680,55 @@ func (r *Reconciler) reconcileNifiPod(log zap.Logger, desiredPod *corev1.Pod) (e
}
desiredPod.Spec.Tolerations = uniqueTolerations
}
// If there are extra initContainers from webhook injections we need to add them
if len(currentPod.Spec.InitContainers) > len(desiredPod.Spec.InitContainers) {
desiredPod.Spec.InitContainers = append(currentPod.Spec.InitContainers, desiredPod.Spec.InitContainers...)
uniqueContainers := []corev1.Container{}
keys := make(map[string]bool)
for _, c := range desiredPod.Spec.InitContainers {
if _, value := keys[c.Name]; !value {
keys[c.Name] = true
uniqueContainers = append(uniqueContainers, c)
}
}
desiredPod.Spec.InitContainers = uniqueContainers
}
// If there are extra containers from webhook injections we need to add them
if len(currentPod.Spec.Containers) > len(desiredPod.Spec.Containers) {
desiredPod.Spec.Containers = append(currentPod.Spec.Containers, desiredPod.Spec.Containers...)
uniqueContainers := []corev1.Container{}
keys := make(map[string]bool)
for _, c := range desiredPod.Spec.Containers {
if _, value := keys[c.Name]; !value {
keys[c.Name] = true
uniqueContainers = append(uniqueContainers, c)
}
}
desiredPod.Spec.Containers = uniqueContainers
}
// Remove problematic fields if istio
if _, ok := currentPod.Annotations["istio.io/rev"]; ok {
// Prometheus scrape port is overridden by istio injection
delete(currentPod.Annotations, "prometheus.io/port")
delete(desiredPod.Annotations, "prometheus.io/port")
// Liveness probe port is overridden by istio injection
desiredContainer := corev1.Container{}
for _, c := range desiredPod.Spec.Containers {
if c.Name == "nifi" {
desiredContainer = c
}
}
currentContainers := []corev1.Container{}
for _, c := range currentPod.Spec.Containers {
if c.Name == "nifi" {
c.LivenessProbe = desiredContainer.LivenessProbe
}
currentContainers = append(currentContainers, c)
}
currentPod.Spec.Containers = currentContainers
}
// Check if the resource actually updated
patchResult, err := patch.DefaultPatchMaker.Calculate(currentPod, desiredPod)
patchResult, err := patch.DefaultPatchMaker.Calculate(currentPod, desiredPod, opts...)
if err != nil {
log.Error("could not match pod objects",
zap.String("clusterName", r.NifiCluster.Name),
Expand All @@ -697,7 +750,7 @@ func (r *Reconciler) reconcileNifiPod(log zap.Logger, desiredPod *corev1.Pod) (e

log.Debug("pod resource is in sync",
zap.String("clusterName", r.NifiCluster.Name),
zap.String("podName", desiredPod.Name))
zap.String("podName", currentPod.Name))

return nil, k8sutil.PodReady(currentPod)
}
Expand Down
72 changes: 30 additions & 42 deletions pkg/resources/nifi/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,46 +95,6 @@ func (r *Reconciler) pod(node v1.Node, nodeConfig *v1.NodeConfig, pvcs []corev1.
}...)

podInitContainers := r.injectAdditionalFields(nodeConfig, initContainers)
if len(zkAddress) > 0 {
podInitContainers = r.injectAdditionalFields(nodeConfig, append(initContainers, []corev1.Container{
{
Name: "zookeeper",
Image: r.NifiCluster.Spec.GetInitContainerImage(),
ImagePullPolicy: nodeConfig.GetImagePullPolicy(),
Env: []corev1.EnvVar{
{
Name: "ZK_ADDRESS",
Value: zkAddress,
},
},
// The zookeeper init check here just ensures that at least one configured zookeeper host is alive
Command: []string{"bash", "-c", `
set -e
echo "Trying to contact Zookeeper using connection string: ${ZK_ADDRESS}"
connected=0
IFS=',' read -r -a zk_hosts <<< "${ZK_ADDRESS}"
until [ $connected -eq 1 ]
do
for zk_host in "${zk_hosts[@]}"
do
IFS=':' read -r -a zk_host_port <<< "${zk_host}"
echo "Checking Zookeeper Host: [${zk_host_port[0]}] Port: [${zk_host_port[1]}]"
nc -vzw 1 ${zk_host_port[0]} ${zk_host_port[1]}
if [ $? -eq 0 ]; then
echo "Connected to ${zk_host_port}"
connected=1
fi
done
sleep 1
done
`},
Resources: generateInitContainerResources(),
},
}...))
}

sort.Slice(podVolumes, func(i, j int) bool {
return podVolumes[i].Name < podVolumes[j].Name
Expand Down Expand Up @@ -534,12 +494,40 @@ do
notMatchedIp=false
fi
done
echo "Hostname is successfully binded withy IP address"`, nodeAddress, nodeAddress)
echo "Hostname is successfully binded with IP address"`, nodeAddress, nodeAddress)
}

zkResolve := ""
if len(zkAddress) > 0 {
zkResolve = `echo "Trying to contact Zookeeper using connection string: ${NIFI_ZOOKEEPER_CONNECT_STRING}"
connected=0
IFS=',' read -r -a zk_hosts <<< "${NIFI_ZOOKEEPER_CONNECT_STRING}"
until [ $connected -eq 1 ]
do
for zk_host in "${zk_hosts[@]}"
do
IFS=':' read -r -a zk_host_port <<< "${zk_host}"
echo "Checking Zookeeper Host: [${zk_host_port[0]}] Port: [${zk_host_port[1]}]"
set +e
curl --telnet-option 'BOGUS=1' --connect-timeout 2 -s telnet://${zk_host_port[0]}:${zk_host_port[1]} < /dev/null
if [ $? -eq 48 ]; then
echo "Connected to ${zk_host_port}"
connected=1
fi
set -e
done
sleep 1
done`
}

command := []string{"bash", "-ce", fmt.Sprintf(`cp ${NIFI_HOME}/tmp/* ${NIFI_HOME}/conf/
%s
%s
exec bin/nifi.sh run`, resolveIp, singleUser)}
%s
exec bin/nifi.sh run`, zkResolve, resolveIp, singleUser)}

return corev1.Container{
Name: ContainerName,
Expand Down

0 comments on commit 179a385

Please sign in to comment.