Skip to content

Commit

Permalink
Added an option to read geography as strings (#62)
Browse files Browse the repository at this point in the history
* Added an option to tread geography as strings

---------

Co-authored-by: Adalbert Makarovych <[email protected]>
  • Loading branch information
AdalbertMemSQL and Adalbert Makarovych authored Nov 13, 2024
1 parent d91cdfb commit e8d1c6e
Show file tree
Hide file tree
Showing 7 changed files with 249 additions and 64 deletions.
85 changes: 46 additions & 39 deletions README.md

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
package com.singlestore.debezium;

import com.singlestore.debezium.SingleStoreValueConverters.GeographyMode;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.ConfigDefinition;
import io.debezium.config.Configuration;
import io.debezium.config.EnumeratedValue;
import io.debezium.config.Field;
import io.debezium.connector.SourceInfoStructMaker;
import io.debezium.jdbc.JdbcValueConverters.DecimalMode;
import io.debezium.relational.ColumnFilterMode;
import io.debezium.relational.RelationalDatabaseConnectorConfig;
import io.debezium.relational.RelationalTableFilters;
Expand Down Expand Up @@ -139,6 +141,18 @@ public class SingleStoreConnectorConfig extends RelationalDatabaseConnectorConfi
+ "Example: 0000000000000077000000000000000E000000000000E06E,0x0000000000000077000000000000000E000000000000E087,0000000000000077000000000000000E000000000000E088");
public static final Field TOPIC_NAMING_STRATEGY = CommonConnectorConfig.TOPIC_NAMING_STRATEGY
.withDefault(DefaultTopicNamingStrategy.class.getName());

public static final Field GEOGRAPHY_HANDLING_MODE = Field.create("geography.handling.mode")
.withDisplayName("Geography Handling")
.withGroup(Field.createGroupEntry(Field.Group.CONNECTOR, 8))
.withEnum(GeographyHandlingMode.class, GeographyHandlingMode.GEOMETRY)
.withWidth(Width.SHORT)
.withImportance(Importance.MEDIUM)
.withDescription(
"Specify how GEOGRAPHY and GEOGRAPHYPOINT columns should be represented in change events, including: "
+ "'geometry' (the default) uses io.debezium.data.geometry.Geometry to represent values, which contains a structure with two fields: srid (INT32): spatial reference system ID that defines the type of geometry object stored in the structure and wkb (BYTES): binary representation of the geometry object encoded in the Well-Known-Binary (wkb) format."
+ "'string' uses string to represent values.");

protected static final int DEFAULT_SNAPSHOT_FETCH_SIZE = 10_240;
protected static final int DEFAULT_PORT = 3306;
public static final Field PORT = RelationalDatabaseConnectorConfig.PORT
Expand Down Expand Up @@ -188,6 +202,7 @@ public class SingleStoreConnectorConfig extends RelationalDatabaseConnectorConfi
DRIVER_PARAMETERS,
SNAPSHOT_MODE,
BINARY_HANDLING_MODE,
GEOGRAPHY_HANDLING_MODE,
OFFSETS)
.events(
SOURCE_INFO_STRUCT_MAKER,
Expand Down Expand Up @@ -286,6 +301,16 @@ public List<String> offsets() {
return offsets;
}

/**
* Returns the Geography mode Enum configuration. This defaults to {@code geometry} if nothing is
* provided.
*/
public GeographyMode getGeographyMode() {
return GeographyHandlingMode
.parse(this.getConfig().getString(GEOGRAPHY_HANDLING_MODE))
.asDecimalMode();
}

/**
* The set of predefined SnapshotMode options or aliases.
*/
Expand Down Expand Up @@ -468,6 +493,77 @@ public String getValue() {
}
}

/**
* The set of predefined GeographyHandlingMode options or aliases.
*/
public enum GeographyHandlingMode implements EnumeratedValue {
/**
* Represent {@code GEOGRAPHY} and {@code GEOGRAPHYPOINT} values as geometry
* {@link io.debezium.data.geometry.Geometry} values.
*/
GEOMETRY("geometry"),

/**
* Represent {@code GEOGRAPHY} and {@code GEOGRAPHYPOINT} values as a string values.
*/
STRING("string");

private final String value;

GeographyHandlingMode(String value) {
this.value = value;
}

/**
* Determine if the supplied value is one of the predefined options.
*
* @param value the configuration property value; may not be null
* @return the matching option, or null if no match is found
*/
public static GeographyHandlingMode parse(String value) {
if (value == null) {
return null;
}
value = value.trim();
for (GeographyHandlingMode option : GeographyHandlingMode.values()) {
if (option.getValue().equalsIgnoreCase(value)) {
return option;
}
}
return null;
}

/**
* Determine if the supplied value is one of the predefined options.
*
* @param value the configuration property value; may not be null
* @param defaultValue the default value; may be null
* @return the matching option, or null if no match is found and the non-null default is invalid
*/
public static GeographyHandlingMode parse(String value, String defaultValue) {
GeographyHandlingMode mode = parse(value);
if (mode == null && defaultValue != null) {
mode = parse(defaultValue);
}
return mode;
}

@Override
public String getValue() {
return value;
}

public GeographyMode asDecimalMode() {
switch (this) {
case STRING:
return GeographyMode.STRING;
case GEOMETRY:
default:
return GeographyMode.GEOMETRY;
}
}
}

