diff --git a/internal/scripts/e2e_test_local.sh b/internal/scripts/e2e_test_local.sh index c400621..bdc80cf 100755 --- a/internal/scripts/e2e_test_local.sh +++ b/internal/scripts/e2e_test_local.sh @@ -34,7 +34,7 @@ make build setup_docker log "Running e2e ddl tests..." -if CI=false ./internal/scripts/e2e_routing.sh; then +if CI=false ./internal/scripts/e2e_copy_only.sh; then success "e2e ddl tests completed successfully" else error "Original e2e tests failed" diff --git a/pkg/utils/cdc_encoding.go b/pkg/utils/cdc_encoding.go index 528aeb1..1a6995d 100644 --- a/pkg/utils/cdc_encoding.go +++ b/pkg/utils/cdc_encoding.go @@ -12,60 +12,72 @@ import ( ) // ConvertToPgCompatibleOutput converts a Go value to its PostgreSQL output format. -func ConvertToPgCompatibleOutput(value interface{}, oid uint32) ([]byte, error) { +func ConvertToPgCompatibleOutput(value interface{}, oid uint32) (string, error) { if value == nil { - return nil, nil + return "", nil } switch oid { case pgtype.BoolOID: - return strconv.AppendBool(nil, value.(bool)), nil + return strconv.FormatBool(value.(bool)), nil case pgtype.Int2OID, pgtype.Int4OID, pgtype.Int8OID: switch v := value.(type) { case int: - return []byte(strconv.FormatInt(int64(v), 10)), nil + return strconv.FormatInt(int64(v), 10), nil case int32: - return []byte(strconv.FormatInt(int64(v), 10)), nil + return strconv.FormatInt(int64(v), 10), nil case int64: - return []byte(strconv.FormatInt(v, 10)), nil + return strconv.FormatInt(v, 10), nil default: - return []byte(fmt.Sprintf("%d", value)), nil + return fmt.Sprintf("%d", value), nil } case pgtype.Float4OID, pgtype.Float8OID: - return []byte(strconv.FormatFloat(value.(float64), 'f', -1, 64)), nil + return strconv.FormatFloat(value.(float64), 'f', -1, 64), nil case pgtype.NumericOID: - return []byte(fmt.Sprintf("%v", value)), nil + return fmt.Sprintf("%v", value), nil case pgtype.TextOID, pgtype.VarcharOID: - return []byte(value.(string)), nil + return value.(string), nil case pgtype.ByteaOID: if byteaData, ok := value.([]byte); ok { - return byteaData, nil + return fmt.Sprintf("\\x%x", byteaData), nil } - return nil, fmt.Errorf("invalid bytea data type") + return "", fmt.Errorf("invalid bytea data type") case pgtype.TimestampOID, pgtype.TimestamptzOID: - return []byte(value.(time.Time).Format(time.RFC3339Nano)), nil + return value.(time.Time).Format(time.RFC3339Nano), nil case pgtype.DateOID: - return []byte(value.(time.Time).Format("2006-01-02")), nil + return value.(time.Time).Format("2006-01-02"), nil case pgtype.JSONOID: switch v := value.(type) { case string: - return []byte(v), nil - case []byte: return v, nil + case []byte: + return string(v), nil default: - return nil, fmt.Errorf("unsupported type for JSON data: %T", value) + return "", fmt.Errorf("unsupported type for JSON data: %T", value) } case pgtype.JSONBOID: if jsonBytes, ok := value.([]byte); ok { - return jsonBytes, nil + return string(jsonBytes), nil } - return json.Marshal(value) + jsonBytes, err := json.Marshal(value) + if err != nil { + return "", err + } + return string(jsonBytes), nil case pgtype.TextArrayOID, pgtype.VarcharArrayOID, pgtype.Int2ArrayOID, pgtype.Int4ArrayOID, pgtype.Int8ArrayOID, pgtype.Float4ArrayOID, pgtype.Float8ArrayOID, pgtype.BoolArrayOID: - return EncodeArray(value) + arrayBytes, err := EncodeArray(value) + if err != nil { + return "", err + } + return string(arrayBytes), nil default: - return []byte(fmt.Sprintf("%v", value)), nil + jsonBytes, err := json.Marshal(value) + if err != nil { + return "", fmt.Errorf("failed to marshal value to JSON: %w", err) + } + return string(jsonBytes), nil } }