Skip to content

Commit 6bafd2b

Browse files
authored
Fill jvm.thread.state attribute for jvm.thread.count metric on jdk8 (#12724)
1 parent c631801 commit 6bafd2b

File tree

2 files changed

+115
-5
lines changed
  • instrumentation/runtime-telemetry/runtime-telemetry-java8/library/src

2 files changed

+115
-5
lines changed

instrumentation/runtime-telemetry/runtime-telemetry-java8/library/src/main/java/io/opentelemetry/instrumentation/runtimemetrics/java8/Threads.java

+64-3
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@
2525
import java.util.Locale;
2626
import java.util.Map;
2727
import java.util.function.Consumer;
28+
import java.util.function.Function;
29+
import java.util.function.Supplier;
2830
import javax.annotation.Nullable;
2931

3032
/**
@@ -55,11 +57,34 @@ public final class Threads {
5557

5658
/** Register observers for java runtime class metrics. */
5759
public static List<AutoCloseable> registerObservers(OpenTelemetry openTelemetry) {
58-
return INSTANCE.registerObservers(openTelemetry, ManagementFactory.getThreadMXBean());
60+
return INSTANCE.registerObservers(openTelemetry, !isJava9OrNewer());
61+
}
62+
63+
private List<AutoCloseable> registerObservers(OpenTelemetry openTelemetry, boolean useThread) {
64+
if (useThread) {
65+
return registerObservers(openTelemetry, Threads::getThreads);
66+
}
67+
return registerObservers(openTelemetry, ManagementFactory.getThreadMXBean());
5968
}
6069

6170
// Visible for testing
6271
List<AutoCloseable> registerObservers(OpenTelemetry openTelemetry, ThreadMXBean threadBean) {
72+
return registerObservers(
73+
openTelemetry,
74+
isJava9OrNewer() ? Threads::java9AndNewerCallback : Threads::java8Callback,
75+
threadBean);
76+
}
77+
78+
// Visible for testing
79+
List<AutoCloseable> registerObservers(
80+
OpenTelemetry openTelemetry, Supplier<Thread[]> threadSupplier) {
81+
return registerObservers(openTelemetry, Threads::java8ThreadCallback, threadSupplier);
82+
}
83+
84+
private static <T> List<AutoCloseable> registerObservers(
85+
OpenTelemetry openTelemetry,
86+
Function<T, Consumer<ObservableLongMeasurement>> callbackProvider,
87+
T threadInfo) {
6388
Meter meter = JmxRuntimeMetricsUtil.getMeter(openTelemetry);
6489
List<AutoCloseable> observables = new ArrayList<>();
6590

@@ -68,8 +93,7 @@ List<AutoCloseable> registerObservers(OpenTelemetry openTelemetry, ThreadMXBean
6893
.upDownCounterBuilder("jvm.thread.count")
6994
.setDescription("Number of executing platform threads.")
7095
.setUnit("{thread}")
71-
.buildWithCallback(
72-
isJava9OrNewer() ? java9AndNewerCallback(threadBean) : java8Callback(threadBean)));
96+
.buildWithCallback(callbackProvider.apply(threadInfo)));
7397

7498
return observables;
7599
}
@@ -104,6 +128,36 @@ private static Consumer<ObservableLongMeasurement> java8Callback(ThreadMXBean th
104128
};
105129
}
106130

131+
private static Consumer<ObservableLongMeasurement> java8ThreadCallback(
132+
Supplier<Thread[]> supplier) {
133+
return measurement -> {
134+
Map<Attributes, Long> counts = new HashMap<>();
135+
for (Thread thread : supplier.get()) {
136+
Attributes threadAttributes = threadAttributes(thread);
137+
counts.compute(threadAttributes, (k, value) -> value == null ? 1 : value + 1);
138+
}
139+
counts.forEach((threadAttributes, count) -> measurement.record(count, threadAttributes));
140+
};
141+
}
142+
143+
// Visible for testing
144+
static Thread[] getThreads() {
145+
ThreadGroup threadGroup = Thread.currentThread().getThreadGroup();
146+
while (threadGroup.getParent() != null) {
147+
threadGroup = threadGroup.getParent();
148+
}
149+
// use a slightly larger array in case new threads are created
150+
int count = threadGroup.activeCount() + 10;
151+
Thread[] threads = new Thread[count];
152+
int resultSize = threadGroup.enumerate(threads);
153+
if (resultSize == threads.length) {
154+
return threads;
155+
}
156+
Thread[] result = new Thread[resultSize];
157+
System.arraycopy(threads, 0, result, 0, resultSize);
158+
return result;
159+
}
160+
107161
private static Consumer<ObservableLongMeasurement> java9AndNewerCallback(
108162
ThreadMXBean threadBean) {
109163
return measurement -> {
@@ -132,5 +186,12 @@ private static Attributes threadAttributes(ThreadInfo threadInfo) {
132186
JvmAttributes.JVM_THREAD_DAEMON, isDaemon, JvmAttributes.JVM_THREAD_STATE, threadState);
133187
}
134188

189+
private static Attributes threadAttributes(Thread thread) {
190+
boolean isDaemon = thread.isDaemon();
191+
String threadState = thread.getState().name().toLowerCase(Locale.ROOT);
192+
return Attributes.of(
193+
JvmAttributes.JVM_THREAD_DAEMON, isDaemon, JvmAttributes.JVM_THREAD_STATE, threadState);
194+
}
195+
135196
private Threads() {}
136197
}

