Skip to content

MDEV-18983 Port rpl_semi_sync_master_wait_for_slave_count from MySQL #4037

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 4 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions mysql-test/main/mysqld--help.result
Original file line number Diff line number Diff line change
Expand Up @@ -1282,9 +1282,14 @@ The following specify which files/extra groups are read (specified before remain
replication in the master
--rpl-semi-sync-master-trace-level=#
The tracing level for semi-sync replication
--rpl-semi-sync-master-wait-for-slave-count=#
The number of slaves that need to acknowledge that they
have received a transaction before the transaction can
complete on the master
--rpl-semi-sync-master-wait-no-slave
Wait until timeout when no semi-synchronous replication
slave is available
Wait until timeout when less than
`rpl_semi_sync_master_wait_for_slave_count`
semi-synchronous replication slaves are available
(Defaults to on; use --skip-rpl-semi-sync-master-wait-no-slave to disable.)
--rpl-semi-sync-master-wait-point=name
Should transaction wait for semi-sync ack after having
Expand Down Expand Up @@ -1991,6 +1996,7 @@ rowid-merge-buff-size 8388608
rpl-semi-sync-master-enabled FALSE
rpl-semi-sync-master-timeout 10000
rpl-semi-sync-master-trace-level 32
rpl-semi-sync-master-wait-for-slave-count 1
rpl-semi-sync-master-wait-no-slave TRUE
rpl-semi-sync-master-wait-point AFTER_COMMIT
rpl-semi-sync-slave-delay-master FALSE
Expand Down
12 changes: 11 additions & 1 deletion mysql-test/suite/sys_vars/r/sysvars_server_notembedded.result
Original file line number Diff line number Diff line change
Expand Up @@ -3882,10 +3882,20 @@ NUMERIC_BLOCK_SIZE 1
ENUM_VALUE_LIST NULL
READ_ONLY NO
COMMAND_LINE_ARGUMENT REQUIRED
VARIABLE_NAME RPL_SEMI_SYNC_MASTER_WAIT_FOR_SLAVE_COUNT
VARIABLE_SCOPE GLOBAL
VARIABLE_TYPE INT UNSIGNED
VARIABLE_COMMENT The number of slaves that need to acknowledge that they have received a transaction before the transaction can complete on the master
NUMERIC_MIN_VALUE 1
NUMERIC_MAX_VALUE 65535
NUMERIC_BLOCK_SIZE 1
ENUM_VALUE_LIST NULL
READ_ONLY NO
COMMAND_LINE_ARGUMENT REQUIRED
VARIABLE_NAME RPL_SEMI_SYNC_MASTER_WAIT_NO_SLAVE
VARIABLE_SCOPE GLOBAL
VARIABLE_TYPE BOOLEAN
VARIABLE_COMMENT Wait until timeout when no semi-synchronous replication slave is available
VARIABLE_COMMENT Wait until timeout when less than `rpl_semi_sync_master_wait_for_slave_count` semi-synchronous replication slaves are available
NUMERIC_MIN_VALUE NULL
NUMERIC_MAX_VALUE NULL
NUMERIC_BLOCK_SIZE NULL
Expand Down
2 changes: 2 additions & 0 deletions sql/privilege.h
Original file line number Diff line number Diff line change
Expand Up @@ -520,6 +520,8 @@ constexpr privilege_t PRIV_SET_SYSTEM_GLOBAL_VAR_RPL_SEMI_SYNC_MASTER_TRACE_LEVE
REPL_MASTER_ADMIN_ACL;
constexpr privilege_t PRIV_SET_SYSTEM_GLOBAL_VAR_RPL_SEMI_SYNC_MASTER_WAIT_POINT=
REPL_MASTER_ADMIN_ACL;
constexpr privilege_t PRIV_SET_SYSTEM_GLOBAL_VAR_RPL_SEMI_SYNC_MASTER_WAIT_FOR_SLAVE_COUNT=
REPL_MASTER_ADMIN_ACL;

constexpr privilege_t PRIV_SET_SYSTEM_GLOBAL_VAR_MASTER_VERIFY_CHECKSUM=
REPL_MASTER_ADMIN_ACL;
Expand Down
119 changes: 60 additions & 59 deletions sql/semisync_master.cc
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ ulonglong rpl_semi_sync_master_net_wait_num = 0;
ulong rpl_semi_sync_master_clients = 0;
ulonglong rpl_semi_sync_master_net_wait_time = 0;
ulonglong rpl_semi_sync_master_trx_wait_time = 0;
unsigned int rpl_semi_sync_master_wait_for_slave_count = 1;

Repl_semi_sync_master repl_semisync_master;
Ack_receiver ack_receiver;
Expand All @@ -68,6 +69,14 @@ static ulonglong timespec_to_usec(const struct timespec *ts)
return (ulonglong) ts->tv_sec * TIME_MILLION + ts->tv_nsec / TIME_THOUSAND;
}

