yux's patch
leonardBang committed Aug 6, 2024
1 parent 955a245 commit cac651e
Showing 1 changed file with 280 additions and 0 deletions.
@@ -0,0 +1,280 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.cdc.composer.flink;

import org.apache.flink.cdc.common.configuration.Configuration;
import org.apache.flink.cdc.common.data.binary.BinaryStringData;
import org.apache.flink.cdc.common.event.CreateTableEvent;
import org.apache.flink.cdc.common.event.DataChangeEvent;
import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.pipeline.PipelineOptions;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.types.DataType;
import org.apache.flink.cdc.common.types.DataTypes;
import org.apache.flink.cdc.composer.PipelineExecution;
import org.apache.flink.cdc.composer.definition.PipelineDef;
import org.apache.flink.cdc.composer.definition.SinkDef;
import org.apache.flink.cdc.composer.definition.SourceDef;
import org.apache.flink.cdc.composer.definition.TransformDef;
import org.apache.flink.cdc.connectors.values.ValuesDatabase;
import org.apache.flink.cdc.connectors.values.factory.ValuesDataFactory;
import org.apache.flink.cdc.connectors.values.sink.ValuesDataSinkOptions;
import org.apache.flink.cdc.connectors.values.source.ValuesDataSourceHelper;
import org.apache.flink.cdc.connectors.values.source.ValuesDataSourceOptions;
import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.test.junit5.MiniClusterExtension;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Stream;

import static org.apache.flink.configuration.CoreOptions.ALWAYS_PARENT_FIRST_LOADER_PATTERNS_ADDITIONAL;
import static org.assertj.core.api.Assertions.assertThat;

/** Integration test for {@link FlinkPipelineComposer}. */
class FlinkPipelineTransformITCase {
private static final int MAX_PARALLELISM = 4;

// Always use the parent-first classloader for CDC classes.
// ValuesDatabase holds its data in a static field, so the class must be loaded by the
// application classloader for the test cases to be able to verify the data.
private static final org.apache.flink.configuration.Configuration MINI_CLUSTER_CONFIG =
new org.apache.flink.configuration.Configuration();

static {
MINI_CLUSTER_CONFIG.set(
ALWAYS_PARENT_FIRST_LOADER_PATTERNS_ADDITIONAL,
Collections.singletonList("org.apache.flink.cdc"));
}

/**
* Use {@link MiniClusterExtension} to reduce the overhead of restarting the MiniCluster for
* every test case.
*/
@RegisterExtension
static final MiniClusterExtension MINI_CLUSTER_RESOURCE =
new MiniClusterExtension(
new MiniClusterResourceConfiguration.Builder()
.setNumberTaskManagers(1)
.setNumberSlotsPerTaskManager(MAX_PARALLELISM)
.setConfiguration(MINI_CLUSTER_CONFIG)
.build());

private final PrintStream standardOut = System.out;
private final ByteArrayOutputStream outCaptor = new ByteArrayOutputStream();

@BeforeEach
void init() {
// Take over STDOUT since we need to check the output of the values sink
System.setOut(new PrintStream(outCaptor));
// Reset the in-memory database
ValuesDatabase.clear();
}

@AfterEach
void cleanup() {
System.setOut(standardOut);
}

@Test
void testTransformWithTemporalFunction() throws Exception {
FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster();

// Set up the values source
Configuration sourceConfig = new Configuration();
sourceConfig.set(
ValuesDataSourceOptions.EVENT_SET_ID,
ValuesDataSourceHelper.EventSetId.CUSTOM_SOURCE_EVENTS);

TableId myTable1 = TableId.tableId("default_namespace", "default_schema", "mytable1");
TableId myTable2 = TableId.tableId("default_namespace", "default_schema", "mytable2");
Schema table1Schema =
Schema.newBuilder()
.physicalColumn("id", DataTypes.INT())
.physicalColumn("name", DataTypes.STRING())
.physicalColumn("age", DataTypes.INT())
.primaryKey("id")
.build();
Schema table2Schema =
Schema.newBuilder()
.physicalColumn("id", DataTypes.BIGINT())
.physicalColumn("name", DataTypes.VARCHAR(255))
.physicalColumn("age", DataTypes.TINYINT())
.physicalColumn("description", DataTypes.STRING())
.primaryKey("id")
.build();

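// Build the source event stream: create-table events for both tables, followed by inserts,
// an update on mytable1, and a delete on mytable2.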
List<Event> events = new ArrayList<>();
BinaryRecordDataGenerator table1dataGenerator =
new BinaryRecordDataGenerator(
table1Schema.getColumnDataTypes().toArray(new DataType[0]));
BinaryRecordDataGenerator table2dataGenerator =
new BinaryRecordDataGenerator(
table2Schema.getColumnDataTypes().toArray(new DataType[0]));
events.add(new CreateTableEvent(myTable1, table1Schema));
events.add(
DataChangeEvent.insertEvent(
myTable1,
table1dataGenerator.generate(
new Object[] {1, BinaryStringData.fromString("Alice"), 18})));
events.add(
DataChangeEvent.insertEvent(
myTable1,
table1dataGenerator.generate(
new Object[] {2, BinaryStringData.fromString("Bob"), 20})));
events.add(
DataChangeEvent.updateEvent(
myTable1,
table1dataGenerator.generate(
new Object[] {2, BinaryStringData.fromString("Bob"), 20}),
table1dataGenerator.generate(
new Object[] {2, BinaryStringData.fromString("Bob"), 30})));
events.add(new CreateTableEvent(myTable2, table2Schema));
events.add(
DataChangeEvent.insertEvent(
myTable2,
table2dataGenerator.generate(
new Object[] {
3L,
BinaryStringData.fromString("Carol"),
(byte) 15,
BinaryStringData.fromString("student")
})));
events.add(
DataChangeEvent.insertEvent(
myTable2,
table2dataGenerator.generate(
new Object[] {
4L,
BinaryStringData.fromString("Derrida"),
(byte) 25,
BinaryStringData.fromString("student")
})));
events.add(
DataChangeEvent.deleteEvent(
myTable2,
table2dataGenerator.generate(
new Object[] {
4L,
BinaryStringData.fromString("Derrida"),
(byte) 25,
BinaryStringData.fromString("student")
})));

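// Hand the custom event stream to the values source; with CUSTOM_SOURCE_EVENTS the source
// replays exactly the events registered here.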
ValuesDataSourceHelper.setSourceEvents(Collections.singletonList(events));

SourceDef sourceDef =
new SourceDef(ValuesDataFactory.IDENTIFIER, "Value Source", sourceConfig);

// Set up the values sink
Configuration sinkConfig = new Configuration();
sinkConfig.set(ValuesDataSinkOptions.MATERIALIZED_IN_MEMORY, true);
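// The values sink prints every event it receives to stdout, which outCaptor collects for
// verification below.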
SinkDef sinkDef = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value Sink", sinkConfig);

// Set up the pipeline
Configuration pipelineConfig = new Configuration();
pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
pipelineConfig.set(PipelineOptions.PIPELINE_LOCAL_TIME_ZONE, "America/Los_Angeles");
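// The transform appends the temporal functions LOCALTIME, CURRENT_TIME, CURRENT_TIMESTAMP,
// NOW(), LOCALTIMESTAMP and CURRENT_DATE as extra columns on every table in the default
// namespace, evaluated against the pipeline's configured America/Los_Angeles time zone.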
PipelineDef pipelineDef =
new PipelineDef(
sourceDef,
sinkDef,
Collections.emptyList(),
Collections.singletonList(
new TransformDef(
"default_namespace.default_schema.\\.*",
"*, LOCALTIME as lcl_t, CURRENT_TIME as cur_t, CAST(CURRENT_TIMESTAMP AS TIMESTAMP) as cur_ts, CAST(NOW() AS TIMESTAMP) as now_ts, LOCALTIMESTAMP as lcl_ts, CURRENT_DATE as cur_dt",
null,
null,
null,
null,
null)),
pipelineConfig);

// Execute the pipeline
PipelineExecution execution = composer.compose(pipelineDef);
execution.execute();

// Verify the temporal columns of every data change record emitted by the values sink
String[] outputEvents = outCaptor.toString().trim().split("\n");

Arrays.stream(outputEvents).forEach(this::extractDataLines);
}

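/**
* Extracts the {@code before} and {@code after} field lists from a serialized {@code
* DataChangeEvent} line and verifies every non-empty record.
*/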
void extractDataLines(String line) {
if (!line.startsWith("DataChangeEvent{")) {
return;
}
Stream.of("before", "after")
.forEach(
tag -> {
String[] arr = line.split(tag + "=\\[", 2);
String dataRecord = arr[arr.length - 1].split("]", 2)[0];
if (!dataRecord.isEmpty()) {
verifyDataRecord(dataRecord);
}
});
}

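/**
* Checks the six appended temporal columns of a record. The TIME-typed values are expected as
* milliseconds of the day and the DATE value as days since the epoch, so all of them should be
* consistent with the parsed CURRENT_TIMESTAMP value.
*/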
void verifyDataRecord(String recordLine) {
List<String> tokens = Arrays.asList(recordLine.split(", "));
assertThat(tokens).hasSizeGreaterThanOrEqualTo(6);

tokens = tokens.subList(tokens.size() - 6, tokens.size());

String localTime = tokens.get(0);
String currentTime = tokens.get(1);
assertThat(localTime).isEqualTo(currentTime);

String currentTimestamp = tokens.get(2);
String nowTimestamp = tokens.get(3);
String localTimestamp = tokens.get(4);
assertThat(currentTimestamp).isEqualTo(nowTimestamp).isEqualTo(localTimestamp);

Instant instant =
LocalDateTime.parse(
currentTimestamp,
DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS"))
.toInstant(ZoneOffset.UTC);

long milliSecondsInOneDay = 24 * 60 * 60 * 1000;

assertThat(instant.toEpochMilli() % milliSecondsInOneDay)
.isEqualTo(Long.parseLong(localTime));

String currentDate = tokens.get(5);

assertThat(instant.toEpochMilli() / milliSecondsInOneDay)
.isEqualTo(Long.parseLong(currentDate));
}
}
