Skip to content

Commit

Permalink
Added an option to tread geography as strings
Browse files Browse the repository at this point in the history
  • Loading branch information
AdalbertMemSQL committed Nov 13, 2024
1 parent b4c4b61 commit 114c53a
Show file tree
Hide file tree
Showing 8 changed files with 152 additions and 57 deletions.
85 changes: 46 additions & 39 deletions README.md

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import io.debezium.config.EnumeratedValue;
import io.debezium.config.Field;
import io.debezium.connector.SourceInfoStructMaker;
import io.debezium.jdbc.JdbcValueConverters.DecimalMode;
import io.debezium.relational.ColumnFilterMode;
import io.debezium.relational.RelationalDatabaseConnectorConfig;
import io.debezium.relational.RelationalTableFilters;
Expand Down Expand Up @@ -148,10 +149,9 @@ public class SingleStoreConnectorConfig extends RelationalDatabaseConnectorConfi
.withWidth(Width.SHORT)
.withImportance(Importance.MEDIUM)
.withDescription(
"Specify how DECIMAL and NUMERIC columns should be represented in change events, including: "
+ "'precise' (the default) uses java.math.BigDecimal to represent values, which are encoded in the change events using a binary representation and Kafka Connect's 'org.apache.kafka.connect.data.Decimal' type; "
+ "'string' uses string to represent values; "
+ "'double' represents values using Java's 'double', which may not offer the precision but will be far easier to use in consumers.");
"Specify how GEOGRAPHY and GEOGRAPHYPOINT columns should be represented in change events, including: "
+ "'geometry' (the default) uses io.debezium.data.geometry.Geometry to represent values, which contains a structure with two fields: srid (INT32): spatial reference system ID that defines the type of geometry object stored in the structure and wkb (BYTES): binary representation of the geometry object encoded in the Well-Known-Binary (wkb) format."
+ "'string' uses string to represent values.");

