diff --git a/.github/workflows/catch_flaky.yml b/.github/workflows/catch_flaky.yml index 35beca57..0a1f0908 100644 --- a/.github/workflows/catch_flaky.yml +++ b/.github/workflows/catch_flaky.yml @@ -80,7 +80,7 @@ jobs: tt version - name: Setup luatest - run: tt rocks install luatest + run: tt rocks install luatest 1.4.1 - run: cmake . - run: make test-flaky diff --git a/.github/workflows/fast_testing.yml b/.github/workflows/fast_testing.yml index 8be2095a..8fbf1b89 100644 --- a/.github/workflows/fast_testing.yml +++ b/.github/workflows/fast_testing.yml @@ -107,7 +107,7 @@ jobs: tt version - name: Setup luatest - run: tt rocks install luatest + run: tt rocks install luatest 1.4.1 - run: cmake . - run: make test-force diff --git a/.github/workflows/reusable_testing.yml b/.github/workflows/reusable_testing.yml index 2b9eb487..1468c9f3 100644 --- a/.github/workflows/reusable_testing.yml +++ b/.github/workflows/reusable_testing.yml @@ -44,7 +44,7 @@ jobs: tt version - name: Setup luatest - run: tt rocks install luatest + run: tt rocks install luatest 1.4.1 - run: cmake . - run: make test-force diff --git a/test/instances/storage.lua b/test/instances/storage.lua index 301b6512..1671ffd7 100755 --- a/test/instances/storage.lua +++ b/test/instances/storage.lua @@ -146,6 +146,9 @@ local function bucket_recovery_wait() if index_has(status_index, vconst.BUCKET.RECEIVING) then error('Still have RECEIVING buckets') end + if index_has(status_index, vconst.BUCKET.READONLY) then + error('Still have READONLY buckets') + end end) end @@ -163,7 +166,7 @@ end local function bucket_recovery_continue() vshard.storage.internal.errinj.ERRINJ_RECOVERY_PAUSE = false - vshard.storage.garbage_collector_wakeup() + vshard.storage.recovery_wakeup() end local function wal_sync() diff --git a/test/luatest_helpers/vtest.lua b/test/luatest_helpers/vtest.lua index c054e09a..11c652ed 100644 --- a/test/luatest_helpers/vtest.lua +++ b/test/luatest_helpers/vtest.lua @@ -498,6 +498,26 @@ local function cluster_rebalancer_enable(g) t.assert_equals(err, nil, 'rebalancer enable') end +-- +-- Disable recovery on all masters. +-- +local function cluster_recovery_pause(g) + local _, err = cluster_exec_each_master(g, function() + _G.bucket_recovery_pause() + end) + t.assert_equals(err, nil, 'cluster recovery pause') +end + +-- +-- Enable recovery on all masters. +-- +local function cluster_recovery_continue(g) + local _, err = cluster_exec_each_master(g, function() + _G.bucket_recovery_continue() + end) + t.assert_equals(err, nil, 'cluster recovery continue') +end + -- -- Wait vclock sync in each replicaset between all its replicas. -- @@ -870,6 +890,8 @@ return { cluster_bootstrap = cluster_bootstrap, cluster_rebalancer_disable = cluster_rebalancer_disable, cluster_rebalancer_enable = cluster_rebalancer_enable, + cluster_recovery_pause = cluster_recovery_pause, + cluster_recovery_continue = cluster_recovery_continue, cluster_wait_vclock_all = cluster_wait_vclock_all, cluster_wait_fullsync = cluster_wait_fullsync, cluster_rebalancer_find = cluster_rebalancer_find, diff --git a/test/rebalancer/bucket_ref.result b/test/rebalancer/bucket_ref.result index fdc886cb..96a81793 100644 --- a/test/rebalancer/bucket_ref.result +++ b/test/rebalancer/bucket_ref.result @@ -271,7 +271,7 @@ fiber.sleep(0.2) vshard.storage.buckets_info(1) --- - 1: - status: active + status: readonly rw_lock: true id: 1 ref_rw: 1 @@ -292,11 +292,8 @@ vshard.storage.buckets_info(1) id: 1 ... -- --- Rebalancer takes buckets starting from the minimal id. 
If a --- bucket with that ID is locked, it should try another. The case --- makes bucket with minimal ID locked for RW requests. The only --- function taking the lock is bucket_send, so to test that a --- manual bucket_send is called before rebalancer. +-- Cancel during bucket_send. In that case all the locks should +-- be freed, obviously. -- vshard.storage.rebalancer_enable() --- @@ -327,12 +324,6 @@ function keep_ref(id) \ end --- ... -fiber_to_ref = fiber.create(keep_ref, 1) ---- -... -while vshard.storage.buckets_info(1)[1].ref_rw ~= 1 do fiber.sleep(0.01) end ---- -... -- Now bucket_send on that bucket blocks. function do_send(id) \ send_result = { \ @@ -342,54 +333,6 @@ function do_send(id) \ end --- ... -fiber_to_lock = fiber.create(do_send, 1) ---- -... -while not vshard.storage.buckets_info(1)[1].rw_lock do fiber.sleep(0.01) end ---- -... -cfg.sharding[util.replicasets[1]].weight = 99 ---- -... -cfg.sharding[util.replicasets[2]].weight = 101 ---- -... -cfg.rebalancer_disbalance_threshold = 0 ---- -... -vshard.storage.cfg(cfg, box.info.uuid) ---- -... -wait_rebalancer_state('The cluster is balanced ok', test_run) ---- -... --- Cleanup after the test. -keep_lock = false ---- -... -while not send_result do fiber.sleep(0.01) end ---- -... -send_result ---- -- - true -... -cfg.sharding[util.replicasets[1]].weight = nil ---- -... -cfg.sharding[util.replicasets[2]].weight = nil ---- -... -vshard.storage.cfg(cfg, box.info.uuid) ---- -... -wait_rebalancer_state('The cluster is balanced ok', test_run) ---- -... --- --- Cancel during bucket_send. In that case all the locks should --- be freed, obviously. --- keep_lock = true --- ... diff --git a/test/rebalancer/bucket_ref.test.lua b/test/rebalancer/bucket_ref.test.lua index 27a48dfc..3a5c731f 100644 --- a/test/rebalancer/bucket_ref.test.lua +++ b/test/rebalancer/bucket_ref.test.lua @@ -91,11 +91,8 @@ _ = test_run:switch('box_1_a') vshard.storage.buckets_info(1) -- --- Rebalancer takes buckets starting from the minimal id. If a --- bucket with that ID is locked, it should try another. The case --- makes bucket with minimal ID locked for RW requests. The only --- function taking the lock is bucket_send, so to test that a --- manual bucket_send is called before rebalancer. +-- Cancel during bucket_send. In that case all the locks should +-- be freed, obviously. -- vshard.storage.rebalancer_enable() _ = test_run:switch('box_2_a') @@ -112,9 +109,6 @@ function keep_ref(id) \ end \ vshard.storage.bucket_unrefrw(1) \ end -fiber_to_ref = fiber.create(keep_ref, 1) -while vshard.storage.buckets_info(1)[1].ref_rw ~= 1 do fiber.sleep(0.01) end - -- Now bucket_send on that bucket blocks. function do_send(id) \ send_result = { \ @@ -122,29 +116,6 @@ function do_send(id) \ {timeout = 9999999}) \ } \ end -fiber_to_lock = fiber.create(do_send, 1) -while not vshard.storage.buckets_info(1)[1].rw_lock do fiber.sleep(0.01) end - - -cfg.sharding[util.replicasets[1]].weight = 99 -cfg.sharding[util.replicasets[2]].weight = 101 -cfg.rebalancer_disbalance_threshold = 0 -vshard.storage.cfg(cfg, box.info.uuid) -wait_rebalancer_state('The cluster is balanced ok', test_run) - --- Cleanup after the test. -keep_lock = false -while not send_result do fiber.sleep(0.01) end -send_result -cfg.sharding[util.replicasets[1]].weight = nil -cfg.sharding[util.replicasets[2]].weight = nil -vshard.storage.cfg(cfg, box.info.uuid) -wait_rebalancer_state('The cluster is balanced ok', test_run) - --- --- Cancel during bucket_send. In that case all the locks should --- be freed, obviously. 
--- keep_lock = true fiber_to_ref = fiber.create(keep_ref, 1) while vshard.storage.buckets_info(1)[1].ref_rw ~= 1 do fiber.sleep(0.01) end diff --git a/test/rebalancer/rebalancer.result b/test/rebalancer/rebalancer.result index 6309ac33..d87c3d06 100644 --- a/test/rebalancer/rebalancer.result +++ b/test/rebalancer/rebalancer.result @@ -360,7 +360,7 @@ test_run:switch('box_1_a') vshard.storage.rebalancer_disable() --- ... -wait_bucket_is_collected(100) +for i = 91, 100 do wait_bucket_is_collected(i) end --- ... vshard.storage.bucket_force_create(91, 10) diff --git a/test/rebalancer/rebalancer.test.lua b/test/rebalancer/rebalancer.test.lua index 0fc893e8..821bb240 100644 --- a/test/rebalancer/rebalancer.test.lua +++ b/test/rebalancer/rebalancer.test.lua @@ -181,7 +181,7 @@ util.map_bucket_protection(test_run, {REPLICASET_1}, true) -- test_run:switch('box_1_a') vshard.storage.rebalancer_disable() -wait_bucket_is_collected(100) +for i = 91, 100 do wait_bucket_is_collected(i) end vshard.storage.bucket_force_create(91, 10) space = box.space.test space:replace{1, 91} diff --git a/test/rebalancer/receiving_bucket.result b/test/rebalancer/receiving_bucket.result index ddae8c31..0f1263de 100644 --- a/test/rebalancer/receiving_bucket.result +++ b/test/rebalancer/receiving_bucket.result @@ -228,7 +228,8 @@ vshard.storage.internal.errinj.ERRINJ_LAST_RECEIVE_DELAY = true _ = test_run:switch('box_2_a') --- ... -_, err = vshard.storage.bucket_send(101, util.replicasets[1], {timeout = 0.1}) +_, err = vshard.storage.bucket_send(101, util.replicasets[1], \ + {chunk_timeout = 0.1}) --- ... util.is_timeout_error(err) @@ -308,7 +309,7 @@ while f1:status() ~= 'suspended' do fiber.sleep(0.01) end vshard.storage.buckets_info(1) --- - 1: - status: active + status: readonly rw_lock: true id: 1 ref_rw: 1 @@ -358,7 +359,7 @@ box.space.test3:select{100} _ = test_run:switch('box_2_a') --- ... -vshard.storage.bucket_send(1, util.replicasets[1], {timeout = 0.3}) +vshard.storage.bucket_send(1, util.replicasets[1], {chunk_timeout = 0.3}) --- - true ... diff --git a/test/rebalancer/receiving_bucket.test.lua b/test/rebalancer/receiving_bucket.test.lua index bfbb01f6..d7e19519 100644 --- a/test/rebalancer/receiving_bucket.test.lua +++ b/test/rebalancer/receiving_bucket.test.lua @@ -94,7 +94,8 @@ box.space._bucket:get{1} _ = test_run:switch('box_1_a') vshard.storage.internal.errinj.ERRINJ_LAST_RECEIVE_DELAY = true _ = test_run:switch('box_2_a') -_, err = vshard.storage.bucket_send(101, util.replicasets[1], {timeout = 0.1}) +_, err = vshard.storage.bucket_send(101, util.replicasets[1], \ + {chunk_timeout = 0.1}) util.is_timeout_error(err) wait_bucket_is_collected(101) _ = test_run:switch('box_1_a') @@ -140,7 +141,7 @@ _ = test_run:switch('box_1_a') box.space.test3:select{100} -- Now the bucket is unreferenced and can be transferred. _ = test_run:switch('box_2_a') -vshard.storage.bucket_send(1, util.replicasets[1], {timeout = 0.3}) +vshard.storage.bucket_send(1, util.replicasets[1], {chunk_timeout = 0.3}) wait_bucket_is_collected(1) vshard.storage.buckets_info(1) _ = test_run:switch('box_1_a') diff --git a/test/rebalancer/restart_during_rebalancing.result b/test/rebalancer/restart_during_rebalancing.result index 33911194..20232aac 100644 --- a/test/rebalancer/restart_during_rebalancing.result +++ b/test/rebalancer/restart_during_rebalancing.result @@ -306,6 +306,7 @@ vshard.storage.info().bucket total: 50 garbage: 0 pinned: 0 + readonly: 0 sending: 0 ... 
check_consistency() @@ -323,6 +324,7 @@ vshard.storage.info().bucket total: 50 garbage: 0 pinned: 0 + readonly: 0 sending: 0 ... check_consistency() @@ -340,6 +342,7 @@ vshard.storage.info().bucket total: 50 garbage: 0 pinned: 0 + readonly: 0 sending: 0 ... check_consistency() @@ -357,6 +360,7 @@ vshard.storage.info().bucket total: 50 garbage: 0 pinned: 0 + readonly: 0 sending: 0 ... check_consistency() diff --git a/test/storage-luatest/auto_master_2_2_2_test.lua b/test/storage-luatest/auto_master_2_2_2_test.lua index 051131ae..b1588d59 100644 --- a/test/storage-luatest/auto_master_2_2_2_test.lua +++ b/test/storage-luatest/auto_master_2_2_2_test.lua @@ -369,7 +369,7 @@ test_group.test_master_discovery_on_disconnect = function(g) -- destination's master is disconnected. A master search is -- triggered then. local ok, err = ivshard.storage.bucket_send( - bid, rs_uuid, {timeout = 0.01}) + bid, rs_uuid, {chunk_timeout = 0.01}) ilt.assert(not ok) ilt.assert_not_equals(err, nil) -- Recovery will re-discover the master. @@ -411,13 +411,15 @@ test_group.test_master_discovery_on_disconnect = function(g) g.replica_2_b:update_box_cfg{read_only = false} promote_if_needed(g, g.replica_2_b) send_bucket_to_new_master(g.replica_1_a, g.replica_2_b) - -- Can't GC the bucket until the old master is back. But can send it. - g.replica_2_b:exec(bucket_send, {bid, g.replica_1_a:replicaset_uuid()}) + -- Can't GC the bucket until the old master is back and can't send it. -- Restore everything back. g.replica_2_a:start() vtest.cluster_cfg(g, global_cfg) promote_if_needed(g, g.replica_2_a) + -- `replica_2_a` should get the bucket from `replica_2_b` to send it. + g.replica_2_a:wait_for_vclock_of(g.replica_2_b) + g.replica_2_a:exec(bucket_send, {bid, g.replica_1_a:replicaset_uuid()}) g.replica_2_b:exec(bucket_gc_wait) g.replica_2_b:update_box_cfg{read_only = true} vtest.cluster_exec_each(g, function() diff --git a/test/storage-luatest/bucket_triggers_test.lua b/test/storage-luatest/bucket_triggers_test.lua index d611fa53..045803c9 100644 --- a/test/storage-luatest/bucket_triggers_test.lua +++ b/test/storage-luatest/bucket_triggers_test.lua @@ -493,7 +493,7 @@ test_group.test_bucket_space_reject_bad_replace_on_transition = function(g) _bucket:replace{bid, ivconst.BUCKET.ACTIVE} internal.is_bucket_protected = true -- To be sure that the loops above didn't somehow skip everything. 
-        ilt.assert_equals(count, 42, 'transition count')
+        ilt.assert_equals(count, 56, 'transition count')
         _G.bucket_gc_continue()
     end)
     rep_b:wait_vclock_of(rep_a)
diff --git a/test/storage-luatest/rebalancer_test.lua b/test/storage-luatest/rebalancer_test.lua
index 03141781..a0e84e1e 100644
--- a/test/storage-luatest/rebalancer_test.lua
+++ b/test/storage-luatest/rebalancer_test.lua
@@ -1,7 +1,14 @@
 local t = require('luatest')
 local vtest = require('test.luatest_helpers.vtest')
+local vutil = require('vshard.util')
 
-local test_group = t.group('storage')
+local group_config = {{memtx_use_mvcc_engine = false}}
+
+if vutil.feature.memtx_mvcc then
+    table.insert(group_config, {memtx_use_mvcc_engine = true})
+end
+
+local test_group = t.group('storage', group_config)
 
 local cfg_template = {
     sharding = {
@@ -56,6 +63,7 @@ local function wait_rebalancer_on_instance(g, instance_name)
 end
 
 test_group.before_all(function(g)
+    cfg_template.memtx_use_mvcc_engine = g.params.memtx_use_mvcc_engine
     global_cfg = vtest.config_new(cfg_template)
 
     vtest.cluster_new(g, global_cfg)
@@ -70,6 +78,17 @@ test_group.after_all(function(g)
     g.cluster:drop()
 end)
 
+local function wait_n_buckets(storage, count)
+    t.helpers.retrying({timeout = vtest.wait_timeout}, storage.exec,
+                       storage, function(count)
+        ivshard.storage.rebalancer_wakeup()
+        local _status = box.space._bucket.index.status
+        if _status:count({ivconst.BUCKET.ACTIVE}) ~= count then
+            error('Wrong bucket count')
+        end
+    end, {count})
+end
+
 test_group.test_rebalancer_in_work = function(g)
     local new_cfg_template = table.deepcopy(cfg_template)
     new_cfg_template.sharding[1].weight = 0
@@ -77,15 +96,6 @@ test_group.test_rebalancer_in_work = function(g)
     local new_global_cfg = vtest.config_new(new_cfg_template)
     vtest.cluster_cfg(g, new_global_cfg)
     vtest.cluster_rebalancer_enable(g)
-    local function wait_n_buckets(storage, count)
-        t.helpers.retrying({timeout = vtest.wait_timeout}, storage.exec,
-                           storage, function(count)
-            local _status = box.space._bucket.index.status
-            if _status:count({ivconst.BUCKET.ACTIVE}) ~= count then
-                error('Wrong bucket count')
-            end
-        end, {count})
-    end
     wait_n_buckets(g.replica_1_a, 0)
     wait_n_buckets(g.replica_2_a, 0)
     wait_n_buckets(g.replica_3_a, cfg_template.bucket_count)
@@ -293,3 +303,348 @@ test_group.test_rebalancer_mode = function(g)
     vtest.cluster_cfg(g, global_cfg)
     wait_rebalancer_on_instance(g, 'replica_1_a')
 end
+
+--
+-- gh-573: test the behavior of the READONLY bucket.
+--
+test_group.test_readonly_recovery = function(g)
+    g.replica_1_a:exec(function(uuid)
+        local bid = _G.get_first_bucket()
+        ilt.assert(ivshard.storage.bucket_refrw(bid))
+        -- Unsuccessful send.
+        local _, err = ivshard.storage.bucket_send(bid, uuid, {timeout = 0.01})
+        ilt.assert(iverror.is_timeout(err))
+        local bucket = ivshard.storage.bucket_stat(bid)
+        ilt.assert_not(bucket.is_transfering)
+        ilt.assert_equals(bucket.status, ivconst.BUCKET.READONLY)
+        -- It's impossible to pin a READONLY bucket.
+        ilt.assert_not(ivshard.storage.bucket_pin(bid))
+        -- Recovery restores the bucket to ACTIVE.
+        ilt.helpers.retrying({timeout = iwait_timeout}, function()
+            ivshard.storage.recovery_wakeup()
+            local b = ivshard.storage.bucket_stat(bid)
+            ilt.assert_equals(b.status, ivconst.BUCKET.ACTIVE)
+        end)
+        ilt.assert(ivshard.storage.bucket_unrefrw(bid))
+        -- It's impossible to prepare a PINNED bucket for sending.
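+        -- (bucket_send must refuse it with the BUCKET_IS_PINNED error.)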
+        ilt.assert(ivshard.storage.bucket_pin(bid))
+        _, err = ivshard.storage.bucket_send(bid, uuid, {timeout = 0.1})
+        ilt.assert_equals(err.code, iverror.code.BUCKET_IS_PINNED)
+        ilt.assert(ivshard.storage.bucket_unpin(bid))
+    end, {g.replica_2_a:replicaset_uuid()})
+end
+
+--
+-- gh-573: test that pinning a bucket while it's being sent is safe with MVCC.
+--
+test_group.test_pinning_before_readonly = function(g)
+    t.run_only_if(global_cfg.memtx_use_mvcc_engine)
+    g.replica_1_a:exec(function(uuid)
+        local bid = _G.get_first_bucket()
+        local function test_readonly_with_pinning_template(state, first, second)
+            local ok, err, fiber
+            local is_waiting = false
+            local is_yield_replace = true
+            local function yield_on_state(_, new_bucket)
+                if new_bucket.status == state then
+                    is_waiting = true
+                    while is_yield_replace do
+                        ifiber.sleep(0.01)
+                    end
+                end
+            end
+            box.space._bucket:on_replace(yield_on_state)
+            fiber = ifiber.create(function()
+                ifiber.self():set_joinable(true)
+                ok, err = second()
+            end)
+            ilt.helpers.retrying({timeout = iwait_timeout}, function()
+                ilt.assert(is_waiting)
+            end)
+            ilt.assert(first())
+            is_yield_replace = false
+            ilt.assert(fiber:join(iwait_timeout))
+            ilt.assert_not(ok)
+            ilt.assert_equals(err.code, box.error.TRANSACTION_CONFLICT)
+            box.space._bucket:on_replace(nil, yield_on_state)
+            _G.bucket_recovery_wait()
+            _G.bucket_gc_wait()
+        end
+        -- PINNED before READONLY.
+        test_readonly_with_pinning_template(ivconst.BUCKET.READONLY,
+            function() return pcall(ivshard.storage.bucket_pin, bid) end,
+            function() return ivshard.storage.bucket_send(bid, uuid) end)
+        ivshard.storage.bucket_unpin(bid)
+        -- Manual PINNED before READONLY.
+        test_readonly_with_pinning_template(ivconst.BUCKET.READONLY,
+            function()
+                local bucket = box.space._bucket
+                local tuple = {bid, ivconst.BUCKET.PINNED}
+                return pcall(bucket.replace, bucket, tuple)
+            end,
+            function() return ivshard.storage.bucket_send(bid, uuid) end)
+        ivshard.storage.bucket_unpin(bid)
+        -- READONLY before PINNED.
+        test_readonly_with_pinning_template(ivconst.BUCKET.PINNED,
+            function() return ivshard.storage.bucket_send(bid, uuid) end,
+            function() return pcall(ivshard.storage.bucket_pin, bid) end)
+    end, {g.replica_2_a:replicaset_uuid()})
+    g.replica_2_a:exec(function(uuid)
+        ilt.assert(ivshard.storage.bucket_send(_G.get_first_bucket(), uuid))
+    end, {g.replica_1_a:replicaset_uuid()})
+end
+
+--
+-- Test that the worker sends buckets in batches. Sending more buckets than
+-- the storage has leads to an error. After an error during preparation all
+-- buckets can be recovered.
+--
+test_group.test_send_more_buckets_than_has = function(g)
+    local new_global_cfg = table.deepcopy(global_cfg)
+    new_global_cfg.rebalancer_max_sending = 6
+    -- Otherwise the worker will try to find the buckets for 15 minutes.
+    new_global_cfg.rebalancer_bucket_send_timeout = 1
+    vtest.cluster_cfg(g, new_global_cfg)
+
+    vtest.cluster_recovery_pause(g)
+    -- Start sending 11 buckets when the replica has only 10.
+    g.replica_1_a:exec(function(uuid)
+        ilt.assert_equals(box.space._bucket:count(), 10)
+        -- Returns ok no matter what.
+        ilt.assert(ivshard.storage.rebalancer_apply_routes({[uuid] = 11}))
+    end, {g.replica_2_a:replicaset_uuid()})
+    -- Wait for the error message to appear.
+    t.helpers.retrying({timeout = vtest.wait_timeout}, function()
+        t.assert(g.replica_1_a:grep_log('Can not find 5 active buckets'))
+    end)
+    -- Since recovery is disabled, all non-sent buckets remain READONLY.
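+    -- (6 of the 10 buckets were sent and collected, so 4 must be left.)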
+    g.replica_1_a:exec(function()
+        ilt.helpers.retrying({timeout = iwait_timeout}, function()
+            ivshard.storage.garbage_collector_wakeup()
+            ilt.assert_equals(box.space._bucket:count(), 4)
+        end)
+        local transfer_flags =
+            ivshard.storage.internal.rebalancer_transfering_buckets
+        for _, b in box.space._bucket:pairs() do
+            ilt.assert_equals(b.status, ivconst.BUCKET.READONLY)
+            ilt.assert_not(transfer_flags[b.id])
+        end
+    end)
+    -- Recovery restores the READONLY buckets back to ACTIVE.
+    vtest.cluster_recovery_continue(g)
+    g.replica_1_a:exec(function()
+        local active_key = {ivconst.BUCKET.ACTIVE}
+        local _status = box.space._bucket.index.status
+        ilt.helpers.retrying({timeout = iwait_timeout}, function()
+            ilt.assert_equals(_status:count(active_key), 4)
+            ivshard.storage.recovery_wakeup()
+        end)
+    end)
+
+    -- Restore the cluster with the rebalancer.
+    vtest.cluster_rebalancer_enable(g)
+    wait_n_buckets(g.replica_1_a, cfg_template.bucket_count / 3)
+    wait_n_buckets(g.replica_2_a, cfg_template.bucket_count / 3)
+    wait_n_buckets(g.replica_3_a, cfg_template.bucket_count / 3)
+    vtest.cluster_rebalancer_disable(g)
+    vtest.cluster_cfg(g, global_cfg)
+    vtest.cluster_exec_each_master(g, function()
+        _G.bucket_gc_wait()
+    end)
+end
+
+--
+-- Test that the worker doesn't exit but prepares new buckets if it's
+-- woken up when none of the buckets are available.
+--
+test_group.test_late_worker_wakeup_prepare = function(g)
+    local new_global_cfg = table.deepcopy(global_cfg)
+    new_global_cfg.rebalancer_max_sending = 3
+    vtest.cluster_cfg(g, new_global_cfg)
+
+    g.replica_1_a:exec(function(uuid)
+        ivshard.storage.internal.errinj.ERRINJ_LAST_SEND_DELAY_COUNTDOWN = 1
+        local errinj = ivshard.storage.internal.errinj
+        local f_name = 'vshard.rebalancer_worker_3'
+        errinj.ERRINJ_WORKER_PREPARE_WAKEUP_DELAY[f_name] = true
+        ilt.assert(ivshard.storage.rebalancer_apply_routes({[uuid] = 4}))
+        local sending_key = {ivconst.BUCKET.SENDING}
+        local _status = box.space._bucket.index.status
+        ilt.helpers.retrying({timeout = iwait_timeout}, function()
+            -- The alive workers prepare buckets, send 1 successfully,
+            -- and hang on the 2 remaining prepared buckets.
+            ilt.assert_equals(_status:count(sending_key), 2)
+        end)
+        -- Time to wake up the slow worker.
+        errinj.ERRINJ_WORKER_PREPARE_WAKEUP_DELAY[f_name] = nil
+        ilt.helpers.retrying({timeout = iwait_timeout}, function()
+            -- Now the final worker prepares and sends the last bucket.
+            ilt.assert_equals(_status:count(sending_key), 3)
+        end)
+        ivshard.storage.internal.errinj.ERRINJ_LAST_SEND_DELAY = false
+    end, {g.replica_2_a:replicaset_uuid()})
+    wait_n_buckets(g.replica_1_a, cfg_template.bucket_count / 3 - 4)
+
+    vtest.cluster_rebalancer_enable(g)
+    wait_n_buckets(g.replica_1_a, cfg_template.bucket_count / 3)
+    wait_n_buckets(g.replica_2_a, cfg_template.bucket_count / 3)
+    wait_n_buckets(g.replica_3_a, cfg_template.bucket_count / 3)
+    vtest.cluster_rebalancer_disable(g)
+    vtest.cluster_cfg(g, global_cfg)
+end
+
+--
+-- gh-351: rebalancer should prefer buckets without rw refs.
+--
+test_group.test_buckets_with_no_refs_are_preferred = function(g)
+    vtest.cluster_rebalancer_enable(g)
+    -- Make rw "requests" to all buckets except a single one.
+    local bid = g.replica_1_a:exec(function()
+        -- The bucket with the maximum id is left without a ref.
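+        -- (The non-unique TREE index on status is ordered by {status, id},
+        -- so an 'LE' lookup from the ACTIVE key lands on the ACTIVE bucket
+        -- with the greatest id.)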
+        local idx = box.space._bucket.index.status
+        local opts = {limit = 1, iterator = 'LE'}
+        local bid = idx:select(ivconst.BUCKET.ACTIVE, opts)[1].id
+        for _, b in box.space._bucket:pairs() do
+            if b.id ~= bid then
+                ivshard.storage.bucket_refrw(b.id)
+            end
+        end
+        return bid
+    end)
+
+    -- Move one bucket; it must be the one without refs.
+    local new_cfg_template = table.deepcopy(cfg_template)
+    new_cfg_template.sharding[1].weight = cfg_template.bucket_count / 3 - 1
+    new_cfg_template.sharding[2].weight = cfg_template.bucket_count / 3 + 1
+    new_cfg_template.sharding[3].weight = cfg_template.bucket_count / 3
+    local new_global_cfg = vtest.config_new(new_cfg_template)
+    vtest.cluster_cfg(g, new_global_cfg)
+    wait_n_buckets(g.replica_1_a, 9)
+    wait_n_buckets(g.replica_2_a, 11)
+    g.replica_2_a:exec(function(bid)
+        local status = ivshard.storage.bucket_stat(bid).status
+        ilt.assert_equals(status, ivconst.BUCKET.ACTIVE)
+    end, {bid})
+
+    -- Even with all buckets refed we still try to send them.
+    new_cfg_template.sharding[1].weight = 0
+    new_global_cfg = vtest.config_new(new_cfg_template)
+    vtest.cluster_cfg(g, new_global_cfg)
+    g.replica_1_a:exec(function(bid)
+        -- Wait for the routes applier to start and pick buckets.
+        local internal = ivshard.storage.internal
+        local applier_name = 'routes_applier_service'
+        ivtest.wait_for_not_nil(internal, applier_name,
+                                {timeout = iwait_timeout,
+                                 on_yield = ivshard.storage.rebalancer_wakeup})
+        local service = internal[applier_name]
+        ivtest.service_wait_for_activity(service, 'applying routes')
+        -- End rw "requests".
+        for _, b in box.space._bucket:pairs() do
+            if b.id ~= bid then
+                ivshard.storage.bucket_unrefrw(b.id)
+            end
+        end
+    end, {bid})
+    wait_n_buckets(g.replica_2_a, cfg_template.bucket_count / 2)
+    wait_n_buckets(g.replica_3_a, cfg_template.bucket_count / 2)
+
+    vtest.cluster_cfg(g, global_cfg)
+    wait_n_buckets(g.replica_1_a, cfg_template.bucket_count / 3)
+    wait_n_buckets(g.replica_2_a, cfg_template.bucket_count / 3)
+    wait_n_buckets(g.replica_3_a, cfg_template.bucket_count / 3)
+    vtest.cluster_rebalancer_disable(g)
+    vtest.cluster_exec_each_master(g, function()
+        _G.bucket_gc_wait()
+    end)
+end
+
+--
+-- gh-573: the rebalancer should not make a bucket SENDING before checking
+-- that none of the replicas have RW refs on it. Checks the sync part.
+--
+test_group.test_bucket_cannot_be_sent_with_down_replica = function(g)
+    local new_cfg_template = table.deepcopy(cfg_template)
+    new_cfg_template.sharding[1].weight = cfg_template.bucket_count / 3 - 1
+    new_cfg_template.sharding[2].weight = cfg_template.bucket_count / 3 + 1
+    new_cfg_template.sharding[3].weight = cfg_template.bucket_count / 3
+    new_cfg_template.rebalancer_bucket_send_timeout = 0.1
+    local new_global_cfg = vtest.config_new(new_cfg_template)
+    vtest.cluster_cfg(g, new_global_cfg)
+    g.replica_1_b:update_box_cfg({replication = {}})
+    vtest.cluster_rebalancer_enable(g)
+
+    -- Sending with the rebalancer.
+    t.helpers.retrying({timeout = vtest.wait_timeout}, function()
+        g.replica_1_a:exec(function() ivshard.storage.rebalancer_wakeup() end)
+        t.assert(g.replica_1_a:grep_log('could not sync with replicas'))
+    end)
+    vtest.cluster_rebalancer_disable(g)
+    g.replica_1_a:exec(function()
+        _G.bucket_recovery_wait()
+    end)
+
+    -- Manual bucket send.
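+    -- (It must hit the same sync failure as the rebalancer did above.)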
+    g.replica_1_a:exec(function(uuid)
+        local bid = _G.get_first_bucket()
+        local ok, err = ivshard.storage.bucket_send(bid, uuid,
+                                                    {sync_timeout = 0.01})
+        ilt.assert_not(ok)
+        ilt.assert(err)
+        ilt.assert_str_contains(err.reason, 'could not sync with replicas')
+        ilt.assert(iverror.is_timeout(err))
+    end, {g.replica_2_a:replicaset_uuid()})
+
+    vtest.cluster_cfg(g, global_cfg)
+    g.replica_1_a:exec(function()
+        _G.bucket_recovery_wait()
+    end)
+end
+
+--
+-- gh-573: the rebalancer should not make a bucket SENDING before checking
+-- that none of the replicas have RW refs on it. Checks the ref part.
+--
+test_group.test_replication_not_broken_when_ref_on_replica = function(g)
+    local bid = g.replica_1_a:exec(function()
+        local bid = _G.get_first_bucket()
+        ivshard.storage.bucket_refrw(bid)
+        return bid
+    end)
+    local new_cfg_template = table.deepcopy(cfg_template)
+    new_cfg_template.sharding[1].replicas.replica_1_b.read_only = false
+    new_cfg_template.sharding[1].replicas.replica_1_a.read_only = true
+    new_cfg_template.sharding[1].weight = 0
+    new_cfg_template.rebalancer_bucket_send_timeout = 1
+    local new_global_cfg = vtest.config_new(new_cfg_template)
+    vtest.cluster_cfg(g, new_global_cfg)
+    vtest.cluster_rebalancer_enable(g)
+
+    t.helpers.retrying({timeout = vtest.wait_timeout}, function()
+        t.assert(g.replica_1_b:grep_log('failed waiting for no RW refs'))
+        g.replica_1_b:exec(function() ivshard.storage.rebalancer_wakeup() end)
+    end)
+    g.replica_1_a:assert_follows_upstream(g.replica_1_b:instance_id())
+
+    vtest.cluster_rebalancer_disable(g)
+    g.replica_1_b:exec(function(bid, uuid)
+        _G.bucket_recovery_wait()
+        ilt.helpers.retrying({timeout = _G.iwait_timeout}, function()
+            ilt.assert_not(ivshard.storage.rebalancing_is_in_progress())
+        end)
+
+        -- Test that a manual bucket send behaves the same.
+        local ok, err = ivshard.storage.bucket_send(bid, uuid,
+                                                    {sync_timeout = 1})
+        ilt.assert_not(ok)
+        ilt.assert(err and err.prev)
+        ilt.assert_equals(err.code, iverror.code.WRONG_BUCKET)
+        ilt.assert(iverror.is_timeout(err.prev))
+        _G.bucket_recovery_wait()
+    end, {bid, g.replica_2_a:replicaset_uuid()})
+
+    vtest.cluster_cfg(g, global_cfg)
+    g.replica_1_a:exec(function(bid)
+        ivshard.storage.bucket_unrefrw(bid)
+    end, {bid})
+end
diff --git a/test/storage-luatest/service_info_test.lua b/test/storage-luatest/service_info_test.lua
index 5fb7b694..7862f051 100644
--- a/test/storage-luatest/service_info_test.lua
+++ b/test/storage-luatest/service_info_test.lua
@@ -55,6 +55,11 @@ end)
 -- and work properly (gh-107).
 --
 test_group.test_basic_storage_service_info = function(g)
+    local new_global_cfg = table.deepcopy(global_cfg)
+    -- Break the timeout in order to get an error.
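+    -- (1e-6 seconds is far below any realistic send duration, so the routes
+    -- applier is guaranteed to fail with a timeout.)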
+    new_global_cfg.rebalancer_bucket_send_timeout = 1e-6
+    vtest.cluster_cfg(g, new_global_cfg)
+
     local uuid = g.replica_1_a:exec(function()
         -- Test that all services save states
         local info = ivshard.storage.info({with_services = true})
@@ -68,9 +73,6 @@ test_group.test_basic_storage_service_info = function(g)
         -- Forbid routes_apply service to die
         local internal = ivshard.storage.internal
         internal.errinj.ERRINJ_APPLY_ROUTES_STOP_DELAY = true
-        -- Break timeout in order to get error
-        rawset(_G, 'chunk_timeout', ivconst.REBALANCER_CHUNK_TIMEOUT)
-        ivconst.REBALANCER_CHUNK_TIMEOUT = 1e-6
 
         return ivutil.replicaset_uuid()
     end)
@@ -91,9 +93,13 @@ test_group.test_basic_storage_service_info = function(g)
         ivtest.wait_for_not_nil(internal, applier_name)
         local service = internal[applier_name]
         ivtest.service_wait_for_error(service, 'Timed?[Oo]ut')
+    end)
+    vtest.cluster_cfg(g, global_cfg)
+    g.replica_1_a:exec(function()
         -- Restore everything
-        ivconst.REBALANCER_CHUNK_TIMEOUT = _G.chunk_timeout
+        local internal = ivshard.storage.internal
+        local applier_name = 'routes_applier_service'
         internal.errinj.ERRINJ_APPLY_ROUTES_STOP_DELAY = false
         ivtest.wait_for_nil(internal, applier_name)
         internal.errinj.ERRINJ_APPLY_ROUTES_STOP_DELAY = true
@@ -101,7 +107,7 @@ test_group.test_basic_storage_service_info = function(g)
         -- All buckets must be recovered to the ACTIVE state,
         -- otherwise rebalancer won't work.
         ivshard.storage.recovery_wakeup()
-        service = ivshard.storage.internal.recovery_service
+        local service = ivshard.storage.internal.recovery_service
         ivtest.service_wait_for_new_ok(service)
     end)
diff --git a/test/storage-luatest/storage_1_1_test.lua b/test/storage-luatest/storage_1_1_test.lua
index ede7679a..9db09da5 100644
--- a/test/storage-luatest/storage_1_1_test.lua
+++ b/test/storage-luatest/storage_1_1_test.lua
@@ -638,3 +638,83 @@ test_group.test_on_replace_trigger_on_bucket_space = function(g)
         _G.on_replace_trigger = nil
     end)
 end
+
+test_group.test_bucket_send_timeout = function(g)
+    g.replica_2_a:exec(function()
+        -- Make sure the bucket will not be delivered even if the tiny
+        -- send timeout somehow turned out to be enough to send it. Just for
+        -- the simplicity of the test.
+        ivshard.storage.internal.errinj.ERRINJ_RECEIVE_PARTIALLY = true
+    end)
+    g.replica_1_a:exec(function(uuid)
+        local json = require('json')
+        local tests = {
+            {
+                -- Test that with a small ref_timeout the sending ends
+                -- if there are active refs on that bucket.
+                prepare = function(b) ivshard.storage.bucket_refrw(b) end,
+                finish = function(b) ivshard.storage.bucket_unrefrw(b) end,
+                opts = {
+                    timeout = ivconst.TIMEOUT_INFINITY,
+                    ref_timeout = 1e-6,
+                },
+            },
+            {
+                -- Same with a small timeout but a big ref_timeout. The
+                -- minimum of the two is used.
+                prepare = function(b) ivshard.storage.bucket_refrw(b) end,
+                finish = function(b) ivshard.storage.bucket_unrefrw(b) end,
+                opts = {
+                    timeout = 1e-6,
+                    ref_timeout = ivconst.TIMEOUT_INFINITY,
+                },
+            },
+            {
+                opts = {
+                    -- Test that a small chunk_timeout leads to an error.
+                    -- It doesn't make sense to test a small timeout with a
+                    -- big chunk_timeout, since the error would happen on
+                    -- the ref part in that case.
+                    timeout = ivconst.TIMEOUT_INFINITY,
+                    chunk_timeout = 1e-6,
+                }
+            },
+            {
+                -- Test that the timeout option limits chunk sending.
+                opts = {
+                    timeout = 1e-6,
+                    chunk_timeout = ivconst.DEFAULT_BUCKET_CHUNK_TIMEOUT,
+                }
+            },
+            {
+                -- Ref fails before send happens.
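+                -- (ref_timeout expires while the rw ref is still held, so
+                -- the send never reaches the chunk-transfer phase.)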
+ prepare = function(b) ivshard.storage.bucket_refrw(b) end, + finish = function(b) ivshard.storage.bucket_unrefrw(b) end, + opts = { + ref_timeout = 1e-6, + chunk_timeout = ivconst.DEFAULT_BUCKET_CHUNK_TIMEOUT, + } + }, + { + -- Ref succeeds, send fails. + opts = { + ref_timeout = ivconst.DEFAULT_BUCKET_REF_TIMEOUT, + chunk_timeout = 1e-6, + } + }, + } + local bid = _G.get_first_bucket() + for _, test in ipairs(tests) do + if test.prepare then test.prepare(bid) end + local ok, err = ivshard.storage.bucket_send(bid, uuid, test.opts) + ilt.assert_not(ok, test.opts) + ilt.assert(iverror.is_timeout(err), ('%s - %s'):format( + json.encode(test.opts), json.encode(err))) + if test.finish then test.finish(bid) end + _G.bucket_recovery_wait() + end + end, {g.replica_2_a:replicaset_uuid()}) + g.replica_2_a:exec(function() + ivshard.storage.internal.errinj.ERRINJ_RECEIVE_PARTIALLY = false + _G.bucket_recovery_wait() + end) +end diff --git a/test/storage-luatest/storage_1_test.lua b/test/storage-luatest/storage_1_test.lua index 4d671de7..5bbb7465 100644 --- a/test/storage-luatest/storage_1_test.lua +++ b/test/storage-luatest/storage_1_test.lua @@ -476,6 +476,7 @@ test_group.test_moved_buckets_various_statuses = function(g) ilt.assert_equals(ivconst.BUCKET, { ACTIVE = 'active', PINNED = 'pinned', + READONLY = 'readonly', SENDING = 'sending', SENT = 'sent', RECEIVING = 'receiving', @@ -485,7 +486,7 @@ test_group.test_moved_buckets_various_statuses = function(g) _G.bucket_gc_pause() local luuid = require('uuid') -- +1 to delete and make it a 404 bucket. - local bids = _G.get_n_buckets(7) + local bids = _G.get_n_buckets(8) -- ACTIVE = bids[1]. -- @@ -497,12 +498,16 @@ test_group.test_moved_buckets_various_statuses = function(g) -- SENDING = bids[3]. local bid_sending = bids[3] local id_sending = luuid.str() + _bucket:update({bid_sending}, + {{'=', 2, ivconst.BUCKET.READONLY}}) _bucket:update({bid_sending}, {{'=', 2, ivconst.BUCKET.SENDING}, {'=', 3, id_sending}}) -- SENT = bids[4]. local bid_sent = bids[4] local id_sent = luuid.str() + _bucket:update({bid_sent}, + {{'=', 2, ivconst.BUCKET.READONLY}}) _bucket:update({bid_sent}, {{'=', 2, ivconst.BUCKET.SENDING}, {'=', 3, id_sent}}) _bucket:update({bid_sent}, @@ -511,6 +516,8 @@ test_group.test_moved_buckets_various_statuses = function(g) -- RECEIVING = bids[5]. local bid_receiving = bids[5] local id_receiving = luuid.str() + _bucket:update({bid_receiving}, + {{'=', 2, ivconst.BUCKET.READONLY}}) _bucket:update({bid_receiving}, {{'=', 2, ivconst.BUCKET.SENDING}}) _bucket:update({bid_receiving}, @@ -523,6 +530,8 @@ test_group.test_moved_buckets_various_statuses = function(g) -- GARBAGE = bids[6]. local bid_garbage = bids[6] local id_garbage = luuid.str() + _bucket:update({bid_garbage}, + {{'=', 2, ivconst.BUCKET.READONLY}}) _bucket:update({bid_garbage}, {{'=', 2, ivconst.BUCKET.SENDING}, {'=', 3, id_garbage}}) _bucket:update({bid_garbage}, @@ -532,6 +541,8 @@ test_group.test_moved_buckets_various_statuses = function(g) -- NOT EXISTING = bids[7]. local bid_404 = bids[7] + _bucket:update({bid_404}, + {{'=', 2, ivconst.BUCKET.READONLY}}) _bucket:update({bid_404}, {{'=', 2, ivconst.BUCKET.SENDING}}) _bucket:update({bid_404}, @@ -540,6 +551,11 @@ test_group.test_moved_buckets_various_statuses = function(g) {{'=', 2, ivconst.BUCKET.GARBAGE}}) _bucket:delete({bid_404}) + -- READONLY = bids[8]. 
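+        -- (A READONLY bucket is still present and readable locally, so it
+        -- must not be reported as moved.)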
+ local bid_readonly = bids[8] + _bucket:update({bid_readonly}, + {{'=', 2, ivconst.BUCKET.READONLY}}) + local moved = ivshard.storage.internal.bucket_get_moved(bids) ilt.assert_items_equals(moved, { { @@ -584,6 +600,9 @@ test_group.test_moved_buckets_various_statuses = function(g) -- PINNED. _bucket:update({bid_pinned}, {{'=', 2, ivconst.BUCKET.ACTIVE}}) + -- READONLY. + _bucket:update({bid_readonly}, + {{'=', 2, ivconst.BUCKET.ACTIVE}}) _G.bucket_recovery_continue() _G.bucket_gc_continue() diff --git a/test/storage/recovery_errinj.result b/test/storage/recovery_errinj.result index 03c1e5d2..f0da359e 100644 --- a/test/storage/recovery_errinj.result +++ b/test/storage/recovery_errinj.result @@ -72,7 +72,8 @@ util.map_bucket_protection(test_run, {REPLICASET_1}, true) _ = test_run:switch('storage_1_a') --- ... -ret, err = vshard.storage.bucket_send(1, util.replicasets[2], {timeout = 0.1}) +ret, err = vshard.storage.bucket_send(1, util.replicasets[2], \ + {chunk_timeout = 0.1}) --- ... ret, util.is_timeout_error(err) diff --git a/test/storage/recovery_errinj.test.lua b/test/storage/recovery_errinj.test.lua index d154db4f..b90211af 100644 --- a/test/storage/recovery_errinj.test.lua +++ b/test/storage/recovery_errinj.test.lua @@ -31,7 +31,8 @@ _ = test_run:switch('default') util.map_bucket_protection(test_run, {REPLICASET_1}, true) _ = test_run:switch('storage_1_a') -ret, err = vshard.storage.bucket_send(1, util.replicasets[2], {timeout = 0.1}) +ret, err = vshard.storage.bucket_send(1, util.replicasets[2], \ + {chunk_timeout = 0.1}) ret, util.is_timeout_error(err) _bucket = box.space._bucket wait_bucket_is_collected(1) diff --git a/test/storage/ref.result b/test/storage/ref.result index d53176bd..5ad14856 100644 --- a/test/storage/ref.result +++ b/test/storage/ref.result @@ -123,7 +123,7 @@ timeout = 0.01 | --- | ... ok, err = vshard.storage.bucket_send(1501, util.replicasets[1], \ - {timeout = timeout}) + {chunk_timeout = timeout}) | --- | ... 
 assert(not ok and util.is_timeout_error(err))
diff --git a/test/storage/ref.test.lua b/test/storage/ref.test.lua
index 76c67a33..cbff8567 100644
--- a/test/storage/ref.test.lua
+++ b/test/storage/ref.test.lua
@@ -52,7 +52,7 @@ _ = test_run:switch('storage_2_a')
 big_timeout = 1000000
 timeout = 0.01
 ok, err = vshard.storage.bucket_send(1501, util.replicasets[1],              \
-                                     {timeout = timeout})
+                                     {chunk_timeout = timeout})
 assert(not ok and util.is_timeout_error(err))
 
 --
diff --git a/test/storage/storage.result b/test/storage/storage.result
index c8dbbb21..1f3ad9ce 100644
--- a/test/storage/storage.result
+++ b/test/storage/storage.result
@@ -149,6 +149,7 @@ vshard.storage.info()
     total: 0
     garbage: 0
     pinned: 0
+    readonly: 0
     sending: 0
   uri: storage@127.0.0.1:3301
   identification_mode: uuid_as_key
@@ -273,6 +274,7 @@ vshard.storage.info()
     total: 2
     garbage: 0
     pinned: 0
+    readonly: 0
     sending: 0
   uri: storage@127.0.0.1:3304
   identification_mode: uuid_as_key
@@ -336,6 +338,7 @@ info
     total: 2
     garbage: 0
     pinned: 0
+    readonly: 0
     sending: 0
   uri: storage@127.0.0.1:3303
   identification_mode: uuid_as_key
@@ -369,6 +372,7 @@ vshard.storage.info()
     total: 2
     garbage: 0
     pinned: 0
+    readonly: 0
     sending: 0
   uri: storage@127.0.0.1:3303
   identification_mode: uuid_as_key
@@ -403,6 +407,7 @@ vshard.storage.info()
     total: 2
     garbage: 0
     pinned: 0
+    readonly: 0
     sending: 0
   uri: storage@127.0.0.1:3304
   identification_mode: uuid_as_key
@@ -437,6 +442,7 @@ vshard.storage.info()
     total: 2
     garbage: 0
     pinned: 0
+    readonly: 0
     sending: 0
   uri: storage@127.0.0.1:3303
   identification_mode: uuid_as_key
diff --git a/test/unit-luatest/dispenser_test.lua b/test/unit-luatest/dispenser_test.lua
new file mode 100644
index 00000000..7ab007d9
--- /dev/null
+++ b/test/unit-luatest/dispenser_test.lua
@@ -0,0 +1,146 @@
+local t = require('luatest')
+local test_group = t.group('dispenser-unit')
+local route_dispenser = require('vshard.storage.route_dispenser')
+local vconsts = require('vshard.consts')
+local fiber = require('fiber')
+
+local function create_dispenser_with_buckets(routes, count)
+    local d = route_dispenser.new(routes)
+    d:prepare_begin()
+    local buckets = {}
+    for i = 1, count do
+        table.insert(buckets, i)
+    end
+    d:prepare_commit(buckets)
+    return d
+end
+
+--
+-- gh-161: parallel rebalancer. One of the most important parts of
+-- the latter is the dispenser. It is a structure which hands out
+-- destination UUIDs in a round-robin manner to worker fibers.
+--
+test_group.test_dispenser_basic = function()
+    local d = create_dispenser_with_buckets({uuid = 15}, 15)
+    for _ = 1, 15 do
+        t.assert_equals(d:pop(), 'uuid')
+    end
+    for _ = 1, 3 do
+        t.assert_equals(d.rlist.count, 0)
+        t.assert_equals(d.remaining_count, 0)
+        t.assert_equals(d.map.uuid.bucket_count, 0)
+        t.assert_equals(d.map.uuid.progress, 0)
+        t.assert_equals(d.map.uuid.need_to_send, 15)
+        t.assert_equals(d:pop(), nil)
+    end
+
+    -- Test throttle.
+    d = create_dispenser_with_buckets({uuid1 = 5, uuid2 = 5}, 10)
+    local uuid, bid = d:pop()
+    t.assert_equals(uuid, 'uuid2')
+    t.assert_equals(bid, 10)
+    t.assert_equals(d.remaining_count, 9)
+    t.assert_equals(d.rlist.last.bucket_count, 4)
+    t.assert_equals(d.rlist.first.bucket_count, 5)
+    d:put(uuid)
+    t.assert_equals(d.remaining_count, 10)
+    t.assert_equals(d.rlist.last.bucket_count, 5)
+    t.assert(d:throttle(uuid))
+    t.assert_not(d:throttle(uuid))
+    for _ = 1, 4 do
+        t.assert_equals(d:pop(), 'uuid1')
+        t.assert_equals(d:pop(), 'uuid2')
+    end
+    t.assert_equals(d:pop(), 'uuid1')
+    -- Not enough buckets anymore, prepare more.
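+    -- (pop() returns nil once the prepared buckets run out;
+    -- prepare_begin()/prepare_commit() refill the supply.)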
+    d:prepare_begin()
+    d:prepare_commit({100})
+    t.assert_equals(d:pop(), 'uuid2')
+    t.assert_not(d:pop())
+end
+
+test_group.test_skip = function()
+    -- A double skip should be ok. It happens if there were several
+    -- workers on one destination and all of them received an error.
+    local d = create_dispenser_with_buckets({uuid1 = 1}, 1)
+    d:skip('uuid1')
+    d:skip('uuid1')
+    t.assert_equals(d.remaining_count, 0)
+    -- Basic test of the skip flow.
+    d = create_dispenser_with_buckets({uuid1 = 5, uuid2 = 5}, 10)
+    t.assert_equals(d:pop(), 'uuid2')
+    t.assert_equals(d:pop(), 'uuid1')
+    d:put('uuid1')
+    t.assert_equals(d.remaining_count, 9)
+    d:skip('uuid1')
+    t.assert_equals(d.remaining_count, 4)
+    -- Put after skip changes nothing.
+    d:put('uuid1')
+    t.assert_equals(d.remaining_count, 4)
+    d:put('uuid2')
+    d:skip('uuid2')
+    t.assert_equals(d.remaining_count, 0)
+    t.assert_not(d:pop())
+    -- Buckets are not returned and must be cleaned on exit.
+    t.assert_equals(#d.prepared_buckets, 8)
+end
+
+test_group.test_prepare_begin = function()
+    local d = route_dispenser.new({uuid = 10})
+    -- Successful begin changes the `is_prepare_in_progress`.
+    d:prepare_begin()
+    t.assert(d.is_prepare_in_progress)
+    -- Second begin in a row - error.
+    t.assert_not(pcall(d.prepare_begin, d))
+    -- Begin with buckets available - error.
+    d:prepare_commit({1})
+    t.assert_gt(#d.prepared_buckets, 0)
+    t.assert_not(pcall(d.prepare_begin, d))
+end
+
+test_group.test_prepare_commit = function()
+    local d = route_dispenser.new({uuid = 10})
+    -- Commit without begin - error.
+    t.assert_not(pcall(d.prepare_commit, d))
+    -- Commit with buckets available - error.
+    d.prepared_buckets = {1}
+    t.assert_not(pcall(d.prepare_commit, d, {2}))
+    d.prepared_buckets = {}
+    -- Successful commit without buckets.
+    d:prepare_begin()
+    t.assert(d.is_prepare_in_progress)
+    d:prepare_commit()
+    t.assert_not(d.is_prepare_in_progress)
+    t.assert_equals(d.prepared_buckets, {})
+    -- Successful commit with buckets.
+    d:prepare_begin()
+    t.assert(d.is_prepare_in_progress)
+    local bids = {1, 2}
+    d:prepare_commit(bids)
+    t.assert_not(d.is_prepare_in_progress)
+    t.assert_equals(d.prepared_buckets, bids)
+end
+
+test_group.test_wait_prepared = function()
+    local d = route_dispenser.new({uuid = 10})
+    -- wait_prepared with buckets - true (buckets are ready).
+    d:prepare_begin()
+    d:prepare_commit({1})
+    t.assert(d:wait_prepared())
+    d:pop()
+    -- No preparation - false (caller must prepare).
+    t.assert_not(d:wait_prepared())
+    -- Woken up by the commit.
+    d:prepare_begin()
+    local f = fiber.create(function()
+        d:wait_prepared(vconsts.TIMEOUT_INFINITY)
+    end)
+    f:set_joinable(true)
+    d:prepare_commit()
+    t.assert(f:join())
+    -- Error after skip (exit).
+    d:skip('uuid')
+    local ok, err = d:wait_prepared(vconsts.TIMEOUT_INFINITY)
+    t.assert_not(ok)
+    t.assert_equals(err.message, 'Nothing to wait anymore')
+end
diff --git a/test/unit/rebalancer.result b/test/unit/rebalancer.result
index 80b8b648..ef061642 100644
--- a/test/unit/rebalancer.result
+++ b/test/unit/rebalancer.result
@@ -16,9 +16,6 @@ build_routes = vshard.storage.internal.rebalancer_build_routes
 calc_etalon = require('vshard.replicaset').calculate_etalon_balance
 ---
 ...
-dispenser = vshard.storage.internal.route_dispenser
----
-...
 rlist = vshard.storage.internal.rlist
 ---
 ...
@@ -1069,161 +1066,6 @@ build_routes(replicasets)
   uuid2:
     uuid4: 2500
 ...
---
--- gh-161: parallel rebalancer. One of the most important part of
--- the latter is a dispenser.
It is a structure which hands out --- destination UUIDs in a round-robin manner to worker fibers. --- -d = dispenser.create({uuid = 15}) ---- -... -dispenser.pop(d) ---- -- uuid -... -for i = 1, 14 do assert(dispenser.pop(d) == 'uuid', i) end ---- -... -dispenser.pop(d) ---- -- null -... -dispenser.pop(d) ---- -- null -... -dispenser.pop(d) ---- -- null -... -dispenser.pop(d) ---- -- null -... -d ---- -- rlist: - count: 0 - map: - uuid: - bucket_count: 0 - id: uuid - progress: 0 - is_throttle_warned: false - need_to_send: 15 -... -d = dispenser.create({uuid1 = 5, uuid2 = 5}) ---- -... -u = dispenser.pop(d) ---- -... -u, d ---- -- uuid2 -- rlist: - count: 2 - last: &0 - bucket_count: 4 - prev: &1 - bucket_count: 5 - next: *0 - id: uuid1 - progress: 0 - is_throttle_warned: false - need_to_send: 5 - id: uuid2 - progress: 0 - is_throttle_warned: false - need_to_send: 5 - first: *1 - map: - uuid1: *1 - uuid2: *0 -... -dispenser.put(d, u) ---- -... -d ---- -- rlist: - count: 2 - last: &0 - bucket_count: 5 - prev: &1 - bucket_count: 5 - next: *0 - id: uuid1 - progress: 0 - is_throttle_warned: false - need_to_send: 5 - id: uuid2 - progress: 0 - is_throttle_warned: false - need_to_send: 5 - first: *1 - map: - uuid1: *1 - uuid2: *0 -... -dispenser.throttle(d, u) ---- -- true -... -d ---- -- rlist: - count: 2 - last: &0 - bucket_count: 5 - prev: &1 - bucket_count: 5 - next: *0 - id: uuid1 - progress: 0 - is_throttle_warned: false - need_to_send: 5 - id: uuid2 - progress: 0 - is_throttle_warned: true - need_to_send: 5 - first: *1 - map: - uuid1: *1 - uuid2: *0 -... -dispenser.throttle(d, u) ---- -- false -... -u1 = dispenser.pop(d) ---- -... -u2 = dispenser.pop(d) ---- -... -u1, u2 ---- -- uuid1 -- uuid2 -... -for i = 1, 4 do \ - assert(dispenser.pop(d) == u1) \ - assert(dispenser.pop(d) == u2) \ -end ---- -... --- Double skip should be ok. It happens, if there were several --- workers on one destination, and all of them received an error. -d = dispenser.create({uuid1 = 1}) ---- -... -dispenser.skip(d, 'uuid1') ---- -... -dispenser.skip(d, 'uuid1') ---- -... _bucket:drop() --- ... diff --git a/test/unit/rebalancer.test.lua b/test/unit/rebalancer.test.lua index 4481236d..20ef80cb 100644 --- a/test/unit/rebalancer.test.lua +++ b/test/unit/rebalancer.test.lua @@ -4,7 +4,6 @@ fiber = require('fiber') calc_metrics = vshard.storage.internal.rebalancer_calculate_metrics build_routes = vshard.storage.internal.rebalancer_build_routes calc_etalon = require('vshard.replicaset').calculate_etalon_balance -dispenser = vshard.storage.internal.route_dispenser rlist = vshard.storage.internal.rlist consts = vshard.consts util = require('util') @@ -271,40 +270,4 @@ calc_metrics(replicasets) replicasets build_routes(replicasets) --- --- gh-161: parallel rebalancer. One of the most important part of --- the latter is a dispenser. It is a structure which hands out --- destination UUIDs in a round-robin manner to worker fibers. --- -d = dispenser.create({uuid = 15}) -dispenser.pop(d) -for i = 1, 14 do assert(dispenser.pop(d) == 'uuid', i) end -dispenser.pop(d) -dispenser.pop(d) -dispenser.pop(d) -dispenser.pop(d) -d - -d = dispenser.create({uuid1 = 5, uuid2 = 5}) -u = dispenser.pop(d) -u, d -dispenser.put(d, u) -d -dispenser.throttle(d, u) -d -dispenser.throttle(d, u) -u1 = dispenser.pop(d) -u2 = dispenser.pop(d) -u1, u2 -for i = 1, 4 do \ - assert(dispenser.pop(d) == u1) \ - assert(dispenser.pop(d) == u2) \ -end - --- Double skip should be ok. 
It happens, if there were several
--- workers on one destination, and all of them received an error.
-d = dispenser.create({uuid1 = 1})
-dispenser.skip(d, 'uuid1')
-dispenser.skip(d, 'uuid1')
-
 _bucket:drop()
diff --git a/test/upgrade/upgrade.result b/test/upgrade/upgrade.result
index e0c9469b..75ad5c82 100644
--- a/test/upgrade/upgrade.result
+++ b/test/upgrade/upgrade.result
@@ -173,6 +173,7 @@ vshard.storage._call('test_api', 1, 2, 3)
  | ---
  | - - bucket_recv
  |   - bucket_test_gc
+ |   - bucket_test_send
  |   - info
  |   - rebalancer_apply_routes
  |   - rebalancer_request_state
diff --git a/vshard/cfg.lua b/vshard/cfg.lua
index 4df6bd0e..ab909004 100644
--- a/vshard/cfg.lua
+++ b/vshard/cfg.lua
@@ -416,6 +416,10 @@ local cfg_template = {
         default = consts.DEFAULT_REBALANCER_MAX_SENDING,
         max = consts.REBALANCER_MAX_SENDING_MAX
     },
+    rebalancer_bucket_send_timeout = {
+        type = 'positive number', name = 'Rebalancer bucket send timeout',
+        is_optional = true, default = consts.TIMEOUT_INFINITY,
+    },
     rebalancer_mode = {
         type = 'enum', name = 'Rebalancer mode',
@@ -573,8 +577,17 @@ local function cfg_extract_identifiers(cfg_key, cfg_value, is_named)
     return uuid, name
 end
 
+local function check_option(name, type, tv, value)
+    assert(type_descriptors[type])
+    local td = type_descriptors[type]
+    if not td:check(tv, value) then
+        error(string.format('%s must be %s', name, td:tostring(tv)))
+    end
+end
+
 return {
     check = cfg_check,
+    check_option = check_option,
     extract_vshard = cfg_extract_vshard,
     extract_box = cfg_extract_box,
     extract_identifiers = cfg_extract_identifiers,
diff --git a/vshard/consts.lua b/vshard/consts.lua
index f5ad61e4..c89bf593 100644
--- a/vshard/consts.lua
+++ b/vshard/consts.lua
@@ -7,6 +7,7 @@ return {
     BUCKET = {
         ACTIVE = 'active',
         PINNED = 'pinned',
+        READONLY = 'readonly',
         SENDING = 'sending',
         SENT = 'sent',
         RECEIVING = 'receiving',
@@ -39,9 +40,9 @@ return {
     DEFAULT_REBALANCER_DISBALANCE_THRESHOLD = 1;
     REBALANCER_IDLE_INTERVAL = 60 * 60;
     REBALANCER_WORK_INTERVAL = 10;
-    REBALANCER_CHUNK_TIMEOUT = 60 * 5;
     REBALANCER_GET_STATE_TIMEOUT = 5,
     REBALANCER_APPLY_ROUTES_TIMEOUT = 5,
+    REBALANCER_WORKER_STEP = 0.1,
     DEFAULT_REBALANCER_MAX_SENDING = 1;
     REBALANCER_MAX_SENDING_MAX = 15;
     DEFAULT_REBALANCER_MAX_RECEIVING = 100;
@@ -56,13 +57,17 @@ return {
     GC_BACKOFF_INTERVAL = 5,
     GC_MAP_CALL_TIMEOUT = 64,
     GC_WAIT_LSN_TIMEOUT = 64,
-    GC_WAIT_LSN_STEP = 0.1,
+    WAIT_LSN_STEP = 0.1,
     RECOVERY_BACKOFF_INTERVAL = 5,
     RECOVERY_GET_STAT_TIMEOUT = 5,
     REPLICA_BACKOFF_INTERVAL = 5,
     REPLICA_NOACTIVITY_TIMEOUT = 30,
-    DEFAULT_BUCKET_SEND_TIMEOUT = 10,
-    DEFAULT_BUCKET_RECV_TIMEOUT = 10,
+
+    DEFAULT_BUCKET_REF_TIMEOUT = 60 * 5,
+    DEFAULT_BUCKET_CHUNK_TIMEOUT = 60 * 5,
+    DEFAULT_BUCKET_POP_TIMEOUT = 60 * 15,
+    DEFAULT_BUCKET_SYNC_TIMEOUT = 60,
+
     REPLICA_LAG_LIMIT = 30,
     LOG_RATELIMIT_INTERVAL = 30,
diff --git a/vshard/storage/CMakeLists.txt b/vshard/storage/CMakeLists.txt
index be37d9b0..7cc78ef5 100644
--- a/vshard/storage/CMakeLists.txt
+++ b/vshard/storage/CMakeLists.txt
@@ -1,3 +1,3 @@
 install(FILES init.lua reload_evolution.lua ref.lua sched.lua schema.lua
-              export_log.lua exports.lua
+              export_log.lua exports.lua route_dispenser.lua
         DESTINATION ${TARANTOOL_INSTALL_LUADIR}/vshard/storage)
diff --git a/vshard/storage/init.lua b/vshard/storage/init.lua
index f62d150f..3063338e 100644
--- a/vshard/storage/init.lua
+++ b/vshard/storage/init.lua
@@ -18,16 +18,16 @@ if rawget(_G, MODULE_INTERNALS) then
     local vshard_modules = {
         'vshard.consts', 'vshard.error', 'vshard.cfg', 'vshard.version',
         'vshard.replicaset', 'vshard.util', 'vshard.service_info',
-        'vshard.storage.reload_evolution', 'vshard.rlist', 'vshard.registry',
+        'vshard.storage.reload_evolution', 'vshard.registry',
         'vshard.heap', 'vshard.storage.ref', 'vshard.storage.sched',
         'vshard.storage.schema', 'vshard.storage.export_log',
         'vshard.storage.exports', 'vshard.log_ratelimit',
+        'vshard.storage.route_dispenser',
     }
     for _, module in pairs(vshard_modules) do
         package.loaded[module] = nil
     end
 end
-local rlist = require('vshard.rlist')
 local consts = require('vshard.consts')
 local lerror = require('vshard.error')
 local lcfg = require('vshard.cfg')
@@ -40,10 +40,12 @@ local lref = require('vshard.storage.ref')
 local lsched = require('vshard.storage.sched')
 local lschema = require('vshard.storage.schema')
 local reload_evolution = require('vshard.storage.reload_evolution')
+local route_dispenser = require('vshard.storage.route_dispenser')
 local fiber_cond_wait = util.fiber_cond_wait
 local index_has = util.index_has
 local BACTIVE = consts.BUCKET.ACTIVE
 local BPINNED = consts.BUCKET.PINNED
+local BREADONLY = consts.BUCKET.READONLY
 local BSENDING = consts.BUCKET.SENDING
 local BSENT = consts.BUCKET.SENT
 local BRECEIVING = consts.BUCKET.RECEIVING
@@ -93,6 +95,7 @@ if not M then
             ERRINJ_LONG_RECEIVE = false,
             ERRINJ_LAST_RECEIVE_DELAY = false,
             ERRINJ_LAST_SEND_DELAY = false,
+            ERRINJ_LAST_SEND_DELAY_COUNTDOWN = -1,
             ERRINJ_RECEIVE_PARTIALLY = false,
             ERRINJ_RECOVERY_PAUSE = false,
             ERRINJ_DISCOVERY = false,
@@ -100,6 +103,7 @@ if not M then
             ERRINJ_BUCKET_GC_LONG_REPLICAS_TEST = false,
             ERRINJ_APPLY_ROUTES_STOP_DELAY = false,
             ERRINJ_SKIP_BUCKET_STATUS_VALIDATE = false,
+            ERRINJ_WORKER_PREPARE_WAKEUP_DELAY = {},
         },
         -- This counter is used to restart background fibers with
         -- new reloaded code.
@@ -224,6 +228,9 @@ if not M then
         -- order in parallel. Each fiber sends 1 bucket at a
        -- moment.
         rebalancer_worker_count = consts.DEFAULT_REBALANCER_WORKER_COUNT,
+        -- The timeout for sending a bucket in the rebalancer apply-routes
+        -- worker. Limits how long a bucket can be unavailable for writes.
+        rebalancer_bucket_send_timeout = consts.TIMEOUT_INFINITY,
         -- Map of bucket ro/rw reference counters. These counters
         -- works like bucket pins, but countable and are not
         -- persisted. Persistence is not needed since the refs are
@@ -382,7 +389,8 @@ local function bucket_are_all_rw_not_cache()
     local res = not index_has(status_index, status.SENDING) and
                 not index_has(status_index, status.SENT) and
                 not index_has(status_index, status.RECEIVING) and
-                not index_has(status_index, status.GARBAGE)
+                not index_has(status_index, status.GARBAGE) and
+                not index_has(status_index, status.READONLY)
 
     M.bucket_are_all_rw_cache = res
     bucket_are_all_rw = bucket_are_all_rw_cache
@@ -434,14 +442,22 @@ end
 -- Check if @a bucket can accept 'read' requests.
 --
 local function bucket_status_is_readable(status)
-    return bucket_status_is_writable(status) or status == BSENDING
+    return bucket_status_is_writable(status) or status == BSENDING or
+           status == BREADONLY
 end
 
 --
 -- Check if a bucket is sending or receiving.
 --
 local function bucket_status_is_transfer_in_progress(status)
-    return status == BSENDING or status == BRECEIVING
+    return status == BSENDING or status == BRECEIVING or status == BREADONLY
+end
+
+--
+-- Check whether the bucket should have the destination written.
+--
+local function bucket_status_has_destination(status)
+    return status ~= BACTIVE and status ~= BPINNED and status ~= BREADONLY
 end
 
 --
@@ -486,8 +502,14 @@ end
 local bucket_state_edges = {
     -- From nothing. Have to cast nil to box.NULL explicitly then.
     [box.NULL] = {BRECEIVING},
-    [BACTIVE] = {BPINNED, BSENDING},
+    -- In the current code ACTIVE cannot become SENDING; only the transition
+    -- ACTIVE -> READONLY -> SENDING is allowed. However, the edge must be
+    -- kept: if we're a replica and the master runs an old version, it may
+    -- make an ACTIVE bucket SENDING, and replication should not be broken in
+    -- such a case - the replica should accept the change.
+    [BACTIVE] = {BPINNED, BSENDING, BREADONLY},
     [BPINNED] = {BACTIVE},
+    [BREADONLY] = {BACTIVE, BSENDING},
     [BSENDING] = {BACTIVE, BSENT},
     [BSENT] = {BGARBAGE},
     [BRECEIVING] = {BGARBAGE, BACTIVE},
@@ -940,10 +962,23 @@ local function recovery_local_bucket_is_active(local_bucket, remote_bucket)
     return status == BSENT or status == BGARBAGE
 end
 
-local function recovery_save_recovered(dict, id, status)
-    local ids = dict[status] or {}
-    table.insert(ids, id)
-    dict[status] = ids
+local bucket_recovery_checkers = {
+    [BSENT] = recovery_local_bucket_is_sent,
+    [BGARBAGE] = recovery_local_bucket_is_garbage,
+    [BACTIVE] = recovery_local_bucket_is_active,
+}
+
+local function bucket_recover(bucket_id, status, recovered_buckets)
+    if bucket_status_has_destination(status) then
+        -- Preserve the destination, just update the status.
+        box.space._bucket:update({bucket_id}, {{'=', 2, status}})
+    else
+        -- Replace the whole tuple, dropping the destination if it exists.
+        box.space._bucket:replace({bucket_id, status})
+    end
+    local ids = recovered_buckets[status] or {}
+    table.insert(ids, bucket_id)
+    recovered_buckets[status] = ids
 end
 
 --
@@ -965,8 +1000,16 @@ local function recovery_step_by_type(type, limiter)
             goto continue
         end
         assert(bucket_status_is_transfer_in_progress(bucket.status))
-        local peer_id = bucket.destination
-        local destination = M.replicasets[peer_id]
+        local peer_id, destination, remote_bucket, err
+        if not bucket_status_has_destination(type) then
+            -- A READONLY bucket has no destination and doesn't require
+            -- checking the remote bucket state, since it's guaranteed that
+            -- the bucket has not been sent yet. Just recover it.
+            assert(type == BREADONLY)
+            goto recover
+        end
+        peer_id = bucket.destination
+        destination = M.replicasets[peer_id]
         if not destination then
             -- No replicaset master for a bucket. Wait until it
             -- appears.
@@ -979,7 +1022,7 @@ local function recovery_step_by_type(type, limiter)
             goto continue
         end
         lfiber.testcancel()
-        local remote_bucket, err = master_call(
+        remote_bucket, err = master_call(
             destination, 'vshard.storage._call',
             {'recovery_bucket_stat', bucket_id},
             {timeout = consts.RECOVERY_GET_STAT_TIMEOUT})
@@ -1003,6 +1046,7 @@ local function recovery_step_by_type(type, limiter)
         if remote_bucket and remote_bucket.is_transfering then
             goto continue
         end
+        ::recover::
         -- It is possible that during lookup a new request arrived
         -- which finished the transfer.
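+        -- Hence re-read the bucket; its status may have changed during
+        -- the yields above.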
         bucket = _bucket:get{bucket_id}
@@ -1014,19 +1058,14 @@ local function recovery_step_by_type(type, limiter)
             log.info(start_format, type)
         end
         lfiber.testcancel()
-        if recovery_local_bucket_is_sent(bucket, remote_bucket) then
-            _bucket:update({bucket_id}, {{'=', 2, BSENT}})
-            recovered = recovered + 1
-            recovery_save_recovered(recovered_buckets, bucket_id, BSENT)
-        elseif recovery_local_bucket_is_garbage(bucket, remote_bucket) then
-            _bucket:update({bucket_id}, {{'=', 2, BGARBAGE}})
-            recovered = recovered + 1
-            recovery_save_recovered(recovered_buckets, bucket_id, BGARBAGE)
-        elseif recovery_local_bucket_is_active(bucket, remote_bucket) then
-            _bucket:replace({bucket_id, BACTIVE})
-            recovered = recovered + 1
-            recovery_save_recovered(recovered_buckets, bucket_id, BACTIVE)
-        elseif is_step_empty then
+        for status, can_recover in pairs(bucket_recovery_checkers) do
+            if can_recover(bucket, remote_bucket) then
+                bucket_recover(bucket_id, status, recovered_buckets)
+                recovered = recovered + 1
+                break
+            end
+        end
+        if recovered == 0 and is_step_empty then
             log.info('Bucket %s is %s local and %s on replicaset %s, waiting',
                      bucket_id, bucket.status, remote_bucket.status, peer_id)
         end
@@ -1070,31 +1109,22 @@ local function recovery_service_f(service, limiter)
             goto sleep
         end
 
-        service:set_activity('recovering sending')
-        lfiber.testcancel()
-        ok, total, recovered = pcall(recovery_step_by_type, BSENDING, limiter)
-        if not ok then
-            is_all_recovered = false
-            limiter:log_error(total, service:set_status_error(
-                'Error during sending buckets recovery: %s', total))
-        elseif total ~= recovered then
-            is_all_recovered = false
+        for _, status in ipairs({BREADONLY, BSENDING, BRECEIVING}) do
+            lfiber.testcancel()
+            service:set_activity('recovering ' .. status)
+            ok, total, recovered = pcall(recovery_step_by_type, status, limiter)
+            if not ok then
+                is_all_recovered = false
+                limiter:log_error(total, service:set_status_error(
+                    'Error during %s buckets recovery: %s', status, total))
+            elseif total ~= recovered then
+                is_all_recovered = false
+            end
         end
-
-        service:set_activity('recovering receiving')
-        lfiber.testcancel()
-        ok, total, recovered = pcall(recovery_step_by_type, BRECEIVING, limiter)
-        if not ok then
-            is_all_recovered = false
-            limiter:log_error(total, service:set_status_error(
-                'Error during receiving buckets recovery: %s', total))
-        elseif total == 0 then
+        if ok and total == 0 then
             bucket_receiving_quota_reset()
-        else
+        elseif ok then
             bucket_receiving_quota_add(recovered)
-            if total ~= recovered then
-                is_all_recovered = false
-            end
         end
 
 ::sleep::
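
The recovery loop above now iterates over the three in-progress statuses and dispatches through `bucket_recovery_checkers`: a status-to-predicate table where the first matching predicate decides the bucket's new status. A condensed, self-contained sketch of that dispatch shape (the checker bodies here are stubs, not the real vshard predicates):

    -- Stub illustration of the table-driven recovery dispatch.
    local checkers = {
        sent = function(bucket, remote) return remote ~= nil end,
        active = function(bucket, remote) return remote == nil end,
    }

    local function dispatch(bucket, remote, apply)
        for status, can_recover in pairs(checkers) do
            if can_recover(bucket, remote) then
                apply(bucket, status)
                return status
            end
        end
    end

Compared to the removed `elseif` chain, adding a new recoverable status becomes a one-line table entry plus a predicate.
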
@@ -1380,19 +1410,6 @@ local function bucket_unrefrw(bucket_id)
     return true
 end
 
---
--- Ensure that a bucket ref exists and can be referenced for an RW
--- request.
---
-local function bucket_refrw_touch(bucket_id)
-    local status, err = bucket_refrw(bucket_id)
-    if not status then
-        return nil, err
-    end
-    bucket_unrefrw(bucket_id)
-    return M.bucket_refs[bucket_id]
-end
-
 --
 -- Ref/unref shortcuts for an obscure mode.
 --
@@ -1528,7 +1545,7 @@ local function bucket_recv_xc(bucket_id, from, data, opts)
         return nil, lerror.vshard(lerror.code.TOO_MANY_RECEIVING)
     end
     local timeout = opts and opts.timeout or
-                    consts.DEFAULT_BUCKET_SEND_TIMEOUT
+                    consts.DEFAULT_BUCKET_REF_TIMEOUT
     local ok, err = lsched.move_start(timeout)
     if not ok then
         return nil, err
@@ -1709,6 +1726,38 @@ local function bucket_test_gc(bids)
     return {bids_not_ok = bids_not_ok}
 end
 
+--
+-- Test that all passed buckets can be safely transferred to another replicaset.
+--
+local function bucket_test_send(bids, opts)
+    assert(opts and opts.timeout)
+    local deadline = fiber_clock() + opts.timeout
+    local ok, err, timeout
+    for _, bid in ipairs(bids) do
+        local bucket = box.space._bucket:get({bid})
+        if not bucket or bucket.status ~= BREADONLY then
+            -- Should not happen, just for safety.
+            local reason = string.format("Bucket is '%s' instead of '%s'",
                                          bucket and bucket.status, BREADONLY)
+            error(lerror.vshard(lerror.code.WRONG_BUCKET, bid, reason,
+                                M.this_replica.id))
+        end
+        local ref = M.bucket_refs[bid]
+        while ref and ref.rw ~= 0 do
+            timeout = deadline - fiber_clock()
+            ok, err = fiber_cond_wait(M.bucket_rw_lock_is_ready_cond, timeout)
+            if not ok then
+                local reason = 'failed waiting for no RW refs'
+                local final_err = lerror.vshard(lerror.code.WRONG_BUCKET,
+                                                bid, reason, M.this_replica.id)
+                final_err.prev = err
+                error(final_err)
+            end
+            ref = M.bucket_refs[bid]
+        end
+    end
+end
+
 --
 -- Public wrapper for sharded spaces list getter.
 --
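
`bucket_test_send` uses the deadline idiom that recurs throughout this patch: compute one absolute deadline up front, then hand every blocking wait only the remaining slice. A runnable distillation of the idiom (names are illustrative, not from the patch):

    local fiber = require('fiber')

    local cond = fiber.cond()
    local deadline = fiber.clock() + 10

    -- Re-wait on the condition with whatever time is left until the one
    -- shared deadline, instead of restarting a full timeout on each wakeup.
    local function wait_until(check)
        while not check() do
            if not cond:wait(deadline - fiber.clock()) then
                return nil, 'timed out'
            end
        end
        return true
    end

This is why the function recomputes `timeout = deadline - fiber_clock()` on every loop iteration rather than reusing `opts.timeout`.
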
@@ -1806,41 +1855,96 @@ local function buckets_discovery(opts)
 end
 
 --
--- Send a bucket to other replicaset.
+-- Build the timeout options for sending a bucket. Used in the manual
+-- `bucket_send` and in the rebalancer routes applier worker.
 --
-local function bucket_send_xc(bucket_id, destination, opts, exception_guard)
-    local id = M.this_replicaset.id
-    local status, ok
-    local ref, err = bucket_refrw_touch(bucket_id)
-    if not ref then
+local function bucket_send_build_opts(opts, defaults)
+    assert(defaults['timeout'])
+    opts = opts or {}
+    local type = 'positive number'
+    for name, default in pairs(defaults) do
+        if opts[name] then
+            lcfg.check_option(name, type, nil, opts[name])
+        else
+            opts[name] = default
+        end
+    end
+    opts.deadline = fiber_clock() + opts.timeout
+    return opts
+end
+
+--
+-- Prepare the bucket for sending: mark it as transferring, make it READONLY
+-- and wait for 0 RW refs locally.
+--
+local function bucket_send_prepare(bid, opts)
+    assert(opts and opts.ref_timeout and opts.deadline)
+    local ok, err, bucket, ref
+    local _bucket = box.space._bucket
+    local ref_deadline = fiber_clock() + opts.ref_timeout
+    local deadline = math.min(ref_deadline, opts.deadline)
+
+    ok, err = bucket_transfer_start(bid)
+    if not ok then
         return nil, err
     end
-    ref.rw_lock = true
-    exception_guard.ref = ref
-    exception_guard.drop_rw_lock = true
-    if not opts or not opts.timeout then
-        opts = opts and table.copy(opts) or {}
-        opts.timeout = consts.DEFAULT_BUCKET_SEND_TIMEOUT
+    bucket, err = bucket_check_state(bid, 'write')
+    if err then
+        goto error
     end
-    local timeout = opts.timeout
-    local deadline = fiber_clock() + timeout
-    while ref.rw ~= 0 do
-        timeout = deadline - fiber_clock()
-        ok, err = fiber_cond_wait(M.bucket_rw_lock_is_ready_cond, timeout)
+    if bucket.status == BPINNED then
+        err = lerror.vshard(lerror.code.BUCKET_IS_PINNED, bid)
+        goto error
+    end
+    ok, err = lsched.move_start(deadline - fiber_clock())
+    if not ok then
+        goto error
+    end
+    assert(lref.count == 0)
+    -- Move is scheduled only for the time of _bucket update because:
+    --
+    -- * it is consistent with bucket_recv() (see its comments);
+    --
+    -- * gives the same effect as if the move was in the scheduler for the
+    --   whole bucket_send() time, because refs won't be able to start anyway -
+    --   the bucket is not writable.
+    ok, err = pcall(_bucket.replace, _bucket, {bid, BREADONLY})
+    lsched.move_end(1)
+    if not ok then
+        goto error
+    end
+    ref = M.bucket_refs[bid]
+    while ref and ref.rw ~= 0 do
+        ok, err = fiber_cond_wait(M.bucket_rw_lock_is_ready_cond,
+                                  deadline - fiber_clock())
         if not ok then
-            return nil, err
+            goto error
         end
         lfiber.testcancel()
     end
+    do return true end
+
+::error::
+    bucket_transfer_end(bid)
+    return nil, err
+end
 
+--
+-- Send a bucket to other replicaset.
+--
+local function bucket_send_xc(bucket_id, destination, opts)
+    assert(opts and opts.chunk_timeout and opts.deadline)
+    -- `bucket_send_prepare()` must be called before `bucket_send_xc()`.
+    assert(M.rebalancer_transfering_buckets[bucket_id])
+    local ref = M.bucket_refs[bucket_id]
+    assert(not ref or (ref.rw == 0 and ref.rw_lock))
+    assert(lref.count == 0)
+    local id = M.this_replicaset.id
+    local status, ok, err
     local _bucket = box.space._bucket
-    local bucket = _bucket:get({bucket_id})
     if is_this_replicaset_locked() then
         return nil, lerror.vshard(lerror.code.REPLICASET_IS_LOCKED)
     end
-    if bucket.status == BPINNED then
-        return nil, lerror.vshard(lerror.code.BUCKET_IS_PINNED, bucket_id)
-    end
     local replicaset = M.replicasets[destination]
     if replicaset == nil then
         return nil, lerror.vshard(lerror.code.NO_SUCH_REPLICASET, destination)
     end
@@ -1855,25 +1959,13 @@ local function bucket_send_xc(bucket_id, destination, opts)
     local bucket_generation = M.bucket_generation
     local sendg = BSENDING
 
-    ok, err = lsched.move_start(timeout)
-    if not ok then
-        return nil, err
-    end
-    assert(lref.count == 0)
-    -- Move is scheduled only for the time of _bucket update because:
-    --
-    -- * it is consistent with bucket_recv() (see its comments);
-    --
-    -- * gives the same effect as if move was in the scheduler for the whole
-    --   bucket_send() time, because refs won't be able to start anyway - the
-    --   bucket is not writable.
     ok, err = pcall(_bucket.replace, _bucket, {bucket_id, sendg, destination})
-    lsched.move_end(1)
     if not ok then
         return nil, lerror.make(err)
     end
-    exception_guard.drop_rw_lock = false
+    local timeout
+    local deadline = opts.deadline
 
     for _, space in pairs(spaces) do
         local index = space.index[idx]
         local space_data = {}
@@ -1882,9 +1974,10 @@ local function bucket_send_xc(bucket_id, destination, opts)
             limit = limit - 1
             if limit == 0 then
                 table.insert(data, {space.name, space_data})
+                timeout = math.min(opts.chunk_timeout, deadline - fiber_clock())
                 status, err = master_call(
                     replicaset, 'vshard.storage.bucket_recv',
-                    {bucket_id, id, data}, opts)
+                    {bucket_id, id, data}, {timeout = timeout})
                 bucket_generation = bucket_guard_xc(bucket_generation,
                                                     bucket_id, sendg)
                 if not status then
@@ -1897,11 +1990,14 @@ local function bucket_send_xc(bucket_id, destination, opts)
             end
             table.insert(data, {space.name, space_data})
         end
+        timeout = math.min(opts.chunk_timeout, deadline - fiber_clock())
         status, err = master_call(replicaset, 'vshard.storage.bucket_recv',
-                                  {bucket_id, id, data}, opts)
+                                  {bucket_id, id, data}, {timeout = timeout})
         if not status then
            return status, lerror.make(err)
        end
+        util.errinj_countdown(M.errinj, 'ERRINJ_LAST_SEND_DELAY_COUNTDOWN',
+                              function() M.errinj.ERRINJ_LAST_SEND_DELAY = true end)
         while M.errinj.ERRINJ_LAST_SEND_DELAY do
             lfiber.sleep(0.01)
         end
@@ -1927,14 +2023,47 @@ local function bucket_send_xc(bucket_id, destination, opts)
     -- a bucket is sent, hung in the network. Then it is recovered
     -- to active on the source, and then the message arrives and
     -- the same bucket is activated on the destination.
+    timeout = math.min(opts.chunk_timeout, deadline - fiber_clock())
     status, err = master_call(replicaset, 'vshard.storage.bucket_recv',
-                              {bucket_id, id, {}, {is_last = true}}, opts)
+                              {bucket_id, id, {}, {is_last = true}},
+                              {timeout = timeout})
     if not status then
         return status, lerror.make(err)
     end
     return true
 end
 
+local function bucket_send_end(bid)
+    bucket_transfer_end(bid)
+end
+
+local function bucket_test_send_on_replicas(buckets, opts)
+    assert(opts and opts.sync_timeout and opts.deadline)
+    local ok, err, _
+    local sync_deadline = fiber_clock() + opts.sync_timeout
+    local deadline = math.min(sync_deadline, opts.deadline)
+    ok, err = wait_lsn(deadline - fiber_clock(), consts.WAIT_LSN_STEP)
+    if not ok then
+        err.reason = string.format('could not sync with replicas for ' ..
+                                   'readonly batch %s', json_encode(buckets))
+        return nil, err
+    end
+    local call_timeout = deadline - fiber_clock()
+    -- Use a smaller timeout to reveal the error from the storage if it's
+    -- available; the user timeout is passed to the call itself.
+    local wait_timeout = call_timeout / 1.5
+    _, err = M.this_replicaset:map_call('vshard.storage._call',
+        {'bucket_test_send', buckets, {timeout = wait_timeout}}, {
+            timeout = call_timeout,
+            except = M.this_replica.id,
+    })
+    if err then
+        err = lerror.from_string(err.message) or err
+        return nil, err
+    end
+    return true
+end
+
 --
 -- Exception and recovery safe version of bucket_send_xc.
 --
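
The next hunk rewires the public `bucket_send` around a fixed pipeline: prepare (flip to READONLY and drain RW refs), verify on replicas, transfer, and unconditionally drop the transfer mark. A compressed sketch of that control flow, built from the functions this patch introduces (error plumbing abbreviated, not the verbatim wrapper):

    -- Sketch of the new send pipeline.
    local function send_pipeline(bucket_id, destination, opts)
        local ok, err = bucket_send_prepare(bucket_id, opts)
        if not ok then
            return nil, err
        end
        ok, err = bucket_test_send_on_replicas({bucket_id}, opts)
        local status, ret
        if ok then
            status, ret, err = pcall(bucket_send_xc, bucket_id, destination, opts)
            ok = status and ret
            err = status and err or ret
        end
        -- Released on every path, success or failure.
        bucket_send_end(bucket_id)
        if not ok then
            return nil, err
        end
        return true
    end

The key invariant: `bucket_send_end()` runs exactly once for every successful `bucket_send_prepare()`, no matter which stage fails.
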
@@ -1942,22 +2071,24 @@ local function bucket_send(bucket_id, destination, opts)
     if type(bucket_id) ~= 'number' or type(destination) ~= 'string' then
         error('Usage: bucket_send(bucket_id, destination)')
     end
+    opts = bucket_send_build_opts(table.deepcopy(opts), {
+        ['timeout'] = consts.TIMEOUT_INFINITY,
+        ['chunk_timeout'] = consts.DEFAULT_BUCKET_CHUNK_TIMEOUT,
+        ['ref_timeout'] = consts.DEFAULT_BUCKET_REF_TIMEOUT,
+        ['sync_timeout'] = consts.DEFAULT_BUCKET_SYNC_TIMEOUT,
+    })
     local status, ret, err
-    ret, err = check_is_master()
-    if not ret then
+    status, err = bucket_send_prepare(bucket_id, opts)
+    if not status then
         return nil, err
     end
-    status, err = bucket_transfer_start(bucket_id)
+    status, err = bucket_test_send_on_replicas({bucket_id}, opts)
     if not status then
+        bucket_send_end(bucket_id)
         return nil, err
     end
-    local exception_guard = {}
-    status, ret, err = pcall(bucket_send_xc, bucket_id, destination, opts,
-                             exception_guard)
-    if exception_guard.drop_rw_lock then
-        exception_guard.ref.rw_lock = false
-    end
-    bucket_transfer_end(bucket_id)
+    status, ret, err = pcall(bucket_send_xc, bucket_id, destination, opts)
+    bucket_send_end(bucket_id)
     if status then
         if ret then
             return ret
@@ -2113,8 +2244,7 @@ end
 -- were approved for deletion.
 --
 local function gc_bucket_process_sent_one_batch_xc(batch)
-    local ok, err = wait_lsn(consts.GC_WAIT_LSN_TIMEOUT,
-                             consts.GC_WAIT_LSN_STEP)
+    local ok, err = wait_lsn(consts.GC_WAIT_LSN_TIMEOUT, consts.WAIT_LSN_STEP)
     if not ok then
         local msg = 'Failed to delete sent buckets - could not sync '..
                     'with replicas'
@@ -2517,134 +2647,112 @@ local function rebalancer_build_routes(replicasets)
     return bucket_routes
 end
 
---
--- Dispenser is a container of routes received from the
--- rebalancer. Its task is to hand out the routes to worker fibers
--- in a round-robin manner so as any two sequential results are
--- different. It allows to spread dispensing evenly over the
--- receiver nodes.
---
-local function route_dispenser_create(routes)
-    local rlist = rlist.new()
-    local map = {}
-    for id, bucket_count in pairs(routes) do
-        local new = {
-            -- Receiver's ID.
-            id = id,
-            -- Rest of buckets to send. The receiver will be
-            -- dispensed this number of times.
-            bucket_count = bucket_count,
-            -- Constant value to be able to track progress.
-            need_to_send = bucket_count,
-            -- Number of *successfully* sent buckets.
-            progress = 0,
-            -- If a user set too long max number of receiving
-            -- buckets, or too high number of workers, worker
-            -- fibers will receive 'throttle' errors, perhaps
-            -- quite often. So as not to clog the log each
-            -- destination is logged as throttled only once.
-            is_throttle_warned = false,
-        }
-        -- Map of destinations is stored in addition to the queue,
-        -- because
-        -- 1) It is possible, that there are no more buckets to
-        --    send, but suddenly one of the workers trying to send
-        --    the last bucket receives a throttle error. In that
-        --    case the bucket is put back, and the destination
-        --    returns to the queue;
-        -- 2) After all buckets are sent, and the queue is empty,
-        --    the main applier fiber does some analysis on the
-        --    destinations.
-        map[id] = new
-        rlist:add_tail(new)
-    end
-    return {
-        rlist = rlist,
-        map = map,
-        -- Error, which occurred in `bucket_send` and led to
-        -- skipping of the above-mentioned id from dispenser.
-        error = nil,
-    }
-end
-
---
--- Put one bucket back to the dispenser. It happens, if the worker
--- receives a throttle error. This is the only error that can be
--- tolerated.
---
-local function route_dispenser_put(dispenser, id)
-    local dst = dispenser.map[id]
-    if dst then
-        local bucket_count = dst.bucket_count + 1
-        dst.bucket_count = bucket_count
-        if bucket_count == 1 then
-            dispenser.rlist:add_tail(dst)
+local function rebalancer_prepare_buckets(bucket_count, opts)
+    assert(opts and opts.timeout)
+    assert(bucket_count >= 0)
+    local buckets = {}
+    local active_key = {BACTIVE}
+    local status, err, bucket_id, msg
+    local _status = box.space._bucket.index.status
+    local limit = consts.BUCKET_CHUNK_SIZE
+    while #buckets ~= bucket_count do
+        -- Prefer buckets without rw refs or without refs at all.
+        for _, bucket in _status:pairs(active_key) do
+            bucket_id = bucket.id
+            local ref = M.bucket_refs[bucket_id]
+            if not M.rebalancer_transfering_buckets[bucket_id] and
+               (not ref or ref.rw == 0) then
+                goto prepare
+            end
+            limit = limit - 1
+            if limit == 0 then
+                fiber_yield()
+                limit = consts.BUCKET_CHUNK_SIZE
+            end
+        end
+        -- Can't just take the first active bucket. It may already be locked
+        -- by a manual bucket_send in another fiber.
+        for _, bucket in _status:pairs(active_key) do
+            bucket_id = bucket.id
+            if not M.rebalancer_transfering_buckets[bucket_id] then
+                goto prepare
+            end
+            limit = limit - 1
+            if limit == 0 then
+                fiber_yield()
+                limit = consts.BUCKET_CHUNK_SIZE
+            end
+        end
+        msg = ('Cannot find %d active buckets to send'):format(bucket_count)
+        err = lerror.make(msg)
+        goto error
+::prepare::
+        status, err = bucket_send_prepare(bucket_id, opts)
+        if not status then
+            goto error
+        end
+        table.insert(buckets, bucket_id)
     end
-end
-
---
--- In case if a receiver responded with a serious error it is not
--- safe to send more buckets to there. For example, if it was a
--- timeout, it is unknown whether the bucket was received or not.
--- If it was a box error like index key conflict, then it is even
--- worse and the cluster is broken.
---
-local function route_dispenser_skip(dispenser, id)
-    local map = dispenser.map
-    local dst = map[id]
-    if dst then
-        map[id] = nil
-        dispenser.rlist:remove(dst)
     end
 end
-
---
--- Set that the receiver @a id was throttled. When it happens
--- first time it is logged.
---
-local function route_dispenser_throttle(dispenser, id)
-    local dst = dispenser.map[id]
-    if dst then
-        local old_value = dst.is_throttle_warned
-        dst.is_throttle_warned = true
-        return not old_value
+    status, err = bucket_test_send_on_replicas(buckets, opts)
+    if not status then
+        goto error
     end
-    return false
-end
+    do return buckets end
 
---
--- Notify the dispenser that a bucket was successfully sent to
--- @a id. It has no any functional purpose except tracking
--- progress.
---
-local function route_dispenser_sent(dispenser, id)
-    local dst = dispenser.map[id]
-    if dst then
-        local new_progress = dst.progress + 1
-        dst.progress = new_progress
-        local need_to_send = dst.need_to_send
-        return new_progress == need_to_send, need_to_send
+::error::
+    for _, bid in ipairs(buckets) do
+        bucket_send_end(bid)
     end
-    return false
+    return nil, err
 end
 
---
--- Take a next destination to send a bucket to.
---
-local function route_dispenser_pop(dispenser)
-    local rlist = dispenser.rlist
-    local dst = rlist.first
-    if dst then
-        local bucket_count = dst.bucket_count - 1
-        dst.bucket_count = bucket_count
-        rlist:remove(dst)
-        if bucket_count > 0 then
-            rlist:add_tail(dst)
+local function rebalancer_dispenser_pop(dispenser)
+    local ok, err, count, buckets
+    local opts = bucket_send_build_opts(nil, {
+        ['timeout'] = M.rebalancer_bucket_send_timeout,
+        ['ref_timeout'] = consts.DEFAULT_BUCKET_REF_TIMEOUT,
+        ['pop_timeout'] = consts.DEFAULT_BUCKET_POP_TIMEOUT,
+        ['sync_timeout'] = consts.DEFAULT_BUCKET_SYNC_TIMEOUT,
+    })
+    local pop_deadline = fiber_clock() + opts.pop_timeout
+    local deadline = math.min(pop_deadline, opts.deadline)
+    while fiber_clock() < deadline do
+        ok, err = dispenser:wait_prepared(deadline - fiber_clock())
+        local injection = M.errinj.ERRINJ_WORKER_PREPARE_WAKEUP_DELAY
+        while injection[lfiber.self().name()] do
+            lfiber.testcancel()
+            lfiber.sleep(0.01)
+        end
+        if ok then
+            if #dispenser.prepared_buckets == 0 then
+                -- We've been woken up, but the buckets are not yet prepared
+                -- (e.g. all the other workers are already sending them). We
+                -- need to wait again or prepare them on our own.
+                goto continue
+            end
+            return dispenser:pop()
+        elseif err then
+            -- Stop waiting, it doesn't make sense anymore. Do not log the
+            -- error - it's not a real error but a marker that waiting is no
+            -- longer needed. Causes exit from the worker.
+            return
+        end
+        -- We must prepare the buckets.
+        assert(ok == false)
+        dispenser:prepare_begin()
+        count = math.min(M.rebalancer_worker_count, dispenser.remaining_count)
+        buckets, err = rebalancer_prepare_buckets(count, opts)
+        dispenser:prepare_commit(buckets)
+        if buckets then
+            return dispenser:pop()
         end
-        return dst.id
+        log.info('Failed preparing the buckets for sending, retrying: %s', err)
+        ::continue::
+        lfiber.sleep(consts.REBALANCER_WORKER_STEP)
     end
-    return nil
+    -- Intentional nil return here: we didn't manage to wait for the buckets
+    -- to be prepared, nothing to do anymore. Causes exit from the worker.
+    log.error('Timed out while waiting for buckets to be prepared')
 end
 
 --
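
`wait_prepared()` (defined in the new route_dispenser module below) deliberately returns three shapes: `true` - buckets may be ready, try to pop; `false` - nobody is preparing, this worker must do it; `nil, err` - nothing left to wait for, exit. A minimal consumer of that contract, boiled down from `rebalancer_dispenser_pop` above:

    -- Simplified consumer of the dispenser's tri-state wait contract.
    local function worker_step(dispenser, timeout, prepare)
        local ok, err = dispenser:wait_prepared(timeout)
        if ok then
            return dispenser:pop()    -- may still be empty; caller retries
        elseif err then
            return nil                -- terminal: nothing to wait for
        end
        dispenser:prepare_begin()     -- ok == false: we own preparation now
        dispenser:prepare_commit(prepare())
        return dispenser:pop()
    end
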
@@ -2656,38 +2764,32 @@
 --
 local function rebalancer_worker_f(worker_id, dispenser, quit_cond)
     lfiber.name(string.format('vshard.rebalancer_worker_%d', worker_id))
-    local _status = box.space._bucket.index.status
-    local opts = {timeout = consts.REBALANCER_CHUNK_TIMEOUT}
-    local active_key = {BACTIVE}
-    local id = route_dispenser_pop(dispenser)
+    local id, bid = rebalancer_dispenser_pop(dispenser)
     local worker_throttle_count = 0
-    local bucket_id, is_found
-    while id do
-        is_found = false
-        -- Can't just take a first active bucket. It may be
-        -- already locked by a manual bucket_send in another
-        -- fiber.
-        for _, bucket in _status:pairs(active_key) do
-            bucket_id = bucket.id
-            if not M.rebalancer_transfering_buckets[bucket_id] then
-                is_found = true
-                break
-            end
-        end
-        if not is_found then
-            log.error('Can not find active buckets')
-            break
-        end
-        local ret, err = bucket_send(bucket_id, id, opts)
-        if ret then
+    while id and bid do
+        -- The opts are recreated for every bucket to allow reconfiguring
+        -- the timeout while the routes applier is already working.
+        local send_opts = bucket_send_build_opts(nil, {
+            ['timeout'] = M.rebalancer_bucket_send_timeout,
+            ['chunk_timeout'] = consts.DEFAULT_BUCKET_CHUNK_TIMEOUT,
+        })
+        local status, ok, err = pcall(bucket_send_xc, bid, id, send_opts)
+        -- Note that the bucket should not be returned to the prepared ones.
+        -- Firstly, it may already have a non-readonly status - it's better to
+        -- prepare a new one and properly recover this one for code simplicity
+        -- and readability. Secondly, we should not prohibit writes to the
+        -- bucket for more time than we really have to.
+        bucket_send_end(bid)
+        if status and ok then
             worker_throttle_count = 0
-            local finished, total = route_dispenser_sent(dispenser, id)
+            local finished, total = dispenser:sent(id)
             if finished then
                 log.info('%d buckets were successfully sent to %s', total, id)
             end
             goto continue
         end
-        route_dispenser_put(dispenser, id)
+        err = status and err or ok
+        dispenser:put(id)
         if err.type ~= 'ShardingError' or
            err.code ~= lerror.code.TOO_MANY_RECEIVING then
            log.error('Error during rebalancer routes applying: receiver %s, '..
@@ -2695,11 +2797,11 @@ local function rebalancer_worker_f(worker_id, dispenser, quit_cond)
             log.info('Can not finish transfers to %s, skip to next round', id)
             worker_throttle_count = 0
             dispenser.error = dispenser.error or err
-            route_dispenser_skip(dispenser, id)
+            dispenser:skip(id)
             goto continue
         end
         worker_throttle_count = worker_throttle_count + 1
-        if route_dispenser_throttle(dispenser, id) then
+        if dispenser:throttle(id) then
             log.error('Too many buckets is being sent to %s', id)
         end
         if worker_throttle_count < dispenser.rlist.count then
@@ -2713,7 +2815,7 @@
             log.info('The worker is back')
         end
::continue::
-        id = route_dispenser_pop(dispenser)
+        id, bid = rebalancer_dispenser_pop(dispenser)
     end
     quit_cond:broadcast()
 end
@@ -2729,7 +2831,7 @@
     setmetatable(routes, {__serialize = 'mapping'})
     log.info('Apply rebalancer routes with %d workers:\n%s', worker_count,
              yaml_encode(routes))
-    local dispenser = route_dispenser_create(routes)
+    local dispenser = route_dispenser.new(routes)
     local _status = box.space._bucket.index.status
     assert(_status:count({BSENDING}) == 0)
     assert(_status:count({BRECEIVING}) == 0)
@@ -2754,6 +2856,11 @@
                 'Rebalancer worker %d threw an exception: %s', i, res))
         end
     end
+    -- There may be prepared buckets left due to send errors, which caused
+    -- skipping a replicaset. Unblock them now, so that they get recovered.
+    for _, bid in ipairs(dispenser.prepared_buckets) do
+        bucket_send_end(bid)
+    end
     if not dispenser.error then
         log.info('Rebalancer routes are applied')
         service:set_status_ok()
@@ -2972,7 +3079,7 @@ local function rebalancer_request_state()
     local _bucket = box.space._bucket
     local status_index = _bucket.index.status
     local repl_id = M.this_replica.id
-    for _, status in pairs({BSENDING, BRECEIVING, BGARBAGE}) do
+    for _, status in pairs({BSENDING, BRECEIVING, BGARBAGE, BREADONLY}) do
        if #status_index:select({status}, {limit = 1}) > 0 then
            local err = string.format('Replica %s has %s buckets during ' ..
                                      'rebalancing', repl_id, status)
@@ -3377,6 +3484,7 @@ end
 service_call_api = setmetatable({
     bucket_recv = bucket_recv,
     bucket_test_gc = bucket_test_gc,
+    bucket_test_send = bucket_test_send,
     rebalancer_apply_routes = rebalancer_apply_routes,
     rebalancer_request_state = rebalancer_request_state,
     recovery_bucket_stat = recovery_bucket_stat,
@@ -3793,6 +3901,7 @@ local function storage_cfg_xc(cfgctx)
     M.rebalancer_disbalance_threshold = new_cfg.rebalancer_disbalance_threshold
     M.rebalancer_receiving_quota = new_cfg.rebalancer_max_receiving
     M.rebalancer_worker_count = new_cfg.rebalancer_max_sending
+    M.rebalancer_bucket_send_timeout = new_cfg.rebalancer_bucket_send_timeout
     M.sync_timeout = new_cfg.sync_timeout
     M.current_cfg = new_cfg
     storage_cfg_master_commit(cfgctx)
@@ -3990,6 +4099,7 @@ local function storage_info(opts)
     state.bucket.garbage = status:count({BSENT})
     state.bucket.receiving = status:count({BRECEIVING})
     state.bucket.sending = status:count({BSENDING})
+    state.bucket.readonly = status:count({BREADONLY})
     state.bucket.pinned = pinned
     if state.bucket.receiving ~= 0 and state.bucket.sending ~= 0 then
         --
@@ -4203,14 +4313,6 @@
 M.api_call_cache = storage_api_call_unsafe
 M.gc_bucket_drop = gc_bucket_drop
 M.rebalancer_build_routes = rebalancer_build_routes
 M.rebalancer_calculate_metrics = rebalancer_calculate_metrics
-M.route_dispenser = {
-    create = route_dispenser_create,
-    put = route_dispenser_put,
-    throttle = route_dispenser_throttle,
-    skip = route_dispenser_skip,
-    pop = route_dispenser_pop,
-    sent = route_dispenser_sent,
-}
 M.bucket_state_edges = bucket_state_edges
 M.bucket_are_all_rw = bucket_are_all_rw_public
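
Before the new module itself, a short usage sketch of its intended protocol (a hypothetical standalone session; replicaset ids and counts are invented):

    local route_dispenser = require('vshard.storage.route_dispenser')

    -- Two receivers: 2 buckets go to 'r1', 1 bucket to 'r2'.
    local dispenser = route_dispenser.new({r1 = 2, r2 = 1})

    -- One worker publishes prepared bucket ids for everyone.
    dispenser:prepare_begin()
    dispenser:prepare_commit({11, 12, 13})

    -- Workers then draw (receiver, bucket) pairs; receivers alternate
    -- round-robin, so two sequential pops go to different ids when possible.
    local id, bid = dispenser:pop()
    dispenser:sent(id)    -- on success; dispenser:put(id) to return the route

Note `pop()` hands out a destination only while prepared buckets remain, which is what lets workers block in `wait_prepared()` instead of scanning `_bucket` themselves.
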
diff --git a/vshard/storage/route_dispenser.lua b/vshard/storage/route_dispenser.lua
new file mode 100644
index 00000000..e6ad2715
--- /dev/null
+++ b/vshard/storage/route_dispenser.lua
@@ -0,0 +1,193 @@
+--
+-- Dispenser is a container of routes received from the rebalancer. Its task is
+-- to hand out the routes to worker fibers in a round-robin manner so that any
+-- two sequential results are different. This spreads dispensing evenly over
+-- the receiver nodes.
+--
+local fiber = require('fiber')
+local rlist = require('vshard.rlist')
+local lerror = require('vshard.error')
+local util = require('vshard.util')
+
+--
+-- Put one bucket back to the dispenser. It happens on any error.
+--
+local function route_dispenser_put(dispenser, id)
+    local dst = dispenser.map[id]
+    if dst then
+        dispenser.remaining_count = dispenser.remaining_count + 1
+        local bucket_count = dst.bucket_count + 1
+        dst.bucket_count = bucket_count
+        if bucket_count == 1 then
+            dispenser.rlist:add_tail(dst)
+        end
+    end
+end
+
+--
+-- If a receiver responded with a serious error, it is not safe
+-- to send more buckets there. For example, if it was a timeout,
+-- it is unknown whether the bucket was received or not. If it was
+-- a box error like an index key conflict, then it is even worse
+-- and the cluster is broken.
+--
+local function route_dispenser_skip(dispenser, id)
+    local map = dispenser.map
+    local dst = map[id]
+    if dst then
+        map[id] = nil
+        dispenser.rlist:remove(dst)
+        dispenser.remaining_count = dispenser.remaining_count - dst.bucket_count
+        assert(dispenser.remaining_count >= 0)
+    end
+end
+
+--
+-- Record that the receiver @a id was throttled. When it happens
+-- the first time, it is logged.
+--
+local function route_dispenser_throttle(dispenser, id)
+    local dst = dispenser.map[id]
+    if dst then
+        local old_value = dst.is_throttle_warned
+        dst.is_throttle_warned = true
+        return not old_value
+    end
+    return false
+end
+
+--
+-- Notify the dispenser that a bucket was successfully sent to
+-- @a id. It has no functional purpose except tracking
+-- progress.
+--
+local function route_dispenser_sent(dispenser, id)
+    local dst = dispenser.map[id]
+    if dst then
+        local new_progress = dst.progress + 1
+        dst.progress = new_progress
+        local need_to_send = dst.need_to_send
+        return new_progress == need_to_send, need_to_send
+    end
+    return false
+end
+
+--
+-- Take the next destination to send a bucket to.
+--
+local function route_dispenser_pop(dispenser)
+    local buckets = dispenser.prepared_buckets
+    local rlist = dispenser.rlist
+    local dst = rlist.first
+    if dst and #buckets > 0 then
+        dispenser.remaining_count = dispenser.remaining_count - 1
+        assert(dispenser.remaining_count >= 0)
+        local bucket_count = dst.bucket_count - 1
+        dst.bucket_count = bucket_count
+        rlist:remove(dst)
+        if bucket_count > 0 then
+            rlist:add_tail(dst)
+        end
+        return dst.id, table.remove(buckets)
+    end
+    return nil
+end
+
+local function route_dispenser_wait_prepared(dispenser, timeout)
+    if #dispenser.prepared_buckets > 0 then
+        -- Fast path. In most cases buckets are already prepared.
+        return true
+    end
+    if not dispenser.rlist.first or dispenser.remaining_count == 0 then
+        return nil, lerror.make('Nothing to wait anymore')
+    end
+    if not dispenser.is_prepare_in_progress then
+        -- Nobody is preparing the buckets, we must do it.
+        return false
+    end
+    return util.fiber_cond_wait(dispenser.prepare_cond, timeout)
+end
+
+local function route_dispenser_prepare_begin(dispenser)
+    assert(not dispenser.is_prepare_in_progress)
+    assert(#dispenser.prepared_buckets == 0)
+    dispenser.is_prepare_in_progress = true
+end
+
+local function route_dispenser_prepare_commit(dispenser, buckets)
+    assert(dispenser.is_prepare_in_progress)
+    assert(#dispenser.prepared_buckets == 0)
+    dispenser.is_prepare_in_progress = false
+    dispenser.prepare_cond:broadcast()
+    if buckets then
+        assert(type(buckets) == 'table')
+        dispenser.prepared_buckets = buckets
+    end
+end
+
+local route_dispenser_mt = {
+    __index = {
+        put = route_dispenser_put,
+        skip = route_dispenser_skip,
+        throttle = route_dispenser_throttle,
+        sent = route_dispenser_sent,
+        pop = route_dispenser_pop,
+        wait_prepared = route_dispenser_wait_prepared,
+        prepare_begin = route_dispenser_prepare_begin,
+        prepare_commit = route_dispenser_prepare_commit,
+    }
+}
+
+local function route_dispenser_new(routes)
+    local rlist = rlist.new()
+    local map = {}
+    local total = 0
+    for id, bucket_count in pairs(routes) do
+        total = total + bucket_count
+        local new = {
+            -- Receiver's ID.
+            id = id,
+            -- Rest of buckets to send. The receiver will be
+            -- dispensed this number of times.
+            bucket_count = bucket_count,
+            -- Constant value to be able to track progress.
+            need_to_send = bucket_count,
+            -- Number of *successfully* sent buckets.
+            progress = 0,
+            -- If a user set too long a max number of receiving
+            -- buckets, or too high a number of workers, worker
+            -- fibers will receive 'throttle' errors, perhaps
+            -- quite often. So as not to clog the log, each
+            -- destination is logged as throttled only once.
+            is_throttle_warned = false,
+        }
+        -- Map of destinations is stored in addition to the queue,
+        -- because
+        -- 1) It is possible that there are no more buckets to
+        --    send, but suddenly one of the workers trying to send
+        --    the last bucket receives a throttle error. In that
+        --    case the bucket is put back, and the destination
+        --    returns to the queue;
+        -- 2) After all buckets are sent, and the queue is empty,
+        --    the main applier fiber does some analysis on the
+        --    destinations.
+        map[id] = new
+        rlist:add_tail(new)
+    end
+    return setmetatable({
+        rlist = rlist,
+        map = map,
+        -- Remaining count of buckets that still have to be sent.
+        remaining_count = total,
+        -- The table of buckets that are ready to be sent.
+        prepared_buckets = {},
+        -- The buckets are prepared by a single worker, which locks this var.
+        is_prepare_in_progress = false,
+        -- Condition to wait for buckets to be prepared.
+        prepare_cond = fiber.cond(),
+    }, route_dispenser_mt)
+end
+
+return {
+    new = route_dispenser_new,
+}
diff --git a/vshard/util.lua b/vshard/util.lua
index 427c3706..6fdef9af 100644
--- a/vshard/util.lua
+++ b/vshard/util.lua
@@ -478,6 +478,15 @@ else
     uri_format = luri.format
 end
 
+local function errinj_countdown(errinj, countdown_name, callback)
+    if errinj[countdown_name] and errinj[countdown_name] >= 0 then
+        errinj[countdown_name] = errinj[countdown_name] - 1
+        if errinj[countdown_name] == -1 then
+            callback()
+        end
+    end
+end
+
 return {
     core_version = tnt_version,
     uri_eq = uri_eq,
@@ -503,4 +512,5 @@
     replicaset_uuid = replicaset_uuid,
     uri_format = uri_format,
     module_unload_functions = module_unload_functions,
+    errinj_countdown = errinj_countdown,
 }
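
`errinj_countdown` gives tests a one-shot trigger: the counter is decremented on each pass through the instrumented code path, and the callback fires exactly when it crosses from 0 to -1. The storage code above uses it to arm ERRINJ_LAST_SEND_DELAY on a chosen bucket_recv round. A hypothetical test-side sketch:

    local util = require('vshard.util')

    local errinj = {ERRINJ_LAST_SEND_DELAY_COUNTDOWN = 2}
    local fired = false

    -- Fires on the third call: 2 -> 1 -> 0 -> -1.
    for _ = 1, 3 do
        util.errinj_countdown(errinj, 'ERRINJ_LAST_SEND_DELAY_COUNTDOWN',
                              function() fired = true end)
    end
    assert(fired)

The -1 default in the errinj table means "disarmed", since the decrement only happens while the counter is non-negative.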