Skip to content

Commit

Permalink
feat(shannon): performance refine
Browse files Browse the repository at this point in the history
1: peformance refine.
2: global var: rapid_change_propagation_status added.
  • Loading branch information
ShannonBase committed Jan 9, 2025
1 parent 6381caa commit 21033e2
Show file tree
Hide file tree
Showing 6 changed files with 75 additions and 26 deletions.
12 changes: 12 additions & 0 deletions sql/mysqld.cc
Original file line number Diff line number Diff line change
Expand Up @@ -983,6 +983,7 @@ MySQL clients support the protocol:
#include "sql/server_component/mysql_thd_store_imp.h"
#include "sql/server_component/persistent_dynamic_loader_imp.h"
#include "sql/srv_session.h"
#include "storage/rapid_engine/populate/populate.h"

using std::max;
using std::min;
Expand Down Expand Up @@ -9720,6 +9721,15 @@ static int show_telemetry_traces_support(THD * /*unused*/, SHOW_VAR *var,
return 0;
}

static int show_rapid_change_propagation(THD * /*unused*/, SHOW_VAR *var,
char *buff) {
var->type = SHOW_BOOL;
var->value = buff;
*(pointer_cast<bool *>(buff)) =
ShannonBase::Populate::Populator::log_pop_thread_is_active();
return 0;
}

