IGNITE-23559 resetPartitions improvements: two phase reset #4688

Open · wants to merge 3 commits into base: main
@@ -0,0 +1,96 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.internal.distributionzones.rebalance;

import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.stream.IntStream;
import org.apache.ignite.internal.lang.ByteArray;
import org.apache.ignite.internal.metastorage.Entry;
import org.apache.ignite.internal.metastorage.MetaStorageManager;
import org.apache.ignite.internal.partitiondistribution.Assignments;

/**
 * Utility class with helper methods for working with assignments.
 */
public class AssignmentUtil {
    /**
     * Collects the IDs of the partitions to get assignments for.
     *
     * @param partitionIds IDs of partitions to get assignments for. If empty, IDs of all partitions are returned.
     * @param numberOfPartitions Number of partitions. Ignored if partition IDs are specified.
     * @return Array of partition IDs.
     */
    public static int[] partitionIds(Set<Integer> partitionIds, int numberOfPartitions) {
        IntStream partitionIdsStream = partitionIds.isEmpty()
                ? IntStream.range(0, numberOfPartitions)
                : partitionIds.stream().mapToInt(Integer::intValue);

        return partitionIdsStream.toArray();
    }

    /**
     * Collects the IDs of all partitions.
     *
     * @param numberOfPartitions Number of partitions.
     * @return Array of partition IDs.
     */
    public static int[] partitionIds(int numberOfPartitions) {
        return partitionIds(Set.of(), numberOfPartitions);
    }

    /**
     * Returns assignments for table partitions from the meta storage.
     *
     * @param metaStorageManager Meta storage manager.
     * @param partitionIds IDs of partitions to get assignments for.
     * @param keyForPartition Function that resolves the meta storage key for a given partition ID.
     * @return Future with table assignments as a value.
     */
    public static CompletableFuture<Map<Integer, Assignments>> metastoreAssignments(
            MetaStorageManager metaStorageManager,
            int[] partitionIds,
            Function<Integer, ByteArray> keyForPartition
    ) {
        Map<ByteArray, Integer> partitionKeysToPartitionNumber = new HashMap<>();

        for (int partitionId : partitionIds) {
            partitionKeysToPartitionNumber.put(keyForPartition.apply(partitionId), partitionId);
        }

        return metaStorageManager.getAll(partitionKeysToPartitionNumber.keySet())
                .thenApply(entries -> {
                    if (entries.isEmpty()) {
                        return Map.of();
                    }

                    Map<Integer, Assignments> result = new HashMap<>();

                    for (var mapEntry : entries.entrySet()) {
                        Entry entry = mapEntry.getValue();

                        if (!entry.empty() && !entry.tombstone()) {
                            result.put(partitionKeysToPartitionNumber.get(mapEntry.getKey()), Assignments.fromBytes(entry.value()));
                        }
                    }

                    return result.isEmpty() ? Map.of() : result;
                });
    }
}
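
For context while reviewing `AssignmentUtil`, a minimal usage sketch (not part of the change): the wrapper class, the `readAllStableAssignments` method, the key layout string, and the table ID below are invented for illustration; only the calls to `AssignmentUtil.partitionIds` and `AssignmentUtil.metastoreAssignments` mirror the code above.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import org.apache.ignite.internal.distributionzones.rebalance.AssignmentUtil;
import org.apache.ignite.internal.lang.ByteArray;
import org.apache.ignite.internal.metastorage.MetaStorageManager;
import org.apache.ignite.internal.partitiondistribution.Assignments;

class AssignmentUtilUsageSketch {
    /** Reads assignments for every partition of one table; the key layout here is invented for illustration. */
    static CompletableFuture<Map<Integer, Assignments>> readAllStableAssignments(
            MetaStorageManager metaStorageManager,
            int tableId,
            int numberOfPartitions
    ) {
        // Illustrative key function; real callers pass the key builder matching the
        // assignments type they need (stable/pending/planned keys differ).
        Function<Integer, ByteArray> keyForPartition =
                partId -> new ByteArray("assignments.stable." + tableId + "." + partId);

        // An empty set of requested partitions means "all partitions" per the javadoc above.
        int[] ids = AssignmentUtil.partitionIds(Set.of(), numberOfPartitions);

        return AssignmentUtil.metastoreAssignments(metaStorageManager, ids, keyForPartition);
    }
}
```

Passing an empty set to `partitionIds` expands to every partition ID in `[0, numberOfPartitions)`, which is the path this sketch exercises.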
@@ -33,7 +33,6 @@
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
 import org.apache.ignite.internal.catalog.Catalog;
@@ -51,7 +50,6 @@
 import org.apache.ignite.internal.metastorage.Revisions;
 import org.apache.ignite.internal.metastorage.WatchEvent;
 import org.apache.ignite.internal.metastorage.WatchListener;
-import org.apache.ignite.internal.util.ExceptionUtils;
 import org.apache.ignite.internal.util.IgniteSpinBusyLock;
 import org.apache.ignite.internal.util.IgniteUtils;
 import org.jetbrains.annotations.TestOnly;
@@ -312,53 +310,17 @@ private CompletableFuture<Void> triggerPartitionsRebalanceForAllTables(
         List<CompletableFuture<?>> tableFutures = new ArrayList<>(tableDescriptors.size());

         for (CatalogTableDescriptor tableDescriptor : tableDescriptors) {
-            CompletableFuture<?>[] partitionFutures = RebalanceUtil.triggerAllTablePartitionsRebalance(
+            tableFutures.add(RebalanceUtil.triggerAllTablePartitionsRebalance(
                     tableDescriptor,
                     zoneDescriptor,
                     dataNodes,
                     revision,
                     metaStorageManager,
                     assignmentsTimestamp
-            );
-
-            // This set is used to deduplicate exceptions (if there is an exception from upstream, for instance,
-            // when reading from MetaStorage, it will be encountered by every partition future) to avoid noise
-            // in the logs.
-            Set<Throwable> unwrappedCauses = ConcurrentHashMap.newKeySet();
-
-            for (int partId = 0; partId < partitionFutures.length; partId++) {
-                int finalPartId = partId;
-
-                partitionFutures[partId].exceptionally(e -> {
-                    Throwable cause = ExceptionUtils.unwrapCause(e);
-
-                    if (unwrappedCauses.add(cause)) {
-                        // The exception is specific to this partition.
-                        LOG.error(
-                                "Exception on updating assignments for [table={}, partition={}]",
-                                e,
-                                tableInfo(tableDescriptor), finalPartId
-                        );
-                    } else {
-                        // The exception is from upstream and not specific for this partition, so don't log the partition index.
-                        LOG.error(
-                                "Exception on updating assignments for [table={}]",
-                                e,
-                                tableInfo(tableDescriptor)
-                        );
-                    }
-
-                    return null;
-                });
-            }
-
-            tableFutures.add(allOf(partitionFutures));
+            ));
         }

         return allOf(tableFutures.toArray(CompletableFuture[]::new));
     }
-
-    private static String tableInfo(CatalogTableDescriptor tableDescriptor) {
-        return tableDescriptor.id() + "/" + tableDescriptor.name();
-    }
 }
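
To make the intent of the hunk above easier to follow, here is a self-contained sketch of the fan-out/fan-in shape the loop keeps once per-partition exception deduplication is dropped from the engine. Every name below is illustrative, not the engine's actual code; in the real change the per-table future comes from `RebalanceUtil.triggerAllTablePartitionsRebalance`, which appears to be left responsible for its own partition-level error handling.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class RebalanceFanOutSketch {
    /** Illustrative stand-in for one table's rebalance trigger; it logs its own failures and never propagates them. */
    static CompletableFuture<Void> triggerForTable(String table) {
        return CompletableFuture.runAsync(() -> {
            // ... write new assignments for every partition of the table ...
        }).exceptionally(e -> {
            System.err.println("Exception on updating assignments for [table=" + table + "]: " + e);
            return null;
        });
    }

    /** Fans out one future per table and joins them, mirroring the shape of the loop in the diff above. */
    static CompletableFuture<Void> triggerForAllTables(List<String> tables) {
        List<CompletableFuture<?>> tableFutures = new ArrayList<>(tables.size());

        for (String table : tables) {
            tableFutures.add(triggerForTable(table));
        }

        return CompletableFuture.allOf(tableFutures.toArray(CompletableFuture[]::new));
    }
}
```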