Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix 7875 #7910

Draft
wants to merge 3 commits into
base: release-13.0
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 47 additions & 6 deletions src/backend/distributed/transaction/transaction_recovery.c
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ PG_FUNCTION_INFO_V1(recover_prepared_transactions);


/* Local functions forward declarations */
static int RecoverWorkerTransactions(WorkerNode *workerNode);
static int RecoverWorkerTransactions(WorkerNode *workerNode,
MultiConnection *connection);
static List * PendingWorkerTransactionList(MultiConnection *connection);
static bool IsTransactionInProgress(HTAB *activeTransactionNumberSet,
char *preparedTransactionName);
Expand Down Expand Up @@ -122,11 +123,52 @@ RecoverTwoPhaseCommits(void)
/* take advisory lock first to avoid running concurrently */
LockTransactionRecovery(ShareUpdateExclusiveLock);

List *workerList = ActivePrimaryNodeList(NoLock);
List *workerList = ActivePrimaryRemoteNodeList(NoLock);
List *workerConnections = NIL;
WorkerNode *workerNode = NULL;
MultiConnection *connection = NULL;

/*
* Pre-establish all connections to worker nodes.
*
* We do this to enforce a consistent lock acquisition order and prevent deadlocks.
* Currently, during extension updates, we take strong locks on the Citus
* catalog tables in a specific order: first on pg_dist_authinfo, then on
* pg_dist_transaction. It's critical that any operation locking these two
* tables adheres to this order, or a deadlock could occur.
*
* Note that RecoverWorkerTransactions() retains its lock until the end
* of the transaction, while GetNodeConnection() releases its lock after
* the catalog lookup. So when there are multiple workers in the active primary
* node list, the lock acquisition order may reverse in subsequent iterations
* of the loop calling RecoverWorkerTransactions(), increasing the risk
* of deadlock.
*
* By establishing all worker connections upfront, we ensure that
* RecoverWorkerTransactions() deals with a single distributed catalog table,
* thereby preventing deadlocks regardless of the lock acquisition sequence
* used in the upgrade extension script.
*/

foreach_declared_ptr(workerNode, workerList)
{
recoveredTransactionCount += RecoverWorkerTransactions(workerNode);
int connectionFlags = 0;
char *nodeName = workerNode->workerName;
int nodePort = workerNode->workerPort;

connection = GetNodeConnection(connectionFlags, nodeName, nodePort);
Assert(connection != NULL);

/*
* We don't verify connection validity here.
* Instead, RecoverWorkerTransactions() performs the necessary
* sanity checks on the connection state.
*/
workerConnections = lappend(workerConnections, connection);
}
forboth_ptr(workerNode, workerList, connection, workerConnections)
{
recoveredTransactionCount += RecoverWorkerTransactions(workerNode, connection);
}

return recoveredTransactionCount;
Expand All @@ -138,7 +180,7 @@ RecoverTwoPhaseCommits(void)
* started by this node on the specified worker.
*/
static int
RecoverWorkerTransactions(WorkerNode *workerNode)
RecoverWorkerTransactions(WorkerNode *workerNode, MultiConnection *connection)
{
int recoveredTransactionCount = 0;

Expand All @@ -156,8 +198,7 @@ RecoverWorkerTransactions(WorkerNode *workerNode)

bool recoveryFailed = false;

int connectionFlags = 0;
MultiConnection *connection = GetNodeConnection(connectionFlags, nodeName, nodePort);
Assert(connection != NULL);
if (connection->pgConn == NULL || PQstatus(connection->pgConn) != CONNECTION_OK)
{
ereport(WARNING, (errmsg("transaction recovery cannot connect to %s:%d",
Expand Down
18 changes: 0 additions & 18 deletions src/test/regress/citus_tests/upgrade/citus_upgrade_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,16 +62,10 @@ def run_citus_upgrade_tests(config, before_upgrade_schedule, after_upgrade_sched

install_citus(config.post_tar_path)

# disable 2pc recovery for all nodes to work around https://github.com/citusdata/citus/issues/7875
disable_2pc_recovery_for_all_nodes(config.bindir, config)

restart_databases(config.bindir, config.datadir, config.mixed_mode, config)
run_alter_citus(config.bindir, config.mixed_mode, config)
verify_upgrade(config, config.mixed_mode, config.node_name_to_ports.values())

# re-enable 2pc recovery for all nodes
enable_2pc_recovery_for_all_nodes(config.bindir, config)

run_test_on_coordinator(config, after_upgrade_schedule)
remove_citus(config.post_tar_path)

Expand Down Expand Up @@ -152,18 +146,6 @@ def restart_database(pg_path, abs_data_path, node_name, node_ports, logfile_pref
subprocess.run(command, check=True)


def disable_2pc_recovery_for_all_nodes(pg_path, config):
for port in config.node_name_to_ports.values():
utils.psql(pg_path, port, "ALTER SYSTEM SET citus.recover_2pc_interval TO -1;")
utils.psql(pg_path, port, "SELECT pg_reload_conf();")


def enable_2pc_recovery_for_all_nodes(pg_path, config):
for port in config.node_name_to_ports.values():
utils.psql(pg_path, port, "ALTER SYSTEM RESET citus.recover_2pc_interval;")
utils.psql(pg_path, port, "SELECT pg_reload_conf();")


def run_alter_citus(pg_path, mixed_mode, config):
for port in config.node_name_to_ports.values():
if mixed_mode and port in (
Expand Down
Loading