Skip to content

Commit 693c3f3

Browse files
authored
[COMMON] separate JndiClient from MBeanClient (#1679)
1 parent c9219bf commit 693c3f3

File tree

47 files changed

+349
-333
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

47 files changed

+349
-333
lines changed

app/src/main/java/org/astraea/app/performance/Report.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,15 +20,15 @@
2020
import java.util.Optional;
2121
import java.util.stream.Collectors;
2222
import org.astraea.common.metrics.BeanQuery;
23-
import org.astraea.common.metrics.MBeanClient;
23+
import org.astraea.common.metrics.JndiClient;
2424
import org.astraea.common.metrics.client.consumer.ConsumerMetrics;
2525
import org.astraea.common.metrics.client.consumer.HasConsumerFetchMetrics;
2626
import org.astraea.common.metrics.client.producer.ProducerMetrics;
2727

2828
public interface Report {
2929

3030
static long recordsConsumedTotal() {
31-
var client = MBeanClient.local();
31+
var client = JndiClient.local();
3232
return (long)
3333
ConsumerMetrics.fetch(client).stream()
3434
.mapToDouble(HasConsumerFetchMetrics::recordsConsumedTotal)
@@ -37,7 +37,7 @@ static long recordsConsumedTotal() {
3737

3838
static List<Report> consumers() {
3939

40-
return ConsumerMetrics.fetch(MBeanClient.local()).stream()
40+
return ConsumerMetrics.fetch(JndiClient.local()).stream()
4141
.map(
4242
m ->
4343
new Report() {
@@ -74,7 +74,7 @@ public String clientId() {
7474
@Override
7575
public Optional<Double> e2eLatency() {
7676
return Optional.ofNullable(
77-
MBeanClient.local()
77+
JndiClient.local()
7878
.bean(
7979
BeanQuery.builder()
8080
.domainName(ConsumerThread.DOMAIN_NAME)
@@ -91,7 +91,7 @@ public Optional<Double> e2eLatency() {
9191
}
9292

9393
static List<Report> producers() {
94-
return ProducerMetrics.producer(MBeanClient.local()).stream()
94+
return ProducerMetrics.producer(JndiClient.local()).stream()
9595
.map(
9696
m ->
9797
new Report() {
@@ -113,7 +113,7 @@ public double avgLatency() {
113113
@Override
114114
public Optional<Double> e2eLatency() {
115115
return Optional.ofNullable(
116-
MBeanClient.local()
116+
JndiClient.local()
117117
.bean(
118118
BeanQuery.builder()
119119
.domainName(ProducerThread.DOMAIN_NAME)

app/src/main/java/org/astraea/app/performance/TrackerThread.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
import org.astraea.common.DataSize;
2828
import org.astraea.common.Utils;
2929
import org.astraea.common.metrics.HasBeanObject;
30-
import org.astraea.common.metrics.MBeanClient;
30+
import org.astraea.common.metrics.JndiClient;
3131
import org.astraea.common.metrics.client.consumer.ConsumerMetrics;
3232
import org.astraea.common.metrics.client.consumer.HasConsumerCoordinatorMetrics;
3333
import org.astraea.common.metrics.client.producer.HasProducerTopicMetrics;
@@ -37,7 +37,7 @@
3737
public interface TrackerThread extends AbstractThread {
3838

3939
class ProducerPrinter {
40-
private final MBeanClient mBeanClient = MBeanClient.local();
40+
private final JndiClient mBeanClient = JndiClient.local();
4141
private final Supplier<List<Report>> reportSupplier;
4242
private long lastRecords = 0;
4343

@@ -97,7 +97,7 @@ boolean tryToPrint(Duration duration) {
9797
}
9898

9999
class ConsumerPrinter {
100-
private final MBeanClient mBeanClient = MBeanClient.local();
100+
private final JndiClient mBeanClient = JndiClient.local();
101101
private final Supplier<List<Report>> reportSupplier;
102102
private long lastRecords = 0;
103103

app/src/main/java/org/astraea/app/publisher/MetricPublisher.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
import org.astraea.common.Utils;
2727
import org.astraea.common.admin.Admin;
2828
import org.astraea.common.admin.NodeInfo;
29-
import org.astraea.common.metrics.MBeanClient;
29+
import org.astraea.common.metrics.JndiClient;
3030
import org.astraea.common.metrics.collector.MetricFetcher;
3131

3232
/** Keep fetching all kinds of metrics and publish to inner topics. */
@@ -58,7 +58,7 @@ static void execute(Arguments arguments) {
5858
Collectors.toUnmodifiableMap(
5959
NodeInfo::id,
6060
node ->
61-
MBeanClient.jndi(
61+
JndiClient.of(
6262
node.host(),
6363
arguments.idToJmxPort().apply(node.id()))))))
6464
.fetchBeanDelay(arguments.period)

app/src/main/java/org/astraea/app/web/BeanHandler.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
import org.astraea.common.admin.Admin;
2424
import org.astraea.common.metrics.BeanObject;
2525
import org.astraea.common.metrics.BeanQuery;
26-
import org.astraea.common.metrics.MBeanClient;
26+
import org.astraea.common.metrics.JndiClient;
2727

2828
public class BeanHandler implements Handler {
2929
private final Admin admin;
@@ -45,8 +45,7 @@ public CompletionStage<Response> get(Channel channel) {
4545
brokers.stream()
4646
.map(
4747
b -> {
48-
try (var client =
49-
MBeanClient.jndi(b.host(), jmxPorts.apply(b.id()))) {
48+
try (var client = JndiClient.of(b.host(), jmxPorts.apply(b.id()))) {
5049
return new NodeBean(
5150
b.host(),
5251
client.beans(builder.build()).stream()

app/src/main/java/org/astraea/app/web/WebService.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import org.astraea.common.Utils;
3636
import org.astraea.common.admin.Admin;
3737
import org.astraea.common.admin.NodeInfo;
38+
import org.astraea.common.metrics.JndiClient;
3839
import org.astraea.common.metrics.MBeanClient;
3940
import org.astraea.common.metrics.collector.MetricSensor;
4041
import org.astraea.common.metrics.collector.MetricStore;
@@ -62,8 +63,7 @@ public WebService(
6263
Collectors.toUnmodifiableMap(
6364
NodeInfo::id,
6465
b ->
65-
MBeanClient.jndi(
66-
b.host(), brokerIdToJmxPort.apply(b.id())))));
66+
JndiClient.of(b.host(), brokerIdToJmxPort.apply(b.id())))));
6767
var metricStore =
6868
MetricStore.builder()
6969
.beanExpiration(beanExpiration)

app/src/test/java/org/astraea/app/web/BalancerHandlerTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@
7676
import org.astraea.common.json.JsonConverter;
7777
import org.astraea.common.json.TypeRef;
7878
import org.astraea.common.metrics.ClusterBean;
79+
import org.astraea.common.metrics.JndiClient;
7980
import org.astraea.common.metrics.MBeanClient;
8081
import org.astraea.common.metrics.collector.MetricSensor;
8182
import org.astraea.common.metrics.collector.MetricStore;
@@ -1355,8 +1356,7 @@ private MetricStore metricStore(Admin admin, List<CostWeight> costWeights) {
13551356
Collectors.toUnmodifiableMap(
13561357
NodeInfo::id,
13571358
b ->
1358-
MBeanClient.jndi(
1359-
b.host(), brokerIdToJmxPort.apply(b.id())))));
1359+
JndiClient.of(b.host(), brokerIdToJmxPort.apply(b.id())))));
13601360
var cw = costWeights.stream().map(x -> x.cost).collect(Collectors.toSet());
13611361
var cf = Utils.costFunctions(cw, HasClusterCost.class, Configuration.EMPTY);
13621362
var metricSensors = cf.stream().map(c -> c.metricSensor().get()).collect(Collectors.toList());

common/src/main/java/org/astraea/common/assignor/Assignor.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import org.astraea.common.consumer.ConsumerConfigs;
4040
import org.astraea.common.cost.HasPartitionCost;
4141
import org.astraea.common.cost.ReplicaLeaderSizeCost;
42+
import org.astraea.common.metrics.JndiClient;
4243
import org.astraea.common.metrics.MBeanClient;
4344
import org.astraea.common.metrics.collector.MetricStore;
4445
import org.astraea.common.partitioner.PartitionerUtils;
@@ -159,13 +160,13 @@ public final void configure(Map<String, ?> configs) {
159160
.brokers()
160161
.thenApply(
161162
brokers -> {
162-
var map = new HashMap<Integer, MBeanClient>();
163+
var map = new HashMap<Integer, JndiClient>();
163164
brokers.forEach(
164165
b ->
165166
map.put(
166-
b.id(), MBeanClient.jndi(b.host(), jmxPortGetter.apply(b.id()))));
167+
b.id(), JndiClient.of(b.host(), jmxPortGetter.apply(b.id()))));
167168
// add local client to fetch consumer metrics
168-
map.put(-1, MBeanClient.local());
169+
map.put(-1, JndiClient.local());
169170
return Collections.unmodifiableMap(map);
170171
});
171172
metricStore =
Lines changed: 184 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,184 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.astraea.common.metrics;
18+
19+
import java.io.IOException;
20+
import java.lang.management.ManagementFactory;
21+
import java.net.MalformedURLException;
22+
import java.util.Arrays;
23+
import java.util.Collection;
24+
import java.util.HashMap;
25+
import java.util.List;
26+
import java.util.function.Consumer;
27+
import java.util.stream.Collectors;
28+
import java.util.stream.Stream;
29+
import javax.management.AttributeNotFoundException;
30+
import javax.management.InstanceNotFoundException;
31+
import javax.management.MBeanException;
32+
import javax.management.MBeanFeatureInfo;
33+
import javax.management.MBeanServerConnection;
34+
import javax.management.ObjectInstance;
35+
import javax.management.ReflectionException;
36+
import javax.management.RuntimeMBeanException;
37+
import javax.management.remote.JMXConnectorFactory;
38+
import javax.management.remote.JMXServiceURL;
39+
import org.astraea.common.Utils;
40+
41+
/** A MBeanClient used to retrieve mbean value from remote Jmx server. */
42+
public interface JndiClient extends MBeanClient, AutoCloseable {
43+
44+
/**
45+
* @param host the address of jmx server
46+
* @param port the port of jmx server
47+
* @return a mbean client using JNDI to lookup metrics.
48+
*/
49+
static JndiClient of(String host, int port) {
50+
try {
51+
return of(
52+
new JMXServiceURL(
53+
String.format(
54+
"service:jmx:rmi://%s:%s/jndi/rmi://%s:%s/jmxrmi", host, port, host, port)));
55+
} catch (MalformedURLException e) {
56+
throw new IllegalArgumentException(e);
57+
}
58+
}
59+
60+
static JndiClient of(JMXServiceURL jmxServiceURL) {
61+
return Utils.packException(
62+
() -> {
63+
var jmxConnector = JMXConnectorFactory.connect(jmxServiceURL);
64+
return new BasicMBeanClient(
65+
jmxConnector.getMBeanServerConnection(),
66+
jmxServiceURL.getHost(),
67+
jmxServiceURL.getPort()) {
68+
@Override
69+
public void close() {
70+
Utils.close(jmxConnector);
71+
}
72+
};
73+
});
74+
}
75+
76+
static JndiClient local() {
77+
return new BasicMBeanClient(ManagementFactory.getPlatformMBeanServer(), Utils.hostname(), -1);
78+
}
79+
80+
@Override
81+
default void close() {}
82+
83+
class BasicMBeanClient implements JndiClient {
84+
85+
private final MBeanServerConnection connection;
86+
final String host;
87+
88+
final int port;
89+
90+
BasicMBeanClient(MBeanServerConnection connection, String host, int port) {
91+
this.connection = connection;
92+
this.host = host;
93+
this.port = port;
94+
}
95+
96+
@Override
97+
public BeanObject bean(BeanQuery beanQuery) {
98+
return Utils.packException(
99+
() -> {
100+
// ask for MBeanInfo
101+
var mBeanInfo = connection.getMBeanInfo(beanQuery.objectName());
102+
103+
// create a list builder all available attributes name
104+
var attributeName =
105+
Arrays.stream(mBeanInfo.getAttributes())
106+
.map(MBeanFeatureInfo::getName)
107+
.collect(Collectors.toList());
108+
109+
// query the result
110+
return queryBean(beanQuery, attributeName);
111+
});
112+
}
113+
114+
BeanObject queryBean(BeanQuery beanQuery, Collection<String> attributeNameCollection)
115+
throws ReflectionException,
116+
InstanceNotFoundException,
117+
IOException,
118+
AttributeNotFoundException,
119+
MBeanException {
120+
// fetch attribute value from mbean server
121+
var attributeNameArray = attributeNameCollection.toArray(new String[0]);
122+
var attributeList =
123+
connection.getAttributes(beanQuery.objectName(), attributeNameArray).asList();
124+
125+
// collect attribute name & value into a map
126+
var attributes = new HashMap<String, Object>();
127+
attributeList.forEach(attribute -> attributes.put(attribute.getName(), attribute.getValue()));
128+
129+
// according to the javadoc of MBeanServerConnection#getAttributes, the API will
130+
// ignore any error occurring during the fetch process (for example, attribute not
131+
// exists). Below code check for such condition and try to figure out what exactly
132+
// the error is. put it into attributes return result.
133+
for (var str : attributeNameArray) {
134+
if (attributes.containsKey(str)) continue;
135+
try {
136+
attributes.put(str, connection.getAttribute(beanQuery.objectName(), str));
137+
} catch (RuntimeMBeanException e) {
138+
if (!(e.getCause() instanceof UnsupportedOperationException))
139+
throw new IllegalStateException(e);
140+
// the UnsupportedOperationException is thrown when we query unacceptable
141+
// attribute. we just skip it as it is normal case to
142+
// return "acceptable" attribute only
143+
}
144+
}
145+
146+
// collect result, and build a new BeanObject as return result
147+
return new BeanObject(beanQuery.domainName(), beanQuery.properties(), attributes);
148+
}
149+
150+
@Override
151+
public Collection<BeanObject> beans(
152+
BeanQuery beanQuery, Consumer<RuntimeException> errorHandle) {
153+
return Utils.packException(
154+
() ->
155+
connection.queryMBeans(beanQuery.objectName(), null).stream()
156+
// Parallelize the sampling of bean objects. The underlying RMI is thread-safe.
157+
// https://github.com/skiptests/astraea/issues/1553#issuecomment-1461143723
158+
.parallel()
159+
.map(ObjectInstance::getObjectName)
160+
.map(BeanQuery::fromObjectName)
161+
.flatMap(
162+
query -> {
163+
try {
164+
return Stream.of(bean(query));
165+
} catch (RuntimeException e) {
166+
errorHandle.accept(e);
167+
return Stream.empty();
168+
}
169+
})
170+
.collect(Collectors.toUnmodifiableList()));
171+
}
172+
173+
/**
174+
* Returns the list of domains in which any MBean is currently registered.
175+
*
176+
* <p>The order of strings within the returned array is not defined.
177+
*
178+
* @return a {@link List} of domain name {@link String}
179+
*/
180+
List<String> domains() {
181+
return Utils.packException(() -> Arrays.asList(connection.getDomains()));
182+
}
183+
}
184+
}

0 commit comments

Comments
 (0)