SHOW_VAR status_vars[] = {
{"Aborted_clients", (char *)&aborted_threads, SHOW_LONG, SHOW_SCOPE_GLOBAL},
{"Aborted_connects", (char *)&show_aborted_connects, SHOW_FUNC,
Expand Down Expand Up @@ -10082,6 +10092,8 @@ SHOW_VAR status_vars[] = {
SHOW_FUNC, SHOW_SCOPE_GLOBAL},
{"Tls_sni_server_name", (char *)&show_ssl_get_tls_sni_servername, SHOW_FUNC,
SHOW_SCOPE_SESSION},
{"rapid_change_propagation_status", (char *)&show_rapid_change_propagation, SHOW_FUNC,
SHOW_SCOPE_GLOBAL},
{NullS, NullS, SHOW_LONG, SHOW_SCOPE_ALL}};

void add_terminator(vector<my_option> *options) {
Expand Down
13 changes: 7 additions & 6 deletions storage/rapid_engine/handler/ha_shannon_rapid.cc
Original file line number Diff line number Diff line change
Expand Up @@ -625,7 +625,7 @@ static bool PrepareSecondaryEngine(THD *thd, LEX *lex) {
// and, we can get all optimzation information, then caches all these info.
static bool RapidCachePrimaryInfoAtPrimaryTentativelyStep(THD *thd) {
assert(thd->secondary_engine_optimization() == Secondary_engine_optimization::PRIMARY_TENTATIVELY);
if (thd->secondary_engine_statement_context() == nullptr) {
if (unlikely(thd->secondary_engine_statement_context() == nullptr)) {
/* Prepare this query's specific statment context */
std::unique_ptr<Secondary_engine_statement_context> ctx = std::make_unique<ShannonBase::Rapid_statement_context>();
thd->set_secondary_engine_statement_context(std::move(ctx));
Expand Down Expand Up @@ -653,8 +653,9 @@ bool SecondaryEnginePrePrepareHook(THD *thd) {
return ShannonBase::Utils::Util::standard_cost_threshold_classifier(thd);
} else if (likely(thd->variables.rapid_use_dynamic_offload)) {
// 1: static sceanrio.
if (!ShannonBase::Populate::Populator::log_pop_thread_is_active() ||
(ShannonBase::Populate::Populator::log_pop_thread_is_active() && ShannonBase::Populate::sys_pop_buff.empty())) {
if (likely(!ShannonBase::Populate::Populator::log_pop_thread_is_active() ||
(ShannonBase::Populate::Populator::log_pop_thread_is_active() &&
ShannonBase::Populate::sys_pop_buff.empty()))) {
return ShannonBase::Utils::Util::decision_tree_classifier(thd);
} else {
// 2: dynamic scenario.
Expand Down Expand Up @@ -703,7 +704,7 @@ static void AssertSupportedPath(const AccessPath *path) {
// propagation lag to decide if query should be offloaded to rapid
// returns true, goes to innodb engine. otherwise, false, goes to secondary engine.
static bool RapidOptimize(THD *thd, LEX *lex) {
if (thd->variables.use_secondary_engine == SECONDARY_ENGINE_OFF) {
if (likely(thd->variables.use_secondary_engine == SECONDARY_ENGINE_OFF)) {
SetSecondaryEngineOffloadFailedReason(thd, "in RapidOptimize, set use_secondary_engine to false.");
return true;
}
Expand All @@ -712,8 +713,8 @@ static bool RapidOptimize(THD *thd, LEX *lex) {
// to much changes to populate, then goes to primary engine.
ulonglong too_much_pop_threshold =
static_cast<ulonglong>(ShannonBase::SHANNON_TO_MUCH_POP_THRESHOLD_RATIO * ShannonBase::rpd_pop_buff_sz_max);
if (ShannonBase::Populate::sys_pop_buff.size() > 10000 ||
ShannonBase::Populate::sys_pop_data_sz > too_much_pop_threshold) {
if (unlikely(ShannonBase::Populate::sys_pop_buff.size() > ShannonBase::SHANNON_POP_BUFF_THRESHOLD_COUNT ||
ShannonBase::Populate::sys_pop_data_sz > too_much_pop_threshold)) {
SetSecondaryEngineOffloadFailedReason(thd, "in RapidOptimize, the CP lag is too much.");
return true;
}
Expand Down
1 change: 1 addition & 0 deletions storage/rapid_engine/include/rapid_const.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ constexpr uint64 SHANNON_MAX_MEMRORY_SIZE = SHANNON_DEFAULT_MEMRORY_SIZE;
constexpr uint64 SHANNON_POPULATION_HRESHOLD_SIZE = 64 * SHANNON_MB;
constexpr uint64 SHANNON_MAX_POPULATION_BUFFER_SIZE = 128 * SHANNON_MB;
constexpr double SHANNON_TO_MUCH_POP_THRESHOLD_RATIO = 0.85;
constexpr uint64 SHANNON_POP_BUFF_THRESHOLD_COUNT = 10000;

#define ALIGN_WORD(WORD, TYPE_SIZE) ((WORD + TYPE_SIZE - 1) & ~(TYPE_SIZE - 1))

Expand Down
5 changes: 5 additions & 0 deletions storage/rapid_engine/optimizer/optimizer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@
#include "storage/rapid_engine/include/rapid_context.h"
#include "storage/rapid_engine/optimizer/rules/const_fold_rule.h"

#include "storage/rapid_engine/populate/populate.h"

namespace ShannonBase {
namespace Optimizer {

Expand All @@ -65,6 +67,9 @@ void OptimzieAccessPath(AccessPath *path, JOIN *join) {
auto table = path->table_scan().table;
if (table->s->is_secondary_engine() && table->file->stats.records >= SHANNON_VECTOR_WIDTH) {
path->using_batch_instr = true;
// this table is used by query and the table has been loaded into rapid engine. then start
// a propagation.
ShannonBase::Populate::Populator::send_propagation_notify();
}
} break;
case AccessPath::HASH_JOIN: {
Expand Down
68 changes: 48 additions & 20 deletions storage/rapid_engine/populate/populate.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
#include <chrono>
#include <future>
#include <mutex>
#include <sstream>
#include <thread>

#include "current_thd.h"
Expand Down Expand Up @@ -116,30 +117,52 @@ static void parse_log_func_main(log_t *log_ptr) {
return false;
};

// waiting until the 200ms reached and the incomming data in buffer more than 64MB.
// waiting until the 200ms reached or the incomming data in buffer more than 64MB.
os_event_wait_for(log_ptr->rapid_events[0], MAX_LOG_POP_SPINS, std::chrono::microseconds{MAX_WAIT_TIMEOUT},
stop_condition);
os_event_reset(log_sys->rapid_events[0]);

// thread stopped.
if (unlikely(!sys_pop_started.load(std::memory_order_acquire))) break;

// pop buffer is empty, then re-check the condtion.
if (likely(sys_pop_buff.empty())) continue;

mutex_enter(&(log_sys->rapid_populator_mutex));
// we only use a half of threads to do propagation.
std::vector<std::future<uint64_t>> results;
size_t thread_num = std::thread::hardware_concurrency() / 2;
thread_num = (thread_num > sys_pop_buff.size()) ? sys_pop_buff.size() : thread_num;
auto curr_iter = sys_pop_buff.begin();
for (size_t counter = 0; counter < thread_num; counter++) {
mutex_enter(&log_sys->rapid_populator_mutex);
byte *from_ptr = curr_iter->second.data.get();
auto size = curr_iter->second.size;

// using std thread, not IB_thread, ib_thread has not interface to thread func ret.
results.emplace_back(
std::async(std::launch::async, parse_mtr_log_worker, curr_iter->first, from_ptr, from_ptr + size, size));
curr_iter++;
mutex_exit(&log_sys->rapid_populator_mutex);
}

for (auto &res : results) { // gets the result from worker thread.
auto ret_lsn = res.get();
if (!ret_lsn) { // propagation failure occure.
std::stringstream error_message;
error_message << "Propagation thread occur errors, it makes loaded data to be stale"
<< ". Please unload loaded tables, and then load tables again manually.";
push_warning(current_thd, Sql_condition::SL_WARNING, errno, error_message.str().c_str());
}

mutex_enter(&log_sys->rapid_populator_mutex);
auto iter = sys_pop_buff.find(ret_lsn);
ut_a(iter != sys_pop_buff.end());
sys_pop_data_sz.fetch_sub(iter->second.size);
sys_pop_buff.erase(ret_lsn);
mutex_exit(&log_sys->rapid_populator_mutex);
}

sys_rapid_loop_count++;
uint64 size = sys_pop_buff.begin()->second.size;
byte *from_ptr = sys_pop_buff.begin()->second.data.get();
mutex_exit(&(log_sys->rapid_populator_mutex));

// using std thread, not IB_thread, ib_thread has not interface to thread func ret.
std::future<uint64_t> result = std::async(std::launch::async, parse_mtr_log_worker, sys_pop_buff.begin()->first,
from_ptr, from_ptr + size, size);
// gets the result from worker thread.
auto ret_lsn = result.get();
mutex_enter(&(log_sys->rapid_populator_mutex));
ut_a(sys_pop_buff.find(ret_lsn) != sys_pop_buff.end());
sys_pop_buff.erase(ret_lsn);
mutex_exit(&(log_sys->rapid_populator_mutex));
// to reduce the pop data size.
sys_pop_data_sz.fetch_sub(size);
}

sys_pop_started.store(false, std::memory_order_seq_cst);
Expand All @@ -149,20 +172,25 @@ bool Populator::log_pop_thread_is_active() { return thread_is_active(srv_threads

bool Populator::log_pop_worker_is_active() { return true; }

void Populator::send_propagation_notify() { os_event_set(log_sys->rapid_events[0]); }

void Populator::start_change_populate_threads() {
if (!Populator::log_pop_thread_is_active() && shannon_loaded_tables->size()) {
srv_threads.m_change_pop = os_thread_create(rapid_populate_thread_key, 0, parse_log_func_main, log_sys);
ShannonBase::Populate::sys_pop_started = true;
srv_threads.m_change_pop.start();
assert(log_pop_thread_is_active());
}
}

void Populator::end_change_populate_threads() {
if (Populator::log_pop_thread_is_active() && !shannon_loaded_tables->size()) {
// os_event_reset(log_sys->rapid_events[0]);
if (Populator::log_pop_thread_is_active() && shannon_loaded_tables->size()) {
sys_pop_started.store(false, std::memory_order_seq_cst);
sys_rapid_loop_count = 0;
os_event_set(log_sys->rapid_events[0]);
srv_threads.m_change_pop.join();
sys_rapid_loop_count = 0;
ShannonBase::Populate::sys_pop_started = false;
assert(log_pop_thread_is_active() == false);
}
}

Expand Down
2 changes: 2 additions & 0 deletions storage/rapid_engine/populate/populate.h
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,8 @@ class Populator {
static void rapid_print_thread_info(FILE *file);
// to check whether the specific table are still do populating.
static bool check_population_status(std::string &table_name);
// to send notify to populator main thread to start do propagation.
static void send_propagation_notify();
};

} // namespace Populate
Expand Down

0 comments on commit 21033e2

Please sign in to comment.