diff --git a/pkg/replicator/pgreplicator/pg.go b/pkg/replicator/pgreplicator/pg.go index b68ee98..3fa4972 100644 --- a/pkg/replicator/pgreplicator/pg.go +++ b/pkg/replicator/pgreplicator/pg.go @@ -145,11 +145,6 @@ func (p *pg) Close(ctx context.Context) error { return nil } -func (p *pg) TestConnection(ctx context.Context) (replicator.ConnectionResult, error) { - _, err := p.ReplicationSlot(ctx) - return nil, err -} - func (p *pg) ReplicationSlot(ctx context.Context) (ReplicationSlot, error) { mode, err := p.walMode(ctx) if err != nil { diff --git a/pkg/replicator/pgreplicator/validator.go b/pkg/replicator/pgreplicator/validator.go new file mode 100644 index 0000000..0957ae4 --- /dev/null +++ b/pkg/replicator/pgreplicator/validator.go @@ -0,0 +1,42 @@ +package pgreplicator + +import ( + "context" + + "github.com/inngest/dbcap/pkg/replicator" + "github.com/inngest/dbcap/pkg/replicator/pgreplicator/pgsetup" + "github.com/jackc/pgx/v5" +) + +type InitializerOpts struct { + // AdminConfig are admin credentials to verify DB config, eg. replication slots, publications, + // wal mode, etc. + AdminConfig pgx.ConnConfig + + // Password is the password to use when creating the new replication user. + Password string +} + +func NewInitializer(ctx context.Context, opts InitializerOpts) replicator.SystemInitializer { + return initializer{opts: opts} +} + +type initializer struct { + opts InitializerOpts +} + +// PerformInit perform setup for the replicator. +func (i initializer) PerformInit(ctx context.Context) (replicator.ConnectionResult, error) { + return pgsetup.Setup(ctx, pgsetup.SetupOpts{ + AdminConfig: i.opts.AdminConfig, + Password: i.opts.Password, + }) +} + +// CheckInit checks setup for the replicator. +func (i initializer) CheckInit(ctx context.Context) (replicator.ConnectionResult, error) { + return pgsetup.Check(ctx, pgsetup.SetupOpts{ + AdminConfig: i.opts.AdminConfig, + Password: i.opts.Password, + }) +} diff --git a/pkg/replicator/replicator.go b/pkg/replicator/replicator.go index d5b2b30..9744a8e 100644 --- a/pkg/replicator/replicator.go +++ b/pkg/replicator/replicator.go @@ -19,6 +19,8 @@ type WatermarkSaver func(ctx context.Context, watermark changeset.Watermark) err type WatermarkLoader func(ctx context.Context) (*changeset.Watermark, error) type Replicator interface { + changeset.WatermarkCommitter + // Pull is a blocking method which pulls changes from an external source, // sending all found changesets on the given changeset channel. // @@ -32,12 +34,6 @@ type Replicator interface { // Stop stops pulling and shuts down the replicator. This is an alternative // to cancelling the context passed into Pull. Stop() - - // TestConnection tests the replicator connection, returning connection information - // and any errors with the setup. - TestConnection(ctx context.Context) (ConnectionResult, error) - - changeset.WatermarkCommitter } type ConnectionResult interface { @@ -55,3 +51,11 @@ type ConnectionStepResult struct { Error error `json:"error"` Complete bool `json:"complete"` } + +type SystemInitializer interface { + // PerformInit perform setup for the replicator. + PerformInit(ctx context.Context) (ConnectionResult, error) + + // CheckInit ensures that the setup for the replicator is complete. + CheckInit(ctx context.Context) (ConnectionResult, error) +}