Skip to content

Commit

Permalink
improve code / comments
Browse files Browse the repository at this point in the history
  • Loading branch information
onurctirtir committed Nov 30, 2023
1 parent 55a7159 commit 52d9ccf
Show file tree
Hide file tree
Showing 13 changed files with 140 additions and 70 deletions.
42 changes: 41 additions & 1 deletion src/backend/distributed/metadata/metadata_sync.c
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,8 @@
#include "distributed/relation_access_tracking.h"
#include "distributed/remote_commands.h"
#include "distributed/resource_lock.h"
#include "distributed/serialize_distributed_ddls.h"
#include "distributed/tenant_schema_metadata.h"
#include "distributed/utils/array_type.h"
#include "distributed/utils/function.h"
#include "distributed/version_compat.h"
#include "distributed/worker_manager.h"
Expand Down Expand Up @@ -184,6 +184,7 @@ PG_FUNCTION_INFO_V1(citus_internal_add_tenant_schema);
PG_FUNCTION_INFO_V1(citus_internal_delete_tenant_schema);
PG_FUNCTION_INFO_V1(citus_internal_update_none_dist_table_metadata);
PG_FUNCTION_INFO_V1(citus_internal_database_command);
PG_FUNCTION_INFO_V1(citus_internal_acquire_citus_advisory_object_class_lock);


static bool got_SIGTERM = false;
Expand Down Expand Up @@ -4049,6 +4050,29 @@ citus_internal_database_command(PG_FUNCTION_ARGS)
}


/*
* citus_internal_acquire_citus_advisory_object_class_lock is an internal UDF
* to call AcquireCitusAdvisoryObjectClassLock().
*/
Datum
citus_internal_acquire_citus_advisory_object_class_lock(PG_FUNCTION_ARGS)
{
CheckCitusVersion(ERROR);

PG_ENSURE_ARGNOTNULL(0, "object_class");
ObjectClass objectClass = PG_GETARG_INT32(0);

if (!ShouldSkipMetadataChecks())
{
EnsureCitusInitiatedOperation();
}

AcquireCitusAdvisoryObjectClassLock(objectClass);

PG_RETURN_VOID();
}


/*
* SyncNewColocationGroup synchronizes a new pg_dist_colocation entry to a worker.
*/
Expand Down Expand Up @@ -4279,6 +4303,22 @@ DeletePlacementMetadataCommand(uint64 placementId)
}


/*
* AcquireCitusAdvisoryObjectClassLockCommand returns a command to call
* citus_internal_acquire_citus_advisory_object_class_lock().
*/
char *
AcquireCitusAdvisoryObjectClassLockCommand(ObjectClass objectClass)
{
StringInfo command = makeStringInfo();
appendStringInfo(command,
"SELECT pg_catalog.citus_internal_acquire_citus_advisory_object_class_lock(%d)",
objectClass);

return command->data;
}


/*
* RemoteSchemaIdExpressionById returns an expression in text form that
* can be used to obtain the OID of the schema with given schema id on a
Expand Down
78 changes: 27 additions & 51 deletions src/backend/distributed/serialize_distributed_ddls.c
Original file line number Diff line number Diff line change
@@ -1,3 +1,12 @@
/*-------------------------------------------------------------------------
*
* serialize_distributed_ddls.c
*
* This file contains functions for serializing distributed DDLs.
*
*-------------------------------------------------------------------------
*/

#include "postgres.h"

#include "miscadmin.h"
Expand All @@ -7,25 +16,27 @@

#include "distributed/adaptive_executor.h"
#include "distributed/deparse_shard_query.h"
#include "distributed/metadata_sync.h"
#include "distributed/resource_lock.h"
#include "distributed/serialize_distributed_ddls.h"


PG_FUNCTION_INFO_V1(citus_internal_acquire_advisory_catalog_lock);


static char * AcquireAdvisoryCatalogLockCommand(ObjectClass objectClass);
static void AcquireAdvisoryCatalogLock(ObjectClass objectClass);
static bool IsNodeWideObjectClass(ObjectClass objectClass);


/*
* SerializeDistributedDDLsOnObjectClass serializes distributed DDLs that
* target given object class by acquiring a Citus specific advisory lock
* on the first primary worker node.
*
* The lock is acquired via a coordinated transaction. For this reason,
* it automatically gets released when (maybe implicit) transaction on
* current server commits or rolls back.
*/
void
SerializeDistributedDDLsOnObjectClass(ObjectClass objectClass)
{
Task *task = CitusMakeNode(Task);
task->taskType = DDL_TASK;

char *command = AcquireAdvisoryCatalogLockCommand(objectClass);
char *command = AcquireCitusAdvisoryObjectClassLockCommand(objectClass);
SetTaskQueryString(task, command);

WorkerNode *firstWorkerNode = GetFirstPrimaryWorkerNode();
Expand All @@ -42,53 +53,18 @@ SerializeDistributedDDLsOnObjectClass(ObjectClass objectClass)
}


