KAFKA-17561: KIP-1091 add operator metrics #17820

Open · wants to merge 3 commits into base: trunk
Changes from all commits
@@ -180,15 +180,16 @@ public void shouldPushMetricsToBroker(final String recordingLevel) throws Except
final String name = mn.name().replace('-', '.');
final String group = mn.group().replace("-metrics", "").replace('-', '.');
return "org.apache.kafka." + group + "." + name;
}).sorted().collect(Collectors.toList());
}).filter(name -> !name.equals("org.apache.kafka.stream.thread.state"))// telemetry reporter filters out string metrics
.sorted().collect(Collectors.toList());
Contributor Author: Telemetry filters out string metrics, so I remove it here from the expected metrics list

final List<String> actualMetrics = new ArrayList<>(TelemetryPlugin.SUBSCRIBED_METRICS.get(mainConsumerInstanceId));
assertEquals(expectedMetrics, actualMetrics);

TestUtils.waitForCondition(() -> !TelemetryPlugin.SUBSCRIBED_METRICS.get(adminInstanceId).isEmpty(),
30_000,
"Never received subscribed metrics");
final List<String> actualInstanceMetrics = TelemetryPlugin.SUBSCRIBED_METRICS.get(adminInstanceId);
final List<String> expectedInstanceMetrics = Arrays.asList("org.apache.kafka.stream.alive.stream.threads", "org.apache.kafka.stream.failed.stream.threads");
final List<String> expectedInstanceMetrics = Arrays.asList("org.apache.kafka.stream.alive.stream.threads", "org.apache.kafka.stream.client.state", "org.apache.kafka.stream.failed.stream.threads", "org.apache.kafka.stream.recording.level");
Member: Nit: line too long. How about

final List<String> expectedInstanceMetrics = Arrays.asList(
    "org.apache.kafka.stream.client.state",
    "org.apache.kafka.stream.alive.stream.threads",
    "org.apache.kafka.stream.failed.stream.threads",
    "org.apache.kafka.stream.recording.level"
);

assertEquals(expectedInstanceMetrics, actualInstanceMetrics);

TestUtils.waitForCondition(() -> TelemetryPlugin.processId != null,
21 changes: 19 additions & 2 deletions streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -110,6 +110,7 @@
import java.util.function.Consumer;
import java.util.function.Supplier;

import static org.apache.kafka.streams.StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG;
import static org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT;
import static org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFailMsgPrefix;
import static org.apache.kafka.streams.internals.ApiUtils.validateMillisecondDuration;
@@ -987,6 +988,8 @@ private KafkaStreams(final TopologyMetadata topologyMetadata,
ClientMetrics.addApplicationIdMetric(streamsMetrics, applicationConfigs.getString(StreamsConfig.APPLICATION_ID_CONFIG));
ClientMetrics.addTopologyDescriptionMetric(streamsMetrics, (metricsConfig, now) -> this.topologyMetadata.topologyDescriptionString());
ClientMetrics.addStateMetric(streamsMetrics, (metricsConfig, now) -> state);
ClientMetrics.addClientStateTelemetryMetric(streamsMetrics, (metricsConfig, now) -> state.ordinal());
ClientMetrics.addClientRecordingLevelMetric(streamsMetrics, calculateMetricsRecordingLevel());
threads = Collections.synchronizedList(new LinkedList<>());
ClientMetrics.addNumAliveStreamThreadMetric(streamsMetrics, (metricsConfig, now) -> numLiveStreamThreads());

@@ -1067,7 +1070,7 @@ private StreamThread createAndAddStreamThread(final long cacheSizePerThread, fin
private static Metrics createMetrics(final StreamsConfig config, final Time time, final String clientId) {
final MetricConfig metricConfig = new MetricConfig()
.samples(config.getInt(StreamsConfig.METRICS_NUM_SAMPLES_CONFIG))
.recordLevel(Sensor.RecordingLevel.forName(config.getString(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG)))
.recordLevel(Sensor.RecordingLevel.forName(config.getString(METRICS_RECORDING_LEVEL_CONFIG)))
.timeWindow(config.getLong(StreamsConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS);
final List<MetricsReporter> reporters = CommonClientConfigs.metricsReporters(clientId, config);

@@ -1250,6 +1253,20 @@ private Optional<String> removeStreamThread(final long timeoutMs) throws Timeout
return Optional.empty();
}

private int calculateMetricsRecordingLevel() {
final int recordingLevel;
final String recordingLevelString = applicationConfigs.getString(METRICS_RECORDING_LEVEL_CONFIG);
if (recordingLevelString.equals("INFO")) {
Member: Why not use a switch statement?
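A minimal sketch of the switch-based shape (not part of the PR; it reuses the surrounding class's applicationConfigs field and the METRICS_RECORDING_LEVEL_CONFIG import):

    // Sketch: the same INFO/DEBUG/TRACE mapping expressed as a switch.
    private int calculateMetricsRecordingLevel() {
        final String recordingLevelString = applicationConfigs.getString(METRICS_RECORDING_LEVEL_CONFIG);
        switch (recordingLevelString) {
            case "INFO":
                return 0;
            case "DEBUG":
                return 1;
            default:
                // Must be TRACE level, matching the fallback in the current version.
                return 2;
        }
    }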

recordingLevel = 0;
} else if (recordingLevelString.equals("DEBUG")) {
recordingLevel = 1;
} else {
// Must be TRACE level
Member: Might be better to use another if with a final else that throws an exception as a safeguard? That would also highlight that we need to update this code in case we add a new level.
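A sketch of that safeguard (assuming IllegalStateException is an acceptable failure type here):

    if (recordingLevelString.equals("INFO")) {
        recordingLevel = 0;
    } else if (recordingLevelString.equals("DEBUG")) {
        recordingLevel = 1;
    } else if (recordingLevelString.equals("TRACE")) {
        recordingLevel = 2;
    } else {
        // Fail fast so a newly added recording level cannot fall through silently.
        throw new IllegalStateException("Unexpected recording level: " + recordingLevelString);
    }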

recordingLevel = 2;
}
return recordingLevel;
}

/*
* Takes a snapshot and counts the number of stream threads which are not in PENDING_SHUTDOWN or DEAD
*
@@ -1334,7 +1351,7 @@ private ScheduledExecutorService setupStateDirCleaner() {

private static ScheduledExecutorService maybeCreateRocksDBMetricsRecordingService(final String clientId,
final StreamsConfig config) {
if (RecordingLevel.forName(config.getString(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG)) == RecordingLevel.DEBUG) {
if (RecordingLevel.forName(config.getString(METRICS_RECORDING_LEVEL_CONFIG)) == RecordingLevel.DEBUG) {
return Executors.newSingleThreadScheduledExecutor(r -> {
final Thread thread = new Thread(r, clientId + "-RocksDBMetricsRecordingTrigger");
thread.setDaemon(true);
@@ -40,11 +40,14 @@ private ClientMetrics() {}
private static final String APPLICATION_ID = "application-id";
private static final String TOPOLOGY_DESCRIPTION = "topology-description";
private static final String STATE = "state";
private static final String CLIENT_STATE = "client-state";
private static final String ALIVE_STREAM_THREADS = "alive-stream-threads";
private static final String VERSION_FROM_FILE;
private static final String COMMIT_ID_FROM_FILE;
private static final String DEFAULT_VALUE = "unknown";
private static final String FAILED_STREAM_THREADS = "failed-stream-threads";
private static final String RECORDING_LEVEL = "recording-level";


static {
final Properties props = new Properties();
@@ -67,6 +70,7 @@ private ClientMetrics() {}
private static final String STATE_DESCRIPTION = "The state of the Kafka Streams client";
private static final String ALIVE_STREAM_THREADS_DESCRIPTION = "The current number of alive stream threads that are running or participating in rebalance";
private static final String FAILED_STREAM_THREADS_DESCRIPTION = "The number of failed stream threads since the start of the Kafka Streams client";
private static final String RECORDING_LEVEL_DESCRIPTION = "The metrics recording level of the Kafka Streams client";

public static String version() {
return VERSION_FROM_FILE;
@@ -123,6 +127,26 @@ public static void addStateMetric(final StreamsMetricsImpl streamsMetrics,
);
}

public static void addClientStateTelemetryMetric(final StreamsMetricsImpl streamsMetrics,
final Gauge<Integer> stateProvider) {
streamsMetrics.addClientLevelMutableMetric(
CLIENT_STATE,
STATE_DESCRIPTION,
RecordingLevel.INFO,
stateProvider
);
}

public static void addClientRecordingLevelMetric(final StreamsMetricsImpl streamsMetrics,
final int recordingLevel) {
streamsMetrics.addClientLevelImmutableMetric(
RECORDING_LEVEL,
RECORDING_LEVEL_DESCRIPTION,
RecordingLevel.INFO,
recordingLevel
);
}

public static void addNumAliveStreamThreadMetric(final StreamsMetricsImpl streamsMetrics,
final Gauge<Integer> stateProvider) {
streamsMetrics.addClientLevelMutableMetric(
@@ -68,6 +68,7 @@
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Queue;
@@ -614,6 +615,12 @@ public StreamThread(final Time time,
streamsMetrics,
time.milliseconds()
);
ThreadMetrics.addThreadStateTelemetryMetric(threadId,
Member: nit: threadId should be on the next line by itself
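For example (formatting sketch only):

    ThreadMetrics.addThreadStateTelemetryMetric(
        threadId,
        streamsMetrics,
        (metricConfig, now) -> this.state().ordinal());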

streamsMetrics,
(metricConfig, now) -> this.state().ordinal());
ThreadMetrics.addThreadStateMetric(threadId,
Member: as above

streamsMetrics,
(metricConfig, now) -> this.state().name().toLowerCase(Locale.getDefault()));
Member: For the corresponding client metric we just use

 ClientMetrics.addStateMetric(streamsMetrics, (metricsConfig, now) -> state);

Why so "complicated"? I am also OK to update the code for the client metric, but both should be the same.

(Or maybe keep ... -> state and add a "fancy" toString() overload to both enums (for client and thread) which model the state?)
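A rough sketch of the toString() idea (hypothetical, not part of this PR; the real state enums also encode valid transitions, which are omitted here):

    // Simplified stand-in for the thread/client state enum, shown only to illustrate
    // the suggested toString() overload.
    public enum State {
        CREATED, RUNNING, PENDING_SHUTDOWN, DEAD;   // constant list trimmed for illustration

        @Override
        public String toString() {
            return name().toLowerCase(Locale.getDefault());
        }
    }

    // A gauge typed over the enum could then return the state directly,
    // e.g. (metricConfig, now) -> this.state(), mirroring the client-level registration above.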

ThreadMetrics.addThreadBlockedTimeMetric(
threadId,
new StreamThreadTotalBlockedTime(
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.processor.internals.metrics;

import org.apache.kafka.common.metrics.Gauge;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.Sensor.RecordingLevel;
import org.apache.kafka.streams.processor.internals.StreamThreadTotalBlockedTime;
@@ -44,7 +45,9 @@ private ThreadMetrics() {}
private static final String CREATE_TASK = "task-created";
private static final String CLOSE_TASK = "task-closed";
private static final String BLOCKED_TIME = "blocked-time-ns-total";
private static final String STATE = "state";
private static final String THREAD_START_TIME = "thread-start-time";
private static final String THREAD_STATE = "thread-state";

private static final String COMMIT_DESCRIPTION = "calls to commit";
private static final String COMMIT_TOTAL_DESCRIPTION = TOTAL_DESCRIPTION + COMMIT_DESCRIPTION;
@@ -88,6 +91,8 @@ private ThreadMetrics() {}
"The total time the thread spent blocked on kafka in nanoseconds";
private static final String THREAD_START_TIME_DESCRIPTION =
"The time that the thread was started";
private static final String THREAD_STATE_DESCRIPTION =
"The current state of the thread";

public static Sensor createTaskSensor(final String threadId,
final StreamsMetricsImpl streamsMetrics) {
@@ -290,6 +295,30 @@ public static void addThreadStartTimeMetric(final String threadId,
);
}

public static void addThreadStateTelemetryMetric(final String threadId,
final StreamsMetricsImpl streamsMetrics,
final Gauge<Integer> threadStateProvider) {
streamsMetrics.addThreadLevelMutableMetric(
THREAD_STATE,
Member: nit: indentation too deep (should be only 4 spaces, not 8 -- should be an IDE setting)
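i.e. (indentation sketch only):

    streamsMetrics.addThreadLevelMutableMetric(
        THREAD_STATE,
        THREAD_STATE_DESCRIPTION,
        threadId,
        threadStateProvider
    );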

THREAD_STATE_DESCRIPTION,
threadId,
threadStateProvider
);
}

public static void addThreadStateMetric(final String threadId,
final StreamsMetricsImpl streamsMetrics,
final Gauge<String> threadStateProvider) {
streamsMetrics.addThreadLevelMutableMetric(
STATE,
Member: as above

THREAD_STATE_DESCRIPTION,
threadId,
threadStateProvider
);
}



public static void addThreadBlockedTimeMetric(final String threadId,
final StreamThreadTotalBlockedTime blockedTime,
final StreamsMetricsImpl streamsMetrics) {
@@ -111,6 +111,32 @@ public void shouldAddAliveStreamThreadsMetric() {
);
}

@Test
public void shouldAddClientStateTelemetryMetric() {
final String name = "client-state";
final String description = "The state of the Kafka Streams client";
final Gauge<Integer> stateProvider = (config, now) -> State.RUNNING.ordinal();
setUpAndVerifyMutableMetric(
name,
description,
stateProvider,
() -> ClientMetrics.addClientStateTelemetryMetric(streamsMetrics, stateProvider)
);
}

@Test
public void shouldAddRecordingLevelMetric() {
final String name = "recording-level";
final String description = "The metrics recording level of the Kafka Streams client";
final int recordingLevel = 1;
setUpAndVerifyImmutableMetric(
name,
description,
recordingLevel,
() -> ClientMetrics.addClientRecordingLevelMetric(streamsMetrics, recordingLevel)
);
}

@Test
public void shouldGetFailedStreamThreadsSensor() {
final String name = "failed-stream-threads";
@@ -159,4 +185,19 @@ private void setUpAndVerifyImmutableMetric(final String name,
eq(value)
);
}

private void setUpAndVerifyImmutableMetric(final String name,
final String description,
final int value,
final Runnable metricAdder) {

metricAdder.run();

verify(streamsMetrics).addClientLevelImmutableMetric(
eq(name),
eq(description),
eq(RecordingLevel.INFO),
eq(value)
);
}
}
@@ -19,13 +19,15 @@
import org.apache.kafka.common.metrics.Gauge;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.Sensor.RecordingLevel;
import org.apache.kafka.streams.processor.internals.StreamThread;
import org.apache.kafka.streams.processor.internals.StreamThreadTotalBlockedTime;

import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.MockedStatic;

import java.util.Collections;
import java.util.Locale;
import java.util.Map;

import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.LATENCY_SUFFIX;
@@ -413,6 +415,39 @@ public void shouldAddThreadStartTimeMetric() {
);
}

@Test
public void shouldAddThreadStateTelemetryMetric() {
final Gauge<Integer> threadStateProvider = (streamsMetrics, startTime) -> StreamThread.State.RUNNING.ordinal();
ThreadMetrics.addThreadStateTelemetryMetric(
"threadId",
streamsMetrics,
threadStateProvider
);
verify(streamsMetrics).addThreadLevelMutableMetric(
"thread-state",
"The current state of the thread",
"threadId",
threadStateProvider
);
}

@Test
public void shouldAddThreadStateJMXMetric() {
final Gauge<String> threadStateProvider = (streamsMetrics, startTime) -> StreamThread.State.RUNNING.name().toLowerCase(Locale.getDefault());
ThreadMetrics.addThreadStateMetric(
"threadId",
streamsMetrics,
threadStateProvider
);
verify(streamsMetrics).addThreadLevelMutableMetric(
"state",
"The current state of the thread",
"threadId",
threadStateProvider
);
}


@Test
public void shouldAddTotalBlockedTimeMetric() {
// Given: