Skip to content

Commit ece6ccc

Browse files
committed
use dask futures for files download instead of multiprocessing module
1 parent dd38b8c commit ece6ccc

File tree

1 file changed

+9
-6
lines changed

1 file changed

+9
-6
lines changed

jupyter_scheduler/job_files_manager.py

Lines changed: 9 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -1,10 +1,10 @@
11
import os
22
import random
33
import tarfile
4-
from multiprocessing import Process
5-
from typing import Dict, List, Optional, Type
4+
from typing import Awaitable, Dict, List, Optional, Type
65

76
import fsspec
7+
from dask.distributed import Client as DaskClient
88
from jupyter_server.utils import ensure_async
99

1010
from jupyter_scheduler.exceptions import SchedulerError
@@ -14,7 +14,10 @@
1414
class JobFilesManager:
1515
scheduler = None
1616

17-
def __init__(self, scheduler: Type[BaseScheduler]):
17+
def __init__(
18+
self,
19+
scheduler: Type[BaseScheduler],
20+
):
1821
self.scheduler = scheduler
1922

2023
async def copy_from_staging(self, job_id: str, redownload: Optional[bool] = False):
@@ -23,8 +26,9 @@ async def copy_from_staging(self, job_id: str, redownload: Optional[bool] = Fals
2326
output_filenames = self.scheduler.get_job_filenames(job)
2427
output_dir = self.scheduler.get_local_output_path(model=job, root_dir_relative=True)
2528

26-
p = Process(
27-
target=Downloader(
29+
dask_client: DaskClient = await self.scheduler.dask_client_future
30+
dask_client.submit(
31+
Downloader(
2832
output_formats=job.output_formats,
2933
output_filenames=output_filenames,
3034
staging_paths=staging_paths,
@@ -33,7 +37,6 @@ async def copy_from_staging(self, job_id: str, redownload: Optional[bool] = Fals
3337
include_staging_files=job.package_input_folder,
3438
).download
3539
)
36-
p.start()
3740

3841

3942
class Downloader:

0 commit comments

Comments
 (0)