Skip to content

Commit 10f7a3a

Browse files
committed
rename Download tables, use mp.quque directly without a wrapper
1 parent e2cbf22 commit 10f7a3a

File tree

5 files changed

+31
-100
lines changed

5 files changed

+31
-100
lines changed

jupyter_scheduler/download_manager.py

Lines changed: 19 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,12 @@
33
from multiprocessing import Queue
44
from typing import List, Optional
55

6-
from jupyter_scheduler.orm import DownloadCacheRecord, create_session, generate_uuid
6+
from jupyter_scheduler.orm import Downloads, create_session, generate_uuid
77
from jupyter_scheduler.pydantic_v1 import BaseModel
88
from jupyter_scheduler.utils import get_utc_timestamp
99

1010

11-
class DescribeDownloadCache(BaseModel):
11+
class DescribeDownload(BaseModel):
1212
job_id: str
1313
download_id: str
1414
download_initiated_time: int
@@ -31,80 +31,54 @@ def __str__(self):
3131
return f"Id: {self.job_id}, Download initiated: {download_initiated_time}"
3232

3333

34-
class MultiprocessQueue:
35-
"""A multiprocess-safe queue using multiprocessing.Queue()"""
36-
37-
def __init__(self):
38-
self.queue = Queue()
39-
40-
def put(self, download: DownloadTask):
41-
self.queue.put(download)
42-
43-
def get(self) -> Optional[DownloadTask]:
44-
return self.queue.get() if not self.queue.empty() else None
45-
46-
def isempty(self) -> bool:
47-
return self.queue.empty()
48-
49-
50-
class DownloadCache:
34+
class DownloadRecordManager:
5135
def __init__(self, db_url):
5236
self.session = create_session(db_url)
5337

54-
def put(self, download: DescribeDownloadCache):
38+
def put(self, download: DescribeDownload):
5539
with self.session() as session:
56-
new_download = DownloadCacheRecord(**download.dict())
40+
new_download = Downloads(**download.dict())
5741
session.add(new_download)
5842
session.commit()
5943

60-
def get(self, job_id: str) -> Optional[DescribeDownloadCache]:
44+
def get(self, job_id: str) -> Optional[DescribeDownload]:
6145
with self.session() as session:
62-
download = (
63-
session.query(DownloadCacheRecord)
64-
.filter(DownloadCacheRecord.job_id == job_id)
65-
.first()
66-
)
46+
download = session.query(Downloads).filter(Downloads.job_id == job_id).first()
6747

6848
if download:
69-
return DescribeDownloadCache.from_orm(download)
49+
return DescribeDownload.from_orm(download)
7050
else:
7151
return None
7252

73-
def get_tasks(self) -> List[DescribeDownloadCache]:
53+
def get_tasks(self) -> List[DescribeDownload]:
7454
with self.session() as session:
75-
return (
76-
session.query(DownloadCacheRecord)
77-
.order_by(DownloadCacheRecord.download_initiated_time)
78-
.all()
79-
)
55+
return session.query(Downloads).order_by(Downloads.download_initiated_time).all()
8056

8157
def delete_download(self, download_id: str):
8258
with self.session() as session:
83-
session.query(DownloadCacheRecord).filter(
84-
DownloadCacheRecord.download_id == download_id
85-
).delete()
59+
session.query(Downloads).filter(Downloads.download_id == download_id).delete()
8660
session.commit()
8761

8862
def delete_job_downloads(self, job_id: str):
8963
with self.session() as session:
90-
session.query(DownloadCacheRecord).filter(DownloadCacheRecord.job_id == job_id).delete()
64+
session.query(Downloads).filter(Downloads.job_id == job_id).delete()
9165
session.commit()
9266

9367

9468
class DownloadManager:
9569
def __init__(self, db_url: str):
96-
self.cache = DownloadCache(db_url=db_url)
97-
self.queue = MultiprocessQueue()
70+
self.record_manager = DownloadRecordManager(db_url=db_url)
71+
self.queue = Queue()
9872

