Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added an option to read geography as strings #62

Merged
merged 3 commits into from
Nov 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 46 additions & 39 deletions README.md

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
package com.singlestore.debezium;

import com.singlestore.debezium.SingleStoreValueConverters.GeographyMode;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.ConfigDefinition;
import io.debezium.config.Configuration;
import io.debezium.config.EnumeratedValue;
import io.debezium.config.Field;
import io.debezium.connector.SourceInfoStructMaker;
import io.debezium.jdbc.JdbcValueConverters.DecimalMode;
import io.debezium.relational.ColumnFilterMode;
import io.debezium.relational.RelationalDatabaseConnectorConfig;
import io.debezium.relational.RelationalTableFilters;
Expand Down Expand Up @@ -139,6 +141,18 @@ public class SingleStoreConnectorConfig extends RelationalDatabaseConnectorConfi
+ "Example: 0000000000000077000000000000000E000000000000E06E,0x0000000000000077000000000000000E000000000000E087,0000000000000077000000000000000E000000000000E088");
public static final Field TOPIC_NAMING_STRATEGY = CommonConnectorConfig.TOPIC_NAMING_STRATEGY
.withDefault(DefaultTopicNamingStrategy.class.getName());

public static final Field GEOGRAPHY_HANDLING_MODE = Field.create("geography.handling.mode")
.withDisplayName("Geography Handling")
.withGroup(Field.createGroupEntry(Field.Group.CONNECTOR, 8))
.withEnum(GeographyHandlingMode.class, GeographyHandlingMode.GEOMETRY)
.withWidth(Width.SHORT)
.withImportance(Importance.MEDIUM)
.withDescription(
"Specify how GEOGRAPHY and GEOGRAPHYPOINT columns should be represented in change events, including: "
+ "'geometry' (the default) uses io.debezium.data.geometry.Geometry to represent values, which contains a structure with two fields: srid (INT32): spatial reference system ID that defines the type of geometry object stored in the structure and wkb (BYTES): binary representation of the geometry object encoded in the Well-Known-Binary (wkb) format."
+ "'string' uses string to represent values.");

protected static final int DEFAULT_SNAPSHOT_FETCH_SIZE = 10_240;
protected static final int DEFAULT_PORT = 3306;
public static final Field PORT = RelationalDatabaseConnectorConfig.PORT
Expand Down Expand Up @@ -188,6 +202,7 @@ public class SingleStoreConnectorConfig extends RelationalDatabaseConnectorConfi
DRIVER_PARAMETERS,
SNAPSHOT_MODE,
BINARY_HANDLING_MODE,
GEOGRAPHY_HANDLING_MODE,
OFFSETS)
.events(
SOURCE_INFO_STRUCT_MAKER,
Expand Down Expand Up @@ -286,6 +301,16 @@ public List<String> offsets() {
return offsets;
}

/**
* Returns the Geography mode Enum configuration. This defaults to {@code geometry} if nothing is
* provided.
*/
public GeographyMode getGeographyMode() {
return GeographyHandlingMode
.parse(this.getConfig().getString(GEOGRAPHY_HANDLING_MODE))
.asDecimalMode();
}

/**
* The set of predefined SnapshotMode options or aliases.
*/
Expand Down Expand Up @@ -468,6 +493,77 @@ public String getValue() {
}
}

/**
* The set of predefined GeographyHandlingMode options or aliases.
*/
public enum GeographyHandlingMode implements EnumeratedValue {
/**
* Represent {@code GEOGRAPHY} and {@code GEOGRAPHYPOINT} values as geometry
* {@link io.debezium.data.geometry.Geometry} values.
*/
GEOMETRY("geometry"),

/**
* Represent {@code GEOGRAPHY} and {@code GEOGRAPHYPOINT} values as a string values.
*/
STRING("string");

private final String value;

GeographyHandlingMode(String value) {
this.value = value;
}

/**
* Determine if the supplied value is one of the predefined options.
*
* @param value the configuration property value; may not be null
* @return the matching option, or null if no match is found
*/
public static GeographyHandlingMode parse(String value) {
if (value == null) {
return null;
}
value = value.trim();
for (GeographyHandlingMode option : GeographyHandlingMode.values()) {
if (option.getValue().equalsIgnoreCase(value)) {
return option;
}
}
return null;
}

/**
* Determine if the supplied value is one of the predefined options.
*
* @param value the configuration property value; may not be null
* @param defaultValue the default value; may be null
* @return the matching option, or null if no match is found and the non-null default is invalid
*/
public static GeographyHandlingMode parse(String value, String defaultValue) {
GeographyHandlingMode mode = parse(value);
if (mode == null && defaultValue != null) {
mode = parse(defaultValue);
}
return mode;
}

@Override
public String getValue() {
return value;
}

public GeographyMode asDecimalMode() {
switch (this) {
case STRING:
return GeographyMode.STRING;
case GEOMETRY:
default:
return GeographyMode.GEOMETRY;
}
}
}

