diff --git a/be/src/olap/base_tablet.cpp b/be/src/olap/base_tablet.cpp index 85c62b0f7839e0..aa617ff1224870 100644 --- a/be/src/olap/base_tablet.cpp +++ b/be/src/olap/base_tablet.cpp @@ -441,7 +441,8 @@ Status BaseTablet::lookup_row_key(const Slice& encoded_key, TabletSchema* latest const std::vector& specified_rowsets, RowLocation* row_location, uint32_t version, std::vector>& segment_caches, - RowsetSharedPtr* rowset, bool with_rowid) { + RowsetSharedPtr* rowset, bool with_rowid, + std::string* encoded_seq_value) { SCOPED_BVAR_LATENCY(g_tablet_lookup_rowkey_latency); size_t seq_col_length = 0; // use the latest tablet schema to decide if the tablet has sequence column currently @@ -489,7 +490,7 @@ Status BaseTablet::lookup_row_key(const Slice& encoded_key, TabletSchema* latest for (auto id : picked_segments) { Status s = segments[id]->lookup_row_key(encoded_key, schema, with_seq_col, with_rowid, - &loc); + &loc, encoded_seq_value); if (s.is()) { continue; } diff --git a/be/src/olap/base_tablet.h b/be/src/olap/base_tablet.h index 18e0ad6038fbd4..ee471276fbcb47 100644 --- a/be/src/olap/base_tablet.h +++ b/be/src/olap/base_tablet.h @@ -154,7 +154,8 @@ class BaseTablet { const std::vector& specified_rowsets, RowLocation* row_location, uint32_t version, std::vector>& segment_caches, - RowsetSharedPtr* rowset = nullptr, bool with_rowid = true); + RowsetSharedPtr* rowset = nullptr, bool with_rowid = true, + std::string* encoded_seq_value = nullptr); // calc delete bitmap when flush memtable, use a fake version to calc // For example, cur max version is 5, and we use version 6 to calc but diff --git a/be/src/olap/partial_update_info.cpp b/be/src/olap/partial_update_info.cpp index c64e0ca2bd207a..3e8c5d9750c81a 100644 --- a/be/src/olap/partial_update_info.cpp +++ b/be/src/olap/partial_update_info.cpp @@ -397,7 +397,7 @@ Status FixedReadPlan::fill_missing_columns( void FlexibleReadPlan::prepare_to_read(const RowLocation& row_location, size_t pos, const BitmapValue& skip_bitmap) { - if (!has_row_store) { + if (!use_row_store) { for (uint64_t col_uid : skip_bitmap) { plan[row_location.rowset_id][row_location.segment_id][col_uid].emplace_back( row_location.row_id, pos); @@ -408,10 +408,6 @@ void FlexibleReadPlan::prepare_to_read(const RowLocation& row_location, size_t p } } -void FlexibleReadPlan::set_row_store(bool has_row_store_column) { - has_row_store = has_row_store_column; -} - Status FlexibleReadPlan::read_columns_by_plan( const TabletSchema& tablet_schema, const std::map& rsid_to_rowset, @@ -456,7 +452,7 @@ Status FlexibleReadPlan::read_columns_by_plan( const TabletSchema& tablet_schema, const std::vector& cids_to_read, const std::map& rsid_to_rowset, vectorized::Block& old_value_block, std::map* read_index) const { - DCHECK(has_row_store); + DCHECK(use_row_store); auto mutable_columns = old_value_block.mutate_columns(); size_t read_idx = 0; for (const auto& [rowset_id, segment_row_mappings] : row_store_plan) { @@ -491,14 +487,13 @@ Status FlexibleReadPlan::fill_non_primary_key_columns( const std::size_t segment_start_pos, const std::size_t block_start_pos, const vectorized::Block* block, std::vector* skip_bitmaps) const { auto mutable_full_columns = full_block.mutate_columns(); - bool has_row_column = tablet_schema.has_row_store_for_all_columns(); // missing_cids are all non sort key columns' cids const auto& non_sort_key_cids = rowset_ctx->partial_update_info->missing_cids; auto old_value_block = tablet_schema.create_block_by_cids(non_sort_key_cids); CHECK_EQ(non_sort_key_cids.size(), old_value_block.columns()); - if (!has_row_column) { + if (!use_row_store) { RETURN_IF_ERROR(fill_non_primary_key_columns_for_column_store( rowset_ctx, rsid_to_rowset, tablet_schema, non_sort_key_cids, old_value_block, mutable_full_columns, use_default_or_null_flag, has_default_or_nullable, diff --git a/be/src/olap/partial_update_info.h b/be/src/olap/partial_update_info.h index 30a601d9e7182c..9f4ae504ff749f 100644 --- a/be/src/olap/partial_update_info.h +++ b/be/src/olap/partial_update_info.h @@ -130,7 +130,7 @@ class FixedReadPlan { class FlexibleReadPlan { public: - void set_row_store(bool has_row_store_column); + FlexibleReadPlan(bool has_row_store_for_column) : use_row_store(has_row_store_for_column) {} void prepare_to_read(const RowLocation& row_location, size_t pos, const BitmapValue& skip_bitmap); // for column store @@ -171,10 +171,9 @@ class FlexibleReadPlan { const vectorized::Block* block, std::vector* skip_bitmaps) const; private: + bool use_row_store {false}; // rowset_id -> segment_id -> column unique id -> mappings std::map>>> plan; - - bool has_row_store {false}; std::map>> row_store_plan; }; diff --git a/be/src/olap/rowset/segment_v2/segment.cpp b/be/src/olap/rowset/segment_v2/segment.cpp index dca3ba54b9d4bc..db2774cede7641 100644 --- a/be/src/olap/rowset/segment_v2/segment.cpp +++ b/be/src/olap/rowset/segment_v2/segment.cpp @@ -921,7 +921,8 @@ Status Segment::new_inverted_index_iterator(const TabletColumn& tablet_column, } Status Segment::lookup_row_key(const Slice& key, const TabletSchema* latest_schema, - bool with_seq_col, bool with_rowid, RowLocation* row_location) { + bool with_seq_col, bool with_rowid, RowLocation* row_location, + std::string* encoded_seq_value) { RETURN_IF_ERROR(load_pk_index_and_bf()); bool has_seq_col = latest_schema->has_sequence_col(); bool has_rowid = !latest_schema->cluster_key_idxes().empty(); @@ -970,6 +971,7 @@ Status Segment::lookup_row_key(const Slice& key, const TabletSchema* latest_sche Slice sought_key_without_seq = Slice( sought_key.get_data(), sought_key.get_size() - (segment_has_seq_col ? seq_col_length : 0) - rowid_length); + if (has_seq_col) { // compare key if (key_without_seq.compare(sought_key_without_seq) != 0) { @@ -1007,6 +1009,16 @@ Status Segment::lookup_row_key(const Slice& key, const TabletSchema* latest_sche (uint8_t*)&row_location->row_id)); } + if (encoded_seq_value) { + if (!segment_has_seq_col) { + *encoded_seq_value = std::string {}; + } else { + // include marker + Slice encoded_seq_value_slice = Slice( + sought_key.get_data() + sought_key_without_seq.get_size(), seq_col_length); + *encoded_seq_value = encoded_seq_value_slice.to_string(); + } + } return Status::OK(); } diff --git a/be/src/olap/rowset/segment_v2/segment.h b/be/src/olap/rowset/segment_v2/segment.h index a4f01873f4c74f..72614e5cd5ece9 100644 --- a/be/src/olap/rowset/segment_v2/segment.h +++ b/be/src/olap/rowset/segment_v2/segment.h @@ -130,7 +130,8 @@ class Segment : public std::enable_shared_from_this { } Status lookup_row_key(const Slice& key, const TabletSchema* latest_schema, bool with_seq_col, - bool with_rowid, RowLocation* row_location); + bool with_rowid, RowLocation* row_location, + std::string* encoded_seq_value = nullptr); Status read_key_by_rowid(uint32_t row_id, std::string* key); diff --git a/be/src/olap/rowset/segment_v2/segment_writer.cpp b/be/src/olap/rowset/segment_v2/segment_writer.cpp index 88dd714e7d64af..37dc5aa5aa47e2 100644 --- a/be/src/olap/rowset/segment_v2/segment_writer.cpp +++ b/be/src/olap/rowset/segment_v2/segment_writer.cpp @@ -712,80 +712,94 @@ Status SegmentWriter::append_block_with_flexible_partial_content(const vectorize DCHECK(_tablet_schema->has_skip_bitmap_col()); auto skip_bitmap_col_idx = _tablet_schema->skip_bitmap_col_idx(); - std::vector* skip_bitmaps = - &(assert_cast( - block->get_by_position(skip_bitmap_col_idx).column->assume_mutable().get()) - ->get_data()); + auto get_skip_bitmaps = [&skip_bitmap_col_idx](const vectorized::Block* block) { + return &(assert_cast( + block->get_by_position(skip_bitmap_col_idx).column->assume_mutable().get()) + ->get_data()); + }; + std::vector* skip_bitmaps = get_skip_bitmaps(block); bool has_default_or_nullable = false; std::vector use_default_or_null_flag; use_default_or_null_flag.reserve(num_rows); - int32_t seq_col_unique_id = -1; int32_t seq_map_col_unique_id = _opts.rowset_ctx->partial_update_info->sequence_map_col_uid(); - bool schema_has_sequence_col = _tablet_schema->has_sequence_col(); - - const auto* delete_sign_column_data = - BaseTablet::get_delete_sign_column_data(*block, row_pos + num_rows); - DCHECK(delete_sign_column_data != nullptr); - int32_t delete_sign_col_unique_id = - _tablet_schema->column(_tablet_schema->delete_sign_idx()).unique_id(); DBUG_EXECUTE_IF("VerticalSegmentWriter._append_block_with_flexible_partial_content.sleep", { sleep(60); }) const std::vector& specified_rowsets = _mow_context->rowset_ptrs; std::vector> segment_caches(specified_rowsets.size()); + std::vector key_columns {}; + vectorized::IOlapColumnDataAccessor* seq_column {nullptr}; + + auto encode_key_columns = + [&full_block, block, row_pos, num_rows, + this](std::vector& key_columns) -> Status { + key_columns.clear(); + for (std::size_t cid {0}; cid < _num_sort_key_columns; cid++) { + full_block.replace_by_position(cid, block->get_by_position(cid).column); + RETURN_IF_ERROR(_olap_data_convertor->set_source_content_with_specifid_column( + full_block.get_by_position(cid), row_pos, num_rows, cid)); + auto [status, column] = _olap_data_convertor->convert_column_data(cid); + if (!status.ok()) { + return status; + } + key_columns.push_back(column); + } + return Status::OK(); + }; + + auto encode_seq_column = [row_pos, num_rows, block, + this](vectorized::IOlapColumnDataAccessor*& seq_column) -> Status { + seq_column = nullptr; + if (_tablet_schema->has_sequence_col()) { + auto seq_col_idx = _tablet_schema->sequence_col_idx(); + RETURN_IF_ERROR(_olap_data_convertor->set_source_content_with_specifid_column( + block->get_by_position(seq_col_idx), row_pos, num_rows, seq_col_idx)); + auto [status, column] = _olap_data_convertor->convert_column_data(seq_col_idx); + if (!status.ok()) { + return status; + } + seq_column = column; + } + return Status::OK(); + }; + // 1. encode key columns // we can only encode sort key columns currently becasue all non-key columns in flexible partial update // can have missing cells - std::vector key_columns; - for (std::size_t cid {0}; cid < _num_sort_key_columns; cid++) { - full_block.replace_by_position(cid, block->get_by_position(cid).column); - RETURN_IF_ERROR(_olap_data_convertor->set_source_content_with_specifid_column( - full_block.get_by_position(cid), row_pos, num_rows, cid)); - auto [status, column] = _olap_data_convertor->convert_column_data(cid); - if (!status.ok()) { - return status; - } - key_columns.push_back(column); - } + RETURN_IF_ERROR(encode_key_columns(key_columns)); + + // 2. encode sequence column + // We encode the seguence column even thought it may have invalid values in some rows because we need to + // encode the value of sequence column in key for rows that have a valid value in sequence column during + // lookup_raw_key. We will encode the sequence column again at the end of this method. At that time, we have + // a valid sequence column to encode the key with seq col. + RETURN_IF_ERROR(encode_seq_column(seq_column)); + // 3. merge duplicate rows when table has sequence column // When there are multiple rows with the same keys in memtable, some of them specify specify the sequence column, // some of them don't. We can't do the de-duplication in memtable. We must de-duplicate them here. - if (schema_has_sequence_col) { + if (_tablet_schema->has_sequence_col()) { std::size_t origin_rows = num_rows; - seq_col_unique_id = _tablet_schema->column(_tablet_schema->sequence_col_idx()).unique_id(); RETURN_IF_ERROR(merge_rows_for_sequence_column(block, row_pos, num_rows, skip_bitmaps, - key_columns, specified_rowsets, + key_columns, seq_column, specified_rowsets, segment_caches)); if (origin_rows != num_rows) { - // data in block has changed, should re-encode key columns and re-get skip_bitmaps and delete_sign_column_data + // data in block has changed, should re-encode key columns, sequence column and re-get skip_bitmaps _olap_data_convertor->clear_source_content(); - key_columns.clear(); - for (std::size_t cid {0}; cid < _num_sort_key_columns; cid++) { - full_block.replace_by_position(cid, block->get_by_position(cid).column); - RETURN_IF_ERROR(_olap_data_convertor->set_source_content_with_specifid_column( - full_block.get_by_position(cid), row_pos, num_rows, cid)); - auto [status, column] = _olap_data_convertor->convert_column_data(cid); - if (!status.ok()) { - return status; - } - key_columns.push_back(column); - } - skip_bitmaps = &(assert_cast( - block->get_by_position(skip_bitmap_col_idx) - .column->assume_mutable() - .get()) - ->get_data()); - - delete_sign_column_data = - BaseTablet::get_delete_sign_column_data(*block, row_pos + num_rows); - DCHECK(delete_sign_column_data != nullptr); + RETURN_IF_ERROR(encode_key_columns(key_columns)); + RETURN_IF_ERROR(encode_seq_column(seq_column)); + skip_bitmaps = get_skip_bitmaps(block); } } - // write key columns data + const auto* delete_sign_column_data = + BaseTablet::get_delete_sign_column_data(*block, row_pos + num_rows); + DCHECK(delete_sign_column_data != nullptr); + + // 4. write key columns data for (std::size_t cid {0}; cid < _num_sort_key_columns; cid++) { const auto& column = key_columns[cid]; DCHECK(_column_writers[cid]->get_next_rowid() == _num_rows_written); @@ -794,34 +808,14 @@ Status SegmentWriter::append_block_with_flexible_partial_content(const vectorize DCHECK(_column_writers[cid]->get_next_rowid() == _num_rows_written + num_rows); } - // 2. encode sequence column - // We encode the seguence column even thought it may have invalid values in some rows because we need to - // encode the value of sequence column in key for rows that have a valid value in sequence column during - // lookup_raw_key. We will encode the sequence column again at the end of this method. At that time, we have - // a valid sequence column to encode the key with seq col. - vectorized::IOlapColumnDataAccessor* seq_column = nullptr; - if (schema_has_sequence_col) { - auto seq_col_idx = _tablet_schema->sequence_col_idx(); - seq_col_unique_id = _tablet_schema->column(seq_col_idx).unique_id(); - RETURN_IF_ERROR(_olap_data_convertor->set_source_content_with_specifid_column( - block->get_by_position(seq_col_idx), row_pos, num_rows, seq_col_idx)); - auto [status, column] = _olap_data_convertor->convert_column_data(seq_col_idx); - if (!status.ok()) { - return status; - } - seq_column = column; - } - - FlexibleReadPlan read_plan; - bool has_row_column = _tablet_schema->has_row_store_for_all_columns(); - read_plan.set_row_store(has_row_column); - + // 5. genreate read plan + FlexibleReadPlan read_plan {_tablet_schema->has_row_store_for_all_columns()}; PartialUpdateStats stats; RETURN_IF_ERROR(generate_flexible_read_plan( - read_plan, row_pos, num_rows, segment_start_pos, schema_has_sequence_col, - seq_col_unique_id, seq_map_col_unique_id, delete_sign_col_unique_id, skip_bitmaps, - key_columns, seq_column, delete_sign_column_data, specified_rowsets, segment_caches, - has_default_or_nullable, use_default_or_null_flag, stats)); + read_plan, row_pos, num_rows, segment_start_pos, _tablet_schema->has_sequence_col(), + seq_map_col_unique_id, skip_bitmaps, key_columns, seq_column, delete_sign_column_data, + specified_rowsets, segment_caches, has_default_or_nullable, use_default_or_null_flag, + stats)); CHECK_EQ(use_default_or_null_flag.size(), num_rows); if (config::enable_merge_on_write_correctness_check) { @@ -829,7 +823,7 @@ Status SegmentWriter::append_block_with_flexible_partial_content(const vectorize _mow_context->rowset_ids); } - // read to fill full_block + // 6. read according plan to fill full_block RETURN_IF_ERROR(read_plan.fill_non_primary_key_columns( _opts.rowset_ctx, _rsid_to_rowset, *_tablet_schema, full_block, use_default_or_null_flag, has_default_or_nullable, segment_start_pos, row_pos, block, @@ -838,11 +832,10 @@ Status SegmentWriter::append_block_with_flexible_partial_content(const vectorize // TODO(bobhan1): should we replace the skip bitmap column with empty bitmaps to reduce storage occupation? // this column is not needed in read path for merge-on-write table - // row column should be filled here - // convert block to row store format + // 7. fill row store column _serialize_block_to_row_column(full_block); - // encode all non-key columns(including sequence column if exists) and append to column_writers + // 8. encode and write all non-primary key columns(including sequence column if exists) for (auto cid = _num_sort_key_columns; cid < _tablet_schema->num_columns(); cid++) { RETURN_IF_ERROR(_olap_data_convertor->set_source_content_with_specifid_column( full_block.get_by_position(cid), row_pos, num_rows, cid)); @@ -873,7 +866,7 @@ Status SegmentWriter::append_block_with_flexible_partial_content(const vectorize _num_rows_written, row_pos, _primary_key_index_builder->num_rows()); } - // build primary key index + // 9. build primary key index RETURN_IF_ERROR( _generate_primary_key_index(_key_coders, key_columns, seq_column, num_rows, false)); @@ -887,14 +880,18 @@ Status SegmentWriter::append_block_with_flexible_partial_content(const vectorize Status SegmentWriter::generate_flexible_read_plan( FlexibleReadPlan& read_plan, size_t row_pos, size_t num_rows, size_t segment_start_pos, - bool schema_has_sequence_col, int32_t seq_col_unique_id, int32_t seq_map_col_unique_id, - int32_t delete_sign_col_unique_id, std::vector* skip_bitmaps, + bool schema_has_sequence_col, int32_t seq_map_col_unique_id, + std::vector* skip_bitmaps, const std::vector& key_columns, vectorized::IOlapColumnDataAccessor* seq_column, const signed char* delete_sign_column_data, const std::vector& specified_rowsets, std::vector>& segment_caches, bool& has_default_or_nullable, std::vector& use_default_or_null_flag, PartialUpdateStats& stats) { + int32_t delete_sign_col_unique_id = + _tablet_schema->column(_tablet_schema->delete_sign_idx()).unique_id(); + int32_t seq_col_unique_id = + _tablet_schema->column(_tablet_schema->sequence_col_idx()).unique_id(); for (size_t block_pos = row_pos; block_pos < row_pos + num_rows; block_pos++) { size_t delta_pos = block_pos - row_pos; size_t segment_pos = segment_start_pos + delta_pos; @@ -940,16 +937,17 @@ Status SegmentWriter::merge_rows_for_sequence_column( const vectorized::Block* block, size_t row_pos, size_t& num_rows, std::vector* skip_bitmaps, const std::vector& key_columns, + vectorized::IOlapColumnDataAccessor* seq_column, const std::vector& specified_rowsets, std::vector>& segment_caches) { auto seq_col_unique_id = _tablet_schema->column(_tablet_schema->sequence_col_idx()).unique_id(); - FixedReadPlan read_plan; - std::unordered_map neighber_index; std::string previous_key {}; bool previous_has_seq_col {false}; - std::set use_default; int duplicate_keys {0}; + auto filter_column = vectorized::ColumnUInt8::create(num_rows, 1); + auto* __restrict filter_map = filter_column->get_data().data(); + for (size_t block_pos = row_pos; block_pos < row_pos + num_rows; block_pos++) { size_t delta_pos = block_pos - row_pos; auto& skip_bitmap = skip_bitmaps->at(block_pos); @@ -961,60 +959,47 @@ Status SegmentWriter::merge_rows_for_sequence_column( ++duplicate_keys; RowLocation loc; RowsetSharedPtr rowset; - size_t row_index {}; - size_t neighber_idx {}; + size_t rid_missing_seq {}; + size_t rid_with_seq {}; if (row_has_sequence_col) { - row_index = block_pos - 1; - neighber_idx = block_pos; + rid_missing_seq = block_pos - 1; + rid_with_seq = block_pos; } else { - row_index = block_pos; - neighber_idx = block_pos - 1; + rid_missing_seq = block_pos; + rid_with_seq = block_pos - 1; } + std::string previous_encoded_seq_value {}; + st = _tablet->lookup_row_key(key, _tablet_schema.get(), false, specified_rowsets, &loc, - _mow_context->max_version, segment_caches, &rowset); + _mow_context->max_version, segment_caches, &rowset, true, + &previous_encoded_seq_value); + DCHECK(st.is() || st.ok()); + + Slice previous_seq_slice {}; if (st.is()) { - use_default.insert(block_pos); + // TODO: handle default value } else { _rsid_to_rowset.emplace(rowset->rowset_id(), rowset); - read_plan.prepare_to_read(loc, row_index); } - neighber_index[row_index] = neighber_idx; - } - previous_key = std::move(key); - previous_has_seq_col = row_has_sequence_col; - } - if (duplicate_keys > 0) { - auto seq_col_idx = _tablet_schema->sequence_col_idx(); - std::vector cids {static_cast(seq_col_idx)}; - auto tmp_block = _tablet_schema->create_block_by_cids(cids); - std::map read_index; - RETURN_IF_ERROR(read_plan.read_columns_by_plan(*_tablet_schema, cids, _rsid_to_rowset, - tmp_block, &read_index)); - auto filter_column = vectorized::ColumnUInt8::create(num_rows, 1); - auto* __restrict filter_map = filter_column->get_data().data(); - auto seq_col_default_value = - _opts.rowset_ctx->partial_update_info - ->default_values[seq_col_idx - _tablet_schema->num_key_columns()]; - for (auto [idx, neighber] : neighber_index) { - Slice cur_seq_value {}; - if (use_default.contains(idx)) { - cur_seq_value = Slice(seq_col_default_value.data(), seq_col_default_value.size()); - } else { - cur_seq_value = tmp_block.get_by_position(0) - .column->get_data_at(read_index[idx]) - .to_slice(); - } - Slice neighber_seq_value = - block->get_by_position(seq_col_idx).column->get_data_at(neighber).to_slice(); - int res = cur_seq_value.compare(neighber_seq_value); + std::string cur_encoded_seq_value {}; + _encode_seq_column(seq_column, rid_with_seq, &cur_encoded_seq_value); + int res = Slice {previous_encoded_seq_value}.compare(Slice {cur_encoded_seq_value}); + LOG_INFO( + "SegmentWriter::merge_rows_for_sequence_column: rid_with_seq={}, " + "rid_missing_seq={}, res={}", + rid_with_seq, rid_missing_seq, res); if (res > 0) { - filter_map[neighber] = 0; + filter_map[rid_with_seq] = 0; } else if (res < 0) { - filter_map[idx] = 0; + filter_map[rid_missing_seq] = 0; } else { - filter_map[std::min(idx, neighber)] = 0; + filter_map[std::min(rid_with_seq, rid_missing_seq)] = 0; } } + previous_key = std::move(key); + previous_has_seq_col = row_has_sequence_col; + } + if (duplicate_keys > 0) { auto num_cols = block->columns(); auto* mutable_block = const_cast(block); mutable_block->insert({std::move(filter_column), @@ -1022,10 +1007,14 @@ Status SegmentWriter::merge_rows_for_sequence_column( "__dup_key_filter_col__"}); RETURN_IF_ERROR(vectorized::Block::filter_block(mutable_block, num_cols, num_cols)); int merged_rows = num_rows - mutable_block->rows(); - DCHECK_EQ(duplicate_keys, merged_rows) - << "duplicate_keys != merged_rows, duplicate_keys=" << duplicate_keys - << ", merged_rows=" << merged_rows << ", num_rows=" << num_rows - << ", mutable_block->rows()=" << mutable_block->rows(); + if (duplicate_keys != merged_rows) { + auto msg = fmt::format( + "duplicate_keys != merged_rows, duplicate_keys={}, merged_rows={}, " + "num_rows={}, mutable_block->rows()={}", + duplicate_keys, merged_rows, num_rows, block->rows()); + DCHECK(false) << msg; + return Status::InternalError(msg); + } num_rows = mutable_block->rows(); } return Status::OK(); diff --git a/be/src/olap/rowset/segment_v2/segment_writer.h b/be/src/olap/rowset/segment_v2/segment_writer.h index 837b54d5ff3d5e..3e0ae13fa272b3 100644 --- a/be/src/olap/rowset/segment_v2/segment_writer.h +++ b/be/src/olap/rowset/segment_v2/segment_writer.h @@ -111,8 +111,8 @@ class SegmentWriter { size_t row_pos, size_t num_rows); Status generate_flexible_read_plan( FlexibleReadPlan& read_plan, size_t row_pos, size_t num_rows, size_t segment_start_pos, - bool schema_has_sequence_col, int32_t seq_col_unique_id, int32_t seq_map_col_unique_id, - int32_t delete_sign_col_unique_id, std::vector* skip_bitmaps, + bool schema_has_sequence_col, int32_t seq_map_col_unique_id, + std::vector* skip_bitmaps, const std::vector& key_columns, vectorized::IOlapColumnDataAccessor* seq_column, const signed char* delete_sign_column_data, @@ -124,6 +124,7 @@ class SegmentWriter { const vectorized::Block* block, size_t row_pos, size_t& num_rows, std::vector* skip_bitmaps, const std::vector& key_columns, + vectorized::IOlapColumnDataAccessor* seq_column, const std::vector& specified_rowsets, std::vector>& segment_caches); Status append_block_with_variant_subcolumns(vectorized::Block& data); diff --git a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp index f1d64750e71062..7ebb9242fd57ce 100644 --- a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp +++ b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp @@ -575,80 +575,96 @@ Status VerticalSegmentWriter::_append_block_with_flexible_partial_content( DCHECK(_tablet_schema->has_skip_bitmap_col()); auto skip_bitmap_col_idx = _tablet_schema->skip_bitmap_col_idx(); - std::vector* skip_bitmaps = &( - assert_cast( - data.block->get_by_position(skip_bitmap_col_idx).column->assume_mutable().get()) - ->get_data()); + + auto get_skip_bitmaps = [&skip_bitmap_col_idx](const vectorized::Block* block) { + return &(assert_cast( + block->get_by_position(skip_bitmap_col_idx).column->assume_mutable().get()) + ->get_data()); + }; + std::vector* skip_bitmaps = get_skip_bitmaps(data.block); bool has_default_or_nullable = false; std::vector use_default_or_null_flag; use_default_or_null_flag.reserve(data.num_rows); - int32_t seq_col_unique_id = -1; int32_t seq_map_col_unique_id = _opts.rowset_ctx->partial_update_info->sequence_map_col_uid(); bool schema_has_sequence_col = _tablet_schema->has_sequence_col(); - const auto* delete_sign_column_data = - BaseTablet::get_delete_sign_column_data(*data.block, data.row_pos + data.num_rows); - DCHECK(delete_sign_column_data != nullptr); - int32_t delete_sign_col_unique_id = - _tablet_schema->column(_tablet_schema->delete_sign_idx()).unique_id(); - DBUG_EXECUTE_IF("VerticalSegmentWriter._append_block_with_flexible_partial_content.sleep", { sleep(60); }) const std::vector& specified_rowsets = _mow_context->rowset_ptrs; std::vector> segment_caches(specified_rowsets.size()); + std::vector key_columns {}; + vectorized::IOlapColumnDataAccessor* seq_column {nullptr}; + + auto encode_key_columns = + [&full_block, &data, + this](std::vector& key_columns) -> Status { + key_columns.clear(); + for (std::size_t cid {0}; cid < _num_sort_key_columns; cid++) { + full_block.replace_by_position(cid, data.block->get_by_position(cid).column); + RETURN_IF_ERROR(_olap_data_convertor->set_source_content_with_specifid_column( + full_block.get_by_position(cid), data.row_pos, data.num_rows, cid)); + auto [status, column] = _olap_data_convertor->convert_column_data(cid); + if (!status.ok()) { + return status; + } + key_columns.push_back(column); + } + return Status::OK(); + }; + + auto encode_seq_column = [&data, &schema_has_sequence_col, + this](vectorized::IOlapColumnDataAccessor*& seq_column) -> Status { + seq_column = nullptr; + if (schema_has_sequence_col) { + auto seq_col_idx = _tablet_schema->sequence_col_idx(); + RETURN_IF_ERROR(_olap_data_convertor->set_source_content_with_specifid_column( + data.block->get_by_position(seq_col_idx), data.row_pos, data.num_rows, + seq_col_idx)); + auto [status, column] = _olap_data_convertor->convert_column_data(seq_col_idx); + if (!status.ok()) { + return status; + } + seq_column = column; + } + return Status::OK(); + }; + // 1. encode key columns // we can only encode sort key columns currently becasue all non-key columns in flexible partial update // can have missing cells - std::vector key_columns; - for (std::size_t cid {0}; cid < _num_sort_key_columns; cid++) { - full_block.replace_by_position(cid, data.block->get_by_position(cid).column); - RETURN_IF_ERROR(_olap_data_convertor->set_source_content_with_specifid_column( - full_block.get_by_position(cid), data.row_pos, data.num_rows, cid)); - auto [status, column] = _olap_data_convertor->convert_column_data(cid); - if (!status.ok()) { - return status; - } - key_columns.push_back(column); - } + RETURN_IF_ERROR(encode_key_columns(key_columns)); + + // 2. encode sequence column + // We encode the seguence column even thought it may have invalid values in some rows because we need to + // encode the value of sequence column in key for rows that have a valid value in sequence column during + // lookup_raw_key. We will encode the sequence column again at the end of this method. At that time, we have + // a valid sequence column to encode the key with seq col. + RETURN_IF_ERROR(encode_seq_column(seq_column)); + // 3. merge duplicate rows when table has sequence column // When there are multiple rows with the same keys in memtable, some of them specify specify the sequence column, // some of them don't. We can't do the de-duplication in memtable. We must de-duplicate them here. if (schema_has_sequence_col) { std::size_t origin_rows = data.num_rows; - seq_col_unique_id = _tablet_schema->column(_tablet_schema->sequence_col_idx()).unique_id(); - RETURN_IF_ERROR(_merge_rows_for_sequence_column(data, skip_bitmaps, key_columns, + RETURN_IF_ERROR(_merge_rows_for_sequence_column(data, skip_bitmaps, key_columns, seq_column, specified_rowsets, segment_caches)); if (origin_rows != data.num_rows) { - // data in block has changed, should re-encode key columns and re-get skip_bitmaps and delete_sign_column_data + // data in block has changed, should re-encode key columns, sequence column and re-get skip_bitmaps _olap_data_convertor->clear_source_content(); - key_columns.clear(); - for (std::size_t cid {0}; cid < _num_sort_key_columns; cid++) { - full_block.replace_by_position(cid, data.block->get_by_position(cid).column); - RETURN_IF_ERROR(_olap_data_convertor->set_source_content_with_specifid_column( - full_block.get_by_position(cid), data.row_pos, data.num_rows, cid)); - auto [status, column] = _olap_data_convertor->convert_column_data(cid); - if (!status.ok()) { - return status; - } - key_columns.push_back(column); - } - - skip_bitmaps = &(assert_cast( - data.block->get_by_position(skip_bitmap_col_idx) - .column->assume_mutable() - .get()) - ->get_data()); - - delete_sign_column_data = BaseTablet::get_delete_sign_column_data( - *data.block, data.row_pos + data.num_rows); - DCHECK(delete_sign_column_data != nullptr); + RETURN_IF_ERROR(encode_key_columns(key_columns)); + RETURN_IF_ERROR(encode_seq_column(seq_column)); + skip_bitmaps = get_skip_bitmaps(data.block); } } - // write key columns data + const auto* delete_sign_column_data = + BaseTablet::get_delete_sign_column_data(*data.block, data.row_pos + data.num_rows); + DCHECK(delete_sign_column_data != nullptr); + + // 4. write key columns data for (std::size_t cid {0}; cid < _num_sort_key_columns; cid++) { const auto& column = key_columns[cid]; DCHECK(_column_writers[cid]->get_next_rowid() == _num_rows_written); @@ -657,35 +673,13 @@ Status VerticalSegmentWriter::_append_block_with_flexible_partial_content( DCHECK(_column_writers[cid]->get_next_rowid() == _num_rows_written + data.num_rows); } - // 2. encode sequence column - // We encode the seguence column even thought it may have invalid values in some rows because we need to - // encode the value of sequence column in key for rows that have a valid value in sequence column during - // lookup_raw_key. We will encode the sequence column again at the end of this method. At that time, we have - // a valid sequence column to encode the key with seq col. - vectorized::IOlapColumnDataAccessor* seq_column = nullptr; - if (schema_has_sequence_col) { - auto seq_col_idx = _tablet_schema->sequence_col_idx(); - seq_col_unique_id = _tablet_schema->column(seq_col_idx).unique_id(); - RETURN_IF_ERROR(_olap_data_convertor->set_source_content_with_specifid_column( - data.block->get_by_position(seq_col_idx), data.row_pos, data.num_rows, - seq_col_idx)); - auto [status, column] = _olap_data_convertor->convert_column_data(seq_col_idx); - if (!status.ok()) { - return status; - } - seq_column = column; - } - - FlexibleReadPlan read_plan; - bool has_row_column = _tablet_schema->has_row_store_for_all_columns(); - read_plan.set_row_store(has_row_column); - + // 5. genreate read plan + FlexibleReadPlan read_plan {_tablet_schema->has_row_store_for_all_columns()}; PartialUpdateStats stats; RETURN_IF_ERROR(_generate_flexible_read_plan( - read_plan, data, segment_start_pos, schema_has_sequence_col, seq_col_unique_id, - seq_map_col_unique_id, delete_sign_col_unique_id, skip_bitmaps, key_columns, seq_column, - delete_sign_column_data, specified_rowsets, segment_caches, has_default_or_nullable, - use_default_or_null_flag, stats)); + read_plan, data, segment_start_pos, schema_has_sequence_col, seq_map_col_unique_id, + skip_bitmaps, key_columns, seq_column, delete_sign_column_data, specified_rowsets, + segment_caches, has_default_or_nullable, use_default_or_null_flag, stats)); CHECK_EQ(use_default_or_null_flag.size(), data.num_rows); if (config::enable_merge_on_write_correctness_check) { @@ -693,7 +687,7 @@ Status VerticalSegmentWriter::_append_block_with_flexible_partial_content( _mow_context->rowset_ids); } - // read to fill full_block + // 6. read according plan to fill full_block RETURN_IF_ERROR(read_plan.fill_non_primary_key_columns( _opts.rowset_ctx, _rsid_to_rowset, *_tablet_schema, full_block, use_default_or_null_flag, has_default_or_nullable, segment_start_pos, data.row_pos, @@ -702,11 +696,10 @@ Status VerticalSegmentWriter::_append_block_with_flexible_partial_content( // TODO(bobhan1): should we replace the skip bitmap column with empty bitmaps to reduce storage occupation? // this column is not needed in read path for merge-on-write table - // row column should be filled here - // convert block to row store format + // 7. fill row store column _serialize_block_to_row_column(full_block); - // encode all non-key columns(including sequence column if exists) and append to column_writers + // 8. encode and write all non-primary key columns(including sequence column if exists) for (auto cid = _num_sort_key_columns; cid < _tablet_schema->num_columns(); cid++) { RETURN_IF_ERROR(_olap_data_convertor->set_source_content_with_specifid_column( full_block.get_by_position(cid), data.row_pos, data.num_rows, cid)); @@ -737,7 +730,7 @@ Status VerticalSegmentWriter::_append_block_with_flexible_partial_content( _num_rows_written, data.row_pos, _primary_key_index_builder->num_rows()); } - // build primary key index + // 9. build primary key index RETURN_IF_ERROR(_generate_primary_key_index(_key_coders, key_columns, seq_column, data.num_rows, false)); @@ -751,14 +744,18 @@ Status VerticalSegmentWriter::_append_block_with_flexible_partial_content( Status VerticalSegmentWriter::_generate_flexible_read_plan( FlexibleReadPlan& read_plan, RowsInBlock& data, size_t segment_start_pos, - bool schema_has_sequence_col, int32_t seq_col_unique_id, int32_t seq_map_col_unique_id, - int32_t delete_sign_col_unique_id, std::vector* skip_bitmaps, + bool schema_has_sequence_col, int32_t seq_map_col_unique_id, + std::vector* skip_bitmaps, const std::vector& key_columns, vectorized::IOlapColumnDataAccessor* seq_column, const signed char* delete_sign_column_data, const std::vector& specified_rowsets, std::vector>& segment_caches, bool& has_default_or_nullable, std::vector& use_default_or_null_flag, PartialUpdateStats& stats) { + int32_t delete_sign_col_unique_id = + _tablet_schema->column(_tablet_schema->delete_sign_idx()).unique_id(); + int32_t seq_col_unique_id = + _tablet_schema->column(_tablet_schema->sequence_col_idx()).unique_id(); for (size_t block_pos = data.row_pos; block_pos < data.row_pos + data.num_rows; block_pos++) { size_t delta_pos = block_pos - data.row_pos; size_t segment_pos = segment_start_pos + delta_pos; @@ -803,18 +800,19 @@ Status VerticalSegmentWriter::_generate_flexible_read_plan( Status VerticalSegmentWriter::_merge_rows_for_sequence_column( RowsInBlock& data, std::vector* skip_bitmaps, const std::vector& key_columns, + vectorized::IOlapColumnDataAccessor* seq_column, const std::vector& specified_rowsets, std::vector>& segment_caches) { LOG_INFO("VerticalSegmentWriter::_merge_rows_for_sequence_column enter: data.block:{}\n", data.block->dump_data()); auto seq_col_unique_id = _tablet_schema->column(_tablet_schema->sequence_col_idx()).unique_id(); - FixedReadPlan read_plan; - std::unordered_map neighber_index; std::string previous_key {}; bool previous_has_seq_col {false}; - std::set use_default; int duplicate_keys {0}; + auto filter_column = vectorized::ColumnUInt8::create(data.num_rows, 1); + auto* __restrict filter_map = filter_column->get_data().data(); + for (size_t block_pos = data.row_pos; block_pos < data.row_pos + data.num_rows; block_pos++) { size_t delta_pos = block_pos - data.row_pos; auto& skip_bitmap = skip_bitmaps->at(block_pos); @@ -826,70 +824,46 @@ Status VerticalSegmentWriter::_merge_rows_for_sequence_column( ++duplicate_keys; RowLocation loc; RowsetSharedPtr rowset; - size_t row_index {}; - size_t neighber_idx {}; + size_t rid_missing_seq {}; + size_t rid_with_seq {}; if (row_has_sequence_col) { - row_index = block_pos - 1; - neighber_idx = block_pos; + rid_missing_seq = block_pos - 1; + rid_with_seq = block_pos; } else { - row_index = block_pos; - neighber_idx = block_pos - 1; + rid_missing_seq = block_pos; + rid_with_seq = block_pos - 1; } + std::string previous_encoded_seq_value {}; st = _tablet->lookup_row_key(key, _tablet_schema.get(), false, specified_rowsets, &loc, - _mow_context->max_version, segment_caches, &rowset); + _mow_context->max_version, segment_caches, &rowset, true, + &previous_encoded_seq_value); + DCHECK(st.is() || st.ok()); + + Slice previous_seq_slice {}; if (st.is()) { - use_default.insert(block_pos); + // TODO: handle default value } else { _rsid_to_rowset.emplace(rowset->rowset_id(), rowset); - read_plan.prepare_to_read(loc, row_index); - LOG_INFO( - "VerticalSegmentWriter::_merge_rows_for_sequence_column, prepare to read, " - "block_pos={}, row_index={}, neighber_idx={}, row_has_sequence_col={}", - block_pos, row_index, neighber_idx, row_has_sequence_col); } - - neighber_index[row_index] = neighber_idx; - } - previous_key = std::move(key); - previous_has_seq_col = row_has_sequence_col; - } - if (duplicate_keys > 0) { - auto seq_col_idx = _tablet_schema->sequence_col_idx(); - std::vector cids {static_cast(seq_col_idx)}; - auto tmp_block = _tablet_schema->create_block_by_cids(cids); - std::map read_index; - RETURN_IF_ERROR(read_plan.read_columns_by_plan(*_tablet_schema, cids, _rsid_to_rowset, - tmp_block, &read_index)); - auto filter_column = vectorized::ColumnUInt8::create(data.num_rows, 1); - auto* __restrict filter_map = filter_column->get_data().data(); - auto seq_col_default_value = - _opts.rowset_ctx->partial_update_info - ->default_values[seq_col_idx - _tablet_schema->num_key_columns()]; - for (auto [idx, neighber] : neighber_index) { - Slice cur_seq_value {}; - if (use_default.contains(idx)) { - cur_seq_value = Slice(seq_col_default_value.data(), seq_col_default_value.size()); - } else { - cur_seq_value = tmp_block.get_by_position(0) - .column->get_data_at(read_index[idx]) - .to_slice(); - } - Slice neighber_seq_value = data.block->get_by_position(seq_col_idx) - .column->get_data_at(neighber) - .to_slice(); - int res = cur_seq_value.compare(neighber_seq_value); + std::string cur_encoded_seq_value {}; + _encode_seq_column(seq_column, rid_with_seq, &cur_encoded_seq_value); + int res = Slice {previous_encoded_seq_value}.compare(Slice {cur_encoded_seq_value}); LOG_INFO( - "VerticalSegmentWriter::_merge_rows_for_sequence_column: idx={}, neighber={}, " - "res={}", - idx, neighber, res); + "VerticalSegmentWriter::_merge_rows_for_sequence_column: rid_with_seq={}, " + "rid_missing_seq={}, res={}", + rid_with_seq, rid_missing_seq, res); if (res > 0) { - filter_map[neighber] = 0; + filter_map[rid_with_seq] = 0; } else if (res < 0) { - filter_map[idx] = 0; + filter_map[rid_missing_seq] = 0; } else { - filter_map[std::min(idx, neighber)] = 0; + filter_map[std::min(rid_with_seq, rid_missing_seq)] = 0; } } + previous_key = std::move(key); + previous_has_seq_col = row_has_sequence_col; + } + if (duplicate_keys > 0) { auto num_cols = data.block->columns(); auto* block = const_cast(data.block); block->insert({std::move(filter_column), std::make_shared(), @@ -900,11 +874,14 @@ Status VerticalSegmentWriter::_merge_rows_for_sequence_column( "VerticalSegmentWriter::_merge_rows_for_sequence_column after filter: " "data.block:{}\n", data.block->dump_data()); - DCHECK_EQ(duplicate_keys, merged_rows) - << "duplicate_keys != merged_rows, duplicate_keys=" << duplicate_keys - << ", merged_rows=" << merged_rows << ", num_rows=" << data.num_rows - << ", mutable_block->rows()=" << block->rows(); - + if (duplicate_keys != merged_rows) { + auto msg = fmt::format( + "duplicate_keys != merged_rows, duplicate_keys={}, merged_rows={}, " + "num_rows={}, mutable_block->rows()={}", + duplicate_keys, merged_rows, data.num_rows, block->rows()); + DCHECK(false) << msg; + return Status::InternalError(msg); + } data.num_rows = block->rows(); } return Status::OK(); diff --git a/be/src/olap/rowset/segment_v2/vertical_segment_writer.h b/be/src/olap/rowset/segment_v2/vertical_segment_writer.h index 81e89955abd4d5..e6a6be5f9f73e1 100644 --- a/be/src/olap/rowset/segment_v2/vertical_segment_writer.h +++ b/be/src/olap/rowset/segment_v2/vertical_segment_writer.h @@ -172,8 +172,8 @@ class VerticalSegmentWriter { vectorized::Block& full_block); Status _generate_flexible_read_plan( FlexibleReadPlan& read_plan, RowsInBlock& data, size_t segment_start_pos, - bool schema_has_sequence_col, int32_t seq_col_unique_id, int32_t seq_map_col_unique_id, - int32_t delete_sign_col_unique_id, std::vector* skip_bitmaps, + bool schema_has_sequence_col, int32_t seq_map_col_unique_id, + std::vector* skip_bitmaps, const std::vector& key_columns, vectorized::IOlapColumnDataAccessor* seq_column, const signed char* delete_sign_column_data, @@ -184,6 +184,7 @@ class VerticalSegmentWriter { Status _merge_rows_for_sequence_column( RowsInBlock& data, std::vector* skip_bitmaps, const std::vector& key_columns, + vectorized::IOlapColumnDataAccessor* seq_column, const std::vector& specified_rowsets, std::vector>& segment_caches); Status _append_block_with_variant_subcolumns(RowsInBlock& data);