diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/JaninoCompiler.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/JaninoCompiler.java index d1ad505c160..06bde05d7a2 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/JaninoCompiler.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/JaninoCompiler.java @@ -316,14 +316,13 @@ private static Java.Rvalue generateOtherFunctionOperation( } private static Java.Rvalue generateTimezoneFreeTemporalFunctionOperation(String operationName) { - List timestampFunctionParam = new ArrayList<>(); - timestampFunctionParam.add( - new Java.AmbiguousName(Location.NOWHERE, new String[] {DEFAULT_EPOCH_TIME})); return new Java.MethodInvocation( Location.NOWHERE, null, StringUtils.convertToCamelCase(operationName), - timestampFunctionParam.toArray(new Java.Rvalue[0])); + new Java.Rvalue[] { + new Java.AmbiguousName(Location.NOWHERE, new String[] {DEFAULT_EPOCH_TIME}) + }); } private static Java.Rvalue generateTimezoneRequiredTemporalFunctionOperation( @@ -346,19 +345,18 @@ private static Java.Rvalue generateTimezoneFreeTemporalConversionFunctionOperati Location.NOWHERE, null, StringUtils.convertToCamelCase(operationName), - new ArrayList<>().toArray(new Java.Rvalue[0])); + new Java.Rvalue[0]); } private static Java.Rvalue generateTimezoneRequiredTemporalConversionFunctionOperation( String operationName) { - List timestampFunctionParam = new ArrayList<>(); - timestampFunctionParam.add( - new Java.AmbiguousName(Location.NOWHERE, new String[] {DEFAULT_TIME_ZONE})); return new Java.MethodInvocation( Location.NOWHERE, null, StringUtils.convertToCamelCase(operationName), - timestampFunctionParam.toArray(new Java.Rvalue[0])); + new Java.Rvalue[] { + new Java.AmbiguousName(Location.NOWHERE, new String[] {DEFAULT_TIME_ZONE}) + }); } private static Java.Rvalue generateTypeConvertMethod( @@ -415,7 +413,10 @@ private static Java.Rvalue generateTypeConvertMethod( Location.NOWHERE, null, "castToTimestamp", - timestampFunctionParam.toArray(new Java.Rvalue[0])); + new Java.Rvalue[] { + new Java.AmbiguousName( + Location.NOWHERE, new String[] {DEFAULT_TIME_ZONE}) + }); default: throw new ParseException( "Unsupported data type cast: " + sqlDataTypeSpec.toString()); diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/TransformDataOperatorTest.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/TransformDataOperatorTest.java index 38668bf6005..a19722ef156 100644 --- a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/TransformDataOperatorTest.java +++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/TransformDataOperatorTest.java @@ -129,6 +129,7 @@ public class TransformDataOperatorTest { .physicalColumn("nullChar", DataTypes.CHAR(1)) .physicalColumn("nullVarchar", DataTypes.VARCHAR(1)) .physicalColumn("nullDecimal", DataTypes.DECIMAL(4, 2)) + .physicalColumn("nullTimestamp", DataTypes.TIMESTAMP(3)) .primaryKey("col1") .build(); @@ -147,6 +148,7 @@ public class TransformDataOperatorTest { .physicalColumn("castChar", DataTypes.CHAR(1)) .physicalColumn("castVarchar", DataTypes.VARCHAR(1)) .physicalColumn("castDecimal", DataTypes.DECIMAL(4, 2)) + .physicalColumn("castTimestamp", DataTypes.TIMESTAMP(3)) .primaryKey("col1") .build(); @@ -678,7 +680,8 @@ void testNullCastTransform() throws Exception { + ",cast(colString as double) as nullDouble" + ",cast(colString as char) as nullChar" + ",cast(colString as varchar) as nullVarchar" - + ",cast(colString as DECIMAL(4,2)) as nullDecimal", + + ",cast(colString as DECIMAL(4,2)) as nullDecimal" + + ",cast(colString as TIMESTAMP(3)) as nullTimestamp", null) .build(); EventOperatorTestHarness @@ -708,6 +711,7 @@ void testNullCastTransform() throws Exception { null, null, null, + null })); transform.processElement(new StreamRecord<>(createTableEvent)); Assertions.assertThat( @@ -735,7 +739,8 @@ void testCastTransform() throws Exception { + ",cast(col1 as double) as castDouble" + ",cast(col1 as char) as castChar" + ",cast(col1 as varchar) as castVarchar" - + ",cast(col1 as DECIMAL(4,2)) as castDecimal", + + ",cast(col1 as DECIMAL(4,2)) as castDecimal" + + ", castTimestamp", "col1 = '1'") .addTransform( CAST_TABLEID.identifier(), @@ -749,7 +754,8 @@ void testCastTransform() throws Exception { + ",cast(castInt as double) as castDouble" + ",cast(castInt as char) as castChar" + ",cast(castInt as varchar) as castVarchar" - + ",cast(castInt as DECIMAL(4,2)) as castDecimal", + + ",cast(castInt as DECIMAL(4,2)) as castDecimal" + + ", castTimestamp", "col1 = '2'") .addTransform( CAST_TABLEID.identifier(), @@ -763,7 +769,8 @@ void testCastTransform() throws Exception { + ",cast(castBoolean as double) as castDouble" + ",cast(castBoolean as char) as castChar" + ",cast(castBoolean as varchar) as castVarchar" - + ",cast(castBoolean as DECIMAL(4,2)) as castDecimal", + + ",cast(castBoolean as DECIMAL(4,2)) as castDecimal" + + ", castTimestamp", "col1 = '3'") .addTransform( CAST_TABLEID.identifier(), @@ -777,7 +784,8 @@ void testCastTransform() throws Exception { + ",cast(castTinyint as double) as castDouble" + ",cast(castTinyint as char) as castChar" + ",cast(castTinyint as varchar) as castVarchar" - + ",cast(castTinyint as DECIMAL(4,2)) as castDecimal", + + ",cast(castTinyint as DECIMAL(4,2)) as castDecimal" + + ", castTimestamp", "col1 = '4'") .addTransform( CAST_TABLEID.identifier(), @@ -791,7 +799,8 @@ void testCastTransform() throws Exception { + ",cast(castSmallint as double) as castDouble" + ",cast(castSmallint as char) as castChar" + ",cast(castSmallint as varchar) as castVarchar" - + ",cast(castSmallint as DECIMAL(4,2)) as castDecimal", + + ",cast(castSmallint as DECIMAL(4,2)) as castDecimal" + + ", castTimestamp", "col1 = '5'") .addTransform( CAST_TABLEID.identifier(), @@ -805,7 +814,8 @@ void testCastTransform() throws Exception { + ",cast(castBigint as double) as castDouble" + ",cast(castBigint as char) as castChar" + ",cast(castBigint as varchar) as castVarchar" - + ",cast(castBigint as DECIMAL(4,2)) as castDecimal", + + ",cast(castBigint as DECIMAL(4,2)) as castDecimal" + + ", castTimestamp", "col1 = '6'") .addTransform( CAST_TABLEID.identifier(), @@ -819,7 +829,8 @@ void testCastTransform() throws Exception { + ",cast(castFloat as double) as castDouble" + ",cast(castFloat as char) as castChar" + ",cast(castFloat as varchar) as castVarchar" - + ",cast(castFloat as DECIMAL(4,2)) as castDecimal", + + ",cast(castFloat as DECIMAL(4,2)) as castDecimal" + + ", castTimestamp", "col1 = '7'") .addTransform( CAST_TABLEID.identifier(), @@ -833,7 +844,8 @@ void testCastTransform() throws Exception { + ",cast(castDouble as double) as castDouble" + ",cast(castDouble as char) as castChar" + ",cast(castDouble as varchar) as castVarchar" - + ",cast(castDouble as DECIMAL(4,2)) as castDecimal", + + ",cast(castDouble as DECIMAL(4,2)) as castDecimal" + + ", castTimestamp", "col1 = '8'") .addTransform( CAST_TABLEID.identifier(), @@ -847,8 +859,24 @@ void testCastTransform() throws Exception { + ",cast(castDecimal as double) as castDouble" + ",cast(castDecimal as char) as castChar" + ",cast(castDecimal as varchar) as castVarchar" - + ",cast(castDecimal as DECIMAL(4,2)) as castDecimal", + + ",cast(castDecimal as DECIMAL(4,2)) as castDecimal" + + ", castTimestamp", "col1 = '9'") + .addTransform( + CAST_TABLEID.identifier(), + "col1" + + ",castInt" + + ",castBoolean" + + ",castTinyint" + + ",castSmallint" + + ",castBigint" + + ",castFloat" + + ",castDouble" + + ",castChar" + + ",cast(castTimestamp as varchar) as castVarchar" + + ",castDecimal" + + ",cast('1970-01-01T00:00:01.234' as TIMESTAMP(3)) as castTimestamp", + "col1 = '10'") .build(); EventOperatorTestHarness transformFunctionEventEventOperatorTestHarness = @@ -876,6 +904,7 @@ void testCastTransform() throws Exception { null, null, null, + null })); DataChangeEvent insertEventExpect1 = DataChangeEvent.insertEvent( @@ -893,6 +922,7 @@ void testCastTransform() throws Exception { new BinaryStringData("1"), new BinaryStringData("1"), DecimalData.fromBigDecimal(new BigDecimal(1.0), 4, 2), + null })); transform.processElement(new StreamRecord<>(createTableEvent)); Assertions.assertThat( @@ -918,6 +948,7 @@ void testCastTransform() throws Exception { null, null, null, + null })); DataChangeEvent insertEventExpect2 = DataChangeEvent.insertEvent( @@ -935,6 +966,7 @@ void testCastTransform() throws Exception { new BinaryStringData("1"), new BinaryStringData("1"), DecimalData.fromBigDecimal(new BigDecimal(1.0), 4, 2), + null })); transform.processElement(new StreamRecord<>(insertEvent2)); Assertions.assertThat( @@ -956,6 +988,7 @@ void testCastTransform() throws Exception { null, null, null, + null })); DataChangeEvent insertEventExpect3 = DataChangeEvent.insertEvent( @@ -973,6 +1006,7 @@ void testCastTransform() throws Exception { new BinaryStringData("true"), new BinaryStringData("true"), DecimalData.fromBigDecimal(new BigDecimal(1.0), 4, 2), + null })); transform.processElement(new StreamRecord<>(insertEvent3)); Assertions.assertThat( @@ -994,6 +1028,7 @@ void testCastTransform() throws Exception { null, null, null, + null })); DataChangeEvent insertEventExpect4 = DataChangeEvent.insertEvent( @@ -1011,6 +1046,7 @@ void testCastTransform() throws Exception { new BinaryStringData("1"), new BinaryStringData("1"), DecimalData.fromBigDecimal(new BigDecimal(1.0), 4, 2), + null })); transform.processElement(new StreamRecord<>(insertEvent4)); Assertions.assertThat( @@ -1032,6 +1068,7 @@ void testCastTransform() throws Exception { null, null, null, + null })); DataChangeEvent insertEventExpect5 = DataChangeEvent.insertEvent( @@ -1049,6 +1086,7 @@ void testCastTransform() throws Exception { new BinaryStringData("1"), new BinaryStringData("1"), DecimalData.fromBigDecimal(new BigDecimal(1.0), 4, 2), + null })); transform.processElement(new StreamRecord<>(insertEvent5)); Assertions.assertThat( @@ -1070,6 +1108,7 @@ void testCastTransform() throws Exception { null, null, null, + null })); DataChangeEvent insertEventExpect6 = DataChangeEvent.insertEvent( @@ -1087,6 +1126,7 @@ void testCastTransform() throws Exception { new BinaryStringData("1"), new BinaryStringData("1"), DecimalData.fromBigDecimal(new BigDecimal(1.0), 4, 2), + null })); transform.processElement(new StreamRecord<>(insertEvent6)); Assertions.assertThat( @@ -1108,6 +1148,7 @@ void testCastTransform() throws Exception { null, null, null, + null })); DataChangeEvent insertEventExpect7 = DataChangeEvent.insertEvent( @@ -1125,6 +1166,7 @@ void testCastTransform() throws Exception { new BinaryStringData("1.0"), new BinaryStringData("1.0"), DecimalData.fromBigDecimal(new BigDecimal(1.0), 4, 2), + null })); transform.processElement(new StreamRecord<>(insertEvent7)); Assertions.assertThat( @@ -1146,6 +1188,7 @@ void testCastTransform() throws Exception { null, null, null, + null })); DataChangeEvent insertEventExpect8 = DataChangeEvent.insertEvent( @@ -1163,6 +1206,7 @@ void testCastTransform() throws Exception { new BinaryStringData("1.0"), new BinaryStringData("1.0"), DecimalData.fromBigDecimal(new BigDecimal(1.0), 4, 2), + null })); transform.processElement(new StreamRecord<>(insertEvent8)); Assertions.assertThat( @@ -1184,6 +1228,7 @@ void testCastTransform() throws Exception { null, null, DecimalData.fromBigDecimal(new BigDecimal(1.0), 4, 2), + null })); DataChangeEvent insertEventExpect9 = DataChangeEvent.insertEvent( @@ -1201,11 +1246,53 @@ void testCastTransform() throws Exception { new BinaryStringData("1.00"), new BinaryStringData("1.00"), DecimalData.fromBigDecimal(new BigDecimal(1.0), 4, 2), + null })); transform.processElement(new StreamRecord<>(insertEvent9)); Assertions.assertThat( transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll()) .isEqualTo(new StreamRecord<>(insertEventExpect9)); + + DataChangeEvent insertEvent10 = + DataChangeEvent.insertEvent( + CAST_TABLEID, + recordDataGenerator.generate( + new Object[] { + new BinaryStringData("10"), + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + TimestampData.fromMillis(1234, 0) + })); + DataChangeEvent insertEventExpect10 = + DataChangeEvent.insertEvent( + CAST_TABLEID, + recordDataGenerator.generate( + new Object[] { + new BinaryStringData("10"), + null, + null, + null, + null, + null, + null, + null, + null, + new BinaryStringData("1970-01-01T00:00:01.234"), + null, + TimestampData.fromMillis(1234, 0) + })); + transform.processElement(new StreamRecord<>(insertEvent10)); + Assertions.assertThat( + transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll()) + .isEqualTo(new StreamRecord<>(insertEventExpect10)); } @Test @@ -1224,7 +1311,8 @@ void testCastErrorTransform() throws Exception { + ",cast(castFloat as double) as castDouble" + ",cast(castFloat as char) as castChar" + ",cast(castFloat as varchar) as castVarchar" - + ",cast(castFloat as DECIMAL(4,2)) as castDecimal", + + ",cast(castFloat as DECIMAL(4,2)) as castDecimal" + + ",cast(castFloat as TIMESTAMP) as castTimestamp", "col1 = '1'") .build(); EventOperatorTestHarness @@ -1253,6 +1341,7 @@ void testCastErrorTransform() throws Exception { null, null, null, + null })); transform.processElement(new StreamRecord<>(createTableEvent)); Assertions.assertThat( @@ -1324,6 +1413,7 @@ void testBuildInFunctionTransform() throws Exception { testExpressionConditionTransform("cast(null as char) is null"); testExpressionConditionTransform("cast(null as varchar) is null"); testExpressionConditionTransform("cast(null as DECIMAL(4,2)) is null"); + testExpressionConditionTransform("cast(null as TIMESTAMP(3)) is null"); } private void testExpressionConditionTransform(String expression) throws Exception { diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/TransformParserTest.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/TransformParserTest.java index 688efecbbf5..66a405a9755 100644 --- a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/TransformParserTest.java +++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/TransformParserTest.java @@ -292,6 +292,10 @@ public void testTranslateFilterToJaninoExpression() { testFilterExpression("cast(null as decimal)", "castToBigDecimal(null, 10, 0)"); testFilterExpression("cast(null as char)", "castToString(null)"); testFilterExpression("cast(null as varchar)", "castToString(null)"); + testFilterExpression( + "cast(CURRENT_TIMESTAMP as TIMESTAMP)", + "castToTimestamp(currentTimestamp(__epoch_time__), __time_zone__)"); + testFilterExpression("cast(dt as TIMESTAMP)", "castToTimestamp(dt, __time_zone__)"); } private void testFilterExpression(String expression, String expressionExpect) {