/** @return Should we revert to async because there not enough slaves? */
static bool is_no_slave()
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note from #3931:

The code for “automatic revert to async” is not an active function, but rather passive behaviors of various functions when the conditions are met.

{
return rpl_semi_sync_master_clients <
rpl_semi_sync_master_wait_for_slave_count &&
!rpl_semi_sync_master_wait_no_slave;
}

int signal_waiting_transaction(THD *waiting_thd, const char *binlog_file,
my_off_t binlog_pos)
{
Expand Down Expand Up @@ -157,6 +166,17 @@ int Active_tranx::compare(const char *log_file_name1, my_off_t log_file_pos1,
return 0;
}

Tranx_node *
Active_tranx::get_tranx_node(const char *log_file_name, my_off_t log_file_pos)
{
Tranx_node *entry;
mysql_mutex_assert_owner(m_lock);
for (entry= m_trx_htb[get_hash_value(log_file_name, log_file_pos)];
entry && compare(entry, log_file_name, log_file_pos);
entry= entry->hash_next);
return entry;
}

int Active_tranx::insert_tranx_node(THD *thd_to_wait,
const char *log_file_name,
my_off_t log_file_pos)
Expand Down Expand Up @@ -230,23 +250,16 @@ bool Active_tranx::is_tranx_end_pos(const char *log_file_name,
my_off_t log_file_pos)
{
DBUG_ENTER("Active_tranx::is_tranx_end_pos");
DBUG_RETURN(get_tranx_node(log_file_name, log_file_pos));
}

unsigned int hash_val = get_hash_value(log_file_name, log_file_pos);
Tranx_node *entry = m_trx_htb[hash_val];

while (entry != NULL)
{
if (compare(entry, log_file_name, log_file_pos) == 0)
break;

entry = entry->hash_next;
}

DBUG_PRINT("semisync", ("%s: probe (%s, %lu) in entry(%u)",
"Active_tranx::is_tranx_end_pos",
log_file_name, (ulong)log_file_pos, hash_val));

DBUG_RETURN(entry != NULL);
Tranx_node *Active_tranx::find_acked_tranx_node()
{
Tranx_node *new_front;
for (Tranx_node *entry= m_trx_front; entry; entry= entry->next)
Copy link
Contributor Author

@ParadoxV5 ParadoxV5 May 13, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Repl_semi_sync_master::refresh_wait_for_slave_count(uint32 server_id) calls this to iteratively find the transaction(s) that can now pass with a lowered rpl_semi_sync_master_wait_for_slave_count.
Then it calls
report_reply_binlog() which includes clear_active_tranx_nodes() which iteratively finds those transactions again.
Yes, both iterations are linear.

While the infrequently-used clear_active_tranx_nodes() can use lambdas to merge with this iteration, its call is mixed within report_reply_binlog().
This, along with is_no_slave() & Active_tranx::get_tranx_node(), and how implementing fix_rpl_semi_sync_master_wait_for_slave_count requires new methods in two layers, are signs that Semi-Sync is due for refactoring.

Copy link
Contributor Author

@ParadoxV5 ParadoxV5 May 13, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Active_tranx is a naturally-ordered, block-allocating, hashtable with linked list, specialized for managing … its elements.

P.S. It uses neither C++ std::set nor our in-house HASH.

if (entry->acks >= rpl_semi_sync_master_wait_for_slave_count)
new_front= entry;
return new_front;
}

void Active_tranx::clear_active_tranx_nodes(
Expand Down Expand Up @@ -341,45 +354,14 @@ void Active_tranx::unlink_thd_as_waiter(const char *log_file_name,
my_off_t log_file_pos)
{
DBUG_ENTER("Active_tranx::unlink_thd_as_waiter");
mysql_mutex_assert_owner(m_lock);

unsigned int hash_val = get_hash_value(log_file_name, log_file_pos);
Tranx_node *entry = m_trx_htb[hash_val];

while (entry != NULL)
{
if (compare(entry, log_file_name, log_file_pos) == 0)
break;

entry = entry->hash_next;
}

Tranx_node *entry= get_tranx_node(log_file_name, log_file_pos);
if (entry)
entry->thd= NULL;

DBUG_VOID_RETURN;
}

bool Active_tranx::is_thd_waiter(THD *thd_to_check, const char *log_file_name,
my_off_t log_file_pos)
{
DBUG_ENTER("Active_tranx::assert_thd_is_waiter");
mysql_mutex_assert_owner(m_lock);

unsigned int hash_val = get_hash_value(log_file_name, log_file_pos);
Tranx_node *entry = m_trx_htb[hash_val];

while (entry != NULL)
{
if (compare(entry, log_file_name, log_file_pos) == 0)
break;

entry = entry->hash_next;
}

DBUG_RETURN(static_cast<bool>(entry));
}

/*******************************************************************************
*
* <Repl_semi_sync_master> class: the basic code layer for semisync master.
Expand Down Expand Up @@ -565,7 +547,8 @@ void Repl_semi_sync_master::remove_slave()
{
lock();
DBUG_ASSERT(rpl_semi_sync_master_clients > 0);
if (!(--rpl_semi_sync_master_clients) && !rpl_semi_sync_master_wait_no_slave)
--rpl_semi_sync_master_clients;
if (is_no_slave())
Comment on lines 549 to +551
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

{
/*
Signal transactions waiting in commit_trx() that they do not have to
Expand Down Expand Up @@ -705,16 +688,20 @@ int Repl_semi_sync_master::report_reply_binlog(uint32 server_id,

if (need_copy_send_pos)
{
Tranx_node *entry;
strmake_buf(m_reply_file_name, log_file_name);
m_reply_file_pos = log_file_pos;
m_reply_file_name_inited = true;

/* Remove all active transaction nodes before this point. */
DBUG_ASSERT(m_active_tranxs != NULL);
m_active_tranxs->clear_active_tranx_nodes(log_file_name, log_file_pos,
signal_waiting_transaction);
if (m_active_tranxs->is_empty())
m_wait_file_name_inited= false;
entry= m_active_tranxs->get_tranx_node(log_file_name, log_file_pos);
if (entry && ++(entry->acks) >= rpl_semi_sync_master_wait_for_slave_count)
{
/* Remove all active transaction nodes before this point. */
m_active_tranxs->clear_active_tranx_nodes(log_file_name, log_file_pos,
signal_waiting_transaction);
if (m_active_tranxs->is_empty())
m_wait_file_name_inited= false;
}
Comment on lines +696 to +704
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

report_reply_binlog() is somehow responsible for three distinct subfeatures:

  1. turn Semi-Sync back on if it is not, probably in response to a late ACK post-timeout
    • Is this even the correct place for this sub-“feature”?
      rpl_semi_sync_master_wait_for_slave_count doesn’t even care about it.
  2. update Semi-Sync’s ‘current’ position
    • dump_start() also calls report_reply_binlog(), yet I believe it’s only needs this subfeature.
      The other two subfeatures require this entry && condition to defend against this call.
  3. clear_active_tranx_nodes(): flush and clear transactions (plural?) before and including the specified position to allow them to complete


DBUG_PRINT("semisync", ("%s: Got reply at (%s, %lu)",
"Repl_semi_sync_master::report_reply_binlog",
Expand Down Expand Up @@ -859,7 +846,7 @@ int Repl_semi_sync_master::commit_trx(const char *trx_wait_binlog_name,
bool success= 0;
DBUG_ENTER("Repl_semi_sync_master::commit_trx");

if (!rpl_semi_sync_master_clients && !rpl_semi_sync_master_wait_no_slave)
if (is_no_slave())
{
rpl_semi_sync_master_no_transactions++;
DBUG_RETURN(0);
Expand Down Expand Up @@ -895,7 +882,7 @@ int Repl_semi_sync_master::commit_trx(const char *trx_wait_binlog_name,
while (is_on() && !(aborted= thd_killed(thd)))
{
/* We have to check these again as things may have changed */
if (!rpl_semi_sync_master_clients && !rpl_semi_sync_master_wait_no_slave)
if (is_no_slave())
{
aborted= 1;
break;
Expand Down Expand Up @@ -930,8 +917,8 @@ int Repl_semi_sync_master::commit_trx(const char *trx_wait_binlog_name,
* rpl_semi_sync_master_yes/no_tx consistent with it, we check for a
* semi-sync restart _after_ checking the reply state.
*/
if (unlikely(!m_active_tranxs->is_thd_waiter(thd, trx_wait_binlog_name,
trx_wait_binlog_pos)))
if (unlikely(!m_active_tranxs->is_tranx_end_pos(trx_wait_binlog_name,
trx_wait_binlog_pos)))
{
DBUG_EXECUTE_IF(
"semisync_log_skip_trx_wait",
Expand Down Expand Up @@ -1499,6 +1486,20 @@ void Repl_semi_sync_master::await_all_slave_replies(const char *msg)
DBUG_VOID_RETURN;
}

void Repl_semi_sync_master::refresh_wait_for_slave_count(uint32 server_id)
{
DBUG_ENTER("refresh_wait_for_slave_count");
lock();
if (get_master_enabled())
{
Tranx_node *entry;
DBUG_ASSERT(m_active_tranxs);
if ((entry= m_active_tranxs->find_acked_tranx_node()))
report_reply_binlog(server_id, entry->log_name, entry->log_pos);
}
unlock();
}

/* Get the waiting time given the wait's staring time.
*
* Return:
Expand Down
28 changes: 20 additions & 8 deletions sql/semisync_master.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ struct Tranx_node {
THD *thd; /* The thread awaiting an ACK */
struct Tranx_node *next; /* the next node in the sorted list */
struct Tranx_node *hash_next; /* the next node during hash collision */
unsigned int acks; ///< number of ACKs received
};

/**
Expand Down Expand Up @@ -129,6 +130,7 @@ class Tranx_node_allocator
trx_node->log_pos= 0;
trx_node->next= 0;
trx_node->hash_next= 0;
trx_node->acks= 0;
return trx_node;
}

Expand Down Expand Up @@ -348,6 +350,9 @@ class Active_tranx
unsigned long trace_level);
~Active_tranx();

/** Find (if any) the active transaction node with the specified position */
Tranx_node *get_tranx_node(const char *log_file_name, my_off_t log_file_pos);

/* Insert an active transaction node with the specified position.
*
* Return:
Expand All @@ -356,6 +361,13 @@ class Active_tranx
int insert_tranx_node(THD *thd_to_wait, const char *log_file_name,
my_off_t log_file_pos);

/**
Find (if any) the (last) transaction node with at least
rpl_semi_sync_master_wait_for_slave_count Tranx_node::acks
@see Repl_semi_sync_master::refresh_wait_for_slave_count
*/
Tranx_node *find_acked_tranx_node();

/* Clear the active transaction nodes until(inclusive) the specified
* position.
* If log_file_name is NULL, everything will be cleared: the sorted
Expand All @@ -377,13 +389,6 @@ class Active_tranx
*/
void unlink_thd_as_waiter(const char *log_file_name, my_off_t log_file_pos);

/* Uses DBUG_ASSERT statements to ensure that the argument thd_to_check
* matches the thread of the respective Tranx_node::thd of the passed in
* log_file_name and log_file_pos.
*/
bool is_thd_waiter(THD *thd_to_check, const char *log_file_name,
my_off_t log_file_pos);

/* Given a position, check to see whether the position is an active
* transaction's ending position by probing the hash table.
*/
Expand All @@ -400,7 +405,6 @@ class Active_tranx
* if the internal linked list has no entries, false otherwise.
*/
bool is_empty() { return m_trx_front == NULL; }

};

/**
Expand Down Expand Up @@ -697,6 +701,13 @@ class Repl_semi_sync_master
/*called before reset master*/
int before_reset_master();

/**
If `SET rpl_semi_sync_master_wait_for_slave_count` lowered the requirement,
the transaction queue `m_active_tranxs` needs to flush any that did not have
enough Tranx_node::acks before but now have.
*/
void refresh_wait_for_slave_count(uint32 server_id);

mysql_mutex_t LOCK_rpl_semi_sync_master_enabled;
};

Expand All @@ -711,6 +722,7 @@ extern Ack_receiver ack_receiver;
/* System and status variables for the master component */
extern my_bool rpl_semi_sync_master_enabled;
extern my_bool rpl_semi_sync_master_status;
extern unsigned int rpl_semi_sync_master_wait_for_slave_count;
extern ulong rpl_semi_sync_master_wait_point;
extern ulong rpl_semi_sync_master_clients;
extern ulong rpl_semi_sync_master_timeout;
Expand Down
25 changes: 23 additions & 2 deletions sql/sys_vars.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3826,6 +3826,15 @@ static bool fix_rpl_semi_sync_master_wait_point(sys_var *self, THD *thd,
return false;
}

static bool fix_rpl_semi_sync_master_wait_for_slave_count
(sys_var *self, THD *thd, enum_var_type type)
{
mysql_mutex_unlock(&LOCK_global_system_variables);
repl_semisync_master.refresh_wait_for_slave_count(thd->variables.server_id);
Comment on lines +3829 to +3833
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ideally, the refresh is only required when the requirement decreases.
However, it seems that the Sys.Var. system has a limitation that the variable must have an ‘old’ duplicate that an ON_UPDATE callback can compare to tell whether it was decreased.

Intriguingly, other Semi-Sync Primary variables are duplicated and updated through ON_UPDATE even though they don’t have special handling.
This is likely a remnant of Semi-Sync’s past as a plugin.

mysql_mutex_lock(&LOCK_global_system_variables);
return false;
}

static Sys_var_on_access_global<Sys_var_mybool,
PRIV_SET_SYSTEM_GLOBAL_VAR_RPL_SEMI_SYNC_MASTER_ENABLED>
Sys_semisync_master_enabled(
Expand All @@ -3852,8 +3861,8 @@ static Sys_var_on_access_global<Sys_var_mybool,
PRIV_SET_SYSTEM_GLOBAL_VAR_RPL_SEMI_SYNC_MASTER_WAIT_NO_SLAVE>
Sys_semisync_master_wait_no_slave(
"rpl_semi_sync_master_wait_no_slave",
"Wait until timeout when no semi-synchronous replication slave is "
"available",
"Wait until timeout when less than `rpl_semi_sync_master_wait_for_"
"slave_count` semi-synchronous replication slaves are available",
GLOBAL_VAR(rpl_semi_sync_master_wait_no_slave),
CMD_LINE(OPT_ARG), DEFAULT(TRUE),
NO_MUTEX_GUARD, NOT_IN_BINLOG, ON_CHECK(0));
Expand Down Expand Up @@ -3883,6 +3892,18 @@ Sys_semisync_master_wait_point(
NO_MUTEX_GUARD, NOT_IN_BINLOG,ON_CHECK(0),
ON_UPDATE(fix_rpl_semi_sync_master_wait_point));

static Sys_var_on_access_global<Sys_var_uint,
PRIV_SET_SYSTEM_GLOBAL_VAR_RPL_SEMI_SYNC_MASTER_WAIT_FOR_SLAVE_COUNT>
Sys_semisync_master_wait_for_slave_count(
"rpl_semi_sync_master_wait_for_slave_count",
"The number of slaves that need to acknowledge that they have received "
"a transaction before the transaction can complete on the master",
GLOBAL_VAR(rpl_semi_sync_master_wait_for_slave_count),
CMD_LINE(REQUIRED_ARG), VALID_RANGE(1, 0xFFFF),
Comment on lines +3899 to +3902
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

matches MySQL

DEFAULT(rpl_semi_sync_master_wait_for_slave_count), BLOCK_SIZE(1),
NO_MUTEX_GUARD, NOT_IN_BINLOG, ON_CHECK(0),
ON_UPDATE(fix_rpl_semi_sync_master_wait_for_slave_count));

static bool fix_rpl_semi_sync_slave_trace_level(sys_var *self, THD *thd,
enum_var_type type)
{
Expand Down