protected static final int DEFAULT_SNAPSHOT_FETCH_SIZE = 10_240;
protected static final int DEFAULT_PORT = 3306;
Expand Down Expand Up @@ -202,6 +202,7 @@ public class SingleStoreConnectorConfig extends RelationalDatabaseConnectorConfi
DRIVER_PARAMETERS,
SNAPSHOT_MODE,
BINARY_HANDLING_MODE,
GEOGRAPHY_HANDLING_MODE,
OFFSETS)
.events(
SOURCE_INFO_STRUCT_MAKER,
Expand Down Expand Up @@ -300,6 +301,16 @@ public List<String> offsets() {
return offsets;
}

/**
* Returns the Geography mode Enum configuration. This defaults to {@code geometry} if nothing is
* provided.
*/
public GeographyMode getGeographyMode() {
return GeographyHandlingMode
.parse(this.getConfig().getString(GEOGRAPHY_HANDLING_MODE))
.asDecimalMode();
}

/**
* The set of predefined SnapshotMode options or aliases.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public ChangeEventSourceCoordinator<SingleStorePartition, SingleStoreOffsetConte
SingleStoreConnectorConfig.TOPIC_NAMING_STRATEGY);
final SingleStoreValueConverters valueConverter = new SingleStoreValueConverters(
connectorConfig.getDecimalMode(), connectorConfig.getTemporalPrecisionMode(),
connectorConfig.binaryHandlingMode());
connectorConfig.binaryHandlingMode(), connectorConfig.getGeographyMode());
final SingleStoreDefaultValueConverter defaultValueConverter = new SingleStoreDefaultValueConverter(
valueConverter);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@

public class SingleStoreValueConverters extends JdbcValueConverters {

private final GeographyMode geographyMode;

/**
* Create a new instance of JdbcValueConverters.
* <p>
Expand All @@ -42,8 +44,11 @@ public class SingleStoreValueConverters extends JdbcValueConverters {
*/
public SingleStoreValueConverters(DecimalMode decimalMode,
TemporalPrecisionMode temporalPrecisionMode,
CommonConnectorConfig.BinaryHandlingMode binaryMode) {
CommonConnectorConfig.BinaryHandlingMode binaryMode,
GeographyMode geographyMode
) {
super(decimalMode, temporalPrecisionMode, ZoneOffset.UTC, null, null, binaryMode);
this.geographyMode = geographyMode;
}

@Override
Expand All @@ -54,7 +59,11 @@ public SchemaBuilder schemaBuilder(Column column) {
return Json.builder();
case "GEOGRAPHYPOINT":
case "GEOGRAPHY":
return io.debezium.data.geometry.Geometry.builder();
if (geographyMode == GeographyMode.GEOMETRY) {
return io.debezium.data.geometry.Geometry.builder();
} else {
return SchemaBuilder.string();
}
case "ENUM":
case "SET":
return SchemaBuilder.string();
Expand Down Expand Up @@ -113,7 +122,11 @@ public ValueConverter converter(Column column, Field fieldDefn) {
return (data) -> convertString(column, fieldDefn, data);
case "GEOGRAPHYPOINT":
case "GEOGRAPHY":
return data -> convertGeometry(column, fieldDefn, data);
if (geographyMode == GeographyMode.GEOMETRY) {
return data -> convertGeometry(column, fieldDefn, data);
} else {
return data -> convertString(column, fieldDefn, data);
}
case "ENUM":
case "SET":
return data -> convertString(column, fieldDefn, data);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ abstract class IntegrationTestBase extends AbstractConnectorTest {
static final String TEST_TOPIC_PREFIX = "singlestore_topic";
private static final String TEST_SERVER_VERSION = System.getProperty("singlestore.version", "");
private static final String TEST_USER = System.getProperty("singlestore.user", "root");
private static final String TEST_PASSWORD = System.getProperty("singlestore.password", "");
private static final String TEST_PASSWORD = "1";
private static final Logger LOGGER = LoggerFactory.getLogger(IntegrationTestBase.class);
private static final String TEST_IMAGE = System.getProperty("singlestore.image",
"ghcr.io/singlestore-labs/singlestoredb-dev:latest");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;

import com.singlestore.debezium.SingleStoreValueConverters.GeographyMode;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.Configuration;
import io.debezium.data.Bits;
Expand Down Expand Up @@ -33,8 +34,8 @@
public class SingleStoreDatabaseSchemaIT extends IntegrationTestBase {

private static final SingleStoreValueConverters CONVERTERS = new SingleStoreValueConverters(
JdbcValueConverters.DecimalMode.DOUBLE,
TemporalPrecisionMode.CONNECT, CommonConnectorConfig.BinaryHandlingMode.BYTES);
JdbcValueConverters.DecimalMode.DOUBLE, TemporalPrecisionMode.CONNECT,
CommonConnectorConfig.BinaryHandlingMode.BYTES, GeographyMode.GEOMETRY);
private SingleStoreDatabaseSchema schema;

public static SingleStoreDatabaseSchema getSchema(SingleStoreConnectorConfig config) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.singlestore.debezium;

Check failure on line 1 in src/test/java/com/singlestore/debezium/SingleStoreDefaultValueConverterIT.java

View workflow job for this annotation

GitHub Actions / Test Report

SingleStoreDefaultValueConverterIT.

Could not connect to 127.0.0.1:32778 : unexpected end of stream, read 0 bytes from 4 (socket was closed by server)
Raw output
java.sql.SQLNonTransientConnectionException: Could not connect to 127.0.0.1:32778 : unexpected end of stream, read 0 bytes from 4 (socket was closed by server)
	at com.singlestore.jdbc.export.ExceptionFactory.createException(ExceptionFactory.java:280)
	at com.singlestore.jdbc.export.ExceptionFactory.create(ExceptionFactory.java:346)
	at com.singlestore.jdbc.client.impl.StandardClient.connect(StandardClient.java:270)
	at com.singlestore.jdbc.client.impl.StandardClient.<init>(StandardClient.java:112)
	at com.singlestore.jdbc.Driver.connect(Driver.java:70)
	at com.singlestore.jdbc.Driver.connect(Driver.java:101)
	at com.singlestore.jdbc.Driver.connect(Driver.java:27)
	at io.debezium.jdbc.JdbcConnection.lambda$patternBasedFactory$1(JdbcConnection.java:243)
	at io.debezium.jdbc.JdbcConnection$ConnectionFactoryDecorator.connect(JdbcConnection.java:129)
	at io.debezium.jdbc.JdbcConnection.connection(JdbcConnection.java:875)
	at io.debezium.jdbc.JdbcConnection.connection(JdbcConnection.java:870)
	at io.debezium.jdbc.JdbcConnection.execute(JdbcConnection.java:432)
	at io.debezium.jdbc.JdbcConnection.execute(JdbcConnection.java:411)
	at com.singlestore.debezium.IntegrationTestBase.executeDDL(IntegrationTestBase.java:188)
	at com.singlestore.debezium.IntegrationTestBase.init(IntegrationTestBase.java:67)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
	at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
	at org.junit.internal.runners.statements.RunBefores.invokeMethod(RunBefores.java:33)
	at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:24)
	at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
	at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
	at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
	at org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:316)
	at org.apache.maven.surefire.junit4.JUnit4Provider.executeWithRerun(JUnit4Provider.java:240)
	at org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:214)
	at org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:155)
	at org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:385)
	at org.apache.maven.surefire.booter.ForkedBooter.execute(ForkedBooter.java:162)
	at org.apache.maven.surefire.booter.ForkedBooter.run(ForkedBooter.java:507)
	at org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:495)
