Skip to content

Commit

Permalink
feat(shannon): optimize-secnodary-engine
Browse files Browse the repository at this point in the history
  • Loading branch information
ShannonBase committed Jan 6, 2025
1 parent a4d659e commit 89ccf50
Show file tree
Hide file tree
Showing 17 changed files with 304 additions and 48 deletions.
45 changes: 24 additions & 21 deletions mysql-test/suite/secondary_engine/r/query_preparation.result
Original file line number Diff line number Diff line change
Expand Up @@ -580,7 +580,7 @@ x
234
SHOW SESSION STATUS LIKE 'Secondary_engine_execution_count';
Variable_name Value
Secondary_engine_execution_count 0
Secondary_engine_execution_count 1
DROP FUNCTION f1;
DROP FUNCTION f2;
DROP TABLE tf;
Expand Down Expand Up @@ -637,15 +637,15 @@ a
INSERT INTO t1 SELECT * FROM t2;
SHOW SESSION STATUS LIKE 'Secondary_engine_execution_count';
Variable_name Value
Secondary_engine_execution_count 1
Secondary_engine_execution_count 2
# An error shall be returned if table is not loaded.
ALTER TABLE t1 SECONDARY_UNLOAD;
SET @@use_secondary_engine = FORCED;
SELECT * FROM t1;
Got one of the listed errors
SHOW SESSION STATUS LIKE 'Secondary_engine_execution_count';
Variable_name Value
Secondary_engine_execution_count 1
Secondary_engine_execution_count 2
SET @@use_secondary_engine = ON;
SELECT * FROM t1;
a
Expand All @@ -654,7 +654,7 @@ a
1
SHOW SESSION STATUS LIKE 'Secondary_engine_execution_count';
Variable_name Value
Secondary_engine_execution_count 1
Secondary_engine_execution_count 2
DROP TABLE t1;
DROP TABLE t2;
SET @@use_secondary_engine = @my_use_secondary_engine;
Expand Down Expand Up @@ -774,27 +774,27 @@ EXECUTE ps;
a
SHOW SESSION STATUS LIKE 'Secondary_engine_execution_count';
Variable_name Value
Secondary_engine_execution_count 0
Secondary_engine_execution_count 1
SET @@use_secondary_engine = FORCED;
EXECUTE ps;
a
SHOW SESSION STATUS LIKE 'Secondary_engine_execution_count';
Variable_name Value
Secondary_engine_execution_count 1
Secondary_engine_execution_count 2
# Unload table from secondary engine.
ALTER TABLE t1 SECONDARY_UNLOAD;
SET @@use_secondary_engine = ON;
EXECUTE ps;
a
SHOW SESSION STATUS LIKE 'Secondary_engine_execution_count';
Variable_name Value
Secondary_engine_execution_count 1
Secondary_engine_execution_count 2
SET @@use_secondary_engine = FORCED;
EXECUTE ps;
a
SHOW SESSION STATUS LIKE 'Secondary_engine_execution_count';
Variable_name Value
Secondary_engine_execution_count 1
Secondary_engine_execution_count 2
DROP PREPARE ps;
DROP TABLE t1;
SET @@use_secondary_engine = @my_use_secondary_engine;
Expand Down Expand Up @@ -830,13 +830,13 @@ a
SET @@use_secondary_engine = @my_use_secondary_engine;
SHOW SESSION STATUS LIKE 'Secondary_engine_execution_count';
Variable_name Value
Secondary_engine_execution_count 1
Secondary_engine_execution_count 2
SELECT /*+ SET_VAR(use_secondary_engine = OFF) */ * FROM t1;
a
1
SHOW SESSION STATUS LIKE 'Secondary_engine_execution_count';
Variable_name Value
Secondary_engine_execution_count 1
Secondary_engine_execution_count 2
DROP TABLE t1;
SET @@use_secondary_engine = @my_use_secondary_engine;
#
Expand All @@ -848,9 +848,9 @@ ALTER TABLE t SECONDARY_LOAD;
SELECT * FROM t;
x
FLUSH STATUS;
SELECT 0;
0
0
SELECT 1;
1
1
DROP TABLE t;
#
# Load only a subset of columns into secondary engine.
Expand Down Expand Up @@ -1046,7 +1046,7 @@ a b
1 1
SHOW SESSION STATUS LIKE 'Secondary_engine_execution_count';
Variable_name Value
Secondary_engine_execution_count 0
Secondary_engine_execution_count 2
DROP TABLE t1;
#
# Use NOT SECONDARY with other constraint on column.
Expand All @@ -1064,7 +1064,7 @@ SELECT /*+ SET_VAR(use_secondary_engine = FORCED) */ b FROM t1;
ERROR HY000: Secondary engine operation failed. One or more read columns are marked as NOT SECONDARY.
SHOW SESSION STATUS LIKE 'Secondary_engine_execution_count';
Variable_name Value
Secondary_engine_execution_count 0
Secondary_engine_execution_count 1
DROP TABLE t1;
# Add NON SECONDARY and do not use SECONDARY_ENGINE AT CREATE
CREATE TABLE t1 (a INT NOT SECONDARY, b INT);
Expand Down Expand Up @@ -1092,8 +1092,11 @@ Table Op Msg_type Msg_text
test.t analyze status OK
EXPLAIN FORMAT=TREE SELECT * FROM t WHERE a IS NULL;
EXPLAIN
-> Rows fetched before execution (rows=1)
-> Filter: (t.a = <cache>(last_insert_id())) (rows=1)
-> Table scan on t in secondary engine Rapid (rows=1)

