
Commit deb4b13

added multithreaded s3 upload functions.
1 parent cccd8af commit deb4b13

File tree: backup.py · cloud.py

2 files changed · +27 −22 lines changed


backup.py

Lines changed: 1 addition & 6 deletions
```diff
@@ -106,12 +106,7 @@ def main():
             logging.debug(f"setting RETRY_UPLOAD_SECONDS to default 60 ")
             retry_upload_seconds = 60
 
-        _s3_upload = threading.Thread(
-            target=Upload.s3_upload_files,
-            args=[bucket,tmp_dir,topic_name,"*",retry_upload_seconds],
-            name="S3 Upload"
-        )
-        _s3_upload.start()
+        Upload.s3_upload(bucket,tmp_dir,topic_name,retry_upload_seconds,b.NUMBER_OF_KAFKA_THREADS + 1)
 
     except KeyError as e:
         logging.error(f"unable to set s3 required variables {e}")
```

cloud.py

Lines changed: 26 additions & 16 deletions
```diff
@@ -6,6 +6,7 @@
 from common import Common
 import pathlib
 import threading
+import more_itertools
 
 class Upload:
 
@@ -17,25 +18,34 @@ def findFiles(dir,pattern="*"):
             _list.append(path)
         return _list
 
-    def s3_upload_files(bucket,dir,topic_name,pattern,retry_upload_seconds):
+    def s3_upload_file(s3_client,bucket,file_name,object_name):
+        try:
+            response = s3_client.upload_file(file_name,bucket,object_name)
+            logging.info(f"upload successful at s3://{bucket}/{object_name}")
+            if not file_name.endswith(".bin"):
+                logging.debug(f"deleting uploaded file {file_name}")
+                os.remove(file_name)
+        except ClientError as e:
+            logging.error(f"{file_path} upload failed error {e}")
+
+    def s3_upload(bucket,dir,topic_name,retry_upload_seconds,thread_count):
         s3_client = boto3.client('s3')
+        count = 0
         while True:
             _topic_dir = os.path.join(dir, topic_name)
             _count_partition_dirs = len(Common.listDirs(_topic_dir))
-            _list = Upload.findFiles(_topic_dir,pattern)
-            if len(_list) > _count_partition_dirs:
-                for f in _list:
-                    f = str(f)
-                    if os.path.getsize(f) > 0:
-                        try:
-                            object_name = f.split(dir)[1]
-                            response = s3_client.upload_file(f,bucket,object_name)
-                            logging.info(f"upload successful at s3://{bucket}/{object_name}")
-                            if not f.endswith(".bin"):
-                                logging.debug(f"deleting uploaded file {f}")
-                                os.remove(f)
-                        except ClientError as e:
-                            logging.error(f"{file_path} upload failed error {e}")
+            _list = Upload.findFiles(_topic_dir)
+            if len(_list) > _count_partition_dirs and threading.active_count() <= thread_count:
+                for file_name in _list:
+                    file_name = str(file_name)
+                    if os.path.getsize(file_name) > 0:
+                        object_name = file_name.split(dir)[1]
+                        t = threading.Thread(
+                            target=Upload.s3_upload_file,
+                            args=[s3_client,bucket,file_name,object_name],
+                            name="S3 Upload Threads"
+                        ).start()
+                        count += 1
             else:
-                logging.info("waiting for new files to be generated")
+                logging.info(f"s3 upload retry for new files in {retry_upload_seconds} seconds")
             time.sleep(retry_upload_seconds)
```
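Taken together, the two new functions implement a simple fan-out: `s3_upload` is a long-lived polling loop that lists files under the topic directory and, while `threading.active_count()` stays within `thread_count`, starts one short-lived `s3_upload_file` thread per non-empty file. Below is a self-contained sketch of that pattern, not the repo's exact code: `glob` and `os.listdir` stand in for `Upload.findFiles` and `Common.listDirs`, which aren't shown in this diff, and the error log uses `file_name` because the committed `except` clause references a `file_path` that is not defined in the new function's scope.

```python
import glob
import logging
import os
import threading
import time

import boto3
from botocore.exceptions import ClientError


def s3_upload_file(s3_client, bucket, file_name, object_name):
    """Upload one file, then remove the local copy unless it is a .bin file."""
    try:
        s3_client.upload_file(file_name, bucket, object_name)
        logging.info(f"upload successful at s3://{bucket}/{object_name}")
        if not file_name.endswith(".bin"):
            logging.debug(f"deleting uploaded file {file_name}")
            os.remove(file_name)
    except ClientError as e:
        # the committed code logs an undefined `file_path` here; the worker
        # only knows the file_name it was given
        logging.error(f"{file_name} upload failed error {e}")


def s3_upload(bucket, base_dir, topic_name, retry_upload_seconds, thread_count):
    """Poll base_dir/topic_name and fan each non-empty file out to a worker thread."""
    s3_client = boto3.client("s3")  # boto3 clients can be shared across threads
    while True:
        topic_dir = os.path.join(base_dir, topic_name)
        # stand-ins for Common.listDirs and Upload.findFiles from the repo
        partition_dirs = [d for d in os.listdir(topic_dir)
                          if os.path.isdir(os.path.join(topic_dir, d))]
        files = [p for p in glob.glob(os.path.join(topic_dir, "**", "*"), recursive=True)
                 if os.path.isfile(p)]
        # fan out only while the number of live threads (main thread, any Kafka
        # consumer threads, and upload workers) is within the configured cap
        if len(files) > len(partition_dirs) and threading.active_count() <= thread_count:
            for file_name in files:
                if os.path.getsize(file_name) > 0:
                    object_name = file_name.split(base_dir)[1]  # key keeps the path below base_dir
                    threading.Thread(
                        target=s3_upload_file,
                        args=[s3_client, bucket, file_name, object_name],
                        name="S3 Upload Threads",
                    ).start()
        else:
            logging.info(f"s3 upload retry for new files in {retry_upload_seconds} seconds")
        time.sleep(retry_upload_seconds)
```

Because `upload_file` blocks per call, one thread per file keeps slow S3 transfers from stalling the polling loop; note, though, that the `active_count()` gate is only checked once per pass, so a pass that starts with many files still spawns one thread for each of them before the cap takes effect on the next iteration.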
