Skip to content

KAFKA-17561: KIP-1091 add operator metrics #17820

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Nov 18, 2024

Conversation

bbejeck
Copy link
Member

@bbejeck bbejeck commented Nov 14, 2024

Implementation of KIP-1091 adding operator metrics to Kafka Streams

Updated existing tests to validate added metrics

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@@ -180,15 +180,16 @@ public void shouldPushMetricsToBroker(final String recordingLevel) throws Except
final String name = mn.name().replace('-', '.');
final String group = mn.group().replace("-metrics", "").replace('-', '.');
return "org.apache.kafka." + group + "." + name;
}).sorted().collect(Collectors.toList());
}).filter(name -> !name.equals("org.apache.kafka.stream.thread.state"))// telemetry reporter filters out string metrics
.sorted().collect(Collectors.toList());
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Telemetry filters out string metrics, so I remove it here from the expected metrics list

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do not understand this. According to the KIP and the code in ThreadMetrics the type of the metric is numeric.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We also added a state metric for stream threads that reports the current state of the thread as a string.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it!
I thought org.apache.kafka.stream.thread.thread.state was a typo and it should actually be org.apache.kafka.stream.thread.state. So I was wondering, why you wanted to exclude the metric you explicitly added for sending it to the broker from the test.

@bbejeck bbejeck requested review from cadonna and mjsax November 14, 2024 20:58
@mjsax mjsax added the kip Requires or implements a KIP label Nov 15, 2024
Copy link
Member

