Skip to content

Commit ce4439c

Browse files
Doug RohrerJeetKunDoug / fdushin / lehoff
Doug Rohrer
authored and
JeetKunDoug / fdushin / lehoff
committed
* Sinusoidal Performance issue fix:
Added a callback to yz_index_hashtree:update exactly the same as the callback added on the KV side in 2.0.7, because once the snapshot is completed, we needed to (and can) allow the Solr queues to continue writing to Solr. * Refactor to partition/index per queue: By making Solr queues responsible for only a single {partition, index} pair we greatly simplify the code of the queue and the helper. Rather than a pool of queues, and a pool of helpers, we now have one queue/helper pair per partition/index. Beyond code clarity, this provides many benefits: - We can now drain by partition, which means we can do parallel exchanges again. - Because draining a partition only affects the queues for that partition, we greatly reduce the risk of many vnodes being blocked by a single slow drain request. - All the code dealing with `dict`s of indexq records is now gone - each solrq has only a single index/partition, and the indexq record is now gone (collapsed into the main state record). - Additional refactoring around the interaction between helper and worker, simplifying handling of draining further. - If we no longer own a partition, we simply stop that worker/helper pair (which now has its own supervisor as well). * Things we tried in fixing the performance issue that didn't work: - Throttling batches to Solr based on current in-flight queue lengths. This just allowed the queues to back up more. - Attempted to introduce backpressure by using the `riak_core_throttle` module in a pre-commit hook in `riak_kv_put_fsm`. This failed to resolve the issue, and in fact made it worse as we would hold up a large number of requests and then dump them all into KV and the Solr queues. * Additional Fixes - Added queue length and aux_queue length to solrq_worker status - Fixed typespecs for fuse resets in yz_rt, and light refactor of find_representatives to make it usable by yz_stat_test. 
- Remove PULSE from yz_solrq_eqc and supporting modules and Makefile - Meck sidejob:resource_exists so stats updates (also mecked) can work. - Fix format issues (places where we call ?EQC_DEBUG and forgot to pass an argument list) which would crash the running test, but not production code. - Add `?YZ_SHOULD_INDEX` macro so it can be used in a guard clause. - Make `yz_kv:should_index/1` use `?YZ_SHOULD_INDEX` as its implementation so we don't end up changing one and not the other. - Refactor `yz_solrq_helper:update_solr/3` to use `?YZ_SHOULD_INDEX` and reduce complexity/nested cases. - Clean up some deeply nested funs/maps in `yz_solrq_helper`. While we now will iterate over Entries twice, they are small (must be < max batch size) and this makes the code much easier to reason about. - Added stats for bad entries and extraction errors, and removed duplicate measurements of index failures.
1 parent 0bd8592 commit ce4439c

30 files changed

+1338
-1384
lines changed

Makefile

-7
Original file line numberDiff line numberDiff line change
@@ -26,13 +26,6 @@ clean:
2626
distclean: clean
2727
$(REBAR) delete-deps
2828

29-
# You should 'clean' before your first run of this target
30-
# so that deps get built with PULSE where needed.
31-
pulse:
32-
./rebar compile -D PULSE
33-
./rebar eunit -D PULSE skip_deps=true suite=$(PULSE_TESTS)
34-
35-
3629
##
3730
## Dialyzer
3831
##

debug/yz_perf.erl

-74
This file was deleted.

include/yokozuna.hrl

+2-1
Original file line numberDiff line numberDiff line change
@@ -350,6 +350,7 @@
350350

351351
-define(TOMBSTONE, <<>>).
352352
-define(YZ_INDEX_TOMBSTONE, <<"_dont_index_">>).
353+
-define(YZ_SHOULD_INDEX(Index), Index =/= ?YZ_INDEX_TOMBSTONE).
353354
-define(YZ_INDEX, search_index).
354355

