Skip to content

[ADS OPCTL] Display prebuilt watch command for monitoring jobs with OPCTL. #225

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jun 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions ads/opctl/backend/ads_dataflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from typing import Dict, Union

from ads.opctl.backend.base import Backend
from ads.opctl.decorator.common import print_watch_command
from ads.common.auth import create_signer, AuthContext
from ads.common.oci_client import OCIClientFactory

Expand Down Expand Up @@ -114,21 +115,22 @@ def init(
**kwargs,
)

def apply(self):
def apply(self) -> Dict:
    """
    Create DataFlow and DataFlow Run from YAML.

    This operation is not implemented for the Data Flow backend yet, so
    calling it always fails.

    Raises
    ------
    NotImplementedError
        Always.
    """
    # TODO add the logic for build dataflow and dataflow run from YAML.
    # The message has no placeholders, so a plain string literal is used
    # instead of an f-string.
    raise NotImplementedError("`apply` hasn't been supported for data flow yet.")

def run(self) -> None:
@print_watch_command
def run(self) -> Dict:
"""
Create DataFlow and DataFlow Run from OCID or cli parameters.
"""
with AuthContext(auth=self.auth_type, profile=self.profile):
if self.config["execution"].get("ocid", None):
data_flow_id = self.config["execution"]["ocid"]
run_id = Job.from_dataflow_job(data_flow_id).run().id
job_id = self.config["execution"]["ocid"]
run_id = Job.from_dataflow_job(job_id).run().id
else:
infra = self.config.get("infrastructure", {})
if any(k not in infra for k in REQUIRED_FIELDS):
Expand Down
13 changes: 7 additions & 6 deletions ads/opctl/backend/ads_ml_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,10 @@
ScriptRuntime,
)
from ads.opctl import logger
from ads.opctl.backend.base import (
Backend,
RuntimeFactory,
)
from ads.opctl.backend.base import Backend, RuntimeFactory
from ads.opctl.config.resolver import ConfigResolver
from ads.opctl.constants import DEFAULT_IMAGE_SCRIPT_DIR
from ads.opctl.decorator.common import print_watch_command
from ads.opctl.distributed.common.cluster_config_helper import (
ClusterConfigToJobSpecConverter,
)
Expand Down Expand Up @@ -126,7 +124,8 @@ def init(
**kwargs,
)

def apply(self) -> None:
@print_watch_command
def apply(self) -> Dict:
"""
Create Job and Job Run from YAML.
"""
Expand All @@ -136,8 +135,10 @@ def apply(self) -> None:
job_run = job.run()
print("JOB OCID:", job.id)
print("JOB RUN OCID:", job_run.id)
return {"job_id": job.id, "run_id": job_run.id}

def run(self) -> None:
@print_watch_command
def run(self) -> Dict:
"""
Create Job and Job Run from OCID or cli parameters.
"""
Expand Down
12 changes: 9 additions & 3 deletions ads/opctl/backend/ads_ml_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from ads.common.oci_client import OCIClientFactory
from ads.opctl.backend.base import Backend
from ads.opctl.backend.ads_ml_job import JobRuntimeFactory
from ads.opctl.decorator.common import print_watch_command
from ads.pipeline import Pipeline, PipelineRun, PipelineStep, CustomScriptStep

from ads.jobs import PythonRuntime
Expand All @@ -34,7 +35,8 @@ def __init__(self, config: Dict) -> None:
self.profile = config["execution"].get("oci_profile", None)
self.client = OCIClientFactory(**self.oci_auth).data_science

def apply(self) -> None:
@print_watch_command
def apply(self) -> Dict:
"""
Create Pipeline and Pipeline Run from YAML.
"""
Expand All @@ -44,16 +46,20 @@ def apply(self) -> None:
pipeline_run = pipeline.run()
print("PIPELINE OCID:", pipeline.id)
print("PIPELINE RUN OCID:", pipeline_run.id)
return {"job_id": pipeline.id, "run_id": pipeline_run.id}

def run(self) -> None:
@print_watch_command
def run(self) -> Dict:
"""
Create Pipeline and Pipeline Run from OCID.

Returns
-------
Dict
The pipeline OCID under ``"job_id"`` and the pipeline run OCID under ``"run_id"``.
"""
# The OCID of the pipeline to run is taken from the execution config.
pipeline_id = self.config["execution"]["ocid"]
with AuthContext(auth=self.auth_type, profile=self.profile):
pipeline = Pipeline.from_ocid(ocid=pipeline_id)
pipeline_run = pipeline.run()
print("PIPELINE OCID:", pipeline.id)
print("PIPELINE RUN OCID:", pipeline_run.id)
# "run_id" is the key the `print_watch_command` decorator checks for
# before printing the `ads opctl watch` hint.
return {"job_id": pipeline.id, "run_id": pipeline_run.id}

