Skip to content

Commit

Permalink
Add failure handling for CREATE DATABASE commands
Browse files Browse the repository at this point in the history
  • Loading branch information
onurctirtir committed Feb 6, 2024
1 parent 594cb6f commit 35f64c7
Show file tree
Hide file tree
Showing 7 changed files with 273 additions and 19 deletions.
110 changes: 101 additions & 9 deletions src/backend/distributed/commands/database.c
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,32 @@
#include "distributed/deparse_shard_query.h"
#include "distributed/deparser.h"
#include "distributed/listutils.h"
#include "distributed/local_executor.h"
#include "distributed/metadata/distobject.h"
#include "distributed/metadata_sync.h"
#include "distributed/metadata_utility.h"
#include "distributed/multi_executor.h"
#include "distributed/relation_access_tracking.h"
#include "distributed/serialize_distributed_ddls.h"
#include "distributed/shard_cleaner.h"
#include "distributed/worker_protocol.h"
#include "distributed/worker_transaction.h"


/*
* Used to save original name of the database before it is replaced with a
* temporary name for failure handling purposes in PreprocessCreateDatabaseStmt().
*/
static char *CreateDatabaseCommandOriginalDbName = NULL;


/*
* The format string used to name the temporary databases that are created
* for failure handling purposes.
*/
#define TEMP_DATABASE_NAME_FMT "citus_temp_database_%lu"


