Skip to content

Commit 451850f

Browse files
Merge pull request #13836 from rabbitmq/super-streams-ui
Management UI: new page and elements for superstreams (partitioned streams)
2 parents c458cba + ef09b19 commit 451850f

File tree

5 files changed

+282
-6
lines changed

5 files changed

+282
-6
lines changed

deps/rabbitmq_stream/src/rabbit_stream_manager.erl

+4-4
Original file line numberDiff line numberDiff line change
@@ -406,7 +406,7 @@ stream_queue_arguments(ArgumentsAcc, Arguments)
406406
stream_queue_arguments(ArgumentsAcc,
407407
#{<<"max-length-bytes">> := Value} = Arguments) ->
408408
stream_queue_arguments([{<<"x-max-length-bytes">>, long,
409-
binary_to_integer(Value)}]
409+
rabbit_data_coercion:to_integer(Value)}]
410410
++ ArgumentsAcc,
411411
maps:remove(<<"max-length-bytes">>, Arguments));
412412
stream_queue_arguments(ArgumentsAcc,
@@ -418,14 +418,14 @@ stream_queue_arguments(ArgumentsAcc,
418418
#{<<"stream-max-segment-size-bytes">> := Value} =
419419
Arguments) ->
420420
stream_queue_arguments([{<<"x-stream-max-segment-size-bytes">>, long,
421-
binary_to_integer(Value)}]
421+
rabbit_data_coercion:to_integer(Value)}]
422422
++ ArgumentsAcc,
423423
maps:remove(<<"stream-max-segment-size-bytes">>,
424424
Arguments));
425425
stream_queue_arguments(ArgumentsAcc,
426426
#{<<"initial-cluster-size">> := Value} = Arguments) ->
427427
stream_queue_arguments([{<<"x-initial-cluster-size">>, long,
428-
binary_to_integer(Value)}]
428+
rabbit_data_coercion:to_integer(Value)}]
429429
++ ArgumentsAcc,
430430
maps:remove(<<"initial-cluster-size">>, Arguments));
431431
stream_queue_arguments(ArgumentsAcc,
@@ -437,7 +437,7 @@ stream_queue_arguments(ArgumentsAcc,
437437
stream_queue_arguments(ArgumentsAcc,
438438
#{<<"stream-filter-size-bytes">> := Value} = Arguments) ->
439439
stream_queue_arguments([{<<"x-stream-filter-size-bytes">>, long,
440-
binary_to_integer(Value)}]
440+
rabbit_data_coercion:to_integer(Value)}]
441441
++ ArgumentsAcc,
442442
maps:remove(<<"stream-filter-size-bytes">>, Arguments));
443443
stream_queue_arguments(ArgumentsAcc, _Arguments) ->

deps/rabbitmq_stream_management/priv/www/js/stream.js

+10-1
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,15 @@ dispatcher_add(function(sammy) {
1010
'consumers': '/stream/connections/' + vhost + '/' + name + '/consumers',
1111
'publishers': '/stream/connections/' + vhost + '/' + name + '/publishers'},
1212
'streamConnection', '#/stream/connections');
13-
});
13+
});
14+
sammy.get('#/stream/super-streams', function() {
15+
render({'vhosts': '/vhosts'}, 'superStreams', '#/stream/super-streams')
16+
});
17+
sammy.put('#/stream/super-streams', function() {
18+
put_cast_params(this, '/stream/super-streams/:vhost/:name',
19+
['name', 'pattern', 'policy'], ['priority'], []);
20+
location.href = "/#/queues";
21+
});
1422
// not exactly dispatcher stuff, but we have to make sure this is called before
1523
// HTTP requests are made in case of refresh of the queue page
1624
QUEUE_EXTRA_CONTENT_REQUESTS.push(function(vhost, queue) {
@@ -33,6 +41,7 @@ dispatcher_add(function(sammy) {
3341
});
3442

3543
NAVIGATION['Stream Connections'] = ['#/stream/connections', "monitoring"];
44+
NAVIGATION['Super Streams'] = ['#/stream/super-streams', "management"];
3645

3746
var ALL_STREAM_CONNECTION_COLUMNS =
3847
{'Overview': [['user', 'User name', true],
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
<h2> Super Streams </h2>
2+
3+
<% if (ac.canAccessVhosts()) { %>
4+
<div class="section">
5+
<h2>Add a new super stream</h2>
6+
<div class="hider">
7+
<form action="#/stream/super-streams" method="put">
8+
<table class="form">
9+
<% if (display.vhosts) { %>
10+
<tr>
11+
<th><label>Virtual host:</label></th>
12+
<td>
13+
<select name="vhost">
14+
<% for (var i = 0; i < vhosts.length; i++) { %>
15+
<option value="<%= fmt_string(vhosts[i].name) %>" <%= (vhosts[i].name === current_vhost) ? 'selected="selected"' : '' %>><%= fmt_string(vhosts[i].name) %></option>
16+
<% } %>
17+
</select>
18+
</td>
19+
</tr>
20+
<% } else { %>
21+
<tr><td><input type="hidden" name="vhost" value="<%= fmt_string(vhosts[0].name) %>"/></td></tr>
22+
<% } %>
23+
<tr>
24+
<th><label>Name:</label></th>
25+
<td><input type="text" name="name"/><span class="mand">*</span></td>
26+
</tr>
27+
<tr>
28+
<th>
29+
<label>
30+
<select name="has-partitions" class="narrow controls-appearance">
31+
<option value="partitions" selected="selected">Partitions:</option>
32+
<option value="binding-keys">Binding keys:</option>
33+
</select>
34+
</label>
35+
</th>
36+
<td>
37+
<div id="partitions-div">
38+
<input type="partitions" name="partitions" />
39+
<span class="mand">*</span><br/>
40+
</div>
41+
<div id="binding-keys-div" style="display: none;">
42+
<input type="binding-keys" name="binding-keys" />
43+
<span class="mand">*</span><br/>
44+
</div>
45+
</td>
46+
</tr>
47+
<tr>
48+
<th><label>Arguments:</label></th>
49+
<td>
50+
<div class="multifield" id="arguments"></div>
51+
<table class="argument-links">
52+
<tr>
53+
<td>Add</td>
54+
<td>
55+
<span class="argument-link" field="arguments" key="max-length-bytes" type="number">Max length bytes</span> <span class="help" id="queue-max-length-bytes"></span>
56+
| <span class="argument-link" field="arguments" key="max-age" type="string">Max time retention</span><span class="help" id="queue-max-age"></span>
57+
| <span class="argument-link" field="arguments" key="stream-max-segment-size-bytes" type="number">Max segment size in bytes</span><span class="help" id="queue-stream-max-segment-size-bytes"></span></br>
58+
| <span class="argument-link" field="arguments" key="initial-cluster-size" type="number">Initial cluster size</span><span class="help" id="queue-initial-cluster-size"></span>
59+
| <span class="argument-link" field="arguments" key="queue-leader-locator" type="string">Leader locator</span><span class="help" id="queue-leader-locator"></span>
60+
</td>
61+
</tr>
62+
</table>
63+
</td>
64+
</tr>
65+
</table>
66+
<input type="submit" value="Add super stream"/>
67+
</form>
68+
</div>
69+
</div>
70+
<% } %>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,165 @@
1+
%% This Source Code Form is subject to the terms of the Mozilla Public
2+
%% License, v. 2.0. If a copy of the MPL was not distributed with this
3+
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
4+
%%
5+
%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
6+
%%
7+
8+
-module(rabbit_stream_super_stream_mgmt).
9+
10+
-behaviour(rabbit_mgmt_extension).
11+
12+
-export([dispatcher/0,
13+
web_ui/0]).
14+
-export([init/2,
15+
content_types_accepted/2,
16+
is_authorized/2,
17+
resource_exists/2,
18+
allowed_methods/2,
19+
accept_content/2]).
20+
-export([variances/2]).
21+
22+
-include_lib("rabbitmq_management_agent/include/rabbit_mgmt_records.hrl").
23+
-include_lib("rabbit_common/include/rabbit.hrl").
24+
25+
-define(DEFAULT_RPC_TIMEOUT, 30_000).
26+
27+
dispatcher() ->
28+
[{"/stream/super-streams/:vhost/:name", ?MODULE, []}].
29+
30+
web_ui() ->
31+
[].
32+
33+
%%--------------------------------------------------------------------
34+
35+
init(Req, _State) ->
36+
{cowboy_rest,
37+
rabbit_mgmt_headers:set_common_permission_headers(Req, ?MODULE),
38+
#context{}}.
39+
40+
variances(Req, Context) ->
41+
{[<<"accept-encoding">>, <<"origin">>], Req, Context}.
42+
43+
content_types_accepted(ReqData, Context) ->
44+
{[{{<<"application">>, <<"json">>, '*'}, accept_content}], ReqData, Context}.
45+
46+
allowed_methods(ReqData, Context) ->
47+
{[<<"PUT">>, <<"OPTIONS">>], ReqData, Context}.
48+
49+
resource_exists(ReqData, Context) ->
50+
%% just checking that the vhost requested exists
51+
{case rabbit_mgmt_util:all_or_one_vhost(ReqData, fun (_) -> [] end) of
52+
vhost_not_found -> false;
53+
_ -> true
54+
end, ReqData, Context}.
55+
56+
is_authorized(ReqData, Context) ->
57+
rabbit_mgmt_util:is_authorized_vhost(ReqData, Context).
58+
59+
accept_content(ReqData0, #context{user = #user{username = ActingUser}} = Context) ->
60+
%% TODO validate arguments?
61+
VHost = rabbit_mgmt_util:id(vhost, ReqData0),
62+
Name = rabbit_mgmt_util:id(name, ReqData0),
63+
rabbit_mgmt_util:with_decode(
64+
[], ReqData0, Context,
65+
fun([], BodyMap, ReqData) ->
66+
PartitionsBin = maps:get(partitions, BodyMap, undefined),
67+
BindingKeysStr = maps:get('binding-keys', BodyMap, undefined),
68+
case validate_partitions_or_binding_keys(PartitionsBin, BindingKeysStr, ReqData, Context) of
69+
ok ->
70+
Arguments = maps:get(arguments, BodyMap, #{}),
71+
Node = get_node(BodyMap),
72+
case PartitionsBin of
73+
undefined ->
74+
BindingKeys = binding_keys(BindingKeysStr),
75+
Streams = streams_from_binding_keys(Name, BindingKeys),
76+
create_super_stream(Node, VHost, Name, Streams,
77+
Arguments, BindingKeys, ActingUser,
78+
ReqData, Context);
79+
_ ->
80+
case validate_partitions(PartitionsBin, ReqData, Context) of
81+
Partitions when is_integer(Partitions) ->
82+
Streams = streams_from_partitions(Name, Partitions),
83+
RoutingKeys = routing_keys(Partitions),
84+
create_super_stream(Node, VHost, Name, Streams,
85+
Arguments, RoutingKeys, ActingUser,
86+
ReqData, Context);
87+
Error ->
88+
Error
89+
end
90+
end;
91+
Error ->
92+
Error
93+
end
94+
end).
95+
96+
%%-------------------------------------------------------------------
97+
get_node(Props) ->
98+
case maps:get(<<"node">>, Props, undefined) of
99+
undefined -> node();
100+
N -> rabbit_nodes:make(
101+
binary_to_list(N))
102+
end.
103+
104+
binding_keys(BindingKeysStr) ->
105+
[rabbit_data_coercion:to_binary(
106+
string:strip(K))
107+
|| K
108+
<- string:tokens(
109+
rabbit_data_coercion:to_list(BindingKeysStr), ",")].
110+
111+
routing_keys(Partitions) ->
112+
[integer_to_binary(K) || K <- lists:seq(0, Partitions - 1)].
113+
114+
streams_from_binding_keys(Name, BindingKeys) ->
115+
[list_to_binary(binary_to_list(Name)
116+
++ "-"
117+
++ binary_to_list(K))
118+
|| K <- BindingKeys].
119+
120+
streams_from_partitions(Name, Partitions) ->
121+
[list_to_binary(binary_to_list(Name)
122+
++ "-"
123+
++ integer_to_list(K))
124+
|| K <- lists:seq(0, Partitions - 1)].
125+
126+
create_super_stream(NodeName, VHost, SuperStream, Streams, Arguments,
127+
RoutingKeys, ActingUser, ReqData, Context) ->
128+
case rabbit_misc:rpc_call(NodeName,
129+
rabbit_stream_manager,
130+
create_super_stream,
131+
[VHost,
132+
SuperStream,
133+
Streams,
134+
Arguments,
135+
RoutingKeys,
136+
ActingUser],
137+
?DEFAULT_RPC_TIMEOUT) of
138+
ok ->
139+
{true, ReqData, Context};
140+
{error, Reason} ->
141+
rabbit_mgmt_util:bad_request(io_lib:format("~p", [Reason]),
142+
ReqData, Context)
143+
end.
144+
145+
validate_partitions_or_binding_keys(undefined, undefined, ReqData, Context) ->
146+
rabbit_mgmt_util:bad_request("Must specify partitions or binding keys", ReqData, Context);
147+
validate_partitions_or_binding_keys(_, undefined, _, _) ->
148+
ok;
149+
validate_partitions_or_binding_keys(undefined, _, _, _) ->
150+
ok;
151+
validate_partitions_or_binding_keys(_, _, ReqData, Context) ->
152+
rabbit_mgmt_util:bad_request("Specify partitions or binding keys, not both", ReqData, Context).
153+
154+
validate_partitions(PartitionsBin, ReqData, Context) ->
155+
try
156+
case rabbit_data_coercion:to_integer(PartitionsBin) of
157+
Int when Int < 1 ->
158+
rabbit_mgmt_util:bad_request("The partition number must be greater than 0", ReqData, Context);
159+
Int ->
160+
Int
161+
end
162+
catch
163+
_:_ ->
164+
rabbit_mgmt_util:bad_request("The partitions must be a number", ReqData, Context)
165+
end.

deps/rabbitmq_stream_management/test/http_SUITE.erl

+33-1
Original file line numberDiff line numberDiff line change
@@ -10,13 +10,20 @@
1010
-include_lib("rabbit_common/include/rabbit_framing.hrl").
1111
-include_lib("rabbitmq_ct_helpers/include/rabbit_mgmt_test.hrl").
1212

13+
-import(rabbit_mgmt_test_util, [
14+
http_put/4
15+
]).
16+
1317
-compile(export_all).
1418

1519
all() ->
1620
[{group, non_parallel_tests}].
1721

1822
groups() ->
19-
[{non_parallel_tests, [], [stream_management]}].
23+
[{non_parallel_tests, [], [
24+
stream_management,
25+
create_super_stream
26+
]}].
2027

2128
%% -------------------------------------------------------------------
2229
%% Testsuite setup/teardown.
@@ -27,6 +34,7 @@ init_per_suite(Config) ->
2734
true ->
2835
{skip, "suite is not mixed versions compatible"};
2936
_ ->
37+
inets:start(),
3038
rabbit_ct_helpers:log_environment(),
3139
Config1 =
3240
rabbit_ct_helpers:set_config(Config,
@@ -108,6 +116,30 @@ stream_management(Config) ->
108116
{"MANAGEMENT_PORT=~b", [ManagementPortNode]}]),
109117
{ok, _} = MakeResult.
110118

119+
create_super_stream(Config) ->
120+
http_put(Config, "/stream/super-streams/%2F/carrots", #{partitions => 3,
121+
'binding-keys' => "streamA"},
122+
?BAD_REQUEST),
123+
http_put(Config, "/stream/super-streams/%2F/carrots", #{partitions => "this is not a partition"},
124+
?BAD_REQUEST),
125+
http_put(Config, "/stream/super-streams/%2F/carrots", #{partitions => 3},
126+
{group, '2xx'}),
127+
http_put(Config, "/stream/super-streams/%2F/cucumber", #{'binding-keys' => "fresh-cucumber"},
128+
{group, '2xx'}),
129+
http_put(Config, "/stream/super-streams/%2F/aubergine",
130+
#{partitions => 3,
131+
arguments => #{'max-length-bytes' => 1000000,
132+
'max-age' => <<"1h">>,
133+
'stream-max-segment-size' => 500,
134+
'initial-cluster-size' => 2,
135+
'queue-leader-locator' => <<"client-local">>}},
136+
{group, '2xx'}),
137+
http_put(Config, "/stream/super-streams/%2F/watermelon",
138+
#{partitions => 3,
139+
arguments => #{'queue-leader-locator' => <<"remote">>}},
140+
?BAD_REQUEST),
141+
ok.
142+
111143
get_stream_port(Config) ->
112144
rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_stream).
113145

0 commit comments

Comments
 (0)