9
9
-behaviour (mirrored_supervisor ).
10
10
11
11
-export ([start_link /0 , init /1 , adjust /2 , stop_child /1 , cleanup_specs /0 ]).
12
+ -export ([id_to_khepri_path /1 ]).
12
13
13
14
-import (rabbit_misc , [pget /2 ]).
14
15
-import (rabbit_data_coercion , [to_map /1 , to_list /1 ]).
@@ -61,10 +62,9 @@ obfuscated_uris_parameters(Def) when is_list(Def) ->
61
62
62
63
child_exists (Name ) ->
63
64
Id = id (Name ),
64
- % % older format, pre 3.13.0 and 3.12.8. See rabbitmq/rabbitmq-server#9894.
65
- OldId = old_id (Name ),
65
+ TmpExpId = temp_experimental_id (Name ),
66
66
lists :any (fun ({ChildId , _ , _ , _ }) ->
67
- ChildId =:= Id orelse ChildId =:= OldId
67
+ ChildId =:= Id orelse ChildId =:= TmpExpId
68
68
end ,
69
69
mirrored_supervisor :which_children (? SUPERVISOR )).
70
70
@@ -74,20 +74,14 @@ stop_child({VHost, ShovelName} = Name) ->
74
74
case get ({shovel_worker_autodelete , Name }) of
75
75
true -> ok ; % % [1]
76
76
_ ->
77
- case stop_and_delete_child (id (Name )) of
77
+ Id = id (Name ),
78
+ case stop_and_delete_child (Id ) of
78
79
ok ->
79
80
ok ;
80
81
{error , not_found } ->
81
- case rabbit_khepri :is_enabled () of
82
- true ->
83
- % % Old id format is not supported by and cannot exist in Khepri
84
- ok ;
85
- false ->
86
- % % try older format, pre 3.13.0 and 3.12.8.
87
- % % See rabbitmq/rabbitmq-server#9894.
88
- _ = stop_and_delete_child (old_id (Name )),
89
- ok
90
- end
82
+ TmpExpId = temp_experimental_id (Name ),
83
+ _ = stop_and_delete_child (TmpExpId ),
84
+ ok
91
85
end ,
92
86
rabbit_shovel_status :remove (Name )
93
87
end ,
@@ -112,48 +106,54 @@ stop_and_delete_child(Id) ->
112
106
113
107
cleanup_specs () ->
114
108
Children = mirrored_supervisor :which_children (? SUPERVISOR ),
115
-
116
- ChildIdSet = sets :from_list ([element (1 , S ) || S <- Children ]),
117
- ParamsSet = params_to_child_ids (rabbit_khepri :is_enabled ()),
118
- F = fun (ChildId , ok ) ->
119
- try
120
- % % The supervisor operation is very unlikely to fail, it's the schema
121
- % % data stores that can make a fuss about a non-existent or non-standard value passed in.
122
- % % For example, an old style Shovel name is an invalid Khepri query path element. MK.
123
- _ = mirrored_supervisor :delete_child (? SUPERVISOR , ChildId )
124
- catch _ :_ :_Stacktrace ->
125
- ok
126
- end ,
127
- ok
128
- end ,
109
+ ParamsSet = sets :from_list (
110
+ [id ({proplists :get_value (vhost , S ),
111
+ proplists :get_value (name , S )})
112
+ || S <- rabbit_runtime_parameters :list_component (
113
+ <<" shovel" >>)]),
129
114
% % Delete any supervisor children that do not have their respective runtime parameters in the database.
130
- SetToCleanUp = sets :subtract (ChildIdSet , ParamsSet ),
131
- ok = sets :fold (F , ok , SetToCleanUp ).
132
-
133
- params_to_child_ids (_KhepriEnabled = true ) ->
134
- % % Old id format simply cannot exist in Khepri because having Khepri enabled
135
- % % means a cluster-wide move to 3.13+, so we can conditionally compute what specs we care about. MK.
136
- sets :from_list ([id ({proplists :get_value (vhost , S ), proplists :get_value (name , S )})
137
- || S <- rabbit_runtime_parameters :list_component (<<" shovel" >>)]);
138
- params_to_child_ids (_KhepriEnabled = false ) ->
139
- sets :from_list (
140
- lists :flatmap (
141
- fun (S ) ->
142
- Name = {proplists :get_value (vhost , S ), proplists :get_value (name , S )},
143
- % % Supervisor Id format was different pre 3.13.0 and 3.12.8.
144
- % % Try both formats to cover the transitionary mixed version cluster period.
145
- [id (Name ), old_id (Name )]
146
- end ,
147
- rabbit_runtime_parameters :list_component (<<" shovel" >>))).
115
+ lists :foreach (
116
+ fun
117
+ ({{VHost , ShovelName } = ChildId , _ , _ , _ })
118
+ when is_binary (VHost ) andalso is_binary (ShovelName ) ->
119
+ case sets :is_element (ChildId , ParamsSet ) of
120
+ false ->
121
+ _ = mirrored_supervisor :delete_child (
122
+ ? SUPERVISOR , ChildId );
123
+ true ->
124
+ ok
125
+ end ;
126
+ ({{List , {VHost , ShovelName } = Id } = ChildId , _ , _ , _ })
127
+ when is_list (List ) andalso
128
+ is_binary (VHost ) andalso is_binary (ShovelName ) ->
129
+ case sets :is_element (Id , ParamsSet ) of
130
+ false ->
131
+ _ = mirrored_supervisor :delete_child (
132
+ ? SUPERVISOR , ChildId );
133
+ true ->
134
+ ok
135
+ end
136
+ end , Children ).
148
137
149
138
% %----------------------------------------------------------------------------
150
139
151
140
init ([]) ->
152
141
{ok , {{one_for_one , 3 , 10 }, []}}.
153
142
154
- id ({V , S } = Name ) ->
155
- {[V , S ], Name }.
156
-
157
- % % older format, pre 3.13.0 and 3.12.8. See rabbitmq/rabbitmq-server#9894
158
- old_id ({_V , _S } = Name ) ->
143
+ id ({VHost , ShovelName } = Name )
144
+ when is_binary (VHost ) andalso is_binary (ShovelName ) ->
159
145
Name .
146
+
147
+ id_to_khepri_path ({VHost , ShovelName })
148
+ when is_binary (VHost ) andalso is_binary (ShovelName ) ->
149
+ [VHost , ShovelName ];
150
+ id_to_khepri_path ({List , {VHost , ShovelName }})
151
+ when is_list (List ) andalso is_binary (VHost ) andalso is_binary (ShovelName ) ->
152
+ [VHost , ShovelName ].
153
+
154
+ % % Temporary experimental format, erroneously backported to some 3.11.x and
155
+ % % 3.12.x releases in rabbitmq/rabbitmq-server#9796.
156
+ % %
157
+ % % See rabbitmq/rabbitmq-server#10306.
158
+ temp_experimental_id ({V , S } = Name ) ->
159
+ {[V , S ], Name }.
0 commit comments