Skip to content

Commit

Permalink
Simple factory pattern (#51)
Browse files Browse the repository at this point in the history
  • Loading branch information
shayonj authored Nov 16, 2024
1 parent ad62ed4 commit 24be6e3
Show file tree
Hide file tree
Showing 12 changed files with 297 additions and 124 deletions.
17 changes: 16 additions & 1 deletion cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,22 @@ func runReplicator(_ *cobra.Command, _ []string) {

maxCopyWorkersPerTable := viper.GetInt("max-copy-workers-per-table")

rep, err := replicator.NewReplicator(config, natsClient, copyAndStream, copyMode, maxCopyWorkersPerTable)
var factory replicator.Factory

if copyMode {
factory = &replicator.CopyAndStreamReplicatorFactory{
MaxCopyWorkersPerTable: maxCopyWorkersPerTable,
CopyOnly: true,
}
} else if copyAndStream {
factory = &replicator.CopyAndStreamReplicatorFactory{
MaxCopyWorkersPerTable: maxCopyWorkersPerTable,
}
} else {
factory = &replicator.StreamReplicatorFactory{}
}

rep, err := factory.CreateReplicator(config, natsClient)
if err != nil {
log.Fatal().Err(err).Msg("Failed to create replicator")
}
Expand Down
6 changes: 3 additions & 3 deletions internal/scripts/e2e_test_local.sh
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,9 @@ make build

setup_docker

log "Running e2e transform filter tests..."
if CI=false ./internal/scripts/e2e_transform_filter.sh; then
success "e2e transform filter tests completed successfully"
log "Running e2e postgres tests..."
if CI=false ./internal/scripts/e2e_postgres.sh; then
success "e2e postgres tests completed successfully"
else
error "Original e2e tests failed"
exit 1
Expand Down
11 changes: 2 additions & 9 deletions pkg/replicator/base_replicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package replicator
import (
"context"
"fmt"
"os"
"strings"
"time"

Expand All @@ -13,15 +12,9 @@ import (
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgproto3"
"github.com/pgflo/pg_flo/pkg/utils"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
)

func init() {
log.Logger = log.Output(zerolog.ConsoleWriter{Out: os.Stdout, TimeFormat: "15:04:05.000"})
zerolog.TimeFieldFormat = "2006-01-02T15:04:05.000Z07:00"
}

// GeneratePublicationName generates a deterministic publication name based on the group name
func GeneratePublicationName(group string) string {
group = strings.ReplaceAll(group, "-", "_")
Expand All @@ -34,7 +27,7 @@ type BaseReplicator struct {
ReplicationConn ReplicationConnection
StandardConn StandardConnection
Relations map[uint32]*pglogrepl.RelationMessage
Logger zerolog.Logger
Logger utils.Logger
TableDetails map[string][]string
LastLSN pglogrepl.LSN
NATSClient NATSClient
Expand All @@ -47,7 +40,7 @@ func NewBaseReplicator(config Config, replicationConn ReplicationConnection, sta
config.Schema = "public"
}

logger := log.With().Str("component", "replicator").Logger()
logger := utils.NewZerologLogger(log.With().Str("component", "replicator").Logger())

br := &BaseReplicator{
Config: config,
Expand Down
111 changes: 111 additions & 0 deletions pkg/replicator/factory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
package replicator

import (
"context"
"fmt"
)

// ReplicatorFactory defines the interface for creating replicators
type Factory interface {
CreateReplicator(config Config, natsClient NATSClient) (Replicator, error)
}

// BaseFactory provides common functionality for factories
type BaseFactory struct{}

// CreateConnections creates replication and standard connections
func (f *BaseFactory) CreateConnections(config Config) (ReplicationConnection, StandardConnection, error) {
replicationConn := NewReplicationConnection(config)
if err := replicationConn.Connect(context.Background()); err != nil {
return nil, nil, fmt.Errorf("failed to connect for replication: %v", err)
}

standardConn, err := NewStandardConnection(config)
if err != nil {
return nil, nil, fmt.Errorf("failed to create standard connection: %v", err)
}

return replicationConn, standardConn, nil
}

// StreamReplicatorFactory creates `StreamReplicator` instances
type StreamReplicatorFactory struct {
BaseFactory
}

// CreateReplicator creates a new `StreamReplicator`
func (f *StreamReplicatorFactory) CreateReplicator(config Config, natsClient NATSClient) (Replicator, error) {
replicationConn, standardConn, err := f.CreateConnections(config)
if err != nil {
return nil, err
}

baseReplicator := NewBaseReplicator(config, replicationConn, standardConn, natsClient)

var ddlReplicator *DDLReplicator
if config.TrackDDL {
ddlConn, err := NewStandardConnection(config)
if err != nil {
return nil, fmt.Errorf("failed to create DDL connection: %v", err)
}
ddlReplicator, err = NewDDLReplicator(config, baseReplicator, ddlConn)
if err != nil {
return nil, fmt.Errorf("failed to create DDL replicator: %v", err)
}
}

streamReplicator := &StreamReplicator{
BaseReplicator: *baseReplicator,
}

if ddlReplicator != nil {
streamReplicator.DDLReplicator = *ddlReplicator
}

return streamReplicator, nil
}

// CopyAndStreamReplicatorFactory creates `CopyAndStreamReplicator` instances
type CopyAndStreamReplicatorFactory struct {
BaseFactory
MaxCopyWorkersPerTable int
CopyOnly bool
}

// CreateReplicator creates a new `CopyAndStreamReplicator`
func (f *CopyAndStreamReplicatorFactory) CreateReplicator(config Config, natsClient NATSClient) (Replicator, error) {
replicationConn, standardConn, err := f.CreateConnections(config)
if err != nil {
return nil, err
}

baseReplicator := NewBaseReplicator(config, replicationConn, standardConn, natsClient)

var ddlReplicator *DDLReplicator
if config.TrackDDL {
ddlConn, err := NewStandardConnection(config)
if err != nil {
return nil, fmt.Errorf("failed to create DDL connection: %v", err)
}
ddlReplicator, err = NewDDLReplicator(config, baseReplicator, ddlConn)
if err != nil {
return nil, fmt.Errorf("failed to create DDL replicator: %v", err)
}
}

if f.MaxCopyWorkersPerTable <= 0 {
f.MaxCopyWorkersPerTable = 4
}

copyAndStreamReplicator := &CopyAndStreamReplicator{
BaseReplicator: *baseReplicator,
MaxCopyWorkersPerTable: f.MaxCopyWorkersPerTable,
CopyOnly: f.CopyOnly,
}

if ddlReplicator != nil {
copyAndStreamReplicator.DDLReplicator = *ddlReplicator
}

return copyAndStreamReplicator, nil
}
72 changes: 0 additions & 72 deletions pkg/replicator/replicator.go

This file was deleted.

Loading

0 comments on commit 24be6e3

Please sign in to comment.