Skip to content

Commit

Permalink
Fix convert to pgoutput to match the new format
Browse files Browse the repository at this point in the history
  • Loading branch information
shayonj committed Dec 24, 2024
1 parent faa65a8 commit 314af54
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 22 deletions.
2 changes: 1 addition & 1 deletion internal/scripts/e2e_test_local.sh
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ make build
setup_docker

log "Running e2e ddl tests..."
if CI=false ./internal/scripts/e2e_routing.sh; then
if CI=false ./internal/scripts/e2e_copy_only.sh; then
success "e2e ddl tests completed successfully"
else
error "Original e2e tests failed"
Expand Down
54 changes: 33 additions & 21 deletions pkg/utils/cdc_encoding.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,60 +12,72 @@ import (
)

// ConvertToPgCompatibleOutput converts a Go value to its PostgreSQL output format.
func ConvertToPgCompatibleOutput(value interface{}, oid uint32) ([]byte, error) {
func ConvertToPgCompatibleOutput(value interface{}, oid uint32) (string, error) {
if value == nil {
return nil, nil
return "", nil
}

switch oid {
case pgtype.BoolOID:
return strconv.AppendBool(nil, value.(bool)), nil
return strconv.FormatBool(value.(bool)), nil
case pgtype.Int2OID, pgtype.Int4OID, pgtype.Int8OID:
switch v := value.(type) {
case int:
return []byte(strconv.FormatInt(int64(v), 10)), nil
return strconv.FormatInt(int64(v), 10), nil
case int32:
return []byte(strconv.FormatInt(int64(v), 10)), nil
return strconv.FormatInt(int64(v), 10), nil
case int64:
return []byte(strconv.FormatInt(v, 10)), nil
return strconv.FormatInt(v, 10), nil
default:
return []byte(fmt.Sprintf("%d", value)), nil
return fmt.Sprintf("%d", value), nil
}
case pgtype.Float4OID, pgtype.Float8OID:
return []byte(strconv.FormatFloat(value.(float64), 'f', -1, 64)), nil
return strconv.FormatFloat(value.(float64), 'f', -1, 64), nil
case pgtype.NumericOID:
return []byte(fmt.Sprintf("%v", value)), nil
return fmt.Sprintf("%v", value), nil
case pgtype.TextOID, pgtype.VarcharOID:
return []byte(value.(string)), nil
return value.(string), nil
case pgtype.ByteaOID:
if byteaData, ok := value.([]byte); ok {
return byteaData, nil
return fmt.Sprintf("\\x%x", byteaData), nil
}
return nil, fmt.Errorf("invalid bytea data type")
return "", fmt.Errorf("invalid bytea data type")
case pgtype.TimestampOID, pgtype.TimestamptzOID:
return []byte(value.(time.Time).Format(time.RFC3339Nano)), nil
return value.(time.Time).Format(time.RFC3339Nano), nil
case pgtype.DateOID:
return []byte(value.(time.Time).Format("2006-01-02")), nil
return value.(time.Time).Format("2006-01-02"), nil
case pgtype.JSONOID:
switch v := value.(type) {
case string:
return []byte(v), nil
case []byte:
return v, nil
case []byte:
return string(v), nil
default:
return nil, fmt.Errorf("unsupported type for JSON data: %T", value)
return "", fmt.Errorf("unsupported type for JSON data: %T", value)
}
case pgtype.JSONBOID:
if jsonBytes, ok := value.([]byte); ok {
return jsonBytes, nil
return string(jsonBytes), nil
}
return json.Marshal(value)
jsonBytes, err := json.Marshal(value)
if err != nil {
return "", err
}
return string(jsonBytes), nil
case pgtype.TextArrayOID, pgtype.VarcharArrayOID,
pgtype.Int2ArrayOID, pgtype.Int4ArrayOID, pgtype.Int8ArrayOID,
pgtype.Float4ArrayOID, pgtype.Float8ArrayOID, pgtype.BoolArrayOID:
return EncodeArray(value)
arrayBytes, err := EncodeArray(value)
if err != nil {
return "", err
}
return string(arrayBytes), nil
default:
return []byte(fmt.Sprintf("%v", value)), nil
jsonBytes, err := json.Marshal(value)
if err != nil {
return "", fmt.Errorf("failed to marshal value to JSON: %w", err)
}
return string(jsonBytes), nil
}
}

Expand Down

0 comments on commit 314af54

Please sign in to comment.