/*
* DatabaseCollationInfo is used to store collation related information of a database.
*/
Expand Down Expand Up @@ -453,7 +470,12 @@ PreprocessAlterDatabaseSetStmt(Node *node, const char *queryString,
*
* In this stage, we perform validations that we want to ensure before delegating to
* previous utility hooks because it might not be convenient to throw an error in an
* implicit transaction that creates a database.
* implicit transaction that creates a database. Also in this stage, we save the original
* database name and replace dbname field with a temporary name for failure handling
* purposes. We let Postgres create the database with the temporary name, insert a cleanup
* record for the temporary database name on all workers, and let PostprocessCreateDatabaseStmt()
* return the distributed DDL jobs that create the database with the temporary name on the
* remote nodes and then rename it to its original name.
*
* We also serialize database commands globally by acquiring a Citus specific advisory
* lock based on OCLASS_DATABASE on the first primary worker node.
Expand All @@ -474,17 +496,45 @@ PreprocessCreateDatabaseStmt(Node *node, const char *queryString,

SerializeDistributedDDLsOnObjectClass(OCLASS_DATABASE);

OperationId operationId = RegisterOperationNeedingCleanup();

Check warning on line 499 in src/backend/distributed/commands/database.c

View check run for this annotation

Codecov / codecov/patch

src/backend/distributed/commands/database.c#L499

Added line #L499 was not covered by tests

char *tempDatabaseName = psprintf(TEMP_DATABASE_NAME_FMT, operationId);

Check warning on line 501 in src/backend/distributed/commands/database.c

View check run for this annotation

Codecov / codecov/patch

src/backend/distributed/commands/database.c#L501

Added line #L501 was not covered by tests

List *allNodes = TargetWorkerSetNodeList(ALL_SHARD_NODES, RowShareLock);
WorkerNode *workerNode = NULL;
foreach_ptr(workerNode, allNodes)

Check warning on line 505 in src/backend/distributed/commands/database.c

View check run for this annotation

Codecov / codecov/patch

src/backend/distributed/commands/database.c#L503-L505

Added lines #L503 - L505 were not covered by tests
{
InsertCleanupRecordInSubtransaction(

Check warning on line 507 in src/backend/distributed/commands/database.c

View check run for this annotation

Codecov / codecov/patch

src/backend/distributed/commands/database.c#L507

Added line #L507 was not covered by tests
CLEANUP_OBJECT_DATABASE,
pstrdup(quote_identifier(tempDatabaseName)),
workerNode->groupId,
CLEANUP_ON_FAILURE
);
}

CreateDatabaseCommandOriginalDbName = stmt->dbname;
stmt->dbname = tempDatabaseName;

Check warning on line 516 in src/backend/distributed/commands/database.c

View check run for this annotation

Codecov / codecov/patch

src/backend/distributed/commands/database.c#L515-L516

Added lines #L515 - L516 were not covered by tests

return NIL;
}


/*
* PostprocessCreateDatabaseStmt is executed after the statement is applied to the local
* postgres instance. In this stage we prepare the commands that need to be run on
* all workers to create the database. Since the CREATE DATABASE statement gives error
* in a transaction block, we need to use NontransactionalNodeDDLTaskList to send the
* CREATE DATABASE statement to the workers.
* postgres instance.
*
* In this stage, we first rename the temporary database back to its original name on the
* local node and then return a list of distributed DDL jobs that create the database with
* the temporary name on the remote nodes and then rename it to its original name. That way,
* if CREATE DATABASE fails on any of the nodes, the temporary database will be cleaned up
* via the cleanup records that we inserted in PreprocessCreateDatabaseStmt(), and we won't
* leak any database that carries the name the user intended to use.
*
* Also, we use NontransactionalNodeDDLTaskList() to send the CREATE DATABASE statement to
* the remote nodes because CREATE DATABASE is not allowed in a transaction block, whereas we
* use NodeDDLTaskList() to send the ALTER DATABASE .. RENAME TO statement because we want to
* execute it in a transaction block.
*/
List *
PostprocessCreateDatabaseStmt(Node *node, const char *queryString)
Expand All @@ -508,11 +558,53 @@ PostprocessCreateDatabaseStmt(Node *node, const char *queryString)

char *createDatabaseCommand = DeparseTreeNode(node);

List *commands = list_make3(DISABLE_DDL_PROPAGATION,
(void *) createDatabaseCommand,
ENABLE_DDL_PROPAGATION);
List *createDatabaseCommands = list_make3(DISABLE_DDL_PROPAGATION,

Check warning on line 561 in src/backend/distributed/commands/database.c

View check run for this annotation

Codecov / codecov/patch

src/backend/distributed/commands/database.c#L561

Added line #L561 was not covered by tests
(void *) createDatabaseCommand,
ENABLE_DDL_PROPAGATION);

return NontransactionalNodeDDLTaskList(REMOTE_NODES, commands);
List *createDatabaseDDLJobList =
NontransactionalNodeDDLTaskList(REMOTE_NODES, createDatabaseCommands);

Check warning on line 566 in src/backend/distributed/commands/database.c

View check run for this annotation

Codecov / codecov/patch

src/backend/distributed/commands/database.c#L565-L566

Added lines #L565 - L566 were not covered by tests

CreatedbStmt *stmt = castNode(CreatedbStmt, node);

Check warning on line 568 in src/backend/distributed/commands/database.c

View check run for this annotation

Codecov / codecov/patch

src/backend/distributed/commands/database.c#L568

Added line #L568 was not covered by tests

char *renameDatabaseCommand =
psprintf("ALTER DATABASE %s RENAME TO %s",
quote_identifier(stmt->dbname),

Check warning on line 572 in src/backend/distributed/commands/database.c

View check run for this annotation

Codecov / codecov/patch

src/backend/distributed/commands/database.c#L570-L572

Added lines #L570 - L572 were not covered by tests
quote_identifier(CreateDatabaseCommandOriginalDbName));

List *renameDatabaseCommands = list_make3(DISABLE_DDL_PROPAGATION,

Check warning on line 575 in src/backend/distributed/commands/database.c

View check run for this annotation

Codecov / codecov/patch

src/backend/distributed/commands/database.c#L575

Added line #L575 was not covered by tests
renameDatabaseCommand,
ENABLE_DDL_PROPAGATION);

List *renameDatabaseDDLJobList =
NodeDDLTaskList(REMOTE_NODES, renameDatabaseCommands);

Check warning on line 580 in src/backend/distributed/commands/database.c

View check run for this annotation

Codecov / codecov/patch

src/backend/distributed/commands/database.c#L579-L580

Added lines #L579 - L580 were not covered by tests

/*
* Temporarily disable citus.enable_ddl_propagation before issuing
* rename command locally because we don't want to execute it on remote
* nodes yet. We will execute it on remote nodes by returning it as a
* distributed DDL job.
*
* The reason why we don't want to execute it on remote nodes yet is that
* the database is not created on remote nodes yet.
*/
int saveNestLevel = NewGUCNestLevel();
set_config_option("citus.enable_ddl_propagation", "off",
(superuser() ? PGC_SUSET : PGC_USERSET), PGC_S_SESSION,

Check warning on line 593 in src/backend/distributed/commands/database.c

View check run for this annotation

Codecov / codecov/patch

src/backend/distributed/commands/database.c#L591-L593

Added lines #L591 - L593 were not covered by tests
GUC_ACTION_LOCAL, true, 0, false);

ExecuteUtilityCommand(renameDatabaseCommand);

Check warning on line 596 in src/backend/distributed/commands/database.c

View check run for this annotation

Codecov / codecov/patch

src/backend/distributed/commands/database.c#L596

Added line #L596 was not covered by tests

AtEOXact_GUC(true, saveNestLevel);

Check warning on line 598 in src/backend/distributed/commands/database.c

View check run for this annotation

Codecov / codecov/patch

src/backend/distributed/commands/database.c#L598

Added line #L598 was not covered by tests

/*
* Restore the original database name because MarkObjectDistributed()
* resolves oid of the object based on the database name and is called
* after executing the distributed DDL job that renames temporary database.
*/
stmt->dbname = CreateDatabaseCommandOriginalDbName;

Check warning on line 605 in src/backend/distributed/commands/database.c

View check run for this annotation

Codecov / codecov/patch

src/backend/distributed/commands/database.c#L605

Added line #L605 was not covered by tests

return list_concat(createDatabaseDDLJobList, renameDatabaseDDLJobList);

Check warning on line 607 in src/backend/distributed/commands/database.c

View check run for this annotation

Codecov / codecov/patch

src/backend/distributed/commands/database.c#L607

Added line #L607 was not covered by tests
}


