From b4c4b615f6240681936a27bda9653c1cdd3b83e1 Mon Sep 17 00:00:00 2001 From: Adalbert Makarovych Date: Wed, 13 Nov 2024 09:59:06 +0200 Subject: [PATCH 1/3] check point --- .../debezium/SingleStoreConnectorConfig.java | 86 ++++++++++++++++++- .../debezium/SingleStoreValueConverters.java | 28 +++--- 2 files changed, 102 insertions(+), 12 deletions(-) diff --git a/src/main/java/com/singlestore/debezium/SingleStoreConnectorConfig.java b/src/main/java/com/singlestore/debezium/SingleStoreConnectorConfig.java index ad3889c..4ef44af 100644 --- a/src/main/java/com/singlestore/debezium/SingleStoreConnectorConfig.java +++ b/src/main/java/com/singlestore/debezium/SingleStoreConnectorConfig.java @@ -1,5 +1,6 @@ package com.singlestore.debezium; +import com.singlestore.debezium.SingleStoreValueConverters.GeographyMode; import io.debezium.config.CommonConnectorConfig; import io.debezium.config.ConfigDefinition; import io.debezium.config.Configuration; @@ -139,6 +140,19 @@ public class SingleStoreConnectorConfig extends RelationalDatabaseConnectorConfi + "Example: 0000000000000077000000000000000E000000000000E06E,0x0000000000000077000000000000000E000000000000E087,0000000000000077000000000000000E000000000000E088"); public static final Field TOPIC_NAMING_STRATEGY = CommonConnectorConfig.TOPIC_NAMING_STRATEGY .withDefault(DefaultTopicNamingStrategy.class.getName()); + + public static final Field GEOGRAPHY_HANDLING_MODE = Field.create("geography.handling.mode") + .withDisplayName("Geography Handling") + .withGroup(Field.createGroupEntry(Field.Group.CONNECTOR, 8)) + .withEnum(GeographyHandlingMode.class, GeographyHandlingMode.GEOMETRY) + .withWidth(Width.SHORT) + .withImportance(Importance.MEDIUM) + .withDescription( + "Specify how DECIMAL and NUMERIC columns should be represented in change events, including: " + + "'precise' (the default) uses java.math.BigDecimal to represent values, which are encoded in the change events using a binary representation and Kafka Connect's 
'org.apache.kafka.connect.data.Decimal' type; " + + "'string' uses string to represent values; " + + "'double' represents values using Java's 'double', which may not offer the precision but will be far easier to use in consumers."); + protected static final int DEFAULT_SNAPSHOT_FETCH_SIZE = 10_240; protected static final int DEFAULT_PORT = 3306; public static final Field PORT = RelationalDatabaseConnectorConfig.PORT @@ -468,6 +482,77 @@ public String getValue() { } } + /** + * The set of predefined GeographyHandlingMode options or aliases. + */ + public enum GeographyHandlingMode implements EnumeratedValue { + /** + * Represent {@code GEOGRAPHY} and {@code GEOGRAPHYPOINT} values as geometry + * {@link io.debezium.data.geometry.Geometry} values. + */ + GEOMETRY("geometry"), + + /** + * Represent {@code GEOGRAPHY} and {@code GEOGRAPHYPOINT} values as a string values. + */ + STRING("string"); + + private final String value; + + GeographyHandlingMode(String value) { + this.value = value; + } + + /** + * Determine if the supplied value is one of the predefined options. + * + * @param value the configuration property value; may not be null + * @return the matching option, or null if no match is found + */ + public static GeographyHandlingMode parse(String value) { + if (value == null) { + return null; + } + value = value.trim(); + for (GeographyHandlingMode option : GeographyHandlingMode.values()) { + if (option.getValue().equalsIgnoreCase(value)) { + return option; + } + } + return null; + } + + /** + * Determine if the supplied value is one of the predefined options. 
+ * + * @param value the configuration property value; may not be null + * @param defaultValue the default value; may be null + * @return the matching option, or null if no match is found and the non-null default is invalid + */ + public static GeographyHandlingMode parse(String value, String defaultValue) { + GeographyHandlingMode mode = parse(value); + if (mode == null && defaultValue != null) { + mode = parse(defaultValue); + } + return mode; + } + + @Override + public String getValue() { + return value; + } + + public GeographyMode asDecimalMode() { + switch (this) { + case STRING: + return GeographyMode.STRING; + case GEOMETRY: + default: + return GeographyMode.GEOMETRY; + } + } + } + private static class SystemTablesPredicate implements TableFilter { protected static final List SYSTEM_SCHEMAS = Arrays @@ -478,5 +563,4 @@ public boolean isIncluded(TableId t) { return t.catalog() != null && !SYSTEM_SCHEMAS.contains(t.catalog().toLowerCase()); } } - } diff --git a/src/main/java/com/singlestore/debezium/SingleStoreValueConverters.java b/src/main/java/com/singlestore/debezium/SingleStoreValueConverters.java index b99def2..78ba16b 100644 --- a/src/main/java/com/singlestore/debezium/SingleStoreValueConverters.java +++ b/src/main/java/com/singlestore/debezium/SingleStoreValueConverters.java @@ -1,29 +1,30 @@ package com.singlestore.debezium; import com.singlestore.jdbc.SingleStoreBlob; -import java.time.Duration; -import java.time.LocalDateTime; -import java.time.LocalTime; -import org.locationtech.jts.io.ParseException; - import io.debezium.config.CommonConnectorConfig; import io.debezium.data.Json; import io.debezium.jdbc.JdbcValueConverters; import io.debezium.jdbc.TemporalPrecisionMode; import io.debezium.relational.Column; import io.debezium.relational.ValueConverter; -import io.debezium.time.*; +import io.debezium.time.Date; +import io.debezium.time.MicroTime; +import io.debezium.time.MicroTimestamp; +import io.debezium.time.Timestamp; +import 
io.debezium.time.Year; import io.debezium.util.IoUtil; -import org.apache.kafka.connect.data.Field; -import org.apache.kafka.connect.data.Schema; -import org.apache.kafka.connect.data.SchemaBuilder; -import org.apache.kafka.connect.source.SourceRecord; - import java.io.IOException; import java.nio.ByteOrder; import java.sql.SQLException; +import java.time.Duration; +import java.time.LocalDateTime; import java.time.ZoneOffset; import java.time.temporal.ChronoField; +import org.apache.kafka.connect.data.Field; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.SchemaBuilder; +import org.apache.kafka.connect.source.SourceRecord; +import org.locationtech.jts.io.ParseException; public class SingleStoreValueConverters extends JdbcValueConverters { @@ -311,4 +312,9 @@ protected Object convertGeometry(Column column, Field fieldDefn, Object data) } }); } + + public enum GeographyMode { + GEOMETRY, + STRING + } } From 114c53ab3b7dd2a72664037f92be2cf346499578 Mon Sep 17 00:00:00 2001 From: Adalbert Makarovych Date: Wed, 13 Nov 2024 13:46:25 +0200 Subject: [PATCH 2/3] Added an option to tread geography as strings --- README.md | 85 ++++++++++--------- .../debezium/SingleStoreConnectorConfig.java | 19 ++++- .../debezium/SingleStoreConnectorTask.java | 2 +- .../debezium/SingleStoreValueConverters.java | 19 ++++- .../debezium/IntegrationTestBase.java | 2 +- .../debezium/SingleStoreDatabaseSchemaIT.java | 5 +- .../SingleStoreDefaultValueConverterIT.java | 40 ++++++++- .../SingleStoreValueConvertersIT.java | 37 ++++++-- 8 files changed, 152 insertions(+), 57 deletions(-) diff --git a/README.md b/README.md index 98b88be..a496273 100644 --- a/README.md +++ b/README.md @@ -446,37 +446,43 @@ connector configuration property. Debezium connectors handle binary data types based on the value of the [`binary.handling.mode`](#binary-handling-mode) connector configuration property. 
-| SingleStoreDB data type | Literal type | Semantic type -| - | - | - -| TINYBLOB | BYTES OR STRING | Either the raw bytes (the default), a base64-encoded String, or a -base64-url-safe-encoded String, or a hex-encoded String, based on -the [binary.handling.mode](#binary-handling-mode) connector configuration property. -| BLOB | BYTES OR STRING | Either the raw bytes (the default), a base64-encoded String, or a -base64-url-safe-encoded String, or a hex-encoded String, based on -the [binary.handling.mode](#binary-handling-mode) connector configuration property. -| MEDIUMBLOB | BYTES OR STRING | Either the raw bytes (the default), a base64-encoded String, or a -base64-url-safe-encoded String, or a hex-encoded String, based on -the [binary.handling.mode](#binary-handling-mode) connector configuration property. -| LONGBLOB | BYTES OR STRING | Either the raw bytes (the default), a base64-encoded String, or a -base64-url-safe-encoded String, or a hex-encoded String, based on -the [binary.handling.mode](#binary-handling-mode) connector configuration property. -| BINARY | BYTES OR STRING | Either the raw bytes (the default), a base64-encoded String, or a -base64-url-safe-encoded String, or a hex-encoded String, based on -the [binary.handling.mode](#binary-handling-mode) connector configuration property. -| VARBINARY | BYTES OR STRING | Either the raw bytes (the default), a base64-encoded String, or a -base64-url-safe-encoded String, or a hex-encoded String, based on -the [binary.handling.mode](#binary-handling-mode) connector configuration property. 
-### Other types +| SingleStoreDB data type | Literal type | Semantic type | +|-------------------------|-----------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| TINYBLOB | BYTES OR STRING | Either the raw bytes (the default), a base64-encoded String, or a base64-url-safe-encoded String, or a hex-encoded String, based on | | |the [binary.handling.mode](#binary-handling-mode) connector configuration property. +| BLOB | BYTES OR STRING | Either the raw bytes (the default), a base64-encoded String, or a base64-url-safe-encoded String, or a hex-encoded String, based on the [binary.handling.mode](#binary-handling-mode) connector configuration property. | +| MEDIUMBLOB | BYTES OR STRING | Either the raw bytes (the default), a base64-encoded String, or a base64-url-safe-encoded String, or a hex-encoded String, based on the [binary.handling.mode](#binary-handling-mode) connector configuration property. | +| LONGBLOB | BYTES OR STRING | Either the raw bytes (the default), a base64-encoded String, or a base64-url-safe-encoded String, or a hex-encoded String, based on the [binary.handling.mode](#binary-handling-mode) connector configuration property. | +| BINARY | BYTES OR STRING | Either the raw bytes (the default), a base64-encoded String, or a base64-url-safe-encoded String, or a hex-encoded String, based on the [binary.handling.mode](#binary-handling-mode) connector configuration property. | +| VARBINARY | BYTES OR STRING | Either the raw bytes (the default), a base64-encoded String, or a base64-url-safe-encoded String, or a hex-encoded String, based on the [binary.handling.mode](#binary-handling-mode) connector configuration property. | + +### Geospatial types + +Geospatial types depend on the value of the [`geography.handling.mode`](#geography-handling) +connector configuration property. 
+ +`geography.handling.mode=geometry(default)` | SingleStoreDB data type | Literal type | Semantic type | |-------------------------|--------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| JSON | STRING | N/A | -| ENUM | STRING | N/A | -| SET | STRING | N/A | | GEOGRAPHYPOINT | STRUCT | `io.debezium.data.geometry.Geometry` - Contains a structure with two fields: `srid` (INT32): spatial reference system ID that defines the type of geometry object stored in the structure and `wkb` (BYTES): binary representation of the geometry object encoded in the Well-Known-Binary (wkb) format. Refer to the [Open Geospatial Consortium](https://www.opengeospatial.org/standards/sfa) for more details. | | GEOGRAPHY | STRUCT | `io.debezium.data.geometry.Geometry` - Contains a structure with two fields: `srid` (INT32): spatial reference system ID that defines the type of geometry object stored in the structure and `wkb` (BYTES): binary representation of the geometry object encoded in the Well-Known-Binary (wkb) format. Refer to the [Open Geospatial Consortium](https://www.opengeospatial.org/standards/sfa) for more details. 
| +`geography.handling.mode=string` + +| SingleStoreDB data type | Literal type | +|-------------------------|--------------| +| GEOGRAPHYPOINT | STRING | +| GEOGRAPHY | STRING | + +### Other types + +| SingleStoreDB data type | Literal type | +|-------------------------|--------------| +| JSON | STRING | +| ENUM | STRING | +| SET | STRING | + ## Connector properties The SingleStore Debezium connector supports the following configuration properties which can be used @@ -513,21 +519,22 @@ to achieve the right connector behavior for your application. The following configuration properties are required unless a default value is available. -| Property | Default | Description | -|---------------------------------------------------------------|----------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| topic.prefix | | Specifies the topic prefix that identifies and provides a namespace for the particular database server/cluster that is capturing the changes. The topic prefix should be unique across all other connectors, since it is used as a prefix for all Kafka topic names that receive events generated by this connector. Only alphanumeric characters, hyphens, dots, and underscores are accepted. | -| decimal.handling.mode | precise | Specifies how DECIMAL and NUMERIC columns are represented in change events. 
Values include: 'precise' - uses `java.math.BigDecimal` to represent values, which are encoded in the change events using a binary representation and Kafka Connect's 'org.apache.kafka.connect.data.Decimal' type; 'string' - uses string to represent values; 'double' - represents values using Java's 'double', which may not offer the precision, but it is easier to use in consumers. | -| binary.handling.mode | bytes | Specifies how binary (blob, binary, etc.) columns are represented in change events. Values include: 'bytes' - represents binary data as byte array (default); 'base64' - represents binary data as base64-encoded string; 'base64-url-safe' - represents binary data as base64-url-safe-encoded string; 'hex' - represents binary data as hex-encoded (base16) string. | -| time.precision.mode | advanced | Specifies the precision type for time, date, and timestamps.
Values include:
'adaptive' - bases the precision of time, date, and timestamp values on the database column's precision;
'adaptive_time_microseconds' - similar to 'adaptive' mode, but TIME fields always use microseconds precision;
'connect' - always represents time, date, and timestamp values using Kafka Connect's built-in representations for Time, Date, and Timestamp, which uses millisecond precision regardless of the database columns' precision. | -| tombstones.on.delete | true | Whether delete operations should be represented by a delete event and a subsequent tombstone event ('true') or only by a delete event ('false'). Generating the tombstone event (the default behavior) allows Kafka to completely delete all events pertaining to the given key once the source record is deleted. | -| column.include.list | | Regular expressions matching columns to include in change events. | -| column.exclude.list | | Regular expressions matching columns to exclude from change events. | -| column.mask.hash.([^.]+).with.salt.(.+) | | A comma-separated list of regular expressions matching fully-qualified names of columns that should be masked by hashing the input, using the specified hash algorithms and salt. | -| column.mask.with.(d+).chars | | A comma-separated list of regular expressions matching fully-qualified names of columns that should be masked with the specified number of asterisks ('*'). | -| column.truncate.to.(d+).chars | | A comma-separated list of regular expressions matching fully-qualified names of columns that should be truncated to the configured amount of characters. | -| column.propagate.source.type | | A comma-separated list of regular expressions matching fully-qualified names of columns that adds the column’s original type and original length as parameters to the corresponding field schemas in the emitted change records. | -| datatype.propagate.source.type | | A comma-separated list of regular expressions matching the database-specific data type names that adds the data type's original type and original length as parameters to the corresponding field schemas in the generated change records. 
| -| populate.internal.id | false | Specifies whether to add `internalId` to the `after` field of the event message. | +| Property | Default | Description | +|-------------------------------------------------------------------|----------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| topic.prefix | | Specifies the topic prefix that identifies and provides a namespace for the particular database server/cluster that is capturing the changes. The topic prefix should be unique across all other connectors, since it is used as a prefix for all Kafka topic names that receive events generated by this connector. Only alphanumeric characters, hyphens, dots, and underscores are accepted. | +| decimal.handling.mode | precise | Specifies how DECIMAL and NUMERIC columns are represented in change events. Values include: 'precise' - uses `java.math.BigDecimal` to represent values, which are encoded in the change events using a binary representation and Kafka Connect's 'org.apache.kafka.connect.data.Decimal' type; 'string' - uses string to represent values; 'double' - represents values using Java's 'double', which may not offer the precision, but it is easier to use in consumers. | +| binary.handling.mode | bytes | Specifies how binary (blob, binary, etc.) columns are represented in change events. 
Values include: 'bytes' - represents binary data as byte array (default); 'base64' - represents binary data as base64-encoded string; 'base64-url-safe' - represents binary data as base64-url-safe-encoded string; 'hex' - represents binary data as hex-encoded (base16) string. | +| time.precision.mode | advanced | Specifies the precision type for time, date, and timestamps.
Values include:
'adaptive' - bases the precision of time, date, and timestamp values on the database column's precision;
'adaptive_time_microseconds' - similar to 'adaptive' mode, but TIME fields always use microseconds precision;
'connect' - always represents time, date, and timestamp values using Kafka Connect's built-in representations for Time, Date, and Timestamp, which uses millisecond precision regardless of the database columns' precision. | +| geography.handling.mode | geometry | Specifies how GEOGRAPHY and GEOGRAPHYPOINT columns are represented in change events. Values include: 'geometry' - uses io.debezium.data.geometry.Geometry to represent values, which contains a structure with two fields: srid (INT32): spatial reference system ID that defines the type of geometry object stored in the structure and wkb (BYTES): binary representation of the geometry object encoded in the Well-Known-Binary (wkb) format; 'string' - uses string to represent values. | +| tombstones.on.delete | true | Whether delete operations should be represented by a delete event and a subsequent tombstone event ('true') or only by a delete event ('false'). Generating the tombstone event (the default behavior) allows Kafka to completely delete all events pertaining to the given key once the source record is deleted. | +| column.include.list | | Regular expressions matching columns to include in change events. | +| column.exclude.list | | Regular expressions matching columns to exclude from change events. | +| column.mask.hash.([^.]+).with.salt.(.+) | | A comma-separated list of regular expressions matching fully-qualified names of columns that should be masked by hashing the input, using the specified hash algorithms and salt. | +| column.mask.with.(d+).chars | | A comma-separated list of regular expressions matching fully-qualified names of columns that should be masked with the specified number of asterisks ('*'). | +| column.truncate.to.(d+).chars | | A comma-separated list of regular expressions matching fully-qualified names of columns that should be truncated to the configured amount of characters. 
| +| column.propagate.source.type | | A comma-separated list of regular expressions matching fully-qualified names of columns that adds the column’s original type and original length as parameters to the corresponding field schemas in the emitted change records. | +| datatype.propagate.source.type | | A comma-separated list of regular expressions matching the database-specific data type names that adds the data type's original type and original length as parameters to the corresponding field schemas in the generated change records. | +| populate.internal.id | false | Specifies whether to add `internalId` to the `after` field of the event message. | ### Advanced connector configuration properties diff --git a/src/main/java/com/singlestore/debezium/SingleStoreConnectorConfig.java b/src/main/java/com/singlestore/debezium/SingleStoreConnectorConfig.java index 4ef44af..0283075 100644 --- a/src/main/java/com/singlestore/debezium/SingleStoreConnectorConfig.java +++ b/src/main/java/com/singlestore/debezium/SingleStoreConnectorConfig.java @@ -7,6 +7,7 @@ import io.debezium.config.EnumeratedValue; import io.debezium.config.Field; import io.debezium.connector.SourceInfoStructMaker; +import io.debezium.jdbc.JdbcValueConverters.DecimalMode; import io.debezium.relational.ColumnFilterMode; import io.debezium.relational.RelationalDatabaseConnectorConfig; import io.debezium.relational.RelationalTableFilters; @@ -148,10 +149,9 @@ public class SingleStoreConnectorConfig extends RelationalDatabaseConnectorConfi .withWidth(Width.SHORT) .withImportance(Importance.MEDIUM) .withDescription( - "Specify how DECIMAL and NUMERIC columns should be represented in change events, including: " - + "'precise' (the default) uses java.math.BigDecimal to represent values, which are encoded in the change events using a binary representation and Kafka Connect's 'org.apache.kafka.connect.data.Decimal' type; " - + "'string' uses string to represent values; " - + "'double' represents values using Java's 
'double', which may not offer the precision but will be far easier to use in consumers."); + "Specify how GEOGRAPHY and GEOGRAPHYPOINT columns should be represented in change events, including: " + + "'geometry' (the default) uses io.debezium.data.geometry.Geometry to represent values, which contains a structure with two fields: srid (INT32): spatial reference system ID that defines the type of geometry object stored in the structure and wkb (BYTES): binary representation of the geometry object encoded in the Well-Known-Binary (wkb) format; " + + "'string' uses string to represent values."); protected static final int DEFAULT_SNAPSHOT_FETCH_SIZE = 10_240; protected static final int DEFAULT_PORT = 3306; @@ -202,6 +202,7 @@ public class SingleStoreConnectorConfig extends RelationalDatabaseConnectorConfi DRIVER_PARAMETERS, SNAPSHOT_MODE, BINARY_HANDLING_MODE, + GEOGRAPHY_HANDLING_MODE, OFFSETS) .events( SOURCE_INFO_STRUCT_MAKER, @@ -300,6 +301,16 @@ public List offsets() { return offsets; } + /** + * Returns the Geography mode Enum configuration. This defaults to {@code geometry} if nothing is + * provided. + */ + public GeographyMode getGeographyMode() { + return GeographyHandlingMode + .parse(this.getConfig().getString(GEOGRAPHY_HANDLING_MODE)) + .asDecimalMode(); + } + /** + * The set of predefined SnapshotMode options or aliases. 
*/ diff --git a/src/main/java/com/singlestore/debezium/SingleStoreConnectorTask.java b/src/main/java/com/singlestore/debezium/SingleStoreConnectorTask.java index c788347..4e3ea3c 100644 --- a/src/main/java/com/singlestore/debezium/SingleStoreConnectorTask.java +++ b/src/main/java/com/singlestore/debezium/SingleStoreConnectorTask.java @@ -63,7 +63,7 @@ public ChangeEventSourceCoordinator @@ -42,8 +44,11 @@ public class SingleStoreValueConverters extends JdbcValueConverters { */ public SingleStoreValueConverters(DecimalMode decimalMode, TemporalPrecisionMode temporalPrecisionMode, - CommonConnectorConfig.BinaryHandlingMode binaryMode) { + CommonConnectorConfig.BinaryHandlingMode binaryMode, + GeographyMode geographyMode + ) { super(decimalMode, temporalPrecisionMode, ZoneOffset.UTC, null, null, binaryMode); + this.geographyMode = geographyMode; } @Override @@ -54,7 +59,11 @@ public SchemaBuilder schemaBuilder(Column column) { return Json.builder(); case "GEOGRAPHYPOINT": case "GEOGRAPHY": - return io.debezium.data.geometry.Geometry.builder(); + if (geographyMode == GeographyMode.GEOMETRY) { + return io.debezium.data.geometry.Geometry.builder(); + } else { + return SchemaBuilder.string(); + } case "ENUM": case "SET": return SchemaBuilder.string(); @@ -113,7 +122,11 @@ public ValueConverter converter(Column column, Field fieldDefn) { return (data) -> convertString(column, fieldDefn, data); case "GEOGRAPHYPOINT": case "GEOGRAPHY": - return data -> convertGeometry(column, fieldDefn, data); + if (geographyMode == GeographyMode.GEOMETRY) { + return data -> convertGeometry(column, fieldDefn, data); + } else { + return data -> convertString(column, fieldDefn, data); + } case "ENUM": case "SET": return data -> convertString(column, fieldDefn, data); diff --git a/src/test/java/com/singlestore/debezium/IntegrationTestBase.java b/src/test/java/com/singlestore/debezium/IntegrationTestBase.java index 07f8309..cddc468 100644 --- 
a/src/test/java/com/singlestore/debezium/IntegrationTestBase.java +++ b/src/test/java/com/singlestore/debezium/IntegrationTestBase.java @@ -32,7 +32,7 @@ abstract class IntegrationTestBase extends AbstractConnectorTest { static final String TEST_TOPIC_PREFIX = "singlestore_topic"; private static final String TEST_SERVER_VERSION = System.getProperty("singlestore.version", ""); private static final String TEST_USER = System.getProperty("singlestore.user", "root"); - private static final String TEST_PASSWORD = System.getProperty("singlestore.password", ""); + private static final String TEST_PASSWORD = System.getProperty("singlestore.password", ""); private static final Logger LOGGER = LoggerFactory.getLogger(IntegrationTestBase.class); private static final String TEST_IMAGE = System.getProperty("singlestore.image", "ghcr.io/singlestore-labs/singlestoredb-dev:latest"); diff --git a/src/test/java/com/singlestore/debezium/SingleStoreDatabaseSchemaIT.java b/src/test/java/com/singlestore/debezium/SingleStoreDatabaseSchemaIT.java index 2258ffe..45da4c8 100644 --- a/src/test/java/com/singlestore/debezium/SingleStoreDatabaseSchemaIT.java +++ b/src/test/java/com/singlestore/debezium/SingleStoreDatabaseSchemaIT.java @@ -4,6 +4,7 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; +import com.singlestore.debezium.SingleStoreValueConverters.GeographyMode; import io.debezium.config.CommonConnectorConfig; import io.debezium.config.Configuration; import io.debezium.data.Bits; @@ -33,8 +34,8 @@ public class SingleStoreDatabaseSchemaIT extends IntegrationTestBase { private static final SingleStoreValueConverters CONVERTERS = new SingleStoreValueConverters( - JdbcValueConverters.DecimalMode.DOUBLE, - TemporalPrecisionMode.CONNECT, CommonConnectorConfig.BinaryHandlingMode.BYTES); + JdbcValueConverters.DecimalMode.DOUBLE, TemporalPrecisionMode.CONNECT, + CommonConnectorConfig.BinaryHandlingMode.BYTES, GeographyMode.GEOMETRY); private SingleStoreDatabaseSchema schema; public static 
SingleStoreDatabaseSchema getSchema(SingleStoreConnectorConfig config) { diff --git a/src/test/java/com/singlestore/debezium/SingleStoreDefaultValueConverterIT.java b/src/test/java/com/singlestore/debezium/SingleStoreDefaultValueConverterIT.java index e095435..b9395e5 100644 --- a/src/test/java/com/singlestore/debezium/SingleStoreDefaultValueConverterIT.java +++ b/src/test/java/com/singlestore/debezium/SingleStoreDefaultValueConverterIT.java @@ -1,5 +1,6 @@ package com.singlestore.debezium; +import com.singlestore.debezium.SingleStoreValueConverters.GeographyMode; import io.debezium.config.CommonConnectorConfig; import io.debezium.jdbc.JdbcValueConverters; import io.debezium.jdbc.TemporalPrecisionMode; @@ -26,8 +27,8 @@ public class SingleStoreDefaultValueConverterIT extends IntegrationTestBase { private static final SingleStoreValueConverters CONVERTERS = new SingleStoreValueConverters( - JdbcValueConverters.DecimalMode.DOUBLE, - TemporalPrecisionMode.ADAPTIVE, CommonConnectorConfig.BinaryHandlingMode.BYTES); + JdbcValueConverters.DecimalMode.DOUBLE, TemporalPrecisionMode.ADAPTIVE, + CommonConnectorConfig.BinaryHandlingMode.BYTES, GeographyMode.GEOMETRY); @Test public void testNumberValues() { @@ -118,6 +119,41 @@ public void testGeometryValues() throws ParseException { } } + @Test + public void testGeometryStringValues() { + try (SingleStoreConnection conn = new SingleStoreConnection(defaultJdbcConnectionConfig())) { + conn.execute(String.format("USE %s", TEST_DATABASE)); + conn.execute( + "CREATE ROWSTORE TABLE IF NOT EXISTS testGeometryStringValues(geographyColumn GEOGRAPHY DEFAULT 'POLYGON((1 1,2 1,2 2, 1 2, 1 1))', geographypointColumn GEOGRAPHYPOINT DEFAULT 'POINT(1.50000003 1.50000000)')"); + Tables tables = new Tables(); + conn.readSchema(tables, TEST_DATABASE, null, null, null, true); + Table table = tables.forTable(TEST_DATABASE, null, "testGeometryStringValues"); + assertThat(table).isNotNull(); + + SingleStoreValueConverters converters = new 
SingleStoreValueConverters( + JdbcValueConverters.DecimalMode.DOUBLE, TemporalPrecisionMode.CONNECT, + CommonConnectorConfig.BinaryHandlingMode.BYTES, GeographyMode.STRING); + SingleStoreDefaultValueConverter defaultValueConverter = new SingleStoreDefaultValueConverter( + converters); + + Column geographyColumn = table.columnWithName("geographyColumn"); + Optional geographyDefaultValue = defaultValueConverter.parseDefaultValue( + geographyColumn, geographyColumn.defaultValueExpression().orElse(null)); + assertTrue(geographyDefaultValue.isPresent()); + String geographyColumnDefaultValue = (String) geographyDefaultValue.get(); + assertEquals("POLYGON((1 1,2 1,2 2, 1 2, 1 1))", geographyColumnDefaultValue); + + Column geographypointColumn = table.columnWithName("geographypointColumn"); + Optional geographypointDefaultValue = defaultValueConverter.parseDefaultValue( + geographypointColumn, geographypointColumn.defaultValueExpression().orElse(null)); + assertTrue(geographypointDefaultValue.isPresent()); + String geographypointColumnDefaultValue = (String) geographypointDefaultValue.get(); + assertEquals("POINT(1.50000003 1.50000000)", geographypointColumnDefaultValue); + } catch (SQLException e) { + Assert.fail(e.getMessage()); + } + } + @Test public void testStringValues() { try (SingleStoreConnection conn = new SingleStoreConnection(defaultJdbcConnectionConfig())) { diff --git a/src/test/java/com/singlestore/debezium/SingleStoreValueConvertersIT.java b/src/test/java/com/singlestore/debezium/SingleStoreValueConvertersIT.java index c08e135..56e32cc 100644 --- a/src/test/java/com/singlestore/debezium/SingleStoreValueConvertersIT.java +++ b/src/test/java/com/singlestore/debezium/SingleStoreValueConvertersIT.java @@ -1,5 +1,6 @@ package com.singlestore.debezium; +import com.singlestore.debezium.SingleStoreValueConverters.GeographyMode; import com.singlestore.jdbc.SingleStoreBlob; import io.debezium.config.CommonConnectorConfig; import io.debezium.jdbc.JdbcValueConverters; 
@@ -30,7 +31,7 @@ public class SingleStoreValueConvertersIT extends IntegrationTestBase { private static final SingleStoreValueConverters CONVERTERS = new SingleStoreValueConverters( JdbcValueConverters.DecimalMode.DOUBLE, TemporalPrecisionMode.CONNECT, - CommonConnectorConfig.BinaryHandlingMode.BYTES); + CommonConnectorConfig.BinaryHandlingMode.BYTES, GeographyMode.GEOMETRY); @Test public void testNumberValues() { @@ -69,7 +70,8 @@ public void testDecimalModeValues() { private void testDecimalModeValues(JdbcValueConverters.DecimalMode mode) { SingleStoreValueConverters converters = new SingleStoreValueConverters(mode, - TemporalPrecisionMode.CONNECT, CommonConnectorConfig.BinaryHandlingMode.BYTES); + TemporalPrecisionMode.CONNECT, CommonConnectorConfig.BinaryHandlingMode.BYTES, + GeographyMode.GEOMETRY); try (SingleStoreConnection conn = new SingleStoreConnection(defaultJdbcConnectionConfig())) { Tables tables = new Tables(); conn.readSchema(tables, TEST_DATABASE, null, null, null, true); @@ -91,7 +93,7 @@ private void testDecimalModeValues(JdbcValueConverters.DecimalMode mode) { } @Test - public void testGeometry() throws ParseException { + public void testGeographyGeometry() throws ParseException { String geographyValue = "POLYGON ((1 1, 2 1, 2 2, 1 2, 1 1))"; String geographyPointValue = "POINT(1.50000003 1.50000000)"; SingleStoreGeometry singleStoregeographyValue = SingleStoreGeometry.fromEkt(geographyValue); @@ -114,6 +116,30 @@ public void testGeometry() throws ParseException { } } + @Test + public void testGeographyString() throws ParseException { + String geographyValue = "POLYGON ((1 1, 2 1, 2 2, 1 2, 1 1))"; + String geographyPointValue = "POINT(1.50000003 1.50000000)"; + try (SingleStoreConnection conn = new SingleStoreConnection(defaultJdbcConnectionConfig())) { + Tables tables = new Tables(); + conn.readSchema(tables, TEST_DATABASE, null, null, null, true); + Table table = tables.forTable(TEST_DATABASE, null, "allTypesTable"); + 
assertThat(table).isNotNull(); + + SingleStoreValueConverters converters = new SingleStoreValueConverters( + JdbcValueConverters.DecimalMode.DOUBLE, TemporalPrecisionMode.CONNECT, + CommonConnectorConfig.BinaryHandlingMode.BYTES, GeographyMode.STRING); + String convertedPolygon = (String) convertColumnValue(converters, table, "geographyColumn", + geographyValue); + assertEquals("POLYGON ((1 1, 2 1, 2 2, 1 2, 1 1))", convertedPolygon); + String convertedPoint = (String) convertColumnValue(converters, table, "geographypointColumn", + geographyPointValue); + assertEquals("POINT(1.50000003 1.50000000)", convertedPoint); + } catch (SQLException e) { + Assert.fail(e.getMessage()); + } + } + @Test public void testTimeAndDateValues() { testTimeAndDateValues(TemporalPrecisionMode.CONNECT); @@ -124,7 +150,7 @@ public void testTimeAndDateValues() { private void testTimeAndDateValues(TemporalPrecisionMode mode) { SingleStoreValueConverters converters = new SingleStoreValueConverters( JdbcValueConverters.DecimalMode.DOUBLE, mode, - CommonConnectorConfig.BinaryHandlingMode.BYTES); + CommonConnectorConfig.BinaryHandlingMode.BYTES, GeographyMode.GEOMETRY); try (SingleStoreConnection conn = new SingleStoreConnection(defaultJdbcConnectionConfig())) { Tables tables = new Tables(); conn.readSchema(tables, TEST_DATABASE, null, null, null, true); @@ -222,7 +248,8 @@ public void testBinaryMode() { private void testBinaryMode(CommonConnectorConfig.BinaryHandlingMode mode) { SingleStoreValueConverters converters = new SingleStoreValueConverters( - JdbcValueConverters.DecimalMode.DOUBLE, TemporalPrecisionMode.CONNECT, mode); + JdbcValueConverters.DecimalMode.DOUBLE, TemporalPrecisionMode.CONNECT, mode, + GeographyMode.GEOMETRY); try (SingleStoreConnection conn = new SingleStoreConnection(defaultJdbcConnectionConfig())) { Tables tables = new Tables(); conn.readSchema(tables, TEST_DATABASE, null, null, null, true); From 55f2344d0a10bbb4ab8c7655895aba023060d267 Mon Sep 17 00:00:00 2001 From: 
Adalbert Makarovych Date: Wed, 13 Nov 2024 14:49:53 +0200 Subject: [PATCH 3/3] Removed debug changes --- src/test/java/com/singlestore/debezium/IntegrationTestBase.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/test/java/com/singlestore/debezium/IntegrationTestBase.java b/src/test/java/com/singlestore/debezium/IntegrationTestBase.java index cddc468..07f8309 100644 --- a/src/test/java/com/singlestore/debezium/IntegrationTestBase.java +++ b/src/test/java/com/singlestore/debezium/IntegrationTestBase.java @@ -32,7 +32,7 @@ abstract class IntegrationTestBase extends AbstractConnectorTest { static final String TEST_TOPIC_PREFIX = "singlestore_topic"; private static final String TEST_SERVER_VERSION = System.getProperty("singlestore.version", ""); private static final String TEST_USER = System.getProperty("singlestore.user", "root"); - private static final String TEST_PASSWORD = "1"; + private static final String TEST_PASSWORD = System.getProperty("singlestore.password", ""); private static final Logger LOGGER = LoggerFactory.getLogger(IntegrationTestBase.class); private static final String TEST_IMAGE = System.getProperty("singlestore.image", "ghcr.io/singlestore-labs/singlestoredb-dev:latest");