
Commit a8015e0

added a couple of docstrings
added gcp storage initial code
1 parent fe77928 commit a8015e0

File tree

cloud/aws.py
cloud/gcp.py
common/checkpoint.py
requirements.txt

4 files changed: +120 -23 lines

cloud/aws.py

Lines changed: 40 additions & 23 deletions
@@ -4,11 +4,15 @@
import os
import time
import threading
-from common import common
+from common import common, checkpoint


class Upload:

    def s3_upload_file(s3_client,bucket,file_name,object_name):
+        """It will upload the given `file_name` to the given s3 `bucket`
+        at the `object_name` s3 path.
+        """
+
        try:
            response = s3_client.upload_file(file_name,bucket,object_name)
            logging.info(f"upload successful at s3://{bucket}/{object_name}")
@@ -19,6 +23,12 @@ def s3_upload_file(s3_client,bucket,file_name,object_name):
            logging.error(f"{file_path} upload failed error {e}")

    def s3_upload(bucket,dir,topic_name,retry_upload_seconds,thread_count):
+        """It will initialize the s3 client and,
+        based on the checkpoint file for each partition,
+        call the `s3_upload_file` function to upload.
+
+        It will run after every `retry_upload_seconds`."""
+
        s3_client = boto3.client('s3')
        while True:
            _topic_dir = os.path.join(dir, topic_name)
@@ -43,27 +53,9 @@ def s3_upload(bucket,dir,topic_name,retry_upload_seconds,thread_count):

class Download:

-    def s3_read_checkpoint_partition(dir,topic,partition):
-        try:
-            logging.debug(os.path.join(dir,topic,partition,"checkpoint"))
-            with open(os.path.join(dir,topic,partition,"checkpoint")) as c:
-                line = c.readline().strip()
-                _ck_file = line.split()[0]
-                _total_files = line.split()[1]
-                return {"checkpoint": _ck_file, "total_files": _total_files}
-        except FileNotFoundError as e:
-            logging.debug(e)
-            return None
-
-    def s3_write_checkpoint_partition(dir,topic,partition,msg):
-        try:
-            with open(os.path.join(dir,topic,partition,"checkpoint"), "w") as cp:
-                cp.write(msg)
-        except TypeError as e:
-            logging.error(e)
-            logging.error(f"writing checkpoint {msg} for {topic}, {partition} is failed")
-
    def s3_count_partitions(s3_client,bucket,topic):
+        """It will return the number of objects in a given s3 bucket and s3 bucket path."""
+
        try:
            return s3_client.list_objects_v2(
                Bucket=bucket,
@@ -75,6 +67,8 @@ def s3_count_partitions(s3_client,bucket,topic):
            exit(1)

    def s3_list_files(s3_client,bucket,path):
+        """It will list all files for a given s3 bucket and s3 bucket path."""
+
        _list = []
        paginator = s3_client.get_paginator('list_objects_v2')
        operation_parameters = {'Bucket': bucket,'Prefix': path}
@@ -88,6 +82,21 @@ def s3_list_files(s3_client,bucket,path):
        return sorted(_list)

    def s3_download_file(s3_client,bucket,object_path,file_path):
+        """It will download two files: .tar.gz and .tar.gz.sha256.
+
+        Parameters
+        ----------
+        s3_client : boto3.client('s3')
+
+        bucket : str
+
+        object_path : str
+            Description: path in the s3 bucket
+
+        file_path : str
+            Description: path on the local filesystem
+        """
+
        try:
            # download .tar.gz
            s3_client.download_file(bucket, object_path, file_path)
@@ -98,6 +107,13 @@ def s3_download_file(s3_client,bucket,object_path,file_path):
            logging.error(f"{file_path} failed with error {e}")

    def s3_download(bucket,topic,tmp_dir,retry_download_seconds=60):
+        """It will initialize the s3 client and,
+        based on the checkpoint file for each partition,
+        call the `s3_download_file` function to download the backup
+        and backup sha file.
+
+        It will run after every `retry_download_seconds`."""
+
        s3_client = boto3.client('s3')
        while True:
            _pc = Download.s3_count_partitions(s3_client,bucket,topic)
@@ -106,7 +122,8 @@ def s3_download(bucket,topic,tmp_dir,retry_download_seconds=60):
                os.makedirs(os.path.join(tmp_dir,topic,str(p)),exist_ok=True)

            for _pt in range(_pc):
-                _ck = Download.s3_read_checkpoint_partition(tmp_dir,topic,str(_pt))
+
+                _ck = checkpoint.read_checkpoint_partition(tmp_dir,topic,str(_pt))
                _partition_path = os.path.join(topic,str(_pt))
                _s3_partition_files = Download.s3_list_files(s3_client,bucket,_partition_path)
                if _ck is not None:
@@ -134,7 +151,7 @@ def s3_download(bucket,topic,tmp_dir,retry_download_seconds=60):
                        Download.s3_download_file(s3_client,bucket,file,os.path.join(tmp_dir,file))
                        if file.endswith(".tar.gz"):
                            _ck['total_files'] += 1
-                        Download.s3_write_checkpoint_partition(tmp_dir,topic,str(_pt),file + " " + str(_ck['total_files']))
+                        checkpoint.write_checkpoint_partition(tmp_dir,topic,str(_pt),file + " " + str(_ck['total_files']))

            if _pc == 0:
                logging.error(f"No Partitions found in given S3 path s3://{bucket}/{topic} retry seconds {retry_download_seconds}s")

cloud/gcp.py

Lines changed: 53 additions & 0 deletions
@@ -0,0 +1,53 @@
+import logging
+import os
+import time
+import threading
+from common import common
+from google.cloud import storage
+
+class gUpload:
+
+    def gcs_upload_file(gcs_client,bucket,file_name,object_name):
+
+        blob = bucket.blob(object_name)
+        blob.upload_from_filename(file_name)
+        logging.info(f"upload successful at gcs://{bucket.name}/{object_name}")
+        if not file_name.endswith(".bin"):
+            logging.debug(f"deleting uploaded file {file_name}")
+            os.remove(file_name)
+
+    def gcs_upload(bucket,dir,topic_name,retry_upload_seconds,thread_count):
+        gcs_client = storage.Client()
+        _bucket = gcs_client.get_bucket(bucket)
+        while True:
+            _topic_dir = os.path.join(dir, topic_name)
+            _count_partition_dirs = len(common.listDirs(_topic_dir))
+            _list = common.findFilesInFolder(_topic_dir)
+            if len(_list) > _count_partition_dirs and threading.active_count() <= thread_count:
+                for file_name in _list:
+                    file_name = str(file_name)
+                    file_size = os.path.getsize(file_name)
+                    if ( file_size > 0 and file_name.endswith(".tar.gz")
+                        or
+                        file_size > 0 and file_name.endswith(".tar.gz.sha256")):
+                        object_name = file_name.split(dir)[1]
+                        t = threading.Thread(
+                            target=gUpload.gcs_upload_file,
+                            args=[gcs_client,_bucket,file_name,object_name],
+                            name="GCS Upload Threads"
+                        ).start()
+            else:
+                logging.info(f"gcs upload retry for new files in {retry_upload_seconds} seconds")
+                time.sleep(retry_upload_seconds)
+
+class gDownload:
+
+    def gcs_list_blobs_with_prefix(gcs_client,bucket,start_offset=None,end_offset=None,prefix=None):
+        return gcs_client.list_blobs(
+            bucket,
+            prefix=prefix,
+            delimiter="/",
+            start_offset=start_offset,
+            end_offset=end_offset
+        )
+
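
The GCS download side currently only lists blobs. Below is a minimal sketch of how `gcs_list_blobs_with_prefix` might be called, assuming a `cloud.gcp` module path; the bucket name and the topic/partition prefix are illustrative assumptions.

# Hypothetical usage of gDownload.gcs_list_blobs_with_prefix.
# The bucket name and the "orders/0/" prefix are assumptions for illustration.
from google.cloud import storage

from cloud import gcp

gcs_client = storage.Client()

# List backup objects directly under the assumed prefix; the delimiter="/"
# inside the helper keeps the listing from recursing into deeper "folders".
blobs = gcp.gDownload.gcs_list_blobs_with_prefix(
    gcs_client,
    "my-backup-bucket",
    prefix="orders/0/",
)
for blob in blobs:
    print(blob.name, blob.size)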

common/checkpoint.py

Lines changed: 26 additions & 0 deletions
@@ -0,0 +1,26 @@
+import os
+import logging
+
+def read_checkpoint_partition(dir,topic,partition):
+    """It will read the checkpoint file for the given topic and partition."""
+
+    try:
+        logging.debug(os.path.join(dir,topic,partition,"checkpoint"))
+        with open(os.path.join(dir,topic,partition,"checkpoint")) as c:
+            line = c.readline().strip()
+            _ck_file = line.split()[0]
+            _total_files = line.split()[1]
+            return {"checkpoint": _ck_file, "total_files": _total_files}
+    except FileNotFoundError as e:
+        logging.error(e)
+        return None
+
+def write_checkpoint_partition(dir,topic,partition,msg):
+    """It will write the checkpoint message for the given topic and partition to the checkpoint file."""
+
+    try:
+        with open(os.path.join(dir,topic,partition,"checkpoint"), "w") as cp:
+            cp.write(msg)
+    except TypeError as e:
+        logging.error(e)
+        logging.error(f"writing checkpoint {msg} for {topic}, {partition} failed")

requirements.txt

Lines changed: 1 addition & 0 deletions
@@ -1,2 +1,3 @@
confluent_kafka
boto3
+google-cloud-storage
