Make distributed single shard table use logic similar to SingleShardTableShard #7572

Draft: wants to merge 6 commits into base: main
15 changes: 10 additions & 5 deletions src/backend/distributed/commands/create_distributed_table.c
@@ -146,7 +146,9 @@ static void ConvertCitusLocalTableToTableType(Oid relationId,
DistributedTableParams *
distributedTableParams);
static void CreateHashDistributedTableShards(Oid relationId, int shardCount,
Oid colocatedTableId, bool localTableEmpty);
Oid colocatedTableId,
bool localTableEmpty,
uint32 colocationId);
static void CreateSingleShardTableShard(Oid relationId, Oid colocatedTableId,
uint32 colocationId);
static uint32 ColocationIdForNewTable(Oid relationId, CitusTableType tableType,
@@ -1288,9 +1290,11 @@ CreateCitusTable(Oid relationId, CitusTableType tableType,
if (tableType == HASH_DISTRIBUTED)
{
/* create shards for hash distributed table */
CreateHashDistributedTableShards(relationId, distributedTableParams->shardCount,
CreateHashDistributedTableShards(relationId,
distributedTableParams->shardCount,
colocatedTableId,
localTableEmpty);
localTableEmpty,
colocationId);
}
else if (tableType == REFERENCE_TABLE)
{
@@ -1878,7 +1882,8 @@ DecideDistTableReplicationModel(char distributionMethod, char *colocateWithTable
*/
static void
CreateHashDistributedTableShards(Oid relationId, int shardCount,
Oid colocatedTableId, bool localTableEmpty)
Oid colocatedTableId, bool localTableEmpty,
uint32 colocationId)
{
bool useExclusiveConnection = false;

@@ -1917,7 +1922,7 @@ CreateHashDistributedTableShards(Oid relationId, int shardCount,
* we can directly use ShardReplicationFactor global variable here.
*/
CreateShardsWithRoundRobinPolicy(relationId, shardCount, ShardReplicationFactor,
useExclusiveConnection);
useExclusiveConnection, colocationId);
}
}

18 changes: 16 additions & 2 deletions src/backend/distributed/operations/create_shards.c
@@ -54,6 +54,9 @@
#include "distributed/transaction_management.h"
#include "distributed/worker_manager.h"

/* Config variables managed via guc */
bool EnableSingleShardTableMultiNodePlacement = false;


/* declarations for dynamic loading */
PG_FUNCTION_INFO_V1(master_create_worker_shards);
@@ -81,7 +84,8 @@
*/
void
CreateShardsWithRoundRobinPolicy(Oid distributedTableId, int32 shardCount,
int32 replicationFactor, bool useExclusiveConnections)
int32 replicationFactor, bool useExclusiveConnections,
uint32 colocationId)
{
CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(distributedTableId);
List *insertedShardPlacements = NIL;
@@ -163,9 +167,19 @@ CreateShardsWithRoundRobinPolicy(Oid distributedTableId, int32 shardCount,
/* set shard storage type according to relation type */
char shardStorageType = ShardStorageType(distributedTableId);

int64 shardOffset = 0;
if (EnableSingleShardTableMultiNodePlacement && shardCount == 1 &&
shardStorageType == SHARD_STORAGE_TABLE)
{
/* For single shard distributed tables, use the colocationId to offset
* where the shard is placed.
*/
shardOffset = colocationId;
}

for (int64 shardIndex = 0; shardIndex < shardCount; shardIndex++)
{
uint32 roundRobinNodeIndex = shardIndex % workerNodeCount;
uint32 roundRobinNodeIndex = (shardIndex + shardOffset) % workerNodeCount;

/* initialize the hash token space for this shard */
int32 shardMinHashToken = PG_INT32_MIN + (shardIndex * hashTokenIncrement);
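For illustration, a small standalone sketch (not part of the patch; the 3-worker cluster and the colocation ids below are made-up assumptions) of how the colocation-id offset in the hunk above changes which worker the single shard of a 1-shard table lands on:

#include <stdio.h>
#include <stdint.h>

int
main(void)
{
	/* assumptions for this sketch: 3 workers, three 1-shard tables */
	int64_t workerNodeCount = 3;
	uint32_t colocationIds[] = { 7, 8, 9 };

	for (int i = 0; i < 3; i++)
	{
		int64_t shardIndex = 0;                 /* the only shard of a 1-shard table */
		int64_t shardOffset = colocationIds[i]; /* stays 0 when the GUC is off */
		uint32_t roundRobinNodeIndex =
			(uint32_t) ((shardIndex + shardOffset) % workerNodeCount);

		/* with the GUC off, every table would map to worker index 0 */
		printf("colocation id %u -> worker index %u\n",
		       colocationIds[i], roundRobinNodeIndex);
	}

	return 0;
}

With the offset disabled, all single-shard tables pile onto the first worker in the round-robin order; adding the colocation id spreads otherwise non-colocated single-shard tables across the cluster.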
12 changes: 12 additions & 0 deletions src/backend/distributed/shared_library_init.c
@@ -86,6 +86,7 @@
#include "distributed/multi_router_planner.h"
#include "distributed/multi_server_executor.h"
#include "distributed/pg_dist_partition.h"
#include "distributed/pg_dist_shard.h"
#include "distributed/placement_connection.h"
#include "distributed/priority.h"
#include "distributed/query_pushdown_planning.h"
@@ -1462,6 +1463,17 @@ RegisterCitusConfigVariables(void)
GUC_NO_SHOW_ALL | GUC_NOT_IN_SAMPLE,
NULL, NULL, NULL);

DefineCustomBoolVariable(
"citus.enable_single_shard_table_multi_node_placement",
gettext_noop("Enables placement of single shard distributed tables in"
" all nodes of the cluster"),
NULL,
&EnableSingleShardTableMultiNodePlacement,
false,
Contributor:
Since we want to backport this, I think it indeed makes sense for this to be false by default. But let's create a PR right after merging this one to change the default to true for future releases.

PGC_USERSET,
GUC_NO_SHOW_ALL | GUC_NOT_IN_SAMPLE,
NULL, NULL, NULL);

DefineCustomBoolVariable(
"citus.enable_statistics_collection",
gettext_noop("Enables sending basic usage statistics to Citus."),
3 changes: 2 additions & 1 deletion src/include/distributed/coordinator_protocol.h
@@ -259,7 +259,8 @@ extern void InsertShardPlacementRows(Oid relationId, int64 shardId,
extern uint64 UpdateShardStatistics(int64 shardId);
extern void CreateShardsWithRoundRobinPolicy(Oid distributedTableId, int32 shardCount,
int32 replicationFactor,
bool useExclusiveConnections);
bool useExclusiveConnections,
uint32 colocationId);
extern void CreateColocatedShards(Oid targetRelationId, Oid sourceRelationId,
bool useExclusiveConnections);
extern void CreateReferenceTableShard(Oid distributedTableId);
1 change: 1 addition & 0 deletions src/include/distributed/pg_dist_shard.h
@@ -59,5 +59,6 @@ typedef FormData_pg_dist_shard *Form_pg_dist_shard;
#define SHARD_STORAGE_TABLE 't'
#define SHARD_STORAGE_VIRTUAL 'v'

extern bool EnableSingleShardTableMultiNodePlacement;

#endif /* PG_DIST_SHARD_H */
49 changes: 49 additions & 0 deletions src/test/regress/expected/multi_create_table.out
@@ -466,3 +466,52 @@ select create_reference_table('temp_table');
ERROR: cannot distribute a temporary table
DROP TABLE temp_table;
DROP TABLE shard_count_table_3;
-- test shard count 1 placement with colocate none.
-- create a base table instance
set citus.enable_single_shard_table_multi_node_placement to on;
CREATE TABLE shard_count_table_1_inst_1 (a int);
SELECT create_distributed_table('shard_count_table_1_inst_1', 'a', shard_count:=1, colocate_with:='none');
create_distributed_table
---------------------------------------------------------------------

(1 row)

-- create another table with similar requirements
CREATE TABLE shard_count_table_1_inst_2 (a int);
SELECT create_distributed_table('shard_count_table_1_inst_2', 'a', shard_count:=1, colocate_with:='none');
create_distributed_table
---------------------------------------------------------------------

(1 row)

-- Now check placement:
SELECT (SELECT colocation_id FROM citus_tables WHERE table_name = 'shard_count_table_1_inst_1'::regclass) != (SELECT colocation_id FROM citus_tables WHERE table_name = 'shard_count_table_1_inst_2'::regclass);
Contributor:
This is not testing the intended thing, as far as I can tell. You want to test that the placement of the shard is different, not that the colocation id is different. The colocation id would also be different if citus.enable_single_shard_table_multi_node_placement were set to off.

The placement of the shards can easily be checked in pg_dist_shard_placement or citus_shards.

?column?
---------------------------------------------------------------------
t
(1 row)

-- double check shard counts
SELECT (SELECT shard_count FROM citus_tables WHERE table_name = 'shard_count_table_1_inst_1'::regclass) = (SELECT shard_count FROM citus_tables WHERE table_name = 'shard_count_table_1_inst_2'::regclass);
?column?
---------------------------------------------------------------------
t
(1 row)

SELECT shard_count = 1 FROM citus_tables WHERE table_name = 'shard_count_table_1_inst_1'::regclass;
?column?
---------------------------------------------------------------------
t
(1 row)

-- check placement: These should be placed on different workers.
SELECT nodename || ':' || nodeport AS inst_1_node_endpoint FROM citus_shards WHERE table_name = 'shard_count_table_1_inst_1'::regclass \gset
SELECT nodename || ':' || nodeport AS inst_2_node_endpoint FROM citus_shards WHERE table_name = 'shard_count_table_1_inst_2'::regclass \gset
SELECT :'inst_1_node_endpoint', :'inst_2_node_endpoint', :'inst_1_node_endpoint' = :'inst_2_node_endpoint';
?column? | ?column? | ?column?
---------------------------------------------------------------------
localhost:xxxxx | localhost:xxxxx | f
(1 row)

DROP TABLE shard_count_table_1_inst_1;
DROP TABLE shard_count_table_1_inst_2;
39 changes: 35 additions & 4 deletions src/test/regress/expected/multi_mx_create_table.out
@@ -416,6 +416,23 @@ SELECT create_distributed_table('company_employees_mx', 'company_id');

(1 row)

CREATE TABLE articles_single_shard_hash_mx_partition_inst1 (LIKE articles_single_shard_hash_mx);
CREATE TABLE articles_single_shard_hash_mx_partition_inst2 (LIKE articles_single_shard_hash_mx);
SET citus.shard_count TO 1;
SET citus.enable_single_shard_table_multi_node_placement to on;
SELECT create_distributed_table('articles_single_shard_hash_mx_partition_inst1', 'author_id', colocate_with => 'none');
create_distributed_table
---------------------------------------------------------------------

(1 row)

SELECT create_distributed_table('articles_single_shard_hash_mx_partition_inst2', 'author_id', colocate_with => 'none');
create_distributed_table
---------------------------------------------------------------------

(1 row)

set citus.enable_single_shard_table_multi_node_placement to off;
WITH shard_counts AS (
SELECT logicalrelid, count(*) AS shard_count FROM pg_dist_shard GROUP BY logicalrelid
)
@@ -447,7 +464,9 @@
labs_mx | 1220007 | 1 | h | s
objects_mx | 1220007 | 1 | h | s
articles_single_shard_hash_mx | 1220007 | 1 | h | s
(23 rows)
articles_single_shard_hash_mx_partition_inst1 | 1220008 | 1 | h | s
articles_single_shard_hash_mx_partition_inst2 | 1220009 | 1 | h | s
(25 rows)

-- check the citus_tables view
SELECT table_name, citus_table_type, distribution_column, shard_count, table_owner
@@ -458,6 +477,8 @@ ORDER BY table_name::text;
app_analytics_events_mx | distributed | app_id | 4 | postgres
articles_hash_mx | distributed | author_id | 2 | postgres
articles_single_shard_hash_mx | distributed | author_id | 1 | postgres
articles_single_shard_hash_mx_partition_inst1 | distributed | author_id | 1 | postgres
articles_single_shard_hash_mx_partition_inst2 | distributed | author_id | 1 | postgres
citus_mx_test_schema.nation_hash | distributed | n_nationkey | 16 | postgres
citus_mx_test_schema.nation_hash_collation_search_path | distributed | n_nationkey | 4 | postgres
citus_mx_test_schema.nation_hash_composite_types | distributed | n_nationkey | 4 | postgres
@@ -478,7 +499,7 @@ ORDER BY table_name::text;
part_mx | reference | <none> | 1 | postgres
researchers_mx | distributed | lab_id | 2 | postgres
supplier_mx | reference | <none> | 1 | postgres
(23 rows)
(25 rows)

\c - - - :worker_1_port
SET client_min_messages TO WARNING;
Expand All @@ -490,6 +511,8 @@ ORDER BY table_name::text;
app_analytics_events_mx | distributed | app_id | 4 | postgres
articles_hash_mx | distributed | author_id | 2 | postgres
articles_single_shard_hash_mx | distributed | author_id | 1 | postgres
articles_single_shard_hash_mx_partition_inst1 | distributed | author_id | 1 | postgres
articles_single_shard_hash_mx_partition_inst2 | distributed | author_id | 1 | postgres
citus_mx_test_schema.nation_hash | distributed | n_nationkey | 16 | postgres
citus_mx_test_schema.nation_hash_collation_search_path | distributed | n_nationkey | 4 | postgres
citus_mx_test_schema.nation_hash_composite_types | distributed | n_nationkey | 4 | postgres
@@ -510,7 +533,7 @@ ORDER BY table_name::text;
part_mx | reference | <none> | 1 | postgres
researchers_mx | distributed | lab_id | 2 | postgres
supplier_mx | reference | <none> | 1 | postgres
(23 rows)
(25 rows)

SELECT shard_name, table_name, citus_table_type, shard_size FROM citus_shards ORDER BY shard_name::text;
shard_name | table_name | citus_table_type | shard_size
@@ -546,6 +569,14 @@ SELECT shard_name, table_name, citus_table_type, shard_size FROM citus_shards OR
articles_single_shard_hash_mx_1220106 | articles_single_shard_hash_mx | distributed | 0
articles_single_shard_hash_mx_1220106 | articles_single_shard_hash_mx | distributed | 0
articles_single_shard_hash_mx_1220106 | articles_single_shard_hash_mx | distributed | 0
articles_single_shard_hash_mx_partition_inst1_1220111 | articles_single_shard_hash_mx_partition_inst1 | distributed | 0
articles_single_shard_hash_mx_partition_inst1_1220111 | articles_single_shard_hash_mx_partition_inst1 | distributed | 0
articles_single_shard_hash_mx_partition_inst1_1220111 | articles_single_shard_hash_mx_partition_inst1 | distributed | 0
articles_single_shard_hash_mx_partition_inst1_1220111 | articles_single_shard_hash_mx_partition_inst1 | distributed | 0
articles_single_shard_hash_mx_partition_inst1_1220111 | articles_single_shard_hash_mx_partition_inst1 | distributed | 0
articles_single_shard_hash_mx_partition_inst1_1220111 | articles_single_shard_hash_mx_partition_inst1 | distributed | 0
articles_single_shard_hash_mx_partition_inst1_1220111 | articles_single_shard_hash_mx_partition_inst1 | distributed | 0
articles_single_shard_hash_mx_partition_inst2_1220112 | articles_single_shard_hash_mx_partition_inst2 | distributed | 0
citus_mx_test_schema.nation_hash_1220016 | citus_mx_test_schema.nation_hash | distributed | 0
citus_mx_test_schema.nation_hash_1220016 | citus_mx_test_schema.nation_hash | distributed | 0
citus_mx_test_schema.nation_hash_1220016 | citus_mx_test_schema.nation_hash | distributed | 0
@@ -984,7 +1015,7 @@ SELECT shard_name, table_name, citus_table_type, shard_size FROM citus_shards OR
supplier_mx_1220087 | supplier_mx | reference | 0
supplier_mx_1220087 | supplier_mx | reference | 0
supplier_mx_1220087 | supplier_mx | reference | 0
(469 rows)
(477 rows)

-- Show that altering type name is not supported from worker node
ALTER TYPE citus_mx_test_schema.order_side_mx RENAME TO temp_order_side_mx;
Expand Down
67 changes: 58 additions & 9 deletions src/test/regress/expected/multi_router_planner.out
@@ -774,15 +774,64 @@ SELECT a.author_id as first_author, b.word_count as second_word_count
FROM articles_hash a, articles_single_shard_hash b
WHERE a.author_id = 10 and a.author_id = b.author_id
ORDER BY 1,2 LIMIT 3;
DEBUG: Creating router plan
DEBUG: query has a single distribution column value: 10
first_author | second_word_count
Author:
@JelteF scenarios like this can change behavior because the single shard is now on a different node

Contributor:
I don't think that's a big issue (although we should modify the test accordingly).

The reason I don't think it's a big issue is that it requires setting citus.enable_non_colocated_router_query_pushdown, which is already a not-recommended GUC because it basically requires that you never do rebalancing.

---------------------------------------------------------------------
10 | 19519
10 | 19519
10 | 19519
(3 rows)

DEBUG: found no worker with all shard placements
DEBUG: push down of limit count: 3
DEBUG: join prunable for task partitionId 0 and 1
DEBUG: join prunable for task partitionId 0 and 2
DEBUG: join prunable for task partitionId 0 and 3
DEBUG: join prunable for task partitionId 0 and 4
DEBUG: join prunable for task partitionId 0 and 5
DEBUG: join prunable for task partitionId 1 and 0
DEBUG: join prunable for task partitionId 1 and 2
DEBUG: join prunable for task partitionId 1 and 3
DEBUG: join prunable for task partitionId 1 and 4
DEBUG: join prunable for task partitionId 1 and 5
DEBUG: join prunable for task partitionId 2 and 0
DEBUG: join prunable for task partitionId 2 and 1
DEBUG: join prunable for task partitionId 2 and 3
DEBUG: join prunable for task partitionId 2 and 4
DEBUG: join prunable for task partitionId 2 and 5
DEBUG: join prunable for task partitionId 3 and 0
DEBUG: join prunable for task partitionId 3 and 1
DEBUG: join prunable for task partitionId 3 and 2
DEBUG: join prunable for task partitionId 3 and 4
DEBUG: join prunable for task partitionId 3 and 5
DEBUG: join prunable for task partitionId 4 and 0
DEBUG: join prunable for task partitionId 4 and 1
DEBUG: join prunable for task partitionId 4 and 2
DEBUG: join prunable for task partitionId 4 and 3
DEBUG: join prunable for task partitionId 4 and 5
DEBUG: join prunable for task partitionId 5 and 0
DEBUG: join prunable for task partitionId 5 and 1
DEBUG: join prunable for task partitionId 5 and 2
DEBUG: join prunable for task partitionId 5 and 3
DEBUG: join prunable for task partitionId 5 and 4
DEBUG: pruning merge fetch taskId 1
DETAIL: Creating dependency on merge taskId 2
DEBUG: pruning merge fetch taskId 2
DETAIL: Creating dependency on merge taskId 2
DEBUG: pruning merge fetch taskId 4
DETAIL: Creating dependency on merge taskId 4
DEBUG: pruning merge fetch taskId 5
DETAIL: Creating dependency on merge taskId 4
DEBUG: pruning merge fetch taskId 7
DETAIL: Creating dependency on merge taskId 6
DEBUG: pruning merge fetch taskId 8
DETAIL: Creating dependency on merge taskId 6
DEBUG: pruning merge fetch taskId 10
DETAIL: Creating dependency on merge taskId 8
DEBUG: pruning merge fetch taskId 11
DETAIL: Creating dependency on merge taskId 8
DEBUG: pruning merge fetch taskId 13
DETAIL: Creating dependency on merge taskId 10
DEBUG: pruning merge fetch taskId 14
DETAIL: Creating dependency on merge taskId 10
DEBUG: pruning merge fetch taskId 16
DETAIL: Creating dependency on merge taskId 12
DEBUG: pruning merge fetch taskId 17
DETAIL: Creating dependency on merge taskId 12
ERROR: the query contains a join that requires repartitioning
HINT: Set citus.enable_repartition_joins to on to enable repartitioning
SET citus.enable_non_colocated_router_query_pushdown TO OFF;
-- but this is not the case otherwise
SELECT a.author_id as first_author, b.word_count as second_word_count