Resumable through NATS, bunch more work around encoding for now
shayonj committed Sep 11, 2024
1 parent a9b0f6e commit 64057a0
Showing 16 changed files with 312 additions and 239 deletions.
3 changes: 2 additions & 1 deletion internal/e2e_copy_and_stream.sh
@@ -37,6 +37,7 @@ populate_initial_data() {
decode(lpad(to_hex(generate_series(1, 4)), 8, '0'), 'hex')
;"
run_sql "UPDATE public.users SET text_col = text_col || ' - Updated';"
run_sql "ANALYZE public.users;"
success "Initial data populated"
}

@@ -121,7 +122,7 @@ test_pg_flo_cdc() {
simulate_concurrent_changes

log "Waiting for pg_flo to process changes..."
sleep 30
sleep 2

stop_pg_flo_gracefully

32 changes: 20 additions & 12 deletions internal/e2e_resume.sh
@@ -17,7 +17,7 @@ create_users() {

start_pg_flo_replication() {
log "Starting pg_flo replication..."
$pg_flo_BIN stream file \
$pg_flo_BIN replicator \
--host "$PG_HOST" \
--port "$PG_PORT" \
--dbname "$PG_DB" \
@@ -26,13 +26,25 @@ start_pg_flo_replication() {
--group "group_resume" \
--tables "users" \
--schema "public" \
--status-dir "/tmp" \
--output-dir "$OUTPUT_DIR" >"$pg_flo_LOG" 2>&1 &
--nats-url "$NATS_URL" \
>"$pg_flo_LOG" 2>&1 &
pg_flo_PID=$!
log "pg_flo started with PID: $pg_flo_PID"
success "pg_flo replication started"
}

start_pg_flo_worker() {
log "Starting pg_flo worker with file sink..."
$pg_flo_BIN worker file \
--group "group_resume" \
--nats-url "$NATS_URL" \
--file-output-dir "$OUTPUT_DIR" \
>"$pg_flo_WORKER_LOG" 2>&1 &
pg_flo_WORKER_PID=$!
log "pg_flo worker started with PID: $pg_flo_WORKER_PID"
success "pg_flo worker started"
}

simulate_concurrent_inserts() {
log "Starting concurrent inserts..."
for i in $(seq 1 $TOTAL_INSERTS); do
@@ -43,20 +55,14 @@ simulate_concurrent_inserts() {
}

interrupt_pg_flo() {
log "Interrupting pg_flo process..."
if kill -0 $pg_flo_PID 2>/dev/null; then
kill -15 $pg_flo_PID
wait $pg_flo_PID 2>/dev/null || true
success "pg_flo process interrupted"
else
log "pg_flo process not found, it may have already stopped"
fi
log "Interrupting pg_flo processes..."
stop_pg_flo_gracefully
}

verify_results() {
log "Verifying results..."
local db_count=$(run_sql "SELECT COUNT(*) FROM public.users")
local json_count=$(jq -s '[.[] | select(.type == "INSERT")] | length' "$OUTPUT_DIR"/*.jsonl)
local json_count=$(jq -s '[.[] | select(.Type == "INSERT")] | length' "$OUTPUT_DIR"/*.jsonl)

log "Database row count: $db_count"
log "JSON INSERT count: $json_count"
@@ -74,6 +80,7 @@ test_pg_flo_resume() {
setup_postgres
create_users
start_pg_flo_replication
start_pg_flo_worker

rm -f $INSERT_COMPLETE_FLAG

@@ -90,6 +97,7 @@
sleep $RESUME_WAIT_TIME

start_pg_flo_replication
start_pg_flo_worker

log "Waiting for inserts to complete..."
while [ ! -f $INSERT_COMPLETE_FLAG ]; do
54 changes: 27 additions & 27 deletions internal/e2e_test_local.sh
@@ -30,7 +30,7 @@ trap cleanup EXIT

make build

setup_docker
# setup_docker

log "Running e2e copy & stream tests..."
if CI=false ./internal/e2e_copy_and_stream.sh; then
@@ -40,37 +40,37 @@ else
exit 1
fi

# setup_docker
setup_docker

# log "Running new e2e stream tests with changes..."
# if ./internal/e2e_test_stream.sh; then
# success "New e2e tests with changes completed successfully"
# else
# error "New e2e tests with changes failed"
# exit 1
# fi
log "Running new e2e stream tests with changes..."
if ./internal/e2e_test_stream.sh; then
success "New e2e tests with changes completed successfully"
else
error "New e2e tests with changes failed"
exit 1
fi

# setup_docker
setup_docker

# # Run new e2e resume test
# log "Running new e2e resume test..."
# if ./internal/e2e_resume.sh; then
# success "E2E resume test completed successfully"
# else
# error "E2E resume test failed"
# exit 1
# fi
# Run new e2e resume test
log "Running new e2e resume test..."
if ./internal/e2e_resume.sh; then
success "E2E resume test completed successfully"
else
error "E2E resume test failed"
exit 1
fi

# setup_docker
setup_docker

# # Run new e2e test for transform & filter
# log "Running new e2e test for transform & filter..."
# if ./internal/e2e_transform_filter.sh; then
# success "E2E test for transform & filter test completed successfully"
# else
# error "E2E test for transform & filter test failed"
# exit 1
# fi
# Run new e2e test for transform & filter
log "Running new e2e test for transform & filter..."
if ./internal/e2e_transform_filter.sh; then
success "E2E test for transform & filter test completed successfully"
else
error "E2E test for transform & filter test failed"
exit 1
fi

# setup_docker

1 change: 0 additions & 1 deletion internal/how-it-works.md
@@ -16,7 +16,6 @@

- If no valid LSN (Log Sequence Number) is found in the target sink, `pg_flo` performs an initial bulk copy of existing data.
- This process is parallelized for fast data sync:
- Tables are analyzed to optimize the copy process.
- A snapshot is taken to ensure consistency.
- Each table is divided into page ranges.
- Multiple workers copy different ranges concurrently.
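
For context on this part of how-it-works.md, here is a minimal sketch of the page-range copy technique in Go. It is not pg_flo's actual implementation: the DSN, table name, chunk size, and worker count are placeholder assumptions, and a real copy would also run inside the exported snapshot mentioned above and write rows to the sink rather than discarding them.

```go
// Illustrative sketch only (not pg_flo's code): split a table into ctid page
// ranges and copy the ranges with a bounded pool of workers.
package main

import (
	"context"
	"fmt"
	"log"
	"sync"

	"github.com/jackc/pgx/v5/pgxpool"
)

const pagesPerRange = 1000 // hypothetical chunk size

func copyRange(ctx context.Context, pool *pgxpool.Pool, table string, start, end int) error {
	// tid range comparison keeps each worker's scan inside its own physical
	// page range; note it needs PostgreSQL 14+ (older versions lack tid
	// comparison operators and would need a different predicate).
	sql := fmt.Sprintf(
		"SELECT * FROM %s WHERE ctid >= '(%d,0)'::tid AND ctid < '(%d,0)'::tid",
		table, start, end,
	)
	rows, err := pool.Query(ctx, sql)
	if err != nil {
		return err
	}
	defer rows.Close()
	for rows.Next() {
		// A real worker would decode the row and write it to the sink here.
	}
	return rows.Err()
}

func main() {
	ctx := context.Background()
	pool, err := pgxpool.New(ctx, "postgres://localhost:5432/mydb") // placeholder DSN
	if err != nil {
		log.Fatal(err)
	}
	defer pool.Close()

	// relpages tells us how many physical pages there are to split up.
	var relpages int
	if err := pool.QueryRow(ctx,
		"SELECT relpages FROM pg_class WHERE relname = $1", "users").Scan(&relpages); err != nil {
		log.Fatal(err)
	}

	var wg sync.WaitGroup
	sem := make(chan struct{}, 4) // bound concurrency to 4 workers
	for start := 0; start <= relpages; start += pagesPerRange {
		wg.Add(1)
		sem <- struct{}{}
		go func(start int) {
			defer wg.Done()
			defer func() { <-sem }()
			if err := copyRange(ctx, pool, "public.users", start, start+pagesPerRange); err != nil {
				log.Printf("range starting at page %d failed: %v", start, err)
			}
		}(start)
	}
	wg.Wait()
}
```

The `ANALYZE public.users` added to the e2e script above keeps `pg_class.relpages` current, which is the kind of statistic a page-range split like this relies on.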
54 changes: 32 additions & 22 deletions pkg/pgflonats/pgflonats.go
@@ -25,7 +25,13 @@ type NATSClient struct {
stateBucket string
}

// NewNATSClient creates a new NATS client with the specified configuration
// State represents the current state of the replication process
type State struct {
LSN pglogrepl.LSN `json:"lsn"`
LastProcessedSeq uint64 `json:"last_processed_seq"`
}

// NewNATSClient creates a new NATS client with the specified configuration, setting up the connection, main stream, and state bucket.
func NewNATSClient(url, stream, group string) (*NATSClient, error) {
if url == "" {
url = os.Getenv(envNATSURL)
@@ -79,7 +85,7 @@ func NewNATSClient(url, stream, group string) (*NATSClient, error) {
_, err = js.CreateKeyValue(context.Background(), jetstream.KeyValueConfig{
Bucket: stateBucket,
})
if err != nil && err != jetstream.ErrConsumerNameAlreadyInUse {
if err != nil && err != jetstream.ErrBucketExists {
return nil, fmt.Errorf("failed to create state bucket: %w", err)
}

@@ -91,7 +97,7 @@ func NewNATSClient(url, stream, group string) (*NATSClient, error) {
}, nil
}

// PublishMessage publishes a message to the specified NATS subject
// PublishMessage publishes a message to the specified NATS subject.
func (nc *NATSClient) PublishMessage(ctx context.Context, subject string, data []byte) error {
_, err := nc.js.Publish(ctx, subject, data)
if err != nil {
@@ -100,13 +106,13 @@ func (nc *NATSClient) PublishMessage(ctx context.Context, subject string, data [
return nil
}

// Close closes the NATS connection
// Close closes the NATS connection.
func (nc *NATSClient) Close() error {
nc.conn.Close()
return nil
}

// GetStreamInfo retrieves information about the NATS stream
// GetStreamInfo retrieves information about the NATS stream.
func (nc *NATSClient) GetStreamInfo(ctx context.Context) (*jetstream.StreamInfo, error) {
stream, err := nc.js.Stream(ctx, nc.stream)
if err != nil {
@@ -115,7 +121,7 @@ func (nc *NATSClient) GetStreamInfo(ctx context.Context) (*jetstream.StreamInfo,
return stream.Info(ctx)
}

// PurgeStream purges all messages from the NATS stream
// PurgeStream purges all messages from the NATS stream.
func (nc *NATSClient) PurgeStream(ctx context.Context) error {
stream, err := nc.js.Stream(ctx, nc.stream)
if err != nil {
@@ -124,55 +130,59 @@ func (nc *NATSClient) PurgeStream(ctx context.Context) error {
return stream.Purge(ctx)
}

// DeleteStream deletes the NATS stream
// DeleteStream deletes the NATS stream.
func (nc *NATSClient) DeleteStream(ctx context.Context) error {
return nc.js.DeleteStream(ctx, nc.stream)
}

// SaveState saves the current replication state to NATS
func (nc *NATSClient) SaveState(ctx context.Context, lsn pglogrepl.LSN) error {
// SaveState saves the current replication state to NATS.
func (nc *NATSClient) SaveState(ctx context.Context, state State) error {
kv, err := nc.js.KeyValue(ctx, nc.stateBucket)
if err != nil {
return fmt.Errorf("failed to get KV bucket: %v", err)
}

data, err := json.Marshal(lsn)
data, err := json.Marshal(state)
if err != nil {
return fmt.Errorf("failed to marshal state: %v", err)
}

_, err = kv.Put(ctx, "lsn", data)
_, err = kv.Put(ctx, "state", data)
if err != nil {
return fmt.Errorf("failed to save state: %v", err)
}

return nil
}

// GetLastState retrieves the last saved replication state from NATS
func (nc *NATSClient) GetLastState(ctx context.Context) (pglogrepl.LSN, error) {
// GetState retrieves the last saved state from NATS, initializing a new state if none is found.
func (nc *NATSClient) GetState(ctx context.Context) (State, error) {
kv, err := nc.js.KeyValue(ctx, nc.stateBucket)
if err != nil {
return 0, fmt.Errorf("failed to get KV bucket: %v", err)
return State{}, fmt.Errorf("failed to get KV bucket: %v", err)
}

entry, err := kv.Get(ctx, "lsn")
entry, err := kv.Get(ctx, "state")
if err != nil {
if err == jetstream.ErrKeyNotFound {
return 0, nil // No state yet, start from the beginning
initialState := State{LastProcessedSeq: 0}
if err := nc.SaveState(ctx, initialState); err != nil {
return State{}, fmt.Errorf("failed to save initial state: %v", err)
}
return initialState, nil
}
return 0, fmt.Errorf("failed to get last state: %v", err)
return State{}, fmt.Errorf("failed to get state: %v", err)
}

var lsn pglogrepl.LSN
if err := json.Unmarshal(entry.Value(), &lsn); err != nil {
return 0, fmt.Errorf("failed to unmarshal state: %v", err)
var state State
if err := json.Unmarshal(entry.Value(), &state); err != nil {
return State{}, fmt.Errorf("failed to unmarshal state: %v", err)
}

return lsn, nil
return state, nil
}

// JetStream returns the JetStream context
// JetStream returns the JetStream context.
func (nc *NATSClient) JetStream() jetstream.JetStream {
return nc.js
}
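
A hypothetical usage sketch of the State API introduced above, assuming the client wiring shown in this file; the NATS URL, stream and group names, the import path, and the example values are placeholders, not values from this commit.

```go
package main

import (
	"context"
	"log"

	"github.com/jackc/pglogrepl"
	"github.com/shayonj/pg_flo/pkg/pgflonats" // assumed import path
)

func main() {
	ctx := context.Background()

	// Placeholder connection parameters.
	client, err := pgflonats.NewNATSClient("nats://localhost:4222", "pgflo_stream", "group_resume")
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	// GetState initializes and persists a zero-valued State the first time around.
	state, err := client.GetState(ctx)
	if err != nil {
		log.Fatal(err)
	}

	// After handling a batch, record both the WAL position and the last
	// processed JetStream sequence before saving.
	state.LSN = pglogrepl.LSN(12345) // example value
	state.LastProcessedSeq = 42      // example value
	if err := client.SaveState(ctx, state); err != nil {
		log.Fatal(err)
	}
}
```

Under this shape the replicator can advance `LSN` while a worker advances `LastProcessedSeq`, which is presumably what lets the resume test restart both processes independently.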
54 changes: 38 additions & 16 deletions pkg/replicator/base_replicator.go
@@ -124,11 +124,8 @@ func (r *BaseReplicator) checkPublicationExists(publicationName string) (bool, e

// StartReplicationFromLSN initiates the replication process from a given LSN
func (r *BaseReplicator) StartReplicationFromLSN(ctx context.Context, startLSN pglogrepl.LSN) error {
if err := r.CreatePublication(); err != nil {
return err
}

publicationName := GeneratePublicationName(r.Config.Group)
r.Logger.Info().Str("startLSN", startLSN.String()).Str("publication", publicationName).Msg("Starting replication")
err := r.ReplicationConn.StartReplication(ctx, publicationName, startLSN, pglogrepl.StartReplicationOptions{
PluginArgs: []string{
"proto_version '1'",
@@ -308,11 +305,12 @@ func (r *BaseReplicator) HandleInsertMessage(ctx context.Context, msg *pglogrepl
}

cdcMessage := utils.CDCMessage{
Type: "INSERT",
Schema: relation.Namespace,
Table: relation.RelationName,
Columns: relation.Columns,
NewTuple: msg.Tuple,
Type: "INSERT",
Schema: relation.Namespace,
Table: relation.RelationName,
Columns: relation.Columns,
EmittedAt: time.Now(),
NewTuple: msg.Tuple,
}

r.AddPrimaryKeyInfo(&cdcMessage, relation.RelationName)
@@ -348,11 +346,12 @@ func (r *BaseReplicator) HandleDeleteMessage(ctx context.Context, msg *pglogrepl

// todo: write lsn
cdcMessage := utils.CDCMessage{
Type: "DELETE",
Schema: relation.Namespace,
Table: relation.RelationName,
Columns: relation.Columns,
OldTuple: msg.OldTuple,
Type: "DELETE",
Schema: relation.Namespace,
Table: relation.RelationName,
Columns: relation.Columns,
OldTuple: msg.OldTuple,
EmittedAt: time.Now(),
}

r.AddPrimaryKeyInfo(&cdcMessage, relation.RelationName)
@@ -528,10 +527,33 @@ func (r *BaseReplicator) getPrimaryKeyColumn(schema, table string) (string, erro

// SaveState saves the current replication state
func (r *BaseReplicator) SaveState(ctx context.Context, lsn pglogrepl.LSN) error {
return r.NATSClient.SaveState(ctx, lsn)
state, err := r.NATSClient.GetState(ctx)
if err != nil {
return fmt.Errorf("failed to get current state: %w", err)
}
state.LSN = lsn
return r.NATSClient.SaveState(ctx, state)
}

// GetLastState retrieves the last saved replication state
func (r *BaseReplicator) GetLastState(ctx context.Context) (pglogrepl.LSN, error) {
return r.NATSClient.GetLastState(ctx)
state, err := r.NATSClient.GetState(ctx)
if err != nil {
return 0, fmt.Errorf("failed to get state: %w", err)
}
return state.LSN, nil
}

// CheckReplicationSlotStatus checks the status of the replication slot
func (r *BaseReplicator) CheckReplicationSlotStatus(ctx context.Context) error {
publicationName := GeneratePublicationName(r.Config.Group)
var restartLSN string
err := r.StandardConn.QueryRow(ctx,
"SELECT restart_lsn FROM pg_replication_slots WHERE slot_name = $1",
publicationName).Scan(&restartLSN)
if err != nil {
return fmt.Errorf("failed to query replication slot status: %w", err)
}
r.Logger.Info().Str("slotName", publicationName).Str("restartLSN", restartLSN).Msg("Replication slot status")
return nil
}