Merge pull request #701 from Altinity/2.2.1
2.2.1
subkanthi authored Jul 29, 2024
2 parents 7c814b6 + 49b83a7 commit ebaf3d6
Showing 13 changed files with 172 additions and 12 deletions.
22 changes: 22 additions & 0 deletions release-notes/2.2.1.md
@@ -0,0 +1,22 @@
## What's Changed
* Update README.md by @subkanthi in https://github.com/Altinity/clickhouse-sink-connector/pull/687
* Added log4j2.xml by @subkanthi in https://github.com/Altinity/clickhouse-sink-connector/pull/688
* Update logging.md by @subkanthi in https://github.com/Altinity/clickhouse-sink-connector/pull/690
* Update logging.md by @subkanthi in https://github.com/Altinity/clickhouse-sink-connector/pull/691
* Update quickstart_kafka.md by @subkanthi in https://github.com/Altinity/clickhouse-sink-connector/pull/696
* Create incremental_snapshot.md - document steps to add newer tables by @subkanthi in https://github.com/Altinity/clickhouse-sink-connector/pull/697
* Update README.md to include incremental snapshot document by @subkanthi in https://github.com/Altinity/clickhouse-sink-connector/pull/698
* Update incremental_snapshot.md - fix typo by @subkanthi in https://github.com/Altinity/clickhouse-sink-connector/pull/699
* Added config with keepermap storage and removed FINAL in CREATE VIEW … by @subkanthi in https://github.com/Altinity/clickhouse-sink-connector/pull/695
* Added ClickHouse trademark by @hodgesrm in https://github.com/Altinity/clickhouse-sink-connector/pull/702
* Update incremental_snapshot.md by @subkanthi in https://github.com/Altinity/clickhouse-sink-connector/pull/706
* Add RRMT check for Kafka by @Selfeer in https://github.com/Altinity/clickhouse-sink-connector/pull/707
* Update incremental_snapshot.md to include schema changes by @subkanthi in https://github.com/Altinity/clickhouse-sink-connector/pull/709
* Snapshot out of memory documentation by @subkanthi in https://github.com/Altinity/clickhouse-sink-connector/pull/710
* Update production_setup.md for incremental snapshot size by @subkanthi in https://github.com/Altinity/clickhouse-sink-connector/pull/711
* Numeric datatype in Postgres mapped to Decimal(10,2) in ClickHouse (#713) by @subkanthi in https://github.com/Altinity/clickhouse-sink-connector/pull/715
* Ignore retrieval of scale/precision for enum data types by @subkanthi in https://github.com/Altinity/clickhouse-sink-connector/pull/693
* Added logic to support ALTER TABLE with ALGORITHM by @subkanthi in https://github.com/Altinity/clickhouse-sink-connector/pull/719


**Full Changelog**: https://github.com/Altinity/clickhouse-sink-connector/compare/2.2.0...2.2.1
1 change: 1 addition & 0 deletions sink-connector-lightweight/clickhouse/config.xml
@@ -12,4 +12,5 @@
<replica>clickhouse</replica>
<shard>02</shard>
</macros>
<keeper_map_path_prefix>/keeper_map_tables</keeper_map_path_prefix>
</clickhouse>
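ClickHouse only accepts KeeperMap tables when keeper_map_path_prefix is configured, and every KeeperMap table's path is resolved under that prefix in ClickHouse Keeper. A minimal sketch of what this one-line change enables (the table and path below are hypothetical):

-- With the prefix above, this table's data lives under /keeper_map_tables/demo_map in Keeper.
CREATE TABLE default.demo_map
(
    offset_key String,
    offset_val String
)
ENGINE = KeeperMap('/demo_map')
PRIMARY KEY offset_key;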
52 changes: 52 additions & 0 deletions sink-connector-lightweight/docker/config_keepermap_storage.yml
@@ -0,0 +1,52 @@
name: "company-1"
database.hostname: "mysql-master"
database.port: "3306"
database.user: "root"
database.password: "root"
database.server.id: "12345"
database.server.name: "ER54"
database.include.list: test
table.include.list: ""
clickhouse.server.url: "clickhouse"
clickhouse.server.user: "root"
clickhouse.server.password: "root"
clickhouse.server.port: "8123"
database.allowPublicKeyRetrieval: "true"
snapshot.mode: "initial"
offset.flush.interval.ms: 5000
connector.class: "io.debezium.connector.mysql.MySqlConnector"
offset.storage: "io.debezium.storage.jdbc.offset.JdbcOffsetBackingStore"
offset.storage.jdbc.offset.table.name: "altinity_sink_connector.replica_source_info"
offset.storage.jdbc.url: "jdbc:clickhouse://clickhouse:8123/altinity_sink_connector"
offset.storage.jdbc.user: "root"
offset.storage.jdbc.password: "root"
offset.storage.jdbc.offset.table.ddl: "CREATE TABLE if not exists %s on cluster '{cluster}'
(
id String,
offset_key String,
offset_val String,
record_insert_ts DateTime,
record_insert_seq UInt64,
) ENGINE = KeeperMap('/asc_offsets201',10)
PRIMARY KEY offset_key"
offset.storage.jdbc.offset.table.delete: "select 1"
schema.history.internal: "io.debezium.storage.jdbc.history.JdbcSchemaHistory"
schema.history.internal.jdbc.url: "jdbc:clickhouse://clickhouse:8123/altinity_sink_connector"
schema.history.internal.jdbc.user: "root"
schema.history.internal.jdbc.password: "root"
schema.history.internal.jdbc.schema.history.table.ddl: "CREATE TABLE if not exists %s on cluster '{cluster}'
(
id FixedString(36),
history_data String,
history_data_seq UInt32,
record_insert_ts DateTime,
record_insert_seq UInt32
) ENGINE=ReplicatedReplacingMergeTree(record_insert_seq)
order by id"
schema.history.internal.jdbc.schema.history.table.name: "altinity_sink_connector.replicate_schema_history"
enable.snapshot.ddl: "true"
persist.raw.bytes: "false"
auto.create.tables: "true"
database.connectionTimeZone: "UTC"
restart.event.loop: "true"
restart.event.loop.timeout.period.secs: "3000"
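With this configuration the Debezium offsets live in the KeeperMap-backed table declared by offset.storage.jdbc.offset.table.ddl above, rather than in a ReplacingMergeTree table, so replication state survives restarts without merge-time deduplication. A quick sanity check once the connector is running, a sketch using the database and table names from the config above:

SELECT offset_key, offset_val, record_insert_ts
FROM altinity_sink_connector.replica_source_info
ORDER BY record_insert_seq DESC
LIMIT 1;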
@@ -11,6 +11,7 @@
import io.debezium.ddl.parser.mysql.generated.MySqlParser;
import io.debezium.ddl.parser.mysql.generated.MySqlParser.AlterByAddColumnContext;
import io.debezium.ddl.parser.mysql.generated.MySqlParser.TableNameContext;
import io.debezium.relational.ddl.DataType;
import org.antlr.v4.runtime.tree.ParseTree;
import org.antlr.v4.runtime.tree.TerminalNodeImpl;
import org.antlr.v4.runtime.ParserRuleContext;
@@ -329,8 +330,13 @@ private String getClickHouseDataType(String parsedDataType, ParseTree colDefTree

String chDataType = null;
MySqlParser.DataTypeContext dtc = ((MySqlParser.ColumnDefinitionContext) colDefTree).dataType();
DataType dt = DataTypeConverter.getDataType(dtc);

-if(parsedDataType.contains("(") && parsedDataType.contains(")") && parsedDataType.contains(",")) {
if(dt.name().equalsIgnoreCase("ENUM"))
{
// Don't try to get precision/scale for enums.
}
else if(parsedDataType.contains("(") && parsedDataType.contains(")") && parsedDataType.contains(",") ) {
try {
precision = Integer.parseInt(parsedDataType.substring(parsedDataType.indexOf("(") + 1, parsedDataType.indexOf(",")));
scale = Integer.parseInt(parsedDataType.substring(parsedDataType.indexOf(",") + 1, parsedDataType.indexOf(")")));
@@ -559,7 +565,16 @@ public void enterAlterTable(MySqlParser.AlterTableContext alterTableContext) {
parseAlterTable(tree);
} else if (tree instanceof MySqlParser.AlterByAddIndexContext) {
parseAddIndex(tree);
-} else if (tree instanceof TerminalNodeImpl) {
} else if(tree instanceof MySqlParser.AlterBySetAlgorithmContext) {
log.info("INSTANT ALGORITHM not supported in ClickHouse");
// The ALGORITHM hint has no ClickHouse equivalent: drop the trailing comma
// (if one was just appended) and stop parsing the remaining clauses.
if(this.query.charAt(this.query.length() - 1) == ',')
this.query.deleteCharAt(this.query.length() - 1);

break;
}
else if (tree instanceof TerminalNodeImpl) {
if (((TerminalNodeImpl) tree).symbol.getType() == MySqlParser.COMMA) {
this.query.append(",");
}
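The net effect of the new AlterBySetAlgorithmContext branch, sketched on a hypothetical table: MySQL's ALGORITHM/LOCK hints are advisory and have no ClickHouse counterpart, so the rewritten DDL simply drops them (the new unit test further down asserts the same).

-- Binlog DDL received from MySQL (hypothetical table):
--   ALTER TABLE db1.t1 ADD COLUMN note VARCHAR(32), ALGORITHM=INSTANT, LOCK=NONE;
-- DDL replayed on ClickHouse after this change:
ALTER TABLE db1.t1 ADD COLUMN note Nullable(String);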
@@ -46,6 +46,11 @@ public static ClickHouseDataType convert(String columnName, MySqlParser.DataType
return ClickHouseDataTypeMapper.getClickHouseDataType(schemaBuilder.schema().type(), schemaBuilder.schema().name());
}

public static DataType getDataType(MySqlParser.DataTypeContext columnDefChild) {
// Resolve the MySQL data type directly from the ANTLR parse-tree context.
return initializeDataTypeResolver().resolveDataType(columnDefChild);
}

public static String convertToString(String columnName, int scale, int precision, MySqlParser.DataTypeContext columnDefChild, ZoneId userProvidedTimeZone) {
MySqlValueConverters mysqlConverter = new MySqlValueConverters(
JdbcValueConverters.DecimalMode.PRECISE,
@@ -21,4 +21,4 @@ auto.create.tables= false
replacingmergetree.delete.column=_sign
metrics.port= 8083
snapshot.mode= "initial"
replica.status.view="CREATE VIEW IF NOT EXISTS %s.show_replica_status AS SELECT now() - fromUnixTimestamp(JSONExtractUInt(offset_val, 'ts_sec')) AS seconds_behind_source, toDateTime(fromUnixTimestamp(JSONExtractUInt(offset_val, 'ts_sec')), 'UTC') AS utc_time, fromUnixTimestamp(JSONExtractUInt(offset_val, 'ts_sec')) AS local_time FROM %s FINAL"
replica.status.view="CREATE VIEW IF NOT EXISTS %s.show_replica_status AS SELECT now() - fromUnixTimestamp(JSONExtractUInt(offset_val, 'ts_sec')) AS seconds_behind_source, toDateTime(fromUnixTimestamp(JSONExtractUInt(offset_val, 'ts_sec')), 'UTC') AS utc_time, fromUnixTimestamp(JSONExtractUInt(offset_val, 'ts_sec')) AS local_time FROM %s settings final=1"
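FINAL after a table name is only valid for engines that support it, so the view definition broke once the offset table could be KeeperMap-backed; the final=1 query setting expresses the same intent but is applied only where the engine allows it. Querying the view is unchanged; a sketch assuming the connector's offset database:

SELECT seconds_behind_source, utc_time, local_time
FROM altinity_sink_connector.show_replica_status;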
@@ -90,7 +90,7 @@ static public Properties getDebeziumPropertiesForSchemaOnly(MySQLContainer mySql
props.replace("snapshot.mode", "schema_only");
props.replace("disable.drop.truncate", "true");
props.setProperty("disable.ddl", "true");

props.setProperty("replica.status.view", "CREATE VIEW IF NOT EXISTS %s.show_replica_status AS SELECT now() - fromUnixTimestamp(JSONExtractUInt(offset_val, 'ts_sec')) AS seconds_behind_source, toDateTime(fromUnixTimestamp(JSONExtractUInt(offset_val, 'ts_sec')), 'UTC') AS utc_time, fromUnixTimestamp(JSONExtractUInt(offset_val, 'ts_sec')) AS local_time FROM %s settings final=1");
return props;
}
}
@@ -1,13 +1,13 @@
package com.altinity.clickhouse.debezium.embedded;

import com.altinity.clickhouse.debezium.embedded.cdc.DebeziumChangeEventCapture;
import com.altinity.clickhouse.debezium.embedded.cdc.DebeziumOffsetStorage;
import com.altinity.clickhouse.debezium.embedded.ddl.parser.MySQLDDLParserService;
import com.altinity.clickhouse.debezium.embedded.parser.SourceRecordParserService;
import com.altinity.clickhouse.sink.connector.ClickHouseSinkConnectorConfig;
import com.altinity.clickhouse.sink.connector.db.BaseDbWriter;
import com.clickhouse.jdbc.ClickHouseConnection;
import org.junit.Assert;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import org.testcontainers.Testcontainers;
@@ -60,7 +60,7 @@ public Properties getProperties() throws Exception {
properties.put("slot.max.retries", "6");
properties.put("slot.retry.delay.ms", "5000");
properties.put("database.allowPublicKeyRetrieval", "true");
properties.put("table.include.list", "public.tm,public.tm2");
properties.put("table.include.list", "public.tm,public.tm2,public.redata");

return properties;
}
@@ -113,8 +113,22 @@ public void testDecoderBufsPlugin() throws Exception {
tmCount = chRs.getInt(1);
}

// Get the columns in redata.
Map<String, String> reDataColumns = writer.getColumnsDataTypesForTable("redata");

Assert.assertTrue(reDataColumns.get("amount").equalsIgnoreCase("Decimal(64, 18)"));
Assert.assertTrue(reDataColumns.get("total_amount").equalsIgnoreCase("Decimal(21, 5)"));
Assert.assertTrue(tmCount == 2);

String offsetValue = new DebeziumOffsetStorage().getDebeziumStorageStatusQuery(getProperties(), writer);

// Parse the offsetValue JSON and check the expected keys.
Assert.assertTrue(offsetValue.contains("last_snapshot_record"));
Assert.assertTrue(offsetValue.contains("lsn"));
Assert.assertTrue(offsetValue.contains("txId"));
Assert.assertTrue(offsetValue.contains("ts_usec"));
Assert.assertTrue(offsetValue.contains("snapshot"));

if(engine.get() != null) {
engine.get().stop();
}
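The new assertions read Debezium's raw offset JSON back from the offset store and check the keys a Postgres offset should carry (lsn, txId, ts_usec, snapshot, last_snapshot_record). Roughly the same check by hand, a sketch assuming the offset table name used elsewhere in this repository:

SELECT
    JSONHas(offset_val, 'lsn')      AS has_lsn,
    JSONHas(offset_val, 'txId')     AS has_txid,
    JSONHas(offset_val, 'ts_usec')  AS has_ts_usec,
    JSONHas(offset_val, 'snapshot') AS has_snapshot
FROM altinity_sink_connector.replica_source_info;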
@@ -293,6 +293,21 @@ public void testAlterDatabaseAddColumn() {
Assert.assertTrue(clickHouseQuery.toString().equalsIgnoreCase(clickhouseExpectedQuery));
}

@Test
public void testAlterAddColumnWithColumnKeyword() {

String alterDBAddColumn = "alter table db1.table1 add entity varchar(255) , ALGORITHM=INPLACE, LOCK=NONE";
String clickhouseExpectedQuery = "ALTER TABLE db1.table1 ADD COLUMN entity Nullable(String)";
StringBuffer clickHouseQuery = new StringBuffer();

mySQLDDLParserService.parseSql(alterDBAddColumn, "employees", clickHouseQuery);

log.info("CLICKHOUSE QUERY" + clickHouseQuery);

Assert.assertTrue(clickHouseQuery != null && clickHouseQuery.length() != 0);
Assert.assertTrue(clickHouseQuery.toString().equalsIgnoreCase(clickhouseExpectedQuery));
}

@Test
public void testAlterDatabaseAddColumnNullable() {

@@ -309,7 +324,7 @@ public void testAlterDatabaseAddColumnNullable() {
// Before, After
@Test
public void testAlterDatabaseAddMultipleColumns1() {
-String expectedClickHouseQuery = "ALTER TABLE employees.employees ADD COLUMN ship_spec Nullable(String) first, ADD COLUMN somecol Nullable(Int32) after start_build,";
String expectedClickHouseQuery = "ALTER TABLE employees.employees ADD COLUMN ship_spec Nullable(String) first, ADD COLUMN somecol Nullable(Int32) after start_build";
StringBuffer clickHouseQuery = new StringBuffer();
String query = "alter table employees.employees add column ship_spec varchar(150) first, add somecol int after start_build, algorithm=instant;";
mySQLDDLParserService.parseSql(query, "employees", clickHouseQuery);
29 changes: 28 additions & 1 deletion sink-connector-lightweight/src/test/resources/init_postgres.sql
@@ -120,6 +120,33 @@ INSERT INTO protocol_test VALUES ('1778430', '12041969', 'Henry VII stocks Skin
INSERT INTO protocol_test VALUES ('1778431', '1228695', 'Mary I silver Visiting 6041 Caesar Street cameras judge', '1970-11-07T02:49:21.977Z');
INSERT INTO protocol_test VALUES ('1778432', '21481203', 'Edward V prisoners Peterson Pte. Ltd 2577 Limebank Circle span religious', '1975-02-02T01:13:06.152Z');

CREATE TABLE "redata" (
"id" BIGSERIAL PRIMARY KEY NOT NULL,
"uid" BIGINT NOT NULL,
"e_type" VARCHAR(255) NOT NULL,
"e_data" JSONB NOT NULL,
"r_data" JSONB NULL,
"amount" numeric NOT NULL,
"total_amount" numeric(21,5) NOT NULL,
"e_time" TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP,
"u_time" TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP,
"c_time" TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP
);

INSERT INTO "redata" ("uid", "e_type", "e_data", "r_data", "amount", "total_amount", "e_time", "u_time", "c_time")
VALUES (
123456, -- example value for uid
'example_type', -- example value for e_type
'{"key": "value"}'::jsonb, -- example value for e_data
'{"key": "optional_value"}'::jsonb, -- example value for r_data (can be NULL)
12.2222,
122.22222,
'2024-07-24 12:34:56+00', -- example value for e_time
'2024-07-24 12:34:56+00', -- example value for u_time
'2024-07-24 12:34:56+00'
);


--
--CREATE TABLE test
--(
@@ -137,4 +164,4 @@

create schema public2;
set schema 'public2';
CREATE TABLE "tm2" (id uuid DEFAULT gen_random_uuid() NOT NULL PRIMARY KEY, secid uuid, acc_id uuid);
CREATE TABLE "tm2" (id uuid DEFAULT gen_random_uuid() NOT NULL PRIMARY KEY, secid uuid, acc_id uuid);
4 changes: 2 additions & 2 deletions sink-connector/deploy/docker/docker-compose.yaml
@@ -73,7 +73,6 @@ services:
- JMXHOST=localhost
- JMXPORT=1976
- KAFKA_HEAP_OPTS=-Xms1G -Xmx5G
-#- LOG_LEVEL=DEBUG
depends_on:
- kafka

@@ -115,7 +114,8 @@ services:
- KAFKA_DEBUG=true
- JMX_PORT=39999
- KAFKA_HEAP_OPTS=-Xms1G -Xmx5G
-#- LOG_LEVEL=DEBUG
- log4j2.configurationFile=log4j2.xml
#- log4j2Debug=true
depends_on:
- kafka

Expand Down
@@ -3,6 +3,7 @@
import com.altinity.clickhouse.sink.connector.converters.ClickHouseDataTypeMapper;
import com.clickhouse.data.ClickHouseDataType;
import com.clickhouse.jdbc.ClickHouseConnection;
import io.debezium.data.VariableScaleDecimal;
import io.debezium.time.MicroTimestamp;
import io.debezium.time.Timestamp;
import io.debezium.time.ZonedTimestamp;
@@ -55,11 +56,19 @@ public Map<String, String> getColumnNameToCHDataTypeMapping(Field[] fields) {
if(dataType != null) {
if(dataType == ClickHouseDataType.Decimal) {
//Get Scale, precision from parameters.

Map<String, String> params = f.schema().parameters();

// A Postgres "numeric" declared without precision/scale carries no scale/precision parameters.
if(schemaName.equalsIgnoreCase(VariableScaleDecimal.LOGICAL_NAME)){
columnToDataTypesMap.put(colName, "Decimal(64,18)");
continue;
}

if(params != null && params.containsKey(SCALE) && params.containsKey(PRECISION)) {
columnToDataTypesMap.put(colName, "Decimal(" + params.get(PRECISION) + "," + params.get(SCALE) + ")");
-} else {
}
else {
columnToDataTypesMap.put(colName, "Decimal(10,2)");
}
} else if(dataType == ClickHouseDataType.DateTime64){
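With this change the Decimal mapping has three branches: a Postgres numeric declared without precision/scale arrives as Debezium's VariableScaleDecimal and maps to Decimal(64, 18); schemas that carry scale/precision parameters keep them; anything else falls back to Decimal(10, 2). Illustrated on hypothetical columns:

-- Sketch of the ClickHouse columns the sink would produce (hypothetical table):
CREATE TABLE demo.decimal_mapping
(
    amount       Decimal(64, 18), -- Postgres "numeric" (VariableScaleDecimal, no parameters)
    total_amount Decimal(21, 5),  -- Postgres "numeric(21,5)" keeps declared precision/scale
    fallback_col Decimal(10, 2)   -- no scale/precision parameters: default fallback
)
ENGINE = MergeTree ORDER BY tuple();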
@@ -304,7 +304,7 @@ private boolean processRecordsByTopic(String topicName, List<ClickHouseStruct> r


if(writer == null || writer.wasTableMetaDataRetrieved() == false) {
log.error(String.format("*** TABLE METADATA not retrieved for Database(%), table(%s) retrying",
log.error(String.format("*** TABLE METADATA not retrieved for Database(%s), table(%s) retrying",
writer.getDatabaseName(), writer.getTableName()));
if(writer == null) {
writer = getDbWriterForTable(topicName, tableName, databaseName, firstRecord, databaseConn);
