Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature branch] Add Neural Stats API #1208

Merged
Merged
Show file tree
Hide file tree
Changes from 19 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
19480ae
Add neural stats
q-andy Feb 25, 2025
a1f24f0
Refactor event/state stats
q-andy Feb 25, 2025
064465b
Extend state stats and refactor stat combination
q-andy Feb 26, 2025
e4abeca
Fix rest integ test
q-andy Feb 26, 2025
9274a71
Add stats and nodes filtering
q-andy Feb 26, 2025
e1cb163
Add flatten and include metadata query parameter
q-andy Feb 26, 2025
282db07
Add EventStatData to snapshot EventStat info
q-andy Feb 26, 2025
7d08433
Refactor stat filtering
q-andy Feb 27, 2025
df81f11
Add stat aggregations, metadata
q-andy Feb 27, 2025
4677199
Add trailing interval and minutes since last event metadata
q-andy Feb 28, 2025
8cc6698
Reorganize response processing logic
q-andy Feb 28, 2025
24760a5
Add enable stats setting and reset
q-andy Mar 1, 2025
c15bcff
Adding documentation for state and event stats
q-andy Mar 1, 2025
a8c3d0a
More documentation
q-andy Mar 3, 2025
9425021
Clean up unused state stats manager methods
q-andy Mar 3, 2025
7ba9dcb
Add unit tests and update ITs
q-andy Mar 4, 2025
fe8a616
Merge branch 'opensearch-project:main' into neural-stats
q-andy Mar 4, 2025
56c2d19
Add transport tests, refactor toXContent tests
q-andy Mar 4, 2025
d540f28
Clean up files
q-andy Mar 4, 2025
3da5765
Merge branch 'opensearch-project:main' into neural-stats
q-andy Mar 6, 2025
64032f3
Update changelog
q-andy Mar 7, 2025
f6fc682
Address review comments
q-andy Mar 10, 2025
c56a110
Reformat response
q-andy Mar 11, 2025
73e2228
Add bwc tests for stats
q-andy Mar 12, 2025
0734a6a
Add parameter validation
q-andy Mar 12, 2025
7b532a5
Use readOptionalEnumSet from core
q-andy Mar 13, 2025
2e0c119
Clean up files
q-andy Mar 13, 2025
82504ce
Merge branch 'opensearch-project:main' into neural-stats
q-andy Mar 13, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import static org.opensearch.neuralsearch.settings.NeuralSearchSettings.NEURAL_SEARCH_HYBRID_SEARCH_DISABLED;
import static org.opensearch.neuralsearch.settings.NeuralSearchSettings.RERANKER_MAX_DOC_FIELDS;
import static org.opensearch.neuralsearch.settings.NeuralSearchSettings.NEURAL_STATS_ENABLED;

import java.util.Arrays;
import java.util.Collection;
Expand All @@ -14,12 +15,22 @@
import java.util.Optional;
import java.util.function.Supplier;

import com.google.common.collect.ImmutableList;
import org.opensearch.action.ActionRequest;
import org.opensearch.neuralsearch.settings.NeuralSearchSettingsAccessor;
import org.opensearch.neuralsearch.stats.events.EventStatsManager;
import org.opensearch.neuralsearch.stats.state.StateStatsManager;
import org.opensearch.transport.client.Client;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.IndexScopedSettings;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.settings.SettingsFilter;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.core.action.ActionResponse;
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.env.Environment;
Expand Down Expand Up @@ -55,15 +66,21 @@
import org.opensearch.neuralsearch.query.NeuralQueryBuilder;
import org.opensearch.neuralsearch.query.NeuralSparseQueryBuilder;
import org.opensearch.neuralsearch.query.ext.RerankSearchExtBuilder;
import org.opensearch.neuralsearch.rest.RestNeuralStatsHandler;
import org.opensearch.neuralsearch.search.query.HybridQueryPhaseSearcher;
import org.opensearch.neuralsearch.transport.NeuralStatsAction;
import org.opensearch.neuralsearch.transport.NeuralStatsTransportAction;
import org.opensearch.neuralsearch.util.NeuralSearchClusterUtil;
import org.opensearch.neuralsearch.util.PipelineServiceUtil;
import org.opensearch.plugins.ActionPlugin;
import org.opensearch.plugins.ExtensiblePlugin;
import org.opensearch.plugins.IngestPlugin;
import org.opensearch.plugins.Plugin;
import org.opensearch.plugins.SearchPipelinePlugin;
import org.opensearch.plugins.SearchPlugin;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.rest.RestController;
import org.opensearch.rest.RestHandler;
import org.opensearch.script.ScriptService;
import org.opensearch.search.pipeline.SearchPhaseResultsProcessor;
import org.opensearch.search.pipeline.SearchRequestProcessor;
Expand All @@ -85,6 +102,7 @@ public class NeuralSearch extends Plugin implements ActionPlugin, SearchPlugin,
private final ScoreNormalizationFactory scoreNormalizationFactory = new ScoreNormalizationFactory();
private final ScoreCombinationFactory scoreCombinationFactory = new ScoreCombinationFactory();
public static final String EXPLANATION_RESPONSE_KEY = "explanation_response";
public static final String NEURAL_BASE_URI = "/_plugins/_neural";

