Skip to content

Commit

Permalink
fix more
Browse files Browse the repository at this point in the history
  • Loading branch information
onurctirtir committed Jan 3, 2024
1 parent 97350c8 commit 5c88980
Show file tree
Hide file tree
Showing 12 changed files with 225 additions and 130 deletions.
23 changes: 9 additions & 14 deletions src/backend/distributed/commands/database.c
Original file line number Diff line number Diff line change
Expand Up @@ -197,13 +197,8 @@ PreprocessGrantOnDatabaseStmt(Node *node, const char *queryString,
String *distributedDatabaseName = NULL;
foreach_ptr(distributedDatabaseName, sortedDistributedDatabases)
{
/* FilterDistributedDatabases ensured that the database exists and is distributed */
bool missingOk = false;
ObjectAddress *distributedDatabaseAddress = GetDatabaseAddressFromDatabaseName(
strVal(distributedDatabaseName), missingOk);

SerializeDistributedDDLsOnObjectClass(OCLASS_DATABASE,
distributedDatabaseAddress->objectId);
SerializeDistributedDDLsOnObjectClassObject(OCLASS_DATABASE,
strVal(distributedDatabaseName));
}

List *originalObjects = stmt->objects;
Expand Down Expand Up @@ -290,7 +285,7 @@ PreprocessAlterDatabaseStmt(Node *node, const char *queryString,
}

EnsureCoordinator();
SerializeDistributedDDLsOnObjectClass(OCLASS_DATABASE, dbAddress->objectId);
SerializeDistributedDDLsOnObjectClassObject(OCLASS_DATABASE, stmt->dbname);

char *sql = DeparseTreeNode((Node *) stmt);

Expand Down Expand Up @@ -342,7 +337,7 @@ PreprocessAlterDatabaseRefreshCollStmt(Node *node, const char *queryString,
}

EnsureCoordinator();
SerializeDistributedDDLsOnObjectClass(OCLASS_DATABASE, dbAddress->objectId);
SerializeDistributedDDLsOnObjectClassObject(OCLASS_DATABASE, stmt->dbname);

char *sql = DeparseTreeNode((Node *) stmt);

Expand Down Expand Up @@ -392,8 +387,8 @@ PreprocessAlterDatabaseRenameStmt(Node *node, const char *queryString,
* with CREATE DATABASE operations because they might cause name conflicts
* and that could also cause deadlocks too.
*/
SerializeDistributedDDLsOnObjectClass(OCLASS_DATABASE, InvalidOid);
SerializeDistributedDDLsOnObjectClass(OCLASS_DATABASE, dbAddress->objectId);
SerializeDistributedDDLsOnObjectClass(OCLASS_DATABASE);
SerializeDistributedDDLsOnObjectClassObject(OCLASS_DATABASE, stmt->subname);

return NIL;
}
Expand Down Expand Up @@ -454,7 +449,7 @@ PreprocessAlterDatabaseSetStmt(Node *node, const char *queryString,
}

EnsureCoordinator();
SerializeDistributedDDLsOnObjectClass(OCLASS_DATABASE, dbAddress->objectId);
SerializeDistributedDDLsOnObjectClassObject(OCLASS_DATABASE, stmt->dbname);

char *sql = DeparseTreeNode((Node *) stmt);

Expand Down Expand Up @@ -491,7 +486,7 @@ PreprocessCreateDatabaseStmt(Node *node, const char *queryString,
CreatedbStmt *stmt = castNode(CreatedbStmt, node);
EnsureSupportedCreateDatabaseCommand(stmt);

SerializeDistributedDDLsOnObjectClass(OCLASS_DATABASE, InvalidOid);
SerializeDistributedDDLsOnObjectClass(OCLASS_DATABASE);

return NIL;
}
Expand Down Expand Up @@ -574,7 +569,7 @@ PreprocessDropDatabaseStmt(Node *node, const char *queryString,
return NIL;
}

SerializeDistributedDDLsOnObjectClass(OCLASS_DATABASE, address->objectId);
SerializeDistributedDDLsOnObjectClassObject(OCLASS_DATABASE, stmt->dbname);

char *dropDatabaseCommand = DeparseTreeNode(node);

Expand Down
195 changes: 129 additions & 66 deletions src/backend/distributed/serialize_distributed_ddls.c
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@
* This file contains functions for serializing distributed DDLs.
*
* If you're adding support for serializing a new DDL, you should
* extend AcquireCitusAdvisoryObjectClassLockCommandRemoteOidExpr() and
* AcquireCitusAdvisoryObjectClassLockCheckPrivileges() functions
* to support the new object class.
* extend the following functions to support the new object class:
* AcquireCitusAdvisoryObjectClassLockGetOid()
* AcquireCitusAdvisoryObjectClassLockCheckPrivileges()
*
*-------------------------------------------------------------------------
*/
Expand All @@ -34,12 +34,14 @@
PG_FUNCTION_INFO_V1(citus_internal_acquire_citus_advisory_object_class_lock);


static void SerializeDistributedDDLsOnObjectClassInternal(ObjectClass objectClass,
char *qualifiedObjectName);
static char * AcquireCitusAdvisoryObjectClassLockCommand(ObjectClass objectClass,
Oid oid);
static const char * AcquireCitusAdvisoryObjectClassLockCommandRemoteOidExpr(ObjectClass
objectClass,
Oid oid);
static void AcquireCitusAdvisoryObjectClassLock(ObjectClass objectClass, Oid oid);
char *qualifiedObjectName);
static void AcquireCitusAdvisoryObjectClassLock(ObjectClass objectClass,
char *qualifiedObjectName);
static Oid AcquireCitusAdvisoryObjectClassLockGetOid(ObjectClass objectClass,
char *qualifiedObjectName);
static void AcquireCitusAdvisoryObjectClassLockCheckPrivileges(ObjectClass objectClass,
Oid oid);

