From 873b6ad6aa6727f66dbd084a069425ab5299ead2 Mon Sep 17 00:00:00 2001 From: qoo332001 Date: Tue, 18 Apr 2023 23:46:38 +0800 Subject: [PATCH 1/6] revise BalancerHandler and add MetricSensorHandler --- .../org/astraea/app/web/BalancerHandler.java | 35 +----------- ...nHandler.java => MetricSensorHandler.java} | 44 ++++++++++++++- .../java/org/astraea/app/web/WebService.java | 56 +++++++++++++++++-- ...Test.java => MetricSensorHandlerTest.java} | 13 +++-- 4 files changed, 104 insertions(+), 44 deletions(-) rename app/src/main/java/org/astraea/app/web/{BeanHandler.java => MetricSensorHandler.java} (69%) rename app/src/test/java/org/astraea/app/web/{BeanHandlerTest.java => MetricSensorHandlerTest.java} (83%) diff --git a/app/src/main/java/org/astraea/app/web/BalancerHandler.java b/app/src/main/java/org/astraea/app/web/BalancerHandler.java index 75c1ad93c2..372790a3c0 100644 --- a/app/src/main/java/org/astraea/app/web/BalancerHandler.java +++ b/app/src/main/java/org/astraea/app/web/BalancerHandler.java @@ -28,7 +28,6 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentLinkedQueue; import java.util.function.Function; import java.util.function.Supplier; import java.util.stream.Collectors; @@ -36,7 +35,6 @@ import org.astraea.common.Utils; import org.astraea.common.admin.Admin; import org.astraea.common.admin.ClusterInfo; -import org.astraea.common.admin.NodeInfo; import org.astraea.common.admin.Replica; import org.astraea.common.balancer.AlgorithmConfig; import org.astraea.common.balancer.Balancer; @@ -48,8 +46,6 @@ import org.astraea.common.cost.HasMoveCost; import org.astraea.common.cost.MigrationCost; import org.astraea.common.json.TypeRef; -import org.astraea.common.metrics.MBeanClient; -import org.astraea.common.metrics.collector.MetricSensor; import org.astraea.common.metrics.collector.MetricStore; class BalancerHandler implements Handler, AutoCloseable { @@ -61,35 +57,12 @@ class BalancerHandler implements Handler, AutoCloseable { new ConcurrentHashMap<>(); private final Map> planExecutions = new ConcurrentHashMap<>(); - private final Collection sensors = new ConcurrentLinkedQueue<>(); - private final MetricStore metricStore; - BalancerHandler(Admin admin, Function jmxPortMapper) { + BalancerHandler(Admin admin, MetricStore metricStore) { this.admin = admin; this.balancerConsole = BalancerConsole.create(admin); - Supplier>> clientSupplier = - () -> - admin - .brokers() - .thenApply( - brokers -> - brokers.stream() - .collect( - Collectors.toUnmodifiableMap( - NodeInfo::id, - b -> MBeanClient.jndi(b.host(), jmxPortMapper.apply(b.id()))))); - this.metricStore = - MetricStore.builder() - .beanExpiration(Duration.ofSeconds(90)) - .localReceiver(clientSupplier) - .sensorsSupplier( - () -> - sensors.stream() - .collect( - Collectors.toUnmodifiableMap( - Function.identity(), ignored -> (id, ee) -> {}))) - .build(); + this.metricStore = metricStore; } @Override @@ -117,9 +90,6 @@ public CompletionStage post(Channel channel) { Utils.construct(request.balancerClasspath, Balancer.class, request.balancerConfig); synchronized (this) { var taskId = UUID.randomUUID().toString(); - request.algorithmConfig.clusterCostFunction().metricSensor().ifPresent(sensors::add); - request.algorithmConfig.moveCostFunction().metricSensor().ifPresent(sensors::add); - var task = balancerConsole .launchRebalancePlanGeneration() @@ -262,7 +232,6 @@ static PostRequestWrapper parsePostRequestWrapper( static class BalancerPostRequest implements Request { String balancer = GreedyBalancer.class.getName(); - Map balancerConfig = Map.of(); Map costConfig = Map.of(); Duration timeout = Duration.ofSeconds(3); diff --git a/app/src/main/java/org/astraea/app/web/BeanHandler.java b/app/src/main/java/org/astraea/app/web/MetricSensorHandler.java similarity index 69% rename from app/src/main/java/org/astraea/app/web/BeanHandler.java rename to app/src/main/java/org/astraea/app/web/MetricSensorHandler.java index 26dd4edc9f..dd46ceeb0c 100644 --- a/app/src/main/java/org/astraea/app/web/BeanHandler.java +++ b/app/src/main/java/org/astraea/app/web/MetricSensorHandler.java @@ -16,22 +16,42 @@ */ package org.astraea.app.web; +import java.util.Collection; import java.util.List; +import java.util.Set; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.function.Function; import java.util.stream.Collectors; +import org.astraea.common.Configuration; +import org.astraea.common.Utils; import org.astraea.common.admin.Admin; +import org.astraea.common.cost.CostFunction; +import org.astraea.common.json.TypeRef; import org.astraea.common.metrics.BeanObject; import org.astraea.common.metrics.BeanQuery; import org.astraea.common.metrics.MBeanClient; +import org.astraea.common.metrics.collector.MetricSensor; -public class BeanHandler implements Handler { +public class MetricSensorHandler implements Handler { private final Admin admin; private final Function jmxPorts; + private final Collection sensors; - BeanHandler(Admin admin, Function jmxPorts) { + MetricSensorHandler( + Admin admin, Function jmxPorts, Collection sensors) { this.admin = admin; this.jmxPorts = jmxPorts; + this.sensors = sensors; + } + + @Override + public CompletionStage post(Channel channel) { + var metricSensorPostRequest = channel.request(TypeRef.of(MetricSensorPostRequest.class)); + var costs = costs(metricSensorPostRequest.costs); + if (!costs.isEmpty()) sensors.clear(); + costs.forEach(costFunction -> costFunction.metricSensor().ifPresent(sensors::add)); + return CompletableFuture.completedFuture(new PostResponse(metricSensorPostRequest.costs)); } @Override @@ -57,6 +77,11 @@ public CompletionStage get(Channel channel) { .collect(Collectors.toUnmodifiableList()))); } + private static Set costs(Set costs) { + if (costs.isEmpty()) throw new IllegalArgumentException("costs is not specified"); + return Utils.costFunctions(costs, CostFunction.class, Configuration.EMPTY); + } + static class Property implements Response { final String key; final String value; @@ -112,4 +137,19 @@ static class NodeBeans implements Response { this.nodeBeans = nodeBeans; } } + + static class MetricSensorPostRequest implements Request { + Set costs = + Set.of( + "org.astraea.common.cost.ReplicaLeaderCost", + "org.astraea.common.cost.NetworkIngressCost"); + } + + static class PostResponse implements Response { + final Set costs; + + PostResponse(Set costs) { + this.costs = costs; + } + } } diff --git a/app/src/main/java/org/astraea/app/web/WebService.java b/app/src/main/java/org/astraea/app/web/WebService.java index 3fa2b56330..05b000a2a4 100644 --- a/app/src/main/java/org/astraea/app/web/WebService.java +++ b/app/src/main/java/org/astraea/app/web/WebService.java @@ -21,22 +21,61 @@ import com.sun.net.httpserver.HttpServer; import java.net.InetSocketAddress; import java.time.Duration; +import java.util.Collection; import java.util.Map; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.TimeUnit; import java.util.function.Function; +import java.util.function.Supplier; +import java.util.stream.Collectors; import org.astraea.app.argument.DurationField; import org.astraea.app.argument.IntegerMapField; import org.astraea.app.argument.NonNegativeIntegerField; import org.astraea.common.Utils; import org.astraea.common.admin.Admin; +import org.astraea.common.admin.NodeInfo; +import org.astraea.common.metrics.MBeanClient; +import org.astraea.common.metrics.collector.MetricSensor; +import org.astraea.common.metrics.collector.MetricStore; public class WebService implements AutoCloseable { private final HttpServer server; private final Admin admin; + private final Collection sensors = new ConcurrentLinkedQueue<>(); - public WebService(Admin admin, int port, Function brokerIdToJmxPort) { + public WebService( + Admin admin, + int port, + Function brokerIdToJmxPort, + Duration beanExpiration) { this.admin = admin; + Supplier>> clientSupplier = + () -> + admin + .brokers() + .thenApply( + brokers -> + brokers.stream() + .collect( + Collectors.toUnmodifiableMap( + NodeInfo::id, + b -> + MBeanClient.jndi( + b.host(), brokerIdToJmxPort.apply(b.id()))))); + var metricStore = + MetricStore.builder() + .beanExpiration(beanExpiration) + .localReceiver(clientSupplier) + .sensorsSupplier( + () -> + sensors.stream() + .distinct() + .collect( + Collectors.toUnmodifiableMap( + Function.identity(), ignored -> (id, ee) -> {}))) + .build(); server = Utils.packException(() -> HttpServer.create(new InetSocketAddress(port), 0)); server.createContext("/topics", to(new TopicHandler(admin))); server.createContext("/groups", to(new GroupHandler(admin))); @@ -44,10 +83,10 @@ public WebService(Admin admin, int port, Function brokerIdToJm server.createContext("/producers", to(new ProducerHandler(admin))); server.createContext("/quotas", to(new QuotaHandler(admin))); server.createContext("/transactions", to(new TransactionHandler(admin))); - server.createContext("/beans", to(new BeanHandler(admin, brokerIdToJmxPort))); + server.createContext("/beans", to(new MetricSensorHandler(admin, brokerIdToJmxPort, sensors))); server.createContext("/records", to(new RecordHandler(admin))); server.createContext("/reassignments", to(new ReassignmentHandler(admin))); - server.createContext("/balancer", to(new BalancerHandler(admin, brokerIdToJmxPort))); + server.createContext("/balancer", to(new BalancerHandler(admin, metricStore))); server.createContext("/throttles", to(new ThrottleHandler(admin))); server.start(); } @@ -64,7 +103,9 @@ public void close() { public static void main(String[] args) throws Exception { var arg = org.astraea.app.argument.Argument.parse(new Argument(), args); - try (var service = new WebService(Admin.of(arg.configs()), arg.port, arg::jmxPortMapping)) { + try (var service = + new WebService( + Admin.of(arg.configs()), arg.port, arg::jmxPortMapping, arg.beanExpiration)) { if (arg.ttl == null) { System.out.println("enter ctrl + c to terminate web service"); TimeUnit.MILLISECONDS.sleep(Long.MAX_VALUE); @@ -118,5 +159,12 @@ int jmxPortMapping(int brokerId) { validateWith = DurationField.class, converter = DurationField.class) Duration ttl = null; + + @Parameter( + names = {"--bean.expiration"}, + description = "Duration: the life of collected metrics", + validateWith = DurationField.class, + converter = DurationField.class) + Duration beanExpiration = Duration.ofHours(1); } } diff --git a/app/src/test/java/org/astraea/app/web/BeanHandlerTest.java b/app/src/test/java/org/astraea/app/web/MetricSensorHandlerTest.java similarity index 83% rename from app/src/test/java/org/astraea/app/web/BeanHandlerTest.java rename to app/src/test/java/org/astraea/app/web/MetricSensorHandlerTest.java index ebf01eae77..7a1abcf116 100644 --- a/app/src/test/java/org/astraea/app/web/BeanHandlerTest.java +++ b/app/src/test/java/org/astraea/app/web/MetricSensorHandlerTest.java @@ -18,6 +18,7 @@ import java.time.Duration; import java.util.Map; +import java.util.Set; import org.astraea.common.Utils; import org.astraea.common.admin.Admin; import org.astraea.it.Service; @@ -25,7 +26,7 @@ import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; -public class BeanHandlerTest { +public class MetricSensorHandlerTest { private static final Service SERVICE = Service.builder().numberOfBrokers(3).build(); @@ -40,21 +41,23 @@ void testBeans() { try (var admin = Admin.of(SERVICE.bootstrapServers())) { admin.creator().topic(topic).numberOfPartitions(10).run().toCompletableFuture().join(); Utils.sleep(Duration.ofSeconds(2)); - var handler = new BeanHandler(admin, name -> SERVICE.jmxServiceURL().getPort()); + var handler = + new MetricSensorHandler(admin, name -> SERVICE.jmxServiceURL().getPort(), Set.of()); var response = Assertions.assertInstanceOf( - BeanHandler.NodeBeans.class, handler.get(Channel.EMPTY).toCompletableFuture().join()); + MetricSensorHandler.NodeBeans.class, + handler.get(Channel.EMPTY).toCompletableFuture().join()); Assertions.assertNotEquals(0, response.nodeBeans.size()); var response1 = Assertions.assertInstanceOf( - BeanHandler.NodeBeans.class, + MetricSensorHandler.NodeBeans.class, handler.get(Channel.ofTarget("kafka.server")).toCompletableFuture().join()); Assertions.assertNotEquals(0, response1.nodeBeans.size()); var response2 = Assertions.assertInstanceOf( - BeanHandler.NodeBeans.class, + MetricSensorHandler.NodeBeans.class, handler.get(Channel.ofQueries(Map.of("topic", topic))).toCompletableFuture().join()); Assertions.assertNotEquals(0, response2.nodeBeans.size()); } From 89fd783e974d182b510045b1a8844013d73f2a0d Mon Sep 17 00:00:00 2001 From: qoo332001 Date: Fri, 21 Apr 2023 23:25:40 +0800 Subject: [PATCH 2/6] add test and fix issues --- .../java/org/astraea/app/web/BeanHandler.java | 115 +++++++++++++++++ .../astraea/app/web/MetricSensorHandler.java | 119 +++--------------- .../java/org/astraea/app/web/WebService.java | 3 +- .../astraea/app/web/BalancerHandlerTest.java | 75 ++++++++--- .../org/astraea/app/web/BeanHandlerTest.java | 62 +++++++++ .../app/web/MetricSensorHandlerTest.java | 47 ++++--- .../org/astraea/app/web/TopicHandlerTest.java | 5 +- .../org/astraea/app/web/WebServiceTest.java | 3 +- 8 files changed, 283 insertions(+), 146 deletions(-) create mode 100644 app/src/main/java/org/astraea/app/web/BeanHandler.java create mode 100644 app/src/test/java/org/astraea/app/web/BeanHandlerTest.java diff --git a/app/src/main/java/org/astraea/app/web/BeanHandler.java b/app/src/main/java/org/astraea/app/web/BeanHandler.java new file mode 100644 index 0000000000..26dd4edc9f --- /dev/null +++ b/app/src/main/java/org/astraea/app/web/BeanHandler.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.astraea.app.web; + +import java.util.List; +import java.util.concurrent.CompletionStage; +import java.util.function.Function; +import java.util.stream.Collectors; +import org.astraea.common.admin.Admin; +import org.astraea.common.metrics.BeanObject; +import org.astraea.common.metrics.BeanQuery; +import org.astraea.common.metrics.MBeanClient; + +public class BeanHandler implements Handler { + private final Admin admin; + private final Function jmxPorts; + + BeanHandler(Admin admin, Function jmxPorts) { + this.admin = admin; + this.jmxPorts = jmxPorts; + } + + @Override + public CompletionStage get(Channel channel) { + var builder = BeanQuery.builder().usePropertyListPattern().properties(channel.queries()); + return admin + .brokers() + .thenApply( + brokers -> + new NodeBeans( + brokers.stream() + .map( + b -> { + try (var client = + MBeanClient.jndi(b.host(), jmxPorts.apply(b.id()))) { + return new NodeBean( + b.host(), + client.beans(builder.build()).stream() + .map(Bean::new) + .collect(Collectors.toUnmodifiableList())); + } + }) + .collect(Collectors.toUnmodifiableList()))); + } + + static class Property implements Response { + final String key; + final String value; + + Property(String key, String value) { + this.key = key; + this.value = value; + } + } + + static class Attribute implements Response { + final String key; + final String value; + + Attribute(String key, String value) { + this.key = key; + this.value = value; + } + } + + static class Bean implements Response { + final String domainName; + final List properties; + final List attributes; + + Bean(BeanObject obj) { + this.domainName = obj.domainName(); + this.properties = + obj.properties().entrySet().stream() + .map(e -> new Property(e.getKey(), e.getValue())) + .collect(Collectors.toUnmodifiableList()); + this.attributes = + obj.attributes().entrySet().stream() + .map(e -> new Attribute(e.getKey(), e.getValue().toString())) + .collect(Collectors.toUnmodifiableList()); + } + } + + static class NodeBean implements Response { + final String host; + final List beans; + + NodeBean(String host, List beans) { + this.host = host; + this.beans = beans; + } + } + + static class NodeBeans implements Response { + final List nodeBeans; + + NodeBeans(List nodeBeans) { + this.nodeBeans = nodeBeans; + } + } +} diff --git a/app/src/main/java/org/astraea/app/web/MetricSensorHandler.java b/app/src/main/java/org/astraea/app/web/MetricSensorHandler.java index dd46ceeb0c..9f5f7f3718 100644 --- a/app/src/main/java/org/astraea/app/web/MetricSensorHandler.java +++ b/app/src/main/java/org/astraea/app/web/MetricSensorHandler.java @@ -17,64 +17,44 @@ package org.astraea.app.web; import java.util.Collection; -import java.util.List; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; -import java.util.function.Function; import java.util.stream.Collectors; import org.astraea.common.Configuration; import org.astraea.common.Utils; -import org.astraea.common.admin.Admin; import org.astraea.common.cost.CostFunction; import org.astraea.common.json.TypeRef; -import org.astraea.common.metrics.BeanObject; -import org.astraea.common.metrics.BeanQuery; -import org.astraea.common.metrics.MBeanClient; import org.astraea.common.metrics.collector.MetricSensor; public class MetricSensorHandler implements Handler { - private final Admin admin; - private final Function jmxPorts; + private final Collection sensors; + private static final Set DEFAULT_COSTS = + Set.of( + "org.astraea.common.cost.ReplicaLeaderCost", + "org.astraea.common.cost.NetworkIngressCost"); - MetricSensorHandler( - Admin admin, Function jmxPorts, Collection sensors) { - this.admin = admin; - this.jmxPorts = jmxPorts; + MetricSensorHandler(Collection sensors) { this.sensors = sensors; } @Override - public CompletionStage post(Channel channel) { + public CompletionStage get(Channel channel) { + var costs = + sensors.isEmpty() + ? DEFAULT_COSTS + : sensors.stream().map(x -> x.getClass().getName()).collect(Collectors.toSet()); + return CompletableFuture.completedFuture(new Response(costs)); + } + + @Override + public CompletionStage post(Channel channel) { var metricSensorPostRequest = channel.request(TypeRef.of(MetricSensorPostRequest.class)); var costs = costs(metricSensorPostRequest.costs); if (!costs.isEmpty()) sensors.clear(); costs.forEach(costFunction -> costFunction.metricSensor().ifPresent(sensors::add)); - return CompletableFuture.completedFuture(new PostResponse(metricSensorPostRequest.costs)); - } - - @Override - public CompletionStage get(Channel channel) { - var builder = BeanQuery.builder().usePropertyListPattern().properties(channel.queries()); - return admin - .brokers() - .thenApply( - brokers -> - new NodeBeans( - brokers.stream() - .map( - b -> { - try (var client = - MBeanClient.jndi(b.host(), jmxPorts.apply(b.id()))) { - return new NodeBean( - b.host(), - client.beans(builder.build()).stream() - .map(Bean::new) - .collect(Collectors.toUnmodifiableList())); - } - }) - .collect(Collectors.toUnmodifiableList()))); + return CompletableFuture.completedFuture(new Response(metricSensorPostRequest.costs)); } private static Set costs(Set costs) { @@ -82,73 +62,14 @@ private static Set costs(Set costs) { return Utils.costFunctions(costs, CostFunction.class, Configuration.EMPTY); } - static class Property implements Response { - final String key; - final String value; - - Property(String key, String value) { - this.key = key; - this.value = value; - } - } - - static class Attribute implements Response { - final String key; - final String value; - - Attribute(String key, String value) { - this.key = key; - this.value = value; - } - } - - static class Bean implements Response { - final String domainName; - final List properties; - final List attributes; - - Bean(BeanObject obj) { - this.domainName = obj.domainName(); - this.properties = - obj.properties().entrySet().stream() - .map(e -> new Property(e.getKey(), e.getValue())) - .collect(Collectors.toUnmodifiableList()); - this.attributes = - obj.attributes().entrySet().stream() - .map(e -> new Attribute(e.getKey(), e.getValue().toString())) - .collect(Collectors.toUnmodifiableList()); - } - } - - static class NodeBean implements Response { - final String host; - final List beans; - - NodeBean(String host, List beans) { - this.host = host; - this.beans = beans; - } - } - - static class NodeBeans implements Response { - final List nodeBeans; - - NodeBeans(List nodeBeans) { - this.nodeBeans = nodeBeans; - } - } - static class MetricSensorPostRequest implements Request { - Set costs = - Set.of( - "org.astraea.common.cost.ReplicaLeaderCost", - "org.astraea.common.cost.NetworkIngressCost"); + Set costs = DEFAULT_COSTS; } - static class PostResponse implements Response { + static class Response implements org.astraea.app.web.Response { final Set costs; - PostResponse(Set costs) { + Response(Set costs) { this.costs = costs; } } diff --git a/app/src/main/java/org/astraea/app/web/WebService.java b/app/src/main/java/org/astraea/app/web/WebService.java index 05b000a2a4..18c43ad7c6 100644 --- a/app/src/main/java/org/astraea/app/web/WebService.java +++ b/app/src/main/java/org/astraea/app/web/WebService.java @@ -83,7 +83,8 @@ public WebService( server.createContext("/producers", to(new ProducerHandler(admin))); server.createContext("/quotas", to(new QuotaHandler(admin))); server.createContext("/transactions", to(new TransactionHandler(admin))); - server.createContext("/beans", to(new MetricSensorHandler(admin, brokerIdToJmxPort, sensors))); + server.createContext("/beans", to(new BeanHandler(admin, brokerIdToJmxPort))); + server.createContext("/metricSensors", to(new MetricSensorHandler(sensors))); server.createContext("/records", to(new RecordHandler(admin))); server.createContext("/reassignments", to(new ReassignmentHandler(admin))); server.createContext("/balancer", to(new BalancerHandler(admin, metricStore))); diff --git a/app/src/test/java/org/astraea/app/web/BalancerHandlerTest.java b/app/src/test/java/org/astraea/app/web/BalancerHandlerTest.java index 0aed00deaa..bb5e2d50b0 100644 --- a/app/src/test/java/org/astraea/app/web/BalancerHandlerTest.java +++ b/app/src/test/java/org/astraea/app/web/BalancerHandlerTest.java @@ -47,6 +47,7 @@ import java.util.concurrent.atomic.LongAdder; import java.util.function.Consumer; import java.util.function.Function; +import java.util.function.Supplier; import java.util.regex.Pattern; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -76,7 +77,9 @@ import org.astraea.common.cost.ReplicaLeaderCost; import org.astraea.common.json.JsonConverter; import org.astraea.common.json.TypeRef; +import org.astraea.common.metrics.MBeanClient; import org.astraea.common.metrics.collector.MetricSensor; +import org.astraea.common.metrics.collector.MetricStore; import org.astraea.common.metrics.platform.HostMetrics; import org.astraea.common.metrics.platform.JvmMemory; import org.astraea.common.producer.Producer; @@ -92,7 +95,8 @@ public class BalancerHandlerTest { - private static final Service SERVICE = Service.builder().numberOfBrokers(3).build(); + private static final int numberOfBrokers = 3; + private static final Service SERVICE = Service.builder().numberOfBrokers(numberOfBrokers).build(); @AfterAll static void closeService() { @@ -115,8 +119,8 @@ static void closeService() { @Timeout(value = 60) void testReport() { var topics = createAndProduceTopic(3); - try (var admin = Admin.of(SERVICE.bootstrapServers()); - var handler = new BalancerHandler(admin, id -> SERVICE.jmxServiceURL().getPort())) { + try (var admin = Admin.of(SERVICE.bootstrapServers())) { + var handler = new BalancerHandler(admin, metricStore(admin, List.of())); // make sure all replicas have admin .clusterInfo(Set.copyOf(topics)) @@ -309,7 +313,7 @@ void testBestPlan() { void testMoveCost(String leaderLimit, String sizeLimit) { createAndProduceTopic(3); try (var admin = Admin.of(SERVICE.bootstrapServers()); - var handler = new BalancerHandler(admin, id -> SERVICE.jmxServiceURL().getPort())) { + var handler = new BalancerHandler(admin, metricStore(admin, List.of()))) { var request = new BalancerHandler.BalancerPostRequest(); request.moveCosts = Set.of( @@ -356,7 +360,7 @@ void testMoveCost(String leaderLimit, String sizeLimit) { void testNoReport() { var topic = Utils.randomString(10); try (var admin = Admin.of(SERVICE.bootstrapServers()); - var handler = new BalancerHandler(admin, id -> SERVICE.jmxServiceURL().getPort())) { + var handler = new BalancerHandler(admin, metricStore(admin, List.of()))) { admin.creator().topic(topic).numberOfPartitions(1).run().toCompletableFuture().join(); Utils.sleep(Duration.ofSeconds(1)); var post = @@ -404,7 +408,7 @@ void testPut() { // arrange createAndProduceTopic(3, 10, (short) 2, false); try (var admin = Admin.of(SERVICE.bootstrapServers()); - var handler = new BalancerHandler(admin, id -> SERVICE.jmxServiceURL().getPort())) { + var handler = new BalancerHandler(admin, metricStore(admin, List.of()))) { var request = new BalancerHandler.BalancerPostRequest(); request.balancerConfig = Map.of("iteration", "100"); var progress = submitPlanGeneration(handler, request); @@ -433,7 +437,7 @@ void testPut() { void testBadPut() { createAndProduceTopic(3); try (var admin = Admin.of(SERVICE.bootstrapServers()); - var handler = new BalancerHandler(admin, id -> SERVICE.jmxServiceURL().getPort())) { + var handler = new BalancerHandler(admin, metricStore(admin, List.of()))) { // no id offered Assertions.assertThrows( @@ -454,7 +458,7 @@ void testBadPut() { void testSubmitRebalancePlanThreadSafe() { var topic = Utils.randomString(); try (var admin = Admin.of(SERVICE.bootstrapServers()); - var handler = new BalancerHandler(admin, id -> SERVICE.jmxServiceURL().getPort())) { + var handler = new BalancerHandler(admin, metricStore(admin, List.of()))) { admin.creator().topic(topic).numberOfPartitions(30).run().toCompletableFuture().join(); Utils.sleep(Duration.ofSeconds(3)); admin @@ -502,7 +506,7 @@ void testSubmitRebalancePlanThreadSafe() { @Timeout(value = 60) void testRebalanceDetectOngoing() { try (var admin = Admin.of(SERVICE.bootstrapServers()); - var handler = new BalancerHandler(admin, id -> SERVICE.jmxServiceURL().getPort())) { + var handler = new BalancerHandler(admin, metricStore(admin, List.of()))) { // create topic var theTopic = Utils.randomString(); admin.creator().topic(theTopic).numberOfPartitions(1).run().toCompletableFuture().join(); @@ -576,7 +580,7 @@ void testGenerationDetectOngoing() { .thenAnswer(invoke -> CompletableFuture.completedFuture(Set.of("A", "B", "C"))); Mockito.when(admin.clusterInfo(Mockito.any())) .thenAnswer(invoke -> CompletableFuture.completedFuture(clusterHasFuture)); - try (var handler = new BalancerHandler(admin, id -> SERVICE.jmxServiceURL().getPort())) { + try (var handler = new BalancerHandler(admin, metricStore(admin, List.of()))) { var task0 = (BalancerHandler.PostPlanResponse) handler.post(defaultPostPlan).toCompletableFuture().join(); @@ -621,7 +625,7 @@ void testGenerationDetectOngoing() { void testPutSanityCheck() { var topic = createAndProduceTopic(1).iterator().next(); try (var admin = Admin.of(SERVICE.bootstrapServers()); - var handler = new BalancerHandler(admin, id -> SERVICE.jmxServiceURL().getPort())) { + var handler = new BalancerHandler(admin, metricStore(admin, List.of()))) { var request = new BalancerHandler.BalancerPostRequest(); request.balancerConfig = Map.of(BalancerConfigs.BALANCER_ALLOWED_TOPICS_REGEX, Pattern.quote(topic)); @@ -661,7 +665,7 @@ void testPutSanityCheck() { void testLookupRebalanceProgress() { createAndProduceTopic(3); try (var admin = Admin.of(SERVICE.bootstrapServers()); - var handler = new BalancerHandler(admin, id -> SERVICE.jmxServiceURL().getPort())) { + var handler = new BalancerHandler(admin, metricStore(admin, List.of()))) { var progress = submitPlanGeneration(handler, new BalancerPostRequest()); Assertions.assertEquals(Searched, progress.phase); @@ -715,7 +719,7 @@ void testLookupRebalanceProgress() { void testLookupBadExecutionProgress() { createAndProduceTopic(3); try (var admin = Admin.of(SERVICE.bootstrapServers()); - var handler = new BalancerHandler(admin, id -> SERVICE.jmxServiceURL().getPort())) { + var handler = new BalancerHandler(admin, metricStore(admin, List.of()))) { var post = Assertions.assertInstanceOf( BalancerHandler.PostPlanResponse.class, @@ -769,7 +773,7 @@ void testLookupBadExecutionProgress() { void testBadLookupRequest() { createAndProduceTopic(3); try (var admin = Admin.of(SERVICE.bootstrapServers()); - var handler = new BalancerHandler(admin, id -> SERVICE.jmxServiceURL().getPort())) { + var handler = new BalancerHandler(admin, metricStore(admin, List.of()))) { Assertions.assertEquals( 404, handler.get(Channel.ofTarget("no such plan")).toCompletableFuture().join().code()); @@ -786,7 +790,7 @@ void testBadLookupRequest() { void testPutIdempotent() { var topics = createAndProduceTopic(3); try (var admin = Admin.of(SERVICE.bootstrapServers()); - var handler = new BalancerHandler(admin, id -> SERVICE.jmxServiceURL().getPort())) { + var handler = new BalancerHandler(admin, metricStore(admin, List.of()))) { var request = new BalancerHandler.BalancerPostRequest(); request.balancerConfig = Map.of( @@ -893,7 +897,7 @@ void testTimeout() { createAndProduceTopic(5); var costFunction = Collections.singleton(costWeight(TimeoutCost.class.getName(), 1)); try (var admin = Admin.of(SERVICE.bootstrapServers()); - var handler = new BalancerHandler(admin, (ignore) -> SERVICE.jmxServiceURL().getPort())) { + var handler = new BalancerHandler(admin, metricStore(admin, List.of()))) { var channel = httpRequest(Map.of(TIMEOUT_KEY, "10", CLUSTER_COSTS_KEY, costFunction)); var post = (BalancerHandler.PostPlanResponse) handler.post(channel).toCompletableFuture().join(); @@ -911,8 +915,9 @@ void testTimeout() { @Test void testCostWithSensor() { var topics = createAndProduceTopic(3); + var function = List.of(costWeight(SensorAndCost.class.getName(), 1)); try (var admin = Admin.of(SERVICE.bootstrapServers()); - var handler = new BalancerHandler(admin, (ignore) -> SERVICE.jmxServiceURL().getPort())) { + var handler = new BalancerHandler(admin, metricStore(admin, function))) { var invoked = new AtomicBoolean(); SensorAndCost.callback.set( (clusterBean) -> { @@ -927,7 +932,6 @@ void testCostWithSensor() { metrics.forEach(i -> Assertions.assertInstanceOf(JvmMemory.class, i)); invoked.set(true); }); - var function = List.of(costWeight(SensorAndCost.class.getName(), 1)); var request = new BalancerHandler.BalancerPostRequest(); request.timeout = Duration.ofSeconds(15); @@ -1073,7 +1077,7 @@ void testChangeOrder() { void testExecutorConfig() { var topic = createAndProduceTopic(1).iterator().next(); try (var admin = Admin.of(SERVICE.bootstrapServers()); - var handler = new BalancerHandler(admin, id -> SERVICE.jmxServiceURL().getPort())) { + var handler = new BalancerHandler(admin, metricStore(admin, List.of()))) { var request = new BalancerHandler.BalancerPostRequest(); request.balancerConfig = Map.of(BalancerConfigs.BALANCER_ALLOWED_TOPICS_REGEX, topic); var theProgress = submitPlanGeneration(handler, request); @@ -1107,7 +1111,7 @@ void testExecutorConfig() { void testBalancerConfig() { createAndProduceTopic(1); try (var admin = Admin.of(SERVICE.bootstrapServers()); - var handler = new BalancerHandler(admin, id -> SERVICE.jmxServiceURL().getPort())) { + var handler = new BalancerHandler(admin, metricStore(admin, List.of()))) { var request = new BalancerPostRequest(); request.balancer = SpyBalancer.class.getName(); request.balancerConfig = @@ -1350,4 +1354,35 @@ void testJsonToBalancerPostRequest() { Assertions.assertThrows(IllegalArgumentException.class, noCostRequest::clusterCost); } + + private MetricStore metricStore(Admin admin, List costWeights) { + Function brokerIdToJmxPort = (id) -> SERVICE.jmxServiceURL().getPort(); + Supplier>> clientSupplier = + () -> + admin + .brokers() + .thenApply( + brokers -> + brokers.stream() + .collect( + Collectors.toUnmodifiableMap( + NodeInfo::id, + b -> + MBeanClient.jndi( + b.host(), brokerIdToJmxPort.apply(b.id()))))); + var cw = costWeights.stream().map(x -> x.cost).collect(Collectors.toSet()); + var cf = Utils.costFunctions(cw, HasClusterCost.class, Configuration.EMPTY); + var metricSensors = cf.stream().map(c -> c.metricSensor().get()).collect(Collectors.toList()); + return MetricStore.builder() + .beanExpiration(Duration.ofMinutes(2)) + .localReceiver(clientSupplier) + .sensorsSupplier( + () -> + metricSensors.stream() + .distinct() + .collect( + Collectors.toUnmodifiableMap( + Function.identity(), ignored -> (id, ee) -> {}))) + .build(); + } } diff --git a/app/src/test/java/org/astraea/app/web/BeanHandlerTest.java b/app/src/test/java/org/astraea/app/web/BeanHandlerTest.java new file mode 100644 index 0000000000..ebf01eae77 --- /dev/null +++ b/app/src/test/java/org/astraea/app/web/BeanHandlerTest.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.astraea.app.web; + +import java.time.Duration; +import java.util.Map; +import org.astraea.common.Utils; +import org.astraea.common.admin.Admin; +import org.astraea.it.Service; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +public class BeanHandlerTest { + + private static final Service SERVICE = Service.builder().numberOfBrokers(3).build(); + + @AfterAll + static void closeService() { + SERVICE.close(); + } + + @Test + void testBeans() { + var topic = Utils.randomString(10); + try (var admin = Admin.of(SERVICE.bootstrapServers())) { + admin.creator().topic(topic).numberOfPartitions(10).run().toCompletableFuture().join(); + Utils.sleep(Duration.ofSeconds(2)); + var handler = new BeanHandler(admin, name -> SERVICE.jmxServiceURL().getPort()); + var response = + Assertions.assertInstanceOf( + BeanHandler.NodeBeans.class, handler.get(Channel.EMPTY).toCompletableFuture().join()); + Assertions.assertNotEquals(0, response.nodeBeans.size()); + + var response1 = + Assertions.assertInstanceOf( + BeanHandler.NodeBeans.class, + handler.get(Channel.ofTarget("kafka.server")).toCompletableFuture().join()); + Assertions.assertNotEquals(0, response1.nodeBeans.size()); + + var response2 = + Assertions.assertInstanceOf( + BeanHandler.NodeBeans.class, + handler.get(Channel.ofQueries(Map.of("topic", topic))).toCompletableFuture().join()); + Assertions.assertNotEquals(0, response2.nodeBeans.size()); + } + } +} diff --git a/app/src/test/java/org/astraea/app/web/MetricSensorHandlerTest.java b/app/src/test/java/org/astraea/app/web/MetricSensorHandlerTest.java index 7a1abcf116..83e2c18fce 100644 --- a/app/src/test/java/org/astraea/app/web/MetricSensorHandlerTest.java +++ b/app/src/test/java/org/astraea/app/web/MetricSensorHandlerTest.java @@ -17,49 +17,48 @@ package org.astraea.app.web; import java.time.Duration; -import java.util.Map; -import java.util.Set; +import java.util.HashSet; import org.astraea.common.Utils; import org.astraea.common.admin.Admin; +import org.astraea.common.metrics.collector.MetricSensor; import org.astraea.it.Service; -import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; -public class MetricSensorHandlerTest { - +class MetricSensorHandlerTest { private static final Service SERVICE = Service.builder().numberOfBrokers(3).build(); - @AfterAll - static void closeService() { - SERVICE.close(); - } - @Test void testBeans() { var topic = Utils.randomString(10); try (var admin = Admin.of(SERVICE.bootstrapServers())) { admin.creator().topic(topic).numberOfPartitions(10).run().toCompletableFuture().join(); Utils.sleep(Duration.ofSeconds(2)); - var handler = - new MetricSensorHandler(admin, name -> SERVICE.jmxServiceURL().getPort(), Set.of()); - var response = + var sensors = new HashSet(); + var defaultCostHandler = new MetricSensorHandler(sensors); + var defaultCostResponse = Assertions.assertInstanceOf( - MetricSensorHandler.NodeBeans.class, - handler.get(Channel.EMPTY).toCompletableFuture().join()); - Assertions.assertNotEquals(0, response.nodeBeans.size()); + MetricSensorHandler.Response.class, + defaultCostHandler.get(Channel.EMPTY).toCompletableFuture().join()); + Assertions.assertEquals(2, defaultCostResponse.costs.size()); - var response1 = + var changedCostResponse = Assertions.assertInstanceOf( - MetricSensorHandler.NodeBeans.class, - handler.get(Channel.ofTarget("kafka.server")).toCompletableFuture().join()); - Assertions.assertNotEquals(0, response1.nodeBeans.size()); + MetricSensorHandler.Response.class, + defaultCostHandler + .post( + Channel.builder() + .request("{\"costs\": [\"org.astraea.common.cost.ReplicaLeaderCost\"]}") + .build()) + .toCompletableFuture() + .join()); + Assertions.assertEquals(1, changedCostResponse.costs.size()); - var response2 = + var changedCostGetResponse = Assertions.assertInstanceOf( - MetricSensorHandler.NodeBeans.class, - handler.get(Channel.ofQueries(Map.of("topic", topic))).toCompletableFuture().join()); - Assertions.assertNotEquals(0, response2.nodeBeans.size()); + MetricSensorHandler.Response.class, + defaultCostHandler.get(Channel.EMPTY).toCompletableFuture().join()); + Assertions.assertEquals(1, changedCostGetResponse.costs.size()); } } } diff --git a/app/src/test/java/org/astraea/app/web/TopicHandlerTest.java b/app/src/test/java/org/astraea/app/web/TopicHandlerTest.java index d20ec12320..6aa023deff 100644 --- a/app/src/test/java/org/astraea/app/web/TopicHandlerTest.java +++ b/app/src/test/java/org/astraea/app/web/TopicHandlerTest.java @@ -66,7 +66,10 @@ void testWithWebService() { try (var service = new WebService( - Admin.of(SERVICE.bootstrapServers()), 0, id -> SERVICE.jmxServiceURL().getPort())) { + Admin.of(SERVICE.bootstrapServers()), + 0, + id -> SERVICE.jmxServiceURL().getPort(), + Duration.ofMillis(5))) { Response response = HttpExecutor.builder() .build() diff --git a/app/src/test/java/org/astraea/app/web/WebServiceTest.java b/app/src/test/java/org/astraea/app/web/WebServiceTest.java index ac284c1713..d40700a0b8 100644 --- a/app/src/test/java/org/astraea/app/web/WebServiceTest.java +++ b/app/src/test/java/org/astraea/app/web/WebServiceTest.java @@ -16,6 +16,7 @@ */ package org.astraea.app.web; +import java.time.Duration; import java.util.concurrent.ThreadLocalRandom; import org.astraea.app.argument.Argument; import org.astraea.common.admin.Admin; @@ -39,7 +40,7 @@ void testArgument() { @Timeout(10) @Test void testClose() { - var web = new WebService(Mockito.mock(Admin.class), 0, id -> -1); + var web = new WebService(Mockito.mock(Admin.class), 0, id -> -1, Duration.ofMillis(5)); web.close(); } From 8ac027fa7af9e3fd3a4f5c09a3b80e3795672f99 Mon Sep 17 00:00:00 2001 From: qoo332001 Date: Sat, 22 Apr 2023 19:48:31 +0800 Subject: [PATCH 3/6] fix issues --- .../main/java/org/astraea/app/web/MetricSensorHandler.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/app/src/main/java/org/astraea/app/web/MetricSensorHandler.java b/app/src/main/java/org/astraea/app/web/MetricSensorHandler.java index 9f5f7f3718..db006e7506 100644 --- a/app/src/main/java/org/astraea/app/web/MetricSensorHandler.java +++ b/app/src/main/java/org/astraea/app/web/MetricSensorHandler.java @@ -40,7 +40,7 @@ public class MetricSensorHandler implements Handler { } @Override - public CompletionStage get(Channel channel) { + public CompletionStage get(Channel channel) { var costs = sensors.isEmpty() ? DEFAULT_COSTS @@ -49,10 +49,10 @@ public CompletionStage get(Channel channel) { } @Override - public CompletionStage post(Channel channel) { + public CompletionStage post(Channel channel) { var metricSensorPostRequest = channel.request(TypeRef.of(MetricSensorPostRequest.class)); var costs = costs(metricSensorPostRequest.costs); - if (!costs.isEmpty()) sensors.clear(); + sensors.clear(); costs.forEach(costFunction -> costFunction.metricSensor().ifPresent(sensors::add)); return CompletableFuture.completedFuture(new Response(metricSensorPostRequest.costs)); } From a785edc07bf051ee4a9e67edd98613730bf73c39 Mon Sep 17 00:00:00 2001 From: qoo332001 Date: Sun, 23 Apr 2023 23:08:32 +0800 Subject: [PATCH 4/6] fix issues --- .../astraea/app/web/MetricSensorHandler.java | 19 +++--- .../java/org/astraea/app/web/WebService.java | 26 ++++++++- .../app/web/MetricSensorHandlerTest.java | 4 +- docs/web_server/README.md | 1 + .../web_api_metricSensors_chinese.md | 58 +++++++++++++++++++ 5 files changed, 93 insertions(+), 15 deletions(-) create mode 100644 docs/web_server/web_api_metricSensors_chinese.md diff --git a/app/src/main/java/org/astraea/app/web/MetricSensorHandler.java b/app/src/main/java/org/astraea/app/web/MetricSensorHandler.java index db006e7506..0aa6344436 100644 --- a/app/src/main/java/org/astraea/app/web/MetricSensorHandler.java +++ b/app/src/main/java/org/astraea/app/web/MetricSensorHandler.java @@ -16,35 +16,36 @@ */ package org.astraea.app.web; -import java.util.Collection; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.stream.Collectors; +import org.astraea.app.web.WebService.MetricSensors; import org.astraea.common.Configuration; import org.astraea.common.Utils; import org.astraea.common.cost.CostFunction; import org.astraea.common.json.TypeRef; -import org.astraea.common.metrics.collector.MetricSensor; public class MetricSensorHandler implements Handler { - private final Collection sensors; + private final MetricSensors metricSensors; private static final Set DEFAULT_COSTS = Set.of( "org.astraea.common.cost.ReplicaLeaderCost", "org.astraea.common.cost.NetworkIngressCost"); - MetricSensorHandler(Collection sensors) { - this.sensors = sensors; + MetricSensorHandler(MetricSensors metricSensors) { + this.metricSensors = metricSensors; } @Override public CompletionStage get(Channel channel) { var costs = - sensors.isEmpty() + metricSensors.metricSensors().isEmpty() ? DEFAULT_COSTS - : sensors.stream().map(x -> x.getClass().getName()).collect(Collectors.toSet()); + : metricSensors.metricSensors().stream() + .map(x -> x.getClass().getName()) + .collect(Collectors.toSet()); return CompletableFuture.completedFuture(new Response(costs)); } @@ -52,8 +53,8 @@ public CompletionStage get(Channel channel) { public CompletionStage post(Channel channel) { var metricSensorPostRequest = channel.request(TypeRef.of(MetricSensorPostRequest.class)); var costs = costs(metricSensorPostRequest.costs); - sensors.clear(); - costs.forEach(costFunction -> costFunction.metricSensor().ifPresent(sensors::add)); + metricSensors.clearSensors(); + costs.forEach(costFunction -> costFunction.metricSensor().ifPresent(metricSensors::addSensors)); return CompletableFuture.completedFuture(new Response(metricSensorPostRequest.costs)); } diff --git a/app/src/main/java/org/astraea/app/web/WebService.java b/app/src/main/java/org/astraea/app/web/WebService.java index 18c43ad7c6..edf0c9ca58 100644 --- a/app/src/main/java/org/astraea/app/web/WebService.java +++ b/app/src/main/java/org/astraea/app/web/WebService.java @@ -43,7 +43,7 @@ public class WebService implements AutoCloseable { private final HttpServer server; private final Admin admin; - private final Collection sensors = new ConcurrentLinkedQueue<>(); + private final MetricSensors metricSensors = new MetricSensors(); public WebService( Admin admin, @@ -70,7 +70,7 @@ public WebService( .localReceiver(clientSupplier) .sensorsSupplier( () -> - sensors.stream() + metricSensors.metricSensors().stream() .distinct() .collect( Collectors.toUnmodifiableMap( @@ -84,7 +84,7 @@ public WebService( server.createContext("/quotas", to(new QuotaHandler(admin))); server.createContext("/transactions", to(new TransactionHandler(admin))); server.createContext("/beans", to(new BeanHandler(admin, brokerIdToJmxPort))); - server.createContext("/metricSensors", to(new MetricSensorHandler(sensors))); + server.createContext("/metricSensors", to(new MetricSensorHandler(metricSensors))); server.createContext("/records", to(new RecordHandler(admin))); server.createContext("/reassignments", to(new ReassignmentHandler(admin))); server.createContext("/balancer", to(new BalancerHandler(admin, metricStore))); @@ -168,4 +168,24 @@ int jmxPortMapping(int brokerId) { converter = DurationField.class) Duration beanExpiration = Duration.ofHours(1); } + + static class MetricSensors { + private final Collection sensors; + + MetricSensors() { + sensors = new ConcurrentLinkedQueue<>(); + } + + Collection metricSensors() { + return sensors; + } + + void clearSensors() { + sensors.clear(); + } + + void addSensors(MetricSensor metricSensor) { + sensors.add(metricSensor); + } + } } diff --git a/app/src/test/java/org/astraea/app/web/MetricSensorHandlerTest.java b/app/src/test/java/org/astraea/app/web/MetricSensorHandlerTest.java index 83e2c18fce..d289fce194 100644 --- a/app/src/test/java/org/astraea/app/web/MetricSensorHandlerTest.java +++ b/app/src/test/java/org/astraea/app/web/MetricSensorHandlerTest.java @@ -17,10 +17,8 @@ package org.astraea.app.web; import java.time.Duration; -import java.util.HashSet; import org.astraea.common.Utils; import org.astraea.common.admin.Admin; -import org.astraea.common.metrics.collector.MetricSensor; import org.astraea.it.Service; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; @@ -34,7 +32,7 @@ void testBeans() { try (var admin = Admin.of(SERVICE.bootstrapServers())) { admin.creator().topic(topic).numberOfPartitions(10).run().toCompletableFuture().join(); Utils.sleep(Duration.ofSeconds(2)); - var sensors = new HashSet(); + var sensors = new WebService.MetricSensors(); var defaultCostHandler = new MetricSensorHandler(sensors); var defaultCostResponse = Assertions.assertInstanceOf( diff --git a/docs/web_server/README.md b/docs/web_server/README.md index 35052dba79..d311cc75f4 100644 --- a/docs/web_server/README.md +++ b/docs/web_server/README.md @@ -21,6 +21,7 @@ Astraea 建立了一套 Web Server 服務,使用者可以透過簡易好上手 - [/quotas](./web_api_quotas_chinese.md) - [/transactions](./web_api_transactions_chinese.md) - [/beans](./web_api_beans_chinese.md) +- [/metricSensors](./web_api_metricSensors_chinese.md) - [/reassignments](./web_api_reassignments_chinese.md) - [/records](./web_api_records_chinese.md) - [/balancer](./web_api_balancer_chinese.md) diff --git a/docs/web_server/web_api_metricSensors_chinese.md b/docs/web_server/web_api_metricSensors_chinese.md new file mode 100644 index 0000000000..94d4c84040 --- /dev/null +++ b/docs/web_server/web_api_metricSensors_chinese.md @@ -0,0 +1,58 @@ +/metricSensors +=== + +- [指定MetricSensors](#指定-MetricSensors) +- [查詢已指定的 MetricSensors](#查詢已指定的-MetricSensors) + +## 指定 MetricSensors +```shell +GET /metricSensors +``` + +cURL 範例 +```shell +curl -X POST http://localhost:8001/metricSensors \ + -H "Content-Type: application/json" \ + -d '{ + "costs": [ + "org.astraea.common.cost.ReplicaLeaderCost", + "org.astraea.common.cost.NetworkIngressCost", + "org.astraea.common.cost.NetworkEgressCost" + ] + }' +``` + +JSON Response 範例 +- `costs`: 目前已經註冊的`MetricSensors`之`Costfunction`,`MetricStore`會根據這些`MetricSensors`去撈取所需的metrics +```json +{ + "costs":[ + "org.astraea.common.cost.NetworkIngressCost", + "org.astraea.common.cost.ReplicaLeaderCost", + "org.astraea.common.cost.NetworkEgressCost" + ] +} +``` + +## 查詢已指定的 MetricSensors + +```shell +GET /metricSensors +``` + +cURL 範例 + +查詢已經註冊的`MetricSensors`之`Costfunction` +```shell +curl -X GET http://localhost:8001/metricSensors +``` + +JSON Response 範例 + ```json +{ + "costs":[ + "org.astraea.common.cost.NetworkCost$$Lambda$478/0x0000000840297840", + "org.astraea.common.cost.ReplicaLeaderCost$$Lambda$476/0x0000000840297040" + ] +} + ``` From 46054cfb42b838a46466d0e11ea9d7984d1e1671 Mon Sep 17 00:00:00 2001 From: qoo332001 Date: Mon, 24 Apr 2023 19:14:43 +0800 Subject: [PATCH 5/6] update docs and fix issues --- .../org/astraea/app/web/MetricSensorHandler.java | 16 ++++++++-------- .../java/org/astraea/app/web/WebService.java | 10 +++++----- .../astraea/app/web/MetricSensorHandlerTest.java | 2 +- docs/web_server/web_api_metricSensors_chinese.md | 16 ++++++++++++++++ 4 files changed, 30 insertions(+), 14 deletions(-) diff --git a/app/src/main/java/org/astraea/app/web/MetricSensorHandler.java b/app/src/main/java/org/astraea/app/web/MetricSensorHandler.java index 0aa6344436..7f142504d0 100644 --- a/app/src/main/java/org/astraea/app/web/MetricSensorHandler.java +++ b/app/src/main/java/org/astraea/app/web/MetricSensorHandler.java @@ -20,7 +20,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.stream.Collectors; -import org.astraea.app.web.WebService.MetricSensors; +import org.astraea.app.web.WebService.Sensors; import org.astraea.common.Configuration; import org.astraea.common.Utils; import org.astraea.common.cost.CostFunction; @@ -28,22 +28,22 @@ public class MetricSensorHandler implements Handler { - private final MetricSensors metricSensors; + private final Sensors sensors; private static final Set DEFAULT_COSTS = Set.of( "org.astraea.common.cost.ReplicaLeaderCost", "org.astraea.common.cost.NetworkIngressCost"); - MetricSensorHandler(MetricSensors metricSensors) { - this.metricSensors = metricSensors; + MetricSensorHandler(Sensors sensors) { + this.sensors = sensors; } @Override public CompletionStage get(Channel channel) { var costs = - metricSensors.metricSensors().isEmpty() + sensors.metricSensors().isEmpty() ? DEFAULT_COSTS - : metricSensors.metricSensors().stream() + : sensors.metricSensors().stream() .map(x -> x.getClass().getName()) .collect(Collectors.toSet()); return CompletableFuture.completedFuture(new Response(costs)); @@ -53,8 +53,8 @@ public CompletionStage get(Channel channel) { public CompletionStage post(Channel channel) { var metricSensorPostRequest = channel.request(TypeRef.of(MetricSensorPostRequest.class)); var costs = costs(metricSensorPostRequest.costs); - metricSensors.clearSensors(); - costs.forEach(costFunction -> costFunction.metricSensor().ifPresent(metricSensors::addSensors)); + sensors.clearSensors(); + costs.forEach(costFunction -> costFunction.metricSensor().ifPresent(sensors::addSensors)); return CompletableFuture.completedFuture(new Response(metricSensorPostRequest.costs)); } diff --git a/app/src/main/java/org/astraea/app/web/WebService.java b/app/src/main/java/org/astraea/app/web/WebService.java index edf0c9ca58..878430d4b0 100644 --- a/app/src/main/java/org/astraea/app/web/WebService.java +++ b/app/src/main/java/org/astraea/app/web/WebService.java @@ -43,7 +43,7 @@ public class WebService implements AutoCloseable { private final HttpServer server; private final Admin admin; - private final MetricSensors metricSensors = new MetricSensors(); + private final Sensors sensors = new Sensors(); public WebService( Admin admin, @@ -70,7 +70,7 @@ public WebService( .localReceiver(clientSupplier) .sensorsSupplier( () -> - metricSensors.metricSensors().stream() + sensors.metricSensors().stream() .distinct() .collect( Collectors.toUnmodifiableMap( @@ -84,7 +84,7 @@ public WebService( server.createContext("/quotas", to(new QuotaHandler(admin))); server.createContext("/transactions", to(new TransactionHandler(admin))); server.createContext("/beans", to(new BeanHandler(admin, brokerIdToJmxPort))); - server.createContext("/metricSensors", to(new MetricSensorHandler(metricSensors))); + server.createContext("/metricSensors", to(new MetricSensorHandler(sensors))); server.createContext("/records", to(new RecordHandler(admin))); server.createContext("/reassignments", to(new ReassignmentHandler(admin))); server.createContext("/balancer", to(new BalancerHandler(admin, metricStore))); @@ -169,10 +169,10 @@ int jmxPortMapping(int brokerId) { Duration beanExpiration = Duration.ofHours(1); } - static class MetricSensors { + static class Sensors { private final Collection sensors; - MetricSensors() { + Sensors() { sensors = new ConcurrentLinkedQueue<>(); } diff --git a/app/src/test/java/org/astraea/app/web/MetricSensorHandlerTest.java b/app/src/test/java/org/astraea/app/web/MetricSensorHandlerTest.java index d289fce194..e84533fec5 100644 --- a/app/src/test/java/org/astraea/app/web/MetricSensorHandlerTest.java +++ b/app/src/test/java/org/astraea/app/web/MetricSensorHandlerTest.java @@ -32,7 +32,7 @@ void testBeans() { try (var admin = Admin.of(SERVICE.bootstrapServers())) { admin.creator().topic(topic).numberOfPartitions(10).run().toCompletableFuture().join(); Utils.sleep(Duration.ofSeconds(2)); - var sensors = new WebService.MetricSensors(); + var sensors = new WebService.Sensors(); var defaultCostHandler = new MetricSensorHandler(sensors); var defaultCostResponse = Assertions.assertInstanceOf( diff --git a/docs/web_server/web_api_metricSensors_chinese.md b/docs/web_server/web_api_metricSensors_chinese.md index 94d4c84040..541f79ec65 100644 --- a/docs/web_server/web_api_metricSensors_chinese.md +++ b/docs/web_server/web_api_metricSensors_chinese.md @@ -1,6 +1,22 @@ /metricSensors === +需要metricSensors的原因 (詳細請看[#1665](https://github.com/skiptests/astraea/pull/1665)) : + +- 原來的`BalancerHandler`沒辦法長時間的收集metrics(在使用者執行PostRequest時才會收集metrics),這會導致一些需要長時間統計metrics的`CostFunction`(例如 [add PartitionMigrateTimeCost and revise MetricSensor#fetch #1665](https://github.com/skiptests/astraea/pull/1665))無法收集足夠的metrics來計算分數 +- 此將`WebService`修改成平常就可以收集metrics,以及可以隨時選擇感興趣的指標(選擇`CostFunction`),如此變可以拉長指標的蒐集時間,並用長時間收集的metrics來計算`CostFunction` + +新增metricSensors前後差異: + +- 原本的`BalancerHandler`的流程如下: + 1. 使用者希望做負載平衡時打開`WebService`,此時`MetricStore`已經build,但因為沒有註冊`MetricsSensor`,因此不會撈取任何metrics + 2. 使用者送出PostRequest,此時成功註冊`MetricsSensor`並開始撈取metrics + 3. 等待撈到足夠的metrics後,`CostFunction`開始計算所需的分數 +- 修改後的`MetricSensorHandler`與`BalancerHandler`的流程如下: + 1. 使用者平常就會開著`WebService`,且同時透過送出`MetricSensorHandler`的PostRequest來選擇未來可能會想要做負載平衡的`CostFunction`,此時會同時註冊這些`CostFunction`的`MetricsSensor`並開始撈取metrics + 2. 當使用者要做負載平衡時,此時叢集已經收集了一段時間的metrics,此時呼叫`BalancerHandler`的PostRequest,可以使用這些統計一段時間的metrics來做負載平衡 + 3. `CostFunction`開始計算所需的分數 + - [指定MetricSensors](#指定-MetricSensors) - [查詢已指定的 MetricSensors](#查詢已指定的-MetricSensors) From 9f65d719c9ba7ff6140c64cfd54f42944e4d8ab6 Mon Sep 17 00:00:00 2001 From: qoo332001 Date: Wed, 26 Apr 2023 14:24:44 +0800 Subject: [PATCH 6/6] update docs --- ...cSensorHandler.java => SensorHandler.java} | 4 +-- .../java/org/astraea/app/web/WebService.java | 2 +- ...andlerTest.java => SensorHandlerTest.java} | 10 +++---- .../web_api_metricSensors_chinese.md | 26 +++++-------------- 4 files changed, 14 insertions(+), 28 deletions(-) rename app/src/main/java/org/astraea/app/web/{MetricSensorHandler.java => SensorHandler.java} (96%) rename app/src/test/java/org/astraea/app/web/{MetricSensorHandlerTest.java => SensorHandlerTest.java} (90%) diff --git a/app/src/main/java/org/astraea/app/web/MetricSensorHandler.java b/app/src/main/java/org/astraea/app/web/SensorHandler.java similarity index 96% rename from app/src/main/java/org/astraea/app/web/MetricSensorHandler.java rename to app/src/main/java/org/astraea/app/web/SensorHandler.java index 7f142504d0..425d2f15cc 100644 --- a/app/src/main/java/org/astraea/app/web/MetricSensorHandler.java +++ b/app/src/main/java/org/astraea/app/web/SensorHandler.java @@ -26,7 +26,7 @@ import org.astraea.common.cost.CostFunction; import org.astraea.common.json.TypeRef; -public class MetricSensorHandler implements Handler { +public class SensorHandler implements Handler { private final Sensors sensors; private static final Set DEFAULT_COSTS = @@ -34,7 +34,7 @@ public class MetricSensorHandler implements Handler { "org.astraea.common.cost.ReplicaLeaderCost", "org.astraea.common.cost.NetworkIngressCost"); - MetricSensorHandler(Sensors sensors) { + SensorHandler(Sensors sensors) { this.sensors = sensors; } diff --git a/app/src/main/java/org/astraea/app/web/WebService.java b/app/src/main/java/org/astraea/app/web/WebService.java index 878430d4b0..e56316ce53 100644 --- a/app/src/main/java/org/astraea/app/web/WebService.java +++ b/app/src/main/java/org/astraea/app/web/WebService.java @@ -84,7 +84,7 @@ public WebService( server.createContext("/quotas", to(new QuotaHandler(admin))); server.createContext("/transactions", to(new TransactionHandler(admin))); server.createContext("/beans", to(new BeanHandler(admin, brokerIdToJmxPort))); - server.createContext("/metricSensors", to(new MetricSensorHandler(sensors))); + server.createContext("/sensors", to(new SensorHandler(sensors))); server.createContext("/records", to(new RecordHandler(admin))); server.createContext("/reassignments", to(new ReassignmentHandler(admin))); server.createContext("/balancer", to(new BalancerHandler(admin, metricStore))); diff --git a/app/src/test/java/org/astraea/app/web/MetricSensorHandlerTest.java b/app/src/test/java/org/astraea/app/web/SensorHandlerTest.java similarity index 90% rename from app/src/test/java/org/astraea/app/web/MetricSensorHandlerTest.java rename to app/src/test/java/org/astraea/app/web/SensorHandlerTest.java index e84533fec5..bce5c14fdd 100644 --- a/app/src/test/java/org/astraea/app/web/MetricSensorHandlerTest.java +++ b/app/src/test/java/org/astraea/app/web/SensorHandlerTest.java @@ -23,7 +23,7 @@ import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; -class MetricSensorHandlerTest { +class SensorHandlerTest { private static final Service SERVICE = Service.builder().numberOfBrokers(3).build(); @Test @@ -33,16 +33,16 @@ void testBeans() { admin.creator().topic(topic).numberOfPartitions(10).run().toCompletableFuture().join(); Utils.sleep(Duration.ofSeconds(2)); var sensors = new WebService.Sensors(); - var defaultCostHandler = new MetricSensorHandler(sensors); + var defaultCostHandler = new SensorHandler(sensors); var defaultCostResponse = Assertions.assertInstanceOf( - MetricSensorHandler.Response.class, + SensorHandler.Response.class, defaultCostHandler.get(Channel.EMPTY).toCompletableFuture().join()); Assertions.assertEquals(2, defaultCostResponse.costs.size()); var changedCostResponse = Assertions.assertInstanceOf( - MetricSensorHandler.Response.class, + SensorHandler.Response.class, defaultCostHandler .post( Channel.builder() @@ -54,7 +54,7 @@ void testBeans() { var changedCostGetResponse = Assertions.assertInstanceOf( - MetricSensorHandler.Response.class, + SensorHandler.Response.class, defaultCostHandler.get(Channel.EMPTY).toCompletableFuture().join()); Assertions.assertEquals(1, changedCostGetResponse.costs.size()); } diff --git a/docs/web_server/web_api_metricSensors_chinese.md b/docs/web_server/web_api_metricSensors_chinese.md index 541f79ec65..9096dc2444 100644 --- a/docs/web_server/web_api_metricSensors_chinese.md +++ b/docs/web_server/web_api_metricSensors_chinese.md @@ -1,33 +1,19 @@ /metricSensors === -需要metricSensors的原因 (詳細請看[#1665](https://github.com/skiptests/astraea/pull/1665)) : +此api用來指定未來可能會想要使用的`Costfunction`之`MetricSensors`,主要功能如下 (相關討論請看[#1665](https://github.com/skiptests/astraea/pull/1665)) : -- 原來的`BalancerHandler`沒辦法長時間的收集metrics(在使用者執行PostRequest時才會收集metrics),這會導致一些需要長時間統計metrics的`CostFunction`(例如 [add PartitionMigrateTimeCost and revise MetricSensor#fetch #1665](https://github.com/skiptests/astraea/pull/1665))無法收集足夠的metrics來計算分數 -- 此將`WebService`修改成平常就可以收集metrics,以及可以隨時選擇感興趣的指標(選擇`CostFunction`),如此變可以拉長指標的蒐集時間,並用長時間收集的metrics來計算`CostFunction` - -新增metricSensors前後差異: - -- 原本的`BalancerHandler`的流程如下: - 1. 使用者希望做負載平衡時打開`WebService`,此時`MetricStore`已經build,但因為沒有註冊`MetricsSensor`,因此不會撈取任何metrics - 2. 使用者送出PostRequest,此時成功註冊`MetricsSensor`並開始撈取metrics - 3. 等待撈到足夠的metrics後,`CostFunction`開始計算所需的分數 -- 修改後的`MetricSensorHandler`與`BalancerHandler`的流程如下: - 1. 使用者平常就會開著`WebService`,且同時透過送出`MetricSensorHandler`的PostRequest來選擇未來可能會想要做負載平衡的`CostFunction`,此時會同時註冊這些`CostFunction`的`MetricsSensor`並開始撈取metrics - 2. 當使用者要做負載平衡時,此時叢集已經收集了一段時間的metrics,此時呼叫`BalancerHandler`的PostRequest,可以使用這些統計一段時間的metrics來做負載平衡 - 3. `CostFunction`開始計算所需的分數 - -- [指定MetricSensors](#指定-MetricSensors) -- [查詢已指定的 MetricSensors](#查詢已指定的-MetricSensors) +- [指定MetricSensors](#指定-MetricSensors): 選擇要使用的`Costfunction`之`MetricSensor` +- [查詢已指定的 MetricSensors](#查詢已指定的-MetricSensors): 查看當前已指定之`MetricSensor`的`CostFunction` ## 指定 MetricSensors ```shell -GET /metricSensors +GET /sensors ``` cURL 範例 ```shell -curl -X POST http://localhost:8001/metricSensors \ +curl -X POST http://localhost:8001/sensors \ -H "Content-Type: application/json" \ -d '{ "costs": [ @@ -60,7 +46,7 @@ cURL 範例 查詢已經註冊的`MetricSensors`之`Costfunction` ```shell -curl -X GET http://localhost:8001/metricSensors +curl -X GET http://localhost:8001/sensors ``` JSON Response 範例