Skip to content

Commit

Permalink
vtgateproxy v19 buildable and passing tests (#594)
Browse files Browse the repository at this point in the history
* get v19 to build (WIP)

* sort of builds?

* misnamed flag

* flag name

* remove binary

Signed-off-by: Esme Lamb <[email protected]>

* update go-mysql-driver

Signed-off-by: Esme Lamb <[email protected]>

* return ErrNoSubConnAvailable for no available connections

Signed-off-by: Esme Lamb <[email protected]>

---------

Signed-off-by: Esme Lamb <[email protected]>
  • Loading branch information
dedelala authored Feb 3, 2025
1 parent 407dcc1 commit 94c7f03
Show file tree
Hide file tree
Showing 7 changed files with 93 additions and 30 deletions.
7 changes: 5 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ require (
github.com/dave/jennifer v1.7.0
github.com/evanphx/json-patch v5.9.0+incompatible
github.com/fsnotify/fsnotify v1.7.0
github.com/go-sql-driver/mysql v1.7.1
github.com/go-sql-driver/mysql v1.8.1
github.com/golang/glog v1.2.0
github.com/golang/protobuf v1.5.4
github.com/golang/snappy v0.0.4
Expand Down Expand Up @@ -114,7 +114,10 @@ require (
modernc.org/sqlite v1.29.2
)

require github.com/go-chi/chi/v5 v5.0.10 // indirect
require (
filippo.io/edwards25519 v1.1.0 // indirect
github.com/go-chi/chi/v5 v5.0.10 // indirect
)

require (
cloud.google.com/go v0.112.1 // indirect
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ cloud.google.com/go/iam v1.1.6 h1:bEa06k05IO4f4uJonbB5iAgKTPpABy1ayxaIZV/GHVc=
cloud.google.com/go/iam v1.1.6/go.mod h1:O0zxdPeGBoFdWW3HWmBxJsk0pfvNM/p/qa82rWOGTwI=
cloud.google.com/go/storage v1.39.0 h1:brbjUa4hbDHhpQf48tjqMaXEV+f1OGoaTmQau9tmCsA=
cloud.google.com/go/storage v1.39.0/go.mod h1:OAEj/WZwUYjA3YHQ10/YcN9ttGuEpLwvaoyBXIPikEk=
filippo.io/edwards25519 v1.1.0 h1:FNf4tywRC1HmFuKW5xopWpigGjJKiJSV0Cqo0cJWDaA=
filippo.io/edwards25519 v1.1.0/go.mod h1:BxyFTGdWcka3PhytdK4V28tE5sGfRvvvRV7EaN4VDT4=
github.com/AdaLogics/go-fuzz-headers v0.0.0-20230811130428-ced1acdcaa24 h1:bvDV9vkmnHYOMsOr4WLk+Vo07yKIzd94sVoIqshQ4bU=
github.com/AdaLogics/go-fuzz-headers v0.0.0-20230811130428-ced1acdcaa24/go.mod h1:8o94RPi1/7XTJvwPpRSzSUedZrtlirdB3r9Z20bi2f8=
github.com/Azure/azure-pipeline-go v0.2.3 h1:7U9HBg1JFK3jHl5qmo4CTZKFTVgMwdFHMVtCdfBE21U=
Expand Down Expand Up @@ -153,6 +155,8 @@ github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
github.com/go-sql-driver/mysql v1.7.1 h1:lUIinVbN1DY0xBg0eMOzmmtGoHwWBbvnWubQUrtU8EI=
github.com/go-sql-driver/mysql v1.7.1/go.mod h1:OXbVy3sEdcQ2Doequ6Z5BW6fXNQTmx+9S1MCJN5yJMI=
github.com/go-sql-driver/mysql v1.8.1 h1:LedoTUt/eveggdHS9qUFC1EFSa8bU2+1pZjSRpvNJ1Y=
github.com/go-sql-driver/mysql v1.8.1/go.mod h1:wEBSXgmK//2ZFJyE+qWnIsVGmvmEKlqwuVSjsCm7DZg=
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0/go.mod h1:fyg7847qk6SyHyPtNmDHnmrv/HOrqktSC+C9fM+CJOE=
github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
Expand Down
Binary file removed go/cmd/vtgateproxy/vtgateproxy
Binary file not shown.
2 changes: 1 addition & 1 deletion go/vt/vtgateproxy/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -414,7 +414,7 @@ func (b *JSONGateResolverBuilder) update(r *JSONGateResolver) error {
if r.currentAddrs != nil && warmupTime.Seconds() > 0 {
combined := append(r.currentAddrs, addrs...)
log.V(100).Infof("updating targets for %s to warmup %v", r.target.URL.String(), targets)
r.clientConn.UpdateState(resolver.State{Addresses: combined})
_ = r.clientConn.UpdateState(resolver.State{Addresses: combined})
time.Sleep(*warmupTime)
}

Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtgateproxy/firstready_balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func (f *frPickerBuilder) Build(info base.PickerBuildInfo) balancer.Picker {
log.V(100).Infof("first_ready: Build called with info: %v", info)

if len(info.ReadySCs) == 0 {
return base.NewErrPicker(errors.New("no available connections"))
return base.NewErrPicker(balancer.ErrNoSubConnAvailable)
}

f.mu.Lock()
Expand Down
103 changes: 78 additions & 25 deletions go/vt/vtgateproxy/mysql_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,13 @@ import (
"time"

"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/vtenv"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vtgate/vtgateconn"

"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/mysql/replication"
"vitess.io/vitess/go/mysql/sqlerror"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/trace"
"vitess.io/vitess/go/vt/callerid"
Expand All @@ -56,6 +59,8 @@ var (
mysqlAllowClearTextWithoutTLS = flag.Bool("mysql_allow_clear_text_without_tls", false, "If set, the server will allow the use of a clear text password over non-SSL connections.")
mysqlProxyProtocol = flag.Bool("proxy_protocol", false, "Enable HAProxy PROXY protocol on MySQL listener socket")
mysqlConnBufferPooling = flag.Bool("mysql_conn_buffer_pooling", false, "Enable mysql conn buffer pooling.")
mysqlKeepAlivePeriod = flag.Duration("mysql-server-keepalive-period", 0*time.Second, "TCP period between keep-alives")
mysqlServerFlushDelay = flag.Duration("mysql_server_flush_delay", 100*time.Millisecond, "Delay after which buffered response will be flushed to the client.")

mysqlServerRequireSecureTransport = flag.Bool("mysql_server_require_secure_transport", false, "Reject insecure connections but only if mysql_server_ssl_cert and mysql_server_ssl_key are provided")

Expand All @@ -68,7 +73,7 @@ var (

mysqlSslServerCA = flag.String("mysql_server_ssl_server_ca", "", "path to server CA in PEM format, which will be combine with server cert, return full certificate chain to clients")

mysqlSlowConnectWarnThreshold = flag.Duration("mysql_slow_connect_warn_threshold", 0, "Warn if it takes more than the given threshold for a mysql connection to establish")
mysqlSlowConnectWarnThreshold = flag.Int64("mysql_slow_connect_warn_threshold", 0, "Warn if it takes more than the given threshold for a mysql connection to establish")

mysqlConnReadTimeout = flag.Duration("mysql_server_read_timeout", 0, "connection read timeout")
mysqlConnWriteTimeout = flag.Duration("mysql_server_write_timeout", 0, "connection write timeout")
Expand All @@ -80,29 +85,43 @@ var (
busyConnections int32
)

// proxyHandler implements the Listener interface.
// proxyHandler implements the mysql.Handler interface.
// It stores the Session in the ClientData of a Connection.
type proxyHandler struct {
mysql.UnimplementedHandler
mu sync.Mutex

env *vtenv.Environment
mu sync.Mutex
proxy *VTGateProxy
}

func newProxyHandler(proxy *VTGateProxy) *proxyHandler {
func newProxyHandler(proxy *VTGateProxy) (*proxyHandler, error) {
env, err := vtenv.New(vtenv.Options{
MySQLServerVersion: servenv.MySQLServerVersion(),
TruncateUILen: servenv.TruncateUILen,
TruncateErrLen: servenv.TruncateErrLen,
})
if err != nil {
return nil, fmt.Errorf("unable to initialize env: %v", err)
}

return &proxyHandler{
env: env,
proxy: proxy,
}
}, nil
}

// NewConnection is called when a connection is created.
// It is not established yet. The handler can decide to
// set StatusFlags that will be returned by the handshake methods.
// In particular, ServerStatusAutocommit might be set.
func (ph *proxyHandler) NewConnection(c *mysql.Conn) {
}

func (ph *proxyHandler) ComResetConnection(c *mysql.Conn) {
ctx := context.Background()
ph.closeSession(ctx, c)
// ConnectionReady is called after the connection handshake, but
// before we begin to process commands.
func (ph *proxyHandler) ConnectionReady(c *mysql.Conn) {
}

// ConnectionClosed is called when a connection is closed.
func (ph *proxyHandler) ConnectionClosed(c *mysql.Conn) {
// Rollback if there is an ongoing transaction. Ignore error.
defer func() {
Expand Down Expand Up @@ -155,6 +174,10 @@ func startSpan(ctx context.Context, query, label string) (trace.Span, context.Co
return startSpanTestable(ctx, query, label, trace.NewSpan, trace.NewFromString)
}

// ComQuery is called when a connection receives a query.
// Note the contents of the query slice may change after
// the first call to callback. So the Handler should not
// hang on to the byte slice.
func (ph *proxyHandler) ComQuery(c *mysql.Conn, query string, callback func(*sqltypes.Result) error) error {
ctx := context.Background()
var cancel context.CancelFunc
Expand Down Expand Up @@ -198,12 +221,12 @@ func (ph *proxyHandler) ComQuery(c *mysql.Conn, query string, callback func(*sql

if session.SessionPb().Options.Workload == querypb.ExecuteOptions_OLAP {
err := ph.proxy.StreamExecute(ctx, session, query, make(map[string]*querypb.BindVariable), callback)
return mysql.NewSQLErrorFromError(err)
return sqlerror.NewSQLErrorFromError(err)
}

result, err := ph.proxy.Execute(ctx, session, query, make(map[string]*querypb.BindVariable))

if err := mysql.NewSQLErrorFromError(err); err != nil {
if err := sqlerror.NewSQLErrorFromError(err); err != nil {
return err
}
fillInTxStatusFlags(c, session)
Expand All @@ -223,7 +246,8 @@ func fillInTxStatusFlags(c *mysql.Conn, session *vtgateconn.VTGateSession) {
}
}

// ComPrepare is the handler for command prepare.
// ComPrepare is called when a connection receives a prepared
// statement query.
func (ph *proxyHandler) ComPrepare(c *mysql.Conn, query string, bindVars map[string]*querypb.BindVariable) ([]*querypb.Field, error) {
var ctx context.Context
var cancel context.CancelFunc
Expand Down Expand Up @@ -262,13 +286,15 @@ func (ph *proxyHandler) ComPrepare(c *mysql.Conn, query string, bindVars map[str
}(session)

_, fld, err := ph.proxy.Prepare(ctx, session, query, bindVars)
err = mysql.NewSQLErrorFromError(err)
err = sqlerror.NewSQLErrorFromError(err)
if err != nil {
return nil, err
}
return fld, nil
}

// ComStmtExecute is called when a connection receives a statement
// execute query.
func (ph *proxyHandler) ComStmtExecute(c *mysql.Conn, prepare *mysql.PrepareData, callback func(*sqltypes.Result) error) error {
var ctx context.Context
var cancel context.CancelFunc
Expand Down Expand Up @@ -308,18 +334,38 @@ func (ph *proxyHandler) ComStmtExecute(c *mysql.Conn, prepare *mysql.PrepareData

if session.SessionPb().Options.Workload == querypb.ExecuteOptions_OLAP {
err := ph.proxy.StreamExecute(ctx, session, prepare.PrepareStmt, prepare.BindVars, callback)
return mysql.NewSQLErrorFromError(err)
return sqlerror.NewSQLErrorFromError(err)
}

qr, err := ph.proxy.Execute(ctx, session, prepare.PrepareStmt, prepare.BindVars)
if err != nil {
return mysql.NewSQLErrorFromError(err)
return sqlerror.NewSQLErrorFromError(err)
}
fillInTxStatusFlags(c, session)

return callback(qr)
}

// ComRegisterReplica is called when a connection receives a ComRegisterReplica request
func (ph *proxyHandler) ComRegisterReplica(c *mysql.Conn, replicaHost string, replicaPort uint16, replicaUser string, replicaPassword string) error {
return vterrors.New(vtrpcpb.Code_UNIMPLEMENTED, "ComRegisterReplica")
}

// ComBinlogDump is called when a connection receives a ComBinlogDump request
func (ph *proxyHandler) ComBinlogDump(c *mysql.Conn, logFile string, binlogPos uint32) error {
return vterrors.New(vtrpcpb.Code_UNIMPLEMENTED, "ComBinlogDump")
}

// ComBinlogDumpGTID is part of the mysql.Handler interface.
func (ph *proxyHandler) ComBinlogDumpGTID(c *mysql.Conn, logFile string, logPos uint64, gtidSet replication.GTIDSet) error {
return vterrors.New(vtrpcpb.Code_UNIMPLEMENTED, "ComBinlogDumpGTID")
}

// WarningCount is called at the end of each query to obtain
// the value to be returned to the client in the EOF packet.
// Note that this will be called either in the context of the
// ComQuery callback if the result does not contain any fields,
// or after the last ComQuery call completes.
func (ph *proxyHandler) WarningCount(c *mysql.Conn) uint16 {
session, _ := c.ClientData.(*vtgateconn.VTGateSession)
if session == nil {
Expand All @@ -329,9 +375,13 @@ func (ph *proxyHandler) WarningCount(c *mysql.Conn) uint16 {
return uint16(len(session.SessionPb().GetWarnings()))
}

// ComBinlogDumpGTID is part of the mysql.Handler interface.
func (ph *proxyHandler) ComBinlogDumpGTID(c *mysql.Conn, gtidSet mysql.GTIDSet) error {
return vterrors.New(vtrpcpb.Code_UNIMPLEMENTED, "ComBinlogDumpGTID")
func (ph *proxyHandler) ComResetConnection(c *mysql.Conn) {
ctx := context.Background()
ph.closeSession(ctx, c)
}

func (ph *proxyHandler) Env() *vtenv.Environment {
return ph.env
}

func (ph *proxyHandler) getSession(ctx context.Context, c *mysql.Conn) (*vtgateconn.VTGateSession, error) {
Expand Down Expand Up @@ -437,10 +487,13 @@ func initMySQLProtocol() {

// Create a Listener.
var err error
proxyHandle = newProxyHandler(vtGateProxy)
proxyHandle, err = newProxyHandler(vtGateProxy)
if err != nil {
log.Exitf("newProxyHandler failed: %v", err)
}
if *mysqlServerPort >= 0 {
log.Infof("Mysql Server listening on Port %d", *mysqlServerPort)
mysqlListener, err = mysql.NewListener(*mysqlTCPVersion, net.JoinHostPort(*mysqlServerBindAddress, fmt.Sprintf("%v", *mysqlServerPort)), authServer, proxyHandle, *mysqlConnReadTimeout, *mysqlConnWriteTimeout, *mysqlProxyProtocol, *mysqlConnBufferPooling)
mysqlListener, err = mysql.NewListener(*mysqlTCPVersion, net.JoinHostPort(*mysqlServerBindAddress, fmt.Sprintf("%v", *mysqlServerPort)), authServer, proxyHandle, *mysqlConnReadTimeout, *mysqlConnWriteTimeout, *mysqlProxyProtocol, *mysqlConnBufferPooling, *mysqlKeepAlivePeriod, *mysqlServerFlushDelay)
if err != nil {
log.Exitf("mysql.NewListener failed: %v", err)
}
Expand All @@ -455,11 +508,11 @@ func initMySQLProtocol() {

_ = initTLSConfig(mysqlListener, *mysqlSslCert, *mysqlSslKey, *mysqlSslCa, *mysqlSslCrl, *mysqlSslServerCA, *mysqlServerRequireSecureTransport, tlsVersion)
}
mysqlListener.AllowClearTextWithoutTLS.Set(*mysqlAllowClearTextWithoutTLS)
mysqlListener.AllowClearTextWithoutTLS.Store(*mysqlAllowClearTextWithoutTLS)
// Check for the connection threshold
if *mysqlSlowConnectWarnThreshold != 0 {
log.Infof("setting mysql slow connection threshold to %v", mysqlSlowConnectWarnThreshold)
mysqlListener.SlowConnectWarnThreshold.Set(*mysqlSlowConnectWarnThreshold)
mysqlListener.SlowConnectWarnThreshold.Store(*mysqlSlowConnectWarnThreshold)
}
// Start listening for tcp
go mysqlListener.Accept()
Expand All @@ -483,7 +536,7 @@ func initMySQLProtocol() {
// newMysqlUnixSocket creates a new unix socket mysql listener. If a socket file already exists, attempts
// to clean it up.
func newMysqlUnixSocket(address string, authServer mysql.AuthServer, handler mysql.Handler) (*mysql.Listener, error) {
listener, err := mysql.NewListener("unix", address, authServer, handler, *mysqlConnReadTimeout, *mysqlConnWriteTimeout, false, false)
listener, err := mysql.NewListener("unix", address, authServer, handler, *mysqlConnReadTimeout, *mysqlConnWriteTimeout, false, false, *mysqlKeepAlivePeriod, *mysqlServerFlushDelay)
switch err := err.(type) {
case nil:
return listener, nil
Expand All @@ -504,7 +557,7 @@ func newMysqlUnixSocket(address string, authServer mysql.AuthServer, handler mys
log.Errorf("Couldn't remove existent socket file: %s", address)
return nil, err
}
listener, listenerErr := mysql.NewListener("unix", address, authServer, handler, *mysqlConnReadTimeout, *mysqlConnWriteTimeout, false, false)
listener, listenerErr := mysql.NewListener("unix", address, authServer, handler, *mysqlConnReadTimeout, *mysqlConnWriteTimeout, false, false, *mysqlKeepAlivePeriod, *mysqlServerFlushDelay)
return listener, listenerErr
default:
return nil, err
Expand Down
5 changes: 4 additions & 1 deletion go/vt/vtgateproxy/vtgateproxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,10 @@ func (proxy *VTGateProxy) Execute(ctx context.Context, session *vtgateconn.VTGat

// Intercept "use" statements since they just have to update the local session
if strings.HasPrefix(sql, "use ") {
targetString := sqlescape.UnescapeID(sql[4:])
targetString, err := sqlescape.UnescapeID(sql[4:])
if err != nil {
return &sqltypes.Result{}, fmt.Errorf("failed to unescape use statement target string: %w", err)
}
session.SessionPb().TargetString = targetString
return &sqltypes.Result{}, nil
}
Expand Down

0 comments on commit 94c7f03

Please sign in to comment.