Skip to content

Commit

Permalink
Merge pull request #303 from beagles/predictable_ips
Browse files Browse the repository at this point in the history
Predictable ips
  • Loading branch information
openshift-merge-bot[bot] authored Jun 12, 2024
2 parents 5414e8c + 7e38b2c commit ac0c17f
Show file tree
Hide file tree
Showing 12 changed files with 433 additions and 79 deletions.
85 changes: 10 additions & 75 deletions controllers/amphoracontroller_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package controllers
import (
"context"
"fmt"
"sort"
"strings"
"time"

Expand All @@ -44,7 +43,6 @@ import (
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
k8s_errors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
Expand Down Expand Up @@ -577,46 +575,23 @@ func (r *OctaviaAmphoraControllerReconciler) generateServiceConfigMaps(
err.Error()))
return err
}

//
// TODO(beagles): Improve this with predictable IPs for the health managers because what is
// going to happen on start up is that health managers will restart each time a new one is deployed.
// The easiest strategy is to create a "hole" in the IP address range and control the
// allocation and configuration of an additional IP on each network attached interface. We will
// need a container in the Pod that has the ip command installed to do this however.
// Get the predictable IPs from the HmConfigMap
//
healthManagerIPs, err := getPodIPs(
fmt.Sprintf("%s-%s", "octavia", octaviav1.HealthManager),
instance.Namespace,
r.Kclient,
&r.Log,
)
hmMap := &corev1.ConfigMap{}
err = helper.GetClient().Get(ctx, types.NamespacedName{Name: octavia.HmConfigMap, Namespace: instance.GetNamespace()}, hmMap)
if err != nil {
instance.Status.Conditions.Set(condition.FalseCondition(
condition.InputReadyCondition,
condition.ErrorReason,
condition.SeverityWarning,
condition.InputReadyErrorMessage,
err.Error()))
return err
}

// TODO(beagles): come up with a way to preallocate or ensure
// a stable list of IPs.

