Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat(shannon):refine pop thread perfomrance
Browse files Browse the repository at this point in the history
ShannonBase committed Jan 8, 2025
1 parent e8213e2 commit 6381caa
Showing 5 changed files with 25 additions and 23 deletions.
1 change: 1 addition & 0 deletions storage/rapid_engine/imcs/imcs.h
Original file line number Diff line number Diff line change
@@ -36,6 +36,7 @@
#include "my_inttypes.h"
#include "sql/field.h"
#include "sql/handler.h"

#include "storage/rapid_engine/compress/dictionary/dictionary.h"
#include "storage/rapid_engine/imcs/cu.h"
#include "storage/rapid_engine/include/rapid_const.h"
4 changes: 2 additions & 2 deletions storage/rapid_engine/optimizer/optimizer.cpp
Original file line number Diff line number Diff line change
@@ -28,8 +28,8 @@
#include "sql/iterators/basic_row_iterators.h"
#include "sql/iterators/hash_join_iterator.h" //HashJoinIterator
#include "sql/iterators/timing_iterator.h"
#include "sql/join_optimizer/access_path.h" //AccessPath
#include "sql/sql_class.h" //THD
#include "sql/join_optimizer/access_path.h"
#include "sql/sql_class.h"
#include "sql/sql_lex.h" //Query_expression
#include "sql/sql_optimizer.h" //JOIN
#include "storage/innobase/include/ut0dbg.h" //ut_a
10 changes: 7 additions & 3 deletions storage/rapid_engine/populate/log_commons.h
Original file line number Diff line number Diff line change
@@ -45,9 +45,13 @@
namespace ShannonBase {
namespace Populate {

// one spin time is 1 ms.
constexpr uint MAX_LOG_POP_SPIN_COUNT = 20000;
constexpr uint MAX_TIMEOUT_SPIN = 200;
/** Default value of spin delay (in spin rounds)
* 1000 spin round takes 4us, 25000 takes 1ms for busy waiting. therefore, 200ms means
* 5000000 spin rounds. for the more detail infor ref to : comment of
* `innodb_log_writer_spin_delay`.
*/
constexpr uint64 MAX_LOG_POP_SPINS = 5000000;
constexpr uint64 MAX_WAIT_TIMEOUT = 200;
/// jnk0le/Ring-Buffer
/*!
* \brief Lock free, with no wasted slots ringbuffer implementation
31 changes: 14 additions & 17 deletions storage/rapid_engine/populate/populate.cpp
Original file line number Diff line number Diff line change
@@ -59,8 +59,8 @@ std::unordered_map<uint64_t, mtr_log_rec> sys_pop_buff;

std::atomic<bool> sys_pop_started{false};
// how many data was in sys_pop_buff?
std::atomic<ulonglong> sys_pop_data_sz{0};
static ulint sys_rapid_loop_count{0};
std::atomic<uint64> sys_pop_data_sz{0};
static uint64 sys_rapid_loop_count{0};

/**
* return lsn_t, key of processing mtr_log_rec_t.
@@ -102,30 +102,30 @@ static uint64_t parse_mtr_log_worker(uint64_t start_lsn, const byte *start, cons
*/
static void parse_log_func_main(log_t *log_ptr) {
// here we have a notifiyer, start pop. ref: https://dev.mysql.com/doc/heatwave/en/mys-hw-change-propagation.html
while (srv_shutdown_state.load() == SRV_SHUTDOWN_NONE && sys_pop_started.load()) {
while (srv_shutdown_state.load(std::memory_order_acquire) == SRV_SHUTDOWN_NONE &&
sys_pop_started.load(std::memory_order_acquire)) {
auto stop_condition = [&](bool wait) {
if (sys_pop_data_sz >= SHANNON_POPULATION_HRESHOLD_SIZE || sys_pop_started.load() == false) {
if (sys_pop_data_sz.load(std::memory_order_acquire) >= SHANNON_POPULATION_HRESHOLD_SIZE) {
return true;
}

if (wait) {
if (unlikely(wait)) {
os_event_set(log_sys->rapid_events[0]);
return true;
}
return false;
};

// waiting until the new data coming in.
os_event_wait_for(log_ptr->rapid_events[0], MAX_LOG_POP_SPIN_COUNT, std::chrono::microseconds{MAX_TIMEOUT_SPIN},
// waiting until the 200ms reached and the incomming data in buffer more than 64MB.
os_event_wait_for(log_ptr->rapid_events[0], MAX_LOG_POP_SPINS, std::chrono::microseconds{MAX_WAIT_TIMEOUT},
stop_condition);

if (!sys_pop_started) break;

if (sys_pop_buff.empty()) continue;
os_event_reset(log_sys->rapid_events[0]);
if (unlikely(!sys_pop_started.load(std::memory_order_acquire))) break;
if (likely(sys_pop_buff.empty())) continue;

mutex_enter(&(log_sys->rapid_populator_mutex));
sys_rapid_loop_count++;
auto size = sys_pop_buff.begin()->second.size;
uint64 size = sys_pop_buff.begin()->second.size;
byte *from_ptr = sys_pop_buff.begin()->second.data.get();
mutex_exit(&(log_sys->rapid_populator_mutex));

@@ -140,8 +140,7 @@ static void parse_log_func_main(log_t *log_ptr) {
mutex_exit(&(log_sys->rapid_populator_mutex));
// to reduce the pop data size.
sys_pop_data_sz.fetch_sub(size);
os_event_reset(log_sys->rapid_events[0]);
} // wile(pop_started)
}

sys_pop_started.store(false, std::memory_order_seq_cst);
}
@@ -152,17 +151,15 @@ bool Populator::log_pop_worker_is_active() { return true; }

void Populator::start_change_populate_threads() {
if (!Populator::log_pop_thread_is_active() && shannon_loaded_tables->size()) {
os_event_reset(log_sys->rapid_events[0]);
srv_threads.m_change_pop = os_thread_create(rapid_populate_thread_key, 0, parse_log_func_main, log_sys);
ShannonBase::Populate::sys_pop_started = true;
os_event_set(log_sys->rapid_events[0]);
srv_threads.m_change_pop.start();
}
}

void Populator::end_change_populate_threads() {
if (Populator::log_pop_thread_is_active() && !shannon_loaded_tables->size()) {
os_event_reset(log_sys->rapid_events[0]);
// os_event_reset(log_sys->rapid_events[0]);
sys_pop_started.store(false, std::memory_order_seq_cst);
sys_rapid_loop_count = 0;
srv_threads.m_change_pop.join();
2 changes: 1 addition & 1 deletion storage/rapid_engine/populate/populate.h
Original file line number Diff line number Diff line change
@@ -80,7 +80,7 @@ typedef struct mtr_log_rec_t {
} mtr_log_rec;

// pop change buffer size.
extern std::atomic<ulonglong> sys_pop_data_sz;
extern std::atomic<uint64> sys_pop_data_sz;
// flag of pop change thread. true is running, set to false to stop
extern std::atomic<bool> sys_pop_started;
/** a buffer to store redo log records, then parses these records. if we

0 comments on commit 6381caa

Please sign in to comment.