Skip to content

Commit

Permalink
feat: add flagd-proxy HA configuration (#712)
Browse files Browse the repository at this point in the history
Signed-off-by: Matthias Riegler <[email protected]>
  • Loading branch information
xvzf authored Oct 22, 2024
1 parent 99b1cd4 commit e115159
Show file tree
Hide file tree
Showing 12 changed files with 521 additions and 301 deletions.
205 changes: 139 additions & 66 deletions common/flagdproxy/flagdproxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,19 @@ import (
"golang.org/x/exp/maps"
appsV1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
policyv1 "k8s.io/api/policy/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/client-go/util/retry"
"sigs.k8s.io/controller-runtime/pkg/client"
)

const (
FlagdProxyDeploymentName = "flagd-proxy"
FlagdProxyServiceAccountName = "open-feature-operator-flagd-proxy"
FlagdProxyServiceName = "flagd-proxy-svc"
FlagdProxyDeploymentName = "flagd-proxy"
FlagdProxyServiceAccountName = "open-feature-operator-flagd-proxy"
FlagdProxyServiceName = "flagd-proxy-svc"
FlagdProxyPodDisruptionBudgetName = "flagd-proxy-pdb"
)

type FlagdProxyHandler struct {
Expand All @@ -37,6 +40,7 @@ type FlagdProxyConfiguration struct {
DebugLogging bool
Image string
Tag string
Replicas int
Namespace string
OperatorDeploymentName string
ImagePullSecrets []string
Expand All @@ -53,6 +57,7 @@ func NewFlagdProxyConfiguration(env types.EnvConfig, imagePullSecrets []string,
Port: env.FlagdProxyPort,
ManagementPort: env.FlagdProxyManagementPort,
DebugLogging: env.FlagdProxyDebugLogging,
Replicas: env.FlagdProxyReplicaCount,
ImagePullSecrets: imagePullSecrets,
Labels: labels,
Annotations: annotations,
Expand All @@ -71,58 +76,99 @@ func (f *FlagdProxyHandler) Config() *FlagdProxyConfiguration {
return f.config
}

func (f *FlagdProxyHandler) createObject(ctx context.Context, obj client.Object) error {
return f.Client.Create(ctx, obj)
func specDiffers(a, b client.Object) (bool, error) {
if a == nil || b == nil {
return false, fmt.Errorf("object is nil")
}

// Compare only spec based on the object type
switch a.(type) {
case *corev1.Service:
return !reflect.DeepEqual(a.(*corev1.Service).Spec, b.(*corev1.Service).Spec), nil
case *appsV1.Deployment:
return !reflect.DeepEqual(a.(*appsV1.Deployment).Spec, b.(*appsV1.Deployment).Spec), nil
case *policyv1.PodDisruptionBudget:
return !reflect.DeepEqual(a.(*policyv1.PodDisruptionBudget).Spec, b.(*policyv1.PodDisruptionBudget).Spec), nil
default:
return false, fmt.Errorf("unsupported object type")
}
}

func (f *FlagdProxyHandler) updateObject(ctx context.Context, obj client.Object) error {
return f.Client.Update(ctx, obj)
// ensureFlagdProxyResource ensures that the given object is reconciled in the cluster. If the object does not exist, it will be created.
func (f *FlagdProxyHandler) ensureFlagdProxyResource(ctx context.Context, obj client.Object) error {
if obj == nil {
return fmt.Errorf("object is nil")
}

return retry.RetryOnConflict(retry.DefaultRetry, func() error {
var old = obj.DeepCopyObject().(client.Object)
f.Log.Info("Ensuring object exists", "name", obj.GetName(), "namespace", obj.GetNamespace())

// Try to get the existing object
err := f.Client.Get(ctx, client.ObjectKey{Name: old.GetName(), Namespace: old.GetNamespace()}, old)
notFound := errors.IsNotFound(err)
if err != nil && !notFound {
return err
}

// If the object is not found, we will create it
if notFound {
return f.Client.Create(ctx, obj)
}
// If the object exists but is not managed by OFO, return an error
if !common.IsManagedByOFO(old) {
return fmt.Errorf("%s not managed by OFO", obj.GetName())
}

// If the object is found, update if necessary
needsUpdate, err := specDiffers(obj, old)
if err != nil {
return err
}

if needsUpdate {
obj.SetResourceVersion(old.GetResourceVersion())
return f.Client.Update(ctx, obj)
}

return nil
})
}

// HandleFlagdProxy ensures flagd-proxy kubernetes components are configured properly
func (f *FlagdProxyHandler) HandleFlagdProxy(ctx context.Context) error {
exists, deployment, err := f.doesFlagdProxyExist(ctx)
if err != nil {
return err
}
var err error

ownerReference, err := f.getOwnerReference(ctx)
ownerRef, err := f.getOwnerReference(ctx)
if err != nil {
return err
}
newDeployment := f.newFlagdProxyManifest(ownerReference)
newService := f.newFlagdProxyServiceManifest(ownerReference)

if !exists {
f.Log.Info("flagd-proxy Deployment does not exist, creating")
return f.deployFlagdProxy(ctx, f.createObject, newDeployment, newService)
}
// flagd-proxy exists, need to check if we should update it
if f.shouldUpdateFlagdProxy(deployment, newDeployment) {
f.Log.Info("flagd-proxy Deployment out of sync, updating")
return f.deployFlagdProxy(ctx, f.updateObject, newDeployment, newService)
}
f.Log.Info("flagd-proxy Deployment up-to-date")
return nil
}

func (f *FlagdProxyHandler) deployFlagdProxy(ctx context.Context, createUpdateFunc CreateUpdateFunc, deployment *appsV1.Deployment, service *corev1.Service) error {
f.Log.Info("deploying the flagd-proxy")
if err := createUpdateFunc(ctx, deployment); err != nil && !errors.IsAlreadyExists(err) {
if err = f.ensureFlagdProxyResource(ctx, f.newFlagdProxyDeployment(ownerRef)); err != nil {
return err
}
f.Log.Info("deploying the flagd-proxy service")
if err := createUpdateFunc(ctx, service); err != nil && !errors.IsAlreadyExists(err) {

if err = f.ensureFlagdProxyResource(ctx, f.newFlagdProxyService(ownerRef)); err != nil {
return err
}
return nil

err = f.ensureFlagdProxyResource(ctx, f.newFlagdProxyPodDisruptionBudget(ownerRef))
return err
}

func (f *FlagdProxyHandler) newFlagdProxyServiceManifest(ownerReference *metav1.OwnerReference) *corev1.Service {
func (f *FlagdProxyHandler) newFlagdProxyService(ownerReference *metav1.OwnerReference) *corev1.Service {
return &corev1.Service{
TypeMeta: metav1.TypeMeta{
Kind: "Service",
APIVersion: "v1",
},
ObjectMeta: metav1.ObjectMeta{
Name: FlagdProxyServiceName,
Namespace: f.config.Namespace,
OwnerReferences: []metav1.OwnerReference{*ownerReference},
Labels: map[string]string{
common.ManagedByAnnotationKey: common.ManagedByAnnotationValue,
},
},
Spec: corev1.ServiceSpec{
Selector: map[string]string{
Expand All @@ -140,8 +186,41 @@ func (f *FlagdProxyHandler) newFlagdProxyServiceManifest(ownerReference *metav1.
}
}

func (f *FlagdProxyHandler) newFlagdProxyManifest(ownerReference *metav1.OwnerReference) *appsV1.Deployment {
replicas := int32(1)
func (f *FlagdProxyHandler) newFlagdProxyPodDisruptionBudget(ownerReference *metav1.OwnerReference) *policyv1.PodDisruptionBudget {

// Only require pods to be available if there is >1 replica configured (HA setup)
minReplicas := intstr.FromInt(0)
if f.config.Replicas > 1 {
minReplicas = intstr.FromInt(f.config.Replicas / 2)
}

return &policyv1.PodDisruptionBudget{
TypeMeta: metav1.TypeMeta{
Kind: "PodDisruptionBudget",
APIVersion: "policy/v1",
},
ObjectMeta: metav1.ObjectMeta{
Name: FlagdProxyPodDisruptionBudgetName,
Namespace: f.config.Namespace,
OwnerReferences: []metav1.OwnerReference{*ownerReference},
Labels: map[string]string{
common.ManagedByAnnotationKey: common.ManagedByAnnotationValue,
},
},
Spec: policyv1.PodDisruptionBudgetSpec{
MinAvailable: &minReplicas,
Selector: &metav1.LabelSelector{
MatchLabels: map[string]string{
"app.kubernetes.io/name": FlagdProxyDeploymentName,
common.ManagedByAnnotationKey: common.ManagedByAnnotationValue,
},
},
},
}
}

func (f *FlagdProxyHandler) newFlagdProxyDeployment(ownerReference *metav1.OwnerReference) *appsV1.Deployment {
replicas := int32(f.config.Replicas)
args := []string{
"start",
"--management-port",
Expand All @@ -157,10 +236,10 @@ func (f *FlagdProxyHandler) newFlagdProxyManifest(ownerReference *metav1.OwnerRe
})
}
flagdLabels := map[string]string{
"app": FlagdProxyDeploymentName,
"app.kubernetes.io/name": FlagdProxyDeploymentName,
"app.kubernetes.io/managed-by": common.ManagedByAnnotationValue,
"app.kubernetes.io/version": f.config.Tag,
"app": FlagdProxyDeploymentName,
"app.kubernetes.io/name": FlagdProxyDeploymentName,
common.ManagedByAnnotationKey: common.ManagedByAnnotationValue,
"app.kubernetes.io/version": f.config.Tag,
}
if len(f.config.Labels) > 0 {
maps.Copy(flagdLabels, f.config.Labels)
Expand All @@ -173,13 +252,17 @@ func (f *FlagdProxyHandler) newFlagdProxyManifest(ownerReference *metav1.OwnerRe
}

return &appsV1.Deployment{
TypeMeta: metav1.TypeMeta{
Kind: "Deployment",
APIVersion: "apps/v1",
},
ObjectMeta: metav1.ObjectMeta{
Name: FlagdProxyDeploymentName,
Namespace: f.config.Namespace,
Labels: map[string]string{
"app": FlagdProxyDeploymentName,
"app.kubernetes.io/managed-by": common.ManagedByAnnotationValue,
"app.kubernetes.io/version": f.config.Tag,
"app": FlagdProxyDeploymentName,
common.ManagedByAnnotationKey: common.ManagedByAnnotationValue,
"app.kubernetes.io/version": f.config.Tag,
},
OwnerReferences: []metav1.OwnerReference{*ownerReference},
},
Expand Down Expand Up @@ -215,41 +298,31 @@ func (f *FlagdProxyHandler) newFlagdProxyManifest(ownerReference *metav1.OwnerRe
Args: args,
},
},
TopologySpreadConstraints: []corev1.TopologySpreadConstraint{
{
MaxSkew: 1,
TopologyKey: "kubernetes.io/hostname",
WhenUnsatisfiable: corev1.DoNotSchedule,
LabelSelector: &metav1.LabelSelector{
MatchLabels: map[string]string{
"app.kubernetes.io/name": FlagdProxyDeploymentName,
common.ManagedByAnnotationKey: common.ManagedByAnnotationValue,
},
},
},
},
},
},
},
}
}

func (f *FlagdProxyHandler) doesFlagdProxyExist(ctx context.Context) (bool, *appsV1.Deployment, error) {
d := &appsV1.Deployment{}
err := f.Client.Get(ctx, client.ObjectKey{Name: FlagdProxyDeploymentName, Namespace: f.config.Namespace}, d)
if err != nil {
if errors.IsNotFound(err) {
// does not exist, is not ready, no error
return false, nil, nil
}
// does not exist, is not ready, is in error
return false, nil, err
}
return true, d, nil
}

func (f *FlagdProxyHandler) shouldUpdateFlagdProxy(old, new *appsV1.Deployment) bool {
if !common.IsManagedByOFO(old) {
f.Log.Info("flagd-proxy Deployment not managed by OFO")
return false
}
return !reflect.DeepEqual(old.Spec, new.Spec)
}

func (f *FlagdProxyHandler) getOperatorDeployment(ctx context.Context) (*appsV1.Deployment, error) {
d := &appsV1.Deployment{}
if err := f.Client.Get(ctx, client.ObjectKey{Name: f.config.OperatorDeploymentName, Namespace: f.config.Namespace}, d); err != nil {
return nil, fmt.Errorf("unable to fetch operator deployment: %w", err)
}
return d, nil

}

func (f *FlagdProxyHandler) getOwnerReference(ctx context.Context) (*metav1.OwnerReference, error) {
Expand Down
Loading

0 comments on commit e115159

Please sign in to comment.