From e4527d2d63768af4c840fcfc52a67f13a665677f Mon Sep 17 00:00:00 2001 From: Aleksey Myasnikov Date: Thu, 23 Jan 2025 19:02:59 +0300 Subject: [PATCH] * Added environment variable `YDB_EXECUTE_DATA_QUERY_OVER_QUERY_SERVICE` for execute data queries from table service client using query client API --- .github/workflows/tests.yml | 2 +- CHANGELOG.md | 2 +- internal/query/client.go | 8 +- internal/query/execute_query.go | 4 +- internal/query/execute_query_test.go | 6 +- internal/query/result.go | 10 +- internal/query/result_test.go | 18 +- internal/query/session.go | 8 +- internal/query/transaction.go | 24 +- internal/table/client.go | 30 +- internal/table/client_test.go | 6 +- internal/table/config/config.go | 25 +- internal/table/retry.go | 4 +- internal/table/retry_test.go | 34 +- internal/table/session.go | 335 ++++++++++++++---- internal/table/session_test.go | 64 +++- internal/table/statement.go | 6 +- internal/table/transaction.go | 2 +- options.go | 10 + tests/integration/table_truncated_err_test.go | 137 ++++--- 20 files changed, 519 insertions(+), 216 deletions(-) diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 4aa9f6ac1..04af27c13 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -134,7 +134,7 @@ jobs: YDB_SSL_ROOT_CERTIFICATES_FILE: /tmp/ydb_certs/ca.pem YDB_SESSIONS_SHUTDOWN_URLS: http://localhost:8765/actors/kqp_proxy?force_shutdown=all YDB_DATABASE_SQL_OVER_QUERY_SERVICE: 1 - YDB_TABLE_CLIENT_USE_QUERY_SESSION: 1 + YDB_EXECUTE_DATA_QUERY_OVER_QUERY_SERVICE: 1 HIDE_APPLICATION_OUTPUT: 1 steps: - name: Checkout code diff --git a/CHANGELOG.md b/CHANGELOG.md index 0d15f925e..1e5e49c8a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,4 @@ -* Added environment variable `YDB_TABLE_CLIENT_USE_QUERY_SESSION` for create session in table client using query service client API +* Added environment variable `YDB_EXECUTE_DATA_QUERY_OVER_QUERY_SERVICE` for execute data queries from table service client using query client API ## v3.98.0 * Supported pool of encoders, which implement ResetableWriter interface diff --git a/internal/query/client.go b/internal/query/client.go index aa6727ed9..d23a4034c 100644 --- a/internal/query/client.go +++ b/internal/query/client.go @@ -326,7 +326,7 @@ func (c *Client) QueryRow(ctx context.Context, q string, opts ...options.Execute onDone(finalErr) }() - row, err := clientQueryRow(ctx, c.pool, q, options.ExecuteSettings(opts...), withTrace(c.config.Trace())) + row, err := clientQueryRow(ctx, c.pool, q, options.ExecuteSettings(opts...), WithTrace(c.config.Trace())) if err != nil { return nil, xerrors.WithStackTrace(err) } @@ -337,7 +337,7 @@ func (c *Client) QueryRow(ctx context.Context, q string, opts ...options.Execute func clientExec(ctx context.Context, pool sessionPool, q string, opts ...options.Execute) (finalErr error) { settings := options.ExecuteSettings(opts...) err := do(ctx, pool, func(ctx context.Context, s *Session) (err error) { - streamResult, err := execute(ctx, s.ID(), s.client, q, settings, withTrace(s.trace)) + streamResult, err := execute(ctx, s.ID(), s.client, q, settings, WithTrace(s.trace)) if err != nil { return xerrors.WithStackTrace(err) } @@ -382,7 +382,7 @@ func clientQuery(ctx context.Context, pool sessionPool, q string, opts ...option settings := options.ExecuteSettings(opts...) err = do(ctx, pool, func(ctx context.Context, s *Session) (err error) { streamResult, err := execute(ctx, s.ID(), s.client, q, - options.ExecuteSettings(opts...), withTrace(s.trace), + options.ExecuteSettings(opts...), WithTrace(s.trace), ) if err != nil { return xerrors.WithStackTrace(err) @@ -467,7 +467,7 @@ func (c *Client) QueryResultSet( onDone(finalErr) }() - rs, err := clientQueryResultSet(ctx, c.pool, q, options.ExecuteSettings(opts...), withTrace(c.config.Trace())) + rs, err := clientQueryResultSet(ctx, c.pool, q, options.ExecuteSettings(opts...), WithTrace(c.config.Trace())) if err != nil { return nil, xerrors.WithStackTrace(err) } diff --git a/internal/query/execute_query.go b/internal/query/execute_query.go index 577d7f8d7..4bd460c07 100644 --- a/internal/query/execute_query.go +++ b/internal/query/execute_query.go @@ -139,8 +139,8 @@ func execute( } r, err := newResult(ctx, stream, append(opts, - withStatsCallback(settings.StatsCallback()), - withOnClose(executeCancel), + WithStatsCallback(settings.StatsCallback()), + WithOnClose(executeCancel), )...) if err != nil { return nil, xerrors.WithStackTrace(err) diff --git a/internal/query/execute_query_test.go b/internal/query/execute_query_test.go index 8d4e2ab7f..2957e5e68 100644 --- a/internal/query/execute_query_test.go +++ b/internal/query/execute_query_test.go @@ -358,7 +358,7 @@ func TestExecute(t *testing.T) { client.EXPECT().ExecuteQuery(gomock.Any(), gomock.Any()).Return(stream, nil) var txID string r, err := execute(ctx, "123", client, "", options.ExecuteSettings(), - onTxMeta(func(txMeta *Ydb_Query.TransactionMeta) { + OnTxMeta(func(txMeta *Ydb_Query.TransactionMeta) { txID = txMeta.GetId() }), ) @@ -577,7 +577,7 @@ func TestExecute(t *testing.T) { t.Log("execute") var txID string r, err := execute(ctx, "123", client, "", options.ExecuteSettings(), - onTxMeta(func(txMeta *Ydb_Query.TransactionMeta) { + OnTxMeta(func(txMeta *Ydb_Query.TransactionMeta) { txID = txMeta.GetId() }), ) @@ -718,7 +718,7 @@ func TestExecute(t *testing.T) { t.Log("execute") var txID string r, err := execute(ctx, "123", client, "", options.ExecuteSettings(), - onTxMeta(func(txMeta *Ydb_Query.TransactionMeta) { + OnTxMeta(func(txMeta *Ydb_Query.TransactionMeta) { txID = txMeta.GetId() }), ) diff --git a/internal/query/result.go b/internal/query/result.go index c361484c0..2ed7c955e 100644 --- a/internal/query/result.go +++ b/internal/query/result.go @@ -87,31 +87,31 @@ func (r *materializedResult) NextResultSet(ctx context.Context) (result.Set, err return r.resultSets[r.idx], nil } -func withTrace(t *trace.Query) resultOption { +func WithTrace(t *trace.Query) resultOption { return func(s *streamResult) { s.trace = t } } -func withStatsCallback(callback func(queryStats stats.QueryStats)) resultOption { +func WithStatsCallback(callback func(queryStats stats.QueryStats)) resultOption { return func(s *streamResult) { s.statsCallback = callback } } -func withOnClose(onClose func()) resultOption { +func WithOnClose(onClose func()) resultOption { return func(s *streamResult) { s.onClose = append(s.onClose, onClose) } } -func onNextPartErr(callback func(err error)) resultOption { +func OnNextPartErr(callback func(err error)) resultOption { return func(s *streamResult) { s.onNextPartErr = append(s.onNextPartErr, callback) } } -func onTxMeta(callback func(txMeta *Ydb_Query.TransactionMeta)) resultOption { +func OnTxMeta(callback func(txMeta *Ydb_Query.TransactionMeta)) resultOption { return func(s *streamResult) { s.onTxMeta = append(s.onTxMeta, callback) } diff --git a/internal/query/result_test.go b/internal/query/result_test.go index 98bfb41da..fbc15e571 100644 --- a/internal/query/result_test.go +++ b/internal/query/result_test.go @@ -1544,7 +1544,7 @@ func TestCloseResultOnCloseClosableResultSet(t *testing.T) { }, nil) stream.EXPECT().Recv().Return(nil, io.EOF) var closed bool - r, err := newResult(ctx, stream, withTrace(&trace.Query{ + r, err := newResult(ctx, stream, WithTrace(&trace.Query{ OnResultClose: func(info trace.QueryResultCloseStartInfo) func(info trace.QueryResultCloseDoneInfo) { require.False(t, closed) closed = true @@ -1927,7 +1927,7 @@ func TestResultStats(t *testing.T) { }, nil) stream.EXPECT().Recv().Return(nil, io.EOF) var s stats.QueryStats - result, err := newResult(ctx, stream, withStatsCallback(func(queryStats stats.QueryStats) { + result, err := newResult(ctx, stream, WithStatsCallback(func(queryStats stats.QueryStats) { s = queryStats })) require.NoError(t, err) @@ -2288,7 +2288,7 @@ func TestResultStats(t *testing.T) { }, nil) stream.EXPECT().Recv().Return(nil, io.EOF) var s stats.QueryStats - result, err := newResult(ctx, stream, withStatsCallback(func(queryStats stats.QueryStats) { + result, err := newResult(ctx, stream, WithStatsCallback(func(queryStats stats.QueryStats) { s = queryStats })) require.NoError(t, err) @@ -2650,7 +2650,7 @@ func TestResultStats(t *testing.T) { }, nil) stream.EXPECT().Recv().Return(nil, io.EOF) var s stats.QueryStats - result, err := newResult(ctx, stream, withStatsCallback(func(queryStats stats.QueryStats) { + result, err := newResult(ctx, stream, WithStatsCallback(func(queryStats stats.QueryStats) { s = queryStats })) require.NoError(t, err) @@ -2987,7 +2987,7 @@ func TestResultStats(t *testing.T) { }, nil) stream.EXPECT().Recv().Return(nil, io.EOF) var s stats.QueryStats - result, err := newResult(ctx, stream, withStatsCallback(func(queryStats stats.QueryStats) { + result, err := newResult(ctx, stream, WithStatsCallback(func(queryStats stats.QueryStats) { s = queryStats })) require.NoError(t, err) @@ -3359,7 +3359,7 @@ func TestMaterializedResultStats(t *testing.T) { }, nil) stream.EXPECT().Recv().Return(nil, io.EOF) var s stats.QueryStats - result, err := newResult(ctx, stream, withStatsCallback(func(queryStats stats.QueryStats) { + result, err := newResult(ctx, stream, WithStatsCallback(func(queryStats stats.QueryStats) { s = queryStats })) require.NoError(t, err) @@ -3720,7 +3720,7 @@ func TestMaterializedResultStats(t *testing.T) { }, nil) stream.EXPECT().Recv().Return(nil, io.EOF) var s stats.QueryStats - result, err := newResult(ctx, stream, withStatsCallback(func(queryStats stats.QueryStats) { + result, err := newResult(ctx, stream, WithStatsCallback(func(queryStats stats.QueryStats) { s = queryStats })) require.NoError(t, err) @@ -4082,7 +4082,7 @@ func TestMaterializedResultStats(t *testing.T) { }, nil) stream.EXPECT().Recv().Return(nil, io.EOF) var s stats.QueryStats - result, err := newResult(ctx, stream, withStatsCallback(func(queryStats stats.QueryStats) { + result, err := newResult(ctx, stream, WithStatsCallback(func(queryStats stats.QueryStats) { s = queryStats })) require.NoError(t, err) @@ -4419,7 +4419,7 @@ func TestMaterializedResultStats(t *testing.T) { }, nil) stream.EXPECT().Recv().Return(nil, io.EOF) var s stats.QueryStats - result, err := newResult(ctx, stream, withStatsCallback(func(queryStats stats.QueryStats) { + result, err := newResult(ctx, stream, WithStatsCallback(func(queryStats stats.QueryStats) { s = queryStats })) require.NoError(t, err) diff --git a/internal/query/session.go b/internal/query/session.go index 138cb7ca0..edc8d9e32 100644 --- a/internal/query/session.go +++ b/internal/query/session.go @@ -37,7 +37,7 @@ func (s *Session) QueryResultSet( onDone(finalErr) }() - r, err := execute(ctx, s.ID(), s.client, q, options.ExecuteSettings(opts...), withTrace(s.trace)) + r, err := execute(ctx, s.ID(), s.client, q, options.ExecuteSettings(opts...), WithTrace(s.trace)) if err != nil { s.setStatusFromError(err) @@ -77,7 +77,7 @@ func (s *Session) QueryRow(ctx context.Context, q string, opts ...options.Execut onDone(finalErr) }() - row, err := s.queryRow(ctx, q, options.ExecuteSettings(opts...), withTrace(s.trace)) + row, err := s.queryRow(ctx, q, options.ExecuteSettings(opts...), WithTrace(s.trace)) if err != nil { return nil, xerrors.WithStackTrace(err) } @@ -145,7 +145,7 @@ func (s *Session) Exec( onDone(finalErr) }() - r, err := execute(ctx, s.ID(), s.client, q, options.ExecuteSettings(opts...), withTrace(s.trace)) + r, err := execute(ctx, s.ID(), s.client, q, options.ExecuteSettings(opts...), WithTrace(s.trace)) if err != nil { s.setStatusFromError(err) @@ -169,7 +169,7 @@ func (s *Session) Query( onDone(finalErr) }() - r, err := execute(ctx, s.ID(), s.client, q, options.ExecuteSettings(opts...), withTrace(s.trace)) + r, err := execute(ctx, s.ID(), s.client, q, options.ExecuteSettings(opts...), WithTrace(s.trace)) if err != nil { s.setStatusFromError(err) diff --git a/internal/query/transaction.go b/internal/query/transaction.go index c670cae52..8841e4b6f 100644 --- a/internal/query/transaction.go +++ b/internal/query/transaction.go @@ -93,8 +93,8 @@ func (tx *Transaction) QueryResultSet( } resultOpts := []resultOption{ - withTrace(tx.s.trace), - onTxMeta(func(txMeta *Ydb_Query.TransactionMeta) { + WithTrace(tx.s.trace), + OnTxMeta(func(txMeta *Ydb_Query.TransactionMeta) { tx.SetTxID(txMeta.GetId()) }), } @@ -107,7 +107,7 @@ func (tx *Transaction) QueryResultSet( // notification about complete transaction must be sended for any error or for successfully read all result if // it was execution with commit flag resultOpts = append(resultOpts, - onNextPartErr(func(err error) { + OnNextPartErr(func(err error) { tx.notifyOnCompleted(xerrors.HideEOF(err)) }), ) @@ -144,8 +144,8 @@ func (tx *Transaction) QueryRow( ) resultOpts := []resultOption{ - withTrace(tx.s.trace), - onTxMeta(func(txMeta *Ydb_Query.TransactionMeta) { + WithTrace(tx.s.trace), + OnTxMeta(func(txMeta *Ydb_Query.TransactionMeta) { tx.SetTxID(txMeta.GetId()) }), } @@ -158,7 +158,7 @@ func (tx *Transaction) QueryRow( // notification about complete transaction must be sended for any error or for successfully read all result if // it was execution with commit flag resultOpts = append(resultOpts, - onNextPartErr(func(err error) { + OnNextPartErr(func(err error) { tx.notifyOnCompleted(xerrors.HideEOF(err)) }), ) @@ -211,8 +211,8 @@ func (tx *Transaction) Exec(ctx context.Context, q string, opts ...options.Execu } resultOpts := []resultOption{ - withTrace(tx.s.trace), - onTxMeta(func(txMeta *Ydb_Query.TransactionMeta) { + WithTrace(tx.s.trace), + OnTxMeta(func(txMeta *Ydb_Query.TransactionMeta) { tx.SetTxID(txMeta.GetId()) }), } @@ -225,7 +225,7 @@ func (tx *Transaction) Exec(ctx context.Context, q string, opts ...options.Execu // notification about complete transaction must be sended for any error or for successfully read all result if // it was execution with commit flag resultOpts = append(resultOpts, - onNextPartErr(func(err error) { + OnNextPartErr(func(err error) { tx.notifyOnCompleted(xerrors.HideEOF(err)) }), ) @@ -282,8 +282,8 @@ func (tx *Transaction) Query(ctx context.Context, q string, opts ...options.Exec } resultOpts := []resultOption{ - withTrace(tx.s.trace), - onTxMeta(func(txMeta *Ydb_Query.TransactionMeta) { + WithTrace(tx.s.trace), + OnTxMeta(func(txMeta *Ydb_Query.TransactionMeta) { tx.SetTxID(txMeta.GetId()) }), } @@ -296,7 +296,7 @@ func (tx *Transaction) Query(ctx context.Context, q string, opts ...options.Exec // notification about complete transaction must be sended for any error or for successfully read all result if // it was execution with commit flag resultOpts = append(resultOpts, - onNextPartErr(func(err error) { + OnNextPartErr(func(err error) { tx.notifyOnCompleted(xerrors.HideEOF(err)) }), ) diff --git a/internal/table/client.go b/internal/table/client.go index e463eba16..deb912f3c 100644 --- a/internal/table/client.go +++ b/internal/table/client.go @@ -19,7 +19,7 @@ import ( ) // sessionBuilder is the interface that holds logic of creating sessions. -type sessionBuilder func(ctx context.Context) (*session, error) +type sessionBuilder func(ctx context.Context) (*Session, error) func New(ctx context.Context, cc grpc.ClientConnInterface, config *config.Config) *Client { onDone := trace.TableOnInit(config.Trace(), &ctx, @@ -30,20 +30,20 @@ func New(ctx context.Context, cc grpc.ClientConnInterface, config *config.Config clock: config.Clock(), config: config, cc: cc, - build: func(ctx context.Context) (s *session, err error) { + build: func(ctx context.Context) (s *Session, err error) { return newSession(ctx, cc, config) }, - pool: pool.New[*session, session](ctx, - pool.WithLimit[*session, session](config.SizeLimit()), - pool.WithItemUsageLimit[*session, session](config.SessionUsageLimit()), - pool.WithIdleTimeToLive[*session, session](config.IdleThreshold()), - pool.WithCreateItemTimeout[*session, session](config.CreateSessionTimeout()), - pool.WithCloseItemTimeout[*session, session](config.DeleteTimeout()), - pool.WithClock[*session, session](config.Clock()), - pool.WithCreateItemFunc[*session, session](func(ctx context.Context) (*session, error) { + pool: pool.New[*Session, Session](ctx, + pool.WithLimit[*Session, Session](config.SizeLimit()), + pool.WithItemUsageLimit[*Session, Session](config.SessionUsageLimit()), + pool.WithIdleTimeToLive[*Session, Session](config.IdleThreshold()), + pool.WithCreateItemTimeout[*Session, Session](config.CreateSessionTimeout()), + pool.WithCloseItemTimeout[*Session, Session](config.DeleteTimeout()), + pool.WithClock[*Session, Session](config.Clock()), + pool.WithCreateItemFunc[*Session, Session](func(ctx context.Context) (*Session, error) { return newSession(ctx, cc, config) }), - pool.WithTrace[*session, session](&pool.Trace{ + pool.WithTrace[*Session, Session](&pool.Trace{ OnNew: func(ctx *context.Context, call stack.Caller) func(limit int) { return func(limit int) { onDone(limit) @@ -51,7 +51,7 @@ func New(ctx context.Context, cc grpc.ClientConnInterface, config *config.Config }, OnPut: func(ctx *context.Context, call stack.Caller, item any) func(err error) { onDone := trace.TableOnPoolPut( //nolint:forcetypeassert - config.Trace(), ctx, call, item.(*session), + config.Trace(), ctx, call, item.(*Session), ) return func(err error) { @@ -62,7 +62,7 @@ func New(ctx context.Context, cc grpc.ClientConnInterface, config *config.Config onDone := trace.TableOnPoolGet(config.Trace(), ctx, call) return func(item any, attempts int, err error) { - onDone(item.(*session), attempts, err) //nolint:forcetypeassert + onDone(item.(*Session), attempts, err) //nolint:forcetypeassert } }, OnWith: func(ctx *context.Context, call stack.Caller) func(attempts int, err error) { @@ -102,7 +102,7 @@ func (c *Client) CreateSession(ctx context.Context, opts ...table.Option) (_ tab if c.isClosed() { return nil, xerrors.WithStackTrace(errClosedClient) } - createSession := func(ctx context.Context) (*session, error) { + createSession := func(ctx context.Context) (*Session, error) { s, err := c.build(ctx) if err != nil { return nil, xerrors.WithStackTrace(err) @@ -125,7 +125,7 @@ func (c *Client) CreateSession(ctx context.Context, opts ...table.Option) (_ tab "github.com/ydb-platform/ydb-go-sdk/v3/internal/table.(*Client).CreateSession"), ) attempts = 0 - s *session + s *Session ) defer func() { if s != nil { diff --git a/internal/table/client_test.go b/internal/table/client_test.go index 2590bf301..111cde261 100644 --- a/internal/table/client_test.go +++ b/internal/table/client_test.go @@ -128,7 +128,7 @@ var simpleCluster = testutil.NewBalancer( ), ) -func simpleSession(t testing.TB) *session { +func simpleSession(t testing.TB) *Session { s, err := newTableSession(context.Background(), simpleCluster, config.New()) if err != nil { t.Fatalf("newTableSession unexpected error: %v", err) @@ -138,7 +138,7 @@ func simpleSession(t testing.TB) *session { } type StubBuilder struct { - OnCreateSession func(ctx context.Context) (*session, error) + OnCreateSession func(ctx context.Context) (*Session, error) cc grpc.ClientConnInterface Limit int @@ -148,7 +148,7 @@ type StubBuilder struct { actual int } -func (s *StubBuilder) createSession(ctx context.Context) (session *session, err error) { +func (s *StubBuilder) createSession(ctx context.Context) (session *Session, err error) { defer s.mu.WithLock(func() { if session != nil { s.actual++ diff --git a/internal/table/config/config.go b/internal/table/config/config.go index ee8ee694b..b8906df5c 100644 --- a/internal/table/config/config.go +++ b/internal/table/config/config.go @@ -159,9 +159,6 @@ func WithIgnoreTruncated() Option { func ExecuteDataQueryOverQueryService(b bool) Option { return func(c *Config) { c.executeDataQueryOverQueryService = b - if b { - c.useQuerySession = true - } } } @@ -237,6 +234,11 @@ func (c *Config) IgnoreTruncated() bool { return c.ignoreTruncated } +// UseQuerySession specifies behavior on create/delete session +func (c *Config) UseQuerySession(b bool) bool { + return c.useQuerySession +} + // ExecuteDataQueryOverQueryService specifies behavior on execute handle func (c *Config) ExecuteDataQueryOverQueryService() bool { return c.executeDataQueryOverQueryService @@ -288,13 +290,16 @@ func (c *Config) DeleteTimeout() time.Duration { } func defaults() *Config { + executeDataQueryOverQueryService := os.Getenv("YDB_EXECUTE_DATA_QUERY_OVER_QUERY_SERVICE") != "" + return &Config{ - sizeLimit: DefaultSessionPoolSizeLimit, - createSessionTimeout: DefaultSessionPoolCreateSessionTimeout, - deleteTimeout: DefaultSessionPoolDeleteTimeout, - idleThreshold: DefaultSessionPoolIdleThreshold, - clock: clockwork.NewRealClock(), - trace: &trace.Table{}, - useQuerySession: os.Getenv("YDB_TABLE_CLIENT_USE_QUERY_SESSION") != "", + sizeLimit: DefaultSessionPoolSizeLimit, + createSessionTimeout: DefaultSessionPoolCreateSessionTimeout, + deleteTimeout: DefaultSessionPoolDeleteTimeout, + idleThreshold: DefaultSessionPoolIdleThreshold, + clock: clockwork.NewRealClock(), + trace: &trace.Table{}, + useQuerySession: executeDataQueryOverQueryService, + executeDataQueryOverQueryService: executeDataQueryOverQueryService, } } diff --git a/internal/table/retry.go b/internal/table/retry.go index 9e193fa5a..c63a8773b 100644 --- a/internal/table/retry.go +++ b/internal/table/retry.go @@ -18,7 +18,7 @@ type sessionPool interface { closer.Closer Stats() pool.Stats - With(ctx context.Context, f func(ctx context.Context, s *session) error, opts ...retry.Option) error + With(ctx context.Context, f func(ctx context.Context, s *Session) error, opts ...retry.Option) error } func do( @@ -64,7 +64,7 @@ func retryBackoff( op table.Operation, opts ...retry.Option, ) error { - return pool.With(ctx, func(ctx context.Context, s *session) error { + return pool.With(ctx, func(ctx context.Context, s *Session) error { if err := op(ctx, s); err != nil { s.checkError(err) diff --git a/internal/table/retry_test.go b/internal/table/retry_test.go index a6d209914..f4b01934e 100644 --- a/internal/table/retry_test.go +++ b/internal/table/retry_test.go @@ -85,16 +85,16 @@ func TestDoBadSession(t *testing.T) { ctx := xtest.Context(t) xtest.TestManyTimes(t, func(t testing.TB) { closed := make(map[table.Session]bool) - p := pool.New[*session, session](ctx, - pool.WithCreateItemFunc[*session, session](func(ctx context.Context) (*session, error) { + p := pool.New[*Session, Session](ctx, + pool.WithCreateItemFunc[*Session, Session](func(ctx context.Context) (*Session, error) { s := simpleSession(t) - s.onClose = append(s.onClose, func(s *session) { + s.onClose = append(s.onClose, func(s *Session) { closed[s] = true }) return s, nil }), - pool.WithSyncCloseItem[*session, session](), + pool.WithSyncCloseItem[*Session, Session](), ) var ( i int @@ -136,11 +136,11 @@ func TestDoCreateSessionError(t *testing.T) { xtest.TestManyTimes(t, func(t testing.TB) { ctx, cancel := xcontext.WithTimeout(rootCtx, 30*time.Millisecond) defer cancel() - p := pool.New[*session, session](ctx, - pool.WithCreateItemFunc[*session, session](func(ctx context.Context) (*session, error) { + p := pool.New[*Session, Session](ctx, + pool.WithCreateItemFunc[*Session, Session](func(ctx context.Context) (*Session, error) { return nil, xerrors.Operation(xerrors.WithStatusCode(Ydb.StatusIds_UNAVAILABLE)) }), - pool.WithSyncCloseItem[*session, session](), + pool.WithSyncCloseItem[*Session, Session](), ) err := do(ctx, p, config.New(), func(ctx context.Context, s table.Session) error { @@ -305,11 +305,11 @@ func TestDoContextDeadline(t *testing.T) { cc: testutil.NewBalancer(testutil.WithInvokeHandlers(testutil.InvokeHandlers{})), } ctx := xtest.Context(t) - p := pool.New[*session, session](ctx, - pool.WithCreateItemFunc[*session, session](func(ctx context.Context) (*session, error) { + p := pool.New[*Session, Session](ctx, + pool.WithCreateItemFunc[*Session, Session](func(ctx context.Context) (*Session, error) { return newTableSession(ctx, client.cc, config.New()) }), - pool.WithSyncCloseItem[*session, session](), + pool.WithSyncCloseItem[*Session, Session](), ) r := xrand.New(xrand.WithLock()) for i := range timeouts { @@ -354,12 +354,12 @@ func TestDoWithCustomErrors(t *testing.T) { var ( limit = 10 ctx = context.Background() - p = pool.New[*session, session](ctx, - pool.WithCreateItemFunc[*session, session](func(ctx context.Context) (*session, error) { + p = pool.New[*Session, Session](ctx, + pool.WithCreateItemFunc[*Session, Session](func(ctx context.Context) (*Session, error) { return simpleSession(t), nil }), - pool.WithLimit[*session, session](limit), - pool.WithSyncCloseItem[*session, session](), + pool.WithLimit[*Session, Session](limit), + pool.WithSyncCloseItem[*Session, Session](), ) ) for _, test := range []struct { @@ -465,12 +465,12 @@ func TestDoWithCustomErrors(t *testing.T) { } // SingleSession returns sessionPool that uses only given session during retries. -func SingleSession(s *session) sessionPool { +func SingleSession(s *Session) sessionPool { return &singleSession{s: s} } type singleSession struct { - s *session + s *Session } func (s *singleSession) Close(ctx context.Context) error { @@ -485,7 +485,7 @@ func (s *singleSession) Stats() pool.Stats { } func (s *singleSession) With(ctx context.Context, - f func(ctx context.Context, s *session) error, opts ...retry.Option, + f func(ctx context.Context, s *Session) error, opts ...retry.Option, ) error { return retry.Retry(ctx, func(ctx context.Context) error { return f(ctx, s.s) diff --git a/internal/table/session.go b/internal/table/session.go index 403869f06..4068ced0e 100644 --- a/internal/table/session.go +++ b/internal/table/session.go @@ -3,6 +3,7 @@ package table import ( "context" "fmt" + "io" "net/url" "strconv" "sync" @@ -12,6 +13,7 @@ import ( "github.com/ydb-platform/ydb-go-genproto/Ydb_Query_V1" "github.com/ydb-platform/ydb-go-genproto/Ydb_Table_V1" "github.com/ydb-platform/ydb-go-genproto/protos/Ydb" + "github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Query" "github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Table" "github.com/ydb-platform/ydb-go-genproto/protos/Ydb_TableStats" "google.golang.org/grpc" @@ -24,7 +26,7 @@ import ( "github.com/ydb-platform/ydb-go-sdk/v3/internal/meta" "github.com/ydb-platform/ydb-go-sdk/v3/internal/operation" "github.com/ydb-platform/ydb-go-sdk/v3/internal/params" - query "github.com/ydb-platform/ydb-go-sdk/v3/internal/query/session" + "github.com/ydb-platform/ydb-go-sdk/v3/internal/query/session" "github.com/ydb-platform/ydb-go-sdk/v3/internal/stack" "github.com/ydb-platform/ydb-go-sdk/v3/internal/table/config" "github.com/ydb-platform/ydb-go-sdk/v3/internal/table/scanner" @@ -40,30 +42,224 @@ import ( "github.com/ydb-platform/ydb-go-sdk/v3/trace" ) -// session represents a single table API session. +type ( + executor interface { + Execute( + ctx context.Context, + a *allocator.Allocator, + request *Ydb_Table.ExecuteDataQueryRequest, + callOptions ...grpc.CallOption, + ) (*transaction, result.Result, error) + } + tableExecutor struct { + client Ydb_Table_V1.TableServiceClient + ignoreTruncated bool + } + queryExecutor struct { + client Ydb_Query_V1.QueryServiceClient + } +) + +func statsModeToStatsMode(src Ydb_Table.QueryStatsCollection_Mode) (dst Ydb_Query.StatsMode) { + switch src { + case Ydb_Table.QueryStatsCollection_STATS_COLLECTION_NONE: + return Ydb_Query.StatsMode_STATS_MODE_NONE + case Ydb_Table.QueryStatsCollection_STATS_COLLECTION_BASIC: + return Ydb_Query.StatsMode_STATS_MODE_BASIC + case Ydb_Table.QueryStatsCollection_STATS_COLLECTION_FULL: + return Ydb_Query.StatsMode_STATS_MODE_FULL + case Ydb_Table.QueryStatsCollection_STATS_COLLECTION_PROFILE: + return Ydb_Query.StatsMode_STATS_MODE_PROFILE + default: + return Ydb_Query.StatsMode_STATS_MODE_UNSPECIFIED + } +} + +func txControlToTxControl(src *Ydb_Table.TransactionControl) (dst *Ydb_Query.TransactionControl) { + dst = &Ydb_Query.TransactionControl{ + CommitTx: src.GetCommitTx(), + } + + switch t := src.GetTxSelector().(type) { + case *Ydb_Table.TransactionControl_BeginTx: + switch tt := t.BeginTx.GetTxMode().(type) { + case *Ydb_Table.TransactionSettings_SerializableReadWrite: + dst.TxSelector = &Ydb_Query.TransactionControl_BeginTx{ + BeginTx: &Ydb_Query.TransactionSettings{ + TxMode: &Ydb_Query.TransactionSettings_SerializableReadWrite{ + SerializableReadWrite: &Ydb_Query.SerializableModeSettings{}, + }, + }, + } + case *Ydb_Table.TransactionSettings_SnapshotReadOnly: + dst.TxSelector = &Ydb_Query.TransactionControl_BeginTx{ + BeginTx: &Ydb_Query.TransactionSettings{ + TxMode: &Ydb_Query.TransactionSettings_SnapshotReadOnly{ + SnapshotReadOnly: &Ydb_Query.SnapshotModeSettings{}, + }, + }, + } + case *Ydb_Table.TransactionSettings_StaleReadOnly: + dst.TxSelector = &Ydb_Query.TransactionControl_BeginTx{ + BeginTx: &Ydb_Query.TransactionSettings{ + TxMode: &Ydb_Query.TransactionSettings_StaleReadOnly{ + StaleReadOnly: &Ydb_Query.StaleModeSettings{}, + }, + }, + } + case *Ydb_Table.TransactionSettings_OnlineReadOnly: + dst.TxSelector = &Ydb_Query.TransactionControl_BeginTx{ + BeginTx: &Ydb_Query.TransactionSettings{ + TxMode: &Ydb_Query.TransactionSettings_OnlineReadOnly{ + OnlineReadOnly: &Ydb_Query.OnlineModeSettings{ + AllowInconsistentReads: tt.OnlineReadOnly.GetAllowInconsistentReads(), + }, + }, + }, + } + default: + panic(fmt.Sprintf("unknown begin tx settings type: %v", tt)) + } + case *Ydb_Table.TransactionControl_TxId: + dst.TxSelector = &Ydb_Query.TransactionControl_TxId{ + TxId: t.TxId, + } + default: + panic(fmt.Sprintf("unknown tx selector type: %v", t)) + } + + return dst +} + +func queryExecuteStreamResultToTableResult( + ctx context.Context, + stream Ydb_Query_V1.QueryService_ExecuteQueryClient, +) (_ *transaction, _ result.Result, finalErr error) { + var ( + t *transaction + resultSets []*Ydb.ResultSet + queryStats *Ydb_TableStats.QueryStats + ) + + for { + if err := ctx.Err(); err != nil { + return nil, nil, xerrors.WithStackTrace(err) + } + + recv, err := stream.Recv() + if err != nil { + if xerrors.Is(err, io.EOF) { + break + } + + return nil, nil, xerrors.WithStackTrace(err) + } + + if recv.GetTxMeta() != nil { + t = &transaction{ + Identifier: tx.ID(recv.GetTxMeta().GetId()), + control: table.TxControl(table.WithTxID(recv.GetTxMeta().GetId())), + } + } + + if recv.GetExecStats() != nil { + queryStats = recv.GetExecStats() + } + + if rs := recv.GetResultSet(); rs != nil { + if idx := int(recv.GetResultSetIndex()); idx == len(resultSets) { + resultSets = append(resultSets, recv.GetResultSet()) + } else if idx < len(resultSets) { + resultSets[idx].Rows = append(resultSets[idx].GetRows(), recv.GetResultSet().GetRows()...) + } else { + return nil, nil, xerrors.WithStackTrace(fmt.Errorf("unexpected result set index: %d", idx)) + } + } + } + + return t, scanner.NewUnary( + resultSets, + queryStats, + scanner.WithIgnoreTruncated(false), + ), nil +} + +func (e queryExecutor) Execute( + ctx context.Context, + a *allocator.Allocator, + executeDataQueryRequest *Ydb_Table.ExecuteDataQueryRequest, + callOptions ...grpc.CallOption, +) (_ *transaction, _ result.Result, finalErr error) { + request := a.QueryExecuteQueryRequest() + + request.SessionId = executeDataQueryRequest.GetSessionId() + request.ExecMode = Ydb_Query.ExecMode_EXEC_MODE_EXECUTE + request.TxControl = txControlToTxControl(executeDataQueryRequest.GetTxControl()) + request.Query = &Ydb_Query.ExecuteQueryRequest_QueryContent{ + QueryContent: &Ydb_Query.QueryContent{ + Syntax: Ydb_Query.Syntax_SYNTAX_YQL_V1, + Text: executeDataQueryRequest.GetQuery().GetYqlText(), + }, + } + request.Parameters = executeDataQueryRequest.GetParameters() + request.StatsMode = statsModeToStatsMode(executeDataQueryRequest.GetCollectStats()) + request.ConcurrentResultSets = false + + ctx, cancel := xcontext.WithCancel(xcontext.ValueOnly(ctx)) + defer cancel() + + stream, err := e.client.ExecuteQuery(ctx, request, callOptions...) + if err != nil { + return nil, nil, xerrors.WithStackTrace(err) + } + + return queryExecuteStreamResultToTableResult(ctx, stream) +} + +func (e tableExecutor) Execute( + ctx context.Context, + a *allocator.Allocator, + request *Ydb_Table.ExecuteDataQueryRequest, + callOptions ...grpc.CallOption, +) (*transaction, result.Result, error) { + r, err := executeDataQuery(ctx, e.client, a, request, callOptions...) + if err != nil { + return nil, nil, xerrors.WithStackTrace(err) + } + + return executeQueryResult(r, request.GetTxControl(), e.ignoreTruncated) +} + +var ( + _ executor = (*tableExecutor)(nil) + _ executor = (*queryExecutor)(nil) +) + +// Session represents a single table API session. // // session methods are not goroutine safe. Simultaneous execution of requests // are forbidden within a single session. // // Note that after session is no longer needed it should be destroyed by // Close() call. -type session struct { - onClose []func(s *session) +type Session struct { + onClose []func(s *Session) id string client Ydb_Table_V1.TableServiceClient status table.SessionStatus config *config.Config + executor executor lastUsage atomic.Int64 statusMtx sync.RWMutex closeOnce sync.Once nodeID atomic.Uint32 } -func (s *session) IsAlive() bool { +func (s *Session) IsAlive() bool { return s.Status() == table.SessionReady } -func (s *session) LastUsage() time.Time { +func (s *Session) LastUsage() time.Time { return time.Unix(s.lastUsage.Load(), 0) } @@ -80,7 +276,7 @@ func nodeID(sessionID string) (uint32, error) { return uint32(id), err } -func (s *session) NodeID() uint32 { +func (s *Session) NodeID() uint32 { if s == nil { return 0 } @@ -96,7 +292,7 @@ func (s *session) NodeID() uint32 { return id } -func (s *session) Status() table.SessionStatus { +func (s *Session) Status() table.SessionStatus { if s == nil { return table.SessionStatusUnknown } @@ -106,14 +302,14 @@ func (s *session) Status() table.SessionStatus { return s.status } -func (s *session) SetStatus(status table.SessionStatus) { +func (s *Session) SetStatus(status table.SessionStatus) { s.statusMtx.Lock() defer s.statusMtx.Unlock() s.status = status } func newSession(ctx context.Context, cc grpc.ClientConnInterface, config *config.Config) ( - s *session, finalErr error, + s *Session, finalErr error, ) { onDone := trace.TableOnSessionNew(config.Trace(), &ctx, stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/v3/internal/table.newSession"), @@ -129,7 +325,7 @@ func newSession(ctx context.Context, cc grpc.ClientConnInterface, config *config return newTableSession(ctx, cc, config) } -func newTableSession(ctx context.Context, cc grpc.ClientConnInterface, config *config.Config) (*session, error) { +func newTableSession(ctx context.Context, cc grpc.ClientConnInterface, config *config.Config) (*Session, error) { response, err := Ydb_Table_V1.NewTableServiceClient(cc).CreateSession(ctx, &Ydb_Table.CreateSessionRequest{ OperationParams: operation.Params( @@ -149,12 +345,12 @@ func newTableSession(ctx context.Context, cc grpc.ClientConnInterface, config *c return nil, xerrors.WithStackTrace(err) } - s := &session{ + s := &Session{ id: result.GetSessionId(), config: config, status: table.SessionReady, - onClose: []func(s *session){ - func(s *session) { + onClose: []func(s *Session){ + func(s *Session) { _, err = s.client.DeleteSession(ctx, &Ydb_Table.DeleteSessionRequest{ SessionId: s.id, @@ -180,29 +376,33 @@ func newTableSession(ctx context.Context, cc grpc.ClientConnInterface, config *c }, ), ) + s.executor = tableExecutor{ + client: s.client, + ignoreTruncated: false, + } return s, nil } -func newQuerySession(ctx context.Context, cc grpc.ClientConnInterface, config *config.Config) (*session, error) { - s := &session{ +func newQuerySession(ctx context.Context, cc grpc.ClientConnInterface, config *config.Config) (*Session, error) { + s := &Session{ config: config, status: table.SessionReady, } - core, err := query.Open(ctx, + core, err := session.Open(ctx, Ydb_Query_V1.NewQueryServiceClient(cc), - query.WithConn(cc), - query.OnChangeStatus(func(status query.Status) { + session.WithConn(cc), + session.OnChangeStatus(func(status session.Status) { switch status { - case query.StatusClosed: + case session.StatusClosed: s.SetStatus(table.SessionClosed) _ = s.Close(context.Background()) - case query.StatusClosing: + case session.StatusClosing: s.SetStatus(table.SessionClosing) - case query.StatusInUse: + case session.StatusInUse: s.SetStatus(table.SessionBusy) - case query.StatusIdle: + case session.StatusIdle: s.SetStatus(table.SessionReady) default: s.SetStatus(table.SessionStatusUnknown) @@ -225,16 +425,19 @@ func newQuerySession(ctx context.Context, cc grpc.ClientConnInterface, config *c }, ), ) - s.onClose = []func(s *session){ - func(s *session) { + s.onClose = []func(s *Session){ + func(s *Session) { _ = core.Close(ctx) }, } + s.executor = queryExecutor{ + client: core.Client, + } return s, nil } -func (s *session) ID() string { +func (s *Session) ID() string { if s == nil { return "" } @@ -242,9 +445,9 @@ func (s *session) ID() string { return s.id } -func (s *session) Close(ctx context.Context) (err error) { +func (s *Session) Close(ctx context.Context) (err error) { onDone := trace.TableOnSessionDelete(s.config.Trace(), &ctx, - stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/v3/internal/table.(*session).Close"), + stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/v3/internal/table.(*Session).Close"), s, ) defer func() { @@ -270,7 +473,7 @@ func (s *session) Close(ctx context.Context) (err error) { return nil } -func (s *session) checkCloseHint(md metadata.MD) { +func (s *Session) checkCloseHint(md metadata.MD) { for header, values := range md { if header != meta.HeaderServerHints { continue @@ -284,12 +487,12 @@ func (s *session) checkCloseHint(md metadata.MD) { } // KeepAlive keeps idle session alive. -func (s *session) KeepAlive(ctx context.Context) (err error) { +func (s *Session) KeepAlive(ctx context.Context) (err error) { var ( result Ydb_Table.KeepAliveResult onDone = trace.TableOnSessionKeepAlive( s.config.Trace(), &ctx, - stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/v3/internal/table.(*session).KeepAlive"), + stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/v3/internal/table.(*Session).KeepAlive"), s, ) ) @@ -328,7 +531,7 @@ func (s *session) KeepAlive(ctx context.Context) (err error) { } // CreateTable creates table at given path with given options. -func (s *session) CreateTable( +func (s *Session) CreateTable( ctx context.Context, path string, opts ...options.CreateTableOption, @@ -407,7 +610,7 @@ func DescribeTable( } // DescribeTable describes table at given path. -func (s *session) DescribeTable( +func (s *Session) DescribeTable( ctx context.Context, path string, opts ...options.DescribeTableOption, @@ -557,7 +760,7 @@ func processChangefeeds(changefeeds []*Ydb_Table.ChangefeedDescription) []option } // DropTable drops table at given path with given options. -func (s *session) DropTable( +func (s *Session) DropTable( ctx context.Context, path string, opts ...options.DropTableOption, @@ -582,7 +785,7 @@ func (s *session) DropTable( return xerrors.WithStackTrace(err) } -func (s *session) checkError(err error) { +func (s *Session) checkError(err error) { if err == nil { return } @@ -593,7 +796,7 @@ func (s *session) checkError(err error) { } // AlterTable modifies schema of table at given path with given options. -func (s *session) AlterTable( +func (s *Session) AlterTable( ctx context.Context, path string, opts ...options.AlterTableOption, @@ -623,7 +826,7 @@ func (s *session) AlterTable( } // CopyTable creates copy of table at given path. -func (s *session) CopyTable( +func (s *Session) CopyTable( ctx context.Context, dst, src string, opts ...options.CopyTableOption, @@ -690,7 +893,7 @@ func copyTables( } // CopyTables creates copy of table at given path. -func (s *session) CopyTables( +func (s *Session) CopyTables( ctx context.Context, opts ...options.CopyTablesOption, ) (err error) { @@ -740,7 +943,7 @@ func renameTables( } // RenameTables renames tables. -func (s *session) RenameTables( +func (s *Session) RenameTables( ctx context.Context, opts ...options.RenameTablesOption, ) (err error) { @@ -753,13 +956,13 @@ func (s *session) RenameTables( } // Explain explains data query represented by text. -func (s *session) Explain(ctx context.Context, sql string) (exp table.DataQueryExplanation, err error) { +func (s *Session) Explain(ctx context.Context, sql string) (exp table.DataQueryExplanation, err error) { var ( result Ydb_Table.ExplainQueryResult response *Ydb_Table.ExplainDataQueryResponse onDone = trace.TableOnSessionQueryExplain( s.config.Trace(), &ctx, - stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/v3/internal/table.(*session).Explain"), + stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/v3/internal/table.(*Session).Explain"), s, sql, ) ) @@ -801,14 +1004,14 @@ func (s *session) Explain(ctx context.Context, sql string) (exp table.DataQueryE } // Prepare prepares data query within session s. -func (s *session) Prepare(ctx context.Context, queryText string) (_ table.Statement, err error) { +func (s *Session) Prepare(ctx context.Context, queryText string) (_ table.Statement, err error) { var ( stmt *statement response *Ydb_Table.PrepareDataQueryResponse result Ydb_Table.PrepareQueryResult onDone = trace.TableOnSessionQueryPrepare( s.config.Trace(), &ctx, - stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/v3/internal/table.(*session).Prepare"), + stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/v3/internal/table.(*Session).Prepare"), s, queryText, ) ) @@ -851,7 +1054,7 @@ func (s *session) Prepare(ctx context.Context, queryText string) (_ table.Statem } // Execute executes given data query represented by text. -func (s *session) Execute(ctx context.Context, txControl *table.TransactionControl, sql string, params *params.Params, +func (s *Session) Execute(ctx context.Context, txControl *table.TransactionControl, sql string, params *params.Params, opts ...options.ExecuteDataQueryOption, ) ( txr table.Transaction, r result.Result, err error, @@ -892,7 +1095,7 @@ func (s *session) Execute(ctx context.Context, txControl *table.TransactionContr onDone := trace.TableOnSessionQueryExecute( s.config.Trace(), &ctx, - stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/v3/internal/table.(*session).Execute"), + stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/v3/internal/table.(*Session).Execute"), s, q, params, request.QueryCachePolicy.GetKeepInCache(), ) @@ -900,26 +1103,29 @@ func (s *session) Execute(ctx context.Context, txControl *table.TransactionContr onDone(txr, false, r, err) }() - result, err := s.executeDataQuery(ctx, a, request.ExecuteDataQueryRequest, callOptions...) + t, r, err := s.executor.Execute(ctx, a, request.ExecuteDataQueryRequest, callOptions...) if err != nil { return nil, nil, xerrors.WithStackTrace(err) } - return s.executeQueryResult(result, request.TxControl, request.IgnoreTruncated) + if t != nil { + t.s = s + } + + return t, r, nil } // executeQueryResult returns Transaction and result built from received // result. -func (s *session) executeQueryResult( +func executeQueryResult( res *Ydb_Table.ExecuteQueryResult, txControl *Ydb_Table.TransactionControl, ignoreTruncated bool, ) ( - table.Transaction, result.Result, error, + *transaction, result.Result, error, ) { tx := &transaction{ Identifier: tx.ID(res.GetTxMeta().GetId()), - s: s, } if txControl.GetCommitTx() { tx.state.Store(txStateCommitted) @@ -936,8 +1142,9 @@ func (s *session) executeQueryResult( } // executeDataQuery executes data query. -func (s *session) executeDataQuery( - ctx context.Context, a *allocator.Allocator, request *Ydb_Table.ExecuteDataQueryRequest, +func executeDataQuery( + ctx context.Context, client Ydb_Table_V1.TableServiceClient, + a *allocator.Allocator, request *Ydb_Table.ExecuteDataQueryRequest, callOptions ...grpc.CallOption, ) ( _ *Ydb_Table.ExecuteQueryResult, @@ -948,7 +1155,7 @@ func (s *session) executeDataQuery( response *Ydb_Table.ExecuteDataQueryResponse ) - response, err = s.client.ExecuteDataQuery(ctx, request, callOptions...) + response, err = client.ExecuteDataQuery(ctx, request, callOptions...) if err != nil { return nil, xerrors.WithStackTrace(err) } @@ -962,7 +1169,7 @@ func (s *session) executeDataQuery( } // ExecuteSchemeQuery executes scheme query. -func (s *session) ExecuteSchemeQuery(ctx context.Context, sql string, +func (s *Session) ExecuteSchemeQuery(ctx context.Context, sql string, opts ...options.ExecuteSchemeQueryOption, ) (err error) { request := Ydb_Table.ExecuteSchemeQueryRequest{ @@ -988,7 +1195,7 @@ func (s *session) ExecuteSchemeQuery(ctx context.Context, sql string, // DescribeTableOptions describes supported table options. // //nolint:funlen -func (s *session) DescribeTableOptions(ctx context.Context) ( +func (s *Session) DescribeTableOptions(ctx context.Context) ( desc options.TableOptionsDescription, err error, ) { @@ -1127,14 +1334,14 @@ func (s *session) DescribeTableOptions(ctx context.Context) ( // via Close() call or fully drained by sequential NextResultSet() calls. // //nolint:funlen -func (s *session) StreamReadTable( +func (s *Session) StreamReadTable( ctx context.Context, path string, opts ...options.ReadTableOption, ) (_ result.StreamResult, err error) { var ( onDone = trace.TableOnSessionQueryStreamRead(s.config.Trace(), &ctx, - stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/v3/internal/table.(*session).StreamReadTable"), + stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/v3/internal/table.(*Session).StreamReadTable"), s, ) request = Ydb_Table.ReadTableRequest{ @@ -1194,7 +1401,7 @@ func (s *session) StreamReadTable( ) } -func (s *session) ReadRows( +func (s *Session) ReadRows( ctx context.Context, path string, keys value.Value, @@ -1244,7 +1451,7 @@ func (s *session) ReadRows( // via Close() call or fully drained by sequential NextResultSet() calls. // //nolint:funlen -func (s *session) StreamExecuteScanQuery(ctx context.Context, sql string, parameters *params.Params, +func (s *Session) StreamExecuteScanQuery(ctx context.Context, sql string, parameters *params.Params, opts ...options.ExecuteScanQueryOption, ) (_ result.StreamResult, err error) { var ( @@ -1252,7 +1459,7 @@ func (s *session) StreamExecuteScanQuery(ctx context.Context, sql string, parame q = queryFromText(sql) onDone = trace.TableOnSessionQueryStreamExecute( s.config.Trace(), &ctx, - stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/v3/internal/table.(*session).StreamExecuteScanQuery"), + stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/v3/internal/table.(*Session).StreamExecuteScanQuery"), s, q, parameters, ) request = Ydb_Table.ExecuteScanQueryRequest{ @@ -1321,14 +1528,14 @@ func (s *session) StreamExecuteScanQuery(ctx context.Context, sql string, parame } // BulkUpsert uploads given list of ydb struct values to the table. -func (s *session) BulkUpsert(ctx context.Context, table string, rows value.Value, +func (s *Session) BulkUpsert(ctx context.Context, table string, rows value.Value, opts ...options.BulkUpsertOption, ) (err error) { var ( a = allocator.New() callOptions []grpc.CallOption onDone = trace.TableOnSessionBulkUpsert(s.config.Trace(), &ctx, - stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/v3/internal/table.(*session).BulkUpsert"), s, + stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/v3/internal/table.(*Session).BulkUpsert"), s, ) ) defer func() { @@ -1363,7 +1570,7 @@ func (s *session) BulkUpsert(ctx context.Context, table string, rows value.Value } // BeginTransaction begins new transaction within given session with given settings. -func (s *session) BeginTransaction( +func (s *Session) BeginTransaction( ctx context.Context, txSettings *table.TransactionSettings, ) (x table.Transaction, err error) { @@ -1372,7 +1579,7 @@ func (s *session) BeginTransaction( response *Ydb_Table.BeginTransactionResponse onDone = trace.TableOnTxBegin( s.config.Trace(), &ctx, - stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/v3/internal/table.(*session).BeginTransaction"), + stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/v3/internal/table.(*Session).BeginTransaction"), s, ) ) diff --git a/internal/table/session_test.go b/internal/table/session_test.go index 58c2170d3..b0c76f7c9 100644 --- a/internal/table/session_test.go +++ b/internal/table/session_test.go @@ -240,8 +240,12 @@ func TestSessionOperationModeOnExecuteDataQuery(t *testing.T) { { method: testutil.TableExecuteDataQuery, do: func(t *testing.T, ctx context.Context, c *Client) { - s := &session{ - client: Ydb_Table_V1.NewTableServiceClient(c.cc), + client := Ydb_Table_V1.NewTableServiceClient(c.cc) + s := &Session{ + client: client, + executor: tableExecutor{ + client: client, + }, config: config.New(), } _, _, err := s.Execute(ctx, table.TxControl(), "", table.NewQueryParameters()) @@ -251,8 +255,12 @@ func TestSessionOperationModeOnExecuteDataQuery(t *testing.T) { { method: testutil.TableExplainDataQuery, do: func(t *testing.T, ctx context.Context, c *Client) { - s := &session{ - client: Ydb_Table_V1.NewTableServiceClient(c.cc), + client := Ydb_Table_V1.NewTableServiceClient(c.cc) + s := &Session{ + client: client, + executor: tableExecutor{ + client: client, + }, config: config.New(), } _, err := s.Explain(ctx, "") @@ -262,8 +270,12 @@ func TestSessionOperationModeOnExecuteDataQuery(t *testing.T) { { method: testutil.TablePrepareDataQuery, do: func(t *testing.T, ctx context.Context, c *Client) { - s := &session{ - client: Ydb_Table_V1.NewTableServiceClient(c.cc), + client := Ydb_Table_V1.NewTableServiceClient(c.cc) + s := &Session{ + client: client, + executor: tableExecutor{ + client: client, + }, config: config.New(), } _, err := s.Prepare(ctx, "") @@ -279,8 +291,12 @@ func TestSessionOperationModeOnExecuteDataQuery(t *testing.T) { { method: testutil.TableDeleteSession, do: func(t *testing.T, ctx context.Context, c *Client) { - s := &session{ - client: Ydb_Table_V1.NewTableServiceClient(c.cc), + client := Ydb_Table_V1.NewTableServiceClient(c.cc) + s := &Session{ + client: client, + executor: tableExecutor{ + client: client, + }, config: config.New(), } require.NoError(t, s.Close(ctx)) @@ -289,8 +305,12 @@ func TestSessionOperationModeOnExecuteDataQuery(t *testing.T) { { method: testutil.TableBeginTransaction, do: func(t *testing.T, ctx context.Context, c *Client) { - s := &session{ - client: Ydb_Table_V1.NewTableServiceClient(c.cc), + client := Ydb_Table_V1.NewTableServiceClient(c.cc) + s := &Session{ + client: client, + executor: tableExecutor{ + client: client, + }, config: config.New(), } _, err := s.BeginTransaction(ctx, table.TxSettings()) @@ -300,10 +320,14 @@ func TestSessionOperationModeOnExecuteDataQuery(t *testing.T) { { method: testutil.TableCommitTransaction, do: func(t *testing.T, ctx context.Context, c *Client) { + client := Ydb_Table_V1.NewTableServiceClient(c.cc) tx := &transaction{ Identifier: tx.ID(""), - s: &session{ - client: Ydb_Table_V1.NewTableServiceClient(c.cc), + s: &Session{ + client: client, + executor: tableExecutor{ + client: client, + }, config: config.New(), }, } @@ -314,10 +338,14 @@ func TestSessionOperationModeOnExecuteDataQuery(t *testing.T) { { method: testutil.TableRollbackTransaction, do: func(t *testing.T, ctx context.Context, c *Client) { + client := Ydb_Table_V1.NewTableServiceClient(c.cc) tx := &transaction{ Identifier: tx.ID(""), - s: &session{ - client: Ydb_Table_V1.NewTableServiceClient(c.cc), + s: &Session{ + client: client, + executor: tableExecutor{ + client: client, + }, config: config.New(), }, } @@ -328,8 +356,12 @@ func TestSessionOperationModeOnExecuteDataQuery(t *testing.T) { { method: testutil.TableKeepAlive, do: func(t *testing.T, ctx context.Context, c *Client) { - s := &session{ - client: Ydb_Table_V1.NewTableServiceClient(c.cc), + client := Ydb_Table_V1.NewTableServiceClient(c.cc) + s := &Session{ + client: client, + executor: tableExecutor{ + client: client, + }, config: config.New(), } require.NoError(t, s.KeepAlive(ctx)) diff --git a/internal/table/statement.go b/internal/table/statement.go index 833973281..39404c9ca 100644 --- a/internal/table/statement.go +++ b/internal/table/statement.go @@ -19,7 +19,7 @@ import ( ) type statement struct { - session *session + session *Session query Query params map[string]*Ydb.Type } @@ -86,12 +86,12 @@ func (s *statement) execute( ) ( txr table.Transaction, r result.Result, err error, ) { - res, err := s.session.executeDataQuery(ctx, a, request.ExecuteDataQueryRequest, callOptions...) + res, err := executeDataQuery(ctx, s.session.client, a, request.ExecuteDataQueryRequest, callOptions...) if err != nil { return nil, nil, xerrors.WithStackTrace(err) } - return s.session.executeQueryResult(res, txControl, request.IgnoreTruncated) + return executeQueryResult(res, txControl, request.IgnoreTruncated) } func (s *statement) NumInput() int { diff --git a/internal/table/transaction.go b/internal/table/transaction.go index d221f1183..6e26da7ca 100644 --- a/internal/table/transaction.go +++ b/internal/table/transaction.go @@ -50,7 +50,7 @@ var _ tx.Identifier = (*transaction)(nil) type transaction struct { tx.Identifier - s *session + s *Session control *table.TransactionControl state txState } diff --git a/options.go b/options.go index 850bffde7..330669ce9 100644 --- a/options.go +++ b/options.go @@ -556,6 +556,16 @@ func WithSessionPoolIdleThreshold(idleThreshold time.Duration) Option { } } +// WithExecuteDataQueryOverQueryClient overrides table.Session.Execute with query service +// execute with materialized result +func WithExecuteDataQueryOverQueryClient() Option { + return func(ctx context.Context, d *Driver) error { + d.tableOptions = append(d.tableOptions, tableConfig.ExecuteDataQueryOverQueryService(true)) + + return nil + } +} + // WithSessionPoolSessionIdleTimeToLive limits maximum time to live of idle session // If idleTimeToLive is less than or equal to zero then sessions will not be closed by idle func WithSessionPoolSessionIdleTimeToLive(idleThreshold time.Duration) Option { diff --git a/tests/integration/table_truncated_err_test.go b/tests/integration/table_truncated_err_test.go index ced6647f1..6a1a512a7 100644 --- a/tests/integration/table_truncated_err_test.go +++ b/tests/integration/table_truncated_err_test.go @@ -7,6 +7,7 @@ import ( "context" "database/sql" "fmt" + "slices" "strconv" "testing" @@ -32,6 +33,22 @@ func TestIssue798TruncatedError(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() + // clear table + { + driver.Table().Do(ctx, func(ctx context.Context, s table.Session) error { + _, _, err := s.Execute(ctx, + table.DefaultTxControl(), + fmt.Sprintf("DELETE FROM `%s`;", tablePath), + nil, + ) + if err != nil { + return err + } + + return err + }, table.WithIdempotent()) + } + // upsert rows { rows := make([]types.Value, rowsLimit) @@ -41,9 +58,9 @@ func TestIssue798TruncatedError(t *testing.T) { types.StructFieldValue("val", types.TextValue(strconv.Itoa(i))), ) } - err := driver.Table().Do(ctx, func(ctx context.Context, s table.Session) error { - return s.BulkUpsert(ctx, tablePath, types.ListValue(rows...)) - }, table.WithIdempotent()) + err := driver.Table().BulkUpsert(ctx, tablePath, + table.BulkUpsertDataRows(types.ListValue(rows...)), table.WithIdempotent(), + ) scope.Require.NoError(err) } @@ -83,6 +100,7 @@ func TestIssue798TruncatedError(t *testing.T) { if count != rowsLimit { return fmt.Errorf("unexpected rows count: %d", count) } + return rows.Err() }, retry.WithIdempotent(true)) scope.Require.NoError(err) @@ -90,55 +108,86 @@ func TestIssue798TruncatedError(t *testing.T) { // upsert 1 row for get 1001 rows and truncated error { - err := driver.Table().Do(ctx, func(ctx context.Context, s table.Session) error { - return s.BulkUpsert(ctx, tablePath, types.ListValue(types.StructValue( + err := driver.Table().BulkUpsert(ctx, tablePath, + table.BulkUpsertDataRows(types.ListValue(types.StructValue( types.StructFieldValue("id", types.Int64Value(rowsLimit)), types.StructFieldValue("val", types.TextValue(strconv.Itoa(rowsLimit))), - ))) - }, table.WithIdempotent()) + ))), + table.WithIdempotent(), + ) scope.Require.NoError(err) } // select all rows with truncated result error { - err := driver.Table().Do(ctx, func(ctx context.Context, s table.Session) error { - _, results, err := s.Execute(ctx, - table.DefaultTxControl(), - fmt.Sprintf("SELECT * FROM `%s`;", tablePath), - nil, - ) - if err != nil { - return err - } - if err = results.NextResultSetErr(ctx); err != nil { - return fmt.Errorf("no result sets: %w", err) - } - if results.CurrentResultSet().RowCount() != rowsLimit { - return fmt.Errorf("unexpected rows count: %d", results.CurrentResultSet().RowCount()) - } - return results.Err() // expected truncated error - }, table.WithIdempotent()) - scope.Require.ErrorIs(err, result.ErrTruncated) + { + var rowsCount int + err := driver.Table().Do(ctx, func(ctx context.Context, s table.Session) error { + _, results, err := s.Execute(ctx, + table.DefaultTxControl(), + fmt.Sprintf("SELECT * FROM `%s`;", tablePath), + nil, + ) + if err != nil { + return err + } + if err = results.NextResultSetErr(ctx); err != nil { + return fmt.Errorf("no result sets: %w", err) + } - err = retry.Do(ctx, db, func(ctx context.Context, cc *sql.Conn) error { - rows, err := cc.QueryContext(ctx, fmt.Sprintf("SELECT * FROM `%s`;", tablePath)) - if err != nil { - return err + rowsCount = results.CurrentResultSet().RowCount() + if !slices.Contains([]int{rowsLimit, rowsLimit + 1}, rowsCount) { + return fmt.Errorf("unexpected rows count: %d", rowsCount) + } + + return results.Err() // expected truncated error + }, table.WithIdempotent()) + switch rowsCount { + case rowsLimit: + scope.Require.ErrorIs(err, result.ErrTruncated) + case rowsLimit + 1: + scope.Require.NoError(err) + default: + scope.Require.Error(err) + scope.Require.FailNow("unexpected rows count: %d", rowsCount) } - defer func() { - _ = rows.Close() - }() - count := 0 - for rows.Next() { - count++ + } + + { + var rowsCount int + err := retry.Do(ctx, db, func(ctx context.Context, cc *sql.Conn) error { + rowsCount = 0 + rows, err := cc.QueryContext(ctx, fmt.Sprintf("SELECT * FROM `%s`;", tablePath)) + if err != nil { + return err + } + defer func() { + _ = rows.Close() + }() + for rows.Next() { + rowsCount++ + } + if !slices.Contains([]int{rowsLimit, rowsLimit + 1}, rowsCount) { + return fmt.Errorf("unexpected rows count: %d", rowsCount) + } + + return rows.Err() + }, retry.WithIdempotent(true)) + switch driverEngine(db) { + case xsql.LEGACY: + switch rowsCount { + case rowsLimit: + scope.Require.ErrorIs(err, result.ErrTruncated) + case rowsLimit + 1: + scope.Require.NoError(err) + default: + scope.Require.Error(err) + scope.Require.FailNow("unexpected rows count: %d", rowsCount) + } + scope.Require.ErrorIs(err, result.ErrTruncated) + case xsql.PROPOSE: + scope.Require.NoError(err) } - return rows.Err() - }, retry.WithIdempotent(true)) - switch driverEngine(db) { - case xsql.LEGACY: - scope.Require.ErrorIs(err, result.ErrTruncated) - case xsql.PROPOSE: - scope.Require.NoError(err) } } @@ -157,7 +206,7 @@ func TestIssue798TruncatedError(t *testing.T) { if err = results.NextResultSetErr(ctx); err != nil { return fmt.Errorf("no result sets: %w", err) } - if results.CurrentResultSet().RowCount() != rowsLimit { + if !slices.Contains([]int{rowsLimit, rowsLimit + 1}, results.CurrentResultSet().RowCount()) { return fmt.Errorf("unexpected rows count: %d", results.CurrentResultSet().RowCount()) } return results.Err() // expected nil @@ -182,7 +231,7 @@ func TestIssue798TruncatedError(t *testing.T) { if err = results.NextResultSetErr(ctx); err != nil { return fmt.Errorf("no result sets: %w", err) } - if results.CurrentResultSet().RowCount() != rowsLimit { + if !slices.Contains([]int{rowsLimit, rowsLimit + 1}, results.CurrentResultSet().RowCount()) { return fmt.Errorf("unexpected rows count: %d", results.CurrentResultSet().RowCount()) } return results.Err() // expected nil