Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-36461] Fix schema evolution failure with un-transformed tables #3632

Open
wants to merge 2 commits into
base: master
Choose a base branch
from

Conversation

yuxiqian
Copy link
Contributor

@yuxiqian yuxiqian commented Oct 10, 2024

This closes FLINK-36461.

Currently, such transform configuration will fail for any schema change events from tables except foo.bar.baz:

transform:
  - source-table: foo.bar.baz
    projection: \*

The exception message is as follows:

java.lang.ArrayIndexOutOfBoundsException: -1
at java.util.ArrayList.elementData(ArrayList.java:424)
at java.util.ArrayList.get(ArrayList.java:437)
at org.apache.flink.cdc.common.utils.SchemaUtils.lambda$transformSchemaChangeEvent$11(SchemaUtils.java:371)
at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566)
at org.apache.flink.cdc.common.utils.SchemaUtils.transformSchemaChangeEvent(SchemaUtils.java:386)
at org.apache.flink.cdc.runtime.operators.transform.PreTransformOperator.cacheChangeSchema(PreTransformOperator.java:272)
at org.apache.flink.cdc.runtime.operators.transform.PreTransformOperator.processElement(PreTransformOperator.java:248)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:50)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:310)
at org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:110)
at org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:101)
at org.apache.flink.api.connector.source.lib.util.IteratorSourceReaderBase.pollNext(IteratorSourceReaderBase.java:103)
at org.apache.flink.cdc.connectors.values.source.ValuesDataSource$EventIteratorReader.pollNext(ValuesDataSource.java:294)
at org.apache.flink.streaming.api.operators.SourceOperator.pollNext(SourceOperator.java:779)
at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:457)
at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:70)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:68)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:616)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:1071)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:1020)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:959)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:938)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:751)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:567)
at java.lang.Thread.run(Thread.java:879)}}

This could be fixed by always passing latest schema as the "referenced column names" if an asterisk was used in projection expressions.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant