Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature branch] Add Neural Stats API #1208

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
19480ae
Add neural stats
q-andy Feb 25, 2025
a1f24f0
Refactor event/state stats
q-andy Feb 25, 2025
064465b
Extend state stats and refactor stat combination
q-andy Feb 26, 2025
e4abeca
Fix rest integ test
q-andy Feb 26, 2025
9274a71
Add stats and nodes filtering
q-andy Feb 26, 2025
e1cb163
Add flatten and include metadata query parameter
q-andy Feb 26, 2025
282db07
Add EventStatData to snapshot EventStat info
q-andy Feb 26, 2025
7d08433
Refactor stat filtering
q-andy Feb 27, 2025
df81f11
Add stat aggregations, metadata
q-andy Feb 27, 2025
4677199
Add trailing interval and minutes since last event metadata
q-andy Feb 28, 2025
8cc6698
Reorganize response processing logic
q-andy Feb 28, 2025
24760a5
Add enable stats setting and reset
q-andy Mar 1, 2025
c15bcff
Adding documentation for state and event stats
q-andy Mar 1, 2025
a8c3d0a
More documentation
q-andy Mar 3, 2025
9425021
Clean up unused state stats manager methods
q-andy Mar 3, 2025
7ba9dcb
Add unit tests and update ITs
q-andy Mar 4, 2025
fe8a616
Merge branch 'opensearch-project:main' into neural-stats
q-andy Mar 4, 2025
56c2d19
Add transport tests, refactor toXContent tests
q-andy Mar 4, 2025
d540f28
Clean up files
q-andy Mar 4, 2025
3da5765
Merge branch 'opensearch-project:main' into neural-stats
q-andy Mar 6, 2025
64032f3
Update changelog
q-andy Mar 7, 2025
f6fc682
Address review comments
q-andy Mar 10, 2025
c56a110
Reformat response
q-andy Mar 11, 2025
73e2228
Add bwc tests for stats
q-andy Mar 12, 2025
0734a6a
Add parameter validation
q-andy Mar 12, 2025
7b532a5
Use readOptionalEnumSet from core
q-andy Mar 13, 2025
2e0c119
Clean up files
q-andy Mar 13, 2025
82504ce
Merge branch 'opensearch-project:main' into neural-stats
q-andy Mar 13, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
## [Unreleased 3.x](https://github.com/opensearch-project/neural-search/compare/main...HEAD)
### Features
- Lower bound for min-max normalization technique in hybrid query ([#1195](https://github.com/opensearch-project/neural-search/pull/1195))
- Add stats API ([#1208](https://github.com/opensearch-project/neural-search/pull/1208))
### Enhancements
### Bug Fixes
### Infrastructure
Expand Down
19 changes: 19 additions & 0 deletions qa/restart-upgrade/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,11 @@ def versionsBelow2_13 = versionsBelow2_12 + "2.12"
def versionsBelow2_14 = versionsBelow2_13 + "2.13"
def versionsBelow2_15 = versionsBelow2_14 + "2.14"
def versionsBelow2_16 = versionsBelow2_15 + "2.15"
def versionsBelow2_17 = versionsBelow2_16 + "2.16"
def versionsBelow2_18 = versionsBelow2_17 + "2.17"
def versionsBelow2_19 = versionsBelow2_18 + "2.18"
def versionsBelow2_20 = versionsBelow2_19 + "2.19"
def versionsBelow3_0 = versionsBelow2_20 + "2.20"

// Task to run BWC tests against the old cluster
task testAgainstOldCluster(type: StandaloneRestIntegTestTask) {
Expand Down Expand Up @@ -114,6 +119,13 @@ task testAgainstOldCluster(type: StandaloneRestIntegTestTask) {
}
}

// Excluding stats tests because we introduce this feature in 3.0
if (versionsBelow3_0.any { ext.neural_search_bwc_version.startsWith(it) }){
filter {
excludeTestsMatching "org.opensearch.neuralsearch.bwc.restart.RestNeuralStatsActionIT.*"
}
}

nonInputProperties.systemProperty('tests.rest.cluster', "${-> testClusters."${baseName}".allHttpSocketURI.join(",")}")
nonInputProperties.systemProperty('tests.clustername', "${-> testClusters."${baseName}".getName()}")
systemProperty 'tests.security.manager', 'false'
Expand Down Expand Up @@ -179,6 +191,13 @@ task testAgainstNewCluster(type: StandaloneRestIntegTestTask) {
}
}

// Excluding stats tests because we introduce this feature in 3.0
if (versionsBelow3_0.any { ext.neural_search_bwc_version.startsWith(it) }){
filter {
excludeTestsMatching "org.opensearch.neuralsearch.bwc.restart.RestNeuralStatsActionIT.*"
}
}

nonInputProperties.systemProperty('tests.rest.cluster', "${-> testClusters."${baseName}".allHttpSocketURI.join(",")}")
nonInputProperties.systemProperty('tests.clustername', "${-> testClusters."${baseName}".getName()}")
systemProperty 'tests.security.manager', 'false'
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/
package org.opensearch.neuralsearch.bwc.restart;

import org.opensearch.neuralsearch.stats.events.EventStatName;
import org.opensearch.neuralsearch.stats.info.InfoStatName;

import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Map;

import static org.opensearch.neuralsearch.util.TestUtils.NODES_BWC_CLUSTER;
import static org.opensearch.neuralsearch.util.TestUtils.TEXT_EMBEDDING_PROCESSOR;
import static org.opensearch.neuralsearch.util.TestUtils.getModelId;

public class RestNeuralStatsActionIT extends AbstractRestartUpgradeRestTestCase {
private static final String PIPELINE_NAME = "nlp-pipeline";
private static final String TEST_FIELD = "passage_text";
private static final String TEXT = "Hello world";
private static final String TEXT_1 = "Hello world a";

// Test restart-upgrade with neural stats
// Enabled/disabled settings should persist between restarts
// Event stats should be reset on restart
// Info stats based on persistent constructs should be persisted between restarts
public void testNeuralStats_E2EFlow() throws Exception {
waitForClusterHealthGreen(NODES_BWC_CLUSTER);
updateClusterSettings("plugins.neural_search.stats_enabled", true);

// Currently using text embedding processor executions stat since that's the only one implemented
// Once other stats are implemented, it may be smarter to use those instead of text embedding processor
// to avoid having to upload a model and run inference.
if (isRunningAgainstOldCluster()) {
String modelId = uploadTextEmbeddingModel();
loadModel(modelId);
createPipelineProcessor(modelId, PIPELINE_NAME);
createIndexWithConfiguration(
getIndexNameForTest(),
Files.readString(Path.of(classLoader.getResource("processor/IndexMappingMultipleShard.json").toURI())),
PIPELINE_NAME
);
addDocument(getIndexNameForTest(), "0", TEST_FIELD, TEXT, null, null);

// Get stats request
String responseBody = executeNeuralStatRequest(new ArrayList<>(), new ArrayList<>());
Map<String, Object> infoStats = parseInfoStatsResponse(responseBody);
Map<String, Object> aggregatedNodeStats = parseAggregatedNodeStatsResponse(responseBody);

assertEquals(1, getNestedValue(aggregatedNodeStats, EventStatName.TEXT_EMBEDDING_PROCESSOR_EXECUTIONS));
assertEquals(1, getNestedValue(infoStats, InfoStatName.TEXT_EMBEDDING_PROCESSORS));
} else {
String modelId = null;
try {
modelId = getModelId(getIngestionPipeline(PIPELINE_NAME), TEXT_EMBEDDING_PROCESSOR);
loadModel(modelId);
addDocument(getIndexNameForTest(), "1", TEST_FIELD, TEXT_1, null, null);
addDocument(getIndexNameForTest(), "2", TEST_FIELD, TEXT_1, null, null);
addDocument(getIndexNameForTest(), "3", TEST_FIELD, TEXT_1, null, null);

// Get stats request
String responseBody = executeNeuralStatRequest(new ArrayList<>(), new ArrayList<>());
Map<String, Object> infoStats = parseInfoStatsResponse(responseBody);
Map<String, Object> aggregatedNodeStats = parseAggregatedNodeStatsResponse(responseBody);

assertEquals(3, getNestedValue(aggregatedNodeStats, EventStatName.TEXT_EMBEDDING_PROCESSOR_EXECUTIONS));
assertEquals(1, getNestedValue(infoStats, InfoStatName.TEXT_EMBEDDING_PROCESSORS));
} finally {
wipeOfTestResources(getIndexNameForTest(), PIPELINE_NAME, modelId, null);
}
}
}
}
33 changes: 33 additions & 0 deletions qa/rolling-upgrade/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,11 @@ def versionsBelow2_13 = versionsBelow2_12 + "2.12"
def versionsBelow2_14 = versionsBelow2_13 + "2.13"
def versionsBelow2_15 = versionsBelow2_14 + "2.14"
def versionsBelow2_16 = versionsBelow2_15 + "2.15"
def versionsBelow2_17 = versionsBelow2_16 + "2.16"
def versionsBelow2_18 = versionsBelow2_17 + "2.17"
def versionsBelow2_19 = versionsBelow2_18 + "2.18"
def versionsBelow2_20 = versionsBelow2_19 + "2.19"
def versionsBelow3_0 = versionsBelow2_20 + "2.20"

// Task to run BWC tests against the old cluster
task testAgainstOldCluster(type: StandaloneRestIntegTestTask) {
Expand All @@ -75,6 +80,13 @@ task testAgainstOldCluster(type: StandaloneRestIntegTestTask) {
nonInputProperties.systemProperty('tests.rest.cluster', "${-> testClusters."${baseName}".allHttpSocketURI.join(",")}")
nonInputProperties.systemProperty('tests.clustername', "${-> testClusters."${baseName}".getName()}")
systemProperty 'tests.security.manager', 'false'

// Excluding stats tests because we introduce this feature in 3.0
if (versionsBelow3_0.any { ext.neural_search_bwc_version.startsWith(it) }){
filter {
excludeTestsMatching "org.opensearch.neuralsearch.bwc.rolling.RestNeuralStatsActionIT.*"
}
}
}

// Part of rolling upgrade. Upgrades one node of the old cluster to new OpenSearch version with upgraded plugin version
Expand All @@ -100,6 +112,13 @@ task testAgainstOneThirdUpgradedCluster(type: StandaloneRestIntegTestTask) {
nonInputProperties.systemProperty('tests.rest.cluster', "${-> testClusters."${baseName}".allHttpSocketURI.join(",")}")
nonInputProperties.systemProperty('tests.clustername', "${-> testClusters."${baseName}".getName()}")
systemProperty 'tests.security.manager', 'false'

// Excluding stats tests because we introduce this feature in 3.0
if (versionsBelow3_0.any { ext.neural_search_bwc_version.startsWith(it) }){
filter {
excludeTestsMatching "org.opensearch.neuralsearch.bwc.rolling.RestNeuralStatsActionIT.*"
}
}
}

// Part of rolling upgrade. Upgrades the second node to new OpenSearch version with upgraded plugin version after the
Expand All @@ -124,6 +143,13 @@ task testAgainstTwoThirdsUpgradedCluster(type: StandaloneRestIntegTestTask) {
nonInputProperties.systemProperty('tests.rest.cluster', "${-> testClusters."${baseName}".allHttpSocketURI.join(",")}")
nonInputProperties.systemProperty('tests.clustername', "${-> testClusters."${baseName}".getName()}")
systemProperty 'tests.security.manager', 'false'

// Excluding stats tests because we introduce this feature in 3.0
if (versionsBelow3_0.any { ext.neural_search_bwc_version.startsWith(it) }){
filter {
excludeTestsMatching "org.opensearch.neuralsearch.bwc.rolling.RestNeuralStatsActionIT.*"
}
}
}

// Part of rolling upgrade. Upgrades the third node to new OpenSearch version with upgraded plugin version after the
Expand All @@ -148,4 +174,11 @@ task testRollingUpgrade(type: StandaloneRestIntegTestTask) {
nonInputProperties.systemProperty('tests.rest.cluster', "${-> testClusters."${baseName}".allHttpSocketURI.join(",")}")
nonInputProperties.systemProperty('tests.clustername', "${-> testClusters."${baseName}".getName()}")
systemProperty 'tests.security.manager', 'false'

// Excluding stats tests because we introduce this feature in 3.0
if (versionsBelow3_0.any { ext.neural_search_bwc_version.startsWith(it) }){
filter {
excludeTestsMatching "org.opensearch.neuralsearch.bwc.rolling.RestNeuralStatsActionIT.*"
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/
package org.opensearch.neuralsearch.bwc.rolling;

import org.opensearch.neuralsearch.stats.events.EventStatName;
import org.opensearch.neuralsearch.stats.info.InfoStatName;

import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Map;

import static org.opensearch.neuralsearch.util.TestUtils.NODES_BWC_CLUSTER;

public class RestNeuralStatsActionIT extends AbstractRollingUpgradeTestCase {
private static final String PIPELINE_NAME = "nlp-pipeline";
private static final String TEST_FIELD = "passage_text";
private static final String TEXT = "Hello world";
private static final String TEXT_MIXED = "Hello world mixed";
private static final String TEXT_UPGRADED = "Hello world upgraded";
private static final int NUM_DOCS_PER_ROUND = 1;
private static String modelId = "";

// Test rolling-upgrade neural stats action
// Create Text Embedding Processor, Ingestion Pipeline and add document
// Validate stats are correct during upgrade
// When new stats are added, we will also want to validate handling fetching stats from previous versions
// that don't have those stats.
public void testStats_E2EFlow() throws Exception {
waitForClusterHealthGreen(NODES_BWC_CLUSTER, 90);
updateClusterSettings("plugins.neural_search.stats_enabled", true);

switch (getClusterType()) {
case OLD:
modelId = uploadTextEmbeddingModel();
loadModel(modelId);
createPipelineProcessor(modelId, PIPELINE_NAME);
createIndexWithConfiguration(
getIndexNameForTest(),
Files.readString(Path.of(classLoader.getResource("processor/IndexMappings.json").toURI())),
PIPELINE_NAME
);
addDocument(getIndexNameForTest(), "0", TEST_FIELD, TEXT, null, null);

// Get stats request
String responseBody = executeNeuralStatRequest(new ArrayList<>(), new ArrayList<>());
Map<String, Object> infoStats = parseInfoStatsResponse(responseBody);
Map<String, Object> aggregatedNodeStats = parseAggregatedNodeStatsResponse(responseBody);

assertEquals(1, getNestedValue(aggregatedNodeStats, EventStatName.TEXT_EMBEDDING_PROCESSOR_EXECUTIONS));
assertEquals(1, getNestedValue(infoStats, InfoStatName.TEXT_EMBEDDING_PROCESSORS));
break;
case MIXED:
// Get stats request
responseBody = executeNeuralStatRequest(new ArrayList<>(), new ArrayList<>());
infoStats = parseInfoStatsResponse(responseBody);

assertEquals(1, getNestedValue(infoStats, InfoStatName.TEXT_EMBEDDING_PROCESSORS));
break;
case UPGRADED:
try {
// Get stats request
responseBody = executeNeuralStatRequest(new ArrayList<>(), new ArrayList<>());
infoStats = parseInfoStatsResponse(responseBody);
aggregatedNodeStats = parseAggregatedNodeStatsResponse(responseBody);

// After all nodes have be restarted, all event stats should be reset as well
assertEquals(0, getNestedValue(aggregatedNodeStats, EventStatName.TEXT_EMBEDDING_PROCESSOR_EXECUTIONS));
assertEquals(1, getNestedValue(infoStats, InfoStatName.TEXT_EMBEDDING_PROCESSORS));
} finally {
wipeOfTestResources(getIndexNameForTest(), PIPELINE_NAME, modelId, null);
}
break;
default:
throw new IllegalStateException("Unexpected value: " + getClusterType());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import static org.opensearch.neuralsearch.settings.NeuralSearchSettings.NEURAL_SEARCH_HYBRID_SEARCH_DISABLED;
import static org.opensearch.neuralsearch.settings.NeuralSearchSettings.RERANKER_MAX_DOC_FIELDS;
import static org.opensearch.neuralsearch.settings.NeuralSearchSettings.NEURAL_STATS_ENABLED;

import java.util.Arrays;
import java.util.Collection;
Expand All @@ -14,12 +15,22 @@
import java.util.Optional;
import java.util.function.Supplier;

import com.google.common.collect.ImmutableList;
import org.opensearch.action.ActionRequest;
import org.opensearch.neuralsearch.settings.NeuralSearchSettingsAccessor;
import org.opensearch.neuralsearch.stats.events.EventStatsManager;
import org.opensearch.neuralsearch.stats.info.InfoStatsManager;
import org.opensearch.transport.client.Client;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.IndexScopedSettings;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.settings.SettingsFilter;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.core.action.ActionResponse;
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.env.Environment;
Expand Down Expand Up @@ -55,15 +66,21 @@
import org.opensearch.neuralsearch.query.NeuralQueryBuilder;
import org.opensearch.neuralsearch.query.NeuralSparseQueryBuilder;
import org.opensearch.neuralsearch.query.ext.RerankSearchExtBuilder;
import org.opensearch.neuralsearch.rest.RestNeuralStatsAction;
import org.opensearch.neuralsearch.search.query.HybridQueryPhaseSearcher;
import org.opensearch.neuralsearch.transport.NeuralStatsAction;
import org.opensearch.neuralsearch.transport.NeuralStatsTransportAction;
import org.opensearch.neuralsearch.util.NeuralSearchClusterUtil;
import org.opensearch.neuralsearch.util.PipelineServiceUtil;
import org.opensearch.plugins.ActionPlugin;
import org.opensearch.plugins.ExtensiblePlugin;
import org.opensearch.plugins.IngestPlugin;
import org.opensearch.plugins.Plugin;
import org.opensearch.plugins.SearchPipelinePlugin;
import org.opensearch.plugins.SearchPlugin;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.rest.RestController;
import org.opensearch.rest.RestHandler;
import org.opensearch.script.ScriptService;
import org.opensearch.search.pipeline.SearchPhaseResultsProcessor;
import org.opensearch.search.pipeline.SearchRequestProcessor;
Expand All @@ -82,9 +99,13 @@
public class NeuralSearch extends Plugin implements ActionPlugin, SearchPlugin, IngestPlugin, ExtensiblePlugin, SearchPipelinePlugin {
private MLCommonsClientAccessor clientAccessor;
private NormalizationProcessorWorkflow normalizationProcessorWorkflow;
private NeuralSearchSettingsAccessor settingsAccessor;
private PipelineServiceUtil pipelineServiceUtil;
private InfoStatsManager infoStatsManager;
private final ScoreNormalizationFactory scoreNormalizationFactory = new ScoreNormalizationFactory();
private final ScoreCombinationFactory scoreCombinationFactory = new ScoreCombinationFactory();
public static final String EXPLANATION_RESPONSE_KEY = "explanation_response";
public static final String NEURAL_BASE_URI = "/_plugins/_neural";

@Override
public Collection<Object> createComponents(
Expand All @@ -105,7 +126,11 @@ public Collection<Object> createComponents(
NeuralSparseQueryBuilder.initialize(clientAccessor);
HybridQueryExecutor.initialize(threadPool);
normalizationProcessorWorkflow = new NormalizationProcessorWorkflow(new ScoreNormalizer(), new ScoreCombiner());
return List.of(clientAccessor);
settingsAccessor = new NeuralSearchSettingsAccessor(clusterService, environment.settings());
pipelineServiceUtil = new PipelineServiceUtil(clusterService);
infoStatsManager = new InfoStatsManager(NeuralSearchClusterUtil.instance(), settingsAccessor, pipelineServiceUtil);
EventStatsManager.instance().initialize(settingsAccessor);
return List.of(clientAccessor, EventStatsManager.instance(), infoStatsManager);
}

@Override
Expand All @@ -117,6 +142,25 @@ public List<QuerySpec<?>> getQueries() {
);
}

@Override
public List<RestHandler> getRestHandlers(
Settings settings,
RestController restController,
ClusterSettings clusterSettings,
IndexScopedSettings indexScopedSettings,
SettingsFilter settingsFilter,
IndexNameExpressionResolver indexNameExpressionResolver,
Supplier<DiscoveryNodes> nodesInCluster
) {
RestNeuralStatsAction restNeuralStatsAction = new RestNeuralStatsAction(settingsAccessor);
return ImmutableList.of(restNeuralStatsAction);
}

@Override
public List<ActionHandler<? extends ActionRequest, ? extends ActionResponse>> getActions() {
return Arrays.asList(new ActionHandler<>(NeuralStatsAction.INSTANCE, NeuralStatsTransportAction.class));
}

@Override
public List<ExecutorBuilder<?>> getExecutorBuilders(Settings settings) {
return List.of(HybridQueryExecutor.getExecutorBuilder(settings));
Expand Down Expand Up @@ -167,7 +211,7 @@ public Map<String, org.opensearch.search.pipeline.Processor.Factory<SearchPhaseR

@Override
public List<Setting<?>> getSettings() {
return List.of(NEURAL_SEARCH_HYBRID_SEARCH_DISABLED, RERANKER_MAX_DOC_FIELDS);
return List.of(NEURAL_SEARCH_HYBRID_SEARCH_DISABLED, RERANKER_MAX_DOC_FIELDS, NEURAL_STATS_ENABLED);
}

@Override
Expand Down
Loading
Loading