Skip to content

Commit

Permalink
[Enhancement] Support mapping a subset of StarRocks columns to a Flink source table
Browse files Browse the repository at this point in the history

Signed-off-by: PengFei Li <[email protected]>
  • Loading branch information
banmoy committed Apr 12, 2024
1 parent 45fceec commit 1f46522
Show file tree
Hide file tree
Showing 5 changed files with 99 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,6 @@

package com.starrocks.connector.flink.table.source;

import com.google.common.base.Strings;
import com.starrocks.connector.flink.table.source.struct.ColumnRichInfo;
import com.starrocks.connector.flink.table.source.struct.QueryBeXTablets;
import com.starrocks.connector.flink.table.source.struct.QueryInfo;
import com.starrocks.connector.flink.table.source.struct.SelectColumn;
import com.starrocks.connector.flink.tools.EnvUtils;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
Expand All @@ -28,13 +22,22 @@
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.data.RowData;

import com.google.common.base.Strings;
import com.starrocks.connector.flink.table.source.struct.ColumnRichInfo;
import com.starrocks.connector.flink.table.source.struct.QueryBeXTablets;
import com.starrocks.connector.flink.table.source.struct.QueryInfo;
import com.starrocks.connector.flink.table.source.struct.SelectColumn;
import com.starrocks.connector.flink.tools.EnvUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;