9973
def download_from_staging(self, job_id: str):
10074
download_initiated_time = get_utc_timestamp()
10175
download_id = generate_uuid()
102-
download_cache = DescribeDownloadCache(
76+
download_cache = DescribeDownload(
10377
job_id=job_id,
10478
download_id=download_id,
10579
download_initiated_time=download_initiated_time,
10680
)
107-
self.cache.put(download_cache)
81+
self.record_manager.put(download_cache)
10882
download_task = DownloadTask(
10983
job_id=job_id,
11084
download_id=download_id,
@@ -113,13 +87,13 @@ def download_from_staging(self, job_id: str):
11387
self.queue.put(download_task)
11488

11589
def delete_download(self, download_id: str):
116-
self.cache.delete_download(download_id)
90+
self.record_manager.delete_download(download_id)
11791

11892
def delete_job_downloads(self, job_id: str):
119-
self.cache.delete_job_downloads(job_id)
93+
self.record_manager.delete_job_downloads(job_id)
12094

12195
def populate_queue(self):
122-
tasks = self.cache.get_tasks()
96+
tasks = self.record_manager.get_tasks()
12397
for task in tasks:
12498
download_task = DownloadTask(
12599
job_id=task.job_id,

jupyter_scheduler/download_runner.py

Lines changed: 4 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -14,56 +14,20 @@ def __init__(
1414
self,
1515
download_manager: DownloadManager,
1616
job_files_manager: JobFilesManager,
17-
poll_interval: int = 10,
17+
poll_interval: int = 5,
1818
):
1919
self.download_manager = download_manager
2020
self.job_files_manager = job_files_manager
2121
self.poll_interval = poll_interval
2222

23-
# def add_download(self, job_id: str):
24-
# download_initiated_time = get_utc_timestamp()
25-
# download_id = generate_uuid()
26-
# download_cache = DescribeDownloadCache(
27-
# job_id=job_id,
28-
# download_id=download_id,
29-
# download_initiated_time=download_initiated_time,
30-
# )
31-
# self.download_cache.put(download_cache)
32-
# download_task = DownloadTask(
33-
# job_id=job_id,
34-
# download_id=download_id,
35-
# download_initiated_time=download_initiated_time,
36-
# )
37-
# self.download_queue.put(download_task)
38-
39-
# def delete_download(self, download_id: str):
40-
# self.download_cache.delete_download(download_id)
41-
42-
# def delete_job_downloads(self, job_id: str):
43-
# self.download_cache.delete_job_downloads(job_id)
44-
4523
async def process_download_queue(self):
46-
print("\n\n***\nDownloadRunner.process_download_queue isempty")
47-
print(self.download_manager.queue.isempty())
48-
while not self.download_manager.queue.isempty():
24+
while not self.download_manager.queue.empty():
4925
download = self.download_manager.queue.get()
50-
print(download)
51-
cache = self.download_manager.cache.get(download.job_id)
52-
print(cache)
26+
cache = self.download_manager.record_manager.get(download.job_id)
5327
if not cache or not download:
5428
continue
5529
await self.job_files_manager.copy_from_staging(cache.job_id)
56-
self.download_manager.cache.delete_download(cache.download_id)
57-
58-
# def populate_queue(self):
59-
# tasks = self.download_manager.cache.get_tasks()
60-
# for task in tasks:
61-
# download_task = DownloadTask(
62-
# job_id=task.job_id,
63-
# download_id=task.download_id,
64-
# download_initiated_time=task.download_initiated_time,
65-
# )
66-
# self.download_manager.queue.put(download_task)
30+
self.download_manager.record_manager.delete_download(cache.download_id)
6731

6832
async def start(self):
6933
self.download_manager.populate_queue()

jupyter_scheduler/executors.py

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,9 @@
1313
from nbconvert.preprocessors import CellExecutionError, ExecutePreprocessor
1414

1515
from jupyter_scheduler.download_manager import (
16-
DescribeDownloadCache,
17-
DownloadCacheRecord,
16+
DescribeDownload,
17+
Downloads,
1818
DownloadTask,
19-
MultiprocessQueue,
2019
)
2120
from jupyter_scheduler.models import DescribeJob, JobFeature, JobFile, Status
2221
from jupyter_scheduler.orm import Job, create_session, generate_uuid
@@ -167,13 +166,13 @@ def execute(self):
167166
def download_from_staging(self, job_id: str):
168167
download_initiated_time = get_utc_timestamp()
169168
download_id = generate_uuid()
170-
download_cache = DescribeDownloadCache(
169+
download_cache = DescribeDownload(
171170
job_id=job_id,
172171
download_id=download_id,
173172
download_initiated_time=download_initiated_time,
174173
)
175174
with self.db_session() as session:
176-
new_download = DownloadCacheRecord(**download_cache.dict())
175+
new_download = Downloads(**download_cache.dict())
177176
session.add(new_download)
178177
session.commit()
179178
download_task = DownloadTask(
@@ -182,11 +181,6 @@ def download_from_staging(self, job_id: str):
182181
download_initiated_time=download_initiated_time,
183182
)
184183
self.download_queue.put(download_task)
185-
# print(
186-
# "\n\n***\n ExecutionManager.download_from_staging uuid and task being put on a qeueue"
187-
# )
188-
# print(download_id)
189-
# print(download_task)
190184

191185
def add_side_effects_files(self, staging_dir):
192186
"""Scan for side effect files potentially created after input file execution and update the job's packaged_files with these files"""

jupyter_scheduler/orm.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -111,8 +111,8 @@ class JobDefinition(CommonColumns, Base):
111111
active = Column(Boolean, default=True)
112112

113113

114-
class DownloadCacheRecord(Base):
115-
__tablename__ = "download_cache"
114+
class Downloads(Base):
115+
__tablename__ = "downloads"
116116
job_id = Column(String(36), primary_key=True)
117117
download_id = Column(String(36), primary_key=True)
118118
download_initiated_time = Column(Integer)

jupyter_scheduler/scheduler.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
from traitlets import Unicode, default
1616
from traitlets.config import LoggingConfigurable
1717

18-
from jupyter_scheduler.download_manager import MultiprocessQueue
1918
from jupyter_scheduler.environments import EnvironmentManager
2019
from jupyter_scheduler.exceptions import (
2120
IdempotencyTokenError,
@@ -405,7 +404,7 @@ def __init__(
405404
root_dir: str,
406405
environments_manager: Type[EnvironmentManager],
407406
db_url: str,
408-
download_queue: MultiprocessQueue,
407+
download_queue: mp.Queue,
409408
config=None,
410409
**kwargs,
411410
):
@@ -495,7 +494,7 @@ def create_job(self, model: CreateJob) -> str:
495494
staging_paths=staging_paths,
496495
root_dir=self.root_dir,
497496
db_url=self.db_url,
498-
download_queue=self.download_queue.queue,
497+
download_queue=self.download_queue,
499498
).process
500499
)
501500
p.start()

0 commit comments

Comments
 (0)