Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BugFix] Fix predicate push-down time dimension table error #376

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,17 @@
import java.text.SimpleDateFormat;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Calendar;
import java.util.Date;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

public class StarRocksDynamicLRUFunction extends TableFunction<RowData> {

private static final Logger LOG = LoggerFactory.getLogger(StarRocksDynamicLRUFunction.class);

private final ColumnRichInfo[] filterRichInfos;
private final StarRocksSourceOptions sourceOptions;
private final ArrayList<String> filterList;
Expand All @@ -56,7 +58,7 @@ public class StarRocksDynamicLRUFunction extends TableFunction<RowData> {
private final long cacheExpireMs;
private final int maxRetryTimes;

public StarRocksDynamicLRUFunction(StarRocksSourceOptions sourceOptions,
public StarRocksDynamicLRUFunction(StarRocksSourceOptions sourceOptions,
ColumnRichInfo[] filterRichInfos,
List<ColumnRichInfo> columnRichInfos,
SelectColumn[] selectColumns) {
Expand All @@ -72,7 +74,7 @@ public StarRocksDynamicLRUFunction(StarRocksSourceOptions sourceOptions,
this.filterList = new ArrayList<>();
this.dataReaderList = new ArrayList<>();
}

@Override
public void open(FunctionContext context) throws Exception {
super.open(context);
Expand Down Expand Up @@ -101,14 +103,17 @@ public void eval(Object... keys) {
}
String filter = String.join(" and ", filterList);
filterList.clear();
String SQL = "select * from " + sourceOptions.getDatabaseName() + "." + sourceOptions.getTableName() + " where " + filter;
String columns = Arrays.stream(selectColumns)
.map(col -> "`" + col.getColumnName() + "`")
.collect(Collectors.joining(","));
String SQL = "select " + columns + " from " + sourceOptions.getDatabaseName() + "." + sourceOptions.getTableName() + " where " + filter;
LOG.info("LookUpFunction SQL [{}]", SQL);
this.queryInfo = StarRocksSourceCommonFunc.getQueryInfo(this.sourceOptions, SQL);
List<List<QueryBeXTablets>> lists = StarRocksSourceCommonFunc.splitQueryBeXTablets(1, queryInfo);
lists.get(0).forEach(beXTablets -> {
StarRocksSourceBeReader beReader = new StarRocksSourceBeReader(beXTablets.getBeNode(),
columnRichInfos,
selectColumns,
selectColumns,
sourceOptions);
beReader.openScanner(beXTablets.getTabletIds(), queryInfo.getQueryPlan().getOpaqued_query_plan(), sourceOptions);
beReader.startToRead();
Expand All @@ -132,7 +137,7 @@ public void eval(Object... keys) {
});
rows.trimToSize();
cache.put(keyRow, rows);
}
}
}

private void getFieldValue(Object obj, ColumnRichInfo columnRichInfo) {
Expand All @@ -147,9 +152,9 @@ private void getFieldValue(Object obj, ColumnRichInfo columnRichInfo) {
filter = columnRichInfo.getColumnName() + " = '" + sdf.format(d).toString() + "'";
}
if (flinkTypeRoot == LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE ||
flinkTypeRoot == LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE ||
flinkTypeRoot == LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE ||
flinkTypeRoot == LogicalTypeRoot.TIMESTAMP_WITH_TIME_ZONE) {

DateTimeFormatter dtf = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSS");
String strDateTime = dtf.format(((TimestampData)obj).toLocalDateTime());
filter = columnRichInfo.getColumnName() + " = '" + strDateTime + "'";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,13 @@
import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
import org.apache.flink.table.connector.source.abilities.SupportsLimitPushDown;
import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.expressions.ResolvedExpression;

import com.starrocks.connector.flink.table.source.struct.ColumnRichInfo;
import com.starrocks.connector.flink.table.source.struct.PushDownHolder;
import com.starrocks.connector.flink.table.source.struct.SelectColumn;
import org.apache.flink.table.functions.TableFunction;

import java.util.ArrayList;
import java.util.Arrays;
Expand Down Expand Up @@ -58,32 +60,77 @@ public ChangelogMode getChangelogMode() {
@Override
public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
StarRocksDynamicSourceFunction sourceFunction = new StarRocksDynamicSourceFunction(
options, flinkSchema,
this.pushDownHolder.getFilter(),
this.pushDownHolder.getLimit(),
this.pushDownHolder.getSelectColumns(),
options, flinkSchema,
this.pushDownHolder.getFilter(),
this.pushDownHolder.getLimit(),
this.pushDownHolder.getSelectColumns(),
this.pushDownHolder.getQueryType());
return SourceFunctionProvider.of(sourceFunction, true);
}

@Override
public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) {
int[] projectedFields = Arrays.stream(context.getKeys()).mapToInt(value -> value[0]).toArray();
Map<String, ColumnRichInfo> columnMap = StarRocksSourceCommonFunc.genColumnMap(flinkSchema);
List<ColumnRichInfo> allColumnRichInfos =
StarRocksSourceCommonFunc.genColumnRichInfo(columnMap);
SelectColumn[] pushDownSelectColumns = pushDownHolder.getSelectColumns();
SelectColumn[] selectColumns;
List<ColumnRichInfo> columnRichInfos;
int[] projectedFields =
Arrays.stream(context.getKeys()).mapToInt(value -> value[0]).toArray();
ColumnRichInfo[] filerRichInfo = new ColumnRichInfo[projectedFields.length];
for (int i = 0; i < projectedFields.length; i ++) {
ColumnRichInfo columnRichInfo = new ColumnRichInfo(
this.flinkSchema.getFieldName(projectedFields[i]).get(),
projectedFields[i],
this.flinkSchema.getFieldDataType(projectedFields[i]).get()
);
filerRichInfo[i] = columnRichInfo;
StarRocksSourceQueryType queryType = pushDownHolder.getQueryType();
if (queryType == StarRocksSourceQueryType.QuerySomeColumns) {
columnRichInfos = new ArrayList<>();
selectColumns = new SelectColumn[pushDownSelectColumns.length];
for (int i = 0; i < pushDownSelectColumns.length; i++) {
ColumnRichInfo columnRichInfo =
allColumnRichInfos.get(
pushDownSelectColumns[i].getColumnIndexInFlinkTable());
columnRichInfos.add(
new ColumnRichInfo(
columnRichInfo.getColumnName(), i, columnRichInfo.getDataType()));
selectColumns[i] = new SelectColumn(columnRichInfo.getColumnName(), i);
}
for (int i = 0; i < projectedFields.length; i++) {
int columnIndexInFlinkTable = pushDownSelectColumns[i].getColumnIndexInFlinkTable();
ColumnRichInfo columnRichInfo =
new ColumnRichInfo(
this.flinkSchema.getFieldName(columnIndexInFlinkTable).get(),
i,
this.flinkSchema.getFieldDataType(columnIndexInFlinkTable).get());

filerRichInfo[i] = columnRichInfo;
}
} else {
columnRichInfos = allColumnRichInfos;
selectColumns =
StarRocksSourceCommonFunc.genSelectedColumns(
columnMap, this.options, allColumnRichInfos);
for (int i = 0; i < projectedFields.length; i++) {
ColumnRichInfo columnRichInfo =
new ColumnRichInfo(
this.flinkSchema.getFieldName(i).get(),
projectedFields[i],
this.flinkSchema.getFieldDataType(i).get());
filerRichInfo[i] = columnRichInfo;
}
}

Map<String, ColumnRichInfo> columnMap = StarRocksSourceCommonFunc.genColumnMap(flinkSchema);
List<ColumnRichInfo> ColumnRichInfos = StarRocksSourceCommonFunc.genColumnRichInfo(columnMap);
SelectColumn[] selectColumns = StarRocksSourceCommonFunc.genSelectedColumns(columnMap, this.options, ColumnRichInfos);

StarRocksDynamicLookupFunction tableFunction = new StarRocksDynamicLookupFunction(this.options, filerRichInfo, ColumnRichInfos, selectColumns);
TableFunction<RowData> tableFunction = null;
StarRocksSourceOptions.CacheType lookupCacheType = options.getLookupCacheType();
switch (lookupCacheType) {
case ALL:
tableFunction =
new StarRocksDynamicLookupFunction(
this.options, filerRichInfo, columnRichInfos, selectColumns);
break;
case LRU:
tableFunction =
new StarRocksDynamicLRUFunction(
this.options, filerRichInfo, columnRichInfos, selectColumns);
break;
}
return TableFunctionProvider.of(tableFunction);
}

Expand Down Expand Up @@ -113,7 +160,7 @@ public void applyProjection(int[][] projectedFields) {
this.pushDownHolder.setQueryType(StarRocksSourceQueryType.QuerySomeColumns);

ArrayList<String> columnList = new ArrayList<>();
ArrayList<SelectColumn> selectColumns = new ArrayList<SelectColumn>();
ArrayList<SelectColumn> selectColumns = new ArrayList<SelectColumn>();
for (int index : curProjectedFields) {
String columnName = flinkSchema.getFieldName(index).get();
columnList.add(columnName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ public Set<ConfigOption<?>> optionalOptions() {
options.add(StarRocksSourceOptions.LOOKUP_CACHE_TTL_MS);
options.add(StarRocksSourceOptions.LOOKUP_CACHE_MAX_ROWS);
options.add(StarRocksSourceOptions.LOOKUP_MAX_RETRIES);
options.add(StarRocksSourceOptions.LOOKUP_CACHE_TYPE);
return options;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,11 @@


public class StarRocksSourceCommonFunc {

private static volatile StarRocksQueryVisitor starrocksQueryVisitor;

private static volatile StarRocksQueryPlanVisitor starRocksQueryPlanVisitor;


private static StarRocksQueryVisitor getStarRocksQueryVisitor(StarRocksSourceOptions sourceOptions) {
if (null == starrocksQueryVisitor) {
Expand Down Expand Up @@ -84,15 +84,15 @@ public static List<List<QueryBeXTablets>> splitQueryBeXTablets(int subTaskCount,
curBeXTabletList.set(i, Collections.singletonList(queryInfo.getBeXTablets().get(i)));
}
return curBeXTabletList;
}
}
if (subTaskCount < beXTabletsListCount) {
for (int i = 0; i < beXTabletsListCount; i ++) {
List<QueryBeXTablets> tList = curBeXTabletList.get(i%subTaskCount);
tList.add(queryInfo.getBeXTablets().get(i));
curBeXTabletList.set(i%subTaskCount, tList);
}
return curBeXTabletList;
}
}
List<QueryBeXTablets> beWithSingleTabletList = new ArrayList<>();
queryInfo.getBeXTablets().forEach(beXTablets -> {
beXTablets.getTabletIds().forEach(tabletId -> {
Expand All @@ -106,7 +106,7 @@ public static List<List<QueryBeXTablets>> splitQueryBeXTablets(int subTaskCount,
curBeXTabletList.set(i, Collections.singletonList(beWithSingleTabletList.get(i)));
}
return curBeXTabletList;
}
}
long newx = Math.round(x);
for (int i = 0; i < subTaskCount; i ++) {
int start = (int)(i * newx);
Expand All @@ -124,7 +124,7 @@ public static List<List<QueryBeXTablets>> splitQueryBeXTablets(int subTaskCount,
curBxTs = beWithSingleTabletList.subList(start, end);
Map<String, List<Long>> beXTabletsMap = new HashMap<>();
curBxTs.forEach(curBxT -> {
List<Long> tablets = new ArrayList<>();
List<Long> tablets = new ArrayList<>();
if (beXTabletsMap.containsKey(curBxT.getBeNode())) {
tablets = beXTabletsMap.get(curBxT.getBeNode());
} else {
Expand Down Expand Up @@ -174,8 +174,12 @@ public static List<ColumnRichInfo> genColumnRichInfo(Map<String, ColumnRichInfo>
return columnMap.values().stream().sorted(Comparator.comparing(ColumnRichInfo::getColumnIndexInSchema)).collect(Collectors.toList());
}

public static List<ColumnRichInfo> getSelectSql(Map<String, ColumnRichInfo> columnMap) {
return columnMap.values().stream().sorted(Comparator.comparing(ColumnRichInfo::getColumnIndexInSchema)).collect(Collectors.toList());
}

public static SelectColumn[] genSelectedColumns(Map<String, ColumnRichInfo> columnMap,
StarRocksSourceOptions sourceOptions,
StarRocksSourceOptions sourceOptions,
List<ColumnRichInfo> columnRichInfos) {
List<SelectColumn> selectedColumns = new ArrayList<>();
// user selected columns from sourceOptions
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,24 +53,24 @@ public class StarRocksSourceOptions implements Serializable {

public static final ConfigOption<String> TABLE_NAME = ConfigOptions.key("table-name")
.stringType().noDefaultValue().withDescription("Table name");


// optional Options
public static final ConfigOption<Integer> SCAN_CONNECT_TIMEOUT = ConfigOptions.key("scan.connect.timeout-ms")
.intType().defaultValue(1000).withDescription("Connect timeout");

public static final ConfigOption<Integer> SCAN_BATCH_ROWS = ConfigOptions.key("scan.params.batch-rows")
.intType().defaultValue(1000).withDescription("Batch rows");

public static final ConfigOption<String> SCAN_PROPERTIES = ConfigOptions.key("scan.params.properties")
.stringType().noDefaultValue().withDescription("Reserved params for use");

public static final ConfigOption<Integer> SCAN_LIMIT = ConfigOptions.key("scan.params.limit")
.intType().defaultValue(1).withDescription("The query limit, if specified.");

public static final ConfigOption<Integer> SCAN_KEEP_ALIVE_MIN = ConfigOptions.key("scan.params.keep-alive-min")
.intType().defaultValue(10).withDescription("Max keep alive time min");

public static final ConfigOption<Integer> SCAN_QUERTY_TIMEOUT_S = ConfigOptions.key("scan.params.query-timeout-s")
.intType().defaultValue(600).withDescription("Query timeout for a single query");

Expand All @@ -88,7 +88,7 @@ public class StarRocksSourceOptions implements Serializable {

public static final ConfigOption<String> SCAN_BE_HOST_MAPPING_LIST = ConfigOptions.key("scan.be-host-mapping-list")
.stringType().defaultValue("").withDescription("List of be host mapping");

// lookup Options
public static final ConfigOption<Long> LOOKUP_CACHE_MAX_ROWS = ConfigOptions.key("lookup.cache.max-rows")
.longType().defaultValue(-1L).withDescription(
Expand All @@ -102,6 +102,12 @@ public class StarRocksSourceOptions implements Serializable {
public static final ConfigOption<Integer> LOOKUP_MAX_RETRIES = ConfigOptions.key("lookup.max-retries")
.intType().defaultValue(1).withDescription("the max retry times if lookup database failed.");

public static final ConfigOption<CacheType> LOOKUP_CACHE_TYPE =
ConfigOptions.key("lookup.cache-type")
.enumType(CacheType.class)
.defaultValue(CacheType.ALL)
.withDescription("lookup type.");


public static final String SOURCE_PROPERTIES_PREFIX = "scan.params.";

Expand Down Expand Up @@ -150,7 +156,7 @@ public String getScanUrl() {
public String getJdbcUrl() {
return tableOptions.get(JDBC_URL);
}

public String getUsername() {
return tableOptions.get(USERNAME);
}
Expand All @@ -169,8 +175,8 @@ public String getTableName() {


// optional Options
public int getConnectTimeoutMs() {
return tableOptions.get(SCAN_CONNECT_TIMEOUT).intValue();
public int getConnectTimeoutMs() {
return tableOptions.get(SCAN_CONNECT_TIMEOUT).intValue();
}

public int getBatchRows() {
Expand Down Expand Up @@ -236,10 +242,23 @@ public int getLookupMaxRetries() {
return tableOptions.get(LOOKUP_MAX_RETRIES).intValue();
}


public static Builder builder() {
return new Builder();
}

public CacheType getLookupCacheType() {
return tableOptions.get(LOOKUP_CACHE_TYPE);
}

/**
* Cache Type
*/
public enum CacheType {
LRU,
ALL
}

/**
* Builder for {@link StarRocksSourceOptions}.
*/
Expand Down