Caused by: java.io.EOFException: unexpected end of stream, read 0 bytes from 4 (socket was closed by server)
	at com.singlestore.jdbc.client.socket.impl.PacketReader.readReusablePacket(PacketReader.java:72)
	at com.singlestore.jdbc.client.impl.StandardClient.connect(StandardClient.java:158)
	... 32 more

import com.singlestore.debezium.SingleStoreValueConverters.GeographyMode;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.jdbc.JdbcValueConverters;
import io.debezium.jdbc.TemporalPrecisionMode;
Expand All @@ -26,8 +27,8 @@
public class SingleStoreDefaultValueConverterIT extends IntegrationTestBase {

private static final SingleStoreValueConverters CONVERTERS = new SingleStoreValueConverters(
JdbcValueConverters.DecimalMode.DOUBLE,
TemporalPrecisionMode.ADAPTIVE, CommonConnectorConfig.BinaryHandlingMode.BYTES);
JdbcValueConverters.DecimalMode.DOUBLE, TemporalPrecisionMode.ADAPTIVE,
CommonConnectorConfig.BinaryHandlingMode.BYTES, GeographyMode.GEOMETRY);

@Test
public void testNumberValues() {
Expand Down Expand Up @@ -118,6 +119,41 @@ public void testGeometryValues() throws ParseException {
}
}

@Test
public void testGeometryStringValues() {
try (SingleStoreConnection conn = new SingleStoreConnection(defaultJdbcConnectionConfig())) {
conn.execute(String.format("USE %s", TEST_DATABASE));
conn.execute(
"CREATE ROWSTORE TABLE IF NOT EXISTS testGeometryStringValues(geographyColumn GEOGRAPHY DEFAULT 'POLYGON((1 1,2 1,2 2, 1 2, 1 1))', geographypointColumn GEOGRAPHYPOINT DEFAULT 'POINT(1.50000003 1.50000000)')");
Tables tables = new Tables();
conn.readSchema(tables, TEST_DATABASE, null, null, null, true);
Table table = tables.forTable(TEST_DATABASE, null, "testGeometryStringValues");
assertThat(table).isNotNull();

SingleStoreValueConverters converters = new SingleStoreValueConverters(
JdbcValueConverters.DecimalMode.DOUBLE, TemporalPrecisionMode.CONNECT,
CommonConnectorConfig.BinaryHandlingMode.BYTES, GeographyMode.STRING);
SingleStoreDefaultValueConverter defaultValueConverter = new SingleStoreDefaultValueConverter(
converters);

Column geographyColumn = table.columnWithName("geographyColumn");
Optional<Object> geographyDefaultValue = defaultValueConverter.parseDefaultValue(
geographyColumn, geographyColumn.defaultValueExpression().orElse(null));
assertTrue(geographyDefaultValue.isPresent());
String geographyColumnDefaultValue = (String) geographyDefaultValue.get();
assertEquals("POLYGON((1 1,2 1,2 2, 1 2, 1 1))", geographyColumnDefaultValue);

Column geographypointColumn = table.columnWithName("geographypointColumn");
Optional<Object> geographypointDefaultValue = defaultValueConverter.parseDefaultValue(
geographypointColumn, geographypointColumn.defaultValueExpression().orElse(null));
assertTrue(geographypointDefaultValue.isPresent());
String geographypointColumnDefaultValue = (String) geographypointDefaultValue.get();
assertEquals("POINT(1.50000003 1.50000000)", geographypointColumnDefaultValue);
} catch (SQLException e) {
Assert.fail(e.getMessage());
}
}

