3333
3434from pghoard .common import BaseBackupFormat , StrEnum
3535from pghoard .rohmu import compat , dates , get_transfer , rohmufile
36- from pghoard .rohmu .errors import Error , InvalidConfigurationError
36+ from pghoard .rohmu .errors import ( Error , InvalidConfigurationError , MaybeRecoverableError )
3737
3838from . import common , config , logutil , version
3939from .postgres_command import PGHOARD_HOST , PGHOARD_PORT
@@ -640,6 +640,7 @@ def __init__(self, *, app_config, debug, site, pgdata, tablespaces, data_files:
640640 self .manager_class = multiprocessing .Manager if self ._process_count () > 1 else ThreadingManager
641641 self .max_stale_seconds = 120
642642 self .pending_jobs = set ()
643+ self .jobs_to_retry = set ()
643644 self .pgdata = pgdata
644645 # There's no point in spawning child processes if process count is 1
645646 self .pool_class = multiprocessing .Pool if self ._process_count () > 1 else multiprocessing .pool .ThreadPool
@@ -648,6 +649,7 @@ def __init__(self, *, app_config, debug, site, pgdata, tablespaces, data_files:
648649 self .sleep_fn = time .sleep
649650 self .tablespaces = tablespaces
650651 self .total_download_size = 0
652+ self .retry_per_file = {}
651653
652654 def fetch_all (self ):
653655 for retry in range (3 ):
@@ -656,7 +658,7 @@ def fetch_all(self):
656658 self ._setup_progress_tracking (manager )
657659 with self .pool_class (processes = self ._process_count ()) as pool :
658660 self ._queue_jobs (pool )
659- self ._wait_for_jobs_to_complete ()
661+ self ._wait_for_jobs_to_complete (pool )
660662 # Context manager does not seem to properly wait for the subprocesses to exit, let's join
661663 # the pool manually (close need to be called before joining)
662664 pool .close ()
@@ -738,15 +740,33 @@ def job_completed(self, key):
738740 if key in self .pending_jobs :
739741 self .pending_jobs .remove (key )
740742 self .completed_jobs .add (key )
743+ self .retry_per_file .pop (key , None )
741744
def job_failed(self, key, exception):
    """Handle a failed chunk-download job.

    Recoverable failures (``MaybeRecoverableError``, e.g. the parallel-tar
    directory-creation race) are retried up to two times by parking the key
    in ``jobs_to_retry``; the retry loop in ``_wait_for_jobs_to_complete``
    re-queues them.  Any other failure — or a recoverable one that has
    exhausted its retries — is counted as an error and the job is marked
    completed so the fetch can finish (and then fail) deterministically.

    :param key: job identifier (the data file id) as registered in
                ``pending_jobs``
    :param exception: the exception raised by the worker
    """
    # Any failure callback is a sign of liveness: refresh the progress
    # timestamp so the staleness watchdog (max_stale_seconds) does not
    # abort a fetch that is actively retrying recoverable errors.
    self.last_progress_ts = time.monotonic()

    recoverable = isinstance(exception, MaybeRecoverableError)
    if recoverable:
        self.log.warning("Got error which can be recoverable from chunk download %s", exception)
    else:
        self.log.error("Got error from chunk download: %s", exception)

    with self.lock:
        # Ignore duplicate/late callbacks for jobs we no longer track.
        if key not in self.pending_jobs:
            return
        self.pending_jobs.remove(key)

        if recoverable:
            retries = self.retry_per_file.get(key, 0) + 1
            self.retry_per_file[key] = retries
            if retries <= 2:
                # Park for re-queueing by the progress loop.
                self.jobs_to_retry.add(key)
                return
            self.log.error("Giving up on recoverable error: %s", exception)

        # Permanent failure: count the error and mark the job completed
        # so overall progress accounting still converges.
        self.errors += 1
        self.completed_jobs.add(key)
        self.retry_per_file.pop(key, None)
750770
751771 def jobs_in_progress (self ):
752772 with self .lock :
@@ -788,8 +808,18 @@ def _write_status_output_to_file(self, output_file):
788808 }
789809 )
790810
791- def _wait_for_jobs_to_complete (self ):
811+ def _wait_for_jobs_to_complete (self , pool ):
792812 while self .jobs_in_progress ():
813+ to_queue = []
814+ with self .lock :
815+ if self .jobs_to_retry :
816+ for item in self .data_files :
817+ if item .id in self .jobs_to_retry :
818+ self .pending_jobs .add (item .id )
819+ self .jobs_to_retry .remove (item .id )
820+ to_queue .append (item )
821+ for item in to_queue :
822+ self ._queue_job (pool , item )
793823 self ._print_download_progress ()
794824 if self .status_output_file :
795825 self ._write_status_output_to_file (self .status_output_file )
@@ -910,12 +940,16 @@ def _fetch_delta_file(self, metadata, fetch_fn):
910940 )
911941
912942 def _fetch_and_extract_one_backup (self , metadata , file_size , fetch_fn ):
943+ # Force tar to use the C locale to match errors in stderr
944+ tar_env = os .environ .copy ()
945+ tar_env ["LANG" ] = "C"
913946 with subprocess .Popen (
914947 self ._build_tar_args (metadata ),
915948 bufsize = 0 ,
916949 stdin = subprocess .PIPE ,
917950 stdout = subprocess .DEVNULL ,
918- stderr = subprocess .PIPE
951+ stderr = subprocess .PIPE ,
952+ env = tar_env
919953 ) as tar :
920954 common .increase_pipe_capacity (tar .stdin , tar .stderr )
921955 sink = rohmufile .create_sink_pipeline (
@@ -939,7 +973,18 @@ def _fetch_and_extract_one_backup(self, metadata, file_size, fetch_fn):
939973 exit_code = tar .wait ()
940974 file_name = "<mem_bytes>" if isinstance (self .file_info , FileDataInfo ) else self .file_info .name
941975 if exit_code != 0 :
942- raise Exception ("tar exited with code {!r} for file {!r}, output: {!r}" .format (exit_code , file_name , output ))
976+ ex_message = "tar exited with code {!r} for file {!r}, output: {!r}" .format (exit_code , file_name , output )
977+ # Running multiple tar commands in parallel in the same
978+ # directory can lead to race conditions while creating the
979+ # intermediate directories.
980+ # In that case, try to recover from it.
981+ # See issue #452 and https://savannah.gnu.org/bugs/index.php?61015
982+ if exit_code == 2 and b"Cannot open: No such file or directory" in output :
983+ raise MaybeRecoverableError (ex_message )
984+ else :
985+ raise Exception (
986+ "tar exited with code {!r} for file {!r}, output: {!r}" .format (exit_code , file_name , output )
987+ )
943988 self .log .info ("Processing of %r completed successfully" , file_name )
944989
945990
0 commit comments