Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-35743][cdc-runtime] Fix the time zone configuration for temporal functions is not effective #3449

Merged
merged 1 commit into from
Aug 6, 2024

Conversation

aiwenmo
Copy link
Contributor

@aiwenmo aiwenmo commented Jul 2, 2024

This closes FLINK-35743.

@aiwenmo
Copy link
Contributor Author

aiwenmo commented Jul 2, 2024

@yuxiqian PTAL

Copy link
Contributor

@yuxiqian yuxiqian left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for @aiwenmo's quick fix! Just left some minor comments about test cases.

yuxiqian

This comment was marked as duplicate.

Copy link
Contributor

@yuxiqian yuxiqian left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks! Pulled, tested locally and dateFormat works as expected. cc @ruanhang1993

@yuxiqian
Copy link
Contributor

Considering this fixes timezone functions' incorrect behavior, should this be merged before 3.2 release? @leonardBang

@leonardBang
Copy link
Contributor

Considering this fixes timezone functions' incorrect behavior, should this be merged before 3.2 release? @leonardBang

+1 to merge this before 3.2 release

@yuxiqian
Copy link
Contributor

yuxiqian commented Aug 1, 2024

It's August 1st again and similar CI failure is observed on master branch (link). Verified locally that it should be fixed by this PR. Could @leonardBang help driving this?

@aiwenmo
Copy link
Contributor Author

aiwenmo commented Aug 1, 2024

thx. I will add more test.
What time was the test conducted?

@yuxiqian
Copy link
Contributor

yuxiqian commented Aug 1, 2024

Seems running tests between 00:00 ~ 08:00 UTC (08:00 ~ 16:00 UTC+08:00) on the first day of a month would trigger this problem.

@leonardBang
Copy link
Contributor

@aiwenmo @yuxiqian I append a commit to make sure our temporal function semantics align with Flink SQL, please help review my commit

@aiwenmo
Copy link
Contributor Author

aiwenmo commented Aug 5, 2024

thx. I'm reviewing it.

Copy link
Contributor

@yuxiqian yuxiqian left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for @leonardBang's great work! I just left some minor comments about argument list initialization.

@aiwenmo
Copy link
Contributor Author

aiwenmo commented Aug 6, 2024

@yuxiqian Hi. I'm busy with other things. Could you add an e2e test for the temporal function?
For example: Mysql -> Transform -> Doris/Values

projection: *,LOCALTIME as time1,CURRENT_TIME as time2,CURRENT_TIMESTAMP as timestamp1,NOW() as timestamp2,LOCALTIMESTAMP as timestamp3,CURRENT_DATE as date1

I guess the data type generated by the temporal function modified by this PR may not match the Flink type in practical use.

@yuxiqian
Copy link
Contributor

yuxiqian commented Aug 6, 2024

Sure, thanks for @aiwenmo's work so far, I'll create a patch for this.

@yuxiqian
Copy link
Contributor

yuxiqian commented Aug 6, 2024

Seems CURRENT_TIMESTAMP AS cur_ts still doesn't work since Calcite deduces its return type as TIMESTAMP instead of TIMESTAMP_LTZ, causing the following Janino exception:

Caused by: org.apache.flink.api.common.InvalidProgramException: Expression cannot be compiled. This is a bug. Please file an issue.
Expression: import static org.apache.flink.cdc.runtime.functions.SystemFunctionUtils.*;currentTimestamp(__epoch_time__)
	at org.apache.flink.cdc.runtime.operators.transform.TransformExpressionCompiler.lambda$compileExpression$0(TransformExpressionCompiler.java:63)
	at org.apache.flink.shaded.guava31.com.google.common.cache.LocalCache$LocalManualCache$1.load(LocalCache.java:4868)
	at org.apache.flink.shaded.guava31.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3533)
	at org.apache.flink.shaded.guava31.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2282)
	at org.apache.flink.shaded.guava31.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2159)
	at org.apache.flink.shaded.guava31.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2049)
	... 21 more
Caused by: org.codehaus.commons.compiler.CompileException: Line 1, Column 107: Assignment conversion not possible from type "org.apache.flink.cdc.common.data.LocalZonedTimestampData" to type "org.apache.flink.cdc.common.data.TimestampData"

        ...

Investigating this...


Update: Seems there's some conflicting function definitions among TransformSqlOperatorTable and SqlStdOperatorTable. Seems could be fixed by:

Index: flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/metadata/TransformSqlOperatorTable.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
 import org.apache.calcite.sql.SqlKind;
 import org.apache.calcite.sql.SqlOperator;
 import org.apache.calcite.sql.SqlSyntax;
+import org.apache.calcite.sql.fun.SqlCurrentDateFunction;
 import org.apache.calcite.sql.fun.SqlStdOperatorTable;
 import org.apache.calcite.sql.type.InferTypes;
 import org.apache.calcite.sql.type.OperandTypes;
