Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix locking in getConnection #288

Merged
merged 3 commits into from
Apr 10, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 17 additions & 8 deletions go/vt/vtgateproxy/vtgateproxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"sync"

"google.golang.org/grpc"

"vitess.io/vitess/go/sqlescape"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/grpcclient"
Expand Down Expand Up @@ -59,32 +60,36 @@ func (proxy *VTGateProxy) getConnection(ctx context.Context, target string) (*vt
// If the connection exists, return it
proxy.mu.RLock()
existingConn := proxy.targetConns[target]
proxy.mu.RUnlock()

if existingConn != nil {
proxy.mu.RUnlock()
log.V(100).Infof("Reused connection for %v\n", target)
return existingConn, nil
}

// No luck, need to create a new one. Serialize new additions so we don't create multiple
// for a given target.
log.V(100).Infof("Need to create connection for %v\n", target)
proxy.mu.RUnlock()

proxy.mu.Lock()
defer proxy.mu.Unlock()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oops much better thanks.

But I think we still need the RUnlock() because IIRC you can't "promote" a RWLock without first dropping the read lock.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's still there - just moved above the log / error checking logic.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OMG duuuh.


// Otherwise create a new connection after dropping the lock, allowing multiple requests to
// race to create the conn for now.
grpcclient.RegisterGRPCDialOptions(func(opts []grpc.DialOption) ([]grpc.DialOption, error) {
return append(opts, grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"round_robin":{}}]}`)), nil
})
// Check again in case conn was made between lock acquisitions.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also very good idea, thanks.

existingConn = proxy.targetConns[target]
if existingConn != nil {
log.V(100).Infof("Reused connection for %v\n", target)
return existingConn, nil
}

// Otherwise create a new connection. TODO: confirm this doesn't actually make a TCP connection, and returns quickly,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For sure it does not. There's a WithBlock thing that IIRC you can pass to make this blocking, but if not then it is non-blocking.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ya - confirmed, the model in grpc is pretty weird, you create a "conn", but that's basically just a handle to make RPC requests. It's not until you make an RPC call that it will actually start mapping the RPC to a subconn (which maps to a TCP connection).

That's also why you can reuse the conn - it should represent a logical target to send the RPC commands to.

// otherwise we're going to have to do this while not holding the lock.
conn, err := vtgateconn.DialProtocol(ctx, "grpc", target)
if err != nil {
return nil, err
}

log.V(100).Infof("Created new connection for %v\n", target)
proxy.targetConns[target] = conn
proxy.mu.Unlock()

return conn, nil
}
Expand Down Expand Up @@ -174,4 +179,8 @@ func (proxy *VTGateProxy) StreamExecute(ctx context.Context, session *vtgateconn
}

func Init() {
log.V(100).Infof("Registering GRPC dial options")

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

eek - I think demmer and I talked about this and never fixed it - thank you for moving this here and doing it once!

grpcclient.RegisterGRPCDialOptions(func(opts []grpc.DialOption) ([]grpc.DialOption, error) {
return append(opts, grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"round_robin":{}}]}`)), nil
})
}
Loading