diff --git a/xds/csds/csds_e2e_test.go b/xds/csds/csds_e2e_test.go index bd11580bb640..7d75a2f83339 100644 --- a/xds/csds/csds_e2e_test.go +++ b/xds/csds/csds_e2e_test.go @@ -70,7 +70,7 @@ func Test(t *testing.T) { type nopListenerWatcher struct{} -func (nopListenerWatcher) OnResourceChanged(_ *xdsresource.ListenerResourceData, _ error, onDone xdsresource.OnDoneFunc) { +func (nopListenerWatcher) OnResourceChanged(_ *xdsresource.ResourceDataOrError, onDone xdsresource.OnDoneFunc) { onDone() } func (nopListenerWatcher) OnAmbientError(_ error, onDone xdsresource.OnDoneFunc) { @@ -79,7 +79,7 @@ func (nopListenerWatcher) OnAmbientError(_ error, onDone xdsresource.OnDoneFunc) type nopRouteConfigWatcher struct{} -func (nopRouteConfigWatcher) OnResourceChanged(_ *xdsresource.RouteConfigResourceData, _ error, onDone xdsresource.OnDoneFunc) { +func (nopRouteConfigWatcher) OnResourceChanged(_ *xdsresource.ResourceDataOrError, onDone xdsresource.OnDoneFunc) { onDone() } func (nopRouteConfigWatcher) OnAmbientError(_ error, onDone xdsresource.OnDoneFunc) { @@ -88,7 +88,7 @@ func (nopRouteConfigWatcher) OnAmbientError(_ error, onDone xdsresource.OnDoneFu type nopClusterWatcher struct{} -func (nopClusterWatcher) OnResourceChanged(_ *xdsresource.ClusterResourceData, _ error, onDone xdsresource.OnDoneFunc) { +func (nopClusterWatcher) OnResourceChanged(_ *xdsresource.ResourceDataOrError, onDone xdsresource.OnDoneFunc) { onDone() } func (nopClusterWatcher) OnAmbientError(_ error, onDone xdsresource.OnDoneFunc) { @@ -97,7 +97,7 @@ func (nopClusterWatcher) OnAmbientError(_ error, onDone xdsresource.OnDoneFunc) type nopEndpointsWatcher struct{} -func (nopEndpointsWatcher) OnResourceChanged(_ *xdsresource.EndpointsResourceData, _ error, onDone xdsresource.OnDoneFunc) { +func (nopEndpointsWatcher) OnResourceChanged(_ *xdsresource.ResourceDataOrError, onDone xdsresource.OnDoneFunc) { onDone() } func (nopEndpointsWatcher) OnAmbientError(_ error, onDone xdsresource.OnDoneFunc) { @@ -125,7 +125,7 @@ func newBlockingListenerWatcher(testCtxDone <-chan struct{}) *blockingListenerWa } } -func (w *blockingListenerWatcher) OnResourceChanged(_ *xdsresource.ListenerResourceData, _ error, onDone xdsresource.OnDoneFunc) { +func (w *blockingListenerWatcher) OnResourceChanged(_ *xdsresource.ResourceDataOrError, onDone xdsresource.OnDoneFunc) { writeOnDone(w.testCtxDone, w.onDoneCh, onDone) } func (w *blockingListenerWatcher) OnAmbientError(_ error, onDone xdsresource.OnDoneFunc) { diff --git a/xds/internal/balancer/cdsbalancer/cluster_watcher.go b/xds/internal/balancer/cdsbalancer/cluster_watcher.go index e6d6c6d0d34a..f6aeff1f7ef0 100644 --- a/xds/internal/balancer/cdsbalancer/cluster_watcher.go +++ b/xds/internal/balancer/cdsbalancer/cluster_watcher.go @@ -32,13 +32,14 @@ type clusterWatcher struct { parent *cdsBalancer } -func (cw *clusterWatcher) OnResourceChanged(u *xdsresource.ClusterResourceData, err error, onDone xdsresource.OnDoneFunc) { - if err != nil { - handleError := func(context.Context) { cw.parent.onClusterResourceChangedError(cw.name, err); onDone() } +func (cw *clusterWatcher) OnResourceChanged(u *xdsresource.ResourceDataOrError, onDone xdsresource.OnDoneFunc) { + if u.Err != nil { + handleError := func(context.Context) { cw.parent.onClusterResourceChangedError(cw.name, u.Err); onDone() } cw.parent.serializer.ScheduleOr(handleError, onDone) return } - handleUpdate := func(context.Context) { cw.parent.onClusterUpdate(cw.name, u.Resource); onDone() } + update := u.Data.(*xdsresource.ClusterResourceData) + handleUpdate := func(context.Context) { cw.parent.onClusterUpdate(cw.name, update.Resource); onDone() } cw.parent.serializer.ScheduleOr(handleUpdate, onDone) } diff --git a/xds/internal/balancer/clusterresolver/resource_resolver_eds.go b/xds/internal/balancer/clusterresolver/resource_resolver_eds.go index 7ad3628ccc22..9bd551331a31 100644 --- a/xds/internal/balancer/clusterresolver/resource_resolver_eds.go +++ b/xds/internal/balancer/clusterresolver/resource_resolver_eds.go @@ -76,18 +76,18 @@ func newEDSResolver(nameToWatch string, producer xdsresource.Producer, topLevelR } // OnUpdate is invoked to report an update for the resource being watched. -func (er *edsDiscoveryMechanism) OnResourceChanged(update *xdsresource.EndpointsResourceData, err error, onDone xdsresource.OnDoneFunc) { +func (er *edsDiscoveryMechanism) OnResourceChanged(update *xdsresource.ResourceDataOrError, onDone xdsresource.OnDoneFunc) { if er.stopped.HasFired() { onDone() return } - if err != nil { + if update.Err != nil { if er.logger.V(2) { - if xdsresource.ErrType(err) == xdsresource.ErrorTypeResourceNotFound { + if xdsresource.ErrType(update.Err) == xdsresource.ErrorTypeResourceNotFound { er.logger.Infof("EDS discovery mechanism for resource %q reported resource-does-not-exist error", er.nameToWatch) } else { - er.logger.Infof("EDS discovery mechanism for resource %q reported on resource changed error: %v", er.nameToWatch, err) + er.logger.Infof("EDS discovery mechanism for resource %q reported on resource changed error: %v", er.nameToWatch, update.Err) } } // Report an empty update that would result in no priority child being @@ -105,7 +105,8 @@ func (er *edsDiscoveryMechanism) OnResourceChanged(update *xdsresource.Endpoints } er.mu.Lock() - er.update = &update.Resource + u := update.Data.(*xdsresource.EndpointsResourceData) + er.update = &u.Resource er.mu.Unlock() er.topLevelResolver.onUpdate(onDone) diff --git a/xds/internal/resolver/watch_service.go b/xds/internal/resolver/watch_service.go index 6b716ea08b4c..cddd571cf4f3 100644 --- a/xds/internal/resolver/watch_service.go +++ b/xds/internal/resolver/watch_service.go @@ -36,13 +36,14 @@ func newListenerWatcher(resourceName string, parent *xdsResolver) *listenerWatch return lw } -func (l *listenerWatcher) OnResourceChanged(update *xdsresource.ListenerResourceData, err error, onDone xdsresource.OnDoneFunc) { - if err != nil { - handleError := func(context.Context) { l.parent.onListenerResourceChangedError(err); onDone() } +func (l *listenerWatcher) OnResourceChanged(update *xdsresource.ResourceDataOrError, onDone xdsresource.OnDoneFunc) { + if update.Err != nil { + handleError := func(context.Context) { l.parent.onListenerResourceChangedError(update.Err); onDone() } l.parent.serializer.ScheduleOr(handleError, onDone) return } - handleUpdate := func(context.Context) { l.parent.onListenerResourceUpdate(update.Resource); onDone() } + u := update.Data.(*xdsresource.ListenerResourceData) + handleUpdate := func(context.Context) { l.parent.onListenerResourceUpdate(u.Resource); onDone() } l.parent.serializer.ScheduleOr(handleUpdate, onDone) } @@ -68,14 +69,15 @@ func newRouteConfigWatcher(resourceName string, parent *xdsResolver) *routeConfi return rw } -func (r *routeConfigWatcher) OnResourceChanged(u *xdsresource.RouteConfigResourceData, err error, onDone xdsresource.OnDoneFunc) { - if err != nil { - handleError := func(context.Context) { r.parent.onRouteConfigResourceChangedError(r.resourceName, err); onDone() } +func (r *routeConfigWatcher) OnResourceChanged(u *xdsresource.ResourceDataOrError, onDone xdsresource.OnDoneFunc) { + if u.Err != nil { + handleError := func(context.Context) { r.parent.onRouteConfigResourceChangedError(r.resourceName, u.Err); onDone() } r.parent.serializer.ScheduleOr(handleError, onDone) return } handleUpdate := func(context.Context) { - r.parent.onRouteConfigResourceUpdate(r.resourceName, u.Resource) + update := u.Data.(*xdsresource.RouteConfigResourceData) + r.parent.onRouteConfigResourceUpdate(r.resourceName, update.Resource) onDone() } r.parent.serializer.ScheduleOr(handleUpdate, onDone) diff --git a/xds/internal/server/listener_wrapper.go b/xds/internal/server/listener_wrapper.go index a820a921afa1..19fe2acfe957 100644 --- a/xds/internal/server/listener_wrapper.go +++ b/xds/internal/server/listener_wrapper.go @@ -414,28 +414,30 @@ type ldsWatcher struct { name string } -func (lw *ldsWatcher) OnResourceChanged(update *xdsresource.ListenerResourceData, err error, onDone xdsresource.OnDoneFunc) { +func (lw *ldsWatcher) OnResourceChanged(update *xdsresource.ResourceDataOrError, onDone xdsresource.OnDoneFunc) { defer onDone() if lw.parent.closed.HasFired() { - if err != nil { - lw.logger.Warningf("Resource %q received err: %#v after listener was closed", lw.name, err) + if update.Err != nil { + lw.logger.Warningf("Resource %q received err: %#v after listener was closed", lw.name, update.Err) } else { lw.logger.Warningf("Resource %q received update: %#v after listener was closed", lw.name, update) } return } if lw.logger.V(2) { - if err != nil { - lw.logger.Infof("LDS watch for resource %q received error: %#v", lw.name, err) + if update.Err != nil { + lw.logger.Infof("LDS watch for resource %q received error: %#v", lw.name, update.Err) } else { - lw.logger.Infof("LDS watch for resource %q received update: %#v", lw.name, update.Resource) + u := update.Data.(*xdsresource.ListenerResourceData) + lw.logger.Infof("LDS watch for resource %q received update: %#v", lw.name, u.Resource) } } - if err != nil { - lw.parent.onLDSResourceChangedError(err) + if update.Err != nil { + lw.parent.onLDSResourceChangedError(update.Err) return } - lw.parent.handleLDSUpdate(update.Resource) + u := update.Data.(*xdsresource.ListenerResourceData) + lw.parent.handleLDSUpdate(u.Resource) } func (lw *ldsWatcher) OnAmbientError(err error, onDone xdsresource.OnDoneFunc) { diff --git a/xds/internal/server/rds_handler.go b/xds/internal/server/rds_handler.go index 998145b32767..90ba071ec226 100644 --- a/xds/internal/server/rds_handler.go +++ b/xds/internal/server/rds_handler.go @@ -147,7 +147,7 @@ type rdsWatcher struct { canceled bool // eats callbacks if true } -func (rw *rdsWatcher) OnResourceChanged(update *xdsresource.RouteConfigResourceData, err error, onDone xdsresource.OnDoneFunc) { +func (rw *rdsWatcher) OnResourceChanged(update *xdsresource.ResourceDataOrError, onDone xdsresource.OnDoneFunc) { defer onDone() rw.mu.Lock() if rw.canceled { @@ -156,17 +156,19 @@ func (rw *rdsWatcher) OnResourceChanged(update *xdsresource.RouteConfigResourceD } rw.mu.Unlock() if rw.logger.V(2) { - if err != nil { - rw.logger.Infof("RDS watch for resource %q received error: %#v", rw.routeName, err) + if update.Err != nil { + rw.logger.Infof("RDS watch for resource %q received error: %#v", rw.routeName, update.Err) } else { - rw.logger.Infof("RDS watch for resource %q received update: %#v", rw.routeName, update.Resource) + u := update.Data.(*xdsresource.RouteConfigResourceData) + rw.logger.Infof("RDS watch for resource %q received update: %#v", rw.routeName, u.Resource) } } - if err != nil { - rw.parent.handleRouteUpdate(rw.routeName, rdsWatcherUpdate{err: err}) + if update.Err != nil { + rw.parent.handleRouteUpdate(rw.routeName, rdsWatcherUpdate{err: update.Err}) return } - rw.parent.handleRouteUpdate(rw.routeName, rdsWatcherUpdate{data: &update.Resource}) + u := update.Data.(*xdsresource.RouteConfigResourceData) + rw.parent.handleRouteUpdate(rw.routeName, rdsWatcherUpdate{data: &u.Resource}) } func (rw *rdsWatcher) OnAmbientError(err error, onDone xdsresource.OnDoneFunc) { diff --git a/xds/internal/testutils/resource_watcher.go b/xds/internal/testutils/resource_watcher.go index 522b5d9f37a9..1c9fa5143d43 100644 --- a/xds/internal/testutils/resource_watcher.go +++ b/xds/internal/testutils/resource_watcher.go @@ -37,10 +37,10 @@ type TestResourceWatcher struct { // OnResourceChanged is invoked by the xDS client to report the latest update // or an error on the resource being watched. -func (w *TestResourceWatcher) OnResourceChanged(data xdsresource.ResourceData, err error, onDone xdsresource.OnDoneFunc) { +func (w *TestResourceWatcher) OnResourceChanged(update xdsresource.ResourceDataOrError, onDone xdsresource.OnDoneFunc) { defer onDone() - if err != nil { - if xdsresource.ErrType(err) == xdsresource.ErrorTypeResourceNotFound { + if update.Err != nil { + if xdsresource.ErrType(update.Err) == xdsresource.ErrorTypeResourceNotFound { select { case <-w.ResourceDoesNotExistCh: default: @@ -52,7 +52,7 @@ func (w *TestResourceWatcher) OnResourceChanged(data xdsresource.ResourceData, e case <-w.ErrorCh: default: } - w.ErrorCh <- err + w.ErrorCh <- update.Err return } @@ -60,7 +60,7 @@ func (w *TestResourceWatcher) OnResourceChanged(data xdsresource.ResourceData, e case <-w.UpdateCh: default: } - w.UpdateCh <- &data + w.UpdateCh <- &update.Data } // OnAmbientError is invoked by the xDS client to report the latest error. diff --git a/xds/internal/xdsclient/authority.go b/xds/internal/xdsclient/authority.go index 3313b9e45df8..977cdfa71e78 100644 --- a/xds/internal/xdsclient/authority.go +++ b/xds/internal/xdsclient/authority.go @@ -388,7 +388,9 @@ func (a *authority) handleADSResourceUpdate(serverConfig *bootstrap.ServerConfig watcher := watcher resource := uErr.Resource watcherCnt.Add(1) - funcsToSchedule = append(funcsToSchedule, func(context.Context) { watcher.OnResourceChanged(resource, nil, done) }) + funcsToSchedule = append(funcsToSchedule, func(context.Context) { + watcher.OnResourceChanged(xdsresource.ResourceDataOrError{Data: resource}, done) + }) } } @@ -464,7 +466,7 @@ func (a *authority) handleADSResourceUpdate(serverConfig *bootstrap.ServerConfig watcher := watcher watcherCnt.Add(1) funcsToSchedule = append(funcsToSchedule, func(context.Context) { - watcher.OnResourceChanged(nil, xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "xds: resource has been removed"), done) + watcher.OnResourceChanged(xdsresource.ResourceDataOrError{Err: xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "xds: resource has been removed")}, done) }) } } @@ -508,7 +510,7 @@ func (a *authority) handleADSResourceDoesNotExist(rType xdsresource.Type, resour for watcher := range state.watchers { watcher := watcher a.watcherCallbackSerializer.TrySchedule(func(context.Context) { - watcher.OnResourceChanged(nil, xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "xds: resource %s does not exist", rType.TypeName()), func() {}) + watcher.OnResourceChanged(xdsresource.ResourceDataOrError{Err: xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "xds: resource %s does not exist", rType.TypeName())}, func() {}) }) } } @@ -645,7 +647,9 @@ func (a *authority) watchResource(rType xdsresource.Type, resourceName string, w // xdsClientSerializer callback. Hence making a copy of the cached // resource here for watchCallbackSerializer. resource := state.cache - a.watcherCallbackSerializer.TrySchedule(func(context.Context) { watcher.OnResourceChanged(resource, nil, func() {}) }) + a.watcherCallbackSerializer.TrySchedule(func(context.Context) { + watcher.OnResourceChanged(xdsresource.ResourceDataOrError{Data: resource}, func() {}) + }) } // If last update was NACK'd, notify the new watcher of error // immediately as well. @@ -663,7 +667,7 @@ func (a *authority) watchResource(rType xdsresource.Type, resourceName string, w // server does not have this resource, notify the new watcher. if state.md.Status == xdsresource.ServiceStatusNotExist { a.watcherCallbackSerializer.TrySchedule(func(context.Context) { - watcher.OnResourceChanged(nil, xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "xds: resource %s does not exist", rType.TypeName()), func() {}) + watcher.OnResourceChanged(xdsresource.ResourceDataOrError{Err: xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "xds: resource %s does not exist", rType.TypeName())}, func() {}) }) } cleanup = a.unwatchResource(rType, resourceName, watcher) diff --git a/xds/internal/xdsclient/clientimpl_watchers.go b/xds/internal/xdsclient/clientimpl_watchers.go index b21f89131296..22292dbbdc7d 100644 --- a/xds/internal/xdsclient/clientimpl_watchers.go +++ b/xds/internal/xdsclient/clientimpl_watchers.go @@ -45,7 +45,9 @@ func (c *clientImpl) WatchResource(rType xdsresource.Type, resourceName string, if err := c.resourceTypes.maybeRegister(rType); err != nil { logger.Warningf("Watch registered for name %q of type %q which is already registered", rType.TypeName(), resourceName) - c.serializer.TrySchedule(func(context.Context) { watcher.OnResourceChanged(nil, err, func() {}) }) + c.serializer.TrySchedule(func(context.Context) { + watcher.OnResourceChanged(xdsresource.ResourceDataOrError{Err: err}, func() {}) + }) return func() {} } @@ -54,7 +56,7 @@ func (c *clientImpl) WatchResource(rType xdsresource.Type, resourceName string, if a == nil { logger.Warningf("Watch registered for name %q of type %q, authority %q is not found", rType.TypeName(), resourceName, n.Authority) c.serializer.TrySchedule(func(context.Context) { - watcher.OnResourceChanged(nil, fmt.Errorf("authority %q not found in bootstrap config for resource %q", n.Authority, resourceName), func() {}) + watcher.OnResourceChanged(xdsresource.ResourceDataOrError{Err: fmt.Errorf("authority %q not found in bootstrap config for resource %q", n.Authority, resourceName)}, func() {}) }) return func() {} } diff --git a/xds/internal/xdsclient/tests/ads_stream_flow_control_test.go b/xds/internal/xdsclient/tests/ads_stream_flow_control_test.go index ffa6cdf09d03..ee6b610ecb55 100644 --- a/xds/internal/xdsclient/tests/ads_stream_flow_control_test.go +++ b/xds/internal/xdsclient/tests/ads_stream_flow_control_test.go @@ -60,9 +60,9 @@ func newBLockingListenerWatcher() *blockingListenerWatcher { } } -func (lw *blockingListenerWatcher) OnResourceChanged(update *xdsresource.ListenerResourceData, err error, done xdsresource.OnDoneFunc) { - if err != nil { - if xdsresource.ErrType(err) == xdsresource.ErrorTypeResourceNotFound { +func (lw *blockingListenerWatcher) OnResourceChanged(update *xdsresource.ResourceDataOrError, done xdsresource.OnDoneFunc) { + if update.Err != nil { + if xdsresource.ErrType(update.Err) == xdsresource.ErrorTypeResourceNotFound { // Notify receipt of resource not found. select { case lw.notFoundCh <- struct{}{}: diff --git a/xds/internal/xdsclient/tests/cds_watchers_test.go b/xds/internal/xdsclient/tests/cds_watchers_test.go index 165ca0057b6b..229e821d9d5a 100644 --- a/xds/internal/xdsclient/tests/cds_watchers_test.go +++ b/xds/internal/xdsclient/tests/cds_watchers_test.go @@ -44,7 +44,7 @@ import ( type noopClusterWatcher struct{} -func (noopClusterWatcher) OnResourceChanged(_ *xdsresource.ClusterResourceData, _ error, onDone xdsresource.OnDoneFunc) { +func (noopClusterWatcher) OnResourceChanged(_ *xdsresource.ResourceDataOrError, onDone xdsresource.OnDoneFunc) { onDone() } func (noopClusterWatcher) OnAmbientError(_ error, onDone xdsresource.OnDoneFunc) { @@ -64,13 +64,14 @@ func newClusterWatcher() *clusterWatcher { return &clusterWatcher{updateCh: testutils.NewChannel()} } -func (cw *clusterWatcher) OnResourceChanged(update *xdsresource.ClusterResourceData, err error, onDone xdsresource.OnDoneFunc) { - if err != nil { - cw.updateCh.Replace(clusterUpdateErrTuple{err: err}) +func (cw *clusterWatcher) OnResourceChanged(update *xdsresource.ResourceDataOrError, onDone xdsresource.OnDoneFunc) { + if update.Err != nil { + cw.updateCh.Replace(clusterUpdateErrTuple{err: update.Err}) onDone() return } - cw.updateCh.Send(clusterUpdateErrTuple{update: update.Resource}) + u := update.Data.(*xdsresource.ClusterResourceData) + cw.updateCh.Send(clusterUpdateErrTuple{update: u.Resource}) onDone() } diff --git a/xds/internal/xdsclient/tests/eds_watchers_test.go b/xds/internal/xdsclient/tests/eds_watchers_test.go index c6506ddf408a..12b9b004b76d 100644 --- a/xds/internal/xdsclient/tests/eds_watchers_test.go +++ b/xds/internal/xdsclient/tests/eds_watchers_test.go @@ -53,7 +53,7 @@ const ( type noopEndpointsWatcher struct{} -func (noopEndpointsWatcher) OnResourceChanged(_ *xdsresource.EndpointsResourceData, _ error, onDone xdsresource.OnDoneFunc) { +func (noopEndpointsWatcher) OnResourceChanged(_ *xdsresource.ResourceDataOrError, onDone xdsresource.OnDoneFunc) { onDone() } func (noopEndpointsWatcher) OnAmbientError(_ error, onDone xdsresource.OnDoneFunc) { @@ -76,13 +76,14 @@ func newEndpointsWatcher() *endpointsWatcher { return &endpointsWatcher{updateCh: testutils.NewChannel()} } -func (ew *endpointsWatcher) OnResourceChanged(update *xdsresource.EndpointsResourceData, err error, onDone xdsresource.OnDoneFunc) { - if err != nil { - ew.updateCh.Replace(endpointsUpdateErrTuple{err: err}) +func (ew *endpointsWatcher) OnResourceChanged(update *xdsresource.ResourceDataOrError, onDone xdsresource.OnDoneFunc) { + if update.Err != nil { + ew.updateCh.Replace(endpointsUpdateErrTuple{err: update.Err}) onDone() return } - ew.updateCh.Send(endpointsUpdateErrTuple{update: update.Resource}) + u := update.Data.(*xdsresource.EndpointsResourceData) + ew.updateCh.Send(endpointsUpdateErrTuple{update: u.Resource}) onDone() } diff --git a/xds/internal/xdsclient/tests/lds_watchers_test.go b/xds/internal/xdsclient/tests/lds_watchers_test.go index b03e296e207e..ac913e01512f 100644 --- a/xds/internal/xdsclient/tests/lds_watchers_test.go +++ b/xds/internal/xdsclient/tests/lds_watchers_test.go @@ -48,7 +48,7 @@ import ( type noopListenerWatcher struct{} -func (noopListenerWatcher) OnResourceChanged(_ *xdsresource.ListenerResourceData, _ error, onDone xdsresource.OnDoneFunc) { +func (noopListenerWatcher) OnResourceChanged(_ *xdsresource.ResourceDataOrError, onDone xdsresource.OnDoneFunc) { onDone() } func (noopListenerWatcher) OnAmbientError(_ error, onDone xdsresource.OnDoneFunc) { @@ -68,13 +68,14 @@ func newListenerWatcher() *listenerWatcher { return &listenerWatcher{updateCh: testutils.NewChannel()} } -func (lw *listenerWatcher) OnResourceChanged(update *xdsresource.ListenerResourceData, err error, onDone xdsresource.OnDoneFunc) { - if err != nil { - lw.updateCh.Replace(listenerUpdateErrTuple{err: err}) +func (lw *listenerWatcher) OnResourceChanged(update *xdsresource.ResourceDataOrError, onDone xdsresource.OnDoneFunc) { + if update.Err != nil { + lw.updateCh.Replace(listenerUpdateErrTuple{err: update.Err}) onDone() return } - lw.updateCh.Send(listenerUpdateErrTuple{update: update.Resource}) + u := update.Data.(*xdsresource.ListenerResourceData) + lw.updateCh.Send(listenerUpdateErrTuple{update: u.Resource}) onDone() } @@ -97,13 +98,14 @@ func newListenerWatcherMultiple(size int) *listenerWatcherMultiple { return &listenerWatcherMultiple{updateCh: testutils.NewChannelWithSize(size)} } -func (lw *listenerWatcherMultiple) OnResourceChanged(update *xdsresource.ListenerResourceData, err error, onDone xdsresource.OnDoneFunc) { - if err != nil { - lw.updateCh.Send(listenerUpdateErrTuple{err: err}) +func (lw *listenerWatcherMultiple) OnResourceChanged(update *xdsresource.ResourceDataOrError, onDone xdsresource.OnDoneFunc) { + if update.Err != nil { + lw.updateCh.Send(listenerUpdateErrTuple{err: update.Err}) onDone() return } - lw.updateCh.Send(listenerUpdateErrTuple{update: update.Resource}) + u := update.Data.(*xdsresource.ListenerResourceData) + lw.updateCh.Send(listenerUpdateErrTuple{update: u.Resource}) onDone() } diff --git a/xds/internal/xdsclient/tests/misc_watchers_test.go b/xds/internal/xdsclient/tests/misc_watchers_test.go index 76e764421730..a77b59ae490a 100644 --- a/xds/internal/xdsclient/tests/misc_watchers_test.go +++ b/xds/internal/xdsclient/tests/misc_watchers_test.go @@ -69,13 +69,14 @@ func newTestRouteConfigWatcher(client xdsclient.XDSClient, name1, name2 string) } } -func (rw *testRouteConfigWatcher) OnResourceChanged(update *xdsresource.RouteConfigResourceData, err error, onDone xdsresource.OnDoneFunc) { - if err != nil { - rw.updateCh.Replace(routeConfigUpdateErrTuple{err: err}) +func (rw *testRouteConfigWatcher) OnResourceChanged(update *xdsresource.ResourceDataOrError, onDone xdsresource.OnDoneFunc) { + if update.Err != nil { + rw.updateCh.Replace(routeConfigUpdateErrTuple{err: update.Err}) onDone() return } - rw.updateCh.Send(routeConfigUpdateErrTuple{update: update.Resource}) + rc := update.Data.(*xdsresource.RouteConfigResourceData) + rw.updateCh.Send(routeConfigUpdateErrTuple{update: rc.Resource}) rw.cancel1 = xdsresource.WatchRouteConfig(rw.client, rw.name1, rw.rcw1) rw.cancel2 = xdsresource.WatchRouteConfig(rw.client, rw.name2, rw.rcw2) diff --git a/xds/internal/xdsclient/tests/rds_watchers_test.go b/xds/internal/xdsclient/tests/rds_watchers_test.go index dfb161bb69a5..1facba7afbce 100644 --- a/xds/internal/xdsclient/tests/rds_watchers_test.go +++ b/xds/internal/xdsclient/tests/rds_watchers_test.go @@ -43,7 +43,7 @@ import ( type noopRouteConfigWatcher struct{} -func (noopRouteConfigWatcher) OnResourceChanged(_ *xdsresource.RouteConfigResourceData, _ error, onDone xdsresource.OnDoneFunc) { +func (noopRouteConfigWatcher) OnResourceChanged(_ *xdsresource.ResourceDataOrError, onDone xdsresource.OnDoneFunc) { onDone() } func (noopRouteConfigWatcher) OnAmbientError(_ error, onDone xdsresource.OnDoneFunc) { @@ -63,13 +63,14 @@ func newRouteConfigWatcher() *routeConfigWatcher { return &routeConfigWatcher{updateCh: testutils.NewChannel()} } -func (rw *routeConfigWatcher) OnResourceChanged(update *xdsresource.RouteConfigResourceData, err error, onDone xdsresource.OnDoneFunc) { - if err != nil { - rw.updateCh.Replace(routeConfigUpdateErrTuple{err: err}) +func (rw *routeConfigWatcher) OnResourceChanged(update *xdsresource.ResourceDataOrError, onDone xdsresource.OnDoneFunc) { + if update.Err != nil { + rw.updateCh.Replace(routeConfigUpdateErrTuple{err: update.Err}) onDone() return } - rw.updateCh.Send(routeConfigUpdateErrTuple{update: update.Resource}) + rc := update.Data.(*xdsresource.RouteConfigResourceData) + rw.updateCh.Send(routeConfigUpdateErrTuple{update: rc.Resource}) onDone() } diff --git a/xds/internal/xdsclient/xdsresource/cluster_resource_type.go b/xds/internal/xdsclient/xdsresource/cluster_resource_type.go index 0e43f0261cd4..1c45867a419e 100644 --- a/xds/internal/xdsclient/xdsresource/cluster_resource_type.go +++ b/xds/internal/xdsclient/xdsresource/cluster_resource_type.go @@ -121,7 +121,7 @@ type ClusterWatcher interface { // - resource validation error (if resource is not cached) // - ADS stream failure (if resource is not cached) // - connection failure (if resource is not cached) - OnResourceChanged(*ClusterResourceData, error, OnDoneFunc) + OnResourceChanged(*ResourceDataOrError, OnDoneFunc) // If resource is already cached, it is invoked under different error // conditions including but not limited to the following: @@ -135,13 +135,13 @@ type delegatingClusterWatcher struct { watcher ClusterWatcher } -func (d *delegatingClusterWatcher) OnResourceChanged(data ResourceData, err error, onDone OnDoneFunc) { - if err != nil { - d.watcher.OnResourceChanged(nil, err, onDone) +func (d *delegatingClusterWatcher) OnResourceChanged(update ResourceDataOrError, onDone OnDoneFunc) { + if update.Err != nil { + d.watcher.OnResourceChanged(&ResourceDataOrError{Err: update.Err}, onDone) return } - c := data.(*ClusterResourceData) - d.watcher.OnResourceChanged(c, nil, onDone) + c := update.Data.(*ClusterResourceData) + d.watcher.OnResourceChanged(&ResourceDataOrError{Data: c}, onDone) } func (d *delegatingClusterWatcher) OnAmbientError(err error, onDone OnDoneFunc) { diff --git a/xds/internal/xdsclient/xdsresource/endpoints_resource_type.go b/xds/internal/xdsclient/xdsresource/endpoints_resource_type.go index 2f0faf5b70aa..f92dc1a2734b 100644 --- a/xds/internal/xdsclient/xdsresource/endpoints_resource_type.go +++ b/xds/internal/xdsclient/xdsresource/endpoints_resource_type.go @@ -117,7 +117,7 @@ type EndpointsWatcher interface { // - resource validation error (if resource is not cached) // - ADS stream failure (if resource is not cached) // - connection failure (if resource is not cached) - OnResourceChanged(*EndpointsResourceData, error, OnDoneFunc) + OnResourceChanged(*ResourceDataOrError, OnDoneFunc) // If resource is already cached, it is invoked under different error // conditions including but not limited to the following: @@ -131,13 +131,13 @@ type delegatingEndpointsWatcher struct { watcher EndpointsWatcher } -func (d *delegatingEndpointsWatcher) OnResourceChanged(data ResourceData, err error, onDone OnDoneFunc) { - if err != nil { - d.watcher.OnResourceChanged(nil, err, onDone) +func (d *delegatingEndpointsWatcher) OnResourceChanged(update ResourceDataOrError, onDone OnDoneFunc) { + if update.Err != nil { + d.watcher.OnResourceChanged(&ResourceDataOrError{Err: update.Err}, onDone) return } - e := data.(*EndpointsResourceData) - d.watcher.OnResourceChanged(e, nil, onDone) + e := update.Data.(*EndpointsResourceData) + d.watcher.OnResourceChanged(&ResourceDataOrError{Data: e}, onDone) } func (d *delegatingEndpointsWatcher) OnAmbientError(err error, onDone OnDoneFunc) { diff --git a/xds/internal/xdsclient/xdsresource/listener_resource_type.go b/xds/internal/xdsclient/xdsresource/listener_resource_type.go index 07ddd5ae1bfc..af5a8564924d 100644 --- a/xds/internal/xdsclient/xdsresource/listener_resource_type.go +++ b/xds/internal/xdsclient/xdsresource/listener_resource_type.go @@ -154,7 +154,7 @@ type ListenerWatcher interface { // - resource validation error (if resource is not cached) // - ADS stream failure (if resource is not cached) // - connection failure (if resource is not cached) - OnResourceChanged(*ListenerResourceData, error, OnDoneFunc) + OnResourceChanged(*ResourceDataOrError, OnDoneFunc) // If resource is already cached, it is invoked under different error // conditions including but not limited to the following: @@ -168,13 +168,13 @@ type delegatingListenerWatcher struct { watcher ListenerWatcher } -func (d *delegatingListenerWatcher) OnResourceChanged(data ResourceData, err error, onDone OnDoneFunc) { - if err != nil { - d.watcher.OnResourceChanged(nil, err, onDone) +func (d *delegatingListenerWatcher) OnResourceChanged(update ResourceDataOrError, onDone OnDoneFunc) { + if update.Err != nil { + d.watcher.OnResourceChanged(&ResourceDataOrError{Err: update.Err}, onDone) return } - l := data.(*ListenerResourceData) - d.watcher.OnResourceChanged(l, nil, onDone) + l := update.Data.(*ListenerResourceData) + d.watcher.OnResourceChanged(&ResourceDataOrError{Data: l}, onDone) } func (d *delegatingListenerWatcher) OnAmbientError(err error, onDone OnDoneFunc) { diff --git a/xds/internal/xdsclient/xdsresource/resource_type.go b/xds/internal/xdsclient/xdsresource/resource_type.go index 55b5f4a88430..19542b8c1096 100644 --- a/xds/internal/xdsclient/xdsresource/resource_type.go +++ b/xds/internal/xdsclient/xdsresource/resource_type.go @@ -58,18 +58,25 @@ type Producer interface { // from the xDS server. type OnDoneFunc func() -// ResourceWatcher is an interface that can to be implemented to wrap the -// callbacks to be invoked for different events corresponding to the resource -// being watched. +// ResourceDataOrError is a struct that contains either ResourceData or error. +// It is used to represent the result of an xDS resource update. Exactly one of +// Data or Err will be non-nil. +type ResourceDataOrError struct { + Data ResourceData + Err error +} + +// ResourceWatcher wraps the callbacks to be invoked for different events +// corresponding to the resource being watched. type ResourceWatcher interface { // OnResourceChanged is invoked to notify the watcher of a new version of // the resource received from the xDS server or an error indicating the - // reason why the resource cannot be obtained. + // reason why the resource could not be obtained. // - // The ResourceData parameter needs to be type asserted to the appropriate - // type for the resource being watched. In case of error, the ResourceData - // is nil otherwise its not nil and error is nil but both will never be nil - // together. + // The ResourceData of the ResourceDataOrError needs to be type asserted to + // the appropriate type for the resource being watched. In case of error, + // the ResourceData is nil otherwise its not nil and error is nil but both + // will never be nil together. // // Watcher is expected to use the most recent value passed to // OnResourceChanged(), regardless of whether that's a resource or an error @@ -83,7 +90,7 @@ type ResourceWatcher interface { // - resource validation error (if resource is not cached) // - ADS stream failure (if resource is not cached) // - connection failure (if resource is not cached) - OnResourceChanged(ResourceData, error, OnDoneFunc) + OnResourceChanged(ResourceDataOrError, OnDoneFunc) // OnAmbientError is invoked to notify the watcher of an error that occurs // after a resource has been received (i.e. we already have a cached diff --git a/xds/internal/xdsclient/xdsresource/route_config_resource_type.go b/xds/internal/xdsclient/xdsresource/route_config_resource_type.go index 25576903d96d..2569e2b62a3a 100644 --- a/xds/internal/xdsclient/xdsresource/route_config_resource_type.go +++ b/xds/internal/xdsclient/xdsresource/route_config_resource_type.go @@ -118,7 +118,7 @@ type RouteConfigWatcher interface { // - resource validation error (if resource is not cached) // - ADS stream failure (if resource is not cached) // - connection failure (if resource is not cached) - OnResourceChanged(*RouteConfigResourceData, error, OnDoneFunc) + OnResourceChanged(*ResourceDataOrError, OnDoneFunc) // If resource is already cached, it is invoked under different error // conditions including but not limited to the following: @@ -132,13 +132,13 @@ type delegatingRouteConfigWatcher struct { watcher RouteConfigWatcher } -func (d *delegatingRouteConfigWatcher) OnResourceChanged(data ResourceData, err error, onDone OnDoneFunc) { - if err != nil { - d.watcher.OnResourceChanged(nil, err, onDone) +func (d *delegatingRouteConfigWatcher) OnResourceChanged(update ResourceDataOrError, onDone OnDoneFunc) { + if update.Err != nil { + d.watcher.OnResourceChanged(&ResourceDataOrError{Err: update.Err}, onDone) return } - rc := data.(*RouteConfigResourceData) - d.watcher.OnResourceChanged(rc, nil, onDone) + rc := update.Data.(*RouteConfigResourceData) + d.watcher.OnResourceChanged(&ResourceDataOrError{Data: rc}, onDone) } func (d *delegatingRouteConfigWatcher) OnAmbientError(err error, onDone OnDoneFunc) {