Skip to content

Commit

Permalink
add alignment process for flexible partial update in flush phase
Browse files Browse the repository at this point in the history
add debug logs

fix be

fix sequence col

remove some debug logs

tmp

t1
  • Loading branch information
bobhan1 committed Aug 22, 2024
1 parent 098c0d3 commit ace9799
Show file tree
Hide file tree
Showing 12 changed files with 529 additions and 60 deletions.
28 changes: 15 additions & 13 deletions be/src/olap/base_tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -569,8 +569,8 @@ Status BaseTablet::calc_segment_delete_bitmap(RowsetSharedPtr rowset,
rowset_schema = rowset_schema->copy_without_variant_extracted_columns();
}
// use for partial update
PartialUpdateReadPlan read_plan_ori;
PartialUpdateReadPlan read_plan_update;
FixedReadPlan read_plan_ori;
FixedReadPlan read_plan_update;
int64_t conflict_rows = 0;
int64_t new_generated_rows = 0;

Expand Down Expand Up @@ -733,9 +733,13 @@ Status BaseTablet::calc_segment_delete_bitmap(RowsetSharedPtr rowset,
if (pos > 0) {
auto partial_update_info = rowset_writer->get_partial_update_info();
DCHECK(partial_update_info);
RETURN_IF_ERROR(generate_new_block_for_partial_update(
rowset_schema, partial_update_info.get(), read_plan_ori, read_plan_update,
rsid_to_rowset, &block));
if (partial_update_info->is_fixed_partial_update()) {
RETURN_IF_ERROR(generate_new_block_for_partial_update(
rowset_schema, partial_update_info.get(), read_plan_ori, read_plan_update,
rsid_to_rowset, &block));
} else {
// TODO(bobhan1): add support for flexible partial update
}
RETURN_IF_ERROR(sort_block(block, ordered_block));
RETURN_IF_ERROR(rowset_writer->flush_single_block(&ordered_block));
if (new_generated_rows != rowset_writer->num_rows()) {
Expand Down Expand Up @@ -861,7 +865,7 @@ Status BaseTablet::fetch_value_by_rowids(RowsetSharedPtr input_rowset, uint32_t
return Status::OK();
}

const signed char* BaseTablet::get_delete_sign_column_data(vectorized::Block& block,
const signed char* BaseTablet::get_delete_sign_column_data(const vectorized::Block& block,
size_t rows_at_least) {
if (const vectorized::ColumnWithTypeAndName* delete_sign_column =
block.try_get_by_name(DELETE_SIGN);
Expand Down Expand Up @@ -897,7 +901,7 @@ Status BaseTablet::generate_default_value_block(const TabletSchema& schema,

Status BaseTablet::generate_new_block_for_partial_update(
TabletSchemaSPtr rowset_schema, const PartialUpdateInfo* partial_update_info,
const PartialUpdateReadPlan& read_plan_ori, const PartialUpdateReadPlan& read_plan_update,
const FixedReadPlan& read_plan_ori, const FixedReadPlan& read_plan_update,
const std::map<RowsetId, RowsetSharedPtr>& rsid_to_rowset,
vectorized::Block* output_block) {
// do partial update related works
Expand All @@ -923,8 +927,7 @@ Status BaseTablet::generate_new_block_for_partial_update(
for (auto i = 0; i < update_cids.size(); ++i) {
for (auto idx = 0; idx < update_rows; ++idx) {
full_mutable_columns[update_cids[i]]->insert_from(
*update_block.get_columns_with_type_and_name()[i].column.get(),
read_index_update[idx]);
*update_block.get_by_position(i).column, read_index_update[idx]);
}
}

Expand Down Expand Up @@ -974,7 +977,7 @@ Status BaseTablet::generate_new_block_for_partial_update(
} else if (old_block_delete_signs != nullptr &&
old_block_delete_signs[read_index_old[idx]] != 0) {
if (rs_column.has_default_value()) {
mutable_column->insert_from(*mutable_default_value_columns[i].get(), 0);
mutable_column->insert_from(*mutable_default_value_columns[i], 0);
} else if (rs_column.is_nullable()) {
assert_cast<vectorized::ColumnNullable*, TypeCheckOnRelease::DISABLE>(
mutable_column.get())
Expand All @@ -983,9 +986,8 @@ Status BaseTablet::generate_new_block_for_partial_update(
mutable_column->insert_default();
}
} else {
mutable_column->insert_from(
*old_block.get_columns_with_type_and_name()[i].column.get(),
read_index_old[idx]);
mutable_column->insert_from(*old_block.get_by_position(i).column,
read_index_old[idx]);
}
}
}
Expand Down
9 changes: 5 additions & 4 deletions be/src/olap/base_tablet.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include "common/status.h"
#include "olap/iterators.h"
#include "olap/olap_common.h"
#include "olap/partial_update_info.h"
#include "olap/rowset/segment_v2/segment.h"
#include "olap/tablet_fwd.h"
#include "olap/tablet_meta.h"
Expand All @@ -39,7 +40,7 @@ class CalcDeleteBitmapToken;
class SegmentCacheHandle;
class RowIdConversion;
struct PartialUpdateInfo;
class PartialUpdateReadPlan;
class FixedReadPlan;

struct TabletWithVersion {
BaseTabletSPtr tablet;
Expand Down Expand Up @@ -187,7 +188,7 @@ class BaseTablet {
int64_t txn_id, const RowsetIdUnorderedSet& rowset_ids,
std::vector<RowsetSharedPtr>* rowsets = nullptr);

static const signed char* get_delete_sign_column_data(vectorized::Block& block,
static const signed char* get_delete_sign_column_data(const vectorized::Block& block,
size_t rows_at_least = 0);

static Status generate_default_value_block(const TabletSchema& schema,
Expand All @@ -198,8 +199,8 @@ class BaseTablet {

static Status generate_new_block_for_partial_update(
TabletSchemaSPtr rowset_schema, const PartialUpdateInfo* partial_update_info,
const PartialUpdateReadPlan& read_plan_ori,
const PartialUpdateReadPlan& read_plan_update,
const FixedReadPlan& read_plan_ori,
const FixedReadPlan& read_plan_update,
const std::map<RowsetId, RowsetSharedPtr>& rsid_to_rowset,
vectorized::Block* output_block);

Expand Down
177 changes: 159 additions & 18 deletions be/src/olap/partial_update_info.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include "olap/rowset/rowset_writer_context.h"
#include "olap/tablet_schema.h"
#include "olap/utils.h"
#include "util/bitmap_value.h"
#include "vec/common/assert_cast.h"
#include "vec/core/block.h"

Expand All @@ -50,19 +51,27 @@ void PartialUpdateInfo::init(const TabletSchema& tablet_schema, bool fixed_parti
this->timezone = timezone;
missing_cids.clear();
update_cids.clear();

for (auto i = 0; i < tablet_schema.num_columns(); ++i) {
auto tablet_column = tablet_schema.column(i);
if (!partial_update_input_columns.contains(tablet_column.name())) {
missing_cids.emplace_back(i);
if (!tablet_column.has_default_value() && !tablet_column.is_nullable() &&
tablet_schema.auto_increment_column() != tablet_column.name()) {
can_insert_new_rows_in_partial_update = false;
if (fixed_partial_update) {
auto tablet_column = tablet_schema.column(i);
if (!partial_update_input_columns.contains(tablet_column.name())) {
missing_cids.emplace_back(i);
if (!tablet_column.has_default_value() && !tablet_column.is_nullable() &&
tablet_schema.auto_increment_column() != tablet_column.name()) {
can_insert_new_rows_in_partial_update = false;
}
} else {
update_cids.emplace_back(i);
}
if (auto_increment_column == tablet_column.name()) {
is_schema_contains_auto_inc_column = true;
}
} else {
update_cids.emplace_back(i);
}
if (auto_increment_column == tablet_column.name()) {
is_schema_contains_auto_inc_column = true;
// in flexible partial update, missing_cids holds the cids of all non-sort-key columns
if (i >= tablet_schema.num_key_columns()) {
missing_cids.emplace_back(i);
}
}
}
this->is_strict_mode = is_strict_mode;
Expand Down Expand Up @@ -189,13 +198,13 @@ void PartialUpdateInfo::_generate_default_values_for_missing_cids(
CHECK_EQ(missing_cids.size(), default_values.size());
}

void PartialUpdateReadPlan::prepare_to_read(const RowLocation& row_location, size_t pos) {
// Registers one row to read: appends (row id, pos) to the plan entry keyed by the
// row's rowset id and segment id.
void FixedReadPlan::prepare_to_read(const RowLocation& row_location, size_t pos) {
    auto& rows_in_segment = plan[row_location.rowset_id][row_location.segment_id];
    rows_in_segment.emplace_back(row_location.row_id, pos);
}

// read columns by read plan
// read_index: ori_pos-> block_idx
Status PartialUpdateReadPlan::read_columns_by_plan(
Status FixedReadPlan::read_columns_by_plan(
const TabletSchema& tablet_schema, const std::vector<uint32_t> cids_to_read,
const std::map<RowsetId, RowsetSharedPtr>& rsid_to_rowset, vectorized::Block& block,
std::map<uint32_t, uint32_t>* read_index, const signed char* __restrict skip_map) const {
Expand Down Expand Up @@ -239,7 +248,7 @@ Status PartialUpdateReadPlan::read_columns_by_plan(
return Status::OK();
}

Status PartialUpdateReadPlan::fill_missing_columns(
Status FixedReadPlan::fill_missing_columns(
RowsetWriterContext* rowset_ctx, const std::map<RowsetId, RowsetSharedPtr>& rsid_to_rowset,
const TabletSchema& tablet_schema, vectorized::Block& full_block,
const std::vector<bool>& use_default_or_null_flag, bool has_default_or_nullable,
Expand All @@ -250,7 +259,7 @@ Status PartialUpdateReadPlan::fill_missing_columns(
auto old_value_block = tablet_schema.create_block_by_cids(missing_cids);
CHECK_EQ(missing_cids.size(), old_value_block.columns());

// record real pos, key is input line num, value is old_block line num
// segment pos to write -> rowid to read in old_value_block
std::map<uint32_t, uint32_t> read_index;
RETURN_IF_ERROR(read_columns_by_plan(tablet_schema, missing_cids, rsid_to_rowset,
old_value_block, &read_index, nullptr));
Expand All @@ -274,7 +283,8 @@ Status PartialUpdateReadPlan::fill_missing_columns(
// be found in Tablet::lookup_row_key() and `use_default_or_null_flag[idx]` will be false. But we should not
// read values from old rows for missing values in this occasion. So we should read the DELETE_SIGN column
// to check if a row REALLY exists in the table.
auto pos_in_old_block = read_index[idx + segment_start_pos];
auto segment_pos = idx + segment_start_pos;
auto pos_in_old_block = read_index[segment_pos];
if (use_default_or_null_flag[idx] || (delete_sign_column_data != nullptr &&
delete_sign_column_data[pos_in_old_block] != 0)) {
for (auto i = 0; i < missing_cids.size(); ++i) {
Expand All @@ -284,7 +294,7 @@ Status PartialUpdateReadPlan::fill_missing_columns(
auto& missing_col = mutable_full_columns[missing_cids[i]];
// clang-format off
if (tablet_column.has_default_value()) {
missing_col->insert_from(*mutable_default_value_columns[i].get(), 0);
missing_col->insert_from(*mutable_default_value_columns[i], 0);
} else if (tablet_column.is_nullable()) {
auto* nullable_column =
assert_cast<vectorized::ColumnNullable*, TypeCheckOnRelease::DISABLE>(missing_col.get());
Expand All @@ -310,10 +320,141 @@ Status PartialUpdateReadPlan::fill_missing_columns(
}
for (auto i = 0; i < missing_cids.size(); ++i) {
mutable_full_columns[missing_cids[i]]->insert_from(
*old_value_block.get_columns_with_type_and_name()[i].column.get(),
pos_in_old_block);
*old_value_block.get_by_position(i).column, pos_in_old_block);
}
}
full_block.set_columns(std::move(mutable_full_columns));
return Status::OK();
}

// Registers one row to read. For every column unique id contained in `skip_bitmap`,
// appends (row id, pos) to the plan entry keyed by rowset id, segment id and that
// column unique id. An empty bitmap adds nothing to the plan.
void FlexibleReadPlan::prepare_to_read(const RowLocation& row_location, size_t pos,
                                       const BitmapValue& skip_bitmap) {
    for (uint64_t skipped_col_uid : skip_bitmap) {
        // The lookup stays inside the loop so that no plan entry is created
        // when the bitmap is empty.
        auto& columns_in_segment = plan[row_location.rowset_id][row_location.segment_id];
        columns_in_segment[skipped_col_uid].emplace_back(row_location.row_id, pos);
    }
}

// Reads the old values described by this plan into `old_value_block`, one column at a time.
//
// For each (rowset id, segment id, column unique id) entry of `plan`, the row ids whose
// recorded pos is not flagged in `skip_map` are fetched with
// BaseTablet::fetch_value_by_rowids and appended to the block column at index
// `cid - num_key_columns`.
//
// `read_index` is filled as: cid -> pos recorded in the plan -> index of the fetched value
// inside that column of `old_value_block`.
// `skip_map` may be nullptr, in which case no row is skipped.
Status FlexibleReadPlan::read_columns_by_plan(
        const TabletSchema& tablet_schema,
        const std::map<RowsetId, RowsetSharedPtr>& rsid_to_rowset,
        vectorized::Block& old_value_block,
        std::map<uint32_t, std::map<uint32_t, uint32_t>>* read_index,
        const signed char* __restrict skip_map) const {
    auto columns_to_fill = old_value_block.mutate_columns();

    // cid -> number of values already appended to that column of the block
    std::map<uint32_t, uint32_t> filled_count;
    for (std::size_t column_id = 0; column_id < tablet_schema.num_columns(); ++column_id) {
        filled_count[column_id] = 0;
    }

    for (const auto& [rowset_id, segments_in_rowset] : plan) {
        for (const auto& [segment_id, columns_in_segment] : segments_in_rowset) {
            for (const auto& [col_uid, rows_to_read] : columns_in_segment) {
                auto rowset_it = rsid_to_rowset.find(rowset_id);
                CHECK(rowset_it != rsid_to_rowset.end());
                auto cid = tablet_schema.field_index(col_uid);
                DCHECK(cid != -1);
                DCHECK(cid >= tablet_schema.num_key_columns());

                // Collect the row ids to fetch and remember where each value will land.
                std::vector<uint32_t> rowids_to_fetch;
                for (auto [row_id, target_pos] : rows_to_read) {
                    const bool skipped = skip_map != nullptr && skip_map[target_pos];
                    if (!skipped) {
                        rowids_to_fetch.emplace_back(row_id);
                        const uint32_t index_in_column = filled_count[cid]++;
                        (*read_index)[cid][target_pos] = index_in_column;
                    }
                }

                TabletColumn tablet_column = tablet_schema.column(cid);
                // The block column index is the cid shifted by the number of key columns.
                auto block_col_idx = cid - tablet_schema.num_key_columns();
                RETURN_IF_ERROR(doris::BaseTablet::fetch_value_by_rowids(
                        rowset_it->second, segment_id, rowids_to_fetch, tablet_column,
                        columns_to_fill[block_col_idx]));
            }
        }
    }
    // !!!ATTENTION!!!: columns in block may have different size because every row has different columns to update
    old_value_block.set_columns(std::move(columns_to_fill));
    return Status::OK();
}

// Fills every non-sort-key column of `full_block` for the rows of one flexible partial
// update batch.
//
// The batch has `use_default_or_null_flag.size()` rows. Row `idx` is located at
// `block_start_pos + idx` in `block` / `skip_bitmaps` and at `segment_start_pos + idx`
// in the read plan. For each non-sort-key column and each row:
//   - if the row's skip bitmap does not contain the column's unique id, the cell is copied
//     from `block`;
//   - otherwise the cell comes from the old row read by read_columns_by_plan(), unless
//     `use_default_or_null_flag[idx]` is set or the old row's delete sign is marked, in
//     which case the default value / null / type default is written.
//
// Returns the first error produced by read_columns_by_plan() or
// BaseTablet::generate_default_value_block(), Status::OK() otherwise.
Status FlexibleReadPlan::fill_non_sort_key_columns(
        RowsetWriterContext* rowset_ctx, const std::map<RowsetId, RowsetSharedPtr>& rsid_to_rowset,
        const TabletSchema& tablet_schema, vectorized::Block& full_block,
        const std::vector<bool>& use_default_or_null_flag, bool has_default_or_nullable,
        const std::size_t segment_start_pos, const std::size_t block_start_pos,
        const vectorized::Block* block, std::vector<BitmapValue>* skip_bitmaps) const {
    auto mutable_full_columns = full_block.mutate_columns();

    // missing_cids are all non key columns' cids
    const auto& non_sort_key_cids = rowset_ctx->partial_update_info->missing_cids;
    auto old_value_block = tablet_schema.create_block_by_cids(non_sort_key_cids);
    CHECK_EQ(non_sort_key_cids.size(), old_value_block.columns());

    // cid -> segment pos to write -> rowid to read in old_value_block
    std::map<uint32_t, std::map<uint32_t, uint32_t>> read_index;
    RETURN_IF_ERROR(read_columns_by_plan(tablet_schema, rsid_to_rowset, old_value_block,
                                         &read_index, nullptr));
    // !!!ATTENTION!!!: columns in old_value_block may have different size because every row has different columns to update

    const auto* delete_sign_column_data = BaseTablet::get_delete_sign_column_data(old_value_block);

    // build default value columns
    auto default_value_block = old_value_block.clone_empty();
    if (has_default_or_nullable || delete_sign_column_data != nullptr) {
        RETURN_IF_ERROR(BaseTablet::generate_default_value_block(
                tablet_schema, non_sort_key_cids, rowset_ctx->partial_update_info->default_values,
                old_value_block, default_value_block));
    }

    // segment pos -> position of that row's delete sign in old_value_block, or nullptr when
    // no delete sign was read at all. Looked up with find() on purpose: operator[] would
    // silently insert a 0 mapping for rows whose delete sign was never read, and the code
    // would then test the delete sign of an unrelated row (or read past an empty column).
    const std::map<uint32_t, uint32_t>* delete_sign_read_index = nullptr;
    if (delete_sign_column_data != nullptr) {
        if (auto it = read_index.find(tablet_schema.delete_sign_idx()); it != read_index.end()) {
            delete_sign_read_index = &it->second;
        }
    }

    const std::size_t num_rows = use_default_or_null_flag.size();

    // fill all non sort key columns from mutable_old_columns, need to consider default value and null value
    for (std::size_t i {0}; i < non_sort_key_cids.size(); i++) {
        auto cid = non_sort_key_cids[i];
        const auto& tablet_column = tablet_schema.column(cid);
        auto col_uid = tablet_column.unique_id();
        auto& cur_col = mutable_full_columns[cid];
        for (std::size_t idx {0}; idx < num_rows; idx++) {
            auto segment_pos = segment_start_pos + idx;
            auto block_pos = block_start_pos + idx;
            if (!skip_bitmaps->at(block_pos).contains(col_uid)) {
                // The column is not in this row's skip bitmap: take the cell from `block`.
                cur_col->insert_from(*block->get_by_position(cid).column, block_pos);
                continue;
            }

            DCHECK(cid != tablet_schema.skip_bitmap_col_idx());
            DCHECK(cid != tablet_schema.version_col_idx());
            DCHECK(!tablet_column.is_row_store_column());

            // `use_default_or_null_flag[idx] == false` doesn't mean that we should read values from the old row
            // for the missing columns. For example, if a table has sequence column, the rows with DELETE_SIGN column
            // marked will not be marked in delete bitmap(see https://github.com/apache/doris/pull/24011), so it will
            // be found in Tablet::lookup_row_key() and `use_default_or_null_flag[idx]` will be false. But we should not
            // read values from old rows for missing values in this occasion. So we should read the DELETE_SIGN column
            // to check if a row REALLY exists in the table.
            bool old_row_is_deleted = false;
            if (!use_default_or_null_flag[idx] && delete_sign_read_index != nullptr) {
                // A row without a delete-sign mapping is treated as not deleted.
                if (auto it = delete_sign_read_index->find(segment_pos);
                    it != delete_sign_read_index->end()) {
                    old_row_is_deleted = delete_sign_column_data[it->second] != 0;
                }
            }

            if (use_default_or_null_flag[idx] || old_row_is_deleted) {
                if (tablet_column.has_default_value()) {
                    cur_col->insert_from(*default_value_block.get_by_position(i).column, 0);
                } else if (tablet_column.is_nullable()) {
                    assert_cast<vectorized::ColumnNullable*, TypeCheckOnRelease::DISABLE>(
                            cur_col.get())
                            ->insert_null_elements(1);
                } else {
                    // If the control flow reaches this branch, the column neither has default value
                    // nor is nullable. It means that the row's delete sign is marked, and the value
                    // columns are useless and won't be read. So we can just put arbitrary values in the cells
                    cur_col->insert_default();
                }
            } else {
                // at() keeps the lookup read-only; a missing mapping raises instead of
                // silently reading row 0 of the old block.
                auto pos_in_old_block = read_index.at(cid).at(segment_pos);
                cur_col->insert_from(*old_value_block.get_by_position(i).column,
                                     pos_in_old_block);
            }
        }
    }
    full_block.set_columns(std::move(mutable_full_columns));
    return Status::OK();
}

Expand Down
Loading

0 comments on commit ace9799

Please sign in to comment.