Skip to content

Commit

Permalink
do not wait response for 'commit prepared', part 2
Browse files Browse the repository at this point in the history
  • Loading branch information
winter-loo committed Jan 8, 2025
1 parent a4a8640 commit e1b21e6
Show file tree
Hide file tree
Showing 6 changed files with 132 additions and 10 deletions.
51 changes: 47 additions & 4 deletions src/backend/distributed/connection/connection_management.c
Original file line number Diff line number Diff line change
Expand Up @@ -773,9 +773,31 @@ ShutdownConnection(MultiConnection *connection)
if (PQstatus(connection->pgConn) == CONNECTION_OK &&
PQtransactionStatus(connection->pgConn) == PQTRANS_ACTIVE)
{
SendCancelationRequest(connection);
RemoteTransaction* transaction = &connection->remoteTransaction;
if (Enable2PCQuickResponse &&
(transaction->transactionState == REMOTE_TRANS_2PC_ABORTING ||
transaction->transactionState == REMOTE_TRANS_2PC_COMMITTING))
{
#if IN_MY_DEBUGGING_PHASE
if (LogRemoteCommands)
{
elog(NOTICE, "Not send cancel request for 2PC aborting or committing");
}
#endif
}
else
{
SendCancelationRequest(connection);
}
}
CitusPQFinish(connection);
#if IN_MY_DEBUGGING_PHASE
if (LogRemoteCommands)
{
elog(NOTICE, "connection shutdown connectionId=%ld hostname=%s port=%d",
connection->connectionId, connection->hostname, connection->port);
}
#endif
}


Expand Down Expand Up @@ -1573,6 +1595,26 @@ RemoteTransactionIdle(MultiConnection *connection)
return true;
}

#if 0
/*
* TODO: if we want to avoid shutting down the connection when aborting or committing 2PC
* distributed transactions, we need to do the following:
* 1. in the background worker, consume the results of this connection
* 2. when assigning tasks, do not use this connection until the transaction is idle
*/
if (Enable2PCQuickResponse &&
(connection->remoteTransaction.transactionState == REMOTE_TRANS_2PC_COMMITTING ||
connection->remoteTransaction.transactionState == REMOTE_TRANS_2PC_ABORTING))
{
/*
* As RemoteTransactionIdle() is called from ShouldShutdownConnection() only, when
* we are aborting or committing a 2PC distributed transaction, we keep the transaction
* state by returning true to avoid shutting down the connection.
*/
return true;
}
#endif

return PQtransactionStatus(connection->pgConn) == PQTRANS_IDLE;
}

