allCommittables = new ArrayList<>(committables);
+ committables.clear();
+ return allCommittables;
+ }
+
+ @Override
+ public void write(InputT event, Context context) throws IOException {
+ PaimonEvent paimonEvent = serializer.serialize(event);
+ Identifier tableId = paimonEvent.getTableId();
+ if (paimonEvent.isShouldRefreshSchema()) {
+ // remove the table temporarily; it will be re-added with the latest schema when
+ // the next DataChangeEvent arrives.
+ writes.remove(tableId);
+ tables.remove(tableId);
+ }
+ if (paimonEvent.getGenericRow() != null) {
+ FileStoreTable table = getTable(tableId);
+ if (memoryPoolFactory == null) {
+ memoryPoolFactory =
+ new MemoryPoolFactory(
+ // currently, the options of all tables are the same in CDC
+ new HeapMemorySegmentPool(
+ table.coreOptions().writeBufferSize(),
+ table.coreOptions().pageSize()));
+ }
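+ // Lazily create one StoreSinkWrite per table; all writers share the same memory pool factory.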
+ StoreSinkWrite write =
+ writes.computeIfAbsent(
+ tableId,
+ id -> {
+ StoreSinkWriteImpl storeSinkWrite =
+ new StoreSinkWriteImpl(
+ table,
+ commitUser,
+ ioManager,
+ false,
+ false,
+ true,
+ memoryPoolFactory,
+ metricGroup);
+ storeSinkWrite.withCompactExecutor(compactExecutor);
+ return storeSinkWrite;
+ });
+ try {
+ write.write(paimonEvent.getGenericRow());
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ }
+ }
+
+ private FileStoreTable getTable(Identifier tableId) {
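+ // Cache the table to avoid a catalog lookup for every record.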
+ FileStoreTable table =
+ tables.computeIfAbsent(
+ tableId,
+ id -> {
+ try {
+ return (FileStoreTable) catalog.getTable(tableId);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ });
+
+ if (table.bucketMode() != BucketMode.FIXED) {
+ throw new UnsupportedOperationException(
+ "Unified Sink only supports FIXED bucket mode, but is " + table.bucketMode());
+ }
+ return table;
+ }
+
+ /**
+ * Called on checkpoint or at the end of input so that the writer can flush all pending data
+ * for at-least-once delivery.
+ *
+ * <p>This method is also called when a {@link FlushEvent} is received, but in that case the
+ * MultiTableCommittables need not be committed immediately, because {@link PaimonCommitter}
+ * supports committing data of different schemas.
+ */
+ @Override
+ public void flush(boolean endOfInput) throws IOException {
+ for (Map.Entry<Identifier, StoreSinkWrite> entry : writes.entrySet()) {
+ Identifier key = entry.getKey();
+ StoreSinkWrite write = entry.getValue();
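+ // Do not block on compaction when draining committables.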
+ boolean waitCompaction = false;
+ // checkpointId will be updated correctly by PreCommitOperator.
+ long checkpointId = 1L;
+ committables.addAll(
+ write.prepareCommit(waitCompaction, checkpointId).stream()
+ .map(
+ committable ->
+ MultiTableCommittable.fromCommittable(key, committable))
+ .collect(Collectors.toList()));
+ }
+ }
+
+ @Override
+ public void close() throws Exception {
+ for (StoreSinkWrite write : writes.values()) {
+ write.close();
+ }
+ if (compactExecutor != null) {
+ compactExecutor.shutdownNow();
+ }
+ }
+}
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriterHelper.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriterHelper.java
new file mode 100644
index 0000000000..a2df17455f
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriterHelper.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.paimon.sink.v2;
+
+import org.apache.flink.cdc.common.data.DecimalData;
+import org.apache.flink.cdc.common.data.RecordData;
+import org.apache.flink.cdc.common.event.DataChangeEvent;
+import org.apache.flink.cdc.common.schema.Column;
+import org.apache.flink.cdc.common.schema.Schema;
+import org.apache.flink.cdc.common.types.DataType;
+import org.apache.flink.cdc.common.types.DataTypeChecks;
+
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.Decimal;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.types.RowKind;
+
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.flink.cdc.common.types.DataTypeChecks.getFieldCount;
+
+/** A helper class for {@link PaimonWriter} to create FieldGetter and GenericRow. */
+public class PaimonWriterHelper {
+
+ /** create a list of {@link RecordData.FieldGetter} for {@link PaimonWriter}. */
+ public static List<RecordData.FieldGetter> createFieldGetters(Schema schema, ZoneId zoneId) {
+ List<Column> columns = schema.getColumns();
+ List<RecordData.FieldGetter> fieldGetters = new ArrayList<>(columns.size());
+ for (int i = 0; i < columns.size(); i++) {
+ fieldGetters.add(createFieldGetter(columns.get(i).getType(), i, zoneId));
+ }
+ return fieldGetters;
+ }
+
+ private static RecordData.FieldGetter createFieldGetter(
+ DataType fieldType, int fieldPos, ZoneId zoneId) {
+ final RecordData.FieldGetter fieldGetter;
+ // ordered by type root definition
+ switch (fieldType.getTypeRoot()) {
+ case CHAR:
+ case VARCHAR:
+ fieldGetter = row -> BinaryString.fromString(row.getString(fieldPos).toString());
+ break;
+ case BOOLEAN:
+ fieldGetter = row -> row.getBoolean(fieldPos);
+ break;
+ case BINARY:
+ case VARBINARY:
+ fieldGetter = row -> row.getBinary(fieldPos);
+ break;
+ case DECIMAL:
+ final int decimalPrecision = DataTypeChecks.getPrecision(fieldType);
+ final int decimalScale = DataTypeChecks.getScale(fieldType);
+ fieldGetter =
+ row -> {
+ DecimalData decimalData =
+ row.getDecimal(fieldPos, decimalPrecision, decimalScale);
+ return Decimal.fromBigDecimal(
+ decimalData.toBigDecimal(), decimalPrecision, decimalScale);
+ };
+ break;
+ case TINYINT:
+ fieldGetter = row -> row.getByte(fieldPos);
+ break;
+ case SMALLINT:
+ fieldGetter = row -> row.getShort(fieldPos);
+ break;
+ case BIGINT:
+ fieldGetter = row -> row.getLong(fieldPos);
+ break;
+ case FLOAT:
+ fieldGetter = row -> row.getFloat(fieldPos);
+ break;
+ case DOUBLE:
+ fieldGetter = row -> row.getDouble(fieldPos);
+ break;
+ case INTEGER:
+ case DATE:
+ case TIME_WITHOUT_TIME_ZONE:
+ fieldGetter = row -> row.getInt(fieldPos);
+ break;
+ case TIMESTAMP_WITHOUT_TIME_ZONE:
+ fieldGetter =
+ row ->
+ Timestamp.fromSQLTimestamp(
+ row.getTimestamp(
+ fieldPos,
+ DataTypeChecks.getPrecision(fieldType))
+ .toTimestamp());
+ break;
+ case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+ case TIMESTAMP_WITH_TIME_ZONE:
+ fieldGetter =
+ row ->
+ Timestamp.fromLocalDateTime(
+ ZonedDateTime.ofInstant(
+ row.getLocalZonedTimestampData(
+ fieldPos,
+ DataTypeChecks.getPrecision(
+ fieldType))
+ .toInstant(),
+ zoneId)
+ .toLocalDateTime());
+ break;
+ case ROW:
+ final int rowFieldCount = getFieldCount(fieldType);
+ fieldGetter = row -> row.getRow(fieldPos, rowFieldCount);
+ break;
+ default:
+ throw new IllegalArgumentException(
+ "don't support type of " + fieldType.getTypeRoot());
+ }
+ if (!fieldType.isNullable()) {
+ return fieldGetter;
+ }
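+ // Wrap the getter so that nullable fields return null without invoking the typed getter.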
+ return row -> {
+ if (row.isNullAt(fieldPos)) {
+ return null;
+ }
+ return fieldGetter.getFieldOrNull(row);
+ };
+ }
+
+ /** create a {@link GenericRow} from a {@link DataChangeEvent} for {@link PaimonWriter}. */
+ public static GenericRow convertEventToGenericRow(
+ DataChangeEvent dataChangeEvent, List<RecordData.FieldGetter> fieldGetters) {
+ GenericRow genericRow;
+ RecordData recordData;
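+ // Updates are emitted as INSERT rows carrying the after image; Paimon upserts them by primary key.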
+ switch (dataChangeEvent.op()) {
+ case INSERT:
+ case UPDATE:
+ case REPLACE:
+ {
+ recordData = dataChangeEvent.after();
+ genericRow = new GenericRow(RowKind.INSERT, recordData.getArity());
+ break;
+ }
+ case DELETE:
+ {
+ recordData = dataChangeEvent.before();
+ genericRow = new GenericRow(RowKind.DELETE, recordData.getArity());
+ break;
+ }
+ default:
+ throw new IllegalArgumentException("don't support type of " + dataChangeEvent.op());
+ }
+ for (int i = 0; i < recordData.getArity(); i++) {
+ genericRow.setField(i, fieldGetters.get(i).getFieldOrNull(recordData));
+ }
+ return genericRow;
+ }
+}
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PreCommitOperator.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PreCommitOperator.java
new file mode 100644
index 0000000000..07a7e0cd74
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PreCommitOperator.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.paimon.sink.v2;
+
+import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
+import org.apache.flink.streaming.api.connector.sink2.CommittableSummary;
+import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import org.apache.paimon.flink.sink.MultiTableCommittable;
+
+import java.util.ArrayList;
+import java.util.List;
+
+ /** An operator that sets the checkpointId on each MultiTableCommittable and generates the CommittableSummary. */
+public class PreCommitOperator
+ extends AbstractStreamOperator<CommittableMessage<MultiTableCommittable>>
+ implements OneInputStreamOperator<
+ CommittableMessage<MultiTableCommittable>,
+ CommittableMessage<MultiTableCommittable>> {
+
+ /** store a list of MultiTableCommittable in one checkpoint. */
+ private final List<MultiTableCommittable> results;
+
+ public PreCommitOperator() {
+ results = new ArrayList<>();
+ }
+
+ @Override
+ public void open() throws Exception {
+ super.open();
+ }
+
+ @Override
+ public void processElement(StreamRecord<CommittableMessage<MultiTableCommittable>> element) {
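+ // Only buffer the committables; incoming summaries are dropped and regenerated in prepareSnapshotPreBarrier.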
+ if (element.getValue() instanceof CommittableWithLineage) {
+ results.add(
+ ((CommittableWithLineage<MultiTableCommittable>) element.getValue())
+ .getCommittable());
+ }
+ }
+
+ @Override
+ public void finish() {
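+ // Emit any remaining committables before the operator closes.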
+ prepareSnapshotPreBarrier(Long.MAX_VALUE);
+ }
+
+ @Override
+ public void prepareSnapshotPreBarrier(long checkpointId) {
+ // CommittableSummary should be sent before all CommittableWithLineage.
+ CommittableMessage<MultiTableCommittable> summary =
+ new CommittableSummary<>(
+ getRuntimeContext().getIndexOfThisSubtask(),
+ getRuntimeContext().getNumberOfParallelSubtasks(),
+ checkpointId,
+ results.size(),
+ results.size(),
+ 0);
+ output.collect(new StreamRecord<>(summary));
+
+ results.forEach(
+ committable -> {
+ // update the right checkpointId for MultiTableCommittable
+ MultiTableCommittable committableWithCheckPointId =
+ new MultiTableCommittable(
+ committable.getDatabase(),
+ committable.getTable(),
+ checkpointId,
+ committable.kind(),
+ committable.wrappedCommittable());
+ CommittableMessage<MultiTableCommittable> message =
+ new CommittableWithLineage<>(
+ committableWithCheckPointId,
+ checkpointId,
+ getRuntimeContext().getIndexOfThisSubtask());
+ output.collect(new StreamRecord<>(message));
+ });
+ results.clear();
+ }
+}
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/StoreSinkWriteImpl.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/StoreSinkWriteImpl.java
new file mode 100644
index 0000000000..3d49086b2d
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/StoreSinkWriteImpl.java
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.paimon.sink.v2;
+
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.state.StateInitializationContext;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.disk.IOManagerImpl;
+import org.apache.paimon.flink.metrics.FlinkMetricRegistry;
+import org.apache.paimon.flink.sink.Committable;
+import org.apache.paimon.flink.sink.StoreSinkWrite;
+import org.apache.paimon.flink.sink.StoreSinkWriteState;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.memory.HeapMemorySegmentPool;
+import org.apache.paimon.memory.MemoryPoolFactory;
+import org.apache.paimon.memory.MemorySegmentPool;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.table.sink.SinkRecord;
+import org.apache.paimon.table.sink.TableWriteImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+
+/**
+ * Copied from {@link org.apache.paimon.flink.sink.StoreSinkWriteImpl}, with {@link
+ * StoreSinkWriteState} removed because it cannot be created from a {@link StateInitializationContext}.
+ */
+public class StoreSinkWriteImpl implements StoreSinkWrite {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(org.apache.paimon.flink.sink.StoreSinkWriteImpl.class);
+
+ protected final String commitUser;
+ private final IOManagerImpl paimonIOManager;
+ private final boolean ignorePreviousFiles;
+ private final boolean waitCompaction;
+ private final boolean isStreamingMode;
+ @Nullable private final MemorySegmentPool memoryPool;
+ @Nullable private final MemoryPoolFactory memoryPoolFactory;
+
+ protected TableWriteImpl<?> write;
+
+ @Nullable private final MetricGroup metricGroup;
+
+ public StoreSinkWriteImpl(
+ FileStoreTable table,
+ String commitUser,
+ IOManager ioManager,
+ boolean ignorePreviousFiles,
+ boolean waitCompaction,
+ boolean isStreamingMode,
+ MemoryPoolFactory memoryPoolFactory,
+ @Nullable MetricGroup metricGroup) {
+ this(
+ table,
+ commitUser,
+ ioManager,
+ ignorePreviousFiles,
+ waitCompaction,
+ isStreamingMode,
+ null,
+ memoryPoolFactory,
+ metricGroup);
+ }
+
+ private StoreSinkWriteImpl(
+ FileStoreTable table,
+ String commitUser,
+ IOManager ioManager,
+ boolean ignorePreviousFiles,
+ boolean waitCompaction,
+ boolean isStreamingMode,
+ @Nullable MemorySegmentPool memoryPool,
+ @Nullable MemoryPoolFactory memoryPoolFactory,
+ @Nullable MetricGroup metricGroup) {
+ this.commitUser = commitUser;
+ this.paimonIOManager = new IOManagerImpl(ioManager.getSpillingDirectoriesPaths());
+ this.ignorePreviousFiles = ignorePreviousFiles;
+ this.waitCompaction = waitCompaction;
+ this.isStreamingMode = isStreamingMode;
+ this.memoryPool = memoryPool;
+ this.memoryPoolFactory = memoryPoolFactory;
+ this.metricGroup = metricGroup;
+ this.write = newTableWrite(table);
+ }
+
+ private TableWriteImpl<?> newTableWrite(FileStoreTable table) {
+ checkArgument(
+ !(memoryPool != null && memoryPoolFactory != null),
+ "memoryPool and memoryPoolFactory cannot be set at the same time.");
+
+ TableWriteImpl<?> tableWrite =
+ table.newWrite(commitUser, (part, bucket) -> true)
+ .withIOManager(paimonIOManager)
+ .withIgnorePreviousFiles(ignorePreviousFiles);
+
+ if (metricGroup != null) {
+ tableWrite.withMetricRegistry(new FlinkMetricRegistry(metricGroup));
+ }
+
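+ // Prefer the shared memoryPoolFactory (multi-table case); otherwise fall back to a per-writer heap pool.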
+ if (memoryPoolFactory != null) {
+ return tableWrite.withMemoryPoolFactory(memoryPoolFactory);
+ } else {
+ return tableWrite.withMemoryPool(
+ memoryPool != null
+ ? memoryPool
+ : new HeapMemorySegmentPool(
+ table.coreOptions().writeBufferSize(),
+ table.coreOptions().pageSize()));
+ }
+ }
+
+ public void withCompactExecutor(ExecutorService compactExecutor) {
+ write.withCompactExecutor(compactExecutor);
+ }
+
+ @Override
+ public SinkRecord write(InternalRow internalRow) throws Exception {
+ return write.writeAndReturn(internalRow);
+ }
+
+ @Override
+ public SinkRecord write(InternalRow internalRow, int i) throws Exception {
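+ // The bucket hint is ignored here; the underlying TableWrite computes the bucket from the row.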
+ return write.writeAndReturn(internalRow);
+ }
+
+ @Override
+ public SinkRecord toLogRecord(SinkRecord record) {
+ return write.toLogRecord(record);
+ }
+
+ @Override
+ public void compact(BinaryRow partition, int bucket, boolean fullCompaction) throws Exception {
+ write.compact(partition, bucket, fullCompaction);
+ }
+
+ @Override
+ public void notifyNewFiles(
+ long snapshotId, BinaryRow partition, int bucket, List<DataFileMeta> files) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ "Receive {} new files from snapshot {}, partition {}, bucket {}",
+ files.size(),
+ snapshotId,
+ partition,
+ bucket);
+ }
+ write.notifyNewFiles(snapshotId, partition, bucket, files);
+ }
+
+ @Override
+ public List<Committable> prepareCommit(boolean waitCompaction, long checkpointId)
+ throws IOException {
+ List<Committable> committables = new ArrayList<>();
+ if (write != null) {
+ try {
+ for (CommitMessage committable :
+ write.prepareCommit(this.waitCompaction || waitCompaction, checkpointId)) {
+ committables.add(
+ new Committable(checkpointId, Committable.Kind.FILE, committable));
+ }
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ }
+ return committables;
+ }
+
+ @Override
+ public void snapshotState() {
+ // do nothing
+ }
+
+ @Override
+ public boolean streamingMode() {
+ return isStreamingMode;
+ }
+
+ @Override
+ public void close() throws Exception {
+ if (write != null) {
+ write.close();
+ }
+
+ paimonIOManager.close();
+ }
+
+ @Override
+ public void replace(FileStoreTable newTable) throws Exception {
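+ // Nothing to replace if this writer was created without a commit user.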
+ if (commitUser == null) {
+ return;
+ }
+ write.close();
+ write = newTableWrite(newTable);
+ }
+}
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/TableSchemaInfo.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/TableSchemaInfo.java
new file mode 100644
index 0000000000..26b4a1b125
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/TableSchemaInfo.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.paimon.sink.v2;
+
+import org.apache.flink.cdc.common.data.RecordData;
+import org.apache.flink.cdc.common.schema.Schema;
+
+import java.time.ZoneId;
+import java.util.List;
+
+/** Keep a list of {@link RecordData.FieldGetter} for a specific {@link Schema}. */
+public class TableSchemaInfo {
+
+ private final Schema schema;
+
+ private final List<RecordData.FieldGetter> fieldGetters;
+
+ public TableSchemaInfo(Schema schema, ZoneId zoneId) {
+ this.schema = schema;
+ this.fieldGetters = PaimonWriterHelper.createFieldGetters(schema, zoneId);
+ }
+
+ public Schema getSchema() {
+ return schema;
+ }
+
+ public List<RecordData.FieldGetter> getFieldGetters() {
+ return fieldGetters;
+ }
+}
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/resources/META-INF/services/org.apache.flink.cdc.common.factories.Factory b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/resources/META-INF/services/org.apache.flink.cdc.common.factories.Factory
new file mode 100644
index 0000000000..6ad058535b
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/resources/META-INF/services/org.apache.flink.cdc.common.factories.Factory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.cdc.connectors.paimon.sink.PaimonDataSinkFactory
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonDataSinkFactoryTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonDataSinkFactoryTest.java
new file mode 100644
index 0000000000..f2ab4a51bb
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonDataSinkFactoryTest.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.paimon.sink;
+
+import org.apache.flink.cdc.common.configuration.Configuration;
+import org.apache.flink.cdc.common.factories.DataSinkFactory;
+import org.apache.flink.cdc.common.factories.FactoryHelper;
+import org.apache.flink.cdc.common.sink.DataSink;
+import org.apache.flink.cdc.composer.utils.FactoryDiscoveryUtils;
+
+import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableMap;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.util.UUID;
+
+/** Tests for {@link PaimonDataSinkFactory}. */
+public class PaimonDataSinkFactoryTest {
+
+ @TempDir public static java.nio.file.Path temporaryFolder;
+
+ @Test
+ public void testCreateDataSink() {
+ DataSinkFactory sinkFactory =
+ FactoryDiscoveryUtils.getFactoryByIdentifier("paimon", DataSinkFactory.class);
+ Assertions.assertInstanceOf(PaimonDataSinkFactory.class, sinkFactory);
+
+ Configuration conf =
+ Configuration.fromMap(
+ ImmutableMap.<String, String>builder()
+ .put(PaimonDataSinkOptions.METASTORE.key(), "filesystem")
+ .put(
+ PaimonDataSinkOptions.WAREHOUSE.key(),
+ new File(
+ temporaryFolder.toFile(),
+ UUID.randomUUID().toString())
+ .toString())
+ .build());
+ DataSink dataSink =
+ sinkFactory.createDataSink(
+ new FactoryHelper.DefaultContext(
+ conf, conf, Thread.currentThread().getContextClassLoader()));
+ Assertions.assertInstanceOf(PaimonDataSink.class, dataSink);
+ }
+}
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplierTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplierTest.java
new file mode 100644
index 0000000000..dc24ff37ff
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplierTest.java
@@ -0,0 +1,336 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.paimon.sink;
+
+import org.apache.flink.cdc.common.event.AddColumnEvent;
+import org.apache.flink.cdc.common.event.AlterColumnTypeEvent;
+import org.apache.flink.cdc.common.event.CreateTableEvent;
+import org.apache.flink.cdc.common.event.DropColumnEvent;
+import org.apache.flink.cdc.common.event.RenameColumnEvent;
+import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.schema.Column;
+import org.apache.flink.cdc.common.sink.MetadataApplier;
+import org.apache.flink.cdc.common.types.DataType;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.flink.FlinkCatalogFactory;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.UUID;
+
+/** Tests for {@link PaimonMetadataApplier}. */
+public class PaimonMetadataApplierTest {
+
+ @TempDir public static java.nio.file.Path temporaryFolder;
+
+ private Catalog catalog;
+
+ private Options catalogOptions;
+
+ public static final String TEST_DATABASE = "test";
+
+ private static final String HADOOP_CONF_DIR =
+ Objects.requireNonNull(
+ Thread.currentThread()
+ .getContextClassLoader()
+ .getResource("hadoop-conf-dir"))
+ .getPath();
+
+ private static final String HIVE_CONF_DIR =
+ Objects.requireNonNull(
+ Thread.currentThread()
+ .getContextClassLoader()
+ .getResource("hive-conf-dir"))
+ .getPath();
+
+ private void initialize(String metastore)
+ throws Catalog.DatabaseNotEmptyException, Catalog.DatabaseNotExistException {
+ catalogOptions = new Options();
+ String warehouse =
+ new File(temporaryFolder.toFile(), UUID.randomUUID().toString()).toString();
+ if ("hive".equals(metastore)) {
+ catalogOptions.setString("hadoop-conf-dir", HADOOP_CONF_DIR);
+ catalogOptions.setString("hive-conf-dir", HIVE_CONF_DIR);
+ }
+ catalogOptions.setString("metastore", metastore);
+ catalogOptions.setString("warehouse", warehouse);
+ this.catalog = FlinkCatalogFactory.createPaimonCatalog(catalogOptions);
+ this.catalog.dropDatabase(TEST_DATABASE, true, true);
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = {"filesystem", "hive"})
+ public void testApplySchemaChange(String metastore)
+ throws Catalog.TableNotExistException, Catalog.DatabaseNotEmptyException,
+ Catalog.DatabaseNotExistException {
+ initialize(metastore);
+ MetadataApplier metadataApplier = new PaimonMetadataApplier(catalogOptions);
+ CreateTableEvent createTableEvent =
+ new CreateTableEvent(
+ TableId.parse("test.table1"),
+ org.apache.flink.cdc.common.schema.Schema.newBuilder()
+ .physicalColumn(
+ "col1",
+ org.apache.flink.cdc.common.types.DataTypes.STRING()
+ .notNull())
+ .physicalColumn(
+ "col2", org.apache.flink.cdc.common.types.DataTypes.INT())
+ .primaryKey("col1")
+ .build());
+ metadataApplier.applySchemaChange(createTableEvent);
+ RowType tableSchema =
+ new RowType(
+ Arrays.asList(
+ new DataField(0, "col1", DataTypes.STRING().notNull()),
+ new DataField(1, "col2", DataTypes.INT())));
+ Assertions.assertEquals(
+ tableSchema, catalog.getTable(Identifier.fromString("test.table1")).rowType());
+
+ List<AddColumnEvent.ColumnWithPosition> addedColumns = new ArrayList<>();
+ addedColumns.add(
+ new AddColumnEvent.ColumnWithPosition(
+ Column.physicalColumn(
+ "col3", org.apache.flink.cdc.common.types.DataTypes.STRING())));
+ AddColumnEvent addColumnEvent =
+ new AddColumnEvent(TableId.parse("test.table1"), addedColumns);
+ metadataApplier.applySchemaChange(addColumnEvent);
+ tableSchema =
+ new RowType(
+ Arrays.asList(
+ new DataField(0, "col1", DataTypes.STRING().notNull()),
+ new DataField(1, "col2", DataTypes.INT()),
+ new DataField(2, "col3", DataTypes.STRING())));
+ Assertions.assertEquals(
+ tableSchema, catalog.getTable(Identifier.fromString("test.table1")).rowType());
+
+ Map<String, String> nameMapping = new HashMap<>();
+ nameMapping.put("col2", "newcol2");
+ nameMapping.put("col3", "newcol3");
+ RenameColumnEvent renameColumnEvent =
+ new RenameColumnEvent(TableId.parse("test.table1"), nameMapping);
+ metadataApplier.applySchemaChange(renameColumnEvent);
+ tableSchema =
+ new RowType(
+ Arrays.asList(
+ new DataField(0, "col1", DataTypes.STRING().notNull()),
+ new DataField(1, "newcol2", DataTypes.INT()),
+ new DataField(2, "newcol3", DataTypes.STRING())));
+ Assertions.assertEquals(
+ tableSchema, catalog.getTable(Identifier.fromString("test.table1")).rowType());
+
+ Map<String, DataType> typeMapping = new HashMap<>();
+ typeMapping.put("newcol2", org.apache.flink.cdc.common.types.DataTypes.STRING());
+ AlterColumnTypeEvent alterColumnTypeEvent =
+ new AlterColumnTypeEvent(TableId.parse("test.table1"), typeMapping);
+ metadataApplier.applySchemaChange(alterColumnTypeEvent);
+ tableSchema =
+ new RowType(
+ Arrays.asList(
+ new DataField(0, "col1", DataTypes.STRING().notNull()),
+ new DataField(1, "newcol2", DataTypes.STRING()),
+ new DataField(2, "newcol3", DataTypes.STRING())));
+ Assertions.assertEquals(
+ tableSchema, catalog.getTable(Identifier.fromString("test.table1")).rowType());
+
+ DropColumnEvent dropColumnEvent =
+ new DropColumnEvent(
+ TableId.parse("test.table1"), Collections.singletonList("newcol2"));
+ metadataApplier.applySchemaChange(dropColumnEvent);
+ // id of DataField should keep the same as before dropping column
+ tableSchema =
+ new RowType(
+ Arrays.asList(
+ new DataField(0, "col1", DataTypes.STRING().notNull()),
+ new DataField(2, "newcol3", DataTypes.STRING())));
+ Assertions.assertEquals(
+ tableSchema, catalog.getTable(Identifier.fromString("test.table1")).rowType());
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = {"filesystem", "hive"})
+ public void testCreateTableWithOptions(String metastore)
+ throws Catalog.TableNotExistException, Catalog.DatabaseNotEmptyException,
+ Catalog.DatabaseNotExistException {
+ initialize(metastore);
+ Map<String, String> tableOptions = new HashMap<>();
+ tableOptions.put("bucket", "-1");
+ Map<TableId, List<String>> partitionMaps = new HashMap<>();
+ partitionMaps.put(TableId.parse("test.table1"), Arrays.asList("col3", "col4"));
+ MetadataApplier metadataApplier =
+ new PaimonMetadataApplier(catalogOptions, tableOptions, partitionMaps);
+ CreateTableEvent createTableEvent =
+ new CreateTableEvent(
+ TableId.parse("test.table1"),
+ org.apache.flink.cdc.common.schema.Schema.newBuilder()
+ .physicalColumn(
+ "col1",
+ org.apache.flink.cdc.common.types.DataTypes.STRING()
+ .notNull())
+ .physicalColumn(
+ "col2",
+ org.apache.flink.cdc.common.types.DataTypes.STRING())
+ .physicalColumn(
+ "col3",
+ org.apache.flink.cdc.common.types.DataTypes.STRING())
+ .physicalColumn(
+ "col4",
+ org.apache.flink.cdc.common.types.DataTypes.STRING())
+ .primaryKey("col1")
+ .build());
+ metadataApplier.applySchemaChange(createTableEvent);
+ Table table = catalog.getTable(Identifier.fromString("test.table1"));
+ RowType tableSchema =
+ new RowType(
+ Arrays.asList(
+ new DataField(0, "col1", DataTypes.STRING().notNull()),
+ new DataField(1, "col2", DataTypes.STRING()),
+ new DataField(2, "col3", DataTypes.STRING()),
+ new DataField(3, "col4", DataTypes.STRING())));
+ Assertions.assertEquals(tableSchema, table.rowType());
+ Assertions.assertEquals(Collections.singletonList("col1"), table.primaryKeys());
+ Assertions.assertEquals(Arrays.asList("col3", "col4"), table.partitionKeys());
+ Assertions.assertEquals("-1", table.options().get("bucket"));
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = {"filesystem", "hive"})
+ public void testCreateTableWithAllDataTypes(String metastore)
+ throws Catalog.TableNotExistException, Catalog.DatabaseNotEmptyException,
+ Catalog.DatabaseNotExistException {
+ initialize(metastore);
+ MetadataApplier metadataApplier = new PaimonMetadataApplier(catalogOptions);
+ CreateTableEvent createTableEvent =
+ new CreateTableEvent(
+ TableId.parse("test.table1"),
+ org.apache.flink.cdc.common.schema.Schema.newBuilder()
+ .physicalColumn(
+ "col1",
+ org.apache.flink.cdc.common.types.DataTypes.STRING()
+ .notNull())
+ .physicalColumn(
+ "boolean",
+ org.apache.flink.cdc.common.types.DataTypes.BOOLEAN())
+ .physicalColumn(
+ "binary",
+ org.apache.flink.cdc.common.types.DataTypes.BINARY(3))
+ .physicalColumn(
+ "varbinary",
+ org.apache.flink.cdc.common.types.DataTypes.VARBINARY(10))
+ .physicalColumn(
+ "bytes",
+ org.apache.flink.cdc.common.types.DataTypes.BYTES())
+ .physicalColumn(
+ "tinyint",
+ org.apache.flink.cdc.common.types.DataTypes.TINYINT())
+ .physicalColumn(
+ "smallint",
+ org.apache.flink.cdc.common.types.DataTypes.SMALLINT())
+ .physicalColumn(
+ "int", org.apache.flink.cdc.common.types.DataTypes.INT())
+ .physicalColumn(
+ "float",
+ org.apache.flink.cdc.common.types.DataTypes.FLOAT())
+ .physicalColumn(
+ "double",
+ org.apache.flink.cdc.common.types.DataTypes.DOUBLE())
+ .physicalColumn(
+ "decimal",
+ org.apache.flink.cdc.common.types.DataTypes.DECIMAL(6, 3))
+ .physicalColumn(
+ "char", org.apache.flink.cdc.common.types.DataTypes.CHAR(5))
+ .physicalColumn(
+ "varchar",
+ org.apache.flink.cdc.common.types.DataTypes.VARCHAR(10))
+ .physicalColumn(
+ "string",
+ org.apache.flink.cdc.common.types.DataTypes.STRING())
+ .physicalColumn(
+ "date", org.apache.flink.cdc.common.types.DataTypes.DATE())
+ .physicalColumn(
+ "time", org.apache.flink.cdc.common.types.DataTypes.TIME())
+ .physicalColumn(
+ "time_with_precision",
+ org.apache.flink.cdc.common.types.DataTypes.TIME(6))
+ .physicalColumn(
+ "timestamp",
+ org.apache.flink.cdc.common.types.DataTypes.TIMESTAMP())
+ .physicalColumn(
+ "timestamp_with_precision",
+ org.apache.flink.cdc.common.types.DataTypes.TIMESTAMP(3))
+ .physicalColumn(
+ "timestamp_ltz",
+ org.apache.flink.cdc.common.types.DataTypes.TIMESTAMP_LTZ())
+ .physicalColumn(
+ "timestamp_ltz_with_precision",
+ org.apache.flink.cdc.common.types.DataTypes.TIMESTAMP_LTZ(
+ 3))
+ .primaryKey("col1")
+ .build());
+ metadataApplier.applySchemaChange(createTableEvent);
+ RowType tableSchema =
+ new RowType(
+ Arrays.asList(
+ new DataField(0, "col1", DataTypes.STRING().notNull()),
+ new DataField(1, "boolean", DataTypes.BOOLEAN()),
+ new DataField(2, "binary", DataTypes.BINARY(3)),
+ new DataField(3, "varbinary", DataTypes.VARBINARY(10)),
+ new DataField(4, "bytes", DataTypes.BYTES()),
+ new DataField(5, "tinyint", DataTypes.TINYINT()),
+ new DataField(6, "smallint", DataTypes.SMALLINT()),
+ new DataField(7, "int", DataTypes.INT()),
+ new DataField(8, "float", DataTypes.FLOAT()),
+ new DataField(9, "double", DataTypes.DOUBLE()),
+ new DataField(10, "decimal", DataTypes.DECIMAL(6, 3)),
+ new DataField(11, "char", DataTypes.CHAR(5)),
+ new DataField(12, "varchar", DataTypes.VARCHAR(10)),
+ new DataField(13, "string", DataTypes.STRING()),
+ new DataField(14, "date", DataTypes.DATE()),
+ new DataField(15, "time", DataTypes.TIME(0)),
+ new DataField(16, "time_with_precision", DataTypes.TIME(6)),
+ new DataField(17, "timestamp", DataTypes.TIMESTAMP(6)),
+ new DataField(
+ 18, "timestamp_with_precision", DataTypes.TIMESTAMP(3)),
+ new DataField(
+ 19,
+ "timestamp_ltz",
+ DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(6)),
+ new DataField(
+ 20,
+ "timestamp_ltz_with_precision",
+ DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3))));
+ Assertions.assertEquals(
+ tableSchema, catalog.getTable(Identifier.fromString("test.table1")).rowType());
+ }
+}
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java
new file mode 100644
index 0000000000..edc785e891
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java
@@ -0,0 +1,519 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.paimon.sink.v2;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.operators.MailboxExecutor;
+import org.apache.flink.api.common.operators.ProcessingTimeService;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.connector.sink2.Committer;
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.cdc.common.data.binary.BinaryStringData;
+import org.apache.flink.cdc.common.event.AddColumnEvent;
+import org.apache.flink.cdc.common.event.CreateTableEvent;
+import org.apache.flink.cdc.common.event.DataChangeEvent;
+import org.apache.flink.cdc.common.event.DropColumnEvent;
+import org.apache.flink.cdc.common.event.Event;
+import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.schema.Column;
+import org.apache.flink.cdc.common.schema.Schema;
+import org.apache.flink.cdc.common.types.DataTypes;
+import org.apache.flink.cdc.common.types.RowType;
+import org.apache.flink.cdc.connectors.paimon.sink.PaimonMetadataApplier;
+import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
+import org.apache.flink.streaming.runtime.operators.sink.committables.CommitRequestImpl;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.types.Row;
+import org.apache.flink.types.RowKind;
+import org.apache.flink.util.UserCodeClassLoader;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.flink.FlinkCatalogFactory;
+import org.apache.paimon.flink.sink.MultiTableCommittable;
+import org.apache.paimon.options.Options;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.io.File;
+import java.io.IOException;
+import java.time.ZoneId;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.OptionalLong;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+/** An ITCase for {@link PaimonWriter} and {@link PaimonCommitter}. */
+public class PaimonSinkITCase {
+
+ @TempDir public static java.nio.file.Path temporaryFolder;
+
+ private Options catalogOptions;
+
+ private TableEnvironment tEnv;
+
+ private String warehouse;
+
+ private TableId table1;
+
+ private BinaryRecordDataGenerator generator;
+
+ public static final String TEST_DATABASE = "test";
+ private static final String HADOOP_CONF_DIR =
+ Objects.requireNonNull(
+ Thread.currentThread()
+ .getContextClassLoader()
+ .getResource("hadoop-conf-dir"))
+ .getPath();
+
+ private static final String HIVE_CONF_DIR =
+ Objects.requireNonNull(
+ Thread.currentThread()
+ .getContextClassLoader()
+ .getResource("hive-conf-dir"))
+ .getPath();
+
+ private void initialize(String metastore)
+ throws Catalog.DatabaseNotEmptyException, Catalog.DatabaseNotExistException {
+ tEnv = TableEnvironment.create(EnvironmentSettings.newInstance().inBatchMode().build());
+ warehouse = new File(temporaryFolder.toFile(), UUID.randomUUID().toString()).toString();
+ catalogOptions = new Options();
+ catalogOptions.setString("metastore", metastore);
+ catalogOptions.setString("warehouse", warehouse);
+ table1 = TableId.tableId("test", "table1");
+ if ("hive".equals(metastore)) {
+ catalogOptions.setString("hadoop-conf-dir", HADOOP_CONF_DIR);
+ catalogOptions.setString("hive-conf-dir", HIVE_CONF_DIR);
+ tEnv.executeSql(
+ String.format(
+ "CREATE CATALOG paimon_catalog WITH ("
+ + "'type'='paimon', "
+ + "'warehouse'='%s', "
+ + "'metastore'='hive', "
+ + "'hadoop-conf-dir'='%s', "
+ + "'hive-conf-dir'='%s' "
+ + ")",
+ warehouse, HADOOP_CONF_DIR, HIVE_CONF_DIR));
+ } else {
+ tEnv.executeSql(
+ String.format(
+ "CREATE CATALOG paimon_catalog WITH ('type'='paimon', 'warehouse'='%s')",
+ warehouse));
+ }
+ FlinkCatalogFactory.createPaimonCatalog(catalogOptions)
+ .dropDatabase(TEST_DATABASE, true, true);
+ }
+
+ private List<Event> createTestEvents() {
+ List testEvents = new ArrayList<>();
+ // create table
+ Schema schema =
+ Schema.newBuilder()
+ .physicalColumn("col1", DataTypes.STRING())
+ .physicalColumn("col2", DataTypes.STRING())
+ .primaryKey("col1")
+ .build();
+ CreateTableEvent createTableEvent = new CreateTableEvent(table1, schema);
+ testEvents.add(createTableEvent);
+ PaimonMetadataApplier metadataApplier = new PaimonMetadataApplier(catalogOptions);
+ metadataApplier.applySchemaChange(createTableEvent);
+
+ generator =
+ new BinaryRecordDataGenerator(RowType.of(DataTypes.STRING(), DataTypes.STRING()));
+ // insert
+ DataChangeEvent insertEvent1 =
+ DataChangeEvent.insertEvent(
+ table1,
+ generator.generate(
+ new Object[] {
+ BinaryStringData.fromString("1"),
+ BinaryStringData.fromString("1")
+ }));
+ testEvents.add(insertEvent1);
+ DataChangeEvent insertEvent2 =
+ DataChangeEvent.insertEvent(
+ table1,
+ generator.generate(
+ new Object[] {
+ BinaryStringData.fromString("2"),
+ BinaryStringData.fromString("2")
+ }));
+ testEvents.add(insertEvent2);
+ return testEvents;
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = {"filesystem", "hive"})
+ public void testSinkWithDataChange(String metastore)
+ throws IOException, InterruptedException, Catalog.DatabaseNotEmptyException,
+ Catalog.DatabaseNotExistException {
+ initialize(metastore);
+ PaimonSink<Event> paimonSink =
+ new PaimonSink<>(
+ catalogOptions, new PaimonRecordEventSerializer(ZoneId.systemDefault()));
+ PaimonWriter<Event> writer = paimonSink.createWriter(new MockInitContext());
+ Committer<MultiTableCommittable> committer = paimonSink.createCommitter();
+
+ // insert
+ for (Event event : createTestEvents()) {
+ writer.write(event, null);
+ }
+ writer.flush(false);
+ Collection<Committer.CommitRequest<MultiTableCommittable>> commitRequests =
+ writer.prepareCommit().stream()
+ .map(MockCommitRequestImpl::new)
+ .collect(Collectors.toList());
+ committer.commit(commitRequests);
+ List<Row> result = new ArrayList<>();
+ tEnv.sqlQuery("select * from paimon_catalog.test.table1")
+ .execute()
+ .collect()
+ .forEachRemaining(result::add);
+ Assertions.assertEquals(
+ Arrays.asList(
+ Row.ofKind(RowKind.INSERT, "1", "1"), Row.ofKind(RowKind.INSERT, "2", "2")),
+ result);
+
+ // delete
+ Event event =
+ DataChangeEvent.deleteEvent(
+ TableId.tableId("test", "table1"),
+ generator.generate(
+ new Object[] {
+ BinaryStringData.fromString("1"),
+ BinaryStringData.fromString("1")
+ }));
+ writer.write(event, null);
+ writer.flush(false);
+ commitRequests =
+ writer.prepareCommit().stream()
+ .map(MockCommitRequestImpl::new)
+ .collect(Collectors.toList());
+ committer.commit(commitRequests);
+ result = new ArrayList<>();
+ tEnv.sqlQuery("select * from paimon_catalog.test.table1")
+ .execute()
+ .collect()
+ .forEachRemaining(result::add);
+ Assertions.assertEquals(
+ Collections.singletonList(Row.ofKind(RowKind.INSERT, "2", "2")), result);
+
+ // update
+ event =
+ DataChangeEvent.updateEvent(
+ TableId.tableId("test", "table1"),
+ generator.generate(
+ new Object[] {
+ BinaryStringData.fromString("2"),
+ BinaryStringData.fromString("2")
+ }),
+ generator.generate(
+ new Object[] {
+ BinaryStringData.fromString("2"),
+ BinaryStringData.fromString("x")
+ }));
+ writer.write(event, null);
+ writer.flush(false);
+ commitRequests =
+ writer.prepareCommit().stream()
+ .map(MockCommitRequestImpl::new)
+ .collect(Collectors.toList());
+ committer.commit(commitRequests);
+ result = new ArrayList<>();
+ tEnv.sqlQuery("select * from paimon_catalog.test.table1")
+ .execute()
+ .collect()
+ .forEachRemaining(result::add);
+ Assertions.assertEquals(
+ Collections.singletonList(Row.ofKind(RowKind.INSERT, "2", "x")), result);
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = {"filesystem", "hive"})
+ public void testSinkWithSchemaChange(String metastore)
+ throws IOException, InterruptedException, Catalog.DatabaseNotEmptyException,
+ Catalog.DatabaseNotExistException {
+ initialize(metastore);
+ PaimonSink<Event> paimonSink =
+ new PaimonSink<>(
+ catalogOptions, new PaimonRecordEventSerializer(ZoneId.systemDefault()));
+ PaimonWriter<Event> writer = paimonSink.createWriter(new MockInitContext());
+ Committer<MultiTableCommittable> committer = paimonSink.createCommitter();
+
+ // 1. receive only DataChangeEvents during one checkpoint
+ for (Event event : createTestEvents()) {
+ writer.write(event, null);
+ }
+ writer.flush(false);
+ Collection<Committer.CommitRequest<MultiTableCommittable>> commitRequests =
+ writer.prepareCommit().stream()
+ .map(MockCommitRequestImpl::new)
+ .collect(Collectors.toList());
+ committer.commit(commitRequests);
+ List<Row> result = new ArrayList<>();
+ tEnv.sqlQuery("select * from paimon_catalog.test.table1")
+ .execute()
+ .collect()
+ .forEachRemaining(result::add);
+ Assertions.assertEquals(
+ Arrays.asList(
+ Row.ofKind(RowKind.INSERT, "1", "1"), Row.ofKind(RowKind.INSERT, "2", "2")),
+ result);
+
+ // 2. receive DataChangeEvents and SchemaChangeEvents during one checkpoint
+ DataChangeEvent insertEvent3 =
+ DataChangeEvent.insertEvent(
+ table1,
+ generator.generate(
+ new Object[] {
+ BinaryStringData.fromString("3"),
+ BinaryStringData.fromString("3")
+ }));
+ writer.write(insertEvent3, null);
+ writer.flush(false);
+
+ // add column
+ AddColumnEvent.ColumnWithPosition columnWithPosition =
+ new AddColumnEvent.ColumnWithPosition(
+ Column.physicalColumn("col3", DataTypes.STRING()));
+ AddColumnEvent addColumnEvent =
+ new AddColumnEvent(table1, Collections.singletonList(columnWithPosition));
+ PaimonMetadataApplier metadataApplier = new PaimonMetadataApplier(catalogOptions);
+ writer.write(addColumnEvent, null);
+ metadataApplier.applySchemaChange(addColumnEvent);
+ generator =
+ new BinaryRecordDataGenerator(
+ RowType.of(DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING()));
+ DataChangeEvent insertEvent4 =
+ DataChangeEvent.insertEvent(
+ table1,
+ generator.generate(
+ new Object[] {
+ BinaryStringData.fromString("4"),
+ BinaryStringData.fromString("4"),
+ BinaryStringData.fromString("4")
+ }));
+ writer.write(insertEvent4, null);
+ writer.flush(false);
+ commitRequests =
+ writer.prepareCommit().stream()
+ .map(MockCommitRequestImpl::new)
+ .collect(Collectors.toList());
+ committer.commit(commitRequests);
+ result = new ArrayList<>();
+ tEnv.sqlQuery("select * from paimon_catalog.test.table1")
+ .execute()
+ .collect()
+ .forEachRemaining(result::add);
+ Assertions.assertEquals(
+ Arrays.asList(
+ Row.ofKind(RowKind.INSERT, "1", "1", null),
+ Row.ofKind(RowKind.INSERT, "2", "2", null),
+ Row.ofKind(RowKind.INSERT, "3", "3", null),
+ Row.ofKind(RowKind.INSERT, "4", "4", "4")),
+ result);
+
+ // 3. receive DataChangeEvents and SchemaChangeEvents during another checkpoint
+ DataChangeEvent insertEvent5 =
+ DataChangeEvent.insertEvent(
+ table1,
+ generator.generate(
+ new Object[] {
+ BinaryStringData.fromString("5"),
+ BinaryStringData.fromString("5"),
+ BinaryStringData.fromString("5")
+ }));
+ writer.write(insertEvent5, null);
+ writer.flush(false);
+ // drop column
+ DropColumnEvent dropColumnEvent =
+ new DropColumnEvent(table1, Collections.singletonList("col2"));
+ metadataApplier.applySchemaChange(dropColumnEvent);
+ writer.write(dropColumnEvent, null);
+ generator =
+ new BinaryRecordDataGenerator(RowType.of(DataTypes.STRING(), DataTypes.STRING()));
+ DataChangeEvent insertEvent6 =
+ DataChangeEvent.insertEvent(
+ table1,
+ generator.generate(
+ new Object[] {
+ BinaryStringData.fromString("6"),
+ BinaryStringData.fromString("6")
+ }));
+ writer.write(insertEvent6, null);
+ writer.flush(false);
+ commitRequests =
+ writer.prepareCommit().stream()
+ .map(MockCommitRequestImpl::new)
+ .collect(Collectors.toList());
+ committer.commit(commitRequests);
+ result = new ArrayList<>();
+ tEnv.sqlQuery("select * from paimon_catalog.test.table1")
+ .execute()
+ .collect()
+ .forEachRemaining(result::add);
+ Assertions.assertEquals(
+ Arrays.asList(
+ Row.ofKind(RowKind.INSERT, "1", null),
+ Row.ofKind(RowKind.INSERT, "2", null),
+ Row.ofKind(RowKind.INSERT, "3", null),
+ Row.ofKind(RowKind.INSERT, "4", "4"),
+ Row.ofKind(RowKind.INSERT, "5", "5"),
+ Row.ofKind(RowKind.INSERT, "6", "6")),
+ result);
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = {"filesystem", "hive"})
+ public void testSinkWithMultiTables(String metastore)
+ throws IOException, InterruptedException, Catalog.DatabaseNotEmptyException,
+ Catalog.DatabaseNotExistException {
+ initialize(metastore);
+ PaimonSink<Event> paimonSink =
+ new PaimonSink<>(
+ catalogOptions, new PaimonRecordEventSerializer(ZoneId.systemDefault()));
+ PaimonWriter<Event> writer = paimonSink.createWriter(new MockInitContext());
+ Committer<MultiTableCommittable> committer = paimonSink.createCommitter();
+ List<Event> testEvents = createTestEvents();
+ // create table
+ TableId table2 = TableId.tableId("test", "table2");
+ Schema schema =
+ Schema.newBuilder()
+ .physicalColumn("col1", DataTypes.STRING())
+ .physicalColumn("col2", DataTypes.STRING())
+ .primaryKey("col1")
+ .build();
+ CreateTableEvent createTableEvent = new CreateTableEvent(table2, schema);
+ testEvents.add(createTableEvent);
+ PaimonMetadataApplier metadataApplier = new PaimonMetadataApplier(catalogOptions);
+ metadataApplier.applySchemaChange(createTableEvent);
+ // insert
+ DataChangeEvent insertEvent1 =
+ DataChangeEvent.insertEvent(
+ table2,
+ generator.generate(
+ new Object[] {
+ BinaryStringData.fromString("1"),
+ BinaryStringData.fromString("1")
+ }));
+ testEvents.add(insertEvent1);
+
+ // insert
+ for (Event event : testEvents) {
+ writer.write(event, null);
+ }
+ writer.flush(false);
+ Collection<Committer.CommitRequest<MultiTableCommittable>> commitRequests =
+ writer.prepareCommit().stream()
+ .map(MockCommitRequestImpl::new)
+ .collect(Collectors.toList());
+ committer.commit(commitRequests);
+ List<Row> result = new ArrayList<>();
+ tEnv.sqlQuery("select * from paimon_catalog.test.table1")
+ .execute()
+ .collect()
+ .forEachRemaining(result::add);
+ Assertions.assertEquals(
+ Arrays.asList(
+ Row.ofKind(RowKind.INSERT, "1", "1"), Row.ofKind(RowKind.INSERT, "2", "2")),
+ result);
+ result = new ArrayList<>();
+ tEnv.sqlQuery("select * from paimon_catalog.test.table2")
+ .execute()
+ .collect()
+ .forEachRemaining(result::add);
+ Assertions.assertEquals(
+ Collections.singletonList(Row.ofKind(RowKind.INSERT, "1", "1")), result);
+ }
+
+ private static class MockCommitRequestImpl<CommT> extends CommitRequestImpl<CommT> {
+
+ protected MockCommitRequestImpl(CommT committable) {
+ super(committable);
+ }
+ }
+
+ private static class MockInitContext
+ implements Sink.InitContext, SerializationSchema.InitializationContext {
+
+ private MockInitContext() {}
+
+ public UserCodeClassLoader getUserCodeClassLoader() {
+ return null;
+ }
+
+ public MailboxExecutor getMailboxExecutor() {
+ return null;
+ }
+
+ public ProcessingTimeService getProcessingTimeService() {
+ return null;
+ }
+
+ public int getSubtaskId() {
+ return 0;
+ }
+
+ public int getNumberOfParallelSubtasks() {
+ return 0;
+ }
+
+ public int getAttemptNumber() {
+ return 0;
+ }
+
+ public SinkWriterMetricGroup metricGroup() {
+ return null;
+ }
+
+ public MetricGroup getMetricGroup() {
+ return null;
+ }
+
+ public OptionalLong getRestoredCheckpointId() {
+ return OptionalLong.empty();
+ }
+
+ public SerializationSchema.InitializationContext
+ asSerializationSchemaInitializationContext() {
+ return this;
+ }
+
+ public boolean isObjectReuseEnabled() {
+ return false;
+ }
+
+ public <IN> TypeSerializer<IN> createInputSerializer() {
+ return null;
+ }
+
+ public JobID getJobId() {
+ return null;
+ }
+ }
+}
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriterHelperTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriterHelperTest.java
new file mode 100644
index 0000000000..d432e3fe2c
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriterHelperTest.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.paimon.sink.v2;
+
+import org.apache.flink.cdc.common.data.DecimalData;
+import org.apache.flink.cdc.common.data.LocalZonedTimestampData;
+import org.apache.flink.cdc.common.data.RecordData;
+import org.apache.flink.cdc.common.data.TimestampData;
+import org.apache.flink.cdc.common.data.binary.BinaryRecordData;
+import org.apache.flink.cdc.common.data.binary.BinaryStringData;
+import org.apache.flink.cdc.common.event.DataChangeEvent;
+import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.schema.Schema;
+import org.apache.flink.cdc.common.types.DataTypes;
+import org.apache.flink.cdc.common.types.RowType;
+import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator;
+
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.Decimal;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.types.RowKind;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.math.BigDecimal;
+import java.time.Instant;
+import java.time.ZoneId;
+import java.util.List;
+
+/** Tests for {@link PaimonWriterHelper}. */
+public class PaimonWriterHelperTest {
+
+ @Test
+ public void testConvertEventToGenericRowOfAllDataTypes() {
+ RowType rowType =
+ RowType.of(
+ DataTypes.BOOLEAN(),
+ DataTypes.BINARY(3),
+ DataTypes.VARBINARY(10),
+ DataTypes.BYTES(),
+ DataTypes.TINYINT(),
+ DataTypes.SMALLINT(),
+ DataTypes.INT(),
+ DataTypes.BIGINT(),
+ DataTypes.FLOAT(),
+ DataTypes.DOUBLE(),
+ DataTypes.DECIMAL(6, 3),
+ DataTypes.CHAR(5),
+ DataTypes.VARCHAR(10),
+ DataTypes.STRING(),
+ DataTypes.DATE(),
+ DataTypes.TIME(),
+ DataTypes.TIME(6),
+ DataTypes.TIMESTAMP(),
+ DataTypes.TIMESTAMP(3),
+ DataTypes.TIMESTAMP_LTZ(),
+ DataTypes.TIMESTAMP_LTZ(3),
+ DataTypes.STRING());
+ Object[] testData =
+ new Object[] {
+ true,
+ new byte[] {1, 2},
+ new byte[] {3, 4},
+ new byte[] {5, 6, 7},
+ (byte) 1,
+ (short) 2,
+ 3,
+ 4L,
+ 5.1f,
+ 6.2,
+ DecimalData.fromBigDecimal(new BigDecimal("7.123"), 6, 3),
+ BinaryStringData.fromString("test1"),
+ BinaryStringData.fromString("test2"),
+ BinaryStringData.fromString("test3"),
+ 100,
+ 200,
+ 300,
+ TimestampData.fromTimestamp(
+ java.sql.Timestamp.valueOf("2023-01-01 00:00:00.000")),
+ TimestampData.fromTimestamp(java.sql.Timestamp.valueOf("2023-01-01 00:00:00")),
+ LocalZonedTimestampData.fromInstant(Instant.parse("2023-01-01T00:00:00.000Z")),
+ LocalZonedTimestampData.fromInstant(Instant.parse("2023-01-01T00:00:00.000Z")),
+ null
+ };
+ BinaryRecordData recordData = new BinaryRecordDataGenerator(rowType).generate(testData);
+ Schema schema = Schema.newBuilder().fromRowDataType(rowType).build();
+        List<RecordData.FieldGetter> fieldGetters =
+ PaimonWriterHelper.createFieldGetters(schema, ZoneId.of("UTC+8"));
+ DataChangeEvent dataChangeEvent =
+ DataChangeEvent.insertEvent(TableId.parse("database.table"), recordData);
+ GenericRow genericRow =
+ PaimonWriterHelper.convertEventToGenericRow(dataChangeEvent, fieldGetters);
+ Assertions.assertEquals(
+ GenericRow.ofKind(
+ RowKind.INSERT,
+ true,
+ new byte[] {1, 2},
+ new byte[] {3, 4},
+ new byte[] {5, 6, 7},
+ (byte) 1,
+ (short) 2,
+ 3,
+ 4L,
+ 5.1f,
+ 6.2,
+ Decimal.fromBigDecimal(new BigDecimal("7.123"), 6, 3),
+ BinaryString.fromString("test1"),
+ BinaryString.fromString("test2"),
+ BinaryString.fromString("test3"),
+ 100,
+ 200,
+ 300,
+ Timestamp.fromSQLTimestamp(
+ java.sql.Timestamp.valueOf("2023-01-01 00:00:00.000")),
+ Timestamp.fromSQLTimestamp(
+ java.sql.Timestamp.valueOf("2023-01-01 00:00:00")),
+ // plus 8 hours.
+ Timestamp.fromInstant(Instant.parse("2023-01-01T08:00:00.000Z")),
+ Timestamp.fromInstant(Instant.parse("2023-01-01T08:00:00.000Z")),
+ null),
+ genericRow);
+ }
+
+ @Test
+ public void testConvertEventToGenericRowOfDataChangeTypes() {
+ Schema schema =
+ Schema.newBuilder()
+ .physicalColumn("col1", DataTypes.STRING())
+ .physicalColumn("col2", DataTypes.STRING())
+ .build();
+        List<RecordData.FieldGetter> fieldGetters =
+ PaimonWriterHelper.createFieldGetters(schema, ZoneId.systemDefault());
+ TableId tableId = TableId.parse("database.table");
+ BinaryRecordDataGenerator generator =
+ new BinaryRecordDataGenerator(RowType.of(DataTypes.STRING(), DataTypes.STRING()));
+ BinaryRecordData recordData =
+ generator.generate(
+ new Object[] {
+ BinaryStringData.fromString("1"), BinaryStringData.fromString("1")
+ });
+
+ DataChangeEvent dataChangeEvent = DataChangeEvent.insertEvent(tableId, recordData);
+ GenericRow genericRow =
+ PaimonWriterHelper.convertEventToGenericRow(dataChangeEvent, fieldGetters);
+        Assertions.assertEquals(RowKind.INSERT, genericRow.getRowKind());
+
+ dataChangeEvent = DataChangeEvent.deleteEvent(tableId, recordData);
+ genericRow = PaimonWriterHelper.convertEventToGenericRow(dataChangeEvent, fieldGetters);
+        Assertions.assertEquals(RowKind.DELETE, genericRow.getRowKind());
+
+ dataChangeEvent = DataChangeEvent.updateEvent(tableId, recordData, recordData);
+ genericRow = PaimonWriterHelper.convertEventToGenericRow(dataChangeEvent, fieldGetters);
+        Assertions.assertEquals(RowKind.INSERT, genericRow.getRowKind());
+
+ dataChangeEvent = DataChangeEvent.replaceEvent(tableId, recordData, null);
+ genericRow = PaimonWriterHelper.convertEventToGenericRow(dataChangeEvent, fieldGetters);
+        Assertions.assertEquals(RowKind.INSERT, genericRow.getRowKind());
+ }
+}
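The "plus 8 hours" assertions above hinge on how TIMESTAMP_LTZ values are converted: an instant is rendered as wall-clock time in the configured session zone before being stored as a zone-less Paimon timestamp. A minimal sketch of that conversion, assuming this rendering behavior; the helper name here is illustrative, not PaimonWriterHelper's actual internals:

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;

// Renders an instant as local wall-clock time in the given zone, then the zone
// information is dropped when the value is stored.
static LocalDateTime toWallClock(Instant instant, ZoneId zoneId) {
    return LocalDateTime.ofInstant(instant, zoneId);
}

// toWallClock(Instant.parse("2023-01-01T00:00:00Z"), ZoneId.of("UTC+8"))
// -> 2023-01-01T08:00, i.e. the 08:00 value asserted in the test.
```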
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/TestMultiTableCommittableChannelComputer.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/TestMultiTableCommittableChannelComputer.java
new file mode 100644
index 0000000000..2c0a9b4cba
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/TestMultiTableCommittableChannelComputer.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.paimon.sink.v2;
+
+import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
+
+import org.apache.paimon.flink.sink.MultiTableCommittable;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/** A test for {@link MultiTableCommittableChannelComputer}. */
+public class TestMultiTableCommittableChannelComputer {
+
+ @Test
+ public void testChannel() {
+ MultiTableCommittableChannelComputer computer = new MultiTableCommittableChannelComputer();
+ computer.setup(4);
+        List<MultiTableCommittable> commits =
+ Arrays.asList(
+ new MultiTableCommittable("database", "table1", 1L, null, null),
+ new MultiTableCommittable("database", "table2", 1L, null, null),
+ new MultiTableCommittable("database", "table1", 1L, null, null),
+ new MultiTableCommittable("database", "table5", 1L, null, null),
+ new MultiTableCommittable("database", "table3", 1L, null, null),
+ new MultiTableCommittable("database", "table8", 1L, null, null),
+ new MultiTableCommittable("database", "table5", 1L, null, null),
+ new MultiTableCommittable("database", "table1", 1L, null, null),
+ new MultiTableCommittable("database", "table9", 1L, null, null),
+ new MultiTableCommittable("database", "table5", 1L, null, null),
+ new MultiTableCommittable("database", "table3", 1L, null, null),
+ new MultiTableCommittable("database", "table8", 1L, null, null));
+        Map<Integer, Set<String>> map = new HashMap<>();
+        commits.forEach(
+                (commit) -> {
+                    int channel = computer.channel(new CommittableWithLineage<>(commit, 1L, 0));
+                    Set<String> set = map.getOrDefault(channel, new HashSet<>());
+                    set.add(commit.getTable());
+                    map.put(channel, set);
+                });
+        Set<String> actualTables = new HashSet<>();
+        for (Map.Entry<Integer, Set<String>> entry : map.entrySet()) {
+            actualTables.addAll(entry.getValue());
+        }
+        Set<String> expectedTables =
+                new HashSet<>(
+                        Arrays.asList("table1", "table2", "table3", "table5", "table8", "table9"));
+        // Each table must land in exactly one channel: the union of the per-channel sets
+        // matches the expected tables, and the sets are pairwise disjoint.
+        int distinctAssignments = map.values().stream().mapToInt(Set::size).sum();
+        Assertions.assertEquals(expectedTables.size(), distinctAssignments);
+        Assertions.assertEquals(expectedTables, actualTables);
+ }
+}
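The property this test pins down is that a committable's channel is a pure function of its database and table, so the rows of one table are never spread across committer subtasks. A hedged sketch of such a computation; MultiTableCommittableChannelComputer's real implementation may differ in hash details:

```java
import java.util.Objects;

// Maps a (database, table) pair to a stable, non-negative channel index.
static int channelFor(String database, String table, int numChannels) {
    return Math.floorMod(Objects.hash(database, table), numChannels);
}
```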
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/resources/hadoop-conf-dir/core-site.xml b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/resources/hadoop-conf-dir/core-site.xml
new file mode 100644
index 0000000000..38b1c42029
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/resources/hadoop-conf-dir/core-site.xml
@@ -0,0 +1,36 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<configuration>
+    <property>
+        <name>fs.defaultFS</name>
+        <value>file:///</value>
+    </property>
+    <property>
+        <name>fs.trash.interval</name>
+        <value>1</value>
+    </property>
+    <property>
+        <name>io.compression.codecs</name>
+        <value>org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.BZip2Codec,org.apache.hadoop.io.compress.DeflateCodec,org.apache.hadoop.io.compress.SnappyCodec,org.apache.hadoop.io.compress.Lz4Codec</value>
+    </property>
+    <property>
+        <name>hadoop.security.authentication</name>
+        <value>none</value>
+    </property>
+</configuration>
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/resources/hive-conf-dir/hive-site.xml b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/resources/hive-conf-dir/hive-site.xml
new file mode 100644
index 0000000000..845ff9f625
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/resources/hive-conf-dir/hive-site.xml
@@ -0,0 +1,47 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<configuration>
+
+    <property>
+        <name>hive.metastore.schema.verification</name>
+        <value>false</value>
+    </property>
+
+    <property>
+        <name>hive.metastore.client.capability.check</name>
+        <value>false</value>
+    </property>
+
+    <property>
+        <name>datanucleus.schema.autoCreateTables</name>
+        <value>true</value>
+    </property>
+
+    <property>
+        <name>datanucleus.schema.autoCreateAll</name>
+        <value>true</value>
+    </property>
+
+    <property>
+        <name>javax.jdo.option.ConnectionURL</name>
+        <value>jdbc:derby:memory:test;create=true</value>
+    </property>
+
+</configuration>
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/resources/log4j2-test.properties b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/resources/log4j2-test.properties
new file mode 100644
index 0000000000..8f6c074738
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/resources/log4j2-test.properties
@@ -0,0 +1,25 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+# http://www.apache.org/licenses/LICENSE-2.0
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Set root logger level to OFF to not flood build logs
+# set manually to INFO for debugging purposes
+rootLogger.level=OFF
+rootLogger.appenderRef.test.ref = TestLogger
+
+appender.testlogger.name = TestLogger
+appender.testlogger.type = CONSOLE
+appender.testlogger.target = SYSTEM_ERR
+appender.testlogger.layout.type = PatternLayout
+appender.testlogger.layout.pattern = %-4r [%t] %-5p %c - %m%n
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/pom.xml b/flink-cdc-connect/flink-cdc-pipeline-connectors/pom.xml
index c5265af9fa..424372f4c1 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/pom.xml
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/pom.xml
@@ -33,6 +33,7 @@ limitations under the License.
         <module>flink-cdc-pipeline-connector-doris</module>
         <module>flink-cdc-pipeline-connector-starrocks</module>
         <module>flink-cdc-pipeline-connector-kafka</module>
+        <module>flink-cdc-pipeline-connector-paimon</module>
     </modules>
diff --git a/flink-cdc-e2e-tests/flink-cdc-e2e-utils/pom.xml b/flink-cdc-e2e-tests/flink-cdc-e2e-utils/pom.xml
new file mode 100644
index 0000000000..2e38a55eda
--- /dev/null
+++ b/flink-cdc-e2e-tests/flink-cdc-e2e-utils/pom.xml
@@ -0,0 +1,31 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <artifactId>flink-cdc-e2e-tests</artifactId>
+        <groupId>org.apache.flink</groupId>
+        <version>${revision}</version>
+    </parent>
+
+    <artifactId>flink-cdc-e2e-utils</artifactId>
+    <name>flink-cdc-e2e-utils</name>
+    <packaging>jar</packaging>
+</project>
\ No newline at end of file
diff --git a/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/utils/JdbcProxy.java b/flink-cdc-e2e-tests/flink-cdc-e2e-utils/src/test/java/org/apache/flink/cdc/common/test/utils/JdbcProxy.java
similarity index 88%
rename from flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/utils/JdbcProxy.java
rename to flink-cdc-e2e-tests/flink-cdc-e2e-utils/src/test/java/org/apache/flink/cdc/common/test/utils/JdbcProxy.java
index c0f0167ae8..cb1735de64 100644
--- a/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/utils/JdbcProxy.java
+++ b/flink-cdc-e2e-tests/flink-cdc-e2e-utils/src/test/java/org/apache/flink/cdc/common/test/utils/JdbcProxy.java
@@ -15,9 +15,10 @@
* limitations under the License.
*/
-package org.apache.flink.cdc.connectors.tests.utils;
+package org.apache.flink.cdc.common.test.utils;
import org.apache.commons.lang3.StringUtils;
+import org.junit.Assert;
import java.sql.Connection;
import java.sql.DriverManager;
@@ -28,8 +29,6 @@
import java.util.Collections;
import java.util.List;
-import static org.junit.Assert.assertArrayEquals;
-
/** Proxy to communicate with database using JDBC protocol. */
public class JdbcProxy {
@@ -45,11 +44,11 @@ public JdbcProxy(String url, String userName, String password, String driverClas
this.driverClass = driverClass;
}
-    public void checkResult(List<String> expectedResult, String table, String[] fields)
+    private void checkResult(List<String> expectedResult, String table, String[] fields)
throws SQLException, ClassNotFoundException {
Class.forName(driverClass);
try (Connection dbConn = DriverManager.getConnection(url, userName, password);
- PreparedStatement statement = dbConn.prepareStatement("select * from " + table);
+ PreparedStatement statement = dbConn.prepareStatement("SELECT * FROM " + table);
ResultSet resultSet = statement.executeQuery()) {
             List<String> results = new ArrayList<>();
while (resultSet.next()) {
@@ -68,10 +67,14 @@ public void checkResult(List expectedResult, String table, String[] fiel
Collections.sort(results);
Collections.sort(expectedResult);
// make it easier to check the result
- assertArrayEquals(expectedResult.toArray(), results.toArray());
+ Assert.assertArrayEquals(expectedResult.toArray(), results.toArray());
}
}
+    /**
+     * Checks the result of a table with the specified fields. If the result does not match the
+     * expectation, it retries until the timeout expires.
+     */
     public void checkResultWithTimeout(
             List<String> expectedResult, String table, String[] fields, long timeout)
             throws Exception {
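The retry contract documented above is worth making concrete. The method body is not shown in this hunk, so the following is only a sketch of what such a retry wrapper can look like, under the assumption that it polls `checkResult` until the assertion passes or the deadline expires; the loop body is illustrative:

```java
// Polls checkResult until it succeeds or the timeout elapses; the last failure is rethrown.
public void checkResultWithTimeout(
        List<String> expectedResult, String table, String[] fields, long timeout)
        throws Exception {
    long deadline = System.currentTimeMillis() + timeout;
    while (true) {
        try {
            checkResult(expectedResult, table, fields);
            return; // result matched the expectation
        } catch (AssertionError | SQLException e) {
            if (System.currentTimeMillis() > deadline) {
                throw e; // give up once the timeout has expired
            }
            Thread.sleep(1000L); // wait before polling again
        }
    }
}
```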
diff --git a/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/utils/ParameterProperty.java b/flink-cdc-e2e-tests/flink-cdc-e2e-utils/src/test/java/org/apache/flink/cdc/common/test/utils/ParameterProperty.java
similarity index 96%
rename from flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/utils/ParameterProperty.java
rename to flink-cdc-e2e-tests/flink-cdc-e2e-utils/src/test/java/org/apache/flink/cdc/common/test/utils/ParameterProperty.java
index ac13e95196..55abbd5a7f 100644
--- a/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/utils/ParameterProperty.java
+++ b/flink-cdc-e2e-tests/flink-cdc-e2e-utils/src/test/java/org/apache/flink/cdc/common/test/utils/ParameterProperty.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.flink.cdc.connectors.tests.utils;
+package org.apache.flink.cdc.common.test.utils;
import java.util.function.Function;
diff --git a/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/utils/TestUtils.java b/flink-cdc-e2e-tests/flink-cdc-e2e-utils/src/test/java/org/apache/flink/cdc/common/test/utils/TestUtils.java
similarity index 80%
rename from flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/utils/TestUtils.java
rename to flink-cdc-e2e-tests/flink-cdc-e2e-utils/src/test/java/org/apache/flink/cdc/common/test/utils/TestUtils.java
index bb6492eee3..7923bb9b9b 100644
--- a/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/utils/TestUtils.java
+++ b/flink-cdc-e2e-tests/flink-cdc-e2e-utils/src/test/java/org/apache/flink/cdc/common/test/utils/TestUtils.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.flink.cdc.connectors.tests.utils;
+package org.apache.flink.cdc.common.test.utils;
import java.io.FileNotFoundException;
import java.io.IOException;
@@ -28,8 +28,7 @@
import java.util.stream.Stream;
/** General test utilities. */
-public enum TestUtils {
- ;
+public class TestUtils {
     private static final ParameterProperty<Path> MODULE_DIRECTORY =
new ParameterProperty<>("moduleDir", Paths::get);
@@ -37,19 +36,28 @@ public enum TestUtils {
/**
* Searches for a resource file matching the given regex in the given directory. This method is
* primarily intended to be used for the initialization of static {@link Path} fields for
- * resource file(i.e. jar, config file) that reside in the modules {@code target} directory.
+     * resource files (i.e. jars, config files). If resolvePaths is empty, this method searches
+     * under the module's {@code target} directory; otherwise it searches under the given
+     * resolvePaths resolved against the repository root.
*
* @param resourceNameRegex regex pattern to match against
- * @return Path pointing to the matching jar
+     * @param resolvePaths path segments resolved against the repository root
+ * @return Path pointing to the matching file
* @throws RuntimeException if none or multiple resource files could be found
*/
- public static Path getResource(final String resourceNameRegex) {
+ public static Path getResource(final String resourceNameRegex, String... resolvePaths) {
// if the property is not set then we are most likely running in the IDE, where the working
// directory is the
// module of the test that is currently running, which is exactly what we want
- Path moduleDirectory = MODULE_DIRECTORY.get(Paths.get("").toAbsolutePath());
+ Path path = MODULE_DIRECTORY.get(Paths.get("").toAbsolutePath());
+ if (resolvePaths != null && resolvePaths.length > 0) {
+ path = path.getParent().getParent();
+ for (String resolvePath : resolvePaths) {
+ path = path.resolve(resolvePath);
+ }
+ }
-        try (Stream<Path> dependencyResources = Files.walk(moduleDirectory)) {
+        try (Stream<Path> dependencyResources = Files.walk(path)) {
             final List<Path> matchingResources =
dependencyResources
.filter(
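The new `resolvePaths` parameter changes where the search starts. Both call shapes appear later in this patch; without extra arguments the module's `target` directory is searched, with arguments the search root becomes `<repo-root>/<resolvePaths...>`:

```java
// Search this module's target directory (classic behavior).
Path distJar = TestUtils.getResource("flink-cdc-dist.jar");

// Search <repo-root>/flink-cdc-dist/src instead, as PipelineTestEnvironment does.
Path cdcScript = TestUtils.getResource("flink-cdc.sh", "flink-cdc-dist", "src");
```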
diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/pom.xml b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/pom.xml
index 854bab7143..e785ba0a5a 100644
--- a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/pom.xml
+++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/pom.xml
@@ -27,4 +27,210 @@ limitations under the License.
     <artifactId>flink-cdc-pipeline-e2e-tests</artifactId>
+
+    <properties>
+        <flink-1.17>1.17.1</flink-1.17>
+        <flink-1.18>1.18.0</flink-1.18>
+        <mysql.driver.version>8.0.27</mysql.driver.version>
+        <starrocks.connector.version>1.2.9_flink-${flink.major.version}</starrocks.connector.version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-cdc-e2e-utils</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>mysql</groupId>
+            <artifactId>mysql-connector-java</artifactId>
+            <exclusions>
+                <exclusion>
+                    <groupId>com.google.protobuf</groupId>
+                    <artifactId>protobuf-java</artifactId>
+                </exclusion>
+            </exclusions>
+            <version>${mysql.driver.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-cdc-dist</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-mysql-cdc</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-cdc-pipeline-connector-values</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-cdc-pipeline-connector-mysql</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-cdc-pipeline-connector-doris</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-cdc-pipeline-connector-starrocks</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-test-util</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>mysql</artifactId>
+            <version>${testcontainers.version}</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <testResources>
+            <testResource>
+                <directory>src/test/resources</directory>
+                <excludes>
+                    <exclude>**/flink-cdc.sh</exclude>
+                    <exclude>**/flink-cdc.yaml</exclude>
+                </excludes>
+            </testResource>
+        </testResources>
+
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-surefire-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>default-test</id>
+                        <phase>none</phase>
+                    </execution>
+                    <execution>
+                        <id>integration-tests</id>
+                        <phase>none</phase>
+                    </execution>
+                    <execution>
+                        <id>end-to-end-tests</id>
+                        <phase>integration-test</phase>
+                        <goals>
+                            <goal>test</goal>
+                        </goals>
+                        <configuration>
+                            <includes>
+                                <include>**/*.*</include>
+                            </includes>
+                            <forkCount>1</forkCount>
+                            <workingDirectory>${project.basedir}</workingDirectory>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-dependency-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>copy-jars</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>copy</goal>
+                        </goals>
+                    </execution>
+                </executions>
+                <configuration>
+                    <artifactItems>
+                        <artifactItem>
+                            <groupId>mysql</groupId>
+                            <artifactId>mysql-connector-java</artifactId>
+                            <version>${mysql.driver.version}</version>
+                            <destFileName>mysql-driver.jar</destFileName>
+                            <type>jar</type>
+                            <outputDirectory>${project.build.directory}/dependencies</outputDirectory>
+                        </artifactItem>
+                        <artifactItem>
+                            <groupId>org.apache.flink</groupId>
+                            <artifactId>flink-cdc-dist</artifactId>
+                            <version>${project.version}</version>
+                            <destFileName>flink-cdc-dist.jar</destFileName>
+                            <type>jar</type>
+                            <outputDirectory>${project.build.directory}/dependencies</outputDirectory>
+                        </artifactItem>
+                        <artifactItem>
+                            <groupId>org.apache.flink</groupId>
+                            <artifactId>flink-cdc-pipeline-connector-values</artifactId>
+                            <version>${project.version}</version>
+                            <destFileName>values-cdc-pipeline-connector.jar</destFileName>
+                            <type>jar</type>
+                            <outputDirectory>${project.build.directory}/dependencies</outputDirectory>
+                        </artifactItem>
+                        <artifactItem>
+                            <groupId>org.apache.flink</groupId>
+                            <artifactId>flink-cdc-pipeline-connector-mysql</artifactId>
+                            <version>${project.version}</version>
+                            <destFileName>mysql-cdc-pipeline-connector.jar</destFileName>
+                            <type>jar</type>
+                            <outputDirectory>${project.build.directory}/dependencies</outputDirectory>
+                        </artifactItem>
+                        <artifactItem>
+                            <groupId>org.apache.flink</groupId>
+                            <artifactId>flink-cdc-pipeline-connector-doris</artifactId>
+                            <version>${project.version}</version>
+                            <destFileName>doris-cdc-pipeline-connector.jar</destFileName>
+                            <type>jar</type>
+                            <outputDirectory>${project.build.directory}/dependencies</outputDirectory>
+                        </artifactItem>
+                        <artifactItem>
+                            <groupId>org.apache.flink</groupId>
+                            <artifactId>flink-cdc-pipeline-connector-starrocks</artifactId>
+                            <version>${project.version}</version>
+                            <destFileName>starrocks-cdc-pipeline-connector.jar</destFileName>
+                            <type>jar</type>
+                            <outputDirectory>${project.build.directory}/dependencies</outputDirectory>
+                        </artifactItem>
+                    </artifactItems>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
 </project>
\ No newline at end of file
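The `copy-jars` execution above gives each dependency a stable file name under `target/dependencies`, and those `destFileName` values are exactly what the e2e tests resolve at runtime, for example:

```java
// Names match the destFileName entries in the maven-dependency-plugin config above.
Path mysqlCdcJar = TestUtils.getResource("mysql-cdc-pipeline-connector.jar");
Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar");
```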
diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MysqlE2eITCase.java b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MysqlE2eITCase.java
new file mode 100644
index 0000000000..3307175864
--- /dev/null
+++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MysqlE2eITCase.java
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.pipeline.tests;
+
+import org.apache.flink.cdc.common.test.utils.TestUtils;
+import org.apache.flink.cdc.connectors.mysql.testutils.MySqlContainer;
+import org.apache.flink.cdc.connectors.mysql.testutils.MySqlVersion;
+import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase;
+import org.apache.flink.cdc.pipeline.tests.utils.PipelineTestEnvironment;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+
+import java.nio.file.Path;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.TimeoutException;
+
+/** End-to-end tests for the MySQL CDC pipeline job. */
+@RunWith(Parameterized.class)
+public class MysqlE2eITCase extends PipelineTestEnvironment {
+ private static final Logger LOG = LoggerFactory.getLogger(MysqlE2eITCase.class);
+
+ // ------------------------------------------------------------------------------------------
+    // MySQL Variables (we always use MySQL as the data source for easier verification)
+ // ------------------------------------------------------------------------------------------
+ protected static final String MYSQL_TEST_USER = "mysqluser";
+ protected static final String MYSQL_TEST_PASSWORD = "mysqlpw";
+ protected static final String MYSQL_DRIVER_CLASS = "com.mysql.cj.jdbc.Driver";
+ protected static final String INTER_CONTAINER_MYSQL_ALIAS = "mysql";
+
+ @ClassRule
+ public static final MySqlContainer MYSQL =
+ (MySqlContainer)
+ new MySqlContainer(
+ MySqlVersion.V8_0) // v8 support both ARM and AMD architectures
+ .withConfigurationOverride("docker/mysql/my.cnf")
+ .withSetupSQL("docker/mysql/setup.sql")
+ .withDatabaseName("flink-test")
+ .withUsername("flinkuser")
+ .withPassword("flinkpw")
+ .withNetwork(NETWORK)
+ .withNetworkAliases(INTER_CONTAINER_MYSQL_ALIAS)
+ .withLogConsumer(new Slf4jLogConsumer(LOG));
+
+ protected final UniqueDatabase mysqlInventoryDatabase =
+ new UniqueDatabase(MYSQL, "mysql_inventory", MYSQL_TEST_USER, MYSQL_TEST_PASSWORD);
+
+ @Before
+ public void before() throws Exception {
+ super.before();
+ mysqlInventoryDatabase.createAndInitialize();
+ }
+
+ @After
+ public void after() {
+ super.after();
+ mysqlInventoryDatabase.dropDatabase();
+ }
+
+ @Test
+ public void testSyncWholeDatabase() throws Exception {
+ String pipelineJob =
+ String.format(
+ "source:\n"
+ + " type: mysql\n"
+ + " hostname: %s\n"
+ + " port: 3306\n"
+ + " username: %s\n"
+ + " password: %s\n"
+ + " tables: %s.\\.*\n"
+ + " server-id: 5400-5404\n"
+ + " server-time-zone: UTC\n"
+ + "\n"
+ + "sink:\n"
+ + " type: values\n"
+ + "\n"
+ + "pipeline:\n"
+ + " parallelism: 1",
+ INTER_CONTAINER_MYSQL_ALIAS,
+ MYSQL_TEST_USER,
+ MYSQL_TEST_PASSWORD,
+ mysqlInventoryDatabase.getDatabaseName());
+ Path mysqlCdcJar = TestUtils.getResource("mysql-cdc-pipeline-connector.jar");
+ Path valuesCdcJar = TestUtils.getResource("values-cdc-pipeline-connector.jar");
+ Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar");
+ submitPipelineJob(pipelineJob, mysqlCdcJar, valuesCdcJar, mysqlDriverJar);
+ waitUntilJobRunning(Duration.ofSeconds(30));
+ LOG.info("Pipeline job is running");
+        waitUntilSpecificEvent(
+ String.format(
+ "DataChangeEvent{tableId=%s.customers, before=[], after=[104, user_4, Shanghai, 123567891234], op=INSERT, meta=()}",
+ mysqlInventoryDatabase.getDatabaseName()),
+ 60000L);
+        waitUntilSpecificEvent(
+ String.format(
+ "DataChangeEvent{tableId=%s.products, before=[], after=[109, spare tire, 24 inch spare tire, 22.2, null, null, null], op=INSERT, meta=()}",
+ mysqlInventoryDatabase.getDatabaseName()),
+ 60000L);
+        List<String> expectedEvents =
+ Arrays.asList(
+ String.format(
+ "CreateTableEvent{tableId=%s.customers, schema=columns={`id` INT NOT NULL,`name` VARCHAR(255) NOT NULL,`address` VARCHAR(1024),`phone_number` VARCHAR(512)}, primaryKeys=id, options=()}",
+ mysqlInventoryDatabase.getDatabaseName()),
+ String.format(
+ "DataChangeEvent{tableId=%s.customers, before=[], after=[104, user_4, Shanghai, 123567891234], op=INSERT, meta=()}",
+ mysqlInventoryDatabase.getDatabaseName()),
+ String.format(
+ "DataChangeEvent{tableId=%s.customers, before=[], after=[103, user_3, Shanghai, 123567891234], op=INSERT, meta=()}",
+ mysqlInventoryDatabase.getDatabaseName()),
+ String.format(
+ "DataChangeEvent{tableId=%s.customers, before=[], after=[102, user_2, Shanghai, 123567891234], op=INSERT, meta=()}",
+ mysqlInventoryDatabase.getDatabaseName()),
+ String.format(
+ "DataChangeEvent{tableId=%s.customers, before=[], after=[101, user_1, Shanghai, 123567891234], op=INSERT, meta=()}",
+ mysqlInventoryDatabase.getDatabaseName()),
+ String.format(
+ "CreateTableEvent{tableId=%s.products, schema=columns={`id` INT NOT NULL,`name` VARCHAR(255) NOT NULL,`description` VARCHAR(512),`weight` FLOAT,`enum_c` STRING,`json_c` STRING,`point_c` STRING}, primaryKeys=id, options=()}",
+ mysqlInventoryDatabase.getDatabaseName()),
+ String.format(
+ "DataChangeEvent{tableId=%s.products, before=[], after=[109, spare tire, 24 inch spare tire, 22.2, null, null, null], op=INSERT, meta=()}",
+ mysqlInventoryDatabase.getDatabaseName()),
+ String.format(
+ "DataChangeEvent{tableId=%s.products, before=[], after=[107, rocks, box of assorted rocks, 5.3, null, null, null], op=INSERT, meta=()}",
+ mysqlInventoryDatabase.getDatabaseName()),
+ String.format(
+ "DataChangeEvent{tableId=%s.products, before=[], after=[108, jacket, water resistent black wind breaker, 0.1, null, null, null], op=INSERT, meta=()}",
+ mysqlInventoryDatabase.getDatabaseName()),
+ String.format(
+ "DataChangeEvent{tableId=%s.products, before=[], after=[105, hammer, 14oz carpenter's hammer, 0.875, red, {\"k1\": \"v1\", \"k2\": \"v2\"}, {\"coordinates\":[5,5],\"type\":\"Point\",\"srid\":0}], op=INSERT, meta=()}",
+ mysqlInventoryDatabase.getDatabaseName()),
+ String.format(
+ "DataChangeEvent{tableId=%s.products, before=[], after=[106, hammer, 16oz carpenter's hammer, 1.0, null, null, null], op=INSERT, meta=()}",
+ mysqlInventoryDatabase.getDatabaseName()),
+ String.format(
+ "DataChangeEvent{tableId=%s.products, before=[], after=[103, 12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.8, red, {\"key3\": \"value3\"}, {\"coordinates\":[3,3],\"type\":\"Point\",\"srid\":0}], op=INSERT, meta=()}",
+ mysqlInventoryDatabase.getDatabaseName()),
+ String.format(
+ "DataChangeEvent{tableId=%s.products, before=[], after=[104, hammer, 12oz carpenter's hammer, 0.75, white, {\"key4\": \"value4\"}, {\"coordinates\":[4,4],\"type\":\"Point\",\"srid\":0}], op=INSERT, meta=()}",
+ mysqlInventoryDatabase.getDatabaseName()),
+ String.format(
+ "DataChangeEvent{tableId=%s.products, before=[], after=[101, scooter, Small 2-wheel scooter, 3.14, red, {\"key1\": \"value1\"}, {\"coordinates\":[1,1],\"type\":\"Point\",\"srid\":0}], op=INSERT, meta=()}",
+ mysqlInventoryDatabase.getDatabaseName()),
+ String.format(
+ "DataChangeEvent{tableId=%s.products, before=[], after=[102, car battery, 12V car battery, 8.1, white, {\"key2\": \"value2\"}, {\"coordinates\":[2,2],\"type\":\"Point\",\"srid\":0}], op=INSERT, meta=()}",
+ mysqlInventoryDatabase.getDatabaseName()));
+ validateResult(expectedEvents);
+ LOG.info("Begin incremental reading stage.");
+ // generate binlogs
+ String mysqlJdbcUrl =
+ String.format(
+ "jdbc:mysql://%s:%s/%s",
+ MYSQL.getHost(),
+ MYSQL.getDatabasePort(),
+ mysqlInventoryDatabase.getDatabaseName());
+ try (Connection conn =
+ DriverManager.getConnection(
+ mysqlJdbcUrl, MYSQL_TEST_USER, MYSQL_TEST_PASSWORD);
+ Statement stat = conn.createStatement()) {
+ stat.execute("UPDATE products SET description='18oz carpenter hammer' WHERE id=106;");
+ stat.execute("UPDATE products SET weight='5.1' WHERE id=107;");
+ // modify table schema
+ stat.execute("ALTER TABLE products ADD COLUMN new_col INT;");
+ stat.execute(
+ "INSERT INTO products VALUES (default,'jacket','water resistent white wind breaker',0.2, null, null, null, 1);"); // 110
+ stat.execute(
+ "INSERT INTO products VALUES (default,'scooter','Big 2-wheel scooter ',5.18, null, null, null, 1);"); // 111
+ stat.execute(
+ "UPDATE products SET description='new water resistent white wind breaker', weight='0.5' WHERE id=110;");
+ stat.execute("UPDATE products SET weight='5.17' WHERE id=111;");
+ stat.execute("DELETE FROM products WHERE id=111;");
+ } catch (SQLException e) {
+ LOG.error("Update table for CDC failed.", e);
+ throw e;
+ }
+
+        waitUntilSpecificEvent(
+ String.format(
+ "DataChangeEvent{tableId=%s.products, before=[111, scooter, Big 2-wheel scooter , 5.17, null, null, null, 1], after=[], op=DELETE, meta=()}",
+ mysqlInventoryDatabase.getDatabaseName()),
+ 60000L);
+
+ expectedEvents =
+ Arrays.asList(
+ String.format(
+ "DataChangeEvent{tableId=%s.products, before=[106, hammer, 16oz carpenter's hammer, 1.0, null, null, null], after=[106, hammer, 18oz carpenter hammer, 1.0, null, null, null], op=UPDATE, meta=()}",
+ mysqlInventoryDatabase.getDatabaseName()),
+ String.format(
+ "DataChangeEvent{tableId=%s.products, before=[107, rocks, box of assorted rocks, 5.3, null, null, null], after=[107, rocks, box of assorted rocks, 5.1, null, null, null], op=UPDATE, meta=()}",
+ mysqlInventoryDatabase.getDatabaseName()),
+ String.format(
+ "AddColumnEvent{tableId=%s.products, addedColumns=[ColumnWithPosition{column=`new_col` INT, position=LAST, existedColumnName=null}]}",
+ mysqlInventoryDatabase.getDatabaseName()),
+ String.format(
+ "DataChangeEvent{tableId=%s.products, before=[], after=[110, jacket, water resistent white wind breaker, 0.2, null, null, null, 1], op=INSERT, meta=()}",
+ mysqlInventoryDatabase.getDatabaseName()),
+ String.format(
+ "DataChangeEvent{tableId=%s.products, before=[], after=[111, scooter, Big 2-wheel scooter , 5.18, null, null, null, 1], op=INSERT, meta=()}",
+ mysqlInventoryDatabase.getDatabaseName()),
+ String.format(
+ "DataChangeEvent{tableId=%s.products, before=[110, jacket, water resistent white wind breaker, 0.2, null, null, null, 1], after=[110, jacket, new water resistent white wind breaker, 0.5, null, null, null, 1], op=UPDATE, meta=()}",
+ mysqlInventoryDatabase.getDatabaseName()),
+ String.format(
+ "DataChangeEvent{tableId=%s.products, before=[111, scooter, Big 2-wheel scooter , 5.18, null, null, null, 1], after=[111, scooter, Big 2-wheel scooter , 5.17, null, null, null, 1], op=UPDATE, meta=()}",
+ mysqlInventoryDatabase.getDatabaseName()),
+ String.format(
+ "DataChangeEvent{tableId=%s.products, before=[111, scooter, Big 2-wheel scooter , 5.17, null, null, null, 1], after=[], op=DELETE, meta=()}",
+ mysqlInventoryDatabase.getDatabaseName()));
+ validateResult(expectedEvents);
+ }
+
+    private void validateResult(List<String> expectedEvents) {
+        String stdout = taskManagerConsumer.toUtf8String();
+        for (String event : expectedEvents) {
+            if (!stdout.contains(event)) {
+                throw new RuntimeException(
+                        "Failed to find expected event: " + event + " in stdout: " + stdout);
+            }
+        }
+    }
+
+    private void waitUntilSpecificEvent(String event, long timeout) throws Exception {
+        boolean result = false;
+        long endTimeout = System.currentTimeMillis() + timeout;
+        while (System.currentTimeMillis() < endTimeout) {
+            String stdout = taskManagerConsumer.toUtf8String();
+            if (stdout.contains(event)) {
+                result = true;
+                break;
+            }
+            Thread.sleep(1000);
+        }
+        if (!result) {
+            throw new TimeoutException(
+                    "Failed to find expected event: "
+                            + event
+                            + " in stdout: "
+                            + taskManagerConsumer.toUtf8String());
+        }
+    }
+}
diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/utils/PipelineTestEnvironment.java b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/utils/PipelineTestEnvironment.java
new file mode 100644
index 0000000000..a448bf554c
--- /dev/null
+++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/utils/PipelineTestEnvironment.java
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.pipeline.tests.utils;
+
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.cdc.common.test.utils.TestUtils;
+import org.apache.flink.client.deployment.StandaloneClusterId;
+import org.apache.flink.client.program.rest.RestClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.runtime.client.JobStatusMessage;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.Container.ExecResult;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.Network;
+import org.testcontainers.containers.output.ToStringConsumer;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.utility.MountableFile;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/** Test environment running pipeline job on Flink containers. */
+@RunWith(Parameterized.class)
+public abstract class PipelineTestEnvironment extends TestLogger {
+ private static final Logger LOG = LoggerFactory.getLogger(PipelineTestEnvironment.class);
+
+ @Parameterized.Parameter public String flinkVersion;
+
+ // ------------------------------------------------------------------------------------------
+ // Flink Variables
+ // ------------------------------------------------------------------------------------------
+ public static final int JOB_MANAGER_REST_PORT = 8081;
+ public static final String INTER_CONTAINER_JM_ALIAS = "jobmanager";
+ public static final String INTER_CONTAINER_TM_ALIAS = "taskmanager";
+ public static final String FLINK_PROPERTIES =
+ String.join(
+ "\n",
+ Arrays.asList(
+ "jobmanager.rpc.address: jobmanager",
+ "taskmanager.numberOfTaskSlots: 10",
+ "parallelism.default: 4",
+ "execution.checkpointing.interval: 300",
+ // this is needed for oracle-cdc tests.
+ // see https://stackoverflow.com/a/47062742/4915129
+ "env.java.opts: -Doracle.jdbc.timezoneAsRegion=false"));
+
+ @ClassRule public static final Network NETWORK = Network.newNetwork();
+
+ @Rule public final TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+    @Nullable protected RestClusterClient<StandaloneClusterId> restClusterClient;
+    protected GenericContainer<?> jobManager;
+    protected GenericContainer<?> taskManager;
+
+ protected ToStringConsumer jobManagerConsumer;
+
+ protected ToStringConsumer taskManagerConsumer;
+
+ @Parameterized.Parameters(name = "flinkVersion: {0}")
+    public static List<String> getFlinkVersion() {
+ return Arrays.asList("1.17.1", "1.18.0");
+ }
+
+ @Before
+ public void before() throws Exception {
+ LOG.info("Starting containers...");
+ jobManagerConsumer = new ToStringConsumer();
+ jobManager =
+ new GenericContainer<>(getFlinkDockerImageTag())
+ .withCommand("jobmanager")
+ .withNetwork(NETWORK)
+ .withExtraHost("host.docker.internal", "host-gateway")
+ .withNetworkAliases(INTER_CONTAINER_JM_ALIAS)
+ .withExposedPorts(JOB_MANAGER_REST_PORT)
+ .withEnv("FLINK_PROPERTIES", FLINK_PROPERTIES)
+ .withLogConsumer(jobManagerConsumer);
+ taskManagerConsumer = new ToStringConsumer();
+ taskManager =
+ new GenericContainer<>(getFlinkDockerImageTag())
+ .withCommand("taskmanager")
+ .withExtraHost("host.docker.internal", "host-gateway")
+ .withNetwork(NETWORK)
+ .withNetworkAliases(INTER_CONTAINER_TM_ALIAS)
+ .withEnv("FLINK_PROPERTIES", FLINK_PROPERTIES)
+ .dependsOn(jobManager)
+ .withLogConsumer(taskManagerConsumer);
+
+ Startables.deepStart(Stream.of(jobManager)).join();
+ Startables.deepStart(Stream.of(taskManager)).join();
+ LOG.info("Containers are started.");
+ }
+
+ @After
+ public void after() {
+ if (restClusterClient != null) {
+ restClusterClient.close();
+ }
+ if (jobManager != null) {
+ jobManager.stop();
+ }
+ if (taskManager != null) {
+ taskManager.stop();
+ }
+ }
+
+ /** Allow overriding the default flink properties. */
+ public void overrideFlinkProperties(String properties) {
+ jobManager.withEnv("FLINK_PROPERTIES", properties);
+ taskManager.withEnv("FLINK_PROPERTIES", properties);
+ }
+
+ /**
+     * Submits a pipeline job to the running cluster.
+     *
+     * <p>NOTE: You should not use {@code '\t'}.
+ */
+ public void submitPipelineJob(String pipelineJob, Path... jars)
+ throws IOException, InterruptedException {
+ for (Path jar : jars) {
+ jobManager.copyFileToContainer(
+ MountableFile.forHostPath(jar), "/tmp/flinkCDC/lib/" + jar.getFileName());
+ }
+ jobManager.copyFileToContainer(
+ MountableFile.forHostPath(
+ TestUtils.getResource("flink-cdc.sh", "flink-cdc-dist", "src"), 755),
+ "/tmp/flinkCDC/bin/flink-cdc.sh");
+ jobManager.copyFileToContainer(
+ MountableFile.forHostPath(
+ TestUtils.getResource("flink-cdc.yaml", "flink-cdc-dist", "src"), 755),
+ "/tmp/flinkCDC/conf/flink-cdc.yaml");
+ jobManager.copyFileToContainer(
+ MountableFile.forHostPath(TestUtils.getResource("flink-cdc-dist.jar")),
+ "/tmp/flinkCDC/lib/flink-cdc-dist.jar");
+ Path script = temporaryFolder.newFile().toPath();
+ Files.write(script, pipelineJob.getBytes());
+ jobManager.copyFileToContainer(
+ MountableFile.forHostPath(script), "/tmp/flinkCDC/conf/pipeline.yaml");
+ StringBuilder sb = new StringBuilder();
+ for (Path jar : jars) {
+ sb.append(" --jar /tmp/flinkCDC/lib/").append(jar.getFileName());
+ }
+ String commands =
+ "/tmp/flinkCDC/bin/flink-cdc.sh /tmp/flinkCDC/conf/pipeline.yaml --flink-home /opt/flink"
+ + sb;
+ ExecResult execResult = jobManager.execInContainer("bash", "-c", commands);
+ LOG.info(execResult.getStdout());
+ LOG.error(execResult.getStderr());
+ if (execResult.getExitCode() != 0) {
+ throw new AssertionError("Failed when submitting the pipeline job.");
+ }
+ }
+
+ /**
+     * Gets a {@link RestClusterClient} connected to this FlinkContainer.
+     *
+     * <p>This method lazily initializes the REST client on demand.
+ */
+    public RestClusterClient<StandaloneClusterId> getRestClusterClient() {
+ if (restClusterClient != null) {
+ return restClusterClient;
+ }
+ checkState(
+ jobManager.isRunning(),
+ "Cluster client should only be retrieved for a running cluster");
+ try {
+ final Configuration clientConfiguration = new Configuration();
+ clientConfiguration.set(RestOptions.ADDRESS, jobManager.getHost());
+ clientConfiguration.set(
+ RestOptions.PORT, jobManager.getMappedPort(JOB_MANAGER_REST_PORT));
+ this.restClusterClient =
+ new RestClusterClient<>(clientConfiguration, StandaloneClusterId.getInstance());
+ } catch (Exception e) {
+ throw new IllegalStateException(
+ "Failed to create client for Flink container cluster", e);
+ }
+ return restClusterClient;
+ }
+
+ public void waitUntilJobRunning(Duration timeout) {
+        RestClusterClient<?> clusterClient = getRestClusterClient();
+ Deadline deadline = Deadline.fromNow(timeout);
+ while (deadline.hasTimeLeft()) {
+            Collection<JobStatusMessage> jobStatusMessages;
+ try {
+ jobStatusMessages = clusterClient.listJobs().get(10, TimeUnit.SECONDS);
+ } catch (Exception e) {
+ LOG.warn("Error when fetching job status.", e);
+ continue;
+ }
+ if (jobStatusMessages != null && !jobStatusMessages.isEmpty()) {
+ JobStatusMessage message = jobStatusMessages.iterator().next();
+ JobStatus jobStatus = message.getJobState();
+ if (jobStatus.isTerminalState()) {
+ throw new ValidationException(
+ String.format(
+ "Job has been terminated! JobName: %s, JobID: %s, Status: %s",
+ message.getJobName(),
+ message.getJobId(),
+ message.getJobState()));
+ } else if (jobStatus == JobStatus.RUNNING) {
+ return;
+ }
+ }
+ }
+ }
+
+ protected String getFlinkDockerImageTag() {
+ return String.format("flink:%s-scala_2.12", flinkVersion);
+ }
+}
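A hedged sketch of how a concrete end-to-end case builds on this environment, mirroring MysqlE2eITCase above; the class name, YAML body, and jar name are illustrative only:

```java
// Subclasses inherit the parameterized Flink containers and the submit/wait helpers.
public class ValuesOnlyE2eCase extends PipelineTestEnvironment {
    @Test
    public void testPipeline() throws Exception {
        String pipelineJob =
                "source:\n  type: mysql\n  # ... connection options ...\n"
                        + "sink:\n  type: values\n"
                        + "pipeline:\n  parallelism: 1";
        submitPipelineJob(pipelineJob, TestUtils.getResource("mysql-cdc-pipeline-connector.jar"));
        waitUntilJobRunning(Duration.ofSeconds(30));
        // assertions typically scan taskManagerConsumer.toUtf8String() for expected events
    }
}
```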
diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/resources/ddl/mysql_inventory.sql b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/resources/ddl/mysql_inventory.sql
new file mode 100644
index 0000000000..4e9b44f71d
--- /dev/null
+++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/resources/ddl/mysql_inventory.sql
@@ -0,0 +1,53 @@
+-- Copyright 2023 Ververica Inc.
+--
+-- Licensed under the Apache License, Version 2.0 (the "License");
+-- you may not use this file except in compliance with the License.
+-- You may obtain a copy of the License at
+-- http://www.apache.org/licenses/LICENSE-2.0
+-- Unless required by applicable law or agreed to in writing,
+-- software distributed under the License is distributed on an
+-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+-- KIND, either express or implied. See the License for the
+-- specific language governing permissions and limitations
+-- under the License.
+
+-- ----------------------------------------------------------------------------------------------------------------
+-- DATABASE: mysql_inventory
+-- ----------------------------------------------------------------------------------------------------------------
+
+-- Create and populate our products using a single insert with many rows
+CREATE TABLE products (
+ id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
+ name VARCHAR(255) NOT NULL DEFAULT 'flink',
+ description VARCHAR(512),
+ weight FLOAT,
+ enum_c enum('red', 'white') default 'red', -- test some complex types as well,
+ json_c JSON, -- because we use additional dependencies to deserialize complex types.
+ point_c POINT
+);
+ALTER TABLE products AUTO_INCREMENT = 101;
+
+INSERT INTO products
+VALUES (default,"scooter","Small 2-wheel scooter",3.14, 'red', '{"key1": "value1"}', ST_GeomFromText('POINT(1 1)')),
+ (default,"car battery","12V car battery",8.1, 'white', '{"key2": "value2"}', ST_GeomFromText('POINT(2 2)')),
+ (default,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3",0.8, 'red', '{"key3": "value3"}', ST_GeomFromText('POINT(3 3)')),
+ (default,"hammer","12oz carpenter's hammer",0.75, 'white', '{"key4": "value4"}', ST_GeomFromText('POINT(4 4)')),
+ (default,"hammer","14oz carpenter's hammer",0.875, 'red', '{"k1": "v1", "k2": "v2"}', ST_GeomFromText('POINT(5 5)')),
+ (default,"hammer","16oz carpenter's hammer",1.0, null, null, null),
+ (default,"rocks","box of assorted rocks",5.3, null, null, null),
+ (default,"jacket","water resistent black wind breaker",0.1, null, null, null),
+ (default,"spare tire","24 inch spare tire",22.2, null, null, null);
+
+-- Create and populate our customers using a single insert with many rows
+CREATE TABLE customers (
+ id INTEGER NOT NULL PRIMARY KEY,
+ name VARCHAR(255) NOT NULL DEFAULT 'flink',
+ address VARCHAR(1024),
+ phone_number VARCHAR(512)
+);
+
+INSERT INTO customers
+VALUES (101,"user_1","Shanghai","123567891234"),
+ (102,"user_2","Shanghai","123567891234"),
+ (103,"user_3","Shanghai","123567891234"),
+ (104,"user_4","Shanghai","123567891234");
diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/resources/docker/mysql/my.cnf b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/resources/docker/mysql/my.cnf
new file mode 100644
index 0000000000..11d6c94eef
--- /dev/null
+++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/resources/docker/mysql/my.cnf
@@ -0,0 +1,62 @@
+# Copyright 2023 Ververica Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# http://www.apache.org/licenses/LICENSE-2.0
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# For advice on how to change settings please see
+# http://dev.mysql.com/doc/refman/5.7/en/server-configuration-defaults.html
+
+[mysqld]
+#
+# Remove leading # and set to the amount of RAM for the most important data
+# cache in MySQL. Start at 70% of total RAM for dedicated server, else 10%.
+# innodb_buffer_pool_size = 128M
+#
+# Remove leading # to turn on a very important data integrity option: logging
+# changes to the binary log between backups.
+# log_bin
+#
+# Remove leading # to set options mainly useful for reporting servers.
+# The server defaults are faster for transactions and fast SELECTs.
+# Adjust sizes as needed, experiment to find the optimal values.
+# join_buffer_size = 128M
+# sort_buffer_size = 2M
+# read_rnd_buffer_size = 2M
+skip-host-cache
+skip-name-resolve
+#datadir=/var/lib/mysql
+#socket=/var/lib/mysql/mysql.sock
+#secure-file-priv=/var/lib/mysql-files
+secure-file-priv=/var/lib/mysql
+user=mysql
+
+# Disabling symbolic-links is recommended to prevent assorted security risks
+symbolic-links=0
+
+#log-error=/var/log/mysqld.log
+#pid-file=/var/run/mysqld/mysqld.pid
+
+# ----------------------------------------------
+# Enable the binlog for replication & CDC
+# ----------------------------------------------
+
+# Enable binary replication log and set the prefix, expiration, and log format.
+# The prefix is arbitrary, expiration can be short for integration tests but would
+# be longer on a production system. Row-level info is required for ingest to work.
+# Server ID is required, but this will vary on production systems
+server-id = 223344
+log_bin = mysql-bin
+expire_logs_days = 1
+binlog_format = row
+
+# enable gtid mode
+gtid_mode = on
+enforce_gtid_consistency = on
\ No newline at end of file
diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/resources/docker/mysql/setup.sql b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/resources/docker/mysql/setup.sql
new file mode 100644
index 0000000000..8586a84894
--- /dev/null
+++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/resources/docker/mysql/setup.sql
@@ -0,0 +1,28 @@
+-- Copyright 2023 Ververica Inc.
+--
+-- Licensed under the Apache License, Version 2.0 (the "License");
+-- you may not use this file except in compliance with the License.
+-- You may obtain a copy of the License at
+-- http://www.apache.org/licenses/LICENSE-2.0
+-- Unless required by applicable law or agreed to in writing,
+-- software distributed under the License is distributed on an
+-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+-- KIND, either express or implied. See the License for the
+-- specific language governing permissions and limitations
+-- under the License.
+
+-- In production you would almost certainly limit the replication user to the follower (slave) machine,
+-- to prevent other clients accessing the log from other machines. For example, 'replicator'@'follower.acme.com'.
+-- However, in this database we'll grant 2 users different privileges:
+--
+-- 1) 'flinkuser' - all privileges required by the snapshot reader AND binlog reader (used for testing)
+-- 2) 'mysqluser' - all privileges
+--
+GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT, LOCK TABLES ON *.* TO 'flinkuser'@'%';
+CREATE USER 'mysqluser' IDENTIFIED BY 'mysqlpw';
+GRANT ALL PRIVILEGES ON *.* TO 'mysqluser'@'%';
+
+-- ----------------------------------------------------------------------------------------------------------------
+-- DATABASE: emptydb
+-- ----------------------------------------------------------------------------------------------------------------
+CREATE DATABASE emptydb;
diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/resources/log4j2-test.properties b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/resources/log4j2-test.properties
new file mode 100644
index 0000000000..a9d045e0ef
--- /dev/null
+++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/resources/log4j2-test.properties
@@ -0,0 +1,26 @@
+################################################################################
+# Copyright 2023 Ververica Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Root logger level is INFO so container and pipeline output is captured during e2e runs;
+# set it to OFF if it floods the build logs
+rootLogger.level=INFO
+rootLogger.appenderRef.test.ref = TestLogger
+
+appender.testlogger.name = TestLogger
+appender.testlogger.type = CONSOLE
+appender.testlogger.target = SYSTEM_ERR
+appender.testlogger.layout.type = PatternLayout
+appender.testlogger.layout.pattern = %-4r [%t] %-5p %c - %m%n
diff --git a/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/pom.xml b/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/pom.xml
index 795f7938b4..adb3e0f823 100644
--- a/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/pom.xml
+++ b/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/pom.xml
@@ -40,6 +40,14 @@ limitations under the License.
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-cdc-e2e-utils</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
         <dependency>
             <groupId>mysql</groupId>
diff --git a/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/Db2E2eITCase.java b/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/Db2E2eITCase.java
index e7c898e9ff..9094da69e0 100644
--- a/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/Db2E2eITCase.java
+++ b/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/Db2E2eITCase.java
@@ -17,9 +17,9 @@
package org.apache.flink.cdc.connectors.tests;
+import org.apache.flink.cdc.common.test.utils.JdbcProxy;
+import org.apache.flink.cdc.common.test.utils.TestUtils;
import org.apache.flink.cdc.connectors.tests.utils.FlinkContainerTestEnvironment;
-import org.apache.flink.cdc.connectors.tests.utils.JdbcProxy;
-import org.apache.flink.cdc.connectors.tests.utils.TestUtils;
import org.junit.After;
import org.junit.Before;
diff --git a/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/MongoE2eITCase.java b/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/MongoE2eITCase.java
index 6108971717..f3db2624d0 100644
--- a/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/MongoE2eITCase.java
+++ b/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/MongoE2eITCase.java
@@ -17,10 +17,10 @@
package org.apache.flink.cdc.connectors.tests;
+import org.apache.flink.cdc.common.test.utils.JdbcProxy;
+import org.apache.flink.cdc.common.test.utils.TestUtils;
import org.apache.flink.cdc.connectors.mongodb.utils.MongoDBContainer;
import org.apache.flink.cdc.connectors.tests.utils.FlinkContainerTestEnvironment;
-import org.apache.flink.cdc.connectors.tests.utils.JdbcProxy;
-import org.apache.flink.cdc.connectors.tests.utils.TestUtils;
import com.mongodb.ConnectionString;
import com.mongodb.MongoClientSettings;
diff --git a/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/MySqlE2eITCase.java b/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/MySqlE2eITCase.java
index 2a5f7a10f6..bf54cb4d76 100644
--- a/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/MySqlE2eITCase.java
+++ b/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/MySqlE2eITCase.java
@@ -17,9 +17,9 @@
package org.apache.flink.cdc.connectors.tests;
+import org.apache.flink.cdc.common.test.utils.JdbcProxy;
+import org.apache.flink.cdc.common.test.utils.TestUtils;
import org.apache.flink.cdc.connectors.tests.utils.FlinkContainerTestEnvironment;
-import org.apache.flink.cdc.connectors.tests.utils.JdbcProxy;
-import org.apache.flink.cdc.connectors.tests.utils.TestUtils;
import org.junit.Test;
import org.slf4j.Logger;
diff --git a/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/OceanBaseE2eITCase.java b/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/OceanBaseE2eITCase.java
index e3d329882d..e257311a5b 100644
--- a/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/OceanBaseE2eITCase.java
+++ b/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/OceanBaseE2eITCase.java
@@ -17,9 +17,9 @@
package org.apache.flink.cdc.connectors.tests;
+import org.apache.flink.cdc.common.test.utils.JdbcProxy;
+import org.apache.flink.cdc.common.test.utils.TestUtils;
import org.apache.flink.cdc.connectors.tests.utils.FlinkContainerTestEnvironment;
-import org.apache.flink.cdc.connectors.tests.utils.JdbcProxy;
-import org.apache.flink.cdc.connectors.tests.utils.TestUtils;
import org.junit.Before;
import org.junit.ClassRule;
diff --git a/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/OracleE2eITCase.java b/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/OracleE2eITCase.java
index 585778f2d2..f72f03be73 100644
--- a/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/OracleE2eITCase.java
+++ b/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/OracleE2eITCase.java
@@ -17,9 +17,9 @@
package org.apache.flink.cdc.connectors.tests;
+import org.apache.flink.cdc.common.test.utils.JdbcProxy;
+import org.apache.flink.cdc.common.test.utils.TestUtils;
import org.apache.flink.cdc.connectors.tests.utils.FlinkContainerTestEnvironment;
-import org.apache.flink.cdc.connectors.tests.utils.JdbcProxy;
-import org.apache.flink.cdc.connectors.tests.utils.TestUtils;
import org.junit.After;
import org.junit.Before;
diff --git a/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/PostgresE2eITCase.java b/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/PostgresE2eITCase.java
index aa4f3daf15..2580f8c56d 100644
--- a/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/PostgresE2eITCase.java
+++ b/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/PostgresE2eITCase.java
@@ -17,9 +17,9 @@
package org.apache.flink.cdc.connectors.tests;
+import org.apache.flink.cdc.common.test.utils.JdbcProxy;
+import org.apache.flink.cdc.common.test.utils.TestUtils;
import org.apache.flink.cdc.connectors.tests.utils.FlinkContainerTestEnvironment;
-import org.apache.flink.cdc.connectors.tests.utils.JdbcProxy;
-import org.apache.flink.cdc.connectors.tests.utils.TestUtils;
import org.junit.After;
import org.junit.Before;
diff --git a/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/SqlServerE2eITCase.java b/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/SqlServerE2eITCase.java
index ad9bc6acbf..8c832aa5e7 100644
--- a/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/SqlServerE2eITCase.java
+++ b/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/SqlServerE2eITCase.java
@@ -17,9 +17,9 @@
package org.apache.flink.cdc.connectors.tests;
+import org.apache.flink.cdc.common.test.utils.JdbcProxy;
+import org.apache.flink.cdc.common.test.utils.TestUtils;
import org.apache.flink.cdc.connectors.tests.utils.FlinkContainerTestEnvironment;
-import org.apache.flink.cdc.connectors.tests.utils.JdbcProxy;
-import org.apache.flink.cdc.connectors.tests.utils.TestUtils;
import org.junit.After;
import org.junit.Before;
diff --git a/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/TiDBE2eITCase.java b/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/TiDBE2eITCase.java
index f64d33ea1f..232d6f7b7b 100644
--- a/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/TiDBE2eITCase.java
+++ b/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/TiDBE2eITCase.java
@@ -17,9 +17,9 @@
package org.apache.flink.cdc.connectors.tests;
+import org.apache.flink.cdc.common.test.utils.JdbcProxy;
+import org.apache.flink.cdc.common.test.utils.TestUtils;
import org.apache.flink.cdc.connectors.tests.utils.FlinkContainerTestEnvironment;
-import org.apache.flink.cdc.connectors.tests.utils.JdbcProxy;
-import org.apache.flink.cdc.connectors.tests.utils.TestUtils;
import org.junit.After;
import org.junit.Before;
diff --git a/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/VitessE2eITCase.java b/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/VitessE2eITCase.java
index b2e94622c8..ce0d311326 100644
--- a/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/VitessE2eITCase.java
+++ b/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/VitessE2eITCase.java
@@ -17,9 +17,9 @@
package org.apache.flink.cdc.connectors.tests;
+import org.apache.flink.cdc.common.test.utils.JdbcProxy;
+import org.apache.flink.cdc.common.test.utils.TestUtils;
import org.apache.flink.cdc.connectors.tests.utils.FlinkContainerTestEnvironment;
-import org.apache.flink.cdc.connectors.tests.utils.JdbcProxy;
-import org.apache.flink.cdc.connectors.tests.utils.TestUtils;
import org.apache.flink.cdc.connectors.vitess.VitessTestBase;
import org.apache.flink.cdc.connectors.vitess.container.VitessContainer;
diff --git a/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/utils/FlinkContainerTestEnvironment.java b/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/utils/FlinkContainerTestEnvironment.java
index bef68a6765..650e205b8b 100644
--- a/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/utils/FlinkContainerTestEnvironment.java
+++ b/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/utils/FlinkContainerTestEnvironment.java
@@ -19,6 +19,7 @@
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.cdc.common.test.utils.TestUtils;
import org.apache.flink.cdc.connectors.mysql.testutils.MySqlContainer;
import org.apache.flink.cdc.connectors.mysql.testutils.MySqlVersion;
import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase;
diff --git a/flink-cdc-e2e-tests/pom.xml b/flink-cdc-e2e-tests/pom.xml
index b22be5c23b..3ac5fe5980 100644
--- a/flink-cdc-e2e-tests/pom.xml
+++ b/flink-cdc-e2e-tests/pom.xml
@@ -30,6 +30,7 @@ limitations under the License.
     <packaging>pom</packaging>
 
     <modules>
+        <module>flink-cdc-e2e-utils</module>
         <module>flink-cdc-source-e2e-tests</module>
         <module>flink-cdc-pipeline-e2e-tests</module>
     </modules>