Skip to content

Commit

Permalink
fix(persistence): connection pool leak due to schema migrations
Browse files Browse the repository at this point in the history
Previously, a separate connection pool was created to execute schema
migrations. This pool was not properly shut down though, leading to open
connections and unused goroutines. This commit reuses the connection
pool created for entgo, thereby preventing any resource leaks.

Signed-off-by: Michael Adler <[email protected]>
  • Loading branch information
michaeladler committed Aug 3, 2023
1 parent 2e54f45 commit 2b0631a
Show file tree
Hide file tree
Showing 22 changed files with 242 additions and 38 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

- Send HTTP status code 404 when attempting to access the file server while it is disabled
- Configure TLS for Southbound API (if requested via CLI)
- Connection pool leak due to schema migrations (SQLite, MySQL)

### Changed

Expand Down
19 changes: 19 additions & 0 deletions api/main_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package api

/*
* SPDX-FileCopyrightText: 2023 Siemens AG
*
* SPDX-License-Identifier: Apache-2.0
*
* Author: Michael Adler <[email protected]>
*/

import (
"testing"

"go.uber.org/goleak"
)

func TestMain(m *testing.M) {
goleak.VerifyTestMain(m)
}
2 changes: 1 addition & 1 deletion api/workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ func newInMemoryDB(t *testing.T) persistence.Storage {
db := &entgo.SQLite{}
err := db.Initialize(context.Background(), "file:wfx?mode=memory&cache=shared&_fk=1")
require.NoError(t, err)

t.Cleanup(db.Shutdown)
t.Cleanup(func() {
{
list, _ := db.QueryJobs(context.Background(), persistence.FilterParams{}, persistence.SortParams{}, persistence.PaginationParams{Limit: 100})
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ require (
github.com/stretchr/testify v1.8.4
github.com/tsenart/vegeta/v12 v12.11.0
github.com/yourbasic/graph v0.0.0-20210606180040-8ecfec1c2869
go.uber.org/goleak v1.2.1
golang.org/x/term v0.10.0
gopkg.in/go-playground/colors.v1 v1.2.0
gopkg.in/yaml.v2 v2.4.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,8 @@ go.opentelemetry.io/otel/trace v1.16.0 h1:8JRpaObFoW0pxuVPapkgH8UhHQj+bJW8jJsCZE
go.opentelemetry.io/otel/trace v1.16.0/go.mod h1:Yt9vYq1SdNz3xdjZZK7wcXv1qv2pwLkqr2QVwea0ef0=
go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE=
go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0=
go.uber.org/goleak v1.2.1 h1:NBol2c7O1ZokfZ0LEU9K6Whx/KnwvepVetCUhtKja4A=
go.uber.org/goleak v1.2.1/go.mod h1:qlT2yGI9QafXHhZZLxlSuNsMw3FFLxBr+tBRlmO1xH4=
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20190422162423-af44ce270edf/go.mod h1:WFFai1msRO1wXaEeE5yQxYXgSfI8pQAWXbQop6sCtWE=
Expand Down
1 change: 1 addition & 0 deletions internal/handler/job/create_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ func newInMemoryDB(t *testing.T) persistence.Storage {
db := &entgo.SQLite{}
err := db.Initialize(context.Background(), "file:wfx?mode=memory&cache=shared&_fk=1")
require.NoError(t, err)
t.Cleanup(db.Shutdown)

require.NoError(t, err)
t.Cleanup(func() {
Expand Down
2 changes: 1 addition & 1 deletion internal/handler/job/definition/get_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func newInMemoryDB(t *testing.T) persistence.Storage {
db := &entgo.SQLite{}
err := db.Initialize(context.Background(), "file:wfx?mode=memory&cache=shared&_fk=1")
require.NoError(t, err)

t.Cleanup(db.Shutdown)
t.Cleanup(func() {
{
list, err := db.QueryJobs(context.Background(), persistence.FilterParams{}, persistence.SortParams{}, persistence.PaginationParams{Limit: 100})
Expand Down
19 changes: 19 additions & 0 deletions internal/handler/job/definition/main_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package definition

/*
* SPDX-FileCopyrightText: 2023 Siemens AG
*
* SPDX-License-Identifier: Apache-2.0
*
* Author: Michael Adler <[email protected]>
*/

import (
"testing"

"go.uber.org/goleak"
)

func TestMain(m *testing.M) {
goleak.VerifyTestMain(m)
}
19 changes: 19 additions & 0 deletions internal/handler/job/main_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package job

/*
* SPDX-FileCopyrightText: 2023 Siemens AG
*
* SPDX-License-Identifier: Apache-2.0
*
* Author: Michael Adler <[email protected]>
*/

import (
"testing"

"go.uber.org/goleak"
)

func TestMain(m *testing.M) {
goleak.VerifyTestMain(m)
}
1 change: 1 addition & 0 deletions internal/handler/job/status/get_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ func newInMemoryDB(t *testing.T) persistence.Storage {
db := &entgo.SQLite{}
err := db.Initialize(context.Background(), "file:wfx?mode=memory&cache=shared&_fk=1")
require.NoError(t, err)
t.Cleanup(db.Shutdown)

require.NoError(t, err)
t.Cleanup(func() {
Expand Down
19 changes: 19 additions & 0 deletions internal/handler/job/status/main_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package status

/*
* SPDX-FileCopyrightText: 2023 Siemens AG
*
* SPDX-License-Identifier: Apache-2.0
*
* Author: Michael Adler <[email protected]>
*/

import (
"testing"

"go.uber.org/goleak"
)

func TestMain(m *testing.M) {
goleak.VerifyTestMain(m)
}
1 change: 1 addition & 0 deletions internal/handler/job/tags/add_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ func newInMemoryDB(t *testing.T) persistence.Storage {
db := &entgo.SQLite{}
err := db.Initialize(context.Background(), "file:wfx?mode=memory&cache=shared&_fk=1")
require.NoError(t, err)
t.Cleanup(db.Shutdown)

t.Cleanup(func() {
{
Expand Down
19 changes: 19 additions & 0 deletions internal/handler/job/tags/main_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package tags

/*
* SPDX-FileCopyrightText: 2023 Siemens AG
*
* SPDX-License-Identifier: Apache-2.0
*
* Author: Michael Adler <[email protected]>
*/

import (
"testing"

"go.uber.org/goleak"
)

func TestMain(m *testing.M) {
goleak.VerifyTestMain(m)
}
9 changes: 5 additions & 4 deletions internal/handler/workflow/create_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,12 @@ func TestCreateWorkflow(t *testing.T) {
}

func newInMemoryDB(t *testing.T) persistence.Storage {
var sqlite entgo.SQLite
err := sqlite.Initialize(context.Background(), "file:wfx?mode=memory&cache=shared&_fk=1")
var db entgo.SQLite
err := db.Initialize(context.Background(), "file:wfx?mode=memory&cache=shared&_fk=1")
require.NoError(t, err)
t.Cleanup(db.Shutdown)
t.Cleanup(func() {
_ = sqlite.DeleteWorkflow(context.Background(), "wfx.workflow.dau.direct")
_ = db.DeleteWorkflow(context.Background(), "wfx.workflow.dau.direct")
})
return &sqlite
return &db
}
19 changes: 19 additions & 0 deletions internal/handler/workflow/main_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package workflow

/*
* SPDX-FileCopyrightText: 2023 Siemens AG
*
* SPDX-License-Identifier: Apache-2.0
*
* Author: Michael Adler <[email protected]>
*/

import (
"testing"

"go.uber.org/goleak"
)

func TestMain(m *testing.M) {
goleak.VerifyTestMain(m)
}
19 changes: 19 additions & 0 deletions internal/persistence/entgo/main_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package entgo

/*
* SPDX-FileCopyrightText: 2023 Siemens AG
*
* SPDX-License-Identifier: Apache-2.0
*
* Author: Michael Adler <[email protected]>
*/

import (
"testing"

"go.uber.org/goleak"
)

func TestMain(m *testing.M) {
goleak.VerifyTestMain(m)
}
26 changes: 17 additions & 9 deletions internal/persistence/entgo/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,22 +58,30 @@ func (wrapper *MySQL) Initialize(ctx context.Context, options string) error {

db := sql.OpenDB(connector)
if err := db.PingContext(ctx); err != nil {
log.Error().Err(err).Msg("Failed to ping PostgreSQL database")
log.Error().Err(err).Msg("Failed to ping MySQL database")
_ = db.Close()
return fault.Wrap(err)
}

log.Info().Msg("Applying migrations")
src, err := iofs.New(mysqlMigrations, "migrations/mysql")
if err != nil {
return fault.Wrap(err)
}
{
log.Info().Msg("Applying migrations")
src, err := iofs.New(mysqlMigrations, "migrations/mysql")
if err != nil {
return fault.Wrap(err)
}

Check warning on line 71 in internal/persistence/entgo/mysql.go

View check run for this annotation

Codecov / codecov/patch

internal/persistence/entgo/mysql.go#L70-L71

Added lines #L70 - L71 were not covered by tests

{ // run migrations
drv, err := mysql.WithInstance(db, &mysql.Config{})
conn, err := db.Conn(ctx)
if err != nil {
return fault.Wrap(err)
}
if err := runMigrations(src, cfg.DBName, drv); err != nil {
defer conn.Close()

m, err := mysql.WithConnection(ctx, conn, &mysql.Config{})
if err != nil {
return fault.Wrap(err)
}

Check warning on line 82 in internal/persistence/entgo/mysql.go

View check run for this annotation

Codecov / codecov/patch

internal/persistence/entgo/mysql.go#L81-L82

Added lines #L81 - L82 were not covered by tests

if err := runMigrations(src, cfg.DBName, m); err != nil {
return fault.Wrap(err)
}
}
Expand Down
16 changes: 16 additions & 0 deletions internal/persistence/entgo/mysql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,27 @@ import (
"testing"

"github.com/siemens/wfx/internal/persistence/tests"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/goleak"
)

func TestMySQL_Initialize(t *testing.T) {
defer goleak.VerifyNone(t)
db := setupMySQL(t)
db.Shutdown()
}

func TestMain_InitializeFail(t *testing.T) {
dsn := "foo:bar@tcp(localhost)/wfx"
var mysql MySQL
err := mysql.Initialize(context.Background(), dsn)
assert.NotNil(t, err)
}

func TestMySQL(t *testing.T) {
db := setupMySQL(t)
t.Cleanup(db.Shutdown)
for _, testFn := range tests.AllTests {
name := runtime.FuncForPC(reflect.ValueOf(testFn).Pointer()).Name()
name = strings.TrimPrefix(filepath.Ext(name), ".")
Expand Down
8 changes: 8 additions & 0 deletions internal/persistence/entgo/postgres_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,18 @@ import (

"github.com/siemens/wfx/internal/persistence/tests"
"github.com/stretchr/testify/require"
"go.uber.org/goleak"
)

func TestPostgreSQL_Initialize(t *testing.T) {
defer goleak.VerifyNone(t)
db := setupPostgreSQL(t)
db.Shutdown()
}

func TestPostgreSQL(t *testing.T) {
db := setupPostgreSQL(t)
t.Cleanup(db.Shutdown)
for _, testFn := range tests.AllTests {
name := runtime.FuncForPC(reflect.ValueOf(testFn).Pointer()).Name()
name = strings.TrimPrefix(filepath.Ext(name), ".")
Expand Down
54 changes: 32 additions & 22 deletions internal/persistence/entgo/sqlite.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,10 @@ package entgo
import (
"context"
"embed"
"net/url"

"entgo.io/ent/dialect"
"entgo.io/ent/dialect/sql"
"github.com/Southclaws/fault"
"github.com/golang-migrate/migrate/v4/database/sqlite3"
"github.com/golang-migrate/migrate/v4/source/iofs"
Expand All @@ -36,34 +39,41 @@ func init() {
persistence.RegisterStorage("sqlite", &SQLite{})
}

func (wrapper *SQLite) Initialize(_ context.Context, options string) error {
log.Debug().
Str("dsn", options).
Msg("Initializing SQLite storage")

src, err := iofs.New(sqliteMigrations, "migrations/sqlite")
func (instance *SQLite) Initialize(_ context.Context, dsn string) error {
log.Debug().Str("dsn", dsn).Msg("Connecting to SQLite")
drv, err := sql.Open(dialect.SQLite, dsn)
if err != nil {
log.Error().Err(err).Msg("Failed opening connection to SQLite")

Check warning on line 46 in internal/persistence/entgo/sqlite.go

View check run for this annotation

Codecov / codecov/patch

internal/persistence/entgo/sqlite.go#L46

Added line #L46 was not covered by tests
return fault.Wrap(err)
}
client := ent.NewClient(ent.Driver(drv))
log.Debug().Msg("Connected to SQLite")
instance.Database = Database{client: client}

var sqlite sqlite3.Sqlite
driver, err := sqlite.Open(options)
if err != nil {
return fault.Wrap(err)
}
{
// run schema migrations
src, err := iofs.New(sqliteMigrations, "migrations/sqlite")
if err != nil {
return fault.Wrap(err)
}

Check warning on line 58 in internal/persistence/entgo/sqlite.go

View check run for this annotation

Codecov / codecov/patch

internal/persistence/entgo/sqlite.go#L57-L58

Added lines #L57 - L58 were not covered by tests

if err := runMigrations(src, "wfx", driver); err != nil {
return fault.Wrap(err)
}
purl, err := url.Parse(dsn)
if err != nil {
return fault.Wrap(err)
}

Check warning on line 63 in internal/persistence/entgo/sqlite.go

View check run for this annotation

Codecov / codecov/patch

internal/persistence/entgo/sqlite.go#L62-L63

Added lines #L62 - L63 were not covered by tests

log.Debug().Msg("Connecting to SQLite")
client, err := ent.Open("sqlite3", options)
if err != nil {
log.Error().Err(err).Msg("Failed opening connection to sqlite")
return fault.Wrap(err)
}
log.Debug().Msg("Connected to SQLite")
driver, err := sqlite3.WithInstance(drv.DB(), &sqlite3.Config{
MigrationsTable: sqlite3.DefaultMigrationsTable,
DatabaseName: purl.Path,
NoTxWrap: false,
})
if err != nil {
return fault.Wrap(err)
}

Check warning on line 72 in internal/persistence/entgo/sqlite.go

View check run for this annotation

Codecov / codecov/patch

internal/persistence/entgo/sqlite.go#L71-L72

Added lines #L71 - L72 were not covered by tests

wrapper.Database = Database{client: client}
if err := runMigrations(src, "wfx", driver); err != nil {
return fault.Wrap(err)
}

Check warning on line 76 in internal/persistence/entgo/sqlite.go

View check run for this annotation

Codecov / codecov/patch

internal/persistence/entgo/sqlite.go#L75-L76

Added lines #L75 - L76 were not covered by tests
}
return nil
}
Loading

0 comments on commit 2b0631a

Please sign in to comment.