Skip to content

Commit 6d13876

Browse files
authored
[FLINK-36583] Add Slow Request Kube Client Metrics (#908)
1 parent 7390375 commit 6d13876

File tree

5 files changed

+158
-4
lines changed

5 files changed

+158
-4
lines changed

docs/layouts/shortcodes/generated/kubernetes_operator_metric_configuration.html

+6
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,12 @@
3232
<td>Boolean</td>
3333
<td>Enable KubernetesClient metrics for measuring the HTTP traffic to the Kubernetes API Server by response code group, e.g. 1xx, 2xx.</td>
3434
</tr>
35+
<tr>
36+
<td><h5>kubernetes.operator.kubernetes.client.metrics.slow.request.threshold</h5></td>
37+
<td style="word-wrap: break-word;">5 s</td>
38+
<td>Duration</td>
39+
<td>Threshold value that triggers slow request counter for Kubernetes client metrics</td>
40+
</tr>
3541
<tr>
3642
<td><h5>kubernetes.operator.metrics.histogram.sample.size</h5></td>
3743
<td style="word-wrap: break-word;">1000</td>

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java

+7-1
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import java.util.Optional;
3939
import java.util.Set;
4040

41+
import static org.apache.flink.kubernetes.operator.metrics.KubernetesOperatorMetricOptions.OPERATOR_KUBERNETES_SLOW_REQUEST_THRESHOLD;
4142
import static org.apache.flink.kubernetes.operator.utils.EnvUtils.ENV_WATCH_NAMESPACES;
4243

4344
/** Configuration class for operator. */
@@ -75,6 +76,7 @@ public class FlinkOperatorConfiguration {
7576
LeaderElectionConfiguration leaderElectionConfiguration;
7677
DeletionPropagation deletionPropagation;
7778
boolean snapshotResourcesEnabled;
79+
Duration slowRequestThreshold;
7880

7981
public static FlinkOperatorConfiguration fromConfiguration(Configuration operatorConfig) {
8082
Duration reconcileInterval =
@@ -190,6 +192,9 @@ public static FlinkOperatorConfiguration fromConfiguration(Configuration operato
190192
boolean snapshotResourcesEnabled =
191193
operatorConfig.get(KubernetesOperatorConfigOptions.SNAPSHOT_RESOURCE_ENABLED);
192194

195+
Duration slowRequestThreshold =
196+
operatorConfig.get(OPERATOR_KUBERNETES_SLOW_REQUEST_THRESHOLD);
197+
193198
return new FlinkOperatorConfiguration(
194199
reconcileInterval,
195200
reconcilerMaxParallelism,
@@ -218,7 +223,8 @@ public static FlinkOperatorConfiguration fromConfiguration(Configuration operato
218223
labelSelector,
219224
getLeaderElectionConfig(operatorConfig),
220225
deletionPropagation,
221-
snapshotResourcesEnabled);
226+
snapshotResourcesEnabled,
227+
slowRequestThreshold);
222228
}
223229

224230
private static GenericRetry getRetryConfig(Configuration conf) {

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesClientMetrics.java

+22-3
Original file line numberDiff line numberDiff line change
@@ -31,10 +31,9 @@
3131
import io.fabric8.kubernetes.client.http.HttpRequest;
3232
import io.fabric8.kubernetes.client.http.HttpResponse;
3333
import io.fabric8.kubernetes.client.http.Interceptor;
34-
import org.slf4j.Logger;
35-
import org.slf4j.LoggerFactory;
3634

3735
import java.nio.ByteBuffer;
36+
import java.time.Duration;
3837
import java.util.ArrayList;
3938
import java.util.List;
4039
import java.util.Map;
@@ -48,6 +47,7 @@ public class KubernetesClientMetrics implements Interceptor {
4847
public static final String KUBE_CLIENT_GROUP = "KubeClient";
4948
public static final String HTTP_REQUEST_GROUP = "HttpRequest";
5049
public static final String HTTP_REQUEST_FAILED_GROUP = "Failed";
50+
public static final String HTTP_REQUEST_SLOW_GROUP = "Slow";
5151
public static final String HTTP_RESPONSE_GROUP = "HttpResponse";
5252
public static final String HTTP_RESPONSE_1XX = "1xx";
5353
public static final String HTTP_RESPONSE_2XX = "2xx";
@@ -62,10 +62,12 @@ public class KubernetesClientMetrics implements Interceptor {
6262

6363
private final MetricGroup requestMetricGroup;
6464
private final MetricGroup failedRequestMetricGroup;
65+
private final MetricGroup slowRequestMetricGroup;
6566
private final MetricGroup responseMetricGroup;
6667

6768
private final Counter requestCounter;
6869
private final Counter failedRequestCounter;
70+
private final Counter slowRequestCounter;
6971
private final Counter responseCounter;
7072

7173
private final SynchronizedMeterView requestRateMeter;
@@ -79,7 +81,7 @@ public class KubernetesClientMetrics implements Interceptor {
7981
private final Map<String, Counter> requestMethodCounter = new ConcurrentHashMap<>();
8082
private final LongSupplier nanoTimeSource;
8183

82-
private final Logger logger = LoggerFactory.getLogger(KubernetesClientMetrics.class);
84+
private final Duration slowRequestThreshold;
8385

8486
public KubernetesClientMetrics(
8587
MetricGroup parentGroup, FlinkOperatorConfiguration flinkOperatorConfiguration) {
@@ -95,12 +97,16 @@ public KubernetesClientMetrics(
9597

9698
this.requestMetricGroup = metricGroup.addGroup(HTTP_REQUEST_GROUP);
9799
this.failedRequestMetricGroup = requestMetricGroup.addGroup(HTTP_REQUEST_FAILED_GROUP);
100+
this.slowRequestMetricGroup = requestMetricGroup.addGroup(HTTP_REQUEST_SLOW_GROUP);
98101
this.responseMetricGroup = metricGroup.addGroup(HTTP_RESPONSE_GROUP);
99102

100103
this.requestCounter =
101104
OperatorMetricUtils.synchronizedCounter(requestMetricGroup.counter(COUNTER));
102105
this.failedRequestCounter =
103106
OperatorMetricUtils.synchronizedCounter(failedRequestMetricGroup.counter(COUNTER));
107+
this.slowRequestThreshold = flinkOperatorConfiguration.getSlowRequestThreshold();
108+
this.slowRequestCounter =
109+
OperatorMetricUtils.synchronizedCounter(slowRequestMetricGroup.counter(COUNTER));
104110
this.responseCounter =
105111
OperatorMetricUtils.synchronizedCounter(responseMetricGroup.counter(COUNTER));
106112

@@ -207,6 +213,16 @@ Histogram getResponseLatency() {
207213
return responseLatency;
208214
}
209215

216+
/**
 * Returns the counter registered under the "Slow" sub-group of the HTTP request metric group.
 * It is incremented when a tracked request latency is greater than or equal to the configured
 * slow request threshold. Exposed for tests.
 *
 * @return the slow request counter
 */
@VisibleForTesting
217+
public Counter getSlowRequestCounter() {
218+
return slowRequestCounter;
219+
}
220+
221+
/**
 * Returns the slow request threshold that was read from the operator configuration when this
 * instance was constructed. Exposed for tests.
 *
 * @return the latency threshold used to classify a request as slow
 */
@VisibleForTesting
222+
public Duration getSlowRequestThreshold() {
223+
return slowRequestThreshold;
224+
}
225+
210226
@VisibleForTesting
211227
SynchronizedMeterView getRequestFailedRateMeter() {
212228
return requestFailedRateMeter;
@@ -236,6 +252,9 @@ private void trackRequestLatency(HttpRequest request) {
236252
final long requestStartNanos = Long.parseLong(header);
237253
final long latency = currentNanos - requestStartNanos;
238254
this.responseLatency.update(latency);
255+
if (latency >= slowRequestThreshold.toNanos()) {
256+
slowRequestCounter.inc();
257+
}
239258
}
240259
}
241260

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesOperatorMetricOptions.java

+9
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919

2020
import org.apache.flink.configuration.ConfigOption;
2121

22+
import java.time.Duration;
23+
2224
import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.operatorConfig;
2325

2426
/** Configuration options for metrics. */
@@ -52,6 +54,13 @@ public class KubernetesOperatorMetricOptions {
5254
.withDescription(
5355
"Enable KubernetesClient metrics for measuring the HTTP traffic to the Kubernetes API Server by response code group, e.g. 1xx, 2xx.");
5456

57+
/**
 * Latency threshold for the Kubernetes client slow request counter. Defaults to 5 seconds; a
 * request whose measured latency is greater than or equal to this value is counted as slow.
 */
public static final ConfigOption<Duration> OPERATOR_KUBERNETES_SLOW_REQUEST_THRESHOLD =
58+
operatorConfig("kubernetes.client.metrics.slow.request.threshold")
59+
.durationType()
60+
.defaultValue(Duration.ofSeconds(5))
61+
.withDescription(
62+
"Threshold value that triggers slow request counter for Kubernetes client metrics");
63+
5564
public static final ConfigOption<Boolean> OPERATOR_RESOURCE_METRICS_ENABLED =
5665
operatorConfig("resource.metrics.enabled")
5766
.booleanType()

flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/KubernetesClientMetricsFabric8InterceptorTest.java

+114
Original file line numberDiff line numberDiff line change
@@ -345,6 +345,120 @@ registry, new Configuration(), NAMESPACE, NAME, HOST),
345345
.isEqualTo(1000L);
346346
}
347347

348+
/**
 * Verifies that a response observed 1 ns past the slow request threshold increments the slow
 * request counter from 0 to 1.
 */
@Test
349+
void shouldTrackResponseAsSlowResponseAboveThreshold() {
350+
// Given
351+
long[] currentTime = {0L};
352+
kubernetesClientMetrics =
353+
new KubernetesClientMetrics(
354+
KubernetesOperatorMetricGroup.create(
355+
registry, new Configuration(), NAMESPACE, NAME, HOST),
356+
FlinkOperatorConfiguration.fromConfiguration(operatorConfig),
357+
() -> currentTime[0]);
358+
final HttpRequest postRequest =
359+
builder.post("application/json", "{}").uri("/random").build();
360+
kubernetesClientMetrics.before(builder, postRequest, emptyTags);
361+
assertThat(kubernetesClientMetrics.getSlowRequestCounter())
362+
.extracting(Counter::getCount)
363+
.isEqualTo(0L);
364+
// Advance the test-controlled time value to threshold + 1 ns, i.e. just past the threshold.
currentTime[0] += kubernetesClientMetrics.getSlowRequestThreshold().toNanos() + 1L;
365+
366+
// When
367+
kubernetesClientMetrics.after(
368+
postRequest,
369+
new StubHttpResponse(postRequest, Map.of(), 200),
370+
(value, asyncBody) -> {});
371+
372+
// Then
373+
assertThat(kubernetesClientMetrics.getSlowRequestCounter())
374+
.extracting(Counter::getCount)
375+
.isEqualTo(1L);
376+
}
377+
378+
/**
 * Verifies that a response observed 1 ns before the slow request threshold leaves the slow
 * request counter at 0.
 */
@Test
379+
void shouldNotTrackResponseAsSlowResponseBelowThreshold() {
380+
// Given
381+
long[] currentTime = {0L};
382+
kubernetesClientMetrics =
383+
new KubernetesClientMetrics(
384+
KubernetesOperatorMetricGroup.create(
385+
registry, new Configuration(), NAMESPACE, NAME, HOST),
386+
FlinkOperatorConfiguration.fromConfiguration(operatorConfig),
387+
() -> currentTime[0]);
388+
final HttpRequest postRequest =
389+
builder.post("application/json", "{}").uri("/random").build();
390+
kubernetesClientMetrics.before(builder, postRequest, emptyTags);
391+
assertThat(kubernetesClientMetrics.getSlowRequestCounter())
392+
.extracting(Counter::getCount)
393+
.isEqualTo(0L);
394+
// Advance the test-controlled time value to threshold - 1 ns, i.e. just short of the threshold.
currentTime[0] += kubernetesClientMetrics.getSlowRequestThreshold().toNanos() - 1L;
395+
396+
// When
397+
kubernetesClientMetrics.after(
398+
postRequest,
399+
new StubHttpResponse(postRequest, Map.of(), 200),
400+
(value, asyncBody) -> {});
401+
402+
// Then
403+
assertThat(kubernetesClientMetrics.getSlowRequestCounter())
404+
.extracting(Counter::getCount)
405+
.isEqualTo(0L);
406+
}
407+
408+
/**
 * Verifies that a connection failure reported 1 ns past the slow request threshold increments
 * the slow request counter from 0 to 1.
 */
@Test
409+
void shouldTrackResponseForFailedConnectionAsSlowResponseAboveThreshold() {
410+
// Given
411+
long[] currentTime = {0L};
412+
kubernetesClientMetrics =
413+
new KubernetesClientMetrics(
414+
KubernetesOperatorMetricGroup.create(
415+
registry, new Configuration(), NAMESPACE, NAME, HOST),
416+
FlinkOperatorConfiguration.fromConfiguration(operatorConfig),
417+
() -> currentTime[0]);
418+
final HttpRequest postRequest =
419+
builder.post("application/json", "{}").uri("/random").build();
420+
kubernetesClientMetrics.before(builder, postRequest, emptyTags);
421+
assertThat(kubernetesClientMetrics.getSlowRequestCounter())
422+
.extracting(Counter::getCount)
423+
.isEqualTo(0L);
424+
// Advance the test-controlled time value to threshold + 1 ns, i.e. just past the threshold.
currentTime[0] += kubernetesClientMetrics.getSlowRequestThreshold().toNanos() + 1L;
425+
426+
// When
427+
kubernetesClientMetrics.afterConnectionFailure(postRequest, new RuntimeException("kaboom"));
428+
429+
// Then
430+
assertThat(kubernetesClientMetrics.getSlowRequestCounter())
431+
.extracting(Counter::getCount)
432+
.isEqualTo(1L);
433+
}
434+
435+
/**
 * Verifies that a connection failure reported 1 ns before the slow request threshold leaves
 * the slow request counter at 0.
 */
@Test
436+
void shouldNotTrackResponseForFailedConnectionAsSlowResponseUnderThreshold() {
437+
// Given
438+
long[] currentTime = {0L};
439+
kubernetesClientMetrics =
440+
new KubernetesClientMetrics(
441+
KubernetesOperatorMetricGroup.create(
442+
registry, new Configuration(), NAMESPACE, NAME, HOST),
443+
FlinkOperatorConfiguration.fromConfiguration(operatorConfig),
444+
() -> currentTime[0]);
445+
final HttpRequest postRequest =
446+
builder.post("application/json", "{}").uri("/random").build();
447+
kubernetesClientMetrics.before(builder, postRequest, emptyTags);
448+
assertThat(kubernetesClientMetrics.getSlowRequestCounter())
449+
.extracting(Counter::getCount)
450+
.isEqualTo(0L);
451+
// Advance the test-controlled time value to threshold - 1 ns, i.e. just short of the threshold.
currentTime[0] += kubernetesClientMetrics.getSlowRequestThreshold().toNanos() - 1L;
452+
453+
// When
454+
kubernetesClientMetrics.afterConnectionFailure(postRequest, new RuntimeException("kaboom"));
455+
456+
// Then
457+
assertThat(kubernetesClientMetrics.getSlowRequestCounter())
458+
.extracting(Counter::getCount)
459+
.isEqualTo(0L);
460+
}
461+
348462
@Test
349463
void shouldTrackFailedRequests() {
350464
// Given

0 commit comments

Comments
 (0)