private static class SystemTablesPredicate implements TableFilter {

protected static final List<String> SYSTEM_SCHEMAS = Arrays
Expand All @@ -478,5 +574,4 @@ public boolean isIncluded(TableId t) {
return t.catalog() != null && !SYSTEM_SCHEMAS.contains(t.catalog().toLowerCase());
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public ChangeEventSourceCoordinator<SingleStorePartition, SingleStoreOffsetConte
SingleStoreConnectorConfig.TOPIC_NAMING_STRATEGY);
final SingleStoreValueConverters valueConverter = new SingleStoreValueConverters(
connectorConfig.getDecimalMode(), connectorConfig.getTemporalPrecisionMode(),
connectorConfig.binaryHandlingMode());
connectorConfig.binaryHandlingMode(), connectorConfig.getGeographyMode());
final SingleStoreDefaultValueConverter defaultValueConverter = new SingleStoreDefaultValueConverter(
valueConverter);

Expand Down
Original file line number Diff line number Diff line change
@@ -1,32 +1,35 @@
package com.singlestore.debezium;

import com.singlestore.jdbc.SingleStoreBlob;
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.LocalTime;
import org.locationtech.jts.io.ParseException;

import io.debezium.config.CommonConnectorConfig;
import io.debezium.data.Json;
import io.debezium.jdbc.JdbcValueConverters;
import io.debezium.jdbc.TemporalPrecisionMode;
import io.debezium.relational.Column;
import io.debezium.relational.ValueConverter;
import io.debezium.time.*;
import io.debezium.time.Date;
import io.debezium.time.MicroTime;
import io.debezium.time.MicroTimestamp;
import io.debezium.time.Timestamp;
import io.debezium.time.Year;
import io.debezium.util.IoUtil;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.source.SourceRecord;

import java.io.IOException;
import java.nio.ByteOrder;
import java.sql.SQLException;
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.temporal.ChronoField;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.source.SourceRecord;
import org.locationtech.jts.io.ParseException;

public class SingleStoreValueConverters extends JdbcValueConverters {

private final GeographyMode geographyMode;

/**
* Create a new instance of JdbcValueConverters.
* <p>
Expand All @@ -41,8 +44,11 @@ public class SingleStoreValueConverters extends JdbcValueConverters {
*/
public SingleStoreValueConverters(DecimalMode decimalMode,
TemporalPrecisionMode temporalPrecisionMode,
CommonConnectorConfig.BinaryHandlingMode binaryMode) {
CommonConnectorConfig.BinaryHandlingMode binaryMode,
GeographyMode geographyMode
) {
super(decimalMode, temporalPrecisionMode, ZoneOffset.UTC, null, null, binaryMode);
this.geographyMode = geographyMode;
}

@Override
Expand All @@ -53,7 +59,11 @@ public SchemaBuilder schemaBuilder(Column column) {
return Json.builder();
case "GEOGRAPHYPOINT":
case "GEOGRAPHY":
return io.debezium.data.geometry.Geometry.builder();
if (geographyMode == GeographyMode.GEOMETRY) {
return io.debezium.data.geometry.Geometry.builder();
} else {
return SchemaBuilder.string();
}
case "ENUM":
case "SET":
return SchemaBuilder.string();
Expand Down Expand Up @@ -112,7 +122,11 @@ public ValueConverter converter(Column column, Field fieldDefn) {
return (data) -> convertString(column, fieldDefn, data);
case "GEOGRAPHYPOINT":
case "GEOGRAPHY":
return data -> convertGeometry(column, fieldDefn, data);
if (geographyMode == GeographyMode.GEOMETRY) {
return data -> convertGeometry(column, fieldDefn, data);
} else {
return data -> convertString(column, fieldDefn, data);
}
case "ENUM":
case "SET":
return data -> convertString(column, fieldDefn, data);
Expand Down Expand Up @@ -311,4 +325,9 @@ protected Object convertGeometry(Column column, Field fieldDefn, Object data)
}
});
}

