Skip to content

Commit

Permalink
commit
Browse files Browse the repository at this point in the history
  • Loading branch information
onurctirtir committed Dec 29, 2023
1 parent e228834 commit f67448b
Show file tree
Hide file tree
Showing 31 changed files with 920 additions and 80 deletions.
100 changes: 94 additions & 6 deletions src/backend/distributed/commands/database.c
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
#include "distributed/metadata_utility.h"
#include "distributed/multi_executor.h"
#include "distributed/relation_access_tracking.h"
#include "distributed/serialize_distributed_ddls.h"
#include "distributed/worker_protocol.h"
#include "distributed/worker_transaction.h"

Expand Down Expand Up @@ -161,6 +162,10 @@ get_database_owner(Oid dbId)
*
* In this stage we can prepare the commands that need to be run on all workers to grant
* on databases.
*
* We also serialize database commands globally by acquiring Citus specific advisory
* locks on affected databases based on OCLASS_DATABASE on the first primary worker
* node.
*/
List *
PreprocessGrantOnDatabaseStmt(Node *node, const char *queryString,
Expand All @@ -183,6 +188,24 @@ PreprocessGrantOnDatabaseStmt(Node *node, const char *queryString,

EnsureCoordinator();

/*
* Sort before acquring the locks to prevent deadlocks that could happen
* due to acquiring the locks in different orders.
*/
List *sortedDistributedDatabases = SortList(distributedDatabases, CompareStringNodes);

String *distributedDatabaseName = NULL;
foreach_ptr(distributedDatabaseName, sortedDistributedDatabases)
{
/* FilterDistributedDatabases ensured that the database exists and is distributed */
bool missingOk = false;
ObjectAddress *distributedDatabaseAddress = GetDatabaseAddressFromDatabaseName(
strVal(distributedDatabaseName), missingOk);

SerializeDistributedDDLsOnObjectClass(OCLASS_DATABASE,
distributedDatabaseAddress->objectId);
}

List *originalObjects = stmt->objects;

stmt->objects = distributedDatabases;
Expand Down Expand Up @@ -248,6 +271,9 @@ IsSetTablespaceStatement(AlterDatabaseStmt *stmt)
*
* In this stage we can prepare the commands that need to be run on all workers to grant
* on databases.
*
* We also serialize database commands globally by acquiring a Citus specific advisory
* lock based on OCLASS_DATABASE on the first primary worker node.
*/
List *
PreprocessAlterDatabaseStmt(Node *node, const char *queryString,
Expand All @@ -264,6 +290,7 @@ PreprocessAlterDatabaseStmt(Node *node, const char *queryString,
}

EnsureCoordinator();
SerializeDistributedDDLsOnObjectClass(OCLASS_DATABASE, dbAddress->objectId);

char *sql = DeparseTreeNode((Node *) stmt);

Expand Down Expand Up @@ -296,6 +323,9 @@ PreprocessAlterDatabaseStmt(Node *node, const char *queryString,
*
* In this stage we can prepare the commands that need to be run on all workers to grant
* on databases.
*
* We also serialize database commands globally by acquiring a Citus specific advisory
* lock based on OCLASS_DATABASE on the first primary worker node.
*/
List *
PreprocessAlterDatabaseRefreshCollStmt(Node *node, const char *queryString,
Expand All @@ -312,6 +342,7 @@ PreprocessAlterDatabaseRefreshCollStmt(Node *node, const char *queryString,
}

EnsureCoordinator();
SerializeDistributedDDLsOnObjectClass(OCLASS_DATABASE, dbAddress->objectId);

char *sql = DeparseTreeNode((Node *) stmt);

Expand All @@ -325,8 +356,51 @@ PreprocessAlterDatabaseRefreshCollStmt(Node *node, const char *queryString,

#endif


/*
* PreprocessAlterDatabaseRenameStmt is executed before the statement is applied to
* the local postgres instance.
*
* We also serialize database commands globally by acquiring a Citus specific advisory
* lock based on OCLASS_DATABASE on the first primary worker node.
*
* We acquire this lock here instead of PostprocessAlterDatabaseRenameStmt because the
* command renames the database and SerializeDistributedDDLsOnObjectClass resolves the
* object on workers based on database name. For this reason, we need to acquire the lock
* before the command is applied to the local postgres instance.
*/
List *
PreprocessAlterDatabaseRenameStmt(Node *node, const char *queryString,
ProcessUtilityContext processUtilityContext)
{
bool missingOk = true;
RenameStmt *stmt = castNode(RenameStmt, node);
ObjectAddress *dbAddress = GetDatabaseAddressFromDatabaseName(stmt->subname,
missingOk);

if (!ShouldPropagate() || !IsAnyObjectDistributed(list_make1(dbAddress)))
{
return NIL;
}

EnsureCoordinator();

/*
* Different than other ALTER DATABASE commands, we first acquire a lock
* by providing InvalidOid because we want ALTER TABLE .. RENAME TO ..
* commands to block not only with ALTER DATABASE operations but also
* with CREATE DATABASE operations because they might cause name conflicts
* and that could also cause deadlocks too.
*/
SerializeDistributedDDLsOnObjectClass(OCLASS_DATABASE, InvalidOid);
SerializeDistributedDDLsOnObjectClass(OCLASS_DATABASE, dbAddress->objectId);

return NIL;
}


/*
* PreprocessAlterDatabaseRenameStmt is executed before the statement is applied to the local
* PostprocessAlterDatabaseRenameStmt is executed after the statement is applied to the local
* postgres instance. In this stage we prepare ALTER DATABASE RENAME statement to be run on
* all workers.
*/
Expand Down Expand Up @@ -361,6 +435,9 @@ PostprocessAlterDatabaseRenameStmt(Node *node, const char *queryString)
*
* In this stage we can prepare the commands that need to be run on all workers to grant
* on databases.
*
* We also serialize database commands globally by acquiring a Citus specific advisory
* lock based on OCLASS_DATABASE on the first primary worker node.
*/
List *
PreprocessAlterDatabaseSetStmt(Node *node, const char *queryString,
Expand All @@ -377,6 +454,7 @@ PreprocessAlterDatabaseSetStmt(Node *node, const char *queryString,
}

EnsureCoordinator();
SerializeDistributedDDLsOnObjectClass(OCLASS_DATABASE, dbAddress->objectId);

char *sql = DeparseTreeNode((Node *) stmt);

Expand All @@ -395,6 +473,9 @@ PreprocessAlterDatabaseSetStmt(Node *node, const char *queryString,
* In this stage, we perform validations that we want to ensure before delegating to
* previous utility hooks because it might not be convenient to throw an error in an
* implicit transaction that creates a database.
*
* We also serialize database commands globally by acquiring a Citus specific advisory
* lock based on OCLASS_DATABASE on the first primary worker node.
*/
List *
PreprocessCreateDatabaseStmt(Node *node, const char *queryString,
Expand All @@ -405,11 +486,13 @@ PreprocessCreateDatabaseStmt(Node *node, const char *queryString,
return NIL;
}

EnsureCoordinator();
EnsurePropagationToCoordinator();

CreatedbStmt *stmt = castNode(CreatedbStmt, node);
EnsureSupportedCreateDatabaseCommand(stmt);

SerializeDistributedDDLsOnObjectClass(OCLASS_DATABASE, InvalidOid);

return NIL;
}

Expand All @@ -430,7 +513,7 @@ PostprocessCreateDatabaseStmt(Node *node, const char *queryString)
return NIL;
}

EnsureCoordinator();
EnsurePropagationToCoordinator();

/*
* Given that CREATE DATABASE doesn't support "IF NOT EXISTS" and we're
Expand All @@ -448,7 +531,7 @@ PostprocessCreateDatabaseStmt(Node *node, const char *queryString)
(void *) createDatabaseCommand,
ENABLE_DDL_PROPAGATION);

return NontransactionalNodeDDLTaskList(NON_COORDINATOR_NODES, commands);
return NontransactionalNodeDDLTaskList(REMOTE_NODES, commands);
}


Expand All @@ -458,6 +541,9 @@ PostprocessCreateDatabaseStmt(Node *node, const char *queryString)
* all workers to drop the database. Since the DROP DATABASE statement gives error in
* transaction context, we need to use NontransactionalNodeDDLTaskList to send the
* DROP DATABASE statement to the workers.
*
* We also serialize database commands globally by acquiring a Citus specific advisory
* lock based on OCLASS_DATABASE on the first primary worker node.
*/
List *
PreprocessDropDatabaseStmt(Node *node, const char *queryString,
Expand All @@ -468,7 +554,7 @@ PreprocessDropDatabaseStmt(Node *node, const char *queryString,
return NIL;
}

EnsureCoordinator();
EnsurePropagationToCoordinator();

DropdbStmt *stmt = (DropdbStmt *) node;

Expand All @@ -488,13 +574,15 @@ PreprocessDropDatabaseStmt(Node *node, const char *queryString,
return NIL;
}

SerializeDistributedDDLsOnObjectClass(OCLASS_DATABASE, address->objectId);

char *dropDatabaseCommand = DeparseTreeNode(node);

List *commands = list_make3(DISABLE_DDL_PROPAGATION,
(void *) dropDatabaseCommand,
ENABLE_DDL_PROPAGATION);

return NontransactionalNodeDDLTaskList(NON_COORDINATOR_NODES, commands);
return NontransactionalNodeDDLTaskList(REMOTE_NODES, commands);
}


Expand Down
2 changes: 1 addition & 1 deletion src/backend/distributed/commands/distribute_object_ops.c
Original file line number Diff line number Diff line change
Expand Up @@ -536,7 +536,7 @@ static DistributeObjectOps Database_Set = {
static DistributeObjectOps Database_Rename = {
.deparse = DeparseAlterDatabaseRenameStmt,
.qualify = NULL,
.preprocess = NULL,
.preprocess = PreprocessAlterDatabaseRenameStmt,
.postprocess = PostprocessAlterDatabaseRenameStmt,
.objectType = OBJECT_DATABASE,
.operationType = DIST_OPS_ALTER,
Expand Down
4 changes: 2 additions & 2 deletions src/backend/distributed/commands/utility_hook.c
Original file line number Diff line number Diff line change
Expand Up @@ -726,9 +726,9 @@ citus_ProcessUtilityInternal(PlannedStmt *pstmt,
ereport(NOTICE, (errmsg("Citus partially supports CREATE DATABASE for "
"distributed databases"),
errdetail("Citus does not propagate CREATE DATABASE "
"command to workers"),
"command to other nodes"),
errhint("You can manually create a database and its "
"extensions on workers.")));
"extensions on other nodes.")));
}
}
else if (IsA(parsetree, CreateRoleStmt) && !EnableCreateRolePropagation)
Expand Down
14 changes: 13 additions & 1 deletion src/backend/distributed/metadata/node_metadata.c
Original file line number Diff line number Diff line change
Expand Up @@ -2771,12 +2771,24 @@ EnsureCoordinatorIsInMetadata(void)
{
bool isCoordinatorInMetadata = false;
PrimaryNodeForGroup(COORDINATOR_GROUP_ID, &isCoordinatorInMetadata);
if (!isCoordinatorInMetadata)
if (isCoordinatorInMetadata)
{
return;
}

/* be more descriptive when we're not on coordinator */
if (IsCoordinator())
{
ereport(ERROR, (errmsg("coordinator is not added to the metadata"),
errhint("Use SELECT citus_set_coordinator_host('<hostname>') "
"to configure the coordinator hostname")));
}
else
{
ereport(ERROR, (errmsg("coordinator is not added to the metadata"),
errhint("Use SELECT citus_set_coordinator_host('<hostname>') "
"on coordinator to configure the coordinator hostname")));
}
}


Expand Down
Loading

0 comments on commit f67448b

Please sign in to comment.