1+ """
2+ rohmu - rclone
3+
4+ Copyright (c) 2016 Ohmu Ltd
5+ See LICENSE for details
6+ """
7+ from ..errors import FileNotFoundFromStorageError , InvalidConfigurationError , StorageError
8+ from .base import BaseTransfer , get_total_memory , KEY_TYPE_PREFIX , KEY_TYPE_OBJECT , IterKeyItem
9+ import json
10+ import subprocess
11+ from io import BytesIO , StringIO
12+ import datetime # for general datetime object handling
13+ # import rfc3339 # for date object -> date string
14+ import iso8601 # for date string -> date object
15+
16+
def calculate_chunk_size():
    """Derive the rclone streaming chunk size, in bytes, from host memory.

    Total memory (MiB) is scaled down by a factor of 600 and the result is
    clamped to the 120..524 MiB range, so hosts where memory cannot be
    determined fall back to the 120 MiB minimum.
    """
    total_mem_mib = get_total_memory() or 0
    # Clamp to [120, 524] MiB -- roughly one chunk per ~600 MiB of RAM.
    chunk_mib = int(total_mem_mib / 600)
    if chunk_mib > 524:
        chunk_mib = 524
    if chunk_mib < 120:
        chunk_mib = 120
    return chunk_mib * 1024 * 1024


