From 4cbbd25d8c9f0a4ef8178fb38f49738c6ec45963 Mon Sep 17 00:00:00 2001 From: Mingyu Chen Date: Mon, 1 Jan 2024 23:32:28 +0800 Subject: [PATCH] [fix](ctx) manager the lifecycle of connection context (#29346) In FrontendService, we may create some connection context and set it as a thread local varaible. These context should be removed from thread local after call. Otherwise, it may be reused by other thread incorrectly. --- be/src/runtime/group_commit_mgr.cpp | 2 + .../external/FederationBackendPolicy.java | 11 +++-- .../doris/service/FrontendServiceImpl.java | 41 ++++++++++--------- 3 files changed, 32 insertions(+), 22 deletions(-) diff --git a/be/src/runtime/group_commit_mgr.cpp b/be/src/runtime/group_commit_mgr.cpp index acb40ae6c78661..00b3a559fbf6ef 100644 --- a/be/src/runtime/group_commit_mgr.cpp +++ b/be/src/runtime/group_commit_mgr.cpp @@ -235,6 +235,8 @@ Status GroupCommitTable::_create_group_commit_load( request.__set_token("group_commit"); // this is a fake, fe not check it now request.__set_max_filter_ratio(1.0); request.__set_strictMode(false); + // this is an internal interface, use admin to pass the auth check + request.__set_user("admin"); if (_exec_env->master_info()->__isset.backend_id) { request.__set_backend_id(_exec_env->master_info()->backend_id); } else { diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FederationBackendPolicy.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FederationBackendPolicy.java index d1d7e90d35ad16..ade03291c30552 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FederationBackendPolicy.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FederationBackendPolicy.java @@ -30,6 +30,7 @@ import org.apache.doris.thrift.TScanRangeLocations; import com.google.common.base.Preconditions; +import com.google.common.base.Strings; import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; @@ -122,9 +123,13 @@ public void init(List preLocations) throws UserException { Set tags = Sets.newHashSet(); if (ConnectContext.get() != null && ConnectContext.get().getCurrentUserIdentity() != null) { String qualifiedUser = ConnectContext.get().getCurrentUserIdentity().getQualifiedUser(); - tags = Env.getCurrentEnv().getAuth().getResourceTags(qualifiedUser); - if (tags == UserProperty.INVALID_RESOURCE_TAGS) { - throw new UserException("No valid resource tag for user: " + qualifiedUser); + // Some request from stream load(eg, mysql load) may not set user info in ConnectContext + // just ignore it. + if (!Strings.isNullOrEmpty(qualifiedUser)) { + tags = Env.getCurrentEnv().getAuth().getResourceTags(qualifiedUser); + if (tags == UserProperty.INVALID_RESOURCE_TAGS) { + throw new UserException("No valid resource tag for user: " + qualifiedUser); + } } } else { LOG.debug("user info in ExternalFileScanNode should not be null, add log to observer"); diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java index e5b0f651016ce9..136084f5f1c012 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java @@ -1786,9 +1786,19 @@ public TStreamLoadPutResult streamLoadPut(TStreamLoadPutRequest request) { TStreamLoadPutResult result = new TStreamLoadPutResult(); TStatus status = new TStatus(TStatusCode.OK); result.setStatus(status); + + // create connect context + ConnectContext ctx = new ConnectContext(); + ctx.setEnv(Env.getCurrentEnv()); + ctx.setQueryId(request.getLoadId()); + ctx.setCurrentUserIdentity(UserIdentity.createAnalyzedUserIdentWithIp(request.getUser(), "%")); + ctx.setQualifiedUser(request.getUser()); + ctx.setBackendId(request.getBackendId()); + ctx.setThreadLocalInfo(); + try { if (!Strings.isNullOrEmpty(request.getLoadSql())) { - httpStreamPutImpl(request, result); + httpStreamPutImpl(request, result, ctx); return result; } else { if (Config.enable_pipeline_load) { @@ -1806,6 +1816,8 @@ public TStreamLoadPutResult streamLoadPut(TStreamLoadPutRequest request) { status.setStatusCode(TStatusCode.INTERNAL_ERROR); status.addToErrorMsgs(e.getClass().getSimpleName() + ": " + Strings.nullToEmpty(e.getMessage())); return result; + } finally { + ConnectContext.remove(); } return result; } @@ -1917,12 +1929,12 @@ public TStreamLoadMultiTablePutResult streamLoadMultiTablePut(TStreamLoadPutRequ return result; } - - private void httpStreamPutImpl(TStreamLoadPutRequest request, TStreamLoadPutResult result) + private void httpStreamPutImpl(TStreamLoadPutRequest request, TStreamLoadPutResult result, ConnectContext ctx) throws UserException { - LOG.info("receive http stream put request"); + if (LOG.isDebugEnabled()) { + LOG.debug("receive http stream put request: {}", request); + } String originStmt = request.getLoadSql(); - ConnectContext ctx = new ConnectContext(); if (request.isSetAuthCode()) { // TODO(cmy): find a way to check } else if (Strings.isNullOrEmpty(request.getToken())) { @@ -1930,12 +1942,6 @@ private void httpStreamPutImpl(TStreamLoadPutRequest request, TStreamLoadPutResu request.getTbl(), request.getUserIp(), PrivPredicate.LOAD); } - ctx.setEnv(Env.getCurrentEnv()); - ctx.setQueryId(request.getLoadId()); - ctx.setCurrentUserIdentity(UserIdentity.ROOT); - ctx.setQualifiedUser(UserIdentity.ROOT.getQualifiedUser()); - ctx.setBackendId(request.getBackendId()); - ctx.setThreadLocalInfo(); SqlScanner input = new SqlScanner(new StringReader(originStmt), ctx.getSessionVariable().getSqlMode()); SqlParser parser = new SqlParser(input); try { @@ -2859,16 +2865,11 @@ private TRestoreSnapshotResult restoreSnapshotImpl(TRestoreSnapshotRequest reque restoreStmt.setIsBeingSynced(); LOG.trace("restore snapshot info, restoreStmt: {}", restoreStmt); try { - ConnectContext ctx = ConnectContext.get(); - if (ctx == null) { - ctx = new ConnectContext(); - ctx.setThreadLocalInfo(); - } + ConnectContext ctx = new ConnectContext(); ctx.setQualifiedUser(request.getUser()); String fullUserName = ClusterNamespace.getNameFromFullName(request.getUser()); - UserIdentity currentUserIdentity = new UserIdentity(fullUserName, "%"); - ctx.setCurrentUserIdentity(currentUserIdentity); - + ctx.setCurrentUserIdentity(UserIdentity.createAnalyzedUserIdentWithIp(fullUserName, "%")); + ctx.setThreadLocalInfo(); Analyzer analyzer = new Analyzer(ctx.getEnv(), ctx); restoreStmt.analyze(analyzer); DdlExecutor.execute(Env.getCurrentEnv(), restoreStmt); @@ -2880,6 +2881,8 @@ private TRestoreSnapshotResult restoreSnapshotImpl(TRestoreSnapshotRequest reque LOG.warn("catch unknown result.", e); status.setStatusCode(TStatusCode.INTERNAL_ERROR); status.addToErrorMsgs(Strings.nullToEmpty(e.getMessage())); + } finally { + ConnectContext.remove(); } return result;