Skip to content

Commit

Permalink
update OnResourceChanged() param to ResourceDataOrErr
Browse files Browse the repository at this point in the history
  • Loading branch information
purnesh42H committed Jan 10, 2025
1 parent 25dce25 commit 4c7a204
Show file tree
Hide file tree
Showing 20 changed files with 141 additions and 114 deletions.
10 changes: 5 additions & 5 deletions xds/csds/csds_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func Test(t *testing.T) {

type nopListenerWatcher struct{}

func (nopListenerWatcher) OnResourceChanged(_ *xdsresource.ListenerResourceData, _ error, onDone xdsresource.OnDoneFunc) {
func (nopListenerWatcher) OnResourceChanged(_ *xdsresource.ResourceDataOrError, onDone xdsresource.OnDoneFunc) {
onDone()
}
func (nopListenerWatcher) OnAmbientError(_ error, onDone xdsresource.OnDoneFunc) {
Expand All @@ -79,7 +79,7 @@ func (nopListenerWatcher) OnAmbientError(_ error, onDone xdsresource.OnDoneFunc)

type nopRouteConfigWatcher struct{}

func (nopRouteConfigWatcher) OnResourceChanged(_ *xdsresource.RouteConfigResourceData, _ error, onDone xdsresource.OnDoneFunc) {
func (nopRouteConfigWatcher) OnResourceChanged(_ *xdsresource.ResourceDataOrError, onDone xdsresource.OnDoneFunc) {
onDone()
}
func (nopRouteConfigWatcher) OnAmbientError(_ error, onDone xdsresource.OnDoneFunc) {
Expand All @@ -88,7 +88,7 @@ func (nopRouteConfigWatcher) OnAmbientError(_ error, onDone xdsresource.OnDoneFu

type nopClusterWatcher struct{}

func (nopClusterWatcher) OnResourceChanged(_ *xdsresource.ClusterResourceData, _ error, onDone xdsresource.OnDoneFunc) {
func (nopClusterWatcher) OnResourceChanged(_ *xdsresource.ResourceDataOrError, onDone xdsresource.OnDoneFunc) {
onDone()
}
func (nopClusterWatcher) OnAmbientError(_ error, onDone xdsresource.OnDoneFunc) {
Expand All @@ -97,7 +97,7 @@ func (nopClusterWatcher) OnAmbientError(_ error, onDone xdsresource.OnDoneFunc)

type nopEndpointsWatcher struct{}

func (nopEndpointsWatcher) OnResourceChanged(_ *xdsresource.EndpointsResourceData, _ error, onDone xdsresource.OnDoneFunc) {
func (nopEndpointsWatcher) OnResourceChanged(_ *xdsresource.ResourceDataOrError, onDone xdsresource.OnDoneFunc) {
onDone()
}
func (nopEndpointsWatcher) OnAmbientError(_ error, onDone xdsresource.OnDoneFunc) {
Expand Down Expand Up @@ -125,7 +125,7 @@ func newBlockingListenerWatcher(testCtxDone <-chan struct{}) *blockingListenerWa
}
}

func (w *blockingListenerWatcher) OnResourceChanged(_ *xdsresource.ListenerResourceData, _ error, onDone xdsresource.OnDoneFunc) {
func (w *blockingListenerWatcher) OnResourceChanged(_ *xdsresource.ResourceDataOrError, onDone xdsresource.OnDoneFunc) {
writeOnDone(w.testCtxDone, w.onDoneCh, onDone)
}
func (w *blockingListenerWatcher) OnAmbientError(_ error, onDone xdsresource.OnDoneFunc) {
Expand Down
9 changes: 5 additions & 4 deletions xds/internal/balancer/cdsbalancer/cluster_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,14 @@ type clusterWatcher struct {
parent *cdsBalancer
}

func (cw *clusterWatcher) OnResourceChanged(u *xdsresource.ClusterResourceData, err error, onDone xdsresource.OnDoneFunc) {
if err != nil {
handleError := func(context.Context) { cw.parent.onClusterResourceChangedError(cw.name, err); onDone() }
func (cw *clusterWatcher) OnResourceChanged(u *xdsresource.ResourceDataOrError, onDone xdsresource.OnDoneFunc) {
if u.Err != nil {
handleError := func(context.Context) { cw.parent.onClusterResourceChangedError(cw.name, u.Err); onDone() }
cw.parent.serializer.ScheduleOr(handleError, onDone)
return
}
handleUpdate := func(context.Context) { cw.parent.onClusterUpdate(cw.name, u.Resource); onDone() }
update := u.Data.(*xdsresource.ClusterResourceData)
handleUpdate := func(context.Context) { cw.parent.onClusterUpdate(cw.name, update.Resource); onDone() }
cw.parent.serializer.ScheduleOr(handleUpdate, onDone)
}

Expand Down
11 changes: 6 additions & 5 deletions xds/internal/balancer/clusterresolver/resource_resolver_eds.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,18 +76,18 @@ func newEDSResolver(nameToWatch string, producer xdsresource.Producer, topLevelR
}

// OnUpdate is invoked to report an update for the resource being watched.
func (er *edsDiscoveryMechanism) OnResourceChanged(update *xdsresource.EndpointsResourceData, err error, onDone xdsresource.OnDoneFunc) {
func (er *edsDiscoveryMechanism) OnResourceChanged(update *xdsresource.ResourceDataOrError, onDone xdsresource.OnDoneFunc) {
if er.stopped.HasFired() {
onDone()
return
}

if err != nil {
if update.Err != nil {
if er.logger.V(2) {
if xdsresource.ErrType(err) == xdsresource.ErrorTypeResourceNotFound {
if xdsresource.ErrType(update.Err) == xdsresource.ErrorTypeResourceNotFound {
er.logger.Infof("EDS discovery mechanism for resource %q reported resource-does-not-exist error", er.nameToWatch)
} else {
er.logger.Infof("EDS discovery mechanism for resource %q reported on resource changed error: %v", er.nameToWatch, err)
er.logger.Infof("EDS discovery mechanism for resource %q reported on resource changed error: %v", er.nameToWatch, update.Err)
}
}
// Report an empty update that would result in no priority child being
Expand All @@ -105,7 +105,8 @@ func (er *edsDiscoveryMechanism) OnResourceChanged(update *xdsresource.Endpoints
}

er.mu.Lock()
er.update = &update.Resource
u := update.Data.(*xdsresource.EndpointsResourceData)
er.update = &u.Resource
er.mu.Unlock()

er.topLevelResolver.onUpdate(onDone)
Expand Down
18 changes: 10 additions & 8 deletions xds/internal/resolver/watch_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,14 @@ func newListenerWatcher(resourceName string, parent *xdsResolver) *listenerWatch
return lw
}

func (l *listenerWatcher) OnResourceChanged(update *xdsresource.ListenerResourceData, err error, onDone xdsresource.OnDoneFunc) {
if err != nil {
handleError := func(context.Context) { l.parent.onListenerResourceChangedError(err); onDone() }
func (l *listenerWatcher) OnResourceChanged(update *xdsresource.ResourceDataOrError, onDone xdsresource.OnDoneFunc) {
if update.Err != nil {
handleError := func(context.Context) { l.parent.onListenerResourceChangedError(update.Err); onDone() }
l.parent.serializer.ScheduleOr(handleError, onDone)
return
}
handleUpdate := func(context.Context) { l.parent.onListenerResourceUpdate(update.Resource); onDone() }
u := update.Data.(*xdsresource.ListenerResourceData)
handleUpdate := func(context.Context) { l.parent.onListenerResourceUpdate(u.Resource); onDone() }
l.parent.serializer.ScheduleOr(handleUpdate, onDone)
}

Expand All @@ -68,14 +69,15 @@ func newRouteConfigWatcher(resourceName string, parent *xdsResolver) *routeConfi
return rw
}

func (r *routeConfigWatcher) OnResourceChanged(u *xdsresource.RouteConfigResourceData, err error, onDone xdsresource.OnDoneFunc) {
if err != nil {
handleError := func(context.Context) { r.parent.onRouteConfigResourceChangedError(r.resourceName, err); onDone() }
func (r *routeConfigWatcher) OnResourceChanged(u *xdsresource.ResourceDataOrError, onDone xdsresource.OnDoneFunc) {
if u.Err != nil {
handleError := func(context.Context) { r.parent.onRouteConfigResourceChangedError(r.resourceName, u.Err); onDone() }
r.parent.serializer.ScheduleOr(handleError, onDone)
return
}
handleUpdate := func(context.Context) {
r.parent.onRouteConfigResourceUpdate(r.resourceName, u.Resource)
update := u.Data.(*xdsresource.RouteConfigResourceData)
r.parent.onRouteConfigResourceUpdate(r.resourceName, update.Resource)
onDone()
}
r.parent.serializer.ScheduleOr(handleUpdate, onDone)
Expand Down
20 changes: 11 additions & 9 deletions xds/internal/server/listener_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -414,28 +414,30 @@ type ldsWatcher struct {
name string
}

func (lw *ldsWatcher) OnResourceChanged(update *xdsresource.ListenerResourceData, err error, onDone xdsresource.OnDoneFunc) {
func (lw *ldsWatcher) OnResourceChanged(update *xdsresource.ResourceDataOrError, onDone xdsresource.OnDoneFunc) {
defer onDone()
if lw.parent.closed.HasFired() {
if err != nil {
lw.logger.Warningf("Resource %q received err: %#v after listener was closed", lw.name, err)
if update.Err != nil {
lw.logger.Warningf("Resource %q received err: %#v after listener was closed", lw.name, update.Err)
} else {
lw.logger.Warningf("Resource %q received update: %#v after listener was closed", lw.name, update)
}
return
}
if lw.logger.V(2) {
if err != nil {
lw.logger.Infof("LDS watch for resource %q received error: %#v", lw.name, err)
if update.Err != nil {
lw.logger.Infof("LDS watch for resource %q received error: %#v", lw.name, update.Err)
} else {
lw.logger.Infof("LDS watch for resource %q received update: %#v", lw.name, update.Resource)
u := update.Data.(*xdsresource.ListenerResourceData)
lw.logger.Infof("LDS watch for resource %q received update: %#v", lw.name, u.Resource)
}
}
if err != nil {
lw.parent.onLDSResourceChangedError(err)
if update.Err != nil {
lw.parent.onLDSResourceChangedError(update.Err)
return
}
lw.parent.handleLDSUpdate(update.Resource)
u := update.Data.(*xdsresource.ListenerResourceData)
lw.parent.handleLDSUpdate(u.Resource)
}

func (lw *ldsWatcher) OnAmbientError(err error, onDone xdsresource.OnDoneFunc) {
Expand Down
16 changes: 9 additions & 7 deletions xds/internal/server/rds_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ type rdsWatcher struct {
canceled bool // eats callbacks if true
}

func (rw *rdsWatcher) OnResourceChanged(update *xdsresource.RouteConfigResourceData, err error, onDone xdsresource.OnDoneFunc) {
func (rw *rdsWatcher) OnResourceChanged(update *xdsresource.ResourceDataOrError, onDone xdsresource.OnDoneFunc) {
defer onDone()
rw.mu.Lock()
if rw.canceled {
Expand All @@ -156,17 +156,19 @@ func (rw *rdsWatcher) OnResourceChanged(update *xdsresource.RouteConfigResourceD
}
rw.mu.Unlock()
if rw.logger.V(2) {
if err != nil {
rw.logger.Infof("RDS watch for resource %q received error: %#v", rw.routeName, err)
if update.Err != nil {
rw.logger.Infof("RDS watch for resource %q received error: %#v", rw.routeName, update.Err)
} else {
rw.logger.Infof("RDS watch for resource %q received update: %#v", rw.routeName, update.Resource)
u := update.Data.(*xdsresource.RouteConfigResourceData)
rw.logger.Infof("RDS watch for resource %q received update: %#v", rw.routeName, u.Resource)
}
}
if err != nil {
rw.parent.handleRouteUpdate(rw.routeName, rdsWatcherUpdate{err: err})
if update.Err != nil {
rw.parent.handleRouteUpdate(rw.routeName, rdsWatcherUpdate{err: update.Err})
return
}
rw.parent.handleRouteUpdate(rw.routeName, rdsWatcherUpdate{data: &update.Resource})
u := update.Data.(*xdsresource.RouteConfigResourceData)
rw.parent.handleRouteUpdate(rw.routeName, rdsWatcherUpdate{data: &u.Resource})
}

func (rw *rdsWatcher) OnAmbientError(err error, onDone xdsresource.OnDoneFunc) {
Expand Down
10 changes: 5 additions & 5 deletions xds/internal/testutils/resource_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,10 @@ type TestResourceWatcher struct {

// OnResourceChanged is invoked by the xDS client to report the latest update
// or an error on the resource being watched.
func (w *TestResourceWatcher) OnResourceChanged(data xdsresource.ResourceData, err error, onDone xdsresource.OnDoneFunc) {
func (w *TestResourceWatcher) OnResourceChanged(update xdsresource.ResourceDataOrError, onDone xdsresource.OnDoneFunc) {
defer onDone()
if err != nil {
if xdsresource.ErrType(err) == xdsresource.ErrorTypeResourceNotFound {
if update.Err != nil {
if xdsresource.ErrType(update.Err) == xdsresource.ErrorTypeResourceNotFound {
select {
case <-w.ResourceDoesNotExistCh:
default:
Expand All @@ -52,15 +52,15 @@ func (w *TestResourceWatcher) OnResourceChanged(data xdsresource.ResourceData, e
case <-w.ErrorCh:
default:
}
w.ErrorCh <- err
w.ErrorCh <- update.Err
return

}
select {
case <-w.UpdateCh:
default:
}
w.UpdateCh <- &data
w.UpdateCh <- &update.Data
}

// OnAmbientError is invoked by the xDS client to report the latest error.
Expand Down
14 changes: 9 additions & 5 deletions xds/internal/xdsclient/authority.go
Original file line number Diff line number Diff line change
Expand Up @@ -388,7 +388,9 @@ func (a *authority) handleADSResourceUpdate(serverConfig *bootstrap.ServerConfig
watcher := watcher
resource := uErr.Resource
watcherCnt.Add(1)
funcsToSchedule = append(funcsToSchedule, func(context.Context) { watcher.OnResourceChanged(resource, nil, done) })
funcsToSchedule = append(funcsToSchedule, func(context.Context) {
watcher.OnResourceChanged(xdsresource.ResourceDataOrError{Data: resource}, done)
})
}
}

Expand Down Expand Up @@ -464,7 +466,7 @@ func (a *authority) handleADSResourceUpdate(serverConfig *bootstrap.ServerConfig
watcher := watcher
watcherCnt.Add(1)
funcsToSchedule = append(funcsToSchedule, func(context.Context) {
watcher.OnResourceChanged(nil, xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "xds: resource has been removed"), done)
watcher.OnResourceChanged(xdsresource.ResourceDataOrError{Err: xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "xds: resource has been removed")}, done)
})
}
}
Expand Down Expand Up @@ -508,7 +510,7 @@ func (a *authority) handleADSResourceDoesNotExist(rType xdsresource.Type, resour
for watcher := range state.watchers {
watcher := watcher
a.watcherCallbackSerializer.TrySchedule(func(context.Context) {
watcher.OnResourceChanged(nil, xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "xds: resource %s does not exist", rType.TypeName()), func() {})
watcher.OnResourceChanged(xdsresource.ResourceDataOrError{Err: xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "xds: resource %s does not exist", rType.TypeName())}, func() {})
})
}
}
Expand Down Expand Up @@ -645,7 +647,9 @@ func (a *authority) watchResource(rType xdsresource.Type, resourceName string, w
// xdsClientSerializer callback. Hence making a copy of the cached
// resource here for watchCallbackSerializer.
resource := state.cache
a.watcherCallbackSerializer.TrySchedule(func(context.Context) { watcher.OnResourceChanged(resource, nil, func() {}) })
a.watcherCallbackSerializer.TrySchedule(func(context.Context) {
watcher.OnResourceChanged(xdsresource.ResourceDataOrError{Data: resource}, func() {})
})
}
// If last update was NACK'd, notify the new watcher of error
// immediately as well.
Expand All @@ -663,7 +667,7 @@ func (a *authority) watchResource(rType xdsresource.Type, resourceName string, w
// server does not have this resource, notify the new watcher.
if state.md.Status == xdsresource.ServiceStatusNotExist {
a.watcherCallbackSerializer.TrySchedule(func(context.Context) {
watcher.OnResourceChanged(nil, xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "xds: resource %s does not exist", rType.TypeName()), func() {})
watcher.OnResourceChanged(xdsresource.ResourceDataOrError{Err: xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "xds: resource %s does not exist", rType.TypeName())}, func() {})
})
}
cleanup = a.unwatchResource(rType, resourceName, watcher)
Expand Down
6 changes: 4 additions & 2 deletions xds/internal/xdsclient/clientimpl_watchers.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,9 @@ func (c *clientImpl) WatchResource(rType xdsresource.Type, resourceName string,

if err := c.resourceTypes.maybeRegister(rType); err != nil {
logger.Warningf("Watch registered for name %q of type %q which is already registered", rType.TypeName(), resourceName)
c.serializer.TrySchedule(func(context.Context) { watcher.OnResourceChanged(nil, err, func() {}) })
c.serializer.TrySchedule(func(context.Context) {
watcher.OnResourceChanged(xdsresource.ResourceDataOrError{Err: err}, func() {})
})
return func() {}
}

Expand All @@ -54,7 +56,7 @@ func (c *clientImpl) WatchResource(rType xdsresource.Type, resourceName string,
if a == nil {
logger.Warningf("Watch registered for name %q of type %q, authority %q is not found", rType.TypeName(), resourceName, n.Authority)
c.serializer.TrySchedule(func(context.Context) {
watcher.OnResourceChanged(nil, fmt.Errorf("authority %q not found in bootstrap config for resource %q", n.Authority, resourceName), func() {})
watcher.OnResourceChanged(xdsresource.ResourceDataOrError{Err: fmt.Errorf("authority %q not found in bootstrap config for resource %q", n.Authority, resourceName)}, func() {})
})
return func() {}
}
Expand Down
6 changes: 3 additions & 3 deletions xds/internal/xdsclient/tests/ads_stream_flow_control_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,9 @@ func newBLockingListenerWatcher() *blockingListenerWatcher {
}
}

func (lw *blockingListenerWatcher) OnResourceChanged(update *xdsresource.ListenerResourceData, err error, done xdsresource.OnDoneFunc) {
if err != nil {
if xdsresource.ErrType(err) == xdsresource.ErrorTypeResourceNotFound {
func (lw *blockingListenerWatcher) OnResourceChanged(update *xdsresource.ResourceDataOrError, done xdsresource.OnDoneFunc) {
if update.Err != nil {
if xdsresource.ErrType(update.Err) == xdsresource.ErrorTypeResourceNotFound {
// Notify receipt of resource not found.
select {
case lw.notFoundCh <- struct{}{}:
Expand Down
11 changes: 6 additions & 5 deletions xds/internal/xdsclient/tests/cds_watchers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ import (

type noopClusterWatcher struct{}

func (noopClusterWatcher) OnResourceChanged(_ *xdsresource.ClusterResourceData, _ error, onDone xdsresource.OnDoneFunc) {
func (noopClusterWatcher) OnResourceChanged(_ *xdsresource.ResourceDataOrError, onDone xdsresource.OnDoneFunc) {
onDone()
}
func (noopClusterWatcher) OnAmbientError(_ error, onDone xdsresource.OnDoneFunc) {
Expand All @@ -64,13 +64,14 @@ func newClusterWatcher() *clusterWatcher {
return &clusterWatcher{updateCh: testutils.NewChannel()}
}

func (cw *clusterWatcher) OnResourceChanged(update *xdsresource.ClusterResourceData, err error, onDone xdsresource.OnDoneFunc) {
if err != nil {
cw.updateCh.Replace(clusterUpdateErrTuple{err: err})
func (cw *clusterWatcher) OnResourceChanged(update *xdsresource.ResourceDataOrError, onDone xdsresource.OnDoneFunc) {
if update.Err != nil {
cw.updateCh.Replace(clusterUpdateErrTuple{err: update.Err})
onDone()
return
}
cw.updateCh.Send(clusterUpdateErrTuple{update: update.Resource})
u := update.Data.(*xdsresource.ClusterResourceData)
cw.updateCh.Send(clusterUpdateErrTuple{update: u.Resource})
onDone()
}

Expand Down
11 changes: 6 additions & 5 deletions xds/internal/xdsclient/tests/eds_watchers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ const (

type noopEndpointsWatcher struct{}

func (noopEndpointsWatcher) OnResourceChanged(_ *xdsresource.EndpointsResourceData, _ error, onDone xdsresource.OnDoneFunc) {
func (noopEndpointsWatcher) OnResourceChanged(_ *xdsresource.ResourceDataOrError, onDone xdsresource.OnDoneFunc) {
onDone()
}
func (noopEndpointsWatcher) OnAmbientError(_ error, onDone xdsresource.OnDoneFunc) {
Expand All @@ -76,13 +76,14 @@ func newEndpointsWatcher() *endpointsWatcher {
return &endpointsWatcher{updateCh: testutils.NewChannel()}
}

func (ew *endpointsWatcher) OnResourceChanged(update *xdsresource.EndpointsResourceData, err error, onDone xdsresource.OnDoneFunc) {
if err != nil {
ew.updateCh.Replace(endpointsUpdateErrTuple{err: err})
func (ew *endpointsWatcher) OnResourceChanged(update *xdsresource.ResourceDataOrError, onDone xdsresource.OnDoneFunc) {
if update.Err != nil {
ew.updateCh.Replace(endpointsUpdateErrTuple{err: update.Err})
onDone()
return
}
ew.updateCh.Send(endpointsUpdateErrTuple{update: update.Resource})
u := update.Data.(*xdsresource.EndpointsResourceData)
ew.updateCh.Send(endpointsUpdateErrTuple{update: u.Resource})
onDone()
}

Expand Down
Loading

0 comments on commit 4c7a204

Please sign in to comment.