9
9
10
10
class Upload :
11
11
12
def upload_file(blob_service_client, container_name, file_name, file_object_name, sha_file_name, sha_file_object_name):
    """Upload an archive and its checksum file to blob storage, then delete both locally.

    The archive is uploaded first and the checksum second. The local files are
    removed only after both uploads have succeeded. Any failure is logged and
    swallowed so the calling thread ends quietly; the local files are then
    left in place.

    Args:
        blob_service_client: Client object exposing
            ``get_blob_client(container, blob=...)``.
        container_name: Name of the target container.
        file_name: Local path of the archive to upload.
        file_object_name: Blob name under which the archive is stored.
        sha_file_name: Local path of the checksum file to upload.
        sha_file_object_name: Blob name under which the checksum file is stored.
    """
    try:
        uploads = (
            (file_name, file_object_name),
            (sha_file_name, sha_file_object_name),
        )
        for local_path, blob_name in uploads:
            blob_client = blob_service_client.get_blob_client(
                container_name, blob=blob_name)
            with open(local_path, "rb") as data:
                # overwrite=True keeps a retry idempotent: if an earlier attempt
                # uploaded the archive but failed before the local files were
                # removed, the default (overwrite=False) would reject the
                # existing blob on every retry and the pair would never finish.
                blob_client.upload_blob(data, overwrite=True)
            logging.info(f"upload successful {local_path}")

        # Remove both uploaded files only once the pair is safely stored.
        os.remove(file_name)
        os.remove(sha_file_name)
    except Exception as e:
        # Thread entry point: log and return rather than propagate.
        logging.error(f"{file_name} upload failed error {e}")
27
32
33
+
28
34
def upload (connect_str , container_name , dir , topic_name , retry_upload_seconds , thread_count ):
29
35
"""Main function to initialize azure blob client and
30
36
based on checkpoint file for each partition.
@@ -35,26 +41,26 @@ def upload(connect_str, container_name, dir, topic_name, retry_upload_seconds, t
35
41
36
42
while True :
37
43
_topic_dir = os .path .join (dir , topic_name )
38
- _count_partition_dirs = len ( common .listDirs (_topic_dir ) )
39
- _list = common . findFilesInFolder ( _topic_dir )
40
- if len ( _list ) > _count_partition_dirs and threading .active_count () <= thread_count :
44
+ _list = common .findFilesInFolder (_topic_dir , pattern = "*.tar.gz" )
45
+ logging . debug ( f"pending files to upload { len ( _list ) } and number of active threads { threading . active_count () } " )
46
+ if threading .active_count () <= thread_count :
41
47
for file_name in _list :
42
- logging .info (file_name )
43
48
file_name = str (file_name )
44
- file_size = os .path .getsize (file_name )
45
- if file_size > 0 and file_name .endswith ((".tar.gz" , ".tar.gz.sha256" )):
46
- object_name = file_name .split (dir )[1 ]
49
+ file_object_name = file_name .split (dir )[1 ]
50
+ sha_file_name = file_name + ".sha256"
51
+ sha_file_object_name = file_object_name + ".sha256"
52
+ if common .isFileAndShaFileExist (file_name , sha_file_name ):
53
+ logging .debug (f"start upload of file { file_name } " )
47
54
threading .Thread (
48
55
target = Upload .upload_file ,
49
56
args = [blob_service_client , container_name ,
50
- file_name , object_name ],
57
+ file_name , file_object_name ,
58
+ sha_file_name , sha_file_object_name ],
51
59
name = f"Azure Upload Thread for { file_name } "
52
60
).start ()
53
- else :
54
- logging .info (
55
- f"Azure upload retry for new files in { retry_upload_seconds } seconds" )
56
- time .sleep (retry_upload_seconds )
57
61
62
+ logging .info (f"Azure upload retry for new files in { retry_upload_seconds } seconds" )
63
+ time .sleep (retry_upload_seconds )
58
64
59
65
class Download :
60
66
0 commit comments