Skip to content

Commit

Permalink
Support CREATE / DROP database commands from any node (#7359)
Browse files Browse the repository at this point in the history
DESCRIPTION: Adds support for issuing `CREATE`/`DROP` DATABASE commands
from worker nodes

With this commit, we allow issuing CREATE / DROP DATABASE commands from
worker nodes too.
As in #7278, this is not allowed when the coordinator is not added to
metadata because we don't ever sync metadata changes to coordinator
when adding coordinator to the metadata via
`SELECT citus_set_coordinator_host('<hostname>')`, or equivalently, via
`SELECT citus_add_node(<coordinator_node_name>, <coordinator_node_port>, 0)`.

We serialize database management commands by acquiring a Citus specific
advisory lock on the first primary worker node if there are any workers in the
cluster. As opposed to what we've done in #7278
for role management commands, we try to avoid from running into distributed deadlocks
as much as possible. This is because, while distributed deadlocks that can happen around
role management commands can be detected by Citus, this is not the case for database
management commands because most of them cannot be run inside in a transaction block.
In that case, Citus cannot even detect the distributed deadlock because the command is not
part of a distributed transaction at all, then the command execution might not return the
control back to the user for an indefinite amount of time.
  • Loading branch information
onurctirtir authored Jan 8, 2024
1 parent 20dc58c commit 1d55deb
Show file tree
Hide file tree
Showing 33 changed files with 1,438 additions and 85 deletions.
86 changes: 76 additions & 10 deletions src/backend/distributed/commands/database.c
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
#include "distributed/metadata_utility.h"
#include "distributed/multi_executor.h"
#include "distributed/relation_access_tracking.h"
#include "distributed/serialize_distributed_ddls.h"
#include "distributed/worker_protocol.h"
#include "distributed/worker_transaction.h"

Expand Down Expand Up @@ -248,6 +249,9 @@ IsSetTablespaceStatement(AlterDatabaseStmt *stmt)
*
* In this stage we can prepare the commands that need to be run on all workers to grant
* on databases.
*
* We also serialize database commands globally by acquiring a Citus specific advisory
* lock based on OCLASS_DATABASE on the first primary worker node.
*/
List *
PreprocessAlterDatabaseStmt(Node *node, const char *queryString,
Expand All @@ -264,6 +268,7 @@ PreprocessAlterDatabaseStmt(Node *node, const char *queryString,
}

EnsureCoordinator();
SerializeDistributedDDLsOnObjectClassObject(OCLASS_DATABASE, stmt->dbname);

char *sql = DeparseTreeNode((Node *) stmt);

Expand Down Expand Up @@ -291,11 +296,14 @@ PreprocessAlterDatabaseStmt(Node *node, const char *queryString,
#if PG_VERSION_NUM >= PG_VERSION_15

/*
* PreprocessAlterDatabaseSetStmt is executed before the statement is applied to the local
* postgres instance.
* PreprocessAlterDatabaseRefreshCollStmt is executed before the statement is applied to
* the local postgres instance.
*
* In this stage we can prepare the commands that need to be run on all workers to grant
* on databases.
*
* We also serialize database commands globally by acquiring a Citus specific advisory
* lock based on OCLASS_DATABASE on the first primary worker node.
*/
List *
PreprocessAlterDatabaseRefreshCollStmt(Node *node, const char *queryString,
Expand All @@ -312,6 +320,7 @@ PreprocessAlterDatabaseRefreshCollStmt(Node *node, const char *queryString,
}

EnsureCoordinator();
SerializeDistributedDDLsOnObjectClassObject(OCLASS_DATABASE, stmt->dbname);

char *sql = DeparseTreeNode((Node *) stmt);

Expand All @@ -325,8 +334,51 @@ PreprocessAlterDatabaseRefreshCollStmt(Node *node, const char *queryString,

#endif


/*
* PreprocessAlterDatabaseRenameStmt is executed before the statement is applied to the local
* PreprocessAlterDatabaseRenameStmt is executed before the statement is applied to
* the local postgres instance.
*
* We also serialize database commands globally by acquiring a Citus specific advisory
* lock based on OCLASS_DATABASE on the first primary worker node.
*
* We acquire this lock here instead of PostprocessAlterDatabaseRenameStmt because the
* command renames the database and SerializeDistributedDDLsOnObjectClass resolves the
* object on workers based on database name. For this reason, we need to acquire the lock
* before the command is applied to the local postgres instance.
*/
List *
PreprocessAlterDatabaseRenameStmt(Node *node, const char *queryString,
ProcessUtilityContext processUtilityContext)
{
bool missingOk = true;
RenameStmt *stmt = castNode(RenameStmt, node);
ObjectAddress *dbAddress = GetDatabaseAddressFromDatabaseName(stmt->subname,
missingOk);

if (!ShouldPropagate() || !IsAnyObjectDistributed(list_make1(dbAddress)))
{
return NIL;
}

EnsureCoordinator();

/*
* Different than other ALTER DATABASE commands, we first acquire a lock
* by providing InvalidOid because we want ALTER TABLE .. RENAME TO ..
* commands to block not only with ALTER DATABASE operations but also
* with CREATE DATABASE operations because they might cause name conflicts
* and that could also cause deadlocks too.
*/
SerializeDistributedDDLsOnObjectClass(OCLASS_DATABASE);
SerializeDistributedDDLsOnObjectClassObject(OCLASS_DATABASE, stmt->subname);

return NIL;
}


/*
* PostprocessAlterDatabaseRenameStmt is executed after the statement is applied to the local
* postgres instance. In this stage we prepare ALTER DATABASE RENAME statement to be run on
* all workers.
*/
Expand Down Expand Up @@ -361,6 +413,9 @@ PostprocessAlterDatabaseRenameStmt(Node *node, const char *queryString)
*
* In this stage we can prepare the commands that need to be run on all workers to grant
* on databases.
*
* We also serialize database commands globally by acquiring a Citus specific advisory
* lock based on OCLASS_DATABASE on the first primary worker node.
*/
List *
PreprocessAlterDatabaseSetStmt(Node *node, const char *queryString,
Expand All @@ -377,6 +432,7 @@ PreprocessAlterDatabaseSetStmt(Node *node, const char *queryString,
}

EnsureCoordinator();
SerializeDistributedDDLsOnObjectClassObject(OCLASS_DATABASE, stmt->dbname);

char *sql = DeparseTreeNode((Node *) stmt);

Expand All @@ -389,12 +445,15 @@ PreprocessAlterDatabaseSetStmt(Node *node, const char *queryString,


/*
* PostprocessAlterDatabaseStmt is executed before the statement is applied to the local
* PreprocessCreateDatabaseStmt is executed before the statement is applied to the local
* Postgres instance.
*
* In this stage, we perform validations that we want to ensure before delegating to
* previous utility hooks because it might not be convenient to throw an error in an
* implicit transaction that creates a database.
*
* We also serialize database commands globally by acquiring a Citus specific advisory
* lock based on OCLASS_DATABASE on the first primary worker node.
*/
List *
PreprocessCreateDatabaseStmt(Node *node, const char *queryString,
Expand All @@ -405,11 +464,13 @@ PreprocessCreateDatabaseStmt(Node *node, const char *queryString,
return NIL;
}

EnsureCoordinator();
EnsurePropagationToCoordinator();

CreatedbStmt *stmt = castNode(CreatedbStmt, node);
EnsureSupportedCreateDatabaseCommand(stmt);

SerializeDistributedDDLsOnObjectClass(OCLASS_DATABASE);

return NIL;
}

Expand All @@ -430,7 +491,7 @@ PostprocessCreateDatabaseStmt(Node *node, const char *queryString)
return NIL;
}

EnsureCoordinator();
EnsurePropagationToCoordinator();

/*
* Given that CREATE DATABASE doesn't support "IF NOT EXISTS" and we're
Expand All @@ -448,16 +509,19 @@ PostprocessCreateDatabaseStmt(Node *node, const char *queryString)
(void *) createDatabaseCommand,
ENABLE_DDL_PROPAGATION);

return NontransactionalNodeDDLTaskList(NON_COORDINATOR_NODES, commands);
return NontransactionalNodeDDLTaskList(REMOTE_NODES, commands);
}


/*
* PreprocessDropDatabaseStmt is executed after the statement is applied to the local
* PreprocessDropDatabaseStmt is executed before the statement is applied to the local
* postgres instance. In this stage we can prepare the commands that need to be run on
* all workers to drop the database. Since the DROP DATABASE statement gives error in
* transaction context, we need to use NontransactionalNodeDDLTaskList to send the
* DROP DATABASE statement to the workers.
*
* We also serialize database commands globally by acquiring a Citus specific advisory
* lock based on OCLASS_DATABASE on the first primary worker node.
*/
List *
PreprocessDropDatabaseStmt(Node *node, const char *queryString,
Expand All @@ -468,7 +532,7 @@ PreprocessDropDatabaseStmt(Node *node, const char *queryString,
return NIL;
}

EnsureCoordinator();
EnsurePropagationToCoordinator();

DropdbStmt *stmt = (DropdbStmt *) node;

Expand All @@ -488,13 +552,15 @@ PreprocessDropDatabaseStmt(Node *node, const char *queryString,
return NIL;
}

SerializeDistributedDDLsOnObjectClassObject(OCLASS_DATABASE, stmt->dbname);

char *dropDatabaseCommand = DeparseTreeNode(node);

List *commands = list_make3(DISABLE_DDL_PROPAGATION,
(void *) dropDatabaseCommand,
ENABLE_DDL_PROPAGATION);

return NontransactionalNodeDDLTaskList(NON_COORDINATOR_NODES, commands);
return NontransactionalNodeDDLTaskList(REMOTE_NODES, commands);
}


Expand Down
2 changes: 1 addition & 1 deletion src/backend/distributed/commands/distribute_object_ops.c
Original file line number Diff line number Diff line change
Expand Up @@ -536,7 +536,7 @@ static DistributeObjectOps Database_Set = {
static DistributeObjectOps Database_Rename = {
.deparse = DeparseAlterDatabaseRenameStmt,
.qualify = NULL,
.preprocess = NULL,
.preprocess = PreprocessAlterDatabaseRenameStmt,
.postprocess = PostprocessAlterDatabaseRenameStmt,
.objectType = OBJECT_DATABASE,
.operationType = DIST_OPS_ALTER,
Expand Down
4 changes: 2 additions & 2 deletions src/backend/distributed/commands/utility_hook.c
Original file line number Diff line number Diff line change
Expand Up @@ -726,9 +726,9 @@ citus_ProcessUtilityInternal(PlannedStmt *pstmt,
ereport(NOTICE, (errmsg("Citus partially supports CREATE DATABASE for "
"distributed databases"),
errdetail("Citus does not propagate CREATE DATABASE "
"command to workers"),
"command to other nodes"),
errhint("You can manually create a database and its "
"extensions on workers.")));
"extensions on other nodes.")));
}
}
else if (IsA(parsetree, CreateRoleStmt) && !EnableCreateRolePropagation)
Expand Down
14 changes: 13 additions & 1 deletion src/backend/distributed/metadata/node_metadata.c
Original file line number Diff line number Diff line change
Expand Up @@ -2771,12 +2771,24 @@ EnsureCoordinatorIsInMetadata(void)
{
bool isCoordinatorInMetadata = false;
PrimaryNodeForGroup(COORDINATOR_GROUP_ID, &isCoordinatorInMetadata);
if (!isCoordinatorInMetadata)
if (isCoordinatorInMetadata)
{
return;
}

/* be more descriptive when we're not on coordinator */
if (IsCoordinator())
{
ereport(ERROR, (errmsg("coordinator is not added to the metadata"),
errhint("Use SELECT citus_set_coordinator_host('<hostname>') "
"to configure the coordinator hostname")));
}
else
{
ereport(ERROR, (errmsg("coordinator is not added to the metadata"),
errhint("Use SELECT citus_set_coordinator_host('<hostname>') "
"on coordinator to configure the coordinator hostname")));
}
}


Expand Down
Loading

0 comments on commit 1d55deb

Please sign in to comment.