Expand Down
58 changes: 51 additions & 7 deletions src/backend/distributed/operations/shard_cleaner.c
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@ static bool TryDropReplicationSlotOutsideTransaction(char *replicationSlotName,
char *nodeName,
int nodePort);
static bool TryDropUserOutsideTransaction(char *username, char *nodeName, int nodePort);
static bool TryDropDatabaseOutsideTransaction(char *databaseName, char *nodeName,
int nodePort);

static CleanupRecord * GetCleanupRecordByNameAndType(char *objectName,
CleanupObject type);
Expand Down Expand Up @@ -141,7 +143,6 @@ Datum
citus_cleanup_orphaned_resources(PG_FUNCTION_ARGS)
{
CheckCitusVersion(ERROR);
EnsureCoordinator();
PreventInTransactionBlock(true, "citus_cleanup_orphaned_resources");

int droppedCount = DropOrphanedResourcesForCleanup();
Expand Down Expand Up @@ -245,12 +246,6 @@ TryDropOrphanedResources()
static int
DropOrphanedResourcesForCleanup()
{
/* Only runs on Coordinator */
if (!IsCoordinator())
{
return 0;
}

List *cleanupRecordList = ListCleanupRecords();

/*
Expand Down Expand Up @@ -603,6 +598,12 @@ TryDropResourceByCleanupRecordOutsideTransaction(CleanupRecord *record,
return TryDropUserOutsideTransaction(record->objectName, nodeName, nodePort);
}

case CLEANUP_OBJECT_DATABASE:

Check warning on line 601 in src/backend/distributed/operations/shard_cleaner.c

View check run for this annotation

Codecov / codecov/patch

src/backend/distributed/operations/shard_cleaner.c#L601

Added line #L601 was not covered by tests
{
return TryDropDatabaseOutsideTransaction(record->objectName, nodeName,

Check warning on line 603 in src/backend/distributed/operations/shard_cleaner.c

View check run for this annotation

Codecov / codecov/patch

src/backend/distributed/operations/shard_cleaner.c#L603

Added line #L603 was not covered by tests
nodePort);
}

default:
{
ereport(WARNING, (errmsg(
Expand Down Expand Up @@ -883,6 +884,49 @@ TryDropUserOutsideTransaction(char *username,
}


/*
* TryDropDatabaseOutsideTransaction drops the database with the given name
* if it exists.
*/
static bool
TryDropDatabaseOutsideTransaction(char *databaseName, char *nodeName, int nodePort)

Check warning on line 892 in src/backend/distributed/operations/shard_cleaner.c

View check run for this annotation

Codecov / codecov/patch

src/backend/distributed/operations/shard_cleaner.c#L892

Added line #L892 was not covered by tests
{
int connectionFlags = OUTSIDE_TRANSACTION;
MultiConnection *connection = GetNodeUserDatabaseConnection(connectionFlags,

Check warning on line 895 in src/backend/distributed/operations/shard_cleaner.c

View check run for this annotation

Codecov / codecov/patch

src/backend/distributed/operations/shard_cleaner.c#L894-L895

Added lines #L894 - L895 were not covered by tests
nodeName, nodePort,
CitusExtensionOwnerName(),

Check warning on line 897 in src/backend/distributed/operations/shard_cleaner.c

View check run for this annotation

Codecov / codecov/patch

src/backend/distributed/operations/shard_cleaner.c#L897

Added line #L897 was not covered by tests
NULL);

if (PQstatus(connection->pgConn) != CONNECTION_OK)
{
return false;
}

/*
* The DROP DATABASE command should not propagate, so we disable DDL
* propagation.
*
* TODO: would it be problem to not use LOCAL (because we cannot use it)
* TODO: what about lock timeout? cannot afford leaking it to next command ..
*/
List *commandList = list_make2(

Check warning on line 912 in src/backend/distributed/operations/shard_cleaner.c

View check run for this annotation

Codecov / codecov/patch

src/backend/distributed/operations/shard_cleaner.c#L912

Added line #L912 was not covered by tests
"SET citus.enable_ddl_propagation TO OFF;",
psprintf("DROP DATABASE IF EXISTS %s;", quote_identifier(databaseName))
);

const char *commandString = NULL;
foreach_ptr(commandString, commandList)

Check warning on line 918 in src/backend/distributed/operations/shard_cleaner.c

View check run for this annotation

Codecov / codecov/patch

src/backend/distributed/operations/shard_cleaner.c#L917-L918

Added lines #L917 - L918 were not covered by tests
{
if (ExecuteOptionalRemoteCommand(connection, commandString, NULL) != 0)
{
return false;
}
}

return true;
}


/*
* ErrorIfCleanupRecordForShardExists errors out if a cleanup record for the given
* shard name exists.
Expand Down
3 changes: 2 additions & 1 deletion src/include/distributed/shard_cleaner.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ typedef enum CleanupObject
CLEANUP_OBJECT_SUBSCRIPTION = 2,
CLEANUP_OBJECT_REPLICATION_SLOT = 3,
CLEANUP_OBJECT_PUBLICATION = 4,
CLEANUP_OBJECT_USER = 5
CLEANUP_OBJECT_USER = 5,
CLEANUP_OBJECT_DATABASE = 6
} CleanupObject;

/*
Expand Down
10 changes: 10 additions & 0 deletions src/test/regress/expected/alter_database_propagation.out
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,12 @@ DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing ALTER DATABASE regression RESET lock_timeout
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
set citus.enable_create_database_propagation=on;
SET citus.next_operation_id TO 3000;
create database "regression!'2";
NOTICE: issuing ALTER DATABASE citus_temp_database_3000 RENAME TO "regression!'2"
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing ALTER DATABASE citus_temp_database_3000 RENAME TO "regression!'2"
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
alter database "regression!'2" with CONNECTION LIMIT 100;
NOTICE: issuing ALTER DATABASE "regression!'2" WITH CONNECTION LIMIT 100;
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
Expand Down Expand Up @@ -189,7 +194,12 @@ alter DATABASE local_regression rename to local_regression2;
drop database local_regression2;
set citus.enable_create_database_propagation=on;
drop database regression3;
SET citus.next_operation_id TO 3100;
create database "regression!'4";
NOTICE: issuing ALTER DATABASE citus_temp_database_3100 RENAME TO "regression!'4"
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing ALTER DATABASE citus_temp_database_3100 RENAME TO "regression!'4"
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
SELECT result FROM run_command_on_all_nodes(
$$
ALTER TABLESPACE alter_db_tablespace RENAME TO "ts-needs\!escape"
Expand Down
68 changes: 66 additions & 2 deletions src/test/regress/expected/create_drop_database_propagation.out
Original file line number Diff line number Diff line change
Expand Up @@ -428,10 +428,11 @@ SELECT * FROM public.check_database_on_all_nodes('my_template_database') ORDER B
set citus.enable_create_database_propagation=on;
SET citus.log_remote_commands = true;
set citus.grep_remote_commands = '%CREATE DATABASE%';
SET citus.next_operation_id TO 2000;
create database "mydatabase#1'2";
NOTICE: issuing CREATE DATABASE "mydatabase#1'2"
NOTICE: issuing CREATE DATABASE citus_temp_database_2000
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing CREATE DATABASE "mydatabase#1'2"
NOTICE: issuing CREATE DATABASE citus_temp_database_2000
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
set citus.grep_remote_commands = '%DROP DATABASE%';
drop database if exists "mydatabase#1'2";
Expand Down Expand Up @@ -1264,6 +1265,69 @@ SELECT 1 FROM run_command_on_all_nodes($$REVOKE ALL ON TABLESPACE pg_default FRO

DROP DATABASE no_createdb;
DROP USER no_createdb;
-- Test a failure scenario by trying to create a distributed database that
-- already exists on one of the nodes.
\c - - - :worker_1_port
CREATE DATABASE "test_\!failure";
NOTICE: Citus partially supports CREATE DATABASE for distributed databases
DETAIL: Citus does not propagate CREATE DATABASE command to other nodes
HINT: You can manually create a database and its extensions on other nodes.
\c - - - :master_port
SET citus.enable_create_database_propagation TO ON;
CREATE DATABASE "test_\!failure";
ERROR: database "test_\!failure" already exists
CONTEXT: while executing command on localhost:xxxxx
SET client_min_messages TO WARNING;
CALL citus_cleanup_orphaned_resources();
RESET client_min_messages;
SELECT result AS database_cleanedup_on_node FROM run_command_on_all_nodes($$SELECT COUNT(*)=0 FROM pg_database WHERE datname LIKE 'citus_temp_database_%'$$);
database_cleanedup_on_node
---------------------------------------------------------------------
t
t
t
(3 rows)

SELECT * FROM public.check_database_on_all_nodes($$test_\!failure$$) ORDER BY node_type, result;
node_type | result
---------------------------------------------------------------------
coordinator (local) | {"database_properties": null, "pg_dist_object_record_for_db_exists": false, "stale_pg_dist_object_record_for_a_db_exists": false}
worker node (remote) | {"database_properties": null, "pg_dist_object_record_for_db_exists": false, "stale_pg_dist_object_record_for_a_db_exists": false}
worker node (remote) | {"database_properties": {"datacl": null, "datname": "test_\\!failure", "datctype": "C", "encoding": "UTF8", "datcollate": "C", "tablespace": "pg_default", "daticurules": null, "datallowconn": true, "datconnlimit": -1, "daticulocale": null, "datistemplate": false, "database_owner": "postgres", "datcollversion": null, "datlocprovider": "c"}, "pg_dist_object_record_for_db_exists": false, "stale_pg_dist_object_record_for_a_db_exists": false}
(3 rows)

SET citus.enable_create_database_propagation TO OFF;
CREATE DATABASE "test_\!failure1";
NOTICE: Citus partially supports CREATE DATABASE for distributed databases
DETAIL: Citus does not propagate CREATE DATABASE command to other nodes
HINT: You can manually create a database and its extensions on other nodes.
\c - - - :worker_1_port
DROP DATABASE "test_\!failure";
SET citus.enable_create_database_propagation TO ON;
CREATE DATABASE "test_\!failure1";
ERROR: database "test_\!failure1" already exists
CONTEXT: while executing command on localhost:xxxxx
SET client_min_messages TO WARNING;
CALL citus_cleanup_orphaned_resources();
RESET client_min_messages;
SELECT result AS database_cleanedup_on_node FROM run_command_on_all_nodes($$SELECT COUNT(*)=0 FROM pg_database WHERE datname LIKE 'citus_temp_database_%'$$);
database_cleanedup_on_node
---------------------------------------------------------------------
t
t
t
(3 rows)

SELECT * FROM public.check_database_on_all_nodes($$test_\!failure1$$) ORDER BY node_type, result;
node_type | result
---------------------------------------------------------------------
coordinator (remote) | {"database_properties": {"datacl": null, "datname": "test_\\!failure1", "datctype": "C", "encoding": "UTF8", "datcollate": "C", "tablespace": "pg_default", "daticurules": null, "datallowconn": true, "datconnlimit": -1, "daticulocale": null, "datistemplate": false, "database_owner": "postgres", "datcollversion": null, "datlocprovider": "c"}, "pg_dist_object_record_for_db_exists": false, "stale_pg_dist_object_record_for_a_db_exists": false}
worker node (local) | {"database_properties": null, "pg_dist_object_record_for_db_exists": false, "stale_pg_dist_object_record_for_a_db_exists": false}
worker node (remote) | {"database_properties": null, "pg_dist_object_record_for_db_exists": false, "stale_pg_dist_object_record_for_a_db_exists": false}
(3 rows)

\c - - - :master_port
DROP DATABASE "test_\!failure1";
SET citus.enable_create_database_propagation TO ON;
--clean up resources created by this test
-- DROP TABLESPACE is not supported, so we need to drop it manually.
Expand Down
2 changes: 2 additions & 0 deletions src/test/regress/sql/alter_database_propagation.sql
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ alter database regression set lock_timeout to DEFAULT;
alter database regression RESET lock_timeout;

set citus.enable_create_database_propagation=on;
SET citus.next_operation_id TO 3000;
create database "regression!'2";
alter database "regression!'2" with CONNECTION LIMIT 100;
alter database "regression!'2" with IS_TEMPLATE true CONNECTION LIMIT 50;
Expand Down Expand Up @@ -90,6 +91,7 @@ set citus.enable_create_database_propagation=on;

drop database regression3;

SET citus.next_operation_id TO 3100;
create database "regression!'4";


Expand Down
Loading

0 comments on commit 35f64c7

Please sign in to comment.