Skip to content

Commit dbb66f0

Browse files
ansjcydzane17
andauthored
asynchronous search operations in reader (#363)
* Add metric labels to historical data (#326) Signed-off-by: David Zane <[email protected]> Signed-off-by: Chenyang Ji <[email protected]> * asynchronous search operations in reader (#344) * Use async operations in reader Signed-off-by: Chenyang Ji <[email protected]> * add unit tests Signed-off-by: Chenyang Ji <[email protected]> * refactor the code with common util functions and complete unit tests Signed-off-by: Chenyang Ji <[email protected]> * refactor the logic to build index name from range Signed-off-by: Chenyang Ji <[email protected]> * use constants over string literals Signed-off-by: Chenyang Ji <[email protected]> * handle context in a better way and fix a bug result in duplicate records Signed-off-by: Chenyang Ji <[email protected]> --------- Signed-off-by: Chenyang Ji <[email protected]> --------- Signed-off-by: David Zane <[email protected]> Signed-off-by: Chenyang Ji <[email protected]> Co-authored-by: David Zane <[email protected]>
1 parent 54badcc commit dbb66f0

29 files changed

+2652
-381
lines changed

src/main/java/org/opensearch/plugin/insights/core/exporter/LocalIndexExporter.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@
88

99
package org.opensearch.plugin.insights.core.exporter;
1010

11-
import static org.opensearch.plugin.insights.core.utils.ExporterReaderUtils.generateLocalIndexDateHash;
1211
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.DEFAULT_DELETE_AFTER_VALUE;
1312
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.DEFAULT_TEMPLATE_PRIORITY;
1413
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.TOP_QUERIES_INDEX_PATTERN_GLOB;
@@ -47,6 +46,7 @@
4746
import org.opensearch.index.IndexNotFoundException;
4847
import org.opensearch.plugin.insights.core.metrics.OperationalMetric;
4948
import org.opensearch.plugin.insights.core.metrics.OperationalMetricsCounter;
49+
import org.opensearch.plugin.insights.core.utils.IndexDiscoveryHelper;
5050
import org.opensearch.plugin.insights.rules.model.SearchQueryRecord;
5151

5252
/**
@@ -218,7 +218,7 @@ void bulk(final String indexName, final List<SearchQueryRecord> records) throws
218218
for (SearchQueryRecord record : records) {
219219
bulkRequestBuilder.add(
220220
new IndexRequest(indexName).id(record.getId())
221-
.source(record.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS))
221+
.source(record.toXContentForExport(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS))
222222
);
223223
}
224224
bulkRequestBuilder.execute(new ActionListener<BulkResponse>() {
@@ -248,7 +248,7 @@ public void close() {
248248
*/
249249
String buildLocalIndexName() {
250250
ZonedDateTime currentTime = ZonedDateTime.now(ZoneOffset.UTC);
251-
return indexPattern.format(currentTime) + "-" + generateLocalIndexDateHash(currentTime.toLocalDate());
251+
return IndexDiscoveryHelper.buildLocalIndexName(indexPattern, currentTime);
252252
}
253253

254254
/**

src/main/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListener.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88

99
package org.opensearch.plugin.insights.core.listener;
1010

11+
import static org.opensearch.plugin.insights.rules.model.SearchQueryRecord.DEFAULT_TOP_N_QUERY_MAP;
1112
import static org.opensearch.plugin.insights.settings.QueryCategorizationSettings.SEARCH_QUERY_METRICS_ENABLED_SETTING;
1213
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.TOP_N_QUERIES_GROUPING_FIELD_NAME;
1314
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.TOP_N_QUERIES_GROUPING_FIELD_TYPE;
@@ -267,6 +268,7 @@ private void constructSearchQueryRecord(final SearchPhaseContext context, final
267268
attributes.put(Attribute.TASK_RESOURCE_USAGES, tasksResourceUsages);
268269
attributes.put(Attribute.GROUP_BY, QueryInsightsSettings.DEFAULT_GROUPING_TYPE);
269270
attributes.put(Attribute.NODE_ID, clusterService.localNode().getId());
271+
attributes.put(Attribute.TOP_N_QUERY, new HashMap<>(DEFAULT_TOP_N_QUERY_MAP));
270272

271273
if (queryInsightsService.isGroupingEnabled() || log.isTraceEnabled()) {
272274
// Generate the query shape only if grouping is enabled or trace logging is enabled

src/main/java/org/opensearch/plugin/insights/core/metrics/OperationalMetric.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212

1313
public enum OperationalMetric {
1414
LOCAL_INDEX_READER_PARSING_EXCEPTIONS("Number of errors when parsing with LocalIndexReader"),
15+
LOCAL_INDEX_READER_SEARCH_EXCEPTIONS("Number of errors when searching with LocalIndexReader"),
1516
LOCAL_INDEX_EXPORTER_BULK_FAILURES("Number of failures when ingesting Query Insights data to local indices"),
1617
LOCAL_INDEX_EXPORTER_DELETE_FAILURES("Number of failures when deleting local indices"),
1718
LOCAL_INDEX_EXPORTER_EXCEPTIONS("Number of exceptions in Query Insights LocalIndexExporter"),

src/main/java/org/opensearch/plugin/insights/core/reader/LocalIndexReader.java

Lines changed: 71 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -8,38 +8,27 @@
88

99
package org.opensearch.plugin.insights.core.reader;
1010

11-
import static org.opensearch.plugin.insights.core.utils.ExporterReaderUtils.generateLocalIndexDateHash;
12-
import static org.opensearch.plugin.insights.rules.model.SearchQueryRecord.VERBOSE_ONLY_FIELDS;
13-
1411
import java.time.ZoneOffset;
1512
import java.time.ZonedDateTime;
1613
import java.time.format.DateTimeFormatter;
1714
import java.util.ArrayList;
18-
import java.util.Arrays;
15+
import java.util.Collections;
1916
import java.util.List;
2017
import org.apache.logging.log4j.LogManager;
2118
import org.apache.logging.log4j.Logger;
2219
import org.opensearch.action.search.SearchRequest;
2320
import org.opensearch.action.search.SearchResponse;
2421
import org.opensearch.client.Client;
25-
import org.opensearch.common.xcontent.LoggingDeprecationHandler;
26-
import org.opensearch.common.xcontent.XContentType;
27-
import org.opensearch.core.common.Strings;
22+
import org.opensearch.core.action.ActionListener;
2823
import org.opensearch.core.xcontent.NamedXContentRegistry;
29-
import org.opensearch.core.xcontent.XContentParser;
30-
import org.opensearch.index.IndexNotFoundException;
31-
import org.opensearch.index.query.BoolQueryBuilder;
32-
import org.opensearch.index.query.MatchQueryBuilder;
33-
import org.opensearch.index.query.QueryBuilders;
34-
import org.opensearch.index.query.RangeQueryBuilder;
3524
import org.opensearch.plugin.insights.core.metrics.OperationalMetric;
3625
import org.opensearch.plugin.insights.core.metrics.OperationalMetricsCounter;
37-
import org.opensearch.plugin.insights.rules.model.Attribute;
26+
import org.opensearch.plugin.insights.core.utils.IndexDiscoveryHelper;
27+
import org.opensearch.plugin.insights.core.utils.QueryInsightsQueryBuilder;
28+
import org.opensearch.plugin.insights.core.utils.SearchResponseParser;
29+
import org.opensearch.plugin.insights.rules.model.MetricType;
3830
import org.opensearch.plugin.insights.rules.model.SearchQueryRecord;
39-
import org.opensearch.search.SearchHit;
40-
import org.opensearch.search.builder.SearchSourceBuilder;
41-
import org.opensearch.search.sort.SortBuilders;
42-
import org.opensearch.search.sort.SortOrder;
31+
import reactor.util.annotation.NonNull;
4332

4433
/**
4534
* Local index reader for reading query insights data from local OpenSearch indices.
@@ -102,64 +91,83 @@ public LocalIndexReader setIndexPattern(DateTimeFormatter indexPattern) {
10291
/**
10392
* Export a list of SearchQueryRecord from local index
10493
*
105-
* @param from start timestamp
106-
* @param to end timestamp
107-
* @param id query/group id
108-
* @param verbose whether to return full output
109-
* @return list of SearchQueryRecords whose timestamps fall between from and to
94+
* @param from start timestamp
95+
* @param to end timestamp
96+
* @param id query/group id
97+
* @param verbose whether to return full output
98+
* @param metricType metric type to read
11099
*/
111100
@Override
112-
public List<SearchQueryRecord> read(final String from, final String to, final String id, final Boolean verbose) {
113-
List<SearchQueryRecord> records = new ArrayList<>();
101+
public void read(
102+
final String from,
103+
final String to,
104+
final String id,
105+
final Boolean verbose,
106+
@NonNull final MetricType metricType,
107+
final ActionListener<List<SearchQueryRecord>> listener
108+
) {
109+
// Validate input parameters
114110
if (from == null || to == null) {
115-
return records;
111+
listener.onResponse(new ArrayList<>());
112+
return;
116113
}
114+
115+
// Parse and validate date range
117116
final ZonedDateTime start = ZonedDateTime.parse(from);
118117
ZonedDateTime end = ZonedDateTime.parse(to);
119118
ZonedDateTime now = ZonedDateTime.now(ZoneOffset.UTC);
120119
if (end.isAfter(now)) {
121120
end = now;
122121
}
123-
ZonedDateTime curr = start;
124-
// TODO: send single search request instead of one per index
125-
while (curr.isBefore(end.plusDays(1).toLocalDate().atStartOfDay(end.getZone()))) {
126-
String indexName = buildLocalIndexName(curr);
127-
SearchRequest searchRequest = new SearchRequest(indexName);
128-
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder().size(MAX_TOP_N_INDEX_READ_SIZE);
129-
MatchQueryBuilder excludeQuery = QueryBuilders.matchQuery("indices", "top_queries*");
130-
RangeQueryBuilder rangeQuery = QueryBuilders.rangeQuery("timestamp")
131-
.from(start.toInstant().toEpochMilli())
132-
.to(end.toInstant().toEpochMilli());
133-
BoolQueryBuilder query = QueryBuilders.boolQuery().must(rangeQuery).mustNot(excludeQuery);
134-
135-
if (id != null) {
136-
query.must(QueryBuilders.matchQuery("id", id));
122+
final ZonedDateTime finalEnd = end;
123+
124+
// Discover indices in the date range
125+
IndexDiscoveryHelper.discoverIndicesInDateRange(client, indexPattern, start, finalEnd, new ActionListener<List<String>>() {
126+
@Override
127+
public void onResponse(List<String> indexNames) {
128+
if (indexNames.isEmpty()) {
129+
listener.onResponse(Collections.emptyList());
130+
return;
131+
}
132+
133+
// Build and execute search request
134+
executeSearchRequest(indexNames, start, finalEnd, id, verbose, metricType, listener);
137135
}
138-
searchSourceBuilder.query(query);
139-
if (Boolean.FALSE.equals(verbose)) {
140-
// Exclude these fields
141-
searchSourceBuilder.fetchSource(
142-
Strings.EMPTY_ARRAY,
143-
Arrays.stream(VERBOSE_ONLY_FIELDS).map(Attribute::toString).toArray(String[]::new)
144-
);
136+
137+
@Override
138+
public void onFailure(Exception e) {
139+
listener.onFailure(e);
145140
}
146-
searchSourceBuilder.sort(SortBuilders.fieldSort("measurements.latency.number").order(SortOrder.DESC));
147-
searchRequest.source(searchSourceBuilder);
148-
try {
149-
SearchResponse searchResponse = client.search(searchRequest).actionGet();
150-
for (SearchHit hit : searchResponse.getHits()) {
151-
XContentParser parser = XContentType.JSON.xContent()
152-
.createParser(namedXContentRegistry, LoggingDeprecationHandler.INSTANCE, hit.getSourceAsString());
153-
SearchQueryRecord record = SearchQueryRecord.fromXContent(parser);
154-
records.add(record);
155-
}
156-
} catch (IndexNotFoundException ignored) {} catch (Exception e) {
157-
OperationalMetricsCounter.getInstance().incrementCounter(OperationalMetric.LOCAL_INDEX_READER_PARSING_EXCEPTIONS);
158-
logger.error("Unable to parse search hit: ", e);
141+
});
142+
}
143+
144+
/**
145+
* Executes the search request against the discovered indices.
146+
*/
147+
private void executeSearchRequest(
148+
final List<String> indexNames,
149+
final ZonedDateTime start,
150+
final ZonedDateTime end,
151+
final String id,
152+
final Boolean verbose,
153+
final MetricType metricType,
154+
final ActionListener<List<SearchQueryRecord>> listener
155+
) {
156+
SearchRequest searchRequest = QueryInsightsQueryBuilder.buildTopNSearchRequest(indexNames, start, end, id, verbose, metricType);
157+
158+
client.search(searchRequest, new ActionListener<SearchResponse>() {
159+
@Override
160+
public void onResponse(SearchResponse searchResponse) {
161+
SearchResponseParser.parseSearchResponse(searchResponse, namedXContentRegistry, listener);
159162
}
160-
curr = curr.plusDays(1);
161-
}
162-
return records;
163+
164+
@Override
165+
public void onFailure(Exception e) {
166+
OperationalMetricsCounter.getInstance().incrementCounter(OperationalMetric.LOCAL_INDEX_READER_SEARCH_EXCEPTIONS);
167+
logger.error("Failed to search indices {}: ", indexNames, e);
168+
listener.onFailure(e);
169+
}
170+
});
163171
}
164172

165173
/**
@@ -169,8 +177,4 @@ public List<SearchQueryRecord> read(final String from, final String to, final St
169177
public void close() {
170178
logger.debug("Closing the LocalIndexReader..");
171179
}
172-
173-
private String buildLocalIndexName(ZonedDateTime current) {
174-
return current.format(indexPattern) + "-" + generateLocalIndexDateHash(current.toLocalDate());
175-
}
176180
}

src/main/java/org/opensearch/plugin/insights/core/reader/QueryInsightsReader.java

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@
1010

1111
import java.io.Closeable;
1212
import java.util.List;
13+
import org.opensearch.core.action.ActionListener;
14+
import org.opensearch.plugin.insights.rules.model.MetricType;
1315
import org.opensearch.plugin.insights.rules.model.SearchQueryRecord;
1416

1517
/**
@@ -19,13 +21,21 @@ public interface QueryInsightsReader extends Closeable {
1921
/**
2022
* Reader a list of SearchQueryRecord
2123
*
22-
* @param from string
23-
* @param to string
24-
* @param id query/group id
25-
* @param verbose whether to return full output
26-
* @return List of SearchQueryRecord
24+
* @param from string
25+
* @param to string
26+
* @param id query/group id
27+
* @param verbose whether to return full output
28+
* @param metricType metric type to read
29+
* @param listener listener to be called when the read operation is complete
2730
*/
28-
List<SearchQueryRecord> read(final String from, final String to, final String id, final Boolean verbose);
31+
void read(
32+
final String from,
33+
final String to,
34+
final String id,
35+
final Boolean verbose,
36+
final MetricType metricType,
37+
final ActionListener<List<SearchQueryRecord>> listener
38+
);
2939

3040
String getId();
3141
}

src/main/java/org/opensearch/plugin/insights/core/service/QueryInsightsService.java

Lines changed: 5 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.TOP_N_EXPORTER_DELETE_AFTER;
2020
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.TOP_N_EXPORTER_TEMPLATE_PRIORITY;
2121
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.TOP_N_EXPORTER_TYPE;
22-
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.TOP_QUERIES_INDEX_PATTERN_GLOB;
2322

2423
import java.io.IOException;
2524
import java.time.Duration;
@@ -62,6 +61,7 @@
6261
import org.opensearch.plugin.insights.core.reader.QueryInsightsReaderFactory;
6362
import org.opensearch.plugin.insights.core.service.categorizer.QueryShapeGenerator;
6463
import org.opensearch.plugin.insights.core.service.categorizer.SearchQueryCategorizer;
64+
import org.opensearch.plugin.insights.core.utils.IndexDiscoveryHelper;
6565
import org.opensearch.plugin.insights.rules.model.GroupingType;
6666
import org.opensearch.plugin.insights.rules.model.MetricType;
6767
import org.opensearch.plugin.insights.rules.model.SearchQueryRecord;
@@ -674,11 +674,9 @@ void deleteExpiredTopNIndices() {
674674
if (topQueriesExporter != null && topQueriesExporter.getClass() == LocalIndexExporter.class) {
675675
final LocalIndexExporter localIndexExporter = (LocalIndexExporter) topQueriesExporter;
676676
threadPool.executor(QUERY_INSIGHTS_EXECUTOR).execute(() -> {
677-
final ClusterStateRequest clusterStateRequest = new ClusterStateRequest().clear()
678-
.indices(TOP_QUERIES_INDEX_PATTERN_GLOB)
679-
.metadata(true)
680-
.local(true)
681-
.indicesOptions(IndicesOptions.strictExpand());
677+
final ClusterStateRequest clusterStateRequest = IndexDiscoveryHelper.createClusterStateRequest(
678+
IndicesOptions.strictExpand()
679+
);
682680

683681
client.admin().cluster().state(clusterStateRequest, ActionListener.wrap(clusterStateResponse -> {
684682
final Map<String, IndexMetadata> indexMetadataMap = clusterStateResponse.getState().metadata().indices();
@@ -708,11 +706,7 @@ void deleteExpiredTopNIndices() {
708706
* @param localIndexExporter the exporter to handle the local index operations
709707
*/
710708
void deleteAllTopNIndices(final Client client, final LocalIndexExporter localIndexExporter) {
711-
final ClusterStateRequest clusterStateRequest = new ClusterStateRequest().clear()
712-
.indices(TOP_QUERIES_INDEX_PATTERN_GLOB)
713-
.metadata(true)
714-
.local(true)
715-
.indicesOptions(IndicesOptions.strictExpand());
709+
final ClusterStateRequest clusterStateRequest = IndexDiscoveryHelper.createClusterStateRequest(IndicesOptions.strictExpand());
716710

717711
client.admin().cluster().state(clusterStateRequest, ActionListener.wrap(clusterStateResponse -> {
718712
clusterStateResponse.getState()

0 commit comments

Comments
 (0)