From eba57bb3f7c2f8193a5010201731a2ddd456214d Mon Sep 17 00:00:00 2001 From: Jacob Tomlinson Date: Wed, 24 May 2023 14:59:08 +0100 Subject: [PATCH 1/6] Add kr8s Dask resource classes --- .../operator/controller/controller.py | 43 +++++++++++++++++++ 1 file changed, 43 insertions(+) diff --git a/dask_kubernetes/operator/controller/controller.py b/dask_kubernetes/operator/controller/controller.py index db47a75a7..8b482bfab 100644 --- a/dask_kubernetes/operator/controller/controller.py +++ b/dask_kubernetes/operator/controller/controller.py @@ -11,6 +11,7 @@ import kubernetes_asyncio as kubernetes from importlib_metadata import entry_points from kubernetes_asyncio.client import ApiException +from kr8s.asyncio.objects import APIObject from dask_kubernetes.common.auth import ClusterAuth from dask_kubernetes.common.networking import get_scheduler_address @@ -39,6 +40,48 @@ class SchedulerCommError(Exception): """Raised when unable to communicate with a scheduler.""" +class DaskCluster(APIObject): + version = "kubernetes.dask.org/v1" + endpoint = "daskclusters" + kind = "DaskCluster" + plural = "daskclusters" + singular = "daskcluster" + namespaced = True + + # TODO make scalable + # scalable = True + # # Dot notation not yet supported in kr8s + # scalable_spec = "worker.replicas" + + +class DaskWorkerGroup(APIObject): + version = "kubernetes.dask.org/v1" + endpoint = "daskworkergroups" + kind = "DaskWorkerGroup" + plural = "daskworkergroups" + singular = "daskworkergroup" + namespaced = True + scalable = True + + +class DaskAutoscaler(APIObject): + version = "kubernetes.dask.org/v1" + endpoint = "daskautoscalers" + kind = "DaskAutoscaler" + plural = "daskautoscalers" + singular = "daskautoscaler" + namespaced = True + + +class DaskJob(APIObject): + version = "kubernetes.dask.org/v1" + endpoint = "daskjobs" + kind = "DaskJob" + plural = "daskjobs" + singular = "daskjob" + namespaced = True + + def _get_annotations(meta): return { annotation_key: annotation_value From 
14cffef6659cecae9963370e50408adf5296533f Mon Sep 17 00:00:00 2001 From: Jacob Tomlinson Date: Wed, 24 May 2023 15:02:44 +0100 Subject: [PATCH 2/6] Replace aiopykube with kr8s in handle_scheduler_service_status --- dask_kubernetes/operator/controller/controller.py | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/dask_kubernetes/operator/controller/controller.py b/dask_kubernetes/operator/controller/controller.py index 8b482bfab..d91719512 100644 --- a/dask_kubernetes/operator/controller/controller.py +++ b/dask_kubernetes/operator/controller/controller.py @@ -15,8 +15,6 @@ from dask_kubernetes.common.auth import ClusterAuth from dask_kubernetes.common.networking import get_scheduler_address -from dask_kubernetes.aiopykube import HTTPClient, KubeConfig -from dask_kubernetes.aiopykube.dask import DaskCluster from distributed.core import rpc _ANNOTATION_NAMESPACES_TO_IGNORE = ( @@ -390,9 +388,8 @@ async def handle_scheduler_service_status( else: phase = "Running" - api = HTTPClient(KubeConfig.from_env()) - cluster = await DaskCluster.objects(api, namespace=namespace).get_by_name( - labels["dask.org/cluster-name"] + cluster = await DaskCluster.get( + labels["dask.org/cluster-name"], namespace=namespace ) await cluster.patch({"status": {"phase": phase}}) From c5681ce80a58ff563a6485f3fc96ef78415ad4fc Mon Sep 17 00:00:00 2001 From: Jacob Tomlinson Date: Wed, 24 May 2023 15:03:02 +0100 Subject: [PATCH 3/6] Add kr8s dependency --- requirements.txt | 1 + 1 file changed, 1 insertion(+) diff --git a/requirements.txt b/requirements.txt index 588d0e6ba..7709dbefb 100644 --- a/requirements.txt +++ b/requirements.txt @@ -5,3 +5,4 @@ kubernetes-asyncio>=12.0.1 kopf>=1.35.3 pykube-ng>=22.9.0 rich>=12.5.1 +kr8s==0.4.1 From cbaae52b93e4a789886fc00d14df321f2e496181 Mon Sep 17 00:00:00 2001 From: Jacob Tomlinson Date: Wed, 24 May 2023 17:33:57 +0100 Subject: [PATCH 4/6] Revert everything except the import and class definitions --- 
dask_kubernetes/operator/controller/controller.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/dask_kubernetes/operator/controller/controller.py b/dask_kubernetes/operator/controller/controller.py index d91719512..256715036 100644 --- a/dask_kubernetes/operator/controller/controller.py +++ b/dask_kubernetes/operator/controller/controller.py @@ -15,6 +15,8 @@ from dask_kubernetes.common.auth import ClusterAuth from dask_kubernetes.common.networking import get_scheduler_address +from dask_kubernetes.aiopykube import HTTPClient, KubeConfig +from dask_kubernetes.aiopykube.dask import DaskCluster as PyKubeDaskCluster from distributed.core import rpc _ANNOTATION_NAMESPACES_TO_IGNORE = ( @@ -388,8 +390,9 @@ async def handle_scheduler_service_status( else: phase = "Running" - cluster = await DaskCluster.get( - labels["dask.org/cluster-name"], namespace=namespace + api = HTTPClient(KubeConfig.from_env()) + cluster = await PyKubeDaskCluster.objects(api, namespace=namespace).get_by_name( + labels["dask.org/cluster-name"] ) await cluster.patch({"status": {"phase": phase}}) From c5807fa452540a225b8f0532fb92882209e12bdc Mon Sep 17 00:00:00 2001 From: Jacob Tomlinson Date: Mon, 12 Jun 2023 14:15:23 +0100 Subject: [PATCH 5/6] Replace aiopykube --- dask_kubernetes/operator/controller/controller.py | 13 +++---------- 1 file changed, 3 insertions(+), 10 deletions(-) diff --git a/dask_kubernetes/operator/controller/controller.py b/dask_kubernetes/operator/controller/controller.py index bdb7e724f..8bedfd1a0 100644 --- a/dask_kubernetes/operator/controller/controller.py +++ b/dask_kubernetes/operator/controller/controller.py @@ -15,8 +15,6 @@ from dask_kubernetes.common.auth import ClusterAuth from dask_kubernetes.common.networking import get_scheduler_address -from dask_kubernetes.aiopykube import HTTPClient, KubeConfig -from dask_kubernetes.aiopykube.dask import DaskCluster as PyKubeDaskCluster from distributed.core import rpc, clean_exception from 
distributed.protocol.pickle import dumps @@ -390,10 +388,8 @@ async def handle_scheduler_service_status( # Otherwise mark it as Running else: phase = "Running" - - api = HTTPClient(KubeConfig.from_env()) - cluster = await PyKubeDaskCluster.objects(api, namespace=namespace).get_by_name( - labels["dask.org/cluster-name"] + cluster = await DaskCluster.get( + labels["dask.org/cluster-name"], namespace=namespace ) await cluster.patch({"status": {"phase": phase}}) @@ -1029,8 +1025,5 @@ async def daskcluster_autoshutdown(spec, name, namespace, logger, **kwargs): logger.warn("Unable to connect to scheduler, skipping autoshutdown check.") return if idle_since and time.time() > idle_since + spec["idleTimeout"]: - api = HTTPClient(KubeConfig.from_env()) - cluster = await PyKubeDaskCluster.objects( - api, namespace=namespace - ).get_by_name(name) + cluster = await DaskCluster.get(name, namespace=namespace) await cluster.delete() From 9b28d6303d8d7b464c260af06b1670e8e1e318d1 Mon Sep 17 00:00:00 2001 From: Jacob Tomlinson Date: Mon, 12 Jun 2023 14:17:30 +0100 Subject: [PATCH 6/6] Bump kr8s to 0.5.1 and make DaskCluster scalable --- dask_kubernetes/operator/controller/controller.py | 7 ++----- requirements.txt | 2 +- 2 files changed, 3 insertions(+), 6 deletions(-) diff --git a/dask_kubernetes/operator/controller/controller.py b/dask_kubernetes/operator/controller/controller.py index 8bedfd1a0..432ccb3e7 100644 --- a/dask_kubernetes/operator/controller/controller.py +++ b/dask_kubernetes/operator/controller/controller.py @@ -46,11 +46,8 @@ class DaskCluster(APIObject): plural = "daskclusters" singular = "daskcluster" namespaced = True - - # TODO make scalable - # scalable = True - # # Dot notation not yet supported in kr8s - # scalable_spec = "worker.replicas" + scalable = True + scalable_spec = "worker.replicas" class DaskWorkerGroup(APIObject): diff --git a/requirements.txt b/requirements.txt index 7709dbefb..04bb03fef 100644 --- a/requirements.txt +++ b/requirements.txt @@ 
-5,4 +5,4 @@ kubernetes-asyncio>=12.0.1 kopf>=1.35.3 pykube-ng>=22.9.0 rich>=12.5.1 -kr8s==0.4.1 +kr8s==0.5.1