
Commit 5dc0f26

Merge develop-2.2 into develop (using existing rebar.conf from develop)
1 parent 356c9bb commit 5dc0f26

30 files changed: +1273, -1085 lines

docs/BATCHING.md (+49, -205)
Large diffs are not rendered by default.

docs/yz-batching-overview.graffle (+481, -445)
Large diffs are not rendered by default.

docs/yz-batching-overview.png (2.83 KB, binary image; no diff rendered)

include/yokozuna.hrl (+4, -1)

@@ -113,7 +113,10 @@
 %% take care of it.
 -type component() :: search | index.
 
--type solr_entry() :: {bkey(), obj(), write_reason(), p(), short_preflist(),
+
+-type object_pair() :: {obj(), obj() | no_old_object}.
+
+-type solr_entry() :: {bkey(), object_pair(), write_reason(), p(), short_preflist(),
                        hash()}.
 -type solr_entries() :: [solr_entry()].
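
For orientation, here is a minimal sketch (not part of this commit) of how an entry is shaped under the new object_pair() type: the second tuple element now carries the new object together with the previous object, or the atom no_old_object when there is none, as on a fresh put.

%% Hypothetical helper for illustration only; bkey(), obj(), p(),
%% short_preflist() and hash() are the existing types from yokozuna.hrl.
-spec example_entry(bkey(), obj(), p(), short_preflist(), hash()) -> solr_entry().
example_entry(BKey, NewObj, Partition, ShortPL, Hash) ->
    %% A fresh write has no previous object, so the pair is {NewObj, no_old_object}.
    {BKey, {NewObj, no_old_object}, put, Partition, ShortPL, Hash}.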

priv/yokozuna.schema (+1, -4)

@@ -253,15 +253,12 @@
 %% - purge_index -> Removes all items associated with one random
 %%   erroring (references to fuses blown in the code) index in
 %%   order to get below the search.queue.high_watermark.
-%% - purge_all -> Removes all items associated with all
-%%   erroring (references to fuses blown in the code) indices in
-%%   order to get below the search.queue.high_watermark.
 %% - off -> purging is disabled
 {mapping, "search.queue.high_watermark.purge_strategy",
           "yokozuna.solrq_hwm_purge_strategy", [
   {default, purge_one},
   {commented, purge_one},
-  {datatype, {enum, [purge_one, purge_index, purge_all, off]}}
+  {datatype, {enum, [purge_one, purge_index, off]}}
 ]}.
 
 %% @doc The amount of time to wait before a drain operation times out.
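
In riak.conf terms, the mapping above means the purge strategy is chosen with a single key whose value must now be one of purge_one, purge_index, or off (purge_all is no longer accepted). A sketch, assuming the stock cuttlefish translation of this mapping:

## riak.conf (illustrative); defaults to purge_one when unset
search.queue.high_watermark.purge_strategy = purge_index

Per the mapping, the value lands in the yokozuna application environment under solrq_hwm_purge_strategy.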

riak_test/intercepts/yz_solrq_drain_fsm_intercepts.erl (+16, -11)

@@ -27,19 +27,24 @@
 prepare_crash(start, State) ->
     {stop, {error, something_bad_happened}, State}.
 
+%% Put a 1 second sleep in front of resume_workers.
+resume_workers_sleep_1s(Pid) ->
+    timer:sleep(1000),
+    ?M:resume_workers_orig(Pid).
 
-%% Put a 5 second sleep in front of prepare.
-prepare_sleep_5s(start, State) ->
-    timer:sleep(5000),
+%% restore the original prepare
+prepare_orig(start, State) ->
     ?M:prepare_orig(start, State).
 
+%% restore the original resume_workers
+resume_workers_orig(Pid) ->
+    ?M:resume_workers_orig(Pid).
 
-%% Put a 5 second sleep in front of prepare.
-prepare_sleep_1s(start, State) ->
-    timer:sleep(1000),
-    ?M:prepare_orig(start, State).
+%% Timeout on a cancel, full stop
+cancel_timeout(_Pid, _CancelTimeout) ->
+    lager:log(info, self(), "Intercepting cancel/2 and returning timeout"),
+    timeout.
 
-
-%% restore the original
-prepare_orig(start, State) ->
-    ?M:prepare_orig(start, State).
+%% restore the original cancel
+cancel_orig(Pid, CancelTimeout) ->
+    ?M:cancel_orig(Pid, CancelTimeout).
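
As a usage sketch (the install calls appear verbatim in riak_test/yz_solrq_test.erl later in this commit), a test wires these intercepts in through the new yz_rt:add_intercepts/3 helper after loading the intercept code; resume_workers_sleep_1s slows the drain so it can time out, and cancel_timeout forces the cancel path to report timeout:

%% Sketch; Cluster is the list of riak_test nodes used by the test.
yz_rt:load_intercept_code(Cluster),
yz_rt:add_intercepts(Cluster, yz_solrq_drain_fsm,
                     [{{resume_workers, 1}, resume_workers_sleep_1s},
                      {{cancel, 2}, cancel_timeout}]),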

riak_test/yokozuna_essential.erl (+4, -1)

@@ -418,7 +418,10 @@ verify_correct_solrqs(Cluster) ->
     ?assertEqual(ok, rt:wait_until(Cluster, fun check_queues_match/1)).
 
 check_queues_match(Node) ->
-    CurrentIndexes = rpc:call(Node, yz_index, get_indexes_from_meta, []),
+    %% Current Indexes includes ?YZ_INDEX_TOMBSTONE because we need to write the entries
+    %% for non-indexed data to the YZ AAE tree. Excluding them makes the solrq supervisor
+    %% constantly start and stop these queues.
+    CurrentIndexes = rpc:call(Node, yz_index, get_indexes_from_meta, []) ++ [?YZ_INDEX_TOMBSTONE],
     OwnedPartitions = rt:partitions_for_node(Node),
     ActiveQueues = rpc:call(Node, yz_solrq_sup, active_queues, []),
     ExpectedQueueus = [{Index, Partition} || Index <- CurrentIndexes, Partition <- OwnedPartitions],

riak_test/yz_aae_test.erl (+39, -31)

@@ -10,31 +10,34 @@
 -define(BUCKET_TYPE, <<"data">>).
 -define(INDEX1, <<"fruit_aae">>).
 -define(INDEX2, <<"fruitpie_aae">>).
--define(BUCKETWITHTYPE,
-        {?BUCKET_TYPE, ?INDEX2}).
+-define(BUCKETWITHTYPE, {?BUCKET_TYPE, ?INDEX2}).
 -define(BUCKET, ?INDEX1).
 -define(REPAIR_MFA, {yz_exchange_fsm, repair, 2}).
 -define(SPACER, "testfor spaces ").
 -define(AAE_THROTTLE_LIMITS, [{-1, 0}, {10000, 10}]).
--define(CFG,
-        [{riak_core,
-          [
-           {ring_creation_size, 16},
-           {default_bucket_props, [{n_val, ?N}]},
-           {handoff_concurrency, 10},
-           {vnode_management_timer, 1000}
-          ]},
-         {yokozuna,
-          [
-           {enabled, true},
-           {?SOLRQ_DRAIN_ENABLE, true},
-           {anti_entropy_tick, 1000},
-           %% allow AAE to build trees and exchange rapidly
-           {anti_entropy_build_limit, {100, 1000}},
-           {anti_entropy_concurrency, 8},
-           {aae_throttle_limits, ?AAE_THROTTLE_LIMITS}
-          ]}
-        ]).
+-define(CFG, [
+    {riak_core, [
+        {ring_creation_size, 16},
+        {default_bucket_props, [{n_val, ?N}]},
+        {handoff_concurrency, 10},
+        {vnode_management_timer, 1000}
+    ]},
+    {riak_kv, [
+        {force_hashtree_upgrade, true},
+        {anti_entropy_tick, 1000},
+        {anti_entropy_build_limit, {100, 1000}},
+        {anti_entropy_concurrency, 8}
+    ]},
+    {yokozuna, [
+        {enabled, true},
+        {?SOLRQ_DRAIN_ENABLE, true},
+        {anti_entropy_tick, 1000},
+        %% allow AAE to build trees and exchange rapidly
+        {anti_entropy_build_limit, {100, 1000}},
+        {anti_entropy_concurrency, 8},
+        {aae_throttle_limits, ?AAE_THROTTLE_LIMITS}
+    ]}
+]).
 
 confirm() ->
     Cluster = rt:build_cluster(5, ?CFG),

@@ -101,15 +104,14 @@ aae_run(Cluster, Bucket, Index) ->
 
     RepairCountBefore = get_cluster_repair_count(Cluster),
     yz_rt:count_calls(Cluster, ?REPAIR_MFA),
-    NumKeys = [{Bucket, K} || K <- yz_rt:random_keys(?NUM_KEYS)],
-    NumKeysSpaces = [{Bucket, add_space_to_key(K)} ||
+    RandomBKeys = [{Bucket, K} || K <- yz_rt:random_keys(?NUM_KEYS)],
+    RandomBKeysWithSpaces = [{Bucket, add_space_to_key(K)} ||
        K <- yz_rt:random_keys(?NUM_KEYS_SPACES)],
-    {DelNumKeys, _ChangeKeys} = lists:split(length(NumKeys) div 2,
-                                            NumKeys),
-    {DelNumKeysSpaces, _ChangeKeysSpaces} = lists:split(
-                                              length(NumKeysSpaces) div 2,
-                                              NumKeysSpaces),
-    AllDelKeys = DelNumKeys ++ DelNumKeysSpaces,
+    {RandomBKeysToDelete, _} = lists:split(length(RandomBKeys) div 2, RandomBKeys),
+    {RandomBKeysWithSpacesToDelete, _} = lists:split(
+        length(RandomBKeysWithSpaces) div 2,
+        RandomBKeysWithSpaces),
+    AllDelKeys = RandomBKeysToDelete ++ RandomBKeysWithSpacesToDelete,
     lager:info("Deleting ~p keys", [length(AllDelKeys)]),
     [delete_key_in_solr(Cluster, Index, K) || K <- AllDelKeys],
     lager:info("Verify Solr indexes missing"),

@@ -174,7 +176,7 @@ create_orphan_postings(Cluster, Index, Bucket, Keys) ->
     Keys2 = [{Bucket, ?INT_TO_BIN(K)} || K <- Keys],
     lager:info("Create orphan postings with keys ~p", [Keys]),
     ObjNodePs = [create_obj_node_partition_tuple(Cluster, Key) || Key <- Keys2],
-    [ok = rpc:call(Node, yz_kv, index, [Obj, put, P])
+    [ok = rpc:call(Node, yz_kv, index, [{Obj, no_old_object}, put, P])
      || {Obj, Node, P} <- ObjNodePs],
     yz_rt:commit(Cluster, Index),
     ok.

@@ -330,7 +332,8 @@ verify_count_and_repair_after_error_value(Cluster, {BType, _Bucket}, Index,
     %% 1. write KV data to non-indexed bucket
     Conn = yz_rt:select_random(PBConns),
     lager:info("write 1 bad search field to bucket ~p", [Bucket]),
-    Obj = riakc_obj:new(Bucket, <<"akey_bad_data">>, <<"{\"date_register\":3333}">>,
+    Key = <<"akey_bad_data">>,
+    Obj = riakc_obj:new(Bucket, Key, <<"{\"date_register\":3333}">>,
                         "application/json"),
 
     ok = riakc_pb_socket:put(Conn, Obj),

@@ -349,6 +352,11 @@
     %% 5. verify count after expiration
     verify_exchange_after_expire(Cluster, Index),
 
+    %% 6. Because it's possible we'll try to repair this key again
+    %% after clearing trees, delete it from KV
+    ok = riakc_pb_socket:delete(Conn, Bucket, Key),
+    yz_rt:commit(Cluster, Index),
+
     ok;
 verify_count_and_repair_after_error_value(_Cluster, _Bucket, _Index, _PBConns) ->
     ok.

riak_test/yz_rt.erl (+11, -7)

@@ -27,6 +27,9 @@
 -type search_type() :: solr | yokozuna.
 -type cluster() :: cluster().
 
+-type intercept() :: {{TargetFunctionName::atom(), TargetArity::non_neg_integer()}, InterceptFunctionName::atom()}.
+-type intercepts() :: [intercept()].
+
 -export_type([prop/0, props/0, cluster/0]).
 
 %% @doc Get {Host, Port} from `Cluster'.

@@ -552,6 +555,7 @@ remove_index(Node, BucketType) ->
     ok = rpc:call(Node, riak_core_bucket_type, update, [BucketType, Props]).
 
 really_remove_index(Cluster, {BucketType, Bucket}, Index, PBConn) ->
+    lager:info("Removing index ~p", [Index]),
     Node = hd(Cluster),
     F = fun(_) ->
             Props = [{?YZ_INDEX, ?YZ_INDEX_TOMBSTONE}],

@@ -965,17 +969,17 @@ check_fuse_status(Node, Partition, Indices, FuseCheckFunction) ->
 
 -spec intercept_index_batch(node() | cluster(), module()) -> ok | [ok].
 intercept_index_batch(Cluster, Intercept) ->
-    add_intercept(
+    add_intercepts(
         Cluster,
-        yz_solr, index_batch, 2, Intercept).
+        yz_solr, [{{index_batch, 2}, Intercept}]).
 
--spec add_intercept(node() | cluster(), module(), atom(), non_neg_integer(), module()) -> ok | [ok].
-add_intercept(Cluster, Module, Function, Arity, Intercept) when is_list(Cluster) ->
-    [add_intercept(Node, Module, Function, Arity, Intercept) || Node <- Cluster];
-add_intercept(Node, Module, Function, Arity, Intercept) ->
+-spec add_intercepts(node() | cluster(), module(), intercepts()) -> ok | [ok].
+add_intercepts(Cluster, Module, Intercepts) when is_list(Cluster) ->
+    [add_intercepts(Node, Module, Intercepts) || Node <- Cluster];
+add_intercepts(Node, Module, Intercepts) ->
     rt_intercept:add(
         Node,
-        {Module, [{{Function, Arity}, Intercept}]}).
+        {Module, Intercepts}).
 
 -spec set_yz_aae_mode(node() | cluster(), automatic | manual) -> ok | [ok].
 set_yz_aae_mode(Cluster, Mode) when is_list(Cluster) ->
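
The practical effect of the refactor is that callers now pass a list of {{Function, Arity}, InterceptFun} pairs per module instead of one function per call. A before/after sketch, using the prepare_crash intercept exercised elsewhere in this commit:

%% Old API (one intercept per call):
yz_rt:add_intercept(Cluster, yz_solrq_drain_fsm, prepare, 2, prepare_crash),

%% New API (any number of intercepts on a module in one rt_intercept:add/2 call):
yz_rt:add_intercepts(Cluster, yz_solrq_drain_fsm, [{{prepare, 2}, prepare_crash}]),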

riak_test/yz_solrq_test.erl (+22, -9)

@@ -111,17 +111,18 @@ confirm() ->
     pass.
 
 confirm_drain_fsm_failure(Cluster) ->
+    lager:info("Starting confirm_drain_fsm_failure"),
     yz_stat:reset(),
     try
         yz_rt:load_intercept_code(Cluster),
-        yz_rt:add_intercept(Cluster, yz_solrq_drain_fsm, prepare, 2, prepare_crash),
+        yz_rt:add_intercepts(Cluster, yz_solrq_drain_fsm, [{{prepare, 2}, prepare_crash}]),
         %% drain solrqs and wait until the drain failure stats are touched
         yz_rt:drain_solrqs(Cluster),
         yz_rt:wait_until(Cluster, fun check_drain_failure_stats/1),
 
         lager:info("confirm_drain_fsm_failure ok")
     after
-        yz_rt:add_intercept(Cluster, yz_solrq_drain_fsm, prepare, 2, prepare_orig)
+        yz_rt:add_intercepts(Cluster, yz_solrq_drain_fsm, [{{prepare, 2}, prepare_orig}])
     end.
 
 check_drain_failure_stats(Node) ->

@@ -138,19 +139,20 @@ check_drain_failure_stats(Node) ->
     yz_rt:check_stat_values(Stats, Pairs).
 
 confirm_drain_fsm_timeout(Cluster) ->
+    lager:info("Starting confirm_drain_fsm_timeout"),
     yz_stat:reset(),
     [rpc:call(
-        Node, application, set_env, [?YZ_APP_NAME, ?SOLRQ_DRAIN_TIMEOUT, 500])
+        Node, application, set_env, [?YZ_APP_NAME, ?SOLRQ_DRAIN_TIMEOUT, 250])
      || Node <- Cluster],
     try
         yz_rt:load_intercept_code(Cluster),
-        yz_rt:add_intercept(Cluster, yz_solrq_drain_fsm, prepare, 2, prepare_sleep_1s),
+        yz_rt:add_intercepts(Cluster, yz_solrq_drain_fsm, [{{resume_workers, 1}, resume_workers_sleep_1s}]),
         yz_rt:drain_solrqs(Cluster),
         yz_rt:wait_until(Cluster, fun check_drain_timeout_stats/1),
 
         lager:info("confirm_drain_fsm_timeout ok")
     after
-        yz_rt:add_intercept(Cluster, yz_solrq_drain_fsm, prepare, 2, prepare_orig),
+        yz_rt:add_intercepts(Cluster, yz_solrq_drain_fsm, [{{resume_workers, 1}, resume_workers_orig}]),
         [rpc:call(
             Node, application, set_env, [?YZ_APP_NAME, ?SOLRQ_DRAIN_TIMEOUT, 60000])
          || Node <- Cluster]

@@ -170,26 +172,31 @@ check_drain_timeout_stats(Node) ->
     yz_rt:check_stat_values(Stats, Pairs).
 
 confirm_drain_fsm_kill(Cluster) ->
+    lager:info("Starting confirm_drain_fsm_kill"),
     [rpc:call(
         Node, application, set_env, [?YZ_APP_NAME, ?SOLRQ_DRAIN_TIMEOUT, 10])
      || Node <- Cluster],
+    %% technically not needed for this test (because the cancel intercept will
+    %% just return timeout), but added for completeness
     [rpc:call(
         Node, application, set_env, [?YZ_APP_NAME, ?SOLRQ_DRAIN_CANCEL_TIMEOUT, 10])
      || Node <- Cluster],
     try
         yz_test_listener:start(),
         yz_rt:load_intercept_code(Cluster),
-        yz_rt:add_intercept(Cluster, yz_solrq_drain_fsm, prepare, 2, prepare_sleep_5s),
-        yz_rt:add_intercept(Cluster, yz_solrq_drain_mgr, unlink_and_kill, 2, count_unlink_and_kill),
+        yz_rt:add_intercepts(Cluster, yz_solrq_drain_fsm, [{{resume_workers, 1}, resume_workers_sleep_1s},
+                                                           {{cancel, 2}, cancel_timeout}]),
+        yz_rt:add_intercepts(Cluster, yz_solrq_drain_mgr, [{{unlink_and_kill, 2}, count_unlink_and_kill}]),
        yz_rt:drain_solrqs(Cluster),
        yz_rt:wait_until(Cluster, fun check_drain_cancel_timeout_stats/1),
 
        ?assertEqual(1, length(yz_test_listener:messages())),
 
        lager:info("confirm_drain_fsm_kill ok")
     after
-        yz_rt:add_intercept(Cluster, yz_solrq_drain_fsm, prepare, 2, prepare_orig),
-        yz_rt:add_intercept(Cluster, yz_solrq_drain_mgr, unlink_and_kill, 2, unlink_and_kill_orig),
+        yz_rt:add_intercepts(Cluster, yz_solrq_drain_fsm, [{{resume_workers, 1}, resume_workers_orig},
+                                                           {{cancel, 2}, cancel_orig}]),
+        yz_rt:add_intercepts(Cluster, yz_solrq_drain_mgr, [{{unlink_and_kill, 2}, unlink_and_kill_orig}]),
        yz_test_listener:stop(),
        [rpc:call(
            Node, application, set_env, [?YZ_APP_NAME, ?SOLRQ_DRAIN_TIMEOUT, 60000])

@@ -214,6 +221,7 @@ check_drain_cancel_timeout_stats(Node) ->
 
 
 confirm_batch_size(Cluster, PBConn, BKey, Index) ->
+    lager:info("Starting confirm_batch_size"),
     %% First, put one less than the min batch size and expect that there are no
     %% search results (because the index operations are queued).
     Count = ?SOLRQ_BATCH_MIN_SETTING - 1,

@@ -246,6 +254,7 @@ confirm_batch_size(Cluster, PBConn, BKey, Index) ->
     ok.
 
 confirm_hwm(Cluster, PBConn, Bucket, Index, HWM) ->
+    lager:info("Starting confirm_hwm"),
     yz_rt:drain_solrqs(Cluster),
     {OldMin, OldMax, OldDelay} = set_index(Cluster, Index, 1, 100, 100),
     try

@@ -267,6 +276,7 @@ confirm_hwm(Cluster, PBConn, Bucket, Index, HWM) ->
 gteq(A, B) -> A >= B.
 
 confirm_draining(Cluster, PBConn, Bucket, Index) ->
+    lager:info("Starting confirm_draining"),
     Count = ?SOLRQ_BATCH_MIN_SETTING - 1,
     Count = put_objects(PBConn, Bucket, Count),
     yz_rt:commit(Cluster, Index),

@@ -278,6 +288,7 @@ confirm_draining(Cluster, PBConn, Bucket, Index) ->
     ok.
 
 confirm_requeue_undelivered([Node|_] = Cluster, PBConn, BKey, Index) ->
+    lager:info("Starting confirm_requeue_undelivered"),
     yz_rt:load_intercept_code(Node),
     yz_rt:intercept_index_batch(Node, index_batch_returns_other_error),
 

@@ -300,6 +311,7 @@ confirm_requeue_undelivered([Node|_] = Cluster, PBConn, BKey, Index) ->
     ok.
 
 confirm_no_contenttype_data(Cluster, PBConn, BKey, Index) ->
+    lager:info("Starting confirm_no_contenttype_data"),
     yz_rt:set_index(Cluster, Index, 1, 100, 100),
     Count = 1,
     Count = put_no_contenttype_objects(PBConn, BKey, Count),

@@ -309,6 +321,7 @@ confirm_no_contenttype_data(Cluster, PBConn, BKey, Index) ->
     ok.
 
 confirm_purge_strategy(Cluster, PBConn) ->
+    lager:info("Starting confirm_purge_strategy"),
     confirm_purge_one_strategy(Cluster, PBConn,
                                {?BUCKET5, ?INDEX5}),
     confirm_purge_idx_strategy(Cluster, PBConn,
