Skip to content

Commit

Permalink
Support avro arrays for postgres insertion. (#2154)
Browse files Browse the repository at this point in the history
Co-authored-by: Claude <[email protected]>
  • Loading branch information
claudevdm and Claude authored Jan 29, 2025
1 parent 208e2ba commit 2631a1d
Show file tree
Hide file tree
Showing 3 changed files with 132 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,6 @@ public String getValueSql(JsonNode rowObj, String columnName, Map<String, String
} else {
columnValue = columnObj.toString();
}

return cleanDataTypeValueSql(columnValue, columnName, tableSchema);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,13 @@
*/
package com.google.cloud.teleport.v2.utils;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.cloud.teleport.v2.datastream.io.CdcJdbcIO.DataSourceConfiguration;
import com.google.cloud.teleport.v2.datastream.values.DatastreamRow;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -102,6 +107,56 @@ public String cleanDataTypeValueSql(
}
break;
}
// Postgres array type names start with an underscore, e.g. _INT4 for an integer array.
if (dataType.startsWith("_")) {
return convertJsonToPostgresArray(columnValue);
}
return columnValue;
}

/** Shared JSON parser, reused so that a new {@link ObjectMapper} is not built for every value. */
private static final ObjectMapper ARRAY_JSON_MAPPER = new ObjectMapper();

/**
 * Converts the JSON encoding of an array column into a Postgres array expression.
 *
 * <p>The input is expected to be a JSON object with a {@code nestedArray} field holding a JSON
 * array. Each item is either an object wrapping its value in an {@code elementValue} field or a
 * plain value. A null {@code elementValue} becomes a NULL element; an item that is itself a bare
 * JSON null is skipped.
 *
 * @param jsonValue JSON text of the column value; may be {@code null}, empty or {@code ''}
 * @return {@code ARRAY[...]} when at least one element is produced, {@code '{}'} when none is, or
 *     the NULL SQL value when the input is missing, is not valid JSON, or carries no {@code
 *     nestedArray} JSON array
 */
private String convertJsonToPostgresArray(String jsonValue) {
  if (jsonValue == null || jsonValue.isEmpty() || jsonValue.equals("''")) {
    return getNullValueSql();
  }

  final JsonNode rootNode;
  try {
    rootNode = ARRAY_JSON_MAPPER.readTree(jsonValue);
  } catch (JsonProcessingException e) {
    // The exception is passed as the trailing argument so the parse failure's cause is logged.
    LOG.error("Error parsing JSON array: {}", jsonValue, e);
    return getNullValueSql();
  }

  JsonNode arrayNode =
      (rootNode != null && rootNode.isObject()) ? rootNode.get("nestedArray") : null;
  if (arrayNode == null || !arrayNode.isArray()) {
    LOG.warn("No nestedArray list found in array value, using NULL: {}", jsonValue);
    return getNullValueSql();
  }

  List<String> elements = new ArrayList<>();
  for (JsonNode element : arrayNode) {
    if (element.has("elementValue")) {
      JsonNode elementValue = element.get("elementValue");
      if (elementValue.isNull()) {
        elements.add(getNullValueSql());
      } else {
        elements.add(formatArrayElement(elementValue));
      }
    } else if (!element.isNull()) {
      // Items without the elementValue wrapper are formatted directly.
      elements.add(formatArrayElement(element));
    }
  }

  if (elements.isEmpty()) {
    // Postgres rejects a bare ARRAY[] ("cannot determine type of empty array"). The untyped
    // literal '{}' is used instead so the server can resolve it against the target column.
    return "'{}'";
  }
  return "ARRAY[" + String.join(",", elements) + "]";
}

/**
 * Renders a single array element as SQL text.
 *
 * @param element the JSON node holding the element value
 * @return the text value passed through {@code cleanSql} and wrapped in single quotes when the
 *     node is textual, otherwise the node's own string form
 */