public class StarRocksDynamicSourceFunction extends RichParallelSourceFunction<RowData> implements ResultTypeQueryable<RowData> {

Expand Down Expand Up @@ -71,7 +74,7 @@ public StarRocksDynamicSourceFunction(TableSchema flinkSchema, StarRocksSourceOp
}

public StarRocksDynamicSourceFunction(StarRocksSourceOptions sourceOptions, TableSchema flinkSchema,
String filter, long limit, SelectColumn[] selectColumns, String columns, StarRocksSourceQueryType queryType) {
String filter, long limit, SelectColumn[] selectColumns, StarRocksSourceQueryType queryType) {
// StarRocksSourceCommonFunc.validateTableStructure(sourceOptions, flinkSchema);
this.sourceOptions = sourceOptions;
Map<String, ColumnRichInfo> columnMap = StarRocksSourceCommonFunc.genColumnMap(flinkSchema);
Expand All @@ -82,7 +85,7 @@ public StarRocksDynamicSourceFunction(StarRocksSourceOptions sourceOptions, Tabl
} else {
this.selectColumns = selectColumns;
}
String SQL = genSQL(queryType, columns, filter, limit);
String SQL = genSQL(queryType, this.selectColumns, filter, limit);
if (queryType == StarRocksSourceQueryType.QueryCount) {
this.dataCount = StarRocksSourceCommonFunc.getQueryCount(this.sourceOptions, SQL);
} else {
Expand All @@ -103,16 +106,17 @@ private String genSQL(StarRocksSourceOptions options) {
filter;
}

private String genSQL(StarRocksSourceQueryType queryType, String columns, String filter, long limit) {
private String genSQL(StarRocksSourceQueryType queryType, SelectColumn[] selectColumns, String filter, long limit) {
StringBuilder sqlSb = new StringBuilder("select ");
switch (queryType) {
case QueryCount:
sqlSb.append("count(*)");
break;
case QueryAllColumns:
sqlSb.append("*");
break;
case QuerySomeColumns:
String columns = Arrays.stream(selectColumns)
.map(SelectColumn::getColumnName)
.collect(Collectors.joining(","));
sqlSb.append(columns);
break;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,6 @@

package com.starrocks.connector.flink.table.source;

import com.starrocks.connector.flink.table.source.struct.ColumnRichInfo;
import com.starrocks.connector.flink.table.source.struct.PushDownHolder;
import com.starrocks.connector.flink.table.source.struct.SelectColumn;

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.connector.ChangelogMode;
Expand All @@ -31,6 +27,10 @@
import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
import org.apache.flink.table.expressions.ResolvedExpression;

import com.starrocks.connector.flink.table.source.struct.ColumnRichInfo;
import com.starrocks.connector.flink.table.source.struct.PushDownHolder;
import com.starrocks.connector.flink.table.source.struct.SelectColumn;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedList;
Expand Down Expand Up @@ -62,7 +62,6 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
this.pushDownHolder.getFilter(),
this.pushDownHolder.getLimit(),
this.pushDownHolder.getSelectColumns(),
this.pushDownHolder.getColumns(),
this.pushDownHolder.getQueryType());
return SourceFunctionProvider.of(sourceFunction, true);
}
Expand Down Expand Up @@ -120,8 +119,6 @@ public void applyProjection(int[][] projectedFields) {
columnList.add(columnName);
selectColumns.add(new SelectColumn(columnName, index));
}
String columns = String.join(", ", columnList);
this.pushDownHolder.setColumns(columns);
this.pushDownHolder.setSelectColumns(selectColumns.toArray(new SelectColumn[selectColumns.size()]));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ public class PushDownHolder implements Serializable {
private String filter = "";
private long limit;
private SelectColumn[] selectColumns;
private String columns;
private StarRocksSourceQueryType queryType;

public String getFilter() {
Expand All @@ -46,12 +45,6 @@ public SelectColumn[] getSelectColumns() {
public void setSelectColumns(SelectColumn[] selectColumns) {
this.selectColumns = selectColumns;
}
public String getColumns() {
return columns;
}
public void setColumns(String columns) {
this.columns = columns;
}
public StarRocksSourceQueryType getQueryType() {
return queryType;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,83 @@

public class StarRocksSourceITTest extends StarRocksITTestBase {

@Test
public void testMapSubsetColumns() throws Exception {
    // Verifies that a Flink table may declare only a subset of the StarRocks
    // columns, in an order different from the physical table (c0 is omitted,
    // c3/c4 come before c1/c2).
    String tableName = createPartialTables("testMapSubsetColumns");
    String insertSql = String.format("INSERT INTO `%s`.`%s` VALUES (%s)", DB_NAME, tableName,
            "0, 1.1, '1', [1], map{1:'1'}");
    executeSrSQL(insertSql);

    StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
    execEnv.setParallelism(1);
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(execEnv);

    // Flink-side DDL: four of the five StarRocks columns, reordered.
    String srcDdl = "CREATE TABLE sr_src(" +
            "c3 ARRAY<BIGINT>," +
            "c4 MAP<INT, STRING>," +
            "c1 FLOAT," +
            "c2 STRING" +
            ") WITH ( " +
            "'connector' = 'starrocks'," +
            "'jdbc-url'='" + getJdbcUrl() + "'," +
            "'scan-url'='" + String.join(";", getHttpUrls()) + "'," +
            "'database-name' = '" + DB_NAME + "'," +
            "'table-name' = '" + tableName + "'," +
            "'username' = 'root'," +
            "'password' = ''" +
            ")";
    tableEnv.executeSql(srcDdl);

    // Expected value for the MAP<INT, STRING> column.
    Map<Integer, String> expectedMap = new HashMap<>();
    expectedMap.put(1, "1");

    // Full scan: columns must come back in the Flink-declared order.
    List<Row> scanAllRows =
            CollectionUtil.iteratorToList(
                    tableEnv.executeSql("SELECT * FROM sr_src").collect());
    Row expectedScanAll = Row.of(
            new Long[]{1L},
            expectedMap,
            1.1f,
            "1"
    );
    assertThat(scanAllRows).containsExactlyInAnyOrderElementsOf(Collections.singleton(expectedScanAll));

    // Projection push-down on a single column.
    List<Row> singleColumnRows =
            CollectionUtil.iteratorToList(
                    tableEnv.executeSql("SELECT c4 FROM sr_src").collect());
    Row expectedSingleColumn = Row.of(expectedMap);
    assertThat(singleColumnRows).containsExactlyInAnyOrderElementsOf(Collections.singleton(expectedSingleColumn));

    // Explicit column list combined with a filter predicate.
    List<Row> filteredRows =
            CollectionUtil.iteratorToList(
                    tableEnv.executeSql("SELECT c1, c2, c3, c4 FROM sr_src WHERE c2 = '1'").collect());
    Row expectedFiltered = Row.of(
            1.1f,
            "1",
            new Long[]{1L},
            expectedMap
    );
    assertThat(filteredRows).containsExactlyInAnyOrderElementsOf(Collections.singleton(expectedFiltered));
}

/**
 * Creates a five-column StarRocks OLAP table whose name is the given prefix
 * plus a random UUID suffix, and returns the generated table name.
 *
 * @param tablePrefix prefix for the generated table name
 * @return the name of the table that was created
 * @throws Exception if the DDL statement fails to execute
 */
private String createPartialTables(String tablePrefix) throws Exception {
    String tableName = tablePrefix + "_" + genRandomUuid();
    // DUPLICATE KEY table with scalar, array, and map columns so tests can
    // map an arbitrary subset of them on the Flink side.
    String ddl = String.format(
            "CREATE TABLE `%s`.`%s` (" +
                    "c0 INT," +
                    "c1 FLOAT," +
                    "c2 STRING," +
                    "c3 ARRAY<BIGINT>," +
                    "c4 MAP<INT, STRING>" +
                    ") ENGINE = OLAP " +
                    "DUPLICATE KEY(c0) " +
                    "DISTRIBUTED BY HASH (c0) BUCKETS 8 " +
                    "PROPERTIES (" +
                    "\"replication_num\" = \"1\"" +
                    ")",
            DB_NAME, tableName);
    executeSrSQL(ddl);
    return tableName;
}

@Test
public void testArrayType() throws Exception {
String tableName = createArrayTypeTable("testArrayType");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,16 @@

package com.starrocks.connector.flink.table.source;

import com.starrocks.connector.flink.it.source.StarRocksSourceBaseTest;
import com.starrocks.connector.flink.table.source.struct.PushDownHolder;

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.expressions.CallExpression;
import org.apache.flink.table.expressions.FieldReferenceExpression;
import org.apache.flink.table.expressions.ResolvedExpression;
import org.apache.flink.table.functions.BuiltInFunctionDefinitions;

import com.starrocks.connector.flink.it.source.StarRocksSourceBaseTest;
import com.starrocks.connector.flink.table.source.struct.PushDownHolder;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
Expand All @@ -50,7 +50,6 @@ public void init() {
@Test
public void testApplyProjection() {
dynamicTableSource.applyProjection(PROJECTION_ARRAY);
assertEquals("char_1, int_1", pushDownHolder.getColumns());

for (int i = 0; i < SELECT_COLUMNS.length; i ++) {
assertEquals(SELECT_COLUMNS[i].getColumnIndexInFlinkTable(), pushDownHolder.getSelectColumns()[i].getColumnIndexInFlinkTable());
Expand Down

0 comments on commit 1f46522

Please sign in to comment.