@@ -45,7 +45,7 @@ init() ->
45
45
ok = application :ensure_started (inets ),
46
46
% % we cannot start this plugin yet since it depends on the rabbit app,
47
47
% % which is in the process of being started by the time this function is called
48
- application :load (rabbitmq_peer_discovery_common ),
48
+ _ = application :load (rabbitmq_peer_discovery_common ),
49
49
rabbit_peer_discovery_httpc :maybe_configure_proxy (),
50
50
rabbit_peer_discovery_httpc :maybe_configure_inet6 ().
51
51
@@ -181,7 +181,7 @@ lock(Node) ->
181
181
-spec unlock ({SessionId :: string (), TRef :: timer :tref ()}) -> ok .
182
182
183
183
unlock ({SessionId , TRef }) ->
184
- timer :cancel (TRef ),
184
+ _ = timer :cancel (TRef ),
185
185
? LOG_DEBUG (
186
186
" Stopped session renewal" ,
187
187
#{domain => ? RMQLOG_DOMAIN_PEER_DIS }),
@@ -235,7 +235,7 @@ http_options(HttpOpts0, M) ->
235
235
HttpOpts1 = [TLSOpts | HttpOpts0 ],
236
236
HttpOpts1 .
237
237
238
- -spec filter_nodes (ConsulResult :: list () , AllowWarning :: atom ()) -> list () .
238
+ -spec filter_nodes (ConsulResult :: [#{ term () => term ()}] , AllowWarning :: boolean ()) -> [#{ term () => term ()}] .
239
239
filter_nodes (Nodes , Warn ) ->
240
240
case Warn of
241
241
true ->
@@ -251,10 +251,10 @@ filter_nodes(Nodes, Warn) ->
251
251
false -> Nodes
252
252
end .
253
253
254
- -spec extract_nodes (ConsulResult :: list () ) -> list ().
254
+ -spec extract_nodes (ConsulResult :: [#{ binary () => term ()}] ) -> list ().
255
255
extract_nodes (Data ) -> extract_nodes (Data , []).
256
256
257
- -spec extract_nodes (ConsulResult :: list () , Nodes :: list ())
257
+ -spec extract_nodes (ConsulResult :: [#{ binary () => term ()}] , Nodes :: list ())
258
258
-> list ().
259
259
extract_nodes ([], Nodes ) -> Nodes ;
260
260
extract_nodes ([H | T ], Nodes ) ->
@@ -570,8 +570,6 @@ maybe_re_register({error, Reason}) ->
570
570
#{domain => ? RMQLOG_DOMAIN_PEER_DIS });
571
571
maybe_re_register ({ok , {Members , _NodeType }}) ->
572
572
maybe_re_register (Members );
573
- maybe_re_register ({ok , Members }) ->
574
- maybe_re_register (Members );
575
573
maybe_re_register (Members ) ->
576
574
case lists :member (node (), Members ) of
577
575
true ->
@@ -589,13 +587,14 @@ maybe_re_register(Members) ->
589
587
wait_for_list_nodes () ->
590
588
wait_for_list_nodes (60 ).
591
589
590
+ -spec wait_for_list_nodes (non_neg_integer ()) -> {'ok' , term ()} | {'error' , term ()}.
591
+ wait_for_list_nodes (0 ) ->
592
+ list_nodes ();
592
593
wait_for_list_nodes (N ) ->
593
- case {list_nodes (), N } of
594
- {Reply , 0 } ->
595
- Reply ;
596
- {{ok , _ } = Reply , _ } ->
594
+ case list_nodes () of
595
+ {ok , _ } = Reply ->
597
596
Reply ;
598
- {{ error , _ }, _ } ->
597
+ _ ->
599
598
timer :sleep (1000 ),
600
599
wait_for_list_nodes (N - 1 )
601
600
end .
@@ -606,7 +605,7 @@ wait_for_list_nodes(N) ->
606
605
% % Create a session to be acquired for a common key
607
606
% % @end
608
607
% %--------------------------------------------------------------------
609
- -spec create_session (string (), pos_integer ()) -> {ok , string ()} | {error , Reason :: string ()}.
608
+ -spec create_session (atom (), pos_integer ()) -> {ok , string ()} | {error , Reason :: string ()}.
610
609
create_session (Name , TTL ) ->
611
610
case consul_session_create ([], maybe_add_acl ([]),
612
611
[{'Name' , Name },
@@ -623,10 +622,10 @@ create_session(Name, TTL) ->
623
622
% % Create session
624
623
% % @end
625
624
% %--------------------------------------------------------------------
626
- -spec consul_session_create (Query , Headers , Body ) -> {ok , string ()} | {error , any ()} when
625
+ -spec consul_session_create (Query , Headers , Body ) -> {ok , term ()} | {error , any ()} when
627
626
Query :: list (),
628
627
Headers :: [{string (), string ()}],
629
- Body :: term ().
628
+ Body :: thoas : input_term ().
630
629
consul_session_create (Query , Headers , Body ) ->
631
630
M = ? CONFIG_MODULE :config_map (? BACKEND_CONFIG_KEY ),
632
631
case serialize_json_body (Body ) of
@@ -652,7 +651,7 @@ consul_session_create(Query, Headers, Body) ->
652
651
% % the JSON serialization library.
653
652
% % @end
654
653
% %--------------------------------------------------------------------
655
- -spec serialize_json_body (term ()) -> {ok , Payload :: binary ()} | {error , atom ()}.
654
+ -spec serialize_json_body (thoas : input_term ()) -> {ok , Payload :: binary ()} | {error , atom ()}.
656
655
serialize_json_body ([]) -> {ok , []};
657
656
serialize_json_body (Payload ) ->
658
657
case rabbit_json :try_encode (Payload ) of
@@ -666,7 +665,7 @@ serialize_json_body(Payload) ->
666
665
% % Extract session ID from Consul response
667
666
% % @end
668
667
% %--------------------------------------------------------------------
669
- -spec get_session_id (term ()) -> string ().
668
+ -spec get_session_id (#{ binary () => term ()} ) -> string ().
670
669
get_session_id (#{<<" ID" >> := ID }) -> binary :bin_to_list (ID ).
671
670
672
671
% %--------------------------------------------------------------------
@@ -675,7 +674,7 @@ get_session_id(#{<<"ID">> := ID}) -> binary:bin_to_list(ID).
675
674
% % Start periodically renewing an existing session ttl
676
675
% % @end
677
676
% %--------------------------------------------------------------------
678
- -spec start_session_ttl_updater (string ()) -> ok .
677
+ -spec start_session_ttl_updater (string ()) -> timer : tref () .
679
678
start_session_ttl_updater (SessionId ) ->
680
679
M = ? CONFIG_MODULE :config_map (? BACKEND_CONFIG_KEY ),
681
680
Interval = get_config_key (consul_svc_ttl , M ),
@@ -693,7 +692,7 @@ start_session_ttl_updater(SessionId) ->
693
692
% % @end
694
693
-spec lock (timer :tref (), string (), pos_integer (), pos_integer ()) -> {ok , string ()} | {error , string ()}.
695
694
lock (TRef , _ , Now , EndTime ) when EndTime < Now ->
696
- timer :cancel (TRef ),
695
+ _ = timer :cancel (TRef ),
697
696
{error , " Acquiring lock taking too long, bailing out" };
698
697
lock (TRef , SessionId , _ , EndTime ) ->
699
698
case acquire_lock (SessionId ) of
@@ -707,15 +706,15 @@ lock(TRef, SessionId, _, EndTime) ->
707
706
ok ->
708
707
lock (TRef , SessionId , erlang :system_time (seconds ), EndTime );
709
708
{error , Reason } ->
710
- timer :cancel (TRef ),
709
+ _ = timer :cancel (TRef ),
711
710
{error , lists :flatten (io_lib :format (" Error waiting for lock release, reason: ~ts " ,[Reason ]))}
712
711
end ;
713
712
{error , Reason } ->
714
- timer :cancel (TRef ),
713
+ _ = timer :cancel (TRef ),
715
714
{error , lists :flatten (io_lib :format (" Error obtaining lock status, reason: ~ts " , [Reason ]))}
716
715
end ;
717
716
{error , Reason } ->
718
- timer :cancel (TRef ),
717
+ _ = timer :cancel (TRef ),
719
718
{error , lists :flatten (io_lib :format (" Error while acquiring lock, reason: ~ts " , [Reason ]))}
720
719
end .
721
720
@@ -747,7 +746,7 @@ release_lock(SessionId) ->
747
746
% %--------------------------------------------------------------------
748
747
-spec consul_kv_write (Path , Query , Headers , Body ) -> {ok , any ()} | {error , string ()} when
749
748
Path :: string (),
750
- Query :: [{string (), string ()}],
749
+ Query :: [{string () | atom () , string ()}],
751
750
Headers :: [{string (), string ()}],
752
751
Body :: term ().
753
752
consul_kv_write (Path , Query , Headers , Body ) ->
@@ -839,7 +838,7 @@ base_path() ->
839
838
wait_for_lock_release (false , _ , _ ) -> ok ;
840
839
wait_for_lock_release (_ , Index , Wait ) ->
841
840
case consul_kv_read (startup_lock_path (),
842
- [{index , Index }, {wait , service_ttl (Wait )}],
841
+ [{" index" , Index }, {" wait" , service_ttl (Wait )}],
843
842
maybe_add_acl ([])) of
844
843
{ok , _ } -> ok ;
845
844
{error , _ } = Err -> Err
0 commit comments