Warnings:
Note 1003 Query is executed in secondary engine; the actual query plan may diverge from the printed one
DROP TABLE t;
SET @@sql_auto_is_null = @saved_sql_auto_is_null;
# Bug#29288023: Join condition is substituted with REF access
Expand Down Expand Up @@ -1292,7 +1295,7 @@ SELECT count_star, count_secondary, sum_select_scan
FROM performance_schema.events_statements_summary_by_digest
WHERE digest_text LIKE 'SELECT %';
count_star count_secondary sum_select_scan
3 1 2
2 1 2
EXECUTE ps;
x
1
Expand All @@ -1311,7 +1314,7 @@ SELECT execution_engine, count_execute, count_secondary
FROM performance_schema.prepared_statements_instances
WHERE statement_name = 'ps';
execution_engine count_execute count_secondary
PRIMARY 3 0
SECONDARY 3 1
SET @saved_use_secondary_engine = @@use_secondary_engine;
SET use_secondary_engine = OFF;
EXECUTE ps;
Expand All @@ -1333,7 +1336,7 @@ SELECT execution_engine, count_execute, count_secondary
FROM performance_schema.prepared_statements_instances
WHERE statement_name = 'ps';
execution_engine count_execute count_secondary
PRIMARY 4 0
SECONDARY 4 2
SET @saved_use_secondary_engine = @@use_secondary_engine;
SET use_secondary_engine = FORCED;
EXECUTE ps;
Expand All @@ -1355,7 +1358,7 @@ SELECT execution_engine, count_execute, count_secondary
FROM performance_schema.prepared_statements_instances
WHERE statement_name = 'ps';
execution_engine count_execute count_secondary
PRIMARY 5 0
SECONDARY 5 3
CREATE PROCEDURE p() EXECUTE ps;
CALL p();
x
Expand All @@ -1371,7 +1374,7 @@ SELECT execution_engine, count_execute, count_secondary
FROM performance_schema.prepared_statements_instances
WHERE statement_name = 'ps';
execution_engine count_execute count_secondary
PRIMARY 6 0
SECONDARY 6 4
DROP PREPARE ps;
FLUSH STATUS;
CREATE TABLE tt AS SELECT * FROM t;
Expand Down
8 changes: 4 additions & 4 deletions sql/iterators/basic_row_iterators.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ class QEP_TAB;
This is the most basic access method of a table using rnd_init,
ha_rnd_next and rnd_end. No indexes are used.
*/
class TableScanIterator final : public TableRowIterator {
class TableScanIterator : public TableRowIterator {
public:
/**
@param thd session context
Expand All @@ -75,13 +75,13 @@ class TableScanIterator final : public TableRowIterator {
ha_rows *examined_rows);
~TableScanIterator() override;

bool Init() override;
int Read() override;
virtual bool Init() override;
virtual int Read() override;

private:
uchar *const m_record;
const double m_expected_rows;
ha_rows *const m_examined_rows;

/// Used to keep track of how many more duplicates of the last read row that
/// remains to be written to the next stage: used for EXCEPT and INTERSECT
/// computation: we only ever materialize one row even if there are
Expand Down
9 changes: 7 additions & 2 deletions sql/join_optimizer/access_path.cc
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
#include "sql/sql_update.h"
#include "sql/table.h"

#include "storage/rapid_engine/iterators/iterator.h"
using pack_rows::TableCollection;
using std::all_of;
using std::vector;
Expand Down Expand Up @@ -476,8 +477,12 @@ unique_ptr_destroy_only<RowIterator> CreateIteratorFromAccessPath(
switch (path->type) {
case AccessPath::TABLE_SCAN: {
const auto &param = path->table_scan();
iterator = NewIterator<TableScanIterator>(
thd, mem_root, param.table, path->num_output_rows(), examined_rows);
if (path->using_batch_instr)
iterator = NewIterator<ShannonBase::Executor::BatchTableScanIterator>(
thd, mem_root, param.table, path->num_output_rows(), examined_rows);
else
iterator = NewIterator<TableScanIterator>(
thd, mem_root, param.table, path->num_output_rows(), examined_rows);
break;
}
case AccessPath::INDEX_SCAN: {
Expand Down
7 changes: 5 additions & 2 deletions sql/join_optimizer/access_path.h
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ struct AccessPath {
// TODO(khatlen): When we move to C++20, add initializers to the bit fields
// too, and use the default constructor generated by the compiler.
AccessPath()
: count_examined_rows(false)
: using_batch_instr(false), count_examined_rows(false)
#ifndef NDEBUG
,
forced_by_dbug(false)
Expand Down Expand Up @@ -294,6 +294,9 @@ struct AccessPath {
/// Whether it is safe to get row IDs (for sorting) from this access path.
Safety safe_for_rowid = SAFE;

// wheteher it use vectorization opr in execution.
bool using_batch_instr: 1;

/// Whether this access path counts as one that scans a base table,
/// and thus should be counted towards examined_rows. It can sometimes
/// seem a bit arbitrary which iterators count towards examined_rows
Expand Down Expand Up @@ -1222,7 +1225,7 @@ static_assert(std::is_trivially_destructible<AccessPath>::value,
"on the MEM_ROOT and not wrapped in unique_ptr_destroy_only"
"(because multiple candidates during planning could point to "
"the same access paths, and refcounting would be expensive)");
static_assert(sizeof(AccessPath) <= 144,
static_assert(sizeof(AccessPath) <= 145,
"We are creating a lot of access paths in the join "
"optimizer, so be sure not to bloat it without noticing. "
"(96 bytes for the base, 48 bytes for the variant.)");
Expand Down
8 changes: 7 additions & 1 deletion sql/sql_class.cc
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@
#include "string_with_len.h"
#include "template_utils.h"
#include "thr_mutex.h"
#include "join_optimizer/access_path.h"

class Parse_tree_root;

Expand Down Expand Up @@ -646,6 +647,11 @@ void Secondary_engine_statement_context::cache_primary_plan_info(THD* thd, JOIN*
if (!tab_ref) continue;
m_tables.emplace_back(tab_ref);
m_base_table_rows += (const_cast<Table_ref*>(tab_ref)->fetch_number_of_rows());

auto accesspath = join->qep_tab[i].access_path();
if (accesspath->type == AccessPath::EQ_REF || accesspath->type == AccessPath::INDEX_SCAN
|| accesspath->type == AccessPath::INDEX_RANGE_SCAN)
m_count_ref_index_ts ++;
}
}

Expand All @@ -654,7 +660,7 @@ void Secondary_engine_statement_context::cache_primary_plan_info(THD* thd, JOIN*

auto root_access_path = thd->lex->unit->root_access_path();
assert (root_access_path);
//join->row_limit == 1;
m_are_all_ts_index_ref = (m_count_ref_index_ts == m_count_all_base_tables) ? true : false;
}

THD::THD(bool enable_plugins)
Expand Down
2 changes: 1 addition & 1 deletion sql/sql_class.h
Original file line number Diff line number Diff line change
Expand Up @@ -981,7 +981,7 @@ class Secondary_engine_statement_context {
//the # of rows of all base tables.
uint m_base_table_rows {0};
//whether all tables use index table scan.
bool are_all_ts_index_ref {false};
bool m_are_all_ts_index_ref {false};
//is a complex query or not.
bool m_complex_query {false};
};
Expand Down
1 change: 1 addition & 0 deletions sql/sql_lex.h
Original file line number Diff line number Diff line change
Expand Up @@ -716,6 +716,7 @@ class Query_expression {
return down_cast<Query_term_set_op *>(m_query_term)->m_last_distinct > 0;
}

void set_root_iterator(unique_ptr_destroy_only<RowIterator>& it) { m_root_iterator = std::move(it); }
private:
/**
Marker for subqueries in WHERE, HAVING, ORDER BY, GROUP BY and
Expand Down
2 changes: 1 addition & 1 deletion storage/innobase/mtr/mtr0mtr.cc
Original file line number Diff line number Diff line change
Expand Up @@ -827,6 +827,7 @@ lsn_t mtr_t::Command::cp_to_pop_buff(log_t& log, lsn_t start_lsn, ulint str_len)
ut_a(log.buf_size % OS_FILE_LOG_BLOCK_SIZE == 0);

ShannonBase::Populate::mtr_log_rec log_rec(str_len);
ShannonBase::Populate::sys_pop_data_sz.fetch_add(str_len);

size_t pos {0};
/* That's only used in the assertion at the very end. */
Expand Down Expand Up @@ -933,7 +934,6 @@ lsn_t mtr_t::Command::cp_to_pop_buff(log_t& log, lsn_t start_lsn, ulint str_len)
}

ShannonBase::Populate::sys_pop_buff.emplace(start_lsn, std::move(log_rec));
ShannonBase::Populate::sys_pop_data_sz.fetch_add(str_len);
ut_a(ptr >= log.buf);
ut_a(ptr <= buf_end);
ut_a(buf_end == log.buf + log.buf_size);
Expand Down
1 change: 1 addition & 0 deletions storage/rapid_engine/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ SET (SHANNON_READER_SOURCES
)
SET (SHANNON_ITERATOR_SOURCES
iterators/iterator.cpp
iterators/hash_join_iterator.cpp
${SHANNON_READER_SOURCES}
)
set (SHANNON_OPTIMIZE
Expand Down
23 changes: 19 additions & 4 deletions storage/rapid_engine/handler/ha_shannon_rapid.cc
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,11 @@
#include "storage/rapid_engine/include/rapid_const.h" //const
#include "storage/rapid_engine/include/rapid_context.h"
#include "storage/rapid_engine/include/rapid_status.h" //column stats
#include "storage/rapid_engine/optimizer/optimizer.h"
#include "storage/rapid_engine/populate/populate.h"
#include "storage/rapid_engine/trx/transaction.h" //transaction
#include "storage/rapid_engine/utils/utils.h"

#include "template_utils.h"
#include "thr_lock.h"

Expand Down Expand Up @@ -700,23 +702,36 @@ static void AssertSupportedPath(const AccessPath *path) {
// rapid_statement_context and additionally looks at Change
// propagation lag to decide if query should be offloaded to rapid
// returns true, goes to innodb engine. otherwise, false, goes to secondary engine.
static bool RapidOptimize(THD *thd [[maybe_unused]], LEX *lex) {
static bool RapidOptimize(THD *thd, LEX *lex) {
if (thd->variables.use_secondary_engine == SECONDARY_ENGINE_OFF) {
SetSecondaryEngineOffloadFailedReason(thd, "in RapidOptimize, set use_secondary_engine to false.");
return true;
} else if (thd->variables.use_secondary_engine == SECONDARY_ENGINE_FORCED)
return false;
}

// auto statement_context = thd->secondary_engine_statement_context();
// to much changes to populate, then goes to primary engine.
ulonglong too_much_pop_threshold =
static_cast<ulonglong>(ShannonBase::SHANNON_TO_MUCH_POP_THRESHOLD_RATIO * ShannonBase::rpd_pop_buff_sz_max);
if (ShannonBase::Populate::sys_pop_buff.size() > 1000 ||
if (ShannonBase::Populate::sys_pop_buff.size() > 10000 ||
ShannonBase::Populate::sys_pop_data_sz > too_much_pop_threshold) {
SetSecondaryEngineOffloadFailedReason(thd, "in RapidOptimize, the CP lag is too much.");
return true;
}

JOIN *join = lex->unit->first_query_block()->join;
WalkAccessPaths(lex->unit->root_access_path(), join, WalkAccessPathPolicy::ENTIRE_TREE,
[&](AccessPath *path, const JOIN *join) {
ShannonBase::Optimizer::OptimzieAccessPath(path, const_cast<JOIN *>(join));
return false;
});
// Here, because we cannot get the parent node of corresponding iterator, we reset the type of access
// path, then re-generates all the iterators. But, it makes the preformance regression for a `short`
// AP workload. But, we will replace the itertor when we traverse iterator tree from root to leaves.
lex->unit->release_root_iterator().reset();
auto new_root_iter =
CreateIteratorFromAccessPath(thd, lex->unit->root_access_path(), join, /*eligible_for_batch_mode=*/true);

lex->unit->set_root_iterator(new_root_iter);
return false;
}

Expand Down
Loading

0 comments on commit 89ccf50

Please sign in to comment.