Skip to content

Commit 8f5223c

Browse files
committed
router: make full map_callrw with split args
This patch introduces a new way of executing `map_callrw`, in which we can pass some arguments to all storages and additionally pass per-bucket arguments to those storages that have at least one bucket from `bucket_ids`. To achieve this we introduce a new string option, `mode`, to the `map_callrw` API. We also change the logic of the `router_ref_storage_all` ref function. First, we ref all storages and get back the number of "moved" buckets according to the router's previously built cache. Then, if there are no "moved" buckets, we accumulate and check the total number of buckets on all storages and finish the map_callrw ref stage. Otherwise, if there are some "moved" buckets, we perform a second network hop to check on which replicasets the remaining "moved" buckets reside. Closes #559 @TarantoolBot document Title: vshard: `mode` option for `router.map_callrw()` This string option regulates on which storages the user function will be executed via `map_callrw`. Possible values: 1) mode = 'partial'. In this mode the user function will be executed on storages that have at least one bucket from 'bucket_ids'. The 'bucket_ids' option can be given in two ways: as a numeric array of bucket ids or as a map of buckets' arguments. In the first case the user function will only receive args; in the second case it will additionally receive the buckets' arguments. 2) mode = 'full'. In this mode the user function will be executed with args on all storages in the cluster. If we pass 'bucket_ids' as a map of buckets' arguments, the user function will additionally receive the buckets' arguments on those storages that have at least one bucket from 'bucket_ids'. If the 'mode' option is not specified, it is set based on the 'bucket_ids' option: if 'bucket_ids' is present, the mode will be 'partial', otherwise 'full'. Also, `map_callrw` now ends with an error in the cases of `<mode = 'full', bucket_ids = {1, 2, ...}>` and `<mode = 'partial', bucket_ids = nil>`.
1 parent 1be7b8e commit 8f5223c

8 files changed

Lines changed: 431 additions & 44 deletions

File tree

test/router-luatest/map_callrw_test.lua

