Skip to content

Commit

Permalink
Speed up
Browse files Browse the repository at this point in the history
  • Loading branch information
shayonj committed Oct 14, 2024
1 parent 64057a0 commit bb34111
Show file tree
Hide file tree
Showing 6 changed files with 194 additions and 48 deletions.
30 changes: 28 additions & 2 deletions cmd/root.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
package cmd

import (
"context"
"fmt"
"os"
"os/signal"
"strings"
"syscall"

"github.com/rs/zerolog/log"
"github.com/shayonj/pg_flo/pkg/pgflonats"
Expand Down Expand Up @@ -106,6 +109,7 @@ func init() {
postgresWorkerCmd.Flags().String("postgres-user", "", "Target PostgreSQL user (env: PG_FLO_POSTGRES_USER)")
postgresWorkerCmd.Flags().String("postgres-password", "", "Target PostgreSQL password (env: PG_FLO_POSTGRES_PASSWORD)")
postgresWorkerCmd.Flags().Bool("postgres-sync-schema", false, "Sync schema from source to target (env: PG_FLO_POSTGRES_SYNC_SCHEMA)")
postgresWorkerCmd.Flags().Bool("postgres-disable-foreign-keys", false, "Disable foreign key checks during write (env: PG_FLO_POSTGRES_DISABLE_FOREIGN_KEYS)")

markFlagRequired(postgresWorkerCmd, "postgres-host", "postgres-dbname", "postgres-user", "postgres-password")

Expand Down Expand Up @@ -228,9 +232,30 @@ func runWorker(cmd *cobra.Command, _ []string) {
}

w := worker.NewWorker(natsClient, ruleEngine, sink, group)
if err := w.Start(cmd.Context()); err != nil {
log.Fatal().Err(err).Msg("Worker failed")

ctx, cancel := context.WithCancel(cmd.Context())
defer cancel()

// Set up signal handling
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)

go func() {
sig := <-sigCh
log.Info().Msgf("Received shutdown signal: %v. Canceling context...", sig)
cancel()
}()

log.Info().Msg("Starting worker...")
if err := w.Start(ctx); err != nil {
if err == context.Canceled {
log.Info().Msg("Worker shut down gracefully")
} else {
log.Error().Err(err).Msg("Worker encountered an error during shutdown")
}
}

log.Info().Msg("Worker process exiting")
}

func loadRulesConfig(filePath string) (rules.Config, error) {
Expand Down Expand Up @@ -267,6 +292,7 @@ func createSink(sinkType string) (sinks.Sink, error) {
viper.GetString("dbname"),
viper.GetString("user"),
viper.GetString("password"),
viper.GetBool("postgres-disable-foreign-keys"),
)
case "webhook":
return sinks.NewWebhookSink(
Expand Down
2 changes: 1 addition & 1 deletion internal/e2e_test_local.sh
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ trap cleanup EXIT

make build

# setup_docker
setup_docker

log "Running e2e copy & stream tests..."
if CI=false ./internal/e2e_copy_and_stream.sh; then
Expand Down
3 changes: 3 additions & 0 deletions internal/e2e_test_stream.sh
Original file line number Diff line number Diff line change
Expand Up @@ -46,14 +46,17 @@ simulate_changes() {
local update_count=500
local delete_count=250

log "Simulating inserts..."
for i in $(seq 1 $insert_count); do
run_sql "INSERT INTO public.users (data) VALUES ('Data $i');"
done

log "Simulating updates..."
for i in $(seq 1 $update_count); do
run_sql "UPDATE public.users SET data = 'Updated data $i' WHERE id = $i;"
done

log "Simulating deletes..."
for i in $(seq 1 $delete_count); do
run_sql "DELETE FROM public.users WHERE id = $i;"
done
Expand Down
2 changes: 1 addition & 1 deletion pkg/replicator/copy_and_stream_replicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ func (r *CopyAndStreamReplicator) getRelPages(ctx context.Context, tableName str
// generateRanges creates a set of page ranges for copying.
func (r *CopyAndStreamReplicator) generateRanges(relPages uint32) [][2]uint32 {
var ranges [][2]uint32
batchSize := uint32(100)
batchSize := uint32(1000)
for start := uint32(0); start < relPages; start += batchSize {
end := start + batchSize
if end >= relPages {
Expand Down
38 changes: 32 additions & 6 deletions pkg/sinks/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,12 @@ import (

// PostgresSink represents a sink for PostgreSQL database
type PostgresSink struct {
conn *pgx.Conn
conn *pgx.Conn
disableForeignKeyChecks bool
}

// NewPostgresSink creates a new PostgresSink instance
func NewPostgresSink(targetHost string, targetPort int, targetDBName, targetUser, targetPassword string, syncSchema bool, sourceHost string, sourcePort int, sourceDBName, sourceUser, sourcePassword string) (*PostgresSink, error) {
func NewPostgresSink(targetHost string, targetPort int, targetDBName, targetUser, targetPassword string, syncSchema bool, sourceHost string, sourcePort int, sourceDBName, sourceUser, sourcePassword string, disableForeignKeyChecks bool) (*PostgresSink, error) {
connConfig, err := pgx.ParseConfig(fmt.Sprintf("host=%s port=%d dbname=%s user=%s password=%s", targetHost, targetPort, targetDBName, targetUser, targetPassword))
if err != nil {
return nil, fmt.Errorf("failed to parse connection config: %v", err)
Expand All @@ -30,7 +31,8 @@ func NewPostgresSink(targetHost string, targetPort int, targetDBName, targetUser
}

sink := &PostgresSink{
conn: conn,
conn: conn,
disableForeignKeyChecks: disableForeignKeyChecks,
}

if syncSchema {
Expand Down Expand Up @@ -162,18 +164,42 @@ func (s *PostgresSink) handleDDL(tx pgx.Tx, message *utils.CDCMessage) error {
return err
}

// disableForeignKeys puts the sink's connection into replica mode by issuing
// SET session_replication_role = 'replica' on s.conn.
//
// The sink uses this to skip foreign key enforcement while writing. Per the
// PostgreSQL documentation, this setting governs the firing of triggers and
// rules in general (foreign keys are enforced through internal triggers), so
// ordinary user triggers on the target are affected as well.
//
// It returns whatever error the Exec call reports, or nil on success.
func (s *PostgresSink) disableForeignKeys(ctx context.Context) error {
	const stmt = "SET session_replication_role = 'replica';"
	if _, execErr := s.conn.Exec(ctx, stmt); execErr != nil {
		return execErr
	}
	return nil
}

// enableForeignKeys switches the sink's connection back to origin mode by
// issuing SET session_replication_role = 'origin' on s.conn, which is the
// counterpart of the 'replica' setting used to bypass foreign key checks.
//
// It returns whatever error the Exec call reports, or nil on success.
func (s *PostgresSink) enableForeignKeys(ctx context.Context) error {
	const stmt = "SET session_replication_role = 'origin';"
	if _, execErr := s.conn.Exec(ctx, stmt); execErr != nil {
		return execErr
	}
	return nil
}

// WriteBatch writes a batch of CDC messages to the target database
func (s *PostgresSink) WriteBatch(messages []*utils.CDCMessage) error {
tx, err := s.conn.Begin(context.Background())
ctx := context.Background()
tx, err := s.conn.Begin(ctx)
if err != nil {
return fmt.Errorf("failed to begin transaction: %v", err)
}
defer func() {
if err := tx.Rollback(context.Background()); err != nil && err != pgx.ErrTxClosed {
if err := tx.Rollback(ctx); err != nil && err != pgx.ErrTxClosed {
log.Error().Err(err).Msg("failed to rollback transaction")
}
}()

if s.disableForeignKeyChecks {
if err := s.disableForeignKeys(ctx); err != nil {
return fmt.Errorf("failed to disable foreign key checks: %v", err)
}
defer func() {
if err := s.enableForeignKeys(ctx); err != nil {
log.Error().Err(err).Msg("failed to re-enable foreign key checks")
}
}()
}

for _, message := range messages {
var err error
switch message.Type {
Expand All @@ -194,7 +220,7 @@ func (s *PostgresSink) WriteBatch(messages []*utils.CDCMessage) error {
}
}

if err := tx.Commit(context.Background()); err != nil {
if err := tx.Commit(ctx); err != nil {
return fmt.Errorf("failed to commit transaction: %v", err)
}

Expand Down
Loading

0 comments on commit bb34111

Please sign in to comment.