Skip to content

Commit

Permalink
easwars review 2
Browse files Browse the repository at this point in the history
  • Loading branch information
purnesh42H committed Jan 13, 2025
1 parent 4c7a204 commit 385ff44
Show file tree
Hide file tree
Showing 13 changed files with 50 additions and 62 deletions.
2 changes: 2 additions & 0 deletions xds/internal/balancer/cdsbalancer/cdsbalancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -526,6 +526,8 @@ func (b *cdsBalancer) onClusterAmbientError(name string, err error) {
//
// Only executed in the context of a serializer callback.
func (b *cdsBalancer) onClusterResourceChangedError(name string, err error) {
b.logger.Errorf("Cluster resource %q received on-resource-changed error update: %v", name, err)

if b.childLB != nil {
b.childLB.ResolverError(err)
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,11 +84,7 @@ func (er *edsDiscoveryMechanism) OnResourceChanged(update *xdsresource.ResourceD

if update.Err != nil {
if er.logger.V(2) {
if xdsresource.ErrType(update.Err) == xdsresource.ErrorTypeResourceNotFound {
er.logger.Infof("EDS discovery mechanism for resource %q reported resource-does-not-exist error", er.nameToWatch)
} else {
er.logger.Infof("EDS discovery mechanism for resource %q reported on resource changed error: %v", er.nameToWatch, update.Err)
}
er.logger.Infof("EDS discovery mechanism for resource %q reported on resource changed error: %v", er.nameToWatch, update.Err)
}
// Report an empty update that would result in no priority child being
// created for this discovery mechanism. This would result in the priority
Expand Down
12 changes: 2 additions & 10 deletions xds/internal/resolver/xds_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -528,11 +528,7 @@ func (r *xdsResolver) onListenerResourceAmbientError(err error) {
// Only executed in the context of a serializer callback.
func (r *xdsResolver) onListenerResourceChangedError(err error) {
if r.logger.V(2) {
if xdsresource.ErrType(err) == xdsresource.ErrorTypeResourceNotFound {
r.logger.Infof("Received resource-not-found-error for Listener resource %q", r.ldsResourceName)
} else {
r.logger.Infof("Received on-resource-changed error for Listener resource %q: %v", r.ldsResourceName, err)
}
r.logger.Infof("Received on-resource-changed error for Listener resource %q: %v", r.ldsResourceName, err)
}

r.listenerUpdateRecvd = false
Expand Down Expand Up @@ -573,11 +569,7 @@ func (r *xdsResolver) onRouteConfigResourceAmbientError(name string, err error)
// Only executed in the context of a serializer callback.
func (r *xdsResolver) onRouteConfigResourceChangedError(name string, err error) {
if r.logger.V(2) {
if xdsresource.ErrType(err) == xdsresource.ErrorTypeResourceNotFound {
r.logger.Infof("Received resource-not-found-error for RouteConfiguration resource %q", name)
} else {
r.logger.Infof("Received on-resource-changed error for RouteConfiguration resource %q: %v", name, err)
}
r.logger.Infof("Received on-resource-changed error for RouteConfiguration resource %q: %v", name, err)
}

if r.rdsResourceName != name {
Expand Down
14 changes: 6 additions & 8 deletions xds/internal/server/listener_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -424,19 +424,17 @@ func (lw *ldsWatcher) OnResourceChanged(update *xdsresource.ResourceDataOrError,
}
return
}
if lw.logger.V(2) {
if update.Err != nil {
lw.logger.Infof("LDS watch for resource %q received error: %#v", lw.name, update.Err)
} else {
u := update.Data.(*xdsresource.ListenerResourceData)
lw.logger.Infof("LDS watch for resource %q received update: %#v", lw.name, u.Resource)
}
}
if update.Err != nil {
if lw.logger.V(2) {
lw.logger.Infof("LDS watch for resource %q received error: %v", lw.name, update.Err)
}
lw.parent.onLDSResourceChangedError(update.Err)
return
}
u := update.Data.(*xdsresource.ListenerResourceData)
if update.Err != nil {
lw.logger.Infof("LDS watch for resource %q received update: %v", lw.name, u.Resource)
}
lw.parent.handleLDSUpdate(u.Resource)
}

Expand Down
14 changes: 6 additions & 8 deletions xds/internal/server/rds_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,19 +155,17 @@ func (rw *rdsWatcher) OnResourceChanged(update *xdsresource.ResourceDataOrError,
return
}
rw.mu.Unlock()
if rw.logger.V(2) {
if update.Err != nil {
rw.logger.Infof("RDS watch for resource %q received error: %#v", rw.routeName, update.Err)
} else {
u := update.Data.(*xdsresource.RouteConfigResourceData)
rw.logger.Infof("RDS watch for resource %q received update: %#v", rw.routeName, u.Resource)
}
}
if update.Err != nil {
if rw.logger.V(2) {
rw.logger.Infof("RDS watch for resource %q received error: %v", rw.routeName, update.Err)
}
rw.parent.handleRouteUpdate(rw.routeName, rdsWatcherUpdate{err: update.Err})
return
}
u := update.Data.(*xdsresource.RouteConfigResourceData)
if rw.logger.V(2) {
rw.logger.Infof("RDS watch for resource %q received update: %#v", rw.routeName, u.Resource)
}
rw.parent.handleRouteUpdate(rw.routeName, rdsWatcherUpdate{data: &u.Resource})
}

Expand Down
1 change: 0 additions & 1 deletion xds/internal/testutils/resource_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ func (w *TestResourceWatcher) OnResourceChanged(update xdsresource.ResourceDataO
}
w.ErrorCh <- update.Err
return

}
select {
case <-w.UpdateCh:
Expand Down
6 changes: 3 additions & 3 deletions xds/internal/xdsclient/authority.go
Original file line number Diff line number Diff line change
Expand Up @@ -466,7 +466,7 @@ func (a *authority) handleADSResourceUpdate(serverConfig *bootstrap.ServerConfig
watcher := watcher
watcherCnt.Add(1)
funcsToSchedule = append(funcsToSchedule, func(context.Context) {
watcher.OnResourceChanged(xdsresource.ResourceDataOrError{Err: xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "xds: resource has been removed")}, done)
watcher.OnResourceChanged(xdsresource.ResourceDataOrError{Err: xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "xds: resource %q of type %q has been removed", name, rType.TypeName())}, done)
})
}
}
Expand Down Expand Up @@ -510,7 +510,7 @@ func (a *authority) handleADSResourceDoesNotExist(rType xdsresource.Type, resour
for watcher := range state.watchers {
watcher := watcher
a.watcherCallbackSerializer.TrySchedule(func(context.Context) {
watcher.OnResourceChanged(xdsresource.ResourceDataOrError{Err: xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "xds: resource %s does not exist", rType.TypeName())}, func() {})
watcher.OnResourceChanged(xdsresource.ResourceDataOrError{Err: xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "xds: resource %q of type %q does not exist", resourceName, rType.TypeName())}, func() {})
})
}
}
Expand Down Expand Up @@ -667,7 +667,7 @@ func (a *authority) watchResource(rType xdsresource.Type, resourceName string, w
// server does not have this resource, notify the new watcher.
if state.md.Status == xdsresource.ServiceStatusNotExist {
a.watcherCallbackSerializer.TrySchedule(func(context.Context) {
watcher.OnResourceChanged(xdsresource.ResourceDataOrError{Err: xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "xds: resource %s does not exist", rType.TypeName())}, func() {})
watcher.OnResourceChanged(xdsresource.ResourceDataOrError{Err: xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "xds: resource %q of type %q does not exist", resourceName, rType.TypeName())}, func() {})
})
}
cleanup = a.unwatchResource(rType, resourceName, watcher)
Expand Down
24 changes: 12 additions & 12 deletions xds/internal/xdsclient/tests/resource_update_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ func (s) TestHandleListenerResponseFromManagementServer(t *testing.T) {
Value: []byte{1, 2, 3, 4},
}},
},
wantErr: "xds: resource ListenerResource does not exist",
wantErr: fmt.Sprintf("xds: resource \"%v\" of type \"ListenerResource\" does not exist", resourceName1),
wantGenericXDSConfig: []*v3statuspb.ClientConfig_GenericXdsConfig{
{
TypeUrl: "type.googleapis.com/envoy.config.listener.v3.Listener",
Expand All @@ -177,7 +177,7 @@ func (s) TestHandleListenerResponseFromManagementServer(t *testing.T) {
TypeUrl: "type.googleapis.com/envoy.config.listener.v3.Listener",
VersionInfo: "1",
},
wantErr: "xds: resource ListenerResource does not exist",
wantErr: fmt.Sprintf("xds: resource \"%v\" of type \"ListenerResource\" does not exist", resourceName1),
wantGenericXDSConfig: []*v3statuspb.ClientConfig_GenericXdsConfig{
{
TypeUrl: "type.googleapis.com/envoy.config.listener.v3.Listener",
Expand All @@ -194,7 +194,7 @@ func (s) TestHandleListenerResponseFromManagementServer(t *testing.T) {
VersionInfo: "1",
Resources: []*anypb.Any{testutils.MarshalAny(t, &v3routepb.RouteConfiguration{})},
},
wantErr: "xds: resource ListenerResource does not exist",
wantErr: fmt.Sprintf("xds: resource \"%v\" of type \"ListenerResource\" does not exist", resourceName1),
wantGenericXDSConfig: []*v3statuspb.ClientConfig_GenericXdsConfig{
{
TypeUrl: "type.googleapis.com/envoy.config.listener.v3.Listener",
Expand Down Expand Up @@ -418,7 +418,7 @@ func (s) TestHandleRouteConfigResponseFromManagementServer(t *testing.T) {
Value: []byte{1, 2, 3, 4},
}},
},
wantErr: "xds: resource RouteConfigResource does not exist",
wantErr: fmt.Sprintf("xds: resource \"%v\" of type \"RouteConfigResource\" does not exist", resourceName1),
wantGenericXDSConfig: []*v3statuspb.ClientConfig_GenericXdsConfig{
{
TypeUrl: "type.googleapis.com/envoy.config.route.v3.RouteConfiguration",
Expand All @@ -434,7 +434,7 @@ func (s) TestHandleRouteConfigResponseFromManagementServer(t *testing.T) {
TypeUrl: "type.googleapis.com/envoy.config.route.v3.RouteConfiguration",
VersionInfo: "1",
},
wantErr: "xds: resource RouteConfigResource does not exist",
wantErr: fmt.Sprintf("xds: resource \"%v\" of type \"RouteConfigResource\" does not exist", resourceName1),
wantGenericXDSConfig: []*v3statuspb.ClientConfig_GenericXdsConfig{
{
TypeUrl: "type.googleapis.com/envoy.config.route.v3.RouteConfiguration",
Expand All @@ -451,7 +451,7 @@ func (s) TestHandleRouteConfigResponseFromManagementServer(t *testing.T) {
VersionInfo: "1",
Resources: []*anypb.Any{testutils.MarshalAny(t, &v3clusterpb.Cluster{})},
},
wantErr: "xds: resource RouteConfigResource does not exist",
wantErr: fmt.Sprintf("xds: resource \"%v\" of type \"RouteConfigResource\" does not exist", resourceName1),
wantGenericXDSConfig: []*v3statuspb.ClientConfig_GenericXdsConfig{
{
TypeUrl: "type.googleapis.com/envoy.config.route.v3.RouteConfiguration",
Expand Down Expand Up @@ -667,7 +667,7 @@ func (s) TestHandleClusterResponseFromManagementServer(t *testing.T) {
Value: []byte{1, 2, 3, 4},
}},
},
wantErr: "xds: resource ClusterResource does not exist",
wantErr: fmt.Sprintf("xds: resource \"%v\" of type \"ClusterResource\" does not exist", resourceName1),
wantGenericXDSConfig: []*v3statuspb.ClientConfig_GenericXdsConfig{
{
TypeUrl: "type.googleapis.com/envoy.config.cluster.v3.Cluster",
Expand All @@ -683,7 +683,7 @@ func (s) TestHandleClusterResponseFromManagementServer(t *testing.T) {
TypeUrl: "type.googleapis.com/envoy.config.cluster.v3.Cluster",
VersionInfo: "1",
},
wantErr: "xds: resource ClusterResource does not exist",
wantErr: fmt.Sprintf("xds: resource \"%v\" of type \"ClusterResource\" does not exist", resourceName1),
wantGenericXDSConfig: []*v3statuspb.ClientConfig_GenericXdsConfig{
{
TypeUrl: "type.googleapis.com/envoy.config.cluster.v3.Cluster",
Expand All @@ -700,7 +700,7 @@ func (s) TestHandleClusterResponseFromManagementServer(t *testing.T) {
VersionInfo: "1",
Resources: []*anypb.Any{testutils.MarshalAny(t, &v3endpointpb.ClusterLoadAssignment{})},
},
wantErr: "xds: resource ClusterResource does not exist",
wantErr: fmt.Sprintf("xds: resource \"%v\" of type \"ClusterResource\" does not exist", resourceName1),
wantGenericXDSConfig: []*v3statuspb.ClientConfig_GenericXdsConfig{
{
TypeUrl: "type.googleapis.com/envoy.config.cluster.v3.Cluster",
Expand Down Expand Up @@ -974,7 +974,7 @@ func (s) TestHandleEndpointsResponseFromManagementServer(t *testing.T) {
Value: []byte{1, 2, 3, 4},
}},
},
wantErr: "xds: resource EndpointsResource does not exist",
wantErr: fmt.Sprintf("xds: resource \"%v\" of type \"EndpointsResource\" does not exist", resourceName1),
wantGenericXDSConfig: []*v3statuspb.ClientConfig_GenericXdsConfig{
{
TypeUrl: "type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment",
Expand All @@ -990,7 +990,7 @@ func (s) TestHandleEndpointsResponseFromManagementServer(t *testing.T) {
TypeUrl: "type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment",
VersionInfo: "1",
},
wantErr: "xds: resource EndpointsResource does not exist",
wantErr: fmt.Sprintf("xds: resource \"%v\" of type \"EndpointsResource\" does not exist", resourceName1),
wantGenericXDSConfig: []*v3statuspb.ClientConfig_GenericXdsConfig{
{
TypeUrl: "type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment",
Expand All @@ -1007,7 +1007,7 @@ func (s) TestHandleEndpointsResponseFromManagementServer(t *testing.T) {
VersionInfo: "1",
Resources: []*anypb.Any{testutils.MarshalAny(t, &v3listenerpb.Listener{})},
},
wantErr: "xds: resource EndpointsResource does not exist",
wantErr: fmt.Sprintf("xds: resource \"%v\" of type \"EndpointsResource\" does not exist", resourceName1),
wantGenericXDSConfig: []*v3statuspb.ClientConfig_GenericXdsConfig{
{
TypeUrl: "type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment",
Expand Down
4 changes: 2 additions & 2 deletions xds/internal/xdsclient/xdsresource/cluster_resource_type.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,8 @@ type ClusterWatcher interface {
// - connection failure (if resource is not cached)
OnResourceChanged(*ResourceDataOrError, OnDoneFunc)

// If resource is already cached, it is invoked under different error
// conditions including but not limited to the following:
// OnAmbientError is invoked if resource is already cached under different
// error conditions including but not limited to the following:
// - resource validation error
// - ADS stream failure
// - connection failure
Expand Down
4 changes: 2 additions & 2 deletions xds/internal/xdsclient/xdsresource/endpoints_resource_type.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,8 @@ type EndpointsWatcher interface {
// - connection failure (if resource is not cached)
OnResourceChanged(*ResourceDataOrError, OnDoneFunc)

// If resource is already cached, it is invoked under different error
// conditions including but not limited to the following:
// OnAmbientError is invoked if resource is already cached under different
// error conditions including but not limited to the following:
// - resource validation error
// - ADS stream failure
// - connection failure
Expand Down
4 changes: 2 additions & 2 deletions xds/internal/xdsclient/xdsresource/listener_resource_type.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,8 +156,8 @@ type ListenerWatcher interface {
// - connection failure (if resource is not cached)
OnResourceChanged(*ResourceDataOrError, OnDoneFunc)

// If resource is already cached, it is invoked under different error
// conditions including but not limited to the following:
// OnAmbientError is invoked if resource is already cached under different
// error conditions including but not limited to the following:
// - resource validation error
// - ADS stream failure
// - connection failure
Expand Down
17 changes: 10 additions & 7 deletions xds/internal/xdsclient/xdsresource/resource_type.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,15 +73,18 @@ type ResourceWatcher interface {
// the resource received from the xDS server or an error indicating the
// reason why the resource could not be obtained.
//
// The ResourceData of the ResourceDataOrError needs to be type asserted to
// the appropriate type for the resource being watched. In case of error,
// the ResourceData is nil otherwise its not nil and error is nil but both
// will never be nil together.
// In the former case, this callback will be invoked with a non-nil
// ResourceData in ResourceDataOrError. The ResourceData of the
// ResourceDataOrError needs to be type asserted to the appropriate type
// for the resource being watched.
//
// In the latter case, this callback will be invoked with a non-nil error
// value in ResourceDataOrError.
//
// Watcher is expected to use the most recent value passed to
// OnResourceChanged(), regardless of whether that's a resource or an error
// i.e., if the watcher is given an error via OnResourceChanged(), that
// means it should stop using any previously delivered resource.
// OnResourceChanged(), regardless of whether that's a ResourceData or an
// error i.e., if the watcher is given an error via OnResourceChanged(),
// that means it should stop using any previously delivered resource.
//
// It is invoked under different error conditions including but not
// limited to the following:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,8 +120,8 @@ type RouteConfigWatcher interface {
// - connection failure (if resource is not cached)
OnResourceChanged(*ResourceDataOrError, OnDoneFunc)

// If resource is already cached, it is invoked under different error
// conditions including but not limited to the following:
// OnAmbientError is invoked if resource is already cached under different
// error conditions including but not limited to the following:
// - resource validation error
// - ADS stream failure
// - connection failure
Expand Down

0 comments on commit 385ff44

Please sign in to comment.