@Override
public Collection<Object> createComponents(
Expand All @@ -105,7 +123,11 @@ public Collection<Object> createComponents(
NeuralSparseQueryBuilder.initialize(clientAccessor);
HybridQueryExecutor.initialize(threadPool);
normalizationProcessorWorkflow = new NormalizationProcessorWorkflow(new ScoreNormalizer(), new ScoreCombiner());
return List.of(clientAccessor);

PipelineServiceUtil.instance().initialize(clusterService);
NeuralSearchSettingsAccessor.instance().initialize(clusterService, environment.settings());

return List.of(clientAccessor, EventStatsManager.instance(), StateStatsManager.instance());
}

@Override
Expand All @@ -117,6 +139,25 @@ public List<QuerySpec<?>> getQueries() {
);
}

/**
 * Supplies the REST handlers contributed by this plugin.
 *
 * @return an immutable single-element list holding the neural stats REST handler
 */
@Override
public List<RestHandler> getRestHandlers(
    Settings settings,
    RestController restController,
    ClusterSettings clusterSettings,
    IndexScopedSettings indexScopedSettings,
    SettingsFilter settingsFilter,
    IndexNameExpressionResolver indexNameExpressionResolver,
    Supplier<DiscoveryNodes> nodesInCluster
) {
    // The stats handler is handed the shared settings accessor instance
    final NeuralSearchSettingsAccessor sharedSettingsAccessor = NeuralSearchSettingsAccessor.instance();
    return ImmutableList.of(new RestNeuralStatsHandler(sharedSettingsAccessor));
}

/**
 * Supplies the transport actions contributed by this plugin.
 *
 * @return a list with one handler that pairs {@code NeuralStatsAction.INSTANCE} with {@code NeuralStatsTransportAction}
 */
@Override
public List<ActionHandler<? extends ActionRequest, ? extends ActionResponse>> getActions() {
    final ActionHandler<? extends ActionRequest, ? extends ActionResponse> neuralStatsHandler = new ActionHandler<>(
        NeuralStatsAction.INSTANCE,
        NeuralStatsTransportAction.class
    );
    return Arrays.asList(neuralStatsHandler);
}

