Skip to content

Commit

Permalink
Merge pull request #1927 from venkatesh-prasad-v/8.0-PXC-4173-monitor
Browse files Browse the repository at this point in the history
PXC-4173: PXC node stalls with parallel replication workers executing DDLs via async node
  • Loading branch information
venkatesh-prasad-v authored Jan 6, 2025
2 parents 03819be + 7da86bf commit 9474967
Show file tree
Hide file tree
Showing 21 changed files with 651 additions and 3 deletions.
11 changes: 11 additions & 0 deletions mysql-test/r/mysqld--help-notwin.result
Original file line number Diff line number Diff line change
Expand Up @@ -1911,6 +1911,16 @@ The following options may be given as the first argument:
--wsrep-trx-fragment-unit=name
Unit for streaming replication transaction fragments'
size: bytes, rows, statements
--wsrep-use-async-monitor
Use Async Monitors to avoid deadlock of replicated
transactions in a multi-threaded replica. Deadlock is
possible when the PXC replica tries to commit the
replicated transactions in galera in a different order
than its order in the relay log. Use
'wsrep_use_async_monitor' to avoid such deadlocks.This
variable is only allowed to be changed through command
line.
(Defaults to on; use --skip-wsrep-use-async-monitor to disable.)
--xa-detach-on-prepare
When set, XA transactions will be detached (AKA
dissociated or disconnected) from connection as part of
Expand Down Expand Up @@ -2435,6 +2445,7 @@ wsrep-start-position 00000000-0000-0000-0000-000000000000:-1
wsrep-sync-wait 0
wsrep-trx-fragment-size 0
wsrep-trx-fragment-unit bytes
wsrep-use-async-monitor TRUE
xa-detach-on-prepare TRUE

To see what values a running MySQL server is using, type
Expand Down
1 change: 1 addition & 0 deletions mysql-test/suite/galera/r/galera_defaults.result
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ WSREP_SST_METHOD xtrabackup-v2
WSREP_SYNC_WAIT 15
WSREP_TRX_FRAGMENT_SIZE 0
WSREP_TRX_FRAGMENT_UNIT bytes
WSREP_USE_ASYNC_MONITOR ON
allocator.disk_pages_encryption = no; allocator.encryption_cache_page_size = 32K; allocator.encryption_cache_size = 16777216; <BASE_DIR>; <BASE_HOST>; <BASE_PORT>; cert.log_conflicts = no; cert.optimistic_pa = no; debug = no; evs.auto_evict = 0; evs.causal_keepalive_period = PT1S; evs.debug_log_mask = 0x1; evs.delay_margin = PT1S; evs.delayed_keep_period = PT30S; evs.inactive_check_period = PT0.5S; evs.inactive_timeout = PT30S; evs.info_log_mask = 0; evs.install_timeout = PT15S; evs.join_retrans_period = PT1S; evs.keepalive_period = PT1S; evs.max_install_timeouts = 3; evs.send_window = 10; evs.stats_report_period = PT1M; evs.suspect_timeout = PT12S; evs.use_aggregate = true; evs.user_send_window = 4; evs.version = 1; evs.view_forget_timeout = P1D; <GCACHE_DIR>; gcache.encryption = no; gcache.encryption_cache_page_size = 32K; gcache.encryption_cache_size = 16777216; gcache.freeze_purge_at_seqno = -1; gcache.keep_pages_count = 0; gcache.keep_pages_size = 0; gcache.mem_size = 0; <GCACHE_NAME>; gcache.page_size = 128M; gcache.recover = yes; gcache.size = 128M; gcomm.thread_prio = ; gcs.fc_auto_evict_threshold = 0.75; gcs.fc_auto_evict_window = 0; gcs.fc_debug = 0; gcs.fc_factor = 1.0; gcs.fc_limit = 100; gcs.fc_master_slave = no; gcs.fc_single_primary = no; gcs.max_packet_size = 64500; gcs.max_throttle = 0.25; <RECV_Q_HARD_LIMIT>;gcs.recv_q_soft_limit = 0.25; gcs.sync_donor = no; <GMCAST_LISTEN_ADDR>; gmcast.mcast_addr = ; gmcast.mcast_ttl = 1; gmcast.peer_timeout = PT10S; gmcast.segment = 0; gmcast.time_wait = PT5S; gmcast.version = 0; <IST_RECV_ADDR>; pc.announce_timeout = PT3S; pc.checksum = false; pc.ignore_quorum = false; pc.ignore_sb = false; pc.linger = PT20S; pc.npvo = false; pc.recovery = true; pc.version = 0; pc.wait_prim = true; pc.wait_prim_timeout = PT30S; pc.wait_restored_prim_timeout = PT0S; pc.weight = 1; protonet.backend = asio; protonet.version = 0; repl.causal_read_timeout = PT90S; repl.commit_order = 3; repl.key_format = FLAT8; repl.max_ws_size = 
2147483647; <REPL_PROTO_MAX>;socket.checksum = 2; socket.recv_buf_size = auto; socket.send_buf_size = auto;
SELECT COUNT(*) FROM performance_schema.global_status
WHERE VARIABLE_NAME LIKE 'wsrep_%'
Expand Down
4 changes: 2 additions & 2 deletions mysql-test/suite/galera/t/pxc_inconsistency_voting.test
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,14 @@ ALTER TABLE t1 MODIFY c2 BIGINT UNSIGNED NOT NULL AUTO_INCREMENT ;

# Assert that node1 logs the message in the language set above.
--let $assert_text= Verify that node1 logs the message in the language set above
--let $assert_select= Member 0.* initiates vote on .* Un seul champ automatique est permis et il doit être indexé
--let $assert_select= Member .* initiates vote on .* Un seul champ automatique est permis et il doit être indexé
--let $assert_file= $MYSQLTEST_VARDIR/log/mysqld.1.err
--let $assert_count=1
--source include/assert_grep.inc

# Assert that node2 logs the message in the default language.
--let $assert_text= Verify that node2 logs the message in the default language
--let $assert_select= Member 1.* initiates vote on .* Incorrect table definition; there can be only one auto column and it must be defined as a key
--let $assert_select= Member .* initiates vote on .* Incorrect table definition; there can be only one auto column and it must be defined as a key

--let $assert_file= $MYSQLTEST_VARDIR/log/mysqld.1.err
--let $assert_count=1
Expand Down
2 changes: 2 additions & 0 deletions mysql-test/suite/sys_vars/r/all_vars.result
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,8 @@ wsrep_trx_fragment_size
wsrep_trx_fragment_size
wsrep_trx_fragment_unit
wsrep_trx_fragment_unit
wsrep_use_async_monitor
wsrep_use_async_monitor
xa_detach_on_prepare
xa_detach_on_prepare
drop table t1;
Expand Down
1 change: 1 addition & 0 deletions sql/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -1148,6 +1148,7 @@ SET (RPL_REPLICA_SRCS
rpl_trx_boundary_parser.cc
udf_service_impl.cc
udf_service_util.cc
wsrep_async_monitor.cc
)
ADD_STATIC_LIBRARY(rpl_replica ${RPL_REPLICA_SRCS}
DEPENDENCIES GenError
Expand Down
2 changes: 2 additions & 0 deletions sql/binlog.cc
Original file line number Diff line number Diff line change
Expand Up @@ -8864,12 +8864,14 @@ TC_LOG::enum_result MYSQL_BIN_LOG::commit(THD *thd, bool all) {
trans_commit_stmt()) the following call to my_error() will allow
overwriting the error */
my_error(ER_TRANSACTION_ROLLBACK_DURING_COMMIT, MYF(0));
thd_leave_async_monitor(thd);
return RESULT_ABORTED;
}

int rc = ordered_commit(thd, all, skip_commit);

if (run_wsrep_hooks) {
thd_leave_async_monitor(thd);
wsrep_after_commit(thd, all);
}

Expand Down
23 changes: 23 additions & 0 deletions sql/log_event.cc
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@
#include "service_wsrep.h"
#include "wsrep_mysqld.h"
#include "wsrep_xid.h"
#include "sql/wsrep_async_monitor.h"
#endif /* WITH_WSREP */

#define window_size Log_throttle::LOG_THROTTLE_WINDOW_SIZE
Expand Down Expand Up @@ -2712,6 +2713,13 @@ Slave_worker *Log_event::get_slave_worker(Relay_log_info *rli) {

Gtid_log_event *gtid_log_ev = static_cast<Gtid_log_event *>(this);
rli->started_processing(gtid_log_ev);
#ifdef WITH_WSREP
Wsrep_async_monitor *wsrep_async_monitor {rli->get_wsrep_async_monitor()};
if (wsrep_async_monitor) {
auto seqno = gtid_log_ev->sequence_number;
wsrep_async_monitor->schedule(seqno);
}
#endif /* WITH_WSREP */
}

if (schedule_next_event(this, rli)) {
Expand Down Expand Up @@ -11233,6 +11241,21 @@ static enum_tbl_map_status check_table_map(Relay_log_info const *rli,
}
}

#ifdef WITH_WSREP
// This transaction is going to be skipped anyway, so mark it as skipped in
// the async monitor as well.
if (WSREP(rli->info_thd) && rli->info_thd->system_thread == SYSTEM_THREAD_SLAVE_WORKER
&& !thd_is_wsrep_applier && res == FILTERED_OUT) {
Slave_worker *sw = static_cast<Slave_worker *>(const_cast<Relay_log_info *>(rli));
Wsrep_async_monitor *wsrep_async_monitor {sw->get_wsrep_async_monitor()};
if (wsrep_async_monitor) {
auto seqno = sw->sequence_number();
assert(seqno > 0);
wsrep_async_monitor->skip(seqno);
}
}
#endif /* WITH_WSREP */

DBUG_PRINT("debug", ("check of table map ended up with: %u", res));

return res;
Expand Down
16 changes: 16 additions & 0 deletions sql/rpl_gtid_execution.cc
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@
#include "sql/sql_lex.h"
#include "sql/sql_parse.h" // stmt_causes_implicit_commit
#include "sql/system_variables.h"
#ifdef WITH_WSREP
#include "sql/rpl_rli_pdb.h" // Slave_worker
#include "sql/wsrep_async_monitor.h"
#endif /* WITH_WSREP */

bool set_gtid_next(THD *thd, const Gtid_specification &spec) {
DBUG_TRACE;
Expand Down Expand Up @@ -355,6 +359,18 @@ static inline void skip_statement(THD *thd) {
to notify that its session ticket was consumed.
*/
Commit_stage_manager::get_instance().finish_session_ticket(thd);
#ifdef WITH_WSREP
/* Even though the transaction was skipped, it must still be marked as
   skipped in the Wsrep_async_monitor. */
if (thd->system_thread == SYSTEM_THREAD_SLAVE_WORKER) {
Slave_worker *sw = dynamic_cast<Slave_worker*>(thd->rli_slave);
Wsrep_async_monitor *wsrep_async_monitor {sw->get_wsrep_async_monitor()};
if (wsrep_async_monitor) {
auto seqno = sw->sequence_number();
assert(seqno > 0);
wsrep_async_monitor->skip(seqno);
}
}
#endif /* WITH_WSREP */

#ifndef NDEBUG
const Gtid_set *executed_gtids = gtid_state->get_executed_gtids();
Expand Down
25 changes: 25 additions & 0 deletions sql/rpl_replica.cc
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,10 @@
#endif
#include "scope_guard.h"

#ifdef WITH_WSREP
#include "sql/wsrep_async_monitor.h"
#endif /* WITH_WSREP */

struct mysql_cond_t;
struct mysql_mutex_t;

Expand Down Expand Up @@ -7228,6 +7232,9 @@ extern "C" void *handle_slave_sql(void *arg) {
bool mts_inited = false;
Global_THD_manager *thd_manager = Global_THD_manager::get_instance();
Commit_order_manager *commit_order_mngr = nullptr;
#ifdef WITH_WSREP
Wsrep_async_monitor *wsrep_async_monitor = nullptr;
#endif /* WITH_WSREP */
Rpl_applier_reader applier_reader(rli);
Relay_log_info::enum_priv_checks_status priv_check_status =
Relay_log_info::enum_priv_checks_status::SUCCESS;
Expand Down Expand Up @@ -7273,6 +7280,17 @@ wsrep_restart_point :

rli->set_commit_order_manager(commit_order_mngr);

#ifdef WITH_WSREP
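// Create the Wsrep_async_monitor and attach it to the coordinator's rli.
// This is done only when wsrep and wsrep_use_async_monitor are enabled,
// replica_preserve_commit_order is set and more than one parallel worker is
// configured.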
if (WSREP_ON && wsrep_use_async_monitor
&& opt_replica_preserve_commit_order
&& !rli->is_parallel_exec()
&& rli->opt_replica_parallel_workers > 1) {
wsrep_async_monitor = new Wsrep_async_monitor(rli->opt_replica_parallel_workers);
rli->set_wsrep_async_monitor(wsrep_async_monitor);
}
#endif /* WITH_WSREP */

if (channel_map.is_group_replication_channel_name(rli->get_channel())) {
if (channel_map.is_group_replication_channel_name(rli->get_channel(),
true)) {
Expand Down Expand Up @@ -7700,6 +7718,13 @@ wsrep_restart_point :
delete commit_order_mngr;
}

#ifdef WITH_WSREP
if (wsrep_async_monitor) {
rli->set_wsrep_async_monitor(nullptr);
delete wsrep_async_monitor;
}
#endif /* WITH_WSREP */

mysql_mutex_unlock(&rli->info_thd_lock);
set_thd_in_use_temporary_tables(
rli); // (re)set info_thd in use for saved temp tables
Expand Down
3 changes: 3 additions & 0 deletions sql/rpl_rli.cc
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,9 @@ Relay_log_info::Relay_log_info(bool is_slave_recovery,
reported_unsafe_warning(false),
rli_description_event(nullptr),
commit_order_mngr(nullptr),
#ifdef WITH_WSREP
wsrep_async_monitor(nullptr),
#endif /* WITH_WSREP */
sql_delay(0),
sql_delay_end(0),
m_flags(0),
Expand Down
19 changes: 19 additions & 0 deletions sql/rpl_rli.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,10 @@ class String;
struct LEX_MASTER_INFO;
struct db_worker_hash_entry;

#ifdef WITH_WSREP
class Wsrep_async_monitor;
#endif /* WITH_WSREP */

extern uint sql_replica_skip_counter;

typedef Prealloced_array<Slave_worker *, 4> Slave_worker_array;
Expand Down Expand Up @@ -1778,6 +1782,15 @@ class Relay_log_info : public Rpl_info {
commit_order_mngr = mngr;
}

#ifdef WITH_WSREP
/**
  Returns the Wsrep_async_monitor currently attached to this object.

  @return the stored monitor pointer; nullptr when no monitor is attached.
*/
Wsrep_async_monitor *get_wsrep_async_monitor() {
  Wsrep_async_monitor *const attached_monitor = this->wsrep_async_monitor;
  return attached_monitor;
}
/**
  Attaches a Wsrep_async_monitor to this object.

  The pointer is stored as-is: a previously stored monitor is neither
  deleted nor otherwise touched by this call.

  @param monitor  monitor to attach; pass nullptr to detach.
*/
void set_wsrep_async_monitor(Wsrep_async_monitor *monitor) {
  this->wsrep_async_monitor = monitor;
}
#endif /* WITH_WSREP */

/*
Following set function is required to initialize the 'until_option' during
MTS relay log recovery process.
Expand Down Expand Up @@ -1835,6 +1848,12 @@ class Relay_log_info : public Rpl_info {
*/
Commit_order_manager *commit_order_mngr;

#ifdef WITH_WSREP
/*
Wsrep_async_monitor orders DMLs and DDLs in Galera.
*/
Wsrep_async_monitor *wsrep_async_monitor;
#endif /* WITH_WSREP */
/**
Delay slave SQL thread by this amount of seconds.
The delay is applied per transaction and based on the immediate master's
Expand Down
3 changes: 3 additions & 0 deletions sql/rpl_rli_pdb.cc
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,9 @@ int Slave_worker::init_worker(Relay_log_info *rli, ulong i) {
this->set_require_row_format(rli->is_row_format_required());

set_commit_order_manager(c_rli->get_commit_order_manager());
#ifdef WITH_WSREP
set_wsrep_async_monitor(c_rli->get_wsrep_async_monitor());
#endif /* WITH_WSREP */

if (rli_init_info(false) ||
DBUG_EVALUATE_IF("inject_init_worker_init_info_fault", true, false))
Expand Down
35 changes: 34 additions & 1 deletion sql/sql_parse.cc
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,7 @@
#endif /* WITH_LOCK_ORDER */

#ifdef WITH_WSREP
#include "wsrep_async_monitor.h"
#include "wsrep_binlog.h"
#include "wsrep_mysqld.h"
#include "wsrep_sst.h"
Expand Down Expand Up @@ -307,10 +308,27 @@ bool all_tables_not_ok(THD *thd, Table_ref *tables) {
if (WSREP(thd) && thd->wsrep_applier &&
wsrep_check_mode(WSREP_MODE_IGNORE_NATIVE_REPLICATION_FILTER_RULES))
return false;
#endif

bool ret = rpl_filter->is_on() && tables && !thd->sp_runtime_ctx &&
!rpl_filter->tables_ok(thd->db().str, tables);
// This transaction is going to be skipped anyway, so mark it as skipped in
// the async monitor as well.
if (ret && WSREP(thd) && thd->system_thread == SYSTEM_THREAD_SLAVE_WORKER &&
!thd->wsrep_applier) {
Slave_worker *sw = dynamic_cast<Slave_worker *>(thd->rli_slave);
Wsrep_async_monitor *wsrep_async_monitor{sw->get_wsrep_async_monitor()};
if (wsrep_async_monitor) {
auto seqno = sw->sequence_number();
assert(seqno > 0);
wsrep_async_monitor->skip(seqno);
}
}
return ret;
#else

return rpl_filter->is_on() && tables && !thd->sp_runtime_ctx &&
!rpl_filter->tables_ok(thd->db().str, tables);
#endif
}

bool is_normal_transaction_boundary_stmt(enum_sql_command sql_cmd) {
Expand Down Expand Up @@ -394,6 +412,21 @@ inline bool check_database_filters(THD *thd, const char *db,
break;
}
}

#ifdef WITH_WSREP
// This transaction is going to be skipped anyway, so mark it as skipped in
// the async monitor as well.
if (WSREP(thd) && thd->system_thread == SYSTEM_THREAD_SLAVE_WORKER
&& !thd->wsrep_applier && !db_ok) {
Slave_worker *sw = dynamic_cast<Slave_worker*>(thd->rli_slave);
Wsrep_async_monitor *wsrep_async_monitor {sw->get_wsrep_async_monitor()};
if (wsrep_async_monitor) {
auto seqno = sw->sequence_number();
assert(seqno > 0);
wsrep_async_monitor->skip(seqno);
}
}
#endif /* WITH_WSREP */
return db_ok;
}

Expand Down
9 changes: 9 additions & 0 deletions sql/sys_vars.cc
Original file line number Diff line number Diff line change
Expand Up @@ -8846,6 +8846,15 @@ static Sys_var_enum Sys_wsrep_disk_pages_encrypt(
READ_ONLY GLOBAL_VAR(wsrep_disk_pages_encrypt), CMD_LINE(OPT_ARG), wsrep_encrypt_modes,
DEFAULT(WSREP_ENCRYPT_MODE_NONE), NO_MUTEX_GUARD, NOT_IN_BINLOG);

/*
  wsrep_use_async_monitor: read-only global boolean backed by the
  wsrep_use_async_monitor variable. It defaults to true and is exposed as a
  command-line option.

  NOTE(review): the adjacent literals "such deadlocks." and "This variable ..."
  are concatenated without a separating space, so the help text reads
  "deadlocks.This". Fixing the literal also requires refreshing any recorded
  --help output that embeds this description.
*/
static Sys_var_bool Sys_wsrep_async_monitor(
"wsrep_use_async_monitor",
"Use Async Monitors to avoid deadlock of replicated transactions in a multi-threaded replica. "
"Deadlock is possible when the PXC replica tries to commit the replicated transactions in galera "
"in a different order than its order in the relay log. Use 'wsrep_use_async_monitor' to avoid "
"such deadlocks."
"This variable is only allowed to be changed through command line.",
READ_ONLY GLOBAL_VAR(wsrep_use_async_monitor), CMD_LINE(OPT_ARG),
DEFAULT(true), NO_MUTEX_GUARD, NOT_IN_BINLOG);
#endif /* WITH_WSREP */

static bool check_set_require_row_format(sys_var *, THD *thd, set_var *var) {
Expand Down
Loading

0 comments on commit 9474967

Please sign in to comment.