Lines changed: 177 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -685,3 +685,180 @@ g.test_map_callrw_with_cdata_bucket_id = function(cg)
685685
ilt.assert_not(err)
686686
end)
687687
end
688+
689+
g.test_full_map_callrw_with_numeric_bucket_ids = function(cg)
690+
local res = router_do_map(cg.router, {123}, {
691+
mode = 'full',
692+
timeout = vtest.wait_timeout,
693+
bucket_ids = {1, 2, 3}
694+
})
695+
t.assert(res.err)
696+
t.assert_equals(res.err.message, 'Router can\'t execute map_callrw ' ..
697+
'with \'full\' mode and numeric bucket_ids')
698+
t.assert_not(res.err_id)
699+
t.assert_not(res.val)
700+
end
701+
702+
g.test_partial_map_callrw_with_nil_bucket_ids = function(cg)
703+
local res = router_do_map(cg.router, {123}, {
704+
mode = 'partial',
705+
timeout = vtest.wait_timeout
706+
})
707+
t.assert(res.err)
708+
t.assert_equals(res.err.message, 'Router can\'t execute map_callrw ' ..
709+
'with \'partial\' mode and nil bucket_ids')
710+
t.assert_not(res.err_id)
711+
t.assert_not(res.val)
712+
end
713+
714+
local function make_do_map_tracking_bucket_ids(cg)
715+
-- We override 'do_map' function on storages in order to check that
716+
-- default arguments and bucket arguments were successfully passed into
717+
-- destination storages according to mode and bucket_ids options.
718+
vtest.cluster_exec_each_master(cg, function()
719+
rawset(_G, 'old_do_map', _G.do_map)
720+
rawset(_G, 'do_map', function(args, bucket_args)
721+
ilt.assert_gt(require('vshard.storage.ref').count, 0)
722+
return {ivutil.replicaset_uuid(),
723+
{args = args, b_args = bucket_args}}
724+
end)
725+
end)
726+
end
727+
728+
local function reset_do_map_to_old_state(cg)
729+
vtest.cluster_exec_each_master(cg, function()
730+
rawset(_G, 'do_map', _G.old_do_map)
731+
end)
732+
end
733+
734+
g.test_full_map_callrw_with_split_args = function(cg)
735+
make_do_map_tracking_bucket_ids(cg)
736+
local bid1 = vtest.storage_first_bucket(g.replica_1_a)
737+
local bid2 = vtest.storage_first_bucket(g.replica_2_a)
738+
local bid3 = vtest.storage_first_bucket(g.replica_3_a)
739+
740+
local res = router_do_map(cg.router, {0}, {
741+
mode = 'full',
742+
timeout = vtest.wait_timeout,
743+
bucket_ids = {[bid1] = {111}, [bid2] = {222}, [bid3] = {333}}
744+
})
745+
t.assert_not(res.err)
746+
t.assert_not(res.err_id)
747+
t.assert_equals(res.val, {
748+
[cg.rs1_uuid] = {{cg.rs1_uuid, {args = 0, b_args = {[bid1] = {111}}}}},
749+
[cg.rs2_uuid] = {{cg.rs2_uuid, {args = 0, b_args = {[bid2] = {222}}}}},
750+
[cg.rs3_uuid] = {{cg.rs3_uuid, {args = 0, b_args = {[bid3] = {333}}}}},
751+
})
752+
753+
res = router_do_map(cg.router, {0}, {
754+
mode = 'full',
755+
timeout = vtest.wait_timeout,
756+
bucket_ids = {[bid2] = {222}}
757+
})
758+
t.assert_not(res.err)
759+
t.assert_not(res.err_id)
760+
t.assert_equals(res.val, {
761+
[cg.rs1_uuid] = {{cg.rs1_uuid, {args = 0}}},
762+
[cg.rs2_uuid] = {{cg.rs2_uuid, {args = 0, b_args = {[bid2] = {222}}}}},
763+
[cg.rs3_uuid] = {{cg.rs3_uuid, {args = 0}}},
764+
})
765+
reset_do_map_to_old_state(cg)
766+
end
767+
768+
g.test_full_map_callrw_without_bucket_ids = function(cg)
769+
make_do_map_tracking_bucket_ids(cg)
770+
local res = router_do_map(cg.router, {0}, {
771+
mode = 'full',
772+
timeout = vtest.wait_timeout
773+
})
774+
t.assert_not(res.err)
775+
t.assert_not(res.err_id)
776+
t.assert_equals(res.val, {
777+
[cg.rs1_uuid] = {{cg.rs1_uuid, {args = 0}}},
778+
[cg.rs2_uuid] = {{cg.rs2_uuid, {args = 0}}},
779+
[cg.rs3_uuid] = {{cg.rs3_uuid, {args = 0}}},
780+
})
781+
reset_do_map_to_old_state(cg)
782+
end
783+
784+
g.test_partial_map_callrw_with_numeric_bucket_ids = function(cg)
785+
make_do_map_tracking_bucket_ids(cg)
786+
local bid1 = vtest.storage_first_bucket(g.replica_1_a)
787+
local res = router_do_map(cg.router, {0}, {
788+
mode = 'partial',
789+
timeout = vtest.wait_timeout,
790+
bucket_ids = {bid1}
791+
})
792+
t.assert_not(res.err)
793+
t.assert_not(res.err_id)
794+
t.assert_equals(res.val, {[cg.rs1_uuid] = {{cg.rs1_uuid, {args = 0}}}})
795+
reset_do_map_to_old_state(cg)
796+
end
797+
798+
g.test_partial_map_callrw_with_split_args = function(cg)
799+
make_do_map_tracking_bucket_ids(cg)
800+
local bid1 = vtest.storage_first_bucket(g.replica_1_a)
801+
local bid2 = vtest.storage_first_bucket(g.replica_2_a)
802+
local res = router_do_map(cg.router, {0}, {
803+
mode = 'partial',
804+
timeout = vtest.wait_timeout,
805+
bucket_ids = {[bid1] = {111}, [bid2] = {222}}
806+
})
807+
t.assert_not(res.err)
808+
t.assert_not(res.err_id)
809+
t.assert_equals(res.val, {
810+
[cg.rs1_uuid] = {{cg.rs1_uuid, {args = 0, b_args = {[bid1] = {111}}}}},
811+
[cg.rs2_uuid] = {{cg.rs2_uuid, {args = 0, b_args = {[bid2] = {222}}}}},
812+
})
813+
reset_do_map_to_old_state(cg)
814+
end
815+
816+
local function move_bucket(src_storage, dest_storage, bucket_id)
817+
src_storage:exec(function(bucket_id, replicaset_id)
818+
t.helpers.retrying({timeout = 60}, function()
819+
local res, err = ivshard.storage.bucket_send(bucket_id,
820+
replicaset_id)
821+
t.assert_not(err)
822+
t.assert(res)
823+
end)
824+
end, {bucket_id, dest_storage:replicaset_uuid()})
825+
src_storage:exec(function(bucket_id)
826+
t.helpers.retrying({timeout = 10}, function()
827+
t.assert_equals(box.space._bucket:select(bucket_id), {})
828+
end)
829+
end, {bucket_id})
830+
dest_storage:exec(function(bucket_id)
831+
t.helpers.retrying({timeout = 10}, function()
832+
t.assert_equals(box.space._bucket:get(bucket_id).status, 'active')
833+
end)
834+
end, {bucket_id})
835+
end
836+
837+
g.test_full_map_callrw_with_split_args_and_broken_cache = function(cg)
838+
make_do_map_tracking_bucket_ids(cg)
839+
cg.router:exec(function()
840+
ivshard.router.internal.errinj.ERRINJ_LONG_DISCOVERY = true
841+
ivshard.router.discovery_wakeup()
842+
end)
843+
844+
local moved_bucket = vtest.storage_first_bucket(cg.replica_1_a)
845+
move_bucket(g.replica_1_a, g.replica_2_a, moved_bucket)
846+
local res = router_do_map(cg.router, {0}, {
847+
mode = 'partial',
848+
timeout = vtest.wait_timeout,
849+
bucket_ids = {[moved_bucket] = {111}}
850+
})
851+
t.assert_not(res.err)
852+
t.assert_not(res.err_id)
853+
t.assert_equals(res.val, {
854+
[cg.rs2_uuid] = {{cg.rs2_uuid, {args = 0,
855+
b_args = {[moved_bucket] = {111}}}}},
856+
})
857+
858+
cg.router:exec(function()
859+
ivshard.router.internal.errinj.ERRINJ_LONG_DISCOVERY = false
860+
ivshard.router.discovery_wakeup()
861+
end)
862+
move_bucket(g.replica_2_a, g.replica_1_a, moved_bucket)
863+
reset_do_map_to_old_state(cg)
864+
end