public enum GeographyMode {
GEOMETRY,
STRING
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;

import com.singlestore.debezium.SingleStoreValueConverters.GeographyMode;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.Configuration;
import io.debezium.data.Bits;
Expand Down Expand Up @@ -33,8 +34,8 @@
public class SingleStoreDatabaseSchemaIT extends IntegrationTestBase {

private static final SingleStoreValueConverters CONVERTERS = new SingleStoreValueConverters(
JdbcValueConverters.DecimalMode.DOUBLE,
TemporalPrecisionMode.CONNECT, CommonConnectorConfig.BinaryHandlingMode.BYTES);
JdbcValueConverters.DecimalMode.DOUBLE, TemporalPrecisionMode.CONNECT,
CommonConnectorConfig.BinaryHandlingMode.BYTES, GeographyMode.GEOMETRY);
private SingleStoreDatabaseSchema schema;

public static SingleStoreDatabaseSchema getSchema(SingleStoreConnectorConfig config) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.singlestore.debezium;

import com.singlestore.debezium.SingleStoreValueConverters.GeographyMode;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.jdbc.JdbcValueConverters;
import io.debezium.jdbc.TemporalPrecisionMode;
Expand All @@ -26,8 +27,8 @@
public class SingleStoreDefaultValueConverterIT extends IntegrationTestBase {

private static final SingleStoreValueConverters CONVERTERS = new SingleStoreValueConverters(
JdbcValueConverters.DecimalMode.DOUBLE,
TemporalPrecisionMode.ADAPTIVE, CommonConnectorConfig.BinaryHandlingMode.BYTES);
JdbcValueConverters.DecimalMode.DOUBLE, TemporalPrecisionMode.ADAPTIVE,
CommonConnectorConfig.BinaryHandlingMode.BYTES, GeographyMode.GEOMETRY);

@Test
public void testNumberValues() {
Expand Down Expand Up @@ -118,6 +119,41 @@ public void testGeometryValues() throws ParseException {
}
}

@Test
public void testGeometryStringValues() {
try (SingleStoreConnection conn = new SingleStoreConnection(defaultJdbcConnectionConfig())) {
conn.execute(String.format("USE %s", TEST_DATABASE));
conn.execute(
"CREATE ROWSTORE TABLE IF NOT EXISTS testGeometryStringValues(geographyColumn GEOGRAPHY DEFAULT 'POLYGON((1 1,2 1,2 2, 1 2, 1 1))', geographypointColumn GEOGRAPHYPOINT DEFAULT 'POINT(1.50000003 1.50000000)')");
Tables tables = new Tables();
conn.readSchema(tables, TEST_DATABASE, null, null, null, true);
Table table = tables.forTable(TEST_DATABASE, null, "testGeometryStringValues");
assertThat(table).isNotNull();

SingleStoreValueConverters converters = new SingleStoreValueConverters(
JdbcValueConverters.DecimalMode.DOUBLE, TemporalPrecisionMode.CONNECT,
CommonConnectorConfig.BinaryHandlingMode.BYTES, GeographyMode.STRING);
SingleStoreDefaultValueConverter defaultValueConverter = new SingleStoreDefaultValueConverter(
converters);

Column geographyColumn = table.columnWithName("geographyColumn");
Optional<Object> geographyDefaultValue = defaultValueConverter.parseDefaultValue(
geographyColumn, geographyColumn.defaultValueExpression().orElse(null));
assertTrue(geographyDefaultValue.isPresent());
String geographyColumnDefaultValue = (String) geographyDefaultValue.get();
assertEquals("POLYGON((1 1,2 1,2 2, 1 2, 1 1))", geographyColumnDefaultValue);

Column geographypointColumn = table.columnWithName("geographypointColumn");
Optional<Object> geographypointDefaultValue = defaultValueConverter.parseDefaultValue(
geographypointColumn, geographypointColumn.defaultValueExpression().orElse(null));
assertTrue(geographypointDefaultValue.isPresent());
String geographypointColumnDefaultValue = (String) geographypointDefaultValue.get();
assertEquals("POINT(1.50000003 1.50000000)", geographypointColumnDefaultValue);
} catch (SQLException e) {
Assert.fail(e.getMessage());
}
}

@Test
public void testStringValues() {
try (SingleStoreConnection conn = new SingleStoreConnection(defaultJdbcConnectionConfig())) {
Expand Down
Loading
Loading