Skip to content

Commit

Permalink
[Enhancement] optimize the performance for topn with large offset
Browse files Browse the repository at this point in the history
Signed-off-by: stdpain <[email protected]>
  • Loading branch information
stdpain committed Feb 14, 2025
1 parent 6b04504 commit 6735781
Show file tree
Hide file tree
Showing 15 changed files with 542 additions and 224 deletions.
4 changes: 2 additions & 2 deletions be/src/bench/chunks_sorter_bench.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -257,8 +257,8 @@ static void do_bench(benchmark::State& state, SortAlgorithm sorter_algo, Logical
}
case MergeSort: {
sorter = std::make_unique<ChunksSorterTopn>(suite._runtime_state.get(), &sort_exprs, &asc_arr, &null_first,
"", 0, limit_rows, TTopNType::ROW_NUMBER,
params.max_buffered_chunks);
"", 0, limit_rows, TTopNType::ROW_NUMBER, max_buffered_rows,
max_buffered_bytes params.max_buffered_chunks);
expected_rows = limit_rows;
break;
}
Expand Down
1 change: 1 addition & 0 deletions be/src/exec/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ set(EXEC_FILES
schema_scanner/schema_cluster_snapshot_jobs_scanner.cpp
jdbc_scanner.cpp
sorting/compare_column.cpp
sorting/merge.cpp
sorting/merge_column.cpp
sorting/merge_path.cpp
sorting/merge_cascade.cpp
Expand Down
100 changes: 0 additions & 100 deletions be/src/exec/chunks_sorter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,28 +28,6 @@

namespace starrocks {

static void get_compare_results_colwise(size_t rows_to_sort, Columns& order_by_columns,
std::vector<CompareVector>& compare_results_array,
std::vector<DataSegment>& data_segments, const SortDescs& sort_desc) {
size_t dats_segment_size = data_segments.size();

for (size_t i = 0; i < dats_segment_size; ++i) {
size_t rows = data_segments[i].chunk->num_rows();
compare_results_array[i].resize(rows, 0);
}

size_t order_by_column_size = order_by_columns.size();

for (size_t i = 0; i < dats_segment_size; i++) {
Buffer<Datum> rhs_values;
auto& segment = data_segments[i];
for (size_t col_idx = 0; col_idx < order_by_column_size; col_idx++) {
rhs_values.push_back(order_by_columns[col_idx]->get(rows_to_sort));
}
compare_columns(segment.order_by_columns, compare_results_array[i], rhs_values, sort_desc);
}
}

void DataSegment::init(const std::vector<ExprContext*>* sort_exprs, const ChunkPtr& cnk) {
chunk = cnk;
order_by_columns.reserve(sort_exprs->size());
Expand All @@ -58,84 +36,6 @@ void DataSegment::init(const std::vector<ExprContext*>* sort_exprs, const ChunkP
}
}

// Classifies every row of every segment in `data_segments` against two reference rows of this
// segment: row `rows_to_sort - 1` and row 0 of `order_by_columns`.
//
// Results are written to filter_array[i][j] for row j of segment i:
//   - rows_to_sort == 1 (first and last reference row are the same row, so one comparison is enough):
//       compare result < 0  -> SMALLER_THAN_MIN_OF_SEGMENT
//       otherwise           -> INCLUDE_IN_SEGMENT
//   - rows_to_sort != 1:
//       compare result <= 0 against row `rows_to_sort - 1` -> INCLUDE_IN_SEGMENT, and then
//       compare result <  0 against row 0                  -> SMALLER_THAN_MIN_OF_SEGMENT.
//       Rows that compare > 0 against row `rows_to_sort - 1` are not written by this method.
//
// @param data_segments  segments whose rows are classified
// @param rows_to_sort   number of leading rows of this segment that form the reference range
// @param filter_array   output; resized to one entry per segment and one byte per row
// @param sort_desc      sort description forwarded to the comparison helper
// @param smaller_num    output; number of rows marked SMALLER_THAN_MIN_OF_SEGMENT
// @param include_num    output; number of rows left marked INCLUDE_IN_SEGMENT
// @return always Status::OK()
Status DataSegment::get_filter_array(std::vector<DataSegment>& data_segments, size_t rows_to_sort,
                                     std::vector<std::vector<uint8_t>>& filter_array, const SortDescs& sort_desc,
                                     uint32_t& smaller_num, uint32_t& include_num) {
    const size_t segment_count = data_segments.size();
    std::vector<CompareVector> cmp_per_segment(segment_count);

    // Pass 1: compare all rows with the last row of the reference range.
    get_compare_results_colwise(rows_to_sort - 1, order_by_columns, cmp_per_segment, data_segments, sort_desc);

    if (rows_to_sort == 1) {
        // Single reference row: both categories fall out of the one comparison above.
        smaller_num = 0;
        include_num = 0;
        filter_array.resize(segment_count);
        for (size_t seg = 0; seg < segment_count; ++seg) {
            const size_t row_count = data_segments[seg].chunk->num_rows();
            auto& filter = filter_array[seg];
            filter.resize(row_count);

            for (size_t row = 0; row < row_count; ++row) {
                if (cmp_per_segment[seg][row] >= 0) {
                    filter[row] = DataSegment::INCLUDE_IN_SEGMENT;
                    ++include_num;
                } else {
                    filter[row] = DataSegment::SMALLER_THAN_MIN_OF_SEGMENT;
                    ++smaller_num;
                }
            }
        }
        return Status::OK();
    }

    // Everything not greater than the last reference row is provisionally "included".
    include_num = 0;
    filter_array.resize(segment_count);
    for (size_t seg = 0; seg < segment_count; ++seg) {
        const size_t row_count = data_segments[seg].chunk->num_rows();
        auto& filter = filter_array[seg];
        filter.resize(row_count);

        for (size_t row = 0; row < row_count; ++row) {
            if (cmp_per_segment[seg][row] > 0) {
                continue;
            }
            filter[row] = DataSegment::INCLUDE_IN_SEGMENT;
            ++include_num;
        }
    }

    // Pass 2: compare with the first row of the reference range, reusing the pass-1 results.
    // Negative results are reset to 0 first.
    // NOTE(review): presumably compare_columns() only re-compares entries that are 0 — confirm.
    for (auto& cmp_vector : cmp_per_segment) {
        for (auto& result : cmp_vector) {
            if (result < 0) {
                result = 0;
            }
        }
    }
    get_compare_results_colwise(0, order_by_columns, cmp_per_segment, data_segments, sort_desc);

    // Rows below the first reference row move from "included" to "smaller".
    smaller_num = 0;
    for (size_t seg = 0; seg < segment_count; ++seg) {
        const size_t row_count = data_segments[seg].chunk->num_rows();

        for (size_t row = 0; row < row_count; ++row) {
            if (cmp_per_segment[seg][row] >= 0) {
                continue;
            }
            filter_array[seg][row] = DataSegment::SMALLER_THAN_MIN_OF_SEGMENT;
            ++smaller_num;
        }
    }
    include_num -= smaller_num;

    return Status::OK();
}

ChunksSorter::ChunksSorter(RuntimeState* state, const std::vector<ExprContext*>* sort_exprs,
const std::vector<bool>* is_asc, const std::vector<bool>* is_null_first,
std::string sort_keys, const bool is_topn)
Expand Down
15 changes: 0 additions & 15 deletions be/src/exec/chunks_sorter.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,17 +49,6 @@ struct DataSegment {

void init(const std::vector<ExprContext*>* sort_exprs, const ChunkPtr& cnk);

// This method performs two comparisons:
// 1. Every row in every DataSegment of `data_segments` is compared with row `rows_to_sort - 1` of this
//    DataSegment, and the per-row results are stored in compare_results_array. Rows whose result is <= 0
//    are marked `INCLUDE_IN_SEGMENT`.
// 2. Every row whose first result is <= 0 (i.e. the `INCLUDE_IN_SEGMENT` part) is compared with the first
//    row of this DataSegment. Rows whose result is < 0 are marked `SMALLER_THAN_MIN_OF_SEGMENT`.
Status get_filter_array(std::vector<DataSegment>& data_segments, size_t rows_to_sort,
std::vector<std::vector<uint8_t>>& filter_array, const SortDescs& sort_order_flags,
uint32_t& least_num, uint32_t& middle_num);

void clear() {
chunk.reset(std::make_unique<Chunk>().release());
order_by_columns.clear();
Expand Down Expand Up @@ -133,8 +122,6 @@ class ChunksSorter {
// Return accurate output rows of this operator
virtual size_t get_output_rows() const = 0;

size_t get_next_output_row() { return _next_output_row; }

virtual int64_t mem_usage() const = 0;

virtual bool is_full() { return false; }
Expand Down Expand Up @@ -163,8 +150,6 @@ class ChunksSorter {
const std::string _sort_keys;
const bool _is_topn;

size_t _next_output_row = 0;

RuntimeProfile::Counter* _build_timer = nullptr;
RuntimeProfile::Counter* _sort_timer = nullptr;
RuntimeProfile::Counter* _merge_timer = nullptr;
Expand Down
3 changes: 1 addition & 2 deletions be/src/exec/chunks_sorter_heap_sort.h
Original file line number Diff line number Diff line change
Expand Up @@ -272,8 +272,7 @@ class ChunksSorterHeapSort final : public ChunksSorter {

const size_t _offset;
const size_t _limit;

// std::vector<detail::ChunkRowCursor> _sorted_values;
size_t _next_output_row = 0;

RuntimeProfile::Counter* _sort_filter_rows = nullptr;
RuntimeProfile::Counter* _sort_filter_costs = nullptr;
Expand Down
Loading

0 comments on commit 6735781

Please sign in to comment.