Skip to content

Commit b47386e

Browse files
committed
Add flint opensearch metrics
Signed-off-by: Louis Chu <[email protected]>
1 parent d289984 commit b47386e

File tree

18 files changed

+476
-90
lines changed

18 files changed

+476
-90
lines changed

build.sbt

+2-1
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,8 @@ lazy val flintCore = (project in file("flint-core"))
6767
"org.scalatest" %% "scalatest-flatspec" % "3.2.15" % "test",
6868
"org.scalatestplus" %% "mockito-4-6" % "3.2.15.0" % "test",
6969
"com.stephenn" %% "scalatest-json-jsonassert" % "0.2.5" % "test",
70-
"org.mockito" % "mockito-core" % "2.23.0" % "test",
70+
"org.mockito" % "mockito-core" % "4.6.1" % "test",
71+
"org.mockito" % "mockito-inline" % "4.6.1" % "test",
7172
"org.mockito" % "mockito-junit-jupiter" % "3.12.4" % "test",
7273
"org.junit.jupiter" % "junit-jupiter-api" % "5.9.0" % "test",
7374
"org.junit.jupiter" % "junit-jupiter-engine" % "5.9.0" % "test",
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.flint.core;
7+
8+
import org.opensearch.action.bulk.BulkRequest;
9+
import org.opensearch.action.bulk.BulkResponse;
10+
import org.opensearch.action.delete.DeleteRequest;
11+
import org.opensearch.action.delete.DeleteResponse;
12+
import org.opensearch.action.get.GetRequest;
13+
import org.opensearch.action.get.GetResponse;
14+
import org.opensearch.action.index.IndexRequest;
15+
import org.opensearch.action.index.IndexResponse;
16+
import org.opensearch.action.search.ClearScrollRequest;
17+
import org.opensearch.action.search.ClearScrollResponse;
18+
import org.opensearch.action.search.SearchRequest;
19+
import org.opensearch.action.search.SearchResponse;
20+
import org.opensearch.action.search.SearchScrollRequest;
21+
import org.opensearch.action.update.UpdateRequest;
22+
import org.opensearch.action.DocWriteResponse;
23+
import org.opensearch.client.indices.CreateIndexRequest;
24+
import org.opensearch.client.indices.CreateIndexResponse;
25+
import org.opensearch.client.indices.GetIndexRequest;
26+
import org.opensearch.client.indices.GetIndexResponse;
27+
import org.opensearch.action.admin.indices.delete.DeleteIndexRequest;
28+
import org.opensearch.client.RequestOptions;
29+
30+
import java.io.IOException;
31+
32+
/**
33+
* Interface for wrapping the OpenSearch High Level REST Client with additional functionality,
34+
* such as metrics tracking.
35+
*/
36+
public interface IRestHighLevelClient {
37+
38+
BulkResponse bulk(BulkRequest bulkRequest, RequestOptions options) throws IOException;
39+
40+
ClearScrollResponse clearScroll(ClearScrollRequest clearScrollRequest, RequestOptions options) throws IOException;
41+
42+
CreateIndexResponse createIndex(CreateIndexRequest createIndexRequest, RequestOptions options) throws IOException;
43+
44+
void deleteIndex(DeleteIndexRequest deleteIndexRequest, RequestOptions options) throws IOException;
45+
46+
DeleteResponse delete(DeleteRequest deleteRequest, RequestOptions options) throws IOException;
47+
48+
GetResponse get(GetRequest getRequest, RequestOptions options) throws IOException;
49+
50+
GetIndexResponse getIndex(GetIndexRequest getIndexRequest, RequestOptions options) throws IOException;
51+
52+
IndexResponse index(IndexRequest indexRequest, RequestOptions options) throws IOException;
53+
54+
Boolean isIndexExists(GetIndexRequest getIndexRequest, RequestOptions options) throws IOException;
55+
56+
SearchResponse search(SearchRequest searchRequest, RequestOptions options) throws IOException;
57+
58+
SearchResponse scroll(SearchScrollRequest searchScrollRequest, RequestOptions options) throws IOException;
59+
60+
DocWriteResponse update(UpdateRequest updateRequest, RequestOptions options) throws IOException;
61+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,165 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.flint.core;
7+
8+
import org.opensearch.action.admin.indices.delete.DeleteIndexRequest;
9+
import org.opensearch.action.bulk.BulkRequest;
10+
import org.opensearch.action.bulk.BulkResponse;
11+
import org.opensearch.action.delete.DeleteRequest;
12+
import org.opensearch.action.delete.DeleteResponse;
13+
import org.opensearch.action.get.GetRequest;
14+
import org.opensearch.action.get.GetResponse;
15+
import org.opensearch.action.index.IndexRequest;
16+
import org.opensearch.action.index.IndexResponse;
17+
import org.opensearch.action.search.ClearScrollRequest;
18+
import org.opensearch.action.search.ClearScrollResponse;
19+
import org.opensearch.action.search.SearchRequest;
20+
import org.opensearch.action.search.SearchResponse;
21+
import org.opensearch.action.search.SearchScrollRequest;
22+
import org.opensearch.action.update.UpdateRequest;
23+
import org.opensearch.OpenSearchException;
24+
import org.opensearch.action.update.UpdateResponse;
25+
import org.opensearch.client.RequestOptions;
26+
import org.opensearch.client.RestHighLevelClient;
27+
import org.opensearch.client.indices.CreateIndexRequest;
28+
import org.opensearch.client.indices.CreateIndexResponse;
29+
import org.opensearch.client.indices.GetIndexRequest;
30+
import org.opensearch.client.indices.GetIndexResponse;
31+
import org.opensearch.flint.core.metrics.MetricsUtil;
32+
33+
import java.io.Closeable;
34+
import java.io.IOException;
35+
36+
import static org.opensearch.flint.core.metrics.MetricConstants.*;
37+
38+
/**
39+
* A wrapper class for RestHighLevelClient to facilitate OpenSearch operations
40+
* with integrated metrics tracking.
41+
*/
42+
public class RestHighLevelClientWrapper implements IRestHighLevelClient, Closeable {
43+
private final RestHighLevelClient client;
44+
45+
/**
46+
* Constructs a new RestHighLevelClientWrapper.
47+
*
48+
* @param client the RestHighLevelClient instance to wrap
49+
*/
50+
public RestHighLevelClientWrapper(RestHighLevelClient client) {
51+
this.client = client;
52+
}
53+
54+
@Override
55+
public BulkResponse bulk(BulkRequest bulkRequest, RequestOptions options) throws IOException {
56+
return execute(OS_WRITE_OP_METRIC_PREFIX, () -> client.bulk(bulkRequest, options));
57+
}
58+
59+
@Override
60+
public ClearScrollResponse clearScroll(ClearScrollRequest clearScrollRequest, RequestOptions options) throws IOException {
61+
return execute(OS_READ_OP_METRIC_PREFIX, () -> client.clearScroll(clearScrollRequest, options));
62+
}
63+
64+
@Override
65+
public CreateIndexResponse createIndex(CreateIndexRequest createIndexRequest, RequestOptions options) throws IOException {
66+
return execute(OS_WRITE_OP_METRIC_PREFIX, () -> client.indices().create(createIndexRequest, options));
67+
}
68+
69+
@Override
70+
public void deleteIndex(DeleteIndexRequest deleteIndexRequest, RequestOptions options) throws IOException {
71+
execute(OS_WRITE_OP_METRIC_PREFIX, () -> client.indices().delete(deleteIndexRequest, options));
72+
}
73+
74+
@Override
75+
public DeleteResponse delete(DeleteRequest deleteRequest, RequestOptions options) throws IOException {
76+
return execute(OS_WRITE_OP_METRIC_PREFIX, () -> client.delete(deleteRequest, options));
77+
}
78+
79+
@Override
80+
public GetResponse get(GetRequest getRequest, RequestOptions options) throws IOException {
81+
return execute(OS_READ_OP_METRIC_PREFIX, () -> client.get(getRequest, options));
82+
}
83+
84+
@Override
85+
public GetIndexResponse getIndex(GetIndexRequest getIndexRequest, RequestOptions options) throws IOException {
86+
return execute(OS_READ_OP_METRIC_PREFIX, () -> client.indices().get(getIndexRequest, options));
87+
}
88+
89+
@Override
90+
public IndexResponse index(IndexRequest indexRequest, RequestOptions options) throws IOException {
91+
return execute(OS_WRITE_OP_METRIC_PREFIX, () -> client.index(indexRequest, options));
92+
}
93+
94+
@Override
95+
public Boolean isIndexExists(GetIndexRequest getIndexRequest, RequestOptions options) throws IOException {
96+
return execute(OS_READ_OP_METRIC_PREFIX, () -> client.indices().exists(getIndexRequest, options));
97+
}
98+
99+
@Override
100+
public SearchResponse search(SearchRequest searchRequest, RequestOptions options) throws IOException {
101+
return execute(OS_READ_OP_METRIC_PREFIX, () -> client.search(searchRequest, options));
102+
}
103+
104+
@Override
105+
public SearchResponse scroll(SearchScrollRequest searchScrollRequest, RequestOptions options) throws IOException {
106+
return execute(OS_READ_OP_METRIC_PREFIX, () -> client.scroll(searchScrollRequest, options));
107+
}
108+
109+
@Override
110+
public UpdateResponse update(UpdateRequest updateRequest, RequestOptions options) throws IOException {
111+
return execute(OS_WRITE_OP_METRIC_PREFIX, () -> client.update(updateRequest, options));
112+
}
113+
/**
114+
* Executes a given operation, tracks metrics, and handles exceptions.
115+
*
116+
* @param metricNamePrefix the prefix for the metric name
117+
* @param operation the operation to execute
118+
* @param <T> the return type of the operation
119+
* @return the result of the operation
120+
* @throws IOException if an I/O exception occurs
121+
*/
122+
private <T> T execute(String metricNamePrefix, IOCallable<T> operation) throws IOException {
123+
try {
124+
T result = operation.call();
125+
MetricsUtil.publishOpenSearchMetric(metricNamePrefix, 200);
126+
return result;
127+
} catch (Exception e) {
128+
OpenSearchException openSearchException = extractOpenSearchException(e);
129+
int statusCode = openSearchException != null ? openSearchException.status().getStatus() : 500;
130+
MetricsUtil.publishOpenSearchMetric(metricNamePrefix, statusCode);
131+
throw e;
132+
}
133+
}
134+
135+
/**
136+
* Extracts an OpenSearchException from the given Throwable.
137+
* Checks if the Throwable is an instance of OpenSearchException or caused by one.
138+
*
139+
* @param ex the exception to be checked
140+
* @return the extracted OpenSearchException, or null if not found
141+
*/
142+
private OpenSearchException extractOpenSearchException(Throwable ex) {
143+
if (ex instanceof OpenSearchException) {
144+
return (OpenSearchException) ex;
145+
} else if (ex.getCause() instanceof OpenSearchException) {
146+
return (OpenSearchException) ex.getCause();
147+
}
148+
return null;
149+
}
150+
151+
/**
152+
* Functional interface for operations that can throw IOException.
153+
*
154+
* @param <T> the return type of the operation
155+
*/
156+
@FunctionalInterface
157+
private interface IOCallable<T> {
158+
T call() throws IOException;
159+
}
160+
161+
@Override
162+
public void close() throws IOException {
163+
client.close();
164+
}
165+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.flint.core.metrics;
7+
8+
/**
9+
* This class defines custom metric constants used for monitoring flint operations.
10+
*/
11+
public class MetricConstants {
12+
13+
/**
14+
* The prefix for all read-related metrics in OpenSearch.
15+
* This constant is used as a part of metric names to categorize and identify metrics related to read operations.
16+
*/
17+
public static final String OS_READ_OP_METRIC_PREFIX = "opensearch.read";
18+
19+
/**
20+
* The prefix for all write-related metrics in OpenSearch.
21+
* Similar to OS_READ_METRIC_PREFIX, this constant is used for categorizing and identifying metrics that pertain to write operations.
22+
*/
23+
public static final String OS_WRITE_OP_METRIC_PREFIX = "opensearch.write";
24+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.flint.core.metrics;
7+
8+
import com.codahale.metrics.Counter;
9+
import org.apache.spark.SparkEnv;
10+
import org.apache.spark.metrics.source.FlintMetricSource;
11+
import org.apache.spark.metrics.source.Source;
12+
import org.opensearch.OpenSearchException;
13+
import scala.collection.Seq;
14+
15+
import java.util.logging.Logger;
16+
17+
/**
18+
* Utility class for managing metrics in the OpenSearch Flint context.
19+
*/
20+
public final class MetricsUtil {
21+
22+
private static final Logger LOG = Logger.getLogger(MetricsUtil.class.getName());
23+
24+
// Private constructor to prevent instantiation
25+
private MetricsUtil() {
26+
}
27+
28+
/**
29+
* Publish an OpenSearch metric based on the status code.
30+
*
31+
* @param metricNamePrefix the prefix for the metric name
32+
* @param statusCode the HTTP status code
33+
*/
34+
public static void publishOpenSearchMetric(String metricNamePrefix, int statusCode) {
35+
String metricName = constructMetricName(metricNamePrefix, statusCode);
36+
Counter counter = getOrCreateCounter(metricName);
37+
if (counter != null) {
38+
counter.inc();
39+
}
40+
}
41+
42+
// Constructs the metric name based on the provided prefix and status code
43+
private static String constructMetricName(String metricNamePrefix, int statusCode) {
44+
String metricSuffix = getMetricSuffixForStatusCode(statusCode);
45+
return metricNamePrefix + "." + metricSuffix;
46+
}
47+
48+
// Determines the metric suffix based on the HTTP status code
49+
private static String getMetricSuffixForStatusCode(int statusCode) {
50+
if (statusCode == 403) {
51+
return "403.count";
52+
} else if (statusCode >= 500) {
53+
return "5xx.count";
54+
} else if (statusCode >= 400) {
55+
return "4xx.count";
56+
} else if (statusCode >= 200) {
57+
return "2xx.count";
58+
}
59+
return "unknown.count"; // default for unhandled status codes
60+
}
61+
62+
// Retrieves or creates a new counter for the given metric name
63+
private static Counter getOrCreateCounter(String metricName) {
64+
SparkEnv sparkEnv = SparkEnv.get();
65+
if (sparkEnv == null) {
66+
LOG.warning("Spark environment not available, cannot instrument metric: " + metricName);
67+
return null;
68+
}
69+
70+
FlintMetricSource flintMetricSource = getOrInitFlintMetricSource(sparkEnv);
71+
Counter counter = flintMetricSource.metricRegistry().getCounters().get(metricName);
72+
if (counter == null) {
73+
counter = flintMetricSource.metricRegistry().counter(metricName);
74+
}
75+
return counter;
76+
}
77+
78+
// Gets or initializes the FlintMetricSource
79+
private static FlintMetricSource getOrInitFlintMetricSource(SparkEnv sparkEnv) {
80+
Seq<Source> metricSourceSeq = sparkEnv.metricsSystem().getSourcesByName(FlintMetricSource.FLINT_METRIC_SOURCE_NAME());
81+
82+
if (metricSourceSeq == null || metricSourceSeq.isEmpty()) {
83+
FlintMetricSource metricSource = new FlintMetricSource();
84+
sparkEnv.metricsSystem().registerSource(metricSource);
85+
return metricSource;
86+
}
87+
return (FlintMetricSource) metricSourceSeq.head();
88+
}
89+
}

0 commit comments

Comments
 (0)