def delete(self) -> None:
"""
Expand Down Expand Up @@ -84,7 +90,7 @@ def watch(self) -> None:
Watch Pipeline Run from OCID.
"""
run_id = self.config["execution"]["run_id"]
log_type = self.config["execution"]["log_type"]
log_type = self.config["execution"].get("log_type")
with AuthContext(auth=self.auth_type, profile=self.profile):
PipelineRun.from_ocid(run_id).watch(log_type=log_type)

Expand Down
6 changes: 3 additions & 3 deletions ads/opctl/backend/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ def run(self) -> Dict:

Returns
-------
None
Dict
"""

def delete(self) -> None:
Expand Down Expand Up @@ -62,13 +62,13 @@ def cancel(self) -> None:
None
"""

def apply(self) -> None:
def apply(self) -> Dict:
"""
Initiate Data Science service from YAML.

Returns
-------
None
Dict
"""

def activate(self) -> None:
Expand Down
1 change: 1 addition & 0 deletions ads/opctl/cmds.py
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,7 @@ def run(config: Dict, **kwargs) -> Dict:
if (
"kind" in p.config
and p.config["execution"].get("backend", None) != BACKEND_NAME.LOCAL.value
and "ocid" not in p.config["execution"]
):
p.config["execution"]["backend"] = p.config["kind"]
return _BackendFactory(p.config).backend.apply()
Expand Down
5 changes: 5 additions & 0 deletions ads/opctl/decorator/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
#!/usr/bin/env python
# -*- coding: utf-8; -*-

# Copyright (c) 2023 Oracle and/or its affiliates.
# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/
25 changes: 25 additions & 0 deletions ads/opctl/decorator/common.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
#!/usr/bin/env python
# -*- coding: utf-8; -*-

# Copyright (c) 2023 Oracle and/or its affiliates.
# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/

from typing import Dict, Callable
from functools import wraps

RUN_ID_FIELD = "run_id"


def print_watch_command(func: Callable) -> Callable:
    """The decorator to help build the `opctl watch` command.

    The wrapped callable is invoked unchanged and its result is returned
    as is. When that result is a dictionary holding a non-empty value under
    ``RUN_ID_FIELD``, a banner with the matching ``ads opctl watch <run_id>``
    command is printed to stdout.

    Parameters
    ----------
    func: Callable
        The callable to wrap.

    Returns
    -------
    Callable
        The wrapper around ``func``.
    """

    @wraps(func)
    def wrapper(*args, **kwargs) -> Dict:
        result = func(*args, **kwargs)
        # Only advertise the command when there is an actual run id to watch;
        # a missing or empty id would produce an unusable
        # `ads opctl watch None` suggestion.
        run_id = result.get(RUN_ID_FIELD) if isinstance(result, dict) else None
        if run_id:
            msg_header = (
                f"{'*' * 40} To monitor the progress of a task, execute the following command {'*' * 40}"
            )
            print(msg_header)
            print(f"ads opctl watch {run_id}")
            # Closing rule is exactly as wide as the header line.
            print("*" * len(msg_header))
        return result

    return wrapper
12 changes: 6 additions & 6 deletions docs/source/user_guide/cli/opctl/_template/jobs.rst
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@ Prerequisite

:doc:`Install ADS CLI <../../quickstart>`

Running a Pre Defined Job
Running a Pre Defined Job
-------------------------

.. code-block:: shell

ads opctl run -j <job ocid>
ads opctl run --ocid <job ocid>

Delete Job or Job Run
---------------------
Expand All @@ -36,15 +36,15 @@ Stop a running cluster using ``cancel`` subcommand.
**Option 1: Using Job OCID and Work Dir**

.. code-block:: shell
ads opctl cancel -j <job ocid> --work-dir <Object storage working directory specified when the cluster was created>

ads opctl cancel <job ocid> --work-dir <Object storage working directory specified when the cluster was created>

**Option 2: Using cluster info file**

The cluster info file is a YAML file containing the output generated by ``ads opctl run -f``.

.. code-block:: shell
ads opctl cancel -j <job ocid> --work-dir <Object storage working directory specified when the cluster was created>

ads opctl cancel <job ocid> --work-dir <Object storage working directory specified when the cluster was created>

This command requires an API key or resource principal setup. The logs are streamed from the logging service. If your job is not attached to the logging service, this option will show only the lifecycle state.