diff --git a/README.md b/README.md
index b83b1bda9..827452fdc 100644
--- a/README.md
+++ b/README.md
@@ -79,7 +79,7 @@ First two are good tutorials on MySQL and PostgreSQL respectively.
## Roadmap
-[2024 Roadmap](https://github.com/Altinity/clickhouse-sink-connector/issues/401)
+[2025 Roadmap](https://github.com/Altinity/clickhouse-sink-connector/issues/401)
## Help
diff --git a/doc/architecture.md b/doc/architecture.md
index 1c1171edc..f366c8c19 100644
--- a/doc/architecture.md
+++ b/doc/architecture.md
@@ -5,9 +5,9 @@ using [Debezium](debezium) into a common log format and then applies those
transactions to tables in ClickHouse.
There are two modes of operation.
-* Lightweight Sink Connector - Combines extract and apply operations
+* **Lightweight Sink Connector** - Combines extract and apply operations
into a single process.
-* Kafka Sink Connector - Separates extract and apply operations into separate
+* **Kafka Sink Connector** - Separates extract and apply operations into separate
processes, using a Kafka-compatible event stream for transport between them.
Debezium offers change data capture on a number of database types. The
diff --git a/doc/configuration.md b/doc/configuration.md
index d21ce4dbc..567511a1a 100644
--- a/doc/configuration.md
+++ b/doc/configuration.md
@@ -14,7 +14,7 @@
| clickhouse.server.password | ClickHouse password |
| clickhouse.server.port | ClickHouse port, For TLS(use the correct port `8443` or `443` |
| snapshot.mode | "initial" -> Data that already exists in source database will be replicated. "schema_only" -> Replicate data that is added/modified after the connector is started.\
MySQL: https://debezium.io/documentation/reference/stable/connectors/mysql.html#mysql-property-snapshot-mode \
PostgreSQL: https://debezium.io/documentation/reference/stable/connectors/postgresql.html#postgresql-property-snapshot-mode
MongoDB: initial, never. https://debezium.io/documentation/reference/stable/connectors/mongodb.html |
-| connector.class | MySQL -> "io.debezium.connector.mysql.MySqlConnector"
PostgreSQL ->
Mongo ->
|
+| connector.class | MySQL -> `io.debezium.connector.mysql.MySqlConnector`
+PostgreSQL -> `io.debezium.connector.postgresql.PostgresConnector`
Mongo -> `io.debezium.connector.mongodb.MongoDbConnector`
|
| offset.storage.file.filename | Offset storage file(This stores the offsets of the source database) MySQL: mysql binlog file and position, gtid set. Make sure this file is durable and its not persisted in temp directories. |
| database.history.file.filename | Database History: Make sure this file is durable and its not persisted in temp directories. |
| schema.history.internal.file.filename | Schema History: Make sure this file is durable and its not persisted in temp directories. |
diff --git a/doc/quickstart.md b/doc/quickstart.md
index 728b28bc0..3714b47e4 100644
--- a/doc/quickstart.md
+++ b/doc/quickstart.md
@@ -30,13 +30,17 @@ sudo apt install clickhouse-client
Use Docker Compose to start containers.
Set the `CLICKHOUSE_SINK_CONNECTOR_LT_IMAGE` to the latest release from the Releases page.
-or run `./getLatestTag.sh` which will set the environment variable
+or run `./getLatestTag.sh` which will print the environment variable
+that needs to be exported.
```
cd sink-connector-lightweight/docker
./getLatestTag.sh
```
-
+Example:
+```
+ export CLICKHOUSE_SINK_CONNECTOR_LT_IMAGE=altinity/clickhouse-sink-connector:2.5.0-lt
+```
```
docker compose -f docker-compose-mysql.yml up --renew-anon-volumes
```
diff --git a/sink-connector-lightweight/docker/getLatestRelease.sh b/sink-connector-lightweight/docker/getLatestRelease.sh
index 6a32b9642..30dcbab75 100755
--- a/sink-connector-lightweight/docker/getLatestRelease.sh
+++ b/sink-connector-lightweight/docker/getLatestRelease.sh
@@ -28,7 +28,7 @@ echo -e "\n"
echo "****************************************************************************************************"
# Display a message to the usage of the latest_version in color green
-echo -e "\e[32m export CLICKHOUSE_SINK_CONNECTOR_LT_IMAGE=altinity/clickhouse-sink-connector:$latest_version-lt'\e[0m"
+echo -e "\e[32m export CLICKHOUSE_SINK_CONNECTOR_LT_IMAGE=altinity/clickhouse-sink-connector:$latest_version-lt\e[0m"
echo "****************************************************************************************************"
echo -e "\n"
diff --git a/sink-connector-lightweight/pom.xml b/sink-connector-lightweight/pom.xml
index 411c24da7..e7aff1665 100644
--- a/sink-connector-lightweight/pom.xml
+++ b/sink-connector-lightweight/pom.xml
@@ -209,7 +209,7 @@
org.yaml
snakeyaml
- 1.33
+ 2.0
diff --git a/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/ClickHouseDebeziumEmbeddedApplication.java b/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/ClickHouseDebeziumEmbeddedApplication.java
index 90dc38a1f..058c8a5ba 100644
--- a/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/ClickHouseDebeziumEmbeddedApplication.java
+++ b/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/ClickHouseDebeziumEmbeddedApplication.java
@@ -5,7 +5,6 @@
import com.altinity.clickhouse.debezium.embedded.common.PropertiesHelper;
import com.altinity.clickhouse.debezium.embedded.config.ConfigLoader;
import com.altinity.clickhouse.debezium.embedded.config.ConfigurationService;
-import com.altinity.clickhouse.debezium.embedded.ddl.parser.DDLParserService;
import com.altinity.clickhouse.debezium.embedded.parser.DebeziumRecordParserService;
import com.altinity.clickhouse.sink.connector.ClickHouseSinkConnectorConfig;
import com.altinity.clickhouse.sink.connector.ClickHouseSinkConnectorConfigVariables;
diff --git a/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/cdc/DebeziumChangeEventCapture.java b/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/cdc/DebeziumChangeEventCapture.java
index 3f102b4ed..0da7f1d7d 100644
--- a/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/cdc/DebeziumChangeEventCapture.java
+++ b/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/cdc/DebeziumChangeEventCapture.java
@@ -1,8 +1,8 @@
package com.altinity.clickhouse.debezium.embedded.cdc;
import com.altinity.clickhouse.debezium.embedded.common.PropertiesHelper;
+import com.altinity.clickhouse.debezium.embedded.config.ColumnOverrideParser;
import com.altinity.clickhouse.debezium.embedded.config.SinkConnectorLightWeightConfig;
-import com.altinity.clickhouse.debezium.embedded.ddl.parser.DDLParserService;
import com.altinity.clickhouse.debezium.embedded.ddl.parser.MySQLDDLParserService;
import com.altinity.clickhouse.debezium.embedded.parser.DebeziumRecordParserService;
import com.altinity.clickhouse.sink.connector.ClickHouseSinkConnectorConfig;
@@ -871,6 +871,7 @@ public void setup(Properties props, DebeziumRecordParserService debeziumRecordPa
Metrics.initialize(props.getProperty(ClickHouseSinkConnectorConfigVariables.ENABLE_METRICS.toString()),
props.getProperty(ClickHouseSinkConnectorConfigVariables.METRICS_ENDPOINT_PORT.toString()));
+
// Start debezium event loop if its requested from REST API.
if(!config.getBoolean(ClickHouseSinkConnectorConfigVariables.SKIP_REPLICA_START.toString()) || forceStart) {
this.setupProcessingThread(config);
diff --git a/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/config/ColumnOverrideParser.java b/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/config/ColumnOverrideParser.java
new file mode 100644
index 000000000..0c7a14e50
--- /dev/null
+++ b/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/config/ColumnOverrideParser.java
@@ -0,0 +1,57 @@
+package com.altinity.clickhouse.debezium.embedded.config;
+
+import com.altinity.clickhouse.sink.connector.ClickHouseSinkConnectorConfigVariables;
+import org.yaml.snakeyaml.Yaml;
+
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.LinkedHashMap;
+import com.clickhouse.data.ClickHouseDataType;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+public class ColumnOverrideParser {
+
+ private static final Logger log = LogManager.getLogger(ColumnOverrideParser.class);
+ public static Map parseColumnOverrides(String yamlFile) throws FileNotFoundException {
+
+
+ Yaml yaml = new Yaml();
+ FileInputStream inputStream = new FileInputStream(yamlFile);
+
+ Map data = yaml.load(inputStream);
+
+ Object result = data.get(ClickHouseSinkConnectorConfigVariables.DEFAULT_COLUMN_DATATYPE_MAPPING.toString());
+
+
+ // if result is instance of LinkedHashMap , then cast it to LinkedHashMap
+ if (result instanceof LinkedHashMap) {
+ result = (LinkedHashMap) result;
+ }
+ else {
+ return new HashMap<>();
+ }
+ // Iterate through the map and convert values to ClickHouse data types
+ Map columnOverrides = new HashMap<>();
+ for (Map.Entry entry : ((Map) result).entrySet()) {
+ String key = entry.getKey();
+ Object value = entry.getValue();
+
+ // Match to ClickHouseDataType
+ ClickHouseDataType clickHouseDataType = ClickHouseDataType.valueOf(value.toString());
+
+ // if clickhouseDataType is null, then log an error.
+ if(clickHouseDataType == null) {
+ log.error("*********** Invalid ClickHouse data type passed by user in yaml file for column override:******** " + value.toString());
+ }
+ columnOverrides.put(key, clickHouseDataType.toString());
+ }
+
+ return columnOverrides;
+ }
+
+
+
+}
diff --git a/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/config/ConfigLoader.java b/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/config/ConfigLoader.java
index 07f4db0ab..2b6163996 100644
--- a/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/config/ConfigLoader.java
+++ b/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/config/ConfigLoader.java
@@ -1,15 +1,20 @@
package com.altinity.clickhouse.debezium.embedded.config;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
import org.yaml.snakeyaml.Yaml;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.InputStream;
+import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;
public class ConfigLoader {
+ private static final Logger log = LogManager.getLogger(ConfigLoader.class);
+
public Properties load(String resourceFileName) {
InputStream fis = this.getClass()
.getClassLoader()
@@ -24,8 +29,27 @@ public Properties load(String resourceFileName) {
if(entry.getValue() instanceof Integer) {
props.setProperty(entry.getKey(), Integer.toString((Integer) entry.getValue()));
} else {
- String value = (String) entry.getValue();
- props.setProperty(entry.getKey(), value.replace("\"", ""));
+ Object entryValue = entry.getValue();
+ // Check if value is an instance of String.
+ if (entryValue instanceof String) {
+ entryValue = (String) entryValue;
+ }
+ else {
+ // Additional
+ log.info("entryValue is not a String");
+ if (entryValue instanceof LinkedHashMap) {
+ // iterate through the map and add the properties to the props.
+ for (Map.Entry mapEntry : ((LinkedHashMap) entryValue).entrySet()) {
+ // prefix the key with the entry key.
+ String key = entry.getKey() + "." + mapEntry.getKey();
+ props.setProperty(key, mapEntry.getValue().toString());
+ }
+ }
+
+ }
+ if (entryValue instanceof String) {
+ props.setProperty(entry.getKey(), ((String) entryValue).replace("\"", ""));
+ }
}
}
diff --git a/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/parser/DataTypeConverter.java b/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/parser/DataTypeConverter.java
index d07008ba3..aa061eade 100644
--- a/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/parser/DataTypeConverter.java
+++ b/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/parser/DataTypeConverter.java
@@ -16,7 +16,6 @@
import io.debezium.jdbc.JdbcValueConverters;
import io.debezium.jdbc.TemporalPrecisionMode;
import io.debezium.relational.Column;
-import io.debezium.relational.RelationalDatabaseConnectorConfig;
import io.debezium.relational.ddl.DataType;
import io.debezium.service.DefaultServiceRegistry;
import io.debezium.service.spi.ServiceRegistry;
@@ -25,7 +24,6 @@
import java.sql.Types;
import java.time.ZoneId;
import java.util.Arrays;
-import java.util.Map;
/**
*
diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/config/ColumnOverrideParserTest.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/config/ColumnOverrideParserTest.java
new file mode 100644
index 000000000..78f39d07c
--- /dev/null
+++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/config/ColumnOverrideParserTest.java
@@ -0,0 +1,21 @@
+package com.altinity.clickhouse.debezium.embedded.config;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Assertions;
+
+import java.util.Map;
+import java.io.FileNotFoundException;
+
+public class ColumnOverrideParserTest {
+
+    /**
+     * Parses the test config.yml and expects its seven overrides. The exception is declared,
+     * not caught, so a missing fixture fails the test instead of passing with no assertion.
+     */
+    @Test
+    public void testParseColumnOverrides() throws FileNotFoundException {
+        Map<?, ?> result = ColumnOverrideParser.parseColumnOverrides("src/test/resources/config.yml");
+        // JUnit's argument order is (expected, actual), so failure messages report the right values.
+        Assertions.assertEquals(7, result.size());
+    }
+}
diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/config/ConfigLoaderTest.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/config/ConfigLoaderTest.java
index ec25b1a40..8ac84f35f 100644
--- a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/config/ConfigLoaderTest.java
+++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/config/ConfigLoaderTest.java
@@ -4,6 +4,8 @@
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
+import com.altinity.clickhouse.sink.connector.ClickHouseSinkConnectorConfigVariables;
+
import java.util.Properties;
public class ConfigLoaderTest {
@@ -16,4 +18,24 @@ public void testLoad() {
Assertions.assertNotNull(props);
}
+
+
+    @Test
+    @DisplayName("Unit test to validate loading of nested entries in config.yml")
+    public void testLoadNestedEntries() {
+        Properties props = new ConfigLoader().load("config.yml");
+
+        // ConfigLoader flattens a nested YAML mapping into "<parent key>.<child key>" properties,
+        // so every override shows up as a property whose name starts with the parent key.
+        String prefix = ClickHouseSinkConnectorConfigVariables.DEFAULT_COLUMN_DATATYPE_MAPPING.toString();
+        int defaultColumnDataTypeMappingCount = 0;
+        for (Object key : props.keySet()) {
+            if (key.toString().startsWith(prefix)) {
+                Assertions.assertNotNull(props.getProperty(key.toString()));
+                defaultColumnDataTypeMappingCount++;
+            }
+        }
+        // JUnit's argument order is (expected, actual), so failure messages report the right values.
+        Assertions.assertEquals(7, defaultColumnDataTypeMappingCount);
+    }
}
diff --git a/sink-connector-lightweight/src/test/resources/config.yml b/sink-connector-lightweight/src/test/resources/config.yml
index e39ce2d93..bd2c9f61c 100644
--- a/sink-connector-lightweight/src/test/resources/config.yml
+++ b/sink-connector-lightweight/src/test/resources/config.yml
@@ -30,4 +30,20 @@ schema.history.internal.jdbc.schema.history.table.name: "altinity_sink_connector
enable.snapshot.ddl: "true"
auto.create.tables: "true"
metrics.enable: "false"
-database.connectionTimeZone: "America/Chicago"
\ No newline at end of file
+database.connectionTimeZone: "America/Chicago"
+default_column_datatype_mapping:
+ # Date/DateTime/Timestamp columns are no longer converted to String
+ transaction_id: String
+ exchange_transaction_id: String
+ unique_transaction_id: String
+ account_ref: String
+ otm_identifier: String
+ tag_reserved_4: String
+ initiator: String
+databases:
+ dbo:
+ tables:
+ tr_live:
+ partition_by: tr_date_id
+ primary_key: gmt_time
+ settings: allow_nullable_key=1
\ No newline at end of file
diff --git a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/ClickHouseSinkConnectorConfigVariables.java b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/ClickHouseSinkConnectorConfigVariables.java
index b4fddc77c..11b2174de 100644
--- a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/ClickHouseSinkConnectorConfigVariables.java
+++ b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/ClickHouseSinkConnectorConfigVariables.java
@@ -3,6 +3,7 @@
public enum ClickHouseSinkConnectorConfigVariables {
+ DEFAULT_COLUMN_DATATYPE_MAPPING("default_column_datatype_mapping"),
IGNORE_DELETE("ignore_delete"),
THREAD_POOL_SIZE("thread.pool.size"),
BUFFER_COUNT("buffer.count"),
diff --git a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/converters/ClickHouseDataTypeMapper.java b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/converters/ClickHouseDataTypeMapper.java
index 415895659..ae92a2baf 100644
--- a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/converters/ClickHouseDataTypeMapper.java
+++ b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/converters/ClickHouseDataTypeMapper.java
@@ -20,7 +20,6 @@
import org.apache.kafka.connect.data.Struct;
import org.locationtech.jts.geom.Coordinate;
-import org.locationtech.jts.geom.LinearRing;
import org.locationtech.jts.geom.Polygon;
import org.locationtech.jts.io.ParseException;
import org.locationtech.jts.io.WKBReader;