From ae914d967369a54f89060b9e271b7892c580c238 Mon Sep 17 00:00:00 2001 From: Roman Gromov Date: Tue, 3 Mar 2026 15:35:16 +0300 Subject: [PATCH 1/5] router: refactoring of router_ref_storage_* functions Before this patch the main `map_callrw` ref functions such as `router_ref_storage_all` and `router_ref_storage_by_buckets` were enormous (71 and 108 lines of code). Also these functions have a large number of similar functional code blocks such as "sending refs", "collecting refs" e.t.c. Since in tarantool/vshard#559 patch we will extend the logic of full map_callrw making it able to work with split args, the `router_ref_storage_all` can double in size. It can lead to degradation of our codebase due to less readability. To fix it we firstly determine general and repeated code blocks in ref functions: 1) `ref-prepare`: groups buckets by replicasets with router's cache, builds a table of "target" replicasets and waits necessary masters. 2) `ref-send`: sends refs to the remote storage asynchronously and builds a table of future objects for the next processing. 3) `ref-collect`: waits until future objects are ready in order to extract payload from it (responses of storages' functions). 4) `ref-process`: a custom logic for `full` or `partial` map_callrw modes which describes how we should process results from future objects. After defining the main stages of ref map_callrw functions we should unify them so that we can use them in both `router_ref_storage_all` and `router_ref_storage_by_buckets`. Needed for tarantool/vshard#559 NO_TEST=refactoring NO_DOC=refactoring --- vshard/router/init.lua | 264 ++++++++++++++++++++++------------------- 1 file changed, 144 insertions(+), 120 deletions(-) diff --git a/vshard/router/init.lua b/vshard/router/init.lua index 098bca2c..65690d1d 100644 --- a/vshard/router/init.lua +++ b/vshard/router/init.lua @@ -772,59 +772,140 @@ local function router_call(router, bucket_id, opts, ...) 
end -- --- Perform Ref stage of the Ref-Map-Reduce process on all the known replicasets. +-- Prepares a cluster before sending refs to remote storages: +-- 1) Groups the buckets by replicasets according to the router's cache +-- in case of partial map_callrw; +-- 2) Builds a table of replicasets on which refs should be sent; +-- 3) Waits necessary masters until all connections are established; -- -local function router_ref_storage_all(router, timeout) - local replicasets = router.replicasets +local function router_map_callrw_prepare(router, timeout, bucket_ids) + local err, err_id + local replicasets_to_wait, grouped_buckets = {}, {} + local replicasets_all = router.replicasets local deadline = fiber_clock() + timeout - local err, err_id, res - local futures = {} - local bucket_count = 0 - local opts_async = {is_async = true} - local rs_count = 0 - local rid = M.ref_id - M.ref_id = rid + 1 - -- Nil checks are done explicitly here (== nil instead of 'not'), because - -- netbox requests return box.NULL instead of nils. - - -- - -- Ref stage: send. - -- - -- Netbox async requests work only with active connections. Need to wait - -- for the connection explicitly. + if bucket_ids then + bucket_ids = bucket_ids or {} + -- Group the buckets by replicasets according to the router cache. + grouped_buckets, err = buckets_group(router, bucket_ids, timeout) + if err ~= nil then + return nil, err + end + for rs_id, _ in pairs(grouped_buckets) do + table.insert(replicasets_to_wait, replicasets_all[rs_id]) + end + else + replicasets_to_wait = replicasets_all + end + -- Netbox async requests work only with active connections. + -- So, we need to wait for the master connection explicitly. 
+ timeout = deadline - fiber_clock() timeout, err, err_id = lreplicaset.wait_masters_connect( - replicasets, timeout) + replicasets_to_wait, timeout) if not timeout then - goto fail + return nil, err, err_id end - for id, rs in pairs(replicasets) do - res, err = rs:callrw('vshard.storage._call', - {'storage_ref', rid, timeout}, opts_async) + return timeout, nil, nil, grouped_buckets +end + +-- +-- Sends asynchronous refs to the remote storages and forms a table of future +-- objects. An arguments' table for storage_ref_* functions is built according +-- to args_builder closure which captures necessary router's variables from +-- high-level ref functions (such as router_ref_storage_all and router_ref_ +-- storage_by_buckets). +-- +local function router_map_callrw_send(router, timeout, args_builder, + grouped_buckets) + local futures = {} + local opts_async = {is_async = true} + local replicasets_all = router.replicasets + local rs_ids = grouped_buckets or replicasets_all + for rs_id, _ in pairs(rs_ids) do + local args_ref = args_builder(rs_id) + local res, err = replicasets_all[rs_id]:callrw('vshard.storage._call', + args_ref, opts_async) if res == nil then - err_id = id - goto fail + return nil, err, rs_id end - futures[id] = res - rs_count = rs_count + 1 + futures[rs_id] = res end - -- - -- Ref stage: collect. - -- + return timeout, nil, nil, futures +end + +-- +-- Waits until all future objects are ready and extracts results from it. +-- +local function router_map_callrw_collect(futures, timeout) + local results = {} + local deadline = fiber_clock() + timeout for id, future in pairs(futures) do - res, err = future_wait(future, timeout) + timeout = deadline - fiber_clock() + local res, err = future_wait(future, timeout) -- Handle netbox error first. if res == nil then - err_id = id - goto fail + return nil, err, id end -- Ref returns nil,err or bucket count. 
res, err = res[1], res[2] if res == nil then - err_id = id - goto fail + return nil, err, id end + results[id] = res + end + return timeout, nil, nil, results +end + +-- +-- Handles all buckets which were moved from the remote storages, rewrites the +-- router's cache and builds a new table of bucket_ids for the next iteration +-- of partial map_callrw. +-- +local function router_map_callrw_process_moved(router, results) + local bucket_ids = {} + for _, res in pairs(results) do + for _, bucket in pairs(res.moved) do + local bid = bucket.id + local dst = bucket.dst + -- 'Reset' regardless of 'set'. So as not to + -- bother with 'set' errors. If it fails, then + -- won't matter. It is a best-effort thing. + bucket_reset(router, bid) + if dst ~= nil then + bucket_set(router, bid, dst) + end + table.insert(bucket_ids, bid) + end + end + return bucket_ids +end + +-- +-- Perform Ref stage of the Ref-Map-Reduce process on all the known replicasets. +-- +local function router_ref_storage_all(router, timeout) + local bucket_count = 0 + local err, err_id, args_builder, results + local futures = {} + local replicasets_all = router.replicasets + local rid = M.ref_id + M.ref_id = rid + 1 + + timeout, err, err_id = router_map_callrw_prepare(router, timeout) + if not timeout then + goto fail + end + args_builder = function() return {'storage_ref', rid, timeout} end + timeout, err, err_id, futures = router_map_callrw_send(router, timeout, + args_builder) + if not timeout then + goto fail + end + timeout, err, err_id, results = router_map_callrw_collect(futures, timeout) + if not timeout then + goto fail + end + for _, res in pairs(results) do bucket_count = bucket_count + res - timeout = deadline - fiber_clock() end -- All refs are done but not all buckets are covered. This is odd and can -- mean many things. 
The most possible ones: 1) outdated configuration on @@ -837,13 +918,13 @@ local function router_ref_storage_all(router, timeout) router.total_bucket_count - bucket_count) goto fail end - do return timeout, nil, nil, rid, replicasets end + do return timeout, nil, nil, rid, replicasets_all end ::fail:: for _, f in pairs(futures) do f:discard() end - return nil, err, err_id, rid, replicasets + return nil, err, err_id, rid, replicasets_all end -- @@ -851,103 +932,46 @@ end -- replicasets, which contains all the listed bucket IDs. -- local function router_ref_storage_by_buckets(router, bucket_ids, timeout) - local grouped_buckets - local group_count - local err, err_id, res - local replicasets_all = router.replicasets - local replicasets_to_map = {} - local futures = {} - local opts_async = {is_async = true} - local deadline = fiber_clock() + timeout + local err, err_id, grouped_buckets, args_builder, results + local replicasets_to_map, futures = {}, {} local rid = M.ref_id M.ref_id = rid + 1 - -- Nil checks are done explicitly here (== nil instead of 'not'), because -- netbox requests return box.NULL instead of nils. - - -- Ref stage. while next(bucket_ids) do - -- Group the buckets by replicasets according to the router cache. - grouped_buckets, err = buckets_group(router, bucket_ids, timeout) - if grouped_buckets == nil then - goto fail - end - timeout = deadline - fiber_clock() - - -- Netbox async requests work only with active connections. - -- So, first need to wait for the master connection explicitly. 
- local replicasets_to_check = {} - group_count = 0 - for uuid, _ in pairs(grouped_buckets) do - group_count = group_count + 1 - table.insert(replicasets_to_check, replicasets_all[uuid]) - end - timeout, err, err_id = lreplicaset.wait_masters_connect( - replicasets_to_check, timeout) + timeout, err, err_id, grouped_buckets = router_map_callrw_prepare( + router, timeout, bucket_ids) if not timeout then goto fail end - - -- Send ref requests with timeouts to the replicasets. - futures = table_new(0, group_count) - for id, buckets in pairs(grouped_buckets) do - if timeout == nil then - err_id = id - goto fail - end - local args_ref - if replicasets_to_map[id] then + args_builder = function(rs_id) + local buckets = grouped_buckets[rs_id] or {} + if replicasets_to_map[rs_id] then -- Replicaset is already referenced on a previous iteration. -- Simply get the moved buckets without double referencing. - args_ref = { - 'storage_ref_check_with_buckets', rid, buckets} + return {'storage_ref_check_with_buckets', rid, buckets} else - args_ref = { - 'storage_ref_make_with_buckets', rid, timeout, buckets} - end - res, err = replicasets_all[id]:callrw('vshard.storage._call', - args_ref, opts_async) - if res == nil then - err_id = id - goto fail + return {'storage_ref_make_with_buckets', rid, timeout, buckets} end - futures[id] = res end - - -- Wait for the refs to be done and collect moved buckets. - bucket_ids = {} - for id, f in pairs(futures) do - res, err = future_wait(f, timeout) - -- Handle netbox error first. - if res == nil then - err_id = id - goto fail - end - -- Ref returns nil,err or {is_done, moved}. - res, err = res[1], res[2] - if res == nil then - err_id = id - goto fail - end - for _, bucket in pairs(res.moved) do - local bid = bucket.id - local dst = bucket.dst - -- 'Reset' regardless of 'set'. So as not to - -- bother with 'set' errors. If it fails, then - -- won't matter. It is a best-effort thing. 
- bucket_reset(router, bid) - if dst ~= nil then - bucket_set(router, bid, dst) - end - table.insert(bucket_ids, bid) - end + timeout, err, err_id, futures = router_map_callrw_send( + router, timeout, args_builder, grouped_buckets) + if not timeout then + goto fail + end + timeout, err, err_id, results = router_map_callrw_collect(futures, + timeout) + if not timeout then + goto fail + end + bucket_ids = router_map_callrw_process_moved(router, results) + for rs_id, res in pairs(results) do if res.is_done then - assert(not replicasets_to_map[id]) - -- If there are no buckets on the replicaset, it would not be - -- referenced. - replicasets_to_map[id] = replicasets_all[id] + assert(not replicasets_to_map[rs_id]) + -- If there are no buckets on the replicaset, it would + -- not be referenced. + replicasets_to_map[rs_id] = router.replicasets[rs_id] end - timeout = deadline - fiber_clock() end end do return timeout, nil, nil, rid, replicasets_to_map end From 4e61f997ba0c8f82fad067e3814d38389d726b60 Mon Sep 17 00:00:00 2001 From: Roman Gromov Date: Mon, 30 Mar 2026 13:04:36 +0300 Subject: [PATCH 2/5] storage: refactoring of bucket_get_moved In this patch we change `allstatus.GARBAGE/SENT` on `BACTIVE/BSENT` to not repeat the code. Needed for tarantool/vshard#214 NO_DOC=refactoring NO_TEST=refactoring --- vshard/storage/init.lua | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/vshard/storage/init.lua b/vshard/storage/init.lua index f62d150f..d4c815f5 100644 --- a/vshard/storage/init.lua +++ b/vshard/storage/init.lua @@ -3196,7 +3196,6 @@ end -- under any circumstances. 
-- local function bucket_get_moved(bucket_ids) - local allstatus = consts.BUCKET local res = {} for _, bucket_id in pairs(bucket_ids) do local bucket = box.space._bucket:get{bucket_id} @@ -3205,7 +3204,7 @@ local function bucket_get_moved(bucket_ids) is_moved = true else local status = bucket.status - is_moved = status == allstatus.GARBAGE or status == allstatus.SENT + is_moved = status == BGARBAGE or status == BSENT end if is_moved then table.insert(res, { From 13f266cf737312c731e630b5676446d1654ddb49 Mon Sep 17 00:00:00 2001 From: Roman Gromov Date: Tue, 3 Mar 2026 15:49:08 +0300 Subject: [PATCH 3/5] router: refactoring of ref_ids' logic in router_map_callrw This patch takes initialization of `rid` out to `router_map_callrw` and passes this variable to ref-functions. It is needed for future features tidiness, for example - `make full map_callrw with split args` in which the logic of `router_map_callrw` becomes more complex. Needed for #559 NO_DOC=refactoring NO_TEST=refactoring --- vshard/router/init.lua | 27 +++++++++++++-------------- 1 file changed, 13 insertions(+), 14 deletions(-) diff --git a/vshard/router/init.lua b/vshard/router/init.lua index 65690d1d..04485ed4 100644 --- a/vshard/router/init.lua +++ b/vshard/router/init.lua @@ -882,13 +882,11 @@ end -- -- Perform Ref stage of the Ref-Map-Reduce process on all the known replicasets. 
-- -local function router_ref_storage_all(router, timeout) +local function router_ref_storage_all(router, timeout, rid) local bucket_count = 0 local err, err_id, args_builder, results local futures = {} local replicasets_all = router.replicasets - local rid = M.ref_id - M.ref_id = rid + 1 timeout, err, err_id = router_map_callrw_prepare(router, timeout) if not timeout then @@ -918,24 +916,22 @@ local function router_ref_storage_all(router, timeout) router.total_bucket_count - bucket_count) goto fail end - do return timeout, nil, nil, rid, replicasets_all end + do return timeout, nil, nil, replicasets_all end ::fail:: for _, f in pairs(futures) do f:discard() end - return nil, err, err_id, rid, replicasets_all + return nil, err, err_id, replicasets_all end -- -- Perform Ref stage of the Ref-Map-Reduce process on a subset of all the -- replicasets, which contains all the listed bucket IDs. -- -local function router_ref_storage_by_buckets(router, bucket_ids, timeout) +local function router_ref_storage_by_buckets(router, bucket_ids, timeout, rid) local err, err_id, grouped_buckets, args_builder, results local replicasets_to_map, futures = {}, {} - local rid = M.ref_id - M.ref_id = rid + 1 -- Nil checks are done explicitly here (== nil instead of 'not'), because -- netbox requests return box.NULL instead of nils. 
while next(bucket_ids) do @@ -974,13 +970,13 @@ local function router_ref_storage_by_buckets(router, bucket_ids, timeout) end end end - do return timeout, nil, nil, rid, replicasets_to_map end + do return timeout, nil, nil, replicasets_to_map end ::fail:: for _, f in pairs(futures) do f:discard() end - return nil, err, err_id, rid, replicasets_to_map + return nil, err, err_id, replicasets_to_map end -- @@ -1170,17 +1166,20 @@ local function router_map_callrw(router, func, args, opts) else timeout = consts.CALL_TIMEOUT_MIN end + rid = M.ref_id + M.ref_id = rid + 1 if plain_bucket_ids then - timeout, err, err_id, rid, replicasets_to_map = - router_ref_storage_by_buckets(router, plain_bucket_ids, timeout) + timeout, err, err_id, replicasets_to_map = + router_ref_storage_by_buckets(router, plain_bucket_ids, timeout, + rid) -- Grouped arguments are only possible with partial Map-Reduce. if timeout then grouped_args = router_group_map_callrw_args( router, plain_bucket_ids, bucket_ids) end else - timeout, err, err_id, rid, replicasets_to_map = - router_ref_storage_all(router, timeout) + timeout, err, err_id, replicasets_to_map = + router_ref_storage_all(router, timeout, rid) end if timeout then map, err, err_id = replicasets_map_reduce(replicasets_to_map, rid, func, From 1be7b8e1055ecd2f2033d6304408e246b0f2ba46 Mon Sep 17 00:00:00 2001 From: Roman Gromov Date: Fri, 27 Mar 2026 15:11:03 +0300 Subject: [PATCH 4/5] test: refactoring of router/reload_test Before this patch the `router-luatest/reload_test` checked router's services only with old routers. However in future patch (gh-214) we need to check map_callrw with old storages. In order to make it able we: 1) change `vtest.cluster_new` so that we can pass server_config with certain ENV (LUA_PATH) variable into it. It can help us to create a new cluster on old version of vshard. 
2) change `reload_router` to more general `reload_server` in order to unify the process of servers (router / storage) upgrade in `router-luatest/reload_test`. 3) unify the process of cluster creation - `create_cluster_on_specific_version` and the process of getting server's config with new ENV (LUA_PATH) variable - `get_config_for_specific_vshard_version`. Needed for tarantool/vshard#214 NO_DOC=refactoring NO_TEST=refactoring --- test/luatest_helpers/cluster.lua | 1 + test/luatest_helpers/vtest.lua | 4 +- test/router-luatest/reload_test.lua | 107 +++++++++++++++------------- 3 files changed, 62 insertions(+), 50 deletions(-) diff --git a/test/luatest_helpers/cluster.lua b/test/luatest_helpers/cluster.lua index 43e3479f..e247bad8 100644 --- a/test/luatest_helpers/cluster.lua +++ b/test/luatest_helpers/cluster.lua @@ -42,6 +42,7 @@ function Cluster:drop() server:cleanup() end end + self.servers = {} end function Cluster:get_index(server) diff --git a/test/luatest_helpers/vtest.lua b/test/luatest_helpers/vtest.lua index c054e09a..f849fd78 100644 --- a/test/luatest_helpers/vtest.lua +++ b/test/luatest_helpers/vtest.lua @@ -148,7 +148,8 @@ end -- -- Build new cluster by a given config. -- -local function cluster_new(g, cfg) +local function cluster_new(g, cfg, server_config) + server_config = server_config or {} if not g.cluster then g.cluster = cluster:new({}) end @@ -204,6 +205,7 @@ local function cluster_new(g, cfg) local server = g.cluster:build_server({ alias = replica_name, box_cfg = box_cfg, + env = server_config.env }, 'storage.lua') g[replica_name] = server -- VShard specific details to use in various helper functions. 
diff --git a/test/router-luatest/reload_test.lua b/test/router-luatest/reload_test.lua index c1ae28d7..36e2d71a 100644 --- a/test/router-luatest/reload_test.lua +++ b/test/router-luatest/reload_test.lua @@ -28,6 +28,39 @@ local cfg_template = { } local global_cfg +local function get_config_for_specific_vshard_version(hash) + git_util.exec('checkout', {args = hash .. ' -f', dir = g.vshard_copy_path}) + local path = g.vshard_copy_path_load + local lua_path = string.format('%s/?.lua;%s/?/init.lua;', path, path) + -- Force 'require' to use new directory + return {env = {['LUA_PATH'] = lua_path .. os.getenv('LUA_PATH')}} +end + +local function create_cluster_on_specific_version(hash) + local server_config = get_config_for_specific_vshard_version(hash) + vtest.cluster_new(g, global_cfg, server_config) + vtest.cluster_bootstrap(g, global_cfg) + vtest.cluster_rebalancer_disable(g) + + vtest.cluster_exec_each_master(g, function() + local test = box.schema.space.create('test', {format = { + {'id', 'unsigned'}, + {'bucket_id', 'unsigned'}, + }}) + + test:create_index('primary') + test:create_index('bucket_id', {unique = false, parts = {2}}) + end) + vtest.cluster_exec_each(g, function() + rawset(_G, 'insert', function(space_name, tuple) + return box.space[space_name]:insert(tuple) + end) + rawset(_G, 'get', function(space_name, key) + return box.space[space_name]:get(key) + end) + end) +end + g.before_all(function() -- Override of the built in modules is available only since 2.11.0. t.run_only_if(vutil.version_is_at_least(2, 11, 0, nil, 0, 0)) @@ -62,31 +95,7 @@ g.before_all(function() -- Hash of the latest commit for testing router on the latest version. g.latest_hash = git_util.log_hashes({args = '-1', dir = vtest.sourcedir})[1] - - -- No need to reload storages. Just run them on the latest version. 
- vtest.cluster_new(g, global_cfg) - vtest.cluster_bootstrap(g, global_cfg) - vtest.cluster_rebalancer_disable(g) - - -- Basic storage configuration - vtest.cluster_exec_each_master(g, function() - local test = box.schema.space.create('test', {format = { - {'id', 'unsigned'}, - {'bucket_id', 'unsigned'}, - }}) - - test:create_index('primary') - test:create_index('bucket_id', {unique = false, parts = {2}}) - end) - - vtest.cluster_exec_each(g, function() - rawset(_G, 'insert', function(space_name, tuple) - return box.space[space_name]:insert(tuple) - end) - rawset(_G, 'get', function(space_name, key) - return box.space[space_name]:get(key) - end) - end) + create_cluster_on_specific_version(g.latest_hash) end) g.after_all(function() @@ -115,55 +124,55 @@ end -- * Checkout to old version; -- * Create and start a router; -- 3. Test smth on the old version; --- 4. Invoke reload_router(): +-- 4. Invoke reload_server(): -- * Checkout to the latest version; -- * Reload the module. -- 5. Test smth on the new version -- 6. Drop a router with vtest.drop_instance -- local function create_router_at(hash) - git_util.exec('checkout', {args = hash .. ' -f', dir = g.vshard_copy_path}) - local path = g.vshard_copy_path_load - local lua_path = string.format("%s/?.lua;%s/?/init.lua;", path, path) - local router = vtest.router_new(g, 'router', nil, { - env = { - -- Force 'require' to use new directory - ['LUA_PATH'] = lua_path .. os.getenv('LUA_PATH') - }, - }) + local server_config = get_config_for_specific_vshard_version(hash) + local router = vtest.router_new(g, 'router', nil, server_config) router_cfg(router, global_cfg) return router end -- --- Reloads router. If service_name is provided, then +-- Reloads server (router or storage). If service_name is provided, then -- the function also waits until the service is restarted. 
-local function reload_router(router, service_name) +-- +local function reload_server(server, server_type, service_name) git_util.exec('checkout', {args = g.latest_hash .. ' -f', dir = g.vshard_copy_path}) - router:exec(function(service_name) + server:exec(function(server_type, service_name) local service - local internal = ivshard.router.internal + local internal = ivshard[server_type].internal if service_name ~= nil then - service = internal.static_router[service_name] + if server_type == 'router' then + service = internal.static_router[service_name] + else + service = internal[service_name] + end ilt.assert_not_equals(service, nil) end - ilt.assert_equals(ivshard.router.module_version(), 0) - package.loaded['vshard.router'] = nil - ivshard.router = require('vshard.router') + local package_name = string.format('vshard.%s', server_type) + ilt.assert_equals(ivshard[server_type].module_version(), 0) + package.loaded[package_name] = nil + ivshard[server_type] = require(package_name) _G.ivconst = require('vshard.consts') - ilt.assert_equals(ivshard.router.module_version(), 1) + ilt.assert_equals(ivshard[server_type].module_version(), 1) if service ~= nil then ilt.helpers.retrying({timeout = ivtest.wait_timeout, delay = ivtest.busy_step}, function() - if service == internal.static_router[service_name] then + if service == internal.static_router[service_name] or + service == internal[service_name] then error('Service have not been reloaded yet') end end) end - end, {service_name}) + end, {server_type, service_name}) end local function test_basic_template(router) @@ -185,7 +194,7 @@ g.test_basic = function(g) local router = create_router_at(hash) router_assert_version_equals(router, nil) test_basic_template(router) - reload_router(router) + reload_server(router, 'router') router_assert_version_equals(router, vconsts.VERSION) test_basic_template(router) vtest.drop_instance(g, router) @@ -228,7 +237,7 @@ g.test_discovery = function(g) local router = create_router_at(hash) 
router_assert_version_equals(router, nil) test_discovery_template(g, router) - reload_router(router, 'discovery_service') + reload_server(router, 'router', 'discovery_service') router_assert_version_equals(router, vconsts.VERSION) test_discovery_template(g, router) vtest.drop_instance(g, router) @@ -327,7 +336,7 @@ g.test_master_search = function(g) local auto_master_cfg = vtest.config_new(auto_master_cfg_template) test_master_search_template(g, router, auto_master_cfg) - reload_router(router) + reload_server(router, 'router') router_assert_version_equals(router, vconsts.VERSION) -- Wait for old master_search service to be stopped. router:exec(function() From 8f5223ceb24fb0ba461a60440b82b54752b93fbc Mon Sep 17 00:00:00 2001 From: Roman Gromov Date: Fri, 27 Mar 2026 15:11:48 +0300 Subject: [PATCH 5/5] router: make full map_callrw with split args This patch introduces a new way of `map_callrw` execution by which we can pass some arguments to all storages and split buckets' arguments to those storages that have at least one bucket of `bucket_ids`. To achieve this we introduce a new string option - `mode` to `map_callrw` api. Also we change the logic of `router_ref_storage_all` ref function. Firstly we ref all storages and get back an amount of "moved" buckets according to the previously built router's cache. Then if there are no "moved" buckets we accumulate and check total amount of buckets on all storages and finish map_callrw ref stage. Otherwise, if there are some "moved" buckets we perform the second network hop by checking on which replicasets do the remaining "moved" buckets reside on. Closes tarantool#559 @TarantoolBot document Title: vshard: `mode` option for `router.map_callrw()` This string option regulates on which storages the user function will be executed via `map_callrw`. Possible values: 1) mode = 'partial'. In this mode user function will be executed on storages that have at least one bucket of 'bucket_ids'. 
The 'bucket_ids' option can be presented in two ways: like a numeric array of buckets' ids or like a map of buckets' arguments. In first one user function will only receive args, in second one it will additionally receive buckets' arguments. 2) mode = 'full'. In this mode user function will be executed with args on all storages in cluster. If we pass 'bucket_ids' like a map of bucket's arguments the user function will additionally receive buckets' arguments on those storages that have at least one bucket of 'bucket_ids'. If we didn't specify the 'mode' option, then it is set based on 'bucket_ids' option - if 'bucket_ids' is presented, the mode will be 'partial' otherwise 'full'. Also now `map_callrw` ends with error in cases of `` and ``. --- test/router-luatest/map_callrw_test.lua | 177 ++++++++++++++++++++++++ test/router-luatest/reload_test.lua | 78 +++++++++++ test/storage-luatest/storage_1_test.lua | 19 +-- test/upgrade/upgrade.result | 1 + vshard/consts.lua | 5 + vshard/error.lua | 5 + vshard/router/init.lua | 159 ++++++++++++++++----- vshard/storage/init.lua | 31 ++++- 8 files changed, 431 insertions(+), 44 deletions(-) diff --git a/test/router-luatest/map_callrw_test.lua b/test/router-luatest/map_callrw_test.lua index 72c6c717..a8d4b3f0 100644 --- a/test/router-luatest/map_callrw_test.lua +++ b/test/router-luatest/map_callrw_test.lua @@ -685,3 +685,180 @@ g.test_map_callrw_with_cdata_bucket_id = function(cg) ilt.assert_not(err) end) end + +g.test_full_map_callrw_with_numeric_bucket_ids = function(cg) + local res = router_do_map(cg.router, {123}, { + mode = 'full', + timeout = vtest.wait_timeout, + bucket_ids = {1, 2, 3} + }) + t.assert(res.err) + t.assert_equals(res.err.message, 'Router can\'t execute map_callrw ' .. 
+ 'with \'full\' mode and numeric bucket_ids') + t.assert_not(res.err_id) + t.assert_not(res.val) +end + +g.test_partial_map_callrw_with_nil_bucket_ids = function(cg) + local res = router_do_map(cg.router, {123}, { + mode = 'partial', + timeout = vtest.wait_timeout + }) + t.assert(res.err) + t.assert_equals(res.err.message, 'Router can\'t execute map_callrw ' .. + 'with \'partial\' mode and nil bucket_ids') + t.assert_not(res.err_id) + t.assert_not(res.val) +end + +local function make_do_map_tracking_bucket_ids(cg) + -- We override 'do_map' function on storages in order to check that + -- default arguments and bucket arguments were successfully passed into + -- destination storages according to mode and bucket_ids options. + vtest.cluster_exec_each_master(cg, function() + rawset(_G, 'old_do_map', _G.do_map) + rawset(_G, 'do_map', function(args, bucket_args) + ilt.assert_gt(require('vshard.storage.ref').count, 0) + return {ivutil.replicaset_uuid(), + {args = args, b_args = bucket_args}} + end) + end) +end + +local function reset_do_map_to_old_state(cg) + vtest.cluster_exec_each_master(cg, function() + rawset(_G, 'do_map', _G.old_do_map) + end) +end + +g.test_full_map_callrw_with_split_args = function(cg) + make_do_map_tracking_bucket_ids(cg) + local bid1 = vtest.storage_first_bucket(g.replica_1_a) + local bid2 = vtest.storage_first_bucket(g.replica_2_a) + local bid3 = vtest.storage_first_bucket(g.replica_3_a) + + local res = router_do_map(cg.router, {0}, { + mode = 'full', + timeout = vtest.wait_timeout, + bucket_ids = {[bid1] = {111}, [bid2] = {222}, [bid3] = {333}} + }) + t.assert_not(res.err) + t.assert_not(res.err_id) + t.assert_equals(res.val, { + [cg.rs1_uuid] = {{cg.rs1_uuid, {args = 0, b_args = {[bid1] = {111}}}}}, + [cg.rs2_uuid] = {{cg.rs2_uuid, {args = 0, b_args = {[bid2] = {222}}}}}, + [cg.rs3_uuid] = {{cg.rs3_uuid, {args = 0, b_args = {[bid3] = {333}}}}}, + }) + + res = router_do_map(cg.router, {0}, { + mode = 'full', + timeout = vtest.wait_timeout, + 
bucket_ids = {[bid2] = {222}} + }) + t.assert_not(res.err) + t.assert_not(res.err_id) + t.assert_equals(res.val, { + [cg.rs1_uuid] = {{cg.rs1_uuid, {args = 0}}}, + [cg.rs2_uuid] = {{cg.rs2_uuid, {args = 0, b_args = {[bid2] = {222}}}}}, + [cg.rs3_uuid] = {{cg.rs3_uuid, {args = 0}}}, + }) + reset_do_map_to_old_state(cg) +end + +g.test_full_map_callrw_without_bucket_ids = function(cg) + make_do_map_tracking_bucket_ids(cg) + local res = router_do_map(cg.router, {0}, { + mode = 'full', + timeout = vtest.wait_timeout + }) + t.assert_not(res.err) + t.assert_not(res.err_id) + t.assert_equals(res.val, { + [cg.rs1_uuid] = {{cg.rs1_uuid, {args = 0}}}, + [cg.rs2_uuid] = {{cg.rs2_uuid, {args = 0}}}, + [cg.rs3_uuid] = {{cg.rs3_uuid, {args = 0}}}, + }) + reset_do_map_to_old_state(cg) +end + +g.test_partial_map_callrw_with_numeric_bucket_ids = function(cg) + make_do_map_tracking_bucket_ids(cg) + local bid1 = vtest.storage_first_bucket(g.replica_1_a) + local res = router_do_map(cg.router, {0}, { + mode = 'partial', + timeout = vtest.wait_timeout, + bucket_ids = {bid1} + }) + t.assert_not(res.err) + t.assert_not(res.err_id) + t.assert_equals(res.val, {[cg.rs1_uuid] = {{cg.rs1_uuid, {args = 0}}}}) + reset_do_map_to_old_state(cg) +end + +g.test_partial_map_callrw_with_split_args = function(cg) + make_do_map_tracking_bucket_ids(cg) + local bid1 = vtest.storage_first_bucket(g.replica_1_a) + local bid2 = vtest.storage_first_bucket(g.replica_2_a) + local res = router_do_map(cg.router, {0}, { + mode = 'partial', + timeout = vtest.wait_timeout, + bucket_ids = {[bid1] = {111}, [bid2] = {222}} + }) + t.assert_not(res.err) + t.assert_not(res.err_id) + t.assert_equals(res.val, { + [cg.rs1_uuid] = {{cg.rs1_uuid, {args = 0, b_args = {[bid1] = {111}}}}}, + [cg.rs2_uuid] = {{cg.rs2_uuid, {args = 0, b_args = {[bid2] = {222}}}}}, + }) + reset_do_map_to_old_state(cg) +end + +local function move_bucket(src_storage, dest_storage, bucket_id) + src_storage:exec(function(bucket_id, replicaset_id) + 
t.helpers.retrying({timeout = 60}, function() + local res, err = ivshard.storage.bucket_send(bucket_id, + replicaset_id) + t.assert_not(err) + t.assert(res) + end) + end, {bucket_id, dest_storage:replicaset_uuid()}) + src_storage:exec(function(bucket_id) + t.helpers.retrying({timeout = 10}, function() + t.assert_equals(box.space._bucket:select(bucket_id), {}) + end) + end, {bucket_id}) + dest_storage:exec(function(bucket_id) + t.helpers.retrying({timeout = 10}, function() + t.assert_equals(box.space._bucket:get(bucket_id).status, 'active') + end) + end, {bucket_id}) +end + +g.test_full_map_callrw_with_split_args_and_broken_cache = function(cg) + make_do_map_tracking_bucket_ids(cg) + cg.router:exec(function() + ivshard.router.internal.errinj.ERRINJ_LONG_DISCOVERY = true + ivshard.router.discovery_wakeup() + end) + + local moved_bucket = vtest.storage_first_bucket(cg.replica_1_a) + move_bucket(g.replica_1_a, g.replica_2_a, moved_bucket) + local res = router_do_map(cg.router, {0}, { + mode = 'partial', + timeout = vtest.wait_timeout, + bucket_ids = {[moved_bucket] = {111}} + }) + t.assert_not(res.err) + t.assert_not(res.err_id) + t.assert_equals(res.val, { + [cg.rs2_uuid] = {{cg.rs2_uuid, {args = 0, + b_args = {[moved_bucket] = {111}}}}}, + }) + + cg.router:exec(function() + ivshard.router.internal.errinj.ERRINJ_LONG_DISCOVERY = false + ivshard.router.discovery_wakeup() + end) + move_bucket(g.replica_2_a, g.replica_1_a, moved_bucket) + reset_do_map_to_old_state(cg) +end diff --git a/test/router-luatest/reload_test.lua b/test/router-luatest/reload_test.lua index 36e2d71a..9ef86b66 100644 --- a/test/router-luatest/reload_test.lua +++ b/test/router-luatest/reload_test.lua @@ -58,6 +58,11 @@ local function create_cluster_on_specific_version(hash) rawset(_G, 'get', function(space_name, key) return box.space[space_name]:get(key) end) + rawset(_G, 'do_map', function(args, bucket_args) + ilt.assert_gt(require('vshard.storage.ref').count, 0) + return {ivutil.replicaset_uuid(), 
+ {args = args, b_args = bucket_args}} + end) end) end @@ -352,3 +357,76 @@ g.test_master_search = function(g) test_master_search_template(g, router, auto_master_cfg) vtest.drop_instance(g, router) end + +g.before_test('test_map_callrw', function(g) + g.cluster:drop() + -- Full mapp_callrw with split args was introduced just right after + -- this commit. We need to test the behavior of map_callrw on old + -- storage versions in order to check that there will be no crashes + -- of storages due to changes in storage_ref_* functions. + create_cluster_on_specific_version( + '1be7b8e1055ecd2f2033d6304408e246b0f2ba46') +end) + +g.after_test('test_map_callrw', function(g) + g.cluster:drop() + create_cluster_on_specific_version(g.latest_hash) +end) + +g.test_map_callrw = function(g) + local rs_uuids = {g.replica_1_a:replicaset_uuid(), + g.replica_2_a:replicaset_uuid()} + local router = vtest.router_new(g, 'router') + router_cfg(router, global_cfg) + -- The latest router and old masters + router:exec(function(rs1_uuid, rs2_uuid) + -- Full map_callrw + local res, err, err_id = ivshard.router.map_callrw('do_map', {'arg_1'}, + {timeout = ivtest.wait_timeout, mode = 'full'}) + t.assert_not(err) + t.assert_not(err_id) + t.assert_equals(res, { + [rs1_uuid] = {{rs1_uuid, {args = 'arg_1'}}}, + [rs2_uuid] = {{rs2_uuid, {args = 'arg_1'}}}, + }) + -- Full map_callrw with split args + res, err, err_id = ivshard.router.map_callrw('do_map', {'arg_1'}, + {timeout = 3, mode = 'full', bucket_ids = {[1] = {'b_arg_1'}}}) + t.assert_equals(err.name, 'UNSUPPORTED') + t.assert_not(err_id) + t.assert_not(res) + -- Partial map_callrw with numeric buckets' args + res, err, err_id = ivshard.router.map_callrw('do_map', {'arg_1'}, + {timeout = 3, mode = 'partial', bucket_ids = {1,}}) + t.assert_not(err) + t.assert_not(err_id) + t.assert_equals(res, { + [rs1_uuid] = {{rs1_uuid, {args = 'arg_1'}}}, + }) + -- Partial map_callrw with split args + res, err, err_id = ivshard.router.map_callrw('do_map', 
{'arg_1'}, + {timeout = 3, mode = 'partial', bucket_ids = {[1] = {'b_arg_1'}}}) + t.assert_not(err) + t.assert_not(err_id) + t.assert_equals(res, { + [rs1_uuid] = {{rs1_uuid, {args = 'arg_1', + b_args = {{'b_arg_1'}}}}}, + }) + end, rs_uuids) + for _, storage in pairs({g.replica_1_a, g.replica_2_a}) do + reload_server(storage, 'storage') + end + -- The latest router and latest masters + router:exec(function(rs1_uuid, rs2_uuid) + -- Full map_callrw with split args + local res, err, err_id = ivshard.router.map_callrw('do_map', {'arg_1'}, + {timeout = 3, mode = 'full', bucket_ids = {[1] = {'b_arg_1'}}}) + t.assert_not(err) + t.assert_not(err_id) + t.assert_equals(res, { + [rs1_uuid] = {{rs1_uuid, {args = 'arg_1', + b_args = {{'b_arg_1'}}}}}, + [rs2_uuid] = {{rs2_uuid, {args = 'arg_1'}}}, + }) + end, rs_uuids) +end diff --git a/test/storage-luatest/storage_1_test.lua b/test/storage-luatest/storage_1_test.lua index 4d671de7..3cbca360 100644 --- a/test/storage-luatest/storage_1_test.lua +++ b/test/storage-luatest/storage_1_test.lua @@ -240,14 +240,14 @@ test_group.test_ref_with_buckets_basic = function(g) res, err = ivshard.storage._call( 'storage_ref_make_with_buckets', rid, iwait_timeout, {}) ilt.assert_equals(err, nil) - ilt.assert_equals(res, {moved = {}}) + ilt.assert_equals(res, {moved = {}, total = 10}) ilt.assert_equals(lref.count, 0) -- Check for a single ok bucket. 
res, err = ivshard.storage._call( 'storage_ref_make_with_buckets', rid, iwait_timeout, {bids[1]}) ilt.assert_equals(err, nil) - ilt.assert_equals(res, {is_done = true, moved = {}}) + ilt.assert_equals(res, {is_done = true, moved = {}, total = 10}) ilt.assert_equals(lref.count, 1) _, err = ivshard.storage._call('storage_unref', rid) ilt.assert_equals(err, nil) @@ -258,7 +258,7 @@ test_group.test_ref_with_buckets_basic = function(g) 'storage_ref_make_with_buckets', rid, iwait_timeout, {bids[1], bids[2]}) ilt.assert_equals(err, nil) - ilt.assert_equals(res, {is_done = true, moved = {}}) + ilt.assert_equals(res, {is_done = true, moved = {}, total = 10}) _, err = ivshard.storage._call('storage_unref', rid) ilt.assert_equals(err, nil) @@ -267,7 +267,7 @@ test_group.test_ref_with_buckets_basic = function(g) 'storage_ref_make_with_buckets', rid, iwait_timeout, {bids[1], bids[1]}) ilt.assert_equals(err, nil) - ilt.assert_equals(res, {is_done = true, moved = {}}) + ilt.assert_equals(res, {is_done = true, moved = {}, total = 10}) ilt.assert_equals(lref.count, 1) _, err = ivshard.storage._call('storage_unref', rid) ilt.assert_equals(err, nil) @@ -285,7 +285,8 @@ test_group.test_ref_with_buckets_basic = function(g) {id = bucket_count + 1}, {id = bucket_count + 2}, {id = bucket_count + 3}, - } + }, + total = 10, }) _, err = ivshard.storage._call('storage_unref', rid) ilt.assert_equals(err, nil) @@ -298,7 +299,7 @@ test_group.test_ref_with_buckets_basic = function(g) {bucket_count + 1, bucket_count + 2} ) ilt.assert_equals(err, nil) - ilt.assert_equals(res, {moved = { + ilt.assert_equals(res, {total = 10, moved = { {id = bucket_count + 1}, {id = bucket_count + 2}, }}) @@ -347,7 +348,7 @@ test_group.test_ref_with_buckets_return_last_known_dst = function(g) local res, err = ivshard.storage._call( 'storage_ref_make_with_buckets', rid, iwait_timeout, {bid}) ilt.assert_equals(err, nil) - ilt.assert_equals(res, {moved = {{ + ilt.assert_equals(res, {total = 10, moved = {{ id = bid, dst 
= id, status = ivconst.BUCKET.SENT, @@ -400,6 +401,7 @@ test_group.test_ref_with_buckets_move_part_while_referencing = function(g) ilt.assert_equals(res, { moved = {{id = bids[2], dst = id}}, is_done = true, + total = 8, }) -- Ref was done, because at least one bucket was ok. ilt.assert_equals(lref.count, 1) @@ -455,7 +457,8 @@ test_group.test_ref_with_buckets_move_all_while_referencing = function(g) moved = { {id = bids[1], dst = id}, {id = bids[2], dst = id}, - } + }, + total = 8, }) -- Ref was not done, because all the buckets moved out. ilt.assert_equals(lref.count, 0) diff --git a/test/upgrade/upgrade.result b/test/upgrade/upgrade.result index e0c9469b..58643fc0 100644 --- a/test/upgrade/upgrade.result +++ b/test/upgrade/upgrade.result @@ -179,6 +179,7 @@ vshard.storage._call('test_api', 1, 2, 3) | - recovery_bucket_stat | - storage_map | - storage_ref + | - storage_ref_check_existent | - storage_ref_check_with_buckets | - storage_ref_make_with_buckets | - storage_unref diff --git a/vshard/consts.lua b/vshard/consts.lua index f5ad61e4..6ce3b2fc 100644 --- a/vshard/consts.lua +++ b/vshard/consts.lua @@ -28,6 +28,11 @@ return { RED = 3, }, + MAP_CALLRW_MODE = { + FULL = 'full', + PARTIAL = 'partial', + }, + REPLICATION_THRESHOLD_SOFT = 1, REPLICATION_THRESHOLD_HARD = 5, REPLICATION_THRESHOLD_FAIL = 10, diff --git a/vshard/error.lua b/vshard/error.lua index 0892758c..f8dea1bc 100644 --- a/vshard/error.lua +++ b/vshard/error.lua @@ -207,6 +207,11 @@ local error_message_template = { msg = 'Mismatch server name: expected "%s", but got "%s"', args = {'expected_name', 'actual_name'}, }, + [42] = { + name = 'UNSUPPORTED', + msg = 'Can\'t perform %s. 
The storage should be upgraded', + args = {'operation'}, + } } -- diff --git a/vshard/router/init.lua b/vshard/router/init.lua index 04485ed4..17c34723 100644 --- a/vshard/router/init.lua +++ b/vshard/router/init.lua @@ -29,6 +29,8 @@ local map_serializer = { __serialize = 'map' } local future_wait = util.future_wait local msgpack_is_object = lmsgpack.is_object +local MAP_CALLRW_FULL = consts.MAP_CALLRW_MODE.FULL +local MAP_CALLRW_PARTIAL = consts.MAP_CALLRW_MODE.PARTIAL if not util.feature.msgpack_object then local msg = 'Msgpack object feature is not supported by current '.. @@ -774,27 +776,28 @@ end -- -- Prepares a cluster before sending refs to remote storages: -- 1) Groups the buckets by replicasets according to the router's cache --- in case of partial map_callrw; +-- in case of partial map_callrw or full map_callrw with split args; -- 2) Builds a table of replicasets on which refs should be sent; -- 3) Waits necessary masters until all connections are established; -- -local function router_map_callrw_prepare(router, timeout, bucket_ids) +local function router_map_callrw_prepare(router, timeout, mode, bucket_ids) local err, err_id local replicasets_to_wait, grouped_buckets = {}, {} local replicasets_all = router.replicasets local deadline = fiber_clock() + timeout if bucket_ids then - bucket_ids = bucket_ids or {} -- Group the buckets by replicasets according to the router cache. grouped_buckets, err = buckets_group(router, bucket_ids, timeout) if err ~= nil then return nil, err end + end + if mode == MAP_CALLRW_FULL then + replicasets_to_wait = replicasets_all + else for rs_id, _ in pairs(grouped_buckets) do table.insert(replicasets_to_wait, replicasets_all[rs_id]) end - else - replicasets_to_wait = replicasets_all end -- Netbox async requests work only with active connections. -- So, we need to wait for the master connection explicitly. 
@@ -863,7 +866,11 @@ end local function router_map_callrw_process_moved(router, results) local bucket_ids = {} for _, res in pairs(results) do - for _, bucket in pairs(res.moved) do + if type(res) ~= 'table' then + goto continue + end + local moved = res.moved or {} + for _, bucket in pairs(moved) do local bid = bucket.id local dst = bucket.dst -- 'Reset' regardless of 'set'. So as not to @@ -875,24 +882,43 @@ local function router_map_callrw_process_moved(router, results) end table.insert(bucket_ids, bid) end + ::continue:: end return bucket_ids end +local function router_map_callrw_process_existent(router, results) + for rs_id, res in pairs(results) do + for _, bucket_id in pairs(res) do + bucket_reset(router, bucket_id) + bucket_set(router, bucket_id, rs_id) + end + end +end + -- -- Perform Ref stage of the Ref-Map-Reduce process on all the known replicasets. -- -local function router_ref_storage_all(router, timeout, rid) +local function router_ref_storage_all(router, bucket_ids, timeout, rid) + local mode = MAP_CALLRW_FULL local bucket_count = 0 - local err, err_id, args_builder, results + local err, err_id, grouped_buckets, args_builder, results local futures = {} local replicasets_all = router.replicasets - timeout, err, err_id = router_map_callrw_prepare(router, timeout) + timeout, err, err_id, grouped_buckets = router_map_callrw_prepare( + router, timeout, mode, bucket_ids) if not timeout then goto fail end - args_builder = function() return {'storage_ref', rid, timeout} end + args_builder = function(rs_id) + local buckets = grouped_buckets[rs_id] or {} + if grouped_buckets[rs_id] then + return {'storage_ref_make_with_buckets', rid, timeout, buckets} + else + return {'storage_ref', rid, timeout} + end + end timeout, err, err_id, futures = router_map_callrw_send(router, timeout, args_builder) if not timeout then @@ -903,7 +929,34 @@ local function router_ref_storage_all(router, timeout, rid) goto fail end for _, res in pairs(results) do - bucket_count = 
bucket_count + res
+        if type(res) == 'table' and not res.total then
+            -- This error is thrown only when the updated router tries
+            -- to calculate the total amount of buckets. On old storage
+            -- versions the function storage_ref_make_with_buckets doesn't
+            -- return the total amount of buckets.
+            err = lerror.vshard(lerror.code.UNSUPPORTED,
+                                'full map_callrw with split args')
+            goto fail
+        end
+        local rs_total = type(res) == 'table' and res.total or res
+        bucket_count = bucket_count + rs_total
+    end
+    bucket_ids = router_map_callrw_process_moved(router, results)
+    if next(bucket_ids) then
+        args_builder = function()
+            return {'storage_ref_check_existent', rid, bucket_ids}
+        end
+        timeout, err, err_id, futures = router_map_callrw_send(router, timeout,
+                                                               args_builder)
+        if not timeout then
+            goto fail
+        end
+        timeout, err, err_id, results = router_map_callrw_collect(futures,
+                                                                  timeout)
+        if not timeout then
+            goto fail
+        end
+        router_map_callrw_process_existent(router, results)
     end
     -- All refs are done but not all buckets are covered. This is odd and can
     -- mean many things. The most possible ones: 1) outdated configuration on
@@ -930,13 +983,14 @@ end
 -- replicasets, which contains all the listed bucket IDs.
 --
 local function router_ref_storage_by_buckets(router, bucket_ids, timeout, rid)
+    local mode = MAP_CALLRW_PARTIAL
     local err, err_id, grouped_buckets, args_builder, results
     local replicasets_to_map, futures = {}, {}
     -- Nil checks are done explicitly here (== nil instead of 'not'), because
     -- netbox requests return box.NULL instead of nils.
while next(bucket_ids) do timeout, err, err_id, grouped_buckets = router_map_callrw_prepare( - router, timeout, bucket_ids) + router, timeout, mode, bucket_ids) if not timeout then goto fail end @@ -999,13 +1053,14 @@ local function replicasets_map_reduce(replicasets, rid, func, args, -- local func_args = {'storage_map', rid, func, args} for id, rs in pairs(replicasets) do - if grouped_args ~= nil then + local rs_args = grouped_args and grouped_args[id] + if rs_args then -- It's cheaper to push and then pop, rather then deepcopy -- arguments table for every call. - table.insert(args, grouped_args[id]) + table.insert(args, rs_args) end local res, err = rs:callrw('vshard.storage._call', func_args, opts_map) - if grouped_args ~= nil then + if rs_args then table.remove(args) end if res == nil then @@ -1103,14 +1158,50 @@ local function router_group_map_callrw_args(router, bucket_ids, bucket_args) return grouped_args end +-- +-- Set the appropriate mode according to bucket_ids option for backward +-- compatibility (in case of opts_mode is nil) and check the given opts_mode +-- correctness in other cases. +-- +local function router_check_map_callrw_mode(opts_mode, bucket_ids) + if opts_mode == nil then + return bucket_ids and MAP_CALLRW_PARTIAL or MAP_CALLRW_FULL + end + if opts_mode == MAP_CALLRW_PARTIAL and bucket_ids == nil then + return nil, lerror.make('Router can\'t execute map_callrw with ' .. + '\'partial\' mode and nil bucket_ids') + end + if opts_mode == MAP_CALLRW_FULL and util.table_is_numeric(bucket_ids) then + return nil, lerror.make('Router can\'t execute map_callrw with ' .. + '\'full\' mode and numeric bucket_ids') + end + return opts_mode +end + -- -- Consistent Map-Reduce. The given function is called on masters in the cluster -- with a guarantee that in case of success it was executed with all buckets -- being accessible for reads and writes. -- --- The selection of masters depends on bucket_ids option. 
When specified, the
--- Map-Reduce is performed only on masters having at least one of these buckets.
--- Otherwise it is executed on all the masters in the cluster.
+-- The selection of masters depends on 'mode' and 'bucket_ids' options. There
+-- are 2 general modes of how map_callrw can be executed:
+-- 1) mode = 'partial'. In this mode user function will be executed on
+--    storages that have at least one bucket of 'bucket_ids'. The
+--    'bucket_ids' option can be presented in two ways: like a numeric array
+--    of buckets' ids or like a map of buckets' arguments. In first one user
+--    function will only receive args, in second one it will additionally
+--    receive buckets' arguments.
+-- 2) mode = 'full'. In this mode user function will be executed with args on
+--    all storages in cluster. If we pass 'bucket_ids' like a map of buckets'
+--    arguments the user function will additionally receive buckets'
+--    arguments on those storages that have at least one bucket of
+--    'bucket_ids'.
+--
+-- If we didn't specify the 'mode' option, then it is set based on 'bucket_ids'
+-- option - if 'bucket_ids' is presented, the mode will be 'partial' otherwise
+-- 'full'. Also these combinations of map_callrw options lead to an error:
+-- 'partial' mode with nil bucket_ids and 'full' mode with numeric bucket_ids.
 --
 -- Consistency in scope of map-reduce means all the data was accessible, and
 -- didn't move during map requests execution. To preserve the consistency there
@@ -1133,6 +1224,8 @@ end
 -- @param func Name of the function to call.
 -- @param args Function arguments passed in netbox style (as an array).
 -- @param opts Options. See below:
+-- - mode - a string option ('full' / 'partial') that represents a way of
+--   execution of user function on destination storages.
 -- - timeout - a number of seconds. Note that the refs may end up being kept
 --    on the storages during this entire timeout if something goes wrong.
 --    For instance, network issues appear. 
This means better not use a @@ -1156,8 +1249,13 @@ end -- local function router_map_callrw(router, func, args, opts) local replicasets_to_map, err, err_id, map, rid - local timeout, do_return_raw, bucket_ids, plain_bucket_ids, grouped_args + local mode, timeout, do_return_raw, bucket_ids, plain_bucket_ids, + grouped_args if opts then + mode, err = router_check_map_callrw_mode(opts.mode, opts.bucket_ids) + if err then + return nil, err + end timeout = opts.timeout or consts.CALL_TIMEOUT_MIN do_return_raw = opts.return_raw bucket_ids = opts.bucket_ids @@ -1165,27 +1263,26 @@ local function router_map_callrw(router, func, args, opts) util.table_keys(bucket_ids) else timeout = consts.CALL_TIMEOUT_MIN + mode = MAP_CALLRW_FULL end rid = M.ref_id M.ref_id = rid + 1 - if plain_bucket_ids then + if mode == MAP_CALLRW_FULL then + timeout, err, err_id, replicasets_to_map = + router_ref_storage_all(router, plain_bucket_ids, timeout, rid) + else timeout, err, err_id, replicasets_to_map = router_ref_storage_by_buckets(router, plain_bucket_ids, timeout, rid) - -- Grouped arguments are only possible with partial Map-Reduce. 
- if timeout then + end + if timeout then + if plain_bucket_ids then grouped_args = router_group_map_callrw_args( router, plain_bucket_ids, bucket_ids) end - else - timeout, err, err_id, replicasets_to_map = - router_ref_storage_all(router, timeout, rid) - end - if timeout then - map, err, err_id = replicasets_map_reduce(replicasets_to_map, rid, func, - args, grouped_args, { - timeout = timeout, return_raw = do_return_raw - }) + opts = {timeout = timeout, return_raw = do_return_raw, mode = mode} + map, err, err_id = replicasets_map_reduce( + replicasets_to_map, rid, func, args, grouped_args, opts) if map then return map end diff --git a/vshard/storage/init.lua b/vshard/storage/init.lua index d4c815f5..672889ea 100644 --- a/vshard/storage/init.lua +++ b/vshard/storage/init.lua @@ -3179,6 +3179,26 @@ local function storage_call(bucket_id, mode, name, args) return ok, ret1, ret2, ret3 end +local function bucket_get_existent(bucket_ids) + local res = {} + for _, bucket_id in pairs(bucket_ids) do + local bucket = box.space._bucket:get{bucket_id} + if bucket and bucket.status ~= BGARBAGE and bucket.status ~= BSENT then + table.insert(res, bucket_id) + end + end + return res +end + +local function storage_ref_check_existent(rid, bucket_ids) + local ok, err = lref.check(rid, box.session.id()) + if not ok then + return nil, err + end + bucket_ids = bucket_ids or {} + return bucket_get_existent(bucket_ids) +end + -- -- Bind a new storage ref to the current box session. Is used as a part of -- Map-Reduce API. @@ -3243,11 +3263,11 @@ local function storage_ref_make_with_buckets(rid, timeout, bucket_ids) if #moved == #bucket_ids then -- If all the passed buckets are absent, there is no need to create a -- ref. 
- return {moved = moved} + return {moved = moved, total = bucket_count()} end local bucket_generation = M.bucket_generation - local ok, err = storage_ref(rid, timeout) - if not ok then + local bucket_count, err = storage_ref(rid, timeout) + if not bucket_count then return nil, err end if M.bucket_generation ~= bucket_generation then @@ -3256,10 +3276,10 @@ local function storage_ref_make_with_buckets(rid, timeout, bucket_ids) moved = bucket_get_moved(bucket_ids) if #moved == #bucket_ids then storage_unref(rid) - return {moved = moved} + return {moved = moved, total = bucket_count} end end - return {is_done = true, moved = moved} + return {is_done = true, moved = moved, total = bucket_count} end -- @@ -3380,6 +3400,7 @@ service_call_api = setmetatable({ rebalancer_request_state = rebalancer_request_state, recovery_bucket_stat = recovery_bucket_stat, storage_ref = storage_ref, + storage_ref_check_existent = storage_ref_check_existent, storage_ref_make_with_buckets = storage_ref_make_with_buckets, storage_ref_check_with_buckets = storage_ref_check_with_buckets, storage_unref = storage_unref,