Skip to content

Commit d057bf3

Browse files
andrii-ipre-commit-ci[bot]github-actions[bot]JasonWeill
committed
Package input files (no autodownload, no multiprocessing DownloadManager) (jupyter-server#510)
* package input files and folders (backend) * package input files and folders (frontend) * remove "input_dir" from staging_paths dict * ensure execution context matches the notebook directory * update snapshots * copy staging folder to output folder after job runs (SUCESS or FAILURE) * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * copy staging folder and side effects to output after job runs, track and redownload files * remove staging to output copying logic from executor * refactor output files creation logic into a separate function for clarity * Fix job definition data model * add packaged_files to JobDefinition and DescribeJobDefinition model * fix existing pytests * clarify FilesDirectoryLink title * Dynamically display input folder in the checkbox text * display packageInputFolder parameter as 'Files included' * use helper text with input directory for 'include files' checkbox * Update Playwright Snapshots * add test side effects accountability test for execution manager * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * Use "Run job with input folder" for packageInputFolder checkbox text * Update Playwright Snapshots * Use "Ran with input folder" in detail page * Update src/components/input-folder-checkbox.tsx Co-authored-by: Jason Weill <[email protected]> * fix lint error * Update Playwright Snapshots * Update existing screenshots * Update "Submit the Create Job" section mentioning “Run job with input folder” option * Update docs/users/index.md Co-authored-by: Jason Weill <[email protected]> * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * Update src/components/input-folder-checkbox.tsx Co-authored-by: Jason Weill <[email protected]> * Update Playwright Snapshots * Describe side effects behavior better --------- Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com> Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com> Co-authored-by: Jason Weill <[email protected]>
1 parent 5daf94c commit d057bf3

23 files changed

+396
-31
lines changed

docs/operators/index.md

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -89,9 +89,9 @@ For more information on writing a custom implementation, please see the {doc}`de
8989
### Example: Capturing side effect files
9090

9191
The default scheduler and execution manager classes do not capture
92-
**side effect files**, files that are created as a side effect of executing
93-
cells in a notebook. The `ArchivingScheduler` and `ArchivingExecutionManager`
94-
classes do capture side effect files. If you intend to run notebooks that produce
92+
**side effect files** (files that are created as a side effect of executing
93+
cells in a notebook) unless “Run job with input folder” is checked. The `ArchivingScheduler` and `ArchivingExecutionManager`
94+
classes do capture side effect files by default. If you intend to run notebooks that produce
9595
side effect files, you can use these classes by running:
9696

9797
```

docs/users/images/create_job_form.png

17.4 KB
Loading

docs/users/index.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,10 @@ To create a _job_ or _job definition_ from an open notebook, click on a “Creat
4444

4545
Give your notebook job or job definition a name, choose an environment to run it in, select its output formats, and provide parameters that are set as local variables when your notebook gets executed. This parameterized execution is similar to the one used in [Papermill](https://papermill.readthedocs.io/en/latest/).
4646

47+
If you check "Run job with input folder", the scheduled job will have access to all files within the same folder as the input file.
48+
The scheduler will copy all files from the input file to a staging directory at runtime, and will make these files and any side effect files created during the job run available for download after the job has finished.
49+
Use caution with this option if your input file's directory has many large files in it.
50+
4751
To create a _job_ that runs once, select "Run now" in the "Schedule" section, and click "Create".
4852
!["Create Job Form"](./images/create_job_form.png)
4953

jupyter_scheduler/executors.py

Lines changed: 34 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -131,21 +131,47 @@ def execute(self):
131131
if job.parameters:
132132
nb = add_parameters(nb, job.parameters)
133133

134+
staging_dir = os.path.dirname(self.staging_paths["input"])
134135
ep = ExecutePreprocessor(
135-
kernel_name=nb.metadata.kernelspec["name"],
136-
store_widget_state=True,
136+
kernel_name=nb.metadata.kernelspec["name"], store_widget_state=True, cwd=staging_dir
137137
)
138138

139139
try:
140-
ep.preprocess(nb)
140+
ep.preprocess(nb, {"metadata": {"path": staging_dir}})
141141
except CellExecutionError as e:
142142
raise e
143143
finally:
144-
for output_format in job.output_formats:
145-
cls = nbconvert.get_exporter(output_format)
146-
output, resources = cls().from_notebook_node(nb)
147-
with fsspec.open(self.staging_paths[output_format], "w", encoding="utf-8") as f:
148-
f.write(output)
144+
self.add_side_effects_files(staging_dir)
145+
self.create_output_files(job, nb)
146+
147+
def add_side_effects_files(self, staging_dir: str):
148+
"""Scan for side effect files potentially created after input file execution and update the job's packaged_files with these files"""
149+
input_notebook = os.path.relpath(self.staging_paths["input"])
150+
new_files_set = set()
151+
for root, _, files in os.walk(staging_dir):
152+
for file in files:
153+
file_rel_path = os.path.relpath(os.path.join(root, file), staging_dir)
154+
if file_rel_path != input_notebook:
155+
new_files_set.add(file_rel_path)
156+
157+
if new_files_set:
158+
with self.db_session() as session:
159+
current_packaged_files_set = set(
160+
session.query(Job.packaged_files).filter(Job.job_id == self.job_id).scalar()
161+
or []
162+
)
163+
updated_packaged_files = list(current_packaged_files_set.union(new_files_set))
164+
session.query(Job).filter(Job.job_id == self.job_id).update(
165+
{"packaged_files": updated_packaged_files}
166+
)
167+
session.commit()
168+
169+
def create_output_files(self, job: DescribeJob, notebook_node):
170+
for output_format in job.output_formats:
171+
cls = nbconvert.get_exporter(output_format)
172+
output, _ = cls().from_notebook_node(notebook_node)
173+
with fsspec.open(self.staging_paths[output_format], "w", encoding="utf-8") as f:
174+
f.write(output)
149175

150176
def supported_features(cls) -> Dict[JobFeature, bool]:
151177
return {

jupyter_scheduler/job_files_manager.py

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ async def copy_from_staging(self, job_id: str, redownload: Optional[bool] = Fals
2121
job = await ensure_async(self.scheduler.get_job(job_id, False))
2222
staging_paths = await ensure_async(self.scheduler.get_staging_paths(job))
2323
output_filenames = self.scheduler.get_job_filenames(job)
24-
output_dir = self.scheduler.get_local_output_path(job)
24+
output_dir = self.scheduler.get_local_output_path(model=job, root_dir_relative=True)
2525

2626
p = Process(
2727
target=Downloader(
@@ -30,6 +30,7 @@ async def copy_from_staging(self, job_id: str, redownload: Optional[bool] = Fals
3030
staging_paths=staging_paths,
3131
output_dir=output_dir,
3232
redownload=redownload,
33+
include_staging_files=job.package_input_folder,
3334
).download
3435
)
3536
p.start()
@@ -43,22 +44,30 @@ def __init__(
4344
staging_paths: Dict[str, str],
4445
output_dir: str,
4546
redownload: bool,
47+
include_staging_files: bool = False,
4648
):
4749
self.output_formats = output_formats
4850
self.output_filenames = output_filenames
4951
self.staging_paths = staging_paths
5052
self.output_dir = output_dir
5153
self.redownload = redownload
54+
self.include_staging_files = include_staging_files
5255

5356
def generate_filepaths(self):
5457
"""A generator that produces filepaths"""
5558
output_formats = self.output_formats + ["input"]
56-
5759
for output_format in output_formats:
5860
input_filepath = self.staging_paths[output_format]
5961
output_filepath = os.path.join(self.output_dir, self.output_filenames[output_format])
6062
if not os.path.exists(output_filepath) or self.redownload:
6163
yield input_filepath, output_filepath
64+
if self.include_staging_files:
65+
staging_dir = os.path.dirname(self.staging_paths["input"])
66+
for file_relative_path in self.output_filenames["files"]:
67+
input_filepath = os.path.join(staging_dir, file_relative_path)
68+
output_filepath = os.path.join(self.output_dir, file_relative_path)
69+
if not os.path.exists(output_filepath) or self.redownload:
70+
yield input_filepath, output_filepath
6271

6372
def download_tar(self, archive_format: str = "tar"):
6473
archive_filepath = self.staging_paths[archive_format]

jupyter_scheduler/models.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,7 @@ class CreateJob(BaseModel):
8585
name: str
8686
output_filename_template: Optional[str] = OUTPUT_FILENAME_TEMPLATE
8787
compute_type: Optional[str] = None
88+
package_input_folder: Optional[bool] = None
8889

8990
@root_validator
9091
def compute_input_filename(cls, values) -> Dict:
@@ -145,6 +146,8 @@ class DescribeJob(BaseModel):
145146
status: Status = Status.CREATED
146147
status_message: Optional[str] = None
147148
downloaded: bool = False
149+
package_input_folder: Optional[bool] = None
150+
packaged_files: Optional[List[str]] = []
148151

149152
class Config:
150153
orm_mode = True
@@ -209,6 +212,7 @@ class CreateJobDefinition(BaseModel):
209212
compute_type: Optional[str] = None
210213
schedule: Optional[str] = None
211214
timezone: Optional[str] = None
215+
package_input_folder: Optional[bool] = None
212216

213217
@root_validator
214218
def compute_input_filename(cls, values) -> Dict:
@@ -234,6 +238,8 @@ class DescribeJobDefinition(BaseModel):
234238
create_time: int
235239
update_time: int
236240
active: bool
241+
package_input_folder: Optional[bool] = None
242+
packaged_files: Optional[List[str]] = []
237243

238244
class Config:
239245
orm_mode = True

jupyter_scheduler/orm.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,8 @@ class CommonColumns:
8585
output_filename_template = Column(String(256))
8686
update_time = Column(Integer, default=get_utc_timestamp, onupdate=get_utc_timestamp)
8787
create_time = Column(Integer, default=get_utc_timestamp)
88+
package_input_folder = Column(Boolean)
89+
packaged_files = Column(JsonType, default=[])
8890

8991

9092
class Job(CommonColumns, Base):

jupyter_scheduler/scheduler.py

Lines changed: 96 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
import os
33
import random
44
import shutil
5-
from typing import Dict, Optional, Type, Union
5+
from typing import Dict, List, Optional, Type, Union
66

77
import fsspec
88
import psutil
@@ -39,7 +39,11 @@
3939
UpdateJobDefinition,
4040
)
4141
from jupyter_scheduler.orm import Job, JobDefinition, create_session
42-
from jupyter_scheduler.utils import create_output_directory, create_output_filename
42+
from jupyter_scheduler.utils import (
43+
copy_directory,
44+
create_output_directory,
45+
create_output_filename,
46+
)
4347

4448

4549
class BaseScheduler(LoggingConfigurable):
@@ -248,7 +252,29 @@ def file_exists(self, path: str):
248252
else:
249253
return os.path.isfile(os_path)
250254

251-
def get_job_filenames(self, model: DescribeJob) -> Dict[str, str]:
255+
def dir_exists(self, path: str):
256+
"""Returns True if the directory exists, else returns False.
257+
258+
API-style wrapper for os.path.isdir
259+
260+
Parameters
261+
----------
262+
path : string
263+
The relative path to the directory (with '/' as separator)
264+
265+
Returns
266+
-------
267+
exists : bool
268+
Whether the directory exists.
269+
"""
270+
root = os.path.abspath(self.root_dir)
271+
os_path = to_os_path(path, root)
272+
if not (os.path.abspath(os_path) + os.path.sep).startswith(root):
273+
return False
274+
else:
275+
return os.path.isdir(os_path)
276+
277+
def get_job_filenames(self, model: DescribeJob) -> Dict[str, Union[str, List[str]]]:
252278
"""Returns dictionary mapping output formats to
253279
the job filenames in the JupyterLab workspace.
254280
@@ -265,7 +291,8 @@ def get_job_filenames(self, model: DescribeJob) -> Dict[str, str]:
265291
{
266292
'ipynb': 'helloworld-2022-10-10.ipynb',
267293
'html': 'helloworld-2022-10-10.html',
268-
'input': 'helloworld.ipynb'
294+
'input': 'helloworld.ipynb',
295+
'files': ['data/helloworld.csv', 'images/helloworld.png']
269296
}
270297
271298
"""
@@ -278,6 +305,9 @@ def get_job_filenames(self, model: DescribeJob) -> Dict[str, str]:
278305

279306
filenames["input"] = model.input_filename
280307

308+
if model.package_input_folder and model.packaged_files:
309+
filenames["files"] = [relative_path for relative_path in model.packaged_files]
310+
281311
return filenames
282312

283313
def add_job_files(self, model: DescribeJob):
@@ -289,7 +319,8 @@ def add_job_files(self, model: DescribeJob):
289319
mapping = self.environments_manager.output_formats_mapping()
290320
job_files = []
291321
output_filenames = self.get_job_filenames(model)
292-
output_dir = os.path.relpath(self.get_local_output_path(model), self.root_dir)
322+
output_dir = self.get_local_output_path(model, root_dir_relative=True)
323+
293324
for output_format in model.output_formats:
294325
filename = output_filenames[output_format]
295326
output_path = os.path.join(output_dir, filename)
@@ -313,16 +344,42 @@ def add_job_files(self, model: DescribeJob):
313344
)
314345
)
315346

347+
# Add link to output folder with packaged input files and side effects
348+
if model.package_input_folder and model.packaged_files:
349+
job_files.append(
350+
JobFile(
351+
display_name="Files",
352+
file_format="files",
353+
file_path=output_dir if self.dir_exists(output_dir) else None,
354+
)
355+
)
356+
316357
model.job_files = job_files
317-
model.downloaded = all(job_file.file_path for job_file in job_files)
318358

319-
def get_local_output_path(self, model: DescribeJob) -> str:
359+
packaged_files = []
360+
if model.package_input_folder and model.packaged_files:
361+
packaged_files = [
362+
os.path.join(output_dir, packaged_file_rel_path)
363+
for packaged_file_rel_path in model.packaged_files
364+
]
365+
model.downloaded = all(job_file.file_path for job_file in job_files) and all(
366+
self.file_exists(file_path) for file_path in packaged_files
367+
)
368+
369+
def get_local_output_path(
370+
self, model: DescribeJob, root_dir_relative: Optional[bool] = False
371+
) -> str:
320372
"""Returns the local output directory path
321373
where all the job files will be downloaded
322374
from the staging location.
323375
"""
324376
output_dir_name = create_output_directory(model.input_filename, model.job_id)
325-
return os.path.join(self.root_dir, self.output_directory, output_dir_name)
377+
if root_dir_relative:
378+
return os.path.relpath(
379+
os.path.join(self.root_dir, self.output_directory, output_dir_name), self.root_dir
380+
)
381+
else:
382+
return os.path.join(self.root_dir, self.output_directory, output_dir_name)
326383

327384

328385
class Scheduler(BaseScheduler):
@@ -371,6 +428,15 @@ def copy_input_file(self, input_uri: str, copy_to_path: str):
371428
with fsspec.open(copy_to_path, "wb") as output_file:
372429
output_file.write(input_file.read())
373430

431+
def copy_input_folder(self, input_uri: str, nb_copy_to_path: str) -> List[str]:
432+
"""Copies the input file along with the input directory to the staging directory, returns the list of copied files relative to the staging directory"""
433+
input_dir_path = os.path.dirname(os.path.join(self.root_dir, input_uri))
434+
staging_dir = os.path.dirname(nb_copy_to_path)
435+
return copy_directory(
436+
source_dir=input_dir_path,
437+
destination_dir=staging_dir,
438+
)
439+
374440
def create_job(self, model: CreateJob) -> str:
375441
if not model.job_definition_id and not self.file_exists(model.input_uri):
376442
raise InputUriError(model.input_uri)
@@ -397,11 +463,20 @@ def create_job(self, model: CreateJob) -> str:
397463
model.output_formats = []
398464

399465
job = Job(**model.dict(exclude_none=True, exclude={"input_uri"}))
466+
400467
session.add(job)
401468
session.commit()
402469

403470
staging_paths = self.get_staging_paths(DescribeJob.from_orm(job))
404-
self.copy_input_file(model.input_uri, staging_paths["input"])
471+
if model.package_input_folder:
472+
copied_files = self.copy_input_folder(model.input_uri, staging_paths["input"])
473+
input_notebook_filename = os.path.basename(model.input_uri)
474+
job.packaged_files = [
475+
file for file in copied_files if file != input_notebook_filename
476+
]
477+
session.commit()
478+
else:
479+
self.copy_input_file(model.input_uri, staging_paths["input"])
405480

406481
# The MP context forces new processes to not be forked on Linux.
407482
# This is necessary because `asyncio.get_event_loop()` is bugged in
@@ -538,12 +613,22 @@ def create_job_definition(self, model: CreateJobDefinition) -> str:
538613
session.add(job_definition)
539614
session.commit()
540615

616+
# copy values for use after session is closed to avoid DetachedInstanceError
541617
job_definition_id = job_definition.job_definition_id
618+
job_definition_schedule = job_definition.schedule
542619

543620
staging_paths = self.get_staging_paths(DescribeJobDefinition.from_orm(job_definition))
544-
self.copy_input_file(model.input_uri, staging_paths["input"])
621+
if model.package_input_folder:
622+
copied_files = self.copy_input_folder(model.input_uri, staging_paths["input"])
623+
input_notebook_filename = os.path.basename(model.input_uri)
624+
job_definition.packaged_files = [
625+
file for file in copied_files if file != input_notebook_filename
626+
]
627+
session.commit()
628+
else:
629+
self.copy_input_file(model.input_uri, staging_paths["input"])
545630

546-
if self.task_runner and job_definition.schedule:
631+
if self.task_runner and job_definition_schedule:
547632
self.task_runner.add_job_definition(job_definition_id)
548633

549634
return job_definition_id

0 commit comments

Comments
 (0)