diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java index 32851d40d..e2b837c32 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java @@ -99,15 +99,6 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderCon String filterQuery = resolvedFilterQuery.stream().collect(Collectors.joining(" AND ")); readOptions.setFilterQuery(filterQuery); } - if (StringUtils.isNullOrWhitespaceOnly(readOptions.getReadFields())) { - String[] selectFields = - DataType.getFieldNames(physicalRowDataType).toArray(new String[0]); - readOptions.setReadFields( - Arrays.stream(selectFields) - .map(item -> String.format("`%s`", item.trim().replace("`", ""))) - .collect(Collectors.joining(", "))); - } - if (readOptions.getUseOldApi()) { List dorisPartitions; try { @@ -194,7 +185,8 @@ public Result applyFilters(List filters) { DorisExpressionVisitor expressionVisitor = new DorisExpressionVisitor(); for (ResolvedExpression filter : filters) { String filterQuery = filter.accept(expressionVisitor); - if (!StringUtils.isNullOrWhitespaceOnly(filterQuery)) { + if (StringUtils.isNullOrWhitespaceOnly(readOptions.getFilterQuery()) + && !StringUtils.isNullOrWhitespaceOnly(filterQuery)) { acceptedFilters.add(filter); this.resolvedFilterQuery.add(filterQuery); } else { @@ -212,5 +204,13 @@ public boolean supportsNestedProjection() { @Override public void applyProjection(int[][] projectedFields, DataType producedDataType) { this.physicalRowDataType = Projection.of(projectedFields).project(physicalRowDataType); + if (StringUtils.isNullOrWhitespaceOnly(readOptions.getReadFields())) { + String[] selectFields = + DataType.getFieldNames(physicalRowDataType).toArray(new String[0]); + this.readOptions.setReadFields( + Arrays.stream(selectFields) + .map(item -> String.format("`%s`", item.trim().replace("`", ""))) + .collect(Collectors.joining(", "))); + } } } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisExpressionVisitor.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisExpressionVisitor.java index 3f327fe2d..66242e1e1 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisExpressionVisitor.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisExpressionVisitor.java @@ -26,6 +26,7 @@ import org.apache.flink.table.expressions.ValueLiteralExpression; import org.apache.flink.table.functions.BuiltInFunctionDefinitions; import org.apache.flink.table.types.logical.LogicalTypeRoot; +import org.apache.flink.util.StringUtils; import java.util.List; @@ -66,12 +67,22 @@ public String visit(CallExpression call) { if (BuiltInFunctionDefinitions.IS_NOT_NULL.equals(call.getFunctionDefinition())) { return combineLeftExpression("IS NOT NULL", call.getResolvedChildren().get(0)); } + + if (BuiltInFunctionDefinitions.CAST.equals(call.getFunctionDefinition())) { + return call.getChildren().get(0).accept(this); + } return null; } private String combineExpression(String operator, List operand) { String left = operand.get(0).accept(this); + if (StringUtils.isNullOrWhitespaceOnly(left)) { + return null; + } String right = operand.get(1).accept(this); + if (StringUtils.isNullOrWhitespaceOnly(right)) { + return null; + } return String.format("(%s %s %s)", left, operator, right); }