private static class SystemTablesPredicate implements TableFilter {

protected static final List<String> SYSTEM_SCHEMAS = Arrays
Expand All @@ -478,5 +574,4 @@ public boolean isIncluded(TableId t) {
return t.catalog() != null && !SYSTEM_SCHEMAS.contains(t.catalog().toLowerCase());
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public ChangeEventSourceCoordinator<SingleStorePartition, SingleStoreOffsetConte
SingleStoreConnectorConfig.TOPIC_NAMING_STRATEGY);
final SingleStoreValueConverters valueConverter = new SingleStoreValueConverters(
connectorConfig.getDecimalMode(), connectorConfig.getTemporalPrecisionMode(),
connectorConfig.binaryHandlingMode());
connectorConfig.binaryHandlingMode(), connectorConfig.getGeographyMode());
final SingleStoreDefaultValueConverter defaultValueConverter = new SingleStoreDefaultValueConverter(
valueConverter);

Expand Down
Original file line number Diff line number Diff line change
@@ -1,32 +1,35 @@
package com.singlestore.debezium;

import com.singlestore.jdbc.SingleStoreBlob;
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.LocalTime;
import org.locationtech.jts.io.ParseException;

import io.debezium.config.CommonConnectorConfig;
import io.debezium.data.Json;
import io.debezium.jdbc.JdbcValueConverters;
import io.debezium.jdbc.TemporalPrecisionMode;
import io.debezium.relational.Column;
import io.debezium.relational.ValueConverter;
import io.debezium.time.*;
import io.debezium.time.Date;
import io.debezium.time.MicroTime;
import io.debezium.time.MicroTimestamp;
import io.debezium.time.Timestamp;
import io.debezium.time.Year;
import io.debezium.util.IoUtil;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.source.SourceRecord;

import java.io.IOException;
import java.nio.ByteOrder;
import java.sql.SQLException;
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.temporal.ChronoField;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.source.SourceRecord;
import org.locationtech.jts.io.ParseException;

public class SingleStoreValueConverters extends JdbcValueConverters {

private final GeographyMode geographyMode;

/**
* Create a new instance of JdbcValueConverters.
* <p>
Expand All @@ -41,8 +44,11 @@ public class SingleStoreValueConverters extends JdbcValueConverters {
*/
public SingleStoreValueConverters(DecimalMode decimalMode,
TemporalPrecisionMode temporalPrecisionMode,
CommonConnectorConfig.BinaryHandlingMode binaryMode) {
CommonConnectorConfig.BinaryHandlingMode binaryMode,
GeographyMode geographyMode
) {
super(decimalMode, temporalPrecisionMode, ZoneOffset.UTC, null, null, binaryMode);
this.geographyMode = geographyMode;
}

@Override
Expand All @@ -53,7 +59,11 @@ public SchemaBuilder schemaBuilder(Column column) {
return Json.builder();
case "GEOGRAPHYPOINT":
case "GEOGRAPHY":
return io.debezium.data.geometry.Geometry.builder();
if (geographyMode == GeographyMode.GEOMETRY) {
return io.debezium.data.geometry.Geometry.builder();
} else {
return SchemaBuilder.string();
}
case "ENUM":
case "SET":
return SchemaBuilder.string();
Expand Down Expand Up @@ -112,7 +122,11 @@ public ValueConverter converter(Column column, Field fieldDefn) {
return (data) -> convertString(column, fieldDefn, data);
case "GEOGRAPHYPOINT":
case "GEOGRAPHY":
return data -> convertGeometry(column, fieldDefn, data);
if (geographyMode == GeographyMode.GEOMETRY) {
return data -> convertGeometry(column, fieldDefn, data);
} else {
return data -> convertString(column, fieldDefn, data);
}
case "ENUM":
case "SET":
return data -> convertString(column, fieldDefn, data);
Expand Down Expand Up @@ -311,4 +325,9 @@ protected Object convertGeometry(Column column, Field fieldDefn, Object data)
}
});
}

