Skip to content

Commit d7c1fec

Browse files
committed
instantiate dask client inside BaseScheduler, remove dask client injection into scheduler class at extension launch
1 parent 1c5a54d commit d7c1fec

File tree

3 files changed

+12
-29
lines changed

3 files changed

+12
-29
lines changed

conftest.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,6 @@ def jp_scheduler(jp_scheduler_db_url, jp_scheduler_root_dir, jp_scheduler_db):
6060
db_url=jp_scheduler_db_url,
6161
root_dir=str(jp_scheduler_root_dir),
6262
environments_manager=MockEnvironmentManager(),
63-
dask_client_future=AsyncMock(),
6463
)
6564

6665

jupyter_scheduler/extension.py

Lines changed: 4 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
import asyncio
2+
13
from dask.distributed import Client as DaskClient
24
from jupyter_core.paths import jupyter_data_dir
35
from jupyter_server.extension.application import ExtensionApp
@@ -72,15 +74,11 @@ def initialize_settings(self):
7274

7375
environments_manager = self.environment_manager_class()
7476

75-
asyncio_loop = self.serverapp.io_loop.asyncio_loop
76-
dask_client_future = asyncio_loop.create_task(self._get_dask_client())
77-
7877
scheduler = self.scheduler_class(
7978
root_dir=self.serverapp.root_dir,
8079
environments_manager=environments_manager,
8180
db_url=self.db_url,
8281
config=self.config,
83-
dask_client_future=dask_client_future,
8482
)
8583

8684
job_files_manager = self.job_files_manager_class(scheduler=scheduler)
@@ -89,28 +87,8 @@ def initialize_settings(self):
8987
environments_manager=environments_manager,
9088
scheduler=scheduler,
9189
job_files_manager=job_files_manager,
92-
dask_client_future=dask_client_future,
9390
)
9491

9592
if scheduler.task_runner:
96-
asyncio_loop.create_task(scheduler.task_runner.start())
97-
98-
async def _get_dask_client(self):
99-
"""Creates and configures a Dask client."""
100-
return DaskClient(processes=False, asynchronous=True)
101-
102-
async def stop_extension(self):
103-
"""Called by the Jupyter Server when stopping to cleanup resources."""
104-
try:
105-
await self._stop_extension()
106-
except Exception as e:
107-
self.log.error("Error while stopping Jupyter Scheduler:")
108-
self.log.exception(e)
109-
110-
async def _stop_extension(self):
111-
"""Closes the Dask client if it exists."""
112-
if "dask_client_future" in self.settings:
113-
dask_client: DaskClient = await self.settings["dask_client_future"]
114-
self.log.info("Closing Dask client.")
115-
await dask_client.close()
116-
self.log.info("Dask client closed.")
93+
loop = asyncio.get_event_loop()
94+
loop.create_task(scheduler.task_runner.start())

jupyter_scheduler/scheduler.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import asyncio
12
import os
23
import random
34
import shutil
@@ -99,14 +100,19 @@ def __init__(
99100
self,
100101
root_dir: str,
101102
environments_manager: Type[EnvironmentManager],
102-
dask_client_future: Awaitable[DaskClient],
103103
config=None,
104104
**kwargs,
105105
):
106106
super().__init__(config=config, **kwargs)
107107
self.root_dir = root_dir
108108
self.environments_manager = environments_manager
109-
self.dask_client_future = dask_client_future
109+
110+
loop = asyncio.get_event_loop()
111+
self.dask_client_future: Awaitable[DaskClient] = loop.create_task(self._get_dask_client())
112+
113+
async def _get_dask_client(self):
114+
"""Creates and configures a Dask client."""
115+
return DaskClient(processes=False, asynchronous=True)
110116

111117
def create_job(self, model: CreateJob) -> str:
112118
"""Creates a new job record, may trigger execution of the job.

0 commit comments

Comments
 (0)