Expand Down Expand Up @@ -1697,7 +1739,7 @@ PrintConnectionHash(void)
while ((entry = (ConnectionHashEntry *) hash_seq_search(&status)) != NULL)
{
dlist_iter iter;
elog(NOTICE, "key(Hostname: %s, Port: %d, User: %s, Database: %s, Replication: %s):",
elog(NOTICE, "key(Hostname=%s Port=%d User=%s Database=%s Replication=%s):",
entry->key.hostname,
entry->key.port,
entry->key.user,
Expand All @@ -1706,12 +1748,13 @@ PrintConnectionHash(void)
dlist_foreach(iter, entry->connections)
{
MultiConnection *connection = dlist_container(MultiConnection, connectionNode, iter.cur);
elog(NOTICE, "value(Connection: %lu, xact status: %d, pqstatus: %d, busy: %s, Claimed: %s)",
elog(NOTICE, "value(Connection=%lu connectionState=%d xactStatus=%d pqstatus=%d busy=%s Claimed=%s)",
connection->connectionId,
connection->connectionState,
PQtransactionStatus(connection->pgConn),
PQstatus(connection->pgConn),
PQisBusy(connection->pgConn) ? "true" : "false",
connection->claimedExclusively ? "true" : "false");
}
}
}
}
5 changes: 4 additions & 1 deletion src/backend/distributed/connection/remote_commands.c
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ int RemoteCopyFlushThreshold = 8 * 1024 * 1024;
/* GUC, determining whether statements sent to remote nodes are logged */
bool LogRemoteCommands = false;
char *GrepRemoteCommands = "";

/*
 * GUC (citus.allow_2pc_quick_response), determining whether we skip waiting
 * for the remote response to the last 2PC phase (commit or abort)
 */
bool Enable2PCQuickResponse = false;


static bool ClearResultsInternal(MultiConnection *connection, bool raiseErrors,
Expand Down Expand Up @@ -986,7 +987,9 @@ WaitForAllConnections(List *connectionList, bool raiseInterrupts)
}
else if (sendStatus == 0)
{
if (connection->remoteTransaction.transactionState == REMOTE_TRANS_2PC_COMMITTING)
if (Enable2PCQuickResponse &&
(connection->remoteTransaction.transactionState == REMOTE_TRANS_2PC_COMMITTING ||
connection->remoteTransaction.transactionState == REMOTE_TRANS_2PC_ABORTING))
{
/* we don't wait for the response to the last 2PC phase (commit or abort) */
connectionIsReady = true;
Expand Down
67 changes: 64 additions & 3 deletions src/backend/distributed/executor/adaptive_executor.c
Original file line number Diff line number Diff line change
Expand Up @@ -788,6 +788,16 @@ AdaptiveExecutor(CitusScanState *scanState)
Job *job = distributedPlan->workerJob;
List *taskList = job->taskList;

#if IN_MY_DEBUGGING_PHASE
{
if (LogRemoteCommands)
{
elog(NOTICE, "AdaptiveExecutor: taskList size: %d", list_length(taskList));
PrintConnectionHash();
}
}
#endif

/* we should only call this once before the scan finished */
Assert(!scanState->finishedRemoteScan);

Expand Down Expand Up @@ -883,6 +893,14 @@ AdaptiveExecutor(CitusScanState *scanState)
RunDistributedExecution(execution);
}

#if IN_MY_DEBUGGING_PHASE
if (LogRemoteCommands)
{
elog(NOTICE, "end of RunDistributedExecution in AdaptiveExecutor: taskList size: %d", list_length(taskList));
PrintConnectionHash();
}
#endif

/* execute tasks local to the node (if any) */
if (list_length(execution->localTaskList) > 0)
{
Expand All @@ -897,6 +915,14 @@ AdaptiveExecutor(CitusScanState *scanState)
}

FinishDistributedExecution(execution);

#if IN_MY_DEBUGGING_PHASE
if (LogRemoteCommands)
{
elog(NOTICE, "end2 of RunDistributedExecution in AdaptiveExecutor: taskList size: %d", list_length(taskList));
PrintConnectionHash();
}
#endif

if (SortReturning && distributedPlan->expectResults && commandType != CMD_SELECT)
{
Expand Down Expand Up @@ -1458,11 +1484,29 @@ AssignTasksToConnectionsOrWorkerPool(DistributedExecution *execution)
int connectionFlags = 0;
char *nodeName = NULL;
int nodePort = 0;

#if IN_MY_DEBUGGING_PHASE
{
if (LogRemoteCommands)
{
elog(NOTICE, "before assign shardId=%ld groupId=%d nodeName=%s nodePort=%d", taskPlacement->shardId, taskPlacement->groupId, taskPlacement->nodeName, taskPlacement->nodePort);
}
}
#endif
LookupTaskPlacementHostAndPort(taskPlacement, &nodeName, &nodePort);

WorkerPool *workerPool = FindOrCreateWorkerPool(execution, nodeName,
nodePort);

#if IN_MY_DEBUGGING_PHASE
{
if (LogRemoteCommands)
{
elog(NOTICE, "after assign shardId=%ld groupId=%d nodeName=%s nodePort=%d", taskPlacement->shardId, taskPlacement->groupId, taskPlacement->nodeName, taskPlacement->nodePort);
}
}
#endif

/*
* Execution of a command on a shard placement, which may not always
* happen if the query is read-only and the shard has multiple placements.
Expand Down Expand Up @@ -1517,9 +1561,15 @@ AssignTasksToConnectionsOrWorkerPool(DistributedExecution *execution)
WorkerSession *session =
FindOrCreateWorkerSession(workerPool, connection);

ereport(DEBUG4, (errmsg("Session %ld (%s:%d) has an assigned task",
session->sessionId, connection->hostname,
connection->port)));
#if IN_MY_DEBUGGING_PHASE
if (LogRemoteCommands)
{
ereport(NOTICE, (errmsg("Session %ld (%s:%d) has an assigned task shardId=%ld connectionState=%d",
session->sessionId, connection->hostname,
connection->port, session->currentTask->shardPlacement->shardId,
connection->connectionState)));
}
#endif

placementExecution->assignedSession = session;

Expand Down Expand Up @@ -3009,6 +3059,17 @@ ConnectionStateMachine(WorkerSession *session)
do {
currentState = connection->connectionState;

#if IN_MY_DEBUGGING_PHASE
{
if (LogRemoteCommands)
{
elog(NOTICE, "ConnectionStateMachine: connection=%lu state=%d node=%s:%d, pqstatus=%d",
connection->connectionId, connection->connectionState, connection->hostname,
connection->port, PQstatus(connection->pgConn));
}
}
#endif

switch (currentState)
{
case MULTI_CONNECTION_INITIAL:
Expand Down
10 changes: 10 additions & 0 deletions src/backend/distributed/shared_library_init.c
Original file line number Diff line number Diff line change
Expand Up @@ -994,6 +994,16 @@ RegisterCitusConfigVariables(void)
GUC_NO_SHOW_ALL | GUC_NOT_IN_SAMPLE,
NULL, NULL, NULL);

DefineCustomBoolVariable(
"citus.allow_2pc_quick_response",
gettext_noop("Enables to not wait response for 2pc last phase(commit or abort)"),
NULL,
&Enable2PCQuickResponse,
false,
PGC_USERSET,
GUC_NO_SHOW_ALL | GUC_NOT_IN_SAMPLE,
NULL, NULL, NULL);

DefineCustomIntVariable(
"citus.background_task_queue_interval",
gettext_noop("Time to wait between checks for scheduled background tasks."),
Expand Down
8 changes: 6 additions & 2 deletions src/backend/distributed/transaction/remote_transaction.c
Original file line number Diff line number Diff line change
Expand Up @@ -578,9 +578,9 @@ FinishRemoteTransactionCommit(MultiConnection *connection)
transaction->transactionState == REMOTE_TRANS_1PC_COMMITTING ||
transaction->transactionState == REMOTE_TRANS_2PC_COMMITTING);

if (transaction->transactionState == REMOTE_TRANS_2PC_COMMITTING ||
transaction->transactionState == REMOTE_TRANS_2PC_ABORTING)
if (Enable2PCQuickResponse && transaction->transactionState == REMOTE_TRANS_2PC_COMMITTING)
{
/* we don't set transactionState to COMMITTED here, as ShutdownConnection() will depend on this state */
return;
}

Expand Down Expand Up @@ -1042,6 +1042,10 @@ ResetRemoteTransaction(struct MultiConnection *connection)
{
/* XXX: Should we error out for a critical transaction? */

if (Enable2PCQuickResponse && LogRemoteCommands)
{
elog(NOTICE, "ResetRemoteTransaction for connection %s:%d", connection->hostname, connection->port);
}
dlist_delete(&connection->transactionNode);
connection->transactionInProgress = false;
memset(&connection->transactionNode, 0, sizeof(connection->transactionNode));
Expand Down
1 change: 1 addition & 0 deletions src/include/distributed/remote_commands.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
/* GUC, determining whether statements sent to remote nodes are logged */
extern bool LogRemoteCommands;
extern char *GrepRemoteCommands;

/*
 * GUC, determining whether we skip waiting for the remote response to the
 * last 2PC phase (commit or abort)
 */
extern bool Enable2PCQuickResponse;

/* GUC that determines the number of bytes after which remote COPY is flushed */
extern int RemoteCopyFlushThreshold;
Expand Down

0 comments on commit e1b21e6

Please sign in to comment.