public enum GeographyMode {
GEOMETRY,
STRING
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;

import com.singlestore.debezium.SingleStoreValueConverters.GeographyMode;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.Configuration;
import io.debezium.data.Bits;
Expand Down Expand Up @@ -33,8 +34,8 @@
public class SingleStoreDatabaseSchemaIT extends IntegrationTestBase {

private static final SingleStoreValueConverters CONVERTERS = new SingleStoreValueConverters(
JdbcValueConverters.DecimalMode.DOUBLE,
TemporalPrecisionMode.CONNECT, CommonConnectorConfig.BinaryHandlingMode.BYTES);
JdbcValueConverters.DecimalMode.DOUBLE, TemporalPrecisionMode.CONNECT,
CommonConnectorConfig.BinaryHandlingMode.BYTES, GeographyMode.GEOMETRY);
private SingleStoreDatabaseSchema schema;

public static SingleStoreDatabaseSchema getSchema(SingleStoreConnectorConfig config) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.singlestore.debezium;

import com.singlestore.debezium.SingleStoreValueConverters.GeographyMode;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.jdbc.JdbcValueConverters;
import io.debezium.jdbc.TemporalPrecisionMode;
Expand All @@ -26,8 +27,8 @@
public class SingleStoreDefaultValueConverterIT extends IntegrationTestBase {

private static final SingleStoreValueConverters CONVERTERS = new SingleStoreValueConverters(
JdbcValueConverters.DecimalMode.DOUBLE,
TemporalPrecisionMode.ADAPTIVE, CommonConnectorConfig.BinaryHandlingMode.BYTES);
JdbcValueConverters.DecimalMode.DOUBLE, TemporalPrecisionMode.ADAPTIVE,
CommonConnectorConfig.BinaryHandlingMode.BYTES, GeographyMode.GEOMETRY);

@Test
public void testNumberValues() {
Expand Down Expand Up @@ -118,6 +119,41 @@ public void testGeometryValues() throws ParseException {
}
}

@Test
public void testGeometryStringValues() {
try (SingleStoreConnection conn = new SingleStoreConnection(defaultJdbcConnectionConfig())) {
conn.execute(String.format("USE %s", TEST_DATABASE));
conn.execute(
"CREATE ROWSTORE TABLE IF NOT EXISTS testGeometryStringValues(geographyColumn GEOGRAPHY DEFAULT 'POLYGON((1 1,2 1,2 2, 1 2, 1 1))', geographypointColumn GEOGRAPHYPOINT DEFAULT 'POINT(1.50000003 1.50000000)')");
Tables tables = new Tables();
conn.readSchema(tables, TEST_DATABASE, null, null, null, true);
Table table = tables.forTable(TEST_DATABASE, null, "testGeometryStringValues");
assertThat(table).isNotNull();

SingleStoreValueConverters converters = new SingleStoreValueConverters(
JdbcValueConverters.DecimalMode.DOUBLE, TemporalPrecisionMode.CONNECT,
CommonConnectorConfig.BinaryHandlingMode.BYTES, GeographyMode.STRING);
SingleStoreDefaultValueConverter defaultValueConverter = new SingleStoreDefaultValueConverter(
converters);

Column geographyColumn = table.columnWithName("geographyColumn");
Optional<Object> geographyDefaultValue = defaultValueConverter.parseDefaultValue(
geographyColumn, geographyColumn.defaultValueExpression().orElse(null));
assertTrue(geographyDefaultValue.isPresent());
String geographyColumnDefaultValue = (String) geographyDefaultValue.get();
assertEquals("POLYGON((1 1,2 1,2 2, 1 2, 1 1))", geographyColumnDefaultValue);

Column geographypointColumn = table.columnWithName("geographypointColumn");
Optional<Object> geographypointDefaultValue = defaultValueConverter.parseDefaultValue(
geographypointColumn, geographypointColumn.defaultValueExpression().orElse(null));
assertTrue(geographypointDefaultValue.isPresent());
String geographypointColumnDefaultValue = (String) geographypointDefaultValue.get();
assertEquals("POINT(1.50000003 1.50000000)", geographypointColumnDefaultValue);
} catch (SQLException e) {
Assert.fail(e.getMessage());
}
}

@Test
public void testStringValues() {
try (SingleStoreConnection conn = new SingleStoreConnection(defaultJdbcConnectionConfig())) {
Expand Down
Loading

0 comments on commit e8d1c6e

Please sign in to comment.