From c7f90aca2aca30a5e2bb059f148d8a2c3a054563 Mon Sep 17 00:00:00 2001 From: Shayon Mukherjee Date: Tue, 1 Oct 2024 12:06:14 -0400 Subject: [PATCH] wip --- cmd/root.go | 21 ++++++++++-- internal/e2e_test_local.sh | 16 +++++----- pkg/worker/worker.go | 65 +++++++++++++++++++++++++++++++------- 3 files changed, 81 insertions(+), 21 deletions(-) diff --git a/cmd/root.go b/cmd/root.go index 053490b..19a49c1 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -1,9 +1,12 @@ package cmd import ( + "context" "fmt" "os" + "os/signal" "strings" + "syscall" "github.com/rs/zerolog/log" "github.com/shayonj/pg_flo/pkg/pgflonats" @@ -228,10 +231,24 @@ func runWorker(cmd *cobra.Command, _ []string) { log.Fatal().Err(err).Msg("Failed to create sink") } + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + w := worker.NewWorker(natsClient, ruleEngine, sink, group) - if err := w.Start(cmd.Context()); err != nil { - log.Fatal().Err(err).Msg("Worker failed") + + go func() { + sigChan := make(chan os.Signal, 1) + signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) + <-sigChan + log.Info().Msg("Shutdown signal received. Cancelling context...") + cancel() + }() + + if err := w.Start(ctx); err != nil { + log.Error().Err(err).Msg("Worker encountered an error") } + + log.Info().Msg("Worker shut down gracefully") } func loadRulesConfig(filePath string) (rules.Config, error) { diff --git a/internal/e2e_test_local.sh b/internal/e2e_test_local.sh index 48ec1b0..d25e370 100755 --- a/internal/e2e_test_local.sh +++ b/internal/e2e_test_local.sh @@ -30,15 +30,15 @@ trap cleanup EXIT make build -setup_docker +# setup_docker -log "Running e2e copy & stream tests..." -if CI=false ./internal/e2e_copy_and_stream.sh; then - success "Original e2e tests completed successfully" -else - error "Original e2e tests failed" - exit 1 -fi +# log "Running e2e copy & stream tests..." +# if CI=false ./internal/e2e_copy_and_stream.sh; then +# success "Original e2e tests completed successfully" +# else +# error "Original e2e tests failed" +# exit 1 +# fi setup_docker diff --git a/pkg/worker/worker.go b/pkg/worker/worker.go index 4a112be..2882e9a 100644 --- a/pkg/worker/worker.go +++ b/pkg/worker/worker.go @@ -3,6 +3,10 @@ package worker import ( "context" "fmt" + "os" + "os/signal" + "syscall" + "time" "github.com/nats-io/nats.go/jetstream" "github.com/rs/zerolog" @@ -23,6 +27,8 @@ type Worker struct { maxRetries int buffer []*utils.CDCMessage lastSavedState uint64 + flushInterval time.Duration + done chan struct{} } // NewWorker creates and returns a new Worker instance with the provided NATS client, rule engine, sink, and group. @@ -39,6 +45,8 @@ func NewWorker(natsClient *pgflonats.NATSClient, ruleEngine *rules.RuleEngine, s maxRetries: 3, buffer: make([]*utils.CDCMessage, 0, 1000), lastSavedState: 0, + flushInterval: 5 * time.Second, + done: make(chan struct{}), } } @@ -99,6 +107,20 @@ func (w *Worker) Start(ctx context.Context) error { return fmt.Errorf("failed to create ordered consumer: %w", err) } + // Create a new context with cancellation + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + // Set up signal handling for graceful shutdown + sigChan := make(chan os.Signal, 1) + signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) + + go func() { + <-sigChan + w.logger.Info().Msg("Received shutdown signal. Initiating graceful shutdown...") + cancel() + }() + return w.processMessages(ctx, cons) } @@ -110,24 +132,43 @@ func (w *Worker) processMessages(ctx context.Context, cons jetstream.Consumer) e } defer iter.Stop() + flushTicker := time.NewTicker(w.flushInterval) + defer flushTicker.Stop() + for { select { case <-ctx.Done(): + w.logger.Info().Msg("Context cancelled. Flushing remaining messages...") + if err := w.flushBuffer(); err != nil { + w.logger.Error().Err(err).Msg("Failed to flush buffer during shutdown") + } return ctx.Err() - default: - msg, err := iter.Next() - if err != nil { - w.logger.Error().Err(err).Msg("Failed to get next message") - continue + case <-flushTicker.C: + if err := w.flushBuffer(); err != nil { + w.logger.Error().Err(err).Msg("Failed to flush buffer on interval") } + default: + select { + case <-ctx.Done(): + return ctx.Err() + default: + msg, err := iter.Next() + if err != nil { + if err == context.Canceled { + return err + } + w.logger.Error().Err(err).Msg("Failed to get next message") + continue + } - if err := w.processMessage(msg); err != nil { - w.logger.Error().Err(err).Msg("Failed to process message") - } + if err := w.processMessage(msg); err != nil { + w.logger.Error().Err(err).Msg("Failed to process message") + } - if len(w.buffer) >= w.batchSize { - if err := w.flushBuffer(); err != nil { - w.logger.Error().Err(err).Msg("Failed to flush buffer") + if len(w.buffer) >= w.batchSize { + if err := w.flushBuffer(); err != nil { + w.logger.Error().Err(err).Msg("Failed to flush buffer") + } } } } @@ -173,6 +214,8 @@ func (w *Worker) flushBuffer() error { return nil } + w.logger.Debug().Int("messages", len(w.buffer)).Msg("Flushing buffer") + err := w.sink.WriteBatch(w.buffer) if err != nil { w.logger.Error().Err(err).Msg("Failed to write batch to sink")