From 1f465226385f5098fa04bfb6a7f36e284d1d23fa Mon Sep 17 00:00:00 2001
From: PengFei Li
Date: Wed, 10 Apr 2024 11:28:26 +0800
Subject: [PATCH] [Enhancement] Support mapping a subset of StarRocks columns to the Flink source table
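
With this change the Flink source table does not have to declare every column of
the StarRocks table: the projected columns are pushed down as SelectColumn[] and
the scan SQL is built from them, so a source table that maps only a subset of the
StarRocks columns, in any order, works for SELECT *, explicit projections, and
filtered queries.

For illustration, a DDL in the style of the new IT test below; the table name and
the connection values are placeholders, and the StarRocks table in the comment is
the one the test creates:

    -- StarRocks table: c0 INT, c1 FLOAT, c2 STRING, c3 ARRAY<BIGINT>, c4 MAP<INT, STRING>
    CREATE TABLE sr_src (
        c3 ARRAY<BIGINT>,
        c4 MAP<INT, STRING>,
        c1 FLOAT,
        c2 STRING
    ) WITH (
        'connector' = 'starrocks',
        'jdbc-url' = '<jdbc-url>',
        'scan-url' = '<scan-url>',
        'database-name' = '<db>',
        'table-name' = '<starrocks-table>',
        'username' = 'root',
        'password' = ''
    );

    SELECT c1, c2, c3, c4 FROM sr_src WHERE c2 = '1';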

Signed-off-by: PengFei Li
---
 .../StarRocksDynamicSourceFunction.java       | 26 ++++---
 .../source/StarRocksDynamicTableSource.java   | 11 +--
 .../table/source/struct/PushDownHolder.java   |  7 --
 .../it/source/StarRocksSourceITTest.java      | 77 +++++++++++++++++++
 .../StarRocksDynamicTableSourceTest.java      |  7 +-
 5 files changed, 99 insertions(+), 29 deletions(-)

diff --git a/src/main/java/com/starrocks/connector/flink/table/source/StarRocksDynamicSourceFunction.java b/src/main/java/com/starrocks/connector/flink/table/source/StarRocksDynamicSourceFunction.java
index 082ead0d..a16a1da7 100644
--- a/src/main/java/com/starrocks/connector/flink/table/source/StarRocksDynamicSourceFunction.java
+++ b/src/main/java/com/starrocks/connector/flink/table/source/StarRocksDynamicSourceFunction.java
@@ -14,12 +14,6 @@
 
 package com.starrocks.connector.flink.table.source;
 
-import com.google.common.base.Strings;
-import com.starrocks.connector.flink.table.source.struct.ColumnRichInfo;
-import com.starrocks.connector.flink.table.source.struct.QueryBeXTablets;
-import com.starrocks.connector.flink.table.source.struct.QueryInfo;
-import com.starrocks.connector.flink.table.source.struct.SelectColumn;
-import com.starrocks.connector.flink.tools.EnvUtils;
 import org.apache.flink.api.common.typeinfo.TypeHint;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
@@ -28,13 +22,22 @@
 import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.data.RowData;
+
+import com.google.common.base.Strings;
+import com.starrocks.connector.flink.table.source.struct.ColumnRichInfo;
+import com.starrocks.connector.flink.table.source.struct.QueryBeXTablets;
+import com.starrocks.connector.flink.table.source.struct.QueryInfo;
+import com.starrocks.connector.flink.table.source.struct.SelectColumn;
+import com.starrocks.connector.flink.tools.EnvUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
 
 public class StarRocksDynamicSourceFunction extends RichParallelSourceFunction<RowData> implements
         ResultTypeQueryable<RowData> {
@@ -71,7 +74,7 @@ public StarRocksDynamicSourceFunction(TableSchema flinkSchema, StarRocksSourceOp
     }
 
     public StarRocksDynamicSourceFunction(StarRocksSourceOptions sourceOptions, TableSchema flinkSchema,
-                                          String filter, long limit, SelectColumn[] selectColumns, String columns, StarRocksSourceQueryType queryType) {
+                                          String filter, long limit, SelectColumn[] selectColumns, StarRocksSourceQueryType queryType) {
         // StarRocksSourceCommonFunc.validateTableStructure(sourceOptions, flinkSchema);
         this.sourceOptions = sourceOptions;
         Map<String, ColumnRichInfo> columnMap = StarRocksSourceCommonFunc.genColumnMap(flinkSchema);
@@ -82,7 +85,7 @@ public StarRocksDynamicSourceFunction(StarRocksSourceOptions sourceOptions, Tabl
         } else {
             this.selectColumns = selectColumns;
         }
-        String SQL = genSQL(queryType, columns, filter, limit);
+        String SQL = genSQL(queryType, this.selectColumns, filter, limit);
         if (queryType == StarRocksSourceQueryType.QueryCount) {
             this.dataCount = StarRocksSourceCommonFunc.getQueryCount(this.sourceOptions, SQL);
         } else {
@@ -103,16 +106,17 @@ private String genSQL(StarRocksSourceOptions options) {
                 filter;
     }
 
-    private String genSQL(StarRocksSourceQueryType queryType, String columns, String filter, long limit) {
+    private String genSQL(StarRocksSourceQueryType queryType, SelectColumn[] selectColumns, String filter, long limit) {
         StringBuilder sqlSb = new StringBuilder("select ");
         switch (queryType) {
             case QueryCount:
                 sqlSb.append("count(*)");
                 break;
             case QueryAllColumns:
-                sqlSb.append("*");
-                break;
             case QuerySomeColumns:
+                String columns = Arrays.stream(selectColumns)
+                        .map(SelectColumn::getColumnName)
+                        .collect(Collectors.joining(","));
                 sqlSb.append(columns);
                 break;
         }
diff --git a/src/main/java/com/starrocks/connector/flink/table/source/StarRocksDynamicTableSource.java b/src/main/java/com/starrocks/connector/flink/table/source/StarRocksDynamicTableSource.java
index 0764ed5d..27dde932 100644
--- a/src/main/java/com/starrocks/connector/flink/table/source/StarRocksDynamicTableSource.java
+++ b/src/main/java/com/starrocks/connector/flink/table/source/StarRocksDynamicTableSource.java
@@ -14,10 +14,6 @@
 
 package com.starrocks.connector.flink.table.source;
 
-import com.starrocks.connector.flink.table.source.struct.ColumnRichInfo;
-import com.starrocks.connector.flink.table.source.struct.PushDownHolder;
-import com.starrocks.connector.flink.table.source.struct.SelectColumn;
-
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.connector.ChangelogMode;
@@ -31,6 +27,10 @@
 import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
 import org.apache.flink.table.expressions.ResolvedExpression;
 
+import com.starrocks.connector.flink.table.source.struct.ColumnRichInfo;
+import com.starrocks.connector.flink.table.source.struct.PushDownHolder;
+import com.starrocks.connector.flink.table.source.struct.SelectColumn;
+
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.LinkedList;
@@ -62,7 +62,6 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
                 this.pushDownHolder.getFilter(),
                 this.pushDownHolder.getLimit(),
                 this.pushDownHolder.getSelectColumns(),
-                this.pushDownHolder.getColumns(),
                 this.pushDownHolder.getQueryType());
         return SourceFunctionProvider.of(sourceFunction, true);
     }
@@ -120,8 +119,6 @@ public void applyProjection(int[][] projectedFields) {
             columnList.add(columnName);
             selectColumns.add(new SelectColumn(columnName, index));
         }
-        String columns = String.join(", ", columnList);
-        this.pushDownHolder.setColumns(columns);
         this.pushDownHolder.setSelectColumns(selectColumns.toArray(new SelectColumn[selectColumns.size()]));
     }
 
diff --git a/src/main/java/com/starrocks/connector/flink/table/source/struct/PushDownHolder.java b/src/main/java/com/starrocks/connector/flink/table/source/struct/PushDownHolder.java
index 8ab38ffe..3d985df8 100644
--- a/src/main/java/com/starrocks/connector/flink/table/source/struct/PushDownHolder.java
+++ b/src/main/java/com/starrocks/connector/flink/table/source/struct/PushDownHolder.java
@@ -25,7 +25,6 @@ public class PushDownHolder implements Serializable {
     private String filter = "";
     private long limit;
     private SelectColumn[] selectColumns;
-    private String columns;
     private StarRocksSourceQueryType queryType;
 
     public String getFilter() {
@@ -46,12 +45,6 @@ public SelectColumn[] getSelectColumns() {
     public void setSelectColumns(SelectColumn[] selectColumns) {
         this.selectColumns = selectColumns;
     }
-    public String getColumns() {
-        return columns;
-    }
-    public void setColumns(String columns) {
-        this.columns = columns;
-    }
     public StarRocksSourceQueryType getQueryType() {
         return queryType;
     }
diff --git a/src/test/java/com/starrocks/connector/flink/it/source/StarRocksSourceITTest.java b/src/test/java/com/starrocks/connector/flink/it/source/StarRocksSourceITTest.java
index 5ac4185a..ad63656d 100644
--- a/src/test/java/com/starrocks/connector/flink/it/source/StarRocksSourceITTest.java
+++ b/src/test/java/com/starrocks/connector/flink/it/source/StarRocksSourceITTest.java
@@ -41,6 +41,83 @@ public class StarRocksSourceITTest extends StarRocksITTestBase {
 
+    @Test
+    public void testMapSubsetColumns() throws Exception {
+        String tableName = createPartialTables("testMapSubsetColumns");
+        executeSrSQL(String.format("INSERT INTO `%s`.`%s` VALUES (%s)", DB_NAME, tableName,
+                "0, 1.1, '1', [1], map{1:'1'}"));
+
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);
+        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+        String createSrcSQL = "CREATE TABLE sr_src(" +
+                "c3 ARRAY<BIGINT>," +
+                "c4 MAP<INT, STRING>," +
+                "c1 FLOAT," +
+                "c2 STRING" +
+                ") WITH ( " +
+                "'connector' = 'starrocks'," +
+                "'jdbc-url'='" + getJdbcUrl() + "'," +
+                "'scan-url'='" + String.join(";", getHttpUrls()) + "'," +
+                "'database-name' = '" + DB_NAME + "'," +
+                "'table-name' = '" + tableName + "'," +
+                "'username' = 'root'," +
+                "'password' = ''" +
+                ")";
+        tEnv.executeSql(createSrcSQL);
+        List<Row> result1 =
+                CollectionUtil.iteratorToList(
+                        tEnv.executeSql("SELECT * FROM sr_src").collect());
+        Map<Integer, String> c4 = new HashMap<>();
+        c4.put(1, "1");
+        Row row1 = Row.of(
+                new Long[]{1L},
+                c4,
+                1.1f,
+                "1"
+        );
+        assertThat(result1).containsExactlyInAnyOrderElementsOf(Collections.singleton(row1));
+
+        List<Row> result2 =
+                CollectionUtil.iteratorToList(
+                        tEnv.executeSql("SELECT c4 FROM sr_src").collect());
+        Row row2 = Row.of(c4);
+        assertThat(result2).containsExactlyInAnyOrderElementsOf(Collections.singleton(row2));
+
+        List<Row> result3 =
+                CollectionUtil.iteratorToList(
+                        tEnv.executeSql("SELECT c1, c2, c3, c4 FROM sr_src WHERE c2 = '1'").collect());
+        Row row3 = Row.of(
+                1.1f,
+                "1",
+                new Long[]{1L},
+                c4
+        );
+        assertThat(result3).containsExactlyInAnyOrderElementsOf(Collections.singleton(row3));
+    }
+
+    private String createPartialTables(String tablePrefix) throws Exception {
+        String tableName = tablePrefix + "_" + genRandomUuid();
+        String createStarRocksTable =
+                String.format(
+                        "CREATE TABLE `%s`.`%s` (" +
+                                "c0 INT," +
+                                "c1 FLOAT," +
+                                "c2 STRING," +
+                                "c3 ARRAY<BIGINT>," +
+                                "c4 MAP<INT, STRING>" +
+                                ") ENGINE = OLAP " +
+                                "DUPLICATE KEY(c0) " +
+                                "DISTRIBUTED BY HASH (c0) BUCKETS 8 " +
+                                "PROPERTIES (" +
+                                "\"replication_num\" = \"1\"" +
+                                ")",
+                        DB_NAME, tableName);
+        executeSrSQL(createStarRocksTable);
+        return tableName;
+    }
+
     @Test
     public void testArrayType() throws Exception {
         String tableName = createArrayTypeTable("testArrayType");
diff --git a/src/test/java/com/starrocks/connector/flink/table/source/StarRocksDynamicTableSourceTest.java b/src/test/java/com/starrocks/connector/flink/table/source/StarRocksDynamicTableSourceTest.java
index edfd49b7..6c01e947 100644
--- a/src/test/java/com/starrocks/connector/flink/table/source/StarRocksDynamicTableSourceTest.java
+++ b/src/test/java/com/starrocks/connector/flink/table/source/StarRocksDynamicTableSourceTest.java
@@ -14,9 +14,6 @@
 
 package com.starrocks.connector.flink.table.source;
 
-import com.starrocks.connector.flink.it.source.StarRocksSourceBaseTest;
-import com.starrocks.connector.flink.table.source.struct.PushDownHolder;
-
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.TableEnvironment;
@@ -24,6 +21,9 @@
 import org.apache.flink.table.expressions.FieldReferenceExpression;
 import org.apache.flink.table.expressions.ResolvedExpression;
 import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+
+import com.starrocks.connector.flink.it.source.StarRocksSourceBaseTest;
+import com.starrocks.connector.flink.table.source.struct.PushDownHolder;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -50,7 +50,6 @@ public void init() {
     @Test
     public void testApplyProjection() {
         dynamicTableSource.applyProjection(PROJECTION_ARRAY);
-        assertEquals("char_1, int_1", pushDownHolder.getColumns());
         for (int i = 0; i < SELECT_COLUMNS.length; i ++) {
             assertEquals(SELECT_COLUMNS[i].getColumnIndexInFlinkTable(),
                     pushDownHolder.getSelectColumns()[i].getColumnIndexInFlinkTable());