355356
%%%===================================================================
@@ -506,7 +507,7 @@
506507
-type solrq_batch_max() :: pos_integer().
507508
-type solrq_batch_flush_interval() :: non_neg_integer()|infinity.
508509
-type solrq_hwm() :: non_neg_integer().
509-
-type purge_strategy() :: ?PURGE_NONE|?PURGE_ONE|?PURGE_IDX|?PURGE_ALL.
510+
-type purge_strategy() :: ?PURGE_NONE|?PURGE_ONE|?PURGE_IDX.
510511

511512
%%%===================================================================
512513
%%% draining

priv/yokozuna.schema

+6-30
Original file line numberDiff line numberDiff line change
@@ -197,8 +197,8 @@
197197
%% but are guaranteed to be flushed within the
198198
%% "search.queue.batch.flush_interval".
199199
{mapping, "search.queue.batch.minimum", "yokozuna.solrq_batch_min", [
200-
{default, 1},
201-
{commented, 1},
200+
{default, 10},
201+
{commented, 10},
202202
{datatype, integer},
203203
{validators, ["positive_integer"]}
204204
]}.
@@ -211,8 +211,8 @@
211211
%% "search.queue.batch.maximum object" will be delivered into Solr in any
212212
%% given request.
213213
{mapping, "search.queue.batch.maximum", "yokozuna.solrq_batch_max", [
214-
{default, 100},
215-
{commented, 100},
214+
{default, 500},
215+
{commented, 500},
216216
{datatype, integer},
217217
{validators, ["positive_integer"]}
218218
]}.
@@ -239,8 +239,8 @@
239239
%% parameter exercises flow control between Riak and the Riak
240240
%% Search batching subsystem if writes into Solr start to fall behind.
241241
{mapping, "search.queue.high_watermark", "yokozuna.solrq_hwm", [
242-
{default, 10},
243-
{commented, 10},
242+
{default, 1000},
243+
{commented, 1000},
244244
{datatype, integer},
245245
{validators, ["non_negative_integer"]}
246246
]}.
@@ -264,30 +264,6 @@
264264
{datatype, {enum, [purge_one, purge_index, purge_all, off]}}
265265
]}.
266266

267-
%% @doc The number of solr queue workers to instantiate in the Riak
268-
%% Search application. Solr queue workers are responsible for enqueuing
269-
%% objects for insertion or update into Solr. Increasing the number of
270-
%% solrq distributes the queuing of objects, and can lead to greater
271-
%% throughput under high load, potentially at the expense of smaller batch
272-
%% sizes.
273-
{mapping, "search.queue.worker_count", "yokozuna.solrq_worker_count", [
274-
{default, 10},
275-
{commented, 10},
276-
{datatype, integer},
277-
{validators, ["positive_integer"]}
278-
]}.
279-
280-
%% @doc The number of solr queue helpers to instantiate in the Riak
281-
%% Search application. Solr queue helpers are responsible for delivering
282-
%% batches of data into Solr. Increasing the number of solrq helpers will
283-
%% increase concurrent writes into Solr.
284-
{mapping, "search.queue.helper_count", "yokozuna.solrq_helper_count", [
285-
{default, 10},
286-
{commented, 10},
287-
{datatype, integer},
288-
{validators, ["positive_integer"]}
289-
]}.
290-
291267
%% @doc The amount of time to wait before a drain operation times out.
292268
%% If a drain times out during an AAE exchange, the exchange is cancelled
293269
%% and retried at a later time.

riak_test/intercepts/yz_solr_intercepts.erl

+2-2
Original file line numberDiff line numberDiff line change
@@ -27,5 +27,5 @@ entropy_data_cant_complete(Core, Filter) ->
2727
index_batch_call_orig(Core, Ops) ->
2828
?M:index_batch_orig(Core, Ops).
2929

30-
index_batch_throw_exception(_Core, _Ops) ->
31-
throw({"Failed to index docs", other, error}).
30+
index_batch_returns_other_error(_Core, _Ops) ->
31+
{error, other, "Failed to index docs"}.

riak_test/yokozuna_essential.erl

