Skip to content

Commit

Permalink
support vertical compaction merge sparse
Browse files Browse the repository at this point in the history
  • Loading branch information
eldenmoon committed Feb 26, 2025
1 parent 5db93b9 commit abadd08
Show file tree
Hide file tree
Showing 33 changed files with 1,177 additions and 287 deletions.
2 changes: 1 addition & 1 deletion be/src/cloud/cloud_base_compaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ class CloudBaseCompaction : public CloudCompactionMixin {

void _filter_input_rowset();

void build_basic_info();
Status build_basic_info();

ReaderType compaction_type() const override { return ReaderType::READER_BASE_COMPACTION; }

Expand Down
5 changes: 4 additions & 1 deletion be/src/cloud/cloud_rowset_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,10 @@ Status CloudRowsetWriter::init(const RowsetWriterContext& rowset_writer_context)
DCHECK_NE(_context.newest_write_timestamp, -1);
_rowset_meta->set_newest_write_timestamp(_context.newest_write_timestamp);
}
_rowset_meta->set_tablet_schema(_context.tablet_schema);
auto schema = _context.tablet_schema->need_record_variant_extended_schema()
? _context.tablet_schema
: _context.tablet_schema->copy_without_variant_extracted_columns();
_rowset_meta->set_tablet_schema(schema);
_context.segment_collector = std::make_shared<SegmentCollectorT<BaseBetaRowsetWriter>>(this);
_context.file_writer_creator = std::make_shared<FileWriterCreatorT<BaseBetaRowsetWriter>>(this);
return Status::OK();
Expand Down
25 changes: 25 additions & 0 deletions be/src/olap/base_tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include "olap/delete_bitmap_calculator.h"
#include "olap/iterators.h"
#include "olap/memtable.h"
#include "olap/olap_common.h"
#include "olap/partial_update_info.h"
#include "olap/primary_key_index.h"
#include "olap/rowid_conversion.h"
Expand Down Expand Up @@ -163,6 +164,30 @@ TabletSchemaSPtr BaseTablet::tablet_schema_with_merged_max_schema_version(
return target_schema;
}

Status BaseTablet::get_compaction_schema(const std::vector<RowsetMetaSharedPtr>& rowset_metas,
TabletSchemaSPtr& target_schema) {
RowsetMetaSharedPtr max_schema_version_rs = *std::max_element(
rowset_metas.begin(), rowset_metas.end(),
[](const RowsetMetaSharedPtr& a, const RowsetMetaSharedPtr& b) {
return !a->tablet_schema()
? true
: (!b->tablet_schema()
? false
: a->tablet_schema()->schema_version() <
b->tablet_schema()->schema_version());
});
target_schema = max_schema_version_rs->tablet_schema();
if (target_schema->num_variant_columns() > 0) {
RowsetIdUnorderedSet rowset_ids;
for (const RowsetMetaSharedPtr& rs_meta : rowset_metas) {
rowset_ids.emplace(rs_meta->rowset_id());
}
RETURN_IF_ERROR(vectorized::schema_util::get_compaction_schema(
get_rowset_by_ids(&rowset_ids), target_schema));
}
return Status::OK();
}

