Skip to content

Update to connexion 3 #136

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ test: $(PYSOURCES) FORCE

## testcov : run the wes-service test suite and collect coverage
testcov: $(PYSOURCES)
python -m pytest -rsx --cov ${PYTEST_EXTRA}
python -m pytest ${PYTEST_EXTRA} -rsx --cov

sloccount.sc: $(PYSOURCES) Makefile
sloccount --duplicates --wide --details $^ > $@
Expand Down
1 change: 1 addition & 0 deletions mypy-requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@ types-PyYAML
types-requests
types-setuptools
arvados-cwl-runner
flask
4 changes: 2 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ build-backend = "setuptools.build_meta"

[project]
name = "wes-service"
version = "4.0"
version = "5.0"
authors = [{name = "GA4GH Containers and Workflows task team", email = "[email protected]"}]
description = "GA4GH Workflow Execution Service reference implementation"
classifiers = [
Expand All @@ -23,7 +23,7 @@ classifiers = [
]
requires-python = ">=3.9"
dependencies = [
"connexion[swagger-ui] >= 2.0.2, < 3",
"connexion[swagger-ui,flask,uvicorn] >= 3, < 4",
"ruamel.yaml >= 0.15.78",
"schema-salad",
]
Expand Down
2 changes: 0 additions & 2 deletions test/test_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,6 @@ def setUp(self) -> None:
"--opt",
"runner=cwltool",
"--port=8080",
"--debug",
]
)
time.sleep(5)
Expand Down Expand Up @@ -304,7 +303,6 @@ def setUp(self) -> None:
os.path.abspath("wes_service/wes_service_main.py"),
"--backend=wes_service.arvados_wes",
"--port=8080",
"--debug",
]
)
self.client.auth = {
Expand Down
17 changes: 10 additions & 7 deletions wes_client/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ def fixpaths(d: Any) -> None:

def build_wes_request(
workflow_file: str, json_path: str, attachments: Optional[list[str]] = None
) -> list[tuple[str, Any]]:
) -> tuple[list[tuple[str, Any]], list[tuple[str, Any]]]:
"""
:param workflow_file: Path to cwl/wdl file. Can be http/https/file.
:param json_path: Path to accompanying json file.
Expand Down Expand Up @@ -157,10 +157,12 @@ def build_wes_request(
("workflow_type_version", wf_version),
]

workflow_attachments = []

if workflow_file.startswith("file://"):
if wfbase is None:
wfbase = os.path.dirname(workflow_file[7:])
parts.append(
workflow_attachments.append(
(
"workflow_attachment",
(os.path.basename(workflow_file[7:]), open(workflow_file[7:], "rb")),
Expand All @@ -182,9 +184,9 @@ def build_wes_request(
attach_f = urlopen(attachment) # nosec B310
relpath = os.path.basename(attach_f)

parts.append(("workflow_attachment", (relpath, attach_f)))
workflow_attachments.append(("workflow_attachment", (relpath, attach_f)))

return parts
return parts, workflow_attachments


def expand_globs(attachments: Optional[Union[list[str], str]]) -> set[str]:
Expand Down Expand Up @@ -275,11 +277,12 @@ def run(
:return: The body of the post result as a dictionary.
"""
attachments = list(expand_globs(attachments))
parts = build_wes_request(wf, jsonyaml, attachments)
parts, files = build_wes_request(wf, jsonyaml, attachments)
postresult = requests.post( # nosec B113
f"{self.proto}://{self.host}/ga4gh/wes/v1/runs",
files=parts,
headers=self.auth,
data=parts,
files=files,
# headers=self.auth,
)
return wes_response(postresult)

Expand Down
2 changes: 1 addition & 1 deletion wes_service/arvados_wes.py
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,7 @@ def RunWorkflow(
)

try:
tempdir, body = self.collect_attachments(cr["uuid"])
tempdir, body = self.collect_attachments(args, cr["uuid"])

workflow_engine_parameters = cast(
dict[str, Any], body.get("workflow_engine_parameters", {})
Expand Down
2 changes: 1 addition & 1 deletion wes_service/cwl_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ def ListRuns(

def RunWorkflow(self, **args: str) -> dict[str, str]:
"""Submit the workflow run request."""
tempdir, body = self.collect_attachments()
tempdir, body = self.collect_attachments(args)

run_id = uuid.uuid4().hex
job = Workflow(run_id)
Expand Down
34 changes: 30 additions & 4 deletions wes_service/toil_wes.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
"""Toil backed for the WES service."""

import errno
import json
import logging
import os
Expand Down Expand Up @@ -294,18 +297,41 @@ def getstate(self) -> tuple[str, int]:
logging.info("Workflow " + self.run_id + ": EXECUTOR_ERROR")
open(self.staterrorfile, "a").close()
return "EXECUTOR_ERROR", 255

# get the jobstore
with open(self.jobstorefile, "r") as f:
jobstore = f.read().rstrip()
if (
subprocess.run( # nosec B603
[
shutil.which("toil") or "toil",
"status",
"--failIfNotComplete",
self.jobstorefile,
jobstore,
]
).returncode
== 0
):
completed = True
# Get the PID of the running process
with open(self.pidfile, "r") as f:
pid = int(f.read())
try:
os.kill(pid, 0)
except OSError as e:
if e.errno == errno.ESRCH:
# Process is no longer running, could be completed
completed = True
# Reap zombie child processes in a non-blocking manner
# os.WNOHANG still raises an error if no child processes exist
try:
os.waitpid(pid, os.WNOHANG)
except OSError as e:
if e.errno != errno.ECHILD:
raise
else:
raise
# If no exception, process is still running
# We can't rely on toil status as the process may not have created the jobstore yet
if completed:
logging.info("Workflow " + self.run_id + ": COMPLETE")
open(self.statcompletefile, "a").close()
Expand Down Expand Up @@ -354,9 +380,9 @@ def ListRuns(
workflows = [{"run_id": w.run_id, "state": w.getstate()[0]} for w in wf] # NOQA
return {"workflows": workflows, "next_page_token": ""}

def RunWorkflow(self) -> dict[str, str]:
def RunWorkflow(self, **args: str) -> dict[str, str]:
"""Submit the workflow run request."""
tempdir, body = self.collect_attachments()
tempdir, body = self.collect_attachments(args)

run_id = uuid.uuid4().hex
job = ToilWorkflow(run_id)
Expand Down
68 changes: 26 additions & 42 deletions wes_service/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import tempfile
from typing import Any, Callable, Optional

import connexion # type: ignore[import-untyped]
from werkzeug.utils import secure_filename


Expand Down Expand Up @@ -49,52 +48,37 @@ def log_for_run(self, run_id: Optional[str], message: str) -> None:
logging.info("Workflow %s: %s", run_id, message)

def collect_attachments(
self, run_id: Optional[str] = None
self, args: dict[str, Any], run_id: Optional[str] = None
) -> tuple[str, dict[str, str]]:
"""Stage all attachments to a temporary directory."""
tempdir = tempfile.mkdtemp()
body: dict[str, str] = {}
has_attachments = False
for k, ls in connexion.request.files.lists():
try:
for v in ls:
if k == "workflow_attachment":
sp = v.filename.split("/")
fn = []
for p in sp:
if p not in ("", ".", ".."):
fn.append(secure_filename(p))
dest = os.path.join(tempdir, *fn)
if not os.path.isdir(os.path.dirname(dest)):
os.makedirs(os.path.dirname(dest))
self.log_for_run(
run_id,
f"Staging attachment {v.filename!r} to {dest!r}",
)
v.save(dest)
has_attachments = True
body[k] = (
"file://%s" % tempdir
) # Reference to temp working dir.
elif k in ("workflow_params", "tags", "workflow_engine_parameters"):
content = v.read()
body[k] = json.loads(content.decode("utf-8"))
else:
body[k] = v.read().decode()
except Exception as e:
raise ValueError(f"Error reading parameter {k!r}: {e}") from e
for k, ls in connexion.request.form.lists():
try:
for v in ls:
if not v:
continue
if k in ("workflow_params", "tags", "workflow_engine_parameters"):
body[k] = json.loads(v)
else:
body[k] = v
except Exception as e:
raise ValueError(f"Error reading parameter {k!r}: {e}") from e

for k, v in args.items():
if k == "workflow_attachment":
for file in v or []:
sp = file.filename.split("/")
fn = []
for p in sp:
if p not in ("", ".", ".."):
fn.append(secure_filename(p))
dest = os.path.join(tempdir, *fn)
if not os.path.isdir(os.path.dirname(dest)):
os.makedirs(os.path.dirname(dest))
self.log_for_run(
run_id,
f"Staging attachment {file.filename!r} to {dest!r}",
)
file.save(dest)
has_attachments = True
body["workflow_attachment"] = (
"file://%s" % tempdir
) # Reference to temp working dir.
elif k in ("workflow_params", "tags", "workflow_engine_parameters"):
if v is not None:
body[k] = json.loads(v)
else:
body[k] = v
if "workflow_url" in body:
if ":" not in body["workflow_url"]:
if not has_attachments:
Expand Down
3 changes: 1 addition & 2 deletions wes_service/wes_service_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@ def get_parser() -> argparse.ArgumentParser:
help="Example: '--opt runner=cwltoil --opt extra=--logLevel=CRITICAL' "
"or '--opt extra=--workDir=/'. Accepts multiple values.",
)
parser.add_argument("--debug", action="store_true", default=False)
parser.add_argument("--version", action="store_true", default=False)
return parser

Expand All @@ -78,7 +77,7 @@ def main(argv: list[str] = sys.argv[1:]) -> None:

app = setup(args)

app.run(port=args.port, debug=args.debug)
app.run(port=args.port)


if __name__ == "__main__":
Expand Down