
Commit e70b80d
[FLINK-35615][base] Add a workaround for timestamp binary en/decoding failure with precision mismatch
yuxiqian committed Jul 1, 2024
1 parent 0723009 commit e70b80d
Showing 7 changed files with 139 additions and 6 deletions.

LocalZonedTimestampData.java
@@ -179,6 +179,8 @@ public static LocalZonedTimestampData fromInstant(Instant instant) {
      * milliseconds.
      */
     public static boolean isCompact(int precision) {
-        return precision <= 3;
+        // We don't use compact mode to store any timestamps for now. See TimestampData#isCompact
+        // for more details.
+        return false;
     }
 }
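
For reference, "compact" here means the timestamp fits entirely in its epoch-millisecond long; that only holds when the declared precision is at most 3, while higher precisions carry a sub-millisecond remainder that needs the extra nano-of-millisecond int. A quick plain-Java illustration of that split (java.time only, not the CDC classes themselves):

import java.time.Instant;

public class CompactIdea {
    public static void main(String[] args) {
        // A precision-6 value: microseconds cannot be represented by epoch millis alone.
        Instant t = Instant.parse("1970-07-16T18:13:20.123456Z");
        long millis = t.toEpochMilli();            // 17000000123 -- drops the trailing 456
        int nanoOfMilli = t.getNano() % 1_000_000; // 456000, the part the long cannot hold
        System.out.println(millis + " ms + " + nanoOfMilli + " ns");
    }
}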

TimestampData.java
@@ -168,6 +168,11 @@ public static TimestampData fromTimestamp(Timestamp timestamp) {
      * Returns whether the timestamp data is small enough to be stored in a long of milliseconds.
      */
     public static boolean isCompact(int precision) {
-        return precision <= 3;
+        // We don't use compact mode to store any timestamps for now since currently MySQL source
+        // could not correctly infer timestamp precision from Debezium records, and precision
+        // mismatch could cause downstream deserialization failure.
+        // By enforcing the non-compaction mode, we could ensure timestamp data with any precision
+        // could be correctly parsed.
+        return false;
     }
 }
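
The failure mode this comment describes is easy to reproduce in isolation. Below is a minimal sketch (a hypothetical class using plain java.io streams rather than Flink's DataOutputView/DataInputView, and not the actual TimestampDataSerializer): with the old precision-dependent isCompact, a writer that knows the real precision is 6 emits 12 bytes per value, while a reader that wrongly infers precision 3 consumes only 8, so every subsequent read is misaligned.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class PrecisionMismatchSketch {
    // Old behavior: compact (8 bytes) iff precision <= 3.
    static boolean isCompact(int precision) {
        return precision <= 3;
    }

    static void write(DataOutputStream out, long millis, int nanoOfMilli, int precision)
            throws IOException {
        out.writeLong(millis);             // 8 bytes, always
        if (!isCompact(precision)) {
            out.writeInt(nanoOfMilli);     // 4 more bytes when precision > 3
        }
    }

    static long read(DataInputStream in, int precision) throws IOException {
        long millis = in.readLong();
        if (!isCompact(precision)) {
            in.skipBytes(4);               // skip the nano-of-millisecond part
        }
        return millis;
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(buf);
        write(out, 17_000_000_000L, 0, 6); // writer knows precision 6 -> 12 bytes each
        write(out, 17_000_000_000L, 0, 6);

        DataInputStream in = new DataInputStream(new ByteArrayInputStream(buf.toByteArray()));
        System.out.println(read(in, 3));   // 17000000000 -- first read happens to work
        System.out.println(read(in, 3));   // garbage: reads the stray nanos plus half of
                                           // the next record's millis
    }
}

With isCompact hard-wired to false on both sides, writer and reader always agree on the 12-byte layout, which is exactly the workaround this commit applies.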

ZonedTimestampData.java
@@ -203,6 +203,8 @@ public static ZonedTimestampData of(long millisecond, int nanoOfMillisecond, Str
      * Returns whether the date-time part is small enough to be stored in a long of milliseconds.
      */
     public static boolean isCompact(int precision) {
-        return precision <= 3;
+        // We don't use compact mode to store any timestamps for now. See TimestampData#isCompact
+        // for more details.
+        return false;
     }
 }

Pipeline composer integration test (org.apache.flink.cdc.composer.flink)
@@ -18,6 +18,9 @@
 package org.apache.flink.cdc.composer.flink;
 
 import org.apache.flink.cdc.common.configuration.Configuration;
+import org.apache.flink.cdc.common.data.LocalZonedTimestampData;
+import org.apache.flink.cdc.common.data.TimestampData;
+import org.apache.flink.cdc.common.data.ZonedTimestampData;
 import org.apache.flink.cdc.common.data.binary.BinaryStringData;
 import org.apache.flink.cdc.common.event.AddColumnEvent;
 import org.apache.flink.cdc.common.event.CreateTableEvent;
@@ -46,6 +49,7 @@
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.test.junit5.MiniClusterExtension;
 
+import org.assertj.core.api.Assertions;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -657,4 +661,118 @@ void testMergingWithRoute() throws Exception {
                         "DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[], after=[5, null, 24, null, Eliza, null], op=INSERT, meta=()}",
                         "DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[], after=[6, Frank, 30, student, null, male], op=INSERT, meta=()}");
     }
+
+    @Test
+    void testTimestampWithPrecision() throws Exception {
+        FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster();
+
+        // Setup value source
+        Configuration sourceConfig = new Configuration();
+        sourceConfig.set(
+                ValuesDataSourceOptions.EVENT_SET_ID,
+                ValuesDataSourceHelper.EventSetId.CUSTOM_SOURCE_EVENTS);
+
+        TableId sourceTable =
+                TableId.tableId("default_namespace", "default_schema", "timestamp_table");
+        Schema sourceTableSchema =
+                Schema.newBuilder()
+                        .physicalColumn("id", DataTypes.INT())
+                        .physicalColumn("ts0", DataTypes.TIMESTAMP(0))
+                        .physicalColumn("ts1", DataTypes.TIMESTAMP(1))
+                        .physicalColumn("ts2", DataTypes.TIMESTAMP(2))
+                        .physicalColumn("ts3", DataTypes.TIMESTAMP(3))
+                        .physicalColumn("ts4", DataTypes.TIMESTAMP(4))
+                        .physicalColumn("ts5", DataTypes.TIMESTAMP(5))
+                        .physicalColumn("ts6", DataTypes.TIMESTAMP(6))
+                        .physicalColumn("ts_tz0", DataTypes.TIMESTAMP_TZ(0))
+                        .physicalColumn("ts_tz1", DataTypes.TIMESTAMP_TZ(1))
+                        .physicalColumn("ts_tz2", DataTypes.TIMESTAMP_TZ(2))
+                        .physicalColumn("ts_tz3", DataTypes.TIMESTAMP_TZ(3))
+                        .physicalColumn("ts_tz4", DataTypes.TIMESTAMP_TZ(4))
+                        .physicalColumn("ts_tz5", DataTypes.TIMESTAMP_TZ(5))
+                        .physicalColumn("ts_tz6", DataTypes.TIMESTAMP_TZ(6))
+                        .physicalColumn("ts_ltz0", DataTypes.TIMESTAMP_LTZ(0))
+                        .physicalColumn("ts_ltz1", DataTypes.TIMESTAMP_LTZ(1))
+                        .physicalColumn("ts_ltz2", DataTypes.TIMESTAMP_LTZ(2))
+                        .physicalColumn("ts_ltz3", DataTypes.TIMESTAMP_LTZ(3))
+                        .physicalColumn("ts_ltz4", DataTypes.TIMESTAMP_LTZ(4))
+                        .physicalColumn("ts_ltz5", DataTypes.TIMESTAMP_LTZ(5))
+                        .physicalColumn("ts_ltz6", DataTypes.TIMESTAMP_LTZ(6))
+                        .primaryKey("id")
+                        .build();
+        List<Event> events = new ArrayList<>();
+        BinaryRecordDataGenerator sourceTableDataGenerator =
+                new BinaryRecordDataGenerator(
+                        sourceTableSchema.getColumnDataTypes().toArray(new DataType[0]));
+
+        TimestampData timestampData = TimestampData.fromMillis(17_000_000_000L, 0);
+        ZonedTimestampData zonedTimestampData = ZonedTimestampData.of(17_000_000_000L, 0, "UTC");
+        LocalZonedTimestampData localZonedTimestampData =
+                LocalZonedTimestampData.fromEpochMillis(17_000_000_000L, 0);
+
+        events.add(new CreateTableEvent(sourceTable, sourceTableSchema));
+        events.add(
+                DataChangeEvent.insertEvent(
+                        sourceTable,
+                        sourceTableDataGenerator.generate(
+                                new Object[] {
+                                    1,
+                                    timestampData,
+                                    timestampData,
+                                    timestampData,
+                                    timestampData,
+                                    timestampData,
+                                    timestampData,
+                                    timestampData,
+                                    zonedTimestampData,
+                                    zonedTimestampData,
+                                    zonedTimestampData,
+                                    zonedTimestampData,
+                                    zonedTimestampData,
+                                    zonedTimestampData,
+                                    zonedTimestampData,
+                                    localZonedTimestampData,
+                                    localZonedTimestampData,
+                                    localZonedTimestampData,
+                                    localZonedTimestampData,
+                                    localZonedTimestampData,
+                                    localZonedTimestampData,
+                                    localZonedTimestampData
+                                })));
+
+        ValuesDataSourceHelper.setSourceEvents(Collections.singletonList(events));
+
+        SourceDef sourceDef =
+                new SourceDef(ValuesDataFactory.IDENTIFIER, "Value Source", sourceConfig);
+
+        // Setup value sink
+        Configuration sinkConfig = new Configuration();
+        sinkConfig.set(ValuesDataSinkOptions.MATERIALIZED_IN_MEMORY, true);
+        SinkDef sinkDef = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value Sink", sinkConfig);
+
+        // Setup pipeline
+        Configuration pipelineConfig = new Configuration();
+        pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
+        PipelineDef pipelineDef =
+                new PipelineDef(
+                        sourceDef,
+                        sinkDef,
+                        Collections.emptyList(),
+                        Collections.emptyList(),
+                        pipelineConfig);
+
+        // Execute the pipeline
+        PipelineExecution execution = composer.compose(pipelineDef);
+        execution.execute();
+        Schema sinkTableSchema = ValuesDatabase.getTableSchema(sourceTable);
+        Assertions.assertThat(sinkTableSchema).isEqualTo(sourceTableSchema);
+        String[] outputEvents = outCaptor.toString().trim().split("\n");
+
+        Assertions.assertThat(outputEvents)
+                .isEqualTo(
+                        new String[] {
+                            "CreateTableEvent{tableId=default_namespace.default_schema.timestamp_table, schema=columns={`id` INT,`ts0` TIMESTAMP(0),`ts1` TIMESTAMP(1),`ts2` TIMESTAMP(2),`ts3` TIMESTAMP(3),`ts4` TIMESTAMP(4),`ts5` TIMESTAMP(5),`ts6` TIMESTAMP(6),`ts_tz0` TIMESTAMP(0) WITH TIME ZONE,`ts_tz1` TIMESTAMP(1) WITH TIME ZONE,`ts_tz2` TIMESTAMP(2) WITH TIME ZONE,`ts_tz3` TIMESTAMP(3) WITH TIME ZONE,`ts_tz4` TIMESTAMP(4) WITH TIME ZONE,`ts_tz5` TIMESTAMP(5) WITH TIME ZONE,`ts_tz6` TIMESTAMP(6) WITH TIME ZONE,`ts_ltz0` TIMESTAMP_LTZ(0),`ts_ltz1` TIMESTAMP_LTZ(1),`ts_ltz2` TIMESTAMP_LTZ(2),`ts_ltz3` TIMESTAMP_LTZ(3),`ts_ltz4` TIMESTAMP_LTZ(4),`ts_ltz5` TIMESTAMP_LTZ(5),`ts_ltz6` TIMESTAMP_LTZ(6)}, primaryKeys=id, options=()}",
+                            "DataChangeEvent{tableId=default_namespace.default_schema.timestamp_table, before=[], after=[1, 1970-07-16T18:13:20, 1970-07-16T18:13:20, 1970-07-16T18:13:20, 1970-07-16T18:13:20, 1970-07-16T18:13:20, 1970-07-16T18:13:20, 1970-07-16T18:13:20, 1970-07-16T18:13:20Z, 1970-07-16T18:13:20Z, 1970-07-16T18:13:20Z, 1970-07-16T18:13:20Z, 1970-07-16T18:13:20Z, 1970-07-16T18:13:20Z, 1970-07-16T18:13:20Z, 1970-07-16T18:13:20, 1970-07-16T18:13:20, 1970-07-16T18:13:20, 1970-07-16T18:13:20, 1970-07-16T18:13:20, 1970-07-16T18:13:20, 1970-07-16T18:13:20], op=INSERT, meta=()}"
+                        });
+    }
 }
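
A quick sanity check on the expected strings above: the single source value, 17_000_000_000 milliseconds after the epoch with a zero nano-of-millisecond part, renders identically at every precision:

import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;

public class ExpectedValueCheck {
    public static void main(String[] args) {
        // 17_000_000_000 ms = 17_000_000 s = 196 days + 65_600 s after 1970-01-01.
        LocalDateTime ts =
                LocalDateTime.ofInstant(Instant.ofEpochMilli(17_000_000_000L), ZoneOffset.UTC);
        System.out.println(ts); // 1970-07-16T18:13:20
    }
}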

LocalZonedTimestampDataSerializerTest.java
@@ -21,6 +21,8 @@
 import org.apache.flink.cdc.common.data.LocalZonedTimestampData;
 import org.apache.flink.cdc.runtime.serializer.SerializerTestBase;
 
+import static org.apache.flink.cdc.common.data.LocalZonedTimestampData.isCompact;
+
 /** A test for the {@link LocalZonedTimestampDataSerializer}. */
 abstract class LocalZonedTimestampDataSerializerTest
         extends SerializerTestBase<LocalZonedTimestampData> {
@@ -31,7 +33,7 @@ protected TypeSerializer<LocalZonedTimestampData> createSerializer() {
 
     @Override
     protected int getLength() {
-        return (getPrecision() <= 3) ? 8 : 12;
+        return isCompact(getPrecision()) ? 8 : 12;
     }
 
     @Override

TimestampDataSerializerTest.java
@@ -21,6 +21,8 @@
 import org.apache.flink.cdc.common.data.TimestampData;
 import org.apache.flink.cdc.runtime.serializer.SerializerTestBase;
 
+import static org.apache.flink.cdc.common.data.TimestampData.isCompact;
+
 /** A test for the {@link TimestampDataSerializer}. */
 abstract class TimestampDataSerializerTest extends SerializerTestBase<TimestampData> {
     @Override
@@ -30,7 +32,7 @@ protected TypeSerializer<TimestampData> createSerializer() {
 
     @Override
     protected int getLength() {
-        return (getPrecision() <= 3) ? 8 : 12;
+        return isCompact(getPrecision()) ? 8 : 12;
     }
 
     @Override

ZonedTimestampDataSerializerTest.java
@@ -24,6 +24,8 @@
 import java.time.LocalDateTime;
 import java.time.ZoneId;
 
+import static org.apache.flink.cdc.common.data.ZonedTimestampData.isCompact;
+
 /** A test for the {@link ZonedTimestampDataSerializer}. */
 abstract class ZonedTimestampDataSerializerTest extends SerializerTestBase<ZonedTimestampData> {
     @Override
@@ -33,7 +35,7 @@ protected TypeSerializer<ZonedTimestampData> createSerializer() {
 
     @Override
     protected int getLength() {
-        return (getPrecision() <= 3) ? 8 : 12;
+        return isCompact(getPrecision()) ? 8 : 12;
    }
 
     @Override
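
The three serializer-test changes are the same one-line fix: getLength() must report the serializer's fixed byte size, so it should follow the data class's own isCompact rather than re-deriving it from precision. Since the workaround makes isCompact return false unconditionally, the expected length is now 12 bytes (8 for millis plus 4 for nanos) at every precision. A small check of that arithmetic, assuming the flink-cdc-common classes shown in this diff are on the classpath:

import org.apache.flink.cdc.common.data.TimestampData;

public class SerializedLengthCheck {
    public static void main(String[] args) {
        for (int precision = 0; precision <= 9; precision++) {
            // With the workaround, isCompact is always false, so every precision
            // maps to the non-compact 12-byte layout.
            int length = TimestampData.isCompact(precision) ? 8 : 12;
            System.out.println("TIMESTAMP(" + precision + ") -> " + length + " bytes");
        }
    }
}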
