From caf6f23e8501c8b33f32ed6313b86a869fc1e5b8 Mon Sep 17 00:00:00 2001 From: Aleksey Myasnikov Date: Tue, 4 Feb 2025 17:47:07 +0300 Subject: [PATCH] fixed context usage for closing of sessions --- internal/table/retry_test.go | 4 +- internal/table/session.go | 91 ++++++++++++++++++++---------------- 2 files changed, 52 insertions(+), 43 deletions(-) diff --git a/internal/table/retry_test.go b/internal/table/retry_test.go index 8d7ba91e1..681537518 100644 --- a/internal/table/retry_test.go +++ b/internal/table/retry_test.go @@ -88,11 +88,11 @@ func TestDoBadSession(t *testing.T) { p := pool.New[*Session, Session](ctx, pool.WithCreateItemFunc[*Session, Session](func(ctx context.Context) (*Session, error) { s := simpleSession(t) - s.onClose = append(s.onClose, func(s *Session) error { + s.closeOnce = func(_ context.Context) error { closed[s] = true return nil - }) + } return s, nil }), diff --git a/internal/table/session.go b/internal/table/session.go index ef49db10a..9a3407b2a 100644 --- a/internal/table/session.go +++ b/internal/table/session.go @@ -35,6 +35,7 @@ import ( "github.com/ydb-platform/ydb-go-sdk/v3/internal/value" "github.com/ydb-platform/ydb-go-sdk/v3/internal/xcontext" "github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors" + "github.com/ydb-platform/ydb-go-sdk/v3/internal/xsync" "github.com/ydb-platform/ydb-go-sdk/v3/retry" "github.com/ydb-platform/ydb-go-sdk/v3/table" "github.com/ydb-platform/ydb-go-sdk/v3/table/options" @@ -248,7 +249,7 @@ var ( // Note that after session is no longer needed it should be destroyed by // Close() call. type Session struct { - onClose []func(s *Session) error + closeOnce func(ctx context.Context) error id string client Ydb_Table_V1.TableServiceClient status table.SessionStatus @@ -329,7 +330,7 @@ func newSession(ctx context.Context, cc grpc.ClientConnInterface, config *config return newTableSession(ctx, cc, config) } -func newTableSession( //nolint:funlen +func newTableSession( ctx context.Context, cc grpc.ClientConnInterface, config *config.Config, ) (*Session, error) { response, err := Ydb_Table_V1.NewTableServiceClient(cc).CreateSession(ctx, @@ -355,29 +356,6 @@ func newTableSession( //nolint:funlen id: result.GetSessionId(), config: config, status: table.SessionReady, - onClose: []func(s *Session) error{ - func(s *Session) error { - if err := ctx.Err(); err != nil { - return xerrors.WithStackTrace(err) - } - - _, err = s.client.DeleteSession(ctx, - &Ydb_Table.DeleteSessionRequest{ - SessionId: s.id, - OperationParams: operation.Params(ctx, - s.config.OperationTimeout(), - s.config.OperationCancelAfter(), - operation.ModeSync, - ), - }, - ) - if err != nil { - return xerrors.WithStackTrace(err) - } - - return nil - }, - }, } s.lastUsage.Store(time.Now().Unix()) @@ -391,6 +369,7 @@ func newTableSession( //nolint:funlen }, ), ) + s.closeOnce = xsync.OnceFunc(closeTableSession(s.client, s.config, s.id)) s.dataQuery = tableClientExecutor{ client: s.client, ignoreTruncated: s.config.IgnoreTruncated(), @@ -399,7 +378,37 @@ func newTableSession( //nolint:funlen return s, nil } -func newQuerySession( //nolint:funlen +func closeTableSession(c Ydb_Table_V1.TableServiceClient, cfg *config.Config, id string) func(context.Context) error { + return func(ctx context.Context) error { + if err := ctx.Err(); err != nil { + return xerrors.WithStackTrace(err) + } + + if t := cfg.DeleteTimeout(); t > 0 { + var cancel context.CancelFunc + ctx, cancel = xcontext.WithTimeout(ctx, t) + defer cancel() + } + + _, err := c.DeleteSession(ctx, + &Ydb_Table.DeleteSessionRequest{ + SessionId: id, + OperationParams: operation.Params(ctx, + cfg.OperationTimeout(), + cfg.OperationCancelAfter(), + operation.ModeSync, + ), + }, + ) + if err != nil { + return xerrors.WithStackTrace(err) + } + + return nil + } +} + +func newQuerySession( ctx context.Context, cc grpc.ClientConnInterface, config *config.Config, ) (*Session, error) { s := &Session{ @@ -442,16 +451,7 @@ func newQuerySession( //nolint:funlen }, ), ) - s.onClose = []func(s *Session) error{ - func(s *Session) error { - err := core.Close(ctx) - if err != nil { - return xerrors.WithStackTrace(err) - } - - return nil - }, - } + s.closeOnce = xsync.OnceFunc(closeQuerySession(core)) if config.ExecuteDataQueryOverQueryService() { s.dataQuery = queryClientExecutor{ core: core, @@ -467,6 +467,17 @@ func newQuerySession( //nolint:funlen return s, nil } +func closeQuerySession(core query.Core) func(context.Context) error { + return func(ctx context.Context) error { + err := core.Close(ctx) + if err != nil { + return xerrors.WithStackTrace(err) + } + + return nil + } +} + func (s *Session) ID() string { if s == nil { return "" @@ -485,11 +496,9 @@ func (s *Session) Close(ctx context.Context) (finalErr error) { s.SetStatus(table.SessionClosed) }() - for _, onClose := range s.onClose { - err := onClose(s) - if err != nil { - return xerrors.WithStackTrace(err) - } + err := s.closeOnce(ctx) + if err != nil { + return xerrors.WithStackTrace(err) } return nil