Skip to content

Commit

Permalink
Refactor parse.Run to use an event funnel (#1272)
Browse files Browse the repository at this point in the history
Replace the timers in parse.Run with an set of event Producers that
send events into a Funnel which synchronizes them for an EventHandler
which executes the RunFunc as needed for specific event types.

The PublishingGroupBuilder takes the timer periods and generates
Publishers, one for each type of event. Then the event Funnel starts
all the Publishers and runs a select loop that calls the Publisher's
Publish method when that event is received. The Publish method then
calls the Subscriber.Handle method.

Motivation:
- Allows for testing retry backoff in the event funnel, separate
  from the parser logic, using a FakeClock.
- This should make parse.Run look a little more like a controller's
  Reconcile loop from controller-runtime, which paves the way for
  further refactoring.
- This is not an ideal solution yet, just a next step towards
  something that's easier to test and easier to change.
  • Loading branch information
karlkfi authored Sep 23, 2024
1 parent da1a5eb commit 9780d88
Show file tree
Hide file tree
Showing 13 changed files with 1,268 additions and 333 deletions.
152 changes: 152 additions & 0 deletions pkg/parse/event_handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
// Copyright 2024 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package parse

import (
"context"
"errors"

"k8s.io/klog/v2"
"kpt.dev/configsync/pkg/parse/events"
"kpt.dev/configsync/pkg/reconciler/namespacecontroller"
)

// EventHandler is a events.Subscriber implementation that handles events and
// triggers the RunFunc when appropriate.
type EventHandler struct {
Context context.Context
Parser Parser
ReconcilerState *reconcilerState
NSControllerState *namespacecontroller.State
Run RunFunc
}

// NewEventHandler builds an EventHandler
func NewEventHandler(ctx context.Context, parser Parser, nsControllerState *namespacecontroller.State, runFn RunFunc) *EventHandler {
return &EventHandler{
Context: ctx,
Parser: parser,
ReconcilerState: &reconcilerState{},
NSControllerState: nsControllerState,
Run: runFn,
}
}

// Handle an Event and return the Result.
// - SyncWithReimportEventType - Reset the cache and sync from scratch.
// - SyncEventType - Sync from the cache, priming the cache from disk, if necessary.
// - StatusEventType - Update the RSync status with status from the Remediator & NSController.
// - NamespaceResyncEventType - Sync from the cache, if the NSController requested one.
// - RetrySyncEventType - Sync from the cache, if one of the following cases is detected:
// - Remediator or Reconciler reported a management conflict
// - Reconciler requested a retry due to error
// - Remediator requested a watch update
func (s *EventHandler) Handle(event events.Event) events.Result {
opts := s.Parser.options()

var eventResult events.Result
// Wrap the RunFunc to set Result.RunAttempted.
// This delays status update and sync events.
runFn := func(ctx context.Context, p Parser, trigger string, state *reconcilerState) RunResult {
result := s.Run(ctx, p, trigger, state)
eventResult.RunAttempted = true
return result
}

var runResult RunResult
switch event.Type {
case events.SyncWithReimportEventType:
// Re-apply even if no changes have been detected.
// This case should be checked first since it resets the cache.
// If the reconciler is in the process of reconciling a given commit, the resync won't
// happen until the ongoing reconciliation is done.
klog.Infof("It is time for a force-resync")
// Reset the cache partially to make sure all the steps of a parse-apply-watch loop will run.
// The cached sourceState will not be reset to avoid reading all the source files unnecessarily.
// The cached needToRetry will not be reset to avoid resetting the backoff retries.
s.ReconcilerState.resetPartialCache()
runResult = runFn(s.Context, s.Parser, triggerResync, s.ReconcilerState)

case events.SyncEventType:
// Re-import declared resources from the filesystem (from *-sync).
// If the reconciler is in the process of reconciling a given commit, the re-import won't
// happen until the ongoing reconciliation is done.
runResult = runFn(s.Context, s.Parser, triggerReimport, s.ReconcilerState)

case events.StatusEventType:
// Publish the sync status periodically to update remediator errors.
// Skip updates if the remediator is not running yet, paused, or watches haven't been updated yet.
// This implies that this reconciler has successfully parsed, rendered, validated, and synced.
if opts.Remediating() {
klog.V(3).Info("Updating sync status (periodic while not syncing)")
// Don't update the sync spec or commit.
if err := setSyncStatus(s.Context, s.Parser, s.ReconcilerState, s.ReconcilerState.status.SyncStatus.Spec, false, s.ReconcilerState.status.SyncStatus.Commit, s.Parser.SyncErrors()); err != nil {
if errors.Is(err, context.Canceled) {
klog.Infof("Sync status update skipped: %v", err)
} else {
klog.Warningf("Failed to update sync status: %v", err)
}
}
}

case events.NamespaceResyncEventType:
// If the namespace controller indicates that an update is needed,
// attempt to re-sync.
if !s.NSControllerState.ScheduleSync() {
// No RunFunc call
break
}

klog.Infof("A new sync is triggered by a Namespace event")
// Reset the cache partially to make sure all the steps of a parse-apply-watch loop will run.
// The cached sourceState will not be reset to avoid reading all the source files unnecessarily.
// The cached needToRetry will not be reset to avoid resetting the backoff retries.
s.ReconcilerState.resetPartialCache()
runResult = runFn(s.Context, s.Parser, namespaceEvent, s.ReconcilerState)

case events.RetrySyncEventType:
// Retry if there was an error, conflict, or any watches need to be updated.
var trigger string
if opts.HasManagementConflict() {
// Reset the cache partially to make sure all the steps of a parse-apply-watch loop will run.
// The cached sourceState will not be reset to avoid reading all the source files unnecessarily.
// The cached needToRetry will not be reset to avoid resetting the backoff retries.
s.ReconcilerState.resetPartialCache()
trigger = triggerManagementConflict
} else if s.ReconcilerState.cache.needToRetry {
trigger = triggerRetry
} else if opts.needToUpdateWatch() {
trigger = triggerWatchUpdate
} else {
// No RunFunc call
break
}

// During the execution of `run`, if a new commit is detected,
// retryTimer will be reset to `Options.RetryPeriod`, and state.backoff is reset to `defaultBackoff()`.
// In this case, `run` will try to sync the configs from the new commit instead of the old commit
// being retried.
runResult = runFn(s.Context, s.Parser, trigger, s.ReconcilerState)

default:
klog.Fatalf("Invalid event received: %#v", event)
}

// If the run succeeded or source changed, reset the retry backoff.
if runResult.Success || runResult.SourceChanged {
eventResult.ResetRetryBackoff = true
}
return eventResult
}
67 changes: 67 additions & 0 deletions pkg/parse/events/builder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
// Copyright 2024 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package events

import (
"time"

"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/utils/clock"
)

// PublishingGroupBuilder oversees construction of event publishers.
//
// For now, the publishers are driven by clock-based delay and backoff timers.
type PublishingGroupBuilder struct {
// Clock is used for time tracking, namely to simplify testing by allowing
// a fake clock, instead of a RealClock.
Clock clock.Clock
// SyncPeriod is the period of time between checking the filesystem
// for publisher updates to sync.
SyncPeriod time.Duration
// SyncWithReimportPeriod is the period of time between forced re-sync from
// publisher (even without a new commit).
SyncWithReimportPeriod time.Duration
// StatusUpdatePeriod is how long the Parser waits between updates of the
// sync status, to account for management conflict errors from the Remediator.
StatusUpdatePeriod time.Duration
// NamespaceControllerPeriod is how long to wait between checks to see if
// the namespace-controller wants to trigger a resync.
// TODO: Use a channel, instead of a timer checking a locked variable.
NamespaceControllerPeriod time.Duration
// RetryBackoff is how long the Parser waits between retries, after an error.
RetryBackoff wait.Backoff
}

// Build a list of Publishers based on the PublishingGroupBuilder config.
func (t *PublishingGroupBuilder) Build() []Publisher {
var publishers []Publisher
if t.SyncPeriod > 0 {
publishers = append(publishers, NewResetOnRunAttemptPublisher(SyncEventType, t.Clock, t.SyncPeriod))
}
if t.SyncWithReimportPeriod > 0 {
publishers = append(publishers, NewTimeDelayPublisher(SyncWithReimportEventType, t.Clock, t.SyncWithReimportPeriod))
}
if t.NamespaceControllerPeriod > 0 {
publishers = append(publishers, NewTimeDelayPublisher(NamespaceResyncEventType, t.Clock, t.NamespaceControllerPeriod))
}
if t.RetryBackoff.Duration > 0 {
publishers = append(publishers, NewRetrySyncPublisher(t.Clock, t.RetryBackoff))
}
if t.StatusUpdatePeriod > 0 {
publishers = append(publishers, NewResetOnRunAttemptPublisher(StatusEventType, t.Clock, t.StatusUpdatePeriod))
}
return publishers
}
36 changes: 36 additions & 0 deletions pkg/parse/events/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
// Copyright 2024 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package events contains a set of Events sent by Publishers.
//
// PublishingGroupBuilder can be used to Build a set of Publishers.
//
// Publisher types:
// - TimeDelayPublisher - Publishes events with a configurable time delay.
// - ResetOnRunAttemptPublisher - TimeDelayPublisher with resettable timer (Result.RunAttempted).
// - RetrySyncPublisher - TimeDelayPublisher with resettable backoff (Result.ResetRetryBackoff).
//
// Events and their Publisher:
// - SyncEvent - ResetOnRunAttemptPublisher (SyncPeriod)
// - SyncWithReimportEvent - TimeDelayPublisher (SyncWithReimportPeriod)
// - NamespaceResyncEvent - TimeDelayPublisher (NamespaceControllerPeriod)
// - RetrySyncEvent - RetrySyncPublisher (RetryBackoff)
// - StatusEvent - ResetOnRunAttemptPublisher (StatusUpdatePeriod)
//
// EventResult flags:
// - ResetRetryBackoff - Set after a sync succeeds or the source changed (spec or commit).
// - DelayStatusUpdate - Set after a sync is attempted.
//
// PublishingGroupBuilder can be used to Build a set of the above Publishers.
package events
42 changes: 42 additions & 0 deletions pkg/parse/events/events.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
// Copyright 2024 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package events

// Event represents the cause of HandleEventFunc being called.
// Some event types may have special fields or methods.
type Event struct {
// Type of the event
Type EventType
}

// EventType is the name of the event
type EventType string

const (
// SyncWithReimportEventType is the EventType for a sync from disk,
// including reading and parsing resource objects from the shared volume.
SyncWithReimportEventType EventType = "SyncWithReimportEvent"
// SyncEventType is the EventType for a sync from the source cache,
// only parsing objects from the shared volume if there's a new commit.
SyncEventType EventType = "SyncEvent"
// StatusEventType is the EventType for a periodic RSync status update.
StatusEventType EventType = "StatusEvent"
// NamespaceResyncEventType is the EventType for a sync triggered by an
// update to a selected namespace.
NamespaceResyncEventType EventType = "NamespaceResyncEvent"
// RetrySyncEventType is the EventType for a sync triggered by an error
// during a previous sync attempt.
RetrySyncEventType EventType = "RetrySyncEvent"
)
95 changes: 95 additions & 0 deletions pkg/parse/events/funnel.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
// Copyright 2024 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package events

import (
"context"
"reflect"

"k8s.io/klog/v2"
)

// Funnel events from multiple publishers to a single subscriber.
type Funnel struct {
Publishers []Publisher
Subscriber Subscriber
}

// Start all the event publishers and sends events one at a time to the
// subscriber. Blocks until the publishers are started. Returns a channel that
// will be closed when the Funnel is done handling events. Stops when the
// context is done.
func (f *Funnel) Start(ctx context.Context) <-chan struct{} {
// Stop all sources when done, even if the parent context isn't done.
// This could happen if all the source channels are closed.
ctx, cancel := context.WithCancel(ctx)

klog.Info("Starting event loop")

// Create a list of cases to select from
cases := make([]reflect.SelectCase, len(f.Publishers)+1)
for i, source := range f.Publishers {
klog.Infof("Starting event source: %s", source.Type())
cases[i] = reflect.SelectCase{
Dir: reflect.SelectRecv,
Chan: source.Start(ctx),
}
}

// Add a select case that closes the event channel when the context is done
cases[len(cases)-1] = reflect.SelectCase{
Dir: reflect.SelectRecv,
Chan: reflect.ValueOf(ctx.Done()),
}

doneCh := make(chan struct{})
go func() {
defer cancel()
defer close(doneCh)
f.funnel(ctx, cases)
}()
return doneCh
}

func (f *Funnel) funnel(ctx context.Context, cases []reflect.SelectCase) {
// Select from cases until context is done or all source channels have been closed.
remaining := len(cases)
for remaining > 0 {
// Use reflect.Select to allow a dynamic set of cases.
// None of the channels return anything important, so ignore the value.
caseIndex, _, ok := reflect.Select(cases)
if !ok {
ctxErr := ctx.Err()
if ctxErr != nil {
klog.Infof("Stopping event loop: %v", ctxErr)
return
}
// Closed channels are always selected, so nil the channel to
// disable this case. We can't just remove the case from the list,
// because we need the index to match the handlers list.
cases[caseIndex].Chan = reflect.ValueOf(nil)
remaining--
continue
}

klog.V(1).Infof("Handling event: %s", f.Publishers[caseIndex].Type())
result := f.Publishers[caseIndex].Publish(f.Subscriber)

// Give all publishers a chance to handle the result
for _, source := range f.Publishers {
source.HandleResult(result)
}
}
}
Loading

0 comments on commit 9780d88

Please sign in to comment.