Skip to content

[AQUA Telemetry] Update MD Tracking #1193

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 10 commits into
base: main
Choose a base branch
from
5 changes: 4 additions & 1 deletion ads/aqua/modeldeployment/constants.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#!/usr/bin/env python
# Copyright (c) 2024 Oracle and/or its affiliates.
# Copyright (c) 2024, 2025 Oracle and/or its affiliates.
# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/

"""
Expand All @@ -8,3 +8,6 @@

This module contains constants used in Aqua Model Deployment.
"""

# Passed as `max_wait_time` when waiting on a model deployment work request
# (presumably seconds — TODO confirm against `wait_work_request`).
DEFAULT_WAIT_TIME = 12000
# Passed as `poll_interval` when waiting on a model deployment work request
# (presumably seconds — TODO confirm against `wait_work_request`).
DEFAULT_POLL_INTERVAL = 10
63 changes: 62 additions & 1 deletion ads/aqua/modeldeployment/deployment.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
extract_base_model_from_ft,
extract_fine_tune_artifacts_path,
)
from ads.aqua.modeldeployment.constants import DEFAULT_POLL_INTERVAL, DEFAULT_WAIT_TIME
from ads.aqua.modeldeployment.entities import (
AquaDeployment,
AquaDeploymentConfig,
Expand All @@ -65,8 +66,10 @@
ModelDeploymentConfigSummary,
)
from ads.aqua.modeldeployment.utils import MultiModelDeploymentConfigLoader
from ads.common.decorator.threaded import thread_pool
from ads.common.object_storage_details import ObjectStorageDetails
from ads.common.utils import UNKNOWN, get_log_links
from ads.common.work_request import DataScienceWorkRequest
from ads.config import (
AQUA_DEPLOYMENT_CONTAINER_CMD_VAR_METADATA_NAME,
AQUA_DEPLOYMENT_CONTAINER_METADATA_NAME,
Expand Down Expand Up @@ -788,7 +791,14 @@ def _create_deployment(

deployment_id = deployment.id
logger.info(
f"Aqua model deployment {deployment_id} created for model {aqua_model_id}."
f"Aqua model deployment {deployment_id} created for model {aqua_model_id}. Work request Id is {deployment.dsc_model_deployment.workflow_req_id}"
)

thread_pool.submit(
self.get_deployment_status,
deployment_id,
deployment.dsc_model_deployment.workflow_req_id,
model_type,
)

# we arbitrarily choose last 8 characters of OCID to identify MD in telemetry
Expand Down Expand Up @@ -1313,3 +1323,54 @@ def list_shapes(self, **kwargs) -> List[ComputeShapeSummary]:
)
for oci_shape in oci_shapes
]

def get_deployment_status(
    self, model_deployment_id: str, work_request_id: str, model_type: str
) -> None:
    """Waits for the model deployment work request and logs its outcome in telemetry.

    The work request is awaited through ``DataScienceWorkRequest.wait_work_request``
    using ``DEFAULT_WAIT_TIME`` and ``DEFAULT_POLL_INTERVAL``. A ``SUCCEEDED`` event is
    recorded when the wait returns normally and a ``FAILED`` event (with an error
    detail) is recorded when it raises. The exception is logged, not re-raised.

    Parameters
    ----------
    model_deployment_id: str
        The id of the deployed aqua model.
    work_request_id: str
        The work request Id of the model deployment.
    model_type: str
        The type of aqua model to be deployed. Allowed values are: `custom`, `service` and `multi_model`.

    Returns
    -------
    None
        The outcome is reported only through telemetry and the logger.
    """
    # A short OCID substring (key_len=8) identifies the deployment in telemetry.
    ocid = get_ocid_substring(model_deployment_id, key_len=8)
    telemetry_kwargs = {"ocid": ocid}
    category = f"aqua/{model_type}/deployment/status"

    data_science_work_request: DataScienceWorkRequest = DataScienceWorkRequest(
        work_request_id
    )

    try:
        data_science_work_request.wait_work_request(
            progress_bar_description="Creating model deployment",
            max_wait_time=DEFAULT_WAIT_TIME,
            poll_interval=DEFAULT_POLL_INTERVAL,
        )
    except Exception as e:
        # Broad catch is deliberate: the caller submits this method to a thread pool,
        # so any failure must end up in the log and telemetry rather than be lost.
        logger.error("Error while trying to create model deployment: %s", e)

        # `_error_message` may be missing or not a string, so fall back to the
        # exception text to keep `detail` a usable string.
        error_detail = getattr(data_science_work_request, "_error_message", None)
        if not (isinstance(error_detail, str) and error_detail):
            error_detail = str(e)

        self.telemetry.record_event_async(
            category=category,
            action="FAILED",
            detail=error_detail,
            value=ocid,
            **telemetry_kwargs,
        )
    else:
        self.telemetry.record_event_async(
            category=category,
            action="SUCCEEDED",
            value=ocid,
            **telemetry_kwargs,
        )
