Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[TEST]3.2.15 test #56347

Open
wants to merge 6 commits into
base: branch-3.2
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 10 additions & 5 deletions be/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ if (CMAKE_CXX_COMPILER_ID STREQUAL "GNU")
message(STATUS "GCC version ${CMAKE_CXX_COMPILER_VERSION} is greater than 10.3.0, disable -Werror. Be careful with compile warnings.")
else()
# -Werror: compile warnings should be errors when using the toolchain compiler.
set(CXX_GCC_FLAGS "${CXX_GCC_FLAGS} -Werror")
#set(CXX_GCC_FLAGS "${CXX_GCC_FLAGS} -Werror")
endif()
elseif (CMAKE_CXX_COMPILER_ID STREQUAL "Clang")
elseif (NOT APPLE)
Expand Down Expand Up @@ -615,11 +615,16 @@ endif()
set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} -fno-sized-deallocation")

if (CMAKE_CXX_COMPILER_ID STREQUAL "Clang")
if (NOT ${MAKE_TEST} STREQUAL "ON")
# there are too many warnings reported by clang in be/test and it will take some time to fix.
# temporarily disable Werror in the unit test so that clang can compile.
#set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} -Werror")
endif()
set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} -Wno-unused-parameter -Wno-documentation -Wno-weak-vtables")
# Turn on following warning as error explicitly
set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} -Werror=string-plus-int")
set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} -Werror=pessimizing-move")
set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} -Werror=delete-non-virtual-dtor")
#set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} -Werror=string-plus-int")
#set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} -Werror=pessimizing-move")
#set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} -Werror=delete-non-virtual-dtor")
if (CMAKE_CXX_COMPILER_VERSION VERSION_GREATER_EQUAL "11.0.0")
set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} -Wno-reserved-identifier -Wno-suggest-destructor-override")
endif()
Expand All @@ -638,7 +643,7 @@ endif()

set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} -DBOOST_DATE_TIME_POSIX_TIME_STD_CONFIG")
set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} -DBOOST_SYSTEM_NO_DEPRECATED -DBOOST_UUID_RANDOM_PROVIDER_FORCE_POSIX")
set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} -Werror=return-type -Werror=switch")
#set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} -Werror=return-type -Werror=switch")
if (${USE_STAROS} STREQUAL "ON")
set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} -DUSE_STAROS")
endif()
Expand Down
2 changes: 2 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -1317,4 +1317,6 @@ CONF_mInt32(max_committed_without_schema_rowset, "1000");

CONF_mInt32(apply_version_slow_log_sec, "30");

CONF_mInt32(upsert_replay_times, "1000");

} // namespace starrocks::config
4 changes: 2 additions & 2 deletions be/src/formats/orc/apache-orc/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -127,12 +127,12 @@ if (CMAKE_CXX_COMPILER_ID STREQUAL "Clang")
set (WARN_FLAGS "${WARN_FLAGS} -Wno-c++2a-compat")
endif ()
if (STOP_BUILD_ON_WARNING)
set (WARN_FLAGS "${WARN_FLAGS} -Werror")
#set (WARN_FLAGS "${WARN_FLAGS} -Werror")
endif ()
elseif (CMAKE_CXX_COMPILER_ID STREQUAL "GNU")
set (WARN_FLAGS "-Wall -Wno-unknown-pragmas -Wno-conversion")
if (STOP_BUILD_ON_WARNING)
set (WARN_FLAGS "${WARN_FLAGS} -Werror")
#set (WARN_FLAGS "${WARN_FLAGS} -Werror")
endif ()
if (CMAKE_CXX_COMPILER_VERSION STREQUAL "" OR
CMAKE_CXX_COMPILER_VERSION VERSION_LESS "4.7")
Expand Down
43 changes: 43 additions & 0 deletions be/src/storage/persistent_index.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -826,6 +826,24 @@ class FixedMutableIndex : public MutableIndex {
return Status::OK();
}

Status upsert() {
LOG(INFO) << "FixedMutableIndex call upsert";
uint8_t new_data[8] = {0x79, 0x15, 0xdb, 0xea, 0x36, 0x00, 0x00, 0x00};
const auto& key = *reinterpret_cast<const KeyType*>(new_data);
uint64_t hash = FixedKeyHash<KeySize>()(key);
int32_t replay_times = config::upsert_replay_times;
LOG(INFO) << "start upsert, hashval: " << hash << ", replay_times: " << replay_times;

for (int i = 0; i < replay_times; i++) {
if (auto [it, inserted] = _map.emplace_with_hash(hash, key, NullIndexValue); inserted) {
LOG(INFO) << "upsert success: " << i;
} else {
LOG(INFO) << "upsert failed: " << i;
}
}
return Status::OK();
}

Status upsert(const Slice* keys, const IndexValue* values, IndexValue* old_values, KeysInfo* not_found,
size_t* num_found, const std::vector<size_t>& idxes) override {
TRY_CATCH_BAD_ALLOC({
Expand Down Expand Up @@ -1141,6 +1159,11 @@ class SliceMutableIndex : public MutableIndex {
return Status::OK();
}

Status upsert() {
LOG(INFO) << "SliceMutableIndex call upsert";
return Status::OK();
}

Status upsert(const Slice* keys, const IndexValue* values, IndexValue* old_values, KeysInfo* not_found,
size_t* num_found, const std::vector<size_t>& idxes) override {
TRY_CATCH_BAD_ALLOC({
Expand Down Expand Up @@ -5317,4 +5340,24 @@ void PersistentIndex::test_force_dump() {
_dump_snapshot = true;
}

Status PersistentIndex::replay_pk_upsert() {
LOG(INFO) << "replay pk upsert 1";
auto st = MutableIndex::create(8);
LOG(INFO) << "replay pk upsert 2";
if (!st.ok()) {
return st.status();
}
LOG(INFO) << "replay pk upsert 3";
auto l0 = std::move(st).value();
LOG(INFO) << "replay pk upsert 4";
std::string index_file_name = "/home/disk1/sr/be/index.l0.387732.0";
phmap::BinaryInputArchive ar(index_file_name.data());
LOG(INFO) << "replay pk upsert 5";
l0->load_snapshot(ar);
LOG(INFO) << "replay pk upsert 6";
l0->upsert();
LOG(INFO) << "replay pk upsert 7";
return Status::OK();
}

} // namespace starrocks
4 changes: 4 additions & 0 deletions be/src/storage/persistent_index.h
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,8 @@ class MutableIndex {
virtual Status get(const Slice* keys, IndexValue* values, KeysInfo* not_found, size_t* num_found,
const std::vector<size_t>& idxes) const = 0;

virtual Status upsert() = 0;

// batch upsert and get old value
// |keys|: key array as raw buffer
// |values|: value array
Expand Down Expand Up @@ -812,6 +814,8 @@ class PersistentIndex {

void test_force_dump();

static Status replay_pk_upsert();

protected:
Status _delete_expired_index_file(const EditVersion& l0_version, const EditVersion& l1_version,
const EditVersionWithMerge& min_l2_version);
Expand Down
21 changes: 18 additions & 3 deletions be/src/tools/meta_tool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
#include "storage/olap_common.h"
#include "storage/olap_define.h"
#include "storage/options.h"
#include "storage/persistent_index.h"
#include "storage/primary_key_dump.h"
#include "storage/rowset/binary_plain_page.h"
#include "storage/rowset/column_iterator.h"
Expand Down Expand Up @@ -98,7 +99,7 @@ DEFINE_string(root_path, "", "storage root path");
DEFINE_string(operation, "",
"valid operation: get_meta, flag, load_meta, delete_meta, delete_rowset_meta, get_persistent_index_meta, "
"delete_persistent_index_meta, show_meta, check_table_meta_consistency, print_lake_metadata, "
"print_lake_txn_log, print_lake_schema");
"print_lake_txn_log, print_lake_schema, replay_pk_upsert");
DEFINE_int64(tablet_id, 0, "tablet_id for tablet meta");
DEFINE_string(tablet_uid, "", "tablet_uid for tablet meta");
DEFINE_int64(table_id, 0, "table id for table meta");
Expand Down Expand Up @@ -171,6 +172,8 @@ std::string get_usage(const std::string& progname) {
cat <tablet_schema_file> | {progname} --operation=print_lake_schema
lake_datafile_gc:
{progname} --operation=lake_datafile_gc --root_path=<path> --expired_sec=<86400> --conf_file=<path> --audit_file=<path> --do_delete=<true|false>
replay_pk_upsert:
{progname} --operation=replay_pk_upsert
)";
return fmt::format(usage_msg, fmt::arg("progname", progname));
}
Expand Down Expand Up @@ -287,6 +290,15 @@ void get_persistent_index_meta(DataDir* data_dir) {
std::cout << json << '\n';
}

void replay_pk_upsert() {
auto st = starrocks::PersistentIndex::replay_pk_upsert();
if (!st.ok()) {
std::cerr << "replay pk upsert failed, status: " << st.to_string() << std::endl;
} else {
std::cout << "replay pk upsert success";
}
}

void delete_persistent_index_meta(DataDir* data_dir) {
if (FLAGS_table_id != 0) {
auto st = TabletMetaManager::remove_table_persistent_index_meta(data_dir, FLAGS_table_id);
Expand Down Expand Up @@ -1234,7 +1246,8 @@ int meta_tool_main(int argc, char** argv) {
"get_meta_stats",
"ls",
"check_table_meta_consistency",
"scan_dcgs"};
"scan_dcgs",
"replay_pk_upsert"};
if (valid_operations.find(FLAGS_operation) == valid_operations.end()) {
std::cout << "invalid operation: " << FLAGS_operation << std::endl << std::endl;
show_usage();
Expand All @@ -1244,7 +1257,7 @@ int meta_tool_main(int argc, char** argv) {
bool read_only = false;
if (FLAGS_operation == "get_meta" || FLAGS_operation == "get_meta_stats" || FLAGS_operation == "ls" ||
FLAGS_operation == "check_table_meta_consistency" || FLAGS_operation == "scan_dcgs" ||
FLAGS_operation == "get_persistent_index_meta") {
FLAGS_operation == "get_persistent_index_meta" || FLAGS_operation == "replay_pk_upsert") {
read_only = true;
}

Expand Down Expand Up @@ -1281,6 +1294,8 @@ int meta_tool_main(int argc, char** argv) {
check_meta_consistency(data_dir.get());
} else if (FLAGS_operation == "scan_dcgs") {
scan_dcgs(data_dir.get());
} else if (FLAGS_operation == "replay_pk_upsert") {
replay_pk_upsert();
} else {
std::cout << "invalid operation: " << FLAGS_operation << std::endl << std::endl;
show_usage();
Expand Down
7 changes: 6 additions & 1 deletion fe/fe-core/src/main/java/com/starrocks/load/DeleteMgr.java
Original file line number Diff line number Diff line change
Expand Up @@ -303,8 +303,11 @@ private List<String> partitionPruneForDelete(DeleteStmt stmt, OlapTable table) {
String predicate = stmt.getWherePredicate().toSql();
String fakeSql = String.format("SELECT * FROM %s WHERE %s", tableName, predicate);
PhysicalOlapScanOperator physicalOlapScanOperator;
ConnectContext currentSession = ConnectContext.get();
try {
List<StatementBase> parse = SqlParser.parse(fakeSql, ConnectContext.get().getSessionVariable());
// Bypass the privilege check, as current user may have only the DELETE privilege but not SELECT
currentSession.setBypassAuthorizerCheck(true);
List<StatementBase> parse = SqlParser.parse(fakeSql, currentSession.getSessionVariable());
StatementBase selectStmt = parse.get(0);
Analyzer.analyze(selectStmt, ConnectContext.get());
ExecPlan plan = StatementPlanner.plan(selectStmt, ConnectContext.get());
Expand All @@ -318,6 +321,8 @@ private List<String> partitionPruneForDelete(DeleteStmt stmt, OlapTable table) {
} catch (Exception e) {
LOG.warn("failed to do partition pruning for delete {}", stmt.toString(), e);
return Lists.newArrayList(table.getVisiblePartitionNames());
} finally {
currentSession.setBypassAuthorizerCheck(false);
}
List<Long> selectedPartitionId = physicalOlapScanOperator.getSelectedPartitionId();
return ListUtils.emptyIfNull(selectedPartitionId)
Expand Down
48 changes: 34 additions & 14 deletions fe/fe-core/src/main/java/com/starrocks/qe/ConnectContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,9 @@ public class ConnectContext {
protected boolean isStatisticsContext = false;
protected boolean needQueued = true;

// Bypass the authorizer check for certain cases
protected boolean bypassAuthorizerCheck = false;

protected DumpInfo dumpInfo;

// The related db ids for current sql
Expand Down Expand Up @@ -341,17 +344,10 @@ public void setThreadLocalInfo() {
threadLocalInfo.set(this);
}

/**
* Set this connect to thread-local if not exists
*
* @return set or not
*/
public boolean setThreadLocalInfoIfNotExists() {
if (threadLocalInfo.get() == null) {
threadLocalInfo.set(this);
return true;
}
return false;
public static ConnectContext exchangeThreadLocalInfo(ConnectContext ctx) {
ConnectContext prev = threadLocalInfo.get();
threadLocalInfo.set(ctx);
return prev;
}

public void setGlobalStateMgr(GlobalStateMgr globalStateMgr) {
Expand Down Expand Up @@ -759,6 +755,14 @@ public void setNeedQueued(boolean needQueued) {
this.needQueued = needQueued;
}

public boolean isBypassAuthorizerCheck() {
return bypassAuthorizerCheck;
}

public void setBypassAuthorizerCheck(boolean value) {
this.bypassAuthorizerCheck = value;
}

public ConnectContext getParent() {
return parent;
}
Expand Down Expand Up @@ -934,8 +938,15 @@ public StmtExecutor executeSql(String sql) throws Exception {
return executor;
}

/**
* Bind the context to current scope, exchange the context if it's already existed
* Sample usage:
* try (var guard = context.bindScope()) {
* ......
* }
*/
public ScopeGuard bindScope() {
return ScopeGuard.setIfNotExists(this);
return ScopeGuard.bind(this);
}

/**
Expand All @@ -944,21 +955,30 @@ public ScopeGuard bindScope() {
public static class ScopeGuard implements AutoCloseable {

private boolean set = false;
private ConnectContext prev;

private ScopeGuard() {
}

public static ScopeGuard setIfNotExists(ConnectContext session) {
private static ScopeGuard bind(ConnectContext session) {
ScopeGuard res = new ScopeGuard();
res.set = session.setThreadLocalInfoIfNotExists();
res.prev = exchangeThreadLocalInfo(session);
res.set = true;
return res;
}

public ConnectContext prev() {
return prev;
}

@Override
public void close() {
if (set) {
ConnectContext.remove();
}
if (prev != null) {
prev.setThreadLocalInfo();
}
}
}

Expand Down
6 changes: 2 additions & 4 deletions fe/fe-core/src/main/java/com/starrocks/qe/StmtExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -1287,11 +1287,9 @@ private void executeAnalyze(AnalyzeStmt analyzeStmt, AnalyzeStatus analyzeStatus
// from current session, may execute analyze stmt
statsConnectCtx.getSessionVariable().setStatisticCollectParallelism(
context.getSessionVariable().getStatisticCollectParallelism());
statsConnectCtx.setThreadLocalInfo();
try {
statsConnectCtx.setStatisticsConnection(true);
try (ConnectContext.ScopeGuard guard = statsConnectCtx.bindScope()) {
executeAnalyze(statsConnectCtx, analyzeStmt, analyzeStatus, db, table);
} finally {
ConnectContext.remove();
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,9 @@ public static ExecPlan plan(StatementBase stmt, ConnectContext session,
analyzeStatement(stmt, session, dbs);

// Authorization check
Authorizer.check(stmt, session);
if (!session.isBypassAuthorizerCheck()) {
Authorizer.check(stmt, session);
}
if (stmt instanceof QueryStatement) {
OptimizerTraceUtil.logQueryStatement("after analyze:\n%s", (QueryStatement) stmt);
}
Expand Down
Loading
Loading