1
+ import os
1
2
from typing import Callable , Optional , List , Tuple , Any , Dict
2
3
from io import BytesIO
3
4
from asyncio import Future
@@ -29,7 +30,10 @@ def __init__(
29
30
self ._upload_session = upload_session
30
31
self ._request_adapter = request_adapter
31
32
self .stream = stream
32
- self .file_size = stream .getbuffer ().nbytes
33
+ try :
34
+ self .file_size = stream .getbuffer ().nbytes
35
+ except AttributeError :
36
+ self .file_size = os .stat (stream .name ).st_size
33
37
self .max_chunk_size = max_chunk_size
34
38
cleaned_value = self .check_value_exists (
35
39
upload_session , 'get_next_expected_range' , ['next_expected_range' , 'NextExpectedRange' ]
@@ -98,7 +102,7 @@ def upload_session_expired(
98
102
interval = now - then
99
103
if not isinstance (interval , timedelta ):
100
104
raise ValueError ("Interval is not a timedelta" )
101
- if interval .total_seconds () < = 0 :
105
+ if interval .total_seconds () > = 0 :
102
106
return True
103
107
return False
104
108
@@ -115,13 +119,15 @@ async def upload(self, after_chunk_upload: Optional[Callable] = None):
115
119
process_next = session
116
120
# determine the range to be uploaded
117
121
# even when resuming existing upload sessions.
118
- range_parts = self .next_range [0 ].split ("-" ) if self .next_range else ['0' ]
122
+ #range_parts = self.next_range[0].split("-") if self.next_range else ['0']
123
+
124
+ range_parts = self .next_range [0 ].split ("-" ) if self .next_range else ['0' , '0' ]
119
125
end = min (int (range_parts [0 ]) + self .max_chunk_size - 1 , self .file_size )
120
126
uploaded_range = [range_parts [0 ], end ]
121
127
while self .chunks > 0 :
122
128
session = process_next
123
129
try :
124
- lfu_session : Optional [ LargeFileUploadSession ] = await session
130
+ lfu_session : LargeFileUploadSession = session
125
131
if lfu_session is None :
126
132
continue
127
133
next_range = lfu_session .next_expected_ranges
@@ -137,9 +143,9 @@ async def upload(self, after_chunk_upload: Optional[Callable] = None):
137
143
self .next_range = next_range [0 ] + "-"
138
144
process_next = await self .next_chunk (self .stream )
139
145
except Exception as error :
140
- logging .error (f "Error uploading chunk { error } " )
141
- raise # remove after manual testing
142
- self .chunks -= 1
146
+ logging .error ("Error uploading chunk %s" , error )
147
+ finally :
148
+ self .chunks -= 1
143
149
return session
144
150
145
151
@property
@@ -152,7 +158,6 @@ def next_range(self, value: Optional[str]) -> None:
152
158
153
159
async def next_chunk (self , file : BytesIO , range_start : int = 0 , range_end : int = 0 ) -> Future :
154
160
upload_url = self .get_validated_upload_url (self .upload_session )
155
-
156
161
if not upload_url :
157
162
raise ValueError ('The upload session URL must not be empty.' )
158
163
info = RequestInformation ()
@@ -177,15 +182,15 @@ async def next_chunk(self, file: BytesIO, range_start: int = 0, range_end: int =
177
182
end = min (end , self .max_chunk_size + start )
178
183
chunk_data = file .read (end - start + 1 )
179
184
info .headers = HeadersCollection ()
185
+ access_token = "<place_holder_pending CAE fix>"
180
186
181
187
info .headers .try_add ('Content-Range' , f'bytes { start } -{ end } /{ self .file_size } ' )
182
- # info.headers.try_add(**info.request_headers) what do we do if headers need to be passed
183
188
info .headers .try_add ('Content-Length' , str (len (chunk_data )))
184
- info .set_stream_content (BytesIO (chunk_data ))
189
+ info .headers .try_add ("Content-Type" , "application/octet-stream" )
190
+ info .headers .try_add ("Authorization" , f"Bearer { access_token } " )
191
+ info .set_stream_content (bytes (chunk_data )) # Convert chunk_data to bytes
185
192
error_map : Dict [str , int ] = {}
186
-
187
- parsable_factory : LargeFileUploadSession = self .upload_session
188
-
193
+ parsable_factory = LargeFileUploadSession
189
194
return await self .request_adapter .send_async (info , parsable_factory , error_map )
190
195
191
196
def get_file (self ) -> BytesIO :
0 commit comments