Skip to content

Commit

Permalink
Use informer to retry failed resources due to missing namespace
Browse files Browse the repository at this point in the history
...rather than exponential requeuing which is problematic at scale.
The user can specify a SharedInformer that is used to requeue resources
that had previously failed when a missing namespace is later created.

Signed-off-by: Tom Pantelis <[email protected]>
  • Loading branch information
tpantelis committed May 16, 2024
1 parent 6f941d6 commit 6f6d2d9
Show file tree
Hide file tree
Showing 4 changed files with 209 additions and 44 deletions.
51 changes: 38 additions & 13 deletions pkg/federate/fake/federator.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ package fake

import (
"context"
"sync"
"sync/atomic"
"time"

. "github.com/onsi/gomega"
Expand All @@ -30,26 +32,46 @@ import (
)

type Federator struct {
lock sync.Mutex
distribute chan *unstructured.Unstructured
delete chan *unstructured.Unstructured
FailOnDistribute error
FailOnDelete error
ResetOnFailure bool
failOnDistribute error
failOnDelete error
ResetOnFailure atomic.Bool
}

func New() *Federator {
return &Federator{
distribute: make(chan *unstructured.Unstructured, 100),
delete: make(chan *unstructured.Unstructured, 100),
ResetOnFailure: true,
f := &Federator{
distribute: make(chan *unstructured.Unstructured, 100),
delete: make(chan *unstructured.Unstructured, 100),
}
f.ResetOnFailure.Store(true)

return f
}

func (f *Federator) FailOnDistribute(err error) {
f.lock.Lock()
defer f.lock.Unlock()

f.failOnDistribute = err
}

func (f *Federator) FailOnDelete(err error) {
f.lock.Lock()
defer f.lock.Unlock()

f.failOnDelete = err
}

func (f *Federator) Distribute(_ context.Context, obj runtime.Object) error {
err := f.FailOnDistribute
f.lock.Lock()
defer f.lock.Unlock()

err := f.failOnDistribute
if err != nil {
if f.ResetOnFailure {
f.FailOnDistribute = nil
if f.ResetOnFailure.Load() {
f.failOnDistribute = nil
}

return err
Expand All @@ -61,10 +83,13 @@ func (f *Federator) Distribute(_ context.Context, obj runtime.Object) error {
}

func (f *Federator) Delete(_ context.Context, obj runtime.Object) error {
err := f.FailOnDelete
f.lock.Lock()
defer f.lock.Unlock()

err := f.failOnDelete
if err != nil {
if f.ResetOnFailure {
f.FailOnDelete = nil
if f.ResetOnFailure.Load() {
f.failOnDelete = nil
}

return err
Expand Down
5 changes: 5 additions & 0 deletions pkg/syncer/broker/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"k8s.io/utils/ptr"
logf "sigs.k8s.io/controller-runtime/pkg/log"
)
Expand Down Expand Up @@ -134,6 +135,9 @@ type SyncerConfig struct {

// Scheme used to convert resource objects. By default the global k8s Scheme is used.
Scheme *runtime.Scheme

// NamespaceInformer if specified, used to retry local resources that initially failed due to missing namespace.
NamespaceInformer cache.SharedInformer
}

type Syncer struct {
Expand Down Expand Up @@ -254,6 +258,7 @@ func NewSyncer(config SyncerConfig) (*Syncer, error) { //nolint:gocritic // Mini
Scheme: config.Scheme,
ResyncPeriod: rc.BrokerResyncPeriod,
SyncCounter: syncCounter,
NamespaceInformer: config.NamespaceInformer,
})
if err != nil {
return nil, errors.Wrap(err, "error creating remote resource syncer")
Expand Down
111 changes: 90 additions & 21 deletions pkg/syncer/resource_syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,14 @@ import (
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/tools/cache"
"k8s.io/utils/set"
logf "sigs.k8s.io/controller-runtime/pkg/log"
)

const OrigNamespaceLabelKey = "submariner-io/originatingNamespace"
const (
OrigNamespaceLabelKey = "submariner-io/originatingNamespace"
namespaceKey = "$namespace$"
)

type SyncDirection int

Expand Down Expand Up @@ -183,24 +187,31 @@ type ResourceSyncerConfig struct {

// SyncCounter if specified, used to record counter metrics.
SyncCounter *prometheus.GaugeVec

// NamespaceInformer if specified, used to retry resources that initially failed due to missing namespace.
NamespaceInformer cache.SharedInformer
}

type resourceSyncer struct {
workQueue workqueue.Interface
hasSynced func() bool
informer cache.Controller
store cache.Store
config ResourceSyncerConfig
deleted sync.Map
created sync.Map
stopped chan struct{}
syncCounter *prometheus.GaugeVec
stopCh <-chan struct{}
log log.Logger
workQueue workqueue.Interface
hasSynced func() bool
informer cache.Controller
store cache.Store
config ResourceSyncerConfig
deleted sync.Map
created sync.Map
stopped chan struct{}
syncCounter *prometheus.GaugeVec
stopCh <-chan struct{}
log log.Logger
missingNamespaces map[string]set.Set[string]
}

func NewResourceSyncer(config *ResourceSyncerConfig) (Interface, error) {
syncer := newResourceSyncer(config)
syncer, err := newResourceSyncer(config)
if err != nil {
return nil, err
}

rawType, gvr, err := util.ToUnstructuredResource(config.ResourceType, config.RestMapper)
if err != nil {
Expand Down Expand Up @@ -232,7 +243,11 @@ func NewResourceSyncer(config *ResourceSyncerConfig) (Interface, error) {
}

func NewResourceSyncerWithSharedInformer(config *ResourceSyncerConfig, informer cache.SharedInformer) (Interface, error) {
syncer := newResourceSyncer(config)
syncer, err := newResourceSyncer(config)
if err != nil {
return nil, err
}

syncer.store = informer.GetStore()

reg, err := informer.AddEventHandlerWithResyncPeriod(cache.ResourceEventHandlerFuncs{
Expand All @@ -241,19 +256,20 @@ func NewResourceSyncerWithSharedInformer(config *ResourceSyncerConfig, informer
DeleteFunc: syncer.onDelete,
}, config.ResyncPeriod)
if err != nil {
return nil, errors.Wrapf(err, "error registering even handler")
return nil, errors.Wrapf(err, "error registering event handler")
}

syncer.hasSynced = reg.HasSynced

return syncer, nil
}

func newResourceSyncer(config *ResourceSyncerConfig) *resourceSyncer {
func newResourceSyncer(config *ResourceSyncerConfig) (*resourceSyncer, error) {
syncer := &resourceSyncer{
config: *config,
stopped: make(chan struct{}),
log: log.Logger{Logger: logf.Log.WithName("ResourceSyncer")},
config: *config,
stopped: make(chan struct{}),
log: log.Logger{Logger: logf.Log.WithName("ResourceSyncer")},
missingNamespaces: map[string]set.Set[string]{},
}

if syncer.config.Scheme == nil {
Expand Down Expand Up @@ -285,7 +301,18 @@ func newResourceSyncer(config *ResourceSyncerConfig) *resourceSyncer {

syncer.workQueue = workqueue.New(config.Name)

return syncer
if config.NamespaceInformer != nil {
_, err := config.NamespaceInformer.AddEventHandler(cache.ResourceEventHandlerDetailedFuncs{
AddFunc: func(obj interface{}, _ bool) {
syncer.workQueue.Enqueue(cache.ExplicitKey(cache.NewObjectName(namespaceKey, resourceUtil.MustToMeta(obj).GetName()).String()))
},
})
if err != nil {
return nil, errors.Wrapf(err, "error registering namespace handler")
}
}

return syncer, nil
}

func NewSharedInformer(config *ResourceSyncerConfig) (cache.SharedInformer, error) {
Expand Down Expand Up @@ -476,7 +503,12 @@ func (r *resourceSyncer) runIfCacheSynced(defaultReturn any, run func() any) any
return run()
}

func (r *resourceSyncer) processNextWorkItem(key, _, _ string) (bool, error) {
func (r *resourceSyncer) processNextWorkItem(key, name, ns string) (bool, error) {
if ns == namespaceKey {
r.handleNamespaceAdded(name)
return false, nil
}

obj, exists, err := r.store.GetByKey(key)
if err != nil {
return true, errors.Wrapf(err, "error retrieving resource %q", key)
Expand Down Expand Up @@ -513,6 +545,13 @@ func (r *resourceSyncer) processNextWorkItem(key, _, _ string) (bool, error) {

err = r.config.Federator.Distribute(context.Background(), resource)
if err != nil || r.onSuccessfulSync(resource, transformed, op) {
missing, namespace := resourceUtil.IsMissingNamespaceErr(err)
if missing {
r.handleMissingNamespace(key, namespace)

return false, nil
}

return true, errors.Wrapf(err, "error distributing resource %q", key)
}

Expand Down Expand Up @@ -728,6 +767,36 @@ func (r *resourceSyncer) assertUnstructured(obj interface{}) *unstructured.Unstr
return u
}

func (r *resourceSyncer) handleMissingNamespace(key, namespace string) {
r.log.Warningf("Syncer %q: Unable to distribute resource %q due to missing namespace %q", r.config.Name, key, namespace)

if r.config.NamespaceInformer == nil {
return
}

keys, ok := r.missingNamespaces[namespace]
if !ok {
keys = set.New[string]()
r.missingNamespaces[namespace] = keys
}

keys.Insert(key)
}

func (r *resourceSyncer) handleNamespaceAdded(namespace string) {
keys, ok := r.missingNamespaces[namespace]
if ok {
r.log.V(log.LIBDEBUG).Infof("Syncer %q: namespace %q created - re-queueing %d resources", r.config.Name, namespace, keys.Len())

for _, k := range keys.UnsortedList() {
ns, name, _ := cache.SplitMetaNamespaceKey(k)
r.RequeueResource(name, ns)
}

delete(r.missingNamespaces, namespace)
}
}

func getClusterIDLabel(resource runtime.Object) (string, bool) {
clusterID, found := resourceUtil.MustToMeta(resource).GetLabels()[federate.ClusterIDLabelKey]
return clusterID, found
Expand Down
Loading

0 comments on commit 6f6d2d9

Please sign in to comment.