Skip to content

Commit

Permalink
Add unit tests covering logical replication and conn slot errors
Browse files Browse the repository at this point in the history
  • Loading branch information
tonyhb committed Sep 3, 2024
1 parent 0a3e4a0 commit 751d0a0
Show file tree
Hide file tree
Showing 1,633 changed files with 266,363 additions and 20,936 deletions.
62 changes: 54 additions & 8 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,64 @@ require (
github.com/jackc/pglogrepl v0.0.0-20240307033717-828fbfe908e9
github.com/jackc/pgx/v5 v5.6.0
github.com/replicase/pgcapture v0.0.62
github.com/stretchr/testify v1.9.0
github.com/testcontainers/testcontainers-go v0.33.0
github.com/testcontainers/testcontainers-go/modules/postgres v0.33.0
)

require (
github.com/golang/protobuf v1.5.3 // indirect
dario.cat/mergo v1.0.0 // indirect
github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 // indirect
github.com/Microsoft/go-winio v0.6.2 // indirect
github.com/cenkalti/backoff/v4 v4.2.1 // indirect
github.com/containerd/containerd v1.7.18 // indirect
github.com/containerd/log v0.1.0 // indirect
github.com/containerd/platforms v0.2.1 // indirect
github.com/cpuguy83/dockercfg v0.3.1 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/distribution/reference v0.6.0 // indirect
github.com/docker/docker v27.1.1+incompatible // indirect
github.com/docker/go-connections v0.5.0 // indirect
github.com/docker/go-units v0.5.0 // indirect
github.com/felixge/httpsnoop v1.0.4 // indirect
github.com/go-logr/logr v1.4.1 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/go-ole/go-ole v1.2.6 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/jackc/pgio v1.0.0 // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect
golang.org/x/crypto v0.17.0 // indirect
golang.org/x/net v0.12.0 // indirect
golang.org/x/sys v0.15.0 // indirect
golang.org/x/text v0.14.0 // indirect
google.golang.org/genproto v0.0.0-20210602131652-f16073e35f0c // indirect
google.golang.org/grpc v1.38.0 // indirect
google.golang.org/protobuf v1.30.0 // indirect
github.com/klauspost/compress v1.17.4 // indirect
github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 // indirect
github.com/magiconair/properties v1.8.7 // indirect
github.com/moby/docker-image-spec v1.3.1 // indirect
github.com/moby/patternmatcher v0.6.0 // indirect
github.com/moby/sys/sequential v0.5.0 // indirect
github.com/moby/sys/user v0.1.0 // indirect
github.com/moby/term v0.5.0 // indirect
github.com/morikuni/aec v1.0.0 // indirect
github.com/opencontainers/go-digest v1.0.0 // indirect
github.com/opencontainers/image-spec v1.1.0 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect
github.com/shirou/gopsutil/v3 v3.23.12 // indirect
github.com/shoenig/go-m1cpu v0.1.6 // indirect
github.com/sirupsen/logrus v1.9.3 // indirect
github.com/tklauser/go-sysconf v0.3.12 // indirect
github.com/tklauser/numcpus v0.6.1 // indirect
github.com/yusufpapurcu/wmi v1.2.3 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.49.0 // indirect
go.opentelemetry.io/otel v1.24.0 // indirect
go.opentelemetry.io/otel/metric v1.24.0 // indirect
go.opentelemetry.io/otel/trace v1.24.0 // indirect
golang.org/x/crypto v0.24.0 // indirect
golang.org/x/net v0.26.0 // indirect
golang.org/x/sys v0.21.0 // indirect
golang.org/x/text v0.16.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240318140521-94a12d6c2237 // indirect
google.golang.org/grpc v1.64.1 // indirect
google.golang.org/protobuf v1.33.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
269 changes: 167 additions & 102 deletions go.sum

Large diffs are not rendered by default.

101 changes: 101 additions & 0 deletions internal/test/pg.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
package test

import (
"context"
"fmt"
"strings"
"testing"

"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgconn"
"github.com/stretchr/testify/require"
tc "github.com/testcontainers/testcontainers-go"
pgtc "github.com/testcontainers/testcontainers-go/modules/postgres"
)

type StartPGOpts struct {
Version int
DisableLogicalReplication bool
DisableCreateRoles bool
DisableCreateSlot bool
}

func StartPG(t *testing.T, ctx context.Context, opts StartPGOpts) (tc.Container, pgx.ConnConfig) {
t.Helper()
args := []tc.ContainerCustomizer{
pgtc.WithDatabase("db"),
pgtc.WithUsername("postgres"),
pgtc.WithPassword("password"),
pgtc.BasicWaitStrategies(),
}
if !opts.DisableLogicalReplication {
args = append(args, tc.CustomizeRequest(tc.GenericContainerRequest{
ContainerRequest: tc.ContainerRequest{
Cmd: []string{"-c", "wal_level=logical"},
},
}))
}
c, err := pgtc.Run(ctx,
fmt.Sprintf("docker.io/postgres:%d-alpine", opts.Version),
args...,
)

conn, err := pgconn.Connect(ctx, connString(t, c))
if err != nil {
require.NoError(t, err)
}

if !opts.DisableCreateRoles {
// Create the replication slot.
err := prepareRoles(ctx, conn)
require.NoError(t, err)
}
if !opts.DisableCreateSlot {
// Create the replication slot.
err := createReplicationSlot(ctx, conn)
require.NoError(t, err)
}

require.NoError(t, err)
return c, connOpts(t, c)
}

func prepareRoles(ctx context.Context, c *pgconn.PgConn) error {
stmt := `
CREATE USER inngest WITH REPLICATION PASSWORD 'password';
GRANT USAGE ON SCHEMA public TO inngest;
GRANT SELECT ON ALL TABLES IN SCHEMA public TO inngest;
ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT SELECT ON TABLES TO inngest;
CREATE PUBLICATION inngest FOR ALL TABLES;
`
res := c.Exec(ctx, stmt)
if err := res.Close(); err != nil {
return err
}
return nil
}

func createReplicationSlot(ctx context.Context, c *pgconn.PgConn) error {
stmt := `
-- pgoutput logical repl plugin
SELECT pg_create_logical_replication_slot('inngest_cdc', 'pgoutput');
`
res := c.Exec(ctx, stmt)
if err := res.Close(); err != nil {
return err
}
return nil
}

func connString(t *testing.T, c tc.Container) string {
p, err := c.MappedPort(context.TODO(), "5432")
require.NoError(t, err)
port := strings.ReplaceAll(string(p), "/tcp", "")
return fmt.Sprintf("postgres://postgres:password@localhost:%s/db", port)
}

func connOpts(t *testing.T, c tc.Container) pgx.ConnConfig {
cfg, err := pgx.ParseConfig(connString(t, c))
require.NoError(t, err)
return *cfg
}
40 changes: 31 additions & 9 deletions pkg/replicator/pg.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"log/slog"
"strings"
"sync/atomic"
"time"

Expand All @@ -19,6 +20,9 @@ import (

var (
ReadTimeout = time.Second * 5

ErrLogicalReplicationNotSetUp = fmt.Errorf("ERR_PG_001: Your database does not have logical replication configured. You must set the WAL level to 'logical' to stream events.")
ErrReplicationSlotNotFound = fmt.Errorf("ERR_PG_002: The replication slot 'inngest_cdc' doesn't exist in your database. Please create the logical replication slot to stream events.")
)

type WatermarkCommitter interface {
Expand Down Expand Up @@ -119,21 +123,16 @@ func (p *pg) Commit(wm changeset.Watermark) {
atomic.StoreInt64(&p.lsnTime, wm.ServerTime.UnixNano())
}

func (p *pg) Pull(ctx context.Context, cc chan *changeset.Changeset) error {
func (p *pg) Connect(ctx context.Context, lsn pglogrepl.LSN) error {
identify, err := pglogrepl.IdentifySystem(ctx, p.conn)
if err != nil {
return fmt.Errorf("error identifying postgres: %w", err)
}

// By default, start at the current LSN, ie. the latest point in the stream.
startLSN := identify.XLogPos

if p.opts.WatermarkLoader != nil {
watermark, err := p.opts.WatermarkLoader(ctx)
if err != nil {
return fmt.Errorf("error loading watermark: %w", err)
}
startLSN = watermark.LSN
if lsn > 0 {
startLSN = lsn
}

err = pglogrepl.StartReplication(
Expand All @@ -147,9 +146,32 @@ func (p *pg) Pull(ctx context.Context, cc chan *changeset.Changeset) error {
},
)
if err != nil {
// XXX: Failure modes - what if the LSN is too far behind?
msg := err.Error()
if strings.Contains(msg, "logical decoding requires wal_level") {
return ErrLogicalReplicationNotSetUp
}
if strings.Contains(msg, fmt.Sprintf(`replication slot "%s" does not exist`, pgconsts.SlotName)) {
return ErrReplicationSlotNotFound
}
return fmt.Errorf("error starting logical replication: %w", err)
}
return nil
}

func (p *pg) Pull(ctx context.Context, cc chan *changeset.Changeset) error {
// By default, start at the current LSN, ie. the latest point in the stream.
var startLSN pglogrepl.LSN
if p.opts.WatermarkLoader != nil {
watermark, err := p.opts.WatermarkLoader(ctx)
if err != nil {
return fmt.Errorf("error loading watermark: %w", err)
}
startLSN = watermark.LSN
}

if err := p.Connect(ctx, pglogrepl.LSN(startLSN)); err != nil {
return err
}

for {
if ctx.Err() != nil {
Expand Down
85 changes: 85 additions & 0 deletions pkg/replicator/pg_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package replicator

import (
"context"
"sync"
"testing"

"github.com/inngest/pgcap/internal/test"
"github.com/stretchr/testify/require"
)

func TestConnectingWithoutLogicalReplicationFails(t *testing.T) {
ctx := context.Background()
versions := []int{12, 13, 14, 15, 16}

for _, v := range versions {
c, conn := test.StartPG(t, ctx, test.StartPGOpts{
Version: v,
DisableLogicalReplication: true,
DisableCreateSlot: true,
})

opts := PostgresOpts{Config: conn}
r, err := Postgres(ctx, opts)
require.NoError(t, err)

err = r.Pull(ctx, nil)
require.ErrorIs(t, err, ErrLogicalReplicationNotSetUp)

c.Stop(ctx, nil)
}
}

func TestConnectingWithoutReplicationSlotFails(t *testing.T) {
ctx := context.Background()
versions := []int{12, 13, 14, 15, 16}

for _, v := range versions {
c, conn := test.StartPG(t, ctx, test.StartPGOpts{
Version: v,
DisableCreateSlot: true,
})

opts := PostgresOpts{Config: conn}
r, err := Postgres(ctx, opts)
require.NoError(t, err)

err = r.Pull(ctx, nil)
require.ErrorIs(t, err, ErrReplicationSlotNotFound)

c.Stop(ctx, nil)
}
}

func TestMultipleConectionsFail(t *testing.T) {
// versions := []int{12, 13, 14, 15, 16}
versions := []int{14}

for _, v := range versions {
ctx := context.Background()
c, conn := test.StartPG(t, ctx, test.StartPGOpts{
Version: v,
})

// The first time we connect things should succeed.
opts := PostgresOpts{Config: conn}
r1, err := Postgres(ctx, opts)
require.NoError(t, err)

wg := sync.WaitGroup{}

wg.Add(1)
go func() {
defer wg.Done()
err := r1.Pull(ctx, nil)
require.NoError(t, err)
}()

r2, err := Postgres(ctx, opts)
err = r2.Pull(ctx, nil)
require.Error(t, err)

c.Stop(ctx, nil)
}
}
12 changes: 12 additions & 0 deletions vendor/dario.cat/mergo/.deepsource.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
version = 1

test_patterns = [
"*_test.go"
]

[[analyzers]]
name = "go"
enabled = true

[analyzers.meta]
import_path = "dario.cat/mergo"
33 changes: 33 additions & 0 deletions vendor/dario.cat/mergo/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
#### joe made this: http://goel.io/joe

#### go ####
# Binaries for programs and plugins
*.exe
*.dll
*.so
*.dylib

# Test binary, build with `go test -c`
*.test

# Output of the go coverage tool, specifically when used with LiteIDE
*.out

# Project-local glide cache, RE: https://github.com/Masterminds/glide/issues/736
.glide/

#### vim ####
# Swap
[._]*.s[a-v][a-z]
[._]*.sw[a-p]
[._]s[a-v][a-z]
[._]sw[a-p]

# Session
Session.vim

# Temporary
.netrwhist
*~
# Auto-generated tag files
tags
12 changes: 12 additions & 0 deletions vendor/dario.cat/mergo/.travis.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
language: go
arch:
- amd64
- ppc64le
install:
- go get -t
- go get golang.org/x/tools/cmd/cover
- go get github.com/mattn/goveralls
script:
- go test -race -v ./...
after_script:
- $HOME/gopath/bin/goveralls -service=travis-ci -repotoken $COVERALLS_TOKEN
Loading

0 comments on commit 751d0a0

Please sign in to comment.