Skip to content

Commit 26f5c42

Browse files
committed
Store indexes in the ring and sync during ring events
An index is a cluster-wide resource. If the create index API is called on one node then it should be called on all nodes--even if they aren't part of the cluster during the time of the call. A metadata entry in the ring is used to store the list of indexes. During a ring event the local list of lindexes is checked against that in the ring. The delta is calculated and missing indexes are added. Removal is not yet implemented. An internal index named `_yz` has been added. Currently, it's only use is to determine if Solr is up by pinging the `_yz` index. However, it may be useful to use this index as a place to store data internal to Yokozuna or Riak. Finally, some changes were made to the process that starts Solr. The current working directory was changed to be the same as Riak's start script, the lib dir is now passed at startup, and the solr log now resides in same location as Riak logs, and make sure to wait until Solr is fully started or things break when a ring event is handled.
1 parent a338429 commit 26f5c42

9 files changed

+163
-46
lines changed

include/yokozuna.hrl

+10
Original file line numberDiff line numberDiff line change
@@ -55,8 +55,11 @@
5555
-type value() :: term().
5656
-type field() :: {name(), value()}.
5757
-type fields() :: [field()].
58+
-type index_name() :: string().
5859
-type doc() :: {doc, fields()}.
5960
-type base64() :: base64:ascii_string().
61+
-type ordset() :: ordsets:ordset().
62+
-type ring() :: riak_core_ring:riak_core_ring().
6063
-type solr_vclocks() :: #solr_vclocks{}.
6164
-type iso8601() :: string().
6265
-type tree_name() :: atom().
@@ -75,6 +78,10 @@
7578
%% Mapping from logical partition to partition
7679
-type logical_idx() :: [{lp(), p()}].
7780

81+
-type node_event() :: {node_event, node(), up | down}.
82+
-type ring_event() :: {ring_event, riak_core_ring:riak_core_ring()}.
83+
-type event() :: node_event() | ring_event().
84+
7885

7986
%%%===================================================================
8087
%%% Macros
@@ -85,7 +92,9 @@
8592

8693
-define(INT_TO_BIN(I), list_to_binary(integer_to_list(I))).
8794

95+
-define(YZ_INDEX, "_yz").
8896
-define(YZ_DEFAULT_SOLR_PORT, "8983").
97+
-define(YZ_DEFAULT_SOLR_STARTUP_WAIT, 15).
8998
-define(YZ_EVENTS_TAB, yz_events_tab).
9099
-define(YZ_ENTROPY_DATA_FIELD, '_yz_ed').
91100
-define(YZ_ROOT_DIR, app_helper:get_env(?YZ_APP_NAME, root_dir, "data/yz")).
@@ -97,3 +106,4 @@
97106
-define(YZ_APP_NAME, yokozuna).
98107
-define(YZ_SVC_NAME, yokozuna).
99108
-define(YZ_VNODE_MASTER, yokozuna_vnode_master).
109+
-define(YZ_META_INDEXES, yokozuna_indexes).

priv/conf/config.xml

+1-1
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@
7373
<lib dir="../../dist/" regex="apache-solr-langid-\d.*\.jar" />
7474
<lib dir="../../contrib/langid/lib/" regex=".*\.jar" />
7575

76-
<lib dir="${yz_java_lib_dir}" />
76+
<lib dir="${yz.lib.dir}" />
7777

7878
<!-- an exact 'path' can be used instead of a 'dir' to specify a
7979
specific file. This will cause a serious error to be logged if

priv/logging.properties

+1-1
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
handlers = java.util.logging.FileHandler
44

55
java.util.logging.FileHandler.formatter = java.util.logging.SimpleFormatter
6-
java.util.logging.FileHandler.pattern = ./logs/solr-%g.log
6+
java.util.logging.FileHandler.pattern = {{platform_log_dir}}/solr-%g.log
77
java.util.logging.FileHandler.append = true
88
java.util.logging.FileHandler.count = 5
99
java.util.logging.FileHandler.limit = 10485760

priv/solr.xml

+2-1
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
If 'null' (or absent), cores will not be manageable via request handler
3030
-->
3131
<cores adminPath="/admin/cores">
32-
<!-- <core name="default" instanceDir="default" /> -->
32+
<core name="_yz" schema="schema.xml" config="config.xml" instanceDir="_yz">
33+
</core>
3334
</cores>
3435
</solr>