Expand All @@ -56,26 +58,78 @@ citus_internal_acquire_citus_advisory_object_class_lock(PG_FUNCTION_ARGS)
PG_ENSURE_ARGNOTNULL(0, "object_class");

Check warning on line 58 in src/backend/distributed/serialize_distributed_ddls.c

View check run for this annotation

Codecov / codecov/patch

src/backend/distributed/serialize_distributed_ddls.c#L58

Added line #L58 was not covered by tests
ObjectClass objectClass = PG_GETARG_INT32(0);

PG_ENSURE_ARGNOTNULL(1, "oid");
Oid oid = PG_GETARG_OID(1);
char *qualifiedObjectName = PG_ARGISNULL(1) ? NULL : PG_GETARG_CSTRING(1);

AcquireCitusAdvisoryObjectClassLock(objectClass, oid);
AcquireCitusAdvisoryObjectClassLock(objectClass, qualifiedObjectName);

PG_RETURN_VOID();
}


/*
* SerializeDistributedDDLsOnObjectClass serializes distributed DDLs that
* target given object class by acquiring a Citus specific advisory lock
* SerializeDistributedDDLsOnObjectClass is a wrapper around
* SerializeDistributedDDLsOnObjectClassInternal to acquire the lock on given
* object class itself, see the comment in header file for more details about
* the difference between this function and
* SerializeDistributedDDLsOnObjectClassObject().
*/
void
SerializeDistributedDDLsOnObjectClass(ObjectClass objectClass)
{
SerializeDistributedDDLsOnObjectClassInternal(objectClass, NULL);
}


/*
* SerializeDistributedDDLsOnObjectClassObject is a wrapper around
* SerializeDistributedDDLsOnObjectClassInternal to acquire the lock on given
* object that belongs to given object class, see the comment in header file
* for more details about the difference between this function and
* SerializeDistributedDDLsOnObjectClass().
*/
void
SerializeDistributedDDLsOnObjectClassObject(ObjectClass objectClass,
char *qualifiedObjectName)
{
if (qualifiedObjectName == NULL)
{
elog(ERROR, "qualified object name cannot be NULL");

Check warning on line 96 in src/backend/distributed/serialize_distributed_ddls.c

View check run for this annotation

Codecov / codecov/patch

src/backend/distributed/serialize_distributed_ddls.c#L96

Added line #L96 was not covered by tests
}

SerializeDistributedDDLsOnObjectClassInternal(objectClass, qualifiedObjectName);
}


