-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathgcp.py
53 lines (46 loc) · 2.05 KB
/
gcp.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
import logging
import os
import time
import threading
from common import common
from google.cloud import storage
class Upload:
    """Upload finished backup files from local disk to Google Cloud Storage."""

    @staticmethod
    def gcs_upload_file(gcs_client, bucket, file_name, object_name):
        """Upload one local file into `bucket` and clean it up locally.

        Args:
            gcs_client: Not used by this function; kept so existing callers
                that pass it positionally keep working.
            bucket: Bucket object exposing `blob(object_name)`.
            file_name: Path of the local file to upload.
            object_name: Destination object name inside the bucket.

        After the upload call returns, the local file is deleted unless its
        name ends with ".bin". Upload errors propagate to the caller.
        """
        blob = bucket.blob(object_name)
        blob.upload_from_filename(file_name)
        # Log the bucket's name rather than the object's repr when available.
        bucket_label = getattr(bucket, "name", bucket)
        logging.info(f"upload successful at gcs://{bucket_label}/{object_name}")
        if not file_name.endswith(".bin"):
            logging.debug(f"deleting uploaded file {file_name}")
            os.remove(file_name)

    @staticmethod
    def gcs_upload(bucket, dir, topic_name, retry_upload_seconds, thread_count):
        """Poll the topic directory forever and upload non-empty archives.

        Args:
            bucket: Name of the destination bucket.
            dir: Local base directory; it is also stripped from each file
                path to derive the object name.
            topic_name: Sub-directory of `dir` that is scanned.
            retry_upload_seconds: Seconds to sleep when a scan dispatches
                nothing.
            thread_count: New uploads are only started while
                `threading.active_count()` is at most this value.

        This function never returns; each upload runs in its own thread.
        """
        # `storage.Client` is the client class; `storage.client` is a module.
        gcs_client = storage.Client()
        _bucket = gcs_client.get_bucket(bucket)
        _topic_dir = os.path.join(dir, topic_name)
        _suffixes = (".tar.gz", ".tar.gz.sha256")
        while True:
            _count_partition_dirs = len(common.listDirs(_topic_dir))
            _list = common.findFilesInFolder(_topic_dir)
            # NOTE(review): presumably every partition directory always holds
            # one file that is still being written, so only a surplus of
            # files means there is something to upload - confirm against
            # common.findFilesInFolder.
            if len(_list) > _count_partition_dirs and threading.active_count() <= thread_count:
                for file_name in _list:
                    file_name = str(file_name)
                    try:
                        file_size = os.path.getsize(file_name)
                    except FileNotFoundError:
                        # An upload thread started by an earlier pass may
                        # have deleted the file since it was listed.
                        continue
                    if file_size > 0 and file_name.endswith(_suffixes):
                        # maxsplit=1 keeps the whole remainder even when
                        # `dir` occurs again further down the path.
                        object_name = file_name.split(dir, 1)[1]
                        threading.Thread(
                            target=Upload.gcs_upload_file,
                            # Pass the Bucket object, not the bucket name.
                            args=[gcs_client, _bucket, file_name, object_name],
                            name="GCS Upload Threads",
                        ).start()
            else:
                logging.info(f"gcs upload retry for new files in {retry_upload_seconds} seconds")
                time.sleep(retry_upload_seconds)
class Download:
    """Helpers for reading backup objects from Google Cloud Storage."""

    @staticmethod
    def gcs_list_blobs_with_prefix(gcs_client, bucket, start_offset=None, end_offset=None, prefix=None):
        """List the objects of `bucket` through `gcs_client.list_blobs`.

        Args:
            gcs_client: Client object exposing `list_blobs`.
            bucket: Bucket (or bucket name) forwarded as the first argument.
            start_offset: Forwarded unchanged as `start_offset`.
            end_offset: Forwarded unchanged as `end_offset`.
            prefix: Forwarded unchanged as `prefix`.

        Returns:
            Whatever `gcs_client.list_blobs` returns.

        The delimiter is always "/".
        NOTE(review): with google-cloud-storage this presumably restricts the
        listing to objects directly under `prefix` - confirm in library docs.
        """
        return gcs_client.list_blobs(
            bucket,
            prefix=prefix,
            delimiter="/",
            start_offset=start_offset,
            end_offset=end_offset,
        )