Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add failure handling for CREATE DATABASE commands #7483

Merged
merged 18 commits into from
Feb 23, 2024
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 103 additions & 4 deletions src/backend/distributed/commands/database.c
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,38 @@
#include "distributed/deparse_shard_query.h"
#include "distributed/deparser.h"
#include "distributed/listutils.h"
#include "distributed/local_executor.h"
#include "distributed/metadata/distobject.h"
#include "distributed/metadata_sync.h"
#include "distributed/metadata_utility.h"
#include "distributed/multi_executor.h"
#include "distributed/relation_access_tracking.h"
#include "distributed/serialize_distributed_ddls.h"
#include "distributed/shard_cleaner.h"
#include "distributed/worker_protocol.h"
#include "distributed/worker_transaction.h"


/*
* Used to save original name of the database before it is replaced with a
* temporary name for failure handling purposes in PreprocessCreateDatabaseStmt().
*/
static char *CreateDatabaseCommandOriginalDbName = NULL;


/*
* The format string used when creating a temporary databases for failure
* handling purposes.
*
* The fields are as follows to ensure using a unique name for each temporary
* database:
* - operationId: The operation id returned by RegisterOperationNeedingCleanup().
* - groupId: The group id of the worker node where CREATE DATABASE command
* is issued from.
*/
#define TEMP_DATABASE_NAME_FMT "citus_temp_database_%lu_%d"


/*
* DatabaseCollationInfo is used to store collation related information of a database.
*/
Expand Down Expand Up @@ -453,7 +476,12 @@ PreprocessAlterDatabaseSetStmt(Node *node, const char *queryString,
*
* In this stage, we perform validations that we want to ensure before delegating to
* previous utility hooks because it might not be convenient to throw an error in an
* implicit transaction that creates a database.
* implicit transaction that creates a database. Also in this stage, we save the original
* database name and replace dbname field with a temporary name for failure handling
* purposes. We let Postgres create the database with the temporary name, insert a cleanup
* record for the temporary database name on all workers and let PostprocessCreateDatabaseStmt()
* to return the distributed DDL job that both creates the database with the temporary name
* and then renames it back to its original name.
*
* We also serialize database commands globally by acquiring a Citus specific advisory
* lock based on OCLASS_DATABASE on the first primary worker node.
Expand All @@ -474,14 +502,41 @@ PreprocessCreateDatabaseStmt(Node *node, const char *queryString,

SerializeDistributedDDLsOnObjectClass(OCLASS_DATABASE);

OperationId operationId = RegisterOperationNeedingCleanup();

char *tempDatabaseName = psprintf(TEMP_DATABASE_NAME_FMT,
operationId, GetLocalGroupId());

List *allNodes = TargetWorkerSetNodeList(ALL_SHARD_NODES, RowShareLock);
WorkerNode *workerNode = NULL;
foreach_ptr(workerNode, allNodes)
{
InsertCleanupRecordInSubtransaction(
CLEANUP_OBJECT_DATABASE,
pstrdup(quote_identifier(tempDatabaseName)),
workerNode->groupId,
CLEANUP_ON_FAILURE
);
}

CreateDatabaseCommandOriginalDbName = stmt->dbname;
stmt->dbname = tempDatabaseName;

return NIL;
}


/*
* PostprocessCreateDatabaseStmt is executed after the statement is applied to the local
* postgres instance. In this stage we prepare the commands that need to be run on
* all workers to create the database.
* postgres instance.
*
* In this stage, we first rename the temporary database back to its original name for
* local node and then return a list of distributed DDL jobs to create the database with
* the temporary name and then to rename it back to its original name. That way, if CREATE
* DATABASE fails on any of the nodes, the temporary database will be cleaned up by the
* cleanup records that we inserted in PreprocessCreateDatabaseStmt() and in case of a
* failure, we won't leak any databases called as the name that user intended to use for
* the database.
*
*/
List *
Expand Down Expand Up @@ -517,7 +572,51 @@ PostprocessCreateDatabaseStmt(Node *node, const char *queryString)
*/
List *createDatabaseDDLJobList =
NontransactionalNodeDDLTaskList(REMOTE_NODES, createDatabaseCommands);
return createDatabaseDDLJobList;

CreatedbStmt *stmt = castNode(CreatedbStmt, node);

char *renameDatabaseCommand =
psprintf("ALTER DATABASE %s RENAME TO %s",
quote_identifier(stmt->dbname),
quote_identifier(CreateDatabaseCommandOriginalDbName));

List *renameDatabaseCommands = list_make3(DISABLE_DDL_PROPAGATION,
renameDatabaseCommand,
ENABLE_DDL_PROPAGATION);

/*
* We use NodeDDLTaskList() to send the RENAME DATABASE statement to the
* workers because we want to execute it in a coordinated transaction.
*/
List *renameDatabaseDDLJobList =
NodeDDLTaskList(REMOTE_NODES, renameDatabaseCommands);

/*
* Temporarily disable citus.enable_ddl_propagation before issuing
* rename command locally because we don't want to execute it on remote
* nodes yet. We will execute it on remote nodes by returning it as a
* distributed DDL job.
*
* The reason why we don't want to execute it on remote nodes yet is that
* the database is not created on remote nodes yet.
*/
int saveNestLevel = NewGUCNestLevel();
set_config_option("citus.enable_ddl_propagation", "off",
(superuser() ? PGC_SUSET : PGC_USERSET), PGC_S_SESSION,
GUC_ACTION_LOCAL, true, 0, false);

ExecuteUtilityCommand(renameDatabaseCommand);

AtEOXact_GUC(true, saveNestLevel);

/*
* Restore the original database name because MarkObjectDistributed()
* resolves oid of the object based on the database name and is called
* after executing the distributed DDL job that renames temporary database.
*/
stmt->dbname = CreateDatabaseCommandOriginalDbName;

return list_concat(createDatabaseDDLJobList, renameDatabaseDDLJobList);
}


Expand Down
73 changes: 66 additions & 7 deletions src/backend/distributed/operations/shard_cleaner.c
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@ static bool TryDropReplicationSlotOutsideTransaction(char *replicationSlotName,
char *nodeName,
int nodePort);
static bool TryDropUserOutsideTransaction(char *username, char *nodeName, int nodePort);
static bool TryDropDatabaseOutsideTransaction(char *databaseName, char *nodeName,
int nodePort);

static CleanupRecord * GetCleanupRecordByNameAndType(char *objectName,
CleanupObject type);
Expand Down Expand Up @@ -141,7 +143,6 @@ Datum
citus_cleanup_orphaned_resources(PG_FUNCTION_ARGS)
{
CheckCitusVersion(ERROR);
EnsureCoordinator();
PreventInTransactionBlock(true, "citus_cleanup_orphaned_resources");

int droppedCount = DropOrphanedResourcesForCleanup();
Expand Down Expand Up @@ -245,12 +246,6 @@ TryDropOrphanedResources()
static int
DropOrphanedResourcesForCleanup()
{
/* Only runs on Coordinator */
if (!IsCoordinator())
{
return 0;
}

List *cleanupRecordList = ListCleanupRecords();

/*
Expand Down Expand Up @@ -603,6 +598,12 @@ TryDropResourceByCleanupRecordOutsideTransaction(CleanupRecord *record,
return TryDropUserOutsideTransaction(record->objectName, nodeName, nodePort);
}

case CLEANUP_OBJECT_DATABASE:
{
return TryDropDatabaseOutsideTransaction(record->objectName, nodeName,
nodePort);
}

default:
{
ereport(WARNING, (errmsg(
Expand Down Expand Up @@ -883,6 +884,64 @@ TryDropUserOutsideTransaction(char *username,
}


/*
* TryDropDatabaseOutsideTransaction drops the database with the given name
* if it exists.
*/
static bool
TryDropDatabaseOutsideTransaction(char *databaseName, char *nodeName, int nodePort)
{
int connectionFlags = (OUTSIDE_TRANSACTION | FORCE_NEW_CONNECTION);
MultiConnection *connection = GetNodeUserDatabaseConnection(connectionFlags,
nodeName, nodePort,
CitusExtensionOwnerName(),
NULL);

if (PQstatus(connection->pgConn) != CONNECTION_OK)
{
return false;
}

/*
* We want to disable DDL propagation and set lock_timeout before issuing
* the DROP DATABASE command but we cannot do so in a way that's scoped
* to the DROP DATABASE command. This is because, we cannot use a
* transaction block for the DROP DATABASE command.
*
* For this reason, to avoid leaking the lock_timeout and DDL propagation
* settings to future commands, we force the connection to close at the end
* of the transaction.
*/
ForceConnectionCloseAtTransactionEnd(connection);

/*
* The DROP DATABASE command should not propagate, so we disable DDL
* propagation.
*/
List *commandList = list_make3(
"SET lock_timeout TO '1s'",
"SET citus.enable_ddl_propagation TO OFF;",
psprintf("DROP DATABASE IF EXISTS %s;", quote_identifier(databaseName))
);

bool executeCommand = true;

const char *commandString = NULL;
foreach_ptr(commandString, commandList)
{
if (ExecuteOptionalRemoteCommand(connection, commandString, NULL) !=
RESPONSE_OKAY)
{
executeCommand = false;
break;
}
}

CloseConnection(connection);
return executeCommand;
}


/*
* ErrorIfCleanupRecordForShardExists errors out if a cleanup record for the given
* shard name exists.
Expand Down
3 changes: 2 additions & 1 deletion src/include/distributed/shard_cleaner.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ typedef enum CleanupObject
CLEANUP_OBJECT_SUBSCRIPTION = 2,
CLEANUP_OBJECT_REPLICATION_SLOT = 3,
CLEANUP_OBJECT_PUBLICATION = 4,
CLEANUP_OBJECT_USER = 5
CLEANUP_OBJECT_USER = 5,
CLEANUP_OBJECT_DATABASE = 6
} CleanupObject;

/*
Expand Down
10 changes: 10 additions & 0 deletions src/test/regress/expected/alter_database_propagation.out
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,12 @@ DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing ALTER DATABASE regression RESET lock_timeout
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
set citus.enable_create_database_propagation=on;
SET citus.next_operation_id TO 3000;
create database "regression!'2";
NOTICE: issuing ALTER DATABASE citus_temp_database_3000_0 RENAME TO "regression!'2"
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing ALTER DATABASE citus_temp_database_3000_0 RENAME TO "regression!'2"
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
alter database "regression!'2" with CONNECTION LIMIT 100;
NOTICE: issuing ALTER DATABASE "regression!'2" WITH CONNECTION LIMIT 100;
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
Expand Down Expand Up @@ -189,7 +194,12 @@ alter DATABASE local_regression rename to local_regression2;
drop database local_regression2;
set citus.enable_create_database_propagation=on;
drop database regression3;
SET citus.next_operation_id TO 3100;
create database "regression!'4";
NOTICE: issuing ALTER DATABASE citus_temp_database_3100_0 RENAME TO "regression!'4"
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing ALTER DATABASE citus_temp_database_3100_0 RENAME TO "regression!'4"
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
SELECT result FROM run_command_on_all_nodes(
$$
ALTER TABLESPACE alter_db_tablespace RENAME TO "ts-needs\!escape"
Expand Down
Loading
Loading