Skip to content

Commit

Permalink
[fix](source) fix projection and function pushdown not functioning co…
Browse files Browse the repository at this point in the history
…rrectly (#383)
  • Loading branch information
vinlee19 authored May 11, 2024
1 parent 3b5d4ed commit 9b49bc4
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -99,15 +99,6 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderCon
String filterQuery = resolvedFilterQuery.stream().collect(Collectors.joining(" AND "));
readOptions.setFilterQuery(filterQuery);
}
if (StringUtils.isNullOrWhitespaceOnly(readOptions.getReadFields())) {
String[] selectFields =
DataType.getFieldNames(physicalRowDataType).toArray(new String[0]);
readOptions.setReadFields(
Arrays.stream(selectFields)
.map(item -> String.format("`%s`", item.trim().replace("`", "")))
.collect(Collectors.joining(", ")));
}

if (readOptions.getUseOldApi()) {
List<PartitionDefinition> dorisPartitions;
try {
Expand Down Expand Up @@ -194,7 +185,8 @@ public Result applyFilters(List<ResolvedExpression> filters) {
DorisExpressionVisitor expressionVisitor = new DorisExpressionVisitor();
for (ResolvedExpression filter : filters) {
String filterQuery = filter.accept(expressionVisitor);
if (!StringUtils.isNullOrWhitespaceOnly(filterQuery)) {
if (StringUtils.isNullOrWhitespaceOnly(readOptions.getFilterQuery())
&& !StringUtils.isNullOrWhitespaceOnly(filterQuery)) {
acceptedFilters.add(filter);
this.resolvedFilterQuery.add(filterQuery);
} else {
Expand All @@ -212,5 +204,13 @@ public boolean supportsNestedProjection() {
@Override
public void applyProjection(int[][] projectedFields, DataType producedDataType) {
this.physicalRowDataType = Projection.of(projectedFields).project(physicalRowDataType);
if (StringUtils.isNullOrWhitespaceOnly(readOptions.getReadFields())) {
String[] selectFields =
DataType.getFieldNames(physicalRowDataType).toArray(new String[0]);
this.readOptions.setReadFields(
Arrays.stream(selectFields)
.map(item -> String.format("`%s`", item.trim().replace("`", "")))
.collect(Collectors.joining(", ")));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.flink.table.expressions.ValueLiteralExpression;
import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
import org.apache.flink.table.types.logical.LogicalTypeRoot;
import org.apache.flink.util.StringUtils;

import java.util.List;

Expand Down Expand Up @@ -66,12 +67,22 @@ public String visit(CallExpression call) {
if (BuiltInFunctionDefinitions.IS_NOT_NULL.equals(call.getFunctionDefinition())) {
return combineLeftExpression("IS NOT NULL", call.getResolvedChildren().get(0));
}

if (BuiltInFunctionDefinitions.CAST.equals(call.getFunctionDefinition())) {
return call.getChildren().get(0).accept(this);
}
return null;
}

private String combineExpression(String operator, List<ResolvedExpression> operand) {
String left = operand.get(0).accept(this);
if (StringUtils.isNullOrWhitespaceOnly(left)) {
return null;
}
String right = operand.get(1).accept(this);
if (StringUtils.isNullOrWhitespaceOnly(right)) {
return null;
}
return String.format("(%s %s %s)", left, operator, right);
}

Expand Down

0 comments on commit 9b49bc4

Please sign in to comment.