From 7ca11359d7969cd7488fc546dd04f1bbfb8d6674 Mon Sep 17 00:00:00 2001
From: Wink <32723967+aiwenmo@users.noreply.github.com>
Date: Wed, 7 Aug 2024 00:49:22 +0800
Subject: [PATCH] [FLINK-35743][cdc-runtime] Correct the temporal function semantics

This closes #3449.

Co-authored-by: wenmo <32723967+wenmo@users.noreply.github.com>
Co-authored-by: yuxiqian <34335406+yuxiqian@users.noreply.github.com>
---
 .../flink/cdc/common/utils/DateTimeUtils.java |  19 ++
 .../flink/FlinkPipelineComposerITCase.java    |   1 +
 .../flink/FlinkPipelineTransformITCase.java   | 281 ++++++++++++++++++
 .../cdc/connectors/values/ValuesDatabase.java |  20 +-
 .../pipeline/tests/TransformE2eITCase.java    | 123 ++++++++
 .../functions/SystemFunctionUtils.java        | 118 ++++++--
 .../cdc/runtime/parser/JaninoCompiler.java    |  81 ++++-
 .../metadata/TransformSqlOperatorTable.java   |   6 +-
 .../transform/TransformDataOperatorTest.java  | 219 ++++++++++++--
 .../runtime/parser/JaninoCompilerTest.java    |  21 --
 .../runtime/parser/TransformParserTest.java   |  14 +-
 11 files changed, 793 insertions(+), 110 deletions(-)
 create mode 100644 flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java

diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/DateTimeUtils.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/DateTimeUtils.java
index 9a009df620..b923107e2c 100644
--- a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/DateTimeUtils.java
+++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/DateTimeUtils.java
@@ -92,6 +92,14 @@ public static int parseDate(String dateStr, String fromFormat) {
         return ymdToUnixDate(zdt.getYear(), zdt.getMonthValue(), zdt.getDayOfMonth());
     }
 
+    public static int parseDate(String dateStr, String fromFormat, String timezone) {
+        long ts = internalParseTimestampMillis(dateStr, fromFormat, TimeZone.getTimeZone(timezone));
+        ZoneId zoneId = ZoneId.of(timezone);
+        Instant instant = Instant.ofEpochMilli(ts);
+        ZonedDateTime zdt = ZonedDateTime.ofInstant(instant, zoneId);
+        return ymdToUnixDate(zdt.getYear(), zdt.getMonthValue(), zdt.getDayOfMonth());
+    }
+
     private static long internalParseTimestampMillis(String dateStr, String format, TimeZone tz) {
         SimpleDateFormat formatter = FORMATTER_CACHE.get(format);
         formatter.setTimeZone(tz);
@@ -119,4 +127,15 @@ private static int ymdToJulian(int year, int month, int day) {
         int m = month + 12 * a - 3;
         return day + (153 * m + 2) / 5 + 365 * y + y / 4 - y / 100 + y / 400 - 32045;
     }
+
+    // --------------------------------------------------------------------------------------------
+    // Format
+    // --------------------------------------------------------------------------------------------
+
+    public static String formatTimestampMillis(long ts, String format, TimeZone timeZone) {
+        SimpleDateFormat formatter = FORMATTER_CACHE.get(format);
+        formatter.setTimeZone(timeZone);
+        Date dateTime = new Date(ts);
+        return formatter.format(dateTime);
+    }
 }
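Note: the two helpers above are the core of the DateTimeUtils change: parsing and formatting now go through an explicit time zone instead of the JVM default. The following standalone sketch is not part of the patch; it re-implements the semantics with plain java.time and assumes the usual "days since 1970-01-01" encoding of DATE values. It shows why the round trip is stable: the string is parsed and its calendar fields are re-read in the same zone.

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.TimeZone;

public class ParseDateSketch {
    // Mirrors the semantics of the new parseDate(String, String, String) overload.
    static int parseDate(String dateStr, String format, String timezone) throws ParseException {
        SimpleDateFormat formatter = new SimpleDateFormat(format);
        formatter.setTimeZone(TimeZone.getTimeZone(timezone));
        long ts = formatter.parse(dateStr).getTime();
        ZonedDateTime zdt = ZonedDateTime.ofInstant(Instant.ofEpochMilli(ts), ZoneId.of(timezone));
        // Days since the Unix epoch, computed from the zoned calendar fields.
        return (int) LocalDate.of(zdt.getYear(), zdt.getMonthValue(), zdt.getDayOfMonth()).toEpochDay();
    }

    public static void main(String[] args) throws ParseException {
        // The same wall-clock string yields the same epoch-day in any zone, because
        // parsing and field extraction use the same zone.
        System.out.println(parseDate("2024-08-01", "yyyy-MM-dd", "UTC"));                 // 19936
        System.out.println(parseDate("2024-08-01", "yyyy-MM-dd", "America/Los_Angeles")); // 19936
    }
}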
diff --git a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java
index 26c9c91875..a8acf79899 100644
--- a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java
+++ b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java
@@ -68,6 +68,7 @@
 
 /** Integration test for {@link FlinkPipelineComposer}. */
 class FlinkPipelineComposerITCase {
+
     private static final int MAX_PARALLELISM = 4;
 
     // Always use parent-first classloader for CDC classes.
diff --git a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java
new file mode 100644
index 0000000000..77f89a37a4
--- /dev/null
+++ b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.composer.flink;
+
+import org.apache.flink.cdc.common.configuration.Configuration;
+import org.apache.flink.cdc.common.data.binary.BinaryStringData;
+import org.apache.flink.cdc.common.event.CreateTableEvent;
+import org.apache.flink.cdc.common.event.DataChangeEvent;
+import org.apache.flink.cdc.common.event.Event;
+import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.pipeline.PipelineOptions;
+import org.apache.flink.cdc.common.schema.Schema;
+import org.apache.flink.cdc.common.types.DataType;
+import org.apache.flink.cdc.common.types.DataTypes;
+import org.apache.flink.cdc.composer.PipelineExecution;
+import org.apache.flink.cdc.composer.definition.PipelineDef;
+import org.apache.flink.cdc.composer.definition.SinkDef;
+import org.apache.flink.cdc.composer.definition.SourceDef;
+import org.apache.flink.cdc.composer.definition.TransformDef;
+import org.apache.flink.cdc.connectors.values.ValuesDatabase;
+import org.apache.flink.cdc.connectors.values.factory.ValuesDataFactory;
+import org.apache.flink.cdc.connectors.values.sink.ValuesDataSinkOptions;
+import org.apache.flink.cdc.connectors.values.source.ValuesDataSourceHelper;
+import org.apache.flink.cdc.connectors.values.source.ValuesDataSourceOptions;
+import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.test.junit5.MiniClusterExtension;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import java.io.ByteArrayOutputStream;
+import java.io.PrintStream;
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Stream;
+
+import static org.apache.flink.configuration.CoreOptions.ALWAYS_PARENT_FIRST_LOADER_PATTERNS_ADDITIONAL;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Integration test for {@link FlinkPipelineComposer}. */
+class FlinkPipelineTransformITCase {
+
+    private static final int MAX_PARALLELISM = 4;
+
+    // Always use parent-first classloader for CDC classes.
+    // The reason is that ValuesDatabase uses static field for holding data, we need to make sure
+    // the class is loaded by AppClassloader so that we can verify data in the test case.
+    private static final org.apache.flink.configuration.Configuration MINI_CLUSTER_CONFIG =
+            new org.apache.flink.configuration.Configuration();
+
+    static {
+        MINI_CLUSTER_CONFIG.set(
+                ALWAYS_PARENT_FIRST_LOADER_PATTERNS_ADDITIONAL,
+                Collections.singletonList("org.apache.flink.cdc"));
+    }
+
+    /**
+     * Use {@link MiniClusterExtension} to reduce the overhead of restarting the MiniCluster for
+     * every test case.
+     */
+    @RegisterExtension
+    static final MiniClusterExtension MINI_CLUSTER_RESOURCE =
+            new MiniClusterExtension(
+                    new MiniClusterResourceConfiguration.Builder()
+                            .setNumberTaskManagers(1)
+                            .setNumberSlotsPerTaskManager(MAX_PARALLELISM)
+                            .setConfiguration(MINI_CLUSTER_CONFIG)
+                            .build());
+
+    private final PrintStream standardOut = System.out;
+    private final ByteArrayOutputStream outCaptor = new ByteArrayOutputStream();
+
+    @BeforeEach
+    void init() {
+        // Take over STDOUT as we need to check the output of values sink
+        System.setOut(new PrintStream(outCaptor));
+        // Initialize in-memory database
+        ValuesDatabase.clear();
+    }
+
+    @AfterEach
+    void cleanup() {
+        System.setOut(standardOut);
+    }
+
+    @Test
+    void testTransformWithTemporalFunction() throws Exception {
+        FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster();
+
+        // Setup value source
+        Configuration sourceConfig = new Configuration();
+        sourceConfig.set(
+                ValuesDataSourceOptions.EVENT_SET_ID,
+                ValuesDataSourceHelper.EventSetId.CUSTOM_SOURCE_EVENTS);
+
+        TableId myTable1 = TableId.tableId("default_namespace", "default_schema", "mytable1");
+        TableId myTable2 = TableId.tableId("default_namespace", "default_schema", "mytable2");
+        Schema table1Schema =
+                Schema.newBuilder()
+                        .physicalColumn("id", DataTypes.INT())
+                        .physicalColumn("name", DataTypes.STRING())
+                        .physicalColumn("age", DataTypes.INT())
+                        .primaryKey("id")
+                        .build();
+        Schema table2Schema =
+                Schema.newBuilder()
+                        .physicalColumn("id", DataTypes.BIGINT())
+                        .physicalColumn("name", DataTypes.VARCHAR(255))
+                        .physicalColumn("age", DataTypes.TINYINT())
+                        .physicalColumn("description", DataTypes.STRING())
+                        .primaryKey("id")
+                        .build();
+
+        List<Event> events = new ArrayList<>();
+        BinaryRecordDataGenerator table1dataGenerator =
+                new BinaryRecordDataGenerator(
+                        table1Schema.getColumnDataTypes().toArray(new DataType[0]));
+        BinaryRecordDataGenerator table2dataGenerator =
+                new BinaryRecordDataGenerator(
+                        table2Schema.getColumnDataTypes().toArray(new DataType[0]));
+        events.add(new CreateTableEvent(myTable1, table1Schema));
+        events.add(
+                DataChangeEvent.insertEvent(
+                        myTable1,
+                        table1dataGenerator.generate(
+                                new Object[] {1, BinaryStringData.fromString("Alice"), 18})));
+        events.add(
+                DataChangeEvent.insertEvent(
+                        myTable1,
+                        table1dataGenerator.generate(
+                                new Object[] {2, BinaryStringData.fromString("Bob"), 20})));
+        events.add(
+                DataChangeEvent.updateEvent(
+                        myTable1,
+                        table1dataGenerator.generate(
+                                new Object[] {2, BinaryStringData.fromString("Bob"), 20}),
+                        table1dataGenerator.generate(
+                                new Object[] {2, BinaryStringData.fromString("Bob"), 30})));
+        events.add(new CreateTableEvent(myTable2, table2Schema));
+        events.add(
+                DataChangeEvent.insertEvent(
+                        myTable2,
+                        table2dataGenerator.generate(
+                                new Object[] {
+                                    3L,
+                                    BinaryStringData.fromString("Carol"),
+                                    (byte) 15,
+                                    BinaryStringData.fromString("student")
+                                })));
+        events.add(
+                DataChangeEvent.insertEvent(
+                        myTable2,
+                        table2dataGenerator.generate(
+                                new Object[] {
+                                    4L,
+                                    BinaryStringData.fromString("Derrida"),
+                                    (byte) 25,
+                                    BinaryStringData.fromString("student")
+                                })));
+        events.add(
+                DataChangeEvent.deleteEvent(
+                        myTable2,
+                        table2dataGenerator.generate(
+                                new Object[] {
+                                    4L,
+                                    BinaryStringData.fromString("Derrida"),
+                                    (byte) 25,
+                                    BinaryStringData.fromString("student")
+                                })));
+
+        ValuesDataSourceHelper.setSourceEvents(Collections.singletonList(events));
+
+        SourceDef sourceDef =
+                new SourceDef(ValuesDataFactory.IDENTIFIER, "Value Source", sourceConfig);
+
+        // Setup value sink
+        Configuration sinkConfig = new Configuration();
+        sinkConfig.set(ValuesDataSinkOptions.MATERIALIZED_IN_MEMORY, true);
+        SinkDef sinkDef = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value Sink", sinkConfig);
+
+        // Setup pipeline
+        Configuration pipelineConfig = new Configuration();
+        pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
+        pipelineConfig.set(PipelineOptions.PIPELINE_LOCAL_TIME_ZONE, "America/Los_Angeles");
+        PipelineDef pipelineDef =
+                new PipelineDef(
+                        sourceDef,
+                        sinkDef,
+                        Collections.emptyList(),
+                        Collections.singletonList(
+                                new TransformDef(
+                                        "default_namespace.default_schema.\\.*",
+                                        "*, LOCALTIME as lcl_t, CURRENT_TIME as cur_t, CAST(CURRENT_TIMESTAMP AS TIMESTAMP) as cur_ts, CAST(NOW() AS TIMESTAMP) as now_ts, LOCALTIMESTAMP as lcl_ts, CURRENT_DATE as cur_dt",
+                                        null,
+                                        null,
+                                        null,
+                                        null,
+                                        null)),
+                        pipelineConfig);
+
+        // Execute the pipeline
+        PipelineExecution execution = composer.compose(pipelineDef);
+        execution.execute();
+
+        // Check the order and content of all received events
+        String[] outputEvents = outCaptor.toString().trim().split("\n");
+
+        Arrays.stream(outputEvents).forEach(this::extractDataLines);
+    }
+
+    void extractDataLines(String line) {
+        if (!line.startsWith("DataChangeEvent{")) {
+            return;
+        }
+        Stream.of("before", "after")
+                .forEach(
+                        tag -> {
+                            String[] arr = line.split(tag + "=\\[", 2);
+                            String dataRecord = arr[arr.length - 1].split("]", 2)[0];
+                            if (!dataRecord.isEmpty()) {
+                                verifyDataRecord(dataRecord);
+                            }
+                        });
+    }
+
+    void verifyDataRecord(String recordLine) {
+        List<String> tokens = Arrays.asList(recordLine.split(", "));
+        assertThat(tokens).hasSizeGreaterThanOrEqualTo(6);
+
+        tokens = tokens.subList(tokens.size() - 6, tokens.size());
+
+        String localTime = tokens.get(0);
+        String currentTime = tokens.get(1);
+        assertThat(localTime).isEqualTo(currentTime);
+
+        String currentTimestamp = tokens.get(2);
+        String nowTimestamp = tokens.get(3);
+        String localTimestamp = tokens.get(4);
+        assertThat(currentTimestamp).isEqualTo(nowTimestamp).isEqualTo(localTimestamp);
+
+        Instant instant =
+                LocalDateTime.parse(
+                                currentTimestamp,
+                                DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS"))
+                        .toInstant(ZoneOffset.UTC);
+
+        long milliSecondsInOneDay = 24 * 60 * 60 * 1000;
+
+        assertThat(instant.toEpochMilli() % milliSecondsInOneDay)
+                .isEqualTo(Long.parseLong(localTime));
+
+        String currentDate = tokens.get(5);
+
+        assertThat(instant.toEpochMilli() / milliSecondsInOneDay)
+                .isEqualTo(Long.parseLong(currentDate));
+    }
+}
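Note: the new FlinkPipelineTransformITCase verifies the projected temporal columns with plain modular arithmetic. A minimal sketch of that arithmetic, not part of the patch, assuming a UTC session zone where TIME values are encoded as milliseconds of the day and DATE values as days since the Unix epoch:

import java.time.Instant;

public class TemporalEncodingSketch {
    public static void main(String[] args) {
        long millisPerDay = 24 * 60 * 60 * 1000L;
        Instant instant = Instant.parse("2024-08-07T01:02:03.456Z");

        long timeOfDayMillis = instant.toEpochMilli() % millisPerDay; // LOCALTIME encoding
        long epochDays = instant.toEpochMilli() / millisPerDay;       // CURRENT_DATE encoding

        System.out.println(timeOfDayMillis); // 3723456  (01:02:03.456)
        System.out.println(epochDays);       // 19942    (2024-08-07)
    }
}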
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/org/apache/flink/cdc/connectors/values/ValuesDatabase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/org/apache/flink/cdc/connectors/values/ValuesDatabase.java
index 36edab6d86..4d80deecec 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/org/apache/flink/cdc/connectors/values/ValuesDatabase.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/org/apache/flink/cdc/connectors/values/ValuesDatabase.java
@@ -32,6 +32,7 @@
 import org.apache.flink.cdc.common.sink.MetadataApplier;
 import org.apache.flink.cdc.common.source.MetadataAccessor;
 import org.apache.flink.cdc.common.utils.Preconditions;
+import org.apache.flink.cdc.common.utils.SchemaUtils;
 import org.apache.flink.cdc.connectors.values.sink.ValuesDataSink;
 import org.apache.flink.cdc.connectors.values.source.ValuesDataSource;
 
@@ -48,6 +49,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
@@ -185,7 +187,7 @@ private static class ValuesTable {
         private final TableId tableId;
 
         // [primaryKeys, [column_name, column_value]]
-        private final Map<String, Map<String, String>> records;
+        private final Map<String, Map<String, RecordData>> records;
 
         private final LinkedList<Column> columns;
 
@@ -210,15 +212,21 @@ public ValuesTable(TableId tableId, Schema schema) {
         public List<String> getResult() {
             List<String> results = new ArrayList<>();
             synchronized (lock) {
+                List<RecordData.FieldGetter> fieldGetters = SchemaUtils.createFieldGetters(columns);
                 records.forEach(
                         (key, record) -> {
                             StringBuilder stringBuilder = new StringBuilder(tableId.toString());
                             stringBuilder.append(":");
-                            for (Column column : columns) {
+                            for (int i = 0; i < columns.size(); i++) {
+                                Column column = columns.get(i);
+                                RecordData.FieldGetter fieldGetter = fieldGetters.get(i);
                                 stringBuilder
                                         .append(column.getName())
                                         .append("=")
-                                        .append(record.getOrDefault(column.getName(), ""))
+                                        .append(
+                                                Optional.ofNullable(record.get(column.getName()))
+                                                        .map(fieldGetter::getFieldOrNull)
+                                                        .orElse(""))
                                         .append(";");
                             }
                             stringBuilder.deleteCharAt(stringBuilder.length() - 1);
@@ -257,9 +265,9 @@ private void delete(RecordData recordData) {
 
         private void insert(RecordData recordData) {
             String primaryKey = buildPrimaryKeyStr(recordData);
-            Map<String, String> record = new HashMap<>();
+            Map<String, RecordData> record = new HashMap<>();
             for (int i = 0; i < recordData.getArity(); i++) {
-                record.put(columns.get(i).getName(), recordData.getString(i).toString());
+                record.put(columns.get(i).getName(), recordData);
             }
             records.put(primaryKey, record);
         }
@@ -393,7 +401,7 @@ private void applyRenameColumnEvent(RenameColumnEvent event) {
             records.forEach(
                     (key, record) -> {
                         if (record.containsKey(beforeName)) {
-                            String value = record.get(beforeName);
+                            RecordData value = record.get(beforeName);
                            record.remove(beforeName);
                             record.put(afterName, value);
                         }
diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/TransformE2eITCase.java b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/TransformE2eITCase.java
index 2f896d3344..3f0999ee14 100644
--- a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/TransformE2eITCase.java
+++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/TransformE2eITCase.java
@@ -25,6 +25,7 @@
 import org.apache.flink.cdc.runtime.operators.transform.TransformSchemaOperator;
 
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.ClassRule;
 import org.junit.Test;
@@ -40,9 +41,14 @@
 import java.sql.SQLException;
 import java.sql.Statement;
 import java.time.Duration;
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+import java.time.format.DateTimeFormatter;
 import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.TimeoutException;
+import java.util.stream.Stream;
 
 /** E2e tests for the {@link TransformSchemaOperator}. */
 @RunWith(Parameterized.class)
@@ -315,6 +321,44 @@ public void testMultipleHittingTable() throws Exception {
         System.out.println(stdout);
     }
 
+    @Test
+    public void testTemporalFunctions() throws Exception {
+        String pipelineJob =
+                String.format(
+                        "source:\n"
+                                + "  type: mysql\n"
+                                + "  hostname: %s\n"
+                                + "  port: 3306\n"
+                                + "  username: %s\n"
+                                + "  password: %s\n"
+                                + "  tables: %s.\\.*\n"
+                                + "  server-id: 5400-5404\n"
+                                + "  server-time-zone: UTC\n"
+                                + "\n"
+                                + "sink:\n"
+                                + "  type: values\n"
+                                + "transform:\n"
+                                + "  - source-table: %s.\\.*\n"
+                                + "    projection: ID, LOCALTIME as lcl_t, CURRENT_TIME as cur_t, CAST(CURRENT_TIMESTAMP AS TIMESTAMP) as cur_ts, CAST(NOW() AS TIMESTAMP) as now_ts, LOCALTIMESTAMP as lcl_ts, CURRENT_DATE as cur_dt\n"
+                                + "\n"
+                                + "pipeline:\n"
+                                + "  parallelism: 1\n"
+                                + "  local-time-zone: America/Los_Angeles",
+                        INTER_CONTAINER_MYSQL_ALIAS,
+                        MYSQL_TEST_USER,
+                        MYSQL_TEST_PASSWORD,
+                        transformRenameDatabase.getDatabaseName(),
+                        transformRenameDatabase.getDatabaseName());
+        Path mysqlCdcJar = TestUtils.getResource("mysql-cdc-pipeline-connector.jar");
+        Path valuesCdcJar = TestUtils.getResource("values-cdc-pipeline-connector.jar");
+        Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar");
+        submitPipelineJob(pipelineJob, mysqlCdcJar, valuesCdcJar, mysqlDriverJar);
+        waitUntilJobRunning(Duration.ofSeconds(30));
+        LOG.info("Pipeline job is running");
+
+        waitForTemporaryRecords(8, 60000L);
+    }
+
     private void validateResult(List<String> expectedEvents) throws Exception {
         for (String event : expectedEvents) {
             waitUntilSpecificEvent(event, 6000L);
@@ -340,4 +384,83 @@ private void waitUntilSpecificEvent(String event, long timeout) throws Exception
                             + taskManagerConsumer.toUtf8String());
         }
     }
+
+    private int validateTemporaryRecords() {
+        int validRecordCount = 0;
+        for (String line : taskManagerConsumer.toUtf8String().split("\n")) {
+            if (extractDataLines(line)) {
+                validRecordCount++;
+            }
+        }
+        return validRecordCount;
+    }
+
+    private void waitForTemporaryRecords(int expectedRecords, long timeout) throws Exception {
+        boolean result = false;
+        long endTimeout = System.currentTimeMillis() + timeout;
+        while (System.currentTimeMillis() < endTimeout) {
+            if (validateTemporaryRecords() >= expectedRecords) {
+                result = true;
+                break;
+            }
+            Thread.sleep(1000);
+        }
+        if (!result) {
+            throw new TimeoutException(
+                    "failed to get enough temporary records: "
+                            + expectedRecords
+                            + " from stdout: "
+                            + taskManagerConsumer.toUtf8String());
+        }
+    }
+
+    boolean extractDataLines(String line) {
+        if (!line.startsWith("DataChangeEvent{")) {
+            return false;
+        }
+        Stream.of("before", "after")
+                .forEach(
+                        tag -> {
+                            String[] arr = line.split(tag + "=\\[", 2);
+                            String dataRecord = arr[arr.length - 1].split("]", 2)[0];
+                            if (!dataRecord.isEmpty()) {
+                                verifyDataRecord(dataRecord);
+                            }
+                        });
+        return true;
+    }
+
+    void verifyDataRecord(String recordLine) {
+        LOG.info("Verifying data line {}", recordLine);
+        List<String> tokens = Arrays.asList(recordLine.split(", "));
+        Assert.assertTrue(tokens.size() >= 6);
+
+        tokens = tokens.subList(tokens.size() - 6, tokens.size());
+
+        String localTime = tokens.get(0);
+        String currentTime = tokens.get(1);
+        Assert.assertEquals(localTime, currentTime);
+
+        String currentTimestamp = tokens.get(2);
+        String nowTimestamp = tokens.get(3);
+        String localTimestamp = tokens.get(4);
+        Assert.assertEquals(currentTimestamp, nowTimestamp);
+        Assert.assertEquals(currentTimestamp, localTimestamp);
+
+        Instant instant =
+                LocalDateTime.parse(
+                                currentTimestamp,
+                                DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS"))
+                        .toInstant(ZoneOffset.UTC);
+
+        long milliSecondsInOneDay = 24 * 60 * 60 * 1000;
+
+        Assert.assertEquals(
+                instant.toEpochMilli() % milliSecondsInOneDay, Long.parseLong(localTime));
+
+        String currentDate = tokens.get(5);
+
+        Assert.assertEquals(
+                instant.toEpochMilli() / milliSecondsInOneDay, Long.parseLong(currentDate));
+    }
 }
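Note: ValuesDatabase now keeps the raw RecordData per row and renders each column through a per-column field getter, instead of eagerly calling getString(i).toString(), which only works when every column is a string (the new TIMESTAMP columns would break it). A rough stand-in sketch of the idea; the Row and getter types below are placeholders, not the CDC API:

import java.util.List;
import java.util.function.Function;

public class FieldGetterSketch {
    interface Row { Object field(int pos); }

    public static void main(String[] args) {
        // A row with a BIGINT column and a TIMESTAMP column.
        Row row = pos -> pos == 0 ? (Object) 1L : java.time.LocalDateTime.parse("2024-08-01T00:00:00");
        // One getter per column, chosen from the column's declared type.
        List<Function<Row, Object>> getters = List.of(r -> r.field(0), r -> r.field(1));
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < getters.size(); i++) {
            sb.append(i == 0 ? "id=" : ";ts=").append(getters.get(i).apply(row));
        }
        System.out.println(sb); // id=1;ts=2024-08-01T00:00
    }
}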
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/SystemFunctionUtils.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/SystemFunctionUtils.java
index 34c299567a..ba569fc089 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/SystemFunctionUtils.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/SystemFunctionUtils.java
@@ -30,6 +30,9 @@
 import java.math.RoundingMode;
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
 import java.util.Arrays;
 import java.util.Calendar;
 import java.util.Date;
@@ -38,65 +41,62 @@
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
+import static org.apache.flink.cdc.common.utils.DateTimeUtils.timestampMillisToDate;
+import static org.apache.flink.cdc.common.utils.DateTimeUtils.timestampMillisToTime;
+
 /** System function utils to support the call of flink cdc pipeline transform. */
 public class SystemFunctionUtils {
-    private static final Logger LOG = LoggerFactory.getLogger(SystemFunctionUtils.class);
 
-    public static int localtime(long epochTime, String timezone) {
-        return DateTimeUtils.timestampMillisToTime(epochTime);
-    }
+    private static final Logger LOG = LoggerFactory.getLogger(SystemFunctionUtils.class);
 
-    public static TimestampData localtimestamp(long epochTime, String timezone) {
-        return TimestampData.fromMillis(epochTime);
+    public static LocalZonedTimestampData currentTimestamp(long epochTime) {
+        return LocalZonedTimestampData.fromEpochMillis(epochTime);
     }
 
-    // synonym: localtime
-    public static int currentTime(long epochTime, String timezone) {
-        return localtime(epochTime, timezone);
+    // synonym with currentTimestamp
+    public static LocalZonedTimestampData now(long epochTime) {
+        return LocalZonedTimestampData.fromEpochMillis(epochTime);
     }
 
-    public static int currentDate(long epochTime, String timezone) {
-        return DateTimeUtils.timestampMillisToDate(epochTime);
+    public static TimestampData localtimestamp(long epochTime, String timezone) {
+        return TimestampData.fromLocalDateTime(
+                Instant.ofEpochMilli(epochTime).atZone(ZoneId.of(timezone)).toLocalDateTime());
     }
 
-    public static TimestampData currentTimestamp(long epochTime, String timezone) {
-        return TimestampData.fromMillis(
-                epochTime + TimeZone.getTimeZone(timezone).getOffset(epochTime));
+    public static int localtime(long epochTime, String timezone) {
+        return timestampMillisToTime(localtimestamp(epochTime, timezone).getMillisecond());
     }
 
-    public static LocalZonedTimestampData now(long epochTime, String timezone) {
-        return LocalZonedTimestampData.fromEpochMillis(epochTime);
+    public static int currentTime(long epochTime, String timezone) {
+        // the time value of currentTimestamp under given session time zone
+        return timestampMillisToTime(localtimestamp(epochTime, timezone).getMillisecond());
     }
 
-    public static String dateFormat(LocalZonedTimestampData timestamp, String format) {
-        SimpleDateFormat dateFormat = new SimpleDateFormat(format);
-        return dateFormat.format(new Date(timestamp.getEpochMillisecond()));
+    public static int currentDate(long epochTime, String timezone) {
+        // the date value of currentTimestamp under given session time zone
+        return timestampMillisToDate(localtimestamp(epochTime, timezone).getMillisecond());
     }
 
     public static String dateFormat(TimestampData timestamp, String format) {
-        SimpleDateFormat dateFormat = new SimpleDateFormat(format);
-        return dateFormat.format(new Date(timestamp.getMillisecond()));
-    }
-
-    public static String dateFormat(ZonedTimestampData timestamp, String format) {
-        SimpleDateFormat dateFormat = new SimpleDateFormat(format);
-        return dateFormat.format(new Date(timestamp.getMillisecond()));
+        return DateTimeUtils.formatTimestampMillis(
+                timestamp.getMillisecond(), format, TimeZone.getTimeZone("UTC"));
     }
 
-    public static int toDate(String str) {
-        return toDate(str, "yyyy-MM-dd");
+    public static int toDate(String str, String timezone) {
+        return toDate(str, "yyyy-MM-dd", timezone);
     }
 
-    public static int toDate(String str, String format) {
-        return DateTimeUtils.parseDate(str, format);
+    public static int toDate(String str, String format, String timezone) {
+        return DateTimeUtils.parseDate(str, format, timezone);
     }
 
-    public static TimestampData toTimestamp(String str) {
-        return toTimestamp(str, "yyyy-MM-dd HH:mm:ss");
+    public static TimestampData toTimestamp(String str, String timezone) {
+        return toTimestamp(str, "yyyy-MM-dd HH:mm:ss", timezone);
     }
 
-    public static TimestampData toTimestamp(String str, String format) {
+    public static TimestampData toTimestamp(String str, String format, String timezone) {
         SimpleDateFormat dateFormat = new SimpleDateFormat(format);
+        dateFormat.setTimeZone(TimeZone.getTimeZone(timezone));
         try {
             return TimestampData.fromMillis(dateFormat.parse(str).getTime());
         } catch (ParseException e) {
@@ -118,11 +118,45 @@ public static int timestampDiff(
         return timestampDiff(symbol, fromTimestamp.getMillisecond(), toTimestamp.getMillisecond());
     }
 
+    public static int timestampDiff(
+            String symbol, TimestampData fromTimestamp, LocalZonedTimestampData toTimestamp) {
+        return timestampDiff(
+                symbol, fromTimestamp.getMillisecond(), toTimestamp.getEpochMillisecond());
+    }
+
+    public static int timestampDiff(
+            String symbol, LocalZonedTimestampData fromTimestamp, TimestampData toTimestamp) {
+        return timestampDiff(
+                symbol, fromTimestamp.getEpochMillisecond(), toTimestamp.getMillisecond());
+    }
+
     public static int timestampDiff(
             String symbol, ZonedTimestampData fromTimestamp, ZonedTimestampData toTimestamp) {
         return timestampDiff(symbol, fromTimestamp.getMillisecond(), toTimestamp.getMillisecond());
     }
 
+    public static int timestampDiff(
+            String symbol, LocalZonedTimestampData fromTimestamp, ZonedTimestampData toTimestamp) {
+        return timestampDiff(
+                symbol, fromTimestamp.getEpochMillisecond(), toTimestamp.getMillisecond());
+    }
+
+    public static int timestampDiff(
+            String symbol, ZonedTimestampData fromTimestamp, LocalZonedTimestampData toTimestamp) {
+        return timestampDiff(
+                symbol, fromTimestamp.getMillisecond(), toTimestamp.getEpochMillisecond());
+    }
+
+    public static int timestampDiff(
+            String symbol, TimestampData fromTimestamp, ZonedTimestampData toTimestamp) {
+        return timestampDiff(symbol, fromTimestamp.getMillisecond(), toTimestamp.getMillisecond());
+    }
+
+    public static int timestampDiff(
+            String symbol, ZonedTimestampData fromTimestamp, TimestampData toTimestamp) {
+        return timestampDiff(symbol, fromTimestamp.getMillisecond(), toTimestamp.getMillisecond());
+    }
+
     public static int timestampDiff(String symbol, long fromDate, long toDate) {
         Calendar from = Calendar.getInstance();
         from.setTime(new Date(fromDate));
@@ -587,6 +621,24 @@ public static BigDecimal castToBigDecimal(Object object, int precision, int scal
         return bigDecimal;
     }
 
+    public static TimestampData castToTimestamp(Object object, String timezone) {
+        if (object == null) {
+            return null;
+        }
+        if (object instanceof LocalZonedTimestampData) {
+            return TimestampData.fromLocalDateTime(
+                    LocalDateTime.ofInstant(
+                            ((LocalZonedTimestampData) object).toInstant(), ZoneId.of(timezone)));
+        } else if (object instanceof ZonedTimestampData) {
+            return TimestampData.fromLocalDateTime(
+                    LocalDateTime.ofInstant(
+                            ((ZonedTimestampData) object).toInstant(), ZoneId.of(timezone)));
+        } else {
+            return TimestampData.fromLocalDateTime(
+                    LocalDateTime.parse(castObjectIntoString(object)));
+        }
+    }
+
     private static String castObjectIntoString(Object object) {
         if (object instanceof Boolean) {
             return Boolean.valueOf(castToString(object)) ? "1" : "0";
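Note: the SystemFunctionUtils rewrite is the semantic heart of the change. CURRENT_TIMESTAMP and NOW() now stay an absolute instant (TIMESTAMP_LTZ), while LOCALTIMESTAMP, LOCALTIME, CURRENT_TIME and CURRENT_DATE derive from that instant rendered in the session time zone. A sketch of the intended contract using only java.time, not part of the patch:

import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;

public class TemporalSemanticsSketch {
    public static void main(String[] args) {
        long epochTime = 1722988800123L; // 2024-08-07T00:00:00.123Z
        String sessionZone = "America/Los_Angeles";

        // CURRENT_TIMESTAMP / NOW(): zone-independent instant.
        Instant currentTimestamp = Instant.ofEpochMilli(epochTime);

        // LOCALTIMESTAMP: the same instant rendered in the session zone.
        LocalDateTime localTimestamp =
                currentTimestamp.atZone(ZoneId.of(sessionZone)).toLocalDateTime();

        System.out.println(currentTimestamp); // 2024-08-07T00:00:00.123Z
        System.out.println(localTimestamp);   // 2024-08-06T17:00:00.123 (UTC-7 in August)
    }
}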
"1" : "0"; diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/JaninoCompiler.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/JaninoCompiler.java index 2dd1b8402e..4877d01754 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/JaninoCompiler.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/JaninoCompiler.java @@ -49,13 +49,18 @@ public class JaninoCompiler { private static final List SQL_TYPE_NAME_IGNORE = Arrays.asList(SqlTypeName.SYMBOL); - private static final List NO_OPERAND_TIMESTAMP_FUNCTIONS = - Arrays.asList( - "LOCALTIME", - "LOCALTIMESTAMP", - "CURRENT_TIME", - "CURRENT_DATE", - "CURRENT_TIMESTAMP"); + private static final List TIMEZONE_FREE_TEMPORAL_FUNCTIONS = + Arrays.asList("CURRENT_TIMESTAMP", "NOW"); + + private static final List TIMEZONE_REQUIRED_TEMPORAL_FUNCTIONS = + Arrays.asList("LOCALTIME", "LOCALTIMESTAMP", "CURRENT_TIME", "CURRENT_DATE"); + + private static final List TIMEZONE_FREE_TEMPORAL_CONVERSION_FUNCTIONS = + Arrays.asList("DATE_FORMAT"); + + private static final List TIMEZONE_REQUIRED_TEMPORAL_CONVERSION_FUNCTIONS = + Arrays.asList("TO_DATE", "TO_TIMESTAMP"); + public static final String DEFAULT_EPOCH_TIME = "__epoch_time__"; public static final String DEFAULT_TIME_ZONE = "__time_zone__"; @@ -107,8 +112,14 @@ public static Java.Rvalue translateSqlNodeToJaninoRvalue(SqlNode transform) { private static Java.Rvalue translateSqlIdentifier(SqlIdentifier sqlIdentifier) { String columnName = sqlIdentifier.names.get(sqlIdentifier.names.size() - 1); - if (NO_OPERAND_TIMESTAMP_FUNCTIONS.contains(columnName)) { - return generateNoOperandTimestampFunctionOperation(columnName); + if (TIMEZONE_FREE_TEMPORAL_FUNCTIONS.contains(columnName)) { + return generateTimezoneFreeTemporalFunctionOperation(columnName); + } else if (TIMEZONE_REQUIRED_TEMPORAL_FUNCTIONS.contains(columnName)) { + return generateTimezoneRequiredTemporalFunctionOperation(columnName); + } else if (TIMEZONE_FREE_TEMPORAL_CONVERSION_FUNCTIONS.contains(columnName)) { + return generateTimezoneFreeTemporalConversionFunctionOperation(columnName); + } else if (TIMEZONE_REQUIRED_TEMPORAL_CONVERSION_FUNCTIONS.contains(columnName)) { + return generateTimezoneRequiredTemporalConversionFunctionOperation(columnName); } else { return new Java.AmbiguousName(Location.NOWHERE, new String[] {columnName}); } @@ -135,8 +146,14 @@ private static Java.Rvalue translateSqlBasicCall(SqlBasicCall sqlBasicCall) { for (SqlNode sqlNode : operandList) { translateSqlNodeToAtoms(sqlNode, atoms); } - if (NO_OPERAND_TIMESTAMP_FUNCTIONS.contains(sqlBasicCall.getOperator().getName())) { + if (TIMEZONE_FREE_TEMPORAL_FUNCTIONS.contains(sqlBasicCall.getOperator().getName())) { atoms.add(new Java.AmbiguousName(Location.NOWHERE, new String[] {DEFAULT_EPOCH_TIME})); + } else if (TIMEZONE_REQUIRED_TEMPORAL_FUNCTIONS.contains( + sqlBasicCall.getOperator().getName())) { + atoms.add(new Java.AmbiguousName(Location.NOWHERE, new String[] {DEFAULT_EPOCH_TIME})); + atoms.add(new Java.AmbiguousName(Location.NOWHERE, new String[] {DEFAULT_TIME_ZONE})); + } else if (TIMEZONE_REQUIRED_TEMPORAL_CONVERSION_FUNCTIONS.contains( + sqlBasicCall.getOperator().getName())) { atoms.add(new Java.AmbiguousName(Location.NOWHERE, new String[] {DEFAULT_TIME_ZONE})); } return sqlBasicCallToJaninoRvalue(sqlBasicCall, atoms.toArray(new Java.Rvalue[0])); @@ -289,8 +306,6 @@ private static Java.Rvalue generateOtherFunctionOperation( } else { throw new 
ParseException("Unrecognized expression: " + sqlBasicCall.toString()); } - } else if (operationName.equals("NOW")) { - return generateNoOperandTimestampFunctionOperation(operationName); } else { return new Java.MethodInvocation( Location.NOWHERE, @@ -300,7 +315,18 @@ private static Java.Rvalue generateOtherFunctionOperation( } } - private static Java.Rvalue generateNoOperandTimestampFunctionOperation(String operationName) { + private static Java.Rvalue generateTimezoneFreeTemporalFunctionOperation(String operationName) { + return new Java.MethodInvocation( + Location.NOWHERE, + null, + StringUtils.convertToCamelCase(operationName), + new Java.Rvalue[] { + new Java.AmbiguousName(Location.NOWHERE, new String[] {DEFAULT_EPOCH_TIME}) + }); + } + + private static Java.Rvalue generateTimezoneRequiredTemporalFunctionOperation( + String operationName) { List timestampFunctionParam = new ArrayList<>(); timestampFunctionParam.add( new Java.AmbiguousName(Location.NOWHERE, new String[] {DEFAULT_EPOCH_TIME})); @@ -313,6 +339,26 @@ private static Java.Rvalue generateNoOperandTimestampFunctionOperation(String op timestampFunctionParam.toArray(new Java.Rvalue[0])); } + private static Java.Rvalue generateTimezoneFreeTemporalConversionFunctionOperation( + String operationName) { + return new Java.MethodInvocation( + Location.NOWHERE, + null, + StringUtils.convertToCamelCase(operationName), + new Java.Rvalue[0]); + } + + private static Java.Rvalue generateTimezoneRequiredTemporalConversionFunctionOperation( + String operationName) { + return new Java.MethodInvocation( + Location.NOWHERE, + null, + StringUtils.convertToCamelCase(operationName), + new Java.Rvalue[] { + new Java.AmbiguousName(Location.NOWHERE, new String[] {DEFAULT_TIME_ZONE}) + }); + } + private static Java.Rvalue generateTypeConvertMethod( SqlDataTypeSpec sqlDataTypeSpec, Java.Rvalue[] atoms) { switch (sqlDataTypeSpec.getTypeName().getSimple().toUpperCase()) { @@ -359,6 +405,15 @@ private static Java.Rvalue generateTypeConvertMethod( case "VARCHAR": case "STRING": return new Java.MethodInvocation(Location.NOWHERE, null, "castToString", atoms); + case "TIMESTAMP": + List timestampAtoms = new ArrayList<>(Arrays.asList(atoms)); + timestampAtoms.add( + new Java.AmbiguousName(Location.NOWHERE, new String[] {DEFAULT_TIME_ZONE})); + return new Java.MethodInvocation( + Location.NOWHERE, + null, + "castToTimestamp", + timestampAtoms.toArray(new Java.Rvalue[0])); default: throw new ParseException( "Unsupported data type cast: " + sqlDataTypeSpec.toString()); diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/metadata/TransformSqlOperatorTable.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/metadata/TransformSqlOperatorTable.java index 8ecefa8f4a..35253f2791 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/metadata/TransformSqlOperatorTable.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/metadata/TransformSqlOperatorTable.java @@ -26,6 +26,7 @@ import org.apache.calcite.sql.SqlKind; import org.apache.calcite.sql.SqlOperator; import org.apache.calcite.sql.SqlSyntax; +import org.apache.calcite.sql.fun.SqlCurrentDateFunction; import org.apache.calcite.sql.fun.SqlStdOperatorTable; import org.apache.calcite.sql.type.InferTypes; import org.apache.calcite.sql.type.OperandTypes; @@ -86,11 +87,12 @@ public void lookupOperatorOverloads( .build(); public static final SqlFunction LOCALTIMESTAMP = new BuiltInTimestampFunction("LOCALTIMESTAMP", 
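Note: JaninoCompiler now sorts temporal functions into four buckets that decide which implicit parameters get appended to the generated call. The call shapes below are taken from the patch's TransformParserTest expectations and the compiler code above; the snippet just prints them side by side and is not part of the patch:

import java.util.LinkedHashMap;
import java.util.Map;

public class TemporalTranslationSketch {
    public static void main(String[] args) {
        Map<String, String> generated = new LinkedHashMap<>();
        // Timezone-free temporal functions: epoch time only.
        generated.put("CURRENT_TIMESTAMP", "currentTimestamp(__epoch_time__)");
        generated.put("NOW()", "now(__epoch_time__)");
        // Timezone-required temporal functions: epoch time plus session zone.
        generated.put("LOCALTIMESTAMP", "localtimestamp(__epoch_time__, __time_zone__)");
        generated.put("CURRENT_DATE", "currentDate(__epoch_time__, __time_zone__)");
        // Timezone-required conversion functions: session zone appended.
        generated.put("TO_TIMESTAMP(dt)", "toTimestamp(dt, __time_zone__)");
        // Timezone-free conversion functions: arguments passed through as-is.
        generated.put("DATE_FORMAT(dt, 'yyyy-MM-dd')", "dateFormat(dt, \"yyyy-MM-dd\")");
        generated.forEach((sql, janino) -> System.out.println(sql + "  ->  " + janino));
    }
}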
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/metadata/TransformSqlOperatorTable.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/metadata/TransformSqlOperatorTable.java
index 8ecefa8f4a..35253f2791 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/metadata/TransformSqlOperatorTable.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/metadata/TransformSqlOperatorTable.java
@@ -26,6 +26,7 @@
 import org.apache.calcite.sql.SqlKind;
 import org.apache.calcite.sql.SqlOperator;
 import org.apache.calcite.sql.SqlSyntax;
+import org.apache.calcite.sql.fun.SqlCurrentDateFunction;
 import org.apache.calcite.sql.fun.SqlStdOperatorTable;
 import org.apache.calcite.sql.type.InferTypes;
 import org.apache.calcite.sql.type.OperandTypes;
@@ -86,11 +87,12 @@ public void lookupOperatorOverloads(
                     .build();
     public static final SqlFunction LOCALTIMESTAMP =
             new BuiltInTimestampFunction("LOCALTIMESTAMP", SqlTypeName.TIMESTAMP, 3);
+    public static final SqlFunction CURRENT_TIME =
+            new BuiltInTimestampFunction("CURRENT_TIME", SqlTypeName.TIME, 0);
     public static final SqlFunction CURRENT_TIMESTAMP =
             new BuiltInTimestampFunction(
                     "CURRENT_TIMESTAMP", SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE, 3);
-    public static final SqlFunction CURRENT_DATE =
-            new BuiltInTimestampFunction("CURRENT_DATE", SqlTypeName.DATE, 0);
+    public static final SqlFunction CURRENT_DATE = new SqlCurrentDateFunction();
     public static final SqlFunction NOW =
             new BuiltInTimestampFunction("NOW", SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE, 3) {
                 @Override
diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/TransformDataOperatorTest.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/TransformDataOperatorTest.java
index 2213127a4a..a19722ef15 100644
--- a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/TransformDataOperatorTest.java
+++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/TransformDataOperatorTest.java
@@ -110,8 +110,6 @@ public class TransformDataOperatorTest {
                     .physicalColumn("minute_diff", DataTypes.INT())
                     .physicalColumn("hour_diff", DataTypes.INT())
                     .physicalColumn("day_diff", DataTypes.INT())
-                    .physicalColumn("month_diff", DataTypes.INT())
-                    .physicalColumn("year_diff", DataTypes.INT())
                     .primaryKey("col1")
                     .build();
 
@@ -131,6 +129,7 @@ public class TransformDataOperatorTest {
                     .physicalColumn("nullChar", DataTypes.CHAR(1))
                     .physicalColumn("nullVarchar", DataTypes.VARCHAR(1))
                     .physicalColumn("nullDecimal", DataTypes.DECIMAL(4, 2))
+                    .physicalColumn("nullTimestamp", DataTypes.TIMESTAMP(3))
                     .primaryKey("col1")
                     .build();
 
@@ -149,6 +148,16 @@ public class TransformDataOperatorTest {
                     .physicalColumn("castChar", DataTypes.CHAR(1))
                     .physicalColumn("castVarchar", DataTypes.VARCHAR(1))
                     .physicalColumn("castDecimal", DataTypes.DECIMAL(4, 2))
+                    .physicalColumn("castTimestamp", DataTypes.TIMESTAMP(3))
+                    .primaryKey("col1")
+                    .build();
+
+    private static final TableId TIMEZONE_TABLEID =
+            TableId.tableId("my_company", "my_branch", "timezone_table");
+    private static final Schema TIMEZONE_SCHEMA =
+            Schema.newBuilder()
+                    .physicalColumn("col1", DataTypes.STRING())
+                    .physicalColumn("datetime", DataTypes.STRING())
                     .primaryKey("col1")
                     .build();
 
@@ -502,10 +511,10 @@ void testTimestampTransform() throws Exception {
                         .addTransform(
                                 TIMESTAMP_TABLEID.identifier(),
                                 "col1, IF(LOCALTIME = CURRENT_TIME, 1, 0) as time_equal,"
-                                        + " IF(LOCALTIMESTAMP = CURRENT_TIMESTAMP, 1, 0) as timestamp_equal,"
+                                        + " IF(DATE_FORMAT(CAST(CURRENT_TIMESTAMP AS TIMESTAMP), 'yyyy-MM-dd HH:mm:ss') = DATE_FORMAT(CAST(NOW() AS TIMESTAMP), 'yyyy-MM-dd HH:mm:ss'), 1, 0) as timestamp_equal,"
                                         + " IF(TO_DATE(DATE_FORMAT(LOCALTIMESTAMP, 'yyyy-MM-dd')) = CURRENT_DATE, 1, 0) as date_equal",
-                                "LOCALTIMESTAMP = CURRENT_TIMESTAMP")
-                        .addTimezone("GMT")
+                                "LOCALTIMESTAMP = CAST(CURRENT_TIMESTAMP AS TIMESTAMP)")
+                        .addTimezone("UTC")
                         .build();
         EventOperatorTestHarness<TransformDataOperator, Event>
                 transformFunctionEventEventOperatorTestHarness =
@@ -546,14 +555,19 @@ void testTimestampDiffTransform() throws Exception {
                 TransformDataOperator.newBuilder()
                         .addTransform(
                                 TIMESTAMPDIFF_TABLEID.identifier(),
-                                "col1, TIMESTAMP_DIFF('SECOND', LOCALTIMESTAMP, CURRENT_TIMESTAMP) as second_diff,"
-                                        + " TIMESTAMP_DIFF('MINUTE', LOCALTIMESTAMP, CURRENT_TIMESTAMP) as minute_diff,"
-                                        + " TIMESTAMP_DIFF('HOUR', LOCALTIMESTAMP, CURRENT_TIMESTAMP) as hour_diff,"
-                                        + " TIMESTAMP_DIFF('DAY', LOCALTIMESTAMP, CURRENT_TIMESTAMP) as day_diff,"
-                                        + " TIMESTAMP_DIFF('MONTH', LOCALTIMESTAMP, CURRENT_TIMESTAMP) as month_diff,"
-                                        + " TIMESTAMP_DIFF('YEAR', LOCALTIMESTAMP, CURRENT_TIMESTAMP) as year_diff",
-                                null)
-                        .addTimezone("GMT-8:00")
+                                "col1, TIMESTAMP_DIFF('SECOND', LOCALTIMESTAMP, CAST(CURRENT_TIMESTAMP AS TIMESTAMP)) as second_diff,"
+                                        + " TIMESTAMP_DIFF('MINUTE', LOCALTIMESTAMP, CAST(CURRENT_TIMESTAMP AS TIMESTAMP)) as minute_diff,"
+                                        + " TIMESTAMP_DIFF('HOUR', LOCALTIMESTAMP, CAST(CURRENT_TIMESTAMP AS TIMESTAMP)) as hour_diff,"
+                                        + " TIMESTAMP_DIFF('DAY', LOCALTIMESTAMP, CAST(CURRENT_TIMESTAMP AS TIMESTAMP)) as day_diff",
+                                "col1='1'")
+                        .addTransform(
+                                TIMESTAMPDIFF_TABLEID.identifier(),
+                                "col1, TIMESTAMP_DIFF('SECOND', LOCALTIMESTAMP, CAST(NOW() AS TIMESTAMP)) as second_diff,"
+                                        + " TIMESTAMP_DIFF('MINUTE', LOCALTIMESTAMP, CAST(NOW() AS TIMESTAMP)) as minute_diff,"
+                                        + " TIMESTAMP_DIFF('HOUR', LOCALTIMESTAMP, CAST(NOW() AS TIMESTAMP)) as hour_diff,"
+                                        + " TIMESTAMP_DIFF('DAY', LOCALTIMESTAMP, CAST(NOW() AS TIMESTAMP)) as day_diff",
+                                "col1='2'")
+                        .addTimezone("Asia/Shanghai")
                         .build();
         EventOperatorTestHarness<TransformDataOperator, Event>
                 transformFunctionEventEventOperatorTestHarness =
@@ -570,22 +584,79 @@ void testTimestampDiffTransform() throws Exception {
                 DataChangeEvent.insertEvent(
                         TIMESTAMPDIFF_TABLEID,
                         recordDataGenerator.generate(
-                                new Object[] {
-                                    new BinaryStringData("1"), null, null, null, null, null, null
-                                }));
+                                new Object[] {new BinaryStringData("1"), null, null, null, null}));
         DataChangeEvent insertEventExpect =
                 DataChangeEvent.insertEvent(
                         TIMESTAMPDIFF_TABLEID,
+                        recordDataGenerator.generate(
+                                new Object[] {new BinaryStringData("1"), 0, 0, 0, 0}));
+        transform.processElement(new StreamRecord<>(createTableEvent));
+        Assertions.assertThat(
+                        transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
+                .isEqualTo(
+                        new StreamRecord<>(
+                                new CreateTableEvent(TIMESTAMPDIFF_TABLEID, TIMESTAMPDIFF_SCHEMA)));
+        transform.processElement(new StreamRecord<>(insertEvent));
+        Assertions.assertThat(
+                        transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
+                .isEqualTo(new StreamRecord<>(insertEventExpect));
+
+        DataChangeEvent insertEvent2 =
+                DataChangeEvent.insertEvent(
+                        TIMESTAMPDIFF_TABLEID,
+                        recordDataGenerator.generate(
+                                new Object[] {new BinaryStringData("2"), null, null, null, null}));
+        DataChangeEvent insertEventExpect2 =
+                DataChangeEvent.insertEvent(
+                        TIMESTAMPDIFF_TABLEID,
+                        recordDataGenerator.generate(
+                                new Object[] {new BinaryStringData("2"), 0, 0, 0, 0}));
+
+        transform.processElement(new StreamRecord<>(insertEvent2));
+        Assertions.assertThat(
+                        transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
+                .isEqualTo(new StreamRecord<>(insertEventExpect2));
+    }
+
+    @Test
+    void testTimezoneTransform() throws Exception {
+        TransformDataOperator transform =
+                TransformDataOperator.newBuilder()
+                        .addTransform(
+                                TIMEZONE_TABLEID.identifier(),
+                                "col1, DATE_FORMAT(TO_TIMESTAMP('2024-08-01 00:00:00'), 'yyyy-MM-dd HH:mm:ss') as datetime",
+                                null)
+                        .addTimezone("UTC")
+                        .build();
+        EventOperatorTestHarness<TransformDataOperator, Event>
+                transformFunctionEventEventOperatorTestHarness =
+                        new EventOperatorTestHarness<>(transform, 1);
+        // Initialization
+        transformFunctionEventEventOperatorTestHarness.open();
+        // Create table
+        CreateTableEvent createTableEvent = new CreateTableEvent(TIMEZONE_TABLEID, TIMEZONE_SCHEMA);
+        BinaryRecordDataGenerator recordDataGenerator =
+                new BinaryRecordDataGenerator(((RowType) TIMEZONE_SCHEMA.toRowDataType()));
+        // Insert
+        DataChangeEvent insertEvent =
+                DataChangeEvent.insertEvent(
+                        TIMEZONE_TABLEID,
+                        recordDataGenerator.generate(
+                                new Object[] {new BinaryStringData("1"), null}));
+        DataChangeEvent insertEventExpect =
+                DataChangeEvent.insertEvent(
+                        TIMEZONE_TABLEID,
                         recordDataGenerator.generate(
                                 new Object[] {
-                                    new BinaryStringData("1"), -28800, -480, -8, 0, 0, 0
+                                    new BinaryStringData("1"),
+                                    new BinaryStringData("2024-08-01 00:00:00")
                                 }));
         transform.processElement(new StreamRecord<>(createTableEvent));
         Assertions.assertThat(
                         transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
                 .isEqualTo(
                         new StreamRecord<>(
-                                new CreateTableEvent(TIMESTAMPDIFF_TABLEID, TIMESTAMPDIFF_SCHEMA)));
+                                new CreateTableEvent(TIMEZONE_TABLEID, TIMEZONE_SCHEMA)));
         transform.processElement(new StreamRecord<>(insertEvent));
         Assertions.assertThat(
                         transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
                 .isEqualTo(new StreamRecord<>(insertEventExpect));
@@ -609,7 +680,8 @@ void testNullCastTransform() throws Exception {
                                         + ",cast(colString as double) as nullDouble"
                                         + ",cast(colString as char) as nullChar"
                                         + ",cast(colString as varchar) as nullVarchar"
-                                        + ",cast(colString as DECIMAL(4,2)) as nullDecimal",
+                                        + ",cast(colString as DECIMAL(4,2)) as nullDecimal"
+                                        + ",cast(colString as TIMESTAMP(3)) as nullTimestamp",
                                 null)
                         .build();
         EventOperatorTestHarness<TransformDataOperator, Event>
@@ -639,6 +711,7 @@ void testNullCastTransform() throws Exception {
                                     null,
                                     null,
                                     null,
+                                    null
                                 }));
         transform.processElement(new StreamRecord<>(createTableEvent));
         Assertions.assertThat(
@@ -666,7 +739,8 @@ void testCastTransform() throws Exception {
                                         + ",cast(col1 as double) as castDouble"
                                         + ",cast(col1 as char) as castChar"
                                         + ",cast(col1 as varchar) as castVarchar"
-                                        + ",cast(col1 as DECIMAL(4,2)) as castDecimal",
+                                        + ",cast(col1 as DECIMAL(4,2)) as castDecimal"
+                                        + ", castTimestamp",
                                 "col1 = '1'")
                         .addTransform(
                                 CAST_TABLEID.identifier(),
@@ -680,7 +754,8 @@ void testCastTransform() throws Exception {
                                         + ",cast(castInt as double) as castDouble"
                                         + ",cast(castInt as char) as castChar"
                                         + ",cast(castInt as varchar) as castVarchar"
-                                        + ",cast(castInt as DECIMAL(4,2)) as castDecimal",
+                                        + ",cast(castInt as DECIMAL(4,2)) as castDecimal"
+                                        + ", castTimestamp",
                                 "col1 = '2'")
                         .addTransform(
                                 CAST_TABLEID.identifier(),
@@ -694,7 +769,8 @@ void testCastTransform() throws Exception {
                                         + ",cast(castBoolean as double) as castDouble"
                                         + ",cast(castBoolean as char) as castChar"
                                         + ",cast(castBoolean as varchar) as castVarchar"
-                                        + ",cast(castBoolean as DECIMAL(4,2)) as castDecimal",
+                                        + ",cast(castBoolean as DECIMAL(4,2)) as castDecimal"
+                                        + ", castTimestamp",
                                 "col1 = '3'")
                         .addTransform(
                                 CAST_TABLEID.identifier(),
@@ -708,7 +784,8 @@ void testCastTransform() throws Exception {
                                         + ",cast(castTinyint as double) as castDouble"
                                         + ",cast(castTinyint as char) as castChar"
                                         + ",cast(castTinyint as varchar) as castVarchar"
-                                        + ",cast(castTinyint as DECIMAL(4,2)) as castDecimal",
+                                        + ",cast(castTinyint as DECIMAL(4,2)) as castDecimal"
+                                        + ", castTimestamp",
                                 "col1 = '4'")
                         .addTransform(
                                 CAST_TABLEID.identifier(),
@@ -722,7 +799,8 @@ void testCastTransform() throws Exception {
                                         + ",cast(castSmallint as double) as castDouble"
                                         + ",cast(castSmallint as char) as castChar"
                                         + ",cast(castSmallint as varchar) as castVarchar"
-                                        + ",cast(castSmallint as DECIMAL(4,2)) as castDecimal",
+                                        + ",cast(castSmallint as DECIMAL(4,2)) as castDecimal"
+                                        + ", castTimestamp",
                                 "col1 = '5'")
                         .addTransform(
                                 CAST_TABLEID.identifier(),
@@ -736,7 +814,8 @@ void testCastTransform() throws Exception {
                                         + ",cast(castBigint as double) as castDouble"
                                         + ",cast(castBigint as char) as castChar"
                                         + ",cast(castBigint as varchar) as castVarchar"
-                                        + ",cast(castBigint as DECIMAL(4,2)) as castDecimal",
+                                        + ",cast(castBigint as DECIMAL(4,2)) as castDecimal"
+                                        + ", castTimestamp",
                                 "col1 = '6'")
                         .addTransform(
                                 CAST_TABLEID.identifier(),
@@ -750,7 +829,8 @@ void testCastTransform() throws Exception {
                                         + ",cast(castFloat as double) as castDouble"
                                         + ",cast(castFloat as char) as castChar"
                                         + ",cast(castFloat as varchar) as castVarchar"
-                                        + ",cast(castFloat as DECIMAL(4,2)) as castDecimal",
+                                        + ",cast(castFloat as DECIMAL(4,2)) as castDecimal"
+                                        + ", castTimestamp",
                                 "col1 = '7'")
                         .addTransform(
                                 CAST_TABLEID.identifier(),
@@ -764,7 +844,8 @@ void testCastTransform() throws Exception {
                                         + ",cast(castDouble as double) as castDouble"
                                         + ",cast(castDouble as char) as castChar"
                                         + ",cast(castDouble as varchar) as castVarchar"
-                                        + ",cast(castDouble as DECIMAL(4,2)) as castDecimal",
+                                        + ",cast(castDouble as DECIMAL(4,2)) as castDecimal"
+                                        + ", castTimestamp",
                                 "col1 = '8'")
                         .addTransform(
                                 CAST_TABLEID.identifier(),
@@ -778,8 +859,24 @@ void testCastTransform() throws Exception {
                                         + ",cast(castDecimal as double) as castDouble"
                                         + ",cast(castDecimal as char) as castChar"
                                         + ",cast(castDecimal as varchar) as castVarchar"
-                                        + ",cast(castDecimal as DECIMAL(4,2)) as castDecimal",
+                                        + ",cast(castDecimal as DECIMAL(4,2)) as castDecimal"
+                                        + ", castTimestamp",
                                 "col1 = '9'")
+                        .addTransform(
+                                CAST_TABLEID.identifier(),
+                                "col1"
+                                        + ",castInt"
+                                        + ",castBoolean"
+                                        + ",castTinyint"
+                                        + ",castSmallint"
+                                        + ",castBigint"
+                                        + ",castFloat"
+                                        + ",castDouble"
+                                        + ",castChar"
+                                        + ",cast(castTimestamp as varchar) as castVarchar"
+                                        + ",castDecimal"
+                                        + ",cast('1970-01-01T00:00:01.234' as TIMESTAMP(3)) as castTimestamp",
+                                "col1 = '10'")
                         .build();
         EventOperatorTestHarness<TransformDataOperator, Event>
                 transformFunctionEventEventOperatorTestHarness =
@@ -807,6 +904,7 @@ void testCastTransform() throws Exception {
                                     null,
                                     null,
                                     null,
+                                    null
                                 }));
         DataChangeEvent insertEventExpect1 =
                 DataChangeEvent.insertEvent(
@@ -824,6 +922,7 @@ void testCastTransform() throws Exception {
                                     new BinaryStringData("1"),
                                     new BinaryStringData("1"),
                                     DecimalData.fromBigDecimal(new BigDecimal(1.0), 4, 2),
+                                    null
                                 }));
         transform.processElement(new StreamRecord<>(createTableEvent));
         Assertions.assertThat(
@@ -849,6 +948,7 @@ void testCastTransform() throws Exception {
                                     null,
                                     null,
                                     null,
+                                    null
                                 }));
         DataChangeEvent insertEventExpect2 =
                 DataChangeEvent.insertEvent(
@@ -866,6 +966,7 @@ void testCastTransform() throws Exception {
                                     new BinaryStringData("1"),
                                     new BinaryStringData("1"),
                                     DecimalData.fromBigDecimal(new BigDecimal(1.0), 4, 2),
+                                    null
                                 }));
         transform.processElement(new StreamRecord<>(insertEvent2));
         Assertions.assertThat(
@@ -887,6 +988,7 @@ void testCastTransform() throws Exception {
                                     null,
                                     null,
                                     null,
+                                    null
                                 }));
         DataChangeEvent insertEventExpect3 =
                 DataChangeEvent.insertEvent(
@@ -904,6 +1006,7 @@ void testCastTransform() throws Exception {
                                     new BinaryStringData("true"),
                                     new BinaryStringData("true"),
                                     DecimalData.fromBigDecimal(new BigDecimal(1.0), 4, 2),
+                                    null
                                 }));
         transform.processElement(new StreamRecord<>(insertEvent3));
         Assertions.assertThat(
@@ -925,6 +1028,7 @@ void testCastTransform() throws Exception {
                                     null,
                                     null,
                                     null,
+                                    null
                                 }));
         DataChangeEvent insertEventExpect4 =
                 DataChangeEvent.insertEvent(
@@ -942,6 +1046,7 @@ void testCastTransform() throws Exception {
                                     new BinaryStringData("1"),
                                     new BinaryStringData("1"),
                                     DecimalData.fromBigDecimal(new BigDecimal(1.0), 4, 2),
+                                    null
                                 }));
         transform.processElement(new StreamRecord<>(insertEvent4));
         Assertions.assertThat(
@@ -963,6 +1068,7 @@ void testCastTransform() throws Exception {
                                     null,
                                     null,
                                     null,
+                                    null
                                 }));
         DataChangeEvent insertEventExpect5 =
                 DataChangeEvent.insertEvent(
@@ -980,6 +1086,7 @@ void testCastTransform() throws Exception {
                                     new BinaryStringData("1"),
                                     new BinaryStringData("1"),
                                     DecimalData.fromBigDecimal(new BigDecimal(1.0), 4, 2),
+                                    null
                                 }));
         transform.processElement(new StreamRecord<>(insertEvent5));
         Assertions.assertThat(
@@ -1001,6 +1108,7 @@ void testCastTransform() throws Exception {
                                     null,
                                     null,
                                     null,
+                                    null
                                 }));
         DataChangeEvent insertEventExpect6 =
                 DataChangeEvent.insertEvent(
@@ -1018,6 +1126,7 @@ void testCastTransform() throws Exception {
                                     new BinaryStringData("1"),
                                     new BinaryStringData("1"),
                                     DecimalData.fromBigDecimal(new BigDecimal(1.0), 4, 2),
+                                    null
                                 }));
         transform.processElement(new StreamRecord<>(insertEvent6));
         Assertions.assertThat(
@@ -1039,6 +1148,7 @@ void testCastTransform() throws Exception {
                                     null,
                                     null,
                                     null,
+                                    null
                                 }));
         DataChangeEvent insertEventExpect7 =
                 DataChangeEvent.insertEvent(
@@ -1056,6 +1166,7 @@ void testCastTransform() throws Exception {
                                     new BinaryStringData("1.0"),
                                     new BinaryStringData("1.0"),
                                     DecimalData.fromBigDecimal(new BigDecimal(1.0), 4, 2),
+                                    null
                                 }));
         transform.processElement(new StreamRecord<>(insertEvent7));
         Assertions.assertThat(
@@ -1077,6 +1188,7 @@ void testCastTransform() throws Exception {
                                     null,
                                     null,
                                     null,
+                                    null
                                 }));
         DataChangeEvent insertEventExpect8 =
                 DataChangeEvent.insertEvent(
@@ -1094,6 +1206,7 @@ void testCastTransform() throws Exception {
                                     new BinaryStringData("1.0"),
                                     new BinaryStringData("1.0"),
                                     DecimalData.fromBigDecimal(new BigDecimal(1.0), 4, 2),
+                                    null
                                 }));
         transform.processElement(new StreamRecord<>(insertEvent8));
         Assertions.assertThat(
@@ -1115,6 +1228,7 @@ void testCastTransform() throws Exception {
                                     null,
                                     null,
                                     DecimalData.fromBigDecimal(new BigDecimal(1.0), 4, 2),
+                                    null
                                 }));
         DataChangeEvent insertEventExpect9 =
                 DataChangeEvent.insertEvent(
@@ -1132,11 +1246,53 @@ void testCastTransform() throws Exception {
                                     new BinaryStringData("1.00"),
                                     new BinaryStringData("1.00"),
                                     DecimalData.fromBigDecimal(new BigDecimal(1.0), 4, 2),
+                                    null
                                 }));
         transform.processElement(new StreamRecord<>(insertEvent9));
         Assertions.assertThat(
                         transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
                 .isEqualTo(new StreamRecord<>(insertEventExpect9));
+
+        DataChangeEvent insertEvent10 =
+                DataChangeEvent.insertEvent(
+                        CAST_TABLEID,
+                        recordDataGenerator.generate(
+                                new Object[] {
+                                    new BinaryStringData("10"),
+                                    null,
+                                    null,
+                                    null,
+                                    null,
+                                    null,
+                                    null,
+                                    null,
+                                    null,
+                                    null,
+                                    null,
+                                    TimestampData.fromMillis(1234, 0)
+                                }));
+        DataChangeEvent insertEventExpect10 =
+                DataChangeEvent.insertEvent(
+                        CAST_TABLEID,
+                        recordDataGenerator.generate(
+                                new Object[] {
+                                    new BinaryStringData("10"),
+                                    null,
+                                    null,
+                                    null,
+                                    null,
+                                    null,
+                                    null,
+                                    null,
+                                    null,
+                                    new BinaryStringData("1970-01-01T00:00:01.234"),
+                                    null,
+                                    TimestampData.fromMillis(1234, 0)
+                                }));
+        transform.processElement(new StreamRecord<>(insertEvent10));
+        Assertions.assertThat(
+                        transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
+                .isEqualTo(new StreamRecord<>(insertEventExpect10));
     }
 
     @Test
@@ -1155,7 +1311,8 @@ void testCastErrorTransform() throws Exception {
                                         + ",cast(castFloat as double) as castDouble"
                                         + ",cast(castFloat as char) as castChar"
                                         + ",cast(castFloat as varchar) as castVarchar"
-                                        + ",cast(castFloat as DECIMAL(4,2)) as castDecimal",
+                                        + ",cast(castFloat as DECIMAL(4,2)) as castDecimal"
+                                        + ",cast(castFloat as TIMESTAMP) as castTimestamp",
                                 "col1 = '1'")
                         .build();
         EventOperatorTestHarness<TransformDataOperator, Event>
@@ -1184,6 +1341,7 @@ void testCastErrorTransform() throws Exception {
                                     null,
                                     null,
                                     null,
+                                    null
                                 }));
         transform.processElement(new StreamRecord<>(createTableEvent));
         Assertions.assertThat(
@@ -1255,6 +1413,7 @@ void testBuildInFunctionTransform() throws Exception {
         testExpressionConditionTransform("cast(null as char) is null");
         testExpressionConditionTransform("cast(null as varchar) is null");
         testExpressionConditionTransform("cast(null as DECIMAL(4,2)) is null");
+        testExpressionConditionTransform("cast(null as TIMESTAMP(3)) is null");
     }
 
     private void testExpressionConditionTransform(String expression) throws Exception {
@@ -1264,7 +1423,7 @@ private void testExpressionConditionTransform(String expression) throws Exceptio
                         CONDITION_TABLEID.identifier(),
                         "col1, IF(" + expression + ", true, false) as condition_result",
                         expression)
-                .addTimezone("GMT")
+                .addTimezone("UTC")
                 .build();
         EventOperatorTestHarness<TransformDataOperator, Event>
                 transformFunctionEventEventOperatorTestHarness =
diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/JaninoCompilerTest.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/JaninoCompilerTest.java
index db86783d2b..6fbaef3bfe 100644
--- a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/JaninoCompilerTest.java
+++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/JaninoCompilerTest.java
@@ -17,8 +17,6 @@
 
 package org.apache.flink.cdc.runtime.parser;
 
-import org.apache.flink.cdc.common.data.TimestampData;
-
 import org.codehaus.commons.compiler.CompileException;
 import org.codehaus.commons.compiler.Location;
 import org.codehaus.janino.ExpressionEvaluator;
@@ -36,7 +34,6 @@
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
-import java.util.TimeZone;
 
 /** Unit tests for the {@link JaninoCompiler}. */
 public class JaninoCompilerTest {
@@ -108,24 +105,6 @@ public void testJaninoStringCompare() throws InvocationTargetException {
         Assert.assertEquals(true, evaluate);
     }
 
-    @Test
-    public void testJaninoTimestampFunction() throws InvocationTargetException {
-        long epochTime = System.currentTimeMillis();
-        long localTime = epochTime + TimeZone.getTimeZone("GMT-8:00").getOffset(epochTime);
-        String expression = "currentTimestamp(epochTime, \"GMT-8:00\")";
-        List<String> columnNames = Arrays.asList("epochTime");
-        List<Class<?>> paramTypes = Arrays.asList(Long.class);
-        List<Object> params = Arrays.asList(epochTime);
-        ExpressionEvaluator expressionEvaluator =
-                JaninoCompiler.compileExpression(
-                        JaninoCompiler.loadSystemFunction(expression),
-                        columnNames,
-                        paramTypes,
-                        TimestampData.class);
-        Object evaluate = expressionEvaluator.evaluate(params.toArray());
-        Assert.assertEquals(TimestampData.fromMillis(localTime), evaluate);
-    }
-
     @Test
     public void testBuildInFunction() throws InvocationTargetException {
         String expression = "ceil(2.4)";
diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/TransformParserTest.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/TransformParserTest.java
index f81c4a92c2..66a405a975 100644
--- a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/TransformParserTest.java
+++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/TransformParserTest.java
@@ -228,16 +228,16 @@ public void testTranslateFilterToJaninoExpression() {
         testFilterExpression(
                 "id = CURRENT_DATE", "valueEquals(id, currentDate(__epoch_time__, __time_zone__))");
         testFilterExpression(
-                "id = CURRENT_TIMESTAMP",
-                "valueEquals(id, currentTimestamp(__epoch_time__, __time_zone__))");
-        testFilterExpression("NOW()", "now(__epoch_time__, __time_zone__)");
+                "id = CURRENT_TIMESTAMP", "valueEquals(id, currentTimestamp(__epoch_time__))");
+        testFilterExpression("NOW()", "now(__epoch_time__)");
         testFilterExpression("YEAR(dt)", "year(dt)");
         testFilterExpression("QUARTER(dt)", "quarter(dt)");
         testFilterExpression("MONTH(dt)", "month(dt)");
         testFilterExpression("WEEK(dt)", "week(dt)");
         testFilterExpression("DATE_FORMAT(dt,'yyyy-MM-dd')", "dateFormat(dt, \"yyyy-MM-dd\")");
-        testFilterExpression("TO_DATE(dt, 'yyyy-MM-dd')", "toDate(dt, \"yyyy-MM-dd\")");
-        testFilterExpression("TO_TIMESTAMP(dt)", "toTimestamp(dt)");
+        testFilterExpression(
+                "TO_DATE(dt, 'yyyy-MM-dd')", "toDate(dt, \"yyyy-MM-dd\", __time_zone__)");
+        testFilterExpression("TO_TIMESTAMP(dt)", "toTimestamp(dt, __time_zone__)");
         testFilterExpression("TIMESTAMP_DIFF('DAY', dt1, dt2)", "timestampDiff(\"DAY\", dt1, dt2)");
         testFilterExpression("IF(a>b,a,b)", "a > b ? a : b");
         testFilterExpression("NULLIF(a,b)", "nullif(a, b)");
@@ -292,6 +292,10 @@ public void testTranslateFilterToJaninoExpression() {
         testFilterExpression("cast(null as decimal)", "castToBigDecimal(null, 10, 0)");
         testFilterExpression("cast(null as char)", "castToString(null)");
         testFilterExpression("cast(null as varchar)", "castToString(null)");
+        testFilterExpression(
+                "cast(CURRENT_TIMESTAMP as TIMESTAMP)",
+                "castToTimestamp(currentTimestamp(__epoch_time__), __time_zone__)");
+        testFilterExpression("cast(dt as TIMESTAMP)", "castToTimestamp(dt, __time_zone__)");
     }
 
     private void testFilterExpression(String expression, String expressionExpect) {
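Closing note on why the tests wrap CURRENT_TIMESTAMP in an explicit CAST: after this change CURRENT_TIMESTAMP and NOW() yield a zone-less instant while LOCALTIMESTAMP yields a session-zone wall clock, so a direct comparison mixes types. A sketch of the equivalence the CAST restores, not part of the patch:

import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;

public class CastComparisonSketch {
    public static void main(String[] args) {
        long epochTime = System.currentTimeMillis();
        ZoneId sessionZone = ZoneId.of("Asia/Shanghai");

        Instant currentTimestamp = Instant.ofEpochMilli(epochTime);   // TIMESTAMP_LTZ
        LocalDateTime localTimestamp =                                // TIMESTAMP
                currentTimestamp.atZone(sessionZone).toLocalDateTime();

        // CAST(CURRENT_TIMESTAMP AS TIMESTAMP): instant -> wall clock in session zone.
        LocalDateTime cast = currentTimestamp.atZone(sessionZone).toLocalDateTime();

        // This is exactly the filter asserted by testTimestampTransform:
        // LOCALTIMESTAMP = CAST(CURRENT_TIMESTAMP AS TIMESTAMP)
        System.out.println(localTimestamp.equals(cast)); // true
    }
}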