From b9371ed0e8f3868aa07a12c9c540aaa2782cb8cc Mon Sep 17 00:00:00 2001 From: Andrii Ieroshenko Date: Thu, 8 Aug 2024 10:49:56 -0700 Subject: [PATCH] add prefect and prefect-dask --- jupyter_scheduler/executors.py | 5 +++++ jupyter_scheduler/scheduler.py | 27 ++++++++++----------------- 2 files changed, 15 insertions(+), 17 deletions(-) diff --git a/jupyter_scheduler/executors.py b/jupyter_scheduler/executors.py index 7e1a9974..a1f0660f 100644 --- a/jupyter_scheduler/executors.py +++ b/jupyter_scheduler/executors.py @@ -10,6 +10,8 @@ import nbconvert import nbformat from nbconvert.preprocessors import CellExecutionError, ExecutePreprocessor +from prefect import flow, task +from prefect_dask import DaskTaskRunner from jupyter_scheduler.models import DescribeJob, JobFeature, Status from jupyter_scheduler.orm import Job, create_session @@ -122,6 +124,7 @@ def on_complete(self): class DefaultExecutionManager(ExecutionManager): """Default execution manager that executes notebooks""" + @flow(task_runner=DaskTaskRunner) def execute(self): job = self.model @@ -144,6 +147,7 @@ def execute(self): self.add_side_effects_files(staging_dir) self.create_output_files(job, nb) + @task def add_side_effects_files(self, staging_dir: str): """Scan for side effect files potentially created after input file execution and update the job's packaged_files with these files""" input_notebook = os.path.relpath(self.staging_paths["input"]) @@ -166,6 +170,7 @@ def add_side_effects_files(self, staging_dir: str): ) session.commit() + @task def create_output_files(self, job: DescribeJob, notebook_node): for output_format in job.output_formats: cls = nbconvert.get_exporter(output_format) diff --git a/jupyter_scheduler/scheduler.py b/jupyter_scheduler/scheduler.py index 867034c6..c1d28f6b 100644 --- a/jupyter_scheduler/scheduler.py +++ b/jupyter_scheduler/scheduler.py @@ -9,6 +9,8 @@ from jupyter_core.paths import jupyter_data_dir from jupyter_server.transutils import _i18n from jupyter_server.utils import to_os_path +from prefect import flow, task +from prefect_dask import DaskTaskRunner from sqlalchemy import and_, asc, desc, func from traitlets import Instance from traitlets import Type as TType @@ -478,25 +480,16 @@ def create_job(self, model: CreateJob) -> str: else: self.copy_input_file(model.input_uri, staging_paths["input"]) - # The MP context forces new processes to not be forked on Linux. - # This is necessary because `asyncio.get_event_loop()` is bugged in - # forked processes in Python versions below 3.12. This method is - # called by `jupyter_core` by `nbconvert` in the default executor. - # - # See: https://github.com/python/cpython/issues/66285 - # See also: https://github.com/jupyter/jupyter_core/pull/362 - mp_ctx = mp.get_context("spawn") - p = mp_ctx.Process( - target=self.execution_manager_class( - job_id=job.job_id, - staging_paths=staging_paths, - root_dir=self.root_dir, - db_url=self.db_url, - ).process + execution_manager = self.execution_manager_class( + job_id=job.job_id, + staging_paths=staging_paths, + root_dir=self.root_dir, + db_url=self.db_url, ) - p.start() - job.pid = p.pid + execution_manager.process() + + job.pid = 1 # TODO: fix pid hardcode session.commit() job_id = job.job_id