# Computed once at import time; used for both upload and download chunking.
MULTIPART_CHUNK_SIZE = calculate_chunk_size()
24+
25+
def exec_cmd(cmd):
    """Run *cmd* to completion and return its captured (stdout, stderr) bytes.

    Raises:
        Exception: if the process exits non-zero; the message includes the
            command, captured stdout/stderr and the exit code.
    """
    # subprocess.run handles pipe management and child reaping in one call;
    # stdin comes from /dev/null so the child can never block waiting on input.
    proc = subprocess.run(cmd, stdin=subprocess.DEVNULL, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    if proc.returncode != 0:
        raise Exception("cmd [%s], stdout [%s], error [%s], code [%s]" % (cmd, proc.stdout, proc.stderr, proc.returncode))
    return proc.stdout, proc.stderr
32+
33+
def exec_cmd_to_stdout(cmd):
    """Start *cmd* and return its stdout as a readable binary file object.

    The caller streams the output from the returned pipe. stdin and stderr
    are redirected to /dev/null: the previous implementation opened them as
    pipes but never serviced them, so a child writing enough to stderr would
    fill the pipe buffer and deadlock, and the stdin write end was leaked.

    NOTE(review): the Popen handle is discarded, so the child is never
    wait()ed on and may linger as a zombie until interpreter exit -- this is
    preserved from the original interface, which returns only the pipe.
    """
    proc = subprocess.Popen(cmd, stdin=subprocess.DEVNULL, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL)
    return proc.stdout
37+
38+
def exec_cmd_from_stdout(cmd, src_fd, progress_fn=None):
    """Feed the contents of *src_fd* to *cmd* on stdin, chunk by chunk.

    ``progress_fn`` (if given) is invoked with the cumulative byte count
    after every chunk handed to the child.  Returns the child's
    (stdout, stderr) and raises on a non-zero exit code.

    NOTE(review): stdout/stderr are only drained by communicate() after the
    input is fully written; a child producing lots of output mid-stream
    could fill its pipe buffers and stall -- confirm acceptable for
    ``rclone rcat``, which emits little output while streaming.
    """
    proc = subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    total_sent = 0
    while True:
        chunk = src_fd.read(MULTIPART_CHUNK_SIZE)
        if not chunk:
            # Source exhausted: close stdin and collect the child's output.
            stdout, stderr = proc.communicate()
            break
        proc.stdin.write(chunk)
        total_sent += len(chunk)
        if progress_fn:
            progress_fn(total_sent)
    if proc.returncode != 0:
        raise Exception("cmd [%s], stdout [%s], error [%s], code [%s]" % (cmd, stdout, stderr, proc.returncode))
    return stdout, stderr
54+
55+
def new_rclone_dest_key(dest, dest_path, key):
    """Build an rclone remote path of the form ``remote:path/key``."""
    return "{}:{}/{}".format(dest, dest_path, key)
58+
59+
class RCloneClient:
    """Thin wrapper around the ``rclone`` command line tool.

    Every operation shells out to ``rclone --config <conf_path> ...``;
    object listings come back as JSON from ``rclone lsjson``.
    """

    def __init__(self, conf_path):
        # conf_path: rclone configuration file passed to every invocation.
        self.config_path = conf_path
        self.base_cmd = ["rclone", "--config", self.config_path]

    def new_cmd(self, keys):
        """Return a fresh argv: the base rclone invocation plus *keys*."""
        return self.base_cmd + list(keys)

    def head_object(self, key):
        """Return the lsjson entry for *key*, or None if it does not exist."""
        info = self.list_objects(key)
        if not info:
            return None
        return info[0]

    def get_object_size(self, key):
        """Return the size of *key* in bytes, or 0 if it does not exist."""
        info = self.head_object(key)
        if info is None:
            return 0
        return int(info["Size"])

    def delete_object(self, key):
        """Remove the single file at *key* (``rclone deletefile``)."""
        exec_cmd(self.new_cmd(["deletefile", key]))

    def list_objects(self, key, deep=False):
        """List directories and objects under *key* as parsed lsjson output.

        With deep=True the listing recurses into subdirectories.
        """
        args = ["lsjson", key]
        if deep:
            args.append("--recursive")
        stdout, _ = exec_cmd(self.new_cmd(args))
        return json.loads(stdout)

    def put_object(self, src_fd, dest, progress_fn=None):
        """Stream *src_fd* to the remote file *dest* (``rclone rcat``)."""
        exec_cmd_from_stdout(self.new_cmd(["rcat", dest]), src_fd, progress_fn)

    def copy_object(self, src, dest):
        """Copy *src* to *dest*, skipping already-copied data (``rclone copyto``)."""
        exec_cmd(self.new_cmd(["copyto", src, dest]))

    def get_object_stream(self, key):
        """Return (readable fd, length) for *key* (``rclone cat``).

        length is 0 when the object cannot be listed.
        """
        fd = exec_cmd_to_stdout(self.new_cmd(["cat", key]))
        info = self.head_object(key)
        length = 0 if info is None else int(info["Size"])
        return fd, length

    def get_object_content(self, key):
        """Return (content bytes, length) for *key*, or (None, 0) if missing."""
        info = self.head_object(key)
        if info is None:
            return None, 0
        length = int(info["Size"])
        stdout, _ = exec_cmd(self.new_cmd(["cat", key]))
        return stdout, length
129+
130+
class RCloneTransfer(BaseTransfer):
    """Rohmu storage transfer backend driven by the external ``rclone`` CLI.

    Objects live on the rclone remote ``destination`` under
    ``destination_path``; per-object metadata is persisted next to the
    payload in a sibling ``<key>.metadata`` JSON file.
    """

    def __init__(self,
                 remote_clone_config_path,
                 source,
                 destination,
                 destination_path,
                 prefix=None):
        # remote_clone_config_path: rclone config file used for every call.
        # destination / destination_path: remote name and base path used to
        # build "<remote>:<path>/<key>" target strings.
        # NOTE(review): ``source`` is stored but never read by any method in
        # this class -- presumably reserved for future use; confirm before
        # removing.
        super().__init__(prefix=prefix)
        self.remote_clone_client = RCloneClient(remote_clone_config_path)
        self.source = source
        self.destination = destination
        self.destination_path = destination_path
        self.log.debug("RCloneTransfer initialized")

    def store_file_object(self, key, fd, *, cache_control=None, metadata=None, mimetype=None, upload_progress_fn=None):
        """Stream *fd* to the remote under *key* and write its metadata sidecar.

        ``cache_control`` and ``mimetype`` are accepted for interface
        compatibility but are not forwarded to rclone.
        """
        target_path = self.format_key_for_backend(key.strip("/"))
        metadata_path = target_path + ".metadata"
        self.log.debug("Save file: %r, %r", target_path, metadata_path)

        # Upload the payload first ...
        k = new_rclone_dest_key(self.destination, self.destination_path, target_path)
        self.remote_clone_client.put_object(fd, k, upload_progress_fn)

        # ... then the JSON metadata sidecar next to it.
        bio = BytesIO(json.dumps(self.sanitize_metadata(metadata)).encode())
        k = new_rclone_dest_key(self.destination, self.destination_path, metadata_path)
        self.remote_clone_client.put_object(bio, k)

    def store_file_from_disk(self, key, filepath, metadata=None, multipart=None, cache_control=None, mimetype=None):
        """Store the file at *filepath* under *key* via store_file_object.

        ``multipart`` is ignored; chunking is handled by the rclone client.
        """
        target_path = self.format_key_for_backend(key.strip("/"))
        self.log.debug("Save file from disk: %r", target_path)

        with open(filepath, "rb") as fp:
            self.store_file_object(key, fp, metadata=metadata, cache_control=cache_control, mimetype=mimetype)

    def store_file_from_memory(self, key, memstring, metadata=None, cache_control=None, mimetype=None):
        """Store the in-memory bytes *memstring* under *key*."""
        target_path = self.format_key_for_backend(key.strip("/"))
        self.log.debug("Save file from memory: %r", target_path)

        bio = BytesIO(memstring)
        self.store_file_object(key, bio, metadata=metadata, cache_control=cache_control, mimetype=mimetype)

    @staticmethod
    def _skip_file_name(file_name):
        # Hidden files and our own metadata sidecars are implementation
        # details and must not show up in listings.
        return file_name.startswith(".") or file_name.endswith(".metadata") or ".metadata_tmp" in file_name

    def iter_key(self, key, *, with_metadata=True, deep=False, include_key=False):
        """Yield IterKeyItem entries (prefixes and objects) under *key*.

        deep=True recurses into subdirectories; include_key=True also
        surfaces *key* itself when it is an object rather than a prefix.

        NOTE(review): any exception aborts iteration with only a debug log
        -- preserved best-effort behavior, but failures are easy to miss.
        """
        # Apply the backend prefix to the caller's key.
        target_path = self.format_key_for_backend(key.strip("/"))
        try:
            # List both directories and objects under the target path.
            rclone_target_path = new_rclone_dest_key(self.destination, self.destination_path, target_path)
            response = self.remote_clone_client.list_objects(rclone_target_path, deep)

            for item in response:
                # Skip hidden files and metadata sidecars.
                if self._skip_file_name(item['Path']):
                    continue

                # Full key of the entry relative to the backend root.
                # When an object and a bucket share a name, rclone resolves
                # the object first, so probe the entry with a second listing.
                file_key = (target_path + "/" if len(target_path) != 0 else "") + item["Path"]
                objs = self.remote_clone_client.list_objects(
                    new_rclone_dest_key(self.destination, self.destination_path, file_key)
                )
                if len(objs) == 0:
                    # Entry is not listable on its own; only surface it when
                    # the caller asked for the key itself.
                    if include_key is False:
                        continue
                    file_key = target_path

                # Directories are reported as prefixes.
                if item['IsDir'] is True:
                    yield IterKeyItem(
                        type=KEY_TYPE_PREFIX,
                        value=file_key,
                    )
                    continue

                # Plain object: optionally attach its metadata sidecar.
                if with_metadata:
                    try:
                        metadata_path = (key.strip("/") + "/" if len(key.strip("/")) != 0 else "") + item["Path"]
                        metadata = self.get_metadata_for_key(metadata_path)
                    except FileNotFoundFromStorageError as ex:
                        # A missing sidecar is tolerated; yield the object anyway.
                        self.log.debug("get metadata file error %s", ex)
                        metadata = None
                        pass
                else:
                    metadata = None

                yield IterKeyItem(
                    type=KEY_TYPE_OBJECT,
                    value={
                        "last_modified": iso8601.parse_date(item["ModTime"]).astimezone(tz=datetime.timezone.utc),
                        "metadata": metadata,
                        "name": file_key,
                        "size": item["Size"],
                    },
                )
        except Exception as ex:
            # Best-effort listing: swallow errors and end iteration.
            self.log.debug("itr_key error %s", ex)
            return

    def get_metadata_for_key(self, key, trailing_slash=False):
        """Fetch and parse the JSON metadata sidecar for *key*.

        Raises:
            FileNotFoundFromStorageError: if the sidecar does not exist.
        """
        source_path = self.format_key_for_backend(key.strip("/"), trailing_slash=trailing_slash)
        metadata_path = source_path + ".metadata"
        self.log.debug("Get metadata: %r", metadata_path)

        k = new_rclone_dest_key(self.destination, self.destination_path, metadata_path)
        stdout, length = self.remote_clone_client.get_object_content(k)
        if stdout is None:
            raise FileNotFoundFromStorageError(key)
        return json.loads(stdout)

    def get_file_size(self, key):
        """Return the size of *key* in bytes.

        Raises:
            FileNotFoundFromStorageError: if *key* cannot be listed.
        """
        key = self.format_key_for_backend(key, remove_slash_prefix=True)
        self.log.debug("Get file size: %r", key)

        # Existence check first so a missing key raises instead of sizing 0.
        k = new_rclone_dest_key(self.destination, self.destination_path, key)
        response = self.remote_clone_client.list_objects(k)
        if len(response) == 0:
            raise FileNotFoundFromStorageError(key)

        k = new_rclone_dest_key(self.destination, self.destination_path, key)
        response = self.remote_clone_client.get_object_size(k)
        return response

    def get_contents_to_stream(self, key):
        """Return (stdout stream of ``rclone cat``, metadata dict) for *key*.

        Raises:
            FileNotFoundFromStorageError: if *key* cannot be listed.
        """
        target_key = self.format_key_for_backend(key, remove_slash_prefix=True)
        self.log.debug("Get content to stream: %r", target_key)

        # Existence check before starting the streaming process.
        k = new_rclone_dest_key(self.destination, self.destination_path, target_key)
        response = self.remote_clone_client.list_objects(k)
        if len(response) == 0:
            raise FileNotFoundFromStorageError(key)

        k = new_rclone_dest_key(self.destination, self.destination_path, target_key)
        response, _ = self.remote_clone_client.get_object_stream(k)
        metadata = self.get_metadata_for_key(key)
        return response, metadata

    def get_contents_to_string(self, key):
        """Return (full content bytes, metadata dict) for *key*."""
        response, metadata = self.get_contents_to_stream(key)
        return response.read(), metadata

    def get_contents_to_fileobj(self, key, fileobj_to_store_to, *, progress_callback=None):
        """Copy *key*'s content into *fileobj_to_store_to*; return its metadata."""
        stream, metadata = self.get_contents_to_stream(key)
        length = self.get_file_size(key)
        self._read_object_to_fileobj(fileobj_to_store_to, stream, length, cb=progress_callback)
        return metadata

    def get_contents_to_file(self, key, filepath_to_store_to, *, progress_callback=None):
        """Download *key* to the local path *filepath_to_store_to*."""
        with open(filepath_to_store_to, "wb") as fh:
            return self.get_contents_to_fileobj(key, fh)

    def delete_key(self, key):
        """Delete *key* and its metadata sidecar from the remote.

        Raises:
            FileNotFoundFromStorageError: if *key* cannot be listed.
        """
        target_path = self.format_key_for_backend(key, remove_slash_prefix=True)
        metadata_path = target_path + ".metadata"
        self.log.debug("Deleting key: %r, %r", target_path, metadata_path)

        k = new_rclone_dest_key(self.destination, self.destination_path, target_path)
        infos = self.remote_clone_client.list_objects(k)
        if len(infos) == 0:
            raise FileNotFoundFromStorageError(key)

        self.remote_clone_client.delete_object(
            new_rclone_dest_key(self.destination, self.destination_path, target_path)
        )
        self.remote_clone_client.delete_object(
            new_rclone_dest_key(self.destination, self.destination_path, metadata_path)
        )

    def copy_file(self, *, source_key, destination_key, metadata=None, **_kwargs):
        """Server-side copy of *source_key* to *destination_key*.

        If *metadata* is None, the source's metadata sidecar is copied;
        otherwise the supplied dict is written for the destination.

        Raises:
            FileNotFoundFromStorageError: if *source_key* cannot be listed.
        """
        source_path = self.format_key_for_backend(source_key.strip("/"))
        destination_path = self.format_key_for_backend(destination_key.strip("/"))
        self.log.debug("Copy file from %r -> %r", source_path, destination_path)

        k = new_rclone_dest_key(self.destination, self.destination_path, source_path)
        infos = self.remote_clone_client.list_objects(k)
        if len(infos) == 0:
            raise FileNotFoundFromStorageError(source_key)

        self.remote_clone_client.copy_object(
            new_rclone_dest_key(self.destination, self.destination_path, source_path),
            new_rclone_dest_key(self.destination, self.destination_path, destination_path)
        )

        if metadata is None:
            metadata = self.get_metadata_for_key(source_key)

        metadata_path = destination_path + ".metadata"
        self.log.debug("Save metadata: %r", metadata_path)

        bio = BytesIO(json.dumps(self.sanitize_metadata(metadata)).encode())
        k = new_rclone_dest_key(self.destination, self.destination_path, metadata_path)
        self.remote_clone_client.put_object(bio, k)

    def _read_object_to_fileobj(self, fileobj, streaming_body, body_length, cb=None):
        """Copy exactly *body_length* bytes from *streaming_body* to *fileobj*.

        Reads in MULTIPART_CHUNK_SIZE chunks.  *cb*, if given, is invoked as
        cb(bytes_done, total) after each chunk and once more at the end.

        Raises:
            StorageError: on a short read from *streaming_body*.
        """
        data_read = 0
        while data_read < body_length:
            read_amount = body_length - data_read
            if read_amount > MULTIPART_CHUNK_SIZE:
                read_amount = MULTIPART_CHUNK_SIZE
            data = streaming_body.read(read_amount)
            if len(data) != read_amount:
                raise StorageError("Rclone read data error, need %d but %d" % (read_amount, len(data)))
            fileobj.write(data)
            data_read += len(data)
            if cb:
                cb(data_read, body_length)
        if cb:
            # Final callback also covers the zero-length body case.
            cb(data_read, body_length)