rls: Fix flaky test Test/ControlChannelConnectivityStateMonitoring #8055
base: master
Codecov Report
Attention: Patch coverage is
Additional details and impacted files
@@ Coverage Diff @@
## master #8055 +/- ##
==========================================
- Coverage 82.29% 82.29% -0.01%
==========================================
Files 387 387
Lines 39065 39081 +16
==========================================
+ Hits 32150 32163 +13
+ Misses 5584 5581 -3
- Partials 1331 1337 +6
Can you try 10K or 1M runs in forge before and after the fix to ensure that flakes are eliminated by the fix?
balancer/rls/control_channel.go
Outdated
func (c *ccStateSubscriber) OnMessage(msg any) {
	st, ok := msg.(connectivity.State)
	if !ok {
		return // Ignore invalid messages
This is an error we don't expect to happen in practice, and if it does, it indicates a severe programming error. I would be OK with adding a panic here that includes the type being received.
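A minimal sketch of what such a panic could look like. The `State` type and `stateSubscriber` struct below are hypothetical stand-ins so the example compiles without the grpc module; they are not the types from this PR.

```go
package main

import "fmt"

// State is a hypothetical stand-in for connectivity.State.
type State int

const (
	Idle State = iota
	Connecting
	Ready
)

// stateSubscriber is a hypothetical subscriber that records the states it sees.
type stateSubscriber struct {
	states []State
}

// OnMessage records connectivity states and panics on any other message type,
// naming the type that was actually received.
func (s *stateSubscriber) OnMessage(msg any) {
	st, ok := msg.(State)
	if !ok {
		panic(fmt.Sprintf("received unexpected message of type %T, want %T", msg, st))
	}
	s.states = append(s.states, st)
}

// panicMessage runs f and returns the recovered panic value as a string,
// or the empty string if f did not panic.
func panicMessage(f func()) (msg string) {
	defer func() {
		if r := recover(); r != nil {
			msg = fmt.Sprint(r)
		}
	}()
	f()
	return ""
}

func main() {
	s := &stateSubscriber{}
	s.OnMessage(Ready)
	fmt.Println("recorded states:", len(s.states))
	fmt.Println("panic:", panicMessage(func() { s.OnMessage("READY") }))
}
```

Using `%T` puts the offending type in the panic message, which is usually enough to locate the bad publisher.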
balancer/rls/control_channel.go
Outdated
stateSubscriber := &ccStateSubscriber{
	state: buffer.NewUnbounded(),
}
unsubscribe := internal.SubscribeToConnectivityStateChanges.(func(cc *grpc.ClientConn, s grpcsync.Subscriber) func())(cc.cc, stateSubscriber)
When a new subscriber is added to the pubsub, it receives the most recent message posted on the pubsub. This means that if there were N messages posted on the pubsub when a new subscriber is added, the subscriber only receives the most recently posted message. This might not be good enough for our purposes here. So, I suggest making the following changes.
- Get rid of the `ccStateSubscriber`. Instead store the `buffer.Unbounded` as a field of `controlChannel`.
- Initialize the unbounded buffer when the control channel is created in `newControlChannel`.
- Change `grpc.Dial` to `grpc.NewClient` in `newControlChannel`.
- Register the subscriber right after creating the ClientConn to the RLS server, but before calling `Connect` on it. This will ensure that the subscriber will receive every single state change on the ClientConn.
  - Implement the `OnMessage` method on the `controlChannel` type and pass it to the call to `internal.SubscribeToConnectivityStateChanges`.
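To illustrate why the registration order matters, here is a simplified sketch of the "new subscriber only gets the latest message" behavior described above. The `pubSub` type is a hypothetical, synchronous, non-thread-safe stand-in written for this example; it is not grpc-go's internal implementation, and the state names are plain strings.

```go
package main

import "fmt"

// pubSub is a hypothetical stand-in: it remembers only the most recent
// message and replays it to each new subscriber.
type pubSub struct {
	subs []func(msg string)
	last *string
}

// Subscribe registers f and, if anything was published before, delivers only
// the most recent message to it.
func (p *pubSub) Subscribe(f func(msg string)) {
	p.subs = append(p.subs, f)
	if p.last != nil {
		f(*p.last)
	}
}

// Publish stores msg as the latest message and delivers it to all subscribers.
func (p *pubSub) Publish(msg string) {
	p.last = &msg
	for _, f := range p.subs {
		f(msg)
	}
}

// collect returns a callback that appends every message it receives to dst.
func collect(dst *[]string) func(string) {
	return func(m string) { *dst = append(*dst, m) }
}

// run subscribes one listener before any state is published and another one
// after three states have been published.
func run() (early, late []string) {
	p := &pubSub{}
	p.Subscribe(collect(&early)) // registered before the channel starts connecting
	p.Publish("CONNECTING")
	p.Publish("READY")
	p.Publish("TRANSIENT_FAILURE")
	p.Subscribe(collect(&late)) // registered too late: misses earlier states
	return early, late
}

func main() {
	early, late := run()
	fmt.Println("early subscriber saw:", early)
	fmt.Println("late subscriber saw: ", late)
}
```

In this sketch the early subscriber observes all three transitions, while the late one observes only the last. Subscribing before `Connect` is called puts the control channel in the position of the early subscriber.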
balancer/rls/control_channel.go
Outdated
unsubscribe()
stateSubscriber.state.Close()
This would move to `controlChannel.close`.
for {
	// Wait for the control channel to become READY.
	for s := cc.cc.GetState(); s != connectivity.Ready; s = cc.cc.GetState() {
	var s any
Can we define this variable to be of the concrete type `connectivity.State` instead?
balancer/rls/control_channel.go
Outdated
@@ -176,11 +197,15 @@ func (cc *controlChannel) monitorConnectivityState() {
	first = false

	// Wait for the control channel to move out of READY.
	cc.cc.WaitForStateChange(ctx, connectivity.Ready)
	if cc.cc.GetState() == connectivity.Shutdown {
	for s = <-stateSubscriber.state.Get(); s == connectivity.Ready; s = <-stateSubscriber.state.Get() {
Do we need a `for` loop here? We know for a fact that we are in `READY`. So, the first time we actually read anything out of the unbounded buffer, we can be sure that we have moved out of `READY`. Am I missing something?
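The reasoning in this comment can be sketched as follows, under the assumption that the stream carries only real transitions (no two consecutive identical states). A plain Go channel stands in for the unbounded buffer, and `State` and `leaveReady` are hypothetical names introduced for this example.

```go
package main

import "fmt"

// State is a hypothetical stand-in for connectivity.State.
type State int

const (
	Idle State = iota
	Connecting
	Ready
	TransientFailure
)

// leaveReady waits until READY is observed and then performs a single read.
// Assuming only real transitions are delivered, that one value is necessarily
// a non-READY state, so no inner loop is needed. The boolean is false if the
// stream was closed before a post-READY state arrived.
func leaveReady(states <-chan State) (State, bool) {
	for s := range states {
		if s == Ready {
			next, ok := <-states
			return next, ok
		}
	}
	return Idle, false
}

func main() {
	states := make(chan State, 3)
	states <- Connecting
	states <- Ready
	states <- TransientFailure
	close(states)

	next, ok := leaveReady(states)
	fmt.Println(next == TransientFailure, ok)
}
```

If the publisher could ever repeat READY, the assumption would not hold and the loop from the diff would be the safer choice.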
Fixes #5468
The test flakes because
FIX: Use a channel to make sure the goroutine starts.
FIX: Use `grpcsync.PubSub` to subscribe to the state changes so that we do not lose state changes.
RELEASE NOTES: N/A
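As an illustration of the first fix, one common way to make sure a goroutine has started before continuing is to have it close a channel as its first action. The `startMonitor` helper below is a hypothetical name for this sketch and is not code from the PR.

```go
package main

import "fmt"

// startMonitor launches work in a new goroutine and returns only after that
// goroutine has begun running. The returned channel is closed once work
// has finished.
func startMonitor(work func()) <-chan struct{} {
	started := make(chan struct{})
	finished := make(chan struct{})
	go func() {
		close(started) // signal that the goroutine is running
		work()
		close(finished)
	}()
	<-started // block until the goroutine has actually started
	return finished
}

func main() {
	ran := false
	done := startMonitor(func() { ran = true })
	<-done // the close of finished happens after work returns
	fmt.Println("work ran:", ran)
}
```

Blocking on `started` removes the window in which the caller could proceed while the monitoring goroutine has not yet been scheduled.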