src/yokozuna_app.erl

+1
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131

3232
start(_StartType, _StartArgs) ->
3333
riak_core:wait_for_service(riak_kv),
34+
yz_index:add_to_ring(?YZ_INDEX),
3435
case yokozuna_sup:start_link() of
3536
{ok, Pid} ->
3637
register_app(),

src/yz_events.erl

+64-20
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,17 @@
1818
%%
1919
%% -------------------------------------------------------------------
2020

21+
%% @doc Functionality related to events. This is the single producer of
22+
%% writes to the ETS table `yz_events`.
23+
2124
-module(yz_events).
22-
-compile(export_all).
2325
-behavior(gen_server).
26+
-compile(export_all).
27+
-export([handle_cast/2,
28+
init/1,
29+
terminate/2]).
2430
-include("yokozuna.hrl").
2531

26-
%% @doc Functionality related to events. This is the single producer of
27-
%% writes to the ETS table `yz_events`.
2832

2933
%%%===================================================================
3034
%%% API
@@ -51,23 +55,45 @@ init([]) ->
5155
ok = create_events_table(),
5256
{ok, none}.
5357

54-
handle_cast({node_event, Node, Status}, S) ->
58+
handle_cast({node_event, _Node, _Status}=NE, S) ->
5559
Mapping = get_mapping(),
56-
Mapping2 = handle_node_event(Node, Status, Mapping),
60+
Mapping2 = new_mapping(NE, Mapping),
5761
ok = set_mapping(Mapping2),
62+
5863
{noreply, S};
5964

60-
handle_cast({ring_event, Ring}, S) ->
65+
handle_cast({ring_event, Ring}=RE, S) ->
6166
Mapping = get_mapping(),
62-
Mapping2 = handle_ring_event(Ring, Mapping),
67+
Mapping2 = new_mapping(RE, Mapping),
6368
ok = set_mapping(Mapping2),
69+
70+
Local = yz_index:indexes(),
71+
Cluster = yz_index:get_indexes_from_ring(Ring),
72+
{Removed, Added, Same} = index_delta(Local, Cluster),
73+
ok = sync_indexes(Removed, Added, Same),
74+
6475
{noreply, S}.
6576

77+
terminate(_Reason, _S) ->
78+
ok = destroy_events_table().
79+
6680

6781
%%%===================================================================
6882
%%% Private
6983
%%%===================================================================
7084

85+
-spec add_index(index_name()) -> ok.
86+
add_index(Name) ->
87+
case yz_index:exists(Name) of
88+
true -> ok;
89+
false -> ok = yz_index:create(Name)
90+
end.
91+
92+
-spec add_indexes(ordset()) -> ok.
93+
add_indexes(Names) ->
94+
lists:foreach(fun add_index/1, Names),
95+
ok.
96+
7197
-spec add_node(node(), list()) -> list().
7298
add_node(Node, Mapping) ->
7399
HostPort = host_port(Node),
@@ -88,19 +114,10 @@ create_events_table() ->
88114
?YZ_EVENTS_TAB = ets:new(?YZ_EVENTS_TAB, Opts),
89115
ok.
90116

91-
-spec handle_node_event(node(), up | down, list()) -> list().
92-
handle_node_event(Node, down, Mapping) ->
93-
remove_node(Node, Mapping);
94-
handle_node_event(Node, up, Mapping) ->
95-
add_node(Node, Mapping).
96-
97-
-spec handle_ring_event(riak_core_ring:ring(), list()) -> list().
98-
handle_ring_event(Ring, Mapping) ->
99-
Nodes = riak_core_ring:all_members(Ring),
100-
{Removed, Added} = node_ops(Mapping, Nodes),
101-
Mapping2 = remove_nodes(Removed, Mapping),
102-
Mapping3 = add_nodes(Added, Mapping2),
103-
check_unkown(Mapping3).
117+
-spec destroy_events_table() -> ok.
118+
destroy_events_table() ->
119+
true = ets:delete(?YZ_EVENTS_TAB),
120+
ok.
104121

105122
-spec host_port(node()) -> {string(), non_neg_integer() | unknown}.
106123
host_port(Node) ->
@@ -118,6 +135,14 @@ hostname(Node) ->
118135
[_, Host] = re:split(S, "@", [{return, list}]),
119136
Host.
120137