/*
* SerializeDistributedDDLsOnObjectClassInternal serializes distributed DDLs
* that target given object class by acquiring a Citus specific advisory lock
* on the first primary worker node if there are any workers in the cluster.
*
* The lock is acquired via a coordinated transaction. For this reason,
* it automatically gets released when (maybe implicit) transaction on
* current server commits or rolls back.
*
* If qualifiedObjectName is provided to be non-null, then the oid of the
* object is first resolved on the first primary worker node and then the
* lock is acquired on that oid. If qualifiedObjectName is null, then the
* lock is acquired on the object class itself.
*
* Note that those two lock types don't conflict with each other and are
* acquired for different purposes. The lock on the object class
* (qualifiedObjectName = NULL) is used to serialize DDLs that target the
* object class itself, e.g., when creating a new object of that class, and
* the latter is used to serialize DDLs that target a specific object of
* that class, e.g., when altering an object.
*
* In some cases, we may want to acquire both locks at the same time. For
* example, when renaming a database, we want to acquire both lock types
* because while the object class lock is used to ensure that another session
* doesn't create a new database with the same name, the object lock is used
* to ensure that another session doesn't alter the same database.
*/
void
SerializeDistributedDDLsOnObjectClass(ObjectClass objectClass, Oid oid)
static void
SerializeDistributedDDLsOnObjectClassInternal(ObjectClass objectClass,
char *qualifiedObjectName)
{
WorkerNode *firstWorkerNode = GetFirstPrimaryWorkerNode();
if (firstWorkerNode == NULL)
Expand All @@ -90,7 +144,8 @@ SerializeDistributedDDLsOnObjectClass(ObjectClass objectClass, Oid oid)
Task *task = CitusMakeNode(Task);
task->taskType = DDL_TASK;

char *command = AcquireCitusAdvisoryObjectClassLockCommand(objectClass, oid);
char *command = AcquireCitusAdvisoryObjectClassLockCommand(objectClass,
qualifiedObjectName);
SetTaskQueryString(task, command);

ShardPlacement *targetPlacement = CitusMakeNode(ShardPlacement);
Expand All @@ -110,60 +165,34 @@ SerializeDistributedDDLsOnObjectClass(ObjectClass objectClass, Oid oid)
* pg_catalog.citus_internal_acquire_citus_advisory_object_class_lock().
*/
static char *
AcquireCitusAdvisoryObjectClassLockCommand(ObjectClass objectClass, Oid oid)
AcquireCitusAdvisoryObjectClassLockCommand(ObjectClass objectClass,
char *qualifiedObjectName)
{
/* safe to cast to int as it's an enum */
int objectClassInt = (int) objectClass;

const char *oidString =
AcquireCitusAdvisoryObjectClassLockCommandRemoteOidExpr(objectClass, oid);
char *quotedObjectName =
!qualifiedObjectName ? "NULL" :
quote_literal_cstr(qualifiedObjectName);

StringInfo command = makeStringInfo();
appendStringInfo(command,
"SELECT pg_catalog.citus_internal_acquire_citus_advisory_object_class_lock(%d, (%s))",
objectClassInt, oidString);
"SELECT pg_catalog.citus_internal_acquire_citus_advisory_object_class_lock(%d, %s)",
objectClassInt, quotedObjectName);

return command->data;
}


/*
* AcquireCitusAdvisoryObjectClassLockCommandRemoteOidExpr returns the expr or query string
* that can be used to resolve the oid of the object given in oid parameter on other
* nodes. If oid is InvalidOid, then it returns "0".
*/
static const char *
AcquireCitusAdvisoryObjectClassLockCommandRemoteOidExpr(ObjectClass objectClass, Oid oid)
{
if (objectClass == OCLASS_DATABASE)
{
char *oidString = "0";

if (OidIsValid(oid))
{
StringInfo oidStringInfo = makeStringInfo();
appendStringInfo(oidStringInfo,
"SELECT oid FROM pg_catalog.pg_database WHERE datname = %s",
quote_literal_cstr(get_database_name(oid)));
oidString = oidStringInfo->data;
}

return oidString;
}
else
{
elog(ERROR, "unsupported object class: %d", objectClass);
}
}


/*
* AcquireCitusAdvisoryObjectClassLock acquires a Citus specific advisory
* ExclusiveLock based on given object class.
*/
static void
AcquireCitusAdvisoryObjectClassLock(ObjectClass objectClass, Oid oid)
AcquireCitusAdvisoryObjectClassLock(ObjectClass objectClass, char *qualifiedObjectName)
{
Oid oid = AcquireCitusAdvisoryObjectClassLockGetOid(objectClass, qualifiedObjectName);

AcquireCitusAdvisoryObjectClassLockCheckPrivileges(objectClass, oid);

LOCKTAG locktag;
Expand All @@ -176,29 +205,63 @@ AcquireCitusAdvisoryObjectClassLock(ObjectClass objectClass, Oid oid)
}


