Skip to content

Commit 63a34bd

Browse files
authored
revise BalancerHandler and add MetricSensorHandler (#1666)
1 parent aefda10 commit 63a34bd

File tree

9 files changed

+335
-58
lines changed

9 files changed

+335
-58
lines changed

app/src/main/java/org/astraea/app/web/BalancerHandler.java

Lines changed: 2 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -28,15 +28,13 @@
2828
import java.util.concurrent.CompletableFuture;
2929
import java.util.concurrent.CompletionStage;
3030
import java.util.concurrent.ConcurrentHashMap;
31-
import java.util.concurrent.ConcurrentLinkedQueue;
3231
import java.util.function.Function;
3332
import java.util.function.Supplier;
3433
import java.util.stream.Collectors;
3534
import org.astraea.common.Configuration;
3635
import org.astraea.common.Utils;
3736
import org.astraea.common.admin.Admin;
3837
import org.astraea.common.admin.ClusterInfo;
39-
import org.astraea.common.admin.NodeInfo;
4038
import org.astraea.common.admin.Replica;
4139
import org.astraea.common.balancer.AlgorithmConfig;
4240
import org.astraea.common.balancer.Balancer;
@@ -48,8 +46,6 @@
4846
import org.astraea.common.cost.HasMoveCost;
4947
import org.astraea.common.cost.MigrationCost;
5048
import org.astraea.common.json.TypeRef;
51-
import org.astraea.common.metrics.MBeanClient;
52-
import org.astraea.common.metrics.collector.MetricSensor;
5349
import org.astraea.common.metrics.collector.MetricStore;
5450

5551
class BalancerHandler implements Handler, AutoCloseable {
@@ -61,35 +57,12 @@ class BalancerHandler implements Handler, AutoCloseable {
6157
new ConcurrentHashMap<>();
6258
private final Map<String, CompletionStage<Void>> planExecutions = new ConcurrentHashMap<>();
6359

64-
private final Collection<MetricSensor> sensors = new ConcurrentLinkedQueue<>();
65-
6660
private final MetricStore metricStore;
6761

68-
BalancerHandler(Admin admin, Function<Integer, Integer> jmxPortMapper) {
62+
BalancerHandler(Admin admin, MetricStore metricStore) {
6963
this.admin = admin;
7064
this.balancerConsole = BalancerConsole.create(admin);
71-
Supplier<CompletionStage<Map<Integer, MBeanClient>>> clientSupplier =
72-
() ->
73-
admin
74-
.brokers()
75-
.thenApply(
76-
brokers ->
77-
brokers.stream()
78-
.collect(
79-
Collectors.toUnmodifiableMap(
80-
NodeInfo::id,
81-
b -> MBeanClient.jndi(b.host(), jmxPortMapper.apply(b.id())))));
82-
this.metricStore =
83-
MetricStore.builder()
84-
.beanExpiration(Duration.ofSeconds(90))
85-
.localReceiver(clientSupplier)
86-
.sensorsSupplier(
87-
() ->
88-
sensors.stream()
89-
.collect(
90-
Collectors.toUnmodifiableMap(
91-
Function.identity(), ignored -> (id, ee) -> {})))
92-
.build();
65+
this.metricStore = metricStore;
9366
}
9467

9568
@Override
@@ -117,9 +90,6 @@ public CompletionStage<Response> post(Channel channel) {
11790
Utils.construct(request.balancerClasspath, Balancer.class, request.balancerConfig);
11891
synchronized (this) {
11992
var taskId = UUID.randomUUID().toString();
120-
request.algorithmConfig.clusterCostFunction().metricSensor().ifPresent(sensors::add);
121-
request.algorithmConfig.moveCostFunction().metricSensor().ifPresent(sensors::add);
122-
12393
var task =
12494
balancerConsole
12595
.launchRebalancePlanGeneration()
@@ -262,7 +232,6 @@ static PostRequestWrapper parsePostRequestWrapper(
262232
static class BalancerPostRequest implements Request {
263233

264234
String balancer = GreedyBalancer.class.getName();
265-
266235
Map<String, String> balancerConfig = Map.of();
267236
Map<String, String> costConfig = Map.of();
268237
Duration timeout = Duration.ofSeconds(3);
Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.astraea.app.web;
18+
19+
import java.util.Set;
20+
import java.util.concurrent.CompletableFuture;
21+
import java.util.concurrent.CompletionStage;
22+
import java.util.stream.Collectors;
23+
import org.astraea.app.web.WebService.Sensors;
24+
import org.astraea.common.Configuration;
25+
import org.astraea.common.Utils;
26+
import org.astraea.common.cost.CostFunction;
27+
import org.astraea.common.json.TypeRef;
28+
29+
public class SensorHandler implements Handler {
30+
31+
private final Sensors sensors;
32+
private static final Set<String> DEFAULT_COSTS =
33+
Set.of(
34+
"org.astraea.common.cost.ReplicaLeaderCost",
35+
"org.astraea.common.cost.NetworkIngressCost");
36+
37+
SensorHandler(Sensors sensors) {
38+
this.sensors = sensors;
39+
}
40+
41+
@Override
42+
public CompletionStage<Response> get(Channel channel) {
43+
var costs =
44+
sensors.metricSensors().isEmpty()
45+
? DEFAULT_COSTS
46+
: sensors.metricSensors().stream()
47+
.map(x -> x.getClass().getName())
48+
.collect(Collectors.toSet());
49+
return CompletableFuture.completedFuture(new Response(costs));
50+
}
51+
52+
@Override
53+
public CompletionStage<Response> post(Channel channel) {
54+
var metricSensorPostRequest = channel.request(TypeRef.of(MetricSensorPostRequest.class));
55+
var costs = costs(metricSensorPostRequest.costs);
56+
sensors.clearSensors();
57+
costs.forEach(costFunction -> costFunction.metricSensor().ifPresent(sensors::addSensors));
58+
return CompletableFuture.completedFuture(new Response(metricSensorPostRequest.costs));
59+
}
60+
61+
private static Set<CostFunction> costs(Set<String> costs) {
62+
if (costs.isEmpty()) throw new IllegalArgumentException("costs is not specified");
63+
return Utils.costFunctions(costs, CostFunction.class, Configuration.EMPTY);
64+
}
65+
66+
static class MetricSensorPostRequest implements Request {
67+
Set<String> costs = DEFAULT_COSTS;
68+
}
69+
70+
static class Response implements org.astraea.app.web.Response {
71+
final Set<String> costs;
72+
73+
Response(Set<String> costs) {
74+
this.costs = costs;
75+
}
76+
}
77+
}

app/src/main/java/org/astraea/app/web/WebService.java

Lines changed: 72 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,22 +21,61 @@
2121
import com.sun.net.httpserver.HttpServer;
2222
import java.net.InetSocketAddress;
2323
import java.time.Duration;
24+
import java.util.Collection;
2425
import java.util.Map;
26+
import java.util.concurrent.CompletionStage;
27+
import java.util.concurrent.ConcurrentLinkedQueue;
2528
import java.util.concurrent.TimeUnit;
2629
import java.util.function.Function;
30+
import java.util.function.Supplier;
31+
import java.util.stream.Collectors;
2732
import org.astraea.app.argument.DurationField;
2833
import org.astraea.app.argument.IntegerMapField;
2934
import org.astraea.app.argument.NonNegativeIntegerField;
3035
import org.astraea.common.Utils;
3136
import org.astraea.common.admin.Admin;
37+
import org.astraea.common.admin.NodeInfo;
38+
import org.astraea.common.metrics.MBeanClient;
39+
import org.astraea.common.metrics.collector.MetricSensor;
40+
import org.astraea.common.metrics.collector.MetricStore;
3241

3342
public class WebService implements AutoCloseable {
3443

3544
private final HttpServer server;
3645
private final Admin admin;
46+
private final Sensors sensors = new Sensors();
3747

38-
public WebService(Admin admin, int port, Function<Integer, Integer> brokerIdToJmxPort) {
48+
public WebService(
49+
Admin admin,
50+
int port,
51+
Function<Integer, Integer> brokerIdToJmxPort,
52+
Duration beanExpiration) {
3953
this.admin = admin;
54+
Supplier<CompletionStage<Map<Integer, MBeanClient>>> clientSupplier =
55+
() ->
56+
admin
57+
.brokers()
58+
.thenApply(
59+
brokers ->
60+
brokers.stream()
61+
.collect(
62+
Collectors.toUnmodifiableMap(
63+
NodeInfo::id,
64+
b ->
65+
MBeanClient.jndi(
66+
b.host(), brokerIdToJmxPort.apply(b.id())))));
67+
var metricStore =
68+
MetricStore.builder()
69+
.beanExpiration(beanExpiration)
70+
.localReceiver(clientSupplier)
71+
.sensorsSupplier(
72+
() ->
73+
sensors.metricSensors().stream()
74+
.distinct()
75+
.collect(
76+
Collectors.toUnmodifiableMap(
77+
Function.identity(), ignored -> (id, ee) -> {})))
78+
.build();
4079
server = Utils.packException(() -> HttpServer.create(new InetSocketAddress(port), 0));
4180
server.createContext("/topics", to(new TopicHandler(admin)));
4281
server.createContext("/groups", to(new GroupHandler(admin)));
@@ -45,9 +84,10 @@ public WebService(Admin admin, int port, Function<Integer, Integer> brokerIdToJm
4584
server.createContext("/quotas", to(new QuotaHandler(admin)));
4685
server.createContext("/transactions", to(new TransactionHandler(admin)));
4786
server.createContext("/beans", to(new BeanHandler(admin, brokerIdToJmxPort)));
87+
server.createContext("/sensors", to(new SensorHandler(sensors)));
4888
server.createContext("/records", to(new RecordHandler(admin)));
4989
server.createContext("/reassignments", to(new ReassignmentHandler(admin)));
50-
server.createContext("/balancer", to(new BalancerHandler(admin, brokerIdToJmxPort)));
90+
server.createContext("/balancer", to(new BalancerHandler(admin, metricStore)));
5191
server.createContext("/throttles", to(new ThrottleHandler(admin)));
5292
server.start();
5393
}
@@ -64,7 +104,9 @@ public void close() {
64104

65105
public static void main(String[] args) throws Exception {
66106
var arg = org.astraea.app.argument.Argument.parse(new Argument(), args);
67-
try (var service = new WebService(Admin.of(arg.configs()), arg.port, arg::jmxPortMapping)) {
107+
try (var service =
108+
new WebService(
109+
Admin.of(arg.configs()), arg.port, arg::jmxPortMapping, arg.beanExpiration)) {
68110
if (arg.ttl == null) {
69111
System.out.println("enter ctrl + c to terminate web service");
70112
TimeUnit.MILLISECONDS.sleep(Long.MAX_VALUE);
@@ -118,5 +160,32 @@ int jmxPortMapping(int brokerId) {
118160
validateWith = DurationField.class,
119161
converter = DurationField.class)
120162
Duration ttl = null;
163+
164+
@Parameter(
165+
names = {"--bean.expiration"},
166+
description = "Duration: the life of collected metrics",
167+
validateWith = DurationField.class,
168+
converter = DurationField.class)
169+
Duration beanExpiration = Duration.ofHours(1);
170+
}
171+
172+
static class Sensors {
173+
private final Collection<MetricSensor> sensors;
174+
175+
Sensors() {
176+
sensors = new ConcurrentLinkedQueue<>();
177+
}
178+
179+
Collection<MetricSensor> metricSensors() {
180+
return sensors;
181+
}
182+
183+
void clearSensors() {
184+
sensors.clear();
185+
}
186+
187+
void addSensors(MetricSensor metricSensor) {
188+
sensors.add(metricSensor);
189+
}
121190
}
122191
}

0 commit comments

Comments
 (0)