2222 doc_count ,
2323 w ,
2424 grouped_docs ,
25- reply
25+ reply ,
26+ update_options ,
27+ started = []
2628}).
2729
2830go (_ , [], _ ) ->
@@ -33,25 +35,25 @@ go(DbName, AllDocs0, Opts) ->
3335 validate_atomic_update (DbName , AllDocs , lists :member (all_or_nothing , Opts )),
3436 Options = lists :delete (all_or_nothing , Opts ),
3537 GroupedDocs = lists :map (
36- fun ({# shard {name = Name , node = Node } = Shard , Docs }) ->
37- Docs1 = untag_docs (Docs ),
38- Ref = rexi :cast (Node , {fabric_rpc , update_docs , [Name , Docs1 , Options ]}),
39- {Shard # shard {ref = Ref }, Docs }
38+ fun ({# shard {} = Shard , Docs }) ->
39+ {Shard # shard {ref = make_ref ()}, Docs }
4040 end ,
4141 group_docs_by_shard (DbName , AllDocs )
4242 ),
4343 {Workers , _ } = lists :unzip (GroupedDocs ),
4444 RexiMon = fabric_util :create_monitors (Workers ),
4545 W = couch_util :get_value (w , Options , integer_to_list (mem3 :quorum (DbName ))),
4646 Acc0 = # acc {
47+ update_options = Options ,
4748 waiting_count = length (Workers ),
4849 doc_count = length (AllDocs ),
4950 w = list_to_integer (W ),
5051 grouped_docs = GroupedDocs ,
5152 reply = dict :new ()
5253 },
5354 Timeout = fabric_util :request_timeout (),
54- try rexi_utils :recv (Workers , # shard .ref , fun handle_message /3 , Acc0 , infinity , Timeout ) of
55+ Acc1 = start_leaders (Acc0 ),
56+ try rexi_utils :recv (Workers , # shard .ref , fun handle_message /3 , Acc1 , infinity , Timeout ) of
5557 {ok , {Health , Results }} when
5658 Health =:= ok ; Health =:= accepted ; Health =:= error
5759 ->
@@ -101,6 +103,7 @@ handle_message({ok, Replies}, Worker, #acc{} = Acc0) ->
101103 reply = DocReplyDict0
102104 } = Acc0 ,
103105 {value , {_ , Docs }, NewGrpDocs } = lists :keytake (Worker , 1 , GroupedDocs ),
106+ Acc1 = start_followers (Worker , Acc0 ),
104107 DocReplyDict = append_update_replies (Docs , Replies , DocReplyDict0 ),
105108 case {WaitingCount , dict :size (DocReplyDict )} of
106109 {1 , _ } ->
@@ -115,7 +118,7 @@ handle_message({ok, Replies}, Worker, #acc{} = Acc0) ->
115118 % we've got at least one reply for each document, let's take a look
116119 case dict :fold (fun maybe_reply /3 , {stop , W , []}, DocReplyDict ) of
117120 continue ->
118- {ok , Acc0 # acc {
121+ {ok , Acc1 # acc {
119122 waiting_count = WaitingCount - 1 ,
120123 grouped_docs = NewGrpDocs ,
121124 reply = DocReplyDict
@@ -124,7 +127,7 @@ handle_message({ok, Replies}, Worker, #acc{} = Acc0) ->
124127 {stop , {ok , FinalReplies }}
125128 end ;
126129 _ ->
127- {ok , Acc0 # acc {
130+ {ok , Acc1 # acc {
128131 waiting_count = WaitingCount - 1 , grouped_docs = NewGrpDocs , reply = DocReplyDict
129132 }}
130133 end ;
@@ -318,6 +321,53 @@ group_docs_by_shard(DbName, Docs) ->
318321 )
319322 ).
320323
%% Decide whether Worker is the leader for its shard range.
%%
%% The leader is the smallest #shard{} record (standard Erlang term order,
%% as computed by lists:min/1) among all Workers covering the same range as
%% Worker. The original note describes this as using the 'lowest' node that
%% hosts the shard range.
%%
%% Worker is expected to be a member of Workers; if no worker shares its
%% range, lists:min/1 is called on an empty list and crashes.
is_leader(Worker, Workers) ->
    SameRange = lists:filter(
        fun(Candidate) ->
            Candidate#shard.range == Worker#shard.range
        end,
        Workers
    ),
    lists:min(SameRange) == Worker.
327+
%% Start the leader worker of every shard range found in #acc.grouped_docs.
%%
%% Each {Worker, Docs} pair whose worker satisfies is_leader/2 is handed to
%% start_worker/3. The refs of the started leaders are prepended to
%% #acc.started, and the updated accumulator is returned. Non-leader workers
%% are left untouched here.
start_leaders(#acc{grouped_docs = GroupedDocs, started = Started0} = Acc0) ->
    {Workers, _} = lists:unzip(GroupedDocs),
    Leaders = [
        Pair
     || {Candidate, _} = Pair <- GroupedDocs,
        is_leader(Candidate, Workers)
    ],
    lists:foreach(
        fun({Leader, Docs}) ->
            start_worker(Leader, Docs, Acc0)
        end,
        Leaders
    ),
    %% Most recently started leader first, i.e. reverse grouped_docs order.
    LeaderRefs = lists:reverse([Ref || {#shard{ref = Ref}, _} <- Leaders]),
    Acc0#acc{started = LeaderRefs ++ Started0}.
345+
%% Start every not-yet-started worker that shares Leader's shard range.
%%
%% A worker counts as already started when its ref is present in
%% #acc.started, so the leader itself is skipped once it has been recorded
%% there. Each remaining worker for the range is passed to start_worker/3
%% in grouped_docs order, and its ref is added to the front of #acc.started.
%% Returns the updated accumulator.
start_followers(#shard{} = Leader, #acc{} = Acc0) ->
    #acc{grouped_docs = GroupedDocs, started = Started0} = Acc0,
    Range = Leader#shard.range,
    Pending = [
        {Follower, Docs}
     || {Follower, Docs} <- GroupedDocs,
        Follower#shard.range == Range,
        not lists:member(Follower#shard.ref, Started0)
    ],
    RevRefs = lists:foldl(
        fun({Follower, Docs}, Refs) ->
            start_worker(Follower, Docs, Acc0),
            [Follower#shard.ref | Refs]
        end,
        [],
        Pending
    ),
    Acc0#acc{started = lists:reverse(RevRefs) ++ Started0}.
361+
%% Dispatch the update_docs request for a single worker.
%%
%% Workers without a ref (ref = undefined) are skipped; that shape only
%% occurs in this module's unit tests. Otherwise the worker's pre-assigned
%% ref is used to cast fabric_rpc:update_docs to the worker's node, with the
%% docs untagged and the update options taken from the accumulator.
%% Always returns ok.
start_worker(#shard{ref = undefined}, _Docs, #acc{}) ->
    % for unit tests below.
    ok;
start_worker(
    #shard{ref = Ref, name = Name, node = Node},
    Docs,
    #acc{update_options = UpdateOptions}
) when is_reference(Ref) ->
    Request = {fabric_rpc, update_docs, [Name, untag_docs(Docs), UpdateOptions]},
    rexi:cast_ref(Ref, Node, Request),
    ok.
370+
321371append_update_replies ([], [], DocReplyDict ) ->
322372 DocReplyDict ;
323373append_update_replies ([Doc | Rest ], [], Dict0 ) ->
0 commit comments