
Commit

[FLINK-34948][cdc][common] Fix CDC RowType not being convertible to a Flink type
zhongqishang authored Apr 1, 2024
1 parent 927a0ec commit d099603
Showing 3 changed files with 150 additions and 2 deletions.
DataField.java (package org.apache.flink.cdc.common.types)
@@ -18,6 +18,7 @@
package org.apache.flink.cdc.common.types;

import org.apache.flink.cdc.common.annotation.PublicEvolving;
import org.apache.flink.cdc.common.types.utils.DataTypeUtils;
import org.apache.flink.cdc.common.utils.Preconditions;

import javax.annotation.Nullable;
@@ -116,4 +117,12 @@ private String formatString(String typeString, boolean excludeDescription) {
escapeSingleQuotes(description));
}
}

public org.apache.flink.table.api.DataTypes.Field toFlinkDataTypeField() {
return description == null
? org.apache.flink.table.api.DataTypes.FIELD(
name, DataTypeUtils.toFlinkDataType(type))
: org.apache.flink.table.api.DataTypes.FIELD(
name, DataTypeUtils.toFlinkDataType(type), description);
}
}
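
For reference, a minimal usage sketch of the new helper (the class name and sample field values below are illustrative, not part of the commit; the CDC DataTypes.FIELD factory is the one exercised in the test further down):

import org.apache.flink.cdc.common.types.DataField;
import org.apache.flink.cdc.common.types.DataTypes;

public class DataFieldConversionSketch {
    public static void main(String[] args) {
        // Hypothetical CDC field carrying a description; toFlinkDataTypeField()
        // keeps both the field name and the description on the Flink side.
        DataField cdcField = DataTypes.FIELD("id", DataTypes.BIGINT(), "primary key");
        org.apache.flink.table.api.DataTypes.Field flinkField = cdcField.toFlinkDataTypeField();
        System.out.println(flinkField);
    }
}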
DataTypeUtils.java (package org.apache.flink.cdc.common.types.utils)
@@ -24,12 +24,15 @@
import org.apache.flink.cdc.common.data.StringData;
import org.apache.flink.cdc.common.data.TimestampData;
import org.apache.flink.cdc.common.data.ZonedTimestampData;
import org.apache.flink.cdc.common.types.DataField;
import org.apache.flink.cdc.common.types.DataType;
import org.apache.flink.cdc.common.types.DataTypes;
import org.apache.flink.cdc.common.types.RowType;
import org.apache.flink.cdc.common.utils.Preconditions;
import org.apache.flink.util.CollectionUtil;

import java.util.List;
import java.util.stream.Collectors;

/** Utilities for handling {@link DataType}s. */
public class DataTypeUtils {
@@ -134,8 +137,12 @@ public static org.apache.flink.table.types.DataType toFlinkDataType(DataType type)
toFlinkDataType(children.get(0)), toFlinkDataType(children.get(1)));
case ROW:
Preconditions.checkState(!CollectionUtil.isNullOrEmpty(children));
-                return org.apache.flink.table.api.DataTypes.ROW(
-                        children.toArray(new org.apache.flink.table.types.DataType[] {}));
+                RowType rowType = (RowType) type;
+                List<org.apache.flink.table.api.DataTypes.Field> fields =
+                        rowType.getFields().stream()
+                                .map(DataField::toFlinkDataTypeField)
+                                .collect(Collectors.toList());
+                return org.apache.flink.table.api.DataTypes.ROW(fields);
default:
throw new IllegalArgumentException("Illegal type: " + type);
}
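To make the effect of this hunk concrete, here is a hedged sketch (class name and field names are illustrative only): a CDC ROW type with named fields is converted through DataTypeUtils.toFlinkDataType, and with this change each DataField is mapped via toFlinkDataTypeField(), so field names and descriptions are carried over; the old code tried to copy the CDC child types directly into a Flink DataType array, which is the conversion failure this commit fixes.

import org.apache.flink.cdc.common.types.DataType;
import org.apache.flink.cdc.common.types.DataTypes;
import org.apache.flink.cdc.common.types.utils.DataTypeUtils;

public class RowTypeConversionSketch {
    public static void main(String[] args) {
        // Hypothetical CDC ROW type with named fields and one field description.
        DataType cdcRow =
                DataTypes.ROW(
                        DataTypes.FIELD("id", DataTypes.BIGINT()),
                        DataTypes.FIELD("name", DataTypes.STRING(), "customer name"));

        // The converted Flink ROW now keeps the field names "id" and "name"
        // (and the description), because each DataField is converted individually.
        org.apache.flink.table.types.DataType flinkRow = DataTypeUtils.toFlinkDataType(cdcRow);
        System.out.println(flinkRow);
    }
}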
DataTypeUtilsTest.java (package org.apache.flink.cdc.common.types.utils, new file)
@@ -0,0 +1,132 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.cdc.common.types.utils;

import org.apache.flink.cdc.common.types.DataField;
import org.apache.flink.cdc.common.types.DataType;
import org.apache.flink.cdc.common.types.DataTypes;
import org.apache.flink.cdc.common.types.RowType;

import org.junit.jupiter.api.Test;

import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static org.apache.flink.table.api.DataTypes.ARRAY;
import static org.apache.flink.table.api.DataTypes.BIGINT;
import static org.apache.flink.table.api.DataTypes.BINARY;
import static org.apache.flink.table.api.DataTypes.BOOLEAN;
import static org.apache.flink.table.api.DataTypes.BYTES;
import static org.apache.flink.table.api.DataTypes.CHAR;
import static org.apache.flink.table.api.DataTypes.DATE;
import static org.apache.flink.table.api.DataTypes.DECIMAL;
import static org.apache.flink.table.api.DataTypes.DOUBLE;
import static org.apache.flink.table.api.DataTypes.FIELD;
import static org.apache.flink.table.api.DataTypes.FLOAT;
import static org.apache.flink.table.api.DataTypes.INT;
import static org.apache.flink.table.api.DataTypes.MAP;
import static org.apache.flink.table.api.DataTypes.ROW;
import static org.apache.flink.table.api.DataTypes.SMALLINT;
import static org.apache.flink.table.api.DataTypes.STRING;
import static org.apache.flink.table.api.DataTypes.TIME;
import static org.apache.flink.table.api.DataTypes.TIMESTAMP_LTZ;
import static org.apache.flink.table.api.DataTypes.TIMESTAMP_WITH_TIME_ZONE;
import static org.apache.flink.table.api.DataTypes.TINYINT;
import static org.apache.flink.table.api.DataTypes.VARBINARY;
import static org.apache.flink.table.api.DataTypes.VARCHAR;
import static org.assertj.core.api.Assertions.assertThat;

/** A test for the {@link org.apache.flink.cdc.common.types.utils.DataTypeUtils}. */
class DataTypeUtilsTest {
private static final DataType[] ALL_TYPES =
new DataType[] {
DataTypes.BOOLEAN(),
DataTypes.BYTES(),
DataTypes.BINARY(10),
DataTypes.VARBINARY(10),
DataTypes.CHAR(10),
DataTypes.VARCHAR(10),
DataTypes.STRING(),
DataTypes.INT(),
DataTypes.TINYINT(),
DataTypes.SMALLINT(),
DataTypes.BIGINT(),
DataTypes.DOUBLE(),
DataTypes.FLOAT(),
DataTypes.DECIMAL(6, 3),
DataTypes.DATE(),
DataTypes.TIME(),
DataTypes.TIME(6),
DataTypes.TIMESTAMP(),
DataTypes.TIMESTAMP(6),
DataTypes.TIMESTAMP_LTZ(),
DataTypes.TIMESTAMP_LTZ(6),
DataTypes.TIMESTAMP_TZ(),
DataTypes.TIMESTAMP_TZ(6),
DataTypes.ARRAY(DataTypes.BIGINT()),
DataTypes.MAP(DataTypes.SMALLINT(), DataTypes.STRING()),
DataTypes.ROW(
DataTypes.FIELD("f1", DataTypes.STRING()),
DataTypes.FIELD("f2", DataTypes.STRING(), "desc")),
DataTypes.ROW(DataTypes.SMALLINT(), DataTypes.STRING())
};

@Test
void testToFlinkDataType() {
List<DataField> list =
IntStream.range(0, ALL_TYPES.length)
.mapToObj(i -> DataTypes.FIELD("f" + i, ALL_TYPES[i]))
.collect(Collectors.toList());

org.apache.flink.table.types.DataType dataType =
DataTypeUtils.toFlinkDataType(new RowType(list));

org.apache.flink.table.types.DataType expectedDataType =
ROW(
FIELD("f0", BOOLEAN()),
FIELD("f1", BYTES()),
FIELD("f2", BINARY(10)),
FIELD("f3", VARBINARY(10)),
FIELD("f4", CHAR(10)),
FIELD("f5", VARCHAR(10)),
FIELD("f6", STRING()),
FIELD("f7", INT()),
FIELD("f8", TINYINT()),
FIELD("f9", SMALLINT()),
FIELD("f10", BIGINT()),
FIELD("f11", DOUBLE()),
FIELD("f12", FLOAT()),
FIELD("f13", DECIMAL(6, 3)),
FIELD("f14", DATE()),
FIELD("f15", TIME()),
FIELD("f16", TIME(6)),
FIELD("f17", TIMESTAMP_WITH_TIME_ZONE()),
FIELD("f18", TIMESTAMP_WITH_TIME_ZONE(6)),
FIELD("f19", TIMESTAMP_LTZ()),
FIELD("f20", TIMESTAMP_LTZ(6)),
FIELD("f21", TIMESTAMP_WITH_TIME_ZONE()),
FIELD("f22", TIMESTAMP_WITH_TIME_ZONE(6)),
FIELD("f23", ARRAY(BIGINT())),
FIELD("f24", MAP(SMALLINT(), STRING())),
FIELD("f25", ROW(FIELD("f1", STRING()), FIELD("f2", STRING(), "desc"))),
FIELD("f26", ROW(SMALLINT(), STRING())));

assertThat(dataType).isEqualTo(expectedDataType);
}
}
