Migrate to wal2json for easier handling #62

Draft · wants to merge 4 commits into main
4 changes: 2 additions & 2 deletions — internal/docker-compose.yml

@@ -2,7 +2,7 @@ version: "3.8"

 services:
   postgres:
-    image: postgres:14
+    image: debezium/postgres:14-alpine
     container_name: pg_logical_replication
     environment:
       POSTGRES_USER: myuser
@@ -23,7 +23,7 @@ services:
     restart: unless-stopped

   target_postgres:
-    image: postgres:14
+    image: postgres:14-alpine
     container_name: pg_target
     environment:
       POSTGRES_USER: targetuser
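The debezium/postgres image ships with the wal2json logical-decoding plugin preinstalled, which stock postgres:14 does not, and that is presumably why only the source container switches images. A minimal sketch of exercising the plugin against the compose service above; the slot name pg_flo_slot and database mydb are illustrative placeholders, not names taken from this PR:

    # Create a logical replication slot that decodes WAL through wal2json.
    docker exec pg_logical_replication psql -U myuser -d mydb \
      -c "SELECT pg_create_logical_replication_slot('pg_flo_slot', 'wal2json');"

    # Peek at pending changes as JSON without consuming them from the slot.
    docker exec pg_logical_replication psql -U myuser -d mydb \
      -c "SELECT data FROM pg_logical_slot_peek_changes('pg_flo_slot', NULL, NULL);"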
2 changes: 1 addition & 1 deletion — internal/scripts/e2e_copy_and_stream.sh

@@ -216,7 +216,7 @@ test_pg_flo_cdc() {
   simulate_concurrent_changes

   log "Waiting for changes to replicate..."
-  sleep 90
+  sleep 180
   stop_pg_flo_gracefully
   compare_row_counts || return 1
   verify_large_json || return 1
1 change: 1 addition & 0 deletions — internal/scripts/e2e_copy_only.sh

@@ -88,6 +88,7 @@ start_pg_flo_worker() {
     --target-dbname "$TARGET_PG_DB" \
     --target-user "$TARGET_PG_USER" \
     --target-password "$TARGET_PG_PASSWORD" \
+    --batch-size 5000 \
     --target-sync-schema \
     >"$pg_flo_WORKER_LOG" 2>&1 &
   pg_flo_WORKER_PID=$!
2 changes: 1 addition & 1 deletion — internal/scripts/e2e_postgres.sh

@@ -91,7 +91,7 @@ start_pg_flo_worker() {

 simulate_changes() {
   log "Simulating changes..."
-  local insert_count=6000
+  local insert_count=1000

   for i in $(seq 1 "$insert_count"); do
     run_sql "INSERT INTO public.users (data, nullable_column, toasted_column) VALUES ('Data $i', 'Nullable $i', 'Toasted $i');"
16 changes: 9 additions & 7 deletions — internal/scripts/e2e_postgres_uniqueness_test.rb

@@ -4,6 +4,7 @@
 require 'logger'
 require 'securerandom'
 require 'json'
+require 'digest'

 class PostgresUniquenessTest
   # Database configuration
@@ -200,16 +201,18 @@ def test_unique_operations
     @logger.info "Testing operations on unique_test..."
     uuid = SecureRandom.uuid

-    # Test INSERT
+    logo_path = "internal/pg_flo_logo.png"
+    binary_data = File.binread(logo_path).force_encoding('BINARY')
+
     @source_db.exec_params(
       "INSERT INTO public.unique_test (unique_col, binary_data, interval_data, data)
-       VALUES ($1, $2, $3, $4)",
-      [uuid, '\xdeadbeef', '1 year 2 months 3 days', '{"value": "test"}']
+       VALUES ($1, $2::bytea, $3, $4)",
+      [uuid, { value: binary_data, format: 1 }, '1 year 2 months 3 days', '{"value": "test"}']
     )

     sleep 1
     verify_table_data("unique_test", "unique_col = '#{uuid}'",
-      "1 | #{uuid} | \\xdeadbeef | 1 year 2 mons 3 days | {\"value\": \"test\"}")
+      "1 | #{uuid} | #{Digest::MD5.hexdigest(binary_data)} | 1 year 2 mons 3 days | {\"value\": \"test\"}")

     # Test UPDATE
     @source_db.exec_params(
@@ -219,8 +222,7 @@ def test_unique_operations

     sleep 1
     verify_table_data("unique_test", "unique_col = '#{uuid}'",
-      "1 | #{uuid} | \\xdeadbeef | 1 year 2 mons 3 days | {\"value\": \"updated_data\"}")
-
+      "1 | #{uuid} | #{Digest::MD5.hexdigest(binary_data)} | 1 year 2 mons 3 days | {\"value\": \"updated_data\"}")

     # Test DELETE
     @source_db.exec_params("DELETE FROM public.unique_test WHERE unique_col = $1", [uuid])
@@ -345,7 +347,7 @@ def build_verification_query(table, condition)
       SELECT (
         id::text || ' | ' ||
         unique_col::text || ' | ' ||
-        '\\x' || encode(binary_data, 'hex') || ' | ' ||
+        MD5(binary_data) || ' | ' ||
         interval_data::text || ' | ' ||
         data::text
       ) AS row_data
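Switching the expected value from a literal hex dump to an MD5 fingerprint keeps the comparison string short now that binary_data holds an entire PNG; it works because Postgres and Ruby produce identical digests for the same bytes. A quick sanity check of that equivalence, with placeholder connection details:

    # Postgres: md5(bytea) digests the raw bytes and prints 32 hex characters.
    psql -U myuser -d mydb -Atc "SELECT md5('\xdeadbeef'::bytea);"

    # Ruby: Digest::MD5.hexdigest over the same four bytes prints the same value,
    # which is what lets verify_table_data compare fingerprints across systems.
    ruby -rdigest -e 'puts Digest::MD5.hexdigest(["deadbeef"].pack("H*"))'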
2 changes: 1 addition & 1 deletion — internal/scripts/e2e_resume_test.rb

@@ -284,7 +284,7 @@ def test_resume
     @logger.info "Waiting for all inserts to complete..."
     threads.each(&:join)

-    sleep 20
+    sleep 60

     @logger.info "Sending final SIGTERM to cleanup..."
     @replicator_pids.each do |pid|
6 changes: 3 additions & 3 deletions — internal/scripts/e2e_stream_only.sh

@@ -66,9 +66,9 @@ simulate_changes() {

 verify_changes() {
   log "Verifying changes in ${OUTPUT_DIR}..."
-  local insert_count=$(jq -s '[.[] | select(.Type == "INSERT")] | length' "$OUTPUT_DIR"/*.jsonl)
-  local update_count=$(jq -s '[.[] | select(.Type == "UPDATE")] | length' "$OUTPUT_DIR"/*.jsonl)
-  local delete_count=$(jq -s '[.[] | select(.Type == "DELETE")] | length' "$OUTPUT_DIR"/*.jsonl)
+  local insert_count=$(jq -s '[.[] | select(.operation == "INSERT")] | length' "$OUTPUT_DIR"/*.jsonl)
+  local update_count=$(jq -s '[.[] | select(.operation == "UPDATE")] | length' "$OUTPUT_DIR"/*.jsonl)
+  local delete_count=$(jq -s '[.[] | select(.operation == "DELETE")] | length' "$OUTPUT_DIR"/*.jsonl)

   log "INSERT count: $insert_count (expected 1000)"
   log "UPDATE count: $update_count (expected 500)"
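The rename from .Type to .operation (and, in the transform test below, from .NewTuple to .data) tracks the wal2json-style event shape the worker now emits. A sketch of the assumed per-line JSONL layout; the exact keys pg_flo writes may differ:

    # Write two sample events, then count INSERTs the same way the script does.
    printf '%s\n' \
      '{"operation":"INSERT","schema":"public","table":"users","data":{"id":1,"data":"Data 1"}}' \
      '{"operation":"UPDATE","schema":"public","table":"users","data":{"id":1,"data":"Updated 1"}}' \
      >"$OUTPUT_DIR/example.jsonl"
    jq -s '[.[] | select(.operation == "INSERT")] | length' "$OUTPUT_DIR/example.jsonl"  # prints 1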
2 changes: 1 addition & 1 deletion — internal/scripts/e2e_test_local.sh

@@ -34,7 +34,7 @@ make build
 setup_docker

 log "Running e2e ddl tests..."
-if CI=false ruby ./internal/scripts/e2e_resume_test.rb; then
+if CI=false ./internal/scripts/e2e_copy_only.sh; then
   success "e2e ddl tests completed successfully"
 else
   error "Original e2e tests failed"
61 changes: 44 additions & 17 deletions — internal/scripts/e2e_transform_filter.sh

@@ -11,6 +11,10 @@ create_users() {
     email text,
     phone text,
     age int,
+    balance numeric(10,2),
+    score bigint,
+    rating real,
+    weight double precision,
     ssn text,
     created_at timestamp DEFAULT current_timestamp
   );"
@@ -58,12 +62,12 @@ start_pg_flo_worker() {

 simulate_changes() {
   log "Simulating changes..."
-  run_sql "INSERT INTO public.users (email, phone, age, ssn) VALUES
-    ('[email protected]', '1234567890', 25, '123-45-6789'),
-    ('[email protected]', '9876543210', 17, '987-65-4321'),
-    ('[email protected]', '5551234567', 30, '555-12-3456');"
+  run_sql "INSERT INTO public.users (email, phone, age, balance, score, rating, weight, ssn) VALUES
+    ('[email protected]', '1234567890', 25, 100.50, 1000000000, 4.5, 75.5, '123-45-6789'),
+    ('[email protected]', '9876543210', 17, 50.25, 2000000000, 3.8, 65.3, '987-65-4321'),
+    ('[email protected]', '5551234567', 30, 75.75, 3000000000, 4.2, 80.1, '555-12-3456');"

-  run_sql "UPDATE public.users SET email = '[email protected]', phone = '1112223333' WHERE id = 1;"
+  run_sql "UPDATE public.users SET email = '[email protected]', phone = '1112223333', balance = 150.75 WHERE id = 1;"
   run_sql "DELETE FROM public.users WHERE age = 30;"
   run_sql "DELETE FROM public.users WHERE age = 17;"

@@ -72,28 +76,51 @@ simulate_changes() {

 verify_changes() {
   log "Verifying changes..."
-  local insert_count=$(jq -s '[.[] | select(.Type == "INSERT")] | length' "$OUTPUT_DIR"/*.jsonl)
-  local update_count=$(jq -s '[.[] | select(.Type == "UPDATE")] | length' "$OUTPUT_DIR"/*.jsonl)
-  local delete_count=$(jq -s '[.[] | select(.Type == "DELETE")] | length' "$OUTPUT_DIR"/*.jsonl)
-
-  log "INSERT count: $insert_count (expected 2)"
+  local insert_count=$(jq -s '[.[] | select(.operation == "INSERT")] | length' "$OUTPUT_DIR"/*.jsonl)
+  local update_count=$(jq -s '[.[] | select(.operation == "UPDATE")] | length' "$OUTPUT_DIR"/*.jsonl)
+  local delete_count=$(jq -s '[.[] | select(.operation == "DELETE")] | length' "$OUTPUT_DIR"/*.jsonl)
+
+  # We expect:
+  # - 1 INSERT (id=1, age=25 passes all filters)
+  # - 1 UPDATE (for id=1)
+  # - 2 DELETEs (for age=30 and age=17)
+  log "INSERT count: $insert_count (expected 1)"
   log "UPDATE count: $update_count (expected 1)"
   log "DELETE count: $delete_count (expected 2)"

-  if [ "$insert_count" -eq 2 ] && [ "$update_count" -eq 1 ] && [ "$delete_count" -eq 2 ]; then
+  if [ "$insert_count" -eq 1 ] && [ "$update_count" -eq 1 ] && [ "$delete_count" -eq 2 ]; then
     success "Change counts match expected values"
   else
     error "Change counts do not match expected values"
     return 1
   fi

+  # Verify numeric filters
+  local filtered_records=$(jq -r '.operation as $op |
+    select($op == "INSERT") |
+    select(
+      (.data.balance < 75.00) or
+      (.data.score >= 2500000000) or
+      (.data.rating <= 4.0) or
+      (.data.weight > 80.0)
+    ) | .data.id' "$OUTPUT_DIR"/*.jsonl)
+
+  if [[ -z "$filtered_records" ]]; then
+    success "Numeric filters working for all types"
+  else
+    error "Numeric filters not working correctly"
+    log "Records that should have been filtered: $filtered_records"
+    jq -r 'select(.data.id == '"$filtered_records"') | {id: .data.id, balance: .data.balance, score: .data.score, rating: .data.rating, weight: .data.weight}' "$OUTPUT_DIR"/*.jsonl
+    return 1
+  fi
+
   # Verify transformations and filters
-  local masked_email=$(jq -r 'select(.Type == "INSERT" and .NewTuple.id == 1) | .NewTuple.email' "$OUTPUT_DIR"/*.jsonl)
-  local formatted_phone=$(jq -r 'select(.Type == "INSERT" and .NewTuple.id == 1) | .NewTuple.phone' "$OUTPUT_DIR"/*.jsonl)
-  local filtered_insert=$(jq -r 'select(.Type == "INSERT" and .NewTuple.id == 2) | .NewTuple.id' "$OUTPUT_DIR"/*.jsonl)
-  local updated_email=$(jq -r 'select(.Type == "UPDATE") | .NewTuple.email' "$OUTPUT_DIR"/*.jsonl)
-  local masked_ssn=$(jq -r 'select(.Type == "INSERT" and .NewTuple.id == 1) | .NewTuple.ssn' "$OUTPUT_DIR"/*.jsonl)
-  local filtered_age=$(jq -r 'select(.Type == "INSERT" and .NewTuple.id == 2) | .NewTuple.age' "$OUTPUT_DIR"/*.jsonl)
+  local masked_email=$(jq -r 'select(.operation == "INSERT" and .data.id == 1) | .data.email' "$OUTPUT_DIR"/*.jsonl)
+  local formatted_phone=$(jq -r 'select(.operation == "INSERT" and .data.id == 1) | .data.phone' "$OUTPUT_DIR"/*.jsonl)
+  local filtered_insert=$(jq -r 'select(.operation == "INSERT" and .data.id == 2) | .data.id' "$OUTPUT_DIR"/*.jsonl)
+  local updated_email=$(jq -r 'select(.operation == "UPDATE") | .data.email' "$OUTPUT_DIR"/*.jsonl)
+  local masked_ssn=$(jq -r 'select(.operation == "INSERT" and .data.id == 1) | .data.ssn' "$OUTPUT_DIR"/*.jsonl)
+  local filtered_age=$(jq -r 'select(.operation == "INSERT" and .data.id == 2) | .data.age' "$OUTPUT_DIR"/*.jsonl)

   if [[ "$masked_email" == "j**************m" ]] &&
      [[ "$formatted_phone" == "(123) 456-7890" ]] &&
28 changes: 28 additions & 0 deletions — internal/scripts/rules.yml

@@ -29,3 +29,31 @@ tables:
         mask_char: "X"
       allow_empty_deletes: true
       operations: [INSERT, UPDATE, DELETE]
+    - type: filter
+      column: balance
+      parameters:
+        operator: gte
+        value: 75.00
+      allow_empty_deletes: true
+      operations: [INSERT, UPDATE, DELETE]
+    - type: filter
+      column: score
+      parameters:
+        operator: lt
+        value: 2500000000
+      allow_empty_deletes: true
+      operations: [INSERT, UPDATE, DELETE]
+    - type: filter
+      column: rating
+      parameters:
+        operator: gt
+        value: 4.0
+      allow_empty_deletes: true
+      operations: [INSERT, UPDATE, DELETE]
+    - type: filter
+      column: weight
+      parameters:
+        operator: lte
+        value: 80.0
+      allow_empty_deletes: true
+      operations: [INSERT, UPDATE, DELETE]
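Judging by the expected counts in e2e_transform_filter.sh, these four filters combine with AND: of the three inserted rows, only id=1 (balance=100.50, score=1000000000, rating=4.5, weight=75.5) clears every bound of balance gte 75.00, score lt 2500000000, rating gt 4.0, and weight lte 80.0. A standalone re-check of that reasoning over the emitted JSONL, sketched with jq:

    # Exit non-zero if any emitted INSERT violates one of the four bounds
    # (jq -e keys its exit status off the last output value).
    jq -e 'select(.operation == "INSERT")
           | (.data.balance >= 75.00) and (.data.score < 2500000000)
             and (.data.rating > 4.0) and (.data.weight <= 80.0)' \
      "$OUTPUT_DIR"/*.jsonl >/dev/null && echo "all emitted INSERTs pass the filters"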