From 2b9a8b1f750a7dc182b70ff5aab6e2acf505f512 Mon Sep 17 00:00:00 2001 From: vinlee19 <1401597760@qq.com> Date: Thu, 9 May 2024 17:00:48 +0800 Subject: [PATCH 01/11] fix cast function can not pushdown --- .../org/apache/doris/flink/table/DorisExpressionVisitor.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisExpressionVisitor.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisExpressionVisitor.java index 3f327fe2d..72a14c5f4 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisExpressionVisitor.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisExpressionVisitor.java @@ -66,6 +66,10 @@ public String visit(CallExpression call) { if (BuiltInFunctionDefinitions.IS_NOT_NULL.equals(call.getFunctionDefinition())) { return combineLeftExpression("IS NOT NULL", call.getResolvedChildren().get(0)); } + + if (BuiltInFunctionDefinitions.CAST.equals(call.getFunctionDefinition())) { + return call.getChildren().get(0).accept(this); + } return null; } From d0c80f95441b796781829ff3dec1de112b9446a8 Mon Sep 17 00:00:00 2001 From: vinlee19 <1401597760@qq.com> Date: Thu, 9 May 2024 18:46:36 +0800 Subject: [PATCH 02/11] add more function pushdown --- .../doris/flink/table/DorisExpressionVisitor.java | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisExpressionVisitor.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisExpressionVisitor.java index 72a14c5f4..2aacdc17d 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisExpressionVisitor.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisExpressionVisitor.java @@ -67,6 +67,18 @@ public String visit(CallExpression call) { return combineLeftExpression("IS NOT NULL", call.getResolvedChildren().get(0)); } + if (BuiltInFunctionDefinitions.PLUS.equals(call.getFunctionDefinition())) { + return combineExpression("+", call.getResolvedChildren()); + } + + if (BuiltInFunctionDefinitions.DIVIDE.equals(call.getFunctionDefinition())) { + return combineExpression("/", call.getResolvedChildren()); + } + + if (BuiltInFunctionDefinitions.BETWEEN.equals(call.getFunctionDefinition())) { + return combineExpression("between", call.getResolvedChildren()); + } + if (BuiltInFunctionDefinitions.CAST.equals(call.getFunctionDefinition())) { return call.getChildren().get(0).accept(this); } From 79c27a1ed98f3f27c6446a06d3f4c63932ac22a3 Mon Sep 17 00:00:00 2001 From: vinlee19 <1401597760@qq.com> Date: Fri, 10 May 2024 12:17:19 +0800 Subject: [PATCH 03/11] fix can not projection --- .../flink/table/DorisDynamicTableSource.java | 24 +++++++++---------- 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java index 32851d40d..7f46dda4d 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java @@ -95,18 +95,6 @@ public ChangelogMode getChangelogMode() { @Override public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) { - if (StringUtils.isNullOrWhitespaceOnly(readOptions.getFilterQuery())) { - String filterQuery = resolvedFilterQuery.stream().collect(Collectors.joining(" AND ")); - readOptions.setFilterQuery(filterQuery); - } - if (StringUtils.isNullOrWhitespaceOnly(readOptions.getReadFields())) { - String[] selectFields = - DataType.getFieldNames(physicalRowDataType).toArray(new String[0]); - readOptions.setReadFields( - Arrays.stream(selectFields) - .map(item -> String.format("`%s`", item.trim().replace("`", ""))) - .collect(Collectors.joining(", "))); - } if (readOptions.getUseOldApi()) { List dorisPartitions; @@ -212,5 +200,17 @@ public boolean supportsNestedProjection() { @Override public void applyProjection(int[][] projectedFields, DataType producedDataType) { this.physicalRowDataType = Projection.of(projectedFields).project(physicalRowDataType); + if (StringUtils.isNullOrWhitespaceOnly(readOptions.getFilterQuery())) { + String filterQuery = resolvedFilterQuery.stream().collect(Collectors.joining(" AND ")); + this.readOptions.setFilterQuery(filterQuery); + } + if (StringUtils.isNullOrWhitespaceOnly(readOptions.getReadFields())) { + String[] selectFields = + DataType.getFieldNames(physicalRowDataType).toArray(new String[0]); + this.readOptions.setReadFields( + Arrays.stream(selectFields) + .map(item -> String.format("`%s`", item.trim().replace("`", ""))) + .collect(Collectors.joining(", "))); + } } } From bcf272da9b8b9ee952d36fcc99f3ef35133b0501 Mon Sep 17 00:00:00 2001 From: vinlee19 <1401597760@qq.com> Date: Fri, 10 May 2024 14:26:51 +0800 Subject: [PATCH 04/11] fix can not projection --- .../flink/table/DorisDynamicTableSource.java | 9 ++++---- .../flink/table/DorisExpressionVisitor.java | 23 ++++++++++++++++++- 2 files changed, 27 insertions(+), 5 deletions(-) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java index 7f46dda4d..a216c35b5 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java @@ -189,6 +189,10 @@ public Result applyFilters(List filters) { remainingFilters.add(filter); } } + if (StringUtils.isNullOrWhitespaceOnly(readOptions.getFilterQuery())) { + String filterQuery = resolvedFilterQuery.stream().collect(Collectors.joining(" AND ")); + this.readOptions.setFilterQuery(filterQuery); + } return Result.of(acceptedFilters, remainingFilters); } @@ -200,10 +204,7 @@ public boolean supportsNestedProjection() { @Override public void applyProjection(int[][] projectedFields, DataType producedDataType) { this.physicalRowDataType = Projection.of(projectedFields).project(physicalRowDataType); - if (StringUtils.isNullOrWhitespaceOnly(readOptions.getFilterQuery())) { - String filterQuery = resolvedFilterQuery.stream().collect(Collectors.joining(" AND ")); - this.readOptions.setFilterQuery(filterQuery); - } + if (StringUtils.isNullOrWhitespaceOnly(readOptions.getReadFields())) { String[] selectFields = DataType.getFieldNames(physicalRowDataType).toArray(new String[0]); diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisExpressionVisitor.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisExpressionVisitor.java index 2aacdc17d..a0707767e 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisExpressionVisitor.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisExpressionVisitor.java @@ -71,12 +71,28 @@ public String visit(CallExpression call) { return combineExpression("+", call.getResolvedChildren()); } + if (BuiltInFunctionDefinitions.MINUS.equals(call.getFunctionDefinition())) { + return combineExpression("-", call.getResolvedChildren()); + } + if (BuiltInFunctionDefinitions.DIVIDE.equals(call.getFunctionDefinition())) { return combineExpression("/", call.getResolvedChildren()); } + if (BuiltInFunctionDefinitions.TIMES.equals(call.getFunctionDefinition())) { + return combineExpression("*", call.getResolvedChildren()); + } + + if (BuiltInFunctionDefinitions.MOD.equals(call.getFunctionDefinition())) { + return combineExpression("%", call.getResolvedChildren()); + } + if (BuiltInFunctionDefinitions.BETWEEN.equals(call.getFunctionDefinition())) { - return combineExpression("between", call.getResolvedChildren()); + return combineExpression("BETWEEN", call.getResolvedChildren()); + } + + if (BuiltInFunctionDefinitions.ABS.equals(call.getFunctionDefinition())) { + return combineMathFunctionExpression("ABS", call.getResolvedChildren().get(0)); } if (BuiltInFunctionDefinitions.CAST.equals(call.getFunctionDefinition())) { @@ -96,6 +112,11 @@ private String combineLeftExpression(String operator, ResolvedExpression operand return String.format("(%s %s)", left, operator); } + private String combineMathFunctionExpression(String operator, ResolvedExpression operand) { + String column = operand.accept(this); + return String.format("%s(%s)", operator, column); + } + @Override public String visit(ValueLiteralExpression valueLiteral) { LogicalTypeRoot typeRoot = valueLiteral.getOutputDataType().getLogicalType().getTypeRoot(); From b25d451be629e5d9c9c5591e4ca88b7c9730d813 Mon Sep 17 00:00:00 2001 From: vinlee19 <1401597760@qq.com> Date: Fri, 10 May 2024 17:22:40 +0800 Subject: [PATCH 05/11] add more functioin pushdown --- .../flink/table/DorisExpressionVisitor.java | 39 +++++++++++++++++++ 1 file changed, 39 insertions(+) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisExpressionVisitor.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisExpressionVisitor.java index a0707767e..11d056434 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisExpressionVisitor.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisExpressionVisitor.java @@ -26,6 +26,7 @@ import org.apache.flink.table.expressions.ValueLiteralExpression; import org.apache.flink.table.functions.BuiltInFunctionDefinitions; import org.apache.flink.table.types.logical.LogicalTypeRoot; +import org.apache.flink.util.StringUtils; import java.util.List; @@ -95,6 +96,34 @@ public String visit(CallExpression call) { return combineMathFunctionExpression("ABS", call.getResolvedChildren().get(0)); } + if (BuiltInFunctionDefinitions.CEIL.equals(call.getFunctionDefinition())) { + return combineMathFunctionExpression("CEIL", call.getResolvedChildren().get(0)); + } + + if (BuiltInFunctionDefinitions.FLOOR.equals(call.getFunctionDefinition())) { + return combineMathFunctionExpression("FLOOR", call.getResolvedChildren().get(0)); + } + + if (BuiltInFunctionDefinitions.LN.equals(call.getFunctionDefinition())) { + return combineMathFunctionExpression("LN", call.getResolvedChildren().get(0)); + } + + if (BuiltInFunctionDefinitions.EXP.equals(call.getFunctionDefinition())) { + return combineMathFunctionExpression("EXP", call.getResolvedChildren().get(0)); + } + + if (BuiltInFunctionDefinitions.CURRENT_TIMESTAMP.equals(call.getFunctionDefinition())) { + return noArgsFunctionExpression("CURRENT_TIMESTAMP()"); + } + + if (BuiltInFunctionDefinitions.CURRENT_DATE.equals(call.getFunctionDefinition())) { + return noArgsFunctionExpression("CURRENT_DATE()"); + } + + if (BuiltInFunctionDefinitions.LOCAL_TIMESTAMP.equals(call.getFunctionDefinition())) { + return noArgsFunctionExpression("LOCALTIMESTAMP()"); + } + if (BuiltInFunctionDefinitions.CAST.equals(call.getFunctionDefinition())) { return call.getChildren().get(0).accept(this); } @@ -103,7 +132,13 @@ public String visit(CallExpression call) { private String combineExpression(String operator, List operand) { String left = operand.get(0).accept(this); + if (StringUtils.isNullOrWhitespaceOnly(left)) { + return null; + } String right = operand.get(1).accept(this); + if (StringUtils.isNullOrWhitespaceOnly(right)) { + return null; + } return String.format("(%s %s %s)", left, operator, right); } @@ -117,6 +152,10 @@ private String combineMathFunctionExpression(String operator, ResolvedExpression return String.format("%s(%s)", operator, column); } + private String noArgsFunctionExpression(String operator) { + return String.format("%s", operator); + } + @Override public String visit(ValueLiteralExpression valueLiteral) { LogicalTypeRoot typeRoot = valueLiteral.getOutputDataType().getLogicalType().getTypeRoot(); From 9b187f2069bf5e9904b7efaa4058e0bab06fb53a Mon Sep 17 00:00:00 2001 From: vinlee19 <1401597760@qq.com> Date: Fri, 10 May 2024 18:37:05 +0800 Subject: [PATCH 06/11] revert code --- .../flink/table/DorisExpressionVisitor.java | 65 ------------------- 1 file changed, 65 deletions(-) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisExpressionVisitor.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisExpressionVisitor.java index 11d056434..66242e1e1 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisExpressionVisitor.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisExpressionVisitor.java @@ -68,62 +68,6 @@ public String visit(CallExpression call) { return combineLeftExpression("IS NOT NULL", call.getResolvedChildren().get(0)); } - if (BuiltInFunctionDefinitions.PLUS.equals(call.getFunctionDefinition())) { - return combineExpression("+", call.getResolvedChildren()); - } - - if (BuiltInFunctionDefinitions.MINUS.equals(call.getFunctionDefinition())) { - return combineExpression("-", call.getResolvedChildren()); - } - - if (BuiltInFunctionDefinitions.DIVIDE.equals(call.getFunctionDefinition())) { - return combineExpression("/", call.getResolvedChildren()); - } - - if (BuiltInFunctionDefinitions.TIMES.equals(call.getFunctionDefinition())) { - return combineExpression("*", call.getResolvedChildren()); - } - - if (BuiltInFunctionDefinitions.MOD.equals(call.getFunctionDefinition())) { - return combineExpression("%", call.getResolvedChildren()); - } - - if (BuiltInFunctionDefinitions.BETWEEN.equals(call.getFunctionDefinition())) { - return combineExpression("BETWEEN", call.getResolvedChildren()); - } - - if (BuiltInFunctionDefinitions.ABS.equals(call.getFunctionDefinition())) { - return combineMathFunctionExpression("ABS", call.getResolvedChildren().get(0)); - } - - if (BuiltInFunctionDefinitions.CEIL.equals(call.getFunctionDefinition())) { - return combineMathFunctionExpression("CEIL", call.getResolvedChildren().get(0)); - } - - if (BuiltInFunctionDefinitions.FLOOR.equals(call.getFunctionDefinition())) { - return combineMathFunctionExpression("FLOOR", call.getResolvedChildren().get(0)); - } - - if (BuiltInFunctionDefinitions.LN.equals(call.getFunctionDefinition())) { - return combineMathFunctionExpression("LN", call.getResolvedChildren().get(0)); - } - - if (BuiltInFunctionDefinitions.EXP.equals(call.getFunctionDefinition())) { - return combineMathFunctionExpression("EXP", call.getResolvedChildren().get(0)); - } - - if (BuiltInFunctionDefinitions.CURRENT_TIMESTAMP.equals(call.getFunctionDefinition())) { - return noArgsFunctionExpression("CURRENT_TIMESTAMP()"); - } - - if (BuiltInFunctionDefinitions.CURRENT_DATE.equals(call.getFunctionDefinition())) { - return noArgsFunctionExpression("CURRENT_DATE()"); - } - - if (BuiltInFunctionDefinitions.LOCAL_TIMESTAMP.equals(call.getFunctionDefinition())) { - return noArgsFunctionExpression("LOCALTIMESTAMP()"); - } - if (BuiltInFunctionDefinitions.CAST.equals(call.getFunctionDefinition())) { return call.getChildren().get(0).accept(this); } @@ -147,15 +91,6 @@ private String combineLeftExpression(String operator, ResolvedExpression operand return String.format("(%s %s)", left, operator); } - private String combineMathFunctionExpression(String operator, ResolvedExpression operand) { - String column = operand.accept(this); - return String.format("%s(%s)", operator, column); - } - - private String noArgsFunctionExpression(String operator) { - return String.format("%s", operator); - } - @Override public String visit(ValueLiteralExpression valueLiteral) { LogicalTypeRoot typeRoot = valueLiteral.getOutputDataType().getLogicalType().getTypeRoot(); From 9f04c93f83409e900cd6f595b3ad9728c8fed67f Mon Sep 17 00:00:00 2001 From: vinlee19 <1401597760@qq.com> Date: Fri, 10 May 2024 22:37:58 +0800 Subject: [PATCH 07/11] fix --- .../doris/flink/table/DorisDynamicTableSource.java | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java index a216c35b5..bbf958990 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java @@ -95,7 +95,10 @@ public ChangelogMode getChangelogMode() { @Override public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) { - + if (StringUtils.isNullOrWhitespaceOnly(readOptions.getFilterQuery())) { + String filterQuery = resolvedFilterQuery.stream().collect(Collectors.joining(" AND ")); + this.readOptions.setFilterQuery(filterQuery); + } if (readOptions.getUseOldApi()) { List dorisPartitions; try { @@ -189,10 +192,6 @@ public Result applyFilters(List filters) { remainingFilters.add(filter); } } - if (StringUtils.isNullOrWhitespaceOnly(readOptions.getFilterQuery())) { - String filterQuery = resolvedFilterQuery.stream().collect(Collectors.joining(" AND ")); - this.readOptions.setFilterQuery(filterQuery); - } return Result.of(acceptedFilters, remainingFilters); } From 3bd613ce753a64835ee6666968558c7567bc9a86 Mon Sep 17 00:00:00 2001 From: vinlee19 <1401597760@qq.com> Date: Fri, 10 May 2024 22:48:36 +0800 Subject: [PATCH 08/11] fix --- .../flink/table/DorisDynamicTableSource.java | 21 +++++++++---------- 1 file changed, 10 insertions(+), 11 deletions(-) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java index bbf958990..ed1fa8670 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java @@ -47,7 +47,6 @@ import org.slf4j.LoggerFactory; import java.util.ArrayList; -import java.util.Arrays; import java.util.List; import java.util.stream.Collectors; @@ -97,7 +96,7 @@ public ChangelogMode getChangelogMode() { public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) { if (StringUtils.isNullOrWhitespaceOnly(readOptions.getFilterQuery())) { String filterQuery = resolvedFilterQuery.stream().collect(Collectors.joining(" AND ")); - this.readOptions.setFilterQuery(filterQuery); + readOptions.setFilterQuery(filterQuery); } if (readOptions.getUseOldApi()) { List dorisPartitions; @@ -203,14 +202,14 @@ public boolean supportsNestedProjection() { @Override public void applyProjection(int[][] projectedFields, DataType producedDataType) { this.physicalRowDataType = Projection.of(projectedFields).project(physicalRowDataType); - - if (StringUtils.isNullOrWhitespaceOnly(readOptions.getReadFields())) { - String[] selectFields = - DataType.getFieldNames(physicalRowDataType).toArray(new String[0]); - this.readOptions.setReadFields( - Arrays.stream(selectFields) - .map(item -> String.format("`%s`", item.trim().replace("`", ""))) - .collect(Collectors.joining(", "))); - } + // if (StringUtils.isNullOrWhitespaceOnly(readOptions.getReadFields())) { + // String[] selectFields = + // DataType.getFieldNames(physicalRowDataType).toArray(new String[0]); + // this.readOptions.setReadFields( + // Arrays.stream(selectFields) + // .map(item -> String.format("`%s`", item.trim().replace("`", + // ""))) + // .collect(Collectors.joining(", "))); + // } } } From c1c18b82097fdb6442ef11f089fa8dea7082c1e3 Mon Sep 17 00:00:00 2001 From: vinlee19 <1401597760@qq.com> Date: Fri, 10 May 2024 22:49:22 +0800 Subject: [PATCH 09/11] fix --- .../flink/table/DorisDynamicTableSource.java | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java index ed1fa8670..2c37da56b 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java @@ -47,6 +47,7 @@ import org.slf4j.LoggerFactory; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.stream.Collectors; @@ -202,14 +203,13 @@ public boolean supportsNestedProjection() { @Override public void applyProjection(int[][] projectedFields, DataType producedDataType) { this.physicalRowDataType = Projection.of(projectedFields).project(physicalRowDataType); - // if (StringUtils.isNullOrWhitespaceOnly(readOptions.getReadFields())) { - // String[] selectFields = - // DataType.getFieldNames(physicalRowDataType).toArray(new String[0]); - // this.readOptions.setReadFields( - // Arrays.stream(selectFields) - // .map(item -> String.format("`%s`", item.trim().replace("`", - // ""))) - // .collect(Collectors.joining(", "))); - // } + if (StringUtils.isNullOrWhitespaceOnly(readOptions.getReadFields())) { + String[] selectFields = + DataType.getFieldNames(physicalRowDataType).toArray(new String[0]); + this.readOptions.setReadFields( + Arrays.stream(selectFields) + .map(item -> String.format("`%s`", item.trim().replace("`", ""))) + .collect(Collectors.joining(", "))); + } } } From 906a4c97615dfebf92eafc5639f36f4ef2927144 Mon Sep 17 00:00:00 2001 From: vinlee19 <1401597760@qq.com> Date: Sat, 11 May 2024 11:46:01 +0800 Subject: [PATCH 10/11] fix doris.query.filter is not null, the pushdown errro --- .../org/apache/doris/flink/table/DorisDynamicTableSource.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java index 2c37da56b..c57ce2a69 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java @@ -185,7 +185,8 @@ public Result applyFilters(List filters) { DorisExpressionVisitor expressionVisitor = new DorisExpressionVisitor(); for (ResolvedExpression filter : filters) { String filterQuery = filter.accept(expressionVisitor); - if (!StringUtils.isNullOrWhitespaceOnly(filterQuery)) { + if (StringUtils.isNullOrWhitespaceOnly(filterQuery) + && !StringUtils.isNullOrWhitespaceOnly(filterQuery)) { acceptedFilters.add(filter); this.resolvedFilterQuery.add(filterQuery); } else { From 4d99d41dbdce3f9b113b465df8191bbfec0d4bf7 Mon Sep 17 00:00:00 2001 From: vinlee19 <1401597760@qq.com> Date: Sat, 11 May 2024 14:22:08 +0800 Subject: [PATCH 11/11] fix doris.query.filter is not null, the pushdown errro --- .../org/apache/doris/flink/table/DorisDynamicTableSource.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java index c57ce2a69..e2b837c32 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java @@ -185,7 +185,7 @@ public Result applyFilters(List filters) { DorisExpressionVisitor expressionVisitor = new DorisExpressionVisitor(); for (ResolvedExpression filter : filters) { String filterQuery = filter.accept(expressionVisitor); - if (StringUtils.isNullOrWhitespaceOnly(filterQuery) + if (StringUtils.isNullOrWhitespaceOnly(readOptions.getFilterQuery()) && !StringUtils.isNullOrWhitespaceOnly(filterQuery)) { acceptedFilters.add(filter); this.resolvedFilterQuery.add(filterQuery);