test/router-luatest/reload_test.lua

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,11 @@ local function create_cluster_on_specific_version(hash)
5858
rawset(_G, 'get', function(space_name, key)
5959
return box.space[space_name]:get(key)
6060
end)
61+
rawset(_G, 'do_map', function(args, bucket_args)
62+
ilt.assert_gt(require('vshard.storage.ref').count, 0)
63+
return {ivutil.replicaset_uuid(),
64+
{args = args, b_args = bucket_args}}
65+
end)
6166
end)
6267
end
6368

@@ -352,3 +357,76 @@ g.test_master_search = function(g)
352357
test_master_search_template(g, router, auto_master_cfg)
353358
vtest.drop_instance(g, router)
354359
end
360+
361+
g.before_test('test_map_callrw', function(g)
362+
g.cluster:drop()
363+
-- Full map_callrw with split args was introduced just right after
364+
-- this commit. We need to test the behavior of map_callrw on old
365+
-- storage versions in order to check that there will be no crashes
366+
-- of storages due to changes in storage_ref_* functions.
367+
create_cluster_on_specific_version(
368+
'1be7b8e1055ecd2f2033d6304408e246b0f2ba46')
369+
end)
370+
371+
g.after_test('test_map_callrw', function(g)
372+
g.cluster:drop()
373+
create_cluster_on_specific_version(g.latest_hash)
374+
end)
375+
376+
g.test_map_callrw = function(g)
377+
local rs_uuids = {g.replica_1_a:replicaset_uuid(),
378+
g.replica_2_a:replicaset_uuid()}
379+
local router = vtest.router_new(g, 'router')
380+
router_cfg(router, global_cfg)
381+
-- The latest router and old masters
382+
router:exec(function(rs1_uuid, rs2_uuid)
383+
-- Full map_callrw
384+
local res, err, err_id = ivshard.router.map_callrw('do_map', {'arg_1'},
385+
{timeout = ivtest.wait_timeout, mode = 'full'})
386+
t.assert_not(err)
387+
t.assert_not(err_id)
388+
t.assert_equals(res, {
389+
[rs1_uuid] = {{rs1_uuid, {args = 'arg_1'}}},
390+
[rs2_uuid] = {{rs2_uuid, {args = 'arg_1'}}},
391+
})
392+
-- Full map_callrw with split args
393+
res, err, err_id = ivshard.router.map_callrw('do_map', {'arg_1'},
394+
{timeout = 3, mode = 'full', bucket_ids = {[1] = {'b_arg_1'}}})
395+
t.assert_equals(err.name, 'UNSUPPORTED')
396+
t.assert_not(err_id)
397+
t.assert_not(res)
398+
-- Partial map_callrw with numeric bucket ids (no buckets' args)
399+
res, err, err_id = ivshard.router.map_callrw('do_map', {'arg_1'},
400+
{timeout = 3, mode = 'partial', bucket_ids = {1,}})
401+
t.assert_not(err)
402+
t.assert_not(err_id)
403+
t.assert_equals(res, {
404+
[rs1_uuid] = {{rs1_uuid, {args = 'arg_1'}}},
405+
})
406+
-- Partial map_callrw with split args
407+
res, err, err_id = ivshard.router.map_callrw('do_map', {'arg_1'},
408+
{timeout = 3, mode = 'partial', bucket_ids = {[1] = {'b_arg_1'}}})
409+
t.assert_not(err)
410+
t.assert_not(err_id)
411+
t.assert_equals(res, {
412+
[rs1_uuid] = {{rs1_uuid, {args = 'arg_1',
413+
b_args = {{'b_arg_1'}}}}},
414+
})
415+
end, rs_uuids)
416+
for _, storage in pairs({g.replica_1_a, g.replica_2_a}) do
417+
reload_server(storage, 'storage')
418+
end
419+
-- The latest router and latest masters
420+
router:exec(function(rs1_uuid, rs2_uuid)
421+
-- Full map_callrw with split args
422+
local res, err, err_id = ivshard.router.map_callrw('do_map', {'arg_1'},
423+
{timeout = 3, mode = 'full', bucket_ids = {[1] = {'b_arg_1'}}})
424+
t.assert_not(err)
425+
t.assert_not(err_id)
426+
t.assert_equals(res, {
427+
[rs1_uuid] = {{rs1_uuid, {args = 'arg_1',
428+
b_args = {{'b_arg_1'}}}}},
429+
[rs2_uuid] = {{rs2_uuid, {args = 'arg_1'}}},
430+
})
431+
end, rs_uuids)
432+
end

test/storage-luatest/storage_1_test.lua

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -240,14 +240,14 @@ test_group.test_ref_with_buckets_basic = function(g)
240240
res, err = ivshard.storage._call(
241241
'storage_ref_make_with_buckets', rid, iwait_timeout, {})
242242
ilt.assert_equals(err, nil)
243-
ilt.assert_equals(res, {moved = {}})
243+
ilt.assert_equals(res, {moved = {}, total = 10})
244244
ilt.assert_equals(lref.count, 0)
245245

246246
-- Check for a single ok bucket.
247247
res, err = ivshard.storage._call(
248248
'storage_ref_make_with_buckets', rid, iwait_timeout, {bids[1]})
249249
ilt.assert_equals(err, nil)
250-
ilt.assert_equals(res, {is_done = true, moved = {}})
250+
ilt.assert_equals(res, {is_done = true, moved = {}, total = 10})
251251
ilt.assert_equals(lref.count, 1)
252252
_, err = ivshard.storage._call('storage_unref', rid)
253253
ilt.assert_equals(err, nil)
@@ -258,7 +258,7 @@ test_group.test_ref_with_buckets_basic = function(g)
258258
'storage_ref_make_with_buckets', rid, iwait_timeout,
259259
{bids[1], bids[2]})
260260
ilt.assert_equals(err, nil)
261-
ilt.assert_equals(res, {is_done = true, moved = {}})
261+
ilt.assert_equals(res, {is_done = true, moved = {}, total = 10})
262262
_, err = ivshard.storage._call('storage_unref', rid)
263263
ilt.assert_equals(err, nil)
264264

