Make Distributed single shard table use logic similar to SingleShardTableShard #7572

Draft: wants to merge 6 commits into base: main
Changes from 3 commits
15 changes: 10 additions & 5 deletions src/backend/distributed/commands/create_distributed_table.c
@@ -146,7 +146,9 @@ static void ConvertCitusLocalTableToTableType(Oid relationId,
DistributedTableParams *
distributedTableParams);
static void CreateHashDistributedTableShards(Oid relationId, int shardCount,
Oid colocatedTableId, bool localTableEmpty);
Oid colocatedTableId,
bool localTableEmpty,
uint32 colocationId);
static void CreateSingleShardTableShard(Oid relationId, Oid colocatedTableId,
uint32 colocationId);
static uint32 ColocationIdForNewTable(Oid relationId, CitusTableType tableType,
@@ -1288,9 +1290,11 @@ CreateCitusTable(Oid relationId, CitusTableType tableType,
if (tableType == HASH_DISTRIBUTED)
{
/* create shards for hash distributed table */
CreateHashDistributedTableShards(relationId, distributedTableParams->shardCount,
CreateHashDistributedTableShards(relationId,
distributedTableParams->shardCount,
colocatedTableId,
localTableEmpty);
localTableEmpty,
colocationId);
}
else if (tableType == REFERENCE_TABLE)
{
@@ -1878,7 +1882,8 @@ DecideDistTableReplicationModel(char distributionMethod, char *colocateWithTable
*/
static void
CreateHashDistributedTableShards(Oid relationId, int shardCount,
Oid colocatedTableId, bool localTableEmpty)
Oid colocatedTableId, bool localTableEmpty,
uint32 colocationId)
{
bool useExclusiveConnection = false;

@@ -1917,7 +1922,7 @@ CreateHashDistributedTableShards(Oid relationId, int shardCount,
* we can directly use ShardReplicationFactor global variable here.
*/
CreateShardsWithRoundRobinPolicy(relationId, shardCount, ShardReplicationFactor,
useExclusiveConnection);
useExclusiveConnection, colocationId);
}
}

14 changes: 12 additions & 2 deletions src/backend/distributed/operations/create_shards.c
@@ -81,7 +81,8 @@ master_create_worker_shards(PG_FUNCTION_ARGS)
*/
void
CreateShardsWithRoundRobinPolicy(Oid distributedTableId, int32 shardCount,
int32 replicationFactor, bool useExclusiveConnections)
int32 replicationFactor, bool useExclusiveConnections,
uint32 colocationId)
{
CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(distributedTableId);
List *insertedShardPlacements = NIL;
@@ -163,9 +164,18 @@ CreateShardsWithRoundRobinPolicy(Oid distributedTableId, int32 shardCount,
/* set shard storage type according to relation type */
char shardStorageType = ShardStorageType(distributedTableId);

int64 shardOffset = 0;
if (shardCount == 1 && shardStorageType == SHARD_STORAGE_TABLE)
{
/* For single shard distributed tables, use the colocationId to offset
* where the shard is placed.
*/
shardOffset = colocationId;
}

for (int64 shardIndex = 0; shardIndex < shardCount; shardIndex++)
{
uint32 roundRobinNodeIndex = shardIndex % workerNodeCount;
uint32 roundRobinNodeIndex = (shardIndex + shardOffset) % workerNodeCount;

/* initialize the hash token space for this shard */
int32 shardMinHashToken = PG_INT32_MIN + (shardIndex * hashTokenIncrement);
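A note on the arithmetic above: for a single-shard table, shardIndex is always 0, so with the new shardOffset the chosen worker index becomes colocationId % workerNodeCount instead of a constant 0. A minimal SQL sketch of that mapping, assuming a hypothetical 3-worker cluster and made-up colocation IDs:

```sql
-- Hypothetical colocation IDs on a 3-worker cluster: each new single-shard
-- table created with colocate_with := 'none' gets a fresh colocation ID, so
-- consecutive tables rotate across workers instead of all computing
-- 0 % worker_count and landing on the first worker.
SELECT colocation_id,
       colocation_id % 3 AS round_robin_node_index
FROM (VALUES (7), (8), (9)) AS ids(colocation_id);
```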
3 changes: 2 additions & 1 deletion src/include/distributed/coordinator_protocol.h
@@ -259,7 +259,8 @@ extern void InsertShardPlacementRows(Oid relationId, int64 shardId,
extern uint64 UpdateShardStatistics(int64 shardId);
extern void CreateShardsWithRoundRobinPolicy(Oid distributedTableId, int32 shardCount,
int32 replicationFactor,
bool useExclusiveConnections);
bool useExclusiveConnections,
uint32 colocationId);
extern void CreateColocatedShards(Oid targetRelationId, Oid sourceRelationId,
bool useExclusiveConnections);
extern void CreateReferenceTableShard(Oid distributedTableId);
48 changes: 48 additions & 0 deletions src/test/regress/expected/multi_create_table.out
@@ -466,3 +466,51 @@ select create_reference_table('temp_table');
ERROR: cannot distribute a temporary table
DROP TABLE temp_table;
DROP TABLE shard_count_table_3;
-- test shard count 1 placement with colocate none.
-- create a base table instance
CREATE TABLE shard_count_table_1_inst_1 (a int);
SELECT create_distributed_table('shard_count_table_1_inst_1', 'a', shard_count:=1, colocate_with:='none');
create_distributed_table
---------------------------------------------------------------------

(1 row)

-- create another table with similar requirements
CREATE TABLE shard_count_table_1_inst_2 (a int);
SELECT create_distributed_table('shard_count_table_1_inst_2', 'a', shard_count:=1, colocate_with:='none');
create_distributed_table
---------------------------------------------------------------------

(1 row)

-- Now check placement:
SELECT (SELECT colocation_id FROM citus_tables WHERE table_name = 'shard_count_table_1_inst_1'::regclass) != (SELECT colocation_id FROM citus_tables WHERE table_name = 'shard_count_table_1_inst_2'::regclass);
Contributor commented:
This is not testing the intended thing, as far as I can tell. You want to test that the placement of the shard is different, not that the colocation id is different. The colocation id would also be different if citus.enable_single_shard_table_multi_node_placement were set to off.

The placement of the shards can be checked easily in pg_dist_shard_placement or citus_shards.
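For reference, a sketch of the placement check suggested above, using citus_shards and the table names from this test (the \gset-based comparison added further down in this file is equivalent):

```sql
-- Show which worker each single-shard table's shard landed on; after this
-- change the two rows should report different nodename/nodeport pairs.
SELECT table_name, nodename, nodeport
FROM citus_shards
WHERE table_name IN ('shard_count_table_1_inst_1'::regclass,
                     'shard_count_table_1_inst_2'::regclass)
ORDER BY table_name::text;
```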

?column?
---------------------------------------------------------------------
t
(1 row)

-- double check shard counts
SELECT (SELECT shard_count FROM citus_tables WHERE table_name = 'shard_count_table_1_inst_1'::regclass) = (SELECT shard_count FROM citus_tables WHERE table_name = 'shard_count_table_1_inst_2'::regclass);
?column?
---------------------------------------------------------------------
t
(1 row)

SELECT shard_count = 1 FROM citus_tables WHERE table_name = 'shard_count_table_1_inst_1'::regclass;
?column?
---------------------------------------------------------------------
t
(1 row)

-- check placement: These should be placed on different workers.
SELECT nodename || ':' || nodeport AS inst_1_node_endpoint FROM citus_shards WHERE table_name = 'shard_count_table_1_inst_1'::regclass \gset
SELECT nodename || ':' || nodeport AS inst_2_node_endpoint FROM citus_shards WHERE table_name = 'shard_count_table_1_inst_2'::regclass \gset
SELECT :'inst_1_node_endpoint' = :'inst_2_node_endpoint';
?column?
---------------------------------------------------------------------
f
(1 row)

DROP TABLE shard_count_table_1_inst_1;
DROP TABLE shard_count_table_1_inst_2;
20 changes: 1 addition & 19 deletions src/test/regress/expected/multi_mx_create_table.out
@@ -540,12 +540,6 @@ SELECT shard_name, table_name, citus_table_type, shard_size FROM citus_shards OR
articles_hash_mx_1220104 | articles_hash_mx | distributed | 0
articles_hash_mx_1220105 | articles_hash_mx | distributed | 0
articles_single_shard_hash_mx_1220106 | articles_single_shard_hash_mx | distributed | 0
articles_single_shard_hash_mx_1220106 | articles_single_shard_hash_mx | distributed | 0
articles_single_shard_hash_mx_1220106 | articles_single_shard_hash_mx | distributed | 0
articles_single_shard_hash_mx_1220106 | articles_single_shard_hash_mx | distributed | 0
articles_single_shard_hash_mx_1220106 | articles_single_shard_hash_mx | distributed | 0
articles_single_shard_hash_mx_1220106 | articles_single_shard_hash_mx | distributed | 0
articles_single_shard_hash_mx_1220106 | articles_single_shard_hash_mx | distributed | 0
citus_mx_test_schema.nation_hash_1220016 | citus_mx_test_schema.nation_hash | distributed | 0
citus_mx_test_schema.nation_hash_1220016 | citus_mx_test_schema.nation_hash | distributed | 0
citus_mx_test_schema.nation_hash_1220016 | citus_mx_test_schema.nation_hash | distributed | 0
@@ -715,12 +709,6 @@ SELECT shard_name, table_name, citus_table_type, shard_size FROM citus_shards OR
customer_mx_1220084 | customer_mx | reference | 0
customer_mx_1220084 | customer_mx | reference | 0
labs_mx_1220102 | labs_mx | distributed | 8192
labs_mx_1220102 | labs_mx | distributed | 8192
labs_mx_1220102 | labs_mx | distributed | 8192
labs_mx_1220102 | labs_mx | distributed | 8192
labs_mx_1220102 | labs_mx | distributed | 8192
labs_mx_1220102 | labs_mx | distributed | 8192
labs_mx_1220102 | labs_mx | distributed | 8192
limit_orders_mx_1220092 | limit_orders_mx | distributed | 16384
limit_orders_mx_1220092 | limit_orders_mx | distributed | 16384
limit_orders_mx_1220092 | limit_orders_mx | distributed | 16384
@@ -890,12 +878,6 @@ SELECT shard_name, table_name, citus_table_type, shard_size FROM citus_shards OR
nation_mx_1220085 | nation_mx | reference | 0
nation_mx_1220085 | nation_mx | reference | 0
objects_mx_1220103 | objects_mx | distributed | 16384
objects_mx_1220103 | objects_mx | distributed | 16384
objects_mx_1220103 | objects_mx | distributed | 16384
objects_mx_1220103 | objects_mx | distributed | 16384
objects_mx_1220103 | objects_mx | distributed | 16384
objects_mx_1220103 | objects_mx | distributed | 16384
objects_mx_1220103 | objects_mx | distributed | 16384
orders_mx_1220068 | orders_mx | distributed | 8192
orders_mx_1220068 | orders_mx | distributed | 8192
orders_mx_1220068 | orders_mx | distributed | 8192
@@ -984,7 +966,7 @@ SELECT shard_name, table_name, citus_table_type, shard_size FROM citus_shards OR
supplier_mx_1220087 | supplier_mx | reference | 0
supplier_mx_1220087 | supplier_mx | reference | 0
supplier_mx_1220087 | supplier_mx | reference | 0
(469 rows)
(451 rows)

-- Show that altering type name is not supported from worker node
ALTER TYPE citus_mx_test_schema.order_side_mx RENAME TO temp_order_side_mx;
67 changes: 58 additions & 9 deletions src/test/regress/expected/multi_router_planner.out
@@ -774,15 +774,64 @@ SELECT a.author_id as first_author, b.word_count as second_word_count
FROM articles_hash a, articles_single_shard_hash b
WHERE a.author_id = 10 and a.author_id = b.author_id
ORDER BY 1,2 LIMIT 3;
DEBUG: Creating router plan
DEBUG: query has a single distribution column value: 10
first_author | second_word_count
Author commented:
@JelteF scenarios like this can change behavior because the single shard is now on a different node.

Contributor replied:
I don't think that's a big issue (although we should modify the test accordingly).

The reason I don't think it's a big issue is that it requires setting citus.enable_non_colocated_router_query_pushdown, which is already a not-recommended GUC because it basically requires that you never do rebalancing.

---------------------------------------------------------------------
10 | 19519
10 | 19519
10 | 19519
(3 rows)

DEBUG: found no worker with all shard placements
DEBUG: push down of limit count: 3
DEBUG: join prunable for task partitionId 0 and 1
DEBUG: join prunable for task partitionId 0 and 2
DEBUG: join prunable for task partitionId 0 and 3
DEBUG: join prunable for task partitionId 0 and 4
DEBUG: join prunable for task partitionId 0 and 5
DEBUG: join prunable for task partitionId 1 and 0
DEBUG: join prunable for task partitionId 1 and 2
DEBUG: join prunable for task partitionId 1 and 3
DEBUG: join prunable for task partitionId 1 and 4
DEBUG: join prunable for task partitionId 1 and 5
DEBUG: join prunable for task partitionId 2 and 0
DEBUG: join prunable for task partitionId 2 and 1
DEBUG: join prunable for task partitionId 2 and 3
DEBUG: join prunable for task partitionId 2 and 4
DEBUG: join prunable for task partitionId 2 and 5
DEBUG: join prunable for task partitionId 3 and 0
DEBUG: join prunable for task partitionId 3 and 1
DEBUG: join prunable for task partitionId 3 and 2
DEBUG: join prunable for task partitionId 3 and 4
DEBUG: join prunable for task partitionId 3 and 5
DEBUG: join prunable for task partitionId 4 and 0
DEBUG: join prunable for task partitionId 4 and 1
DEBUG: join prunable for task partitionId 4 and 2
DEBUG: join prunable for task partitionId 4 and 3
DEBUG: join prunable for task partitionId 4 and 5
DEBUG: join prunable for task partitionId 5 and 0
DEBUG: join prunable for task partitionId 5 and 1
DEBUG: join prunable for task partitionId 5 and 2
DEBUG: join prunable for task partitionId 5 and 3
DEBUG: join prunable for task partitionId 5 and 4
DEBUG: pruning merge fetch taskId 1
DETAIL: Creating dependency on merge taskId 2
DEBUG: pruning merge fetch taskId 2
DETAIL: Creating dependency on merge taskId 2
DEBUG: pruning merge fetch taskId 4
DETAIL: Creating dependency on merge taskId 4
DEBUG: pruning merge fetch taskId 5
DETAIL: Creating dependency on merge taskId 4
DEBUG: pruning merge fetch taskId 7
DETAIL: Creating dependency on merge taskId 6
DEBUG: pruning merge fetch taskId 8
DETAIL: Creating dependency on merge taskId 6
DEBUG: pruning merge fetch taskId 10
DETAIL: Creating dependency on merge taskId 8
DEBUG: pruning merge fetch taskId 11
DETAIL: Creating dependency on merge taskId 8
DEBUG: pruning merge fetch taskId 13
DETAIL: Creating dependency on merge taskId 10
DEBUG: pruning merge fetch taskId 14
DETAIL: Creating dependency on merge taskId 10
DEBUG: pruning merge fetch taskId 16
DETAIL: Creating dependency on merge taskId 12
DEBUG: pruning merge fetch taskId 17
DETAIL: Creating dependency on merge taskId 12
ERROR: the query contains a join that requires repartitioning
HINT: Set citus.enable_repartition_joins to on to enable repartitioning
SET citus.enable_non_colocated_router_query_pushdown TO OFF;
-- but this is not the case otherwise
SELECT a.author_id as first_author, b.word_count as second_word_count
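Following up on the review thread above: the new expected output shows that the non-colocated join now requires repartitioning, because the single shard no longer sits on the same worker as the matching articles_hash shard. A hedged sketch of how the same query could still be exercised after this change, assuming repartition joins are acceptable for the test (both GUCs already appear in this file):

```sql
-- Pushdown is no longer possible once the two shards are on different workers;
-- allowing repartition joins lets the query run instead of erroring with
-- "the query contains a join that requires repartitioning".
SET citus.enable_non_colocated_router_query_pushdown TO ON;
SET citus.enable_repartition_joins TO ON;

SELECT a.author_id AS first_author, b.word_count AS second_word_count
FROM articles_hash a, articles_single_shard_hash b
WHERE a.author_id = 10 AND a.author_id = b.author_id
ORDER BY 1, 2 LIMIT 3;
```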
24 changes: 24 additions & 0 deletions src/test/regress/sql/multi_create_table.sql
@@ -291,3 +291,27 @@ select create_reference_table('temp_table');
DROP TABLE temp_table;

DROP TABLE shard_count_table_3;

-- test shard count 1 placement with colocate none.
-- create a base table instance
CREATE TABLE shard_count_table_1_inst_1 (a int);
SELECT create_distributed_table('shard_count_table_1_inst_1', 'a', shard_count:=1, colocate_with:='none');

-- create another table with similar requirements
CREATE TABLE shard_count_table_1_inst_2 (a int);
SELECT create_distributed_table('shard_count_table_1_inst_2', 'a', shard_count:=1, colocate_with:='none');

-- Now check placement:
SELECT (SELECT colocation_id FROM citus_tables WHERE table_name = 'shard_count_table_1_inst_1'::regclass) != (SELECT colocation_id FROM citus_tables WHERE table_name = 'shard_count_table_1_inst_2'::regclass);

-- double check shard counts
SELECT (SELECT shard_count FROM citus_tables WHERE table_name = 'shard_count_table_1_inst_1'::regclass) = (SELECT shard_count FROM citus_tables WHERE table_name = 'shard_count_table_1_inst_2'::regclass);
SELECT shard_count = 1 FROM citus_tables WHERE table_name = 'shard_count_table_1_inst_1'::regclass;

-- check placement: These should be placed on different workers.
SELECT nodename || ':' || nodeport AS inst_1_node_endpoint FROM citus_shards WHERE table_name = 'shard_count_table_1_inst_1'::regclass \gset
SELECT nodename || ':' || nodeport AS inst_2_node_endpoint FROM citus_shards WHERE table_name = 'shard_count_table_1_inst_2'::regclass \gset
SELECT :'inst_1_node_endpoint' = :'inst_2_node_endpoint';

DROP TABLE shard_count_table_1_inst_1;
DROP TABLE shard_count_table_1_inst_2;