if instance.Spec.Role == octaviav1.HealthManager {
// TODO(gthiemonge) This is fine to leave this list empty in the HM when
// we use redis, because the HM doesn't create any LBs, but if we drop
// redis, failovers will be triggered in the HM
templateParameters["ControllerIPList"] = ""
} else if len(healthManagerIPs) == 0 {
return fmt.Errorf("Health manager ports are not ready yet")
} else {
withPorts := make([]string, len(healthManagerIPs))
for idx, val := range healthManagerIPs {
withPorts[idx] = fmt.Sprintf("%s:5555", val)
var ipAddresses []string
for key, val := range hmMap.Data {
if strings.HasPrefix(key, "hm_") {
ipAddresses = append(ipAddresses, fmt.Sprintf("%s:5555", val))
}
templateParameters["ControllerIPList"] = strings.Join(withPorts, ",")
}
ipAddressString := strings.Join(ipAddresses, ",")
templateParameters["ControllerIPList"] = ipAddressString

spec := instance.Spec
templateParameters["ServiceUser"] = spec.ServiceUser
Expand Down Expand Up @@ -727,46 +702,6 @@ func (r *OctaviaAmphoraControllerReconciler) SetupWithManager(mgr ctrl.Manager)
Complete(r)
}

// listHealthManagerPods lists the currently Running pods in namespace ns that
// carry the app selector label matching name (the health manager pods).
func listHealthManagerPods(name string, ns string, client kubernetes.Interface, log *logr.Logger) (*corev1.PodList, error) {
	opts := metav1.ListOptions{
		LabelSelector: fmt.Sprintf("%s=%s", common.AppSelector, name),
		FieldSelector: "status.phase==Running",
	}
	log.Info(fmt.Sprintf("Listing pods using label selector %s and field selector %s", opts.LabelSelector, opts.FieldSelector))
	podList, listErr := client.CoreV1().Pods(ns).List(context.Background(), opts)
	if listErr != nil {
		return nil, listErr
	}
	return podList, nil
}

// getPodIPs returns a sorted list of the IP addresses that the named health
// manager pods hold on the load balancer management network attachment
// ("<ns>/<octavia.LbNetworkAttachmentName>").
//
// It lists the Running pods matching name in namespace ns, parses each pod's
// network-status annotation and collects the first IP of the matching
// attachment. Returns an error if the pod listing or annotation parsing fails.
func getPodIPs(name string, ns string, client kubernetes.Interface, log *logr.Logger) ([]string, error) {
	var result []string
	pods, err := listHealthManagerPods(name, ns, client, log)
	if err != nil {
		return nil, err
	}
	// The attachment name is the same for every pod; compute it once instead
	// of rebuilding it in the inner loop.
	netAttachName := fmt.Sprintf("%s/%s", ns, octavia.LbNetworkAttachmentName)
	for _, pod := range pods.Items {
		annotations := pod.GetAnnotations()
		networkStatusList, err := nad.GetNetworkStatusFromAnnotation(annotations)
		if err != nil {
			log.Error(err, fmt.Sprintf("Unable to get network annotations from %s", annotations))
			return nil, err
		}
		for _, networkStatus := range networkStatusList {
			// Guard against an attachment that has no address assigned yet;
			// indexing IPs[0] unconditionally would panic.
			if networkStatus.Name == netAttachName && len(networkStatus.IPs) > 0 {
				result = append(result, networkStatus.IPs[0])
			}
		}
	}
	// Sort so callers get a stable, deterministic ordering across reconciles.
	sort.Strings(result)
	return result, nil
}

func (r *OctaviaAmphoraControllerReconciler) findObjectsForSrc(ctx context.Context, src client.Object) []reconcile.Request {
requests := []reconcile.Request{}

Expand Down
104 changes: 104 additions & 0 deletions controllers/octavia_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import (
octaviav1 "github.com/openstack-k8s-operators/octavia-operator/api/v1beta1"
"github.com/openstack-k8s-operators/octavia-operator/pkg/octavia"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"

appsv1 "k8s.io/api/apps/v1"
Expand Down Expand Up @@ -675,6 +676,109 @@ func (r *OctaviaReconciler) reconcileNormal(ctx context.Context, instance *octav
return ctrl.Result{}, err
}

nodeConfigMap := &corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: octavia.HmConfigMap,
Namespace: instance.GetNamespace(),
Labels: labels.GetLabels(instance, labels.GetGroupLabel(instance.ObjectMeta.Name), map[string]string{}),
},
Data: make(map[string]string),
}

// Look for existing config map and if exists, read existing data and match
// against nodes.
foundMap := &corev1.ConfigMap{}
err = helper.GetClient().Get(ctx, types.NamespacedName{Name: octavia.HmConfigMap, Namespace: instance.GetNamespace()},
foundMap)
if err != nil {
if k8s_errors.IsNotFound(err) {
Log.Info(fmt.Sprintf("Port map %s doesn't exist, creating.", octavia.HmConfigMap))
} else {
return ctrl.Result{}, err
}
} else {
Log.Info("Retrieved existing map, updating..")
nodeConfigMap.Data = foundMap.Data
}

//
// Predictable IPs.
//
// NOTE(beagles): refactoring this might be nice. This could also be
// optimized but the data sets are small (nodes and IP ranges are less than
// 100) so optimization might be a waste.
//
predictableIPParams, err := octavia.GetPredictableIPAM(networkParameters)
if err != nil {
return ctrl.Result{}, err
}
// Get a list of the nodes in the cluster

// TODO(beagles):
// * confirm whether or not this lists only the nodes we want (i.e. ones
// that will host the daemonset)
// * do we want to provide a mechanism to temporarily disabling this list
// for maintenance windows where nodes might be "coming and going"

nodes, _ := helper.GetKClient().CoreV1().Nodes().List(ctx, metav1.ListOptions{})
updatedMap := make(map[string]string)
allocatedIPs := make(map[string]bool)
var predictableIPsRequired []string

// First scan existing allocations so we can keep existing allocations.
// Keeping track of what's required and what already exists. If a node is
// removed from the cluster, its IPs will not be added to the allocated
// list and are effectively recycled.
for _, node := range nodes.Items {
Log.Info(fmt.Sprintf("cluster node name %s", node.Name))
portName := fmt.Sprintf("hm_%s", node.Name)
if ipValue, ok := nodeConfigMap.Data[portName]; ok {
updatedMap[portName] = ipValue
allocatedIPs[ipValue] = true
Log.Info(fmt.Sprintf("%s has IP mapping %s: %s", node.Name, portName, ipValue))
} else {
predictableIPsRequired = append(predictableIPsRequired, portName)
}
portName = fmt.Sprintf("rsyslog_%s", node.Name)
if ipValue, ok := nodeConfigMap.Data[portName]; ok {
updatedMap[portName] = ipValue
allocatedIPs[ipValue] = true
Log.Info(fmt.Sprintf("%s has IP mapping %s: %s", node.Name, portName, ipValue))
} else {
predictableIPsRequired = append(predictableIPsRequired, portName)
}
}
// Get new IPs using the range from predictableIPParams minus the
// allocatedIPs captured above.
Log.Info(fmt.Sprintf("Allocating %d predictable IPs", len(predictableIPsRequired)))
for _, portName := range predictableIPsRequired {
hmPort, err := octavia.GetNextIP(predictableIPParams, allocatedIPs)
if err != nil {
// An error here is really unexpected- it means either we have
// messed up the allocatedIPs list or the range we are assuming is
// too small for the number of health managers and rsyslog
// containers.
return ctrl.Result{}, err
}
updatedMap[portName] = hmPort
}

mapLabels := labels.GetLabels(instance, labels.GetGroupLabel(instance.ObjectMeta.Name), map[string]string{})
_, err = controllerutil.CreateOrPatch(ctx, helper.GetClient(), nodeConfigMap, func() error {
nodeConfigMap.Labels = util.MergeStringMaps(nodeConfigMap.Labels, mapLabels)
nodeConfigMap.Data = updatedMap
err := controllerutil.SetControllerReference(instance, nodeConfigMap, helper.GetScheme())
if err != nil {
return err
}
return nil
})

if err != nil {
Log.Info("Unable to create config map for health manager ports...")
return ctrl.Result{}, err
}

octaviaHealthManager, op, err := r.amphoraControllerDaemonSetCreateOrUpdate(instance, networkInfo,
ampImageOwnerID, instance.Spec.OctaviaHealthManager, octaviav1.HealthManager)
if err != nil {
Expand Down
5 changes: 3 additions & 2 deletions pkg/amphoracontrollers/daemonset.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,12 @@ func DaemonSet(

// The API pod has an extra volume so the API and the provider agent can
// communicate with each other.
volumes := octavia.GetVolumes(instance.Name)
volumes := GetVolumes(instance.Name)
parentOctaviaName := octavia.GetOwningOctaviaControllerName(instance)
certsSecretName := fmt.Sprintf("%s-certs-secret", parentOctaviaName)
volumes = append(volumes, GetCertVolume(certsSecretName)...)

volumeMounts := octavia.GetVolumeMounts(serviceName)
volumeMounts := GetVolumeMounts(serviceName)
volumeMounts = append(volumeMounts, GetCertVolumeMount()...)

livenessProbe := &corev1.Probe{
Expand Down Expand Up @@ -83,6 +83,7 @@ func DaemonSet(

envVars["KOLLA_CONFIG_STRATEGY"] = env.SetValue("COPY_ALWAYS")
envVars["CONFIG_HASH"] = env.SetValue(configHash)
envVars["NODE_NAME"] = env.DownwardAPI("spec.nodeName")

envVars["MGMT_CIDR"] = env.SetValue(instance.Spec.OctaviaProviderSubnetCIDR)
envVars["MGMT_GATEWAY"] = env.SetValue(instance.Spec.OctaviaProviderSubnetGateway)
Expand Down
31 changes: 31 additions & 0 deletions pkg/amphoracontrollers/volumes.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ package amphoracontrollers

import (
corev1 "k8s.io/api/core/v1"

"github.com/openstack-k8s-operators/octavia-operator/pkg/octavia"
)

const (
Expand All @@ -28,6 +30,35 @@ var (
configMode int32 = 0644
)

// GetVolumes returns the volumes for the amphora controller pods: the common
// octavia volumes plus a projection of the health-manager port config map.
func GetVolumes(name string) []corev1.Volume {
	var hmPortsAccessMode int32 = 0640
	hmPortsVolume := corev1.Volume{
		Name: "hm-ports",
		VolumeSource: corev1.VolumeSource{
			ConfigMap: &corev1.ConfigMapVolumeSource{
				LocalObjectReference: corev1.LocalObjectReference{
					Name: octavia.HmConfigMap,
				},
				DefaultMode: &hmPortsAccessMode,
			},
		},
	}
	return append(octavia.GetVolumes(name), hmPortsVolume)
}

// GetVolumeMounts returns the volume mounts for the amphora controller pods:
// the common octavia mounts plus the health-manager port map, mounted
// read-only under /var/lib/hmports.
func GetVolumeMounts(serviceName string) []corev1.VolumeMount {
	hmPortsMount := corev1.VolumeMount{
		Name:      "hm-ports",
		MountPath: "/var/lib/hmports",
		ReadOnly:  true,
	}
	return append(octavia.GetVolumeMounts(serviceName), hmPortsMount)
}

// GetCertVolume - service volumes
func GetCertVolume(certSecretName string) []corev1.Volume {
return []corev1.Volume{
Expand Down
3 changes: 3 additions & 0 deletions pkg/octavia/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,4 +49,7 @@ const (

// AdminTenantName
AdminTenant = "admin"

// HmConfigMap ...
HmConfigMap = "octavia-hmport-map"
)
39 changes: 39 additions & 0 deletions pkg/octavia/lb_mgmt_network.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ import (
octaviav1 "github.com/openstack-k8s-operators/octavia-operator/api/v1beta1"
)

// NetworkProvisioningSummary -
// Type for conveying the results of the EnsureAmphoraManagementNetwork call.
type NetworkProvisioningSummary struct {
TenantNetworkID string
TenantSubnetID string
Expand Down Expand Up @@ -897,3 +899,40 @@ func EnsureAmphoraManagementNetwork(
ManagementSubnetGateway: networkParameters.ProviderGateway.String(),
}, nil
}

// GetPredictableIPAM returns a struct describing the available IP range. If the
// IP pool size does not fit in given networkParameters CIDR it will return an
// error instead.
// GetPredictableIPAM returns a struct describing the available IP range for
// predictable IP allocation, starting just past the provider allocation pool.
// If the pool of LbProvPredictablePoolSize addresses does not fit inside the
// networkParameters CIDR it returns an error instead.
func GetPredictableIPAM(networkParameters *NetworkParameters) (*NADIpam, error) {
	predParams := &NADIpam{}
	predParams.CIDR = networkParameters.ProviderCIDR
	predParams.RangeStart = networkParameters.ProviderAllocationEnd.Next()
	endRange := predParams.RangeStart
	for i := 0; i < LbProvPredictablePoolSize; i++ {
		if !predParams.CIDR.Contains(endRange) {
			return nil, fmt.Errorf("predictable IPs: cannot allocate %d IP addresses in %s", LbProvPredictablePoolSize, predParams.CIDR)
		}
		endRange = endRange.Next()
	}
	// RangeEnd is treated as an inclusive bound by GetNextIP, so the final
	// address must also lie inside the CIDR — the loop above only verified
	// the addresses preceding it.
	if !predParams.CIDR.Contains(endRange) {
		return nil, fmt.Errorf("predictable IPs: cannot allocate %d IP addresses in %s", LbProvPredictablePoolSize, predParams.CIDR)
	}
	predParams.RangeEnd = endRange
	return predParams, nil
}

// GetNextIP picks the next available IP from the range defined by a NADIpam,
// skipping ones that are already used appear as keys in the currentValues map.
// GetNextIP picks the next available IP from the range defined by a NADIpam,
// skipping addresses that already appear as keys in the currentValues map.
// The chosen address is recorded in currentValues before being returned.
func GetNextIP(predParams *NADIpam, currentValues map[string]bool) (string, error) {
	candidate := predParams.RangeStart
	for {
		key := candidate.String()
		if _, used := currentValues[key]; !used {
			// Found a free address: claim it and hand it back.
			currentValues[key] = true
			return key, nil
		}
		if candidate == predParams.RangeEnd {
			// Every address up to and including the inclusive end is taken.
			return "", fmt.Errorf("predictable IPs: out of available addresses")
		}
		candidate = candidate.Next()
	}
}
3 changes: 3 additions & 0 deletions pkg/octavia/network_consts.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ const (
// LbProvSubnetPoolSize -
LbProvSubnetPoolSize = 25

// LbProvPredictablePoolSize -
LbProvPredictablePoolSize = 25

// IPv4 consts

// TODO(beagles): support IPv6 for the provider network.
Expand Down
Loading

0 comments on commit ac0c17f

Please sign in to comment.