diff --git a/src/backend/distributed/operations/repair_shards.c b/src/backend/distributed/operations/repair_shards.c
index 26928fd3af2..8fbea8127b2 100644
--- a/src/backend/distributed/operations/repair_shards.c
+++ b/src/backend/distributed/operations/repair_shards.c
@@ -20,6 +20,7 @@
 #include "access/htup_details.h"
 #include "catalog/pg_class.h"
 #include "catalog/pg_enum.h"
+#include "distributed/adaptive_executor.h"
 #include "distributed/citus_ruleutils.h"
 #include "distributed/colocation_utils.h"
 #include "distributed/commands.h"
@@ -38,6 +39,7 @@
 #include "distributed/remote_commands.h"
 #include "distributed/resource_lock.h"
 #include "distributed/shard_rebalancer.h"
+#include "distributed/shard_split.h"
 #include "distributed/worker_manager.h"
 #include "distributed/worker_protocol.h"
 #include "distributed/worker_transaction.h"
@@ -1180,6 +1182,9 @@ CopyShardTablesViaBlockWrites(List *shardIntervalList, char *sourceNodeName,
                                               ALLOCSET_DEFAULT_SIZES);
     MemoryContext oldContext = MemoryContextSwitchTo(localContext);
 
+    WorkerNode *sourceNode = FindWorkerNode(sourceNodeName, sourceNodePort);
+    WorkerNode *targetNode = FindWorkerNode(targetNodeName, targetNodePort);
+
     /* iterate through the colocated shards and copy each */
     ShardInterval *shardInterval = NULL;
     foreach_ptr(shardInterval, shardIntervalList)
@@ -1199,9 +1204,12 @@ CopyShardTablesViaBlockWrites(List *shardIntervalList, char *sourceNodeName,
         char *tableOwner = TableOwner(shardInterval->relationId);
         SendCommandListToWorkerOutsideTransaction(targetNodeName, targetNodePort,
                                                   tableOwner, ddlCommandList);
+    }
 
-        ddlCommandList = NIL;
-
+    int taskId = 0;
+    List *copyTaskList = NIL;
+    foreach_ptr(shardInterval, shardIntervalList)
+    {
         /*
         * Skip copying data for partitioned tables, because they contain no
         * data themselves. Their partitions do contain data, but those are
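Note on the change above: a blocking shard move no longer replays data through worker_append_table_to_shard. The coordinator instead builds one worker_split_copy() command per colocated shard, wraps each in a task placed on the source node, and pushes the whole list through the adaptive executor, so the shards of a colocation group copy in parallel (bounded by MaxAdaptiveExecutorPoolSize). For a move, CreateSplitCopyCommand degenerates to a single "split child" that is the shard itself over its full range. A sketch of the generated command, with illustrative shard and node ids (the argument shape follows the doc comment above CreateSplitCopyCommand in shard_split.c, visible in the next file):

    SELECT * FROM worker_split_copy(
        102008,                    -- source shard id; the task runs on the source node
        ARRAY[ROW(102008,          -- destination shard id (same shard, new node)
                  '-2147483648',   -- range begin: the shard's own min value
                  '2147483647',    -- range end: the shard's own max value
                  18               -- destination node id
                 )::pg_catalog.split_copy_info]
    );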
@@ -1209,13 +1217,37 @@ CopyShardTablesViaBlockWrites(List *shardIntervalList, char *sourceNodeName,
         */
         if (!PartitionedTable(shardInterval->relationId))
         {
-            ddlCommandList = CopyShardContentsCommandList(shardInterval, sourceNodeName,
-                                                          sourceNodePort);
+            StringInfo splitCopyUdfCommand = CreateSplitCopyCommand(
+                shardInterval,
+                list_make1(shardInterval),
+                list_make1(targetNode));
+
+            Task *copyTask = CreateBasicTask(
+                INVALID_JOB_ID,
+                taskId,
+                READ_TASK,
+                splitCopyUdfCommand->data);
+
+            ShardPlacement *taskPlacement = CitusMakeNode(ShardPlacement);
+            SetPlacementNodeMetadata(taskPlacement, sourceNode);
+
+            copyTask->taskPlacementList = list_make1(taskPlacement);
+
+            copyTaskList = lappend(copyTaskList, copyTask);
+            taskId++;
         }
-
-        ddlCommandList = list_concat(
-            ddlCommandList,
+    }
+
+    ExecuteTaskListOutsideTransaction(ROW_MODIFY_NONE, copyTaskList,
+                                      MaxAdaptiveExecutorPoolSize,
+                                      NULL /* jobIdList (ignored by API implementation) */);
+
+    foreach_ptr(shardInterval, shardIntervalList)
+    {
+        List *ddlCommandList =
             PostLoadShardCreationCommandList(shardInterval, sourceNodeName,
-                                             sourceNodePort));
+                                             sourceNodePort);
+
+        char *tableOwner = TableOwner(shardInterval->relationId);
         SendCommandListToWorkerOutsideTransaction(targetNodeName, targetNodePort,
                                                   tableOwner, ddlCommandList);
diff --git a/src/backend/distributed/operations/shard_split.c b/src/backend/distributed/operations/shard_split.c
index 76687434ee6..9b44f1e3439 100644
--- a/src/backend/distributed/operations/shard_split.c
+++ b/src/backend/distributed/operations/shard_split.c
@@ -78,9 +78,6 @@ static void DoSplitCopy(WorkerNode *sourceShardNode,
                         List *sourceColocatedShardIntervalList,
                         List *shardGroupSplitIntervalListList,
                         List *workersForPlacementList);
-static StringInfo CreateSplitCopyCommand(ShardInterval *sourceShardSplitInterval,
-                                         List *splitChildrenShardIntervalList,
-                                         List *workersForPlacementList);
 static void InsertSplitChildrenShardMetadata(List *shardGroupSplitIntervalListList,
                                              List *workersForPlacementList);
 static void CreatePartitioningHierarchy(List *shardGroupSplitIntervalListList,
@@ -760,7 +757,7 @@ DoSplitCopy(WorkerNode *sourceShardNode, List *sourceColocatedShardIntervalList,
 *     ]
 * );
 */
-static StringInfo
+StringInfo
 CreateSplitCopyCommand(ShardInterval *sourceShardSplitInterval,
                        List *splitChildrenShardIntervalList,
                        List *destinationWorkerNodesList)
diff --git a/src/backend/distributed/operations/worker_split_copy_udf.c b/src/backend/distributed/operations/worker_split_copy_udf.c
index 2b33654f977..721fac60b34 100644
--- a/src/backend/distributed/operations/worker_split_copy_udf.c
+++ b/src/backend/distributed/operations/worker_split_copy_udf.c
@@ -80,9 +80,18 @@ worker_split_copy(PG_FUNCTION_ARGS)
     }
 
     EState *executor = CreateExecutorState();
-    DestReceiver *splitCopyDestReceiver = CreatePartitionedSplitCopyDestReceiver(executor,
-                                                                                 shardIntervalToSplitCopy,
-                                                                                 splitCopyInfoList);
+    DestReceiver *destReceiver = NULL;
+    if (list_length(splitCopyInfoList) == 1)
+    {
+        destReceiver = *CreateShardCopyDestReceivers(executor, shardIntervalToSplitCopy,
+                                                     splitCopyInfoList);
+    }
+    else
+    {
+        destReceiver = CreatePartitionedSplitCopyDestReceiver(executor,
+                                                              shardIntervalToSplitCopy,
+                                                              splitCopyInfoList);
+    }
 
     Oid sourceShardToCopySchemaOId = get_rel_namespace(
         shardIntervalToSplitCopy->relationId);
@@ -99,7 +108,7 @@ worker_split_copy(PG_FUNCTION_ARGS)
 
     ParamListInfo params = NULL;
     ExecuteQueryStringIntoDestReceiver(selectShardQueryForCopy->data, params,
-                                       (DestReceiver *) splitCopyDestReceiver);
+                                       destReceiver);
 
     FreeExecutorState(executor);
@@ -244,6 +253,10 @@ CreatePartitionedSplitCopyDestReceiver(EState *estate,
         shardIntervalToSplitCopy->relationId);
     char partitionMethod = cacheEntry->partitionMethod;
     Var *partitionColumn = cacheEntry->partitionColumn;
+    if (partitionColumn == NULL)
+    {
+        ereport(ERROR, (errmsg("it's not possible to split reference tables")));
+    }
 
     CitusTableCacheEntry *shardSearchInfo =
         QueryTupleShardSearchInfo(minValuesArray, maxValuesArray,
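worker_split_copy() now distinguishes its two callers: with exactly one split_copy_info entry (the shard-move case above) it uses the plain shard-copy DestReceiver and skips per-tuple partition-column evaluation and range routing; with several entries (an actual split) it keeps the partitioned receiver, which now errors out early for reference tables because they have no partition column to route on. For contrast, a hypothetical two-way split that still takes the partitioned path (shard ids, ranges, and node ids are illustrative):

    SELECT * FROM worker_split_copy(
        102008,
        ARRAY[ROW(102009, '-2147483648', '-1', 18)::pg_catalog.split_copy_info,
              ROW(102010, '0', '2147483647', 22)::pg_catalog.split_copy_info]
    );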
diff --git a/src/backend/distributed/utils/reference_table_utils.c b/src/backend/distributed/utils/reference_table_utils.c
index e271d0ceba1..865029cd856 100644
--- a/src/backend/distributed/utils/reference_table_utils.c
+++ b/src/backend/distributed/utils/reference_table_utils.c
@@ -207,6 +207,13 @@ EnsureReferenceTablesExistOnAllNodesExtended(char transferMode)
                 CopyShardPlacementToWorkerNodeQuery(sourceShardPlacement,
                                                     newWorkerNode,
                                                     transferMode);
+
+            /*
+             * The placement copy command uses distributed execution to copy
+             * the shard; this is fine here, so we temporarily allow it.
+             */
+            ExecuteCriticalRemoteCommand(connection,
+                                         "SET LOCAL citus.allow_nested_distributed_execution = true");
             ExecuteCriticalRemoteCommand(connection, placementCopyCommand->data);
             RemoteTransactionCommit(connection);
         }
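The SET LOCAL above is needed because the placement copy command itself now starts a distributed execution (the worker_split_copy task) from inside a command that was already issued remotely, which Citus would otherwise reject as nested distributed execution. Being SET LOCAL, the override dies with the remote transaction. Roughly, the session on the new worker node looks like the following; the exact command text comes from CopyShardPlacementToWorkerNodeQuery, and the values here are illustrative:

    BEGIN;
    SET LOCAL citus.allow_nested_distributed_execution = true;
    SELECT master_copy_shard_placement(102008,
                                       'localhost', 5432,   -- source placement
                                       'localhost', 5433,   -- node missing the reference table
                                       do_repair := false,
                                       transfer_mode := 'block_writes');
    COMMIT;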
diff --git a/src/include/distributed/shard_split.h b/src/include/distributed/shard_split.h
index 790e3d6126e..75a14a33521 100644
--- a/src/include/distributed/shard_split.h
+++ b/src/include/distributed/shard_split.h
@@ -37,6 +37,9 @@ extern void SplitShard(SplitMode splitMode,
                        uint64 shardIdToSplit,
                        List *shardSplitPointsList,
                        List *nodeIdsForPlacementList);
+extern StringInfo CreateSplitCopyCommand(ShardInterval *sourceShardSplitInterval,
+                                         List *splitChildrenShardIntervalList,
+                                         List *destinationWorkerNodesList);
 
 /* TODO(niupre): Make all these APIs private when all consumers (Example : ISOLATE_TENANT_TO_NEW_SHARD) directly call 'SplitShard' API. */
 extern void ErrorIfCannotSplitShard(SplitOperation splitOperation,
diff --git a/src/test/regress/expected/failure_offline_move_shard_placement.out b/src/test/regress/expected/failure_offline_move_shard_placement.out
index a6ecee18e59..bdd45449bdc 100644
--- a/src/test/regress/expected/failure_offline_move_shard_placement.out
+++ b/src/test/regress/expected/failure_offline_move_shard_placement.out
@@ -91,8 +91,8 @@ SELECT citus.mitmproxy('conn.onQuery(query="CREATE TABLE move_shard_offline.t").cancel(' || :pid || ')');
 SELECT master_move_shard_placement(201, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port, 'block_writes');
 ERROR:  canceling statement due to user request
--- failure on blocking append_table_to_shard operation on target node
-SELECT citus.mitmproxy('conn.onQuery(query="worker_append_table_to_shard").kill()');
+-- failure on blocking COPY operation on target node
+SELECT citus.mitmproxy('conn.onQuery(query="COPY").kill()');
  mitmproxy
---------------------------------------------------------------------
 
 (1 row)
 
@@ -101,8 +101,9 @@ SELECT master_move_shard_placement(201, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port, 'block_writes');
 ERROR:  connection not open
 CONTEXT:  while executing command on localhost:xxxxx
--- cancellation on blocking append_table_to_shard operation on target node
-SELECT citus.mitmproxy('conn.onQuery(query="worker_append_table_to_shard").cancel(' || :pid || ')');
+while executing command on localhost:xxxxx
+-- cancellation on blocking COPY operation on target node
+SELECT citus.mitmproxy('conn.onQuery(query="COPY").cancel(' || :pid || ')');
  mitmproxy
---------------------------------------------------------------------
 
 (1 row)
 
@@ -129,25 +130,6 @@ SELECT citus.mitmproxy('conn.onQuery(query="ADD CONSTRAINT").cancel(' || :pid || ')');
 SELECT master_move_shard_placement(201, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port, 'block_writes');
 ERROR:  canceling statement due to user request
--- failure on CopyData operation on source node
-SELECT citus.mitmproxy('conn.onCopyData().kill()');
- mitmproxy
----------------------------------------------------------------------
-
-(1 row)
-
-SELECT master_move_shard_placement(200, 'localhost', :worker_2_proxy_port, 'localhost', :worker_1_port, 'block_writes');
-ERROR:  could not copy table "t_200" from "localhost:xxxxx"
-CONTEXT:  while executing command on localhost:xxxxx
--- cancellation on CopyData operation on source node
-SELECT citus.mitmproxy('conn.onCopyData().cancel(' || :pid || ')');
- mitmproxy
----------------------------------------------------------------------
-
-(1 row)
-
-SELECT master_move_shard_placement(200, 'localhost', :worker_2_proxy_port, 'localhost', :worker_1_port, 'block_writes');
-ERROR:  canceling statement due to user request
 CALL citus_cleanup_orphaned_shards();
 -- Verify that the shard is not moved and the number of rows are still 100k
 SELECT citus.mitmproxy('conn.allow()');
diff --git a/src/test/regress/expected/ignoring_orphaned_shards.out b/src/test/regress/expected/ignoring_orphaned_shards.out
index 2c12a5ae4e8..64225500f03 100644
--- a/src/test/regress/expected/ignoring_orphaned_shards.out
+++ b/src/test/regress/expected/ignoring_orphaned_shards.out
@@ -298,70 +298,40 @@ NOTICE:  issuing ROLLBACK
 DETAIL:  on server postgres@localhost:xxxxx connectionId: xxxxxxx
 NOTICE:  issuing ROLLBACK
 DETAIL:  on server postgres@localhost:xxxxx connectionId: xxxxxxx
-SET citus.shard_replication_factor TO 1;
-SET citus.next_shard_id TO 92448500;
-CREATE TABLE range1(id int);
-SELECT create_distributed_table('range1', 'id', 'range');
- create_distributed_table
---------------------------------------------------------------------
-
-(1 row)
-
-CALL public.create_range_partitioned_shards('range1', '{0,3}','{2,5}');
--- Move shard placement and clean it up
-SELECT citus_move_shard_placement(92448500, 'localhost', :worker_2_port, 'localhost', :worker_1_port, 'block_writes');
- citus_move_shard_placement
---------------------------------------------------------------------
-
-(1 row)
-
-CALL citus_cleanup_orphaned_shards();
-NOTICE:  cleaned up 3 orphaned shards
-SELECT shardid, shardstate, nodeport FROM pg_dist_shard_placement WHERE shardid = 92448300 ORDER BY placementid;
- shardid | shardstate | nodeport
---------------------------------------------------------------------
- 92448300 |          1 |    57638
- 92448300 |          1 |    57637
-(2 rows)
-
-SET citus.next_shard_id TO 92448600;
-CREATE TABLE range2(id int);
-SELECT create_distributed_table('range2', 'id', 'range');
- create_distributed_table
---------------------------------------------------------------------
-
-(1 row)
-
-CALL public.create_range_partitioned_shards('range2', '{0,3}','{2,5}');
--- Mark tables co-located
-UPDATE pg_dist_partition SET colocationid = 30001
-WHERE logicalrelid = 'range1'::regclass OR logicalrelid = 'range2'::regclass;
--- Move shard placement and DON'T clean it up, now range1 and range2 are
--- colocated, but only range2 has an orphaned shard.
-SELECT citus_move_shard_placement(92448600, 'localhost', :worker_2_port, 'localhost', :worker_1_port, 'block_writes');
- citus_move_shard_placement
---------------------------------------------------------------------
-
-(1 row)
-
-SELECT shardid, shardstate, nodeport FROM pg_dist_shard_placement WHERE shardid = 92448600 ORDER BY placementid;
- shardid | shardstate | nodeport
---------------------------------------------------------------------
- 92448600 |          4 |    57638
- 92448600 |          1 |    57637
-(2 rows)
-
--- Make sure co-located join works
-SELECT * FROM range1 JOIN range2 ON range1.id = range2.id;
- id | id
---------------------------------------------------------------------
-(0 rows)
-
--- Make sure we can create a foreign key on community edition, because
--- replication factor is 1
-ALTER TABLE range1
-ADD CONSTRAINT range1_ref_fk
-FOREIGN KEY (id)
-REFERENCES ref(id);
+-- TODO: Re-enable this test once we metadata-sync range partitioned tables
+-- SET citus.shard_replication_factor TO 1;
+-- SET citus.next_shard_id TO 92448500;
+-- CREATE TABLE range1(id int);
+-- SELECT create_distributed_table('range1', 'id', 'range');
+-- CALL public.create_range_partitioned_shards('range1', '{0,3}','{2,5}');
+--
+-- -- Move shard placement and clean it up
+-- SELECT citus_move_shard_placement(92448500, 'localhost', :worker_2_port, 'localhost', :worker_1_port, 'block_writes');
+-- CALL citus_cleanup_orphaned_shards();
+-- SELECT shardid, shardstate, nodeport FROM pg_dist_shard_placement WHERE shardid = 92448300 ORDER BY placementid;
+--
+-- SET citus.next_shard_id TO 92448600;
+-- CREATE TABLE range2(id int);
+-- SELECT create_distributed_table('range2', 'id', 'range');
+-- CALL public.create_range_partitioned_shards('range2', '{0,3}','{2,5}');
+--
+-- -- Mark tables co-located
+-- UPDATE pg_dist_partition SET colocationid = 30001
+-- WHERE logicalrelid = 'range1'::regclass OR logicalrelid = 'range2'::regclass;
+--
+-- -- Move shard placement and DON'T clean it up, now range1 and range2 are
+-- -- colocated, but only range2 has an orphaned shard.
+-- SELECT citus_move_shard_placement(92448600, 'localhost', :worker_2_port, 'localhost', :worker_1_port, 'block_writes');
+-- SELECT shardid, shardstate, nodeport FROM pg_dist_shard_placement WHERE shardid = 92448600 ORDER BY placementid;
+--
+-- -- Make sure co-located join works
+-- SELECT * FROM range1 JOIN range2 ON range1.id = range2.id;
+--
+-- -- Make sure we can create a foreign key on community edition, because
+-- -- replication factor is 1
+-- ALTER TABLE range1
+-- ADD CONSTRAINT range1_ref_fk
+-- FOREIGN KEY (id)
+-- REFERENCES ref(id);
 SET client_min_messages TO WARNING;
 DROP SCHEMA ignoring_orphaned_shards CASCADE;
diff --git a/src/test/regress/expected/shard_rebalancer.out b/src/test/regress/expected/shard_rebalancer.out
index b9d928acbc2..aab94dad101 100644
--- a/src/test/regress/expected/shard_rebalancer.out
+++ b/src/test/regress/expected/shard_rebalancer.out
@@ -416,18 +416,31 @@ SELECT unnest(shard_placement_replication_array(
     2
 ));
 ERROR:  could not find a target for shard xxxxx
+SET client_min_messages TO WARNING;
+set citus.shard_count = 4;
+-- Create a distributed table with all shards on a single node, so that we can
+-- use this as an under-replicated table
+SET citus.shard_replication_factor TO 1;
+SELECT * from master_set_node_property('localhost', :worker_1_port, 'shouldhaveshards', false);
+ master_set_node_property
+---------------------------------------------------------------------
+
+(1 row)
+
+CREATE TABLE replication_test_table(int_column int);
+SELECT create_distributed_table('replication_test_table', 'int_column');
+ create_distributed_table
+---------------------------------------------------------------------
+
+(1 row)
+
+UPDATE pg_dist_partition SET repmodel = 'c' WHERE logicalrelid = 'replication_test_table'::regclass;
+INSERT INTO replication_test_table SELECT * FROM generate_series(1, 100);
 -- Ensure that shard_replication_factor is 2 during replicate_table_shards
 -- and rebalance_table_shards tests
 SET citus.shard_replication_factor TO 2;
--- Turn off NOTICE messages
-SET client_min_messages TO WARNING;
--- Create a single-row test data for shard rebalancer test shards
-CREATE TABLE shard_rebalancer_test_data AS SELECT 1::int as int_column;
--- Test replicate_table_shards, which will in turn test update_shard_placement
--- in copy mode.
-CREATE TABLE replication_test_table(int_column int);
-SELECT master_create_distributed_table('replication_test_table', 'int_column', 'append');
- master_create_distributed_table
+SELECT * from master_set_node_property('localhost', :worker_1_port, 'shouldhaveshards', true);
+ master_set_node_property
 ---------------------------------------------------------------------
 
 (1 row)
 
@@ -438,37 +451,14 @@ CREATE VIEW replication_test_table_placements_per_node AS
     AND shardstate != 4
     GROUP BY nodename, nodeport
     ORDER BY nodename, nodeport;
-WARNING:  "view replication_test_table_placements_per_node" has dependency to "table replication_test_table" that is not in Citus' metadata
-DETAIL:  "view replication_test_table_placements_per_node" will be created only locally
-HINT:  Distribute "table replication_test_table" first to distribute "view replication_test_table_placements_per_node"
--- Create four shards with replication factor 2, and delete the placements
--- with smaller port number to simulate under-replicated shards.
-SELECT count(master_create_empty_shard('replication_test_table'))
-    FROM generate_series(1, 4);
- count
---------------------------------------------------------------------
-     4
-(1 row)
-
-DELETE FROM pg_dist_shard_placement WHERE placementid in (
-    SELECT pg_dist_shard_placement.placementid
-    FROM pg_dist_shard_placement NATURAL JOIN pg_dist_shard
-    WHERE logicalrelid = 'replication_test_table'::regclass
-    AND (nodename, nodeport) = (SELECT nodename, nodeport FROM pg_dist_shard_placement
-        ORDER BY nodename, nodeport limit 1)
-);
--- Upload the test data to the shards
-\COPY replication_test_table FROM PROGRAM 'echo 1' WITH (format 'csv', append_to_shard xxxxx)
-\COPY replication_test_table FROM PROGRAM 'echo 1' WITH (format 'csv', append_to_shard xxxxx)
-\COPY replication_test_table FROM PROGRAM 'echo 1' WITH (format 'csv', append_to_shard xxxxx)
-\COPY replication_test_table FROM PROGRAM 'echo 1' WITH (format 'csv', append_to_shard xxxxx)
--- Verify that there is one node with all placements
 SELECT * FROM replication_test_table_placements_per_node;
  count
---------------------------------------------------------------------
     4
 (1 row)
 
+-- Test replicate_table_shards, which will in turn test update_shard_placement
+-- in copy mode.
 -- Check excluded_shard_list by excluding three shards with smaller ids
 SELECT replicate_table_shards('replication_test_table',
                               excluded_shard_list := excluded_shard_list,
@@ -540,19 +530,22 @@ SELECT * FROM replication_test_table_placements_per_node;
 SELECT count(*) FROM replication_test_table;
  count
---------------------------------------------------------------------
-     4
+   100
 (1 row)
 
 DROP TABLE public.replication_test_table CASCADE;
 -- Test rebalance_table_shards, which will in turn test update_shard_placement
 -- in move mode.
+SET citus.shard_replication_factor TO 1;
+SET citus.shard_count TO 6;
 CREATE TABLE rebalance_test_table(int_column int);
-SELECT master_create_distributed_table('rebalance_test_table', 'int_column', 'append');
- master_create_distributed_table
+SELECT create_distributed_table('rebalance_test_table', 'int_column');
+ create_distributed_table
 ---------------------------------------------------------------------
 
 (1 row)
 
+UPDATE pg_dist_partition SET repmodel = 'c' WHERE logicalrelid = 'rebalance_test_table'::regclass;
 CREATE VIEW table_placements_per_node AS
 SELECT nodeport, logicalrelid::regclass, count(*)
 FROM pg_dist_shard_placement NATURAL JOIN pg_dist_shard
@@ -566,9 +559,6 @@ LANGUAGE SQL AS $$
     SET citus.shard_replication_factor TO 1;
-    SELECT count(master_create_empty_shard(rel))
-    FROM generate_series(1, 6);
-
     SELECT count(master_move_shard_placement(shardid,
             src.nodename, src.nodeport::int,
             dst.nodename, dst.nodeport::int,
@@ -582,12 +572,7 @@ $$;
 CALL create_unbalanced_shards('rebalance_test_table');
 SET citus.shard_replication_factor TO 2;
 -- Upload the test data to the shards
-\COPY rebalance_test_table FROM PROGRAM 'echo 1' WITH (format 'csv', append_to_shard xxxxx)
-\COPY rebalance_test_table FROM PROGRAM 'echo 1' WITH (format 'csv', append_to_shard xxxxx)
-\COPY rebalance_test_table FROM PROGRAM 'echo 1' WITH (format 'csv', append_to_shard xxxxx)
-\COPY rebalance_test_table FROM PROGRAM 'echo 1' WITH (format 'csv', append_to_shard xxxxx)
-\COPY rebalance_test_table FROM PROGRAM 'echo 1' WITH (format 'csv', append_to_shard xxxxx)
-\COPY rebalance_test_table FROM PROGRAM 'echo 1' WITH (format 'csv', append_to_shard xxxxx)
+INSERT INTO rebalance_test_table SELECT * FROM generate_series(1, 100);
 -- Verify that there is one node with all placements
 SELECT * FROM table_placements_per_node;
  nodeport | logicalrelid | count
@@ -772,7 +757,7 @@ SELECT * FROM table_placements_per_node;
 SELECT count(*) FROM rebalance_test_table;
  count
---------------------------------------------------------------------
-     6
+   100
 (1 row)
 
 DROP TABLE rebalance_test_table;
@@ -863,21 +848,39 @@ INSERT INTO test_schema_support.imbalanced_table_local VALUES(4);
 CREATE TABLE test_schema_support.imbalanced_table (
     id integer not null
 );
-SELECT master_create_distributed_table('test_schema_support.imbalanced_table', 'id', 'append');
- master_create_distributed_table
+SET citus.shard_count = 3;
+SET citus.shard_replication_factor TO 1;
+SELECT * from master_set_node_property('localhost', :worker_1_port, 'shouldhaveshards', false);
+ master_set_node_property
 ---------------------------------------------------------------------
 
 (1 row)
 
-SET citus.shard_replication_factor TO 1;
-SELECT master_create_empty_shard('test_schema_support.imbalanced_table') AS shardid \gset
-COPY test_schema_support.imbalanced_table FROM STDIN WITH (format 'csv', append_to_shard :shardid);
+SELECT create_distributed_table('test_schema_support.imbalanced_table', 'id');
+ create_distributed_table
+---------------------------------------------------------------------
+
+(1 row)
+
+INSERT INTO test_schema_support.imbalanced_table SELECT * FROM generate_series(1, 100);
+UPDATE pg_dist_partition SET repmodel = 'c' WHERE logicalrelid = 'test_schema_support.imbalanced_table'::regclass;
+SELECT * from master_set_node_property('localhost', :worker_1_port, 'shouldhaveshards', true);
+ master_set_node_property
+---------------------------------------------------------------------
+
+(1 row)
+
+SET citus.shard_count = 4;
+-- copy one of the shards to the other node; this is to test that the
+-- rebalancer takes into account all copies of a placement
 SET citus.shard_replication_factor TO 2;
-SELECT master_create_empty_shard('test_schema_support.imbalanced_table') AS shardid \gset
-COPY test_schema_support.imbalanced_table FROM STDIN WITH (format 'csv', append_to_shard :shardid);
+SELECT replicate_table_shards('test_schema_support.imbalanced_table', max_shard_copies := 1, shard_transfer_mode := 'block_writes');
+ replicate_table_shards
+---------------------------------------------------------------------
+
+(1 row)
+
 SET citus.shard_replication_factor TO 1;
-SELECT master_create_empty_shard('test_schema_support.imbalanced_table') AS shardid \gset
-COPY test_schema_support.imbalanced_table FROM STDIN WITH (format 'csv', append_to_shard :shardid);
 -- imbalanced_table is now imbalanced
 -- Shard counts in each node before rebalance
 SELECT * FROM public.table_placements_per_node;
@@ -891,7 +894,7 @@ SELECT * FROM public.table_placements_per_node;
 SELECT COUNT(*) FROM imbalanced_table;
  count
---------------------------------------------------------------------
-    12
+   100
 (1 row)
 
 -- Test rebalance operation
@@ -915,13 +918,13 @@ SELECT * FROM public.table_placements_per_node;
 SELECT COUNT(*) FROM imbalanced_table;
  count
---------------------------------------------------------------------
-    12
+   100
 (1 row)
 
-DROP TABLE public.shard_rebalancer_test_data;
 DROP TABLE test_schema_support.imbalanced_table;
 DROP TABLE test_schema_support.imbalanced_table_local;
 SET citus.shard_replication_factor TO 1;
+SET citus.shard_count = 4;
 CREATE TABLE colocated_rebalance_test(id integer);
 CREATE TABLE colocated_rebalance_test2(id integer);
 SELECT create_distributed_table('colocated_rebalance_test', 'id');
@@ -1073,14 +1076,14 @@ CALL citus_cleanup_orphaned_shards();
 select * from pg_dist_placement ORDER BY placementid;
  placementid | shardid | shardstate | shardlength | groupid
---------------------------------------------------------------------
-         150 |  123023 |          1 |           0 |      14
-         153 |  123024 |          1 |           0 |      14
-         156 |  123027 |          1 |           0 |      14
-         157 |  123028 |          1 |           0 |      14
-         158 |  123021 |          1 |           0 |      16
-         159 |  123025 |          1 |           0 |      16
-         160 |  123022 |          1 |           0 |      16
-         161 |  123026 |          1 |           0 |      16
+         146 |  123023 |          1 |           0 |      14
+         149 |  123024 |          1 |           0 |      14
+         152 |  123027 |          1 |           0 |      14
+         153 |  123028 |          1 |           0 |      14
+         154 |  123021 |          1 |           0 |      16
+         155 |  123025 |          1 |           0 |      16
+         156 |  123022 |          1 |           0 |      16
+         157 |  123026 |          1 |           0 |      16
 (8 rows)
 
 -- Move all shards to worker1 again
@@ -2123,8 +2126,7 @@ SET citus.shard_replication_factor TO 2;
 SELECT replicate_table_shards('dist_table_test_3', max_shard_copies := 4, shard_transfer_mode:='block_writes');
 ERROR:  Table 'dist_table_test_3' is streaming replicated. Shards of streaming replicated tables cannot be copied
 -- Mark table as coordinator replicated in order to be able to test replicate_table_shards
-UPDATE pg_dist_partition SET repmodel='c' WHERE logicalrelid IN
-    ('dist_table_test_3'::regclass);
+UPDATE pg_dist_partition SET repmodel='c' WHERE logicalrelid = 'dist_table_test_3'::regclass;
 SELECT replicate_table_shards('dist_table_test_3', max_shard_copies := 4, shard_transfer_mode:='block_writes');
  replicate_table_shards
---------------------------------------------------------------------
 
 (1 row)
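The rebalancer tests above were rewritten around one recipe, since the old append-distribution setup (master_create_distributed_table + master_create_empty_shard + \COPY ... append_to_shard) is no longer supported by the copy path: create a hash-distributed table while one worker is excluded through shouldhaveshards so every placement lands on a single node, flip pg_dist_partition.repmodel to 'c' (statement-based replication, which replicate_table_shards requires), then re-admit the worker and replicate or rebalance. Condensed, with an illustrative table name:

    SET citus.shard_replication_factor TO 1;
    SELECT master_set_node_property('localhost', :worker_1_port, 'shouldhaveshards', false);
    CREATE TABLE t(int_column int);
    SELECT create_distributed_table('t', 'int_column');  -- every placement on the other worker
    UPDATE pg_dist_partition SET repmodel = 'c' WHERE logicalrelid = 't'::regclass;
    SELECT master_set_node_property('localhost', :worker_1_port, 'shouldhaveshards', true);
    SET citus.shard_replication_factor TO 2;
    SELECT replicate_table_shards('t', shard_transfer_mode := 'block_writes');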
diff --git a/src/test/regress/expected/tableam.out b/src/test/regress/expected/tableam.out
index e211e2bf17f..242cb53100a 100644
--- a/src/test/regress/expected/tableam.out
+++ b/src/test/regress/expected/tableam.out
@@ -5,7 +5,7 @@ SET citus.shard_count TO 4;
 create schema test_tableam;
 set search_path to test_tableam;
 SELECT public.run_command_on_coordinator_and_workers($Q$
-        SET citus.enable_ddl_propagation TO off;
+    SET citus.enable_ddl_propagation TO off;
 CREATE FUNCTION fake_am_handler(internal)
 RETURNS table_am_handler
 AS 'citus'
@@ -26,7 +26,7 @@ ALTER EXTENSION citus ADD ACCESS METHOD fake_am;
 create table test_hash_dist(id int, val int) using fake_am;
 insert into test_hash_dist values (1, 1);
 WARNING:  fake_tuple_insert
-select create_distributed_table('test_hash_dist','id');
+select create_distributed_table('test_hash_dist','id', colocate_with := 'none');
 WARNING:  fake_scan_getnextslot
 CONTEXT:  SQL statement "SELECT TRUE FROM test_tableam.test_hash_dist LIMIT 1"
 WARNING:  fake_scan_getnextslot
@@ -168,16 +168,20 @@ SELECT * FROM master_get_table_ddl_events('test_range_dist');
 --
 select a.shardid, a.nodeport
 FROM pg_dist_shard b, pg_dist_shard_placement a
-WHERE a.shardid=b.shardid AND logicalrelid = 'test_range_dist'::regclass::oid
+WHERE a.shardid=b.shardid AND logicalrelid = 'test_hash_dist'::regclass::oid
 ORDER BY a.shardid, nodeport;
  shardid | nodeport
---------------------------------------------------------------------
- 60005 | 57637
- 60006 | 57638
-(2 rows)
+ 60000 | 57637
+ 60001 | 57638
+ 60002 | 57637
+ 60003 | 57638
+(4 rows)
 
+-- Change repmodel to allow master_copy_shard_placement
+UPDATE pg_dist_partition SET repmodel='c' WHERE logicalrelid = 'test_hash_dist'::regclass;
 SELECT master_copy_shard_placement(
-    get_shard_id_for_distribution_column('test_range_dist', '1'),
+    get_shard_id_for_distribution_column('test_hash_dist', '1'),
     'localhost', :worker_1_port,
     'localhost', :worker_2_port,
     do_repair := false,
@@ -189,55 +193,42 @@ SELECT master_copy_shard_placement(
 select a.shardid, a.nodeport
 FROM pg_dist_shard b, pg_dist_shard_placement a
-WHERE a.shardid=b.shardid AND logicalrelid = 'test_range_dist'::regclass::oid
+WHERE a.shardid=b.shardid AND logicalrelid = 'test_hash_dist'::regclass::oid
 ORDER BY a.shardid, nodeport;
  shardid | nodeport
---------------------------------------------------------------------
- 60005 | 57637
- 60005 | 57638
- 60006 | 57638
-(3 rows)
+ 60000 | 57637
+ 60000 | 57638
+ 60001 | 57638
+ 60002 | 57637
+ 60003 | 57638
+(5 rows)
 
 -- verify that data was copied correctly
 \c - - - :worker_1_port
-select * from test_tableam.test_range_dist_60005 ORDER BY id;
-WARNING:  fake_scan_getnextslot
-WARNING:  fake_scan_getnextslot
-WARNING:  fake_scan_getnextslot
-WARNING:  fake_scan_getnextslot
+select * from test_tableam.test_hash_dist_60000 ORDER BY id;
 WARNING:  fake_scan_getnextslot
 WARNING:  fake_scan_getnextslot
 WARNING:  fake_scan_getnextslot
  id | val
---------------------------------------------------------------------
- 0 | 0
  1 | 1
- 1 | -1
- 2 | 4
- 3 | 9
- 7 | 9
-(6 rows)
+ 1 | 1
+(2 rows)
 
 \c - - - :worker_2_port
-select * from test_tableam.test_range_dist_60005 ORDER BY id;
-WARNING:  fake_scan_getnextslot
-WARNING:  fake_scan_getnextslot
-WARNING:  fake_scan_getnextslot
-WARNING:  fake_scan_getnextslot
+select * from test_tableam.test_hash_dist_60000 ORDER BY id;
 WARNING:  fake_scan_getnextslot
 WARNING:  fake_scan_getnextslot
 WARNING:  fake_scan_getnextslot
  id | val
---------------------------------------------------------------------
- 0 | 0
  1 | 1
- 1 | -1
- 2 | 4
- 3 | 9
- 7 | 9
-(6 rows)
+ 1 | 1
+(2 rows)
 
 \c - - - :master_port
+set search_path to test_tableam;
 --
 -- Test that partitioned tables work correctly with a fake_am table
 --
@@ -254,15 +245,15 @@ SELECT create_distributed_table('test_partitioned', 'id');
 NOTICE:  Copying data from local table...
 NOTICE:  copying the data has completed
 DETAIL:  The local data in the table is no longer visible, but is still on disk.
-HINT:  To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$public.test_partitioned_p1$$)
+HINT:  To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$test_tableam.test_partitioned_p1$$)
 WARNING:  fake_scan_getnextslot
-CONTEXT:  SQL statement "SELECT TRUE FROM public.test_partitioned_p2 LIMIT 1"
+CONTEXT:  SQL statement "SELECT TRUE FROM test_tableam.test_partitioned_p2 LIMIT 1"
 WARNING:  fake_scan_getnextslot
 NOTICE:  Copying data from local table...
 WARNING:  fake_scan_getnextslot
 NOTICE:  copying the data has completed
 DETAIL:  The local data in the table is no longer visible, but is still on disk.
-HINT:  To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$public.test_partitioned_p2$$)
+HINT:  To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$test_tableam.test_partitioned_p2$$)
  create_distributed_table
---------------------------------------------------------------------
 
 (1 row)
 
diff --git a/src/test/regress/multi_schedule b/src/test/regress/multi_schedule
index f9136008f2e..58cfc87c835 100644
--- a/src/test/regress/multi_schedule
+++ b/src/test/regress/multi_schedule
@@ -58,7 +58,8 @@ test: cte_inline recursive_view_local_table values sequences_with_different_type
 test: pg13 pg12
 # run pg14 sequentially as it syncs metadata
 test: pg14
-test: tableam drop_column_partitioned_table
+test: drop_column_partitioned_table
+test: tableam
 
 # ----------
 # Miscellaneous tests to check our query planning behavior
diff --git a/src/test/regress/sql/failure_offline_move_shard_placement.sql b/src/test/regress/sql/failure_offline_move_shard_placement.sql
index 81683398b9f..1b02da1e9f4 100644
--- a/src/test/regress/sql/failure_offline_move_shard_placement.sql
+++ b/src/test/regress/sql/failure_offline_move_shard_placement.sql
@@ -57,12 +57,12 @@ SELECT master_move_shard_placement(201, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port, 'block_writes');
 SELECT citus.mitmproxy('conn.onQuery(query="CREATE TABLE move_shard_offline.t").cancel(' || :pid || ')');
 SELECT master_move_shard_placement(201, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port, 'block_writes');
 
--- failure on blocking append_table_to_shard operation on target node
-SELECT citus.mitmproxy('conn.onQuery(query="worker_append_table_to_shard").kill()');
+-- failure on blocking COPY operation on target node
+SELECT citus.mitmproxy('conn.onQuery(query="COPY").kill()');
 SELECT master_move_shard_placement(201, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port, 'block_writes');
 
--- cancellation on blocking append_table_to_shard operation on target node
-SELECT citus.mitmproxy('conn.onQuery(query="worker_append_table_to_shard").cancel(' || :pid || ')');
+-- cancellation on blocking COPY operation on target node
+SELECT citus.mitmproxy('conn.onQuery(query="COPY").cancel(' || :pid || ')');
 SELECT master_move_shard_placement(201, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port, 'block_writes');
 
 -- failure on adding constraints on target node
@@ -73,14 +73,6 @@ SELECT master_move_shard_placement(201, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port, 'block_writes');
 SELECT citus.mitmproxy('conn.onQuery(query="ADD CONSTRAINT").cancel(' || :pid || ')');
 SELECT master_move_shard_placement(201, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port, 'block_writes');
 
--- failure on CopyData operation on source node
-SELECT citus.mitmproxy('conn.onCopyData().kill()');
-SELECT master_move_shard_placement(200, 'localhost', :worker_2_proxy_port, 'localhost', :worker_1_port, 'block_writes');
-
--- cancellation on CopyData operation on source node
-SELECT citus.mitmproxy('conn.onCopyData().cancel(' || :pid || ')');
-SELECT master_move_shard_placement(200, 'localhost', :worker_2_proxy_port, 'localhost', :worker_1_port, 'block_writes');
-
 CALL citus_cleanup_orphaned_shards();
 
 -- Verify that the shard is not moved and the number of rows are still 100k
diff --git a/src/test/regress/sql/ignoring_orphaned_shards.sql b/src/test/regress/sql/ignoring_orphaned_shards.sql
index 774d7cd19bd..447e629677f 100644
--- a/src/test/regress/sql/ignoring_orphaned_shards.sql
+++ b/src/test/regress/sql/ignoring_orphaned_shards.sql
@@ -112,40 +112,41 @@ INSERT INTO rep1 VALUES (1);
 ROLLBACK;
 
-SET citus.shard_replication_factor TO 1;
-SET citus.next_shard_id TO 92448500;
-CREATE TABLE range1(id int);
-SELECT create_distributed_table('range1', 'id', 'range');
-CALL public.create_range_partitioned_shards('range1', '{0,3}','{2,5}');
-
--- Move shard placement and clean it up
-SELECT citus_move_shard_placement(92448500, 'localhost', :worker_2_port, 'localhost', :worker_1_port, 'block_writes');
-CALL citus_cleanup_orphaned_shards();
-SELECT shardid, shardstate, nodeport FROM pg_dist_shard_placement WHERE shardid = 92448300 ORDER BY placementid;
-
-SET citus.next_shard_id TO 92448600;
-CREATE TABLE range2(id int);
-SELECT create_distributed_table('range2', 'id', 'range');
-CALL public.create_range_partitioned_shards('range2', '{0,3}','{2,5}');
-
--- Mark tables co-located
-UPDATE pg_dist_partition SET colocationid = 30001
-WHERE logicalrelid = 'range1'::regclass OR logicalrelid = 'range2'::regclass;
-
--- Move shard placement and DON'T clean it up, now range1 and range2 are
--- colocated, but only range2 has an orphaned shard.
-SELECT citus_move_shard_placement(92448600, 'localhost', :worker_2_port, 'localhost', :worker_1_port, 'block_writes');
-SELECT shardid, shardstate, nodeport FROM pg_dist_shard_placement WHERE shardid = 92448600 ORDER BY placementid;
-
--- Make sure co-located join works
-SELECT * FROM range1 JOIN range2 ON range1.id = range2.id;
-
--- Make sure we can create a foreign key on community edition, because
--- replication factor is 1
-ALTER TABLE range1
-ADD CONSTRAINT range1_ref_fk
-FOREIGN KEY (id)
-REFERENCES ref(id);
+-- TODO: Re-enable this test once we metadata-sync range partitioned tables
+-- SET citus.shard_replication_factor TO 1;
+-- SET citus.next_shard_id TO 92448500;
+-- CREATE TABLE range1(id int);
+-- SELECT create_distributed_table('range1', 'id', 'range');
+-- CALL public.create_range_partitioned_shards('range1', '{0,3}','{2,5}');
+--
+-- -- Move shard placement and clean it up
+-- SELECT citus_move_shard_placement(92448500, 'localhost', :worker_2_port, 'localhost', :worker_1_port, 'block_writes');
+-- CALL citus_cleanup_orphaned_shards();
+-- SELECT shardid, shardstate, nodeport FROM pg_dist_shard_placement WHERE shardid = 92448300 ORDER BY placementid;
+--
+-- SET citus.next_shard_id TO 92448600;
+-- CREATE TABLE range2(id int);
+-- SELECT create_distributed_table('range2', 'id', 'range');
+-- CALL public.create_range_partitioned_shards('range2', '{0,3}','{2,5}');
+--
+-- -- Mark tables co-located
+-- UPDATE pg_dist_partition SET colocationid = 30001
+-- WHERE logicalrelid = 'range1'::regclass OR logicalrelid = 'range2'::regclass;
+--
+-- -- Move shard placement and DON'T clean it up, now range1 and range2 are
+-- -- colocated, but only range2 has an orphaned shard.
+-- SELECT citus_move_shard_placement(92448600, 'localhost', :worker_2_port, 'localhost', :worker_1_port, 'block_writes');
+-- SELECT shardid, shardstate, nodeport FROM pg_dist_shard_placement WHERE shardid = 92448600 ORDER BY placementid;
+--
+-- -- Make sure co-located join works
+-- SELECT * FROM range1 JOIN range2 ON range1.id = range2.id;
+--
+-- -- Make sure we can create a foreign key on community edition, because
+-- -- replication factor is 1
+-- ALTER TABLE range1
+-- ADD CONSTRAINT range1_ref_fk
+-- FOREIGN KEY (id)
+-- REFERENCES ref(id);
 SET client_min_messages TO WARNING;
 DROP SCHEMA ignoring_orphaned_shards CASCADE;
diff --git a/src/test/regress/sql/shard_rebalancer.sql b/src/test/regress/sql/shard_rebalancer.sql
index b16356a4a96..0d482998bac 100644
--- a/src/test/regress/sql/shard_rebalancer.sql
+++ b/src/test/regress/sql/shard_rebalancer.sql
@@ -291,24 +291,22 @@ SELECT unnest(shard_placement_replication_array(
     2
 ));
 
--- Ensure that shard_replication_factor is 2 during replicate_table_shards
--- and rebalance_table_shards tests
-
-SET citus.shard_replication_factor TO 2;
-
--- Turn off NOTICE messages
-
 SET client_min_messages TO WARNING;
 
--- Create a single-row test data for shard rebalancer test shards
-
-CREATE TABLE shard_rebalancer_test_data AS SELECT 1::int as int_column;
-
--- Test replicate_table_shards, which will in turn test update_shard_placement
--- in copy mode.
-
+set citus.shard_count = 4;
+-- Create a distributed table with all shards on a single node, so that we can
+-- use this as an under-replicated table
+SET citus.shard_replication_factor TO 1;
+SELECT * from master_set_node_property('localhost', :worker_1_port, 'shouldhaveshards', false);
 CREATE TABLE replication_test_table(int_column int);
-SELECT master_create_distributed_table('replication_test_table', 'int_column', 'append');
+SELECT create_distributed_table('replication_test_table', 'int_column');
+UPDATE pg_dist_partition SET repmodel = 'c' WHERE logicalrelid = 'replication_test_table'::regclass;
+INSERT INTO replication_test_table SELECT * FROM generate_series(1, 100);
+
+-- Ensure that shard_replication_factor is 2 during replicate_table_shards
+-- and rebalance_table_shards tests
+SET citus.shard_replication_factor TO 2;
+SELECT * from master_set_node_property('localhost', :worker_1_port, 'shouldhaveshards', true);
 
 CREATE VIEW replication_test_table_placements_per_node AS
 SELECT count(*)
 FROM pg_dist_shard_placement NATURAL JOIN pg_dist_shard
@@ -317,31 +315,12 @@ CREATE VIEW replication_test_table_placements_per_node AS
     GROUP BY nodename, nodeport
     ORDER BY nodename, nodeport;
 
--- Create four shards with replication factor 2, and delete the placements
--- with smaller port number to simulate under-replicated shards.
-
-SELECT count(master_create_empty_shard('replication_test_table'))
-    FROM generate_series(1, 4);
-
-DELETE FROM pg_dist_shard_placement WHERE placementid in (
-    SELECT pg_dist_shard_placement.placementid
-    FROM pg_dist_shard_placement NATURAL JOIN pg_dist_shard
-    WHERE logicalrelid = 'replication_test_table'::regclass
-    AND (nodename, nodeport) = (SELECT nodename, nodeport FROM pg_dist_shard_placement
-        ORDER BY nodename, nodeport limit 1)
-);
-
--- Upload the test data to the shards
-
-\COPY replication_test_table FROM PROGRAM 'echo 1' WITH (format 'csv', append_to_shard 123000)
-\COPY replication_test_table FROM PROGRAM 'echo 1' WITH (format 'csv', append_to_shard 123001)
-\COPY replication_test_table FROM PROGRAM 'echo 1' WITH (format 'csv', append_to_shard 123002)
-\COPY replication_test_table FROM PROGRAM 'echo 1' WITH (format 'csv', append_to_shard 123003)
-
--- Verify that there is one node with all placements
 SELECT * FROM replication_test_table_placements_per_node;
 
+-- Test replicate_table_shards, which will in turn test update_shard_placement
+-- in copy mode.
+
 -- Check excluded_shard_list by excluding three shards with smaller ids
 
 SELECT replicate_table_shards('replication_test_table',
@@ -386,8 +365,11 @@ DROP TABLE public.replication_test_table CASCADE;
 -- Test rebalance_table_shards, which will in turn test update_shard_placement
 -- in move mode.
+SET citus.shard_replication_factor TO 1;
+SET citus.shard_count TO 6;
 CREATE TABLE rebalance_test_table(int_column int);
-SELECT master_create_distributed_table('rebalance_test_table', 'int_column', 'append');
+SELECT create_distributed_table('rebalance_test_table', 'int_column');
+UPDATE pg_dist_partition SET repmodel = 'c' WHERE logicalrelid = 'rebalance_test_table'::regclass;
 
 CREATE VIEW table_placements_per_node AS
 SELECT nodeport, logicalrelid::regclass, count(*)
@@ -404,9 +386,6 @@ LANGUAGE SQL AS $$
     SET citus.shard_replication_factor TO 1;
 
-    SELECT count(master_create_empty_shard(rel))
-    FROM generate_series(1, 6);
-
     SELECT count(master_move_shard_placement(shardid,
             src.nodename, src.nodeport::int,
             dst.nodename, dst.nodeport::int,
@@ -424,12 +403,7 @@ SET citus.shard_replication_factor TO 2;
 
 -- Upload the test data to the shards
 
-\COPY rebalance_test_table FROM PROGRAM 'echo 1' WITH (format 'csv', append_to_shard 123004)
-\COPY rebalance_test_table FROM PROGRAM 'echo 1' WITH (format 'csv', append_to_shard 123005)
-\COPY rebalance_test_table FROM PROGRAM 'echo 1' WITH (format 'csv', append_to_shard 123006)
-\COPY rebalance_test_table FROM PROGRAM 'echo 1' WITH (format 'csv', append_to_shard 123007)
-\COPY rebalance_test_table FROM PROGRAM 'echo 1' WITH (format 'csv', append_to_shard 123008)
-\COPY rebalance_test_table FROM PROGRAM 'echo 1' WITH (format 'csv', append_to_shard 123009)
+INSERT INTO rebalance_test_table SELECT * FROM generate_series(1, 100);
 
 -- Verify that there is one node with all placements
 
@@ -604,34 +578,20 @@ CREATE TABLE test_schema_support.imbalanced_table (
     id integer not null
 );
 
-SELECT master_create_distributed_table('test_schema_support.imbalanced_table', 'id', 'append');
-
+SET citus.shard_count = 3;
 SET citus.shard_replication_factor TO 1;
-SELECT master_create_empty_shard('test_schema_support.imbalanced_table') AS shardid \gset
-COPY test_schema_support.imbalanced_table FROM STDIN WITH (format 'csv', append_to_shard :shardid);
-1
-2
-3
-4
-\.
+SELECT * from master_set_node_property('localhost', :worker_1_port, 'shouldhaveshards', false);
+SELECT create_distributed_table('test_schema_support.imbalanced_table', 'id');
+INSERT INTO test_schema_support.imbalanced_table SELECT * FROM generate_series(1, 100);
+UPDATE pg_dist_partition SET repmodel = 'c' WHERE logicalrelid = 'test_schema_support.imbalanced_table'::regclass;
+SELECT * from master_set_node_property('localhost', :worker_1_port, 'shouldhaveshards', true);
+SET citus.shard_count = 4;
 
+-- copy one of the shards to the other node; this is to test that the
+-- rebalancer takes into account all copies of a placement
 SET citus.shard_replication_factor TO 2;
-SELECT master_create_empty_shard('test_schema_support.imbalanced_table') AS shardid \gset
-COPY test_schema_support.imbalanced_table FROM STDIN WITH (format 'csv', append_to_shard :shardid);
-1
-2
-3
-4
-\.
-
+SELECT replicate_table_shards('test_schema_support.imbalanced_table', max_shard_copies := 1, shard_transfer_mode := 'block_writes');
 SET citus.shard_replication_factor TO 1;
-SELECT master_create_empty_shard('test_schema_support.imbalanced_table') AS shardid \gset
-COPY test_schema_support.imbalanced_table FROM STDIN WITH (format 'csv', append_to_shard :shardid);
-1
-2
-3
-4
-\.
 
 -- imbalanced_table is now imbalanced
 
@@ -652,11 +612,11 @@ SELECT * FROM public.table_placements_per_node;
 
 -- Row count in imbalanced table after rebalance
 SELECT COUNT(*) FROM imbalanced_table;
 
-DROP TABLE public.shard_rebalancer_test_data;
 DROP TABLE test_schema_support.imbalanced_table;
 DROP TABLE test_schema_support.imbalanced_table_local;
 
 SET citus.shard_replication_factor TO 1;
+SET citus.shard_count = 4;
 
 CREATE TABLE colocated_rebalance_test(id integer);
 CREATE TABLE colocated_rebalance_test2(id integer);
@@ -1276,8 +1236,7 @@ SET citus.shard_replication_factor TO 2;
 SELECT replicate_table_shards('dist_table_test_3', max_shard_copies := 4, shard_transfer_mode:='block_writes');
 
 -- Mark table as coordinator replicated in order to be able to test replicate_table_shards
-UPDATE pg_dist_partition SET repmodel='c' WHERE logicalrelid IN
-    ('dist_table_test_3'::regclass);
+UPDATE pg_dist_partition SET repmodel='c' WHERE logicalrelid = 'dist_table_test_3'::regclass;
 SELECT replicate_table_shards('dist_table_test_3', max_shard_copies := 4, shard_transfer_mode:='block_writes');
 
diff --git a/src/test/regress/sql/tableam.sql b/src/test/regress/sql/tableam.sql
index 47845492a17..f0ed5cfca0f 100644
--- a/src/test/regress/sql/tableam.sql
+++ b/src/test/regress/sql/tableam.sql
@@ -26,7 +26,7 @@ ALTER EXTENSION citus ADD ACCESS METHOD fake_am;
 create table test_hash_dist(id int, val int) using fake_am;
 insert into test_hash_dist values (1, 1);
 
-select create_distributed_table('test_hash_dist','id');
+select create_distributed_table('test_hash_dist','id', colocate_with := 'none');
 
 select * from test_hash_dist;
 insert into test_hash_dist values (1, 1);
@@ -86,11 +86,14 @@ SELECT * FROM master_get_table_ddl_events('test_range_dist');
 
 select a.shardid, a.nodeport
 FROM pg_dist_shard b, pg_dist_shard_placement a
-WHERE a.shardid=b.shardid AND logicalrelid = 'test_range_dist'::regclass::oid
+WHERE a.shardid=b.shardid AND logicalrelid = 'test_hash_dist'::regclass::oid
 ORDER BY a.shardid, nodeport;
 
+-- Change repmodel to allow master_copy_shard_placement
+UPDATE pg_dist_partition SET repmodel='c' WHERE logicalrelid = 'test_hash_dist'::regclass;
+
 SELECT master_copy_shard_placement(
-    get_shard_id_for_distribution_column('test_range_dist', '1'),
+    get_shard_id_for_distribution_column('test_hash_dist', '1'),
     'localhost', :worker_1_port,
     'localhost', :worker_2_port,
     do_repair := false,
@@ -98,19 +101,21 @@ SELECT master_copy_shard_placement(
 
 select a.shardid, a.nodeport
 FROM pg_dist_shard b, pg_dist_shard_placement a
-WHERE a.shardid=b.shardid AND logicalrelid = 'test_range_dist'::regclass::oid
+WHERE a.shardid=b.shardid AND logicalrelid = 'test_hash_dist'::regclass::oid
 ORDER BY a.shardid, nodeport;
 
 -- verify that data was copied correctly
 
 \c - - - :worker_1_port
-select * from test_tableam.test_range_dist_60005 ORDER BY id;
+select * from test_tableam.test_hash_dist_60000 ORDER BY id;
 
 \c - - - :worker_2_port
-select * from test_tableam.test_range_dist_60005 ORDER BY id;
+select * from test_tableam.test_hash_dist_60000 ORDER BY id;
 
 \c - - - :master_port
+set search_path to test_tableam;
+
 --
 -- Test that partitioned tables work correctly with a fake_am table
 --
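Same pattern in the tableam test: master_copy_shard_placement is now exercised on the hash-distributed fake_am table after marking it coordinator-replicated, rather than on the range-distributed one. A quick way to confirm the extra placement after the copy (shard ids depend on the test run):

    SELECT shardid, nodename, nodeport
    FROM pg_dist_shard_placement
    WHERE shardid = get_shard_id_for_distribution_column('test_hash_dist', '1')
    ORDER BY nodeport;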