Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix](nereids) Fix query mv rewrite fail when mv cache build quickly #28876

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.mtmv.BaseTableInfo;
import org.apache.doris.mtmv.MTMVCache;
import org.apache.doris.mtmv.MTMVPartitionInfo;
import org.apache.doris.mtmv.MTMVUtil;
import org.apache.doris.nereids.CascadesContext;
Expand Down Expand Up @@ -104,13 +103,8 @@ protected List<Plan> rewrite(Plan queryPlan, CascadesContext cascadesContext) {
logger.debug(currentClassName + " this group is already rewritten so skip");
continue;
}
MTMV mtmv = materializationContext.getMTMV();
MTMVCache mtmvCache = getCacheFromMTMV(mtmv);
if (mtmvCache == null) {
logger.warn(currentClassName + " mv cache is null so return");
return rewriteResults;
}
List<StructInfo> viewStructInfos = extractStructInfo(mtmvCache.getLogicalPlan(), cascadesContext);
List<StructInfo> viewStructInfos = extractStructInfo(materializationContext.getMvPlan(),
cascadesContext);
if (viewStructInfos.size() > 1) {
// view struct info should only have one
logger.warn(currentClassName + " the num of view struct info is more then one so return");
Expand Down Expand Up @@ -200,7 +194,7 @@ protected List<Plan> rewrite(Plan queryPlan, CascadesContext cascadesContext) {
CascadesContext rewrittenPlanContext =
CascadesContext.initContext(cascadesContext.getStatementContext(), rewrittenPlan,
cascadesContext.getCurrentJobContext().getRequiredProperties());
Rewriter.getWholeTreeRewriter(cascadesContext).execute();
Rewriter.getWholeTreeRewriter(rewrittenPlanContext).execute();
rewrittenPlan = rewrittenPlanContext.getRewritePlan();
logger.debug(currentClassName + "rewrite by materialized view success");
rewriteResults.add(rewrittenPlan);
Expand Down Expand Up @@ -289,17 +283,6 @@ protected boolean checkPartitionIsValid(
&& relatedTalbeValidSet.containsAll(relatedTableSelectedPartitionToCheck);
}

private MTMVCache getCacheFromMTMV(MTMV mtmv) {
MTMVCache cache;
try {
cache = mtmv.getOrGenerateCache();
} catch (AnalysisException analysisException) {
logger.warn(this.getClass().getSimpleName() + " get mtmv cache analysisException", analysisException);
return null;
}
return cache;
}

/**
* Rewrite query by view, for aggregate or join rewriting should be different inherit class implementation
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ public class MaterializationContext {
// generate form mv scan plan
private ExpressionMapping mvExprToMvScanExprMapping;
private boolean available = true;
// the mv plan from cache at present, record it to make sure query rewrite by mv is right when cache change.
private Plan mvPlan;

/**
* MaterializationContext, this contains necessary info for query rewriting by mv
Expand Down Expand Up @@ -81,6 +83,8 @@ public MaterializationContext(MTMV mtmv,
mtmvCache.getMvOutputExpressions(),
mtmvCache.getLogicalPlan()),
mvScanPlan.getExpressions());
// copy the plan from cache, which the plan in cache may change
this.mvPlan = mtmvCache.getLogicalPlan();
}

public Set<GroupId> getMatchedGroups() {
Expand Down Expand Up @@ -119,6 +123,10 @@ public boolean isAvailable() {
return available;
}

public Plan getMvPlan() {
return mvPlan;
}

/**
* MaterializationContext fromMaterializedView
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,11 @@ suite("aggregate_with_roll_up") {
O_COMMENT VARCHAR(79) NOT NULL
)
DUPLICATE KEY(o_orderkey, o_custkey)
PARTITION BY RANGE(o_orderdate) (PARTITION `day_2` VALUES LESS THAN ('2023-12-30'))
PARTITION BY RANGE(o_orderdate) (
PARTITION `day_2` VALUES LESS THAN ('2023-12-9'),
PARTITION `day_3` VALUES LESS THAN ("2023-12-11"),
PARTITION `day_4` VALUES LESS THAN ("2023-12-30")
)
DISTRIBUTED BY HASH(o_orderkey) BUCKETS 3
PROPERTIES (
"replication_num" = "1"
Expand Down Expand Up @@ -73,7 +77,10 @@ suite("aggregate_with_roll_up") {
l_comment VARCHAR(44) NOT NULL
)
DUPLICATE KEY(l_orderkey, l_partkey, l_suppkey, l_linenumber)
PARTITION BY RANGE(l_shipdate) (PARTITION `day_1` VALUES LESS THAN ('2023-12-30'))
PARTITION BY RANGE(l_shipdate) (
PARTITION `day_1` VALUES LESS THAN ('2023-12-9'),
PARTITION `day_2` VALUES LESS THAN ("2023-12-11"),
PARTITION `day_3` VALUES LESS THAN ("2023-12-30"))
DISTRIBUTED BY HASH(l_orderkey) BUCKETS 3
PROPERTIES (
"replication_num" = "1"
Expand Down Expand Up @@ -144,6 +151,26 @@ suite("aggregate_with_roll_up") {
}
}

def check_rewrite_with_mv_partition = { mv_sql, query_sql, mv_name, partition_column ->

sql """DROP MATERIALIZED VIEW IF EXISTS ${mv_name}"""
sql"""
CREATE MATERIALIZED VIEW ${mv_name}
BUILD IMMEDIATE REFRESH COMPLETE ON MANUAL
PARTITION BY (${partition_column})
DISTRIBUTED BY RANDOM BUCKETS 2
PROPERTIES ('replication_num' = '1')
AS ${mv_sql}
"""

def job_name = getJobName(db, mv_name);
waitingMTMVTaskFinished(job_name)
explain {
sql("${query_sql}")
contains "(${mv_name})"
}
}

def check_rewrite_with_force_analyze = { mv_sql, query_sql, mv_name ->

sql """DROP MATERIALIZED VIEW IF EXISTS ${mv_name}"""
Expand Down Expand Up @@ -283,7 +310,7 @@ suite("aggregate_with_roll_up") {
"l_partkey, " +
"l_suppkey"
order_qt_query15_0_before "${query15_0}"
check_rewrite(mv15_0, query15_0, "mv15_0")
check_rewrite_with_mv_partition(mv15_0, query15_0, "mv15_0", "l_shipdate")
order_qt_query15_0_after "${query15_0}"
sql """ DROP MATERIALIZED VIEW IF EXISTS mv15_0"""

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,15 @@ suite("aggregate_without_roll_up") {
o_comment VARCHAR(79) NOT NULL
)
DUPLICATE KEY(o_orderkey, o_custkey)
PARTITION BY RANGE(o_orderdate) (PARTITION `day_2` VALUES LESS THAN ('2023-12-30'))
PARTITION BY RANGE(o_orderdate) (
PARTITION `day_2` VALUES LESS THAN ('2023-12-9'),
PARTITION `day_3` VALUES LESS THAN ("2023-12-11"),
PARTITION `day_4` VALUES LESS THAN ("2023-12-30")
)
DISTRIBUTED BY HASH(o_orderkey) BUCKETS 3
PROPERTIES (
"replication_num" = "1"
)
);
"""

sql """
Expand All @@ -74,7 +78,11 @@ suite("aggregate_without_roll_up") {
l_comment VARCHAR(44) NOT NULL
)
DUPLICATE KEY(l_orderkey, l_partkey, l_suppkey, l_linenumber)
PARTITION BY RANGE(l_shipdate) (PARTITION `day_1` VALUES LESS THAN ('2023-12-30'))
PARTITION BY RANGE(l_shipdate) (
PARTITION `day_2` VALUES LESS THAN ('2023-12-9'),
PARTITION `day_3` VALUES LESS THAN ("2023-12-11"),
PARTITION `day_4` VALUES LESS THAN ("2023-12-30")
)
DISTRIBUTED BY HASH(l_orderkey) BUCKETS 3
PROPERTIES (
"replication_num" = "1"
Expand All @@ -100,7 +108,8 @@ suite("aggregate_without_roll_up") {
)
"""

sql """ insert into lineitem values
sql """
insert into lineitem values
(1, 2, 3, 4, 5.5, 6.5, 7.5, 8.5, 'o', 'k', '2023-12-08', '2023-12-09', '2023-12-10', 'a', 'b', 'yyyyyyyyy'),
(2, 4, 3, 4, 5.5, 6.5, 7.5, 8.5, 'o', 'k', '2023-12-09', '2023-12-09', '2023-12-10', 'a', 'b', 'yyyyyyyyy'),
(3, 2, 4, 4, 5.5, 6.5, 7.5, 8.5, 'o', 'k', '2023-12-10', '2023-12-09', '2023-12-10', 'a', 'b', 'yyyyyyyyy'),
Expand Down
Loading