2 changes: 1 addition & 1 deletion .github/workflows/catch_flaky.yml
@@ -80,7 +80,7 @@ jobs:
       tt version
   - name: Setup luatest
-    run: tt rocks install luatest
+    run: tt rocks install luatest 1.4.1

   - run: cmake .
   - run: make test-flaky
2 changes: 1 addition & 1 deletion .github/workflows/fast_testing.yml
@@ -107,7 +107,7 @@ jobs:
       tt version
   - name: Setup luatest
-    run: tt rocks install luatest
+    run: tt rocks install luatest 1.4.1

   - run: cmake .
   - run: make test-force
2 changes: 1 addition & 1 deletion .github/workflows/reusable_testing.yml
@@ -44,7 +44,7 @@ jobs:
       tt version
   - name: Setup luatest
-    run: tt rocks install luatest
+    run: tt rocks install luatest 1.4.1

   - run: cmake .
   - run: make test-force
5 changes: 4 additions & 1 deletion test/instances/storage.lua
@@ -146,6 +146,9 @@ local function bucket_recovery_wait()
         if index_has(status_index, vconst.BUCKET.RECEIVING) then
             error('Still have RECEIVING buckets')
         end
+        if index_has(status_index, vconst.BUCKET.READONLY) then
+            error('Still have READONLY buckets')
+        end
     end)
 end

@@ -163,7 +166,7 @@ end

 local function bucket_recovery_continue()
     vshard.storage.internal.errinj.ERRINJ_RECOVERY_PAUSE = false
-    vshard.storage.garbage_collector_wakeup()
+    vshard.storage.recovery_wakeup()
 end

 local function wal_sync()
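The new READONLY check mirrors the existing waits on SENDING/RECEIVING buckets. For context, a minimal sketch of what an `index_has` helper like the one used above could look like (the name comes from the diff; this implementation is an assumption, not the file's actual code):

```lua
-- Hypothetical helper: true if any bucket currently has the given
-- status in the _bucket space's status index (assumed semantics).
local function index_has(index, status)
    -- index:count(key) is a standard Tarantool index method.
    return index:count(status) > 0
end
```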
22 changes: 22 additions & 0 deletions test/luatest_helpers/vtest.lua
@@ -498,6 +498,26 @@ local function cluster_rebalancer_enable(g)
     t.assert_equals(err, nil, 'rebalancer enable')
 end

+--
+-- Disable recovery on all masters.
+--
+local function cluster_recovery_pause(g)
+    local _, err = cluster_exec_each_master(g, function()
+        _G.bucket_recovery_pause()
+    end)
+    t.assert_equals(err, nil, 'cluster recovery pause')
+end
+
+--
+-- Enable recovery on all masters.
+--
+local function cluster_recovery_continue(g)
+    local _, err = cluster_exec_each_master(g, function()
+        _G.bucket_recovery_continue()
+    end)
+    t.assert_equals(err, nil, 'cluster recovery continue')
+end
+
 --
 -- Wait vclock sync in each replicaset between all its replicas.
 --
@@ -870,6 +890,8 @@ return {
     cluster_bootstrap = cluster_bootstrap,
     cluster_rebalancer_disable = cluster_rebalancer_disable,
     cluster_rebalancer_enable = cluster_rebalancer_enable,
+    cluster_recovery_pause = cluster_recovery_pause,
+    cluster_recovery_continue = cluster_recovery_continue,
     cluster_wait_vclock_all = cluster_wait_vclock_all,
     cluster_wait_fullsync = cluster_wait_fullsync,
     cluster_rebalancer_find = cluster_rebalancer_find,
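These helpers pair naturally: a test pauses recovery cluster-wide, arranges some in-flight bucket state on purpose, then resumes it. A hedged usage sketch — the test group, test body, and scenario are made up for illustration; only the two vtest helpers come from this diff:

```lua
-- Hypothetical test flow using the new vtest helpers.
local t = require('luatest')
local vtest = require('test.luatest_helpers.vtest')

local test_group = t.group('recovery_pause_demo')

test_group.test_recovery_sees_stuck_bucket = function(g)
    -- Freeze recovery on every master so the intermediate bucket
    -- state created below is not repaired from under the test.
    vtest.cluster_recovery_pause(g)
    -- ... put a bucket into SENDING/RECEIVING on purpose here ...
    -- Resume recovery and let it clean up the stuck bucket.
    vtest.cluster_recovery_continue(g)
end
```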
63 changes: 3 additions & 60 deletions test/rebalancer/bucket_ref.result
@@ -271,7 +271,7 @@ fiber.sleep(0.2)
 vshard.storage.buckets_info(1)
 ---
 - 1:
-    status: active
+    status: readonly
     rw_lock: true
     id: 1
     ref_rw: 1
@@ -292,11 +292,8 @@ vshard.storage.buckets_info(1)
     id: 1
 ...
 --
--- Rebalancer takes buckets starting from the minimal id. If a
--- bucket with that ID is locked, it should try another. The case
--- makes bucket with minimal ID locked for RW requests. The only
--- function taking the lock is bucket_send, so to test that a
--- manual bucket_send is called before rebalancer.
+-- Cancel during bucket_send. In that case all the locks should
+-- be freed, obviously.
 --
 vshard.storage.rebalancer_enable()
 ---
@@ -327,12 +324,6 @@ function keep_ref(id) \
 end
 ---
 ...
-fiber_to_ref = fiber.create(keep_ref, 1)
----
-...
-while vshard.storage.buckets_info(1)[1].ref_rw ~= 1 do fiber.sleep(0.01) end
----
-...
 -- Now bucket_send on that bucket blocks.
 function do_send(id) \
     send_result = { \
@@ -342,54 +333,6 @@ function do_send(id) \
 end
 ---
 ...
-fiber_to_lock = fiber.create(do_send, 1)
----
-...
-while not vshard.storage.buckets_info(1)[1].rw_lock do fiber.sleep(0.01) end
----
-...
-cfg.sharding[util.replicasets[1]].weight = 99
----
-...
-cfg.sharding[util.replicasets[2]].weight = 101
----
-...
-cfg.rebalancer_disbalance_threshold = 0
----
-...
-vshard.storage.cfg(cfg, box.info.uuid)
----
-...
-wait_rebalancer_state('The cluster is balanced ok', test_run)
----
-...
--- Cleanup after the test.
-keep_lock = false
----
-...
-while not send_result do fiber.sleep(0.01) end
----
-...
-send_result
----
-- - true
-...
-cfg.sharding[util.replicasets[1]].weight = nil
----
-...
-cfg.sharding[util.replicasets[2]].weight = nil
----
-...
-vshard.storage.cfg(cfg, box.info.uuid)
----
-...
-wait_rebalancer_state('The cluster is balanced ok', test_run)
----
-...
---
--- Cancel during bucket_send. In that case all the locks should
--- be freed, obviously.
---
 keep_lock = true
 ---
 ...
33 changes: 2 additions & 31 deletions test/rebalancer/bucket_ref.test.lua
@@ -91,11 +91,8 @@ _ = test_run:switch('box_1_a')
 vshard.storage.buckets_info(1)

 --
--- Rebalancer takes buckets starting from the minimal id. If a
--- bucket with that ID is locked, it should try another. The case
--- makes bucket with minimal ID locked for RW requests. The only
--- function taking the lock is bucket_send, so to test that a
--- manual bucket_send is called before rebalancer.
+-- Cancel during bucket_send. In that case all the locks should
+-- be freed, obviously.
 --
 vshard.storage.rebalancer_enable()
 _ = test_run:switch('box_2_a')
@@ -112,39 +109,13 @@ function keep_ref(id) \
 end \
     vshard.storage.bucket_unrefrw(1) \
 end
-fiber_to_ref = fiber.create(keep_ref, 1)
-while vshard.storage.buckets_info(1)[1].ref_rw ~= 1 do fiber.sleep(0.01) end
-
 -- Now bucket_send on that bucket blocks.
 function do_send(id) \
     send_result = { \
         vshard.storage.bucket_send(id, util.replicasets[2], \
                                    {timeout = 9999999}) \
     } \
 end
-fiber_to_lock = fiber.create(do_send, 1)
-while not vshard.storage.buckets_info(1)[1].rw_lock do fiber.sleep(0.01) end
-
-
-cfg.sharding[util.replicasets[1]].weight = 99
-cfg.sharding[util.replicasets[2]].weight = 101
-cfg.rebalancer_disbalance_threshold = 0
-vshard.storage.cfg(cfg, box.info.uuid)
-wait_rebalancer_state('The cluster is balanced ok', test_run)
-
--- Cleanup after the test.
-keep_lock = false
-while not send_result do fiber.sleep(0.01) end
-send_result
-cfg.sharding[util.replicasets[1]].weight = nil
-cfg.sharding[util.replicasets[2]].weight = nil
-vshard.storage.cfg(cfg, box.info.uuid)
-wait_rebalancer_state('The cluster is balanced ok', test_run)
-
---
--- Cancel during bucket_send. In that case all the locks should
--- be freed, obviously.
---
 keep_lock = true
 fiber_to_ref = fiber.create(keep_ref, 1)
 while vshard.storage.buckets_info(1)[1].ref_rw ~= 1 do fiber.sleep(0.01) end
2 changes: 1 addition & 1 deletion test/rebalancer/rebalancer.result
@@ -360,7 +360,7 @@ test_run:switch('box_1_a')
 vshard.storage.rebalancer_disable()
 ---
 ...
-wait_bucket_is_collected(100)
+for i = 91, 100 do wait_bucket_is_collected(i) end
 ---
 ...
 vshard.storage.bucket_force_create(91, 10)
2 changes: 1 addition & 1 deletion test/rebalancer/rebalancer.test.lua
@@ -181,7 +181,7 @@ util.map_bucket_protection(test_run, {REPLICASET_1}, true)
 --
 test_run:switch('box_1_a')
 vshard.storage.rebalancer_disable()
-wait_bucket_is_collected(100)
+for i = 91, 100 do wait_bucket_is_collected(i) end
 vshard.storage.bucket_force_create(91, 10)
 space = box.space.test
 space:replace{1, 91}
4 changes: 2 additions & 2 deletions test/rebalancer/receiving_bucket.result
@@ -228,7 +228,7 @@ vshard.storage.internal.errinj.ERRINJ_LAST_RECEIVE_DELAY = true
 _ = test_run:switch('box_2_a')
 ---
 ...
-_, err = vshard.storage.bucket_send(101, util.replicasets[1], {timeout = 0.1})
+_, err = vshard.storage.bucket_send(101, util.replicasets[1], {timeout = 1})
Collaborator review comment on this line:
My only comment: it seems wrong that we use one timeout for sync + data send. Sync is entirely bottlenecked by replication and doesn't depend on bucket size. OTOH, the data send can take considerably longer, and it does depend on the bucket size.

It seems logical and convenient to add a new option, sync_timeout, used to wait for the sync, while timeout would be used for the sending.

Alternatively, to make it more explicit, we could introduce 3 options: sync_timeout + send_timeout + timeout.

  • If sync_timeout and timeout are specified, we calculate send_timeout = timeout - sync_timeout.
  • If send_timeout and timeout are specified, we calculate sync_timeout = timeout - send_timeout.
  • If timeout is not specified, we take it as default, same as now. Then we apply the rules above.
  • If all 3 are specified, we ignore timeout. Or raise an error.

This would let us make the sync timeout in tests very high and the send timeout very small, so the tests can remain fast. (A sketch of these resolution rules follows this file's diff.)

 ---
 ...
 util.is_timeout_error(err)
@@ -308,7 +308,7 @@ while f1:status() ~= 'suspended' do fiber.sleep(0.01) end
 vshard.storage.buckets_info(1)
 ---
 - 1:
-    status: active
+    status: readonly
     rw_lock: true
     id: 1
     ref_rw: 1
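The review comment above proposes splitting bucket_send's single timeout into a sync budget and a send budget. A minimal Lua sketch of the proposed resolution rules — sync_timeout and send_timeout are proposals from the comment, not existing vshard options, and the default constant is made up:

```lua
-- Hypothetical resolution of the three proposed bucket_send options.
-- Returns the sync budget and the send budget in seconds.
local DEFAULT_TIMEOUT = 10 -- assumed default, not vshard's real one

local function resolve_timeouts(opts)
    local sync = opts.sync_timeout
    local send = opts.send_timeout
    local total = opts.timeout
    if sync ~= nil and send ~= nil then
        -- Both budgets are explicit; per the comment, a redundant
        -- total is ignored (raising an error is the stricter option).
        return sync, send
    end
    -- If timeout is not specified, take it as default, same as now.
    total = total or DEFAULT_TIMEOUT
    if sync ~= nil then
        return sync, total - sync
    end
    if send ~= nil then
        return total - send, send
    end
    -- Neither split option given: behave as before, one shared budget.
    return total, total
end
```

This is exactly what lets a test keep the sync budget very high while making the send budget tiny, instead of inflating the single timeout as the diff above does.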
2 changes: 1 addition & 1 deletion test/rebalancer/receiving_bucket.test.lua
@@ -94,7 +94,7 @@ box.space._bucket:get{1}
 _ = test_run:switch('box_1_a')
 vshard.storage.internal.errinj.ERRINJ_LAST_RECEIVE_DELAY = true
 _ = test_run:switch('box_2_a')
-_, err = vshard.storage.bucket_send(101, util.replicasets[1], {timeout = 0.1})
+_, err = vshard.storage.bucket_send(101, util.replicasets[1], {timeout = 1})
 util.is_timeout_error(err)
 wait_bucket_is_collected(101)
 _ = test_run:switch('box_1_a')
4 changes: 4 additions & 0 deletions test/rebalancer/restart_during_rebalancing.result
@@ -306,6 +306,7 @@ vshard.storage.info().bucket
     total: 50
     garbage: 0
     pinned: 0
+    readonly: 0
     sending: 0
 ...
 check_consistency()
@@ -323,6 +324,7 @@ vshard.storage.info().bucket
     total: 50
     garbage: 0
     pinned: 0
+    readonly: 0
     sending: 0
 ...
 check_consistency()
@@ -340,6 +342,7 @@ vshard.storage.info().bucket
     total: 50
     garbage: 0
     pinned: 0
+    readonly: 0
     sending: 0
 ...
 check_consistency()
@@ -357,6 +360,7 @@ vshard.storage.info().bucket
     total: 50
     garbage: 0
     pinned: 0
+    readonly: 0
     sending: 0
 ...
 check_consistency()
8 changes: 5 additions & 3 deletions test/storage-luatest/auto_master_2_2_2_test.lua
@@ -369,7 +369,7 @@ test_group.test_master_discovery_on_disconnect = function(g)
     -- destination's master is disconnected. A master search is
     -- triggered then.
     local ok, err = ivshard.storage.bucket_send(
-        bid, rs_uuid, {timeout = 0.01})
+        bid, rs_uuid, {timeout = 1})
     ilt.assert(not ok)
     ilt.assert_not_equals(err, nil)
     -- Recovery will re-discover the master.
@@ -411,13 +411,15 @@ test_group.test_master_discovery_on_disconnect = function(g)
     g.replica_2_b:update_box_cfg{read_only = false}
     promote_if_needed(g, g.replica_2_b)
     send_bucket_to_new_master(g.replica_1_a, g.replica_2_b)
-    -- Can't GC the bucket until the old master is back. But can send it.
-    g.replica_2_b:exec(bucket_send, {bid, g.replica_1_a:replicaset_uuid()})
+    -- Can't GC the bucket until the old master is back and can't send it.

     -- Restore everything back.
     g.replica_2_a:start()
     vtest.cluster_cfg(g, global_cfg)
     promote_if_needed(g, g.replica_2_a)
+    -- `replica_2_a` should get the bucket from `replica_2_b` to send it.
+    g.replica_2_a:wait_for_vclock_of(g.replica_2_b)
+    g.replica_2_a:exec(bucket_send, {bid, g.replica_1_a:replicaset_uuid()})
     g.replica_2_b:exec(bucket_gc_wait)
     g.replica_2_b:update_box_cfg{read_only = true}
     vtest.cluster_exec_each(g, function()
2 changes: 1 addition & 1 deletion test/storage-luatest/bucket_triggers_test.lua
@@ -493,7 +493,7 @@ test_group.test_bucket_space_reject_bad_replace_on_transition = function(g)
         _bucket:replace{bid, ivconst.BUCKET.ACTIVE}
         internal.is_bucket_protected = true
         -- To be sure that the loops above didn't somehow skip everything.
-        ilt.assert_equals(count, 42, 'transition count')
+        ilt.assert_equals(count, 56, 'transition count')
         _G.bucket_gc_continue()
     end)
     rep_b:wait_vclock_of(rep_a)