diff --git a/mysql-test/r/mysqld--help-notwin.result b/mysql-test/r/mysqld--help-notwin.result index bf2aea100a85..edede7440f3c 100644 --- a/mysql-test/r/mysqld--help-notwin.result +++ b/mysql-test/r/mysqld--help-notwin.result @@ -1911,6 +1911,16 @@ The following options may be given as the first argument: --wsrep-trx-fragment-unit=name Unit for streaming replication transaction fragments' size: bytes, rows, statements + --wsrep-use-async-monitor + Use Async Monitors to avoid deadlock of replicated + transactions in a multi-threaded replica. Deadlock is + possible when the PXC replica tries to commit the + replicated transactions in galera in a different order + than its order in the relay log. Use + 'wsrep_use_async_monitor' to avoid such deadlocks.This + variable is only allowed to be changed through command + line. + (Defaults to on; use --skip-wsrep-use-async-monitor to disable.) --xa-detach-on-prepare When set, XA transactions will be detached (AKA dissociated or disconnected) from connection as part of @@ -2435,6 +2445,7 @@ wsrep-start-position 00000000-0000-0000-0000-000000000000:-1 wsrep-sync-wait 0 wsrep-trx-fragment-size 0 wsrep-trx-fragment-unit bytes +wsrep-use-async-monitor TRUE xa-detach-on-prepare TRUE To see what values a running MySQL server is using, type diff --git a/mysql-test/suite/galera/r/galera_defaults.result b/mysql-test/suite/galera/r/galera_defaults.result index ccdfcfb43650..003d2b5494a6 100644 --- a/mysql-test/suite/galera/r/galera_defaults.result +++ b/mysql-test/suite/galera/r/galera_defaults.result @@ -56,6 +56,7 @@ WSREP_SST_METHOD xtrabackup-v2 WSREP_SYNC_WAIT 15 WSREP_TRX_FRAGMENT_SIZE 0 WSREP_TRX_FRAGMENT_UNIT bytes +WSREP_USE_ASYNC_MONITOR ON allocator.disk_pages_encryption = no; allocator.encryption_cache_page_size = 32K; allocator.encryption_cache_size = 16777216; ; ; ; cert.log_conflicts = no; cert.optimistic_pa = no; debug = no; evs.auto_evict = 0; evs.causal_keepalive_period = PT1S; evs.debug_log_mask = 0x1; evs.delay_margin = PT1S; evs.delayed_keep_period = PT30S; evs.inactive_check_period = PT0.5S; evs.inactive_timeout = PT30S; evs.info_log_mask = 0; evs.install_timeout = PT15S; evs.join_retrans_period = PT1S; evs.keepalive_period = PT1S; evs.max_install_timeouts = 3; evs.send_window = 10; evs.stats_report_period = PT1M; evs.suspect_timeout = PT12S; evs.use_aggregate = true; evs.user_send_window = 4; evs.version = 1; evs.view_forget_timeout = P1D; ; gcache.encryption = no; gcache.encryption_cache_page_size = 32K; gcache.encryption_cache_size = 16777216; gcache.freeze_purge_at_seqno = -1; gcache.keep_pages_count = 0; gcache.keep_pages_size = 0; gcache.mem_size = 0; ; gcache.page_size = 128M; gcache.recover = yes; gcache.size = 128M; gcomm.thread_prio = ; gcs.fc_auto_evict_threshold = 0.75; gcs.fc_auto_evict_window = 0; gcs.fc_debug = 0; gcs.fc_factor = 1.0; gcs.fc_limit = 100; gcs.fc_master_slave = no; gcs.fc_single_primary = no; gcs.max_packet_size = 64500; gcs.max_throttle = 0.25; ;gcs.recv_q_soft_limit = 0.25; gcs.sync_donor = no; ; gmcast.mcast_addr = ; gmcast.mcast_ttl = 1; gmcast.peer_timeout = PT10S; gmcast.segment = 0; gmcast.time_wait = PT5S; gmcast.version = 0; ; pc.announce_timeout = PT3S; pc.checksum = false; pc.ignore_quorum = false; pc.ignore_sb = false; pc.linger = PT20S; pc.npvo = false; pc.recovery = true; pc.version = 0; pc.wait_prim = true; pc.wait_prim_timeout = PT30S; pc.wait_restored_prim_timeout = PT0S; pc.weight = 1; protonet.backend = asio; protonet.version = 0; repl.causal_read_timeout = PT90S; repl.commit_order = 3; repl.key_format = FLAT8; repl.max_ws_size = 2147483647; ;socket.checksum = 2; socket.recv_buf_size = auto; socket.send_buf_size = auto; SELECT COUNT(*) FROM performance_schema.global_status WHERE VARIABLE_NAME LIKE 'wsrep_%' diff --git a/mysql-test/suite/galera/t/pxc_inconsistency_voting.test b/mysql-test/suite/galera/t/pxc_inconsistency_voting.test index 0c7c8cf9b0c4..b42785b347d4 100644 --- a/mysql-test/suite/galera/t/pxc_inconsistency_voting.test +++ b/mysql-test/suite/galera/t/pxc_inconsistency_voting.test @@ -14,14 +14,14 @@ ALTER TABLE t1 MODIFY c2 BIGINT UNSIGNED NOT NULL AUTO_INCREMENT ; # Assert that node1 logs the message in the language set above. --let $assert_text= Verify that node1 logs the message in the language set above ---let $assert_select= Member 0.* initiates vote on .* Un seul champ automatique est permis et il doit être indexé +--let $assert_select= Member .* initiates vote on .* Un seul champ automatique est permis et il doit être indexé --let $assert_file= $MYSQLTEST_VARDIR/log/mysqld.1.err --let $assert_count=1 --source include/assert_grep.inc # Assert that node2 logs the message in the default language. --let $assert_text= Verify that node2 logs the message in the default language ---let $assert_select= Member 1.* initiates vote on .* Incorrect table definition; there can be only one auto column and it must be defined as a key +--let $assert_select= Member .* initiates vote on .* Incorrect table definition; there can be only one auto column and it must be defined as a key --let $assert_file= $MYSQLTEST_VARDIR/log/mysqld.1.err --let $assert_count=1 diff --git a/mysql-test/suite/sys_vars/r/all_vars.result b/mysql-test/suite/sys_vars/r/all_vars.result index f624d03292eb..1449f3b18d84 100644 --- a/mysql-test/suite/sys_vars/r/all_vars.result +++ b/mysql-test/suite/sys_vars/r/all_vars.result @@ -275,6 +275,8 @@ wsrep_trx_fragment_size wsrep_trx_fragment_size wsrep_trx_fragment_unit wsrep_trx_fragment_unit +wsrep_use_async_monitor +wsrep_use_async_monitor xa_detach_on_prepare xa_detach_on_prepare drop table t1; diff --git a/sql/CMakeLists.txt b/sql/CMakeLists.txt index 337765c41033..7485fc030599 100644 --- a/sql/CMakeLists.txt +++ b/sql/CMakeLists.txt @@ -1148,6 +1148,7 @@ SET (RPL_REPLICA_SRCS rpl_trx_boundary_parser.cc udf_service_impl.cc udf_service_util.cc + wsrep_async_monitor.cc ) ADD_STATIC_LIBRARY(rpl_replica ${RPL_REPLICA_SRCS} DEPENDENCIES GenError diff --git a/sql/binlog.cc b/sql/binlog.cc index b31b5ab42d61..00728f85ddf2 100644 --- a/sql/binlog.cc +++ b/sql/binlog.cc @@ -8864,12 +8864,14 @@ TC_LOG::enum_result MYSQL_BIN_LOG::commit(THD *thd, bool all) { trans_commit_stmt()) the following call to my_error() will allow overwriting the error */ my_error(ER_TRANSACTION_ROLLBACK_DURING_COMMIT, MYF(0)); + thd_leave_async_monitor(thd); return RESULT_ABORTED; } int rc = ordered_commit(thd, all, skip_commit); if (run_wsrep_hooks) { + thd_leave_async_monitor(thd); wsrep_after_commit(thd, all); } diff --git a/sql/log_event.cc b/sql/log_event.cc index ff762cdd2653..c4b67590f3f2 100644 --- a/sql/log_event.cc +++ b/sql/log_event.cc @@ -168,6 +168,7 @@ #include "service_wsrep.h" #include "wsrep_mysqld.h" #include "wsrep_xid.h" +#include "sql/wsrep_async_monitor.h" #endif /* WITH_WSREP */ #define window_size Log_throttle::LOG_THROTTLE_WINDOW_SIZE @@ -2712,6 +2713,13 @@ Slave_worker *Log_event::get_slave_worker(Relay_log_info *rli) { Gtid_log_event *gtid_log_ev = static_cast(this); rli->started_processing(gtid_log_ev); +#ifdef WITH_WSREP + Wsrep_async_monitor *wsrep_async_monitor {rli->get_wsrep_async_monitor()}; + if (wsrep_async_monitor) { + auto seqno = gtid_log_ev->sequence_number; + wsrep_async_monitor->schedule(seqno); + } +#endif /* WITH_WSREP */ } if (schedule_next_event(this, rli)) { @@ -11233,6 +11241,21 @@ static enum_tbl_map_status check_table_map(Relay_log_info const *rli, } } +#ifdef WITH_WSREP + // This transaction is anyways going to be skipped. So skip the transaction + // in the async monitor as well + if (WSREP(rli->info_thd) && rli->info_thd->system_thread == SYSTEM_THREAD_SLAVE_WORKER + && !thd_is_wsrep_applier && res == FILTERED_OUT) { + Slave_worker *sw = static_cast(const_cast(rli)); + Wsrep_async_monitor *wsrep_async_monitor {sw->get_wsrep_async_monitor()}; + if (wsrep_async_monitor) { + auto seqno = sw->sequence_number(); + assert(seqno > 0); + wsrep_async_monitor->skip(seqno); + } + } +#endif /* WITH_WSREP */ + DBUG_PRINT("debug", ("check of table map ended up with: %u", res)); return res; diff --git a/sql/rpl_gtid_execution.cc b/sql/rpl_gtid_execution.cc index 2434f7eef34c..714bf8b3fad7 100644 --- a/sql/rpl_gtid_execution.cc +++ b/sql/rpl_gtid_execution.cc @@ -42,6 +42,10 @@ #include "sql/sql_lex.h" #include "sql/sql_parse.h" // stmt_causes_implicit_commit #include "sql/system_variables.h" +#ifdef WITH_WSREP +#include "sql/rpl_rli_pdb.h" // Slave_worker +#include "sql/wsrep_async_monitor.h" +#endif /* WITH_WSREP */ bool set_gtid_next(THD *thd, const Gtid_specification &spec) { DBUG_TRACE; @@ -355,6 +359,18 @@ static inline void skip_statement(THD *thd) { to notify that its session ticket was consumed. */ Commit_stage_manager::get_instance().finish_session_ticket(thd); +#ifdef WITH_WSREP + /* Despite the transaction was skipped, it needs to be updated in the Wsrep_async_monitor */ + if (thd->system_thread == SYSTEM_THREAD_SLAVE_WORKER) { + Slave_worker *sw = dynamic_cast(thd->rli_slave); + Wsrep_async_monitor *wsrep_async_monitor {sw->get_wsrep_async_monitor()}; + if (wsrep_async_monitor) { + auto seqno = sw->sequence_number(); + assert(seqno > 0); + wsrep_async_monitor->skip(seqno); + } + } +#endif /* WITH_WSREP */ #ifndef NDEBUG const Gtid_set *executed_gtids = gtid_state->get_executed_gtids(); diff --git a/sql/rpl_replica.cc b/sql/rpl_replica.cc index 1572e759402c..7641f3d4d3ba 100644 --- a/sql/rpl_replica.cc +++ b/sql/rpl_replica.cc @@ -172,6 +172,10 @@ #endif #include "scope_guard.h" +#ifdef WITH_WSREP +#include "sql/wsrep_async_monitor.h" +#endif /* WITH_WSREP */ + struct mysql_cond_t; struct mysql_mutex_t; @@ -7228,6 +7232,9 @@ extern "C" void *handle_slave_sql(void *arg) { bool mts_inited = false; Global_THD_manager *thd_manager = Global_THD_manager::get_instance(); Commit_order_manager *commit_order_mngr = nullptr; +#ifdef WITH_WSREP + Wsrep_async_monitor *wsrep_async_monitor = nullptr; +#endif /* WITH_WSREP */ Rpl_applier_reader applier_reader(rli); Relay_log_info::enum_priv_checks_status priv_check_status = Relay_log_info::enum_priv_checks_status::SUCCESS; @@ -7273,6 +7280,17 @@ wsrep_restart_point : rli->set_commit_order_manager(commit_order_mngr); +#ifdef WITH_WSREP + // Restore the last executed seqno + if (WSREP_ON && wsrep_use_async_monitor + && opt_replica_preserve_commit_order + && !rli->is_parallel_exec() + && rli->opt_replica_parallel_workers > 1) { + wsrep_async_monitor = new Wsrep_async_monitor(rli->opt_replica_parallel_workers); + rli->set_wsrep_async_monitor(wsrep_async_monitor); + } +#endif /* WITH_WSREP */ + if (channel_map.is_group_replication_channel_name(rli->get_channel())) { if (channel_map.is_group_replication_channel_name(rli->get_channel(), true)) { @@ -7700,6 +7718,13 @@ wsrep_restart_point : delete commit_order_mngr; } +#ifdef WITH_WSREP + if (wsrep_async_monitor) { + rli->set_wsrep_async_monitor(nullptr); + delete wsrep_async_monitor; + } +#endif /* WITH_WSREP */ + mysql_mutex_unlock(&rli->info_thd_lock); set_thd_in_use_temporary_tables( rli); // (re)set info_thd in use for saved temp tables diff --git a/sql/rpl_rli.cc b/sql/rpl_rli.cc index dfda8bbf2cc7..23c7e622e466 100644 --- a/sql/rpl_rli.cc +++ b/sql/rpl_rli.cc @@ -187,6 +187,9 @@ Relay_log_info::Relay_log_info(bool is_slave_recovery, reported_unsafe_warning(false), rli_description_event(nullptr), commit_order_mngr(nullptr), +#ifdef WITH_WSREP + wsrep_async_monitor(nullptr), +#endif /* WITH_WSREP */ sql_delay(0), sql_delay_end(0), m_flags(0), diff --git a/sql/rpl_rli.h b/sql/rpl_rli.h index 373df411c160..8e52892c03f7 100644 --- a/sql/rpl_rli.h +++ b/sql/rpl_rli.h @@ -75,6 +75,10 @@ class String; struct LEX_MASTER_INFO; struct db_worker_hash_entry; +#ifdef WITH_WSREP +class Wsrep_async_monitor; +#endif /* WITH_WSREP */ + extern uint sql_replica_skip_counter; typedef Prealloced_array Slave_worker_array; @@ -1778,6 +1782,15 @@ class Relay_log_info : public Rpl_info { commit_order_mngr = mngr; } +#ifdef WITH_WSREP + Wsrep_async_monitor* get_wsrep_async_monitor() { + return wsrep_async_monitor; + } + void set_wsrep_async_monitor(Wsrep_async_monitor *monitor) { + wsrep_async_monitor = monitor; + } +#endif /* WITH_WSREP */ + /* Following set function is required to initialize the 'until_option' during MTS relay log recovery process. @@ -1835,6 +1848,12 @@ class Relay_log_info : public Rpl_info { */ Commit_order_manager *commit_order_mngr; +#ifdef WITH_WSREP + /* + Wsrep_async_monitor orders DMLs and DDls in galera. + */ + Wsrep_async_monitor *wsrep_async_monitor; +#endif /* WITH_WSREP */ /** Delay slave SQL thread by this amount of seconds. The delay is applied per transaction and based on the immediate master's diff --git a/sql/rpl_rli_pdb.cc b/sql/rpl_rli_pdb.cc index 8268671ed33a..616de611e19f 100644 --- a/sql/rpl_rli_pdb.cc +++ b/sql/rpl_rli_pdb.cc @@ -327,6 +327,9 @@ int Slave_worker::init_worker(Relay_log_info *rli, ulong i) { this->set_require_row_format(rli->is_row_format_required()); set_commit_order_manager(c_rli->get_commit_order_manager()); +#ifdef WITH_WSREP + set_wsrep_async_monitor(c_rli->get_wsrep_async_monitor()); +#endif /* WITH_WSREP */ if (rli_init_info(false) || DBUG_EVALUATE_IF("inject_init_worker_init_info_fault", true, false)) diff --git a/sql/sql_parse.cc b/sql/sql_parse.cc index 54e74bfe3b45..ce2fd70ba33e 100644 --- a/sql/sql_parse.cc +++ b/sql/sql_parse.cc @@ -190,6 +190,7 @@ #endif /* WITH_LOCK_ORDER */ #ifdef WITH_WSREP +#include "wsrep_async_monitor.h" #include "wsrep_binlog.h" #include "wsrep_mysqld.h" #include "wsrep_sst.h" @@ -307,10 +308,27 @@ bool all_tables_not_ok(THD *thd, Table_ref *tables) { if (WSREP(thd) && thd->wsrep_applier && wsrep_check_mode(WSREP_MODE_IGNORE_NATIVE_REPLICATION_FILTER_RULES)) return false; -#endif + + bool ret = rpl_filter->is_on() && tables && !thd->sp_runtime_ctx && + !rpl_filter->tables_ok(thd->db().str, tables); + // This transaction is anyways going to be skipped. So skip the transaction + // in the async monitor as well + if (ret && WSREP(thd) && thd->system_thread == SYSTEM_THREAD_SLAVE_WORKER && + !thd->wsrep_applier) { + Slave_worker *sw = dynamic_cast(thd->rli_slave); + Wsrep_async_monitor *wsrep_async_monitor{sw->get_wsrep_async_monitor()}; + if (wsrep_async_monitor) { + auto seqno = sw->sequence_number(); + assert(seqno > 0); + wsrep_async_monitor->skip(seqno); + } + } + return ret; +#else return rpl_filter->is_on() && tables && !thd->sp_runtime_ctx && !rpl_filter->tables_ok(thd->db().str, tables); +#endif } bool is_normal_transaction_boundary_stmt(enum_sql_command sql_cmd) { @@ -394,6 +412,21 @@ inline bool check_database_filters(THD *thd, const char *db, break; } } + +#ifdef WITH_WSREP + // This transaction is anyways going to be skipped. So skip the transaction + // in the async monitor as well + if (WSREP(thd) && thd->system_thread == SYSTEM_THREAD_SLAVE_WORKER + && !thd->wsrep_applier && !db_ok) { + Slave_worker *sw = dynamic_cast(thd->rli_slave); + Wsrep_async_monitor *wsrep_async_monitor {sw->get_wsrep_async_monitor()}; + if (wsrep_async_monitor) { + auto seqno = sw->sequence_number(); + assert(seqno > 0); + wsrep_async_monitor->skip(seqno); + } + } +#endif /* WITH_WSREP */ return db_ok; } diff --git a/sql/sys_vars.cc b/sql/sys_vars.cc index fb9594197bde..f2396d6634e1 100644 --- a/sql/sys_vars.cc +++ b/sql/sys_vars.cc @@ -8846,6 +8846,15 @@ static Sys_var_enum Sys_wsrep_disk_pages_encrypt( READ_ONLY GLOBAL_VAR(wsrep_disk_pages_encrypt), CMD_LINE(OPT_ARG), wsrep_encrypt_modes, DEFAULT(WSREP_ENCRYPT_MODE_NONE), NO_MUTEX_GUARD, NOT_IN_BINLOG); +static Sys_var_bool Sys_wsrep_async_monitor( + "wsrep_use_async_monitor", + "Use Async Monitors to avoid deadlock of replicated transactions in a multi-threaded replica. " + "Deadlock is possible when the PXC replica tries to commit the replicated transactions in galera " + "in a different order than its order in the relay log. Use 'wsrep_use_async_monitor' to avoid " + "such deadlocks." + "This variable is only allowed to be changed through command line.", + READ_ONLY GLOBAL_VAR(wsrep_use_async_monitor), CMD_LINE(OPT_ARG), + DEFAULT(true), NO_MUTEX_GUARD, NOT_IN_BINLOG); #endif /* WITH_WSREP */ static bool check_set_require_row_format(sys_var *, THD *thd, set_var *var) { diff --git a/sql/wsrep_async_monitor.cc b/sql/wsrep_async_monitor.cc new file mode 100644 index 000000000000..73dbfa605326 --- /dev/null +++ b/sql/wsrep_async_monitor.cc @@ -0,0 +1,144 @@ +/* Copyright (c) 2024 Percona LLC and/or its affiliates. All rights reserved. + + This program is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public License + as published by the Free Software Foundation; version 2 of + the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ + +#ifdef WITH_WSREP +#include "sql/mysqld.h" +#include "sql/wsrep_async_monitor.h" +#include + +// Method for main thread to add scheduled seqnos +void Wsrep_async_monitor::schedule(seqno_t seqno) { + std::unique_lock lock(m_mutex); + scheduled_seqnos.push(seqno); +} + +// Method for both DDL and DML to enter the monitor +void Wsrep_async_monitor::enter(seqno_t seqno) { + std::unique_lock lock(m_mutex); + + // Don't wait if it is a skipped seqno + if (skipped_seqnos.count(seqno) > 0) return; + + // Wait until this transaction is at the head of the scheduled queue + m_cond.wait(lock, [this, seqno] { + + // Here we need to remove skipped transactions + + // Imagine a scenario where scheduled seqnos is 1,2(skip),3 and threads enter in + // an out of order manner. + // + // - 3 enters the monitor first, since it is not in the front of the queue, it + // goes to cond_wait + // - 2 enters the monitor, adds 2 to skipped_seqnos + // - 1 enters the monitor, since it is in the front, it acquires the monitor, + // does its job and leaves the monitor by removing itself from the + // scheduled_seqnos and signals 3. + // - 3 wakes up, but it will see that 2 in the front of the + // queue. + // + // To proceed, it needs to remove all the skipped seqnos from the + // scheduled_seqnos queue. + while (!scheduled_seqnos.empty() && + skipped_seqnos.count(scheduled_seqnos.front()) > 0) { + scheduled_seqnos.pop(); + } + return !scheduled_seqnos.empty() && scheduled_seqnos.front() == seqno; + }); +} + +// Method to be called after DDL/DML processing is complete +void Wsrep_async_monitor::leave(seqno_t seqno) { + std::unique_lock lock(m_mutex); + + // Don't wait if it is a skipped seqno + if (skipped_seqnos.count(seqno) > 0) return; + + // Check if the sequence number matches the front of the queue. + // In a correctly functioning monitor this should always be true + // as each transaction should exit in the order it was scheduled + // and processed. + if (!scheduled_seqnos.empty() && scheduled_seqnos.front() == seqno) { + // Remove the seqno from the scheduled queue now that it has completed + scheduled_seqnos.pop(); + } else { + // std::cout << "Error: Mismatch in sequence numbers. Expected " + // << (scheduled_seqnos.empty() + // ? "none" + // : std::to_string(scheduled_seqnos.front())) + // << " but got " << seqno << "." << std::endl; + assert(false && "Sequence number mismatch in leave()"); + unireg_abort(1); + } + + // Remove seqnos from skipped_seqnos. + // Note: Here we remove all seqnos less than `seqno - m_workers_count` to + // ensure that at most the skipped_seqnos has `m_workers_count` items. + // + // Example: when there are 4 workers and if seqno 15 enters, this will remove + // all entries less than 11 + seqno_t remove_upto = + (seqno > m_workers_count) ? (seqno - m_workers_count) : 0; + for (auto it = skipped_seqnos.begin(); + it != skipped_seqnos.end() && *it <= remove_upto; + /* No increment here*/) { + it = skipped_seqnos.erase(it); + } + + // Notify waiting threads in case the next scheduled sequence can enter + m_cond.notify_all(); +} + +// Method to skip a transaction that will not call enter() and leave() +void Wsrep_async_monitor::skip(seqno_t seqno) { + std::unique_lock lock(m_mutex); + + // Check if the seqno is already marked as skipped + if (skipped_seqnos.count(seqno) > 0) { + return; // Already skipped, so do nothing + } + + // Mark the seqno as skipped + skipped_seqnos.insert(seqno); + + // Remove it from the scheduled queue if it is at the front + if (!scheduled_seqnos.empty() && scheduled_seqnos.front() == seqno) { + scheduled_seqnos.pop(); + } + + // Remove seqnos from skipped_seqnos. + // Note: Here we remove all seqnos less than `seqno - m_workers_count` to + // ensure that at most the skipped_seqnos has `m_workers_count` items. + // + // Example: when there are 4 workers and if seqno 15 enters, this will remove + // all entries less than 11 + seqno_t remove_upto = + (seqno > m_workers_count) ? (seqno - m_workers_count) : 0; + for (auto it = skipped_seqnos.begin(); + it != skipped_seqnos.end() && *it <= remove_upto; + /* No increment here*/) { + it = skipped_seqnos.erase(it); + } + + // Notify in case other transactions are waiting to enter + m_cond.notify_all(); +} + +// Method to return if the monitor is empty, used by the unittests +bool Wsrep_async_monitor::is_empty() { + std::unique_lock lock(m_mutex); + return scheduled_seqnos.empty(); +} +#endif /* WITH_WSREP */ diff --git a/sql/wsrep_async_monitor.h b/sql/wsrep_async_monitor.h new file mode 100644 index 000000000000..299e59c47cf2 --- /dev/null +++ b/sql/wsrep_async_monitor.h @@ -0,0 +1,58 @@ +/* Copyright (c) 2024 Percona LLC and/or its affiliates. All rights reserved. + + This program is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public License + as published by the Free Software Foundation; version 2 of + the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ + +#ifndef WSREP_ASYNC_MONITOR_H +#define WSREP_ASYNC_MONITOR_H + +#ifdef WITH_WSREP +#include +#include +#include +#include +#include + +class Wsrep_async_monitor { + public: + Wsrep_async_monitor(unsigned long count) : m_workers_count(count) {} + + using seqno_t = unsigned long long; + + // Method for main thread to add scheduled seqnos + void schedule(seqno_t seqno); + + // Method for both DDL and DML to enter the monitor + void enter(seqno_t seqno); + + // Method to be called after DDL/DML processing is complete + void leave(seqno_t seqno); + + // Method to skip a transaction that will not call enter() and leave() + void skip(seqno_t seqno); + + // Method to return if the monitor is empty, used by the unittests + bool is_empty(); + + private: + unsigned long m_workers_count; + std::mutex m_mutex; + std::condition_variable m_cond; + std::set skipped_seqnos; // Tracks skipped sequence numbers + std::queue scheduled_seqnos; // Queue to track scheduled seqnos +}; +; + +#endif /* WITH_WSREP */ +#endif /* WSREP_ASYNC_MONITOR_H */ diff --git a/sql/wsrep_mysqld.cc b/sql/wsrep_mysqld.cc index 1527eee14e99..c52e4eedc0cd 100644 --- a/sql/wsrep_mysqld.cc +++ b/sql/wsrep_mysqld.cc @@ -44,6 +44,7 @@ #include "sp_head.h" #include "sql_base.h" // TEMP_PREFIX #include "wsrep_applier.h" +#include "wsrep_async_monitor.h" #include "wsrep_binlog.h" #include "wsrep_priv.h" #include "wsrep_sst.h" @@ -150,6 +151,9 @@ bool pxc_encrypt_cluster_traffic = 0; ulong wsrep_gcache_encrypt = WSREP_ENCRYPT_MODE_NONE; ulong wsrep_disk_pages_encrypt = WSREP_ENCRYPT_MODE_NONE; +/* Enables Async Monitors to avoid TOI deadlocks */ +bool wsrep_use_async_monitor = true; + /* force flush of error message if error is detected at early stage during SST or other initialization. */ bool pxc_force_flush_error_message = false; @@ -2870,6 +2874,42 @@ static void wsrep_RSU_end(THD *thd) { thd->disable_binlog_guard.reset(); } +void thd_enter_async_monitor(THD* thd) { + + // Only replica worker threads are allowed to enter + if (thd->system_thread == SYSTEM_THREAD_SLAVE_WORKER) { + + // If the thread is already killed, leave it to the called to handle it. + if (thd->killed != THD::NOT_KILLED || thd->wsrep_applier) { + return; + } + Slave_worker *sw = dynamic_cast(thd->rli_slave); + Wsrep_async_monitor *wsrep_async_monitor {sw->get_wsrep_async_monitor()}; + if (wsrep_async_monitor) { + auto seqno = sw->sequence_number(); + assert(seqno > 0); + // TODO: If requied, we must set current_mutex and current_cond here + // i.e, thd->enter_cond(); + wsrep_async_monitor->enter(seqno); + } + } +} + +void thd_leave_async_monitor(THD* thd) { + + if (thd->wsrep_applier) return; + + if (thd->system_thread == SYSTEM_THREAD_SLAVE_WORKER) { + Slave_worker * sw = dynamic_cast(thd->rli_slave); + Wsrep_async_monitor *wsrep_async_monitor {sw->get_wsrep_async_monitor()}; + if (wsrep_async_monitor) { + auto seqno = sw->sequence_number(); + assert(seqno > 0); + wsrep_async_monitor->leave(seqno); + } + } +} + int wsrep_to_isolation_begin(THD *thd, const char *db_, const char *table_, const Table_ref *table_list, dd::Tablespace_table_ref_vec *trefs, @@ -2966,6 +3006,8 @@ int wsrep_to_isolation_begin(THD *thd, const char *db_, const char *table_, thd->variables.auto_increment_increment = 1; } + thd_enter_async_monitor(thd); + DEBUG_SYNC(thd, "wsrep_to_isolation_begin_before_replication"); if (thd->variables.wsrep_on && wsrep_thd_is_local(thd)) { @@ -3039,6 +3081,8 @@ void wsrep_to_isolation_end(THD *thd) { } if (wsrep_emulate_bin_log) wsrep_thd_binlog_trx_reset(thd); + thd_leave_async_monitor(thd); + DEBUG_SYNC(thd, "wsrep_to_isolation_end_before_wsrep_skip_wsrep_hton"); mysql_mutex_lock(&thd->LOCK_thd_data); diff --git a/sql/wsrep_mysqld.h b/sql/wsrep_mysqld.h index f8542f6dd9f6..3897d7fb455d 100644 --- a/sql/wsrep_mysqld.h +++ b/sql/wsrep_mysqld.h @@ -167,6 +167,7 @@ enum enump_wsrep_encrypt_modes { }; extern ulong wsrep_gcache_encrypt; extern ulong wsrep_disk_pages_encrypt; +extern bool wsrep_use_async_monitor; extern bool pxc_force_flush_error_message; @@ -443,6 +444,9 @@ extern PSI_thread_key key_THREAD_wsrep_post_rollbacker; extern PSI_file_key key_file_wsrep_gra_log; #endif /* HAVE_PSI_INTERFACE */ +void thd_enter_async_monitor(THD* thd); +void thd_leave_async_monitor(THD* thd); + class Alter_info; int wsrep_to_isolation_begin(THD *thd, const char *db_, const char *table_, const Table_ref *table_list, diff --git a/sql/wsrep_trans_observer.h b/sql/wsrep_trans_observer.h index da2082cfa89c..c22340fcb443 100644 --- a/sql/wsrep_trans_observer.h +++ b/sql/wsrep_trans_observer.h @@ -296,6 +296,10 @@ static inline int wsrep_before_commit(THD *thd, bool all) { WSREP_DEBUG("wsrep_before_commit: %d, %lld", wsrep_is_real(thd, all), (long long)wsrep_thd_trx_seqno(thd)); int ret = 0; + + /* Enter the async monitor */ + thd_enter_async_monitor(thd); + assert(wsrep_run_commit_hook(thd, all)); if ((ret = thd->wsrep_cs().before_commit()) == 0) { assert(!thd->wsrep_trx().ws_meta().gtid().is_undefined()); diff --git a/unittest/gunit/CMakeLists.txt b/unittest/gunit/CMakeLists.txt index d2867a78f339..a98164af6be8 100644 --- a/unittest/gunit/CMakeLists.txt +++ b/unittest/gunit/CMakeLists.txt @@ -464,6 +464,18 @@ MYSQL_ADD_EXECUTABLE(rpl_event_ctx-t rpl_event_ctx-t.cc LINK_LIBRARIES rpl_event_ctx_lib gunit_small ) +# WITH_WSREP +ADD_LIBRARY(wsrep_async_monitor_lib STATIC + ${CMAKE_SOURCE_DIR}/sql/wsrep_async_monitor.cc +) +TARGET_LINK_LIBRARIES(wsrep_async_monitor_lib) + +MYSQL_ADD_EXECUTABLE(wsrep_async_monitor-t wsrep_async_monitor-t.cc + ENABLE_EXPORTS + ADD_TEST wsrep_async_monitor + LINK_LIBRARIES wsrep_async_monitor_lib gunit_small +) + ADD_SUBDIRECTORY(ddl_rewriter) ADD_SUBDIRECTORY(innodb) ADD_SUBDIRECTORY(keyring) diff --git a/unittest/gunit/wsrep_async_monitor-t.cc b/unittest/gunit/wsrep_async_monitor-t.cc new file mode 100644 index 000000000000..72e7b4559f11 --- /dev/null +++ b/unittest/gunit/wsrep_async_monitor-t.cc @@ -0,0 +1,234 @@ +/* Copyright (c) 2024 Percona LLC and/or its affiliates. All rights reserved. + + This program is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public License + as published by the Free Software Foundation; version 2 of + the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ +#ifdef WITH_WSREP + +#include +#include +#include +#include "sql/wsrep_async_monitor.h" // Include the header file of your class + +using seqno_t = unsigned long long; + +class WsrepAsyncMonitorTest : public ::testing::Test { + protected: + Wsrep_async_monitor monitor{8}; + + void SetUp() override { + // Any setup can be done here, if needed + } + + void TearDown() override { + // Any teardown can be done here, if needed + } +}; + +// Helper function to simulate processing a transaction +void processThread(Wsrep_async_monitor &monitor, seqno_t seqno) { + monitor.enter(seqno); + // Simulate work by sleeping briefly + std::this_thread::sleep_for(std::chrono::milliseconds(50)); + monitor.leave(seqno); +} + +// Test that the monitor can handle single transactions (DML and DDL) +TEST_F(WsrepAsyncMonitorTest, SingleTransaction) { + monitor.schedule(1); + + std::thread t1(processThread, std::ref(monitor), 1); + t1.join(); + + // Check if the queue is empty after leaving + ASSERT_TRUE(monitor.is_empty()); +} + +// Test that monitor can handle sequence mismatch. +TEST_F(WsrepAsyncMonitorTest, LeaveSequenceNumberMismatch) { + seqno_t seqno1 = 1; + seqno_t seqno2 = 2; + monitor.schedule(seqno1); + monitor.enter(seqno1); + + // Check if the queue is not empty + ASSERT_TRUE(!monitor.is_empty()); + + // Expect an assertion failure or exit due to sequence number mismatch + ASSERT_DEATH(monitor.leave(seqno2), "Sequence number mismatch in leave()"); +} + +// Test that multiple transactions are processed in the correct sequence +TEST_F(WsrepAsyncMonitorTest, MultipleQueriesInSequence) { + monitor.schedule(1); + monitor.schedule(2); + monitor.schedule(3); + monitor.schedule(4); + + std::vector processed_seqnos; + auto transactionWorker = [&](seqno_t seqno, bool isDDL) { + monitor.enter(seqno); + processed_seqnos.push_back(seqno); + if (isDDL) { + // Simulate work by sleeping briefly + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + } + monitor.leave(seqno); + }; + + std::thread t1(transactionWorker, 1, true); // DML with seqno 1 + std::thread t2(transactionWorker, 2, false); // DDL with seqno 2 + std::thread t3(transactionWorker, 3, true); // DML with seqno 3 + std::thread t4(transactionWorker, 4, false); // DML with seqno 4 + + t1.join(); + t2.join(); + t3.join(); + t4.join(); + + EXPECT_EQ(processed_seqnos, std::vector({1, 2, 3, 4})); +} + +// Test that skipped transactions are not processed but do not block other +// transactions +TEST_F(WsrepAsyncMonitorTest, Skip) { + monitor.schedule(1); + monitor.schedule(2); + monitor.schedule(3); + monitor.schedule(4); + monitor.schedule(5); + monitor.schedule(6); + + std::vector processed_seqnos; + auto transactionWorker = [&](seqno_t seqno, bool isDDL) { + monitor.enter(seqno); + processed_seqnos.push_back(seqno); + if (isDDL) { + // Simulate work by sleeping briefly + std::this_thread::sleep_for(std::chrono::milliseconds(50)); + } + monitor.leave(seqno); + }; + auto transactionSkip = [&](seqno_t seqno) { monitor.skip(seqno); }; + + // Make sure that the logic works for out-of-order enter() calls + std::thread t1(transactionWorker, 3, false); // DML with seqno 3 + std::thread t2(transactionSkip, 2); // Skipped seqno 2 + + std::this_thread::sleep_for(std::chrono::milliseconds(500)); + std::thread t3(transactionWorker, 1, false); // DML with seqno 1 + + t1.join(); + t2.join(); + t3.join(); + + // Make sure that the logic works for out-of-order enter() calls + std::thread t4(transactionSkip, 5); // DML with seqno 3 + std::thread t5(transactionWorker, 6, true); // Skipped seqno 2 + + std::this_thread::sleep_for(std::chrono::milliseconds(500)); + std::thread t6(transactionWorker, 4, false); // DML with seqno 1 + + t4.join(); + t5.join(); + t6.join(); + EXPECT_EQ(processed_seqnos, std::vector({1, 3, 4, 6})); +} + +// Test that transactions can be skipped without blocking other transactions +// +// Imagine a scenario where scheduled seqnos is 1,2(skip),3 and threads enter in +// an out of order manner. +// +// - 3 enters the monitor first, since it is not in the front of the queue, it +// goes to cond_wait +// - 2 enters the monitor, adds 2 to skipped_seqnos +// - 1 enters the monitor, since it is in the front, it acquires the monitor, +// does its job and leaves the monitor by removing itself from the +// scheduled_seqnos and signals 3. +// - 3 wakes up, but it will see that 2 in the front of the +// queue. +// +// To proceed, it needs to remove all the skipped seqnos from the +// scheduled_seqnos queue. +TEST_F(WsrepAsyncMonitorTest, SkipThenEnterTransaction) { + monitor.schedule(1); + monitor.schedule(2); + monitor.schedule(3); + + monitor.skip(2); // Skip seqno 2 + + std::vector processed_seqnos; + std::mutex seqno_mutex; + + auto transactionWorker = [&](unsigned long seqno) { + monitor.enter(seqno); + // Dont process seqno 2 + if (seqno != 2) { + std::lock_guard lock(seqno_mutex); + processed_seqnos.push_back(seqno); + } + monitor.leave(seqno); + }; + + std::thread t1(transactionWorker, 1); // seqno 1 + std::thread t2(transactionWorker, + 2); // seqno 2 (should skip without blocking) + std::thread t3(transactionWorker, 3); // seqno 3 + + t1.join(); + t2.join(); + t3.join(); + + EXPECT_EQ(processed_seqnos, std::vector({1, 3})); +} + +// Test that transactions can be processed concurrently +TEST_F(WsrepAsyncMonitorTest, Concurrents) { + monitor.schedule(1); + monitor.schedule(2); + monitor.schedule(3); + monitor.schedule(4); + + std::vector processed_seqnos; + std::mutex seqno_mutex; + + auto transactionWorker = [&](seqno_t seqno, bool isDDL) { + monitor.enter(seqno); + { + std::lock_guard lock(seqno_mutex); + processed_seqnos.push_back(seqno); + if (isDDL) { + // Simulate work by sleeping briefly + std::this_thread::sleep_for(std::chrono::milliseconds(50)); + } + } + monitor.leave(seqno); + }; + + std::thread t1(transactionWorker, 4, false); // DML with seqno 4 + std::this_thread::sleep_for(std::chrono::milliseconds(50)); + std::thread t2(transactionWorker, 3, true); // DDL with seqno 3 + std::this_thread::sleep_for(std::chrono::milliseconds(50)); + std::thread t3(transactionWorker, 2, false); // DML with seqno 2 + std::this_thread::sleep_for(std::chrono::milliseconds(50)); + std::thread t4(transactionWorker, 1, true); // DDL with seqno 1 + + t1.join(); + t2.join(); + t3.join(); + t4.join(); + + EXPECT_EQ(processed_seqnos, std::vector({1, 2, 3, 4})); +} +#endif /* WITH_WSREP */