Skip to content

Commit 9c34a1d

Browse files
authored
Add interactive job metrics (#240)
* Add repl metrics Signed-off-by: Louis Chu <[email protected]> * Fix style after rebase main Signed-off-by: Louis Chu <[email protected]> * Rename vars Signed-off-by: Louis Chu <[email protected]> --------- Signed-off-by: Louis Chu <[email protected]>
1 parent 9c15194 commit 9c34a1d

File tree

6 files changed

+213
-13
lines changed

6 files changed

+213
-13
lines changed

flint-core/src/main/java/org/opensearch/flint/core/metrics/MetricConstants.java

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,4 +21,49 @@ public class MetricConstants {
2121
* Similar to OS_READ_METRIC_PREFIX, this constant is used for categorizing and identifying metrics that pertain to write operations.
2222
*/
2323
public static final String OS_WRITE_OP_METRIC_PREFIX = "opensearch.write";
24+
25+
/**
26+
* Metric name for counting the errors encountered with Amazon S3 operations.
27+
*/
28+
public static final String S3_ERR_CNT_METRIC = "s3.error.count";
29+
30+
/**
31+
* Metric name for counting the number of sessions currently running.
32+
*/
33+
public static final String REPL_RUNNING_METRIC = "session.running.count";
34+
35+
/**
36+
* Metric name for counting the number of sessions that have failed.
37+
*/
38+
public static final String REPL_FAILED_METRIC = "session.failed.count";
39+
40+
/**
41+
* Metric name for counting the number of sessions that have successfully completed.
42+
*/
43+
public static final String REPL_SUCCESS_METRIC = "session.success.count";
44+
45+
/**
46+
* Metric name for tracking the processing time of sessions.
47+
*/
48+
public static final String REPL_PROCESSING_TIME_METRIC = "session.processingTime";
49+
50+
/**
51+
* Metric name for counting the number of statements currently running.
52+
*/
53+
public static final String STATEMENT_RUNNING_METRIC = "statement.running.count";
54+
55+
/**
56+
* Metric name for counting the number of statements that have failed.
57+
*/
58+
public static final String STATEMENT_FAILED_METRIC = "statement.failed.count";
59+
60+
/**
61+
* Metric name for counting the number of statements that have successfully completed.
62+
*/
63+
public static final String STATEMENT_SUCCESS_METRIC = "statement.success.count";
64+
65+
/**
66+
* Metric name for tracking the processing time of statements.
67+
*/
68+
public static final String STATEMENT_PROCESSING_TIME_METRIC = "STATEMENT.processingTime";
2469
}

flint-core/src/main/java/org/opensearch/flint/core/metrics/MetricsUtil.java

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
package org.opensearch.flint.core.metrics;
77

88
import com.codahale.metrics.Counter;
9+
import com.codahale.metrics.Timer;
910
import org.apache.spark.SparkEnv;
1011
import org.apache.spark.metrics.source.FlintMetricSource;
1112
import org.apache.spark.metrics.source.Source;
@@ -38,6 +39,47 @@ public static void incrementCounter(String metricName) {
3839
}
3940
}
4041