@Override
public List<ExecutorBuilder<?>> getExecutorBuilders(Settings settings) {
return List.of(HybridQueryExecutor.getExecutorBuilder(settings));
Expand Down Expand Up @@ -167,7 +208,7 @@ public Map<String, org.opensearch.search.pipeline.Processor.Factory<SearchPhaseR

@Override
public List<Setting<?>> getSettings() {
return List.of(NEURAL_SEARCH_HYBRID_SEARCH_DISABLED, RERANKER_MAX_DOC_FIELDS);
return List.of(NEURAL_SEARCH_HYBRID_SEARCH_DISABLED, RERANKER_MAX_DOC_FIELDS, NEURAL_STATS_ENABLED);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
import org.opensearch.neuralsearch.ml.MLCommonsClientAccessor;

import lombok.extern.log4j.Log4j2;
import org.opensearch.neuralsearch.stats.events.EventStatsManager;
import org.opensearch.neuralsearch.stats.events.EventStatName;

/**
* This processor is used for user input data text embedding processing, model_id can be used to indicate which model user use,
Expand Down Expand Up @@ -47,6 +49,7 @@ public void doExecute(
List<String> inferenceList,
BiConsumer<IngestDocument, Exception> handler
) {
EventStatsManager.increment(EventStatName.TEXT_EMBEDDING_PROCESSOR_EXECUTIONS);
mlCommonsClientAccessor.inferenceSentences(
TextInferenceRequest.builder().modelId(this.modelId).inputTexts(inferenceList).build(),
ActionListener.wrap(vectors -> {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/
package org.opensearch.neuralsearch.processor.util;

import org.opensearch.rest.RestRequest;

import java.util.Optional;

/**
 * Static helpers for reading parameters from a {@link RestRequest}.
 * The class only holds static methods and is not meant to be instantiated or extended.
 */
public final class RestActionUtils {

    /** Prevents instantiation of this static-only utility class. */
    private RestActionUtils() {}

    /**
     * Reads a request parameter and splits its value on commas.
     *
     * @param request   the REST request to read from
     * @param paramName name of the parameter to read
     * @return the comma-separated parts of the parameter value, or an empty Optional when
     *         {@code request.param(paramName)} returns null
     */
    public static Optional<String[]> splitCommaSeparatedParam(RestRequest request, String paramName) {
        return Optional.ofNullable(request.param(paramName)).map(s -> s.split(","));
    }

    /**
     * Reads a request parameter as a string.
     *
     * @param request   the REST request to read from
     * @param paramName name of the parameter to read
     * @return the parameter value, or an empty Optional when {@code request.param(paramName)} returns null
     */
    public static Optional<String> getStringParam(RestRequest request, String paramName) {
        return Optional.ofNullable(request.param(paramName));
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/
package org.opensearch.neuralsearch.rest;

import com.google.common.collect.ImmutableList;
import lombok.AllArgsConstructor;
import lombok.extern.log4j.Log4j2;
import org.apache.commons.lang.StringUtils;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.neuralsearch.settings.NeuralSearchSettingsAccessor;
import org.opensearch.neuralsearch.stats.NeuralStatsInput;
import org.opensearch.neuralsearch.stats.events.EventStatName;
import org.opensearch.neuralsearch.stats.state.StateStatName;
import org.opensearch.neuralsearch.transport.NeuralStatsAction;
import org.opensearch.neuralsearch.transport.NeuralStatsRequest;
import org.opensearch.rest.BaseRestHandler;
import org.opensearch.rest.BytesRestResponse;
import org.opensearch.rest.RestRequest;
import org.opensearch.rest.action.RestActions;
import org.opensearch.transport.client.node.NodeClient;

import java.util.Arrays;
import java.util.EnumSet;
import java.util.List;
import java.util.Locale;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

import static org.opensearch.neuralsearch.plugin.NeuralSearch.NEURAL_BASE_URI;
import static org.opensearch.neuralsearch.processor.util.RestActionUtils.splitCommaSeparatedParam;

@Log4j2
@AllArgsConstructor
public class RestNeuralStatsHandler extends BaseRestHandler {
// Name returned by getName()
private static final String NAME = "neural_stats_action";
// Query parameter names read via request.paramAsBoolean(...)
public static final String FLATTEN_PARAM = "flat_keys";
public static final String INCLUDE_METADATA_PARAM = "include_metadata";

// Lower-cased name strings of every EventStatName, used to classify user-supplied stat names.
// Lower-casing uses Locale.ROOT so the set matches the lookup side, which also lower-cases
// with Locale.ROOT, regardless of the JVM default locale.
private static final Set<String> EVENT_STAT_NAMES = EnumSet.allOf(EventStatName.class)
    .stream()
    .map(EventStatName::getNameString)
    .map(name -> name.toLowerCase(Locale.ROOT))
    .collect(Collectors.toSet());

// Lower-cased name strings of every StateStatName; same locale-independent casing as above.
private static final Set<String> STATE_STAT_NAMES = EnumSet.allOf(StateStatName.class)
    .stream()
    .map(StateStatName::getNameString)
    .map(name -> name.toLowerCase(Locale.ROOT))
    .collect(Collectors.toSet());

// Consulted in prepareRequest to check whether the stats endpoint is enabled
private NeuralSearchSettingsAccessor settingsAccessor;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Move this declaration to top

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't static fields typically go before instance fields?


/**
 * Returns this handler's name.
 *
 * @return the constant {@code NAME} ("neural_stats_action")
 */
@Override
public String getName() {
return NAME;
}

/**
 * Lists the routes served by this handler. All four are GET routes under {@code NEURAL_BASE_URI}:
 * the stats endpoint with and without a {@code {nodeId}} prefix, each with and without a
 * trailing {@code {stat}} segment.
 *
 * @return immutable list of the supported routes
 */
@Override
public List<Route> routes() {
    final ImmutableList.Builder<Route> supportedRoutes = ImmutableList.builder();
    // Order matches the declaration order of the individual routes
    for (String pathSuffix : List.of("/{nodeId}/stats/", "/{nodeId}/stats/{stat}", "/stats/", "/stats/{stat}")) {
        supportedRoutes.add(new Route(RestRequest.Method.GET, NEURAL_BASE_URI + pathSuffix));
    }
    return supportedRoutes.build();
}

/**
 * Prepares the stats request.
 * When the stats setting is disabled, responds with 403 FORBIDDEN; otherwise builds a
 * {@link NeuralStatsRequest} from the REST request and executes {@link NeuralStatsAction} on the client.
 *
 * @param request the incoming REST request
 * @param client  node client used to execute the stats action
 * @return consumer that writes the response to the channel
 */
@Override
protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) {
    if (settingsAccessor.getIsStatsEnabled() == false) {
        // Process params, or else will automatically return a 400 instead of a 403
        splitCommaSeparatedParam(request, "nodeId");
        splitCommaSeparatedParam(request, "stat");
        request.paramAsBoolean(FLATTEN_PARAM, false);
        request.paramAsBoolean(INCLUDE_METADATA_PARAM, false);
        // "timeout" is read on the enabled path, so it must be consumed here too;
        // otherwise a request carrying it would be rejected as an unrecognized parameter
        request.param("timeout");

        return channel -> channel.sendResponse(new BytesRestResponse(RestStatus.FORBIDDEN, "Stats endpoint is disabled"));
    }

    // Read inputs and convert to BaseNodesRequest with correct info configured
    NeuralStatsRequest neuralStatsRequest = getRequest(request);

    return channel -> client.execute(
        NeuralStatsAction.INSTANCE,
        neuralStatsRequest,
        new RestActions.NodesResponseRestListener<>(channel)
    );
}

/**
 * Builds the {@link NeuralStatsRequest} for an incoming REST request.
 * The node ids come from the comma-separated {@code nodeId} parameter, the stats input from
 * {@link #createNeuralStatsInputFromRequestParams(RestRequest)}, and the timeout from the
 * {@code timeout} parameter.
 *
 * @param request Rest request
 * @return NeuralStatsRequest
 */
private NeuralStatsRequest getRequest(RestRequest request) {
    // A missing or empty nodeId parameter leaves the node id array null
    final String rawNodeIds = request.param("nodeId");
    final String[] targetNodeIds = StringUtils.isNotEmpty(rawNodeIds) ? rawNodeIds.split(",") : null;

    final NeuralStatsInput statsInput = createNeuralStatsInputFromRequestParams(request);

    final NeuralStatsRequest statsRequest = new NeuralStatsRequest(targetNodeIds, statsInput);
    statsRequest.timeout(request.param("timeout"));
    return statsRequest;
}

/**
 * Translates the REST request parameters into a {@link NeuralStatsInput}.
 * Reads the {@code nodeId} and {@code stat} comma-separated parameters and the
 * {@code FLATTEN_PARAM} / {@code INCLUDE_METADATA_PARAM} boolean parameters (both default to false).
 * Stat names are matched case-insensitively against the known event and state stat names;
 * names that match neither are ignored. When no name matched at all, every event and state
 * stat is requested.
 *
 * @param request Rest request
 * @return the populated NeuralStatsInput
 */
NeuralStatsInput createNeuralStatsInputFromRequestParams(RestRequest request) {
NeuralStatsInput neuralStatsInput = new NeuralStatsInput();

// Parse specified nodes
Optional<String[]> nodeIds = splitCommaSeparatedParam(request, "nodeId");
if (nodeIds.isPresent()) {
neuralStatsInput.getNodeIds().addAll(Arrays.asList(nodeIds.get()));
}

// Parse query parameters
boolean flatten = request.paramAsBoolean(FLATTEN_PARAM, false);
neuralStatsInput.setFlatten(flatten);

boolean includeMetadata = request.paramAsBoolean(INCLUDE_METADATA_PARAM, false);
neuralStatsInput.setIncludeMetadata(includeMetadata);

// Determine which stat names to retrieve based on user parameters
Optional<String[]> stats = splitCommaSeparatedParam(request, "stat");
// Stays true until at least one requested name matches a known stat
boolean retrieveAllStats = true;

// Add stats to input to retrieve if specified
if (stats.isPresent()) {
for (String stat : stats.get()) {
// Normalize casing so the lookup is case-insensitive
stat = stat.toLowerCase(Locale.ROOT);

if (EVENT_STAT_NAMES.contains(stat)) {
retrieveAllStats = false;
neuralStatsInput.getEventStatNames().add(EventStatName.from(stat));
} else if (STATE_STAT_NAMES.contains(stat)) {
retrieveAllStats = false;
neuralStatsInput.getStateStatNames().add(StateStatName.from(stat));
}
// Names matching neither set are silently ignored
}
}

// If no stats are specified, or none of the specified names matched, add all stats to retrieve all by default
if (retrieveAllStats) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Even if stats is specified, if there is not matching stats, do we want to return all stats instead of empty?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, currently that's how other stats APIs work, they return all stats if no stats match. I think the reasoning is that invalid stat names are ignored, and when all are ignored then it's treated as a get all.

neuralStatsInput.getEventStatNames().addAll(EnumSet.allOf(EventStatName.class));
neuralStatsInput.getStateStatNames().addAll(EnumSet.allOf(StateStatName.class));
}
return neuralStatsInput;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,11 @@ public final class NeuralSearchSettings {
50,
Setting.Property.NodeScope
);

/**
 * Boolean setting for enabling neural stats, registered under the key
 * {@code plugins.neural_search.stats_enabled}. Defaults to {@code false} and is declared
 * with the {@code NodeScope} and {@code Dynamic} properties.
 */
public static final Setting<Boolean> NEURAL_STATS_ENABLED = Setting.boolSetting(
"plugins.neural_search.stats_enabled",
false,
Setting.Property.NodeScope,
Setting.Property.Dynamic
);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/
package org.opensearch.neuralsearch.settings;

import lombok.Getter;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.Settings;
import org.opensearch.neuralsearch.stats.events.EventStatsManager;

/**
 * Singleton holder for neural search setting values that are read at request time.
 * {@link #initialize(ClusterService, Settings)} must be called before the values are used;
 * until then {@code isStatsEnabled} is null.
 */
public class NeuralSearchSettingsAccessor {
    private static NeuralSearchSettingsAccessor INSTANCE;
    // Set once initialize(...) has completed, so later calls become no-ops
    private boolean initialized;

    // Current value of NEURAL_STATS_ENABLED; volatile because it is written from the settings update consumer
    @Getter
    private volatile Boolean isStatsEnabled;

    /**
     * Return instance of the cluster context, must be initialized first for proper usage
     * @return instance of cluster context
     */
    public static synchronized NeuralSearchSettingsAccessor instance() {
        if (INSTANCE == null) {
            INSTANCE = new NeuralSearchSettingsAccessor();
        }
        return INSTANCE;
    }

    /**
     * Reads the initial stats-enabled value and registers a consumer that tracks later updates
     * of the setting. Only the first call has an effect; subsequent calls return immediately so
     * that the update consumer is not registered more than once.
     *
     * @param clusterService cluster service whose cluster settings receive the update consumer
     * @param settings       settings from which the initial value is read
     */
    public void initialize(ClusterService clusterService, Settings settings) {
        if (initialized) {
            return;
        }

        isStatsEnabled = NeuralSearchSettings.NEURAL_STATS_ENABLED.get(settings);

        clusterService.getClusterSettings().addSettingsUpdateConsumer(NeuralSearchSettings.NEURAL_STATS_ENABLED, value -> {
            // If stats are being toggled off, clear and reset all stats
            if (isStatsEnabled && (value == false)) {
                EventStatsManager.instance().reset();
            }
            isStatsEnabled = value;
        });

        // Flip the flag only after registration succeeded; without this the guard above never triggers
        initialized = true;
    }

}
Loading
Loading