/*
* AcquireCitusAdvisoryObjectClassLockGetOid returns the oid of given object
* that belongs to given object class. If qualifiedObjectName is NULL, then
* it returns InvalidOid.
*/
static Oid
AcquireCitusAdvisoryObjectClassLockGetOid(ObjectClass objectClass,
char *qualifiedObjectName)
{
if (qualifiedObjectName == NULL)
{
return InvalidOid;
}

bool missingOk = false;

switch (objectClass)
{
case OCLASS_DATABASE:
{
return get_database_oid(qualifiedObjectName, missingOk);
}

default:
elog(ERROR, "unsupported object class: %d", objectClass);

Check warning on line 232 in src/backend/distributed/serialize_distributed_ddls.c

View check run for this annotation

Codecov / codecov/patch

src/backend/distributed/serialize_distributed_ddls.c#L232

Added line #L232 was not covered by tests
}
}


/*
* AcquireCitusAdvisoryObjectClassLockCheckPrivileges is used to perform privilege checks
* before acquiring the Citus specific advisory lock on given object class and oid.
*/
static void
AcquireCitusAdvisoryObjectClassLockCheckPrivileges(ObjectClass objectClass, Oid oid)
{
if (objectClass == OCLASS_DATABASE)
switch (objectClass)
{
if (OidIsValid(oid) && !object_ownercheck(DatabaseRelationId, oid, GetUserId()))
{
aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_DATABASE,
get_database_name(oid));
}
else if (!OidIsValid(oid) && !have_createdb_privilege())
case OCLASS_DATABASE:
{
ereport(ERROR,
(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
errmsg("permission denied to create / rename database")));
if (OidIsValid(oid) && !object_ownercheck(DatabaseRelationId, oid,
GetUserId()))
{
aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_DATABASE,
get_database_name(oid));
}
else if (!OidIsValid(oid) && !have_createdb_privilege())
{
ereport(ERROR,

Check warning on line 256 in src/backend/distributed/serialize_distributed_ddls.c

View check run for this annotation

Codecov / codecov/patch

src/backend/distributed/serialize_distributed_ddls.c#L256

Added line #L256 was not covered by tests
(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
errmsg("permission denied to create / rename database")));
}

break;
}
}
else
{
elog(ERROR, "unsupported object class: %d", objectClass);

default:
elog(ERROR, "unsupported object class: %d", objectClass);

Check warning on line 265 in src/backend/distributed/serialize_distributed_ddls.c

View check run for this annotation

Codecov / codecov/patch

src/backend/distributed/serialize_distributed_ddls.c#L265

Added line #L265 was not covered by tests
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
-- citus--12.2-1--12.1-1

DROP FUNCTION pg_catalog.citus_internal_database_command(text);
DROP FUNCTION pg_catalog.citus_internal_acquire_citus_advisory_object_class_lock(int, oid);
DROP FUNCTION pg_catalog.citus_internal_acquire_citus_advisory_object_class_lock(int, cstring);

#include "../udfs/citus_add_rebalance_strategy/10.1-1.sql"

Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
CREATE OR REPLACE FUNCTION pg_catalog.citus_internal_acquire_citus_advisory_object_class_lock(objectClass int, oid oid)
CREATE OR REPLACE FUNCTION pg_catalog.citus_internal_acquire_citus_advisory_object_class_lock(objectClass int, qualifiedObjectName cstring)
RETURNS void
LANGUAGE C
VOLATILE
Expand Down
19 changes: 18 additions & 1 deletion src/include/distributed/serialize_distributed_ddls.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,23 @@

#include "catalog/dependency.h"

extern void SerializeDistributedDDLsOnObjectClass(ObjectClass objectClass, Oid oid);
/*
* Note that those two lock types don't conflict with each other and are
* acquired for different purposes. The lock on the object class
* --SerializeDistributedDDLsOnObjectClass()-- is used to serialize DDLs
* that target the object class itself, e.g., when creating a new object
* of that class, and the latter one --SerializeDistributedDDLsOnObjectClassObject()--
* is used to serialize DDLs that target a specific object of that class,
* e.g., when altering an object.
*
* In some cases, we may want to acquire both locks at the same time. For
* example, when renaming a database, we want to acquire both lock types
* because while the object class lock is used to ensure that another session
* doesn't create a new database with the same name, the object lock is used
* to ensure that another session doesn't alter the same database.
*/
extern void SerializeDistributedDDLsOnObjectClass(ObjectClass objectClass);
extern void SerializeDistributedDDLsOnObjectClassObject(ObjectClass objectClass,
char *qualifiedObjectName);

#endif /* SERIALIZE_DDLS_OVER_CATALOG_H */
Loading

0 comments on commit 5c88980

Please sign in to comment.