138+
-spec index_delta(ordset(), ordset()) ->
139+
{Removed::ordset(), Added::ordset(), Same::ordset()}.
140+
index_delta(Local, Cluster) ->
141+
Removed = ordsets:subtract(Local, Cluster),
142+
Added = ordsets:subtract(Cluster, Local),
143+
Same = ordsets:intersection(Cluster, Local),
144+
{Removed, Added, Same}.
145+
121146
-spec is_unknown(tuple()) -> boolean().
122147
is_unknown({_, {_, unknown}}) -> true;
123148
is_unknown({_, {_, Port}}) when is_list(Port) -> false.
@@ -126,6 +151,18 @@ is_unknown({_, {_, Port}}) when is_list(Port) -> false.
126151
just_nodes(Mapping) ->
127152
[Node || {Node, _} <- Mapping].
128153

154+
-spec new_mapping(event(), list()) -> list().
155+
new_mapping({node_event, Node, down}, Mapping) ->
156+
remove_node(Node, Mapping);
157+
new_mapping({node_event, Node, up}, Mapping) ->
158+
add_node(Node, Mapping);
159+
new_mapping({ring_event, Ring}, Mapping) ->
160+
Nodes = riak_core_ring:all_members(Ring),
161+
{Removed, Added} = node_ops(Mapping, Nodes),
162+
Mapping2 = remove_nodes(Removed, Mapping),
163+
Mapping3 = add_nodes(Added, Mapping2),
164+
check_unkown(Mapping3).
165+
129166
-spec node_ops(list(), list()) -> {Removed::list(), Added::list()}.
130167
node_ops(Mapping, Nodes) ->
131168
MappingNodesSet = sets:from_list(just_nodes(Mapping)),
@@ -134,6 +171,9 @@ node_ops(Mapping, Nodes) ->
134171
Added = sets:subtract(NodesSet, MappingNodesSet),
135172
{sets:to_list(Removed), sets:to_list(Added)}.
136173

174+
remove_indexes(_Names) ->
175+
throw(implement_remove_indexes).
176+
137177
-spec remove_node(node(), list()) -> list().
138178
remove_node(Node, Mapping) ->
139179
proplists:delete(Node, Mapping).
@@ -154,6 +194,10 @@ set_mapping(Mapping) ->
154194
true = ets:insert(?YZ_EVENTS_TAB, [{mapping, Mapping}]),
155195
ok.
156196

197+
sync_indexes(_Removed, Added, Same) ->
198+
%% ok = remove_indexes(Removed),
199+
ok = add_indexes(Added ++ Same).
200+
157201
watch_node_events() ->
158202
riak_core_node_watcher_events:add_sup_callback(fun send_node_update/1).
159203

src/yz_index.erl

+36-7
Original file line numberDiff line numberDiff line change
@@ -31,31 +31,59 @@
3131
%%% API
3232
%%%===================================================================
3333

34+
-spec add_to_ring(string()) -> ok.
35+
add_to_ring(Name) ->
36+
{ok, Ring} = riak_core_ring_manager:get_raw_ring(),
37+
ok = add_to_ring(Ring, Name).
38+
3439
%% TODO: Allow data dir to be changed
3540
-spec create(string()) -> ok.
3641
create(Name) ->
3742
IndexDir = index_dir(Name),
3843
ConfDir = filename:join([IndexDir, "conf"]),
3944
ConfFiles = filelib:wildcard(filename:join([?YZ_PRIV, "conf", "*"])),
4045
DataDir = filename:join([IndexDir, "data"]),
41-
JavaLibDir = java_lib_dir(),
4246

43-
make_dirs([ConfDir, DataDir, JavaLibDir]),
47+
make_dirs([ConfDir, DataDir]),
4448
copy_files(ConfFiles, ConfDir),
4549

