diff --git a/README.md b/README.md
index fedd64c46..bd6fb2868 100644
--- a/README.md
+++ b/README.md
@@ -33,6 +33,7 @@ We use pre-commit to make sure the code is consistently formatted. To make sure
 - To run the unit tests, run `pytest -v tests/unit_test.py`
 - Any new test functions/scripts can be added into the `tests` folder
 - NOTE: Functional tests coming soon, will live in `tests/func_test.py`
+- To test the CLI, run `codeflare` followed by any command. To see the list of available commands, simply run `codeflare`
 
 #### Code Coverage
diff --git a/pyproject.toml b/pyproject.toml
index 6f8393ef2..77662e908 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -29,6 +29,7 @@ codeflare-torchx = "0.6.0.dev0"
 cryptography = "40.0.2"
 executing = "1.2.0"
 pydantic = "< 2"
+click = "8.0.4"
 
 [tool.poetry.group.docs]
 optional = true
@@ -40,3 +41,10 @@ pdoc3 = "0.10.0"
 pytest = "7.4.0"
 coverage = "7.2.7"
 pytest-mock = "3.11.1"
+
+[tool.poetry.scripts]
+codeflare = "codeflare_sdk.cli.codeflare_cli:cli"
+
+[build-system]
+requires = ["poetry_core>=1.0.0"]
+build-backend = "poetry.core.masonry.api"
diff --git a/requirements.txt b/requirements.txt
index 2a48812aa..c5d04bdc7 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -6,3 +6,4 @@ codeflare-torchx==0.6.0.dev0
 pydantic<2 # 2.0+ broke ray[default] see detail: https://github.com/ray-project/ray/pull/37000
 cryptography==40.0.2
 executing==1.2.0
+click==8.0.4
diff --git a/src/codeflare_sdk.egg-info/SOURCES.txt b/src/codeflare_sdk.egg-info/SOURCES.txt
index cfea1dbff..acd40f211 100644
--- a/src/codeflare_sdk.egg-info/SOURCES.txt
+++ b/src/codeflare_sdk.egg-info/SOURCES.txt
@@ -19,3 +19,7 @@ src/codeflare_sdk/utils/generate_cert.py
 src/codeflare_sdk/utils/generate_yaml.py
 src/codeflare_sdk/utils/kube_api_helpers.py
 src/codeflare_sdk/utils/pretty_print.py
+src/codeflare_sdk/cli/__init__.py
+src/codeflare_sdk/cli/codeflare_cli.py
+src/codeflare_sdk/cli/commands/create.py
+src/codeflare_sdk/cli/cli_utils.py
diff --git a/src/codeflare_sdk/cli/CodeflareCLI_Design_Doc.md b/src/codeflare_sdk/cli/CodeflareCLI_Design_Doc.md
new file mode 100644
index 000000000..da3c2a510
--- /dev/null
+++ b/src/codeflare_sdk/cli/CodeflareCLI_Design_Doc.md
@@ -0,0 +1,179 @@
+# CodeFlare CLI Design
+
+## Context and Scope
+
+The primary purpose of the CLI is to serve as an interaction layer between a user and the CodeFlare stack (MCAD, InstaScale, KubeRay) from within the terminal. This addition is needed because a large portion of our target users come from a high-performance computing background and are most familiar and comfortable submitting jobs to a cluster via a CLI.
+
+The CLI builds on the existing CodeFlare SDK. It supports operations similar to those the SDK provides (such as Ray Cluster and job management), but from the terminal. On top of the existing SDK, the CLI adds functionality, saves time, simplifies workspaces, and enables automation of certain processes via bash scripts.
+
+## Goals
+
+- Provide users the ability to request, monitor, and stop the Kubernetes resources associated with the CodeFlare stack from within the terminal
+- Serve as an interaction layer between the data scientist and the CodeFlare stack (MCAD, InstaScale, KubeRay)
+- Allow for a user-friendly workflow within the terminal
+- Allow for automation and scripting of job/RayCluster management via bash scripts
+
+## Non-Goals
+
+- We do not want to re-implement functionality that already exists in the CodeFlare SDK or in any of the SDK's clients for Ray, MCAD, or other services
+
+## Architecture and Design
+
+The CodeFlare CLI is an extension of the CodeFlare SDK package that allows a user to create, monitor, and shut down framework clusters (RayClusters for now) and distributed training jobs on an authenticated Kubernetes cluster from the terminal.
+
+The user should have the ability to do the following from within the terminal:
+- Create, view the details and status of, submit, and delete Ray Clusters via AppWrappers
+- Create, view the logs and status of, submit, and delete jobs
+- List all jobs
+- List all Ray Clusters
+- Log in to a Kubernetes cluster
+- Log out of a Kubernetes cluster
+
+To support these operations, additional functions added to the SDK may include:
+- Formatted listing of Ray Clusters
+- Formatted listing of jobs
+- Getting a job by name
+
+For the majority of its functionality, the CLI will use what the SDK already provides.
+
+### CLI Framework:
+
+[Click](https://click.palletsprojects.com/en/8.1.x/) is the chosen CLI framework for the following reasons:
+- Simple syntax/layout: since the CLI commands are already complex, it is important that the framework does not add unnecessary complexity
+- Supports functional commands instead of objects: the SDK is designed around functions, and keeping the CLI similar improves readability
+- Ships with a testing library and automatic help generation, which speeds up development
+- Large community and documentation: extensive documentation and a large community lead to fewer errors and easier development
+
+### Framework Clusters:
+
+When the user invokes the `define raycluster` command, a yaml file with default values is created and placed in the user's current working directory. Users can customize their clusters by passing parameters to the define command; these values override the defaults when the AppWrapper yaml file is created.
+
+Once the AppWrapper is defined, the user can create the Ray Cluster via a create command. When the user invokes `create raycluster`, they specify the name of the cluster to submit. The CLI first checks whether the specified name is already present in the Kubernetes cluster. If it is not, the CLI searches the current working directory for a yaml file corresponding to the cluster name and applies it to the Kubernetes cluster. If the wait flag is specified, the CLI displays a loading indicator with status updates until the cluster is up.
+
+We will try to strike a good balance between exposing more parameters and keeping the process simple by acting on feedback from CLI users.
+
+For `delete raycluster`, the user invokes the command and the CLI shuts the cluster down and deletes it.
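+
+As an illustrative sketch of this flow (flag names are taken from the `define` and `submit` commands implemented alongside this doc; the cluster name is a placeholder), defining and submitting a Ray Cluster might look like:
+- `codeflare define raycluster --name=mycluster --namespace=default --num-workers=1 --min-cpus=3 --max-cpus=4 --min-memory=5 --max-memory=6 --num-gpus=7 --machine-types='["cpu.small", "gpu.large"]'`
+- `codeflare submit raycluster mycluster --wait`
+
+Note that list- and dict-valued options are passed as quoted Python literals, which the CLI parses with `ast.literal_eval` (see `PythonLiteralOption` in `cli_utils.py`).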
+
+### Training Jobs
+
+When the user invokes the `define job` command, a DDPJobDefinition object is created and saved to a file. Users can customize their jobs by passing parameters to the define command.
+
+Once the job is defined, the user can submit it via a `job submit` command, specifying the job name. The CLI then checks whether the job is already on the Kubernetes cluster and, if not, submits it. The submitted job is a DDPJob, and it is submitted onto a specified Ray Cluster.
+
+When the user wants to delete a job, they invoke the job delete command and the CLI stops the job and deletes it. This can happen at any time, assuming there is a job running.
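+
+As an illustrative sketch (flag names from the `define job` and `submit job` commands implemented alongside this doc; the job, script, and cluster names are placeholders), the job flow might look like:
+- `codeflare define job --name=myjob --script=train.py --script-args='["arg1", "arg2"]' --gpu=1`
+- `codeflare submit job myjob --cluster-name=mycluster --namespace=default`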
+
+### Authentication
+
+Users will need to be authenticated to a Kubernetes cluster in order to perform any operation.
+
+If the user tries to perform an operation without being logged in, the CLI will prompt them to authenticate. A valid kubeconfig must be present in the user's environment in order to perform any operation.
+
+The user will be able to log in using a simple `login` command and will have the choice of logging in via server + token. The user can also choose whether or not they want TLS verification. If a kubeconfig exists, the CLI will update it; otherwise it will create one for the user.
+
+Alternatively, the user can invoke the login command with the path to their kubeconfig file, and the CLI will log the user in using that kubeconfig.
+
+Users can log out of their cluster using the `logout` command.
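+
+As an illustrative sketch (flags from the `login` command implemented alongside this doc; the server and token values are placeholders):
+- `codeflare login --server=https://api.mycluster:6443 --token=sha256~abc123`
+- `codeflare logout`
+
+The kubeconfig-path login (`--configpath`) described in the usage examples below is part of the design; the `login` command implemented in this PR currently accepts `--server`, `--token`, `--insecure-skip-tls-verify`, and `--certificate-authority`.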
+
+### Listing Info
+
+Users can list both Ray Cluster information and job information by invoking the respective commands. The CLI will list information for each RayCluster/job such as requested resources, status, name, and namespace.
+
+## Alternatives Considered
+
+- Existing CodeFlare CLI
+  - Written in TypeScript and overcomplicated. Did not support
+- Just using the SDK
+  - A CLI saves a lot of time and is easier for the user in some cases
+- Interactive CLI
+  - Interactive CLIs make automation via bash scripts harder
+- Other CLI libraries
+  - **Cliff:** Ugly syntax, less readability, not much functionality.
+  - **Argparse:** Less functionality out of the box. More time spent on unnecessary reimplementation.
+  - **Cement:** Ugly syntax and low community support.
+
+## Security Considerations
+
+We will rely on Kubernetes' default security: users cannot perform any operations on a cluster unless they are correctly authenticated.
+
+## Testing and Validation
+
+The CLI lives within the SDK, so it will be [tested](https://github.com/project-codeflare/codeflare-sdk/blob/main/CodeFlareSDK_Design_Doc.md#testing-and-validation) the same way.
+
+## Deployment and Rollout
+
+- The CLI will be deployed within the CodeFlare SDK, so similar [considerations](https://github.com/project-codeflare/codeflare-sdk/blob/main/CodeFlareSDK_Design_Doc.md#deployment-and-rollout) will be taken into account.
+
+## Command Usage Examples
+
+Create a Ray Cluster:
+- `codeflare create raycluster [options]`
+
+Doing something to a Ray Cluster:
+- `codeflare {operation} raycluster {cluster_name} [options e.g. --gpu=0]`
+
+Create a job:
+- `codeflare create job [options]`
+
+Doing something to a job:
+- `codeflare {operation} job {job_name} [options e.g. --cluster-name="mycluster"]`
+- Namespace and Ray Cluster name will be required as options
+
+Listing out clusters:
+- `codeflare list raycluster -n {namespace}` OR `codeflare list raycluster --all`
+
+Listing out jobs:
+- `codeflare list job -c {cluster_name} -n {namespace}`
+- `codeflare list job -n {namespace}`
+- `codeflare list job --all`
+
+Login to a Kubernetes cluster:
+- `codeflare login [options e.g. --configpath={path/to/kubeconfig}]` (if configpath is left blank, the default value is used)
+
+Logout of a Kubernetes cluster:
+- `codeflare logout`
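+
+## End-to-End Example
+
+For reference, a full session using the commands implemented alongside this doc might look like the following (all names are placeholders; `{submission_id}` is printed by `submit job`):
+- `codeflare login --server=https://api.mycluster:6443 --token=sha256~abc123`
+- `codeflare define raycluster --name=mycluster --num-workers=2`
+- `codeflare submit raycluster mycluster --wait`
+- `codeflare define job --name=myjob --script=train.py`
+- `codeflare submit job myjob --cluster-name=mycluster`
+- `codeflare status job {submission_id}`
+- `codeflare logs job {submission_id}`
+- `codeflare cancel job {submission_id}`
+- `codeflare delete raycluster mycluster`
+- `codeflare logout`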
diff --git a/src/codeflare_sdk/cli/__init__.py b/src/codeflare_sdk/cli/__init__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/src/codeflare_sdk/cli/cli_utils.py b/src/codeflare_sdk/cli/cli_utils.py
new file mode 100644
index 000000000..e4ce6432d
--- /dev/null
+++ b/src/codeflare_sdk/cli/cli_utils.py
@@ -0,0 +1,173 @@
+import ast
+import click
+from kubernetes import client, config
+import pickle
+import os
+from ray.job_submission import JobSubmissionClient
+from torchx.runner import get_runner
+from rich.table import Table
+from rich import print
+
+from codeflare_sdk.cluster.cluster import list_clusters_all_namespaces, get_cluster
+from codeflare_sdk.cluster.model import RayCluster
+from codeflare_sdk.cluster.auth import _create_api_client_config, config_check
+from codeflare_sdk.utils.kube_api_helpers import _kube_api_error_handling
+import codeflare_sdk.cluster.auth as sdk_auth
+
+
+class PythonLiteralOption(click.Option):
+    def type_cast_value(self, ctx, value):
+        try:
+            if not value:
+                return None
+            return ast.literal_eval(value)
+        except (ValueError, SyntaxError):
+            raise click.BadParameter(value)
+
+
+class AuthenticationConfig:
+    """
+    Authentication configuration that will be stored in a file once
+    the user logs in using `codeflare login`
+    """
+
+    def __init__(
+        self,
+        token: str,
+        server: str,
+        skip_tls: bool,
+        ca_cert_path: str,
+    ):
+        self.api_client_config = _create_api_client_config(
+            token, server, skip_tls, ca_cert_path
+        )
+        self.server = server
+        self.token = token
+
+    def create_client(self):
+        return client.ApiClient(self.api_client_config)
+
+
+def load_auth():
+    """
+    Loads the saved AuthenticationConfig and stores it in global variables
+    which can be used by the SDK for authentication
+    """
+    try:
+        auth_file_path = os.path.expanduser("~/.codeflare/auth")
+        with open(auth_file_path, "rb") as file:
+            auth = pickle.load(file)
+        sdk_auth.api_client = auth.create_client()
+        return auth
+    except (IOError, EOFError):
+        click.echo("No authentication found, trying default kubeconfig")
+    except client.ApiException:
+        click.echo("Invalid authentication, trying default kubeconfig")
+
+
+class PluralAlias(click.Group):
+    def get_command(self, ctx, cmd_name):
+        rv = click.Group.get_command(self, ctx, cmd_name)
+        if rv is not None:
+            return rv
+        for x in self.list_commands(ctx):
+            if x + "s" == cmd_name:
+                return click.Group.get_command(self, ctx, x)
+        return None
+
+    def resolve_command(self, ctx, args):
+        # always return the full command name
+        _, cmd, args = super().resolve_command(ctx, args)
+        return cmd.name, cmd, args
+
+
+def print_jobs(jobs):
+    headers = ["Submission ID", "Job ID", "RayCluster", "Namespace", "Status"]
+    table = Table(show_header=True)
+    for header in headers:
+        table.add_column(header)
+    for job in jobs:
+        table.add_row(*[job[header] for header in headers])
+    print(table)
+
+
+def list_all_kubernetes_jobs(print_to_console=True):
+    k8s_jobs = []
+    runner = get_runner()
+    jobs = runner.list(scheduler="kubernetes_mcad")
+    rayclusters = {
+        raycluster.name for raycluster in list_clusters_all_namespaces(False)
+    }
+    for job in jobs:
+        namespace, name = job.app_id.split(":")
+        status = job.state
+        if name not in rayclusters:
+            k8s_jobs.append(
+                {
+                    "Submission ID": name,
+                    "Job ID": "N/A",
+                    "RayCluster": "N/A",
+                    "Namespace": namespace,
+                    "Status": str(status),
+                    "App Handle": job.app_handle,
+                }
+            )
+    if print_to_console:
+        print_jobs(k8s_jobs)
+    return k8s_jobs
+
+
+def list_all_jobs(print_to_console=True):
+    k8s_jobs = list_all_kubernetes_jobs(False)
+    rc_jobs = list_all_raycluster_jobs(False)
+    all_jobs = rc_jobs + k8s_jobs
+    if print_to_console:
+        print_jobs(all_jobs)
+    return all_jobs
+
+
+def list_raycluster_jobs(cluster: RayCluster, print_to_console=True):
+    rc_jobs = []
+    client = JobSubmissionClient(cluster.dashboard)
+    jobs = client.list_jobs()
+    for job in jobs:
+        job_obj = {
+            "Submission ID": job.submission_id,
+            "Job ID": job.job_id,
+            "RayCluster": cluster.name,
+            "Namespace": cluster.namespace,
+            "Status": str(job.status),
+            "App Handle": "ray://torchx/" + cluster.dashboard + "-" + job.submission_id,
+        }
+        rc_jobs.append(job_obj)
+    if print_to_console:
+        print_jobs(rc_jobs)
+    return rc_jobs
+
+
+def list_all_raycluster_jobs(print_to_console=True):
+    rc_jobs = []
+    clusters = list_clusters_all_namespaces(False)
+    for cluster in clusters:
+        cluster.dashboard = "http://" + cluster.dashboard
+        rc_jobs += list_raycluster_jobs(cluster, False)
+    if print_to_console:
+        print_jobs(rc_jobs)
+    return rc_jobs
+
+
+def get_job_app_handle(job_submission):
+    job = get_job_object(job_submission)
+    return job["App Handle"]
+
+
+def get_job_object(job_submission):
+    all_jobs = list_all_jobs(False)
+    for job in all_jobs:
+        if job["Submission ID"] == job_submission:
+            return job
+    raise (
+        FileNotFoundError(
+            f"Job {job_submission} not found. Try using 'codeflare list --all' to see all jobs"
+        )
+    )
diff --git a/src/codeflare_sdk/cli/codeflare_cli.py b/src/codeflare_sdk/cli/codeflare_cli.py
new file mode 100644
index 000000000..c9f17a6dc
--- /dev/null
+++ b/src/codeflare_sdk/cli/codeflare_cli.py
@@ -0,0 +1,54 @@
+import click
+import os
+
+from codeflare_sdk.cli.cli_utils import load_auth
+from codeflare_sdk.cluster.cluster import get_current_namespace
+
+cmd_folder = os.path.abspath(os.path.join(os.path.dirname(__file__), "commands"))
+
+
+class CodeflareContext:
+    def __init__(self):
+        self.codeflare_path = _initialize_codeflare_folder()
+        self.current_namespace = get_current_namespace()
+
+
+def _initialize_codeflare_folder():
+    codeflare_folder = os.path.expanduser("~/.codeflare")
+    if not os.path.exists(codeflare_folder):
+        os.makedirs(codeflare_folder)
+    return codeflare_folder
+
+
+class CodeflareCLI(click.MultiCommand):
+    def list_commands(self, ctx):
+        rv = []
+        for filename in os.listdir(cmd_folder):
+            if filename.endswith(".py") and filename != "__init__.py":
+                rv.append(filename[:-3])
+        rv.sort()
+        return rv
+
+    def get_command(self, ctx, name):
+        ns = {}
+        fn = os.path.join(cmd_folder, name + ".py")
+        try:
+            with open(fn) as f:
+                code = compile(f.read(), fn, "exec")
+            eval(code, ns, ns)
+            return ns["cli"]
+        except FileNotFoundError:
+            return
+
+
+@click.command(cls=CodeflareCLI)
+@click.pass_context
+def cli(ctx):
+    if ctx.invoked_subcommand != "login" and ctx.invoked_subcommand != "logout":
+        load_auth()
+    ctx.obj = CodeflareContext()  # Ran on every command
+
+
+if __name__ == "__main__":
+    cli()
diff --git a/src/codeflare_sdk/cli/commands/cancel.py b/src/codeflare_sdk/cli/commands/cancel.py
new file mode 100644
index 000000000..7222d3448
--- /dev/null
+++ b/src/codeflare_sdk/cli/commands/cancel.py
@@ -0,0 +1,27 @@
+import click
+from torchx.runner import get_runner
+
+from codeflare_sdk.cli.cli_utils import get_job_app_handle
+
+
+@click.group()
+def cli():
+    """Cancel a resource"""
+    pass
+
+
+@cli.command()
+@click.pass_context
+@click.argument("submission-id", type=str)
+def job(ctx, submission_id):
+    """Cancel a job"""
+    runner = get_runner()
+    try:
+        app_handle = get_job_app_handle(submission_id)
+        runner.cancel(app_handle=app_handle)
+        click.echo(f"{submission_id} cancelled successfully")
+    except FileNotFoundError:
+        click.echo(f"Submission ID {submission_id} not found in Kubernetes Cluster")
+    except Exception as e:
+        click.echo("Error cancelling job: " + str(e))
diff --git a/src/codeflare_sdk/cli/commands/define.py b/src/codeflare_sdk/cli/commands/define.py
new file mode 100644
index 000000000..d28262902
--- /dev/null
+++ b/src/codeflare_sdk/cli/commands/define.py
@@ -0,0 +1,72 @@
+import click
+import pickle
+
+from codeflare_sdk.cluster.cluster import Cluster
+from codeflare_sdk.cluster.config import ClusterConfiguration
+from codeflare_sdk.cli.cli_utils import PythonLiteralOption
+from codeflare_sdk.job.jobs import DDPJobDefinition
+
+
+@click.group()
+def cli():
+    """Define a resource with parameter specifications"""
+    pass
+
+
+@cli.command()
+@click.pass_context
+@click.option("--name", type=str, required=True)
+@click.option("--namespace", "-n", type=str)
+@click.option("--head-info", cls=PythonLiteralOption, type=list)
+@click.option("--machine-types", cls=PythonLiteralOption, type=list)
+@click.option("--min-cpus", type=int)
+@click.option("--max-cpus", type=int)
+@click.option("--num-workers", type=int)
+@click.option("--min-memory", type=int)
+@click.option("--max-memory", type=int)
+@click.option("--num-gpus", type=int)
+@click.option("--template", type=str)
+@click.option("--instascale", type=bool)
+@click.option("--envs", cls=PythonLiteralOption, type=dict)
+@click.option("--image", type=str)
+@click.option("--local-interactive", type=bool)
+@click.option("--image-pull-secrets", cls=PythonLiteralOption, type=list)
+def raycluster(ctx, **kwargs):
+    """Define a RayCluster with parameter specifications"""
+    filtered_kwargs = {k: v for k, v in kwargs.items() if v is not None}
+    if "namespace" not in filtered_kwargs.keys():
+        filtered_kwargs["namespace"] = ctx.obj.current_namespace
+    clusterConfig = ClusterConfiguration(**filtered_kwargs)
+    Cluster(clusterConfig)  # Creates yaml file
+
+
+@cli.command()
+@click.pass_context
+@click.option("--script", type=str, required=True)
+@click.option("--m", type=str)
+@click.option("--script-args", cls=PythonLiteralOption, type=list)
+@click.option("--name", type=str, required=True)
+@click.option("--cpu", type=int)
+@click.option("--gpu", type=int)
+@click.option("--memMB", type=int)
+@click.option("--h", type=str)
+@click.option("--j", type=str)
+@click.option("--env", cls=PythonLiteralOption, type=dict)
+@click.option("--max-retries", type=int)
+@click.option("--mounts", cls=PythonLiteralOption, type=list)
+@click.option("--rdzv-port", type=int)
+@click.option("--rdzv-backend", type=str)
+@click.option("--scheduler-args", cls=PythonLiteralOption, type=dict)
+@click.option("--image", type=str)
+@click.option("--workspace", type=str)
+def job(ctx, **kwargs):
+    """Define a job with specified resources"""
+    filtered_kwargs = {k: v for k, v in kwargs.items() if v is not None}
+    # Click lowercases option names, so --memMB arrives as "memmb";
+    # restore the casing that DDPJobDefinition expects
+    if "memmb" in filtered_kwargs:
+        filtered_kwargs["memMB"] = filtered_kwargs["memmb"]
+        del filtered_kwargs["memmb"]
+    job_def = DDPJobDefinition(**filtered_kwargs)
+    job_file_path = ctx.obj.codeflare_path + f"/{job_def.name}"
+    with open(job_file_path, "wb") as file:
+        pickle.dump(job_def, file)
+    click.echo("Job definition saved to " + job_file_path)
diff --git a/src/codeflare_sdk/cli/commands/delete.py b/src/codeflare_sdk/cli/commands/delete.py
new file mode 100644
index 000000000..c225d428a
--- /dev/null
+++ b/src/codeflare_sdk/cli/commands/delete.py
@@ -0,0 +1,29 @@
+import click
+
+from codeflare_sdk.cluster.cluster import get_cluster
+
+
+@click.group()
+def cli():
+    """
+    Delete a specified resource from the Kubernetes cluster
+    """
+    pass
+
+
+@cli.command()
+@click.pass_context
+@click.argument("name", type=str)
+@click.option("--namespace", type=str)
+def raycluster(ctx, name, namespace):
+    """
+    Delete a specified RayCluster from the Kubernetes cluster
+    """
+    namespace = namespace or ctx.obj.current_namespace
+    try:
+        cluster = get_cluster(name, namespace)
+    except FileNotFoundError:
+        click.echo(f"Cluster {name} not found in {namespace} namespace")
+        return
+    cluster.down()
+    click.echo("Cluster deleted successfully")
diff --git a/src/codeflare_sdk/cli/commands/details.py b/src/codeflare_sdk/cli/commands/details.py
new file mode 100644
index 000000000..f6890e7d6
--- /dev/null
+++ b/src/codeflare_sdk/cli/commands/details.py
@@ -0,0 +1,24 @@
+import click
+
+from codeflare_sdk.cluster.cluster import get_cluster
+
+
+@click.group()
+def cli():
+    """Get the details of a specified resource"""
+    pass
+
+
+@cli.command()
+@click.argument("name", type=str)
+@click.option("--namespace", type=str)
+@click.pass_context
+def raycluster(ctx, name, namespace):
+    """Get the details of a specified RayCluster"""
+    namespace = namespace or ctx.obj.current_namespace
+    try:
+        cluster = get_cluster(name, namespace)
+    except FileNotFoundError:
+        click.echo(f"Cluster {name} not found in {namespace} namespace")
+        return
+    cluster.details()
diff --git a/src/codeflare_sdk/cli/commands/list.py b/src/codeflare_sdk/cli/commands/list.py
new file mode 100644
index 000000000..a91998e04
--- /dev/null
+++ b/src/codeflare_sdk/cli/commands/list.py
@@ -0,0 +1,50 @@
+import click
+
+from codeflare_sdk.cluster.cluster import (
+    list_clusters_all_namespaces,
+    list_all_clusters,
+    get_cluster,
+    _copy_to_ray,
+)
+from codeflare_sdk.cli.cli_utils import (
+    PluralAlias,
+    list_all_jobs,
+    list_all_kubernetes_jobs,
+    list_raycluster_jobs,
+)
+
+
+@click.group(cls=PluralAlias)
+def cli():
+    """List a specified resource"""
+    pass
+
+
+@cli.command()
+@click.option("--namespace", type=str)
+@click.pass_context
+def raycluster(ctx, namespace):
+    """
+    List all rayclusters
+    """
+    if namespace:
+        list_all_clusters(namespace)
+        return
+    list_clusters_all_namespaces()
+
+
+@cli.command()
+@click.pass_context
+@click.option("--cluster-name", "-c", type=str)
+@click.option("--namespace", "-n", type=str)
+@click.option("--kube-mcad-scheduler-only", is_flag=True)
+def job(ctx, cluster_name, namespace, kube_mcad_scheduler_only):
+    """
+    List all jobs submitted
+    """
+    if cluster_name:
+        cluster = get_cluster(cluster_name, namespace or ctx.obj.current_namespace)
+        list_raycluster_jobs(_copy_to_ray(cluster), True)
+        return
+    if kube_mcad_scheduler_only:
+        list_all_kubernetes_jobs(True)
+        return
+    list_all_jobs(True)
diff --git a/src/codeflare_sdk/cli/commands/login.py b/src/codeflare_sdk/cli/commands/login.py
new file mode 100644
index 000000000..56df911b2
--- /dev/null
+++ b/src/codeflare_sdk/cli/commands/login.py
@@ -0,0 +1,46 @@
+import click
+import pickle
+from kubernetes import client
+import os
+
+from codeflare_sdk.cluster.auth import TokenAuthentication
+from codeflare_sdk.cli.cli_utils import AuthenticationConfig
+import codeflare_sdk.cluster.auth as sdk_auth
+
+
+@click.command()
+@click.pass_context
+@click.option("--server", "-s", type=str, required=True, help="Cluster API address")
+@click.option("--token", "-t", type=str, required=True, help="Authentication token")
+@click.option(
+    "--insecure-skip-tls-verify",
+    type=bool,
+    help="If true, the server's certificate won't be checked for validity",
+    default=False,
+)
+@click.option(
+    "--certificate-authority",
+    type=str,
+    help="Path to cert file for certificate authority",
+)
+def cli(ctx, server, token, insecure_skip_tls_verify, certificate_authority):
+    """
+    Login to your Kubernetes cluster and save login for later use
+    """
+    auth = TokenAuthentication(
+        token, server, insecure_skip_tls_verify, certificate_authority
+    )
+    auth.login()
+    if not sdk_auth.api_client:  # TokenAuthentication failed
+        return
+
+    auth_config = AuthenticationConfig(
+        token,
+        server,
+        insecure_skip_tls_verify,
+        certificate_authority,
+    )
+    auth_file_path = ctx.obj.codeflare_path + "/auth"
+    with open(auth_file_path, "wb") as file:
+        pickle.dump(auth_config, file)
+    click.echo(f"Logged into '{server}'")
diff --git a/src/codeflare_sdk/cli/commands/logout.py b/src/codeflare_sdk/cli/commands/logout.py
new file mode 100644
index 000000000..0001b2331
--- /dev/null
+++ b/src/codeflare_sdk/cli/commands/logout.py
@@ -0,0 +1,19 @@
+import click
+import os
+import pickle
+
+
+@click.command()
+@click.pass_context
+def cli(ctx):
+    """
+    Log out of current Kubernetes cluster
+    """
+    try:
+        auth_file_path = ctx.obj.codeflare_path + "/auth"
+        with open(auth_file_path, "rb") as file:
+            auth = pickle.load(file)
+        os.remove(auth_file_path)
+        click.echo(f"Successfully logged out of '{auth.server}'")
+    except (IOError, EOFError):
+        click.echo("Not logged in")
diff --git a/src/codeflare_sdk/cli/commands/logs.py b/src/codeflare_sdk/cli/commands/logs.py
new file mode 100644
index 000000000..402cbf2ff
--- /dev/null
+++ b/src/codeflare_sdk/cli/commands/logs.py
@@ -0,0 +1,25 @@
+import click
+from torchx.runner import get_runner
+
+from codeflare_sdk.cli.cli_utils import get_job_app_handle
+
+
+@click.group()
+def cli():
+    """Get the logs of a specified resource"""
+    pass
+
+
+@cli.command()
+@click.pass_context
+@click.argument("submission-id", type=str)
+def job(ctx, submission_id):
+    """Get the logs of a specified job"""
+    runner = get_runner()
+    try:
+        app_handle = get_job_app_handle(submission_id)
+        click.echo("".join(runner.log_lines(app_handle, None)))
+    except FileNotFoundError:
+        click.echo(f"Submission ID {submission_id} not found in Kubernetes Cluster")
+    except Exception as e:
+        click.echo("Error getting job logs: " + str(e))
diff --git a/src/codeflare_sdk/cli/commands/status.py b/src/codeflare_sdk/cli/commands/status.py
new file mode 100644
index 000000000..cce584c7d
--- /dev/null
+++ b/src/codeflare_sdk/cli/commands/status.py
@@ -0,0 +1,41 @@
+import click
+from torchx.runner import get_runner
+
+from codeflare_sdk.cluster.cluster import get_cluster
+from codeflare_sdk.cli.cli_utils import get_job_app_handle
+
+
+@click.group()
+def cli():
+    """Get the status of a specified resource"""
+    pass
+
+
+@cli.command()
+@click.argument("name", type=str)
+@click.option("--namespace", type=str)
+@click.pass_context
+def raycluster(ctx, name, namespace):
+    """Get the status of a specified RayCluster"""
+    namespace = namespace or ctx.obj.current_namespace
+    try:
+        cluster = get_cluster(name, namespace)
+    except FileNotFoundError:
+        click.echo(f"Cluster {name} not found in {namespace} namespace")
+        return
+    cluster.status()
+
+
+@cli.command()
+@click.pass_context
+@click.argument("submission-id", type=str)
+def job(ctx, submission_id):
+    """Get the status of a specified job"""
+    runner = get_runner()
+    try:
+        app_handle = get_job_app_handle(submission_id)
+        click.echo(runner.status(app_handle=app_handle))
+    except FileNotFoundError:
+        click.echo(f"Submission ID {submission_id} not found in Kubernetes Cluster")
+    except Exception as e:
+        click.echo("Error getting job status: " + str(e))
diff --git a/src/codeflare_sdk/cli/commands/submit.py b/src/codeflare_sdk/cli/commands/submit.py
new file mode 100644
index 000000000..7c6760cae
--- /dev/null
+++ b/src/codeflare_sdk/cli/commands/submit.py
@@ -0,0 +1,74 @@
+import click
+import os
+import pickle
+
+from torchx.runner import get_runner
+
+from codeflare_sdk.cluster.cluster import Cluster, get_cluster
+
+
+@click.group()
+def cli():
+    """
+    Submit a defined resource to the Kubernetes cluster
+    """
+    pass
+
+
+@cli.command()
+@click.argument("name", type=str)
+@click.option("--wait", is_flag=True)
+def raycluster(name, wait):
+    """
+    Submit a defined RayCluster to the Kubernetes cluster
+    """
+    cluster = Cluster.from_definition_yaml(name + ".yaml")
+    if not cluster:
+        click.echo(
+            "Error submitting RayCluster. Make sure the RayCluster is defined before submitting it"
+        )
+        return
+    if not wait:
+        cluster.up()
+        click.echo("Cluster submitted successfully")
+        return
+    cluster.up()
+    cluster.wait_ready()
+
+
+@cli.command()
+@click.pass_context
+@click.argument("name", type=str)
+@click.option("--cluster-name", type=str)
+@click.option("--namespace", type=str)
+def job(ctx, name, cluster_name, namespace):
+    """
+    Submit a defined job to the Kubernetes cluster or a RayCluster
+    """
+    runner = get_runner()
+    job_path = ctx.obj.codeflare_path + f"/{name}"
+    if not os.path.isfile(job_path):
+        click.echo(
+            "Error submitting job. Make sure the job is defined before submitting it"
+        )
+        return
+    with open(job_path, "rb") as file:
+        job_def = pickle.load(file)
+    if not cluster_name:
+        job = job_def.submit()
+        submission_id = runner.describe(job._app_handle).name.split(":")[1]
+        click.echo(f"Job {submission_id} submitted successfully")
+        return
+    namespace = namespace or ctx.obj.current_namespace
+    try:
+        cluster = get_cluster(cluster_name, namespace)
+    except FileNotFoundError:
+        click.echo(f"Cluster {cluster_name} not found in {namespace} namespace")
+        return
+    job = job_def.submit(cluster)
+    full_name = runner.describe(job._app_handle).name
+    submission_id = full_name[full_name.rfind(name) :]
+    click.echo(
+        f"Job {submission_id} submitted onto {cluster_name} RayCluster successfully\nView dashboard: {cluster.cluster_dashboard_uri()}"
+    )
diff --git a/src/codeflare_sdk/cluster/auth.py b/src/codeflare_sdk/cluster/auth.py
index 85db3d61d..90c1f726a 100644
--- a/src/codeflare_sdk/cluster/auth.py
+++ b/src/codeflare_sdk/cluster/auth.py
@@ -97,17 +97,11 @@ def login(self) -> str:
         global config_path
         global api_client
         try:
-            configuration = client.Configuration()
-            configuration.api_key_prefix["authorization"] = "Bearer"
-            configuration.host = self.server
-            configuration.api_key["authorization"] = self.token
-            if self.skip_tls == False and self.ca_cert_path == None:
-                configuration.verify_ssl = True
-            elif self.skip_tls == False:
-                configuration.ssl_ca_cert = self.ca_cert_path
-            else:
-                configuration.verify_ssl = False
-            api_client = client.ApiClient(configuration)
+            api_client = client.ApiClient(
+                _create_api_client_config(
+                    self.token, self.server, self.skip_tls, self.ca_cert_path
+                )
+            )
             client.AuthenticationApi(api_client).get_api_group()
             config_path = None
             return "Logged into %s" % self.server
@@ -154,6 +148,25 @@ def load_kube_config(self):
         return response
 
 
+def _create_api_client_config(
+    token: str, server: str, skip_tls: bool = False, ca_cert_path: str = None
+):
+    """
+    Creates a Kubernetes client configuration given the necessary parameters
+    """
+    configuration = client.Configuration()
+    configuration.api_key_prefix["authorization"] = "Bearer"
+    configuration.host = server
+    configuration.api_key["authorization"] = token
+    if skip_tls == False and ca_cert_path == None:
+        configuration.verify_ssl = True
+    elif skip_tls == False:
+        configuration.ssl_ca_cert = ca_cert_path
+    else:
+        configuration.verify_ssl = False
+    return configuration
+
+
 def config_check() -> str:
     """
     Function for loading the config file at the default config location ~/.kube/config if the user has not
diff --git a/src/codeflare_sdk/cluster/cluster.py b/src/codeflare_sdk/cluster/cluster.py
index d698331e6..c2cfc1277 100644
--- a/src/codeflare_sdk/cluster/cluster.py
+++ b/src/codeflare_sdk/cluster/cluster.py
@@ -50,7 +50,7 @@ class Cluster:
 
     torchx_scheduler = "ray"
 
-    def __init__(self, config: ClusterConfiguration):
+    def __init__(self, config: ClusterConfiguration, generate_app_wrapper: bool = True):
         """
         Create the resource cluster object by passing in a ClusterConfiguration
         (defined in the config sub-module). An AppWrapper will then be generated
         based off of the configured resources to represent the desired cluster
         request.
         """
         self.config = config
-        self.app_wrapper_yaml = self.create_app_wrapper()
-        self.app_wrapper_name = self.app_wrapper_yaml.split(".")[0]
+        self.app_wrapper_yaml = None
+        self.app_wrapper_name = None
+
+        if generate_app_wrapper:
+            self.app_wrapper_yaml = self.create_app_wrapper()
+            self.app_wrapper_name = self.app_wrapper_yaml.split(".")[0]
 
     def create_app_wrapper(self):
         """
-        Called upon cluster object creation, creates an AppWrapper yaml based on
-        the specifications of the ClusterConfiguration.
+        Creates an AppWrapper yaml based on the specifications of the
+        ClusterConfiguration.
         """
 
         if self.config.namespace is None:
@@ -115,6 +119,9 @@ def up(self):
         Applies the AppWrapper yaml, pushing the resource request onto
         the MCAD queue.
         """
+        if self.app_wrapper_yaml is None:
+            self.app_wrapper_yaml = self.create_app_wrapper()
+            self.app_wrapper_name = self.app_wrapper_yaml.split(".")[0]
         namespace = self.config.namespace
         try:
             config_check()
@@ -137,6 +144,9 @@ def down(self):
         associated with the cluster.
         """
         namespace = self.config.namespace
+        if not self.config.name and not self.app_wrapper_name:
+            print("Error taking down cluster: missing name or AppWrapper")
+            return
         try:
             config_check()
             api_instance = client.CustomObjectsApi(api_config_handler())
@@ -145,7 +155,7 @@
                 version="v1beta1",
                 namespace=namespace,
                 plural="appwrappers",
-                name=self.app_wrapper_name,
+                name=self.app_wrapper_name or self.config.name,
             )
         except Exception as e:  # pragma: no cover
             return _kube_api_error_handling(e)
@@ -313,7 +323,8 @@ def torchx_config(
     def from_k8_cluster_object(rc):
         machine_types = (
             rc["metadata"]["labels"]["orderedinstance"].split("_")
-            if "orderedinstance" in rc["metadata"]["labels"]
+            if "labels" in rc["metadata"]
+            and "orderedinstance" in rc["metadata"]["labels"]
             else []
         )
         local_interactive = (
@@ -350,7 +361,58 @@ def from_k8_cluster_object(rc):
             ]["image"],
             local_interactive=local_interactive,
         )
-        return Cluster(cluster_config)
+        return Cluster(cluster_config, False)
+
+    def from_definition_yaml(yaml_path):
+        try:
+            with open(yaml_path) as yaml_file:
+                rc = yaml.load(yaml_file, Loader=yaml.FullLoader)
+            machine_types = (
+                rc["metadata"]["labels"]["orderedinstance"].split("_")
+                if "labels" in rc["metadata"]
+                and "orderedinstance" in rc["metadata"]["labels"]
+                else []
+            )
+            worker_group_specs = rc["spec"]["resources"]["GenericItems"][0][
+                "generictemplate"
+            ]["spec"]["workerGroupSpecs"][0]
+            local_interactive = (
+                "volumeMounts"
+                in worker_group_specs["template"]["spec"]["containers"][0]
+            )
+            cluster_config = ClusterConfiguration(
+                name=rc["metadata"]["name"],
+                namespace=rc["metadata"]["namespace"],
+                machine_types=machine_types,
+                num_workers=worker_group_specs["minReplicas"],
+                min_cpus=worker_group_specs["template"]["spec"]["containers"][0][
+                    "resources"
+                ]["requests"]["cpu"],
+                max_cpus=worker_group_specs["template"]["spec"]["containers"][0][
+                    "resources"
+                ]["limits"]["cpu"],
+                min_memory=int(
+                    worker_group_specs["template"]["spec"]["containers"][0][
+                        "resources"
+                    ]["requests"]["memory"][:-1]
+                ),
+                max_memory=int(
+                    worker_group_specs["template"]["spec"]["containers"][0][
+                        "resources"
+                    ]["limits"]["memory"][:-1]
+                ),
+                num_gpus=worker_group_specs["template"]["spec"]["containers"][0][
+                    "resources"
+                ]["requests"]["nvidia.com/gpu"],
+                instascale=True if machine_types else False,
+                image=worker_group_specs["template"]["spec"]["containers"][0][
+                    "image"
+                ],
+                local_interactive=local_interactive,
+            )
+            return Cluster(cluster_config)
+        except IOError:
+            return None
 
     def local_client_url(self):
         if self.config.local_interactive == True:
@@ -364,7 +426,17 @@ def list_all_clusters(namespace: str, print_to_console: bool = True):
     """
     Returns (and prints by default) a list of all clusters in a given namespace.
     """
-    clusters = _get_ray_clusters(namespace)
+    clusters = _get_all_ray_clusters(namespace)
+    if print_to_console:
+        pretty_print.print_clusters(clusters)
+    return clusters
+
+
+def list_clusters_all_namespaces(print_to_console: bool = True):
+    """
+    Returns (and prints by default) a list of all clusters in the Kubernetes cluster.
+    """
+    clusters = _get_all_ray_clusters()
     if print_to_console:
         pretty_print.print_clusters(clusters)
     return clusters
@@ -411,8 +483,8 @@ def get_current_namespace():  # pragma: no cover
 
 def get_cluster(cluster_name: str, namespace: str = "default"):
     try:
-        config.load_kube_config()
-        api_instance = client.CustomObjectsApi()
+        config_check()
+        api_instance = client.CustomObjectsApi(api_config_handler())
         rcs = api_instance.list_namespaced_custom_object(
             group="ray.io",
             version="v1alpha1",
@@ -481,17 +553,24 @@ def _ray_cluster_status(name, namespace="default") -> Optional[RayCluster]:
         return None
 
 
-def _get_ray_clusters(namespace="default") -> List[RayCluster]:
+def _get_all_ray_clusters(namespace: str = None) -> List[RayCluster]:
     list_of_clusters = []
     try:
         config_check()
         api_instance = client.CustomObjectsApi(api_config_handler())
-        rcs = api_instance.list_namespaced_custom_object(
-            group="ray.io",
-            version="v1alpha1",
-            namespace=namespace,
-            plural="rayclusters",
-        )
+        if namespace:
+            rcs = api_instance.list_namespaced_custom_object(
+                group="ray.io",
+                version="v1alpha1",
+                namespace=namespace,
+                plural="rayclusters",
+            )
+        else:
+            rcs = api_instance.list_cluster_custom_object(
+                group="ray.io",
+                version="v1alpha1",
+                plural="rayclusters",
+            )
     except Exception as e:  # pragma: no cover
         return _kube_api_error_handling(e)
diff --git a/tests/cli-test-case.yaml b/tests/cli-test-case.yaml
new file mode 100644
index 000000000..41f62b97d
--- /dev/null
+++ b/tests/cli-test-case.yaml
@@ -0,0 +1,195 @@
+apiVersion: mcad.ibm.com/v1beta1
+kind: AppWrapper
+metadata:
+  labels:
+    orderedinstance: cpu.small_gpu.large
+  name: cli-test-cluster
+  namespace: default
+spec:
+  priority: 9
+  resources:
+    GenericItems:
+    - custompodresources:
+      - limits:
+          cpu: 2
+          memory: 8G
+          nvidia.com/gpu: 0
+        replicas: 1
+        requests:
+          cpu: 2
+          memory: 8G
+          nvidia.com/gpu: 0
+      - limits:
+          cpu: 4
+          memory: 6G
+          nvidia.com/gpu: 7
+        replicas: 1
+        requests:
+          cpu: 3
+          memory: 5G
+          nvidia.com/gpu: 7
+      generictemplate:
+        apiVersion: ray.io/v1alpha1
+        kind: RayCluster
+        metadata:
+          labels:
+            appwrapper.mcad.ibm.com: cli-test-cluster
+            controller-tools.k8s.io: '1.0'
+          name: cli-test-cluster
+          namespace: default
+        spec:
+          autoscalerOptions:
+            idleTimeoutSeconds: 60
+            imagePullPolicy: Always
+            resources:
+              limits:
+                cpu: 500m
+                memory: 512Mi
+              requests:
+                cpu: 500m
+                memory: 512Mi
+            upscalingMode: Default
+          enableInTreeAutoscaling: false
+          headGroupSpec:
+            rayStartParams:
+              block: 'true'
+              dashboard-host: 0.0.0.0
+              num-gpus: '0'
+            serviceType: ClusterIP
+            template:
+              spec:
+                affinity:
+                  nodeAffinity:
+                    requiredDuringSchedulingIgnoredDuringExecution:
+                      nodeSelectorTerms:
+                      - matchExpressions:
+                        - key: cli-test-cluster
+                          operator: In
+                          values:
+                          - cli-test-cluster
+                containers:
+                - env:
+                  - name: MY_POD_IP
+                    valueFrom:
+                      fieldRef:
+                        fieldPath: status.podIP
+                  - name: RAY_USE_TLS
+                    value: '0'
+                  - name: RAY_TLS_SERVER_CERT
+                    value: /home/ray/workspace/tls/server.crt
+                  - name: RAY_TLS_SERVER_KEY
+                    value: /home/ray/workspace/tls/server.key
+                  - name: RAY_TLS_CA_CERT
+                    value: /home/ray/workspace/tls/ca.crt
+                  image: quay.io/project-codeflare/ray:2.5.0-py38-cu116
+                  imagePullPolicy: Always
+                  lifecycle:
+                    preStop:
+                      exec:
+                        command:
+                        - /bin/sh
+                        - -c
+                        - ray stop
+                  name: ray-head
+                  ports:
+                  - containerPort: 6379
+                    name: gcs
+                  - containerPort: 8265
+                    name: dashboard
+                  - containerPort: 10001
+                    name: client
+                  resources:
+                    limits:
+                      cpu: 2
+                      memory: 8G
+                      nvidia.com/gpu: 0
+                    requests:
+                      cpu: 2
+                      memory: 8G
+                      nvidia.com/gpu: 0
+                imagePullSecrets:
+                - name: cli-test-pull-secret
+          rayVersion: 2.1.0
+          workerGroupSpecs:
+          - groupName: small-group-cli-test-cluster
+            maxReplicas: 1
+            minReplicas: 1
+            rayStartParams:
+              block: 'true'
+              num-gpus: '7'
+            replicas: 1
+            template:
+              metadata:
+                annotations:
+                  key: value
+                labels:
+                  key: value
+              spec:
+                affinity:
+                  nodeAffinity:
+                    requiredDuringSchedulingIgnoredDuringExecution:
+                      nodeSelectorTerms:
+                      - matchExpressions:
+                        - key: cli-test-cluster
+                          operator: In
+                          values:
+                          - cli-test-cluster
+                containers:
+                - env:
+                  - name: MY_POD_IP
+                    valueFrom:
+                      fieldRef:
+                        fieldPath: status.podIP
+                  - name: RAY_USE_TLS
+                    value: '0'
+                  - name: RAY_TLS_SERVER_CERT
+                    value: /home/ray/workspace/tls/server.crt
+                  - name: RAY_TLS_SERVER_KEY
+                    value: /home/ray/workspace/tls/server.key
+                  - name: RAY_TLS_CA_CERT
+                    value: /home/ray/workspace/tls/ca.crt
+                  image: quay.io/project-codeflare/ray:2.5.0-py38-cu116
+                  lifecycle:
+                    preStop:
+                      exec:
+                        command:
+                        - /bin/sh
+                        - -c
+                        - ray stop
+                  name: machine-learning
+                  resources:
+                    limits:
+                      cpu: 4
+                      memory: 6G
+                      nvidia.com/gpu: 7
+                    requests:
+                      cpu: 3
+                      memory: 5G
+                      nvidia.com/gpu: 7
+                imagePullSecrets:
+                - name: cli-test-pull-secret
+                initContainers:
+                - command:
+                  - sh
+                  - -c
+                  - until nslookup $RAY_IP.$(cat /var/run/secrets/kubernetes.io/serviceaccount/namespace).svc.cluster.local;
+                    do echo waiting for myservice; sleep 2; done
+                  image: busybox:1.28
+                  name: init-myservice
+      replicas: 1
+    - generictemplate:
+        apiVersion: route.openshift.io/v1
+        kind: Route
+        metadata:
+          labels:
+            odh-ray-cluster-service: cli-test-cluster-head-svc
+          name: ray-dashboard-cli-test-cluster
+          namespace: default
+        spec:
+          port:
+            targetPort: dashboard
+          to:
+            kind: Service
+            name: cli-test-cluster-head-svc
+      replica: 1
+  Items: []
diff --git a/tests/unit_test.py b/tests/unit_test.py
index ac126016f..0b469b226 100644
--- a/tests/unit_test.py
+++ b/tests/unit_test.py
@@ -17,6 +17,8 @@
 import filecmp
 import os
 import re
+from click.testing import CliRunner
+import pickle
 
 parent = Path(__file__).resolve().parents[1]
 sys.path.append(str(parent) + "/src")
@@ -32,6 +34,7 @@
     get_cluster,
     _app_wrapper_status,
     _ray_cluster_status,
+    list_clusters_all_namespaces,
 )
 from codeflare_sdk.cluster.auth import (
     TokenAuthentication,
@@ -63,6 +66,9 @@
     generate_tls_cert,
     export_env,
 )
+from codeflare_sdk.cli.codeflare_cli import cli
+from codeflare_sdk.cli.cli_utils import load_auth
+import codeflare_sdk.cluster.auth as sdk_auth
 
 import openshift
 from openshift.selector import Selector
@@ -75,6 +81,439 @@
 import yaml
 
 
+# CLI testing
+def test_cli_working():
+    runner = CliRunner()
+    result = runner.invoke(cli)
+    assert result.exit_code == 0
+
+
+def test_login_cli(mocker):
+    runner = CliRunner()
+    mocker.patch.object(client, "ApiClient")
+    mocker.patch(
+        "codeflare_sdk.cli.codeflare_cli.get_current_namespace", return_value="ns"
+    )
+    k8s_login_command = """
+    login
+    --server=testserver:6443
+    --token=testtoken
+    """
+    login_result = runner.invoke(cli, k8s_login_command)
+    assert login_result.output == "Logged into 'testserver:6443'\n"
+    try:
+        auth_file_path = os.path.expanduser("~/.codeflare/auth")
+        with open(auth_file_path, "rb") as file:
+            auth = pickle.load(file)
+    except:
+        assert 0 == 1
+    assert auth.server == "testserver:6443"
+    assert auth.token == "testtoken"
+    assert auth.api_client_config.api_key["authorization"] == "testtoken"
+    assert auth.api_client_config.verify_ssl
+    assert auth.api_client_config.host == "testserver:6443"
+
+
+def test_login_tls_cli(mocker):
+    runner = CliRunner()
+    mocker.patch.object(client, "ApiClient")
+    mocker.patch(
+        "codeflare_sdk.cli.codeflare_cli.get_current_namespace", return_value="ns"
+    )
+    k8s_tls_login_command = """
+    login
+    --server=testserver:6443
+    --token=testtoken
+    --insecure-skip-tls-verify=False
+    """
+    k8s_skip_tls_login_command = """
+    login
+    --server=testserver:6443
+    --token=testtoken
+    --insecure-skip-tls-verify=True
+    """
+    tls_result = runner.invoke(cli, k8s_tls_login_command)
+    skip_tls_result = runner.invoke(cli, k8s_skip_tls_login_command)
+    assert (
+        "Logged into 'testserver:6443'\n" == tls_result.output == skip_tls_result.output
+    )
+
+
+def test_load_auth():
+    load_auth()
+    assert sdk_auth.api_client is not None
+
+
+def test_cluster_definition_cli(mocker):
+    mocker.patch.object(client, "ApiClient")
+    mocker.patch(
+        "codeflare_sdk.cli.codeflare_cli.get_current_namespace", return_value="ns"
+    )
+    runner = CliRunner()
+    define_cluster_command = """
+    define raycluster
+    --name=cli-test-cluster
+    --namespace=default
+    --num-workers=1
+    --min-cpus=3
+    --max-cpus=4
+    --min-memory=5
+    --max-memory=6
+    --num-gpus=7
+    --instascale=True
+    --machine-types='["cpu.small", "gpu.large"]'
+    --image-pull-secrets='["cli-test-pull-secret"]'
+    """
+    result = runner.invoke(cli, define_cluster_command)
+    assert result.output == "Written to: cli-test-cluster.yaml\n"
+    assert filecmp.cmp(
+        "cli-test-cluster.yaml", f"{parent}/tests/cli-test-case.yaml", shallow=True
+    )
+
+
+def test_cluster_submission_cli(mocker):
+    mocker.patch.object(client, "ApiClient")
+    mocker.patch(
+        "codeflare_sdk.cli.codeflare_cli.get_current_namespace", return_value="ns"
+    )
+    runner = CliRunner()
+    submit_cluster_command = """
+    submit raycluster
+    cli-test-cluster
+    """
+    result = runner.invoke(cli, submit_cluster_command)
+
+    assert result.exit_code == 0
+    assert (
+        result.output
+        == "Written to: cli-test-cluster.yaml\nCluster submitted successfully\n"
+    )
+
+
+def test_cluster_deletion_cli(mocker):
+    mocker.patch.object(client, "ApiClient")
+    mocker.patch("kubernetes.config.load_kube_config", return_value="ignore")
+    mocker.patch(
+        "kubernetes.client.CustomObjectsApi.list_namespaced_custom_object",
+        side_effect=get_ray_obj,
+    )
+    mocker.patch(
+        "codeflare_sdk.cli.codeflare_cli.get_current_namespace", return_value="ns"
+    )
+    runner = CliRunner()
+    delete_cluster_command = """
+    delete raycluster
+    quicktest --namespace=default
+    """
+    result = runner.invoke(cli, delete_cluster_command)
+
+    assert result.exit_code == 0
+    assert result.output == "Cluster deleted successfully\n"
+
+
+def test_raycluster_details_cli(mocker):
+    runner = CliRunner()
+    mocker.patch(
+        "kubernetes.client.CustomObjectsApi.list_namespaced_custom_object",
+        side_effect=get_ray_obj,
+    )
+    mocker.patch(
+        "codeflare_sdk.cluster.cluster.Cluster.status",
+        return_value=(False, CodeFlareClusterStatus.UNKNOWN),
+    )
+    mocker.patch(
+        "codeflare_sdk.cluster.cluster.Cluster.cluster_dashboard_uri",
+        return_value="",
+    )
+    mocker.patch(
+        "codeflare_sdk.cli.codeflare_cli.get_current_namespace", return_value="ns"
+    )
+    mocker.patch.object(client, "ApiClient")
+    raycluster_details_command = """
+    details raycluster quicktest --namespace=default
+    """
+    result = runner.invoke(cli, raycluster_details_command)
+    assert result.output == (
+        " 🚀 CodeFlare Cluster Details 🚀 \n"
+        " \n"
+        " ╭───────────────────────────────────────────────────────────────╮ \n"
+        " │ Name │ \n"
+        " │ quicktest Inactive ❌ │ \n"
+        " │ │ \n"
+        " │ URI: ray://quicktest-head-svc.ns.svc:10001 │ \n"
+        " │ │ \n"
+        " │ Dashboard🔗 │ \n"
+        " │ │ \n"
+        " │ Cluster Resources │ \n"
+        " │ ╭── Workers ──╮ ╭───────── Worker specs(each) ─────────╮ │ \n"
+        " │ │ # Workers │ │ Memory CPU GPU │ │ \n"
+        " │ │ │ │ │ │ \n"
+        " │ │ 1 │ │ 2~2 1 0 │ │ \n"
+        " │ │ │ │ │ │ \n"
+        " │ ╰─────────────╯ ╰──────────────────────────────────────╯ │ \n"
+        " ╰───────────────────────────────────────────────────────────────╯ \n"
+    )
+
+
+def test_raycluster_status_cli(mocker):
+    runner = CliRunner()
+    mocker.patch(
+        "kubernetes.client.CustomObjectsApi.list_namespaced_custom_object",
+        side_effect=get_ray_obj,
+    )
+    mocker.patch(
+        "codeflare_sdk.cluster.cluster.get_current_namespace",
+        return_value="ns",
+    )
+    mocker.patch(
+        "codeflare_sdk.cluster.cluster.Cluster.cluster_dashboard_uri",
+        return_value="",
+    )
+    mocker.patch.object(client, "ApiClient")
+    test_raycluster = RayCluster(
+        "quicktest",
+        RayClusterStatus.READY,
+        1,
+        1,
+        "1",
+        "1",
+        1,
+        "default",
+        "dashboard-url",
+    )
+    mocker.patch(
+        "codeflare_sdk.cluster.cluster._app_wrapper_status",
+        return_value=test_raycluster,
+    )
+    mocker.patch(
+        "codeflare_sdk.cluster.cluster._ray_cluster_status",
+        return_value=test_raycluster,
+    )
+    raycluster_status_command = """
+    status raycluster quicktest --namespace=default
+    """
+    result = runner.invoke(cli, raycluster_status_command)
+    assert "Active" in result.output
+
+
+def test_raycluster_list_cli(mocker):
+    runner = CliRunner()
+    mocker.patch(
+        "kubernetes.client.CustomObjectsApi.list_namespaced_custom_object",
+        side_effect=get_ray_obj,
+    )
+    mocker.patch(
+        "codeflare_sdk.cli.codeflare_cli.get_current_namespace",
+        return_value="ns",
+    )
+    mocker.patch(
+        "codeflare_sdk.cluster.cluster.Cluster.status",
+        return_value=(False, CodeFlareClusterStatus.UNKNOWN),
+    )
+    mocker.patch(
+        "codeflare_sdk.cluster.cluster.Cluster.cluster_dashboard_uri",
+        return_value="",
+    )
+    mocker.patch.object(client, "ApiClient")
+    list_rayclusters_command = """
+    list rayclusters --namespace=ns
+    """
+    result = runner.invoke(cli, list_rayclusters_command)
+    assert result.output == (
+        " 🚀 CodeFlare Cluster Details 🚀 \n"
+        " \n"
+        " ╭───────────────────────────────────────────────────────────────╮ \n"
+        " │ Name │ \n"
+        " │ quicktest Active ✅ │ \n"
+        " │ │ \n"
+        " │ URI: ray://quicktest-head-svc.ns.svc:10001 │ \n"
+        " │ │ \n"
+        " │ Dashboard🔗 │ \n"
+        " │ │ \n"
+        " │ Cluster Resources │ \n"
+        " │ ╭── Workers ──╮ ╭───────── Worker specs(each) ─────────╮ │ \n"
+        " │ │ # Workers │ │ Memory CPU GPU │ │ \n"
+        " │ │ │ │ │ │ \n"
+        " │ │ 1 │ │ 2G~2G 1 0 │ │ \n"
+        " │ │ │ │ │ │ \n"
+        " │ ╰─────────────╯ ╰──────────────────────────────────────╯ │ \n"
+        " ╰───────────────────────────────────────────────────────────────╯ \n"
+    )
+
+
+def test_list_clusters_all_namespaces(mocker, capsys):
+    mocker.patch(
+        "kubernetes.client.CustomObjectsApi.list_cluster_custom_object",
+        side_effect=get_ray_obj_no_namespace,
+    )
+    list_clusters_all_namespaces()
+    captured = capsys.readouterr()
+    assert captured.out == (
+        " 🚀 CodeFlare Cluster Details 🚀 \n"
+        " \n"
+        " ╭───────────────────────────────────────────────────────────────╮ \n"
+        " │ Name │ \n"
+        " │ quicktest Active ✅ │ \n"
+        " │ │ \n"
+        " │ URI: ray://quicktest-head-svc.ns.svc:10001 │ \n"
+        " │ │ \n"
+        " │ Dashboard🔗 │ \n"
+        " │ │ \n"
+        " │ Cluster Resources │ \n"
+        " │ ╭── Workers ──╮ ╭───────── Worker specs(each) ─────────╮ │ \n"
+        " │ │ # Workers │ │ Memory CPU GPU │ │ \n"
+        " │ │ │ │ │ │ \n"
+        " │ │ 1 │ │ 2G~2G 1 0 │ │ \n"
+        " │ │ │ │ │ │ \n"
+        " │ ╰─────────────╯ ╰──────────────────────────────────────╯ │ \n"
+        " ╰───────────────────────────────────────────────────────────────╯ \n"
+    )
+
+
+def test_job_definition_cli(mocker):
+    runner = CliRunner()
+    mocker.patch(
+        "codeflare_sdk.cli.codeflare_cli.get_current_namespace", return_value="ns"
+    )
+    define_job_command = """
+    define job
+    --script=test-script.py
+    --script-args='["arg1", "arg2"]'
+    --memMB=2
+    --image=test-image
+    --name=test
+    """
+    result = runner.invoke(cli, define_job_command)
+    file_path = os.path.expanduser("~") + "/.codeflare/test"
+    assert result.output == "Job definition saved to " + file_path + "\n"
+    try:
+        with open(file_path, "rb") as file:
+            job = pickle.load(file)
+    except Exception as e:
+        print("Error opening file: ", e)
+        assert 0 == 1
+    assert job.script == "test-script.py"
+    assert job.script_args == ["arg1", "arg2"]
+    assert job.memMB == 2
+    assert job.image == "test-image"
+    assert job.name == "test"
+
+
+def test_job_submission_cli(mocker):
+    mocker.patch.object(client, "ApiClient")
+    mocker.patch(
+        "kubernetes.client.CustomObjectsApi.list_namespaced_custom_object",
+        side_effect=get_ray_obj,
+    )
+    mocker.patch(
+        "codeflare_sdk.cli.codeflare_cli.get_current_namespace", return_value="ns"
+    )
+    mocker.patch(
+        "codeflare_sdk.cluster.cluster.Cluster.cluster_dashboard_uri",
+        return_value="test-url.com",
+    )
+    mocker.patch(
+        "codeflare_sdk.job.jobs.torchx_runner.schedule",
+        return_value="test-url.com",
+    )
+    mocker.patch("torchx.runner.Runner.describe", return_value=AppDef(name="test-1234"))
+    runner = CliRunner()
+    submit_job_command = """
+    submit job
+    test
+    --cluster-name=quicktest
+    --namespace=default
+    """
+    result = runner.invoke(cli, submit_job_command)
+    assert (
+        result.output
+        == "Job test-1234 submitted onto quicktest RayCluster successfully\n"
+        + "View dashboard: test-url.com\n"
+    )
+
+
+def test_status_job_cli(mocker):
+    runner = CliRunner()
+    mocker.patch(
+        "codeflare_sdk.cli.codeflare_cli.get_current_namespace",
+        return_value="opendatahub",
+    )
+    mocker.patch("torchx.runner.Runner.status", return_value="fake-status")
+    mocker.patch(
+        "codeflare_sdk.cli.cli_utils.get_job_app_handle",
+        return_value="fake-handle",
+    )
+    job_status_command = """
+    status job test-job
+    """
+    result = runner.invoke(cli, job_status_command)
+    assert result.output == "fake-status\n"
+
+
+def test_logs_job_cli(mocker):
+    runner = CliRunner()
+    mocker.patch.object(client, "ApiClient")
+    mocker.patch(
+        "codeflare_sdk.cli.codeflare_cli.get_current_namespace", return_value="ns"
+    )
+    mocker.patch("torchx.runner.Runner.log_lines", return_value=["fake-logs"])
+    mocker.patch(
+        "codeflare_sdk.cli.cli_utils.get_job_app_handle",
+        return_value="fake-handle",
+    )
+    job_logs_command = """
+    logs job test-job
+    """
+    result = runner.invoke(cli, job_logs_command)
+    assert result.output == "fake-logs\n"
+
+
+def test_list_jobs_cli(mocker):
+    runner = CliRunner()
+    mocker.patch.object(client, "ApiClient")
+    mocker.patch(
+        "codeflare_sdk.cli.codeflare_cli.get_current_namespace", return_value="ns"
+    )
+    test_job = {
+        "Submission ID": "fake-id",
+        "Job ID": "N/A",
+        "RayCluster": "N/A",
+        "Namespace": "default",
+        "Status": "Pending",
+        "App Handle": "test",
+    }
+    mocker.patch(
+        "codeflare_sdk.cli.cli_utils.list_all_kubernetes_jobs", return_value=[test_job]
+    )
+    mocker.patch(
+        "codeflare_sdk.cli.cli_utils.list_all_raycluster_jobs", return_value=[test_job]
+    )
+    list_jobs_command = """
+    list jobs
+    """
+    result = runner.invoke(cli, list_jobs_command)
+    assert result.output == (
+        "┏━━━━━━━━━━━━━━━┳━━━━━━━━┳━━━━━━━━━━━━┳━━━━━━━━━━━┳━━━━━━━━━┓\n"
+        + "┃ Submission ID ┃ Job ID ┃ RayCluster ┃ Namespace ┃ Status  ┃\n"
+        + "┡━━━━━━━━━━━━━━━╇━━━━━━━━╇━━━━━━━━━━━━╇━━━━━━━━━━━╇━━━━━━━━━┩\n"
+        + "│ fake-id       │ N/A    │ N/A        │ default   │ Pending │\n"
+        + "│ fake-id       │ N/A    │ N/A        │ default   │ Pending │\n"
+        + "└───────────────┴────────┴────────────┴───────────┴─────────┘\n"
+    )
+
+
+def test_logout_cli(mocker):
+    runner = CliRunner()
+    mocker.patch.object(client, "ApiClient")
+    mocker.patch(
+        "codeflare_sdk.cli.codeflare_cli.get_current_namespace", return_value="ns"
+    )
+    k8s_logout_command = "logout"
+    logout_result = runner.invoke(cli, k8s_logout_command)
+    assert logout_result.output == "Successfully logged out of 'testserver:6443'\n"
+    assert not os.path.exists(os.path.expanduser("~/.codeflare/auth"))
+
+
 # For mocking openshift client results
 fake_res = openshift.Result("fake")
@@ -856,6 +1295,10 @@ def get_ray_obj(group, version, namespace, plural, cls=None):
     return api_obj
 
 
+def get_ray_obj_no_namespace(group, version, plural, cls=None):
+    return get_ray_obj(group, version, "ns", plural, cls)
+
+
 def get_aw_obj(group, version, namespace, plural):
     api_obj1 = {
         "items": [
@@ -2217,8 +2660,8 @@ def test_cleanup():
     os.remove("unit-test-default-cluster.yaml")
     os.remove("test.yaml")
     os.remove("raytest2.yaml")
-    os.remove("quicktest.yaml")
     os.remove("tls-cluster-namespace/ca.crt")
     os.remove("tls-cluster-namespace/tls.crt")
     os.remove("tls-cluster-namespace/tls.key")
     os.rmdir("tls-cluster-namespace")
+    os.remove("cli-test-cluster.yaml")