Skip to content

Commit

Permalink
Clarify resource-cleaner apis (#7518)
Browse files Browse the repository at this point in the history
Rename InsertCleanupRecordInCurrentTransaction ->
InsertCleanupOnSuccessRecordInCurrentTransaction and hardcode policy
type as CLEANUP_DEFERRED_ON_SUCCESS.

Rename InsertCleanupRecordInSubtransaction ->
InsertCleanupRecordOutsideTransaction.
  • Loading branch information
onurctirtir authored Feb 20, 2024
1 parent 71ccbcf commit 56e014e
Show file tree
Hide file tree
Showing 6 changed files with 79 additions and 74 deletions.
7 changes: 3 additions & 4 deletions src/backend/distributed/operations/delete_protocol.c
Original file line number Diff line number Diff line change
Expand Up @@ -426,10 +426,9 @@ ExecuteDropShardPlacementCommandRemotely(ShardPlacement *shardPlacement,
errdetail("Marking this shard placement for "
"deletion")));

InsertCleanupRecordInCurrentTransaction(CLEANUP_OBJECT_SHARD_PLACEMENT,
shardRelationName,
shardPlacement->groupId,
CLEANUP_DEFERRED_ON_SUCCESS);
InsertCleanupOnSuccessRecordInCurrentTransaction(CLEANUP_OBJECT_SHARD_PLACEMENT,
shardRelationName,
shardPlacement->groupId);

return;
}
Expand Down
31 changes: 18 additions & 13 deletions src/backend/distributed/operations/shard_cleaner.c
Original file line number Diff line number Diff line change
Expand Up @@ -452,15 +452,15 @@ CompareCleanupRecordsByObjectType(const void *leftElement, const void *rightElem


/*
* InsertCleanupRecordInCurrentTransaction inserts a new pg_dist_cleanup entry
* InsertCleanupOnSuccessRecordInCurrentTransaction inserts a new pg_dist_cleanup entry
* as part of the current transaction. This is primarily useful for deferred drop scenarios,
* since these records would roll back in case of operation failure.
* since these records would roll back in case of operation failure. And for the same reason,
* always sets the policy type to CLEANUP_DEFERRED_ON_SUCCESS.
*/
void
InsertCleanupRecordInCurrentTransaction(CleanupObject objectType,
char *objectName,
int nodeGroupId,
CleanupPolicy policy)
InsertCleanupOnSuccessRecordInCurrentTransaction(CleanupObject objectType,
char *objectName,
int nodeGroupId)
{
/* We must have a valid OperationId. Any operation requring cleanup
* will call RegisterOperationNeedingCleanup.
Expand All @@ -482,7 +482,8 @@ InsertCleanupRecordInCurrentTransaction(CleanupObject objectType,
values[Anum_pg_dist_cleanup_object_type - 1] = Int32GetDatum(objectType);
values[Anum_pg_dist_cleanup_object_name - 1] = CStringGetTextDatum(objectName);
values[Anum_pg_dist_cleanup_node_group_id - 1] = Int32GetDatum(nodeGroupId);
values[Anum_pg_dist_cleanup_policy_type - 1] = Int32GetDatum(policy);
values[Anum_pg_dist_cleanup_policy_type - 1] =
Int32GetDatum(CLEANUP_DEFERRED_ON_SUCCESS);

/* open cleanup relation and insert new tuple */
Oid relationId = DistCleanupRelationId();
Expand All @@ -499,23 +500,27 @@ InsertCleanupRecordInCurrentTransaction(CleanupObject objectType,


/*
* InsertCleanupRecordInSubtransaction inserts a new pg_dist_cleanup entry in a
* InsertCleanupRecordOutsideTransaction inserts a new pg_dist_cleanup entry in a
* separate transaction to ensure the record persists after rollback. We should
* delete these records if the operation completes successfully.
*
* For failure scenarios, use a subtransaction (direct insert via localhost).
* This is used in scenarios where we need to cleanup resources on operation
* completion (CLEANUP_ALWAYS) or on failure (CLEANUP_ON_FAILURE).
*/
void
InsertCleanupRecordInSubtransaction(CleanupObject objectType,
char *objectName,
int nodeGroupId,
CleanupPolicy policy)
InsertCleanupRecordOutsideTransaction(CleanupObject objectType,
char *objectName,
int nodeGroupId,
CleanupPolicy policy)
{
/* We must have a valid OperationId. Any operation requring cleanup
* will call RegisterOperationNeedingCleanup.
*/
Assert(CurrentOperationId != INVALID_OPERATION_ID);

/* assert the circumstance noted in function comment */
Assert(policy == CLEANUP_ALWAYS || policy == CLEANUP_ON_FAILURE);

StringInfo sequenceName = makeStringInfo();
appendStringInfo(sequenceName, "%s.%s",
PG_CATALOG,
Expand Down
30 changes: 15 additions & 15 deletions src/backend/distributed/operations/shard_split.c
Original file line number Diff line number Diff line change
Expand Up @@ -733,11 +733,11 @@ CreateSplitShardsForShardGroup(List *shardGroupSplitIntervalListList,
workerPlacementNode->workerPort)));
}

InsertCleanupRecordInSubtransaction(CLEANUP_OBJECT_SHARD_PLACEMENT,
ConstructQualifiedShardName(
shardInterval),
workerPlacementNode->groupId,
CLEANUP_ON_FAILURE);
InsertCleanupRecordOutsideTransaction(CLEANUP_OBJECT_SHARD_PLACEMENT,
ConstructQualifiedShardName(
shardInterval),
workerPlacementNode->groupId,
CLEANUP_ON_FAILURE);

/* Create new split child shard on the specified placement list */
CreateObjectOnPlacement(splitShardCreationCommandList,
Expand Down Expand Up @@ -1717,11 +1717,11 @@ CreateDummyShardsForShardGroup(HTAB *mapOfPlacementToDummyShardList,
/* Log shard in pg_dist_cleanup. Given dummy shards are transient resources,
* we want to cleanup irrespective of operation success or failure.
*/
InsertCleanupRecordInSubtransaction(CLEANUP_OBJECT_SHARD_PLACEMENT,
ConstructQualifiedShardName(
shardInterval),
workerPlacementNode->groupId,
CLEANUP_ALWAYS);
InsertCleanupRecordOutsideTransaction(CLEANUP_OBJECT_SHARD_PLACEMENT,
ConstructQualifiedShardName(
shardInterval),
workerPlacementNode->groupId,
CLEANUP_ALWAYS);

/* Create dummy source shard on the specified placement list */
CreateObjectOnPlacement(splitShardCreationCommandList,
Expand Down Expand Up @@ -1780,11 +1780,11 @@ CreateDummyShardsForShardGroup(HTAB *mapOfPlacementToDummyShardList,
/* Log shard in pg_dist_cleanup. Given dummy shards are transient resources,
* we want to cleanup irrespective of operation success or failure.
*/
InsertCleanupRecordInSubtransaction(CLEANUP_OBJECT_SHARD_PLACEMENT,
ConstructQualifiedShardName(
shardInterval),
sourceWorkerNode->groupId,
CLEANUP_ALWAYS);
InsertCleanupRecordOutsideTransaction(CLEANUP_OBJECT_SHARD_PLACEMENT,
ConstructQualifiedShardName(
shardInterval),
sourceWorkerNode->groupId,
CLEANUP_ALWAYS);

/* Create dummy split child shard on source worker node */
CreateObjectOnPlacement(splitShardCreationCommandList, sourceWorkerNode);
Expand Down
33 changes: 17 additions & 16 deletions src/backend/distributed/operations/shard_transfer.c
Original file line number Diff line number Diff line change
Expand Up @@ -604,10 +604,10 @@ InsertDeferredDropCleanupRecordsForShards(List *shardIntervalList)
* We also log cleanup record in the current transaction. If the current transaction rolls back,
* we do not generate a record at all.
*/
InsertCleanupRecordInCurrentTransaction(CLEANUP_OBJECT_SHARD_PLACEMENT,
qualifiedShardName,
placement->groupId,
CLEANUP_DEFERRED_ON_SUCCESS);
InsertCleanupOnSuccessRecordInCurrentTransaction(
CLEANUP_OBJECT_SHARD_PLACEMENT,
qualifiedShardName,
placement->groupId);
}
}
}
Expand All @@ -634,10 +634,9 @@ InsertCleanupRecordsForShardPlacementsOnNode(List *shardIntervalList,
* We also log cleanup record in the current transaction. If the current transaction rolls back,
* we do not generate a record at all.
*/
InsertCleanupRecordInCurrentTransaction(CLEANUP_OBJECT_SHARD_PLACEMENT,
qualifiedShardName,
groupId,
CLEANUP_DEFERRED_ON_SUCCESS);
InsertCleanupOnSuccessRecordInCurrentTransaction(CLEANUP_OBJECT_SHARD_PLACEMENT,
qualifiedShardName,
groupId);
}
}

Expand Down Expand Up @@ -1393,10 +1392,11 @@ CopyShardTablesViaLogicalReplication(List *shardIntervalList, char *sourceNodeNa
char *tableOwner = TableOwner(shardInterval->relationId);

/* drop the shard we created on the target, in case of failure */
InsertCleanupRecordInSubtransaction(CLEANUP_OBJECT_SHARD_PLACEMENT,
ConstructQualifiedShardName(shardInterval),
GroupForNode(targetNodeName, targetNodePort),
CLEANUP_ON_FAILURE);
InsertCleanupRecordOutsideTransaction(CLEANUP_OBJECT_SHARD_PLACEMENT,
ConstructQualifiedShardName(shardInterval),
GroupForNode(targetNodeName,
targetNodePort),
CLEANUP_ON_FAILURE);

SendCommandListToWorkerOutsideTransaction(targetNodeName, targetNodePort,
tableOwner,
Expand Down Expand Up @@ -1466,10 +1466,11 @@ CopyShardTablesViaBlockWrites(List *shardIntervalList, char *sourceNodeName,
char *tableOwner = TableOwner(shardInterval->relationId);

/* drop the shard we created on the target, in case of failure */
InsertCleanupRecordInSubtransaction(CLEANUP_OBJECT_SHARD_PLACEMENT,
ConstructQualifiedShardName(shardInterval),
GroupForNode(targetNodeName, targetNodePort),
CLEANUP_ON_FAILURE);
InsertCleanupRecordOutsideTransaction(CLEANUP_OBJECT_SHARD_PLACEMENT,
ConstructQualifiedShardName(shardInterval),
GroupForNode(targetNodeName,
targetNodePort),
CLEANUP_ON_FAILURE);

SendCommandListToWorkerOutsideTransaction(targetNodeName, targetNodePort,
tableOwner, ddlCommandList);
Expand Down
32 changes: 16 additions & 16 deletions src/backend/distributed/replication/multi_logical_replication.c
Original file line number Diff line number Diff line change
Expand Up @@ -1335,10 +1335,10 @@ CreatePublications(MultiConnection *connection,

WorkerNode *worker = FindWorkerNode(connection->hostname,
connection->port);
InsertCleanupRecordInSubtransaction(CLEANUP_OBJECT_PUBLICATION,
entry->name,
worker->groupId,
CLEANUP_ALWAYS);
InsertCleanupRecordOutsideTransaction(CLEANUP_OBJECT_PUBLICATION,
entry->name,
worker->groupId,
CLEANUP_ALWAYS);

ExecuteCriticalRemoteCommand(connection, DISABLE_DDL_PROPAGATION);
ExecuteCriticalRemoteCommand(connection, createPublicationCommand->data);
Expand Down Expand Up @@ -1435,10 +1435,10 @@ CreateReplicationSlots(MultiConnection *sourceConnection,

WorkerNode *worker = FindWorkerNode(sourceConnection->hostname,
sourceConnection->port);
InsertCleanupRecordInSubtransaction(CLEANUP_OBJECT_REPLICATION_SLOT,
replicationSlot->name,
worker->groupId,
CLEANUP_ALWAYS);
InsertCleanupRecordOutsideTransaction(CLEANUP_OBJECT_REPLICATION_SLOT,
replicationSlot->name,
worker->groupId,
CLEANUP_ALWAYS);

if (!firstReplicationSlot)
{
Expand Down Expand Up @@ -1506,10 +1506,10 @@ CreateSubscriptions(MultiConnection *sourceConnection,
quote_identifier(GetUserNameFromId(ownerId, false))
)));

InsertCleanupRecordInSubtransaction(CLEANUP_OBJECT_USER,
target->subscriptionOwnerName,
worker->groupId,
CLEANUP_ALWAYS);
InsertCleanupRecordOutsideTransaction(CLEANUP_OBJECT_USER,
target->subscriptionOwnerName,
worker->groupId,
CLEANUP_ALWAYS);

StringInfo conninfo = makeStringInfo();
appendStringInfo(conninfo, "host='%s' port=%d user='%s' dbname='%s' "
Expand Down Expand Up @@ -1567,10 +1567,10 @@ CreateSubscriptions(MultiConnection *sourceConnection,
pfree(createSubscriptionCommand->data);
pfree(createSubscriptionCommand);

InsertCleanupRecordInSubtransaction(CLEANUP_OBJECT_SUBSCRIPTION,
target->subscriptionName,
worker->groupId,
CLEANUP_ALWAYS);
InsertCleanupRecordOutsideTransaction(CLEANUP_OBJECT_SUBSCRIPTION,
target->subscriptionName,
worker->groupId,
CLEANUP_ALWAYS);

ExecuteCriticalRemoteCommand(target->superuserConnection, psprintf(
"ALTER SUBSCRIPTION %s OWNER TO %s",
Expand Down
20 changes: 10 additions & 10 deletions src/include/distributed/shard_cleaner.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,16 +81,16 @@ typedef enum CleanupPolicy
extern OperationId RegisterOperationNeedingCleanup(void);

/*
* InsertCleanupRecordInCurrentTransaction inserts a new pg_dist_cleanup entry
* InsertCleanupOnSuccessRecordInCurrentTransaction inserts a new pg_dist_cleanup entry
* as part of the current transaction.
*
* This is primarily useful for deferred cleanup (CLEANUP_DEFERRED_ON_SUCCESS)
* scenarios, since the records would roll back in case of failure.
* scenarios, since the records would roll back in case of failure. And for the
* same reason, always sets the policy type to CLEANUP_DEFERRED_ON_SUCCESS.
*/
extern void InsertCleanupRecordInCurrentTransaction(CleanupObject objectType,
char *objectName,
int nodeGroupId,
CleanupPolicy policy);
extern void InsertCleanupOnSuccessRecordInCurrentTransaction(CleanupObject objectType,
char *objectName,
int nodeGroupId);

/*
* InsertCleanupRecordInSeparateTransaction inserts a new pg_dist_cleanup entry
Expand All @@ -99,10 +99,10 @@ extern void InsertCleanupRecordInCurrentTransaction(CleanupObject objectType,
* This is used in scenarios where we need to cleanup resources on operation
* completion (CLEANUP_ALWAYS) or on failure (CLEANUP_ON_FAILURE).
*/
extern void InsertCleanupRecordInSubtransaction(CleanupObject objectType,
char *objectName,
int nodeGroupId,
CleanupPolicy policy);
extern void InsertCleanupRecordOutsideTransaction(CleanupObject objectType,
char *objectName,
int nodeGroupId,
CleanupPolicy policy);

/*
* FinalizeOperationNeedingCleanupOnSuccess is be called by an operation to signal
Expand Down

0 comments on commit 56e014e

Please sign in to comment.