Allow isolating shard placement groups on individual nodes
onurctirtir committed Feb 9, 2024
1 parent 2fae91c commit 6b9ba04
Showing 71 changed files with 5,356 additions and 470 deletions.
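
At a high level, this commit adds a needsseparatenode flag to pg_dist_shard (and to ShardInterval), threads it through shard creation and metadata sync, and introduces an internal UDF, citus_internal.shard_property_set(), for flipping the flag on an existing shard group. A minimal sketch of the metadata surface, assuming a hypothetical table dist_table and shard id 102008; note the UDF is internal and expects a Citus-initiated operation, so it is normally reached via metadata sync rather than called directly:

-- Inspect the new flag on a table's shards ('dist_table' is hypothetical):
SELECT shardid, needsseparatenode
FROM pg_dist_shard
WHERE logicalrelid = 'dist_table'::regclass;

-- Mark the shard group containing shard 102008 as needing its own node:
SELECT citus_internal.shard_property_set(102008, true);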
@@ -1454,8 +1454,9 @@ InsertMetadataForCitusLocalTable(Oid citusLocalTableId, uint64 shardId,

text *shardMinValue = NULL;
text *shardMaxValue = NULL;
bool needsSeparateNode = false;
InsertShardRow(citusLocalTableId, shardId, shardStorageType,
shardMinValue, shardMaxValue);
shardMinValue, shardMaxValue, needsSeparateNode);

List *nodeList = list_make1(CoordinatorNodeIfAddedAsWorkerOrError());

52 changes: 44 additions & 8 deletions src/backend/distributed/commands/create_distributed_table.c
@@ -135,6 +135,7 @@ static char DecideDistTableReplicationModel(char distributionMethod,
static List * HashSplitPointsForShardList(List *shardList);
static List * HashSplitPointsForShardCount(int shardCount);
static List * WorkerNodesForShardList(List *shardList);
static List * NeedsSeparateNodeForShardList(List *shardList);
static List * RoundRobinWorkerNodeList(List *workerNodeList, int listLength);
static CitusTableParams DecideCitusTableParams(CitusTableType tableType,
DistributedTableParams *
@@ -572,16 +573,10 @@ CreateDistributedTableConcurrently(Oid relationId, char *distributionColumnName,
colocatedTableId = ColocatedTableId(colocationId);
}

List *workerNodeList = DistributedTablePlacementNodeList(NoLock);
if (workerNodeList == NIL)
{
ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("no worker nodes are available for placing shards"),
errhint("Add more worker nodes.")));
}

List *workersForPlacementList;
List *shardSplitPointsList;
List *needsSeparateNodeForPlacementList;


if (colocatedTableId != InvalidOid)
{
@@ -596,6 +591,12 @@ CreateDistributedTableConcurrently(Oid relationId, char *distributionColumnName,
* Find the node IDs of the shard placements.
*/
workersForPlacementList = WorkerNodesForShardList(colocatedShardList);

/*
* Inherit needsseparatenode from the colocated shards.
*/
needsSeparateNodeForPlacementList =
NeedsSeparateNodeForShardList(colocatedShardList);
}
else
{
@@ -607,7 +608,21 @@ CreateDistributedTableConcurrently(Oid relationId, char *distributionColumnName,
/*
* Place shards in a round-robin fashion across all data nodes.
*/
List *workerNodeList = NewDistributedTablePlacementNodeList(NoLock);
if (workerNodeList == NIL)
{
ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("no worker nodes are available for placing shards"),
errhint("Add more worker nodes.")));
}

workersForPlacementList = RoundRobinWorkerNodeList(workerNodeList, shardCount);

/*
* For a new colocation group, needsseparatenode is set to false for
* all shards.
*/
needsSeparateNodeForPlacementList = GenerateListFromIntElement(false, shardCount);
}

/*
@@ -646,6 +661,7 @@ CreateDistributedTableConcurrently(Oid relationId, char *distributionColumnName,
shardToSplit->shardId,
shardSplitPointsList,
workersForPlacementList,
needsSeparateNodeForPlacementList,
distributionColumnOverrides,
sourceColocatedShardIntervalList,
colocationId
@@ -898,6 +914,26 @@ WorkerNodesForShardList(List *shardList)
}


/*
* NeedsSeparateNodeForShardList returns a list of booleans reflecting whether
* each shard in the given list needs a separate node.
*/
static List *
NeedsSeparateNodeForShardList(List *shardList)
{
List *needsSeparateNodeList = NIL;

ShardInterval *shardInterval = NULL;
foreach_ptr(shardInterval, shardList)
{
needsSeparateNodeList = lappend_int(needsSeparateNodeList,
shardInterval->needsSeparateNode);
}

return needsSeparateNodeList;
}


/*
* RoundRobinWorkerNodeList round robins over the workers in the worker node list
* and adds node ids to a list of length listLength.
3 changes: 3 additions & 0 deletions src/backend/distributed/metadata/metadata_cache.c
@@ -5379,6 +5379,8 @@ DeformedDistShardTupleToShardInterval(Datum *datumArray, bool *isNullArray,
char storageType = DatumGetChar(datumArray[Anum_pg_dist_shard_shardstorage - 1]);
Datum minValueTextDatum = datumArray[Anum_pg_dist_shard_shardminvalue - 1];
Datum maxValueTextDatum = datumArray[Anum_pg_dist_shard_shardmaxvalue - 1];
bool needsSeparateNode = DatumGetBool(
datumArray[Anum_pg_dist_shard_needsseparatenode - 1]);

bool minValueNull = isNullArray[Anum_pg_dist_shard_shardminvalue - 1];
bool maxValueNull = isNullArray[Anum_pg_dist_shard_shardmaxvalue - 1];
@@ -5415,6 +5417,7 @@
shardInterval->minValue = minValue;
shardInterval->maxValue = maxValue;
shardInterval->shardId = shardId;
shardInterval->needsSeparateNode = needsSeparateNode;

return shardInterval;
}
73 changes: 68 additions & 5 deletions src/backend/distributed/metadata/metadata_sync.c
@@ -185,6 +185,7 @@ PG_FUNCTION_INFO_V1(citus_internal_add_tenant_schema);
PG_FUNCTION_INFO_V1(citus_internal_delete_tenant_schema);
PG_FUNCTION_INFO_V1(citus_internal_update_none_dist_table_metadata);
PG_FUNCTION_INFO_V1(citus_internal_database_command);
PG_FUNCTION_INFO_V1(citus_internal_shard_property_set);


static bool got_SIGTERM = false;
@@ -1267,7 +1268,7 @@ ShardListInsertCommand(List *shardIntervalList)
StringInfo insertShardCommand = makeStringInfo();
appendStringInfo(insertShardCommand,
"WITH shard_data(relationname, shardid, storagetype, "
"shardminvalue, shardmaxvalue) AS (VALUES ");
"shardminvalue, shardmaxvalue, needsseparatenode) AS (VALUES ");

foreach_ptr(shardInterval, shardIntervalList)
{
@@ -1299,12 +1300,13 @@ ShardListInsertCommand(List *shardIntervalList)
}

appendStringInfo(insertShardCommand,
"(%s::regclass, %ld, '%c'::\"char\", %s, %s)",
"(%s::regclass, %ld, '%c'::\"char\", %s, %s, %s)",
quote_literal_cstr(qualifiedRelationName),
shardId,
shardInterval->storageType,
minHashToken->data,
maxHashToken->data);
maxHashToken->data,
shardInterval->needsSeparateNode ? "true" : "false");

if (llast(shardIntervalList) != shardInterval)
{
@@ -1316,7 +1318,7 @@

appendStringInfo(insertShardCommand,
"SELECT citus_internal.add_shard_metadata(relationname, shardid, "
"storagetype, shardminvalue, shardmaxvalue) "
"storagetype, shardminvalue, shardmaxvalue, needsseparatenode) "
"FROM shard_data;");

/*
@@ -3359,6 +3361,9 @@ citus_internal_add_shard_metadata(PG_FUNCTION_ARGS)
shardMaxValue = PG_GETARG_TEXT_P(4);
}

PG_ENSURE_ARGNOTNULL(5, "needs separate node");
bool needsSeparateNode = PG_GETARG_BOOL(5);

/* only owner of the table (or superuser) is allowed to add the Citus metadata */
EnsureTableOwner(relationId);

@@ -3379,7 +3384,8 @@
shardMaxValue);
}

InsertShardRow(relationId, shardId, storageType, shardMinValue, shardMaxValue);
InsertShardRow(relationId, shardId, storageType, shardMinValue, shardMaxValue,
needsSeparateNode);

PG_RETURN_VOID();
}
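
For reference, the statement that ShardListInsertCommand assembles and that this UDF consumes now carries the extra column. A sketch of the generated SQL, with a hypothetical relation name, shard id, and hash range:

WITH shard_data(relationname, shardid, storagetype, shardminvalue, shardmaxvalue, needsseparatenode)
  AS (VALUES ('public.dist_table'::regclass, 102008, 't'::"char", '-2147483648', '-1', false))
SELECT citus_internal.add_shard_metadata(relationname, shardid, storagetype,
                                         shardminvalue, shardmaxvalue, needsseparatenode)
FROM shard_data;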
@@ -4067,6 +4073,45 @@ citus_internal_database_command(PG_FUNCTION_ARGS)
}


/*
* citus_internal_shard_property_set is an internal UDF to
* set shard properties for all the shards within the shard group
* that the given shard belongs to.
*/
Datum
citus_internal_shard_property_set(PG_FUNCTION_ARGS)
{
CheckCitusVersion(ERROR);

PG_ENSURE_ARGNOTNULL(0, "shard_id");
uint64 shardId = PG_GETARG_INT64(0);

/* only owner of the table (or superuser) is allowed to modify the Citus metadata */
Oid distributedRelationId = RelationIdForShard(shardId);
EnsureTableOwner(distributedRelationId);

/* we want to serialize all the metadata changes to this table */
LockRelationOid(distributedRelationId, ShareUpdateExclusiveLock);

if (!ShouldSkipMetadataChecks())
{
EnsureCitusInitiatedOperation();
}

bool *needsSeparateNodePtr = NULL;

if (!PG_ARGISNULL(1))
{
needsSeparateNodePtr = palloc(sizeof(bool));
*needsSeparateNodePtr = PG_GETARG_BOOL(1);
}

ShardgroupSetProperty(shardId, needsSeparateNodePtr);

PG_RETURN_VOID();
}


/*
* SyncNewColocationGroup synchronizes a new pg_dist_colocation entry to a worker.
*/
@@ -4266,6 +4311,24 @@ UpdateNoneDistTableMetadataCommand(Oid relationId, char replicationModel,
}


/*
* ShardgroupSetPropertyCommand returns a command to call
* citus_internal.shard_property_set().
*/
char *
ShardgroupSetPropertyCommand(uint64 shardId, bool *needsSeparateNodePtr)
{
char *needsSeparateNodeStr = !needsSeparateNodePtr ? "null" :
(*needsSeparateNodePtr ? "true" : "false");
StringInfo command = makeStringInfo();
appendStringInfo(command,
"SELECT citus_internal.shard_property_set(%lu, %s)",
shardId, needsSeparateNodeStr);

return command->data;
}
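
For instance, given a non-NULL pointer set to true, the helper above emits (shard id hypothetical):

SELECT citus_internal.shard_property_set(102008, true)

while a NULL pointer renders the second argument as null, which citus_internal_shard_property_set maps back to a NULL pointer before calling ShardgroupSetProperty.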


/*
* AddPlacementMetadataCommand returns a command to call
* citus_internal_add_placement_metadata().