+22-1
Original file line numberDiff line numberDiff line change
@@ -63,12 +63,14 @@ confirm() ->
6363
verify_non_existent_index(Cluster, <<"froot">>),
6464
{0, _} = yz_rt:load_data(Cluster, ?BUCKET, YZBenchDir, ?NUM_KEYS),
6565
yz_rt:commit(Cluster, ?INDEX),
66-
yz_rt:verify_num_match(Cluster, ?INDEX, ?NUM_KEYS),
66+
verify_correct_solrqs(Cluster),
6767
%% Verify data exists before running join
68+
yz_rt:verify_num_match(Cluster, ?INDEX, ?NUM_KEYS),
6869
Cluster2 = join_rest(Cluster, Nodes),
6970
rt:wait_for_cluster_service(Cluster2, riak_kv),
7071
rt:wait_for_cluster_service(Cluster2, yokozuna),
7172
verify_non_owned_data_deleted(Cluster, ?INDEX),
73+
verify_correct_solrqs(Cluster2),
7274
wait_for_indexes(Cluster2),
7375
ok = test_tagging_http(Cluster2),
7476
ok = test_tagging_pb(Cluster2),
@@ -79,6 +81,7 @@ confirm() ->
7981
verify_deletes(Cluster2, ?INDEX, ?NUM_KEYS, KeysDeleted),
8082
ok = test_escaped_key(Cluster2),
8183
verify_unique_id(Cluster2, PBConns),
84+
verify_deleted_index_stops_solrqs(Cluster2, PBConns),
8285
yz_rt:close_pb_conns(PBConns),
8386
pass;
8487
{error, bb_driver_build_failed} ->
@@ -404,3 +407,21 @@ verify_deletes(Cluster, Index, NumKeys, KeysDeleted) ->
404407
NumDeleted = length(KeysDeleted),
405408
lager:info("Verify ~p keys were deleted", [NumDeleted]),
406409
yz_rt:verify_num_match(Cluster, Index, NumKeys - NumDeleted).
410+
411+
412+
verify_deleted_index_stops_solrqs(Cluster, PBConns) ->
413+
PBConn = hd(PBConns),
414+
yz_rt:really_remove_index(Cluster, ?BUCKET, ?INDEX, PBConn),
415+
verify_correct_solrqs(Cluster).
416+
417+
verify_correct_solrqs(Cluster) ->
418+
?assertEqual(ok, rt:wait_until(Cluster, fun check_queues_match/1)).
419+
420+
check_queues_match(Node) ->
421+
CurrentIndexes = rpc:call(Node, yz_index, get_indexes_from_meta, []),
422+
OwnedPartitions = rt:partitions_for_node(Node),
423+
ActiveQueues = rpc:call(Node, yz_solrq_sup, active_queues, []),
424+
ExpectedQueueus = [{Index, Partition} || Index <- CurrentIndexes, Partition <- OwnedPartitions],
425+
lager:debug("Validating correct Solr Queues are running. Node: ~p, Expected: ~p, Active: ~p", [Node, ExpectedQueueus, ActiveQueues]),
426+
lists:sort(ExpectedQueueus) == lists:sort(ActiveQueues).
427+

riak_test/yz_pb.erl

+8-8
Original file line numberDiff line numberDiff line change
@@ -174,14 +174,14 @@ confirm_admin_index(Cluster) ->
174174
[{Host, Port}] = host_entries(rt:connection_info([Node])),
175175
{ok, Pid} = riakc_pb_socket:start_link(Host, (Port-1)),
176176
F = fun(_) ->
177-
%% Remove index from bucket props and delete it
178-
yz_rt:remove_index(Node, Index),
179-
DelResp = riakc_pb_socket:delete_search_index(Pid, Index),
180-
case DelResp of
181-
ok -> true;
182-
{error,<<"notfound">>} -> true
183-
end
184-
end,
177+
%% Remove index from bucket props and delete it
178+
yz_rt:remove_index(Node, Index),
179+
DelResp = riakc_pb_socket:delete_search_index(Pid, Index),
180+
case DelResp of
181+
ok -> true;
182+
{error,<<"notfound">>} -> true
183+
end
184+
end,
185185
yz_rt:wait_until(Cluster, F),
186186
riakc_pb_socket:stop(Pid),
187187
ok.

0 commit comments

Comments
 (0)