Skip to content

Commit

Permalink
Adds REASSIGN OWNED BY propagation (#7319)
Browse files Browse the repository at this point in the history
DESCRIPTION: Adds REASSIGN OWNED BY propagation

This pull request introduces the propagation of the "Reassign owned by"
statement. It accommodates both local and distributed roles for both the
old and new assignments. However, when the old role is a local role, it
undergoes filtering and is not propagated. On the other hand, if the new
role is a local role, the process involves first creating the role on
worker nodes before propagating the "Reassign owned" statement.
  • Loading branch information
gurkanindibay authored Dec 28, 2023
1 parent 181b8ab commit c3579ee
Show file tree
Hide file tree
Showing 16 changed files with 605 additions and 61 deletions.
156 changes: 131 additions & 25 deletions src/backend/distributed/commands/dependencies.c
Original file line number Diff line number Diff line change
Expand Up @@ -31,20 +31,90 @@
#include "distributed/worker_manager.h"
#include "distributed/worker_transaction.h"

/*
 * RequiredObjectSet tells EnsureRequiredObjectSetExistOnAllNodes which
 * objects it has to make available on all nodes for a given target.
 */
typedef enum RequiredObjectSet
{
	/* only the dependencies of the target, not the target itself */
	REQUIRE_ONLY_DEPENDENCIES = 1,

	/* the dependencies of the target and then the target itself */
	REQUIRE_OBJECT_AND_DEPENDENCIES = 2,
} RequiredObjectSet;


static void EnsureDependenciesCanBeDistributed(const ObjectAddress *relationAddress);
static void ErrorIfCircularDependencyExists(const ObjectAddress *objectAddress);
static int ObjectAddressComparator(const void *a, const void *b);
static void EnsureDependenciesExistOnAllNodes(const ObjectAddress *target);
static void EnsureRequiredObjectSetExistOnAllNodes(const ObjectAddress *target,
RequiredObjectSet requiredObjectSet);
static List * GetDependencyCreateDDLCommands(const ObjectAddress *dependency);
static bool ShouldPropagateObject(const ObjectAddress *address);
static char * DropTableIfExistsCommand(Oid relationId);

/*
* EnsureDependenciesExistOnAllNodes finds all the dependencies that we support and makes
* sure these are available on all nodes. If not available they will be created on the
* nodes via a separate session that will be committed directly so that the objects are
* visible to potentially multiple sessions creating the shards.
* EnsureObjectAndDependenciesExistOnAllNodes is a wrapper around
* EnsureRequiredObjectSetExistOnAllNodes to ensure the "object itself" (together
* with its dependencies) is available on all nodes.
*
* Different than EnsureDependenciesExistOnAllNodes, we return early if the
* target object is distributed already.
*
* The reason why we don't do the same in EnsureDependenciesExistOnAllNodes
* is that it is used when altering an object too and hence the target object
* may instantly have a dependency that needs to be propagated now. For example,
* when "GRANT non_dist_role TO dist_role" is executed, we need to propagate
* "non_dist_role" to all nodes before propagating the "GRANT" command itself.
* For this reason, we call EnsureDependenciesExistOnAllNodes for "dist_role"
* and it would automatically discover that "non_dist_role" is a dependency of
* "dist_role" and propagate it beforehand.
*
* However, when we're requested to create the target object itself (and
* implicitly its dependencies), we're sure that we're not altering the target
* object itself, hence we can return early if the target object is already
* distributed. This is the case, for example, when
* "REASSIGN OWNED BY dist_role TO non_dist_role" is executed. In that case,
* "non_dist_role" is not a dependency of "dist_role" but we want to distribute
* "non_dist_role" beforehand and we call this function for "non_dist_role",
* not for "dist_role".
*
* See EnsureRequiredObjectSetExistOnAllNodes to learn more about how this
* function deals with an object created within the same transaction.
*/
void
EnsureObjectAndDependenciesExistOnAllNodes(const ObjectAddress *target)
{
	/*
	 * Only do the work when the target is not distributed yet; for an
	 * already distributed target this call is a no-op.
	 */
	if (!IsAnyObjectDistributed(list_make1((ObjectAddress *) target)))
	{
		EnsureRequiredObjectSetExistOnAllNodes(target,
											   REQUIRE_OBJECT_AND_DEPENDENCIES);
	}
}


/*
* EnsureDependenciesExistOnAllNodes is a wrapper around
* EnsureRequiredObjectSetExistOnAllNodes to ensure "all dependencies" of given
* object --but not the object itself-- are available on all nodes.
*
* See EnsureRequiredObjectSetExistOnAllNodes to learn more about how this
* function deals with an object created within the same transaction.
*/
static void
EnsureDependenciesExistOnAllNodes(const ObjectAddress *target)
{
EnsureRequiredObjectSetExistOnAllNodes(target, REQUIRE_ONLY_DEPENDENCIES);
}


/*
* EnsureRequiredObjectSetExistOnAllNodes finds all the dependencies that we support and makes
* sure these are available on all nodes if required object set is REQUIRE_ONLY_DEPENDENCIES.
* Otherwise, i.e., if required object set is REQUIRE_OBJECT_AND_DEPENDENCIES, then this
* function creates the object itself on all nodes too. This function ensures that each
* of the dependencies are supported by Citus but doesn't check the same for the target
* object itself (when REQUIRE_OBJECT_AND_DEPENDENCIES is provided) because we assume that
* callers don't call this function for an unsupported object at all.
*
* If not available, they will be created on the nodes via a separate session that will be
* committed directly so that the objects are visible to potentially multiple sessions creating
* the shards.
*
* Note; only the actual objects are created via a separate session, the records to
* pg_dist_object are created in this session. As a side effect the objects could be
Expand All @@ -55,29 +125,52 @@ static char * DropTableIfExistsCommand(Oid relationId);
* postgres native CREATE IF NOT EXISTS, or citus helper functions.
*/
static void
EnsureDependenciesExistOnAllNodes(const ObjectAddress *target)
EnsureRequiredObjectSetExistOnAllNodes(const ObjectAddress *target,
RequiredObjectSet requiredObjectSet)
{
List *dependenciesWithCommands = NIL;
Assert(requiredObjectSet == REQUIRE_ONLY_DEPENDENCIES ||
requiredObjectSet == REQUIRE_OBJECT_AND_DEPENDENCIES);


List *objectsWithCommands = NIL;
List *ddlCommands = NULL;

/*
* If there is any unsupported dependency or circular dependency exists, Citus can
* not ensure dependencies will exist on all nodes.
*
* Note that we don't check whether "target" is distributable (in case
* REQUIRE_OBJECT_AND_DEPENDENCIES is provided) because we expect callers
* to not even call this function if Citus doesn't know how to propagate
* "target" object itself.
*/
EnsureDependenciesCanBeDistributed(target);

/* collect all dependencies in creation order and get their ddl commands */
List *dependencies = GetDependenciesForObject(target);
ObjectAddress *dependency = NULL;
foreach_ptr(dependency, dependencies)
List *objectsToBeCreated = GetDependenciesForObject(target);

/*
* Append the target object to make sure that it's created after its
* dependencies are created, if requested.
*/
if (requiredObjectSet == REQUIRE_OBJECT_AND_DEPENDENCIES)
{
List *dependencyCommands = GetDependencyCreateDDLCommands(dependency);
ObjectAddress *targetCopy = palloc(sizeof(ObjectAddress));
*targetCopy = *target;

objectsToBeCreated = lappend(objectsToBeCreated, targetCopy);
}

ObjectAddress *object = NULL;
foreach_ptr(object, objectsToBeCreated)
{
List *dependencyCommands = GetDependencyCreateDDLCommands(object);
ddlCommands = list_concat(ddlCommands, dependencyCommands);

/* create a new list with dependencies that actually created commands */
/* create a new list with objects that actually created commands */
if (list_length(dependencyCommands) > 0)
{
dependenciesWithCommands = lappend(dependenciesWithCommands, dependency);
objectsWithCommands = lappend(objectsWithCommands, object);
}
}
if (list_length(ddlCommands) <= 0)
Expand All @@ -100,34 +193,47 @@ EnsureDependenciesExistOnAllNodes(const ObjectAddress *target)
List *remoteNodeList = ActivePrimaryRemoteNodeList(RowShareLock);

/*
* Lock dependent objects explicitly to make sure same DDL command won't be sent
* Lock objects to be created explicitly to make sure same DDL command won't be sent
* multiple times from parallel sessions.
*
* Sort dependencies that will be created on workers to not to have any deadlock
* Sort the objects that will be created on workers to not to have any deadlock
* issue if different sessions are creating different objects.
*/
List *addressSortedDependencies = SortList(dependenciesWithCommands,
List *addressSortedDependencies = SortList(objectsWithCommands,
ObjectAddressComparator);
foreach_ptr(dependency, addressSortedDependencies)
foreach_ptr(object, addressSortedDependencies)
{
LockDatabaseObject(dependency->classId, dependency->objectId,
dependency->objectSubId, ExclusiveLock);
LockDatabaseObject(object->classId, object->objectId,
object->objectSubId, ExclusiveLock);
}


/*
* We need to propagate dependencies via the current user's metadata connection if
* any dependency for the target is created in the current transaction. Our assumption
* is that if we rely on a dependency created in the current transaction, then the
* current user, most probably, has permissions to create the target object as well.
* We need to propagate objects via the current user's metadata connection if
* any of the objects that we're interested in are created in the current transaction.
* Our assumption is that if we rely on an object created in the current transaction,
* then the current user, most probably, has permissions to create the target object
* as well.
*
* Note that, user still may not be able to create the target due to no permissions
* for any of its dependencies. But this is ok since it should be rare.
*
* If we opted to use a separate superuser connection for the target, then we would
* have visibility issues since propagated dependencies would be invisible to
* the separate connection until we locally commit.
*/
if (HasAnyDependencyInPropagatedObjects(target))
List *createdObjectList = GetAllSupportedDependenciesForObject(target);

/* consider target as well if we're requested to create it too */
if (requiredObjectSet == REQUIRE_OBJECT_AND_DEPENDENCIES)
{
ObjectAddress *targetCopy = palloc(sizeof(ObjectAddress));
*targetCopy = *target;

createdObjectList = lappend(createdObjectList, targetCopy);
}

if (HasAnyObjectInPropagatedObjects(createdObjectList))
{
SendCommandListToRemoteNodesWithMetadata(ddlCommands);
}
Expand All @@ -150,7 +256,7 @@ EnsureDependenciesExistOnAllNodes(const ObjectAddress *target)
* that objects have been created on remote nodes before marking them
* distributed, so MarkObjectDistributed wouldn't fail.
*/
foreach_ptr(dependency, dependenciesWithCommands)
foreach_ptr(object, objectsWithCommands)
{
/*
* pg_dist_object entries must be propagated with the super user, since
Expand All @@ -160,7 +266,7 @@ EnsureDependenciesExistOnAllNodes(const ObjectAddress *target)
* Only dependent object's metadata should be propagated with super user.
* Metadata of the table itself must be propagated with the current user.
*/
MarkObjectDistributedViaSuperUser(dependency);
MarkObjectDistributedViaSuperUser(object);
}
}

Expand Down
16 changes: 16 additions & 0 deletions src/backend/distributed/commands/distribute_object_ops.c
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,17 @@ static DistributeObjectOps Any_CreateRole = {
.address = CreateRoleStmtObjectAddress,
.markDistributed = true,
};

/*
 * Any_ReassignOwned describes how REASSIGN OWNED statements are handled:
 * they are deparsed with DeparseReassignOwnedStmt and processed only in the
 * postprocess phase (PostprocessReassignOwnedStmt). No qualify, preprocess or
 * address callback is registered and nothing is marked as distributed by
 * this entry.
 */
static DistributeObjectOps Any_ReassignOwned = {
.deparse = DeparseReassignOwnedStmt,
.qualify = NULL,
.preprocess = NULL,
.postprocess = PostprocessReassignOwnedStmt,
.operationType = DIST_OPS_ALTER,
.address = NULL,
.markDistributed = false,
};

static DistributeObjectOps Any_DropOwned = {
.deparse = DeparseDropOwnedStmt,
.qualify = NULL,
Expand Down Expand Up @@ -1878,6 +1889,11 @@ GetDistributeObjectOps(Node *node)
return &Any_DropOwned;
}

case T_ReassignOwnedStmt:
{
return &Any_ReassignOwned;
}

case T_DropStmt:
{
DropStmt *stmt = castNode(DropStmt, node);
Expand Down
81 changes: 81 additions & 0 deletions src/backend/distributed/commands/owned.c
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@
#include "distributed/version_compat.h"
#include "distributed/worker_transaction.h"


static ObjectAddress * GetNewRoleAddress(ReassignOwnedStmt *stmt);

/*
* PreprocessDropOwnedStmt finds the distributed role out of the ones
* being dropped and unmarks them distributed and creates the drop statements
Expand Down Expand Up @@ -89,3 +92,81 @@ PreprocessDropOwnedStmt(Node *node, const char *queryString,

return NodeDDLTaskList(NON_COORDINATOR_NODES, commands);
}


/*
 * PostprocessReassignOwnedStmt is called for a REASSIGN OWNED statement after
 * it has been executed locally and builds the tasks that replay it on the
 * other nodes.
 *
 * Roles in the OWNED BY clause that are not distributed are dropped from the
 * propagated command, i.e., whatever they own is not reassigned remotely. The
 * role in the TO clause, on the other hand, is distributed on the fly (along
 * with its dependencies) before the command is sent.
 *
 * Returns NIL when none of the OWNED BY roles is distributed or when
 * ShouldPropagate() says no; otherwise returns the DDL task list targeting
 * NON_COORDINATOR_NODES.
 */
List *
PostprocessReassignOwnedStmt(Node *node, const char *queryString)
{
	ReassignOwnedStmt *stmt = castNode(ReassignOwnedStmt, node);
	List *originalRoles = stmt->roles;
	List *distributedRoles = FilterDistributedRoles(originalRoles);

	/* nothing to propagate; the evaluation order of the checks is preserved */
	if (list_length(distributedRoles) <= 0 || !ShouldPropagate())
	{
		return NIL;
	}

	EnsureCoordinator();

	/*
	 * Deparse with only the distributed roles in the OWNED BY clause, then
	 * put the original role list back so the parse tree is left untouched.
	 */
	stmt->roles = distributedRoles;
	char *reassignCommand = DeparseTreeNode((Node *) stmt);
	stmt->roles = originalRoles;

	ObjectAddress *newRoleAddress = GetNewRoleAddress(stmt);

	/*
	 * Role propagation GUCs are switched on only for the duration of the
	 * call below, so that the role given in the TO clause gets propagated
	 * properly regardless of the user's settings.
	 */
	const char *rolePropagationGucs[] = {
		"citus.enable_create_role_propagation",
		"citus.enable_alter_role_propagation",
		"citus.enable_alter_role_set_propagation"
	};
	int gucCount = (int) (sizeof(rolePropagationGucs) /
						  sizeof(rolePropagationGucs[0]));

	int saveNestLevel = NewGUCNestLevel();
	for (int gucIndex = 0; gucIndex < gucCount; gucIndex++)
	{
		set_config_option(rolePropagationGucs[gucIndex], "on",
						  (superuser() ? PGC_SUSET : PGC_USERSET), PGC_S_SESSION,
						  GUC_ACTION_LOCAL, true, 0, false);
	}

	EnsureObjectAndDependenciesExistOnAllNodes(newRoleAddress);

	/* restore the GUC values that were in effect before the nest level */
	AtEOXact_GUC(true, saveNestLevel);

	List *commands = list_make3(DISABLE_DDL_PROPAGATION,
								reassignCommand,
								ENABLE_DDL_PROPAGATION);

	return NodeDDLTaskList(NON_COORDINATOR_NODES, commands);
}


/*
 * GetNewRoleAddress returns a palloc'd ObjectAddress (class AuthIdRelationId)
 * for the role given in the TO clause of the REASSIGN OWNED statement.
 *
 * The role is resolved via get_rolespec_oid() instead of reading
 * newrole->rolename directly: rolename is only set for plain role names and
 * is NULL for CURRENT_USER / CURRENT_ROLE / SESSION_USER, in which case a
 * name based lookup would be handed a NULL pointer.
 *
 * missing_ok is false, so the lookup errors out if the role does not exist.
 */
static ObjectAddress *
GetNewRoleAddress(ReassignOwnedStmt *stmt)
{
	Oid roleOid = get_rolespec_oid(stmt->newrole, false);
	ObjectAddress *address = palloc0(sizeof(ObjectAddress));
	ObjectAddressSet(*address, AuthIdRelationId, roleOid);
	return address;
}
26 changes: 25 additions & 1 deletion src/backend/distributed/deparser/deparse_owned_stmts.c
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ AppendRoleList(StringInfo buf, List *roleList)
{
Node *roleNode = (Node *) lfirst(cell);
Assert(IsA(roleNode, RoleSpec) || IsA(roleNode, AccessPriv));
char const *rolename = NULL;
const char *rolename = NULL;
if (IsA(roleNode, RoleSpec))
{
rolename = RoleSpecString((RoleSpec *) roleNode, true);
Expand All @@ -83,3 +83,27 @@ AppendRoleList(StringInfo buf, List *roleList)
}
}
}


/*
 * AppendReassignOwnedStmt appends the text of the given REASSIGN OWNED
 * statement to buf, in the form "REASSIGN OWNED BY <roles> TO <new role>".
 */
static void
AppendReassignOwnedStmt(StringInfo buf, ReassignOwnedStmt *stmt)
{
	/* roles listed in the OWNED BY clause */
	appendStringInfo(buf, "REASSIGN OWNED BY ");
	AppendRoleList(buf, stmt->roles);

	/* role given in the TO clause */
	appendStringInfo(buf, " TO %s", RoleSpecString(stmt->newrole, true));
}


/*
 * DeparseReassignOwnedStmt builds the SQL text of the given REASSIGN OWNED
 * statement and returns the resulting string buffer's data.
 */
char *
DeparseReassignOwnedStmt(Node *node)
{
	ReassignOwnedStmt *reassignStmt = castNode(ReassignOwnedStmt, node);

	StringInfoData command = { 0 };
	initStringInfo(&command);
	AppendReassignOwnedStmt(&command, reassignStmt);

	return command.data;
}
Loading

0 comments on commit c3579ee

Please sign in to comment.