static char *
AcquireAdvisoryCatalogLockCommand(ObjectClass objectClass)
{
StringInfo command = makeStringInfo();
appendStringInfo(command,
"SELECT pg_catalog.citus_internal_acquire_advisory_catalog_lock(%d)",
objectClass);

return command->data;
}


Datum
citus_internal_acquire_advisory_catalog_lock(PG_FUNCTION_ARGS)
{
ObjectClass objectClass = PG_GETARG_INT32(0);

AcquireAdvisoryCatalogLock(objectClass);

PG_RETURN_VOID();
}


static void
AcquireAdvisoryCatalogLock(ObjectClass objectClass)
/*
* AcquireCitusAdvisoryObjectClassLock acquires a Citus specific advisory
* ExclusiveLock based on given object class.
*/
void
AcquireCitusAdvisoryObjectClassLock(ObjectClass objectClass)
{
Oid databaseId = IsNodeWideObjectClass(objectClass) ? InvalidOid : MyDatabaseId;

LOCKTAG locktag;
SET_LOCKTAG_CITUS_DDL_FROM_ANY_NODE(locktag, databaseId, objectClass);
SET_LOCKTAG_GLOBAL_DDL_SERIALIZATION(locktag, objectClass);

LOCKMODE lockmode = ExclusiveLock;
bool sessionLock = false;
bool dontWait = false;
LockAcquire(&locktag, lockmode, sessionLock, dontWait);
}


static bool
IsNodeWideObjectClass(ObjectClass objectClass)
{
switch (objectClass)
{
case OCLASS_DATABASE:
return true;

default:
return false;
}
}
2 changes: 1 addition & 1 deletion src/backend/distributed/sql/citus--12.1-1--12.2-1.sql
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,4 @@

#include "udfs/citus_internal_database_command/12.2-1.sql"
#include "udfs/citus_add_rebalance_strategy/12.2-1.sql"
#include "udfs/citus_internal_acquire_advisory_catalog_lock/12.2-1.sql"
#include "udfs/citus_internal_acquire_citus_advisory_object_class_lock/12.2-1.sql"
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
-- citus--12.2-1--12.1-1

DROP FUNCTION pg_catalog.citus_internal_database_command(text);
DROP FUNCTION pg_catalog.citus_internal_acquire_advisory_catalog_lock(int);
DROP FUNCTION pg_catalog.citus_internal_acquire_citus_advisory_object_class_lock(int);

#include "../udfs/citus_add_rebalance_strategy/10.1-1.sql"

This file was deleted.

This file was deleted.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
CREATE OR REPLACE FUNCTION pg_catalog.citus_internal_acquire_citus_advisory_object_class_lock(objectClass int)
RETURNS void
LANGUAGE C
VOLATILE
AS 'MODULE_PATHNAME', $$citus_internal_acquire_citus_advisory_object_class_lock$$;
2 changes: 2 additions & 0 deletions src/include/distributed/metadata_sync.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#define METADATA_SYNC_H


#include "catalog/dependency.h"
#include "nodes/pg_list.h"

#include "distributed/commands/utility_hook.h"
Expand Down Expand Up @@ -146,6 +147,7 @@ extern char * UpdateNoneDistTableMetadataCommand(Oid relationId, char replicatio
extern char * AddPlacementMetadataCommand(uint64 shardId, uint64 placementId,
uint64 shardLength, int32 groupId);
extern char * DeletePlacementMetadataCommand(uint64 placementId);
extern char * AcquireCitusAdvisoryObjectClassLockCommand(ObjectClass objectClass);

extern MetadataSyncContext * CreateMetadataSyncContext(List *nodeList,
bool collectCommands,
Expand Down
43 changes: 39 additions & 4 deletions src/include/distributed/resource_lock.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

#include "c.h"

#include "catalog/dependency.h"
#include "nodes/pg_list.h"
#include "storage/lock.h"
#include "tcop/utility.h"
Expand Down Expand Up @@ -46,7 +47,7 @@ typedef enum AdvisoryLocktagClass
ADV_LOCKTAG_CLASS_CITUS_LOGICAL_REPLICATION = 12,
ADV_LOCKTAG_CLASS_CITUS_REBALANCE_PLACEMENT_COLOCATION = 13,
ADV_LOCKTAG_CLASS_CITUS_BACKGROUND_TASK = 14,
ADV_LOCKTAG_CLASS_CITUS_DDL_FROM_ANY_NODE = 15
ADV_LOCKTAG_CLASS_CITUS_GLOBAL_DDL_SERIALIZATION = 15
} AdvisoryLocktagClass;

/* CitusOperations has constants for citus operations */
Expand Down Expand Up @@ -143,13 +144,47 @@ typedef enum CitusOperations
(uint32) (taskId), \
ADV_LOCKTAG_CLASS_CITUS_BACKGROUND_TASK)

