Skip to content

Commit

Permalink
* Added environment variable YDB_TABLE_CLIENT_USE_QUERY_SESSION for…
Browse files Browse the repository at this point in the history
… create session in table client using query service client API
  • Loading branch information
asmyasnikov committed Jan 23, 2025
1 parent 9780940 commit 6c58f66
Show file tree
Hide file tree
Showing 14 changed files with 237 additions and 104 deletions.
1 change: 1 addition & 0 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ jobs:
YDB_SSL_ROOT_CERTIFICATES_FILE: /tmp/ydb_certs/ca.pem
YDB_SESSIONS_SHUTDOWN_URLS: http://localhost:8765/actors/kqp_proxy?force_shutdown=all
YDB_DATABASE_SQL_OVER_QUERY_SERVICE: 1
YDB_TABLE_CLIENT_USE_QUERY_SESSION: 1
HIDE_APPLICATION_OUTPUT: 1
steps:
- name: Checkout code
Expand Down
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
* Added environment variable `YDB_TABLE_CLIENT_USE_QUERY_SESSION` for create session in table client using query service client API

## v3.98.0
* Supported pool of encoders, which implement ResetableWriter interface

Expand Down
27 changes: 20 additions & 7 deletions internal/query/session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Query"
"google.golang.org/grpc"

"github.com/ydb-platform/ydb-go-sdk/v3/internal/closer"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/conn"
balancerContext "github.com/ydb-platform/ydb-go-sdk/v3/internal/endpoint"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/pool"
Expand All @@ -23,6 +24,7 @@ import (
type (
Core interface {
query.SessionInfo
closer.Closer
pool.Item

SetStatus(code Status)
Expand All @@ -32,12 +34,13 @@ type (
Client Ydb_Query_V1.QueryServiceClient
Trace *trace.Query

deleteTimeout time.Duration
id string
nodeID uint32
status atomic.Uint32
closeOnce func(ctx context.Context) error
checks []func(s *core) bool
deleteTimeout time.Duration
id string
nodeID uint32
status atomic.Uint32
onChangeStatus []func(status Status)
closeOnce func(ctx context.Context) error
checks []func(s *core) bool
}
)

Expand All @@ -58,7 +61,11 @@ func (c *core) SetStatus(status Status) {
case StatusClosed, StatusError:
// nop
default:
c.status.Store(uint32(status))
if old := c.status.Swap(uint32(status)); old != uint32(status) {
for _, onChangeStatus := range c.onChangeStatus {
onChangeStatus(Status(old))
}
}
}
}

Expand All @@ -74,6 +81,12 @@ func WithConn(cc grpc.ClientConnInterface) Option {
}
}

func OnChangeStatus(onChangeStatus func(status Status)) Option {
return func(c *core) {
c.onChangeStatus = append(c.onChangeStatus, onChangeStatus)
}
}

