Skip to content

Commit

Permalink
insert into value
Browse files Browse the repository at this point in the history
  • Loading branch information
sollhui committed Jan 1, 2024
1 parent 01f99d4 commit f86570c
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 0 deletions.
9 changes: 9 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
Original file line number Diff line number Diff line change
Expand Up @@ -408,6 +408,10 @@ public int getScanRangeNum() {
return scanRangeNum;
}

public TQueryOptions getQueryOptions() {
return this.queryOptions;
}

public void setQueryId(TUniqueId queryId) {
this.queryId = queryId;
}
Expand Down Expand Up @@ -633,18 +637,23 @@ private void execInternal() throws Exception {
DebugUtil.printId(queryId), fragments.get(0).toThrift());
}

LOG.info("test 222");
processFragmentAssignmentAndParams();
LOG.info("test 222");

traceInstance();
LOG.info("test 222");

QeProcessorImpl.INSTANCE.registerInstances(queryId, instanceIds.size());
LOG.info("test 222");

// create result receiver
PlanFragmentId topId = fragments.get(0).getFragmentId();
FragmentExecParams topParams = fragmentExecParamsMap.get(topId);
DataSink topDataSink = topParams.fragment.getSink();
this.timeoutDeadline = System.currentTimeMillis() + queryOptions.getExecutionTimeout() * 1000L;
if (topDataSink instanceof ResultSink || topDataSink instanceof ResultFileSink) {
LOG.info("test 222");
TNetworkAddress execBeAddr = topParams.instanceExecParams.get(0).host;
receiver = new ResultReceiver(queryId, topParams.instanceExecParams.get(0).instanceId,
addressToBackendID.get(execBeAddr), toBrpcHost(execBeAddr), this.timeoutDeadline);
Expand Down
15 changes: 15 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.PrimitiveType;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.catalog.Table;
Expand Down Expand Up @@ -1226,6 +1227,14 @@ private void analyzeAndGenerateQueryPlan(TQueryOptions tQueryOptions) throws Use
}
profile.getSummaryProfile().setQueryAnalysisFinishTime();
planner = new OriginalPlanner(analyzer);
if (parsedStmt instanceof InsertStmt) {
boolean isEnableMemtableOnSinkNode =
! ((OlapTable) ((InsertStmt) parsedStmt).getTargetTable())
.getTableProperty()
.getUseSchemaLightChange()
? false : tQueryOptions.isEnableMemtableOnSinkNode();
tQueryOptions.setEnableMemtableOnSinkNode(isEnableMemtableOnSinkNode);
}
if (parsedStmt instanceof QueryStmt || parsedStmt instanceof InsertStmt) {
planner.plan(parsedStmt, tQueryOptions);
}
Expand Down Expand Up @@ -1996,6 +2005,12 @@ private void handleInsertStmt() throws Exception {

QeProcessorImpl.INSTANCE.registerQuery(context.queryId(), coord);

boolean isEnableMemtableOnSinkNode =
! ((OlapTable) insertStmt.getTargetTable())
.getTableProperty()
.getUseSchemaLightChange()
? false : coord.getQueryOptions().isEnableMemtableOnSinkNode();
coord.getQueryOptions().setEnableMemtableOnSinkNode(isEnableMemtableOnSinkNode);
coord.exec();
int execTimeout = context.getExecTimeout();
LOG.debug("Insert {} execution timeout:{}", DebugUtil.printId(context.queryId()), execTimeout);
Expand Down

0 comments on commit f86570c

Please sign in to comment.