Skip to content

Commit

Permalink
fix: RepoSync management conflict reporting (#1354)
Browse files Browse the repository at this point in the history
- Change the run loop retry handler to check the conflict handler
  for errors, instead of using a flag in the remediator watchers.
  This fixes a bug where the applier may only be triggered once
  after a conflict, even if the conflict was not resolved.
- Change the remediator to treat namespace reconcilers the same as
  root reconcilers when reporting a management conflict error.
  This fixes a bug which could happen when the applier succeeded
  and then a new conflict was discovered, which wouldn't report the
  conflict until the next resync or source change.
- Use a RWMutex in the conflict handler, to allow parallel reads.
- Change the remote reporting (RootSync -> RootSync) of conflict
  status errors to read the errors from the conflict handler,
  instead of the applier errors. This just simplifies the code.
  The reported errors should be identical.
  • Loading branch information
karlkfi authored Jul 25, 2024
1 parent 6299a2e commit 285229e
Show file tree
Hide file tree
Showing 8 changed files with 83 additions and 119 deletions.
31 changes: 22 additions & 9 deletions e2e/testcases/multi_sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,7 @@ func TestConflictingDefinitions_RootToNamespace(t *testing.T) {
nt.T.Logf("The RepoSync %s reports a problem since it can't sync the declaration.", repoSyncNN)
nt.WaitForRepoSyncSyncError(repoSyncNN.Namespace, repoSyncNN.Name, status.ManagementConflictErrorCode, "detected a management conflict", nil)

nt.T.Logf("Validate reconciler error metric is emitted from namespace reconciler %s", repoSyncNN)
nt.T.Logf("Validate conflict metric is emitted from Namespace reconciler %s", repoSyncNN)
repoSyncLabels, err := nomostest.MetricLabelsForRepoSync(nt, repoSyncNN)
if err != nil {
nt.T.Fatal(err)
Expand All @@ -336,7 +336,8 @@ func TestConflictingDefinitions_RootToNamespace(t *testing.T) {
// metric, there might not be a LastSyncTimestamp with status=error.
// nomostest.ReconcilerSyncError(nt, repoSyncLabels, commitHash),
nomostest.ReconcilerErrorMetrics(nt, repoSyncLabels, commitHash, metrics.ErrorSummary{
Sync: 1,
Conflicts: 1,
Sync: 1,
}))
if err != nil {
nt.T.Fatal(err)
Expand Down Expand Up @@ -380,6 +381,11 @@ func TestConflictingDefinitions_RootToNamespace(t *testing.T) {

err = nomostest.ValidateStandardMetricsForRepoSync(nt, metrics.Summary{
Sync: repoSyncNN,
Errors: metrics.ErrorSummary{
// resource_conflicts_total is cumulative and only resets when the commit changes
// TODO: Fix resource_conflicts_total to reflect the actual current total number of conflicts.
Conflicts: 1,
},
})
if err != nil {
nt.T.Fatal(err)
Expand Down Expand Up @@ -453,17 +459,18 @@ func TestConflictingDefinitions_NamespaceToRoot(t *testing.T) {
nt.T.Fatal(err)
}

// Validate reconciler error metric is emitted from namespace reconciler.
rootSyncLabels, err := nomostest.MetricLabelsForRepoSync(nt, repoSyncNN)
nt.T.Logf("Validate conflict metric is emitted from Namespace reconciler %s", repoSyncNN)
repoSyncLabels, err := nomostest.MetricLabelsForRepoSync(nt, repoSyncNN)
if err != nil {
nt.T.Fatal(err)
}
commitHash := nt.NonRootRepos[repoSyncNN].MustHash(nt.T)

err = nomostest.ValidateMetrics(nt,
nomostest.ReconcilerSyncError(nt, rootSyncLabels, commitHash),
nomostest.ReconcilerErrorMetrics(nt, rootSyncLabels, commitHash, metrics.ErrorSummary{
Sync: 1,
nomostest.ReconcilerSyncError(nt, repoSyncLabels, commitHash),
nomostest.ReconcilerErrorMetrics(nt, repoSyncLabels, commitHash, metrics.ErrorSummary{
Conflicts: 1,
Sync: 1,
}))
if err != nil {
nt.T.Fatal(err)
Expand Down Expand Up @@ -711,7 +718,7 @@ func TestConflictingDefinitions_NamespaceToNamespace(t *testing.T) {
if err != nil {
nt.T.Fatal(err)
}
nt.T.Logf("Validate reconciler error metric is emitted from Namespace reconciler %s", repoSyncNN2)
nt.T.Logf("Validate conflict metric is emitted from Namespace reconciler %s", repoSyncNN2)
repoSync2Labels, err := nomostest.MetricLabelsForRepoSync(nt, repoSyncNN2)
if err != nil {
nt.T.Fatal(err)
Expand All @@ -721,7 +728,8 @@ func TestConflictingDefinitions_NamespaceToNamespace(t *testing.T) {
err = nomostest.ValidateMetrics(nt,
nomostest.ReconcilerSyncError(nt, repoSync2Labels, commitHash),
nomostest.ReconcilerErrorMetrics(nt, repoSync2Labels, commitHash, metrics.ErrorSummary{
Sync: 1,
Conflicts: 1,
Sync: 1,
}))
if err != nil {
nt.T.Fatal(err)
Expand Down Expand Up @@ -766,6 +774,11 @@ func TestConflictingDefinitions_NamespaceToNamespace(t *testing.T) {
// Validate no errors from namespace reconciler #2.
err = nomostest.ValidateStandardMetricsForRepoSync(nt, metrics.Summary{
Sync: repoSyncNN2,
Errors: metrics.ErrorSummary{
// resource_conflicts_total is cumulative and only resets when the commit changes
// TODO: Fix resource_conflicts_total to reflect the actual current total number of conflicts.
Conflicts: 1,
},
})
if err != nil {
nt.T.Fatal(err)
Expand Down
13 changes: 2 additions & 11 deletions pkg/parse/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ func Run(ctx context.Context, p Parser, nsControllerState *namespacecontroller.S
// Retry if there was an error, conflict, or any watches need to be updated.
case <-retryTimer.C():
var trigger string
if opts.managementConflict() {
if opts.HasManagementConflict() {
// Reset the cache partially to make sure all the steps of a parse-apply-watch loop will run.
// The cached sourceState will not be reset to avoid reading all the source files unnecessarily.
// The cached needToRetry will not be reset to avoid resetting the backoff retries.
Expand Down Expand Up @@ -619,17 +619,8 @@ func setSyncStatus(ctx context.Context, p Parser, state *reconcilerState, spec S
state.status.SyncingConditionLastUpdate = newSyncStatus.LastUpdate
}

// Extract conflict errors from sync errors.
var conflictErrs []status.ManagementConflictError
if syncErrs != nil {
for _, err := range syncErrs.Errors() {
if conflictErr, ok := err.(status.ManagementConflictError); ok {
conflictErrs = append(conflictErrs, conflictErr)
}
}
}
// Report conflict errors to the remote manager, if it's a RootSync.
if err := reportRootSyncConflicts(ctx, p.K8sClient(), conflictErrs); err != nil {
if err := reportRootSyncConflicts(ctx, p.K8sClient(), options.ManagementConflicts()); err != nil {
return fmt.Errorf("failed to report remote conflicts: %w", err)
}
return nil
Expand Down
12 changes: 10 additions & 2 deletions pkg/parse/updater.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,16 @@ func (u *Updater) needToUpdateWatch() bool {
return u.Remediator.NeedsUpdate()
}

func (u *Updater) managementConflict() bool {
return u.Remediator.ManagementConflict()
// HasManagementConflict returns true when conflict errors have been encountered
// by the Applier or Remediator for at least one currently managed object.
// It delegates to the shared conflict handler in the SyncErrorCache, so the
// run loop retries until the conflict is actually resolved (not just observed once).
func (u *Updater) HasManagementConflict() bool {
	return u.SyncErrorCache.conflictHandler.HasConflictErrors()
}

// ManagementConflicts returns a list of conflict errors encountered by the
// Applier or Remediator. The slice is a copy taken from the conflict handler
// (see handler.ConflictErrors), so callers may inspect it without holding any lock.
func (u *Updater) ManagementConflicts() []status.ManagementConflictError {
	return u.SyncErrorCache.conflictHandler.ConflictErrors()
}

// Remediating returns true if the Remediator is remediating.
Expand Down
15 changes: 12 additions & 3 deletions pkg/remediator/conflict/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,14 @@ type Handler interface {

// ConflictErrors returns the management conflict errors (KNV1060) the remediator encounters.
ConflictErrors() []status.ManagementConflictError
// HasConflictErrors returns true when there are conflict errors
HasConflictErrors() bool
}

// handler implements Handler.
type handler struct {
// mux guards the conflictErrs
mux sync.Mutex
mux sync.RWMutex
// conflictErrs tracks all the conflict errors (KNV1060) the remediator encounters,
// and report to RootSync|RepoSync status.
conflictErrs *orderedmap.OrderedMap[core.ID, status.ManagementConflictError]
Expand Down Expand Up @@ -94,8 +96,8 @@ func (h *handler) ClearConflictErrorsWithKind(gk schema.GroupKind) {
}

func (h *handler) ConflictErrors() []status.ManagementConflictError {
h.mux.Lock()
defer h.mux.Unlock()
h.mux.RLock()
defer h.mux.RUnlock()

// Return a copy
var result []status.ManagementConflictError
Expand All @@ -104,3 +106,10 @@ func (h *handler) ConflictErrors() []status.ManagementConflictError {
}
return result
}

// HasConflictErrors returns true if at least one management conflict error
// (KNV1060) is currently recorded. Uses a read lock so concurrent readers
// do not block each other.
func (h *handler) HasConflictErrors() bool {
	h.mux.RLock()
	defer h.mux.RUnlock()

	return h.conflictErrs.Len() > 0
}
7 changes: 0 additions & 7 deletions pkg/remediator/remediator.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,6 @@ type Interface interface {
// UpdateWatches starts and stops server-side watches based upon the given map
// of GVKs which should be watched.
UpdateWatches(context.Context, map[schema.GroupVersionKind]struct{}) status.MultiError
// ManagementConflict returns true if one of the watchers noticed a management conflict.
ManagementConflict() bool
}

var _ Interface = &Remediator{}
Expand Down Expand Up @@ -238,8 +236,3 @@ func (r *Remediator) AddWatches(ctx context.Context, gvks map[schema.GroupVersio
func (r *Remediator) UpdateWatches(ctx context.Context, gvks map[schema.GroupVersionKind]struct{}) status.MultiError {
return r.watchMgr.UpdateWatches(ctx, gvks)
}

// ManagementConflict implements Interface.
func (r *Remediator) ManagementConflict() bool {
return r.watchMgr.ManagementConflict()
}
101 changes: 32 additions & 69 deletions pkg/remediator/watch/filteredwatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ import (
"kpt.dev/configsync/pkg/core"
"kpt.dev/configsync/pkg/declared"
"kpt.dev/configsync/pkg/diff"
"kpt.dev/configsync/pkg/metadata"
"kpt.dev/configsync/pkg/metrics"
"kpt.dev/configsync/pkg/remediator/conflict"
"kpt.dev/configsync/pkg/remediator/queue"
Expand Down Expand Up @@ -73,9 +72,6 @@ const maxWatchRetryFactor = 18
type Runnable interface {
Stop()
Run(ctx context.Context) status.Error
ManagementConflict() bool
SetManagementConflict(object client.Object, commit string)
ClearManagementConflicts()
ClearManagementConflictsWithKind(gk schema.GroupKind)
}

Expand All @@ -101,14 +97,13 @@ type filteredWatcher struct {
scope declared.Scope
syncName string
// errorTracker maps an error to the time when the same error happened last time.
errorTracker map[string]time.Time
errorTracker map[string]time.Time
conflictHandler conflict.Handler

// The following fields are guarded by the mutex.
mux sync.Mutex
base watch.Interface
stopped bool
managementConflict bool
conflictHandler conflict.Handler
mux sync.Mutex
base watch.Interface
stopped bool
}

// filteredWatcher implements the Runnable interface.
Expand Down Expand Up @@ -156,59 +151,6 @@ func (w *filteredWatcher) addError(errorID string) bool {
return false
}

// ManagementConflict returns true if this watcher has observed a management
// conflict since the flag was last cleared. Guarded by the watcher mutex.
func (w *filteredWatcher) ManagementConflict() bool {
	w.mux.Lock()
	defer w.mux.Unlock()
	return w.managementConflict
}

// SetManagementConflict records that a watch event revealed a management
// conflict for the given object at the given source commit. Depending on the
// reconciler scope, it either only sets the retry flag (namespace reconciler)
// or also registers a ManagementConflictError (KNV1060) with the conflict
// handler and records the conflict metric (root reconciler vs root reconciler).
func (w *filteredWatcher) SetManagementConflict(object client.Object, commit string) {
	w.mux.Lock()
	defer w.mux.Unlock()

	// The resource should be managed by a namespace reconciler, but now is updated.
	// Most likely the resource is now managed by a root reconciler.
	// In this case, set `managementConflict` true to trigger the namespace reconciler's
	// parse-apply-watch loop. The kpt_applier should report the ManagementConflictError (KNV1060).
	// No need to add the conflictError to the remediator because it will be surfaced by kpt_applier.
	if w.scope != declared.RootScope {
		w.managementConflict = true
		return
	}

	manager, found := object.GetAnnotations()[metadata.ResourceManagerKey]
	// There should be no conflict if the resource manager key annotation is not found
	// because any reconciler can manage a non-ConfigSync managed resource.
	if !found {
		klog.Warningf("No management conflict as the object %q is not managed by Config Sync", core.IDOf(object))
		return
	}
	// Root reconciler can override resource managed by namespace reconciler.
	// It is not a conflict in this case.
	if !declared.IsRootManager(manager) {
		klog.Warningf("No management conflict as the root reconciler %q should update the object %q that is managed by the namespace reconciler %q",
			w.syncName, core.IDOf(object), manager)
		return
	}

	// The remediator detects conflict between two root reconcilers.
	// Add the conflict error to the remediator, and the updateStatus goroutine will surface the ManagementConflictError (KNV1060).
	// It also sets `managementConflict` true to keep retrying the parse-apply-watch loop
	// so that the error can auto-resolve if the resource is removed from the conflicting manager's repository.
	w.managementConflict = true
	newManager := declared.ResourceManager(w.scope, w.syncName)
	klog.Warningf("The remediator detects a management conflict for object %q between root reconcilers: %q and %q",
		core.GKNN(object), newManager, manager)
	w.conflictHandler.AddConflictError(core.IDOf(object), status.ManagementConflictErrorWrap(object, newManager))
	metrics.RecordResourceConflict(context.Background(), commit)
}

// ClearManagementConflicts resets the conflict flag so the next conflict
// observed by this watcher triggers a fresh retry. Guarded by the watcher mutex.
func (w *filteredWatcher) ClearManagementConflicts() {
	w.mux.Lock()
	w.managementConflict = false
	w.mux.Unlock()
}

// ClearManagementConflictsWithKind removes all recorded conflict errors for
// objects of the given GroupKind. Delegates to the shared conflict handler,
// which performs its own locking.
func (w *filteredWatcher) ClearManagementConflictsWithKind(gk schema.GroupKind) {
	w.conflictHandler.ClearConflictErrorsWithKind(gk)
}
Expand Down Expand Up @@ -489,11 +431,14 @@ func (w *filteredWatcher) handle(ctx context.Context, event watch.Event) (string
// watcher for processing.
func (w *filteredWatcher) shouldProcess(object client.Object) bool {
id := core.IDOf(object)

// Process the resource if we are the manager regardless if it is declared or not.
if diff.IsManager(w.scope, w.syncName, object) {
// TODO: Remove conflict error AFTER it has been resolved, not before.
w.conflictHandler.RemoveConflictError(id)
return true
}

decl, commit, found := w.resources.Get(id)
if !found {
// The resource is neither declared nor managed by the same reconciler, so don't manage it.
Expand All @@ -504,14 +449,32 @@ func (w *filteredWatcher) shouldProcess(object client.Object) bool {
// its declaration. Otherwise we expect to get another event for the same
// object but with a matching GVK so we can actually compare it to its
// declaration.
if object.GetObjectKind().GroupVersionKind() != decl.GroupVersionKind() {
currentGVK := object.GetObjectKind().GroupVersionKind()
declaredGVK := decl.GroupVersionKind()
if currentGVK != declaredGVK {
klog.V(5).Infof("Received a watch event for object %q with kind %s, which does not match the declared kind %s. ",
id, currentGVK, declaredGVK)
return false
}

if !diff.CanManage(w.scope, w.syncName, object, diff.OperationManage) {
w.SetManagementConflict(object, commit)
return false
if diff.CanManage(w.scope, w.syncName, object, diff.OperationManage) {
// TODO: Remove conflict error AFTER it has been resolved, not before.
w.conflictHandler.RemoveConflictError(id)
return true
}
w.conflictHandler.RemoveConflictError(id)
return true

desiredManager := declared.ResourceManager(w.scope, w.syncName)
conflictErr := status.ManagementConflictErrorWrap(object, desiredManager)
currentManager := conflictErr.ConflictingManager()

// TODO: Move logging & metric recording into the conflict handler
klog.Errorf("Management conflict detected. "+
"Reconciler %q received a watch event for object %q, which is managed by namespace reconciler %q. ",
desiredManager, id, currentManager)
// Add the conflict error to the conflict handler.
// The async status updater will handle updating the RSync status.
w.conflictHandler.AddConflictError(id, conflictErr)
// TODO: Use separate metrics for management conflicts vs resource conflicts
metrics.RecordResourceConflict(context.Background(), commit)
return false
}
18 changes: 0 additions & 18 deletions pkg/remediator/watch/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,24 +122,6 @@ func (m *Manager) NeedsUpdate() bool {
return m.needsUpdate
}

// ManagementConflict returns true if any watcher notices any management conflicts. This function is threadsafe.
// NOTE: reading the flag also clears it on each conflicted watcher, so a
// conflict is only reported once per occurrence — callers must act on a true
// return immediately.
func (m *Manager) ManagementConflict() bool {
	m.mux.Lock()
	defer m.mux.Unlock()

	managementConflict := false
	// If one of the watchers noticed a management conflict, the remediator will indicate that
	// it needs an update so that the parse-apply-watch loop can also detect the conflict and
	// report it as an error status.
	for _, watcher := range m.watcherMap {
		if watcher.ManagementConflict() {
			managementConflict = true
			watcher.ClearManagementConflicts()
		}
	}
	return managementConflict
}

// AddWatches accepts a map of GVKs that should be watched and takes the
// following actions:
// - start watchers for any GroupVersionKind that is present in the given map
Expand Down
5 changes: 5 additions & 0 deletions pkg/syncer/syncertest/fake/conflict_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,11 @@ func (h *ConflictHandler) ConflictErrors() []status.ManagementConflictError {
return nil
}

// HasConflictErrors is a fake implementation of HasConflictErrors of conflict.Handler.
// Always reports no conflicts, for tests that do not exercise conflict handling.
func (h *ConflictHandler) HasConflictErrors() bool {
	return false
}

// ClearConflictErrorsWithKind is a fake implementation of ClearConflictErrorsWithKind of conflict.Handler.
func (h *ConflictHandler) ClearConflictErrorsWithKind(schema.GroupKind) {
}
Expand Down

0 comments on commit 285229e

Please sign in to comment.