Metrics framework integration with ml-commons #3661

Merged
merged 23 commits into from
Jun 11, 2025
9195bdf
feat(telemetry aware plugin): adding counters and implementing teleme…
pyek-bot Mar 12, 2025
9b388d5
feature: job extension and static job
pyek-bot Mar 14, 2025
e23a9b5
fix: post rebase
pyek-bot May 13, 2025
5edce6e
chore
pyek-bot May 13, 2025
8b87540
feat: add static metric collection of model types
pyek-bot May 16, 2025
cf7ec3d
fix: fetch connector from connector_id
pyek-bot May 19, 2025
ce1a948
refactor: move ragpipeline feature flag out of supplier and use mlfea…
pyek-bot May 20, 2025
039392d
feat: add settings to control metric collection
pyek-bot May 21, 2025
dfaa932
feat: add test cases
pyek-bot May 23, 2025
fc756d8
fix: spotless
pyek-bot May 23, 2025
b50d5d0
feat: capture latency and throughput for model predict
pyek-bot May 29, 2025
9433c1a
spotless
pyek-bot May 29, 2025
78056ff
fix: add header to MetricType.java
pyek-bot Jun 9, 2025
2eaeaaa
review changes: add prefix check for pre-trained model, add java docs…
pyek-bot Jun 10, 2025
88c7477
fix: set task started flag appropriately in taskmanager
pyek-bot Jun 10, 2025
f24fae6
Merge branch 'main' into otel_metrics_integration
dhrubo-os Jun 10, 2025
9e3c409
Merge branch 'main' into otel_metrics_integration
pyek-bot Jun 10, 2025
9661993
fix: test cases
pyek-bot Jun 11, 2025
c3e677c
Merge branch 'main' into otel_metrics_integration
pyek-bot Jun 11, 2025
8c48068
chore: add more tests
pyek-bot Jun 11, 2025
995b7fe
spotless
pyek-bot Jun 11, 2025
62f5df3
test: add to exclusions
pyek-bot Jun 11, 2025
9fc2925
Merge branch 'main' into otel_metrics_integration
pyek-bot Jun 11, 2025
@@ -44,9 +44,12 @@ public class CommonValue {
    public static final String ML_MEMORY_META_INDEX = ".plugins-ml-memory-meta";
    public static final String ML_MEMORY_MESSAGE_INDEX = ".plugins-ml-memory-message";
    public static final String ML_STOP_WORDS_INDEX = ".plugins-ml-stop-words";
    // index used in 2.19 to track MlTaskBatchUpdate task
    public static final String TASK_POLLING_JOB_INDEX = ".ml_commons_task_polling_job";
    public static final String MCP_SESSION_MANAGEMENT_INDEX = ".plugins-ml-mcp-session-management";
    public static final String MCP_TOOLS_INDEX = ".plugins-ml-mcp-tools";
    // index created in 3.1 to track all ml jobs created via job scheduler
    public static final String ML_JOBS_INDEX = ".plugins-ml-jobs";
Collaborator

We already have multiple system indices. To avoid adding too many, can we reuse the .plugins-ml-task index?

Contributor Author

I believe the job scheduler monitors a particular index for its documents. If a new document is added, it starts a new job using certain parameters in the document, as defined in MLJobParameter. If the index holds documents in a different format, I'm not sure how it will react. At the same time, how will the existing tasks work if job scheduler documents are present in the index? I can test this out and get back to you.

Collaborator

> I can test this out and get back to you

Any update on this?

Contributor Author

It is not feasible to use the existing task index, since the job scheduler requires its own index with documents following the structure of MLJobParameter.

Contributor Author

Other plugins that have extended this plugin also have a separate index for the jobs.
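
To make the concern in this thread concrete, here is a minimal, self-contained sketch of why a scheduler that sweeps a dedicated index expects every document to share one schema. It does not use the real job scheduler API. `JobDoc`, `parseJob`, `schedulableCount`, and the field names `name`, `interval_minutes`, and `enabled` are hypothetical stand-ins for whatever structure MLJobParameter actually defines, and the task document fields are likewise invented for illustration.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;

public class JobIndexSketch {
    // Hypothetical stand-in for the fields a job document would carry.
    public record JobDoc(String name, int intervalMinutes, boolean enabled) {}

    // Parses one index document; returns empty when it does not match the job schema.
    public static Optional<JobDoc> parseJob(Map<String, Object> source) {
        Object name = source.get("name");
        Object interval = source.get("interval_minutes");
        Object enabled = source.get("enabled");
        if (name instanceof String n && interval instanceof Integer i && enabled instanceof Boolean e) {
            return Optional.of(new JobDoc(n, i, e));
        }
        return Optional.empty();
    }

    // Counts how many documents in a simulated index could be scheduled as jobs.
    public static long schedulableCount(List<Map<String, Object>> index) {
        return index.stream().map(JobIndexSketch::parseJob).filter(Optional::isPresent).count();
    }

    public static void main(String[] args) {
        // A document shaped like a job, and one shaped like an unrelated task record.
        Map<String, Object> jobDoc = Map.of("name", "stats_collector", "interval_minutes", 5, "enabled", true);
        Map<String, Object> taskDoc = Map.of("task_type", "BATCH_PREDICTION", "state", "RUNNING");
        List<Map<String, Object>> sharedIndex = List.of(jobDoc, taskDoc);
        System.out.println(schedulableCount(sharedIndex));
    }
}
```

In this toy model the task-shaped document is simply skipped. A real sweeper may instead log errors or fail on documents it cannot parse, which is one reason a separate index is the safer choice.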

    public static final Set<String> stopWordsIndices = ImmutableSet.of(".plugins-ml-stop-words");
    public static final String TOOL_PARAMETERS_PREFIX = "tools.parameters.";

388 changes: 388 additions & 0 deletions common/src/main/java/org/opensearch/ml/common/MLModel.java

Large diffs are not rendered by default.

@@ -342,4 +342,12 @@ private MLCommonsSettings() {}
    /** This setting sets the remote metadata service name */
    public static final Setting<String> REMOTE_METADATA_SERVICE_NAME = Setting
        .simpleString("plugins.ml_commons." + REMOTE_METADATA_SERVICE_NAME_KEY, Setting.Property.NodeScope, Setting.Property.Final);

    // Feature flag for enabling telemetry metric collection via metrics framework
Collaborator

What's the difference between a static metric and a metric?

Contributor Author

Static metric refers to the MLStatsJobProcessor, which runs every 5 minutes to collect state data.

Collaborator

Going by the name, metrics_collection_enabled should include metrics_static_collection_enabled. If a user enables metrics_collection_enabled, is metrics_static_collection_enabled also enabled?

Contributor Author

metrics_collection_enabled enables the metrics registry and pushes metrics to it. This needs to be enabled regardless.

metrics_static_collection_enabled enables the job that runs every 5 mins to collect state data.

Any suggestion on what it should be?

Collaborator

What happens if we don't enable metrics_static_collection_enabled? Do we still get metrics data?
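
One way to read the exchange above is that the two flags are layered: metrics_collection_enabled turns on the registry that receives metrics, and metrics_static_collection_enabled additionally turns on the periodic state-collection job. The sketch below encodes that reading. It is an assumption, not confirmed behavior: the thread does not state that the static job checks both flags, and `MetricGates` and its method names are hypothetical.

```java
public class MetricGates {
    // Mirrors plugins.ml_commons.metrics_collection_enabled (default false in the diff).
    private volatile boolean metricCollectionEnabled = false;
    // Mirrors plugins.ml_commons.metrics_static_collection_enabled (default false in the diff).
    private volatile boolean staticMetricCollectionEnabled = false;

    public void setMetricCollectionEnabled(boolean value) {
        metricCollectionEnabled = value;
    }

    public void setStaticMetricCollectionEnabled(boolean value) {
        staticMetricCollectionEnabled = value;
    }

    // Event-driven metrics (for example, predict latency) only need the registry flag.
    public boolean shouldRecordRuntimeMetrics() {
        return metricCollectionEnabled;
    }

    // ASSUMPTION: the periodic job is useful only when the registry is on as well.
    public boolean shouldRunStaticJob() {
        return metricCollectionEnabled && staticMetricCollectionEnabled;
    }

    public static void main(String[] args) {
        MetricGates gates = new MetricGates();
        gates.setMetricCollectionEnabled(true);
        // Registry on, static job off: runtime metrics flow, periodic state data does not.
        System.out.println(gates.shouldRecordRuntimeMetrics() + " " + gates.shouldRunStaticJob());
    }
}
```

Under this reading, leaving metrics_static_collection_enabled off would still yield event-driven metrics but no periodic state data.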

    public static final Setting<Boolean> ML_COMMONS_METRIC_COLLECTION_ENABLED = Setting
        .boolSetting("plugins.ml_commons.metrics_collection_enabled", false, Setting.Property.NodeScope, Setting.Property.Dynamic);

    // Feature flag for enabling telemetry static metric collection job -- MLStatsJobProcessor
    public static final Setting<Boolean> ML_COMMONS_STATIC_METRIC_COLLECTION_ENABLED = Setting
        .boolSetting("plugins.ml_commons.metrics_static_collection_enabled", false, Setting.Property.NodeScope, Setting.Property.Dynamic);
}
@@ -12,10 +12,13 @@
import static org.opensearch.ml.common.settings.MLCommonsSettings.ML_COMMONS_CONTROLLER_ENABLED;
import static org.opensearch.ml.common.settings.MLCommonsSettings.ML_COMMONS_LOCAL_MODEL_ENABLED;
import static org.opensearch.ml.common.settings.MLCommonsSettings.ML_COMMONS_MCP_SERVER_ENABLED;
import static org.opensearch.ml.common.settings.MLCommonsSettings.ML_COMMONS_METRIC_COLLECTION_ENABLED;
import static org.opensearch.ml.common.settings.MLCommonsSettings.ML_COMMONS_MULTI_TENANCY_ENABLED;
import static org.opensearch.ml.common.settings.MLCommonsSettings.ML_COMMONS_OFFLINE_BATCH_INFERENCE_ENABLED;
import static org.opensearch.ml.common.settings.MLCommonsSettings.ML_COMMONS_OFFLINE_BATCH_INGESTION_ENABLED;
import static org.opensearch.ml.common.settings.MLCommonsSettings.ML_COMMONS_RAG_PIPELINE_FEATURE_ENABLED;
import static org.opensearch.ml.common.settings.MLCommonsSettings.ML_COMMONS_REMOTE_INFERENCE_ENABLED;
import static org.opensearch.ml.common.settings.MLCommonsSettings.ML_COMMONS_STATIC_METRIC_COLLECTION_ENABLED;

import java.util.ArrayList;
import java.util.List;
@@ -43,6 +46,11 @@ public class MLFeatureEnabledSetting {

    private volatile Boolean isMcpServerEnabled;

    private volatile Boolean isRagSearchPipelineEnabled;
Collaborator

Do we need to add a flag for each feature, such as mlInferenceProcessor and agent?

Contributor Author

In this PR, I refactored the JobScheduler plugin to be more extensible. This involved extending the JobSchedulerPlugin from MachineLearningPlugin. JobSchedulerPlugin requires an empty constructor, however, our constructor was accepting isRagSearchPipelineEnabled. So, I had to refactor the way this setting works and pass it via MLFeatureEnabledSetting.

This change is specific to that. No, we don't need to add flags for each feature. It's just a backlog item; there is also a TODO in the old constructor.
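
The constructor constraint described above can be illustrated with plain Java reflection: a loader that instantiates a class through its no-argument constructor cannot build a class whose only constructor takes a parameter. This is a sketch under assumptions. `OldStylePlugin`, `NewStylePlugin`, `FeatureFlags`, and `canInstantiateWithoutArgs` are hypothetical names, and the real plugin loading mechanism may differ in detail.

```java
public class NoArgConstructorSketch {
    // Hypothetical holder playing the role of MLFeatureEnabledSetting.
    public static class FeatureFlags {
        private volatile boolean ragSearchPipelineEnabled;

        public void setRagSearchPipelineEnabled(boolean value) {
            ragSearchPipelineEnabled = value;
        }

        public boolean isRagSearchPipelineEnabled() {
            return ragSearchPipelineEnabled;
        }
    }

    // Before: the flag arrives through the constructor, so no no-arg constructor exists.
    public static class OldStylePlugin {
        final boolean ragEnabled;

        public OldStylePlugin(boolean ragEnabled) {
            this.ragEnabled = ragEnabled;
        }
    }

    // After: a no-arg constructor; the flag holder is handed over once it is available.
    public static class NewStylePlugin {
        private FeatureFlags flags;

        public NewStylePlugin() {}

        public void init(FeatureFlags flags) {
            this.flags = flags;
        }

        public boolean ragEnabled() {
            return flags != null && flags.isRagSearchPipelineEnabled();
        }
    }

    // Mimics a loader that can only call a no-argument constructor.
    public static boolean canInstantiateWithoutArgs(Class<?> type) {
        try {
            type.getDeclaredConstructor().newInstance();
            return true;
        } catch (ReflectiveOperationException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(canInstantiateWithoutArgs(OldStylePlugin.class));
        System.out.println(canInstantiateWithoutArgs(NewStylePlugin.class));
    }
}
```

Moving the flag into a shared holder keeps the plugin class constructible by such a loader while still letting the value change at runtime.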


    private volatile Boolean isMetricCollectionEnabled;
    private volatile Boolean isStaticMetricCollectionEnabled;

    private final List<SettingsChangeListener> listeners = new ArrayList<>();

    public MLFeatureEnabledSetting(ClusterService clusterService, Settings settings) {
@@ -55,6 +63,9 @@ public MLFeatureEnabledSetting(ClusterService clusterService, Settings settings)
        isBatchInferenceEnabled = ML_COMMONS_OFFLINE_BATCH_INFERENCE_ENABLED.get(settings);
        isMultiTenancyEnabled = ML_COMMONS_MULTI_TENANCY_ENABLED.get(settings);
        isMcpServerEnabled = ML_COMMONS_MCP_SERVER_ENABLED.get(settings);
        isRagSearchPipelineEnabled = ML_COMMONS_RAG_PIPELINE_FEATURE_ENABLED.get(settings);
        isMetricCollectionEnabled = ML_COMMONS_METRIC_COLLECTION_ENABLED.get(settings);
        isStaticMetricCollectionEnabled = ML_COMMONS_STATIC_METRIC_COLLECTION_ENABLED.get(settings);

        clusterService
            .getClusterSettings()
@@ -74,6 +85,15 @@ public MLFeatureEnabledSetting(ClusterService clusterService, Settings settings)
            .getClusterSettings()
            .addSettingsUpdateConsumer(ML_COMMONS_OFFLINE_BATCH_INFERENCE_ENABLED, it -> isBatchInferenceEnabled = it);
        clusterService.getClusterSettings().addSettingsUpdateConsumer(ML_COMMONS_MCP_SERVER_ENABLED, it -> isMcpServerEnabled = it);
        clusterService
            .getClusterSettings()
            .addSettingsUpdateConsumer(MLCommonsSettings.ML_COMMONS_RAG_PIPELINE_FEATURE_ENABLED, it -> isRagSearchPipelineEnabled = it);
        clusterService
            .getClusterSettings()
            .addSettingsUpdateConsumer(ML_COMMONS_METRIC_COLLECTION_ENABLED, it -> isMetricCollectionEnabled = it);
        clusterService
            .getClusterSettings()
            .addSettingsUpdateConsumer(ML_COMMONS_STATIC_METRIC_COLLECTION_ENABLED, it -> isStaticMetricCollectionEnabled = it);
    }

    /**
@@ -148,6 +168,22 @@ public void addListener(SettingsChangeListener listener) {
        listeners.add(listener);
    }

    /**
     * Whether the rag search pipeline feature is enabled. If disabled, APIs in ml-commons will block rag search pipeline.
     * @return whether the feature is enabled.
     */
    public boolean isRagSearchPipelineEnabled() {
        return isRagSearchPipelineEnabled;
    }

    public boolean isMetricCollectionEnabled() {
        return isMetricCollectionEnabled;
    }

    public boolean isStaticMetricCollectionEnabled() {
        return isStaticMetricCollectionEnabled;
    }

    @VisibleForTesting
    public void notifyMultiTenancyListeners(boolean isEnabled) {
        for (SettingsChangeListener listener : listeners) {