func WithDeleteTimeout(deleteTimeout time.Duration) Option {
return func(c *core) {
c.deleteTimeout = deleteTimeout
Expand Down
6 changes: 3 additions & 3 deletions internal/table/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,9 +129,9 @@ var simpleCluster = testutil.NewBalancer(
)

func simpleSession(t testing.TB) *session {
s, err := newSession(context.Background(), simpleCluster, config.New())
s, err := newTableSession(context.Background(), simpleCluster, config.New())
if err != nil {
t.Fatalf("newSession unexpected error: %v", err)
t.Fatalf("newTableSession unexpected error: %v", err)
}

return s
Expand Down Expand Up @@ -168,5 +168,5 @@ func (s *StubBuilder) createSession(ctx context.Context) (session *session, err
return f(ctx)
}

return newSession(ctx, s.cc, config.New())
return newTableSession(ctx, s.cc, config.New())
}
28 changes: 27 additions & 1 deletion internal/table/config/config.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package config

import (
"os"
"time"

"github.com/jonboulle/clockwork"
Expand Down Expand Up @@ -154,6 +155,23 @@ func WithIgnoreTruncated() Option {
}
}

// ExecuteDataQueryOverQueryService overrides Execute handle with query service execute with materialized result
func ExecuteDataQueryOverQueryService(b bool) Option {
return func(c *Config) {
c.executeDataQueryOverQueryService = b
if b {
c.useQuerySession = true
}
}
}

// UseQuerySession creates session using query service client
func UseQuerySession(b bool) Option {
return func(c *Config) {
c.useQuerySession = b
}
}

// WithClock replaces default clock
func WithClock(clock clockwork.Clock) Option {
return func(c *Config) {
Expand All @@ -172,7 +190,9 @@ type Config struct {
deleteTimeout time.Duration
idleThreshold time.Duration

ignoreTruncated bool
ignoreTruncated bool
useQuerySession bool
executeDataQueryOverQueryService bool

trace *trace.Table

Expand Down Expand Up @@ -217,6 +237,11 @@ func (c *Config) IgnoreTruncated() bool {
return c.ignoreTruncated
}

// ExecuteDataQueryOverQueryService specifies behavior on execute handle
func (c *Config) ExecuteDataQueryOverQueryService() bool {
return c.executeDataQueryOverQueryService
}

// IdleKeepAliveThreshold is a number of keepAlive messages to call before the
// session is removed if it is an excess session (see KeepAliveMinSize)
// This means that session will be deleted after the expiration of lifetime = IdleThreshold * IdleKeepAliveThreshold
Expand Down Expand Up @@ -270,5 +295,6 @@ func defaults() *Config {
idleThreshold: DefaultSessionPoolIdleThreshold,
clock: clockwork.NewRealClock(),
trace: &trace.Table{},
useQuerySession: os.Getenv("YDB_TABLE_CLIENT_USE_QUERY_SESSION") != "",
}
}
30 changes: 15 additions & 15 deletions internal/table/data_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,64 +7,64 @@ import (
)

type (
query interface {
Query interface {
String() string
ID() string
YQL() string

toYDB(a *allocator.Allocator) *Ydb_Table.Query
}
textDataQuery string
preparedDataQuery struct {
textQuery string
preparedQuery struct {
id string
sql string
}
)

func (q textDataQuery) String() string {
func (q textQuery) String() string {
return string(q)
}

func (q textDataQuery) ID() string {
func (q textQuery) ID() string {
return ""
}

func (q textDataQuery) YQL() string {
func (q textQuery) YQL() string {
return string(q)
}

func (q textDataQuery) toYDB(a *allocator.Allocator) *Ydb_Table.Query {
func (q textQuery) toYDB(a *allocator.Allocator) *Ydb_Table.Query {
query := a.TableQuery()
query.Query = a.TableQueryYqlText(string(q))

return query
}

func (q preparedDataQuery) String() string {
func (q preparedQuery) String() string {
return q.sql
}

func (q preparedDataQuery) ID() string {
func (q preparedQuery) ID() string {
return q.id
}

func (q preparedDataQuery) YQL() string {
func (q preparedQuery) YQL() string {
return q.sql
}

func (q preparedDataQuery) toYDB(a *allocator.Allocator) *Ydb_Table.Query {
func (q preparedQuery) toYDB(a *allocator.Allocator) *Ydb_Table.Query {
query := a.TableQuery()
query.Query = a.TableQueryID(q.id)

return query
}

func queryFromText(s string) query {
return textDataQuery(s)
func queryFromText(s string) Query {
return textQuery(s)
}

func queryPrepared(id, sql string) query {
return preparedDataQuery{
func queryPrepared(id, sql string) Query {
return preparedQuery{
id: id,
sql: sql,
}
Expand Down
2 changes: 1 addition & 1 deletion internal/table/retry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,7 @@ func TestDoContextDeadline(t *testing.T) {
ctx := xtest.Context(t)
p := pool.New[*session, session](ctx,
pool.WithCreateItemFunc[*session, session](func(ctx context.Context) (*session, error) {
return newSession(ctx, client.cc, config.New())
return newTableSession(ctx, client.cc, config.New())
}),
pool.WithSyncCloseItem[*session, session](),
)
Expand Down
Loading

0 comments on commit 6c58f66

Please sign in to comment.