Skip to content

Commit 74aaf9a

Browse files
committed
add scheduler.dask_cluster_url traitlet, create dask cluster based on it
1 parent f4d8f8a commit 74aaf9a

File tree

1 file changed

+12
-2
lines changed

1 file changed

+12
-2
lines changed

jupyter_scheduler/scheduler.py

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import fsspec
88
import psutil
99
from dask.distributed import Client as DaskClient
10+
from distributed import LocalCluster
1011
from jupyter_core.paths import jupyter_data_dir
1112
from jupyter_server.transutils import _i18n
1213
from jupyter_server.utils import to_os_path
@@ -402,6 +403,12 @@ class Scheduler(BaseScheduler):
402403
),
403404
)
404405

406+
dask_cluster_url = Unicode(
407+
allow_none=True,
408+
config=True,
409+
help="URL of the Dask cluster to connect to.",
410+
)
411+
405412
db_url = Unicode(help=_i18n("Scheduler database url"))
406413

407414
task_runner = Instance(allow_none=True, klass="jupyter_scheduler.task_runner.BaseTaskRunner")
@@ -425,7 +432,10 @@ def __init__(
425432

426433
def _get_dask_client(self):
427434
"""Creates and configures a Dask client."""
428-
return DaskClient()
435+
if self.dask_cluster_url:
436+
return DaskClient(self.dask_cluster_url)
437+
cluster = LocalCluster(processes=True)
438+
return DaskClient(cluster)
429439

430440
@property
431441
def db_session(self):
@@ -786,7 +796,7 @@ async def stop_extension(self):
786796
Cleanup code to run when the server is stopping.
787797
"""
788798
if self.dask_client:
789-
self.dask_client.close()
799+
await self.dask_client.close()
790800

791801

792802
class ArchivingScheduler(Scheduler):

0 commit comments

Comments
 (0)