Skip to content

Commit f83c6df

Browse files
Use APScheduler
Use APScheduler
1 parent bc808fe commit f83c6df

File tree

10 files changed

+1701
-1278
lines changed

10 files changed

+1701
-1278
lines changed

pydatalab/pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ server = [
5353
"Flask-PyMongo ~= 2.3",
5454
"Flask-Mail ~= 0.10",
5555
"Flask-Compress ~= 1.15",
56+
"APScheduler ~= 3.10",
5657
"Werkzeug ~= 3.0",
5758
"python-dotenv ~= 1.0",
5859
"pillow ~= 11.0",

pydatalab/src/pydatalab/export_utils.py

Lines changed: 7 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -146,36 +146,21 @@ def create_eln_file(collection_id: str, output_path: str) -> None:
146146

147147
item_metadata = {k: v for k, v in item.items() if k not in ["_id", "file_ObjectIds"]}
148148

149-
for key, value in item_metadata.items():
150-
if isinstance(value, ObjectId):
151-
item_metadata[key] = str(value)
152-
elif isinstance(value, datetime):
153-
item_metadata[key] = value.isoformat()
154-
elif isinstance(value, dict):
155-
item_metadata[key] = _convert_objectids_in_dict(value)
156-
elif isinstance(value, list):
157-
item_metadata[key] = [
158-
str(v)
159-
if isinstance(v, ObjectId)
160-
else v.isoformat()
161-
if isinstance(v, datetime)
162-
else _convert_objectids_in_dict(v)
163-
if isinstance(v, dict)
164-
else v
165-
for v in value
166-
]
149+
item_metadata = _convert_objectids_in_dict(item_metadata)
167150

168151
with open(item_folder / "metadata.json", "w", encoding="utf-8") as f:
169152
json.dump(item_metadata, f, indent=2, ensure_ascii=False)
170153

171154
if item.get("file_ObjectIds"):
172-
for file_id in item["file_ObjectIds"]:
155+
for file_id in item.get("file_ObjectIds", []):
173156
file_data = flask_mongo.db.files.find_one({"_id": ObjectId(file_id)})
174-
if file_data and file_data.get("location"):
157+
if file_data:
175158
source_path = Path(file_data["location"])
176159
if source_path.exists():
177-
dest_path = item_folder / file_data["name"]
178-
shutil.copy2(source_path, dest_path)
160+
dest_file = item_folder / file_data["name"]
161+
shutil.copy2(source_path, dest_file)
162+
else:
163+
print(f"Warning: File not found on disk: {file_data['location']}")
179164

180165
with zipfile.ZipFile(output_path, "w", zipfile.ZIP_DEFLATED) as zipf:
181166
for file_path in root_folder.rglob("*"):

pydatalab/src/pydatalab/models/export_task.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
from datetime import datetime
1+
from datetime import datetime, timezone
22
from enum import Enum
33

44
from pydantic import BaseModel, Field
@@ -23,7 +23,8 @@ class ExportTask(BaseModel):
2323
)
2424
creator_id: str = Field(..., description="ID of the user who created the export")
2525
created_at: datetime = Field(
26-
default_factory=datetime.now, description="When the task was created"
26+
default_factory=lambda: datetime.now(tz=timezone.utc),
27+
description="When the task was created",
2728
)
2829
completed_at: datetime | None = Field(None, description="When the task was completed")
2930
file_path: str | None = Field(None, description="Path to the generated .eln file")

pydatalab/src/pydatalab/mongo.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -217,5 +217,6 @@ def create_user_fts():
217217
ret += db.export_tasks.create_index(
218218
"created_at", name="export task created at", background=background
219219
)
220+
ret += db.export_tasks.create_index("status", name="export task status", background=background)
220221

221222
return ret

pydatalab/src/pydatalab/routes/v0_1/export.py

Lines changed: 54 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,17 @@
11
import os
2-
import threading
32
import uuid
43
from datetime import datetime, timezone
54
from pathlib import Path
65

7-
from flask import Blueprint, jsonify, send_file
6+
from flask import Blueprint, current_app, jsonify, send_file
87
from flask_login import current_user
98

109
from pydatalab.config import CONFIG
1110
from pydatalab.export_utils import create_eln_file
1211
from pydatalab.models.export_task import ExportStatus, ExportTask
1312
from pydatalab.mongo import flask_mongo
1413
from pydatalab.permissions import active_users_or_get_only
14+
from pydatalab.scheduler import export_scheduler
1515

1616
EXPORT = Blueprint("export", __name__)
1717

@@ -21,62 +21,63 @@
2121
def _(): ...
2222

2323