4650
CoreProps = [
4751
{name, Name},
4852
{index_dir, IndexDir},
4953
{cfg_file, ?YZ_CORE_CFG_FILE},
50-
{java_lib_dir, JavaLibDir},
5154
{schema_file, ?YZ_SCHEMA_FILE}
5255
],
53-
ok = yz_solr:core(create, CoreProps).
56+
{ok, _, _} = yz_solr:core(create, CoreProps),
57+
ok.
58+
59+
-spec exists(string()) -> boolean().
60+
exists(Name) ->
61+
true == yz_solr:ping(Name).
62+
63+
-spec get_indexes_from_ring(ring()) -> ordset().
64+
get_indexes_from_ring(Ring) ->
65+
case riak_core_ring:get_meta(?YZ_META_INDEXES, Ring) of
66+
{ok, Indexes} -> Indexes;
67+
undefined -> []
68+
end.
69+
70+
-spec indexes() -> ordset().
71+
indexes() ->
72+
{ok, _, Body} = yz_solr:core(status, [{wt,json}]),
73+
Status = yz_solr:get_path(mochijson2:decode(Body), [<<"status">>]),
74+
ordsets:from_list([binary_to_list(Name) || {Name, _} <- Status]).
5475

5576
%%%===================================================================
5677
%%% Private
5778
%%%===================================================================
5879

80+
add_to_ring(Ring, Name) ->
81+
Indexes = get_indexes_from_ring(Ring),
82+
Indexes2 = ordsets:add_element(Name, Indexes),
83+
Ring2 = riak_core_ring:update_meta(?YZ_META_INDEXES, Indexes2, Ring),
84+
{ok, _Ring3} = riak_core_ring_manager:ring_trans(set_ring_trans(Ring2), []),
85+
ok.
86+
5987
copy_files([], _) ->
6088
ok;
6189
copy_files([File|Rest], Dir) ->
@@ -75,9 +103,6 @@ index_dir(Name) ->
75103
YZDir = app_helper:get_env(?YZ_APP_NAME, yz_dir, ?YZ_DEFAULT_DIR),
76104
filename:absname(filename:join([YZDir, Name])).
77105

78-
java_lib_dir() ->
79-
?YZ_PRIV ++ "/java_lib".
80-
81106
make_dir(Dir) ->
82107
case filelib:is_dir(Dir) of
83108
true ->
@@ -96,6 +121,10 @@ make_dirs([Dir|Rest]) ->
96121
schema_file() ->
97122
?YZ_PRIV ++ "/" ++ ?YZ_SCHEMA_FILE.
98123

124+
set_ring_trans(Ring) ->
125+
fun(_,_) -> {new_ring, Ring} end.
126+
127+
99128
%% Old stuff relate to exception handling ideas I was playing with
100129

101130
%% create2(Name, _Schema) ->

src/yz_solr.erl

+15-5
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424

2525
-define(CORE_ALIASES, [{index_dir, instanceDir},
2626
{cfg_file, config},
27-
{java_lib_dir, "property.yz_java_lib_dir"},
2827
{schema_file, schema}]).
2928
-define(DEFAULT_URL, "http://localhost:8983/solr").
3029
-define(DEFAULT_VCLOCK_N, 1000).
@@ -61,10 +60,10 @@ core(Action, Props) ->
6160
URL = BaseURL ++ "?" ++ Encoded,
6261

6362
case ibrowse:send_req(URL, [], get, [], Opts) of
64-
{ok, "200", _Headers, _Body} ->
65-
ok;
63+
{ok, "200", Headers, Body} ->
64+
{ok, Headers, Body};
6665
X ->
67-
throw({error_calling_solr, create_core, X})
66+
throw({error_calling_solr, core, Action, X})
6867
end.
6968

7069
delete(Core, DocID) ->
@@ -174,7 +173,8 @@ filter_to_str({Partition, FPFilter}) ->
174173
"(" ++ PNQ ++ " AND (" ++ FPQ2 ++ "))".
175174

176175

177-
convert_action(create) -> "CREATE".
176+
convert_action(create) -> "CREATE";
177+
convert_action(status) -> "STATUS".
178178

179179
%% TODO: Encoding functions copied from esolr, redo this.
180180
encode_commit() ->
@@ -214,6 +214,16 @@ get_pairs(R) ->
214214
to_pair({struct, [{_,DocId},{_,Base64VClock}]}) ->
215215
{DocId, Base64VClock}.
216216

217+
get_path({struct, PL}, Path) ->
218+
get_path(PL, Path);
219+
get_path(PL, [Name]) ->
220+
case proplists:get_value(Name, PL) of
221+
{struct, Obj} -> Obj;
222+
Val -> Val
223+
end;
224+
get_path(PL, [Name|Path]) ->
225+
get_path(proplists:get_value(Name, PL), Path).
226+
217227
get_response(R) ->
218228
json_get_key(<<"response">>, R).
219229

0 commit comments

Comments
 (0)