Skip to content

Commit

Permalink
feat(operator): update pod definition and nifi reconciliation for ist…
Browse files Browse the repository at this point in the history
…io compatibility (#476)

* feat(operator): update pod definition and nifi reconciliation for istio compatibility

this pushes the zk connectivity check into the pod command instead of the init container and removes istio overrides from reconciliation checks

relates to #172

* fix ordering for container list merge to prefer incoming spec

* update CHANGELOG.md

* add more details to CHANGELOG.md

* Update CHANGELOG.md

Co-authored-by: Juldrixx <[email protected]>

---------

Co-authored-by: Juldrixx <[email protected]>
  • Loading branch information
nfoucha and juldrixx authored Nov 15, 2024
1 parent bf5159e commit a53ef52
Show file tree
Hide file tree
Showing 3 changed files with 91 additions and 44 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,16 @@

### Added

- [PR #476](https://github.com/konpyutaika/nifikop/pull/476) - **[Operator/NifiCluster]** Added logic to include injected containers and init containers in desired pod spec.

### Changed

- [PR #476](https://github.com/konpyutaika/nifikop/pull/476) - **[Operator/NifiCluster]** Move Zookeeper connectivity check from init container to main container.

### Fixed Bugs

- [PR #476](https://github.com/konpyutaika/nifikop/pull/476) - **[Operator/NifiCluster]** Fixed istio init race condition and pod reconciliation.

### Deprecated

### Removed
Expand Down
57 changes: 55 additions & 2 deletions pkg/resources/nifi/nifi.go
Original file line number Diff line number Diff line change
Expand Up @@ -661,6 +661,12 @@ func (r *Reconciler) reconcileNifiPod(log zap.Logger, desiredPod *corev1.Pod) (e
}

if err == nil {
// k8s-objectmatcher options
opts := []patch.CalculateOption{
patch.IgnoreStatusFields(),
patch.IgnoreVolumeClaimTemplateTypeMetaAndStatus(),
patch.IgnorePDBSelector(),
}
// Since toleration does not support patchStrategy:"merge,retainKeys", we need to add all toleration from the current pod if the toleration is set in the CR
if len(desiredPod.Spec.Tolerations) > 0 {
desiredPod.Spec.Tolerations = append(desiredPod.Spec.Tolerations, currentPod.Spec.Tolerations...)
Expand All @@ -674,8 +680,55 @@ func (r *Reconciler) reconcileNifiPod(log zap.Logger, desiredPod *corev1.Pod) (e
}
desiredPod.Spec.Tolerations = uniqueTolerations
}
// If there are extra initContainers from webhook injections we need to add them
if len(currentPod.Spec.InitContainers) > len(desiredPod.Spec.InitContainers) {
desiredPod.Spec.InitContainers = append(desiredPod.Spec.InitContainers, currentPod.Spec.InitContainers...)
uniqueContainers := []corev1.Container{}
keys := make(map[string]bool)
for _, c := range desiredPod.Spec.InitContainers {
if _, value := keys[c.Name]; !value {
keys[c.Name] = true
uniqueContainers = append(uniqueContainers, c)
}
}
desiredPod.Spec.InitContainers = uniqueContainers
}
// If there are extra containers from webhook injections we need to add them
if len(currentPod.Spec.Containers) > len(desiredPod.Spec.Containers) {
desiredPod.Spec.Containers = append(desiredPod.Spec.Containers, currentPod.Spec.Containers...)
uniqueContainers := []corev1.Container{}
keys := make(map[string]bool)
for _, c := range desiredPod.Spec.Containers {
if _, value := keys[c.Name]; !value {
keys[c.Name] = true
uniqueContainers = append(uniqueContainers, c)
}
}
desiredPod.Spec.Containers = uniqueContainers
}
// Remove problematic fields if istio
if _, ok := currentPod.Annotations["istio.io/rev"]; ok {
// Prometheus scrape port is overridden by istio injection
delete(currentPod.Annotations, "prometheus.io/port")
delete(desiredPod.Annotations, "prometheus.io/port")
// Liveness probe port is overridden by istio injection
desiredContainer := corev1.Container{}
for _, c := range desiredPod.Spec.Containers {
if c.Name == "nifi" {
desiredContainer = c
}
}
currentContainers := []corev1.Container{}
for _, c := range currentPod.Spec.Containers {
if c.Name == "nifi" {
c.LivenessProbe = desiredContainer.LivenessProbe
}
currentContainers = append(currentContainers, c)
}
currentPod.Spec.Containers = currentContainers
}
// Check if the resource actually updated
patchResult, err := patch.DefaultPatchMaker.Calculate(currentPod, desiredPod)
patchResult, err := patch.DefaultPatchMaker.Calculate(currentPod, desiredPod, opts...)
if err != nil {
log.Error("could not match pod objects",
zap.String("clusterName", r.NifiCluster.Name),
Expand All @@ -697,7 +750,7 @@ func (r *Reconciler) reconcileNifiPod(log zap.Logger, desiredPod *corev1.Pod) (e

log.Debug("pod resource is in sync",
zap.String("clusterName", r.NifiCluster.Name),
zap.String("podName", desiredPod.Name))
zap.String("podName", currentPod.Name))

return nil, k8sutil.PodReady(currentPod)
}
Expand Down
72 changes: 30 additions & 42 deletions pkg/resources/nifi/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,46 +95,6 @@ func (r *Reconciler) pod(node v1.Node, nodeConfig *v1.NodeConfig, pvcs []corev1.
}...)

podInitContainers := r.injectAdditionalFields(nodeConfig, initContainers)
if len(zkAddress) > 0 {
podInitContainers = r.injectAdditionalFields(nodeConfig, append(initContainers, []corev1.Container{
{
Name: "zookeeper",
Image: r.NifiCluster.Spec.GetInitContainerImage(),
ImagePullPolicy: nodeConfig.GetImagePullPolicy(),
Env: []corev1.EnvVar{
{
Name: "ZK_ADDRESS",
Value: zkAddress,
},
},
// The zookeeper init check here just ensures that at least one configured zookeeper host is alive
Command: []string{"bash", "-c", `
set -e
echo "Trying to contact Zookeeper using connection string: ${ZK_ADDRESS}"
connected=0
IFS=',' read -r -a zk_hosts <<< "${ZK_ADDRESS}"
until [ $connected -eq 1 ]
do
for zk_host in "${zk_hosts[@]}"
do
IFS=':' read -r -a zk_host_port <<< "${zk_host}"
echo "Checking Zookeeper Host: [${zk_host_port[0]}] Port: [${zk_host_port[1]}]"
nc -vzw 1 ${zk_host_port[0]} ${zk_host_port[1]}
if [ $? -eq 0 ]; then
echo "Connected to ${zk_host_port}"
connected=1
fi
done
sleep 1
done
`},
Resources: generateInitContainerResources(),
},
}...))
}

sort.Slice(podVolumes, func(i, j int) bool {
return podVolumes[i].Name < podVolumes[j].Name
Expand Down Expand Up @@ -534,12 +494,40 @@ do
notMatchedIp=false
fi
done
echo "Hostname is successfully binded withy IP address"`, nodeAddress, nodeAddress)
echo "Hostname is successfully binded with IP address"`, nodeAddress, nodeAddress)
}

zkResolve := ""
if len(zkAddress) > 0 {
zkResolve = `echo "Trying to contact Zookeeper using connection string: ${NIFI_ZOOKEEPER_CONNECT_STRING}"
connected=0
IFS=',' read -r -a zk_hosts <<< "${NIFI_ZOOKEEPER_CONNECT_STRING}"
until [ $connected -eq 1 ]
do
for zk_host in "${zk_hosts[@]}"
do
IFS=':' read -r -a zk_host_port <<< "${zk_host}"
echo "Checking Zookeeper Host: [${zk_host_port[0]}] Port: [${zk_host_port[1]}]"
set +e
curl --telnet-option 'BOGUS=1' --connect-timeout 2 -s telnet://${zk_host_port[0]}:${zk_host_port[1]} < /dev/null
if [ $? -eq 48 ]; then
echo "Connected to ${zk_host_port}"
connected=1
fi
set -e
done
sleep 1
done`
}

command := []string{"bash", "-ce", fmt.Sprintf(`cp ${NIFI_HOME}/tmp/* ${NIFI_HOME}/conf/
%s
%s
exec bin/nifi.sh run`, resolveIp, singleUser)}
%s
exec bin/nifi.sh run`, zkResolve, resolveIp, singleUser)}

return corev1.Container{
Name: ContainerName,
Expand Down

0 comments on commit a53ef52

Please sign in to comment.