Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
shayonj committed Oct 1, 2024
1 parent b01b17c commit c7f90ac
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 21 deletions.
21 changes: 19 additions & 2 deletions cmd/root.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
package cmd

import (
"context"
"fmt"
"os"
"os/signal"
"strings"
"syscall"

"github.com/rs/zerolog/log"
"github.com/shayonj/pg_flo/pkg/pgflonats"
Expand Down Expand Up @@ -228,10 +231,24 @@ func runWorker(cmd *cobra.Command, _ []string) {
log.Fatal().Err(err).Msg("Failed to create sink")
}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

w := worker.NewWorker(natsClient, ruleEngine, sink, group)
if err := w.Start(cmd.Context()); err != nil {
log.Fatal().Err(err).Msg("Worker failed")

go func() {
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
<-sigChan
log.Info().Msg("Shutdown signal received. Cancelling context...")
cancel()
}()

if err := w.Start(ctx); err != nil {
log.Error().Err(err).Msg("Worker encountered an error")
}

log.Info().Msg("Worker shut down gracefully")
}

func loadRulesConfig(filePath string) (rules.Config, error) {
Expand Down
16 changes: 8 additions & 8 deletions internal/e2e_test_local.sh
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,15 @@ trap cleanup EXIT

make build

setup_docker
# setup_docker

log "Running e2e copy & stream tests..."
if CI=false ./internal/e2e_copy_and_stream.sh; then
success "Original e2e tests completed successfully"
else
error "Original e2e tests failed"
exit 1
fi
# log "Running e2e copy & stream tests..."
# if CI=false ./internal/e2e_copy_and_stream.sh; then
# success "Original e2e tests completed successfully"
# else
# error "Original e2e tests failed"
# exit 1
# fi

setup_docker

Expand Down
65 changes: 54 additions & 11 deletions pkg/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@ package worker
import (
"context"
"fmt"
"os"
"os/signal"
"syscall"
"time"

"github.com/nats-io/nats.go/jetstream"
"github.com/rs/zerolog"
Expand All @@ -23,6 +27,8 @@ type Worker struct {
maxRetries int
buffer []*utils.CDCMessage
lastSavedState uint64
flushInterval time.Duration
done chan struct{}
}

// NewWorker creates and returns a new Worker instance with the provided NATS client, rule engine, sink, and group.
Expand All @@ -39,6 +45,8 @@ func NewWorker(natsClient *pgflonats.NATSClient, ruleEngine *rules.RuleEngine, s
maxRetries: 3,
buffer: make([]*utils.CDCMessage, 0, 1000),
lastSavedState: 0,
flushInterval: 5 * time.Second,
done: make(chan struct{}),
}
}

Expand Down Expand Up @@ -99,6 +107,20 @@ func (w *Worker) Start(ctx context.Context) error {
return fmt.Errorf("failed to create ordered consumer: %w", err)
}

// Create a new context with cancellation
ctx, cancel := context.WithCancel(ctx)
defer cancel()

// Set up signal handling for graceful shutdown
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)

go func() {
<-sigChan
w.logger.Info().Msg("Received shutdown signal. Initiating graceful shutdown...")
cancel()
}()

return w.processMessages(ctx, cons)
}

Expand All @@ -110,24 +132,43 @@ func (w *Worker) processMessages(ctx context.Context, cons jetstream.Consumer) e
}
defer iter.Stop()

flushTicker := time.NewTicker(w.flushInterval)
defer flushTicker.Stop()

for {
select {
case <-ctx.Done():
w.logger.Info().Msg("Context cancelled. Flushing remaining messages...")
if err := w.flushBuffer(); err != nil {
w.logger.Error().Err(err).Msg("Failed to flush buffer during shutdown")
}
return ctx.Err()
default:
msg, err := iter.Next()
if err != nil {
w.logger.Error().Err(err).Msg("Failed to get next message")
continue
case <-flushTicker.C:
if err := w.flushBuffer(); err != nil {
w.logger.Error().Err(err).Msg("Failed to flush buffer on interval")
}
default:
select {
case <-ctx.Done():
return ctx.Err()
default:
msg, err := iter.Next()
if err != nil {
if err == context.Canceled {
return err
}
w.logger.Error().Err(err).Msg("Failed to get next message")
continue
}

if err := w.processMessage(msg); err != nil {
w.logger.Error().Err(err).Msg("Failed to process message")
}
if err := w.processMessage(msg); err != nil {
w.logger.Error().Err(err).Msg("Failed to process message")
}

if len(w.buffer) >= w.batchSize {
if err := w.flushBuffer(); err != nil {
w.logger.Error().Err(err).Msg("Failed to flush buffer")
if len(w.buffer) >= w.batchSize {
if err := w.flushBuffer(); err != nil {
w.logger.Error().Err(err).Msg("Failed to flush buffer")
}
}
}
}
Expand Down Expand Up @@ -173,6 +214,8 @@ func (w *Worker) flushBuffer() error {
return nil
}

w.logger.Debug().Int("messages", len(w.buffer)).Msg("Flushing buffer")

err := w.sink.WriteBatch(w.buffer)
if err != nil {
w.logger.Error().Err(err).Msg("Failed to write batch to sink")
Expand Down

0 comments on commit c7f90ac

Please sign in to comment.