/*
* IsNodeWideObjectClass returns true if the given object class is node-wide,
* i.e., that is not bound to a particular database but to whole server.
*
* Defined here as an inlined function so that SET_LOCKTAG_GLOBAL_DDL_SERIALIZATION
* macro can use it.
*/
static inline bool
IsNodeWideObjectClass(ObjectClass objectClass)
{
switch (objectClass)
{
case OCLASS_DATABASE:
{
return true;
}

default:
ereport(ERROR, (errmsg("unexpected object class: %d", objectClass)));
}
}

#define SET_LOCKTAG_CITUS_DDL_FROM_ANY_NODE(tag, databaseId, objectClass) \

/*
* Reuse advisory lock, but with different, unused field 4 (14).
*
* Automatically sets databaseId to MyDatabaseId if the object class is
* node-wide, i.e., that is not bound to a particular database but to
* whole server. If the object class is not node-wide, sets databaseId
* to InvalidOid.
*
* That way, the lock is local to each database if the object class is
* not node-wide, and global if it is.
*/
#define SET_LOCKTAG_GLOBAL_DDL_SERIALIZATION(tag, objectClass) \
SET_LOCKTAG_ADVISORY(tag, \
databaseId, \
(uint32) (IsNodeWideObjectClass(objectClass) ? InvalidOid : \
MyDatabaseId), \
(uint32) 0, \
(uint32) objectClass, \
ADV_LOCKTAG_CLASS_CITUS_DDL_FROM_ANY_NODE)
ADV_LOCKTAG_CLASS_CITUS_GLOBAL_DDL_SERIALIZATION)

/*
* DistLockConfigs are used to configure the locking behaviour of AcquireDistributedLockOnRelations
Expand Down
17 changes: 17 additions & 0 deletions src/include/distributed/serialize_distributed_ddls.h
Original file line number Diff line number Diff line change
@@ -1,3 +1,12 @@
/*-------------------------------------------------------------------------
*
* serialize_distributed_ddls.h
*
* Declarations for public functions related to serializing distributed
* DDLs.
*
*-------------------------------------------------------------------------
*/

#ifndef SERIALIZE_DDLS_OVER_CATALOG_H
#define SERIALIZE_DDLS_OVER_CATALOG_H
Expand All @@ -8,4 +17,12 @@

extern void SerializeDistributedDDLsOnObjectClass(ObjectClass objectClass);

/*
* This function may not make much sense by itself. It's mainly
* exported for its remote variant
* (citus_internal_acquire_citus_advisory_object_class_lock()) defined
* in metadata-sync layer (metadata_sync.c).
*/
extern void AcquireCitusAdvisoryObjectClassLock(ObjectClass objectClass);

#endif /* SERIALIZE_DDLS_OVER_CATALOG_H */
2 changes: 1 addition & 1 deletion src/test/regress/expected/multi_extension.out
Original file line number Diff line number Diff line change
Expand Up @@ -1422,7 +1422,7 @@ ALTER EXTENSION citus UPDATE TO '12.2-1';
SELECT * FROM multi_extension.print_extension_changes();
previous_object | current_object
---------------------------------------------------------------------
| function citus_internal_acquire_advisory_catalog_lock(integer) void
| function citus_internal_acquire_citus_advisory_object_class_lock(integer) void
| function citus_internal_database_command(text) void
(2 rows)

Expand Down
2 changes: 1 addition & 1 deletion src/test/regress/expected/upgrade_list_citus_objects.out
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ ORDER BY 1;
function citus_internal.refresh_isolation_tester_prepared_statement()
function citus_internal.replace_isolation_tester_func()
function citus_internal.restore_isolation_tester_func()
function citus_internal_acquire_advisory_catalog_lock(integer)
function citus_internal_acquire_citus_advisory_object_class_lock(integer)
function citus_internal_add_colocation_metadata(integer,integer,integer,regtype,oid)
function citus_internal_add_object_metadata(text,text[],text[],integer,integer,boolean)
function citus_internal_add_partition_metadata(regclass,"char",text,integer,"char")
Expand Down

0 comments on commit 52d9ccf

Please sign in to comment.