Skip to content

Commit

Permalink
fixed context usage for closing of sessions
Browse files Browse the repository at this point in the history
  • Loading branch information
asmyasnikov committed Feb 4, 2025
1 parent 7c73ec5 commit caf6f23
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 43 deletions.
4 changes: 2 additions & 2 deletions internal/table/retry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,11 +88,11 @@ func TestDoBadSession(t *testing.T) {
p := pool.New[*Session, Session](ctx,
pool.WithCreateItemFunc[*Session, Session](func(ctx context.Context) (*Session, error) {
s := simpleSession(t)
s.onClose = append(s.onClose, func(s *Session) error {
s.closeOnce = func(_ context.Context) error {
closed[s] = true

return nil
})
}

return s, nil
}),
Expand Down
91 changes: 50 additions & 41 deletions internal/table/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/ydb-platform/ydb-go-sdk/v3/internal/value"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xcontext"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xsync"
"github.com/ydb-platform/ydb-go-sdk/v3/retry"
"github.com/ydb-platform/ydb-go-sdk/v3/table"
"github.com/ydb-platform/ydb-go-sdk/v3/table/options"
Expand Down Expand Up @@ -248,7 +249,7 @@ var (
// Note that after session is no longer needed it should be destroyed by
// Close() call.
type Session struct {
onClose []func(s *Session) error
closeOnce func(ctx context.Context) error
id string
client Ydb_Table_V1.TableServiceClient
status table.SessionStatus
Expand Down Expand Up @@ -329,7 +330,7 @@ func newSession(ctx context.Context, cc grpc.ClientConnInterface, config *config
return newTableSession(ctx, cc, config)
}

func newTableSession( //nolint:funlen
func newTableSession(
ctx context.Context, cc grpc.ClientConnInterface, config *config.Config,
) (*Session, error) {
response, err := Ydb_Table_V1.NewTableServiceClient(cc).CreateSession(ctx,
Expand All @@ -355,29 +356,6 @@ func newTableSession( //nolint:funlen
id: result.GetSessionId(),
config: config,
status: table.SessionReady,
onClose: []func(s *Session) error{
func(s *Session) error {
if err := ctx.Err(); err != nil {
return xerrors.WithStackTrace(err)
}

_, err = s.client.DeleteSession(ctx,
&Ydb_Table.DeleteSessionRequest{
SessionId: s.id,
OperationParams: operation.Params(ctx,
s.config.OperationTimeout(),
s.config.OperationCancelAfter(),
operation.ModeSync,
),
},
)
if err != nil {
return xerrors.WithStackTrace(err)
}

return nil
},
},
}

s.lastUsage.Store(time.Now().Unix())
Expand All @@ -391,6 +369,7 @@ func newTableSession( //nolint:funlen
},
),
)
s.closeOnce = xsync.OnceFunc(closeTableSession(s.client, s.config, s.id))
s.dataQuery = tableClientExecutor{
client: s.client,
ignoreTruncated: s.config.IgnoreTruncated(),
Expand All @@ -399,7 +378,37 @@ func newTableSession( //nolint:funlen
return s, nil
}

func newQuerySession( //nolint:funlen
func closeTableSession(c Ydb_Table_V1.TableServiceClient, cfg *config.Config, id string) func(context.Context) error {
return func(ctx context.Context) error {
if err := ctx.Err(); err != nil {
return xerrors.WithStackTrace(err)
}

if t := cfg.DeleteTimeout(); t > 0 {
var cancel context.CancelFunc
ctx, cancel = xcontext.WithTimeout(ctx, t)
defer cancel()
}

_, err := c.DeleteSession(ctx,
&Ydb_Table.DeleteSessionRequest{
SessionId: id,
OperationParams: operation.Params(ctx,
cfg.OperationTimeout(),
cfg.OperationCancelAfter(),
operation.ModeSync,
),
},
)
if err != nil {
return xerrors.WithStackTrace(err)
}

return nil
}
}

func newQuerySession(
ctx context.Context, cc grpc.ClientConnInterface, config *config.Config,
) (*Session, error) {
s := &Session{
Expand Down Expand Up @@ -442,16 +451,7 @@ func newQuerySession( //nolint:funlen
},
),
)
s.onClose = []func(s *Session) error{
func(s *Session) error {
err := core.Close(ctx)
if err != nil {
return xerrors.WithStackTrace(err)
}

return nil
},
}
s.closeOnce = xsync.OnceFunc(closeQuerySession(core))
if config.ExecuteDataQueryOverQueryService() {
s.dataQuery = queryClientExecutor{
core: core,
Expand All @@ -467,6 +467,17 @@ func newQuerySession( //nolint:funlen
return s, nil
}

func closeQuerySession(core query.Core) func(context.Context) error {
return func(ctx context.Context) error {
err := core.Close(ctx)
if err != nil {
return xerrors.WithStackTrace(err)
}

return nil
}
}

func (s *Session) ID() string {
if s == nil {
return ""
Expand All @@ -485,11 +496,9 @@ func (s *Session) Close(ctx context.Context) (finalErr error) {
s.SetStatus(table.SessionClosed)
}()

for _, onClose := range s.onClose {
err := onClose(s)
if err != nil {
return xerrors.WithStackTrace(err)
}
err := s.closeOnce(ctx)
if err != nil {
return xerrors.WithStackTrace(err)
}

return nil
Expand Down

0 comments on commit caf6f23

Please sign in to comment.