Skip to content

Commit

Permalink
feat(rapid): to enable purge
Browse files Browse the repository at this point in the history
  • Loading branch information
ShannonBase committed Jan 17, 2025
1 parent 431dc58 commit cbedcd7
Show file tree
Hide file tree
Showing 15 changed files with 210 additions and 42 deletions.
2 changes: 1 addition & 1 deletion sql/mysqld.cc
Original file line number Diff line number Diff line change
Expand Up @@ -9726,7 +9726,7 @@ static int show_rapid_change_propagation(THD * /*unused*/, SHOW_VAR *var,
var->type = SHOW_BOOL;
var->value = buff;
*(pointer_cast<bool *>(buff)) =
ShannonBase::Populate::Populator::log_pop_thread_is_active();
ShannonBase::Populate::Populator::active();
return 0;
}

Expand Down
4 changes: 2 additions & 2 deletions sql/sql_table.cc
Original file line number Diff line number Diff line change
Expand Up @@ -11682,10 +11682,10 @@ bool Sql_cmd_secondary_load_unload::mysql_secondary_load_or_unload(
&skip_metadata_update))
return true;
//start population thread if table loaded successfully.
ShannonBase::Populate::Populator::start_change_populate_threads();
ShannonBase::Populate::Populator::start();
} else {
//at first, stop the main pop monitor thread.
ShannonBase::Populate::Populator::end_change_populate_threads();
ShannonBase::Populate::Populator::end();

if (DBUG_EVALUATE_IF("sim_secunload_fail",
(my_error(ER_SECONDARY_ENGINE, MYF(0),
Expand Down
6 changes: 5 additions & 1 deletion storage/innobase/include/srv0srv.h
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,10 @@ struct Srv_threads {
IB_thread m_gtid_persister;

/** Thread for changes poping */
IB_thread m_change_pop;
IB_thread m_change_pop_cordinator;

/** Thread for rapid IMCS*/
IB_thread m_rapid_purg_cordinator;
#ifdef UNIV_DEBUG
/** Used in test scenario to delay threads' cleanup until the pre_dd_shutdown
is ended and final plugin's shutdown is started (when plugin is DELETED).
Expand Down Expand Up @@ -826,6 +829,7 @@ extern mysql_pfs_key_t srv_ts_alter_encrypt_thread_key;
extern mysql_pfs_key_t parallel_read_thread_key;
extern mysql_pfs_key_t parallel_rseg_init_thread_key;
extern mysql_pfs_key_t rapid_populate_thread_key;
extern mysql_pfs_key_t rapid_purge_thread_key;
#endif /* UNIV_PFS_THREAD */
#endif /* !UNIV_HOTBACKUP */

Expand Down
2 changes: 1 addition & 1 deletion storage/innobase/mtr/mtr0mtr.cc
Original file line number Diff line number Diff line change
Expand Up @@ -989,7 +989,7 @@ void mtr_t::Command::execute() {
ut_ad(write_log.m_lsn == handle.end_lsn);

if (srv_shutdown_state.load() == SRV_SHUTDOWN_NONE &&
ShannonBase::Populate::Populator::log_pop_thread_is_active() &&
ShannonBase::Populate::Populator::active() &&
!recv_recovery_is_on() &&
m_impl->m_log_mode == MTR_LOG_ALL_WITH_POP) {
//after each of block copied to log.buf without holes, then cpy to pop.
Expand Down
2 changes: 1 addition & 1 deletion storage/innobase/srv/srv0srv.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1563,7 +1563,7 @@ bool srv_printf_innodb_monitor(FILE *file, bool nowait, ulint *trx_start_pos,
"RAPID\n"
"-----\n",
file);
ShannonBase::Populate::Populator::rapid_print_thread_info(file);
ShannonBase::Populate::Populator::print_info(file);

fputs(
"----------------------------\n"
Expand Down
6 changes: 5 additions & 1 deletion storage/innobase/srv/srv0start.cc
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,8 @@ this program; if not, write to the Free Software Foundation, Inc.,
#include "ut0new.h"

#include "storage/rapid_engine/populate/populate.h"
#include "storage/rapid_engine/populate/purge.h"

/** fil_space_t::flags for hard-coded tablespaces */
extern uint32_t predefined_flags;

Expand Down Expand Up @@ -2598,12 +2600,14 @@ bool srv_shutdown_waits_for_rollback_of_recovered_transactions() {
}

static void srv_shutdown_pop_stop() {
while (ShannonBase::Populate::Populator::log_pop_thread_is_active()) {
while (ShannonBase::Populate::Populator::active()) {
ShannonBase::Populate::sys_pop_started.store(false);
os_event_set(log_sys->rapid_events[0]);
std::this_thread::sleep_for(
std::chrono::microseconds(SHUTDOWN_SLEEP_TIME_US));
}

ShannonBase::Purge::sys_purge_started.store(false);
}

/** Shut down all InnoDB background tasks that may look up objects in
Expand Down
1 change: 1 addition & 0 deletions storage/rapid_engine/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ SET(SHANNON_IMCS_SOURCES
SET(SHANNON_POPULATE_SOURCES
populate/log_parser.cpp
populate/populate.cpp
populate/purge.cpp
)
SET (SHANNON_TRX_SOURCES
trx/transaction.cpp
Expand Down
7 changes: 3 additions & 4 deletions storage/rapid_engine/handler/ha_shannon_rapid.cc
Original file line number Diff line number Diff line change
Expand Up @@ -609,7 +609,7 @@ static bool RapidPrepareEstimateQueryCosts(THD *thd, LEX *lex) {
// primary_plan_info->query_expression()->first_query_block()->leaf_tables;
// for (Table_ref *tr = leaf_tables; tr != nullptr; tr = tr->next_leaf)
std::string db_tb;
if (ShannonBase::Populate::Populator::check_population_status(db_tb)) {
if (ShannonBase::Populate::Populator::check_status(db_tb)) {
SetSecondaryEngineOffloadFailedReason(thd, "table queried is populating.");
return true;
}
Expand Down Expand Up @@ -673,9 +673,8 @@ bool SecondaryEnginePrePrepareHook(THD *thd) {
return ShannonBase::Utils::Util::standard_cost_threshold_classifier(thd);
} else if (likely(thd->variables.rapid_use_dynamic_offload)) {
// 1: static sceanrio.
if (likely(!ShannonBase::Populate::Populator::log_pop_thread_is_active() ||
(ShannonBase::Populate::Populator::log_pop_thread_is_active() &&
ShannonBase::Populate::sys_pop_buff.empty()))) {
if (likely(!ShannonBase::Populate::Populator::active() ||
(ShannonBase::Populate::Populator::active() && ShannonBase::Populate::sys_pop_buff.empty()))) {
return ShannonBase::Utils::Util::decision_tree_classifier(thd);
} else {
// 2: dynamic scenario.
Expand Down
6 changes: 3 additions & 3 deletions storage/rapid_engine/include/rapid_arch_inf.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@
// cache line size
#define CACHE_LINE_SIZE 64

#define CACHE_L1_SIZE 32768
#define CACHE_L2_SIZE 524288
#define CACHE_L3_SIZE 8388608
#define CACHE_L1_SIZE
#define CACHE_L2_SIZE
#define CACHE_L3_SIZE

// clang-format off
/* prefetch
Expand Down
2 changes: 1 addition & 1 deletion storage/rapid_engine/optimizer/optimizer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ void OptimzieAccessPath(AccessPath *path, JOIN *join) {
path->using_batch_instr = true;
// this table is used by query and the table has been loaded into rapid engine. then start
// a propagation.
ShannonBase::Populate::Populator::send_propagation_notify();
ShannonBase::Populate::Populator::send_notify();
}
} break;
case AccessPath::HASH_JOIN: {
Expand Down
32 changes: 15 additions & 17 deletions storage/rapid_engine/populate/populate.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -168,33 +168,31 @@ static void parse_log_func_main(log_t *log_ptr) {
sys_pop_started.store(false, std::memory_order_seq_cst);
}

bool Populator::log_pop_thread_is_active() { return thread_is_active(srv_threads.m_change_pop); }
bool Populator::active() { return thread_is_active(srv_threads.m_change_pop_cordinator); }

bool Populator::log_pop_worker_is_active() { return true; }
void Populator::send_notify() { os_event_set(log_sys->rapid_events[0]); }

void Populator::send_propagation_notify() { os_event_set(log_sys->rapid_events[0]); }

void Populator::start_change_populate_threads() {
if (!Populator::log_pop_thread_is_active() && shannon_loaded_tables->size()) {
srv_threads.m_change_pop = os_thread_create(rapid_populate_thread_key, 0, parse_log_func_main, log_sys);
ShannonBase::Populate::sys_pop_started = true;
srv_threads.m_change_pop.start();
assert(log_pop_thread_is_active());
void Populator::start() {
if (!Populator::active() && shannon_loaded_tables->size()) {
srv_threads.m_change_pop_cordinator = os_thread_create(rapid_populate_thread_key, 0, parse_log_func_main, log_sys);
ShannonBase::Populate::sys_pop_started.store(true, std::memory_order_seq_cst);
srv_threads.m_change_pop_cordinator.start();
assert(Populator::active());
}
}

void Populator::end_change_populate_threads() {
if (Populator::log_pop_thread_is_active() && shannon_loaded_tables->size()) {
void Populator::end() {
if (Populator::active() && shannon_loaded_tables->size()) {
sys_pop_started.store(false, std::memory_order_seq_cst);
os_event_set(log_sys->rapid_events[0]);
srv_threads.m_change_pop.join();
srv_threads.m_change_pop_cordinator.join();
sys_rapid_loop_count = 0;
ShannonBase::Populate::sys_pop_started = false;
assert(log_pop_thread_is_active() == false);

assert(Populator::active() == false);
}
}

void Populator::rapid_print_thread_info(FILE *file) { /* in: output stream */
void Populator::print_info(FILE *file) { /* in: output stream */
fprintf(file,
"rapid log pop thread : %s \n"
"rapid log pop thread loops: " ULINTPF
Expand All @@ -206,7 +204,7 @@ void Populator::rapid_print_thread_info(FILE *file) { /* in: output stream */
ShannonBase::Populate::sys_pop_data_sz / 1024, ShannonBase::Populate::sys_pop_buff.size());
}

bool Populator::check_population_status(std::string &table_name) { return false; }
bool Populator::check_status(std::string &table_name) { return false; }

} // namespace Populate
} // namespace ShannonBase
14 changes: 6 additions & 8 deletions storage/rapid_engine/populate/populate.h
Original file line number Diff line number Diff line change
Expand Up @@ -103,19 +103,17 @@ extern std::unordered_map<uint64_t, mtr_log_rec> sys_pop_buff;
class Populator {
public:
// whether the log pop main thread is active or not. true is alive, false dead.
static bool log_pop_thread_is_active();
// to return whether there're('s) log pop worker(s) activelly.
static bool log_pop_worker_is_active();
static bool active();
// to launch log pop main thread.
static void start_change_populate_threads();
static void start();
// to stop lop pop main thread.
static void end_change_populate_threads();
static void end();
// to print thread infos.
static void rapid_print_thread_info(FILE *file);
static void print_info(FILE *file);
// to check whether the specific table are still do populating.
static bool check_population_status(std::string &table_name);
static bool check_status(std::string &table_name);
// to send notify to populator main thread to start do propagation.
static void send_propagation_notify();
static void send_notify();
};

} // namespace Populate
Expand Down
109 changes: 109 additions & 0 deletions storage/rapid_engine/populate/purge.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
/**
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License, version 2.0,
as published by the Free Software Foundation.

This program is also distributed with certain software (including
but not limited to OpenSSL) that is licensed under separate terms,
as designated in a particular file or component or in included license
documentation. The authors of MySQL hereby grant you an additional
permission to link the program and your derivative works with the
separately licensed software that they have included with MySQL.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License, version 2.0, for more details.

You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA

The fundmental code for imcs. The chunk is used to store the data which
transfer from row-based format to column-based format.

Copyright (c) 2023, 2024, 2025 Shannon Data AI and/or its affiliates.

The fundmental code for imcs. The purge is used to gc the unused data by any inactive
transaction. it's usually deleted data.
*/
#if !defined(_WIN32)
#include <pthread.h> // For pthread_setname_np
#else
#include <Windows.h> // For SetThreadDescription
#endif
#include <chrono>
#include <future>
#include <mutex>
#include <sstream>
#include <thread>

#include "current_thd.h"
#include "include/os0event.h"
#include "sql/sql_class.h"
#include "storage/innobase/include/os0thread-create.h"

#include "storage/innobase/include/srv0shutdown.h"
#include "storage/innobase/include/srv0srv.h"

#include "storage/rapid_engine/imcs/imcs.h"
#include "storage/rapid_engine/populate/purge.h"

#ifdef UNIV_PFS_THREAD
mysql_pfs_key_t rapid_purge_thread_key;
#endif /* UNIV_PFS_THREAD */

namespace ShannonBase {
namespace Purge {

std::atomic<bool> sys_purge_started{false};

static void purge_func_main() {
while (srv_shutdown_state.load(std::memory_order_acquire) == SRV_SHUTDOWN_NONE &&
sys_purge_started.load(std::memory_order_acquire)) {
// thread stopped.
if (unlikely(!sys_purge_started.load(std::memory_order_acquire))) break;
}

sys_purge_started.store(false, std::memory_order_seq_cst);
}

void start_rapid_purge_threads() {
if (!Purger::active()) {
srv_threads.m_rapid_purg_cordinator = os_thread_create(rapid_purge_thread_key, 0, purge_func_main);
ShannonBase::Purge::sys_purge_started.store(false, std::memory_order_seq_cst);
srv_threads.m_rapid_purg_cordinator.start();
}
}

bool Purger::active() { return thread_is_active(srv_threads.m_rapid_purg_cordinator); }

void Purger::send_notify() { os_event_set(log_sys->rapid_events[0]); }

void Purger::start() {
if (!Purger::active()) {
srv_threads.m_rapid_purg_cordinator = os_thread_create(rapid_purge_thread_key, 0, purge_func_main);
ShannonBase::Purge::sys_purge_started.store(false, std::memory_order_seq_cst);
srv_threads.m_rapid_purg_cordinator.start();
}
}

void Purger::end() {
if (Purger::active()) {
sys_purge_started.store(false, std::memory_order_seq_cst);
os_event_set(log_sys->rapid_events[0]);
srv_threads.m_rapid_purg_cordinator.join();

assert(Purger::active() == false);

srv_threads.m_rapid_purg_cordinator.join();
}
}

void Purger::print_info(FILE *file) { /* in: output stream */
}

bool Purger::check_status(std::string &table_name) { return false; }

} // namespace Purge
} // namespace ShannonBase
55 changes: 55 additions & 0 deletions storage/rapid_engine/populate/purge.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/**
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License, version 2.0,
as published by the Free Software Foundation.

This program is also distributed with certain software (including
but not limited to OpenSSL) that is licensed under separate terms,
as designated in a particular file or component or in included license
documentation. The authors of MySQL hereby grant you an additional
permission to link the program and your derivative works with the
separately licensed software that they have included with MySQL.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License, version 2.0, for more details.

You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA

The fundmental code for imcs. The chunk is used to store the data which
transfer from row-based format to column-based format.

Copyright (c) 2023, 2024, 2025 Shannon Data AI and/or its affiliates.

The fundmental code for imcs. The purge is used to gc the unused data by any inactive
transaction. it's usually deleted data.
*/
#ifndef __SHANNONBASE_PURGE_H__
#define __SHANNONBASE_PURGE_H__
namespace ShannonBase {
namespace Purge {

extern std::atomic<bool> sys_purge_started;

class Purger {
public:
// whether the log pop main thread is active or not. true is alive, false dead.
static bool active();
// to launch log pop main thread.
static void start();
// to stop lop pop main thread.
static void end();
// to print thread infos.
static void print_info(FILE *file);
// to check whether the specific table are still do populating.
static bool check_status(std::string &table_name);
// to send notify to populator main thread to start do propagation.
static void send_notify();
};

} // namespace Purge
} // namespace ShannonBase
#endif // __SHANNONBASE_PURGE_H__
Loading

0 comments on commit cbedcd7

Please sign in to comment.