From c7bacbda2106024d88f1f6f2971debe626062773 Mon Sep 17 00:00:00 2001
From: Dinu John <86094133+dinujoh@users.noreply.github.com>
Date: Tue, 12 Nov 2024 17:44:46 -0600
Subject: [PATCH] Add MySQL to OpenSearch Geometry data type transformation
Signed-off-by: Dinu John <86094133+dinujoh@users.noreply.github.com>
---
.../rds/datatype/impl/SpatialTypeHandler.java | 356 +++++++++++++++++-
.../datatype/impl/SpatialTypeHandlerTest.java | 284 ++++++++++++--
2 files changed, 596 insertions(+), 44 deletions(-)
diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/impl/SpatialTypeHandler.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/impl/SpatialTypeHandler.java
index 73694c0073..ff36b1be8b 100644
--- a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/impl/SpatialTypeHandler.java
+++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/impl/SpatialTypeHandler.java
@@ -4,22 +4,364 @@
import org.opensearch.dataprepper.plugins.source.rds.model.TableMetadata;
import org.opensearch.dataprepper.plugins.source.rds.datatype.MySQLDataType;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Map;
+/**
+ * Handles spatial data types for OpenSearch geo_shape field type compatibility.
+ * This handler supports various geometric shapes including Point, LineString, Polygon,
+ * MultiPoint, MultiLineString, MultiPolygon, and GeometryCollection.
+ *
+ * @see <a href="https://opensearch.org/docs/latest/field-types/supported-field-types/geo-shape/">OpenSearch Geo Shape Documentation</a>
+ *
+ * Supported WKT (Well-Known Text) formats:
+ *
+ * - POINT(x y)
+ * - LINESTRING(x y, x y, ...)
+ * - POLYGON((x y, x y, ..., x y))
+ * - MULTIPOINT((x y), (x y), ...)
+ * - MULTILINESTRING((x y, x y, ...), (x y, x y, ...))
+ * - MULTIPOLYGON(((x y, x y, ...)), ((x y, x y, ...)))
+ * - GEOMETRYCOLLECTION(POINT(x y), LINESTRING(x y, x y))
+ *
+ * */
public class SpatialTypeHandler implements DataTypeHandler {
+ // MySQL geometry type constants
+ // Integer codes compared against the 4-byte type field that parseGeometry and
+ // parseGeometryCollection read from the buffer to pick the shape-specific parser.
+ private static final int GEOMETRY_POINT = 1;
+ private static final int GEOMETRY_LINESTRING = 2;
+ private static final int GEOMETRY_POLYGON = 3;
+ private static final int GEOMETRY_MULTIPOINT = 4;
+ private static final int GEOMETRY_MULTILINESTRING = 5;
+ private static final int GEOMETRY_MULTIPOLYGON= 6;
+ private static final int GEOMETRY_COLLECTION = 7;
@Override
public String handle(final MySQLDataType columnType, final String columnName, final Object value,
final TableMetadata metadata) {
- // TODO: Implement the transformation
+ if (value == null) {
+ return null;
+ }
+
if (value instanceof Map) {
- Object data = ((Map, ?>)value).get(BYTES_KEY);
- if (data instanceof byte[]) {
- return new String((byte[]) data);
- } else {
- return data.toString();
+ final Object data = ((Map, ?>)value).get(BYTES_KEY);
+
+ if (data == null) {
+ return null;
+ }
+
+ final String val = data.toString();
+ // val.getBytes() uses the platform's default charset encoding (usually UTF-8)
+ // Treats special characters as multi-byte UTF-8 characters
+ // Corrupts the binary data because it's interpreting raw bytes as UTF-8 encoded text
+
+ // Copy directly to preserve the raw byte values
+ // Treats each character as a single byte
+ // Maintains the original binary data structure without charset encoding
+ byte[] wkbBytes = new byte[val.length()];
+ for (int i = 0; i < val.length(); i++) {
+ wkbBytes[i] = (byte) val.charAt(i);
+ }
+ return parseGeometry(wkbBytes, columnName);
+
+ } else if (value instanceof byte[]) {
+ return parseGeometry((byte[]) value, columnName);
+ }
+
+ throw new IllegalArgumentException("Unsupported value type. The value is of type: " + value.getClass());
+ }
+
+ private String parseGeometry(final byte[] rawData, final String columnName) {
+ try {
+ return parseGeometry(ByteBuffer.wrap(rawData).asReadOnlyBuffer());
+ } catch (Exception e) {
+ throw new RuntimeException("Error processing the geometry data type value for columnName: " + columnName, e);
+ }
+ }
+
+    /**
+     * Decodes one geometry from the buffer: skips the 4-byte SRID prefix, reads the 1-byte
+     * byte-order flag and the 4-byte type code, then delegates to the matching shape parser.
+     *
+     * @param buffer buffer positioned at the start of the SRID prefix
+     * @return the WKT string for the decoded geometry
+     * @throws IllegalArgumentException if the type code is not one of the supported constants
+     */
+    private String parseGeometry(final ByteBuffer buffer) {
+        // The SRID is not part of the WKT output, so step over it.
+        final int afterSrid = buffer.position() + 4;
+        buffer.position(afterSrid);
+
+        // Flag value 1 selects little-endian; any other value selects big-endian.
+        final boolean littleEndian = buffer.get() == 1;
+        buffer.order(littleEndian ? ByteOrder.LITTLE_ENDIAN : ByteOrder.BIG_ENDIAN);
+
+        final int wkbType = buffer.getInt();
+
+        if (wkbType == GEOMETRY_POINT) {
+            return parsePoint(buffer);
+        }
+        if (wkbType == GEOMETRY_LINESTRING) {
+            return parseLineString(buffer);
+        }
+        if (wkbType == GEOMETRY_POLYGON) {
+            return parsePolygon(buffer);
+        }
+        if (wkbType == GEOMETRY_MULTIPOINT) {
+            return parseMultiPoint(buffer);
+        }
+        if (wkbType == GEOMETRY_MULTILINESTRING) {
+            return parseMultiLineString(buffer);
+        }
+        if (wkbType == GEOMETRY_MULTIPOLYGON) {
+            return parseMultiPolygon(buffer);
+        }
+        if (wkbType == GEOMETRY_COLLECTION) {
+            return parseGeometryCollection(buffer);
+        }
+        throw new IllegalArgumentException("Unsupported WKB type: " + wkbType);
+    }
+
+ private String parseGeometryCollection(final ByteBuffer buffer) {
+ int numGeometries = buffer.getInt();
+
+ if (numGeometries < 1) {
+ throw new IllegalArgumentException("GeometryCollection must have at least 1 geometry");
+ }
+
+ List geometries = new ArrayList<>();
+
+ for (int i = 0; i < numGeometries; i++) {
+ // Read WKB byte order for this geometry
+ byte geomByteOrder = buffer.get();
+ buffer.order(geomByteOrder == 1 ? ByteOrder.LITTLE_ENDIAN : ByteOrder.BIG_ENDIAN);
+
+ // Read geometry type
+ int geomType = buffer.getInt();
+
+ // Parse the individual geometry based on its type
+ String geometry;
+ switch (geomType) {
+ case GEOMETRY_POINT:
+ geometry = parsePoint(buffer);
+ break;
+ case GEOMETRY_LINESTRING:
+ geometry = parseLineString(buffer);
+ break;
+ case GEOMETRY_POLYGON:
+ geometry = parsePolygon(buffer);
+ break;
+ case GEOMETRY_MULTIPOINT:
+ geometry = parseMultiPoint(buffer);
+ break;
+ case GEOMETRY_MULTILINESTRING:
+ geometry = parseMultiLineString(buffer);
+ break;
+ case GEOMETRY_MULTIPOLYGON:
+ geometry = parseMultiPolygon(buffer);
+ break;
+ default:
+ throw new IllegalArgumentException("Unsupported geometry type in collection: " + geomType);
+ }
+
+ geometries.add(geometry);
+ }
+
+ return formatGeometryCollection(geometries);
+ }
+
+ private String parsePoint(final ByteBuffer buffer) {
+ double x = buffer.getDouble();
+ double y = buffer.getDouble();
+ return String.format("POINT(%f %f)", x, y);
+ }
+
+ private String parseLineString(final ByteBuffer buffer) {
+ int numPoints = buffer.getInt();
+
+ if (numPoints < 2) {
+ throw new IllegalArgumentException("LineString must have at least 2 points");
+ }
+
+ List points = createPoints(buffer, numPoints);
+
+ return formatLineString(points);
+ }
+
+ private String parsePolygon(final ByteBuffer buffer) {
+ int numRings = buffer.getInt();
+
+ List> rings = parseLinearRing(buffer, numRings);
+
+ return formatPolygon(rings);
+ }
+
+ private List> parseLinearRing(final ByteBuffer buffer, final int numRings) {
+ if (numRings < 1) {
+ throw new IllegalArgumentException("Polygon must have at least 1 ring");
+ }
+
+ List> rings = new ArrayList<>();
+ for (int ring = 0; ring < numRings; ring++) {
+ int numPoints = buffer.getInt();
+ if (numPoints < 4) {
+ throw new IllegalArgumentException("Polygon ring must have at least 4 points");
+ }
+
+ List points = createPoints(buffer, numPoints);
+ rings.add(points);
+ }
+
+ return rings;
+ }
+
+ private String parseMultiPoint(final ByteBuffer buffer) {
+ int numPoints = buffer.getInt();
+
+ if (numPoints < 1) {
+ throw new IllegalArgumentException("MultiPoint must have at least 1 point");
+ }
+
+ List points = new ArrayList<>();
+ for (int i = 0; i < numPoints; i++) {
+ // Skip WKB header for each point
+ buffer.position(buffer.position() + 5);
+ double x = buffer.getDouble();
+ double y = buffer.getDouble();
+ points.add(new Point(x, y));
+ }
+
+ return formatMultiPoint(points);
+ }
+
+ private String parseMultiLineString(final ByteBuffer buffer) {
+ int numLines = buffer.getInt();
+
+ if (numLines < 1) {
+ throw new IllegalArgumentException("MultiLineString must have at least 1 line");
+ }
+
+ List> lines = new ArrayList<>();
+ for (int i = 0; i < numLines; i++) {
+ // Skip WKB header for each line
+ buffer.position(buffer.position() + 5);
+
+ int numPoints = buffer.getInt();
+ if (numPoints < 2) {
+ throw new IllegalArgumentException("LineString must have at least 2 points");
+ }
+
+ List points = createPoints(buffer, numPoints);
+ lines.add(points);
+ }
+
+ return formatMultiLineString(lines);
+ }
+
+ private String parseMultiPolygon(final ByteBuffer buffer) {
+ int numPolygons = buffer.getInt();
+
+ if (numPolygons < 1) {
+ throw new IllegalArgumentException("MultiPolygon must have at least 1 polygon");
+ }
+
+ List>> polygons = new ArrayList<>();
+ for (int i = 0; i < numPolygons; i++) {
+ // Skip WKB header for each polygon
+ buffer.position(buffer.position() + 5);
+
+ int numRings = buffer.getInt();
+ List> rings = parseLinearRing(buffer, numRings);
+ polygons.add(rings);
+ }
+
+ return formatMultiPolygon(polygons);
+ }
+
+ // Helper formatting methods
+ private String formatGeometryCollection(final List geometries) {
+ StringBuilder wkt = new StringBuilder("GEOMETRYCOLLECTION(");
+ for (int i = 0; i < geometries.size(); i++) {
+ if (i > 0) {
+ wkt.append(", ");
}
+ wkt.append(geometries.get(i));
+ }
+ wkt.append(")");
+ return wkt.toString();
+ }
+
+ private String formatLineString(final List points) {
+ StringBuilder wkt = new StringBuilder("LINESTRING(");
+ appendPoints(wkt, points);
+ wkt.append(")");
+ return wkt.toString();
+ }
+
+ private void appendPoints(final StringBuilder wkt, final List points) {
+ for (int i = 0; i < points.size(); i++) {
+ if (i > 0) wkt.append(", ");
+ Point p = points.get(i);
+ wkt.append(formatPoint(p));
+ }
+ }
+
+ private void appendPointsList(final StringBuilder wkt, final List> rings) {
+ for (int j = 0; j < rings.size(); j++) {
+ if (j > 0) wkt.append(", ");
+ wkt.append("(");
+ List ring = rings.get(j);
+ appendPoints(wkt, ring);
+ wkt.append(")");
+ }
+ }
+
+    /**
+     * Renders a point as {@code x y} with six fractional digits per coordinate.
+     *
+     * @param p the point to render
+     * @return the coordinate pair separated by a single space
+     */
+    private static String formatPoint(final Point p) {
+        // Locale.ROOT pins '.' as the decimal separator. The locale-less overload follows the
+        // JVM default locale and would emit ',' under e.g. de_DE, producing invalid WKT.
+        return String.format(java.util.Locale.ROOT, "%f %f", p.x, p.y);
+    }
+
+ private String formatPolygon(final List> rings) {
+ StringBuilder wkt = new StringBuilder("POLYGON(");
+ appendPointsList(wkt, rings);
+ wkt.append(")");
+ return wkt.toString();
+ }
+
+ private String formatMultiPoint(final List points) {
+ StringBuilder wkt = new StringBuilder("MULTIPOINT(");
+ appendPoints(wkt, points);
+ wkt.append(")");
+ return wkt.toString();
+ }
+
+ private String formatMultiLineString(final List> lines) {
+ StringBuilder wkt = new StringBuilder("MULTILINESTRING(");
+ appendPointsList(wkt, lines);
+ wkt.append(")");
+ return wkt.toString();
+ }
+
+ private String formatMultiPolygon(final List>> polygons) {
+ StringBuilder wkt = new StringBuilder("MULTIPOLYGON(");
+ for (int i = 0; i < polygons.size(); i++) {
+ if (i > 0) wkt.append(", ");
+ wkt.append("(");
+ List> rings = polygons.get(i);
+ appendPointsList(wkt, rings);
+ wkt.append(")");
+ }
+ wkt.append(")");
+ return wkt.toString();
+ }
+
+ private List createPoints(final ByteBuffer buffer, final int numPoints) {
+ List points = new ArrayList<>();
+ for (int i = 0; i < numPoints; i++) {
+ double x = buffer.getDouble();
+ double y = buffer.getDouble();
+ points.add(new Point(x, y));
+ }
+ return points;
+ }
+
+ // Immutable x/y coordinate pair built by the parse methods and read by the formatters.
+ private static class Point {
+ final double x;
+ final double y;
+
+ // Stores both coordinates exactly as given; no validation is performed.
+ Point(double x, double y) {
+ this.x = x;
+ this.y = y;
}
- return new String((byte[]) value);
}
}
diff --git a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/datatype/impl/SpatialTypeHandlerTest.java b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/datatype/impl/SpatialTypeHandlerTest.java
index 5dd2beb33f..194a4c70ef 100644
--- a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/datatype/impl/SpatialTypeHandlerTest.java
+++ b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/datatype/impl/SpatialTypeHandlerTest.java
@@ -1,69 +1,279 @@
package org.opensearch.dataprepper.plugins.source.rds.datatype.impl;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
import org.opensearch.dataprepper.plugins.source.rds.datatype.DataTypeHandler;
import org.opensearch.dataprepper.plugins.source.rds.datatype.MySQLDataType;
import org.opensearch.dataprepper.plugins.source.rds.model.TableMetadata;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
import java.util.Collections;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
+import java.util.stream.Stream;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertThrows;
public class SpatialTypeHandlerTest {
@Test
- public void test_handle() {
- final DataTypeHandler handler = new SpatialTypeHandler();
- final MySQLDataType columnType = MySQLDataType.GEOMETRY;
- final String columnName = "geometryColumn";
- final String value = UUID.randomUUID().toString();
+ public void test_handleInvalidType() {
final TableMetadata metadata = new TableMetadata(
- UUID.randomUUID().toString(), UUID.randomUUID().toString(), List.of(columnName), List.of(columnName),
- Collections.emptyMap(), Collections.emptyMap());
- Object result = handler.handle(columnType, columnName, value.getBytes(), metadata);
+ UUID.randomUUID().toString(), UUID.randomUUID().toString(),
+ List.of("invalid_col"), List.of("invalid_col"));
+ final DataTypeHandler spatialTypeHandler = new SpatialTypeHandler();
- assertThat(result, is(instanceOf(String.class)));
- assertThat(result, is(value));
+ assertThrows(IllegalArgumentException.class, () -> {
+ spatialTypeHandler.handle(MySQLDataType.GEOMETRY, "invalid_col", "not_a_geometry", metadata);
+ });
}
-
@Test
- public void testHandleMapWithByteArrayData() {
- final DataTypeHandler handler = new SpatialTypeHandler();
- final MySQLDataType columnType = MySQLDataType.GEOMETRY;
- final String columnName = "geometryColumn";
- final String testData = UUID.randomUUID().toString();
- final Map value = new HashMap<>();
- value.put("bytes", testData.getBytes());
+ public void test_handleInvalidGeometryValue() {
final TableMetadata metadata = new TableMetadata(
- UUID.randomUUID().toString(), UUID.randomUUID().toString(), List.of(columnName), List.of(columnName),
- Collections.emptyMap(), Collections.emptyMap());
- Object result = handler.handle(columnType, columnName, value, metadata);
+ UUID.randomUUID().toString(), UUID.randomUUID().toString(),
+ List.of("invalid_col"), List.of("invalid_col"));
+ final DataTypeHandler spatialTypeHandler = new SpatialTypeHandler();
- assertThat(result, is(instanceOf(String.class)));
- assertThat(result, is(testData));
+ assertThrows(RuntimeException.class, () -> {
+ spatialTypeHandler.handle(MySQLDataType.GEOMETRY, "invalid_col", "not_a_geometry".getBytes(), metadata);
+ });
}
- @Test
- public void testHandleMapWithByteStringData() {
- final DataTypeHandler handler = new SpatialTypeHandler();
- final MySQLDataType columnType = MySQLDataType.GEOMETRY;
- final String columnName = "geometryColumn";
- final String testData = UUID.randomUUID().toString();
- final Map value = new HashMap<>();
- value.put("bytes", testData);
+ @ParameterizedTest
+ @MethodSource("provideGeometryTypeData")
+ public void test_handleGeometryTypes_success(final MySQLDataType mySQLDataType, final String columnName, final Object value, final Object expectedValue) {
final TableMetadata metadata = new TableMetadata(
- UUID.randomUUID().toString(), UUID.randomUUID().toString(), List.of(columnName), List.of(columnName),
- Collections.emptyMap(), Collections.emptyMap());
- Object result = handler.handle(columnType, columnName, value, metadata);
+ UUID.randomUUID().toString(), UUID.randomUUID().toString(), List.of(columnName), List.of(columnName));
+ final DataTypeHandler numericTypeHandler = new SpatialTypeHandler();
+ Object result = numericTypeHandler.handle(mySQLDataType, columnName, value, metadata);
+
+ if (result != null) {
+ assertThat(result, instanceOf(expectedValue.getClass()));
+ }
+ assertThat(result, is(expectedValue));
+ }
+
+ private static Stream provideGeometryTypeData() {
+ return Stream.of(
+ Arguments.of(MySQLDataType.GEOMETRY, "point_col", Map.of("bytes", createGeometryString("Point", new double[]{1.0, 1.0})),
+ "POINT(1.000000 1.000000)"),
+ Arguments.of(MySQLDataType.GEOMETRY, "point_col", null, null),
+ Arguments.of(MySQLDataType.GEOMETRY, "point_col", Collections.singletonMap("bytes", null), null),
+ Arguments.of(MySQLDataType.GEOMETRY, "point_col", createGeometryBytes("Point", new double[]{1.0, 1.0}),
+ "POINT(1.000000 1.000000)"),
+ Arguments.of(MySQLDataType.GEOMETRY, "linestring_col", Map.of("bytes", createGeometryString("LineString", new double[][]{{1.0, 1.0}, {2.0, 2.0}, {3.0, 3.0}})),
+ "LINESTRING(1.000000 1.000000, 2.000000 2.000000, 3.000000 3.000000)"),
+ Arguments.of(MySQLDataType.GEOMETRY, "polygon_col", Map.of("bytes", createGeometryString("Polygon", new double[][][]{{{0.0, 0.0}, {0.0, 4.0}, {4.0, 4.0}, {4.0, 0.0}, {0.0, 0.0}}})),
+ "POLYGON((0.000000 0.000000, 0.000000 4.000000, 4.000000 4.000000, 4.000000 0.000000, 0.000000 0.000000))"),
+ Arguments.of(MySQLDataType.GEOMETRY, "multipoint_col", Map.of("bytes", createGeometryString("MultiPoint", new double[][]{{1.0, 1.0}, {2.0, 2.0}})),
+ "MULTIPOINT(1.000000 1.000000, 2.000000 2.000000)"),
+ Arguments.of(MySQLDataType.GEOMETRY, "multilinestring_col", Map.of("bytes", createGeometryString("MultiLineString", new double[][][]{{{1.0, 1.0}, {2.0, 2.0}}, {{3.0, 3.0}, {4.0, 4.0}}})),
+ "MULTILINESTRING((1.000000 1.000000, 2.000000 2.000000), (3.000000 3.000000, 4.000000 4.000000))"),
+ Arguments.of(MySQLDataType.GEOMETRY, "multipolygon_col", Map.of("bytes", createGeometryString("MultiPolygon", new double[][][][]{{{{0.0, 0.0}, {0.0, 4.0}, {4.0, 4.0}, {4.0, 0.0}, {0.0, 0.0}}}, {{{5.0, 5.0}, {5.0, 7.0}, {7.0, 7.0}, {7.0, 5.0}, {5.0, 5.0}}}})),
+ "MULTIPOLYGON(((0.000000 0.000000, 0.000000 4.000000, 4.000000 4.000000, 4.000000 0.000000, 0.000000 0.000000)), ((5.000000 5.000000, 5.000000 7.000000, 7.000000 7.000000, 7.000000 5.000000, 5.000000 5.000000)))"),
+ Arguments.of(MySQLDataType.GEOMETRY, "geometrycollection_col", Map.of("bytes", createGeometryString("GeometryCollection", List.of(
+ new Object[]{"Point", new double[]{1.0, 1.0}},
+ new Object[]{"LineString", new double[][]{{2.0, 2.0}, {3.0, 3.0}}}
+ ))),
+ "GEOMETRYCOLLECTION(POINT(1.000000 1.000000), LINESTRING(2.000000 2.000000, 3.000000 3.000000))"),
+ Arguments.of(MySQLDataType.GEOMETRY, "geometrycollection_col", createGeometryBytes("GeometryCollection", List.of(
+ new Object[]{"Point", new double[]{1.0, 1.0}},
+ new Object[]{"LineString", new double[][]{{2.0, 2.0}, {3.0, 3.0}}}
+ )),
+ "GEOMETRYCOLLECTION(POINT(1.000000 1.000000), LINESTRING(2.000000 2.000000, 3.000000 3.000000))")
+ );
+ }
+
+ public static ByteBuffer createGeometryBuffer(String type, Object data) {
+ ByteBuffer buffer;
+ switch (type) {
+ case "Point":
+ buffer = createPoint((double[]) data);
+ break;
+ case "LineString":
+ buffer = createLineString((double[][]) data);
+ break;
+ case "Polygon":
+ buffer = createPolygon((double[][][]) data);
+ break;
+ case "MultiPoint":
+ buffer = createMultiPoint((double[][]) data);
+ break;
+ case "MultiLineString":
+ buffer = createMultiLineString((double[][][]) data);
+ break;
+ case "MultiPolygon":
+ buffer = createMultiPolygon((double[][][][]) data);
+ break;
+ case "GeometryCollection":
+ buffer = createGeometryCollection((List