
Commit

support downgrade
eldenmoon committed Feb 17, 2025
1 parent c807382 commit 5db93b9
Showing 21 changed files with 520 additions and 262 deletions.
6 changes: 2 additions & 4 deletions be/src/olap/compaction.cpp
@@ -343,8 +343,7 @@ void CompactionMixin::build_basic_info() {
std::vector<RowsetMetaSharedPtr> rowset_metas(_input_rowsets.size());
std::transform(_input_rowsets.begin(), _input_rowsets.end(), rowset_metas.begin(),
[](const RowsetSharedPtr& rowset) { return rowset->rowset_meta(); });
_cur_tablet_schema = _tablet->tablet_schema_with_merged_max_schema_version(rowset_metas)
->copy_without_variant_extracted_columns();
_cur_tablet_schema = _tablet->tablet_schema_with_merged_max_schema_version(rowset_metas);
}

bool CompactionMixin::handle_ordered_data_compaction() {
@@ -1358,8 +1357,7 @@ void CloudCompactionMixin::build_basic_info() {
std::vector<RowsetMetaSharedPtr> rowset_metas(_input_rowsets.size());
std::transform(_input_rowsets.begin(), _input_rowsets.end(), rowset_metas.begin(),
[](const RowsetSharedPtr& rowset) { return rowset->rowset_meta(); });
_cur_tablet_schema = _tablet->tablet_schema_with_merged_max_schema_version(rowset_metas)
->copy_without_variant_extracted_columns();
_cur_tablet_schema = _tablet->tablet_schema_with_merged_max_schema_version(rowset_metas);
}

int64_t CloudCompactionMixin::get_compaction_permits() {
253 changes: 175 additions & 78 deletions be/src/olap/rowset/segment_v2/column_reader.cpp
@@ -27,6 +27,7 @@
#include <utility>

#include "common/compiler_util.h" // IWYU pragma: keep
#include "common/config.h"
#include "common/exception.h"
#include "common/status.h"
#include "io/fs/file_reader.h"
@@ -267,14 +268,108 @@ Status VariantColumnReader::_create_hierarchical_reader(ColumnIterator** reader,
return Status::OK();
}

Status VariantColumnReader::new_iterator(ColumnIterator** iterator,
const TabletColumn& target_col) {
bool VariantColumnReader::_read_flat_leaves(ReaderType type, const TabletColumn& target_col) {
auto relative_path = target_col.path_info_ptr()->copy_pop_front();
bool is_compaction_type =
(type == ReaderType::READER_BASE_COMPACTION ||
type == ReaderType::READER_CUMULATIVE_COMPACTION ||
type == ReaderType::READER_COLD_DATA_COMPACTION ||
type == ReaderType::READER_SEGMENT_COMPACTION ||
type == ReaderType::READER_FULL_COMPACTION || type == ReaderType::READER_CHECKSUM);
// For compaction operations (e.g., base compaction, cumulative compaction, cold data compaction,
// segment compaction, full compaction, or checksum reading), a legacy compaction style is applied
// when reading variant columns.
//
// Specifically:
// 1. If the target column is a root column (i.e., relative_path is empty) and it does not have any
// subcolumns (i.e., target_col.variant_max_subcolumns_count() <= 0), then the legacy compaction style
// is used.
// 2. If the target column is a nested subcolumn (i.e., relative_path is not empty), then the legacy
// compaction style is also used.
//
// This ensures that during compaction, the reading behavior for variant columns remains consistent
// with historical processing methods, preventing potential data amplification issues.
return is_compaction_type &&
((relative_path.empty() && target_col.variant_max_subcolumns_count() <= 0) ||
!relative_path.empty());
}
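
The predicate above packs both cases into a single boolean expression. As a reading aid, here is a minimal standalone sketch of the same decision with the inputs flattened into plain parameters; the names ReaderKind, max_subcolumns, and read_flat_leaves_sketch are illustrative stand-ins, not the actual Doris types.

#include <string>

// Illustrative stand-in for the subset of ReaderType values that matter here.
enum class ReaderKind { QUERY, BASE_COMPACTION, CUMULATIVE_COMPACTION, CHECKSUM };

// Mirrors the logic of VariantColumnReader::_read_flat_leaves: compaction/checksum reads
// fall back to the legacy flat-leaf style for (1) a root variant column with no
// materialized subcolumns, or (2) any non-root subcolumn path.
static bool read_flat_leaves_sketch(ReaderKind kind, const std::string& relative_path,
                                    int max_subcolumns) {
    bool is_compaction_or_checksum = kind == ReaderKind::BASE_COMPACTION ||
                                     kind == ReaderKind::CUMULATIVE_COMPACTION ||
                                     kind == ReaderKind::CHECKSUM;
    return is_compaction_or_checksum &&
           ((relative_path.empty() && max_subcolumns <= 0) || !relative_path.empty());
}

// read_flat_leaves_sketch(ReaderKind::CUMULATIVE_COMPACTION, "",    0) -> true  (case 1)
// read_flat_leaves_sketch(ReaderKind::BASE_COMPACTION,       "a.b", 3) -> true  (case 2)
// read_flat_leaves_sketch(ReaderKind::QUERY,                 "",    0) -> false (not a compaction read)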

Status VariantColumnReader::_new_default_iter_with_same_nested(ColumnIterator** iterator,
const TabletColumn& tablet_column) {
auto relative_path = tablet_column.path_info_ptr()->copy_pop_front();
// We find the node that represents the same Nested type as the path.
const auto* parent = _subcolumn_readers->find_best_match(relative_path);
VLOG_DEBUG << "find with path " << tablet_column.path_info_ptr()->get_path() << " parent "
<< (parent ? parent->path.get_path() : "nullptr") << ", type "
<< ", parent is nested " << (parent ? parent->is_nested() : false) << ", "
<< TabletColumn::get_string_by_field_type(tablet_column.type());
// find its common parent with a nested part
// why not use parent->path->has_nested_part? because parent may not be a leaf node,
// and non-leaf nodes may not contain path info
// Example:
// {"payload" : {"commits" : [{"issue" : {"id" : 123, "email" : "a@b"}}]}}
// nested node path : payload.commits(NESTED)
// tablet_column path_info : payload.commits.issue.id(SCALAR)
// parent path node : payload.commits.issue(TUPLE)
// leaf path_info : payload.commits.issue.email(SCALAR)
if (parent && SubcolumnColumnReaders::find_parent(
parent, [](const auto& node) { return node.is_nested(); })) {
/// Find any leaf of Nested subcolumn.
const auto* leaf = SubcolumnColumnReaders::find_leaf(
parent, [](const auto& node) { return node.path.has_nested_part(); });
assert(leaf);
std::unique_ptr<ColumnIterator> sibling_iter;
ColumnIterator* sibling_iter_ptr;
RETURN_IF_ERROR(leaf->data.reader->new_iterator(&sibling_iter_ptr));
sibling_iter.reset(sibling_iter_ptr);
*iterator = new DefaultNestedColumnIterator(std::move(sibling_iter),
leaf->data.file_column_type);
} else {
*iterator = new DefaultNestedColumnIterator(nullptr, nullptr);
}
return Status::OK();
}
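
To make the payload.commits example in the comment above concrete, the following self-contained sketch models the lookup on a simplified tree; the Node struct, find_nested_ancestor, and find_nested_leaf are illustrative stand-ins, not the real SubcolumnColumnReaders API. The missing payload.commits.issue.id resolves to its parent payload.commits.issue, which sits under the Nested node payload.commits, so the sibling leaf payload.commits.issue.email supplies the iterator that shapes the default column.

#include <iostream>
#include <memory>
#include <string>
#include <vector>

// Simplified, illustrative model of the subcolumn tree; names are made up for this sketch.
struct Node {
    std::string path;
    bool nested = false;                // node itself represents a Nested (array-of-object) level
    bool path_has_nested_part = false;  // path passes through a Nested level
    Node* parent = nullptr;
    std::vector<std::unique_ptr<Node>> children;

    Node* add_child(std::string p, bool is_nested, bool nested_part) {
        children.push_back(std::make_unique<Node>());
        Node* c = children.back().get();
        c->path = std::move(p);
        c->nested = is_nested;
        c->path_has_nested_part = nested_part;
        c->parent = this;
        return c;
    }
};

// Walk up from `n` looking for an ancestor (or n itself) that is a Nested node.
const Node* find_nested_ancestor(const Node* n) {
    while (n != nullptr && !n->nested) n = n->parent;
    return n;
}

// Depth-first search for any leaf whose path has a nested part.
const Node* find_nested_leaf(const Node* n) {
    if (n->children.empty()) return n->path_has_nested_part ? n : nullptr;
    for (const auto& c : n->children) {
        if (const Node* leaf = find_nested_leaf(c.get())) return leaf;
    }
    return nullptr;
}

int main() {
    // {"payload" : {"commits" : [{"issue" : {"id" : 123, "email" : "a@b"}}]}}
    Node root;
    Node* payload = root.add_child("payload", /*is_nested=*/false, /*nested_part=*/false);
    Node* commits = payload->add_child("payload.commits", true, true);       // NESTED
    Node* issue = commits->add_child("payload.commits.issue", false, true);  // TUPLE
    issue->add_child("payload.commits.issue.email", false, true);            // SCALAR leaf

    // payload.commits.issue.id is absent in this segment; its best-match parent is
    // payload.commits.issue, which lies under the Nested node payload.commits, so the
    // email leaf is picked as the sibling whose iterator drives the default fill.
    const Node* parent = issue;
    if (find_nested_ancestor(parent) != nullptr) {
        const Node* sibling = find_nested_leaf(parent);
        std::cout << "fill defaults using sibling leaf: " << sibling->path << "\n";
    }
    return 0;
}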

Status VariantColumnReader::_new_iterator_with_flat_leaves(ColumnIterator** iterator,
const TabletColumn& target_col) {
auto relative_path = target_col.path_info_ptr()->copy_pop_front();
// compaction needs to read flat leaf node data to prevent read amplification
const auto* node =
target_col.has_path_info() ? _subcolumn_readers->find_leaf(relative_path) : nullptr;
if (!node) {
if (target_col.is_nested_subcolumn()) {
// using the sibling of the nested column to fill the target nested column
RETURN_IF_ERROR(_new_default_iter_with_same_nested(iterator, target_col));
} else {
std::unique_ptr<ColumnIterator> it;
RETURN_IF_ERROR(Segment::new_default_iterator(target_col, &it));
*iterator = it.release();
}
return Status::OK();
}
if (relative_path.empty()) {
// root path, use VariantRootColumnIterator
*iterator = new VariantRootColumnIterator(new FileColumnIterator(node->data.reader.get()));
return Status::OK();
}
RETURN_IF_ERROR(node->data.reader->new_iterator(iterator));
return Status::OK();
}
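
For orientation only (not part of this change): a hedged call-site sketch of how a compaction read reaches the flat-leaf branch through the StorageReadOptions-aware new_iterator entry point defined next. The reader and col variables, and the assumption that StorageReadOptions is default-constructible here, are illustrative.

// Illustrative call-site sketch, not part of this diff.
// Assumes `reader` is a VariantColumnReader* and `col` is the variant TabletColumn being read.
StorageReadOptions opts;                                        // assumed default-constructible
opts.io_ctx.reader_type = ReaderType::READER_CUMULATIVE_COMPACTION;
ColumnIterator* it = nullptr;
RETURN_IF_ERROR(reader->new_iterator(&it, col, &opts));         // dispatches to the flat-leaf path
std::unique_ptr<ColumnIterator> iter_guard(it);                 // caller assumes ownership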

Status VariantColumnReader::new_iterator(ColumnIterator** iterator, const TabletColumn& target_col,
const StorageReadOptions* opt) {
// the root column uses its unique id, leaf columns use parent_unique_id
auto relative_path = target_col.path_info_ptr()->copy_pop_front();
const auto* root = _subcolumn_readers->get_root();
const auto* node =
target_col.has_path_info() ? _subcolumn_readers->find_exact(relative_path) : nullptr;

if (opt != nullptr && _read_flat_leaves(opt->io_ctx.reader_type, target_col)) {
// original path, compaction with wide schema
return _new_iterator_with_flat_leaves(iterator, target_col);
}

if (node != nullptr) {
// an empty relative_path means the root node, which should always use HierarchicalDataReader
if (node->is_leaf_node() && !relative_path.empty()) {
@@ -917,7 +1012,8 @@ Status ColumnReader::seek_at_or_before(ordinal_t ordinal, OrdinalPageIndexIterat
return Status::OK();
}

Status ColumnReader::new_iterator(ColumnIterator** iterator, const TabletColumn& col) {
Status ColumnReader::new_iterator(ColumnIterator** iterator, const TabletColumn& col,
const StorageReadOptions* opt) {
return new_iterator(iterator);
}

@@ -944,12 +1040,12 @@ Status ColumnReader::new_iterator(ColumnIterator** iterator) {
case FieldType::OLAP_FIELD_TYPE_MAP: {
return new_map_iterator(iterator);
}
case FieldType::OLAP_FIELD_TYPE_VARIANT: {
// read from root data
// *iterator = new VariantRootColumnIterator(new FileColumnIterator(this));
*iterator = new FileColumnIterator(this);
return Status::OK();
}
// case FieldType::OLAP_FIELD_TYPE_VARIANT: {
// // read from root data
// *iterator = new VariantRootColumnIterator(new FileColumnIterator(this));
// // *iterator = new FileColumnIterator(this);
// return Status::OK();
// }
default:
return Status::NotSupported("unsupported type to create iterator: {}",
std::to_string(int(type)));
@@ -1840,75 +1936,76 @@ void DefaultValueColumnIterator::_insert_many_default(vectorized::MutableColumnP
}
}

// Status VariantRootColumnIterator::_process_root_column(
// vectorized::MutableColumnPtr& dst, vectorized::MutableColumnPtr& root_column,
// const vectorized::DataTypePtr& most_common_type) {
// auto& obj =
// dst->is_nullable()
// ? assert_cast<vectorized::ColumnObject&>(
// assert_cast<vectorized::ColumnNullable&>(*dst).get_nested_column())
// : assert_cast<vectorized::ColumnObject&>(*dst);
//
// // fill nullmap
// if (root_column->is_nullable() && dst->is_nullable()) {
// vectorized::ColumnUInt8& dst_null_map =
// assert_cast<vectorized::ColumnNullable&>(*dst).get_null_map_column();
// vectorized::ColumnUInt8& src_null_map =
// assert_cast<vectorized::ColumnNullable&>(*root_column).get_null_map_column();
// dst_null_map.insert_range_from(src_null_map, 0, src_null_map.size());
// }
//
// // add root column to a tmp object column
// auto tmp = vectorized::ColumnObject::create(true, false);
// auto& tmp_obj = assert_cast<vectorized::ColumnObject&>(*tmp);
// tmp_obj.add_sub_column({}, std::move(root_column), most_common_type);
//
// // merge tmp object column to dst
// obj.insert_range_from(*tmp, 0, tmp_obj.rows());
//
// // finalize object if needed
// if (!obj.is_finalized()) {
// obj.finalize();
// }
//
// #ifndef NDEBUG
// obj.check_consistency();
// #endif
//
// return Status::OK();
// }
//
// Status VariantRootColumnIterator::next_batch(size_t* n, vectorized::MutableColumnPtr& dst,
// bool* has_null) {
// // read root column
// auto& obj =
// dst->is_nullable()
// ? assert_cast<vectorized::ColumnObject&>(
// assert_cast<vectorized::ColumnNullable&>(*dst).get_nested_column())
// : assert_cast<vectorized::ColumnObject&>(*dst);
//
// auto most_common_type = obj.get_most_common_type();
// auto root_column = most_common_type->create_column();
// RETURN_IF_ERROR(_inner_iter->next_batch(n, root_column, has_null));
//
// return _process_root_column(dst, root_column, most_common_type);
// }
//
// Status VariantRootColumnIterator::read_by_rowids(const rowid_t* rowids, const size_t count,
// vectorized::MutableColumnPtr& dst) {
// // read root column
// auto& obj =
// dst->is_nullable()
// ? assert_cast<vectorized::ColumnObject&>(
// assert_cast<vectorized::ColumnNullable&>(*dst).get_nested_column())
// : assert_cast<vectorized::ColumnObject&>(*dst);
//
// auto most_common_type = obj.get_most_common_type();
// auto root_column = most_common_type->create_column();
// RETURN_IF_ERROR(_inner_iter->read_by_rowids(rowids, count, root_column));
//
// return _process_root_column(dst, root_column, most_common_type);
// }
Status VariantRootColumnIterator::_process_root_column(
vectorized::MutableColumnPtr& dst, vectorized::MutableColumnPtr& root_column,
const vectorized::DataTypePtr& most_common_type) {
auto& obj =
dst->is_nullable()
? assert_cast<vectorized::ColumnObject&>(
assert_cast<vectorized::ColumnNullable&>(*dst).get_nested_column())
: assert_cast<vectorized::ColumnObject&>(*dst);

// fill nullmap
if (root_column->is_nullable() && dst->is_nullable()) {
vectorized::ColumnUInt8& dst_null_map =
assert_cast<vectorized::ColumnNullable&>(*dst).get_null_map_column();
vectorized::ColumnUInt8& src_null_map =
assert_cast<vectorized::ColumnNullable&>(*root_column).get_null_map_column();
dst_null_map.insert_range_from(src_null_map, 0, src_null_map.size());
}

// add root column to a tmp object column
auto tmp = vectorized::ColumnObject::create(0, root_column->size());
auto& tmp_obj = assert_cast<vectorized::ColumnObject&>(*tmp);
tmp_obj.add_sub_column({}, std::move(root_column), most_common_type);
// tmp_obj.get_sparse_column()->assume_mutable()->insert_many_defaults(root_column->size());

// merge tmp object column to dst
obj.insert_range_from(*tmp, 0, tmp_obj.rows());

// finalize object if needed
if (!obj.is_finalized()) {
obj.finalize();
}

#ifndef NDEBUG
obj.check_consistency();
#endif

return Status::OK();
}

Status VariantRootColumnIterator::next_batch(size_t* n, vectorized::MutableColumnPtr& dst,
bool* has_null) {
// read root column
auto& obj =
dst->is_nullable()
? assert_cast<vectorized::ColumnObject&>(
assert_cast<vectorized::ColumnNullable&>(*dst).get_nested_column())
: assert_cast<vectorized::ColumnObject&>(*dst);

auto most_common_type = obj.get_most_common_type();
auto root_column = most_common_type->create_column();
RETURN_IF_ERROR(_inner_iter->next_batch(n, root_column, has_null));

return _process_root_column(dst, root_column, most_common_type);
}

Status VariantRootColumnIterator::read_by_rowids(const rowid_t* rowids, const size_t count,
vectorized::MutableColumnPtr& dst) {
// read root column
auto& obj =
dst->is_nullable()
? assert_cast<vectorized::ColumnObject&>(
assert_cast<vectorized::ColumnNullable&>(*dst).get_nested_column())
: assert_cast<vectorized::ColumnObject&>(*dst);

auto most_common_type = obj.get_most_common_type();
auto root_column = most_common_type->create_column();
RETURN_IF_ERROR(_inner_iter->read_by_rowids(rowids, count, root_column));

return _process_root_column(dst, root_column, most_common_type);
}

Status DefaultNestedColumnIterator::next_batch(size_t* n, vectorized::MutableColumnPtr& dst) {
bool has_null = false;