Skip to content

Commit

Permalink
Migrate to wal2json for easier handling
Browse files Browse the repository at this point in the history
  • Loading branch information
shayonj committed Dec 14, 2024
1 parent e726bc0 commit b25f512
Show file tree
Hide file tree
Showing 20 changed files with 288 additions and 629 deletions.
4 changes: 2 additions & 2 deletions internal/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ version: "3.8"

services:
postgres:
image: postgres:14
image: debezium/postgres:14-alpine
container_name: pg_logical_replication
environment:
POSTGRES_USER: myuser
Expand All @@ -23,7 +23,7 @@ services:
restart: unless-stopped

target_postgres:
image: postgres:14
image: postgres:14-alpine
container_name: pg_target
environment:
POSTGRES_USER: targetuser
Expand Down
2 changes: 1 addition & 1 deletion internal/scripts/e2e_resume_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ def test_resume
@logger.info "Waiting for all inserts to complete..."
threads.each(&:join)

sleep 20
sleep 60

@logger.info "Sending final SIGTERM to cleanup..."
@replicator_pids.each do |pid|
Expand Down
6 changes: 3 additions & 3 deletions internal/scripts/e2e_stream_only.sh
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,9 @@ simulate_changes() {

verify_changes() {
log "Verifying changes in ${OUTPUT_DIR}..."
local insert_count=$(jq -s '[.[] | select(.Type == "INSERT")] | length' "$OUTPUT_DIR"/*.jsonl)
local update_count=$(jq -s '[.[] | select(.Type == "UPDATE")] | length' "$OUTPUT_DIR"/*.jsonl)
local delete_count=$(jq -s '[.[] | select(.Type == "DELETE")] | length' "$OUTPUT_DIR"/*.jsonl)
local insert_count=$(jq -s '[.[] | select(.operation == "INSERT")] | length' "$OUTPUT_DIR"/*.jsonl)
local update_count=$(jq -s '[.[] | select(.operation == "UPDATE")] | length' "$OUTPUT_DIR"/*.jsonl)
local delete_count=$(jq -s '[.[] | select(.operation == "DELETE")] | length' "$OUTPUT_DIR"/*.jsonl)

log "INSERT count: $insert_count (expected 1000)"
log "UPDATE count: $update_count (expected 500)"
Expand Down
2 changes: 1 addition & 1 deletion internal/scripts/e2e_test_local.sh
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ make build
setup_docker

log "Running e2e ddl tests..."
if CI=false ruby ./internal/scripts/e2e_resume_test.rb; then
if CI=false ./internal/scripts/e2e_postgres.sh; then
success "e2e ddl tests completed successfully"
else
error "Original e2e tests failed"
Expand Down
18 changes: 9 additions & 9 deletions internal/scripts/e2e_transform_filter.sh
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,9 @@ simulate_changes() {

verify_changes() {
log "Verifying changes..."
local insert_count=$(jq -s '[.[] | select(.Type == "INSERT")] | length' "$OUTPUT_DIR"/*.jsonl)
local update_count=$(jq -s '[.[] | select(.Type == "UPDATE")] | length' "$OUTPUT_DIR"/*.jsonl)
local delete_count=$(jq -s '[.[] | select(.Type == "DELETE")] | length' "$OUTPUT_DIR"/*.jsonl)
local insert_count=$(jq -s '[.[] | select(.operation == "INSERT")] | length' "$OUTPUT_DIR"/*.jsonl)
local update_count=$(jq -s '[.[] | select(.operation == "UPDATE")] | length' "$OUTPUT_DIR"/*.jsonl)
local delete_count=$(jq -s '[.[] | select(.operation == "DELETE")] | length' "$OUTPUT_DIR"/*.jsonl)

log "INSERT count: $insert_count (expected 2)"
log "UPDATE count: $update_count (expected 1)"
Expand All @@ -88,12 +88,12 @@ verify_changes() {
fi

# Verify transformations and filters
local masked_email=$(jq -r 'select(.Type == "INSERT" and .NewTuple.id == 1) | .NewTuple.email' "$OUTPUT_DIR"/*.jsonl)
local formatted_phone=$(jq -r 'select(.Type == "INSERT" and .NewTuple.id == 1) | .NewTuple.phone' "$OUTPUT_DIR"/*.jsonl)
local filtered_insert=$(jq -r 'select(.Type == "INSERT" and .NewTuple.id == 2) | .NewTuple.id' "$OUTPUT_DIR"/*.jsonl)
local updated_email=$(jq -r 'select(.Type == "UPDATE") | .NewTuple.email' "$OUTPUT_DIR"/*.jsonl)
local masked_ssn=$(jq -r 'select(.Type == "INSERT" and .NewTuple.id == 1) | .NewTuple.ssn' "$OUTPUT_DIR"/*.jsonl)
local filtered_age=$(jq -r 'select(.Type == "INSERT" and .NewTuple.id == 2) | .NewTuple.age' "$OUTPUT_DIR"/*.jsonl)
local masked_email=$(jq -r 'select(.operation == "INSERT" and .NewTuple.id == 1) | .NewTuple.email' "$OUTPUT_DIR"/*.jsonl)
local formatted_phone=$(jq -r 'select(.operation == "INSERT" and .NewTuple.id == 1) | .NewTuple.phone' "$OUTPUT_DIR"/*.jsonl)
local filtered_insert=$(jq -r 'select(.operation == "INSERT" and .NewTuple.id == 2) | .NewTuple.id' "$OUTPUT_DIR"/*.jsonl)
local updated_email=$(jq -r 'select(.operation == "UPDATE") | .NewTuple.email' "$OUTPUT_DIR"/*.jsonl)
local masked_ssn=$(jq -r 'select(.operation == "INSERT" and .NewTuple.id == 1) | .NewTuple.ssn' "$OUTPUT_DIR"/*.jsonl)
local filtered_age=$(jq -r 'select(.operation == "INSERT" and .NewTuple.id == 2) | .NewTuple.age' "$OUTPUT_DIR"/*.jsonl)

if [[ "$masked_email" == "j**************m" ]] &&
[[ "$formatted_phone" == "(123) 456-7890" ]] &&
Expand Down
182 changes: 61 additions & 121 deletions pkg/replicator/base_replicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package replicator

import (
"context"
"encoding/json"
"fmt"
"strings"
"sync"
Expand Down Expand Up @@ -153,20 +154,21 @@ func (r *BaseReplicator) checkPublicationExists(publicationName string) (bool, e
// StartReplicationFromLSN initiates the replication process from a given LSN
func (r *BaseReplicator) StartReplicationFromLSN(ctx context.Context, startLSN pglogrepl.LSN, stopChan <-chan struct{}) error {
publicationName := GeneratePublicationName(r.Config.Group)
r.Logger.Info().Str("startLSN", startLSN.String()).Str("publication", publicationName).Msg("Starting replication")

err := r.ReplicationConn.StartReplication(ctx, publicationName, startLSN, pglogrepl.StartReplicationOptions{
PluginArgs: []string{
"proto_version '1'",
fmt.Sprintf("publication_names '%s'", publicationName),
"\"pretty-print\" 'true'",
"\"include-types\" 'true'",
"\"include-timestamp\" 'true'",
"\"include-pk\" 'true'",
"\"format-version\" '2'",
"\"actions\" 'insert,update,delete'",
},
})
if err != nil {
return fmt.Errorf("failed to start replication: %w", err)
}

r.Logger.Info().Str("startLSN", startLSN.String()).Msg("Replication started successfully")

return r.StreamChanges(ctx, stopChan)
}

Expand Down Expand Up @@ -281,136 +283,73 @@ func (r *BaseReplicator) handlePrimaryKeepaliveMessage(ctx context.Context, data
return nil
}

// processWALData handles different types of WAL messages
// processWALData processes the WAL data from wal2json
func (r *BaseReplicator) processWALData(walData []byte, lsn pglogrepl.LSN) error {
logicalMsg, err := pglogrepl.Parse(walData)
if err != nil {
return fmt.Errorf("failed to parse WAL data: %w", err)
}

switch msg := logicalMsg.(type) {
case *pglogrepl.RelationMessage:
r.handleRelationMessage(msg)
case *pglogrepl.BeginMessage:
return r.HandleBeginMessage(msg)
case *pglogrepl.InsertMessage:
return r.HandleInsertMessage(msg, lsn)
case *pglogrepl.UpdateMessage:
return r.HandleUpdateMessage(msg, lsn)
case *pglogrepl.DeleteMessage:
return r.HandleDeleteMessage(msg, lsn)
case *pglogrepl.CommitMessage:
return r.HandleCommitMessage(msg)
default:
r.Logger.Warn().Type("message", msg).Msg("Received unexpected logical replication message")
var msg utils.Wal2JsonMessage
if err := json.Unmarshal(walData, &msg); err != nil {
return fmt.Errorf("failed to parse wal2json message: %w", err)
}

return nil
}

// handleRelationMessage handles RelationMessage messages
func (r *BaseReplicator) handleRelationMessage(msg *pglogrepl.RelationMessage) {
r.Relations[msg.RelationID] = msg
r.Logger.Info().Str("table", msg.RelationName).Uint32("id", msg.RelationID).Msg("Relation message received")
}

// HandleBeginMessage handles BeginMessage messages
func (r *BaseReplicator) HandleBeginMessage(msg *pglogrepl.BeginMessage) error {
r.currentTxBuffer = make([]utils.CDCMessage, 0)
r.currentTxLSN = msg.FinalLSN
return nil
}

// HandleInsertMessage handles InsertMessage messages
func (r *BaseReplicator) HandleInsertMessage(msg *pglogrepl.InsertMessage, lsn pglogrepl.LSN) error {
relation, ok := r.Relations[msg.RelationID]
if !ok {
return fmt.Errorf("unknown relation ID: %d", msg.RelationID)
}

cdcMessage := utils.CDCMessage{
Type: utils.OperationInsert,
Schema: relation.Namespace,
Table: relation.RelationName,
Columns: relation.Columns,
EmittedAt: time.Now(),
NewTuple: msg.Tuple,
LSN: lsn.String(),
}
switch msg.Action {
case "B": // Begin
r.mu.Lock()
r.currentTxBuffer = make([]utils.CDCMessage, 0)
r.mu.Unlock()
return nil

r.AddPrimaryKeyInfo(&cdcMessage, relation.RelationName)
r.currentTxBuffer = append(r.currentTxBuffer, cdcMessage)
return nil
}
case "C": // Commit
r.mu.Lock()
defer r.mu.Unlock()

// HandleUpdateMessage handles UpdateMessage messages
func (r *BaseReplicator) HandleUpdateMessage(msg *pglogrepl.UpdateMessage, lsn pglogrepl.LSN) error {
relation, ok := r.Relations[msg.RelationID]
if !ok {
return fmt.Errorf("unknown relation ID: %d", msg.RelationID)
}

cdcMessage := utils.CDCMessage{
Type: utils.OperationUpdate,
Schema: relation.Namespace,
Table: relation.RelationName,
Columns: relation.Columns,
NewTuple: msg.NewTuple,
OldTuple: msg.OldTuple,
LSN: lsn.String(),
EmittedAt: time.Now(),
ToastedColumns: make(map[string]bool),
}

for i, col := range relation.Columns {
if msg.NewTuple != nil {
newVal := msg.NewTuple.Columns[i]
cdcMessage.ToastedColumns[col.Name] = newVal.DataType == 'u'
for _, cdcMsg := range r.currentTxBuffer {
if err := r.PublishToNATS(cdcMsg); err != nil {
return fmt.Errorf("failed to publish message: %w", err)
}
}
}

r.AddPrimaryKeyInfo(&cdcMessage, relation.RelationName)
r.currentTxBuffer = append(r.currentTxBuffer, cdcMessage)
return nil
}

// HandleDeleteMessage handles DeleteMessage messages
func (r *BaseReplicator) HandleDeleteMessage(msg *pglogrepl.DeleteMessage, lsn pglogrepl.LSN) error {
relation, ok := r.Relations[msg.RelationID]
if !ok {
return fmt.Errorf("unknown relation ID: %d", msg.RelationID)
}
r.currentTxBuffer = nil
r.LastLSN = lsn
return r.SaveState(lsn)

case "I", "U", "D": // Insert, Update, Delete
cdcMsg := &utils.CDCMessage{
Schema: msg.Schema,
Table: msg.Table,
LSN: lsn.String(),
EmittedAt: time.Now(),
Data: make(map[string]interface{}),
OldData: make(map[string]interface{}),
ColumnTypes: make(map[string]string),
ToastedColumns: make(map[string]bool),
Columns: make([]utils.Column, len(msg.Columns)),
}

cdcMessage := utils.CDCMessage{
Type: utils.OperationDelete,
Schema: relation.Namespace,
Table: relation.RelationName,
Columns: relation.Columns,
OldTuple: msg.OldTuple,
EmittedAt: time.Now(),
LSN: lsn.String(),
}
// Convert action to operation
switch msg.Action {
case "I":
cdcMsg.Operation = utils.OperationInsert
case "U":
cdcMsg.Operation = utils.OperationUpdate
case "D":
cdcMsg.Operation = utils.OperationDelete
}

r.AddPrimaryKeyInfo(&cdcMessage, relation.RelationName)
r.currentTxBuffer = append(r.currentTxBuffer, cdcMessage)
return nil
}
// Process columns
for i, col := range msg.Columns {
cdcMsg.Data[col.Name] = col.Value
cdcMsg.ColumnTypes[col.Name] = col.Type

// HandleCommitMessage processes a commit message and publishes it to NATS
func (r *BaseReplicator) HandleCommitMessage(msg *pglogrepl.CommitMessage) error {
for _, cdcMessage := range r.currentTxBuffer {
if err := r.PublishToNATS(cdcMessage); err != nil {
return fmt.Errorf("failed to publish message: %w", err)
cdcMsg.Columns[i] = utils.Column{
Name: col.Name,
DataType: utils.GetOIDFromTypeName(col.Type),
}
}
}

r.LastLSN = msg.CommitLSN
if err := r.SaveState(msg.CommitLSN); err != nil {
r.Logger.Error().Err(err).Msg("Failed to save replication state")
return err
r.mu.Lock()
r.currentTxBuffer = append(r.currentTxBuffer, *cdcMsg)
r.mu.Unlock()
}

r.currentTxBuffer = nil
return nil
}

Expand All @@ -422,6 +361,7 @@ func (r *BaseReplicator) PublishToNATS(data utils.CDCMessage) error {
}

subject := fmt.Sprintf("pgflo.%s", r.Config.Group)

err = r.NATSClient.PublishMessage(subject, binaryData)
if err != nil {
r.Logger.Error().
Expand Down
Loading

0 comments on commit b25f512

Please sign in to comment.