Skip to content

Commit

Permalink
Merge pull request #1632 from ydb-platform/fix-close
Browse files Browse the repository at this point in the history
check context before call session close
  • Loading branch information
asmyasnikov authored Feb 4, 2025
2 parents dd9f27b + 54669b4 commit 06a2f27
Show file tree
Hide file tree
Showing 7 changed files with 65 additions and 52 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
* Fixed bug with wrong context on session closing
* Fixed goroutine leak on closing `database/sql` driver
* "No endpoints" is retriable error now

Expand Down
4 changes: 4 additions & 0 deletions internal/query/session_core.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,10 @@ func (core *sessionCore) deleteSession(ctx context.Context) (finalErr error) {
defer cancel()
}

if err := ctx.Err(); err != nil {
return xerrors.WithStackTrace(err)
}

_, err := core.Client.DeleteSession(ctx,
&Ydb_Query.DeleteSessionRequest{
SessionId: core.id,
Expand Down
4 changes: 2 additions & 2 deletions internal/table/retry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,11 +88,11 @@ func TestDoBadSession(t *testing.T) {
p := pool.New[*Session, Session](ctx,
pool.WithCreateItemFunc[*Session, Session](func(ctx context.Context) (*Session, error) {
s := simpleSession(t)
s.onClose = append(s.onClose, func(s *Session) error {
s.closeOnce = func(_ context.Context) error {
closed[s] = true

return nil
})
}

return s, nil
}),
Expand Down
87 changes: 51 additions & 36 deletions internal/table/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/ydb-platform/ydb-go-sdk/v3/internal/value"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xcontext"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xsync"
"github.com/ydb-platform/ydb-go-sdk/v3/retry"
"github.com/ydb-platform/ydb-go-sdk/v3/table"
"github.com/ydb-platform/ydb-go-sdk/v3/table/options"
Expand Down Expand Up @@ -248,7 +249,7 @@ var (
// Note that after session is no longer needed it should be destroyed by
// Close() call.
type Session struct {
onClose []func(s *Session) error
closeOnce func(ctx context.Context) error
id string
client Ydb_Table_V1.TableServiceClient
status table.SessionStatus
Expand Down Expand Up @@ -329,7 +330,7 @@ func newSession(ctx context.Context, cc grpc.ClientConnInterface, config *config
return newTableSession(ctx, cc, config)
}

func newTableSession( //nolint:funlen
func newTableSession(
ctx context.Context, cc grpc.ClientConnInterface, config *config.Config,
) (*Session, error) {
response, err := Ydb_Table_V1.NewTableServiceClient(cc).CreateSession(ctx,
Expand All @@ -355,25 +356,6 @@ func newTableSession( //nolint:funlen
id: result.GetSessionId(),
config: config,
status: table.SessionReady,
onClose: []func(s *Session) error{
func(s *Session) error {
_, err = s.client.DeleteSession(ctx,
&Ydb_Table.DeleteSessionRequest{
SessionId: s.id,
OperationParams: operation.Params(ctx,
s.config.OperationTimeout(),
s.config.OperationCancelAfter(),
operation.ModeSync,
),
},
)
if err != nil {
return xerrors.WithStackTrace(err)
}

return nil
},
},
}

s.lastUsage.Store(time.Now().Unix())
Expand All @@ -387,6 +369,7 @@ func newTableSession( //nolint:funlen
},
),
)
s.closeOnce = xsync.OnceFunc(closeTableSession(s.client, s.config, s.id))
s.dataQuery = tableClientExecutor{
client: s.client,
ignoreTruncated: s.config.IgnoreTruncated(),
Expand All @@ -395,7 +378,37 @@ func newTableSession( //nolint:funlen
return s, nil
}

func newQuerySession( //nolint:funlen
func closeTableSession(c Ydb_Table_V1.TableServiceClient, cfg *config.Config, id string) func(context.Context) error {
return func(ctx context.Context) error {
if err := ctx.Err(); err != nil {
return xerrors.WithStackTrace(err)
}

if t := cfg.DeleteTimeout(); t > 0 {
var cancel context.CancelFunc
ctx, cancel = xcontext.WithTimeout(ctx, t)
defer cancel()
}

_, err := c.DeleteSession(ctx,
&Ydb_Table.DeleteSessionRequest{
SessionId: id,
OperationParams: operation.Params(ctx,
cfg.OperationTimeout(),
cfg.OperationCancelAfter(),
operation.ModeSync,
),
},
)
if err != nil {
return xerrors.WithStackTrace(err)
}

return nil
}
}

func newQuerySession(
ctx context.Context, cc grpc.ClientConnInterface, config *config.Config,
) (*Session, error) {
s := &Session{
Expand Down Expand Up @@ -438,16 +451,7 @@ func newQuerySession( //nolint:funlen
},
),
)
s.onClose = []func(s *Session) error{
func(s *Session) error {
err := core.Close(ctx)
if err != nil {
return xerrors.WithStackTrace(err)
}

return nil
},
}
s.closeOnce = xsync.OnceFunc(closeQuerySession(core))
if config.ExecuteDataQueryOverQueryService() {
s.dataQuery = queryClientExecutor{
core: core,
Expand All @@ -463,6 +467,17 @@ func newQuerySession( //nolint:funlen
return s, nil
}

func closeQuerySession(core query.Core) func(context.Context) error {
return func(ctx context.Context) error {
err := core.Close(ctx)
if err != nil {
return xerrors.WithStackTrace(err)
}

return nil
}
}

func (s *Session) ID() string {
if s == nil {
return ""
Expand All @@ -471,18 +486,18 @@ func (s *Session) ID() string {
return s.id
}

func (s *Session) Close(ctx context.Context) (err error) {
func (s *Session) Close(ctx context.Context) (finalErr error) {
onDone := trace.TableOnSessionDelete(s.config.Trace(), &ctx,
stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/v3/internal/table.(*Session).Close"),
s,
)
defer func() {
onDone(err)
onDone(finalErr)
s.SetStatus(table.SessionClosed)
}()

for _, onClose := range s.onClose {
err := onClose(s)
if s.closeOnce != nil {
err := s.closeOnce(ctx)
if err != nil {
return xerrors.WithStackTrace(err)
}
Expand Down
2 changes: 1 addition & 1 deletion internal/xsql/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ func (c *Connector) Parent() ydbDriver {
func (c *Connector) Close() error {
select {
case <-c.done:
return xerrors.WithStackTrace(errAlreadyClosed)
return nil
default:
close(c.done)

Expand Down
1 change: 0 additions & 1 deletion internal/xsql/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
var (
ErrUnsupported = driver.ErrSkip
errDeprecated = driver.ErrSkip
errAlreadyClosed = errors.New("already closed")
errWrongQueryProcessor = errors.New("wrong query processor")
errNotReadyConn = xerrors.Retryable(errors.New("iface not ready"), xerrors.InvalidObject())
)
18 changes: 6 additions & 12 deletions tests/integration/basic_example_database_sql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,14 +46,12 @@ func TestBasicExampleDatabaseSql(t *testing.T) {
db, err := sql.Open("ydb", os.Getenv("YDB_CONNECTION_STRING"))
require.NoError(t, err)

err = db.PingContext(ctx)
require.NoError(t, err)
require.NoError(t, db.PingContext(ctx))

_, err = ydb.Unwrap(db)
require.NoError(t, err)

err = db.Close()
require.NoError(t, err)
require.NoError(t, db.Close())
})

t.Run("sql.OpenDB", func(t *testing.T) {
Expand All @@ -64,26 +62,22 @@ func TestBasicExampleDatabaseSql(t *testing.T) {
require.NoError(t, err)

defer func() {
// cleanup
_ = nativeDriver.Close(ctx)
require.NoError(t, nativeDriver.Close(ctx))
}()

c, err := ydb.Connector(nativeDriver)
require.NoError(t, err)

defer func() {
// cleanup
_ = c.Close()
require.NoError(t, c.Close())
}()

db := sql.OpenDB(c)
defer func() {
// cleanup
_ = db.Close()
require.NoError(t, db.Close())
}()

err = db.PingContext(ctx)
require.NoError(t, err)
require.NoError(t, db.PingContext(ctx))

db.SetMaxOpenConns(50)
db.SetMaxIdleConns(50)
Expand Down

0 comments on commit 06a2f27

Please sign in to comment.