Skip to content

Commit db62d5f

Browse files
stxue1mr-c
andcommitted
Update to connexion 3
Co-authored-by: Michael R. Crusoe <[email protected]>
1 parent 20411c4 commit db62d5f

File tree

7 files changed

+56
-51
lines changed

7 files changed

+56
-51
lines changed

mypy-requirements.txt

+1
Original file line numberDiff line numberDiff line change
@@ -3,3 +3,4 @@ types-PyYAML
33
types-requests
44
types-setuptools
55
arvados-cwl-runner
6+
flask

pyproject.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ classifiers = [
2323
]
2424
requires-python = ">=3.9"
2525
dependencies = [
26-
"connexion[swagger-ui] >= 2.0.2, < 3",
26+
"connexion[swagger-ui] >= 3, < 4",
2727
"ruamel.yaml >= 0.15.78",
2828
"schema-salad",
2929
]

wes_service/arvados_wes.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -298,7 +298,7 @@ def RunWorkflow(
298298
)
299299

300300
try:
301-
tempdir, body = self.collect_attachments(cr["uuid"])
301+
tempdir, body = self.collect_attachments(args, cr["uuid"])
302302

303303
workflow_engine_parameters = cast(
304304
dict[str, Any], body.get("workflow_engine_parameters", {})

wes_service/cwl_runner.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -198,7 +198,7 @@ def ListRuns(
198198

199199
def RunWorkflow(self, **args: str) -> dict[str, str]:
200200
"""Submit the workflow run request."""
201-
tempdir, body = self.collect_attachments()
201+
tempdir, body = self.collect_attachments(args)
202202

203203
run_id = uuid.uuid4().hex
204204
job = Workflow(run_id)

wes_service/toil_wes.py

+25-4
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
"""Toil backed for the WES service."""
2+
3+
import errno
14
import json
25
import logging
36
import os
@@ -294,18 +297,36 @@ def getstate(self) -> tuple[str, int]:
294297
logging.info("Workflow " + self.run_id + ": EXECUTOR_ERROR")
295298
open(self.staterrorfile, "a").close()
296299
return "EXECUTOR_ERROR", 255
300+
301+
# get the jobstore
302+
with open(self.jobstorefile, "r") as f:
303+
jobstore = f.read().rstrip()
297304
if (
298305
subprocess.run( # nosec B603
299306
[
300307
shutil.which("toil") or "toil",
301308
"status",
302309
"--failIfNotComplete",
303-
self.jobstorefile,
310+
jobstore,
304311
]
305312
).returncode
306313
== 0
307314
):
308-
completed = True
315+
# Get the PID of the running process
316+
with open(self.pidfile, "r") as f:
317+
pid = int(f.read())
318+
try:
319+
os.kill(pid, 0)
320+
except OSError as e:
321+
if e.errno == errno.ESRCH:
322+
# Process is no longer running, could be completed
323+
completed = True
324+
# Reap zombie child processes in a non-blocking manner
325+
os.waitpid(pid, os.WNOHANG)
326+
else:
327+
raise
328+
# If no exception, process is still running
329+
# We can't rely on toil status as the process may not have created the jobstore yet
309330
if completed:
310331
logging.info("Workflow " + self.run_id + ": COMPLETE")
311332
open(self.statcompletefile, "a").close()
@@ -354,9 +375,9 @@ def ListRuns(
354375
workflows = [{"run_id": w.run_id, "state": w.getstate()[0]} for w in wf] # NOQA
355376
return {"workflows": workflows, "next_page_token": ""}
356377

357-
def RunWorkflow(self) -> dict[str, str]:
378+
def RunWorkflow(self, **args: str) -> dict[str, str]:
358379
"""Submit the workflow run request."""
359-
tempdir, body = self.collect_attachments()
380+
tempdir, body = self.collect_attachments(args)
360381

361382
run_id = uuid.uuid4().hex
362383
job = ToilWorkflow(run_id)

wes_service/util.py

+26-42
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
import tempfile
55
from typing import Any, Callable, Optional
66

7-
import connexion # type: ignore[import-untyped]
87
from werkzeug.utils import secure_filename
98

109

@@ -49,52 +48,37 @@ def log_for_run(self, run_id: Optional[str], message: str) -> None:
4948
logging.info("Workflow %s: %s", run_id, message)
5049

5150
def collect_attachments(
52-
self, run_id: Optional[str] = None
51+
self, args: dict[str, Any], run_id: Optional[str] = None
5352
) -> tuple[str, dict[str, str]]:
5453
"""Stage all attachments to a temporary directory."""
5554
tempdir = tempfile.mkdtemp()
5655
body: dict[str, str] = {}
5756
has_attachments = False
58-
for k, ls in connexion.request.files.lists():
59-
try:
60-
for v in ls:
61-
if k == "workflow_attachment":
62-
sp = v.filename.split("/")
63-
fn = []
64-
for p in sp:
65-
if p not in ("", ".", ".."):
66-
fn.append(secure_filename(p))
67-
dest = os.path.join(tempdir, *fn)
68-
if not os.path.isdir(os.path.dirname(dest)):
69-
os.makedirs(os.path.dirname(dest))
70-
self.log_for_run(
71-
run_id,
72-
f"Staging attachment {v.filename!r} to {dest!r}",
73-
)
74-
v.save(dest)
75-
has_attachments = True
76-
body[k] = (
77-
"file://%s" % tempdir
78-
) # Reference to temp working dir.
79-
elif k in ("workflow_params", "tags", "workflow_engine_parameters"):
80-
content = v.read()
81-
body[k] = json.loads(content.decode("utf-8"))
82-
else:
83-
body[k] = v.read().decode()
84-
except Exception as e:
85-
raise ValueError(f"Error reading parameter {k!r}: {e}") from e
86-
for k, ls in connexion.request.form.lists():
87-
try:
88-
for v in ls:
89-
if not v:
90-
continue
91-
if k in ("workflow_params", "tags", "workflow_engine_parameters"):
92-
body[k] = json.loads(v)
93-
else:
94-
body[k] = v
95-
except Exception as e:
96-
raise ValueError(f"Error reading parameter {k!r}: {e}") from e
97-
57+
for k, v in args.items():
58+
if k == "workflow_attachment":
59+
for file in v or []:
60+
sp = file.filename.split("/")
61+
fn = []
62+
for p in sp:
63+
if p not in ("", ".", ".."):
64+
fn.append(secure_filename(p))
65+
dest = os.path.join(tempdir, *fn)
66+
if not os.path.isdir(os.path.dirname(dest)):
67+
os.makedirs(os.path.dirname(dest))
68+
self.log_for_run(
69+
run_id,
70+
f"Staging attachment {file.filename!r} to {dest!r}",
71+
)
72+
file.save(dest)
73+
has_attachments = True
74+
body["workflow_attachment"] = (
75+
"file://%s" % tempdir
76+
) # Reference to temp working dir.
77+
elif k in ("workflow_params", "tags", "workflow_engine_parameters"):
78+
if v is not None:
79+
body[k] = json.loads(v)
80+
else:
81+
body[k] = v
9882
if "workflow_url" in body:
9983
if ":" not in body["workflow_url"]:
10084
if not has_attachments:

wes_service/wes_service_main.py

+1-2
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,6 @@ def get_parser() -> argparse.ArgumentParser:
6363
help="Example: '--opt runner=cwltoil --opt extra=--logLevel=CRITICAL' "
6464
"or '--opt extra=--workDir=/'. Accepts multiple values.",
6565
)
66-
parser.add_argument("--debug", action="store_true", default=False)
6766
parser.add_argument("--version", action="store_true", default=False)
6867
return parser
6968

@@ -78,7 +77,7 @@ def main(argv: list[str] = sys.argv[1:]) -> None:
7877

7978
app = setup(args)
8079

81-
app.run(port=args.port, debug=args.debug)
80+
app.run(port=args.port)
8281

8382

8483
if __name__ == "__main__":

0 commit comments

Comments
 (0)