@@ -86,11 +87,12 @@
                     .build();
     public static final SqlFunction LOCALTIMESTAMP =
             new BuiltInTimestampFunction("LOCALTIMESTAMP", SqlTypeName.TIMESTAMP, 3);
+    public static final SqlFunction CURRENT_TIME =
+            new BuiltInTimestampFunction("CURRENT_TIME", SqlTypeName.TIME, 0);
     public static final SqlFunction CURRENT_TIMESTAMP =
             new BuiltInTimestampFunction(
                     "CURRENT_TIMESTAMP", SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE, 3);
-    public static final SqlFunction CURRENT_DATE =
-            new BuiltInTimestampFunction("CURRENT_DATE", SqlTypeName.DATE, 0);
+    public static final SqlFunction CURRENT_DATE = new SqlCurrentDateFunction();
     public static final SqlFunction NOW =
             new BuiltInTimestampFunction("NOW", SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE, 3) {
                 @Override

and getting rid of SqlStdOperatorTable when parsing.

@yuxiqian
Copy link
Contributor

yuxiqian commented Aug 6, 2024

I've added a draft integrated test case for this at my branch here, but noticed something strange. Run FlinkPipelineTransformITCase#testTransformWithTemporalFunction reveals that:

Output for manual check:

CreateTableEvent{tableId=default_namespace.default_schema.mytable1, schema=columns={`id` INT,`name` STRING,`age` INT,`lcl_t` TIME(0),`cur_t` TIME(0),`cur_ts` TIMESTAMP_LTZ(3),`now_ts` TIMESTAMP_LTZ(3),`lcl_ts` TIMESTAMP(3),`cur_dt` DATE}, primaryKeys=id, options=()}
DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[1, Alice, 18, 6410611, 6410611, 2024-08-06T09:46:50.611, 2024-08-06T09:46:50.611, 2024-08-06T01:46:50.611, 19941], op=INSERT, meta=()}
DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[2, Bob, 20, 6410818, 6410818, 2024-08-06T09:46:50.818, 2024-08-06T09:46:50.818, 2024-08-06T01:46:50.818, 19941], op=INSERT, meta=()}
DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[2, Bob, 20, 6410819, 6410819, 2024-08-06T09:46:50.819, 2024-08-06T09:46:50.819, 2024-08-06T01:46:50.819, 19941], after=[2, Bob, 30, 6410819, 6410819, 2024-08-06T09:46:50.819, 2024-08-06T09:46:50.819, 2024-08-06T01:46:50.819, 19941], op=UPDATE, meta=()}
CreateTableEvent{tableId=default_namespace.default_schema.mytable2, schema=columns={`id` BIGINT,`name` STRING,`age` TINYINT,`description` STRING,`lcl_t` TIME(0),`cur_t` TIME(0),`cur_ts` TIMESTAMP_LTZ(3),`now_ts` TIMESTAMP_LTZ(3),`lcl_ts` TIMESTAMP(3),`cur_dt` DATE}, primaryKeys=id, options=()}
DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], after=[3, Carol, 15, student, 6411969, 6411969, 2024-08-06T09:46:51.969, 2024-08-06T09:46:51.969, 2024-08-06T01:46:51.969, 19941], op=INSERT, meta=()}
DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], after=[4, Derrida, 25, student, 6411972, 6411972, 2024-08-06T09:46:51.972, 2024-08-06T09:46:51.972, 2024-08-06T01:46:51.972, 19941], op=INSERT, meta=()}
DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[4, Derrida, 25, student, 6411973, 6411973, 2024-08-06T09:46:51.973, 2024-08-06T09:46:51.973, 2024-08-06T01:46:51.973, 19941], after=[], op=DELETE, meta=()}

(Pipeline local time zone was set to GMT-08:00.)

IIUC, CURRENT_TIMESTAMP, NOW(), and LOCALTIMESTAMP should prints out identical timestamp, since they either 1) ignores the time zone info or 2) converts internally to appears like local timestamp, but seems there's an 8 hour gap.

@leonardBang
Copy link
Contributor

It's expected as TIMESTAMP_LTZ type uses UTC0 timezone to get the string expression

@yuxiqian
Copy link
Contributor

yuxiqian commented Aug 6, 2024

Seems CI is failing since ValuesDatabase is trying to deserialize every RecordData as STRINGs. Changes like yuxiqian@197819f#diff-901d80a3cf5b27fbd459d425310b5f27922d1963103b0f00dc58687f0440c24c might be necessary.

Copy link
Contributor

@leonardBang leonardBang left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @aiwenmo and @yuxiqian for the continuously work, looks good to me know ,wait the CI

@leonardBang leonardBang merged commit 7ca1135 into apache:master Aug 6, 2024
20 checks passed
qiaozongmi pushed a commit to qiaozongmi/flink-cdc that referenced this pull request Sep 23, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants