Skip to content

Commit

Permalink
Add failure handling for CREATE DATABASE commands
Browse files Browse the repository at this point in the history
  • Loading branch information
onurctirtir committed Feb 6, 2024
1 parent 594cb6f commit 2898212
Show file tree
Hide file tree
Showing 4 changed files with 156 additions and 12 deletions.
110 changes: 101 additions & 9 deletions src/backend/distributed/commands/database.c
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,32 @@
#include "distributed/deparse_shard_query.h"
#include "distributed/deparser.h"
#include "distributed/listutils.h"
#include "distributed/local_executor.h"
#include "distributed/metadata/distobject.h"
#include "distributed/metadata_sync.h"
#include "distributed/metadata_utility.h"
#include "distributed/multi_executor.h"
#include "distributed/relation_access_tracking.h"
#include "distributed/serialize_distributed_ddls.h"
#include "distributed/shard_cleaner.h"
#include "distributed/worker_protocol.h"
#include "distributed/worker_transaction.h"


/*
 * Holds the database name that the user specified in CREATE DATABASE.
 *
 * PreprocessCreateDatabaseStmt() stores stmt->dbname here right before it
 * replaces that field with a temporary name for failure handling purposes.
 * PostprocessCreateDatabaseStmt() then reads it to build the command that
 * renames the temporary database and to restore stmt->dbname.
 *
 * Note that this points to the same string that the statement originally
 * carried; the name is not copied when it is saved.
 */
static char *CreateDatabaseCommandOriginalDbName = NULL;


/*
 * The format string used when creating temporary databases for failure
 * handling purposes. Its single argument is the operation id returned by
 * RegisterOperationNeedingCleanup().
 *
 * UINT64_FORMAT is used rather than a hard-coded "%lu" so that the conversion
 * specifier matches a 64-bit unsigned argument on every platform, not only
 * on those where such integers are "unsigned long".
 *
 * NOTE(review): assumes OperationId is a uint64 -- confirm against its typedef.
 */
#define TEMP_DATABASE_NAME_FMT "citus_temp_database_" UINT64_FORMAT


/*
* DatabaseCollationInfo is used to store collation related information of a database.
*/
Expand Down Expand Up @@ -453,7 +470,12 @@ PreprocessAlterDatabaseSetStmt(Node *node, const char *queryString,
*
* In this stage, we perform validations that we want to ensure before delegating to
* previous utility hooks because it might not be convenient to throw an error in an
* implicit transaction that creates a database.
* implicit transaction that creates a database. Also in this stage, we save the original
* database name and replace dbname field with a temporary name for failure handling
* purposes. We let Postgres create the database with the temporary name, insert a cleanup
* record for the temporary database name on all workers and let PostprocessCreateDatabaseStmt()
* return the distributed DDL jobs that create the database with the temporary name
* and then rename it back to its original name.
*
* We also serialize database commands globally by acquiring a Citus specific advisory
* lock based on OCLASS_DATABASE on the first primary worker node.
Expand All @@ -474,17 +496,45 @@ PreprocessCreateDatabaseStmt(Node *node, const char *queryString,

SerializeDistributedDDLsOnObjectClass(OCLASS_DATABASE);

OperationId operationId = RegisterOperationNeedingCleanup();

Check warning on line 499 in src/backend/distributed/commands/database.c

View check run for this annotation

Codecov / codecov/patch

src/backend/distributed/commands/database.c#L499

Added line #L499 was not covered by tests

char *tempDatabaseName = psprintf(TEMP_DATABASE_NAME_FMT, operationId);

Check warning on line 501 in src/backend/distributed/commands/database.c

View check run for this annotation

Codecov / codecov/patch

src/backend/distributed/commands/database.c#L501

Added line #L501 was not covered by tests

List *allNodes = TargetWorkerSetNodeList(ALL_SHARD_NODES, RowShareLock);
WorkerNode *workerNode = NULL;
foreach_ptr(workerNode, allNodes)

Check warning on line 505 in src/backend/distributed/commands/database.c

View check run for this annotation

Codecov / codecov/patch

src/backend/distributed/commands/database.c#L503-L505

Added lines #L503 - L505 were not covered by tests
{
InsertCleanupRecordInSubtransaction(

Check warning on line 507 in src/backend/distributed/commands/database.c

View check run for this annotation

Codecov / codecov/patch

src/backend/distributed/commands/database.c#L507

Added line #L507 was not covered by tests
CLEANUP_OBJECT_DATABASE,
pstrdup(quote_identifier(tempDatabaseName)),
workerNode->groupId,
CLEANUP_ON_FAILURE
);
}

CreateDatabaseCommandOriginalDbName = stmt->dbname;
stmt->dbname = tempDatabaseName;

Check warning on line 516 in src/backend/distributed/commands/database.c

View check run for this annotation

Codecov / codecov/patch

src/backend/distributed/commands/database.c#L515-L516

Added lines #L515 - L516 were not covered by tests

return NIL;
}


/*
* PostprocessCreateDatabaseStmt is executed after the statement is applied to the local
* postgres instance. In this stage we prepare the commands that need to be run on
* all workers to create the database. Since the CREATE DATABASE statement gives error
* in a transaction block, we need to use NontransactionalNodeDDLTaskList to send the
* CREATE DATABASE statement to the workers.
* postgres instance.
*
* In this stage, we first rename the temporary database back to its original name for
* local node and then return a list of distributed DDL jobs to create the database with
* the temporary name and then to rename it back to its original name. That way, if CREATE
* DATABASE fails on any of the nodes, the temporary database will be cleaned up via the
* cleanup records that we inserted in PreprocessCreateDatabaseStmt(). So, in case of a
* failure, we won't leave behind any database that carries the name the user intended
* to use.
*
* Also, while we use NontransactionalNodeDDLTaskList() to send the CREATE DATABASE statement
* to the workers because CREATE DATABASE statements are not allowed in a transaction block,
* we use NodeDDLTaskList() to send the ALTER DATABASE .. RENAME TO statement to the workers
* because we want to execute it in a transaction block.
*/
List *
PostprocessCreateDatabaseStmt(Node *node, const char *queryString)
Expand All @@ -508,11 +558,53 @@ PostprocessCreateDatabaseStmt(Node *node, const char *queryString)

char *createDatabaseCommand = DeparseTreeNode(node);

List *commands = list_make3(DISABLE_DDL_PROPAGATION,
(void *) createDatabaseCommand,
ENABLE_DDL_PROPAGATION);
List *createDatabaseCommands = list_make3(DISABLE_DDL_PROPAGATION,

Check warning on line 561 in src/backend/distributed/commands/database.c

View check run for this annotation

Codecov / codecov/patch

src/backend/distributed/commands/database.c#L561

Added line #L561 was not covered by tests
(void *) createDatabaseCommand,
ENABLE_DDL_PROPAGATION);

return NontransactionalNodeDDLTaskList(REMOTE_NODES, commands);
List *createDatabaseDDLJobList =
NontransactionalNodeDDLTaskList(REMOTE_NODES, createDatabaseCommands);

Check warning on line 566 in src/backend/distributed/commands/database.c

View check run for this annotation

Codecov / codecov/patch

src/backend/distributed/commands/database.c#L565-L566

Added lines #L565 - L566 were not covered by tests

CreatedbStmt *stmt = castNode(CreatedbStmt, node);

Check warning on line 568 in src/backend/distributed/commands/database.c

View check run for this annotation

Codecov / codecov/patch

src/backend/distributed/commands/database.c#L568

Added line #L568 was not covered by tests

char *renameDatabaseCommand =
psprintf("ALTER DATABASE %s RENAME TO %s",
quote_identifier(stmt->dbname),

Check warning on line 572 in src/backend/distributed/commands/database.c

View check run for this annotation

Codecov / codecov/patch

src/backend/distributed/commands/database.c#L570-L572

Added lines #L570 - L572 were not covered by tests
quote_identifier(CreateDatabaseCommandOriginalDbName));

List *renameDatabaseCommands = list_make3(DISABLE_DDL_PROPAGATION,

Check warning on line 575 in src/backend/distributed/commands/database.c

View check run for this annotation

Codecov / codecov/patch

src/backend/distributed/commands/database.c#L575

Added line #L575 was not covered by tests
renameDatabaseCommand,
ENABLE_DDL_PROPAGATION);

List *renameDatabaseDDLJobList =
NodeDDLTaskList(REMOTE_NODES, renameDatabaseCommands);

Check warning on line 580 in src/backend/distributed/commands/database.c

View check run for this annotation

Codecov / codecov/patch

src/backend/distributed/commands/database.c#L579-L580

Added lines #L579 - L580 were not covered by tests

/*
* Temporarily disable citus.enable_ddl_propagation before issuing
* rename command locally because we don't want to execute it on remote
* nodes yet. We will execute it on remote nodes by returning it as a
* distributed DDL job.
*
* The reason why we don't want to execute it on remote nodes yet is that
* the database is not created on remote nodes yet.
*/
int saveNestLevel = NewGUCNestLevel();
set_config_option("citus.enable_ddl_propagation", "off",
(superuser() ? PGC_SUSET : PGC_USERSET), PGC_S_SESSION,

Check warning on line 593 in src/backend/distributed/commands/database.c

View check run for this annotation

Codecov / codecov/patch

src/backend/distributed/commands/database.c#L591-L593

Added lines #L591 - L593 were not covered by tests
GUC_ACTION_LOCAL, true, 0, false);

ExecuteUtilityCommand(renameDatabaseCommand);

Check warning on line 596 in src/backend/distributed/commands/database.c

View check run for this annotation

Codecov / codecov/patch

src/backend/distributed/commands/database.c#L596

Added line #L596 was not covered by tests

AtEOXact_GUC(true, saveNestLevel);

Check warning on line 598 in src/backend/distributed/commands/database.c

View check run for this annotation

Codecov / codecov/patch

src/backend/distributed/commands/database.c#L598

Added line #L598 was not covered by tests

/*
* Restore the original database name because MarkObjectDistributed()
* resolves oid of the object based on the database name and is called
* after executing the distributed DDL job that renames temporary database.
*/
stmt->dbname = CreateDatabaseCommandOriginalDbName;

Check warning on line 605 in src/backend/distributed/commands/database.c

View check run for this annotation

Codecov / codecov/patch

src/backend/distributed/commands/database.c#L605

Added line #L605 was not covered by tests

return list_concat(createDatabaseDDLJobList, renameDatabaseDDLJobList);
}

Check warning on line 608 in src/backend/distributed/commands/database.c

View check run for this annotation

Codecov / codecov/patch

src/backend/distributed/commands/database.c#L607-L608

Added lines #L607 - L608 were not covered by tests


Expand Down
51 changes: 51 additions & 0 deletions src/backend/distributed/operations/shard_cleaner.c
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@ static bool TryDropReplicationSlotOutsideTransaction(char *replicationSlotName,
char *nodeName,
int nodePort);
static bool TryDropUserOutsideTransaction(char *username, char *nodeName, int nodePort);
static bool TryDropDatabaseOutsideTransaction(char *databaseName, char *nodeName,
int nodePort);

static CleanupRecord * GetCleanupRecordByNameAndType(char *objectName,
CleanupObject type);
Expand Down Expand Up @@ -603,6 +605,12 @@ TryDropResourceByCleanupRecordOutsideTransaction(CleanupRecord *record,
return TryDropUserOutsideTransaction(record->objectName, nodeName, nodePort);
}

case CLEANUP_OBJECT_DATABASE:
{
return TryDropDatabaseOutsideTransaction(record->objectName, nodeName,

Check warning on line 610 in src/backend/distributed/operations/shard_cleaner.c

View check run for this annotation

Codecov / codecov/patch

src/backend/distributed/operations/shard_cleaner.c#L610

Added line #L610 was not covered by tests
nodePort);
}

