Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix](ctx) manager the lifecycle of connection context #29346

Merged
merged 3 commits into from
Jan 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions be/src/runtime/group_commit_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,8 @@ Status GroupCommitTable::_create_group_commit_load(
request.__set_token("group_commit"); // this is a fake, fe not check it now
request.__set_max_filter_ratio(1.0);
request.__set_strictMode(false);
// this is an internal interface, use admin to pass the auth check
request.__set_user("admin");
if (_exec_env->master_info()->__isset.backend_id) {
request.__set_backend_id(_exec_env->master_info()->backend_id);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.doris.thrift.TScanRangeLocations;

import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
Expand Down Expand Up @@ -122,9 +123,13 @@ public void init(List<String> preLocations) throws UserException {
Set<Tag> tags = Sets.newHashSet();
if (ConnectContext.get() != null && ConnectContext.get().getCurrentUserIdentity() != null) {
String qualifiedUser = ConnectContext.get().getCurrentUserIdentity().getQualifiedUser();
tags = Env.getCurrentEnv().getAuth().getResourceTags(qualifiedUser);
if (tags == UserProperty.INVALID_RESOURCE_TAGS) {
throw new UserException("No valid resource tag for user: " + qualifiedUser);
// Some request from stream load(eg, mysql load) may not set user info in ConnectContext
// just ignore it.
if (!Strings.isNullOrEmpty(qualifiedUser)) {
tags = Env.getCurrentEnv().getAuth().getResourceTags(qualifiedUser);
if (tags == UserProperty.INVALID_RESOURCE_TAGS) {
throw new UserException("No valid resource tag for user: " + qualifiedUser);
}
}
} else {
LOG.debug("user info in ExternalFileScanNode should not be null, add log to observer");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1786,9 +1786,19 @@ public TStreamLoadPutResult streamLoadPut(TStreamLoadPutRequest request) {
TStreamLoadPutResult result = new TStreamLoadPutResult();
TStatus status = new TStatus(TStatusCode.OK);
result.setStatus(status);

// create connect context
ConnectContext ctx = new ConnectContext();
ctx.setEnv(Env.getCurrentEnv());
ctx.setQueryId(request.getLoadId());
ctx.setCurrentUserIdentity(UserIdentity.createAnalyzedUserIdentWithIp(request.getUser(), "%"));
ctx.setQualifiedUser(request.getUser());
ctx.setBackendId(request.getBackendId());
ctx.setThreadLocalInfo();

try {
if (!Strings.isNullOrEmpty(request.getLoadSql())) {
httpStreamPutImpl(request, result);
httpStreamPutImpl(request, result, ctx);
return result;
} else {
if (Config.enable_pipeline_load) {
Expand All @@ -1806,6 +1816,8 @@ public TStreamLoadPutResult streamLoadPut(TStreamLoadPutRequest request) {
status.setStatusCode(TStatusCode.INTERNAL_ERROR);
status.addToErrorMsgs(e.getClass().getSimpleName() + ": " + Strings.nullToEmpty(e.getMessage()));
return result;
} finally {
ConnectContext.remove();
}
return result;
}
Expand Down Expand Up @@ -1917,25 +1929,19 @@ public TStreamLoadMultiTablePutResult streamLoadMultiTablePut(TStreamLoadPutRequ
return result;
}


private void httpStreamPutImpl(TStreamLoadPutRequest request, TStreamLoadPutResult result)
private void httpStreamPutImpl(TStreamLoadPutRequest request, TStreamLoadPutResult result, ConnectContext ctx)
throws UserException {
LOG.info("receive http stream put request");
if (LOG.isDebugEnabled()) {
LOG.debug("receive http stream put request: {}", request);
}
String originStmt = request.getLoadSql();
ConnectContext ctx = new ConnectContext();
if (request.isSetAuthCode()) {
// TODO(cmy): find a way to check
} else if (Strings.isNullOrEmpty(request.getToken())) {
checkSingleTablePasswordAndPrivs(request.getUser(), request.getPasswd(), request.getDb(),
request.getTbl(),
request.getUserIp(), PrivPredicate.LOAD);
}
ctx.setEnv(Env.getCurrentEnv());
ctx.setQueryId(request.getLoadId());
ctx.setCurrentUserIdentity(UserIdentity.ROOT);
ctx.setQualifiedUser(UserIdentity.ROOT.getQualifiedUser());
ctx.setBackendId(request.getBackendId());
ctx.setThreadLocalInfo();
SqlScanner input = new SqlScanner(new StringReader(originStmt), ctx.getSessionVariable().getSqlMode());
SqlParser parser = new SqlParser(input);
try {
Expand Down Expand Up @@ -2859,16 +2865,11 @@ private TRestoreSnapshotResult restoreSnapshotImpl(TRestoreSnapshotRequest reque
restoreStmt.setIsBeingSynced();
LOG.trace("restore snapshot info, restoreStmt: {}", restoreStmt);
try {
ConnectContext ctx = ConnectContext.get();
if (ctx == null) {
ctx = new ConnectContext();
ctx.setThreadLocalInfo();
}
ConnectContext ctx = new ConnectContext();
ctx.setQualifiedUser(request.getUser());
String fullUserName = ClusterNamespace.getNameFromFullName(request.getUser());
UserIdentity currentUserIdentity = new UserIdentity(fullUserName, "%");
ctx.setCurrentUserIdentity(currentUserIdentity);

ctx.setCurrentUserIdentity(UserIdentity.createAnalyzedUserIdentWithIp(fullUserName, "%"));
ctx.setThreadLocalInfo();
Analyzer analyzer = new Analyzer(ctx.getEnv(), ctx);
restoreStmt.analyze(analyzer);
DdlExecutor.execute(Env.getCurrentEnv(), restoreStmt);
Expand All @@ -2880,6 +2881,8 @@ private TRestoreSnapshotResult restoreSnapshotImpl(TRestoreSnapshotRequest reque
LOG.warn("catch unknown result.", e);
status.setStatusCode(TStatusCode.INTERNAL_ERROR);
status.addToErrorMsgs(Strings.nullToEmpty(e.getMessage()));
} finally {
ConnectContext.remove();
}

return result;
Expand Down
Loading