Status BaseTablet::set_tablet_state(TabletState state) {
if (_tablet_meta->tablet_state() == TABLET_SHUTDOWN && state != TABLET_SHUTDOWN) {
return Status::Error<META_INVALID_ARGUMENT>(
Expand Down
3 changes: 3 additions & 0 deletions be/src/olap/base_tablet.h
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,9 @@ class BaseTablet {
static TabletSchemaSPtr tablet_schema_with_merged_max_schema_version(
const std::vector<RowsetMetaSharedPtr>& rowset_metas);

Status get_compaction_schema(const std::vector<RowsetMetaSharedPtr>& rowset_metas,
TabletSchemaSPtr& target_schema);

////////////////////////////////////////////////////////////////////////////
// begin MoW functions
////////////////////////////////////////////////////////////////////////////
Expand Down
18 changes: 13 additions & 5 deletions be/src/olap/compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ Tablet* CompactionMixin::tablet() {
}

Status CompactionMixin::do_compact_ordered_rowsets() {
build_basic_info();
RETURN_IF_ERROR(build_basic_info());
RowsetWriterContext ctx;
RETURN_IF_ERROR(construct_output_rowset_writer(ctx));

Expand Down Expand Up @@ -323,7 +323,7 @@ Status CompactionMixin::do_compact_ordered_rowsets() {
return Status::OK();
}

void CompactionMixin::build_basic_info() {
Status CompactionMixin::build_basic_info() {
for (auto& rowset : _input_rowsets) {
_input_rowsets_data_size += rowset->data_disk_size();
_input_rowsets_index_size += rowset->index_disk_size();
Expand All @@ -344,6 +344,10 @@ void CompactionMixin::build_basic_info() {
std::transform(_input_rowsets.begin(), _input_rowsets.end(), rowset_metas.begin(),
[](const RowsetSharedPtr& rowset) { return rowset->rowset_meta(); });
_cur_tablet_schema = _tablet->tablet_schema_with_merged_max_schema_version(rowset_metas);
if (!_cur_tablet_schema->need_record_variant_extended_schema()) {
RETURN_IF_ERROR(_tablet->get_compaction_schema(rowset_metas, _cur_tablet_schema));
}
return Status::OK();
}

bool CompactionMixin::handle_ordered_data_compaction() {
Expand Down Expand Up @@ -461,7 +465,7 @@ Status CompactionMixin::execute_compact_impl(int64_t permits) {
_state = CompactionState::SUCCESS;
return Status::OK();
}
build_basic_info();
RETURN_IF_ERROR(build_basic_info());

VLOG_DEBUG << "dump tablet schema: " << _cur_tablet_schema->dump_structure();

Expand Down Expand Up @@ -1348,7 +1352,7 @@ void Compaction::_load_segment_to_cache() {
}
}

void CloudCompactionMixin::build_basic_info() {
Status CloudCompactionMixin::build_basic_info() {
_output_version =
Version(_input_rowsets.front()->start_version(), _input_rowsets.back()->end_version());

Expand All @@ -1358,6 +1362,10 @@ void CloudCompactionMixin::build_basic_info() {
std::transform(_input_rowsets.begin(), _input_rowsets.end(), rowset_metas.begin(),
[](const RowsetSharedPtr& rowset) { return rowset->rowset_meta(); });
_cur_tablet_schema = _tablet->tablet_schema_with_merged_max_schema_version(rowset_metas);
if (!_cur_tablet_schema->need_record_variant_extended_schema()) {
RETURN_IF_ERROR(_tablet->get_compaction_schema(rowset_metas, _cur_tablet_schema));
}
return Status::OK();
}

int64_t CloudCompactionMixin::get_compaction_permits() {
Expand All @@ -1375,7 +1383,7 @@ CloudCompactionMixin::CloudCompactionMixin(CloudStorageEngine& engine, CloudTabl
Status CloudCompactionMixin::execute_compact_impl(int64_t permits) {
OlapStopWatch watch;

build_basic_info();
RETURN_IF_ERROR(build_basic_info());

LOG(INFO) << "start " << compaction_name() << ". tablet=" << _tablet->tablet_id()
<< ", output_version=" << _output_version << ", permits: " << permits;
Expand Down
4 changes: 2 additions & 2 deletions be/src/olap/compaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ class CompactionMixin : public Compaction {
private:
Status execute_compact_impl(int64_t permits);

void build_basic_info();
Status build_basic_info();

// Return true if do ordered data compaction successfully
bool handle_ordered_data_compaction();
Expand Down Expand Up @@ -204,7 +204,7 @@ class CloudCompactionMixin : public Compaction {

Status execute_compact_impl(int64_t permits);

void build_basic_info();
Status build_basic_info();

virtual Status modify_rowsets();

Expand Down
3 changes: 3 additions & 0 deletions be/src/olap/iterators.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include "olap/rowset/segment_v2/row_ranges.h"
#include "olap/tablet_schema.h"
#include "runtime/runtime_state.h"
#include "vec/columns/column.h"
#include "vec/core/block.h"
#include "vec/exprs/vexpr.h"

Expand Down Expand Up @@ -119,6 +120,8 @@ class StorageReadOptions {
std::map<std::string, TypeDescriptor> target_cast_type_for_variants;
RowRanges row_ranges;
size_t topn_limit = 0;
// Cache for sparse column data to avoid redundant reads
vectorized::ColumnPtr sparse_column_cache;
};

struct CompactionSampleInfo {
Expand Down
9 changes: 8 additions & 1 deletion be/src/olap/rowset/beta_rowset_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,10 @@ Status BaseBetaRowsetWriter::init(const RowsetWriterContext& rowset_writer_conte
_rowset_meta->set_newest_write_timestamp(_context.newest_write_timestamp);
}
_rowset_meta->set_tablet_uid(_context.tablet_uid);
_rowset_meta->set_tablet_schema(_context.tablet_schema);
auto schema = _context.tablet_schema->need_record_variant_extended_schema()
? _context.tablet_schema
: _context.tablet_schema->copy_without_variant_extracted_columns();
_rowset_meta->set_tablet_schema(schema);
_context.segment_collector = std::make_shared<SegmentCollectorT<BaseBetaRowsetWriter>>(this);
_context.file_writer_creator = std::make_shared<FileWriterCreatorT<BaseBetaRowsetWriter>>(this);
return Status::OK();
Expand Down Expand Up @@ -801,6 +804,10 @@ Status BetaRowsetWriter::build(RowsetSharedPtr& rowset) {
// update rowset meta tablet schema if tablet schema updated
auto rowset_schema = _context.merged_tablet_schema != nullptr ? _context.merged_tablet_schema
: _context.tablet_schema;

rowset_schema = rowset_schema->need_record_variant_extended_schema()
? rowset_schema
: rowset_schema->copy_without_variant_extracted_columns();
_rowset_meta->set_tablet_schema(rowset_schema);

// If segment compaction occurs, the idx file info will become inaccurate.
Expand Down
3 changes: 3 additions & 0 deletions be/src/olap/rowset/rowset_writer_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@

#include <gen_cpp/olap_file.pb.h>

#include <string_view>
#include <unordered_map>

#include "olap/olap_define.h"
#include "olap/partial_update_info.h"
#include "olap/storage_policy.h"
Expand Down
Loading

0 comments on commit abadd08

Please sign in to comment.