From 0c10107e4f86a8889b6b215de76b0e51f9a31926 Mon Sep 17 00:00:00 2001 From: Jelte Fennema Date: Wed, 27 Jul 2022 16:30:52 +0200 Subject: [PATCH] Use shard split copy code for blocking shard moves The new shard copy code that was created for shard splits has some advantages over the old code. This one uses binary copy when possible to make the copy faster. When doing a shard move using `block_writes` it now uses this better copy logic. --- .../distributed/operations/repair_shards.c | 64 +++++++- .../worker_copy_table_to_node_udf.c | 65 ++++++++ .../distributed/sql/citus--11.0-3--11.1-1.sql | 1 + .../sql/downgrades/citus--11.1-1--11.0-3.sql | 4 + .../udfs/worker_copy_table_to_node/11.1-1.sql | 8 + .../udfs/worker_copy_table_to_node/latest.sql | 8 + .../distributed/utils/reference_table_utils.c | 9 ++ .../failure_offline_move_shard_placement.out | 28 +--- src/test/regress/expected/multi_extension.out | 3 +- .../regress/expected/shard_rebalancer.out | 140 +++++++++--------- src/test/regress/expected/tableam.out | 65 ++++---- .../expected/upgrade_list_citus_objects.out | 3 +- .../expected/worker_copy_table_to_node.out | 81 ++++++++++ src/test/regress/multi_schedule | 3 +- src/test/regress/operations_schedule | 1 + .../failure_offline_move_shard_placement.sql | 16 +- src/test/regress/sql/shard_rebalancer.sql | 107 +++++-------- src/test/regress/sql/tableam.sql | 17 ++- .../regress/sql/worker_copy_table_to_node.sql | 49 ++++++ 19 files changed, 441 insertions(+), 231 deletions(-) create mode 100644 src/backend/distributed/operations/worker_copy_table_to_node_udf.c create mode 100644 src/backend/distributed/sql/udfs/worker_copy_table_to_node/11.1-1.sql create mode 100644 src/backend/distributed/sql/udfs/worker_copy_table_to_node/latest.sql create mode 100644 src/test/regress/expected/worker_copy_table_to_node.out create mode 100644 src/test/regress/sql/worker_copy_table_to_node.sql diff --git a/src/backend/distributed/operations/repair_shards.c b/src/backend/distributed/operations/repair_shards.c index 26928fd3af2..6f5443ba304 100644 --- a/src/backend/distributed/operations/repair_shards.c +++ b/src/backend/distributed/operations/repair_shards.c @@ -20,6 +20,7 @@ #include "access/htup_details.h" #include "catalog/pg_class.h" #include "catalog/pg_enum.h" +#include "distributed/adaptive_executor.h" #include "distributed/citus_ruleutils.h" #include "distributed/colocation_utils.h" #include "distributed/commands.h" @@ -38,6 +39,7 @@ #include "distributed/remote_commands.h" #include "distributed/resource_lock.h" #include "distributed/shard_rebalancer.h" +#include "distributed/shard_split.h" #include "distributed/worker_manager.h" #include "distributed/worker_protocol.h" #include "distributed/worker_transaction.h" @@ -129,6 +131,7 @@ static List * PostLoadShardCreationCommandList(ShardInterval *shardInterval, int32 sourceNodePort); static ShardCommandList * CreateShardCommandList(ShardInterval *shardInterval, List *ddlCommandList); +static char * CreateShardCopyCommand(ShardInterval *shard, WorkerNode *targetNode); /* declarations for dynamic loading */ @@ -1180,6 +1183,9 @@ CopyShardTablesViaBlockWrites(List *shardIntervalList, char *sourceNodeName, ALLOCSET_DEFAULT_SIZES); MemoryContext oldContext = MemoryContextSwitchTo(localContext); + WorkerNode *sourceNode = FindWorkerNode(sourceNodeName, sourceNodePort); + WorkerNode *targetNode = FindWorkerNode(targetNodeName, targetNodePort); + /* iterate through the colocated shards and copy each */ ShardInterval *shardInterval = NULL; 
foreach_ptr(shardInterval, shardIntervalList) @@ -1199,9 +1205,12 @@ CopyShardTablesViaBlockWrites(List *shardIntervalList, char *sourceNodeName, char *tableOwner = TableOwner(shardInterval->relationId); SendCommandListToWorkerOutsideTransaction(targetNodeName, targetNodePort, tableOwner, ddlCommandList); + } - ddlCommandList = NIL; - + int taskId = 0; + List *copyTaskList = NIL; + foreach_ptr(shardInterval, shardIntervalList) + { /* * Skip copying data for partitioned tables, because they contain no * data themselves. Their partitions do contain data, but those are */ if (!PartitionedTable(shardInterval->relationId)) { - ddlCommandList = CopyShardContentsCommandList(shardInterval, sourceNodeName, - sourceNodePort); + char *copyCommand = CreateShardCopyCommand( + shardInterval, targetNode); + + Task *copyTask = CreateBasicTask( + INVALID_JOB_ID, + taskId, + READ_TASK, + copyCommand); + + ShardPlacement *taskPlacement = CitusMakeNode(ShardPlacement); + SetPlacementNodeMetadata(taskPlacement, sourceNode); + + copyTask->taskPlacementList = list_make1(taskPlacement); + + copyTaskList = lappend(copyTaskList, copyTask); + taskId++; } - ddlCommandList = list_concat( - ddlCommandList, + } + + ExecuteTaskListOutsideTransaction(ROW_MODIFY_NONE, copyTaskList, + MaxAdaptiveExecutorPoolSize, + NULL /* jobIdList (ignored by API implementation) */); + + foreach_ptr(shardInterval, shardIntervalList) + { + List *ddlCommandList = PostLoadShardCreationCommandList(shardInterval, sourceNodeName, - sourceNodePort)); + sourceNodePort); + char *tableOwner = TableOwner(shardInterval->relationId); SendCommandListToWorkerOutsideTransaction(targetNodeName, targetNodePort, tableOwner, ddlCommandList); @@ -1278,6 +1309,25 @@ CopyShardTablesViaBlockWrites(List *shardIntervalList, char *sourceNodeName, } +/* + * CreateShardCopyCommand constructs the command to copy a shard to another + * worker node. This command needs to be run on the node where you want to copy + * the shard from. + */ +static char * +CreateShardCopyCommand(ShardInterval *shard, + WorkerNode *targetNode) +{ + char *shardName = ConstructQualifiedShardName(shard); + StringInfo query = makeStringInfo(); + appendStringInfo(query, + "SELECT pg_catalog.worker_copy_table_to_node(%s::regclass, %u);", + quote_literal_cstr(shardName), + targetNode->nodeId); + return query->data; +} + + /* * CopyPartitionShardsCommandList gets a shardInterval which is a shard that * belongs to partitioned table (this is asserted). diff --git a/src/backend/distributed/operations/worker_copy_table_to_node_udf.c b/src/backend/distributed/operations/worker_copy_table_to_node_udf.c new file mode 100644 index 00000000000..46391160c57 --- /dev/null +++ b/src/backend/distributed/operations/worker_copy_table_to_node_udf.c @@ -0,0 +1,65 @@ +/*------------------------------------------------------------------------- + * + * worker_copy_table_to_node_udf.c + * + * This file implements the worker_copy_table_to_node UDF. This UDF can be + * used to copy the data in a shard (or other table) from one worker node to + * another. + * + * Copyright (c) Citus Data, Inc.
+ * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "utils/builtins.h" +#include "utils/lsyscache.h" +#include "distributed/citus_ruleutils.h" +#include "distributed/metadata_cache.h" +#include "distributed/multi_executor.h" +#include "distributed/worker_shard_copy.h" + +PG_FUNCTION_INFO_V1(worker_copy_table_to_node); + +/* + * worker_copy_table_to_node copies a shard from this worker to another worker + * + * SQL signature: + * + * worker_copy_table_to_node( + * source_table regclass, + * target_node_id integer + * ) RETURNS VOID + */ +Datum +worker_copy_table_to_node(PG_FUNCTION_ARGS) +{ + Oid relationId = PG_GETARG_OID(0); + uint32_t targetNodeId = PG_GETARG_INT32(1); + + Oid schemaOid = get_rel_namespace(relationId); + char *relationSchemaName = get_namespace_name(schemaOid); + char *relationName = get_rel_name(relationId); + char *relationQualifiedName = quote_qualified_identifier( + relationSchemaName, + relationName); + + EState *executor = CreateExecutorState(); + DestReceiver *destReceiver = CreateShardCopyDestReceiver( + executor, + list_make2(relationSchemaName, relationName), + targetNodeId); + + StringInfo selectShardQueryForCopy = makeStringInfo(); + appendStringInfo(selectShardQueryForCopy, + "SELECT * FROM %s;", relationQualifiedName); + + ParamListInfo params = NULL; + ExecuteQueryStringIntoDestReceiver(selectShardQueryForCopy->data, params, + destReceiver); + + FreeExecutorState(executor); + + PG_RETURN_VOID(); +} diff --git a/src/backend/distributed/sql/citus--11.0-3--11.1-1.sql b/src/backend/distributed/sql/citus--11.0-3--11.1-1.sql index f8b9563788b..e71f9362bbb 100644 --- a/src/backend/distributed/sql/citus--11.0-3--11.1-1.sql +++ b/src/backend/distributed/sql/citus--11.0-3--11.1-1.sql @@ -69,3 +69,4 @@ DROP FUNCTION pg_catalog.get_all_active_transactions(OUT datid oid, OUT process_ #include "udfs/get_all_active_transactions/11.1-1.sql" #include "udfs/citus_split_shard_by_split_points/11.1-1.sql" #include "udfs/worker_split_copy/11.1-1.sql" +#include "udfs/worker_copy_table_to_node/11.1-1.sql" diff --git a/src/backend/distributed/sql/downgrades/citus--11.1-1--11.0-3.sql b/src/backend/distributed/sql/downgrades/citus--11.1-1--11.0-3.sql index 26430a9f683..7261a31db2b 100644 --- a/src/backend/distributed/sql/downgrades/citus--11.1-1--11.0-3.sql +++ b/src/backend/distributed/sql/downgrades/citus--11.1-1--11.0-3.sql @@ -73,6 +73,10 @@ DROP FUNCTION pg_catalog.worker_split_copy( splitCopyInfos pg_catalog.split_copy_info[]); DROP TYPE pg_catalog.split_copy_info; +DROP FUNCTION pg_catalog.worker_copy_table_to_node( + source_table regclass, + target_node_id integer); + DROP FUNCTION pg_catalog.get_all_active_transactions(OUT datid oid, OUT process_id int, OUT initiator_node_identifier int4, OUT worker_query BOOL, OUT transaction_number int8, OUT transaction_stamp timestamptz, OUT global_pid int8); diff --git a/src/backend/distributed/sql/udfs/worker_copy_table_to_node/11.1-1.sql b/src/backend/distributed/sql/udfs/worker_copy_table_to_node/11.1-1.sql new file mode 100644 index 00000000000..ebe093dee33 --- /dev/null +++ b/src/backend/distributed/sql/udfs/worker_copy_table_to_node/11.1-1.sql @@ -0,0 +1,8 @@ +CREATE OR REPLACE FUNCTION pg_catalog.worker_copy_table_to_node( + source_table regclass, + target_node_id integer) +RETURNS void +LANGUAGE C STRICT +AS 'MODULE_PATHNAME', $$worker_copy_table_to_node$$; +COMMENT ON FUNCTION pg_catalog.worker_copy_table_to_node(regclass, integer) + IS 'Perform copy of a 
shard'; diff --git a/src/backend/distributed/sql/udfs/worker_copy_table_to_node/latest.sql b/src/backend/distributed/sql/udfs/worker_copy_table_to_node/latest.sql new file mode 100644 index 00000000000..ebe093dee33 --- /dev/null +++ b/src/backend/distributed/sql/udfs/worker_copy_table_to_node/latest.sql @@ -0,0 +1,8 @@ +CREATE OR REPLACE FUNCTION pg_catalog.worker_copy_table_to_node( + source_table regclass, + target_node_id integer) +RETURNS void +LANGUAGE C STRICT +AS 'MODULE_PATHNAME', $$worker_copy_table_to_node$$; +COMMENT ON FUNCTION pg_catalog.worker_copy_table_to_node(regclass, integer) + IS 'Perform copy of a shard'; diff --git a/src/backend/distributed/utils/reference_table_utils.c b/src/backend/distributed/utils/reference_table_utils.c index e271d0ceba1..e0cab96d69d 100644 --- a/src/backend/distributed/utils/reference_table_utils.c +++ b/src/backend/distributed/utils/reference_table_utils.c @@ -207,6 +207,15 @@ EnsureReferenceTablesExistOnAllNodesExtended(char transferMode) CopyShardPlacementToWorkerNodeQuery(sourceShardPlacement, newWorkerNode, transferMode); + + /* + * The placement copy command uses distributed execution to copy + * the shard. This is allowed when indicating that the backend is a + * rebalancer backend. + */ + ExecuteCriticalRemoteCommand(connection, + "SET LOCAL application_name TO " + CITUS_REBALANCER_NAME); ExecuteCriticalRemoteCommand(connection, placementCopyCommand->data); RemoteTransactionCommit(connection); } diff --git a/src/test/regress/expected/failure_offline_move_shard_placement.out b/src/test/regress/expected/failure_offline_move_shard_placement.out index a6ecee18e59..bdd45449bdc 100644 --- a/src/test/regress/expected/failure_offline_move_shard_placement.out +++ b/src/test/regress/expected/failure_offline_move_shard_placement.out @@ -91,8 +91,8 @@ SELECT citus.mitmproxy('conn.onQuery(query="CREATE TABLE move_shard_offline.t"). 
SELECT master_move_shard_placement(201, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port, 'block_writes'); ERROR: canceling statement due to user request --- failure on blocking append_table_to_shard operation on target node -SELECT citus.mitmproxy('conn.onQuery(query="worker_append_table_to_shard").kill()'); +-- failure on blocking COPY operation on target node +SELECT citus.mitmproxy('conn.onQuery(query="COPY").kill()'); mitmproxy --------------------------------------------------------------------- @@ -101,8 +101,9 @@ SELECT citus.mitmproxy('conn.onQuery(query="worker_append_table_to_shard").kill( SELECT master_move_shard_placement(201, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port, 'block_writes'); ERROR: connection not open CONTEXT: while executing command on localhost:xxxxx --- cancellation on blocking append_table_to_shard operation on target node -SELECT citus.mitmproxy('conn.onQuery(query="worker_append_table_to_shard").cancel(' || :pid || ')'); +while executing command on localhost:xxxxx +-- cancellation on blocking COPY operation on target node +SELECT citus.mitmproxy('conn.onQuery(query="COPY").cancel(' || :pid || ')'); mitmproxy --------------------------------------------------------------------- @@ -129,25 +130,6 @@ SELECT citus.mitmproxy('conn.onQuery(query="ADD CONSTRAINT").cancel(' || :pid || SELECT master_move_shard_placement(201, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port, 'block_writes'); ERROR: canceling statement due to user request --- failure on CopyData operation on source node -SELECT citus.mitmproxy('conn.onCopyData().kill()'); - mitmproxy ---------------------------------------------------------------------- - -(1 row) - -SELECT master_move_shard_placement(200, 'localhost', :worker_2_proxy_port, 'localhost', :worker_1_port, 'block_writes'); -ERROR: could not copy table "t_200" from "localhost:xxxxx" -CONTEXT: while executing command on localhost:xxxxx --- cancellation on CopyData operation on source node -SELECT citus.mitmproxy('conn.onCopyData().cancel(' || :pid || ')'); - mitmproxy ---------------------------------------------------------------------- - -(1 row) - -SELECT master_move_shard_placement(200, 'localhost', :worker_2_proxy_port, 'localhost', :worker_1_port, 'block_writes'); -ERROR: canceling statement due to user request CALL citus_cleanup_orphaned_shards(); -- Verify that the shard is not moved and the number of rows are still 100k SELECT citus.mitmproxy('conn.allow()'); diff --git a/src/test/regress/expected/multi_extension.out b/src/test/regress/expected/multi_extension.out index c2624894cf0..0067bdbadad 100644 --- a/src/test/regress/expected/multi_extension.out +++ b/src/test/regress/expected/multi_extension.out @@ -1097,10 +1097,11 @@ SELECT * FROM multi_extension.print_extension_changes(); table columnar.stripe | | function citus_locks() SETOF record | function citus_split_shard_by_split_points(bigint,text[],integer[],citus.shard_transfer_mode) void + | function worker_copy_table_to_node(regclass,integer) void | function worker_split_copy(bigint,split_copy_info[]) void | type split_copy_info | view citus_locks -(26 rows) +(27 rows) DROP TABLE multi_extension.prev_objects, multi_extension.extension_diff; -- show running version diff --git a/src/test/regress/expected/shard_rebalancer.out b/src/test/regress/expected/shard_rebalancer.out index b9d928acbc2..aab94dad101 100644 --- a/src/test/regress/expected/shard_rebalancer.out +++ b/src/test/regress/expected/shard_rebalancer.out @@ -416,18 +416,31 
@@ SELECT unnest(shard_placement_replication_array( 2 )); ERROR: could not find a target for shard xxxxx +SET client_min_messages TO WARNING; +set citus.shard_count = 4; +-- Create a distributed table with all shards on a single node, so that we can +-- use this as an under-replicated table +SET citus.shard_replication_factor TO 1; +SELECT * from master_set_node_property('localhost', :worker_1_port, 'shouldhaveshards', false); + master_set_node_property +--------------------------------------------------------------------- + +(1 row) + +CREATE TABLE replication_test_table(int_column int); +SELECT create_distributed_table('replication_test_table', 'int_column'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +UPDATE pg_dist_partition SET repmodel = 'c' WHERE logicalrelid = 'replication_test_table'::regclass; +INSERT INTO replication_test_table SELECT * FROM generate_series(1, 100); -- Ensure that shard_replication_factor is 2 during replicate_table_shards -- and rebalance_table_shards tests SET citus.shard_replication_factor TO 2; --- Turn off NOTICE messages -SET client_min_messages TO WARNING; --- Create a single-row test data for shard rebalancer test shards -CREATE TABLE shard_rebalancer_test_data AS SELECT 1::int as int_column; --- Test replicate_table_shards, which will in turn test update_shard_placement --- in copy mode. -CREATE TABLE replication_test_table(int_column int); -SELECT master_create_distributed_table('replication_test_table', 'int_column', 'append'); - master_create_distributed_table +SELECT * from master_set_node_property('localhost', :worker_1_port, 'shouldhaveshards', true); + master_set_node_property --------------------------------------------------------------------- (1 row) @@ -438,37 +451,14 @@ CREATE VIEW replication_test_table_placements_per_node AS AND shardstate != 4 GROUP BY nodename, nodeport ORDER BY nodename, nodeport; -WARNING: "view replication_test_table_placements_per_node" has dependency to "table replication_test_table" that is not in Citus' metadata -DETAIL: "view replication_test_table_placements_per_node" will be created only locally -HINT: Distribute "table replication_test_table" first to distribute "view replication_test_table_placements_per_node" --- Create four shards with replication factor 2, and delete the placements --- with smaller port number to simulate under-replicated shards.
-SELECT count(master_create_empty_shard('replication_test_table')) - FROM generate_series(1, 4); - count ---------------------------------------------------------------------- - 4 -(1 row) - -DELETE FROM pg_dist_shard_placement WHERE placementid in ( - SELECT pg_dist_shard_placement.placementid - FROM pg_dist_shard_placement NATURAL JOIN pg_dist_shard - WHERE logicalrelid = 'replication_test_table'::regclass - AND (nodename, nodeport) = (SELECT nodename, nodeport FROM pg_dist_shard_placement - ORDER BY nodename, nodeport limit 1) -); --- Upload the test data to the shards -\COPY replication_test_table FROM PROGRAM 'echo 1' WITH (format 'csv', append_to_shard xxxxx) -\COPY replication_test_table FROM PROGRAM 'echo 1' WITH (format 'csv', append_to_shard xxxxx) -\COPY replication_test_table FROM PROGRAM 'echo 1' WITH (format 'csv', append_to_shard xxxxx) -\COPY replication_test_table FROM PROGRAM 'echo 1' WITH (format 'csv', append_to_shard xxxxx) --- Verify that there is one node with all placements SELECT * FROM replication_test_table_placements_per_node; count --------------------------------------------------------------------- 4 (1 row) +-- Test replicate_table_shards, which will in turn test update_shard_placement +-- in copy mode. -- Check excluded_shard_list by excluding three shards with smaller ids SELECT replicate_table_shards('replication_test_table', excluded_shard_list := excluded_shard_list, @@ -540,19 +530,22 @@ SELECT * FROM replication_test_table_placements_per_node; SELECT count(*) FROM replication_test_table; count --------------------------------------------------------------------- - 4 + 100 (1 row) DROP TABLE public.replication_test_table CASCADE; -- Test rebalance_table_shards, which will in turn test update_shard_placement -- in move mode. 
+SET citus.shard_replication_factor TO 1; +SET citus.shard_count TO 6; CREATE TABLE rebalance_test_table(int_column int); -SELECT master_create_distributed_table('rebalance_test_table', 'int_column', 'append'); - master_create_distributed_table +SELECT create_distributed_table('rebalance_test_table', 'int_column'); + create_distributed_table --------------------------------------------------------------------- (1 row) +UPDATE pg_dist_partition SET repmodel = 'c' WHERE logicalrelid = 'rebalance_test_table'::regclass; CREATE VIEW table_placements_per_node AS SELECT nodeport, logicalrelid::regclass, count(*) FROM pg_dist_shard_placement NATURAL JOIN pg_dist_shard @@ -566,9 +559,6 @@ LANGUAGE SQL AS $$ SET citus.shard_replication_factor TO 1; - SELECT count(master_create_empty_shard(rel)) - FROM generate_series(1, 6); - SELECT count(master_move_shard_placement(shardid, src.nodename, src.nodeport::int, dst.nodename, dst.nodeport::int, @@ -582,12 +572,7 @@ $$; CALL create_unbalanced_shards('rebalance_test_table'); SET citus.shard_replication_factor TO 2; -- Upload the test data to the shards -\COPY rebalance_test_table FROM PROGRAM 'echo 1' WITH (format 'csv', append_to_shard xxxxx) -\COPY rebalance_test_table FROM PROGRAM 'echo 1' WITH (format 'csv', append_to_shard xxxxx) -\COPY rebalance_test_table FROM PROGRAM 'echo 1' WITH (format 'csv', append_to_shard xxxxx) -\COPY rebalance_test_table FROM PROGRAM 'echo 1' WITH (format 'csv', append_to_shard xxxxx) -\COPY rebalance_test_table FROM PROGRAM 'echo 1' WITH (format 'csv', append_to_shard xxxxx) -\COPY rebalance_test_table FROM PROGRAM 'echo 1' WITH (format 'csv', append_to_shard xxxxx) +INSERT INTO rebalance_test_table SELECT * FROM generate_series(1, 100); -- Verify that there is one node with all placements SELECT * FROM table_placements_per_node; nodeport | logicalrelid | count @@ -772,7 +757,7 @@ SELECT * FROM table_placements_per_node; SELECT count(*) FROM rebalance_test_table; count --------------------------------------------------------------------- - 6 + 100 (1 row) DROP TABLE rebalance_test_table; @@ -863,21 +848,39 @@ INSERT INTO test_schema_support.imbalanced_table_local VALUES(4); CREATE TABLE test_schema_support.imbalanced_table ( id integer not null ); -SELECT master_create_distributed_table('test_schema_support.imbalanced_table', 'id', 'append'); - master_create_distributed_table +SET citus.shard_count = 3; +SET citus.shard_replication_factor TO 1; +SELECT * from master_set_node_property('localhost', :worker_1_port, 'shouldhaveshards', false); + master_set_node_property --------------------------------------------------------------------- (1 row) -SET citus.shard_replication_factor TO 1; -SELECT master_create_empty_shard('test_schema_support.imbalanced_table') AS shardid \gset -COPY test_schema_support.imbalanced_table FROM STDIN WITH (format 'csv', append_to_shard :shardid); +SELECT create_distributed_table('test_schema_support.imbalanced_table', 'id'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO test_schema_support.imbalanced_table SELECT * FROM generate_series(1, 100); +UPDATE pg_dist_partition SET repmodel = 'c' WHERE logicalrelid = 'test_schema_support.imbalanced_table'::regclass; +SELECT * from master_set_node_property('localhost', :worker_1_port, 'shouldhaveshards', true); + master_set_node_property +--------------------------------------------------------------------- + +(1 row) + +SET citus.shard_count = 4; +-- copy one of the shards to 
the other node, this is to test that the +-- rebalancer takes into account all copies of a placement SET citus.shard_replication_factor TO 2; -SELECT master_create_empty_shard('test_schema_support.imbalanced_table') AS shardid \gset -COPY test_schema_support.imbalanced_table FROM STDIN WITH (format 'csv', append_to_shard :shardid); +SELECT replicate_table_shards('test_schema_support.imbalanced_table', max_shard_copies := 1, shard_transfer_mode := 'block_writes'); + replicate_table_shards +--------------------------------------------------------------------- + +(1 row) + SET citus.shard_replication_factor TO 1; -SELECT master_create_empty_shard('test_schema_support.imbalanced_table') AS shardid \gset -COPY test_schema_support.imbalanced_table FROM STDIN WITH (format 'csv', append_to_shard :shardid); -- imbalanced_table is now imbalanced -- Shard counts in each node before rebalance SELECT * FROM public.table_placements_per_node; @@ -891,7 +894,7 @@ SELECT * FROM public.table_placements_per_node; SELECT COUNT(*) FROM imbalanced_table; count --------------------------------------------------------------------- - 12 + 100 (1 row) -- Test rebalance operation @@ -915,13 +918,13 @@ SELECT * FROM public.table_placements_per_node; SELECT COUNT(*) FROM imbalanced_table; count --------------------------------------------------------------------- - 12 + 100 (1 row) -DROP TABLE public.shard_rebalancer_test_data; DROP TABLE test_schema_support.imbalanced_table; DROP TABLE test_schema_support.imbalanced_table_local; SET citus.shard_replication_factor TO 1; +SET citus.shard_count = 4; CREATE TABLE colocated_rebalance_test(id integer); CREATE TABLE colocated_rebalance_test2(id integer); SELECT create_distributed_table('colocated_rebalance_test', 'id'); @@ -1073,14 +1076,14 @@ CALL citus_cleanup_orphaned_shards(); select * from pg_dist_placement ORDER BY placementid; placementid | shardid | shardstate | shardlength | groupid --------------------------------------------------------------------- - 150 | 123023 | 1 | 0 | 14 - 153 | 123024 | 1 | 0 | 14 - 156 | 123027 | 1 | 0 | 14 - 157 | 123028 | 1 | 0 | 14 - 158 | 123021 | 1 | 0 | 16 - 159 | 123025 | 1 | 0 | 16 - 160 | 123022 | 1 | 0 | 16 - 161 | 123026 | 1 | 0 | 16 + 146 | 123023 | 1 | 0 | 14 + 149 | 123024 | 1 | 0 | 14 + 152 | 123027 | 1 | 0 | 14 + 153 | 123028 | 1 | 0 | 14 + 154 | 123021 | 1 | 0 | 16 + 155 | 123025 | 1 | 0 | 16 + 156 | 123022 | 1 | 0 | 16 + 157 | 123026 | 1 | 0 | 16 (8 rows) -- Move all shards to worker1 again @@ -2123,8 +2126,7 @@ SET citus.shard_replication_factor TO 2; SELECT replicate_table_shards('dist_table_test_3', max_shard_copies := 4, shard_transfer_mode:='block_writes'); ERROR: Table 'dist_table_test_3' is streaming replicated. 
Shards of streaming replicated tables cannot be copied -- Mark table as coordinator replicated in order to be able to test replicate_table_shards -UPDATE pg_dist_partition SET repmodel='c' WHERE logicalrelid IN - ('dist_table_test_3'::regclass); +UPDATE pg_dist_partition SET repmodel='c' WHERE logicalrelid = 'dist_table_test_3'::regclass; SELECT replicate_table_shards('dist_table_test_3', max_shard_copies := 4, shard_transfer_mode:='block_writes'); replicate_table_shards --------------------------------------------------------------------- diff --git a/src/test/regress/expected/tableam.out b/src/test/regress/expected/tableam.out index e211e2bf17f..242cb53100a 100644 --- a/src/test/regress/expected/tableam.out +++ b/src/test/regress/expected/tableam.out @@ -5,7 +5,7 @@ SET citus.shard_count TO 4; create schema test_tableam; set search_path to test_tableam; SELECT public.run_command_on_coordinator_and_workers($Q$ - SET citus.enable_ddl_propagation TO off; + SET citus.enable_ddl_propagation TO off; CREATE FUNCTION fake_am_handler(internal) RETURNS table_am_handler AS 'citus' @@ -26,7 +26,7 @@ ALTER EXTENSION citus ADD ACCESS METHOD fake_am; create table test_hash_dist(id int, val int) using fake_am; insert into test_hash_dist values (1, 1); WARNING: fake_tuple_insert -select create_distributed_table('test_hash_dist','id'); +select create_distributed_table('test_hash_dist','id', colocate_with := 'none'); WARNING: fake_scan_getnextslot CONTEXT: SQL statement "SELECT TRUE FROM test_tableam.test_hash_dist LIMIT 1" WARNING: fake_scan_getnextslot @@ -168,16 +168,20 @@ SELECT * FROM master_get_table_ddl_events('test_range_dist'); -- select a.shardid, a.nodeport FROM pg_dist_shard b, pg_dist_shard_placement a -WHERE a.shardid=b.shardid AND logicalrelid = 'test_range_dist'::regclass::oid +WHERE a.shardid=b.shardid AND logicalrelid = 'test_hash_dist'::regclass::oid ORDER BY a.shardid, nodeport; shardid | nodeport --------------------------------------------------------------------- - 60005 | 57637 - 60006 | 57638 -(2 rows) + 60000 | 57637 + 60001 | 57638 + 60002 | 57637 + 60003 | 57638 +(4 rows) +-- Change repmodel to allow master_copy_shard_placement +UPDATE pg_dist_partition SET repmodel='c' WHERE logicalrelid = 'test_hash_dist'::regclass; SELECT master_copy_shard_placement( - get_shard_id_for_distribution_column('test_range_dist', '1'), + get_shard_id_for_distribution_column('test_hash_dist', '1'), 'localhost', :worker_1_port, 'localhost', :worker_2_port, do_repair := false, @@ -189,55 +193,42 @@ SELECT master_copy_shard_placement( select a.shardid, a.nodeport FROM pg_dist_shard b, pg_dist_shard_placement a -WHERE a.shardid=b.shardid AND logicalrelid = 'test_range_dist'::regclass::oid +WHERE a.shardid=b.shardid AND logicalrelid = 'test_hash_dist'::regclass::oid ORDER BY a.shardid, nodeport; shardid | nodeport --------------------------------------------------------------------- - 60005 | 57637 - 60005 | 57638 - 60006 | 57638 -(3 rows) + 60000 | 57637 + 60000 | 57638 + 60001 | 57638 + 60002 | 57637 + 60003 | 57638 +(5 rows) -- verify that data was copied correctly \c - - - :worker_1_port -select * from test_tableam.test_range_dist_60005 ORDER BY id; -WARNING: fake_scan_getnextslot -WARNING: fake_scan_getnextslot -WARNING: fake_scan_getnextslot -WARNING: fake_scan_getnextslot +select * from test_tableam.test_hash_dist_60000 ORDER BY id; WARNING: fake_scan_getnextslot WARNING: fake_scan_getnextslot WARNING: fake_scan_getnextslot id | val 
--------------------------------------------------------------------- - 0 | 0 1 | 1 - 1 | -1 - 2 | 4 - 3 | 9 - 7 | 9 -(6 rows) + 1 | 1 +(2 rows) \c - - - :worker_2_port -select * from test_tableam.test_range_dist_60005 ORDER BY id; -WARNING: fake_scan_getnextslot -WARNING: fake_scan_getnextslot -WARNING: fake_scan_getnextslot -WARNING: fake_scan_getnextslot +select * from test_tableam.test_hash_dist_60000 ORDER BY id; WARNING: fake_scan_getnextslot WARNING: fake_scan_getnextslot WARNING: fake_scan_getnextslot id | val --------------------------------------------------------------------- - 0 | 0 1 | 1 - 1 | -1 - 2 | 4 - 3 | 9 - 7 | 9 -(6 rows) + 1 | 1 +(2 rows) \c - - - :master_port +set search_path to test_tableam; -- -- Test that partitioned tables work correctly with a fake_am table -- @@ -254,15 +245,15 @@ SELECT create_distributed_table('test_partitioned', 'id'); NOTICE: Copying data from local table... NOTICE: copying the data has completed DETAIL: The local data in the table is no longer visible, but is still on disk. -HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$public.test_partitioned_p1$$) +HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$test_tableam.test_partitioned_p1$$) WARNING: fake_scan_getnextslot -CONTEXT: SQL statement "SELECT TRUE FROM public.test_partitioned_p2 LIMIT 1" +CONTEXT: SQL statement "SELECT TRUE FROM test_tableam.test_partitioned_p2 LIMIT 1" WARNING: fake_scan_getnextslot NOTICE: Copying data from local table... WARNING: fake_scan_getnextslot NOTICE: copying the data has completed DETAIL: The local data in the table is no longer visible, but is still on disk. -HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$public.test_partitioned_p2$$) +HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$test_tableam.test_partitioned_p2$$) create_distributed_table --------------------------------------------------------------------- diff --git a/src/test/regress/expected/upgrade_list_citus_objects.out b/src/test/regress/expected/upgrade_list_citus_objects.out index 0271d4a7743..ecf0e0e4da0 100644 --- a/src/test/regress/expected/upgrade_list_citus_objects.out +++ b/src/test/regress/expected/upgrade_list_citus_objects.out @@ -206,6 +206,7 @@ ORDER BY 1; function worker_apply_shard_ddl_command(bigint,text) function worker_apply_shard_ddl_command(bigint,text,text) function worker_change_sequence_dependency(regclass,regclass,regclass) + function worker_copy_table_to_node(regclass,integer) function worker_create_or_alter_role(text,text,text) function worker_create_or_replace_object(text) function worker_create_or_replace_object(text[]) @@ -263,5 +264,5 @@ ORDER BY 1; view citus_stat_statements view pg_dist_shard_placement view time_partitions -(255 rows) +(256 rows) diff --git a/src/test/regress/expected/worker_copy_table_to_node.out b/src/test/regress/expected/worker_copy_table_to_node.out new file mode 100644 index 00000000000..76f440189d0 --- /dev/null +++ b/src/test/regress/expected/worker_copy_table_to_node.out @@ -0,0 +1,81 @@ +CREATE SCHEMA worker_copy_table_to_node; +SET search_path TO worker_copy_table_to_node; +SET citus.shard_count TO 1; -- single shard table for ease of testing +SET citus.shard_replication_factor TO 1; +SET citus.next_shard_id TO 62629600; +SELECT nodeid AS worker_1_node FROM pg_dist_node WHERE nodeport=:worker_1_port \gset +SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE 
nodeport=:worker_2_port \gset +CREATE TABLE t(a int); +INSERT INTO t SELECT generate_series(1, 100); +CREATE TABLE ref(a int); +INSERT INTO ref SELECT generate_series(1, 100); +select create_distributed_table('t', 'a'); +NOTICE: Copying data from local table... +NOTICE: copying the data has completed +DETAIL: The local data in the table is no longer visible, but is still on disk. +HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$worker_copy_table_to_node.t$$) + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +select create_reference_table('ref'); +NOTICE: Copying data from local table... +NOTICE: copying the data has completed +DETAIL: The local data in the table is no longer visible, but is still on disk. +HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$worker_copy_table_to_node.ref$$) + create_reference_table +--------------------------------------------------------------------- + +(1 row) + +\c - - - :worker_2_port +SET search_path TO worker_copy_table_to_node; +-- Create empty shard on worker 2 too +CREATE TABLE t_62629600(a int); +\c - - - :worker_1_port +SET search_path TO worker_copy_table_to_node; +-- Make sure that the UDF doesn't work on Citus tables +SELECT worker_copy_table_to_node('t', :worker_1_node); +ERROR: cannot execute a distributed query from a query on a shard +DETAIL: Executing a distributed query in a function call that may be pushed to a remote node can lead to incorrect results. +SELECT worker_copy_table_to_node('ref', :worker_1_node); +ERROR: cannot execute a distributed query from a query on a shard +DETAIL: Executing a distributed query in a function call that may be pushed to a remote node can lead to incorrect results. 
+-- It should work on shards +SELECT worker_copy_table_to_node('t_62629600', :worker_1_node); + worker_copy_table_to_node +--------------------------------------------------------------------- + +(1 row) + +SELECT count(*) FROM t; + count +--------------------------------------------------------------------- + 200 +(1 row) + +SELECT count(*) FROM t_62629600; + count +--------------------------------------------------------------------- + 200 +(1 row) + +SELECT worker_copy_table_to_node('t_62629600', :worker_2_node); + worker_copy_table_to_node +--------------------------------------------------------------------- + +(1 row) + +\c - - - :worker_2_port +SET search_path TO worker_copy_table_to_node; +SELECT count(*) FROM t_62629600; + count +--------------------------------------------------------------------- + 200 +(1 row) + +\c - - - :master_port +SET search_path TO worker_copy_table_to_node; +SET client_min_messages TO WARNING; +DROP SCHEMA worker_copy_table_to_node CASCADE; diff --git a/src/test/regress/multi_schedule b/src/test/regress/multi_schedule index f9136008f2e..58cfc87c835 100644 --- a/src/test/regress/multi_schedule +++ b/src/test/regress/multi_schedule @@ -58,7 +58,8 @@ test: cte_inline recursive_view_local_table values sequences_with_different_type test: pg13 pg12 # run pg14 sequentially as it syncs metadata test: pg14 -test: tableam drop_column_partitioned_table +test: drop_column_partitioned_table +test: tableam # ---------- # Miscellaneous tests to check our query planning behavior diff --git a/src/test/regress/operations_schedule b/src/test/regress/operations_schedule index 2692f212fc2..353eabfcdb4 100644 --- a/src/test/regress/operations_schedule +++ b/src/test/regress/operations_schedule @@ -3,6 +3,7 @@ test: multi_cluster_management test: multi_test_catalog_views test: shard_rebalancer_unit test: shard_rebalancer +test: worker_copy_table_to_node test: foreign_key_to_reference_shard_rebalance test: multi_move_mx test: shard_move_deferred_delete diff --git a/src/test/regress/sql/failure_offline_move_shard_placement.sql b/src/test/regress/sql/failure_offline_move_shard_placement.sql index 81683398b9f..1b02da1e9f4 100644 --- a/src/test/regress/sql/failure_offline_move_shard_placement.sql +++ b/src/test/regress/sql/failure_offline_move_shard_placement.sql @@ -57,12 +57,12 @@ SELECT master_move_shard_placement(201, 'localhost', :worker_1_port, 'localhost' SELECT citus.mitmproxy('conn.onQuery(query="CREATE TABLE move_shard_offline.t").cancel(' || :pid || ')'); SELECT master_move_shard_placement(201, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port, 'block_writes'); --- failure on blocking append_table_to_shard operation on target node -SELECT citus.mitmproxy('conn.onQuery(query="worker_append_table_to_shard").kill()'); +-- failure on blocking COPY operation on target node +SELECT citus.mitmproxy('conn.onQuery(query="COPY").kill()'); SELECT master_move_shard_placement(201, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port, 'block_writes'); --- cancellation on blocking append_table_to_shard operation on target node -SELECT citus.mitmproxy('conn.onQuery(query="worker_append_table_to_shard").cancel(' || :pid || ')'); +-- cancellation on blocking COPY operation on target node +SELECT citus.mitmproxy('conn.onQuery(query="COPY").cancel(' || :pid || ')'); SELECT master_move_shard_placement(201, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port, 'block_writes'); -- failure on adding constraints on target node @@ -73,14 +73,6 @@ SELECT 
master_move_shard_placement(201, 'localhost', :worker_1_port, 'localhost' SELECT citus.mitmproxy('conn.onQuery(query="ADD CONSTRAINT").cancel(' || :pid || ')'); SELECT master_move_shard_placement(201, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port, 'block_writes'); --- failure on CopyData operation on source node -SELECT citus.mitmproxy('conn.onCopyData().kill()'); -SELECT master_move_shard_placement(200, 'localhost', :worker_2_proxy_port, 'localhost', :worker_1_port, 'block_writes'); - --- cancellation on CopyData operation on source node -SELECT citus.mitmproxy('conn.onCopyData().cancel(' || :pid || ')'); -SELECT master_move_shard_placement(200, 'localhost', :worker_2_proxy_port, 'localhost', :worker_1_port, 'block_writes'); - CALL citus_cleanup_orphaned_shards(); -- Verify that the shard is not moved and the number of rows are still 100k diff --git a/src/test/regress/sql/shard_rebalancer.sql b/src/test/regress/sql/shard_rebalancer.sql index b16356a4a96..0d482998bac 100644 --- a/src/test/regress/sql/shard_rebalancer.sql +++ b/src/test/regress/sql/shard_rebalancer.sql @@ -291,24 +291,22 @@ SELECT unnest(shard_placement_replication_array( 2 )); --- Ensure that shard_replication_factor is 2 during replicate_table_shards --- and rebalance_table_shards tests - -SET citus.shard_replication_factor TO 2; - --- Turn off NOTICE messages - SET client_min_messages TO WARNING; --- Create a single-row test data for shard rebalancer test shards - -CREATE TABLE shard_rebalancer_test_data AS SELECT 1::int as int_column; - --- Test replicate_table_shards, which will in turn test update_shard_placement --- in copy mode. - +set citus.shard_count = 4; +-- Create a distributed table with all shards on a single node, so that we can +-- use this as an under-replicated table +SET citus.shard_replication_factor TO 1; +SELECT * from master_set_node_property('localhost', :worker_1_port, 'shouldhaveshards', false); CREATE TABLE replication_test_table(int_column int); +SELECT create_distributed_table('replication_test_table', 'int_column'); +UPDATE pg_dist_partition SET repmodel = 'c' WHERE logicalrelid = 'replication_test_table'::regclass; +INSERT INTO replication_test_table SELECT * FROM generate_series(1, 100); + +-- Ensure that shard_replication_factor is 2 during replicate_table_shards -- and rebalance_table_shards tests +SET citus.shard_replication_factor TO 2; +SELECT * from master_set_node_property('localhost', :worker_1_port, 'shouldhaveshards', true); CREATE VIEW replication_test_table_placements_per_node AS SELECT count(*) FROM pg_dist_shard_placement NATURAL JOIN pg_dist_shard @@ -317,31 +315,12 @@ CREATE VIEW replication_test_table_placements_per_node AS GROUP BY nodename, nodeport ORDER BY nodename, nodeport; --- Create four shards with replication factor 2, and delete the placements --- with smaller port number to simulate under-replicated shards.
- -SELECT count(master_create_empty_shard('replication_test_table')) - FROM generate_series(1, 4); - -DELETE FROM pg_dist_shard_placement WHERE placementid in ( - SELECT pg_dist_shard_placement.placementid - FROM pg_dist_shard_placement NATURAL JOIN pg_dist_shard - WHERE logicalrelid = 'replication_test_table'::regclass - AND (nodename, nodeport) = (SELECT nodename, nodeport FROM pg_dist_shard_placement - ORDER BY nodename, nodeport limit 1) -); - --- Upload the test data to the shards - -\COPY replication_test_table FROM PROGRAM 'echo 1' WITH (format 'csv', append_to_shard 123000) -\COPY replication_test_table FROM PROGRAM 'echo 1' WITH (format 'csv', append_to_shard 123001) -\COPY replication_test_table FROM PROGRAM 'echo 1' WITH (format 'csv', append_to_shard 123002) -\COPY replication_test_table FROM PROGRAM 'echo 1' WITH (format 'csv', append_to_shard 123003) - --- Verify that there is one node with all placements SELECT * FROM replication_test_table_placements_per_node; +-- Test replicate_table_shards, which will in turn test update_shard_placement +-- in copy mode. + -- Check excluded_shard_list by excluding three shards with smaller ids SELECT replicate_table_shards('replication_test_table', @@ -386,8 +365,11 @@ DROP TABLE public.replication_test_table CASCADE; -- Test rebalance_table_shards, which will in turn test update_shard_placement -- in move mode. +SET citus.shard_replication_factor TO 1; +SET citus.shard_count TO 6; CREATE TABLE rebalance_test_table(int_column int); -SELECT master_create_distributed_table('rebalance_test_table', 'int_column', 'append'); +SELECT create_distributed_table('rebalance_test_table', 'int_column'); +UPDATE pg_dist_partition SET repmodel = 'c' WHERE logicalrelid = 'rebalance_test_table'::regclass; CREATE VIEW table_placements_per_node AS SELECT nodeport, logicalrelid::regclass, count(*) @@ -404,9 +386,6 @@ LANGUAGE SQL AS $$ SET citus.shard_replication_factor TO 1; - SELECT count(master_create_empty_shard(rel)) - FROM generate_series(1, 6); - SELECT count(master_move_shard_placement(shardid, src.nodename, src.nodeport::int, dst.nodename, dst.nodeport::int, @@ -424,12 +403,7 @@ SET citus.shard_replication_factor TO 2; -- Upload the test data to the shards -\COPY rebalance_test_table FROM PROGRAM 'echo 1' WITH (format 'csv', append_to_shard 123004) -\COPY rebalance_test_table FROM PROGRAM 'echo 1' WITH (format 'csv', append_to_shard 123005) -\COPY rebalance_test_table FROM PROGRAM 'echo 1' WITH (format 'csv', append_to_shard 123006) -\COPY rebalance_test_table FROM PROGRAM 'echo 1' WITH (format 'csv', append_to_shard 123007) -\COPY rebalance_test_table FROM PROGRAM 'echo 1' WITH (format 'csv', append_to_shard 123008) -\COPY rebalance_test_table FROM PROGRAM 'echo 1' WITH (format 'csv', append_to_shard 123009) +INSERT INTO rebalance_test_table SELECT * FROM generate_series(1, 100); -- Verify that there is one node with all placements @@ -604,34 +578,20 @@ CREATE TABLE test_schema_support.imbalanced_table ( id integer not null ); -SELECT master_create_distributed_table('test_schema_support.imbalanced_table', 'id', 'append'); - +SET citus.shard_count = 3; SET citus.shard_replication_factor TO 1; -SELECT master_create_empty_shard('test_schema_support.imbalanced_table') AS shardid \gset -COPY test_schema_support.imbalanced_table FROM STDIN WITH (format 'csv', append_to_shard :shardid); -1 -2 -3 -4 -\. 
+SELECT * from master_set_node_property('localhost', :worker_1_port, 'shouldhaveshards', false); +SELECT create_distributed_table('test_schema_support.imbalanced_table', 'id'); +INSERT INTO test_schema_support.imbalanced_table SELECT * FROM generate_series(1, 100); +UPDATE pg_dist_partition SET repmodel = 'c' WHERE logicalrelid = 'test_schema_support.imbalanced_table'::regclass; +SELECT * from master_set_node_property('localhost', :worker_1_port, 'shouldhaveshards', true); +SET citus.shard_count = 4; +-- copy one of the shards to the other node, this is to test that the +-- rebalancer takes into account all copies of a placement SET citus.shard_replication_factor TO 2; -SELECT master_create_empty_shard('test_schema_support.imbalanced_table') AS shardid \gset -COPY test_schema_support.imbalanced_table FROM STDIN WITH (format 'csv', append_to_shard :shardid); -1 -2 -3 -4 -\. - +SELECT replicate_table_shards('test_schema_support.imbalanced_table', max_shard_copies := 1, shard_transfer_mode := 'block_writes'); SET citus.shard_replication_factor TO 1; -SELECT master_create_empty_shard('test_schema_support.imbalanced_table') AS shardid \gset -COPY test_schema_support.imbalanced_table FROM STDIN WITH (format 'csv', append_to_shard :shardid); -1 -2 -3 -4 -\. -- imbalanced_table is now imbalanced @@ -652,11 +612,11 @@ SELECT * FROM public.table_placements_per_node; -- Row count in imbalanced table after rebalance SELECT COUNT(*) FROM imbalanced_table; -DROP TABLE public.shard_rebalancer_test_data; DROP TABLE test_schema_support.imbalanced_table; DROP TABLE test_schema_support.imbalanced_table_local; SET citus.shard_replication_factor TO 1; +SET citus.shard_count = 4; CREATE TABLE colocated_rebalance_test(id integer); CREATE TABLE colocated_rebalance_test2(id integer); @@ -1276,8 +1236,7 @@ SET citus.shard_replication_factor TO 2; SELECT replicate_table_shards('dist_table_test_3', max_shard_copies := 4, shard_transfer_mode:='block_writes'); -- Mark table as coordinator replicated in order to be able to test replicate_table_shards -UPDATE pg_dist_partition SET repmodel='c' WHERE logicalrelid IN - ('dist_table_test_3'::regclass); +UPDATE pg_dist_partition SET repmodel='c' WHERE logicalrelid = 'dist_table_test_3'::regclass; SELECT replicate_table_shards('dist_table_test_3', max_shard_copies := 4, shard_transfer_mode:='block_writes'); diff --git a/src/test/regress/sql/tableam.sql b/src/test/regress/sql/tableam.sql index 47845492a17..f0ed5cfca0f 100644 --- a/src/test/regress/sql/tableam.sql +++ b/src/test/regress/sql/tableam.sql @@ -26,7 +26,7 @@ ALTER EXTENSION citus ADD ACCESS METHOD fake_am; create table test_hash_dist(id int, val int) using fake_am; insert into test_hash_dist values (1, 1); -select create_distributed_table('test_hash_dist','id'); +select create_distributed_table('test_hash_dist','id', colocate_with := 'none'); select * from test_hash_dist; insert into test_hash_dist values (1, 1); @@ -86,11 +86,14 @@ SELECT * FROM master_get_table_ddl_events('test_range_dist'); select a.shardid, a.nodeport FROM pg_dist_shard b, pg_dist_shard_placement a -WHERE a.shardid=b.shardid AND logicalrelid = 'test_range_dist'::regclass::oid +WHERE a.shardid=b.shardid AND logicalrelid = 'test_hash_dist'::regclass::oid ORDER BY a.shardid, nodeport; +-- Change repmodel to allow master_copy_shard_placement +UPDATE pg_dist_partition SET repmodel='c' WHERE logicalrelid = 'test_hash_dist'::regclass; + SELECT master_copy_shard_placement( - get_shard_id_for_distribution_column('test_range_dist', '1'), + 
get_shard_id_for_distribution_column('test_hash_dist', '1'), 'localhost', :worker_1_port, 'localhost', :worker_2_port, do_repair := false, @@ -98,19 +101,21 @@ SELECT master_copy_shard_placement( select a.shardid, a.nodeport FROM pg_dist_shard b, pg_dist_shard_placement a -WHERE a.shardid=b.shardid AND logicalrelid = 'test_range_dist'::regclass::oid +WHERE a.shardid=b.shardid AND logicalrelid = 'test_hash_dist'::regclass::oid ORDER BY a.shardid, nodeport; -- verify that data was copied correctly \c - - - :worker_1_port -select * from test_tableam.test_range_dist_60005 ORDER BY id; +select * from test_tableam.test_hash_dist_60000 ORDER BY id; \c - - - :worker_2_port -select * from test_tableam.test_range_dist_60005 ORDER BY id; +select * from test_tableam.test_hash_dist_60000 ORDER BY id; \c - - - :master_port +set search_path to test_tableam; + -- -- Test that partitioned tables work correctly with a fake_am table -- diff --git a/src/test/regress/sql/worker_copy_table_to_node.sql b/src/test/regress/sql/worker_copy_table_to_node.sql new file mode 100644 index 00000000000..fa0703a257b --- /dev/null +++ b/src/test/regress/sql/worker_copy_table_to_node.sql @@ -0,0 +1,49 @@ +CREATE SCHEMA worker_copy_table_to_node; +SET search_path TO worker_copy_table_to_node; +SET citus.shard_count TO 1; -- single shard table for ease of testing +SET citus.shard_replication_factor TO 1; +SET citus.next_shard_id TO 62629600; + +SELECT nodeid AS worker_1_node FROM pg_dist_node WHERE nodeport=:worker_1_port \gset +SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE nodeport=:worker_2_port \gset + +CREATE TABLE t(a int); +INSERT INTO t SELECT generate_series(1, 100); + +CREATE TABLE ref(a int); +INSERT INTO ref SELECT generate_series(1, 100); + +select create_distributed_table('t', 'a'); +select create_reference_table('ref'); + +\c - - - :worker_2_port +SET search_path TO worker_copy_table_to_node; + +-- Create empty shard on worker 2 too +CREATE TABLE t_62629600(a int); + +\c - - - :worker_1_port +SET search_path TO worker_copy_table_to_node; + +-- Make sure that the UDF doesn't work on Citus tables +SELECT worker_copy_table_to_node('t', :worker_1_node); +SELECT worker_copy_table_to_node('ref', :worker_1_node); + +-- It should work on shards +SELECT worker_copy_table_to_node('t_62629600', :worker_1_node); + +SELECT count(*) FROM t; +SELECT count(*) FROM t_62629600; + +SELECT worker_copy_table_to_node('t_62629600', :worker_2_node); + +\c - - - :worker_2_port +SET search_path TO worker_copy_table_to_node; + +SELECT count(*) FROM t_62629600; + +\c - - - :master_port +SET search_path TO worker_copy_table_to_node; + +SET client_min_messages TO WARNING; +DROP SCHEMA worker_copy_table_to_node CASCADE;
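
A quick usage sketch, not part of the diff above: it reuses the objects from the worker_copy_table_to_node regress test (shard t_62629600 on worker 1, with an empty t_62629600 pre-created on worker 2). The UDF only copies rows, it does not create the target table, and it has to be run on the node that currently holds the source table; the call has the same shape as the command CreateShardCopyCommand sends during a block_writes shard move.

\c - - - :worker_1_port
SET search_path TO worker_copy_table_to_node;
-- look up the node id of the destination worker
SELECT nodeid AS target_node_id FROM pg_dist_node WHERE nodeport = :worker_2_port \gset
-- push the rows of the local shard to that node
SELECT pg_catalog.worker_copy_table_to_node('t_62629600'::regclass, :target_node_id);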