private String formatArrayElement(JsonNode element) {
  return element.isTextual()
      ? "'" + cleanSql(element.textValue()) + "'"
      : element.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
public class DatastreamToDMLTest {

private static final Logger LOG = LoggerFactory.getLogger(DatastreamToDMLTest.class);
private String jsonString =
private static final String JSON_STRING =
"{"
+ "\"text_column\":\"value\","
+ "\"quoted_text_column\":\"Test Values: '!@#$%^\","
Expand All @@ -42,7 +42,7 @@ public class DatastreamToDMLTest {
+ "\"_metadata_table\":\"MY_TABLE$NAME\""
+ "}";

private JsonNode getRowObj() {
private JsonNode getRowObj(String jsonString) {
ObjectMapper mapper = new ObjectMapper();
JsonNode rowObj;
try {
Expand All @@ -59,7 +59,7 @@ private JsonNode getRowObj() {
*/
@Test
public void testGetValueSql() {
JsonNode rowObj = this.getRowObj();
JsonNode rowObj = this.getRowObj(JSON_STRING);

String expectedTextContent = "'value'";
String testSqlContent =
Expand All @@ -82,14 +82,86 @@ public void testGetValueSql() {
assertEquals(expectedNullByteTextContent, testNullByteSqlContent);
}

/**
 * Test whether {@link DatastreamToPostgresDML#getValueSql(JsonNode, String, Map)} converts integer
 * array data containing a null element into array syntax that keeps that element as NULL.
 */
@Test
public void testIntArrayWithNullTypeCoercion() {
  String arrayJson =
      "{\"number_array\": {"
          + "\"nestedArray\": ["
          + " {\"nestedArray\": null, \"elementValue\": null},"
          + " {\"nestedArray\": null, \"elementValue\": 456}"
          + "], \"elementValue\": null}}";
  JsonNode rowObj = this.getRowObj(arrayJson);
  Map<String, String> tableSchema = new HashMap<>();
  tableSchema.put("number_array", "_int4");
  DatastreamToPostgresDML dml = DatastreamToPostgresDML.of(null);
  String expectedInt = "ARRAY[NULL,456]";

  // Reuse the instance created above instead of building a second one.
  String actualInt = dml.getValueSql(rowObj, "number_array", tableSchema);

  assertEquals(expectedInt, actualInt);
}

/**
 * Test whether {@link DatastreamToPostgresDML#getValueSql(JsonNode, String, Map)} converts array
 * data into correct integer array syntax.
 */
@Test
public void testIntArrayTypeCoercion() {
  String arrayJson =
      "{\"number_array\": {"
          + "\"nestedArray\": ["
          + " {\"nestedArray\": null, \"elementValue\": 123},"
          + " {\"nestedArray\": null, \"elementValue\": 456}"
          + "], \"elementValue\": null}}";
  JsonNode rowObj = this.getRowObj(arrayJson);
  Map<String, String> tableSchema = new HashMap<>();
  tableSchema.put("number_array", "_int4");
  DatastreamToPostgresDML dml = DatastreamToPostgresDML.of(null);
  String expectedInt = "ARRAY[123,456]";

  // Reuse the instance created above instead of building a second one.
  String actualInt = dml.getValueSql(rowObj, "number_array", tableSchema);

  assertEquals(expectedInt, actualInt);
}

/**
 * Test whether {@link DatastreamToPostgresDML#getValueSql(JsonNode, String, Map)} converts array
 * data into correct text array syntax.
 */
@Test
public void testTextArrayTypeCoercion() {
  String arrayJson =
      "{\"text_array\": {"
          + "\"nestedArray\": ["
          + " {\"nestedArray\": null, \"elementValue\": \"apple\"},"
          + " {\"nestedArray\": null, \"elementValue\": \"cherry\"}"
          + "], \"elementValue\": null}}";
  JsonNode rowObj = this.getRowObj(arrayJson);
  Map<String, String> tableSchema = new HashMap<>();
  tableSchema.put("text_array", "_text");
  DatastreamToPostgresDML dml = DatastreamToPostgresDML.of(null);
  String expectedText = "ARRAY['apple','cherry']";

  // Reuse the instance created above instead of building a second one.
  String actualText = dml.getValueSql(rowObj, "text_array", tableSchema);

  assertEquals(expectedText, actualText);
}

/**
* Test whether {@link DatastreamToDML#getTargetSchemaName} converts the Oracle schema into the
* correct Postgres schema.
*/
@Test
public void testGetPostgresSchemaName() {
DatastreamToDML datastreamToDML = DatastreamToPostgresDML.of(null);
JsonNode rowObj = this.getRowObj();
JsonNode rowObj = this.getRowObj(JSON_STRING);
DatastreamRow row = DatastreamRow.of(rowObj);

String expectedSchemaName = "my_schema";
Expand All @@ -104,7 +176,7 @@ public void testGetPostgresSchemaName() {
@Test
public void testGetPostgresTableName() {
DatastreamToDML datastreamToDML = DatastreamToPostgresDML.of(null);
JsonNode rowObj = this.getRowObj();
JsonNode rowObj = this.getRowObj(JSON_STRING);
DatastreamRow row = DatastreamRow.of(rowObj);

String expectedTableName = "my_table$name";
Expand Down

0 comments on commit 2631a1d

Please sign in to comment.