Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions src/cluster_migrateslots.c
Original file line number Diff line number Diff line change
Expand Up @@ -865,9 +865,6 @@ void performSlotImportJobFailover(slotMigrationJob *job) {
/* 4) Pong all the other nodes so that they can update the state accordingly
* and detect that we have taken over the slots. */
clusterDoBeforeSleep(CLUSTER_TODO_BROADCAST_ALL);

/* 5) Mark all slots as stable in the kvstore (for SCAN/KEYS/RANDOMKEY) */
setSlotImportingStateInAllDbs(job->slot_ranges, 0);
}

bool clusterIsAnySlotImporting(void) {
Expand Down
4 changes: 2 additions & 2 deletions src/kvstore.c
Original file line number Diff line number Diff line change
Expand Up @@ -949,10 +949,10 @@ void kvstoreSetIsImporting(kvstore *kvs, int didx, int is_importing) {
return;
}

hashtableDelete(kvs->importing, (void *)(intptr_t)didx);
/* Once we mark a hashtable as not importing, we need to begin tracking in
* the kvstore metadata */
if (ht && hashtableSize(ht) != 0) {
if (hashtableDelete(kvs->importing, (void *)(intptr_t)didx) && ht && hashtableSize(ht) != 0) {
cumulativeKeyCountAdd(kvs, didx, hashtableSize(ht));
kvs->importing_key_count -= hashtableSize(ht);
}
}
74 changes: 74 additions & 0 deletions tests/unit/cluster/cluster-migrateslots.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -512,6 +512,54 @@ start_cluster 3 3 {tags {logreqres:skip external:skip cluster} overrides {cluste
}
}

test "Slot migration DBSIZE after migration" {
assert_does_not_resync {
# Populate data before migration
populate 1000 "$16381_slot_tag:" 1000 -2
populate 1000 "$16382_slot_tag:" 1000 -2
populate 1000 "$16383_slot_tag:" 1000 -2
wait_for_countkeysinslot 5 16381 1000
wait_for_countkeysinslot 5 16382 1000
wait_for_countkeysinslot 5 16383 1000
assert_equal "3000" [R 2 DBSIZE]
assert_equal "0" [R 0 DBSIZE]
assert_equal "3000" [R 5 DBSIZE]
assert_equal "0" [R 3 DBSIZE]

foreach slot_to_migrate {16381 16382 16383} {
# Perform one-shot migration
assert_match "OK" [R 2 CLUSTER MIGRATESLOTS SLOTSRANGE $slot_to_migrate $slot_to_migrate NODE $node0_id]
wait_for_migration 0 $slot_to_migrate

# Reflected in DBSIZE
assert_match "1000" [R 0 CLUSTER COUNTKEYSINSLOT $slot_to_migrate]
assert_match "0" [R 2 CLUSTER COUNTKEYSINSLOT $slot_to_migrate]
assert_equal "2000" [R 2 DBSIZE]
assert_equal "1000" [R 0 DBSIZE]
wait_for_countkeysinslot 3 $slot_to_migrate 1000
wait_for_countkeysinslot 5 $slot_to_migrate 0
assert_equal "2000" [R 5 DBSIZE]
assert_equal "1000" [R 3 DBSIZE]

# Perform migration back
assert_match "OK" [R 0 CLUSTER MIGRATESLOTS SLOTSRANGE $slot_to_migrate $slot_to_migrate NODE $node2_id]
set jobname [get_job_name 0 $slot_to_migrate]
wait_for_migration 2 $slot_to_migrate

# Reflected in DBSIZE
assert_equal "3000" [R 2 DBSIZE]
assert_equal "0" [R 0 DBSIZE]
wait_for_countkeysinslot 5 $slot_to_migrate 1000
wait_for_countkeysinslot 3 $slot_to_migrate 0
assert_equal "3000" [R 5 DBSIZE]
assert_equal "0" [R 3 DBSIZE]
}

# Cleanup for next test
assert_match "OK" [R 2 FLUSHDB SYNC]
}
}

test "Single source import - two phase" {
assert_does_not_resync {
set_debug_prevent_pause 1
Expand Down Expand Up @@ -2006,27 +2054,53 @@ start_cluster 3 3 {tags {logreqres:skip external:skip cluster} overrides {cluste
set owning_repl 3
}

# Restart the node again to verify resync is capable. This would
# catch if the result of the above operation caused some kind of
# kvstore/RDB corruption.
if {$do_save} {
assert_match "OK" [R $idx_to_restart SAVE]
}
do_node_restart $idx_to_restart

# Wait for resync
wait_for_condition 50 1000 {
[status [srv -3 client] master_link_status] == "up"
} else {
fail "Node 3 is not synced"
}
wait_for_condition 50 1000 {
[status [srv -5 client] master_link_status] == "up"
} else {
fail "Node 5 is not synced"
}

if {$owning_prim == $idx_to_restart && ! $do_save} {
assert_match "0" [R $owning_prim CLUSTER COUNTKEYSINSLOT 16379]
assert_match "0" [R $owning_prim CLUSTER COUNTKEYSINSLOT 16381]
assert_match "0" [R $owning_prim CLUSTER COUNTKEYSINSLOT 16383]
wait_for_countkeysinslot $owning_repl 16379 0
wait_for_countkeysinslot $owning_repl 16381 0
wait_for_countkeysinslot $owning_repl 16383 0
assert_equal "0" [R $owning_prim DBSIZE]
assert_equal "0" [R $owning_repl DBSIZE]
} else {
assert_match "333" [R $owning_prim CLUSTER COUNTKEYSINSLOT 16379]
assert_match "333" [R $owning_prim CLUSTER COUNTKEYSINSLOT 16381]
assert_match "334" [R $owning_prim CLUSTER COUNTKEYSINSLOT 16383]
wait_for_countkeysinslot $owning_repl 16379 333
wait_for_countkeysinslot $owning_repl 16381 333
wait_for_countkeysinslot $owning_repl 16383 334
assert_equal "1000" [R $owning_prim DBSIZE]
assert_equal "1000" [R $owning_repl DBSIZE]
}
assert_match "0" [R $not_owning_prim CLUSTER COUNTKEYSINSLOT 16379]
assert_match "0" [R $not_owning_prim CLUSTER COUNTKEYSINSLOT 16381]
assert_match "0" [R $not_owning_prim CLUSTER COUNTKEYSINSLOT 16383]
wait_for_countkeysinslot $not_owning_repl 16379 0
wait_for_countkeysinslot $not_owning_repl 16381 0
wait_for_countkeysinslot $not_owning_repl 16383 0
assert_equal "0" [R $not_owning_repl DBSIZE]
assert_equal "0" [R $not_owning_repl DBSIZE]

# Cleanup for the next test
assert_match "OK" [R $owning_prim FLUSHDB SYNC]
Expand Down
Loading