default:
{
ereport(WARNING, (errmsg(
Expand Down Expand Up @@ -883,6 +891,49 @@ TryDropUserOutsideTransaction(char *username,
}


/*
* TryDropDatabaseOutsideTransaction drops the database with the given name
* if it exists.
*/
static bool
TryDropDatabaseOutsideTransaction(char *databaseName, char *nodeName, int nodePort)

Check warning on line 899 in src/backend/distributed/operations/shard_cleaner.c

View check run for this annotation

Codecov / codecov/patch

src/backend/distributed/operations/shard_cleaner.c#L899

Added line #L899 was not covered by tests
{
int connectionFlags = OUTSIDE_TRANSACTION;
MultiConnection *connection = GetNodeUserDatabaseConnection(connectionFlags,

Check warning on line 902 in src/backend/distributed/operations/shard_cleaner.c

View check run for this annotation

Codecov / codecov/patch

src/backend/distributed/operations/shard_cleaner.c#L901-L902

Added lines #L901 - L902 were not covered by tests
nodeName, nodePort,
CitusExtensionOwnerName(),

Check warning on line 904 in src/backend/distributed/operations/shard_cleaner.c

View check run for this annotation

Codecov / codecov/patch

src/backend/distributed/operations/shard_cleaner.c#L904

Added line #L904 was not covered by tests
NULL);

if (PQstatus(connection->pgConn) != CONNECTION_OK)
{
return false;
}

/*
* The DROP DATABASE command should not propagate, so we disable DDL
* propagation.
*
* TODO: would it be problem to not use LOCAL (because we cannot use it)
* TODO: what about lock timeout? cannot afford leaking it to next command ..
*/
List *commandList = list_make2(

Check warning on line 919 in src/backend/distributed/operations/shard_cleaner.c

View check run for this annotation

Codecov / codecov/patch

src/backend/distributed/operations/shard_cleaner.c#L919

Added line #L919 was not covered by tests
"SET citus.enable_ddl_propagation TO OFF;",
psprintf("DROP DATABASE IF EXISTS %s;", quote_identifier(databaseName))
);

const char *commandString = NULL;
foreach_ptr(commandString, commandList)

Check warning on line 925 in src/backend/distributed/operations/shard_cleaner.c

View check run for this annotation

Codecov / codecov/patch

src/backend/distributed/operations/shard_cleaner.c#L924-L925

Added lines #L924 - L925 were not covered by tests
{
if (ExecuteOptionalRemoteCommand(connection, commandString, NULL) != 0)
{
return false;
}
}

return true;
}


/*
* ErrorIfCleanupRecordForShardExists errors out if a cleanup record for the given
* shard name exists.
Expand Down
3 changes: 2 additions & 1 deletion src/include/distributed/shard_cleaner.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ typedef enum CleanupObject
CLEANUP_OBJECT_SUBSCRIPTION = 2,
CLEANUP_OBJECT_REPLICATION_SLOT = 3,
CLEANUP_OBJECT_PUBLICATION = 4,
CLEANUP_OBJECT_USER = 5
CLEANUP_OBJECT_USER = 5,
CLEANUP_OBJECT_DATABASE = 6
} CleanupObject;

/*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -429,9 +429,9 @@ set citus.enable_create_database_propagation=on;
SET citus.log_remote_commands = true;
set citus.grep_remote_commands = '%CREATE DATABASE%';
create database "mydatabase#1'2";
NOTICE: issuing CREATE DATABASE "mydatabase#1'2"
NOTICE: issuing CREATE DATABASE citus_temp_database_14
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing CREATE DATABASE "mydatabase#1'2"
NOTICE: issuing CREATE DATABASE citus_temp_database_14
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
set citus.grep_remote_commands = '%DROP DATABASE%';
drop database if exists "mydatabase#1'2";
Expand Down

0 comments on commit 2898212

Please sign in to comment.