|
4 | 4 | import tempfile
|
5 | 5 | from typing import Any, Callable, Optional
|
6 | 6 |
|
7 |
| -import connexion # type: ignore[import-untyped] |
8 | 7 | from werkzeug.utils import secure_filename
|
9 | 8 |
|
10 | 9 |
|
@@ -49,52 +48,37 @@ def log_for_run(self, run_id: Optional[str], message: str) -> None:
|
49 | 48 | logging.info("Workflow %s: %s", run_id, message)
|
50 | 49 |
|
51 | 50 | def collect_attachments(
|
52 |
| - self, run_id: Optional[str] = None |
| 51 | + self, args: dict[str, Any], run_id: Optional[str] = None |
53 | 52 | ) -> tuple[str, dict[str, str]]:
|
54 | 53 | """Stage all attachments to a temporary directory."""
|
55 | 54 | tempdir = tempfile.mkdtemp()
|
56 | 55 | body: dict[str, str] = {}
|
57 | 56 | has_attachments = False
|
58 |
| - for k, ls in connexion.request.files.lists(): |
59 |
| - try: |
60 |
| - for v in ls: |
61 |
| - if k == "workflow_attachment": |
62 |
| - sp = v.filename.split("/") |
63 |
| - fn = [] |
64 |
| - for p in sp: |
65 |
| - if p not in ("", ".", ".."): |
66 |
| - fn.append(secure_filename(p)) |
67 |
| - dest = os.path.join(tempdir, *fn) |
68 |
| - if not os.path.isdir(os.path.dirname(dest)): |
69 |
| - os.makedirs(os.path.dirname(dest)) |
70 |
| - self.log_for_run( |
71 |
| - run_id, |
72 |
| - f"Staging attachment {v.filename!r} to {dest!r}", |
73 |
| - ) |
74 |
| - v.save(dest) |
75 |
| - has_attachments = True |
76 |
| - body[k] = ( |
77 |
| - "file://%s" % tempdir |
78 |
| - ) # Reference to temp working dir. |
79 |
| - elif k in ("workflow_params", "tags", "workflow_engine_parameters"): |
80 |
| - content = v.read() |
81 |
| - body[k] = json.loads(content.decode("utf-8")) |
82 |
| - else: |
83 |
| - body[k] = v.read().decode() |
84 |
| - except Exception as e: |
85 |
| - raise ValueError(f"Error reading parameter {k!r}: {e}") from e |
86 |
| - for k, ls in connexion.request.form.lists(): |
87 |
| - try: |
88 |
| - for v in ls: |
89 |
| - if not v: |
90 |
| - continue |
91 |
| - if k in ("workflow_params", "tags", "workflow_engine_parameters"): |
92 |
| - body[k] = json.loads(v) |
93 |
| - else: |
94 |
| - body[k] = v |
95 |
| - except Exception as e: |
96 |
| - raise ValueError(f"Error reading parameter {k!r}: {e}") from e |
97 |
| - |
| 57 | + for k, v in args.items(): |
| 58 | + if k == "workflow_attachment": |
| 59 | + for file in v or []: |
| 60 | + sp = file.filename.split("/") |
| 61 | + fn = [] |
| 62 | + for p in sp: |
| 63 | + if p not in ("", ".", ".."): |
| 64 | + fn.append(secure_filename(p)) |
| 65 | + dest = os.path.join(tempdir, *fn) |
| 66 | + if not os.path.isdir(os.path.dirname(dest)): |
| 67 | + os.makedirs(os.path.dirname(dest)) |
| 68 | + self.log_for_run( |
| 69 | + run_id, |
| 70 | + f"Staging attachment {file.filename!r} to {dest!r}", |
| 71 | + ) |
| 72 | + file.save(dest) |
| 73 | + has_attachments = True |
| 74 | + body["workflow_attachment"] = ( |
| 75 | + "file://%s" % tempdir |
| 76 | + ) # Reference to temp working dir. |
| 77 | + elif k in ("workflow_params", "tags", "workflow_engine_parameters"): |
| 78 | + if v is not None: |
| 79 | + body[k] = json.loads(v) |
| 80 | + else: |
| 81 | + body[k] = v |
98 | 82 | if "workflow_url" in body:
|
99 | 83 | if ":" not in body["workflow_url"]:
|
100 | 84 | if not has_attachments:
|
|
0 commit comments