@Test
public void testStringValues() {
try (SingleStoreConnection conn = new SingleStoreConnection(defaultJdbcConnectionConfig())) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.singlestore.debezium;

Check failure on line 1 in src/test/java/com/singlestore/debezium/SingleStoreValueConvertersIT.java

View workflow job for this annotation

GitHub Actions / Test Report

SingleStoreValueConvertersIT.

Could not connect to 127.0.0.1:32774 : unexpected end of stream, read 0 bytes from 4 (socket was closed by server)
Raw output
java.sql.SQLNonTransientConnectionException: Could not connect to 127.0.0.1:32774 : unexpected end of stream, read 0 bytes from 4 (socket was closed by server)
	at com.singlestore.jdbc.export.ExceptionFactory.createException(ExceptionFactory.java:280)
	at com.singlestore.jdbc.export.ExceptionFactory.create(ExceptionFactory.java:346)
	at com.singlestore.jdbc.client.impl.StandardClient.connect(StandardClient.java:270)
	at com.singlestore.jdbc.client.impl.StandardClient.<init>(StandardClient.java:112)
	at com.singlestore.jdbc.Driver.connect(Driver.java:70)
	at com.singlestore.jdbc.Driver.connect(Driver.java:101)
	at com.singlestore.jdbc.Driver.connect(Driver.java:27)
	at io.debezium.jdbc.JdbcConnection.lambda$patternBasedFactory$1(JdbcConnection.java:243)
	at io.debezium.jdbc.JdbcConnection$ConnectionFactoryDecorator.connect(JdbcConnection.java:129)
	at io.debezium.jdbc.JdbcConnection.connection(JdbcConnection.java:875)
	at io.debezium.jdbc.JdbcConnection.connection(JdbcConnection.java:870)
	at io.debezium.jdbc.JdbcConnection.execute(JdbcConnection.java:432)
	at io.debezium.jdbc.JdbcConnection.execute(JdbcConnection.java:411)
	at com.singlestore.debezium.IntegrationTestBase.executeDDL(IntegrationTestBase.java:188)
	at com.singlestore.debezium.IntegrationTestBase.init(IntegrationTestBase.java:67)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
	at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
	at org.junit.internal.runners.statements.RunBefores.invokeMethod(RunBefores.java:33)
	at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:24)
	at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
	at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
	at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
	at org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:316)
	at org.apache.maven.surefire.junit4.JUnit4Provider.executeWithRerun(JUnit4Provider.java:240)
	at org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:214)
	at org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:155)
	at org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:385)
	at org.apache.maven.surefire.booter.ForkedBooter.execute(ForkedBooter.java:162)
	at org.apache.maven.surefire.booter.ForkedBooter.run(ForkedBooter.java:507)
	at org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:495)
Caused by: java.io.EOFException: unexpected end of stream, read 0 bytes from 4 (socket was closed by server)
	at com.singlestore.jdbc.client.socket.impl.PacketReader.readReusablePacket(PacketReader.java:72)
	at com.singlestore.jdbc.client.impl.StandardClient.connect(StandardClient.java:158)
	... 32 more

import com.singlestore.debezium.SingleStoreValueConverters.GeographyMode;
import com.singlestore.jdbc.SingleStoreBlob;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.jdbc.JdbcValueConverters;
Expand Down Expand Up @@ -30,7 +31,7 @@ public class SingleStoreValueConvertersIT extends IntegrationTestBase {

private static final SingleStoreValueConverters CONVERTERS = new SingleStoreValueConverters(
JdbcValueConverters.DecimalMode.DOUBLE, TemporalPrecisionMode.CONNECT,
CommonConnectorConfig.BinaryHandlingMode.BYTES);
CommonConnectorConfig.BinaryHandlingMode.BYTES, GeographyMode.GEOMETRY);