42+
/**
43+
* Decrements the value of the specified metric counter by one, if the counter exists and its current count is greater than zero.
44+
*
45+
* @param metricName The name of the metric counter to be decremented.
46+
*/
47+
public static void decrementCounter(String metricName) {
48+
Counter counter = getOrCreateCounter(metricName);
49+
if (counter != null && counter.getCount() > 0) {
50+
counter.dec();
51+
}
52+
}
53+
54+
/**
55+
* Retrieves a {@link Timer.Context} for the specified metric name, creating a new timer if one does not already exist.
56+
* This context can be used to measure the duration of a particular operation or event.
57+
*
58+
* @param metricName The name of the metric timer to retrieve the context for.
59+
* @return A {@link Timer.Context} instance for timing operations, or {@code null} if the timer could not be created or retrieved.
60+
*/
61+
public static Timer.Context getTimerContext(String metricName) {
62+
Timer timer = getOrCreateTimer(metricName);
63+
if (timer != null) {
64+
return timer.time();
65+
}
66+
return null;
67+
}
68+
69+
/**
70+
* Stops the timer associated with the given {@link Timer.Context}, effectively recording the elapsed time since the timer was started
71+
* and returning the duration. If the context is {@code null}, this method does nothing and returns {@code null}.
72+
*
73+
* @param context The {@link Timer.Context} to stop. May be {@code null}, in which case this method has no effect and returns {@code null}.
74+
* @return The elapsed time in nanoseconds since the timer was started, or {@code null} if the context was {@code null}.
75+
*/
76+
public static Long stopTimer(Timer.Context context) {
77+
if (context != null) {
78+
return context.stop();
79+
}
80+
return null;
81+
}
82+
4183
// Retrieves or creates a new counter for the given metric name
4284
private static Counter getOrCreateCounter(String metricName) {
4385
SparkEnv sparkEnv = SparkEnv.get();
@@ -54,6 +96,22 @@ private static Counter getOrCreateCounter(String metricName) {
5496
return counter;
5597
}
5698

99+
// Retrieves or creates a new Timer for the given metric name
100+
private static Timer getOrCreateTimer(String metricName) {
101+
SparkEnv sparkEnv = SparkEnv.get();
102+
if (sparkEnv == null) {
103+
LOG.warning("Spark environment not available, cannot instrument metric: " + metricName);
104+
return null;
105+
}
106+
107+
FlintMetricSource flintMetricSource = getOrInitFlintMetricSource(sparkEnv);
108+
Timer timer = flintMetricSource.metricRegistry().getTimers().get(metricName);
109+
if (timer == null) {
110+
timer = flintMetricSource.metricRegistry().timer(metricName);
111+
}
112+
return timer;
113+
}
114+
57115
// Gets or initializes the FlintMetricSource
58116
private static FlintMetricSource getOrInitFlintMetricSource(SparkEnv sparkEnv) {
59117
Seq<Source> metricSourceSeq = sparkEnv.metricsSystem().getSourcesByName(FlintMetricSource.FLINT_METRIC_SOURCE_NAME());

flint-core/src/test/java/org/opensearch/flint/core/metrics/MetricsUtilTest.java

Lines changed: 44 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,17 @@
11
package org.opensearch.flint.core.metrics;
22

3+
import com.codahale.metrics.Counter;
4+
import com.codahale.metrics.Timer;
35
import org.apache.spark.SparkEnv;
46
import org.apache.spark.metrics.source.FlintMetricSource;
57
import org.junit.Test;
68
import org.junit.jupiter.api.Assertions;
79
import org.mockito.MockedStatic;
810
import org.mockito.Mockito;
911

12+
import java.util.concurrent.TimeUnit;
13+
14+
import static org.junit.Assert.assertEquals;
1015
import static org.mockito.ArgumentMatchers.any;
1116
import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
1217
import static org.mockito.Mockito.mock;
@@ -18,7 +23,34 @@
1823
public class MetricsUtilTest {
1924

2025
@Test
21-
public void incOpenSearchMetric() {
26+
public void testIncrementDecrementCounter() {
27+
try (MockedStatic<SparkEnv> sparkEnvMock = mockStatic(SparkEnv.class)) {
28+
// Mock SparkEnv
29+
SparkEnv sparkEnv = mock(SparkEnv.class, RETURNS_DEEP_STUBS);
30+
sparkEnvMock.when(SparkEnv::get).thenReturn(sparkEnv);
31+
32+
// Mock FlintMetricSource
33+
FlintMetricSource flintMetricSource = Mockito.spy(new FlintMetricSource());
34+
when(sparkEnv.metricsSystem().getSourcesByName(FlintMetricSource.FLINT_METRIC_SOURCE_NAME()).head())
35+
.thenReturn(flintMetricSource);
36+
37+
// Test the methods
38+
String testMetric = "testPrefix.2xx.count";
39+
MetricsUtil.incrementCounter(testMetric);
40+
MetricsUtil.incrementCounter(testMetric);
41+
MetricsUtil.decrementCounter(testMetric);
42+
43+
// Verify interactions
44+
verify(sparkEnv.metricsSystem(), times(0)).registerSource(any());
45+
verify(flintMetricSource, times(4)).metricRegistry();
46+
Counter counter = flintMetricSource.metricRegistry().getCounters().get(testMetric);
47+
Assertions.assertNotNull(counter);
48+
Assertions.assertEquals(counter.getCount(), 1);
49+
}
50+
}
51+
52+
@Test
53+
public void testStartStopTimer() {
2254
try (MockedStatic<SparkEnv> sparkEnvMock = mockStatic(SparkEnv.class)) {
2355
// Mock SparkEnv
2456
SparkEnv sparkEnv = mock(SparkEnv.class, RETURNS_DEEP_STUBS);
@@ -29,14 +61,21 @@ public void incOpenSearchMetric() {
2961
when(sparkEnv.metricsSystem().getSourcesByName(FlintMetricSource.FLINT_METRIC_SOURCE_NAME()).head())
3062
.thenReturn(flintMetricSource);
3163

32-
// Test the method
33-
MetricsUtil.incrementCounter("testPrefix.2xx.count");
64+
// Test the methods
65+
String testMetric = "testPrefix.processingTime";
66+
Timer.Context context = MetricsUtil.getTimerContext(testMetric);
67+
TimeUnit.MILLISECONDS.sleep(500);
68+
MetricsUtil.stopTimer(context);
3469

3570
// Verify interactions
3671
verify(sparkEnv.metricsSystem(), times(0)).registerSource(any());
3772
verify(flintMetricSource, times(2)).metricRegistry();
38-
Assertions.assertNotNull(
39-
flintMetricSource.metricRegistry().getCounters().get("testPrefix.2xx.count"));
73+
Timer timer = flintMetricSource.metricRegistry().getTimers().get(testMetric);
74+
Assertions.assertNotNull(timer);
75+
Assertions.assertEquals(timer.getCount(), 1L);
76+
assertEquals(1.9, timer.getMeanRate(), 0.1);
77+
} catch (InterruptedException e) {
78+
throw new RuntimeException(e);
4079
}
4180
}
4281
}

spark-sql-application/src/main/scala/org/apache/spark/sql/FlintJobExecutor.scala

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@ import scala.concurrent.duration.{Duration, MINUTES}
1313
import com.amazonaws.services.s3.model.AmazonS3Exception
1414
import org.opensearch.flint.core.FlintClient
1515
import org.opensearch.flint.core.metadata.FlintMetadata
16+
import org.opensearch.flint.core.metrics.MetricConstants
17+
import org.opensearch.flint.core.metrics.MetricsUtil.incrementCounter
1618
import play.api.libs.json.{JsArray, JsBoolean, JsObject, Json, JsString, JsValue}
1719

1820
import org.apache.spark.{SparkConf, SparkException}
@@ -401,6 +403,7 @@ trait FlintJobExecutor {
401403
case r: ParseException =>
402404
handleQueryException(r, "Syntax error", spark, dataSource, query, queryId, sessionId)
403405
case r: AmazonS3Exception =>
406+
incrementCounter(MetricConstants.S3_ERR_CNT_METRIC)
404407
handleQueryException(
405408
r,
406409
"Fail to read data from S3. Cause",

0 commit comments

Comments
 (0)