Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Fix](partition) Skip rowset partition id eq 0 smaller than config wh… #29363

Merged
merged 5 commits into from
Jan 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1120,6 +1120,9 @@ DEFINE_mInt32(s3_writer_buffer_allocation_timeout_second, "60");

DEFINE_mBool(enable_column_type_check, "true");

// Max number of rowsets with partition id 0 tolerated when loading a data dir; BE exits if the count exceeds this value. Default 0.
DEFINE_Int32(ignore_invalid_partition_id_rowset_num, "0");

// clang-format off
#ifdef BE_TEST
// test s3
Expand Down
3 changes: 3 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -1176,6 +1176,9 @@ DECLARE_mInt32(s3_writer_buffer_allocation_timeout_second);

DECLARE_mBool(enable_column_type_check);

// Max number of rowsets with partition id 0 tolerated when loading a data dir; BE exits if the count exceeds this value. Default 0.
DECLARE_Int32(ignore_invalid_partition_id_rowset_num);

#ifdef BE_TEST
// test s3
DECLARE_String(test_s3_resource);
Expand Down
36 changes: 32 additions & 4 deletions be/src/olap/data_dir.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -383,6 +383,12 @@ Status DataDir::load() {
if (rowset_meta->is_local()) {
rowset_meta->set_fs(local_fs);
}

if (rowset_meta->partition_id() == 0) {
LOG(WARNING) << "rs tablet=" << rowset_meta->tablet_id() << " rowset_id=" << rowset_id
<< " load from meta but partition id eq 0";
}

dir_rowset_metas.push_back(rowset_meta);
return true;
};
Expand Down Expand Up @@ -470,6 +476,19 @@ Status DataDir::load() {
};
TabletMetaManager::traverse_pending_publish(_meta, load_pending_publish_info_func);

int64_t rowset_partition_id_eq_0_num = 0;
for (auto rowset_meta : dir_rowset_metas) {
if (rowset_meta->partition_id() == 0) {
++rowset_partition_id_eq_0_num;
}
}
if (rowset_partition_id_eq_0_num > config::ignore_invalid_partition_id_rowset_num) {
LOG(FATAL) << fmt::format(
"roswet partition id eq 0 bigger than config {}, be exit, plz check be.INFO",
config::ignore_invalid_partition_id_rowset_num);
exit(-1);
}

// traverse rowset
// 1. add committed rowset to txn map
// 2. add visible rowset to tablet
Expand All @@ -486,6 +505,13 @@ Status DataDir::load() {
continue;
}

if (rowset_meta->partition_id() == 0) {
LOG(WARNING) << "skip tablet_id=" << tablet->tablet_id()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If there is a committed rowset with partition id = 0 and you ignore it, publish version will still succeed and FE will finish the txn. But BE will not have this data, which causes data loss, and queries will fail.

<< " rowset: " << rowset_meta->rowset_id()
<< " txn: " << rowset_meta->txn_id();
continue;
}

RowsetSharedPtr rowset;
Status create_status = tablet->create_rowset(rowset_meta, &rowset);
if (!create_status) {
Expand All @@ -499,8 +525,9 @@ Status DataDir::load() {
rowset_meta->tablet_uid() == tablet->tablet_uid()) {
if (!rowset_meta->tablet_schema()) {
rowset_meta->set_tablet_schema(tablet->tablet_schema());
RowsetMetaManager::save(_meta, rowset_meta->tablet_uid(), rowset_meta->rowset_id(),
rowset_meta->get_rowset_pb());
RETURN_IF_ERROR(RowsetMetaManager::save(_meta, rowset_meta->tablet_uid(),
rowset_meta->rowset_id(),
rowset_meta->get_rowset_pb(), false));
}
Status commit_txn_status = _txn_manager->commit_txn(
_meta, rowset_meta->partition_id(), rowset_meta->txn_id(),
Expand All @@ -527,8 +554,9 @@ Status DataDir::load() {
rowset_meta->tablet_uid() == tablet->tablet_uid()) {
if (!rowset_meta->tablet_schema()) {
rowset_meta->set_tablet_schema(tablet->tablet_schema());
RowsetMetaManager::save(_meta, rowset_meta->tablet_uid(), rowset_meta->rowset_id(),
rowset_meta->get_rowset_pb());
RETURN_IF_ERROR(RowsetMetaManager::save(_meta, rowset_meta->tablet_uid(),
rowset_meta->rowset_id(),
rowset_meta->get_rowset_pb(), false));
}
Status publish_status = tablet->add_rowset(rowset);
if (!publish_status && !publish_status.is<PUSH_VERSION_ALREADY_EXIST>()) {
Expand Down
16 changes: 12 additions & 4 deletions be/src/olap/rowset/rowset_meta_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
#include "olap/olap_define.h"
#include "olap/olap_meta.h"
#include "olap/utils.h"
#include "util/debug_points.h"

namespace doris {
namespace {
Expand Down Expand Up @@ -98,15 +99,22 @@ Status RowsetMetaManager::save(OlapMeta* meta, TabletUid tablet_uid, const Rowse
// return Status::InternalError("invaid partition id {} tablet {}",
// rowset_meta_pb.partition_id(), rowset_meta_pb.tablet_id());
}
DBUG_EXECUTE_IF("RowsetMetaManager::save::zero_partition_id", {
long partition_id = rowset_meta_pb.partition_id();
auto& rs_pb = const_cast<std::decay_t<decltype(rowset_meta_pb)>&>(rowset_meta_pb);
rs_pb.set_partition_id(0);
LOG(WARNING) << "set debug point RowsetMetaManager::save::zero_partition_id old="
<< partition_id << " new=" << rowset_meta_pb.DebugString();
});
if (enable_binlog) {
return _save_with_binlog(meta, tablet_uid, rowset_id, rowset_meta_pb);
} else {
return save(meta, tablet_uid, rowset_id, rowset_meta_pb);
return _save(meta, tablet_uid, rowset_id, rowset_meta_pb);
}
}

Status RowsetMetaManager::save(OlapMeta* meta, TabletUid tablet_uid, const RowsetId& rowset_id,
const RowsetMetaPB& rowset_meta_pb) {
Status RowsetMetaManager::_save(OlapMeta* meta, TabletUid tablet_uid, const RowsetId& rowset_id,
const RowsetMetaPB& rowset_meta_pb) {
std::string key =
fmt::format("{}{}_{}", ROWSET_PREFIX, tablet_uid.to_string(), rowset_id.to_string());
std::string value;
Expand Down Expand Up @@ -523,7 +531,7 @@ Status RowsetMetaManager::load_json_rowset_meta(OlapMeta* meta,
}
RowsetId rowset_id = rowset_meta.rowset_id();
TabletUid tablet_uid = rowset_meta.tablet_uid();
Status status = save(meta, tablet_uid, rowset_id, rowset_meta.get_rowset_pb());
Status status = save(meta, tablet_uid, rowset_id, rowset_meta.get_rowset_pb(), false);
return status;
}

Expand Down
4 changes: 2 additions & 2 deletions be/src/olap/rowset/rowset_meta_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,6 @@ class RowsetMetaManager {
// TODO(Drogon): refactor save && _save_with_binlog to one, adapt to ut temporarily
static Status save(OlapMeta* meta, TabletUid tablet_uid, const RowsetId& rowset_id,
const RowsetMetaPB& rowset_meta_pb, bool enable_binlog);
static Status save(OlapMeta* meta, TabletUid tablet_uid, const RowsetId& rowset_id,
const RowsetMetaPB& rowset_meta_pb);

static std::vector<std::string> get_binlog_filenames(OlapMeta* meta, TabletUid tablet_uid,
std::string_view binlog_version,
Expand All @@ -79,6 +77,8 @@ class RowsetMetaManager {
static Status load_json_rowset_meta(OlapMeta* meta, const std::string& rowset_meta_path);

private:
static Status _save(OlapMeta* meta, TabletUid tablet_uid, const RowsetId& rowset_id,
const RowsetMetaPB& rowset_meta_pb);
static Status _save_with_binlog(OlapMeta* meta, TabletUid tablet_uid, const RowsetId& rowset_id,
const RowsetMetaPB& rowset_meta_pb);
static Status _get_rowset_binlog_metas(OlapMeta* meta, const TabletUid tablet_uid,
Expand Down
4 changes: 4 additions & 0 deletions be/src/olap/tablet_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -844,6 +844,10 @@ Status TabletManager::load_tablet_from_meta(DataDir* data_dir, TTabletId tablet_
tablet_meta->set_tablet_state(TABLET_RUNNING);
}

if (tablet_meta->partition_id() == 0) {
LOG(WARNING) << "tablet=" << tablet_id << " load from meta but partition id eq 0";
}

TabletSharedPtr tablet = Tablet::create_tablet_from_meta(tablet_meta, data_dir);
if (tablet == nullptr) {
return Status::Error<TABLE_CREATE_FROM_HEADER_ERROR>(
Expand Down
11 changes: 11 additions & 0 deletions be/src/olap/tablet_meta.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
#include "olap/olap_define.h"
#include "olap/tablet_meta_manager.h"
#include "olap/utils.h"
#include "util/debug_points.h"
#include "util/string_util.h"
#include "util/time.h"
#include "util/uid_util.h"
Expand Down Expand Up @@ -471,6 +472,16 @@ Status TabletMeta::_save_meta(DataDir* data_dir) {
Status TabletMeta::serialize(string* meta_binary) {
TabletMetaPB tablet_meta_pb;
to_meta_pb(&tablet_meta_pb);
if (tablet_meta_pb.partition_id() <= 0) {
LOG(WARNING) << "invalid partition id " << tablet_meta_pb.partition_id() << " tablet "
<< tablet_meta_pb.tablet_id();
}
DBUG_EXECUTE_IF("TabletMeta::serialize::zero_partition_id", {
long partition_id = tablet_meta_pb.partition_id();
tablet_meta_pb.set_partition_id(0);
LOG(WARNING) << "set debug point TabletMeta::serialize::zero_partition_id old="
<< partition_id << " new=" << tablet_meta_pb.DebugString();
});
bool serialize_success = tablet_meta_pb.SerializeToString(meta_binary);
if (!serialize_success) {
LOG(FATAL) << "failed to serialize meta " << full_name();
Expand Down
17 changes: 9 additions & 8 deletions be/src/olap/txn_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,13 @@ Status TxnManager::commit_txn(OlapMeta* meta, TPartitionId partition_id,
do {
// get tx
std::shared_lock rdlock(_get_txn_map_lock(transaction_id));
auto rs_pb = rowset_ptr->rowset_meta()->get_rowset_pb();
// TODO(dx): remove log after fix partition id eq 0 bug
if (!rs_pb.has_partition_id() || rs_pb.partition_id() == 0) {
rowset_ptr->rowset_meta()->set_partition_id(partition_id);
LOG(WARNING) << "cant get partition id from rs pb, get from func arg partition_id="
<< partition_id;
}
txn_tablet_map_t& txn_tablet_map = _get_txn_tablet_map(transaction_id);
auto it = txn_tablet_map.find(key);
if (it == txn_tablet_map.end()) {
Expand Down Expand Up @@ -335,15 +342,9 @@ Status TxnManager::commit_txn(OlapMeta* meta, TPartitionId partition_id,
// save meta need access disk, it maybe very slow, so that it is not in global txn lock
// it is under a single txn lock
if (!is_recovery) {
auto rs_pb = rowset_ptr->rowset_meta()->get_rowset_pb();
// TODO(dx): remove log after fix partition id eq 0 bug
if (!rs_pb.has_partition_id() || rs_pb.partition_id() == 0) {
rs_pb.set_partition_id(partition_id);
LOG(WARNING) << "cant get partition id from rs pb, get from func arg partition_id="
<< partition_id;
}
Status save_status =
RowsetMetaManager::save(meta, tablet_uid, rowset_ptr->rowset_id(), rs_pb);
RowsetMetaManager::save(meta, tablet_uid, rowset_ptr->rowset_id(),
rowset_ptr->rowset_meta()->get_rowset_pb(), false);
DBUG_EXECUTE_IF("TxnManager.RowsetMetaManager.save_wait", {
if (auto wait = dp->param<int>("duration", 0); wait > 0) {
LOG_WARNING("TxnManager.RowsetMetaManager.save_wait").tag("wait ms", wait);
Expand Down
2 changes: 1 addition & 1 deletion be/test/olap/rowset/rowset_meta_manager_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ TEST_F(RowsetMetaManagerTest, TestSaveAndGetAndRemove) {
EXPECT_EQ(rowset_meta.rowset_id(), rowset_id);
RowsetMetaPB rowset_meta_pb;
rowset_meta.to_rowset_pb(&rowset_meta_pb);
Status status = RowsetMetaManager::save(_meta, _tablet_uid, rowset_id, rowset_meta_pb);
Status status = RowsetMetaManager::save(_meta, _tablet_uid, rowset_id, rowset_meta_pb, false);
EXPECT_TRUE(status == Status::OK());
EXPECT_TRUE(RowsetMetaManager::check_rowset_meta(_meta, _tablet_uid, rowset_id));
std::string json_rowset_meta_read;
Expand Down
Loading