@Test
public void testNumberValues() {
Expand Down Expand Up @@ -69,7 +70,8 @@ public void testDecimalModeValues() {

private void testDecimalModeValues(JdbcValueConverters.DecimalMode mode) {
SingleStoreValueConverters converters = new SingleStoreValueConverters(mode,
TemporalPrecisionMode.CONNECT, CommonConnectorConfig.BinaryHandlingMode.BYTES);
TemporalPrecisionMode.CONNECT, CommonConnectorConfig.BinaryHandlingMode.BYTES,
GeographyMode.GEOMETRY);
try (SingleStoreConnection conn = new SingleStoreConnection(defaultJdbcConnectionConfig())) {
Tables tables = new Tables();
conn.readSchema(tables, TEST_DATABASE, null, null, null, true);
Expand All @@ -91,7 +93,7 @@ private void testDecimalModeValues(JdbcValueConverters.DecimalMode mode) {
}

@Test
public void testGeometry() throws ParseException {
public void testGeographyGeometry() throws ParseException {
String geographyValue = "POLYGON ((1 1, 2 1, 2 2, 1 2, 1 1))";
String geographyPointValue = "POINT(1.50000003 1.50000000)";
SingleStoreGeometry singleStoregeographyValue = SingleStoreGeometry.fromEkt(geographyValue);
Expand All @@ -114,6 +116,30 @@ public void testGeometry() throws ParseException {
}
}

@Test
public void testGeographyString() throws ParseException {
String geographyValue = "POLYGON ((1 1, 2 1, 2 2, 1 2, 1 1))";
String geographyPointValue = "POINT(1.50000003 1.50000000)";
try (SingleStoreConnection conn = new SingleStoreConnection(defaultJdbcConnectionConfig())) {
Tables tables = new Tables();
conn.readSchema(tables, TEST_DATABASE, null, null, null, true);
Table table = tables.forTable(TEST_DATABASE, null, "allTypesTable");
assertThat(table).isNotNull();

SingleStoreValueConverters converters = new SingleStoreValueConverters(
JdbcValueConverters.DecimalMode.DOUBLE, TemporalPrecisionMode.CONNECT,
CommonConnectorConfig.BinaryHandlingMode.BYTES, GeographyMode.STRING);
String convertedPolygon = (String) convertColumnValue(converters, table, "geographyColumn",
geographyValue);
assertEquals("POLYGON ((1 1, 2 1, 2 2, 1 2, 1 1))", convertedPolygon);
String convertedPoint = (String) convertColumnValue(converters, table, "geographypointColumn",
geographyPointValue);
assertEquals("POINT(1.50000003 1.50000000)", convertedPoint);
} catch (SQLException e) {
Assert.fail(e.getMessage());
}
}

@Test
public void testTimeAndDateValues() {
testTimeAndDateValues(TemporalPrecisionMode.CONNECT);
Expand All @@ -124,7 +150,7 @@ public void testTimeAndDateValues() {
private void testTimeAndDateValues(TemporalPrecisionMode mode) {
SingleStoreValueConverters converters = new SingleStoreValueConverters(
JdbcValueConverters.DecimalMode.DOUBLE, mode,
CommonConnectorConfig.BinaryHandlingMode.BYTES);
CommonConnectorConfig.BinaryHandlingMode.BYTES, GeographyMode.GEOMETRY);
try (SingleStoreConnection conn = new SingleStoreConnection(defaultJdbcConnectionConfig())) {
Tables tables = new Tables();
conn.readSchema(tables, TEST_DATABASE, null, null, null, true);
Expand Down Expand Up @@ -222,7 +248,8 @@ public void testBinaryMode() {

private void testBinaryMode(CommonConnectorConfig.BinaryHandlingMode mode) {
SingleStoreValueConverters converters = new SingleStoreValueConverters(
JdbcValueConverters.DecimalMode.DOUBLE, TemporalPrecisionMode.CONNECT, mode);
JdbcValueConverters.DecimalMode.DOUBLE, TemporalPrecisionMode.CONNECT, mode,
GeographyMode.GEOMETRY);
try (SingleStoreConnection conn = new SingleStoreConnection(defaultJdbcConnectionConfig())) {
Tables tables = new Tables();
conn.readSchema(tables, TEST_DATABASE, null, null, null, true);
Expand Down

0 comments on commit 114c53a

Please sign in to comment.