24-
def _generate_export_in_background(task_id: str, collection_id: str):
24+
def _generate_export_in_background(task_id: str, collection_id: str, app):
    """Background function to generate the .eln file.

    The whole job runs inside ``app.app_context()``. It marks the task as
    PROCESSING, writes ``<FILE_DIRECTORY>/exports/<task_id>.eln`` via
    ``create_eln_file`` and then marks the task READY with the output path.
    Any exception raised along the way is logged with its traceback and
    recorded on the task as ERROR; it is not re-raised.

    Parameters:
        task_id: ID of the export task
        collection_id: ID of the collection to export
        app: Flask application instance
    """
    with app.app_context():
        try:
            flask_mongo.db.export_tasks.update_one(
                {"task_id": task_id}, {"$set": {"status": ExportStatus.PROCESSING}}
            )

            export_dir = Path(CONFIG.FILE_DIRECTORY) / "exports"
            # parents=True so a not-yet-created FILE_DIRECTORY does not turn
            # every export into an error.
            export_dir.mkdir(parents=True, exist_ok=True)

            output_path = export_dir / f"{task_id}.eln"
            create_eln_file(collection_id, str(output_path))

            flask_mongo.db.export_tasks.update_one(
                {"task_id": task_id},
                {
                    "$set": {
                        "status": ExportStatus.READY,
                        "file_path": str(output_path),
                        "completed_at": datetime.now(tz=timezone.utc),
                    }
                },
            )

        except Exception as e:
            # Only str(e) is stored on the task, so keep the full traceback
            # in the application log for debugging.
            app.logger.exception("Export task %s failed", task_id)
            flask_mongo.db.export_tasks.update_one(
                {"task_id": task_id},
                {
                    "$set": {
                        "status": ExportStatus.ERROR,
                        "error_message": str(e),
                        "completed_at": datetime.now(tz=timezone.utc),
                    }
                },
            )
6466

6567

6668
@EXPORT.route("/collections/<string:collection_id>/export", methods=["POST"])
6769
def start_collection_export(collection_id: str):
68-
"""Start exporting a collection to .eln format.
69-
70-
Parameters:
71-
collection_id: The collection ID to export
70+
from pydatalab.permissions import get_default_permissions
7271

73-
Returns:
74-
JSON response with task_id and status_url
75-
"""
72+
collection_exists = flask_mongo.db.collections.find_one({"collection_id": collection_id})
73+
if not collection_exists:
74+
return jsonify({"status": "error", "message": "Collection not found"}), 404
7675

77-
collection = flask_mongo.db.collections.find_one({"collection_id": collection_id})
78-
if not collection:
79-
return jsonify({"status": "error", "message": f"Collection {collection_id} not found"}), 404
76+
collection_with_perms = flask_mongo.db.collections.find_one(
77+
{"collection_id": collection_id, **get_default_permissions(user_only=True)}
78+
)
79+
if not collection_with_perms:
80+
return jsonify({"status": "error", "message": "Access denied"}), 403
8081

8182
task_id = str(uuid.uuid4())
8283

@@ -94,9 +95,13 @@ def start_collection_export(collection_id: str):
9495

9596
flask_mongo.db.export_tasks.insert_one(export_task.dict())
9697

97-
thread = threading.Thread(target=_generate_export_in_background, args=(task_id, collection_id))
98-
thread.daemon = True
99-
thread.start()
98+
app = current_app._get_current_object()
99+
100+
export_scheduler.add_job(
101+
func=_generate_export_in_background,
102+
args=[task_id, collection_id, app],
103+
job_id=f"export_{task_id}",
104+
)
100105

101106
return jsonify(
102107
{"status": "success", "task_id": task_id, "status_url": f"/exports/{task_id}/status"}
pydatalab/src/pydatalab/scheduler.py

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
from apscheduler.executors.pool import ThreadPoolExecutor
2+
from apscheduler.jobstores.memory import MemoryJobStore
3+
from apscheduler.schedulers.background import BackgroundScheduler
4+
5+
6+
class ExportScheduler:
    """Singleton wrapper around an APScheduler ``BackgroundScheduler``.

    ``ExportScheduler()`` always returns the same object. The underlying
    scheduler is created lazily by ``init_scheduler`` / ``get_scheduler``.
    Until one is running, ``add_job`` falls back to a plain daemon thread.
    """

    _instance = None
    _scheduler = None

    def __new__(cls):
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance

    def init_scheduler(self):
        """Create and start the background scheduler if none is held yet.

        Returns:
            The scheduler held by this object (the existing one when already
            initialised).
        """
        if self._scheduler is None:
            jobstores = {"default": MemoryJobStore()}
            executors = {"default": ThreadPoolExecutor(10)}
            job_defaults = {"coalesce": False, "max_instances": 3}

            self._scheduler = BackgroundScheduler(
                jobstores=jobstores,
                executors=executors,
                job_defaults=job_defaults,
                timezone="UTC",
            )
            self._scheduler.start()
        return self._scheduler

    def get_scheduler(self):
        """Return the scheduler, initialising it first when necessary."""
        if self._scheduler is None:
            self.init_scheduler()
        return self._scheduler

    def add_job(self, func, args, job_id=None):
        """Run ``func(*args)`` in the background.

        Parameters:
            func: Callable to execute.
            args: Positional arguments for ``func``.
            job_id: Optional scheduler job ID; an existing job with the same
                ID is replaced.

        Returns:
            The value returned by the scheduler's ``add_job`` when a running
            scheduler is used, otherwise ``None`` (daemon-thread fallback).
        """
        scheduler = self._scheduler
        # A scheduler that exists but is not running would accept the job and
        # never execute it, so only hand the job over while it is running.
        if scheduler is not None and scheduler.running:
            return scheduler.add_job(func=func, args=args, id=job_id, replace_existing=True)

        import threading

        thread = threading.Thread(target=func, args=args, daemon=True)
        thread.start()
        return None

    def shutdown(self):
        """Stop the scheduler if it is running and drop the reference to it.

        Clearing the reference lets a later ``get_scheduler`` /
        ``init_scheduler`` call build a fresh scheduler instead of returning
        the stopped one.
        """
        scheduler = self._scheduler
        if scheduler is None:
            return
        if scheduler.running:
            scheduler.shutdown()
        self._scheduler = None


export_scheduler = ExportScheduler()

0 commit comments

Comments
 (0)