4 changes: 3 additions & 1 deletion ads/common/work_request.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ def __init__(
description: str = "Processing",
config: dict = None,
signer: Signer = None,
client_kwargs: dict = None,
client_kwargs: dict = None,
**kwargs
) -> None:
"""Initializes ADSWorkRequest object.
Expand Down Expand Up @@ -65,6 +65,7 @@ def __init__(
self._description = description
self._percentage = 0
self._status = None
_error_message = None
super().__init__(config, signer, client_kwargs, **kwargs)


Expand All @@ -78,6 +79,7 @@ def _sync(self):
self._percentage= work_request.percent_complete
self._status = work_request.status
self._description = work_request_logs[-1].message if work_request_logs else "Processing"
if work_request.status == 'FAILED' : self._error_message = self.client.list_work_request_errors
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It might be good to show example output for both a failed and a successful model deployment (MD) in the PR description.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, let's use the ruff formatter to format the code.


def watch(
self,
Expand Down
17 changes: 6 additions & 11 deletions ads/model/service/oci_datascience_model_deployment.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,9 +185,9 @@ def activate(
self.id,
)


self.workflow_req_id = response.headers.get("opc-work-request-id", None)
if wait_for_completion:
self.workflow_req_id = response.headers.get("opc-work-request-id", None)

try:
DataScienceWorkRequest(self.workflow_req_id).wait_work_request(
progress_bar_description="Activating model deployment",
Expand Down Expand Up @@ -233,11 +233,9 @@ def create(
response = self.client.create_model_deployment(create_model_deployment_details)
self.update_from_oci_model(response.data)
logger.info(f"Creating model deployment `{self.id}`.")
print(f"Model Deployment OCID: {self.id}")

self.workflow_req_id = response.headers.get("opc-work-request-id", None)
if wait_for_completion:
self.workflow_req_id = response.headers.get("opc-work-request-id", None)

try:
DataScienceWorkRequest(self.workflow_req_id).wait_work_request(
progress_bar_description="Creating model deployment",
Expand Down Expand Up @@ -287,10 +285,8 @@ def deactivate(
response = self.client.deactivate_model_deployment(
self.id,
)

self.workflow_req_id = response.headers.get("opc-work-request-id", None)
if wait_for_completion:
self.workflow_req_id = response.headers.get("opc-work-request-id", None)

try:
DataScienceWorkRequest(self.workflow_req_id).wait_work_request(
progress_bar_description="Deactivating model deployment",
Expand Down Expand Up @@ -355,10 +351,9 @@ def delete(
response = self.client.delete_model_deployment(
self.id,
)


self.workflow_req_id = response.headers.get("opc-work-request-id", None)
if wait_for_completion:
self.workflow_req_id = response.headers.get("opc-work-request-id", None)

try:
DataScienceWorkRequest(self.workflow_req_id).wait_work_request(
progress_bar_description="Deleting model deployment",
Expand Down
11 changes: 6 additions & 5 deletions ads/telemetry/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,11 @@
# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/


import concurrent.futures
import logging
import threading
import traceback
import urllib.parse
from typing import Optional
import concurrent.futures

import oci

Expand All @@ -20,6 +19,7 @@
THREAD_POOL_SIZE = 16
thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=THREAD_POOL_SIZE)


class TelemetryClient(TelemetryBase):
"""Represents a telemetry python client providing functions to record an event.

Expand Down Expand Up @@ -81,7 +81,8 @@ def record_event(
# Here `endpoint`` is for debugging purpose
# For some federated/domain users, the `endpoint` may not be a valid URL
endpoint = f"{self.service_endpoint}/n/{self.namespace}/b/{self.bucket}/o/telemetry/{category}/{action}"
logger.debug(f"Sending telemetry to endpoint: {endpoint}")
logger.info(f"Sending telemetry to endpoint: {endpoint}")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's use `logger.debug` instead.

print(f"Sending telemetry to endpoint: {endpoint}")

self.os_client.base_client.user_agent = self._encode_user_agent(**kwargs)
try:
Expand All @@ -104,7 +105,7 @@ def record_event(

def record_event_async(
self, category: str = None, action: str = None, detail: str = None, **kwargs
)-> None:
) -> None:
"""Send a head request to generate an event record.

Parameters
Expand All @@ -119,4 +120,4 @@ def record_event_async(
Thread
A started thread to send a head request to generate an event record.
"""
thread_pool.submit(self.record_event, args=(category, action, detail), kwargs=kwargs)
thread_pool.submit(self.record_event, category, action, detail, **kwargs)
25 changes: 25 additions & 0 deletions tests/unitary/with_extras/aqua/test_deployment.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
from importlib import reload
from unittest.mock import MagicMock, patch

from ads.aqua.modeldeployment.constants import DEFAULT_POLL_INTERVAL, DEFAULT_WAIT_TIME
from ads.model.service.oci_datascience_model_deployment import OCIDataScienceModelDeployment
import oci
import pytest
from oci.data_science.models import (
Expand Down Expand Up @@ -2276,3 +2278,26 @@ def test_validate_multimodel_deployment_feasibility_positive_single(
total_gpus,
"test_data/deployment/aqua_summary_multi_model_single.json",
)

def test_get_deployment_status(self):
    """Checks that get_deployment_status waits on the given work request.

    The work request constructor and its `wait_work_request` method are patched so
    no OCI call is made. The test then verifies that the work request is built from
    the supplied work request id and awaited with the default wait time and poll
    interval.
    """
    deployment_id = "fakeid.datasciencemodeldeployment.oc1.iad.xxx"
    work_request_id = "fakeid.workrequest.oc1.iad.xxx"
    model_type = "custom"

    with patch(
        "ads.model.service.oci_datascience_model_deployment.DataScienceWorkRequest.__init__"
    ) as mock_ds_work_request:
        # A patched __init__ must return None, otherwise instantiation raises TypeError.
        mock_ds_work_request.return_value = None
        with patch(
            "ads.model.service.oci_datascience_model_deployment.DataScienceWorkRequest.wait_work_request"
        ) as mock_wait:
            self.app.get_deployment_status(
                deployment_id, work_request_id, model_type
            )

            # The work request must be created from the id passed by the caller.
            mock_ds_work_request.assert_called_with(work_request_id)
            mock_wait.assert_called_with(
                progress_bar_description="Creating model deployment",
                max_wait_time=DEFAULT_WAIT_TIME,
                poll_interval=DEFAULT_POLL_INTERVAL,
            )
Loading