Skip to content

Commit

Permalink
[FLINK-35743][cdc-runtime] Correct the temporal function semantics
Browse files Browse the repository at this point in the history
This closes apache#3449.
  • Loading branch information
leonardBang committed Aug 5, 2024
1 parent 13f9e18 commit 77e3b05
Show file tree
Hide file tree
Showing 4 changed files with 121 additions and 64 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@
import java.math.RoundingMode;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.Arrays;
import java.util.Calendar;
import java.util.Date;
Expand All @@ -38,54 +41,45 @@
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import static org.apache.flink.cdc.common.utils.DateTimeUtils.timestampMillisToDate;
import static org.apache.flink.cdc.common.utils.DateTimeUtils.timestampMillisToTime;

/** System function utils to support the call of flink cdc pipeline transform. */
public class SystemFunctionUtils {
private static final Logger LOG = LoggerFactory.getLogger(SystemFunctionUtils.class);

public static int localtime(long epochTime) {
return DateTimeUtils.timestampMillisToTime(epochTime);
}

public static TimestampData localtimestamp(long epochTime) {
return TimestampData.fromMillis(epochTime);
}

// synonym: localtime
public static int currentTime(long epochTime) {
return localtime(epochTime);
}

public static int currentDate(long epochTime) {
return DateTimeUtils.timestampMillisToDate(epochTime);
}
private static final Logger LOG = LoggerFactory.getLogger(SystemFunctionUtils.class);

public static TimestampData currentTimestamp(long epochTime) {
return TimestampData.fromMillis(epochTime);
public static LocalZonedTimestampData currentTimestamp(long epochTime) {
return LocalZonedTimestampData.fromEpochMillis(epochTime);
}

// synonym with currentTimestamp
public static LocalZonedTimestampData now(long epochTime) {
return LocalZonedTimestampData.fromEpochMillis(epochTime);
}

public static String dateFormat(long epochTime, String format, String timezone) {
return DateTimeUtils.formatTimestampMillis(
epochTime, format, TimeZone.getTimeZone(timezone));
public static TimestampData localtimestamp(long epochTime, String timezone) {
return TimestampData.fromLocalDateTime(
Instant.ofEpochMilli(epochTime).atZone(ZoneId.of(timezone)).toLocalDateTime());
}

public static String dateFormat(
LocalZonedTimestampData timestamp, String format, String timezone) {
return DateTimeUtils.formatTimestampMillis(
timestamp.getEpochMillisecond(), format, TimeZone.getTimeZone(timezone));
public static int localtime(long epochTime, String timezone) {
return timestampMillisToTime(localtimestamp(epochTime, timezone).getMillisecond());
}

public static String dateFormat(TimestampData timestamp, String format, String timezone) {
return DateTimeUtils.formatTimestampMillis(
timestamp.getMillisecond(), format, TimeZone.getTimeZone(timezone));
public static int currentTime(long epochTime, String timezone) {
// the time value of currentTimestamp under given session time zone
return timestampMillisToTime(localtimestamp(epochTime, timezone).getMillisecond());
}

public static String dateFormat(ZonedTimestampData timestamp, String format, String timezone) {
public static int currentDate(long epochTime, String timezone) {
// the date value of currentTimestamp under given session time zone
return timestampMillisToDate(localtimestamp(epochTime, timezone).getMillisecond());
}

public static String dateFormat(TimestampData timestamp, String format) {
return DateTimeUtils.formatTimestampMillis(
timestamp.getMillisecond(), format, TimeZone.getTimeZone(timezone));
timestamp.getMillisecond(), format, TimeZone.getTimeZone("UTC"));
}

public static int toDate(String str, String timezone) {
Expand Down Expand Up @@ -627,6 +621,24 @@ public static BigDecimal castToBigDecimal(Object object, int precision, int scal
return bigDecimal;
}

public static TimestampData castToTimestamp(Object object, String timezone) {
if (object == null) {
return null;
}
if (object instanceof LocalZonedTimestampData) {
return TimestampData.fromLocalDateTime(
LocalDateTime.ofInstant(
((LocalZonedTimestampData) object).toInstant(), ZoneId.of(timezone)));
} else if (object instanceof ZonedTimestampData) {
return TimestampData.fromLocalDateTime(
LocalDateTime.ofInstant(
((ZonedTimestampData) object).toInstant(), ZoneId.of(timezone)));
} else {
return TimestampData.fromLocalDateTime(
LocalDateTime.parse(castObjectIntoString(object)));
}
}

private static String castObjectIntoString(Object object) {
if (object instanceof Boolean) {
return Boolean.valueOf(castToString(object)) ? "1" : "0";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,17 +49,17 @@
public class JaninoCompiler {

private static final List<SqlTypeName> SQL_TYPE_NAME_IGNORE = Arrays.asList(SqlTypeName.SYMBOL);
private static final List<String> NO_OPERAND_TIMESTAMP_FUNCTIONS =
Arrays.asList(
"LOCALTIME",
"LOCALTIMESTAMP",
"CURRENT_TIME",
"CURRENT_DATE",
"CURRENT_TIMESTAMP",
"NOW");
private static final List<String> TIMEZONE_FREE_TEMPORAL_FUNCTIONS =
Arrays.asList("CURRENT_TIMESTAMP", "NOW");

private static final List<String> TIMEZONE_TIMESTAMP_FUNCTIONS =
Arrays.asList("DATE_FORMAT", "TO_DATE", "TO_TIMESTAMP");
private static final List<String> TIMEZONE_REQUIRED_TEMPORAL_FUNCTIONS =
Arrays.asList("LOCALTIME", "LOCALTIMESTAMP", "CURRENT_TIME", "CURRENT_DATE");

private static final List<String> TIMEZONE_FREE_TEMPORAL_CONVERSION_FUNCTIONS =
Arrays.asList("DATE_FORMAT");

private static final List<String> TIMEZONE_REQUIRED_TEMPORAL_CONVERSION_FUNCTIONS =
Arrays.asList("TO_DATE", "TO_TIMESTAMP");

public static final String DEFAULT_EPOCH_TIME = "__epoch_time__";
public static final String DEFAULT_TIME_ZONE = "__time_zone__";
Expand Down Expand Up @@ -112,10 +112,14 @@ public static Java.Rvalue translateSqlNodeToJaninoRvalue(SqlNode transform) {

private static Java.Rvalue translateSqlIdentifier(SqlIdentifier sqlIdentifier) {
String columnName = sqlIdentifier.names.get(sqlIdentifier.names.size() - 1);
if (NO_OPERAND_TIMESTAMP_FUNCTIONS.contains(columnName)) {
return generateNoOperandTimestampFunctionOperation(columnName);
} else if (TIMEZONE_TIMESTAMP_FUNCTIONS.contains(columnName)) {
return generateTimezoneTimestampFunctionOperation(columnName);
if (TIMEZONE_FREE_TEMPORAL_FUNCTIONS.contains(columnName)) {
return generateTimezoneFreeTemporalFunctionOperation(columnName);
} else if (TIMEZONE_REQUIRED_TEMPORAL_FUNCTIONS.contains(columnName)) {
return generateTimezoneRequiredTemporalFunctionOperation(columnName);
} else if (TIMEZONE_FREE_TEMPORAL_CONVERSION_FUNCTIONS.contains(columnName)) {
return generateTimezoneFreeTemporalConversionFunctionOperation(columnName);
} else if (TIMEZONE_REQUIRED_TEMPORAL_CONVERSION_FUNCTIONS.contains(columnName)) {
return generateTimezoneRequiredTemporalConversionFunctionOperation(columnName);
} else {
return new Java.AmbiguousName(Location.NOWHERE, new String[] {columnName});
}
Expand All @@ -142,10 +146,14 @@ private static Java.Rvalue translateSqlBasicCall(SqlBasicCall sqlBasicCall) {
for (SqlNode sqlNode : operandList) {
translateSqlNodeToAtoms(sqlNode, atoms);
}
if (NO_OPERAND_TIMESTAMP_FUNCTIONS.contains(sqlBasicCall.getOperator().getName())) {
if (TIMEZONE_FREE_TEMPORAL_FUNCTIONS.contains(sqlBasicCall.getOperator().getName())) {
atoms.add(new Java.AmbiguousName(Location.NOWHERE, new String[] {DEFAULT_EPOCH_TIME}));
} else if (TIMEZONE_REQUIRED_TEMPORAL_FUNCTIONS.contains(
sqlBasicCall.getOperator().getName())) {
atoms.add(new Java.AmbiguousName(Location.NOWHERE, new String[] {DEFAULT_EPOCH_TIME}));
}
if (TIMEZONE_TIMESTAMP_FUNCTIONS.contains(sqlBasicCall.getOperator().getName())) {
atoms.add(new Java.AmbiguousName(Location.NOWHERE, new String[] {DEFAULT_TIME_ZONE}));
} else if (TIMEZONE_REQUIRED_TEMPORAL_CONVERSION_FUNCTIONS.contains(
sqlBasicCall.getOperator().getName())) {
atoms.add(new Java.AmbiguousName(Location.NOWHERE, new String[] {DEFAULT_TIME_ZONE}));
}
return sqlBasicCallToJaninoRvalue(sqlBasicCall, atoms.toArray(new Java.Rvalue[0]));
Expand Down Expand Up @@ -307,18 +315,42 @@ private static Java.Rvalue generateOtherFunctionOperation(
}
}

private static Java.Rvalue generateNoOperandTimestampFunctionOperation(String operationName) {
private static Java.Rvalue generateTimezoneFreeTemporalFunctionOperation(String operationName) {
List<Java.Rvalue> timestampFunctionParam = new ArrayList<>();
timestampFunctionParam.add(
new Java.AmbiguousName(Location.NOWHERE, new String[] {DEFAULT_EPOCH_TIME}));
return new Java.MethodInvocation(
Location.NOWHERE,
null,
StringUtils.convertToCamelCase(operationName),
timestampFunctionParam.toArray(new Java.Rvalue[0]));
}

private static Java.Rvalue generateTimezoneRequiredTemporalFunctionOperation(
String operationName) {
List<Java.Rvalue> timestampFunctionParam = new ArrayList<>();
timestampFunctionParam.add(
new Java.AmbiguousName(Location.NOWHERE, new String[] {DEFAULT_EPOCH_TIME}));
timestampFunctionParam.add(
new Java.AmbiguousName(Location.NOWHERE, new String[] {DEFAULT_TIME_ZONE}));
return new Java.MethodInvocation(
Location.NOWHERE,
null,
StringUtils.convertToCamelCase(operationName),
timestampFunctionParam.toArray(new Java.Rvalue[0]));
}

private static Java.Rvalue generateTimezoneTimestampFunctionOperation(String operationName) {
private static Java.Rvalue generateTimezoneFreeTemporalConversionFunctionOperation(
String operationName) {
return new Java.MethodInvocation(
Location.NOWHERE,
null,
StringUtils.convertToCamelCase(operationName),
new ArrayList<>().toArray(new Java.Rvalue[0]));
}

private static Java.Rvalue generateTimezoneRequiredTemporalConversionFunctionOperation(
String operationName) {
List<Java.Rvalue> timestampFunctionParam = new ArrayList<>();
timestampFunctionParam.add(
new Java.AmbiguousName(Location.NOWHERE, new String[] {DEFAULT_TIME_ZONE}));
Expand Down Expand Up @@ -375,6 +407,15 @@ private static Java.Rvalue generateTypeConvertMethod(
case "VARCHAR":
case "STRING":
return new Java.MethodInvocation(Location.NOWHERE, null, "castToString", atoms);
case "TIMESTAMP":
List<Java.Rvalue> timestampFunctionParam = new ArrayList<>(Arrays.asList(atoms));
timestampFunctionParam.add(
new Java.AmbiguousName(Location.NOWHERE, new String[] {DEFAULT_TIME_ZONE}));
return new Java.MethodInvocation(
Location.NOWHERE,
null,
"castToTimestamp",
timestampFunctionParam.toArray(new Java.Rvalue[0]));
default:
throw new ParseException(
"Unsupported data type cast: " + sqlDataTypeSpec.toString());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -509,9 +509,9 @@ void testTimestampTransform() throws Exception {
.addTransform(
TIMESTAMP_TABLEID.identifier(),
"col1, IF(LOCALTIME = CURRENT_TIME, 1, 0) as time_equal,"
+ " IF(DATE_FORMAT(CURRENT_TIMESTAMP, 'yyyy-MM-dd HH:mm:ss') = DATE_FORMAT(NOW(), 'yyyy-MM-dd HH:mm:ss'), 1, 0) as timestamp_equal,"
+ " IF(DATE_FORMAT(CAST(CURRENT_TIMESTAMP AS TIMESTAMP), 'yyyy-MM-dd HH:mm:ss') = DATE_FORMAT(CAST(NOW() AS TIMESTAMP), 'yyyy-MM-dd HH:mm:ss'), 1, 0) as timestamp_equal,"
+ " IF(TO_DATE(DATE_FORMAT(LOCALTIMESTAMP, 'yyyy-MM-dd')) = CURRENT_DATE, 1, 0) as date_equal",
"LOCALTIMESTAMP = CURRENT_TIMESTAMP")
"LOCALTIMESTAMP = CAST(CURRENT_TIMESTAMP AS TIMESTAMP)")
.addTimezone("UTC")
.build();
EventOperatorTestHarness<TransformDataOperator, Event>
Expand Down Expand Up @@ -553,17 +553,17 @@ void testTimestampDiffTransform() throws Exception {
TransformDataOperator.newBuilder()
.addTransform(
TIMESTAMPDIFF_TABLEID.identifier(),
"col1, TIMESTAMP_DIFF('SECOND', LOCALTIMESTAMP, CURRENT_TIMESTAMP) as second_diff,"
+ " TIMESTAMP_DIFF('MINUTE', LOCALTIMESTAMP, CURRENT_TIMESTAMP) as minute_diff,"
+ " TIMESTAMP_DIFF('HOUR', LOCALTIMESTAMP, CURRENT_TIMESTAMP) as hour_diff,"
+ " TIMESTAMP_DIFF('DAY', LOCALTIMESTAMP, CURRENT_TIMESTAMP) as day_diff",
"col1, TIMESTAMP_DIFF('SECOND', LOCALTIMESTAMP, CAST(CURRENT_TIMESTAMP AS TIMESTAMP)) as second_diff,"
+ " TIMESTAMP_DIFF('MINUTE', LOCALTIMESTAMP, CAST(CURRENT_TIMESTAMP AS TIMESTAMP)) as minute_diff,"
+ " TIMESTAMP_DIFF('HOUR', LOCALTIMESTAMP, CAST(CURRENT_TIMESTAMP AS TIMESTAMP)) as hour_diff,"
+ " TIMESTAMP_DIFF('DAY', LOCALTIMESTAMP, CAST(CURRENT_TIMESTAMP AS TIMESTAMP)) as day_diff",
"col1='1'")
.addTransform(
TIMESTAMPDIFF_TABLEID.identifier(),
"col1, TIMESTAMP_DIFF('SECOND', LOCALTIMESTAMP, NOW()) as second_diff,"
+ " TIMESTAMP_DIFF('MINUTE', LOCALTIMESTAMP, NOW()) as minute_diff,"
+ " TIMESTAMP_DIFF('HOUR', LOCALTIMESTAMP, NOW()) as hour_diff,"
+ " TIMESTAMP_DIFF('DAY', LOCALTIMESTAMP, NOW()) as day_diff",
"col1, TIMESTAMP_DIFF('SECOND', LOCALTIMESTAMP, CAST(NOW() AS TIMESTAMP)) as second_diff,"
+ " TIMESTAMP_DIFF('MINUTE', LOCALTIMESTAMP, CAST(NOW() AS TIMESTAMP)) as minute_diff,"
+ " TIMESTAMP_DIFF('HOUR', LOCALTIMESTAMP, CAST(NOW() AS TIMESTAMP)) as hour_diff,"
+ " TIMESTAMP_DIFF('DAY', LOCALTIMESTAMP, CAST(NOW() AS TIMESTAMP)) as day_diff",
"col1='2'")
.addTimezone("Asia/Shanghai")
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,11 +218,15 @@ public void testTranslateFilterToJaninoExpression() {
testFilterExpression("floor(2)", "floor(2)");
testFilterExpression("round(2,2)", "round(2, 2)");
testFilterExpression("uuid()", "uuid()");
testFilterExpression("id = LOCALTIME", "valueEquals(id, localtime(__epoch_time__))");
testFilterExpression(
"id = LOCALTIMESTAMP", "valueEquals(id, localtimestamp(__epoch_time__))");
testFilterExpression("id = CURRENT_TIME", "valueEquals(id, currentTime(__epoch_time__))");
testFilterExpression("id = CURRENT_DATE", "valueEquals(id, currentDate(__epoch_time__))");
"id = LOCALTIME", "valueEquals(id, localtime(__epoch_time__, __time_zone__))");
testFilterExpression(
"id = LOCALTIMESTAMP",
"valueEquals(id, localtimestamp(__epoch_time__, __time_zone__))");
testFilterExpression(
"id = CURRENT_TIME", "valueEquals(id, currentTime(__epoch_time__, __time_zone__))");
testFilterExpression(
"id = CURRENT_DATE", "valueEquals(id, currentDate(__epoch_time__, __time_zone__))");
testFilterExpression(
"id = CURRENT_TIMESTAMP", "valueEquals(id, currentTimestamp(__epoch_time__))");
testFilterExpression("NOW()", "now(__epoch_time__)");
Expand Down

0 comments on commit 77e3b05

Please sign in to comment.