Skip to content

Commit

Permalink
Two-phase implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
Kirill Sizov committed Nov 7, 2024
1 parent de6d12c commit 7885b6c
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,15 @@ public void put(String nodeName, LocalPartitionStateMessage state) {
map.put(nodeName, state);
}

/**
 * Returns the local partition state that was reported by the given node, as previously stored via {@code put}.
 *
 * @param node Consistent ID of the node.
 * @return Local partition state message for the node, or {@code null} if no state has been recorded for it.
 */
public LocalPartitionStateMessage partitionState(String node) {
return map.get(node);
}

@Override
public String toString() {
return map.toString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,11 @@
import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
import static org.apache.ignite.lang.ErrorGroups.DisasterRecovery.CLUSTER_NOT_IDLE_ERR;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
Expand Down Expand Up @@ -142,6 +145,9 @@ public CompletableFuture<Void> handle(DisasterRecoveryManager disasterRecoveryMa

CompletableFuture<Set<String>> dataNodesFuture = disasterRecoveryManager.dzManager.dataNodes(msRevision, catalogVersion, zoneId);

// TODO: flag from another pr
boolean manualUpdate = true;

return dataNodesFuture.thenCombine(localStates, (dataNodes, localStatesMap) -> {
Set<String> nodeConsistentIds = disasterRecoveryManager.dzManager.logicalTopology()
.stream()
Expand All @@ -156,7 +162,8 @@ public CompletableFuture<Void> handle(DisasterRecoveryManager disasterRecoveryMa
msRevision,
disasterRecoveryManager.metaStorageManager,
localStatesMap,
catalog.time()
catalog.time(),
manualUpdate
);
}).thenCompose(Function.identity());
}
Expand All @@ -183,7 +190,8 @@ private CompletableFuture<Void> forceAssignmentsUpdate(
long revision,
MetaStorageManager metaStorageManager,
Map<TablePartitionId, LocalPartitionStateMessageByNode> localStatesMap,
long assignmentsTimestamp
long assignmentsTimestamp,
boolean manualUpdate
) {
int[] partitionIdsArray = AssignmentUtil.partitionIds(partitionIds, zoneDescriptor.partitions());

Expand All @@ -203,7 +211,8 @@ private CompletableFuture<Void> forceAssignmentsUpdate(
localStatesMap,
assignmentsTimestamp,
partitionIdsArray,
tableAssignments
tableAssignments,
manualUpdate
);
});
}
Expand All @@ -218,7 +227,8 @@ private static CompletableFuture<Void> updateAssignments(
Map<TablePartitionId, LocalPartitionStateMessageByNode> localStatesMap,
long assignmentsTimestamp,
int[] partitionIdsArray,
Map<Integer, Assignments> tableAssignments
Map<Integer, Assignments> tableAssignments,
boolean manualUpdate
) {
Set<String> aliveDataNodes = CollectionUtils.intersect(dataNodes, aliveNodesConsistentIds);

Expand All @@ -236,7 +246,8 @@ private static CompletableFuture<Void> updateAssignments(
metaStorageManager,
tableAssignments.get(replicaGrpId.partitionId()).nodes(),
localStatesMap.get(replicaGrpId),
assignmentsTimestamp
assignmentsTimestamp,
manualUpdate
).thenAccept(res -> {
DisasterRecoveryManager.LOG.info(
"Partition {} returned {} status on reset attempt", replicaGrpId, UpdateStatus.valueOf(res)
Expand All @@ -256,7 +267,8 @@ private static CompletableFuture<Integer> manualPartitionUpdate(
MetaStorageManager metaStorageMgr,
Set<Assignment> currentAssignments,
LocalPartitionStateMessageByNode localPartitionStateMessageByNode,
long assignmentsTimestamp
long assignmentsTimestamp,
boolean manualUpdate
) {
Set<Assignment> partAssignments = getAliveNodesWithData(aliveNodesConsistentIds, localPartitionStateMessageByNode);
Set<Assignment> aliveStableNodes = CollectionUtils.intersect(currentAssignments, partAssignments);
Expand All @@ -268,6 +280,12 @@ private static CompletableFuture<Integer> manualPartitionUpdate(
Iif invokeClosure;

if (aliveStableNodes.isEmpty()) {
if (!manualUpdate) {
// Do nothing if there are no alive nodes and this is not a manual update.
// TODO: check exactly which set of nodes should be checked here.
return completedFuture(ASSIGNMENT_NOT_UPDATED.ordinal());
}

enrichAssignments(partId, aliveDataNodes, replicas, partAssignments);

// There are no known nodes with data, which means that we can just put new assignments into pending assignments with "forced"
Expand All @@ -279,15 +297,27 @@ private static CompletableFuture<Integer> manualPartitionUpdate(
null
);
} else {
Set<Assignment> stableAssignments = Set.copyOf(partAssignments);
enrichAssignments(partId, aliveDataNodes, replicas, partAssignments);
assert !partAssignments.isEmpty() : "Alive nodes with data should not be empty";

List<Assignment> stableAssignments = new ArrayList<>(partAssignments);

stableAssignments.sort(
Comparator.<Assignment>comparingLong(
node -> localPartitionStateMessageByNode.partitionState(node.consistentId()).logIndex()
)
.reversed()
.thenComparing(Assignment::consistentId));


if (manualUpdate) {
enrichAssignments(partId, aliveDataNodes, replicas, partAssignments);
}
// There are nodes with data, and we set pending assignments to this set of nodes. It'll be the source of peers for
// "resetPeers", and after that new assignments with restored replica factor will be picked up from planned assignments.
invokeClosure = prepareMsInvokeClosure(
partId,
longToBytesKeepingOrder(revision),
Assignments.forced(stableAssignments, assignmentsTimestamp).toBytes(),
Assignments.forced(Set.of(stableAssignments.get(0)), assignmentsTimestamp).toBytes(),
Assignments.toBytes(partAssignments, assignmentsTimestamp)
);
}
Expand Down

0 comments on commit 7885b6c

Please sign in to comment.