@mjsax mjsax left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Made a pass. Might be good if @cadonna could take a look too.

} else if (recordingLevelString.equals("DEBUG")) {
recordingLevel = 1;
} else {
// Must be TRACE level
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Might be better to use another if and final else throw an exception as safe guard? Would also highlight that we need to update this code, in case we add a new level?

private int calculateMetricsRecordingLevel() {
final int recordingLevel;
final String recordingLevelString = applicationConfigs.getString(METRICS_RECORDING_LEVEL_CONFIG);
if (recordingLevelString.equals("INFO")) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not use a switch statement?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought Java 11 had switch expressions but it's Java 13+ so I was a little disappointed, but now looking at again the code will still benefit from using switch

@@ -614,6 +615,12 @@ public StreamThread(final Time time,
streamsMetrics,
time.milliseconds()
);
ThreadMetrics.addThreadStateTelemetryMetric(threadId,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: threadId should be in the next line by itself

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: we usually use 4 spaces indentation.

ThreadMetrics.addThreadStateTelemetryMetric(threadId,
streamsMetrics,
(metricConfig, now) -> this.state().ordinal());
ThreadMetrics.addThreadStateMetric(threadId,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as above

(metricConfig, now) -> this.state().ordinal());
ThreadMetrics.addThreadStateMetric(threadId,
streamsMetrics,
(metricConfig, now) -> this.state().name().toLowerCase(Locale.getDefault()));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For the corresponding client metric we just use

 ClientMetrics.addStateMetric(streamsMetrics, (metricsConfig, now) -> state);

Why so "complicated" -- I am also ok to update the code for the client metric. But both should be the same?

(Or maybe keep ... -> state and add a "fancy" toString() overload to both enum (for client and thread) which model the state?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The KIP specifies lower-case string but I like the other approach so I'll update it.

final StreamsMetricsImpl streamsMetrics,
final Gauge<Integer> threadStateProvider) {
streamsMetrics.addThreadLevelMutableMetric(
THREAD_STATE,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: indention too deep (should only be 4 whitespace, not 8 -- should be an IDE setting)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ack, I think I need to copy settings from someone this is a common occurance with my PRs

final StreamsMetricsImpl streamsMetrics,
final Gauge<String> threadStateProvider) {
streamsMetrics.addThreadLevelMutableMetric(
STATE,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as above

final List<String> actualMetrics = new ArrayList<>(TelemetryPlugin.SUBSCRIBED_METRICS.get(mainConsumerInstanceId));
assertEquals(expectedMetrics, actualMetrics);

TestUtils.waitForCondition(() -> !TelemetryPlugin.SUBSCRIBED_METRICS.get(adminInstanceId).isEmpty(),
30_000,
"Never received subscribed metrics");
final List<String> actualInstanceMetrics = TelemetryPlugin.SUBSCRIBED_METRICS.get(adminInstanceId);
final List<String> expectedInstanceMetrics = Arrays.asList("org.apache.kafka.stream.alive.stream.threads", "org.apache.kafka.stream.failed.stream.threads");
final List<String> expectedInstanceMetrics = Arrays.asList("org.apache.kafka.stream.alive.stream.threads", "org.apache.kafka.stream.client.state", "org.apache.kafka.stream.failed.stream.threads", "org.apache.kafka.stream.recording.level");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: line too long. How about

final List<String> expectedInstanceMetrics = Arrays.asList(
    "org.apache.kafka.stream.client.state",
    "org.apache.kafka.stream.alive.stream.threads",
    "org.apache.kafka.stream.failed.stream.threads",
    "org.apache.kafka.stream.recording.level"
);

Copy link
Member

@cadonna cadonna left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the PR, @bbejeck !

Could you please also update MetricsIntegrationTest with the new metrics?

public void shouldAddThreadStateTelemetryMetric() {
final Gauge<Integer> threadStateProvider = (streamsMetrics, startTime) -> StreamThread.State.RUNNING.ordinal();
ThreadMetrics.addThreadStateTelemetryMetric(
"threadId",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a constant for a test thread ID:

Suggested change
"threadId",
THREAD_ID,

verify(streamsMetrics).addThreadLevelMutableMetric(
"thread-state",
"The current state of the thread",
"threadId",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
"threadId",
THREAD_ID,

public void shouldAddThreadStateJMXMetric() {
final Gauge<String> threadStateProvider = (streamsMetrics, startTime) -> StreamThread.State.RUNNING.name().toLowerCase(Locale.getDefault());
ThreadMetrics.addThreadStateMetric(
"threadId",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
"threadId",
THREAD_ID,

verify(streamsMetrics).addThreadLevelMutableMetric(
"state",
"The current state of the thread",
"threadId",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
"threadId",
THREAD_ID,

@@ -614,6 +615,12 @@ public StreamThread(final Time time,
streamsMetrics,
time.milliseconds()
);
ThreadMetrics.addThreadStateTelemetryMetric(threadId,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: we usually use 4 spaces indentation.

Comment on lines 1070 to 1073
.recordLevel(Sensor.RecordingLevel.forName(config.getString(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG)))
.recordLevel(Sensor.RecordingLevel.forName(config.getString(METRICS_RECORDING_LEVEL_CONFIG)))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why did you remove the StreamsConfig. prefix? For all other config names, we use the prefix.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unintended, I let IntelliJ do the driving on that one, I'll revert

@@ -180,15 +180,16 @@ public void shouldPushMetricsToBroker(final String recordingLevel) throws Except
final String name = mn.name().replace('-', '.');
final String group = mn.group().replace("-metrics", "").replace('-', '.');
return "org.apache.kafka." + group + "." + name;
}).sorted().collect(Collectors.toList());
}).filter(name -> !name.equals("org.apache.kafka.stream.thread.state"))// telemetry reporter filters out string metrics
.sorted().collect(Collectors.toList());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do not understand this. According to the KIP and the code in ThreadMetrics the type of the metric is numeric.

}

@Test
public void shouldAddThreadStateJMXMetric() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit:

Suggested change
public void shouldAddThreadStateJMXMetric() {
public void shouldAddThreadStateJmxMetric() {

@bbejeck
Copy link
Member Author

bbejeck commented Nov 15, 2024

@mjsax and @cadonna - thanks for the reviews! All comments addressed.

Copy link
Member

@mjsax mjsax left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM. Assuming build passes.

Copy link
Member

@cadonna cadonna left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the updates, @bbejeck !

LGTM!

@@ -180,15 +180,16 @@ public void shouldPushMetricsToBroker(final String recordingLevel) throws Except
final String name = mn.name().replace('-', '.');
final String group = mn.group().replace("-metrics", "").replace('-', '.');
return "org.apache.kafka." + group + "." + name;
}).sorted().collect(Collectors.toList());
}).filter(name -> !name.equals("org.apache.kafka.stream.thread.state"))// telemetry reporter filters out string metrics
.sorted().collect(Collectors.toList());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it!
I thought org.apache.kafka.stream.thread.thread.state was a typo and it should actually be org.apache.kafka.stream.thread.state. So I was wondering, why you wanted to exclude the metric you explicitly added for sending it to the broker from the test.

@bbejeck bbejeck merged commit 50c15b9 into apache:trunk Nov 18, 2024
8 checks passed
@bbejeck
Copy link
Member Author

bbejeck commented Nov 18, 2024

Merged #17820 into trunk

@bbejeck bbejeck deleted the KAFKA-17561_add_operator_metrics branch November 18, 2024 15:30
chiacyu pushed a commit to chiacyu/kafka that referenced this pull request Nov 30, 2024
Implementation of KIP-1091 adding operator metrics to Kafka Streams
Updated existing tests to validate added metrics
Reviewers: Bruno Cadonna <[email protected]>, Matthias Sax <[email protected]>
tedyu pushed a commit to tedyu/kafka that referenced this pull request Jan 6, 2025
Implementation of KIP-1091 adding operator metrics to Kafka Streams
Updated existing tests to validate added metrics
Reviewers: Bruno Cadonna <[email protected]>, Matthias Sax <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
kip Requires or implements a KIP streams
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants