Skip to content

Commit 0ead7d2

Browse files
authored
[CAT-102] byte streams/buffered io streams (#117)
1 parent 09df0e7 commit 0ead7d2

File tree

3 files changed

+65
-18
lines changed

3 files changed

+65
-18
lines changed

indico/http/client.py

+30-8
Original file line numberDiff line numberDiff line change
@@ -82,11 +82,20 @@ def execute_request(self, request: HTTPRequest):
8282

8383
@contextmanager
8484
def _handle_files(self, req_kwargs):
85+
86+
streams = None
87+
# deepcopying buffers is not supported
88+
# so, remove "streams" before the deepcopy.
89+
if "streams" in req_kwargs:
90+
streams = req_kwargs["streams"].copy()
91+
del req_kwargs["streams"]
92+
8593
new_kwargs = deepcopy(req_kwargs)
94+
8695
files = []
8796
file_arg = {}
8897
dup_counts = {}
89-
if "files" in new_kwargs:
98+
if "files" in new_kwargs and new_kwargs["files"] is not None:
9099
for filepath in new_kwargs["files"]:
91100
path = Path(filepath)
92101
fd = path.open("rb")
@@ -99,19 +108,32 @@ def _handle_files(self, req_kwargs):
99108
file_arg[path.stem] = fd
100109
dup_counts[path.stem] = 1
101110

102-
new_kwargs["files"] = file_arg
111+
if streams is not None and len(streams) > 0:
112+
for filename in streams:
113+
# similar operation as above.
114+
stream = streams[filename]
115+
files.append(stream)
116+
if filename in dup_counts:
117+
file_arg[filename + f"({dup_counts[filename]})"] = stream
118+
dup_counts[filename] += 1
119+
else:
120+
file_arg[filename] = stream
121+
dup_counts[filename] = 1
122+
123+
new_kwargs["files"] = file_arg
124+
103125
yield new_kwargs
104126

105127
if files:
106128
[f.close() for f in files]
107129

108130
def _make_request(
109-
self,
110-
method: str,
111-
path: str,
112-
headers: dict = None,
113-
_refreshed=False,
114-
**request_kwargs,
131+
self,
132+
method: str,
133+
path: str,
134+
headers: dict = None,
135+
_refreshed=False,
136+
**request_kwargs,
115137
):
116138
logger.debug(
117139
f"[{method}] {path}\n\t Headers: {headers}\n\tRequest Args:{request_kwargs}"

indico/queries/storage.py

+12-5
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
1+
import io
12
import json
2-
from typing import List
3+
from typing import List, Dict
34
from indico.client.request import HTTPMethod, HTTPRequest, RequestChain
45
from indico.errors import IndicoRequestError, IndicoInputError
56

@@ -46,14 +47,20 @@ class UploadDocument(HTTPRequest):
4647
Used internally for uploading documents to indico platform for later processing
4748
4849
Args:
49-
filepaths (str): list of filepaths to upload
50+
files (str): A list of local filepaths to upload.
51+
streams (Dict[str, io.BufferedIOBase]): A dict of filenames to BufferedIOBase streams
52+
(any class that inherits BufferedIOBase is acceptable).
5053
5154
Returns:
5255
files: storage objects to be use for further processing requests E.G. Document extraction (implicitly called)
5356
"""
5457

55-
def __init__(self, files: List[str]):
56-
super().__init__(HTTPMethod.POST, "/storage/files/store", files=files)
58+
def __init__(self, files: List[str] = None, streams: Dict[str, io.BufferedIOBase] = None):
59+
60+
if (files is None and streams is None) or (files is not None and streams is not None):
61+
raise IndicoInputError("Must define one of files or streams, but not both.")
62+
63+
super().__init__(HTTPMethod.POST, "/storage/files/store", files=files, streams=streams)
5764

5865
def process_response(self, uploaded_files: List[dict]):
5966
files = [
@@ -124,5 +131,5 @@ def process_response(self, uploaded_files: List[dict]) -> List[str]:
124131
return urls
125132

126133

127-
# Alias to ensure backwards compatability
134+
# Alias to ensure backwards compatibility
128135
UploadImages = CreateStorageURLs

indico/queries/workflow.py

+23-5
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
1-
from typing import List, Union
1+
import io
2+
from typing import List, Union, Dict
23

34
from indico.client.request import GraphQLRequest, RequestChain, Debouncer
45
from indico.errors import IndicoError, IndicoInputError
@@ -249,6 +250,9 @@ class WorkflowSubmission(RequestChain):
249250
The format of the submission result file. One of:
250251
{SUBMISSION_RESULT_VERSIONS}
251252
If bundle is enabled, this must be version TWO or later.
253+
streams (Dict[str, io.BufferedIOBase]): List of filename keys mapped to streams
254+
for upload. Similar to files but mutually exclusive with files.
255+
Can take for example: io.BufferedReader, io.BinaryIO, or io.BytesIO.
252256
253257
Returns:
254258
List[int]: If `submission`, these will be submission ids.
@@ -266,18 +270,22 @@ def __init__(
266270
submission: bool = True,
267271
bundle: bool = False,
268272
result_version: str = None,
273+
streams: Dict[str, io.BufferedIOBase] = None
269274
):
270275
self.workflow_id = workflow_id
271276
self.files = files
272277
self.urls = urls
273278
self.submission = submission
274279
self.bundle = bundle
275280
self.result_version = result_version
281+
self.streams = streams.copy()
276282

277-
if not self.files and not self.urls:
278-
raise IndicoInputError("One of 'files' or 'urls' must be specified")
279-
elif self.files and self.urls:
280-
raise IndicoInputError("Only one of 'files' or 'urls' must be specified")
283+
if not self.files and not self.urls and not len(streams) > 0:
284+
raise IndicoInputError("One of 'files', 'streams', or 'urls' must be specified")
285+
elif self.files and len(self.streams) > 0:
286+
raise IndicoInputError("Only one of 'files' or 'streams' or 'urls' may be specified.")
287+
elif (self.files or len(streams) > 0) and self.urls:
288+
raise IndicoInputError("Only one of 'files' or 'streams' or 'urls' may be specified")
281289

282290
def requests(self):
283291
if self.files:
@@ -299,6 +307,16 @@ def requests(self):
299307
bundle=self.bundle,
300308
result_version=self.result_version,
301309
)
310+
elif len(self.streams) > 0:
311+
yield UploadDocument(streams=self.streams)
312+
yield _WorkflowSubmission(
313+
self.detailed_response,
314+
workflow_id=self.workflow_id,
315+
record_submission=self.submission,
316+
files=self.previous,
317+
bundle=self.bundle,
318+
result_version=self.result_version,
319+
)
302320

303321

304322
class WorkflowSubmissionDetailed(WorkflowSubmission):

0 commit comments

Comments
 (0)