@@ -267,7 +267,7 @@ test_group.test_ref_with_buckets_basic = function(g)
267267
'storage_ref_make_with_buckets', rid, iwait_timeout,
268268
{bids[1], bids[1]})
269269
ilt.assert_equals(err, nil)
270-
ilt.assert_equals(res, {is_done = true, moved = {}})
270+
ilt.assert_equals(res, {is_done = true, moved = {}, total = 10})
271271
ilt.assert_equals(lref.count, 1)
272272
_, err = ivshard.storage._call('storage_unref', rid)
273273
ilt.assert_equals(err, nil)
@@ -285,7 +285,8 @@ test_group.test_ref_with_buckets_basic = function(g)
285285
{id = bucket_count + 1},
286286
{id = bucket_count + 2},
287287
{id = bucket_count + 3},
288-
}
288+
},
289+
total = 10,
289290
})
290291
_, err = ivshard.storage._call('storage_unref', rid)
291292
ilt.assert_equals(err, nil)
@@ -298,7 +299,7 @@ test_group.test_ref_with_buckets_basic = function(g)
298299
{bucket_count + 1, bucket_count + 2}
299300
)
300301
ilt.assert_equals(err, nil)
301-
ilt.assert_equals(res, {moved = {
302+
ilt.assert_equals(res, {total = 10, moved = {
302303
{id = bucket_count + 1},
303304
{id = bucket_count + 2},
304305
}})
@@ -347,7 +348,7 @@ test_group.test_ref_with_buckets_return_last_known_dst = function(g)
347348
local res, err = ivshard.storage._call(
348349
'storage_ref_make_with_buckets', rid, iwait_timeout, {bid})
349350
ilt.assert_equals(err, nil)
350-
ilt.assert_equals(res, {moved = {{
351+
ilt.assert_equals(res, {total = 10, moved = {{
351352
id = bid,
352353
dst = id,
353354
status = ivconst.BUCKET.SENT,
@@ -400,6 +401,7 @@ test_group.test_ref_with_buckets_move_part_while_referencing = function(g)
400401
ilt.assert_equals(res, {
401402
moved = {{id = bids[2], dst = id}},
402403
is_done = true,
404+
total = 8,
403405
})
404406
-- Ref was done, because at least one bucket was ok.
405407
ilt.assert_equals(lref.count, 1)
@@ -455,7 +457,8 @@ test_group.test_ref_with_buckets_move_all_while_referencing = function(g)
455457
moved = {
456458
{id = bids[1], dst = id},
457459
{id = bids[2], dst = id},
458-
}
460+
},
461+
total = 8,
459462
})
460463
-- Ref was not done, because all the buckets moved out.
461464
ilt.assert_equals(lref.count, 0)

test/upgrade/upgrade.result

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -179,6 +179,7 @@ vshard.storage._call('test_api', 1, 2, 3)
179179
| - recovery_bucket_stat
180180
| - storage_map
181181
| - storage_ref
182+
| - storage_ref_check_existent
182183
| - storage_ref_check_with_buckets
183184
| - storage_ref_make_with_buckets
184185
| - storage_unref

vshard/consts.lua

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,11 @@ return {
2828
RED = 3,
2929
},
3030

31+
MAP_CALLRW_MODE = {
32+
FULL = 'full',
33+
PARTIAL = 'partial',
34+
},
35+
3136
REPLICATION_THRESHOLD_SOFT = 1,
3237
REPLICATION_THRESHOLD_HARD = 5,
3338
REPLICATION_THRESHOLD_FAIL = 10,

vshard/error.lua

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -207,6 +207,11 @@ local error_message_template = {
207207
msg = 'Mismatch server name: expected "%s", but got "%s"',
208208
args = {'expected_name', 'actual_name'},
209209
},
210+
[42] = {
211+
name = 'UNSUPPORTED',
212+
msg = 'Can\'t perform %s. The storage should be upgraded',
213+
args = {'operation'},
214+
}
210215
}
211216

212217
--

0 commit comments

Comments
 (0)