Skip to content

Commit

Permalink
Revert "Rename per worker"
Browse files Browse the repository at this point in the history
This reverts commit c48bf5e.
  • Loading branch information
emelsimsek committed Mar 11, 2024
1 parent 93c0ec1 commit 20a887b
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 46 deletions.
2 changes: 1 addition & 1 deletion src/backend/distributed/operations/shard_rebalancer.c
Original file line number Diff line number Diff line change
Expand Up @@ -436,7 +436,7 @@ BigIntArrayDatumContains(Datum *array, int arrayLength, uint64 toFind)
* FullShardPlacementList returns a List containing all the shard placements of
* a specific table (excluding the excludedShardArray)
*/
List *
static List *
FullShardPlacementList(Oid relationId, ArrayType *excludedShardArray)
{
List *shardPlacementList = NIL;
Expand Down
70 changes: 27 additions & 43 deletions src/backend/distributed/utils/multi_partitioning_utils.c
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,10 @@
#include "distributed/version_compat.h"
#include "distributed/worker_protocol.h"

<<<<<<< HEAD
extern List * FullShardPlacementList(Oid relationId, ArrayType *excludedShardArray);
=======
>>>>>>> parent of c48bf5eba (Rename per worker)
static char * PartitionBound(Oid partitionId);
static Relation try_relation_open_nolock(Oid relationId);
static List * CreateFixPartitionConstraintsTaskList(Oid relationId);
Expand Down Expand Up @@ -556,62 +559,43 @@ CreateFixPartitionShardIndexNames(Oid parentRelationId, Oid partitionRelationId,
/* lock metadata before getting placement lists */
LockShardListMetadata(parentShardIntervalList, ShareLock);

int taskId = 1;

List *shardPlacementList = FullShardPlacementList(parentRelationId,
construct_empty_array(INT4OID));


List *workerNodeList = ReadDistNode(true);

/* make sure we have deterministic output for our tests */
workerNodeList = SortList(workerNodeList, CompareWorkerNodes);

MemoryContext localContext = AllocSetContextCreate(CurrentMemoryContext,
"CreateFixPartitionShardIndexNames",
ALLOCSET_DEFAULT_SIZES);
MemoryContext oldContext = MemoryContextSwitchTo(localContext);

WorkerNode *workerNode = NULL;
foreach_ptr(workerNode, workerNodeList)
{
List *shardsOnNode = FilterActiveShardPlacementListByNode(
shardPlacementList, workerNode);
int taskId = 1;

ShardPlacement *shardPlacement = NULL;
ShardInterval *parentShardInterval = NULL;
foreach_ptr(parentShardInterval, parentShardIntervalList)
{
uint64 parentShardId = parentShardInterval->shardId;

foreach_ptr(shardPlacement, shardsOnNode)
List *queryStringList =
WorkerFixPartitionShardIndexNamesCommandList(parentShardId,
parentIndexIdList,
partitionRelationId);
if (queryStringList != NIL)
{
uint64 parentShardId = shardPlacement->shardId;

List *queryStringList =
WorkerFixPartitionShardIndexNamesCommandList(parentShardId,
parentIndexIdList,
partitionRelationId);
if (queryStringList != NIL)
{
Task *task = CitusMakeNode(Task);
task->jobId = INVALID_JOB_ID;
task->taskId = taskId++;
task->taskType = DDL_TASK;
Task *task = CitusMakeNode(Task);
task->jobId = INVALID_JOB_ID;
task->taskId = taskId++;
task->taskType = DDL_TASK;

char *prefix = "SELECT pg_catalog.citus_run_local_command($$";
char *postfix = "$$)";
char *string = StringJoinParams(queryStringList, ';', prefix, postfix);
char *prefix = "SELECT pg_catalog.citus_run_local_command($$";
char *postfix = "$$)";
char *string = StringJoinParams(queryStringList, ';', prefix, postfix);

SetTaskQueryString(task, string);
SetTaskQueryString(task, string);


task->dependentTaskList = NULL;
task->replicationModel = REPLICATION_MODEL_INVALID;
task->anchorShardId = parentShardId;
task->taskPlacementList = ActiveShardPlacementList(parentShardId);
task->dependentTaskList = NULL;
task->replicationModel = REPLICATION_MODEL_INVALID;
task->anchorShardId = parentShardId;
task->taskPlacementList = ActiveShardPlacementList(parentShardId);

bool localExecutionSupported = true;
ExecuteUtilityTaskList(list_make1(task), localExecutionSupported);
}

break;
bool localExecutionSupported = true;
ExecuteUtilityTaskList(list_make1(task), localExecutionSupported);
}

/* after every iteration, clean-up all the memory associated with it */
Expand Down
2 changes: 0 additions & 2 deletions src/include/distributed/shard_rebalancer.h
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,4 @@ extern void SetupRebalanceMonitor(List *placementUpdateList,
uint64 initialProgressState,
PlacementUpdateStatus initialStatus);

extern List * FullShardPlacementList(Oid relationId, ArrayType *excludedShardArray);

#endif /* SHARD_REBALANCER_H */

0 comments on commit 20a887b

Please sign in to comment.