Skip to content

Commit

Permalink
[fix](ctx) manager the lifecycle of connection context (#29346)
Browse files Browse the repository at this point in the history
In FrontendService, we may create some connection context and set it as a thread local varaible.
These context should be removed from thread local after call.
Otherwise, it may be reused by other thread incorrectly.
  • Loading branch information
morningman authored Jan 1, 2024
1 parent 738abac commit 4cbbd25
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 22 deletions.
2 changes: 2 additions & 0 deletions be/src/runtime/group_commit_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,8 @@ Status GroupCommitTable::_create_group_commit_load(
request.__set_token("group_commit"); // this is a fake, fe not check it now
request.__set_max_filter_ratio(1.0);
request.__set_strictMode(false);
// this is an internal interface, use admin to pass the auth check
request.__set_user("admin");
if (_exec_env->master_info()->__isset.backend_id) {
request.__set_backend_id(_exec_env->master_info()->backend_id);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.doris.thrift.TScanRangeLocations;

import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
Expand Down Expand Up @@ -122,9 +123,13 @@ public void init(List<String> preLocations) throws UserException {
Set<Tag> tags = Sets.newHashSet();
if (ConnectContext.get() != null && ConnectContext.get().getCurrentUserIdentity() != null) {
String qualifiedUser = ConnectContext.get().getCurrentUserIdentity().getQualifiedUser();
tags = Env.getCurrentEnv().getAuth().getResourceTags(qualifiedUser);
if (tags == UserProperty.INVALID_RESOURCE_TAGS) {
throw new UserException("No valid resource tag for user: " + qualifiedUser);
// Some request from stream load(eg, mysql load) may not set user info in ConnectContext
// just ignore it.
if (!Strings.isNullOrEmpty(qualifiedUser)) {
tags = Env.getCurrentEnv().getAuth().getResourceTags(qualifiedUser);
if (tags == UserProperty.INVALID_RESOURCE_TAGS) {
throw new UserException("No valid resource tag for user: " + qualifiedUser);
}
}
} else {
LOG.debug("user info in ExternalFileScanNode should not be null, add log to observer");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1786,9 +1786,19 @@ public TStreamLoadPutResult streamLoadPut(TStreamLoadPutRequest request) {
TStreamLoadPutResult result = new TStreamLoadPutResult();
TStatus status = new TStatus(TStatusCode.OK);
result.setStatus(status);

// create connect context
ConnectContext ctx = new ConnectContext();
ctx.setEnv(Env.getCurrentEnv());
ctx.setQueryId(request.getLoadId());
ctx.setCurrentUserIdentity(UserIdentity.createAnalyzedUserIdentWithIp(request.getUser(), "%"));
ctx.setQualifiedUser(request.getUser());
ctx.setBackendId(request.getBackendId());
ctx.setThreadLocalInfo();

try {
if (!Strings.isNullOrEmpty(request.getLoadSql())) {
httpStreamPutImpl(request, result);
httpStreamPutImpl(request, result, ctx);
return result;
} else {
if (Config.enable_pipeline_load) {
Expand All @@ -1806,6 +1816,8 @@ public TStreamLoadPutResult streamLoadPut(TStreamLoadPutRequest request) {
status.setStatusCode(TStatusCode.INTERNAL_ERROR);
status.addToErrorMsgs(e.getClass().getSimpleName() + ": " + Strings.nullToEmpty(e.getMessage()));
return result;
} finally {
ConnectContext.remove();
}
return result;
}
Expand Down Expand Up @@ -1917,25 +1929,19 @@ public TStreamLoadMultiTablePutResult streamLoadMultiTablePut(TStreamLoadPutRequ
return result;
}


private void httpStreamPutImpl(TStreamLoadPutRequest request, TStreamLoadPutResult result)
private void httpStreamPutImpl(TStreamLoadPutRequest request, TStreamLoadPutResult result, ConnectContext ctx)
throws UserException {
LOG.info("receive http stream put request");
if (LOG.isDebugEnabled()) {
LOG.debug("receive http stream put request: {}", request);
}
String originStmt = request.getLoadSql();
ConnectContext ctx = new ConnectContext();
if (request.isSetAuthCode()) {
// TODO(cmy): find a way to check
} else if (Strings.isNullOrEmpty(request.getToken())) {
checkSingleTablePasswordAndPrivs(request.getUser(), request.getPasswd(), request.getDb(),
request.getTbl(),
request.getUserIp(), PrivPredicate.LOAD);
}
ctx.setEnv(Env.getCurrentEnv());
ctx.setQueryId(request.getLoadId());
ctx.setCurrentUserIdentity(UserIdentity.ROOT);
ctx.setQualifiedUser(UserIdentity.ROOT.getQualifiedUser());
ctx.setBackendId(request.getBackendId());
ctx.setThreadLocalInfo();
SqlScanner input = new SqlScanner(new StringReader(originStmt), ctx.getSessionVariable().getSqlMode());
SqlParser parser = new SqlParser(input);
try {
Expand Down Expand Up @@ -2859,16 +2865,11 @@ private TRestoreSnapshotResult restoreSnapshotImpl(TRestoreSnapshotRequest reque
restoreStmt.setIsBeingSynced();
LOG.trace("restore snapshot info, restoreStmt: {}", restoreStmt);
try {
ConnectContext ctx = ConnectContext.get();
if (ctx == null) {
ctx = new ConnectContext();
ctx.setThreadLocalInfo();
}
ConnectContext ctx = new ConnectContext();
ctx.setQualifiedUser(request.getUser());
String fullUserName = ClusterNamespace.getNameFromFullName(request.getUser());
UserIdentity currentUserIdentity = new UserIdentity(fullUserName, "%");
ctx.setCurrentUserIdentity(currentUserIdentity);

ctx.setCurrentUserIdentity(UserIdentity.createAnalyzedUserIdentWithIp(fullUserName, "%"));
ctx.setThreadLocalInfo();
Analyzer analyzer = new Analyzer(ctx.getEnv(), ctx);
restoreStmt.analyze(analyzer);
DdlExecutor.execute(Env.getCurrentEnv(), restoreStmt);
Expand All @@ -2880,6 +2881,8 @@ private TRestoreSnapshotResult restoreSnapshotImpl(TRestoreSnapshotRequest reque
LOG.warn("catch unknown result.", e);
status.setStatusCode(TStatusCode.INTERNAL_ERROR);
status.addToErrorMsgs(Strings.nullToEmpty(e.getMessage()));
} finally {
ConnectContext.remove();
}

return result;
Expand Down

0 comments on commit 4cbbd25

Please sign in to comment.