instrumentation/runtime-telemetry/runtime-telemetry-java8/library/src/test/java/io/opentelemetry/instrumentation/runtimemetrics/java8/ThreadsStableSemconvTest.java

+51-2
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,9 @@
1818
import io.opentelemetry.instrumentation.testing.junit.LibraryInstrumentationExtension;
1919
import java.lang.management.ThreadInfo;
2020
import java.lang.management.ThreadMXBean;
21+
import java.util.Arrays;
22+
import java.util.HashSet;
23+
import java.util.Set;
2124
import org.junit.jupiter.api.Test;
2225
import org.junit.jupiter.api.condition.EnabledForJreRange;
2326
import org.junit.jupiter.api.condition.EnabledOnJre;
@@ -41,7 +44,7 @@ class ThreadsStableSemconvTest {
4144

4245
@Test
4346
@EnabledOnJre(JRE.JAVA_8)
44-
void registerObservers_Java8() {
47+
void registerObservers_Java8Jmx() {
4548
when(threadBean.getThreadCount()).thenReturn(7);
4649
when(threadBean.getDaemonThreadCount()).thenReturn(2);
4750

@@ -75,6 +78,45 @@ void registerObservers_Java8() {
7578
equalTo(JVM_THREAD_DAEMON, false))))));
7679
}
7780

81+
@Test
82+
void registerObservers_Java8Thread() {
83+
Thread threadInfo1 = mock(Thread.class, new ThreadInfoAnswer(false, Thread.State.RUNNABLE));
84+
Thread threadInfo2 = mock(Thread.class, new ThreadInfoAnswer(true, Thread.State.WAITING));
85+
86+
Thread[] threads = new Thread[] {threadInfo1, threadInfo2};
87+
88+
Threads.INSTANCE
89+
.registerObservers(testing.getOpenTelemetry(), () -> threads)
90+
.forEach(cleanup::deferCleanup);
91+
92+
testing.waitAndAssertMetrics(
93+
"io.opentelemetry.runtime-telemetry-java8",
94+
"jvm.thread.count",
95+
metrics ->
96+
metrics.anySatisfy(
97+
metricData ->
98+
assertThat(metricData)
99+
.hasInstrumentationScope(EXPECTED_SCOPE)
100+
.hasDescription("Number of executing platform threads.")
101+
.hasUnit("{thread}")
102+
.hasLongSumSatisfying(
103+
sum ->
104+
sum.isNotMonotonic()
105+
.hasPointsSatisfying(
106+
point ->
107+
point
108+
.hasValue(1)
109+
.hasAttributesSatisfying(
110+
equalTo(JVM_THREAD_DAEMON, false),
111+
equalTo(JVM_THREAD_STATE, "runnable")),
112+
point ->
113+
point
114+
.hasValue(1)
115+
.hasAttributesSatisfying(
116+
equalTo(JVM_THREAD_DAEMON, true),
117+
equalTo(JVM_THREAD_STATE, "waiting"))))));
118+
}
119+
78120
@Test
79121
@EnabledForJreRange(min = JRE.JAVA_9)
80122
void registerObservers_Java9AndNewer() {
@@ -120,6 +162,13 @@ void registerObservers_Java9AndNewer() {
120162
equalTo(JVM_THREAD_STATE, "waiting"))))));
121163
}
122164

165+
@Test
166+
void getThreads() {
167+
Thread[] threads = Threads.getThreads();
168+
Set<Thread> set = new HashSet<>(Arrays.asList(threads));
169+
assertThat(set).contains(Thread.currentThread());
170+
}
171+
123172
static final class ThreadInfoAnswer implements Answer<Object> {
124173

125174
private final boolean isDaemon;
@@ -135,7 +184,7 @@ public Object answer(InvocationOnMock invocation) {
135184
String methodName = invocation.getMethod().getName();
136185
if (methodName.equals("isDaemon")) {
137186
return isDaemon;
138-
} else if (methodName.equals("getThreadState")) {
187+
} else if (methodName.equals("getThreadState") || methodName.equals("getState")) {
139188
return state;
140189
}
141190
return null;

0 commit comments

Comments
 (0)