From 6b8df85a88df700fb6bdcc78b60b4d10780ab88f Mon Sep 17 00:00:00 2001 From: Vinod Sridharan <14185211+visridha@users.noreply.github.com> Date: Wed, 3 Apr 2024 19:28:26 +0000 Subject: [PATCH] Update distributed placement for single shard tables --- .../commands/create_distributed_table.c | 15 ++++-- .../distributed/operations/create_shards.c | 6 ++- .../distributed/coordinator_protocol.h | 3 +- .../regress/expected/multi_create_table.out | 48 +++++++++++++++++++ src/test/regress/sql/multi_create_table.sql | 24 ++++++++++ 5 files changed, 88 insertions(+), 8 deletions(-) diff --git a/src/backend/distributed/commands/create_distributed_table.c b/src/backend/distributed/commands/create_distributed_table.c index 5ec6d6dd7bb..a955d09aa30 100644 --- a/src/backend/distributed/commands/create_distributed_table.c +++ b/src/backend/distributed/commands/create_distributed_table.c @@ -146,7 +146,9 @@ static void ConvertCitusLocalTableToTableType(Oid relationId, DistributedTableParams * distributedTableParams); static void CreateHashDistributedTableShards(Oid relationId, int shardCount, - Oid colocatedTableId, bool localTableEmpty); + Oid colocatedTableId, + bool localTableEmpty, + uint32 colocationId); static void CreateSingleShardTableShard(Oid relationId, Oid colocatedTableId, uint32 colocationId); static uint32 ColocationIdForNewTable(Oid relationId, CitusTableType tableType, @@ -1288,9 +1290,11 @@ CreateCitusTable(Oid relationId, CitusTableType tableType, if (tableType == HASH_DISTRIBUTED) { /* create shards for hash distributed table */ - CreateHashDistributedTableShards(relationId, distributedTableParams->shardCount, + CreateHashDistributedTableShards(relationId, + distributedTableParams->shardCount, colocatedTableId, - localTableEmpty); + localTableEmpty, + colocationId); } else if (tableType == REFERENCE_TABLE) { @@ -1878,7 +1882,8 @@ DecideDistTableReplicationModel(char distributionMethod, char *colocateWithTable */ static void CreateHashDistributedTableShards(Oid 
relationId, int shardCount, - Oid colocatedTableId, bool localTableEmpty) + Oid colocatedTableId, bool localTableEmpty, + uint32 colocationId) { bool useExclusiveConnection = false; @@ -1917,7 +1922,7 @@ CreateHashDistributedTableShards(Oid relationId, int shardCount, * we can directly use ShardReplicationFactor global variable here. */ CreateShardsWithRoundRobinPolicy(relationId, shardCount, ShardReplicationFactor, - useExclusiveConnection); + useExclusiveConnection, colocationId); } } diff --git a/src/backend/distributed/operations/create_shards.c b/src/backend/distributed/operations/create_shards.c index 96254705122..27fb8c04f79 100644 --- a/src/backend/distributed/operations/create_shards.c +++ b/src/backend/distributed/operations/create_shards.c @@ -81,7 +81,8 @@ master_create_worker_shards(PG_FUNCTION_ARGS) */ void CreateShardsWithRoundRobinPolicy(Oid distributedTableId, int32 shardCount, - int32 replicationFactor, bool useExclusiveConnections) + int32 replicationFactor, bool useExclusiveConnections, + uint32 colocationId) { CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(distributedTableId); List *insertedShardPlacements = NIL; @@ -162,10 +163,11 @@ CreateShardsWithRoundRobinPolicy(Oid distributedTableId, int32 shardCount, /* set shard storage type according to relation type */ char shardStorageType = ShardStorageType(distributedTableId); + int64 shardOffset = shardCount == 1 ? 
colocationId : 0; for (int64 shardIndex = 0; shardIndex < shardCount; shardIndex++) { - uint32 roundRobinNodeIndex = shardIndex % workerNodeCount; + uint32 roundRobinNodeIndex = (shardIndex + shardOffset) % workerNodeCount; /* initialize the hash token space for this shard */ int32 shardMinHashToken = PG_INT32_MIN + (shardIndex * hashTokenIncrement); diff --git a/src/include/distributed/coordinator_protocol.h b/src/include/distributed/coordinator_protocol.h index b2170fd2edc..8c25ae7ab96 100644 --- a/src/include/distributed/coordinator_protocol.h +++ b/src/include/distributed/coordinator_protocol.h @@ -259,7 +259,8 @@ extern void InsertShardPlacementRows(Oid relationId, int64 shardId, extern uint64 UpdateShardStatistics(int64 shardId); extern void CreateShardsWithRoundRobinPolicy(Oid distributedTableId, int32 shardCount, int32 replicationFactor, - bool useExclusiveConnections); + bool useExclusiveConnections, + uint32 colocationId); extern void CreateColocatedShards(Oid targetRelationId, Oid sourceRelationId, bool useExclusiveConnections); extern void CreateReferenceTableShard(Oid distributedTableId); diff --git a/src/test/regress/expected/multi_create_table.out b/src/test/regress/expected/multi_create_table.out index 61c3c8fe121..8d1ca713f16 100644 --- a/src/test/regress/expected/multi_create_table.out +++ b/src/test/regress/expected/multi_create_table.out @@ -466,3 +466,51 @@ select create_reference_table('temp_table'); ERROR: cannot distribute a temporary table DROP TABLE temp_table; DROP TABLE shard_count_table_3; +-- test shard count 1 placement with colocate none. 
+-- create a base table instance +CREATE TABLE shard_count_table_1_inst_1 (a int); +SELECT create_distributed_table('shard_count_table_1_inst_1', 'a', shard_count:=1, colocate_with:='none'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +-- create another table with similar requirements +CREATE TABLE shard_count_table_1_inst_2 (a int); +SELECT create_distributed_table('shard_count_table_1_inst_2', 'a', shard_count:=1, colocate_with:='none'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +-- Now check colocation: the two tables must not share a colocation group. +SELECT (SELECT colocation_id FROM citus_tables WHERE table_name = 'shard_count_table_1_inst_1'::regclass) != (SELECT colocation_id FROM citus_tables WHERE table_name = 'shard_count_table_1_inst_2'::regclass); + ?column? +--------------------------------------------------------------------- + t +(1 row) + +-- double check shard counts +SELECT (SELECT shard_count FROM citus_tables WHERE table_name = 'shard_count_table_1_inst_1'::regclass) = (SELECT shard_count FROM citus_tables WHERE table_name = 'shard_count_table_1_inst_2'::regclass); + ?column? +--------------------------------------------------------------------- + t +(1 row) + +SELECT shard_count = 1 FROM citus_tables WHERE table_name = 'shard_count_table_1_inst_1'::regclass; + ?column? +--------------------------------------------------------------------- + t +(1 row) + +-- check placement: These should be placed on different workers. +SELECT nodename || ':' || nodeport AS inst_1_node_endpoint FROM citus_shards WHERE table_name = 'shard_count_table_1_inst_1'::regclass \gset +SELECT nodename || ':' || nodeport AS inst_2_node_endpoint FROM citus_shards WHERE table_name = 'shard_count_table_1_inst_2'::regclass \gset +SELECT :'inst_1_node_endpoint' = :'inst_2_node_endpoint'; + ?column? 
+--------------------------------------------------------------------- + f +(1 row) + +DROP TABLE shard_count_table_1_inst_1; +DROP TABLE shard_count_table_1_inst_2; diff --git a/src/test/regress/sql/multi_create_table.sql b/src/test/regress/sql/multi_create_table.sql index 9bf34090957..34afb3efba3 100644 --- a/src/test/regress/sql/multi_create_table.sql +++ b/src/test/regress/sql/multi_create_table.sql @@ -291,3 +291,27 @@ select create_reference_table('temp_table'); DROP TABLE temp_table; DROP TABLE shard_count_table_3; + +-- test shard count 1 placement with colocate none. +-- create a base table instance +CREATE TABLE shard_count_table_1_inst_1 (a int); +SELECT create_distributed_table('shard_count_table_1_inst_1', 'a', shard_count:=1, colocate_with:='none'); + +-- create another table with similar requirements +CREATE TABLE shard_count_table_1_inst_2 (a int); +SELECT create_distributed_table('shard_count_table_1_inst_2', 'a', shard_count:=1, colocate_with:='none'); + +-- Now check colocation: the two tables must not share a colocation group. +SELECT (SELECT colocation_id FROM citus_tables WHERE table_name = 'shard_count_table_1_inst_1'::regclass) != (SELECT colocation_id FROM citus_tables WHERE table_name = 'shard_count_table_1_inst_2'::regclass); + +-- double check shard counts +SELECT (SELECT shard_count FROM citus_tables WHERE table_name = 'shard_count_table_1_inst_1'::regclass) = (SELECT shard_count FROM citus_tables WHERE table_name = 'shard_count_table_1_inst_2'::regclass); +SELECT shard_count = 1 FROM citus_tables WHERE table_name = 'shard_count_table_1_inst_1'::regclass; + +-- check placement: These should be placed on different workers. 
+SELECT nodename || ':' || nodeport AS inst_1_node_endpoint FROM citus_shards WHERE table_name = 'shard_count_table_1_inst_1'::regclass \gset +SELECT nodename || ':' || nodeport AS inst_2_node_endpoint FROM citus_shards WHERE table_name = 'shard_count_table_1_inst_2'::regclass \gset +SELECT :'inst_1_node_endpoint' = :'inst_2_node_endpoint'; + +DROP TABLE shard_count_table_1_inst_1; +DROP TABLE shard_count_table_1_inst_2; \ No newline at end of file